【Flink-Kafka-To-RocketMQ】Using a Custom Flink Sink to Consume Kafka Data and Write It to RocketMQ

  • 1) Environment Setup
  • 2) Code Implementation
    • 2.1. Main Program
    • 2.2.conf
      • 2.2.1.ConfigTools
    • 2.3.utils
      • 2.3.1.DBConn
      • 2.3.2.CommonUtils
    • 2.4.function
      • 2.4.1.MqSinkFunction
    • 2.5.resources
      • 2.5.1.appconfig.yml
      • 2.5.2.log4j.properties
      • 2.5.3.log4j2.xml

1) Environment Setup

The Maven dependencies listed here are somewhat redundant; it is recommended to include them all for now, and they can be trimmed later.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-kafka2mq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.14.6</flink.version>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
        <hive.version>3.1.2</hive.version>
        <hbase.version>2.3.3</hbase.version>
        <spark.version>3.0.2</spark.version>
    </properties>
    
    <dependencies>
        <!-- RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.68.Final</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jyaml</groupId>
            <artifactId>jyaml</artifactId>
            <version>1.3</version>
        </dependency>
        <dependency>
            <groupId>gaei.cn.x5l</groupId>
            <artifactId>tsp-gb-decode</artifactId>
            <version>1.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
            <scope>runtime</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-3</artifactId>
            <version>3.1.1.7.2.8.0-224-9.0</version>
<!--            <scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Core dependencies: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Kafka SQL connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- SQL dependencies: end -->
        <!-- Checkpoints / state processor API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Local job monitoring: end -->
        <!-- DataStream: start -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <!-- hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
        </dependency>
        <!-- Important: a jar that is easily overlooked -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- rocksdb_2 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Miscellaneous -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.23</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <!--                            <relocations>-->
                            <!--                                <relocation>-->
                            <!--                                    <pattern>org.apache.kafka</pattern>-->
                            <!--                                    <shadedPattern>org.apache.flink.kafka.shaded.org.apache.kafka</shadedPattern>>-->
                            <!--                                </relocation>-->
                            <!--                            </relocations>-->
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_${scala.binary.version}</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                                <!-- Required for Flink SQL -->
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <finalName>RocketMQProducerDemo</finalName>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Specify the class containing the main method here -->
                            <mainClass>kafka2mq.api.mq.RocketMQProducerDemo</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>

    </build>
    <repositories>
        <repository>
            <id>cdh.releases.repo</id>
            <url>https://repository.cloudera.com/artifactory/libs-release-local/</url>
            <name>Releases Repository</name>
        </repository>
    </repositories>
</project>

2) Code Implementation

Notes:

1. All of the configuration in this program is read from MySQL (in a production environment nothing is hard-coded; everything is configured dynamically). For your own testing you can simply hard-code the relevant settings instead; a minimal hard-coded sketch follows these notes.

2. The Kafka consumer here performs Kerberos authentication; if your environment does not use Kerberos, you can remove that part.
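
For reference only, the fragment below shows how a hard-coded configuration map could be assembled in the main method instead of reading it from MySQL. This is a sketch, not part of the original program: the key names mirror the lookups made by the main program, CommonUtils and MqSinkFunction below, and every concrete value (addresses, group names, topics) is a placeholder.

// Minimal sketch (fragment for the main method): build mapConf by hand for local testing.
// All values below are placeholders, not taken from the original post.
Map<String, Object> mqProp = new HashMap<>();
mqProp.put("group", "test_producer_group");
mqProp.put("server.address", "127.0.0.1:9876");
mqProp.put("retries", "3");

Map<String, Object> mqProducer = new HashMap<>();
mqProducer.put("defaultTopic", "test_mq_topic");
mqProducer.put("prop", mqProp);

Map<String, String> kafkaProp = new HashMap<>();
kafkaProp.put("bootstrap.servers", "127.0.0.1:9092");
kafkaProp.put("group.id", "test_consumer_group");

Map<String, Object> kafkaConsumer = new HashMap<>();
kafkaConsumer.put("topics", "test_topic");
kafkaConsumer.put("prop", kafkaProp);

Map<String, Object> mapConf = new HashMap<>();
mapConf.put("mq-producer", mqProducer);
mapConf.put("kafka-consumer", kafkaConsumer);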

2.1. Main Program

import cdp.kafka2mq.test.conf.ConfigTools;
import cdp.kafka2mq.test.function.MqSinkFunction;
import cdp.kafka2mq.test.utils.CommonUtils;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

import java.util.*;

public class Test {
    //    public static Logger logger = Logger.getLogger(Test.class);

    public static void main(String[] args) throws Exception {
        ConfigTools.initMySqlConf(args[0], Test.class);

        Map<String, Object> mapConf = ConfigTools.mapConf;
        Map<String, Object> mqProducer = (Map<String, Object>) mapConf.get("mq-producer");
        Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
        String mqTopic = String.valueOf(mqProducer.get("defaultTopic"));
        System.out.println("mq-topic:" + mqTopic);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();

        // custom Kafka source
        FlinkKafkaConsumer<ConsumerRecord<String, String>> myConsumer = CommonUtils.getKafkaConsumer(kafkaConsumerConf);
        DataStream<ConsumerRecord<String, String>> stream = env.addSource(myConsumer);
        // custom RocketMQ sink
        stream.addSink(new MqSinkFunction(mqTopic, mapConf)).setParallelism(1);
        
        env.execute();
    }
}
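
The configuration name is passed to the job as the first program argument (args[0]) and is used by ConfigTools.initMySqlConf (section 2.2.1) to pick the matching row in MySQL. Assuming the project is packaged into a fat jar, submitting the job might look like the following sketch; the main-class package and the config name are placeholders, and only the artifactId and version come from the pom:

flink run -c cdp.kafka2mq.test.Test flink-kafka2mq-1.0-SNAPSHOT.jar prod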

2.2.conf

2.2.1.ConfigTools

Reads configuration from MySQL.

package cdp.kafka2mq.test.conf;

import com.alibaba.fastjson.JSON;
import cdp.kafka2mq.test.utils.DBConn;
import lombok.extern.slf4j.Slf4j;
import org.ho.yaml.Yaml;

import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class ConfigTools {

    public static Map<String, Object> mapConf;

    /**
     * Load the configuration from a YAML file on the classpath
     *
     * @param option suffix of the configuration file name
     */
    public static void initConf(String option) {
        String confFile = "/flink_backup_" + option + ".yml";
        try {
            InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
            mapConf = Yaml.loadType(dumpFile, HashMap.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Load the configuration from MySQL
     *
     * @param option config_name of the configuration row
     * @param clazz  class whose name is used as app_name
     */
    public static void initMySqlConf(String option, Class clazz) {
        String className = clazz.getName();
        String confFile = "/appconfig.yml";
        Map<String, String> mysqlConf;
        try {
            InputStream dumpFile = ConfigTools.class.getResourceAsStream(confFile);
            mysqlConf = Yaml.loadType(dumpFile, HashMap.class);
            String username = mysqlConf.get("mysql.username");
            String password = mysqlConf.get("mysql.password");
            String url = mysqlConf.get("mysql.url");
            Connection conn = DBConn.conn(url, username, password);
            Map<String, Object> config = getConfig(conn, className, option);

            if (config == null || config.size() == 0) {
                log.error("Failed to load configuration");
                return;
            }

            mapConf = config;

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private static Map<String, Object> getConfig(Connection conn, String className, String option) throws SQLException {
        PreparedStatement preparedStatement = null;
        try {
            String sql = "select config_context from base_app_config where app_name = '%s' and config_name = '%s'";
            preparedStatement = conn.prepareStatement(String.format(sql, className, option));

            ResultSet rs = preparedStatement.executeQuery();

            String config_context = "";
            while (rs.next()) {
                config_context = rs.getString("config_context");
            }
            System.out.println("配置信息config_context:"+config_context);
            Map<String, Object> mysqlConfMap = JSON.parseObject(config_context, Map.class);
            return mysqlConfMap;
        }finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
            if (conn != null) {
                conn.close();
            }
        }
    }

    public static void main(String[] args) {
        initConf("local");
        String s = JSON.toJSONString(mapConf);
        System.out.println(s);
    }
}
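
Neither the table definition nor a sample config_context value is shown in the original post. A minimal schema consistent with the query in getConfig (only the table and column names come from the SQL string above; everything else is assumed) might be:

-- Hypothetical DDL for the configuration table
CREATE TABLE base_app_config (
    app_name       VARCHAR(255) NOT NULL,   -- fully qualified main class name
    config_name    VARCHAR(64)  NOT NULL,   -- the value passed to the job as args[0]
    config_context TEXT         NOT NULL    -- JSON payload parsed by ConfigTools
);

The config_context payload is parsed with fastjson into a nested map. A sketch consistent with the keys looked up by the main program, CommonUtils and MqSinkFunction (all concrete values are placeholders) could be:

{
  "mq-producer": {
    "defaultTopic": "test_mq_topic",
    "prop": { "group": "test_producer_group", "server.address": "127.0.0.1:9876", "retries": "3" }
  },
  "kafka-consumer": {
    "topics": "topic_a,topic_b",
    "prop": {
      "bootstrap.servers": "127.0.0.1:9092",
      "group.id": "test_consumer_group",
      "isKerberized": "1",
      "krb5Conf": "/etc/krb5.conf",
      "security_protocol": "SASL_PLAINTEXT",
      "useTicketCache": "false",
      "serviceName": "kafka",
      "keytab": "/path/to/user.keytab",
      "principal": "user@EXAMPLE.COM"
    }
  }
}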

2.3.utils

2.3.1.DBConn

MySQL connection utility class.

package cdp.kafka2mq.test.utils;

import java.sql.*;

public class DBConn {
    private static final String driver = "com.mysql.jdbc.Driver";		// MySQL JDBC driver
    private static Connection conn = null;

    /**
     * Open a database connection
     * @return the JDBC connection, or null if it could not be established
     */
    public static Connection conn(String url,String username,String password) {
        Connection conn = null;
        try {
            Class.forName(driver);  // load the JDBC driver
            try {
                conn = DriverManager.getConnection(url, username, password);  // open the connection
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the database connection
     */
    public static void close() {
        if(conn != null) {
            try {
                conn.close();  // close the connection
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}

2.3.2.CommonUtils

Kafka consumer utility class.

package cdp.kafka2mq.test.utils;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;

@Slf4j
public class CommonUtils {
    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {
        String[] topics = ((String) kafkaConf.get("topics")).split(",");
        log.info("监听的topic: {}", topics);
        Properties properties = new Properties();
        Map<String, String> kafkaProp = (Map<String, String>) kafkaConf.get("prop");
        for (String key : kafkaProp.keySet()) {
            properties.setProperty(key, kafkaProp.get(key).toString());
        }

        if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {
            System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf"));
            properties.put("security.protocol", kafkaProp.get("security_protocol"));
            properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                    + "useTicketCache=" + kafkaProp.get("useTicketCache") + " "

                    + "serviceName=\"" + kafkaProp.get("serviceName") + "\" "
                    + "useKeyTab=true "
                    + "keyTab=\"" + kafkaProp.get("keytab").toString() + "\" "
                    + "principal=\"" + kafkaProp.get("principal").toString() + "\";");
        }

        properties.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");

        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumerRecordFlinkKafkaConsumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(Arrays.asList(topics), new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {
            @Override
            public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
                return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
                });
            }

            @Override
            public boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {
                return false;
            }

            @Override
            public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
                return new ConsumerRecord<String, String>(
                        record.topic(),
                        record.partition(),
                        record.offset(),
                        record.timestamp(),
                        record.timestampType(),
                        record.checksum(),
                        record.serializedKeySize(),
                        record.serializedValueSize(),
                        new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),
                        new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));
            }
        }, properties);
        return consumerRecordFlinkKafkaConsumer;
    }
}

2.4.function

2.4.1.MqSinkFunction

A custom sink function that writes data to RocketMQ.

package cdp.kafka2mq.test.function;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MqSinkFunction extends RichSinkFunction<ConsumerRecord<String, String>> {
    private static DefaultMQProducer producer = new DefaultMQProducer();
    private String topic;
    private Map<String, Object> confMap;

    public MqSinkFunction(String topic, Map<String, Object> confMap) {
        this.topic = topic;
        this.confMap = confMap;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
//        super.open(parameters);
        System.out.println("confMap: " + confMap);
        Map<String, Object> mqProducer = (Map<String, Object>) confMap.get("mq-producer");
        Map<String, Object> mqProp = (Map<String, Object>) mqProducer.get("prop");
        String groupId = String.valueOf(mqProp.get("group"));
        String srvAddr = String.valueOf(mqProp.get("server.address"));
        int retries = Integer.parseInt(String.valueOf(mqProp.get("retries")));
        System.out.println("mq生产者组:" + groupId);
        System.out.println("mq地址:" + srvAddr);
        System.out.println("retries:" + retries);
        producer.setProducerGroup(groupId);
        producer.setNamesrvAddr(srvAddr);
        producer.setRetryTimesWhenSendFailed(retries);
        producer.setUseTLS(true);
        producer.start();
    }

    @Override
    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
        String message = record.value();
//        System.out.println(message);
        HashMap<String, Object> infoMap = JSON.parseObject(message, HashMap.class);
//        System.out.println(infoMap);
        String id = String.valueOf(infoMap.get("id"));
        if (StringUtils.isBlank(id)) {
            return;
        }
        Message msg = new Message();
        msg.setTopic(topic);
        msg.setTags(id);
        msg.setBody(message.getBytes(StandardCharsets.UTF_8));
        msg.setTransactionId(id);
//            System.out.println("msg:" + msg);
        System.out.println("send前");
        SendResult send = producer.send(msg);
        System.out.printf("%s%n", send);
        System.out.println("send后");
    }
}
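
The sink above starts the producer in open() but never releases it. A possible refinement (not part of the original code) is to override close() so the RocketMQ client is shut down when the job stops or fails:

    // Possible addition to MqSinkFunction (not in the original post):
    // release the producer when the sink is disposed, mirroring producer.start() in open().
    @Override
    public void close() throws Exception {
        producer.shutdown();
        super.close();
    }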

2.5.resources

Other configuration files.

2.5.1.appconfig.yml

mysql.url: "jdbc:mysql://ip:3306/dbName?useSSL=false"
mysql.username: "username"
mysql.password: "password"
mysql.driver: "com.mysql.jdbc.Driver"

2.5.2.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.5.3.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <property name="LOG_LEVEL" value="ERROR" />
    </Properties>

    <appenders>
        <console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
        <File name="log" fileName="tmp/log/job.log" append="false">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </File>
    </appenders>

    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="console"/>
            <appender-ref ref="log"/>
        </root>
    </loggers>

</configuration>
