【Kafka】SASL认证的Kafka客户端代码示例(spring-kafka和原生客户端)

文章目录

  • spring-kafka
  • 原生客户端
  • Tips

spring-kafka

添加依赖:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.6.3</version>
        </dependency>

添加spring-kafka相关配置:

#=============== 集群通用配置 ================
spring.kafka.bootstrap-servers=kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="zhurunhua" password="pwd";

#=============== producer  =================
spring.kafka.producer.retries=5
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=1000
spring.kafka.producer.buffer-memory=1000000
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  ==================
spring.kafka.consumer.group-id=zhurunhua-test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费者:

@Component
public class TestPlainConsumer {

    @KafkaListener(topics = {"zhurunhua-test-topic"})
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }

}

topic可以从配置文件读取,代码中通过${}的方式获取配置的topic:

@Component
public class TestPlainConsumer {

    @KafkaListener(topics = {"${kafka.subscribe.topic}"})
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }

}

生产者:

@Component
public class TestPlainProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.subscribe.topic}")
    private String topic;

    public void send(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.err.println(throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                System.out.println("发送结果:" + sendResult);
            }
        });
    }
}

原生客户端

引入依赖

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.6.0</version>
      <scope>compile</scope>
    </dependency>

客户端代码

public class TestSaslClient {

    private final static String TOPIC = "zhurunhua-test-topic";

    private final static String BROKERS = "kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092";

    private static KafkaConsumer<String, String> consumer;

    private static KafkaProducer<String, String> producer;

    static {
        Properties c = initConfig();
        consumer = new KafkaConsumer<>(c);
        producer = new KafkaProducer<>(c);
    }

    /**
     * 初始化配置
     *
     * @return java.util.Properties
     * @date 2023/04/17 5:51 下午
     */
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhurunhua-test-1");

        // SASL/PLAIN 认证配置
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT);
        props.put(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"zhurunhua\" password=\"pwd\";");

        // 可选设置属性
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // 自动提交offset,每1s提交一次
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        return props;
    }

    /**
     * 消费者示例
     *
     * @date 2023/04/17 5:51 下午
     */
    public void subscribe() {
        consumer.subscribe(Collections.singleton(TOPIC));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                }
            }
        } catch (Exception e) {
            System.out.println("consumer error : " + e);
        } finally {
            consumer.close();
        }
    }

    /**
     * 生产者示例
     *
     * @date 2023/04/17 5:52 下午
     */
    public void send() {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 0, "zhurunhua", "测试消息");
        producer.send(record, (recordMetadata, e) -> {
            if (Objects.nonNull(e)) {
                System.out.println("send error: " + e);
            } else {
                System.out.println(recordMetadata);
            }
        });
    }
}

Tips

  1. SecurityProtocol定义在枚举:org.apache.kafka.common.security.auth.SecurityProtocol中:

  1. sasl.mechanism配置项有:
    1. GSSAPI
    2. PLAIN
    3. SCRAM-SHA-256
    4. SCRAM-SHA-512
    5. OAUTHBEARER

更多安全和认证相关资料,参考:https://kafka.apache.org/documentation/#security_overview

  1. 若使用ssl,相关配置示例:

    spring.kafka.ssl.trust-store-type=JKS
    spring.kafka.ssl.key-store-type=JKS
    spring.kafka.ssl.protocol=SSL
    spring.kafka.ssl.key-store-password=rxu4G5kPyAqTkERk
    spring.kafka.ssl.trust-store-password=rxu4G5kPyAqTkERk
    spring.kafka.ssl.key-store-location=/kafka-jks/keystore.jks
    spring.kafka.ssl.trust-store-location=/kafka-jks/keystore.jks
    
  2. 使用不同的sasl.mechanism时,需要注意sasl.jaas.config的配置中,LoginModule不同:

    1. PLAIN对应的是:PlainLoginModule
    2. SCRAM相关对应的是:ScramLoginModule
  3. kafka认证的参数相对来说比较复杂,需要理解每个参数的含义,错一个就会失败,启动日志会打印客户端配置,可用于协助排查问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/12682.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【springcloud 微服务】Spring Cloud Alibaba Nacos使用详解

