简单实现,在nodejs中简单使用kafka

什么是 Kafka

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统

Kafka 的基本术语

消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

一、本地简单搭建kafka

1.1 下载并解压2.8.1版本 或者其他版本(点击下载:Apache Kafka)

下载后,解压到指定文件夹,并创建两个文件夹以后使用

1.2修改(如下图)配置文件

1.config下的zookeeper.properties,修改为刚才创建data的真实路径

2.config下的server.properties,修改为刚才创建kafka-log的真实路径

1.3启动项目 顺序启动

1.启动zookeeper

到解压文件夹下执行:不要关闭窗口

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

2.启动kafka

到解压文件夹下执行:不要关闭窗口

.\bin\windows\kafka-server-start.bat .\config\server.properties

3.创建一个为test的topic

不要关闭窗口

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

4.创建生产者

到解压文件夹下执行:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test1


5.创建消费者

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

6.测试一下

生产者发消息后消费者是否可以收到

二.在nodejs中使用kafka

2.1安装kafka-node
命令 npm install kafka-node

2.2 创建生产者
import kafka  = require('kafka-node');
var client = new kafka.KafkaClient({kafkaHost:'127.0.0.1:9092'});
var Producer = kafka.Producer;
var topic = 'test1';
var payloads = [{  // 需要发送的一些配置信息
        topic: topic,
        messages: 'arrange'  // 需要生产的消息
    }];  // 此处必须要使用数组的形式,因为payloads的遍历采用的是foreach
producer.on('ready', function () {
   producer.send(payloads, function (err, data) {
      console.log(payloads);
      console.log("=======");
      console.log(data);
   });
})

producer.on('error', function (err) {
  console.log('error', err);
});

2.3创建消费者

import kafka  = require('kafka-node');
var client = new kafka.KafkaClient({kafkaHost:'127.0.0.1:9092'});
var topic = 'test1';
var payloads = [{  // 需要发送的一些配置信息
        topic: topic,
        messages: 'arrange'  // 需要生产的消息
    }];  // 此处必须要使用数组的形式,因为payloads的遍历采用的是foreach
var options = {  // 消费者的选择
    host: 'localhost:9092',
    sessionTimeout: 15000,
    autoCommit: true
};
var consumer = new kafka.Consumer(client, payloads, options);
consumer.on('message', function (message) {
    console.log(message);
});
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/139120.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Oracle 账户被锁:the account is locked 解决方法

Oracle 账户被锁:the account is locked 解决方法 连接Oracle数据库时报错账户已锁定错误 解决方法一:命令行模式: 步骤一: WinR打开命令行输入:sqlplus 使用system或sys账户以管理员身份登录,口令即安装…

视觉分词器统一图文信息,快手提出基座模型 LaVIT 刷榜多模态任务

你是否想过,有朝一日能够仅输入寥寥数语或图片,就可以一键检索最为匹配的短视频内容。不是凭借视频标签、也不是依靠标题字幕,而是大模型真正理解了视频内容。近期,来自快手的新研究利用视觉分词器统一图文信息,LaVIT …

福建科立讯通信 指挥调度管理平台RCE漏洞复现

0x01 产品简介 福建科立讯通信指挥调度管理平台是一个专门针对通信行业的管理平台。该产品旨在提供高效的指挥调度和管理解决方案,以帮助通信运营商或相关机构实现更好的运营效率和服务质量。该平台提供强大的指挥调度功能,可以实时监控和管理通信网络设…

c语言-数据结构-带头双向循环链表

目录 1、双向循环链表的结构 2、双向循环链表的结构体创建 3、双向循环链表的初始化 3.1 双向链表的打印 4、双向循环链表的头插 5、双向循环链表的尾插 6、双向循环链表的删除 6.1 尾删 6.2 头删 6.3 小节结论 7、查找 8、在pos位置前插入数据 9、删除pos位…

机器人仿真GAZEBO开源代码分享

1、https://github.com/PRBonn/agribot 2、https://github.com/ros-mobile-robots/diffbot

腾讯待办为什么停止运营?ics文件如何导入日程APP继续使用?

有不少网友表示自己想要记录待办事项、设置待办提醒的时候,会直接使用微信中的腾讯待办小程序来记录。但是最近这段时间在使用这款小程序设置待办提醒的时候,看到了“业务关停通知”的弹窗,大意就是说,腾讯待办将于2023年12月20日…

【开源项目】snakeflow流程引擎研究

项目地址 https://gitee.com/yuqs/snakerflow https://toscode.mulanos.cn/zc-libre/snakerflow-spring-boot-stater (推荐) https://github.com/snakerflow-starter/snakerflow-spring-boot-starter 常用API 部署流程 processId engine.process().de…

如何实现公网远程访问本地OpenGauss数据库【内网穿透】

