Kafka应用Demo:按主题订阅消费消息

安装环境

  Kafka安装可参考官方网站的指导(https://kafka.apache.org/quickstart), 按步骤解压压缩包,修改配置。然后再启动zookeeper和kafka-server即可。

  需要注意的一点:如果是在VMware虚拟机上启动的kafka, 需要修改一下server.properties配置文件,增加如下配置:
在这里插入图片描述
  advertised.listener指定访问kafka的IP和端口,IP设置为虚拟机暴露给外部访问的IP。通过本地代码连接kafka,需要使用该配置

生产者代码样例

public class KafkaProducerService {
    private static final String NEO_TOPIC = "elon-topic";
    private KafkaProducer<String, String> producer = null;

    public KafkaProducerService() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.5.128:9092");
        props.put("acks", "0");
        props.put("group.id", "1111");
        props.put("retries", "2");
        //设置key和value序列化方式
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);

        //生产者实例
        producer = new KafkaProducer<>(props);
    }

    /**
     * 外部调用的发消息接口
     */
    public void sendMessage() {
        for (int i = 0; i < 10; ++i) {
            int p = i % 2;
            ProducerRecord<String, String> record = new ProducerRecord(NEO_TOPIC, p, "neo", JSON.toJSONString(i));
            producer.send(record);
        }
    }
}

 发送消息时,将10个数据分别发送到0分区和1分区。

消费者代码样例

public class KafkaConsumerService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
    private static final String NEO_TOPIC = "elon-topic";

    Properties properties = new Properties();
    private KafkaConsumer consumer = null;

    public KafkaConsumerService() {
        properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Broker
        properties.put("group.id", "neo1");              // 指定消费组群 ID
        properties.put("max.poll.records", "5");
        properties.put("enable.auto.commit", "false");
        properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象
        properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象

        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(NEO_TOPIC));  // 订阅主题 order-events
        new Thread(this::receiveMessage).start();
    }

    public void receiveMessage() {
        try {
            while (true) {
                synchronized (this) {
                    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    LOGGER.info("Fetch record num:{}", records.count());
                    for (ConsumerRecord<String,String> record: records) {
                        String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        LOGGER.info("Received:" + info);
                        Thread.sleep(100);
                    }
                    consumer.commitSync();
                }
            }
        } catch (Exception e){

        } finally {
            consumer.close();
        }
    }

 消费者按主题订阅。从打印的结果可以看到,消费者循环从topic下取出各个分区的消息依次消费。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/601087.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue组件传参数

在使用vue3进行开发的时候&#xff0c;我们一定绕不开的一个技术栈&#xff0c;就是组件传参。接下来我将介绍在vue3中如何运用这项技术。 在组件传参数中&#xff0c;分为两类&#xff0c;父传子参&#xff0c;或子传父参。需要了解的两个方法就是defineProps和defineEmits。…

【快捷部署】022_ZooKeeper(3.5.8)

&#x1f4e3;【快捷部署系列】022期信息 编号选型版本操作系统部署形式部署模式复检时间022ZooKeeper3.5.8Ubuntu 20.04tar包单机2024-05-07 一、快捷部署 #!/bin/bash ################################################################################# # 作者&#xff…

【Linux 命令操作】如何在 Linux 中使用多行注释呢?

文章目录 1. 给代码进行多行注释2. 给代码取消多行注释 1. 给代码进行多行注释 &#x1f427;① 首先用 vim 打开代码&#xff0c;按 Esc进入命令模式(Normal mode)&#xff1b; &#x1f427;② 然后按住 ctrl v 进入列模式&#xff1b; &#x1f427;③ 再通过按 h(左)、j(…

牛客网刷题 | BC80 奇偶统计

目前主要分为三个专栏&#xff0c;后续还会添加&#xff1a; 专栏如下&#xff1a; C语言刷题解析 C语言系列文章 我的成长经历 感谢阅读&#xff01; 初来乍到&#xff0c;如有错误请指出&#xff0c;感谢&#xff01; 描述 任意输入一个正整数…

Databend 开源周报第 143 期

Databend 是一款现代云数仓。专为弹性和高效设计&#xff0c;为您的大规模分析需求保驾护航。自由且开源。即刻体验云服务&#xff1a;https://app.databend.cn 。 Whats On In Databend 探索 Databend 本周新进展&#xff0c;遇到更贴近你心意的 Databend 。 了解 Databend …

后端开发面经系列 -- 滴滴C++一面面经

滴滴C一面面经 公众号&#xff1a;阿Q技术站 来源&#xff1a;https://www.nowcoder.com/feed/main/detail/38cf9704ef084e27903d2204352835ef 1、const在C和C区别&#xff0c;const定义的类成员变量如何初始化&#xff1f; 区别 C中的const&#xff1a; 在C中&#xff0c;c…

STM32单片机ADC功能详解

文章目录 1. ADC概述 2. ADC结构图 3. 引脚定义 4. 转换模式 5. 数据对齐 6. 转换时间 7. 硬件电路 8. STM32使用ADC单/多通道检测数据 1. ADC概述 功能&#xff1a;ADC是一个将模拟信号&#xff08;如电压&#xff09;转换为数字信号的设备。在微控制器中&#xff0c…

Vitis HLS 学习笔记--理解串流Stream(1)