目录 一、前言 二、nacos介绍 2.1 什么是 Nacos 2.2 nacos 核心能力 2.2.1 服务发现和服务健康监测 2.2.2 动态配置服务 2.2.3 动态 DNS 服务 2.2.4 服务及其元数据管理 2.2.5 nacos生态地图 2.3 与其他配置中心对比 三、nacos快速部署 3.1 获取安装包 3.2 修改脚…

电阻器的原理、类型、参数以及生活中常见的应用

电阻器是电子电路中最基本的元件之一&#xff0c;它的作用是限制电流流过的大小&#xff0c;在电子电路中广泛应用于电流控制、电压分压、信号衰减等方面。在本文中&#xff0c;我们将详细介绍电阻器的原理、类型、参数以及生活中常见的应用。 一、电阻器的原理 电阻器是一种…

go语言切片做函数参数传递+append()函数扩容

go语言切片函数参数传递append()函数扩容 给你二叉树的根节点 root 和一个整数目标和 targetSum &#xff0c;找出所有 从根节点到叶子节点 路径总和等于给定目标和的路径。 二叉树递归go代码&#xff1a; var ans [][]int func pathSum(root *TreeNode, targetSum int) ( [][…

ActiveMQ使用(四):在JavaScript中发送的MQTT消息在SpringBoot中变为字节数组

ActiveMQ使用(四):在JavaScript中发送的MQTT消息在SpringBoot中变为字节数组 1. 问题描述 JmsListener(destination "test_producer", containerFactory "topicListenerContainer")public void receiveTestProducer(String message) throws JMSExceptio…

spring bean

图灵课堂学习笔记 1. BeanFactory与ApplicationContext的关系 p56 ApplicationContext在BeanFactory基础上对功能进行了扩展&#xff0c;例如&#xff1a;监听功能、国际化功能等。BeanFactory的API更偏向底层&#xff0c;ApplicationContext的API大多数是对这些底层API的封…

JVM OOM问题排查与解决思路

OOM原因 1. 堆溢出 报错信息&#xff1a; java.lang.OutOfMemoryError: Java heap space 代码中可能存在大对象分配&#xff0c;无法获得足够的内存分配 可能发生内存泄露&#xff0c;导致内存被无效占用以至于耗尽 2. 永久代/元空间溢出 报错信息&#xff1a; java.lang.O…

C#,码海拾贝(19)——一般实矩阵的QR分解(QR Decomposition)方法之C#源代码,《C#数值计算算法编程》源代码升级改进版

1 实矩阵 实矩阵&#xff0c;指的是矩阵中所有的数都是实数的矩阵。如果一个矩阵中含有除实数以外的数&#xff0c;那么这个矩阵就不是实矩阵。 2 QR&#xff08;正交三角&#xff09;分解法 QR&#xff08;正交三角&#xff09;分解法是求一般矩阵全部特征值的最有效并广泛应…

Flowable从入门到源码分析

什么是工作流&#xff1f; 工作流&#xff0c;是把业务之间的各个步骤以及规则进行抽象和概括性的描述。使用特定的语言为业务流程建模&#xff0c;让其运行在计算机上&#xff0c;并让计算机进行计算和推动。 工作流解决的痛点在于&#xff0c;解除业务宏观流程和微观逻辑的…

jenkins gitlab asp.net core持续集成

什么是jenkins Jenkins直接取自其官方文档&#xff0c;是一个独立的开源自动化服务器&#xff0c;您可以使用它来自动执行与构建、测试、交付或部署软件相关的各种任务。 jenkins可以干什么 Jenkins 通过自动执行某些脚本来生成部署所需的文件来工作。这些脚本称为JenkinsFi…

2023_深入学习HTML5

H5 基于html5和 css3和一部分JS API 结合的开发平台(环境) 语义化标签 header : 表示头部&#xff0c;块级元素 footer &#xff1a; 表示底部&#xff0c;块级元素 section &#xff1a;区块 nav &#xff1a; 表示导航链接 aside &#xff1a; 表示侧边栏 output &am…