文章目录 前言1. Linux 安装 openGauss2. Linux 安装cpolar3. 创建openGauss主节点端口号公网地址4. 远程连接openGauss5. 固定连接TCP公网地址6. 固定地址连接测试 前言 openGauss是一款开源关系型数据库管理系统,采用木兰宽松许可证v2发行。openGauss内核深度融合…

十七、W5100S/W5500+RP2040树莓派Pico<HTTP Server网页显示>

文章目录 1 前言2 简介2 .1 什么是HTTP?2.2 HTTP的优点2.3 HTTP工作原理2.4 HTTP应用场景 3 WIZnet以太网芯片4 HTTP网络设置示例概述以及使用4.1 流程图4.2 准备工作核心4.3 连接方式4.4 主要代码概述4.5 结果演示 5 注意事项6 相关链接 1 前言 HTTP是互联网上应用…

MapInfo Pro “偏移”命令

偏移对象的用途是什么? 将一个或多个地图对象移动特定距离和/或方向,并将其放置在可编辑层中。对象可以来自任何层。您可以在选择操作后聚合数据。 ​ “偏移对象”何时处于活动状态? 当“贴图”窗口为活动窗口时,该窗口具有可编…

【FastCAE源码阅读8】调用gmsh生成网格

FastCAE使用gmsh进行网格划分,划分的时候直接启动一个新的gmsh进程,个人猜测这么设计是为了规避gmsh的GPL协议风险。 进行网格划分时,其大体运行如下图: 一、Python到gmshModule模块 GUI操作到Python这步不再分析,比…

基于《环境影响评价技术导则大气环境(HJ 2.2-2018)》的AERMOD模型配置方法

数值模式模拟是分析大气污染物时空分布和成分贡献的重要工具,利用模拟结果可以分析大气污染的来源、成因、污染程度、持续时间、主要成分、相对贡献等问题,有助于分析并合理控制污染源排放,为产业调整提供参考。当前,针对不同理论…

深度学习 python opencv 实现人脸年龄性别识别 计算机竞赛

文章目录 0 前言1 项目课题介绍2 关键技术2.1 卷积神经网络2.2 卷积层2.3 池化层2.4 激活函数:2.5 全连接层 3 使用tensorflow中keras模块实现卷积神经网络4 Keras介绍4.1 Keras深度学习模型4.2 Keras中重要的预定义对象4.3 Keras的网络层构造 5 数据集处理训练5.1 …

C++面向对象编程(4)——浅谈C++内存模型

目录 一. 说明 二. GDB实验 2.1 实验1:栈 2.2 实验2:堆 一. 说明 不同的操作系统对程序内存的管理和划分会有所不同。如上图所示的C内存区域划分主要是针对一般的情况,说明如下: 1. Stack:栈。由编译器管理分配和回…

CKA认证模块②-K8S企业运维和落地实战-2

CKA认证模块②-K8S企业运维和落地实战-2 K8S常见的存储方案及具体应用场景分析 k8s存储-empty emptyDir类型的Volume是在Pod分配到Node上时被创建,Kubernetes会在Node上自动分配一个目录,因此无需指定宿主机Node上对应的目录文件。 这个目录的初始内容…

从0到0.01入门React | 005.精选 React 面试题

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…

快速批量去除文件夹名称中多余重复文字!一键轻松优化文件夹命名!

您是否曾经因为文件夹名称中多余重复文字而烦恼?是否因为文件夹重命名而浪费大量时间?现在,我们为您推荐一款全新的文件夹批量改名工具——快速批量去除文件夹名称中多余重复文字,轻松实现文件夹改名优化,让您的整理效…

企业微信后台通过小程序给员工发送文字信息附带超链接实现(加上A标签:<a href=“网址“> </a>)

如下&#xff0c;在编辑文本消息的时候&#xff0c;添加上HTML的A标签 <a href"www.baidu"> </a>即可实现点击直接跳转

移远EC600U-CN开发板 day04

控件探索-滑杆&#xff08;lv.slider&#xff09; 1. 显示一个简单的滑杆 def slider_event_cb(evt): slider evt.get_target()# 修改label的值label.set_text(str(slider.get_value()))slider lv.slider(scr) #创建滑杆组件 slider.set_width(200) #设置滑杆宽…

上门洗衣洗鞋app小程序

上门洗衣洗鞋app小程序作为专业的帮助用户洗衣服务的软件,许多朋友都使用过。在这里,小编就帮助大家收集一些非常不错的洗衣洗鞋软件。 不知道大家是否还在为洗衣而烦恼,而怕麻烦,现在大家都在用网上的洗衣洗鞋小程序来洗衣服,用户只需要打开手机软件,发起订单,门店即可收到订单…
最新文章