目录 1. 介绍 2. 示例 2.1 代码解析 2.2 串流数据类型 2.3 综合报告 3. 总结 1. 介绍 在Vitis HLS中&#xff0c;hls::stream是一个用于在C/C中进行高级合成的关键数据结构。它类似于C标准库中的std::stream&#xff0c;但是专门设计用于硬件描述语言&#xff08;如Veri…

六级段落匹配

一个段落最多匹配2个句子 一个段落对应&#xff1a;0-2 适当放题 找到三个对应点就可以选 每看三个句子划关键词之后再自己回忆一遍关键词&#xff0c;看了36 37 38 就复述一遍关键词看了39 40 41就又从36开始复述关键词&#xff08;334&#xff09;看到最后一句话就又从头…

Linux实操之常用指令详解

文章目录 vi 和 vimvi 和 vim 基本使用 开机、重启和用户登录注销关机&重启命令用户登录和注销 用户管理基本介绍基本语法细节说明修改密码删除用户查询用户信息指令切换用户查看当前用户/登录用户用户组 实用指令指定运行级别帮助指令文件目录类时间和日期类搜索查找类压缩…

Python | Leetcode Python题解之第74题搜索二维矩阵

题目&#xff1a; 题解&#xff1a; class Solution:def searchMatrix(self, matrix: List[List[int]], target: int) -> bool:row,col len(matrix),len(matrix[0])row_l,row_r 0,row-1while row_l < row_r:m (row_lrow_r)//2if target < matrix[m][0]:row_r m-1…

ICode国际青少年编程竞赛- Python-1级训练场-变量练习

ICode国际青少年编程竞赛- Python-1级训练场-变量练习 1、 a 8 for i in range(8):Dev.step(a)Dev.turnRight()a - 12、 a 3 for i in range(4):Dev.step(a)Dev.turnRight()a a 1 Dev.step(5)3、 a 4 for i in range(4):Dev.step(2)Dev.step(-5)Dev.step(3)Spaceship.…

JVM学习笔记【基础篇:垃圾回收】

自动垃圾回收 C/C的内存管理 ⚫ 在C/C这类没有自动垃圾回收机制的语言中&#xff0c;一个对象如果不再使用&#xff0c;需要手动释放&#xff0c;否则就会出现 内存泄漏。我们称这种释放对象的过程为垃圾回收&#xff0c;而需要程序员编写代码进行回收的方式为手动回收。 ⚫ …

ue引擎游戏开发笔记(34)——建立射击映射,并添加特效

1.需求分析&#xff1a; 准备处理射击系统&#xff0c;首先角色需要能射击&#xff0c;有反馈&#xff0c;先建立角色与控制器之间的映射&#xff0c;并添加简单特效&#xff0c;证明映射已经建立。 2.操作实现&#xff1a; 1.首先常规建立映射流程&#xff0c;具体可参考笔记…

线性表--数据结构设计与操作

单链表 1.单链表的定义&#xff1a; typedef struct LNode{Elemtype data;struct Lnode *next; }LNode ,*LinkList;//单链表的数据结构&#xff08;手写&#xff09; #include<iostream> #include<vector> #include<algorithm>typedef int TypeElem; //单链表…

如何解决3D模型变黑或贴图不显示的问题---模大狮模型网

在进行3D建模和视觉渲染时&#xff0c;经常会遇到模型表面变黑或贴图不显示的问题&#xff0c;这可能严重影响最终视觉效果的质量。这些问题通常与材质设置、光照配置或文件路径错误有关。本文将探讨几种常见原因及其解决方法&#xff0c;帮助3D艺术家和开发者更有效地处理这些…

☺☺☺☺☺☺☺栈的应用习题:有效的括号☺☺☺☺☺☺☺

目录 一解题思路&#xff1a; 二对解答代码分析&#xff1a; 三解答代码展示&#xff1a; 即浅学栈的创建后&#xff0c;可以简单利用其性质&#xff08;先进后出&#xff0c;后进先出&#xff09;来完成对一些题目的解答 如&#xff1a; 一解题思路&#xff1a; 这里我们可…

[法规规划|数据概念]金融行业数据资产和安全管理系列文件解析

“ 金融行业在自身数据治理和资产化建设方面一直走在前列。” 一直以来&#xff0c;金融行业由于其自身需要&#xff0c;都是国内开展信息化建设最早&#xff0c;信息化程度最高的行业。 同时&#xff0c;金融行业的数据治理、数据资产化以及过程中的管理要求和安全要求与其他…

Hive Partitioned Tables 分区表

Hive Partitioned Tables 分区表 1.分区表概念 Hive分区表&#xff08;Partitioned Tables&#xff09;是一种用于管理大量数据的机制&#xff0c;它可以将数据分散到不同的目录或分区中&#xff0c;以提高查询性能、优化数据存储和管理。 这种表结构可以根据某个列的值进行分…

【Ansible】ansible-playbook剧本

playbook 是ansible的脚本 playbook的组成 1&#xff09;Tasks&#xff1a;任务&#xff1b;通过tasks 调用ansible 的模板将多个操作组织在一个playbook中运行 2&#xff09;Variables&#xff1a;变量 3&#xff09;Templates&#xff1a;模板 4&#xff09;Handles&#xf…
最新文章