二叉搜索树(BSTree)

目录 一、二叉搜索树 二、二叉搜索树的接口及实现 1、二叉搜索树的查找 2、二叉搜索树的插入 3、二叉搜索树的删除 三、二叉搜索树的递归版本 本期博客主要分享二叉搜索树的底层实现。(主要是笔记&#xff0c;供自己复习使用&#x1f602;) 一、二叉搜索树 二叉搜索树(B…

MybatisPlus主键策略

Mybatis默认主键策略是TableId(type IdType.ASSIGN_ID) 这是默认策略雪花算法 此时主键类型可以是String 数据表字段类型可以是bigint int varchar 无需数据表主键自增 TableId(type IdType.ASSIGN_AUTO) 是主键自增策略:该策略为跟随数据库表的主键递增策略&…

一致性框架设计方案

补充组件依赖 前言 对于供应链业务&#xff0c;一般对数据一致性要求高。且由于业务复杂&#xff0c;可能会存在一个业务功能触发几个异步操作的场景&#xff0c;且要保证相关操作同时触发或不触发。 为了降低技术设计难度、代码编写难度&#xff0c;特意设计最终一致性框架&a…

ChatGPT+Ai绘图【stable-diffusion实战】

ai绘图 stable-diffusion生成【还有很大的提升空间】 提示词1 Picture a planet where every living thing is made of light. The landscapes are breathtakingly beautiful, with mountains and waterfalls made of swirling patterns of color. What kind of societies m…

程序员跳槽,要求涨薪50%过分吗?

如果问在TI行业涨工资最快的方式是什么&#xff1f; 回答最多的一定是&#xff1a;跳槽&#xff01; 前段时间&#xff0c;知乎上这样一条帖子引发了不少IT圈子的朋友的讨论 &#xff0c;有网友提问 “程序员跳槽要求涨薪50%过分吗&#xff1f;” 截图来源于知乎&#xff0c;…

摄影知识整理

目录 焦距 焦距分类 对焦 相机的MF与AF 自动对焦操作 自动对焦方式 镜头防抖 防抖模式 景深 景深的作用 影响景深的因素 景深预览 摄影三大元素 光圈 光圈的作用 光圈与景深的关系 感光度&#xff08;ISO) 注意 感光度的作用 快门 B门与T门 快门速度 闪…

【SSM】SpringMVC(三:SpringMVC拦截器)

文章目录 1. 登录案例2. 拦截器2.1 应用2.2 拦截器的执行原理2.3 拦截器执行的时机2.4 拦截器的实现方法2.5 拦截器的实现步骤2.6 开发拦截器 1. 登录案例 【login.jsp】 <%--Created by IntelliJ IDEA.User: BeyongDate: 2023/4/17Time: 11:43To change this template use…

SQL的函数

文章目录 一、SQL LCASE() 函数二、SQL MID() 函数三、SQL LEN() 函数四、SQL ROUND() 函数五、SQL NOW() 函数六、SQL FORMAT() 函数总结 一、SQL LCASE() 函数 LCASE() 函数把字段的值转换为小写。 SQL LCASE() 语法 SELECT LCASE(column_name) FROM table_name;用于 SQL …

入行IC选择国企、私企还是外企?(内附各IC大厂薪资福利情况)

不少人想要转行IC&#xff0c;但不知道该如何选择公司&#xff1f;下面就来为大家盘点一下IC大厂的薪资和工作情况&#xff0c;欢迎大家在评论区补充。 一&#xff0e;老 牌 巨 头 在 IC 设计领域深耕许久&#xff0c;流程完善、技术扎实&#xff0c;公司各项制度都很完善、前…

IT知识百科:什么是暴力破解?

暴力破解是一种常见的网络安全攻击方法&#xff0c;它利用计算机程序自动尝试大量的密码组合来破解密码。这种攻击方法通常用于获取未经授权的访问权限&#xff0c;如入侵网络系统或个人账户。在本文中&#xff0c;我们将探讨暴力破解的原理、工具和防范方法。 暴力破解的原理 …
最新文章