kafka微服务学习

消息中间件对比:
1、吞吐、可靠性、性能
在这里插入图片描述

Kafka安装

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

  • Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
  • Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

kafka入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,多个消费者都可以接收到消息

(1)创建kafka-demo项目,导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

(2)生产者发送消息

package com.heima.kafka.sample;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * 生产者
 */
public class ProducerQuickStart {

    public static void main(String[] args) {
        //1.kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        //发送失败,失败的重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,5);
        //消息key的序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        //消息value的序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2.生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);

        //封装发送的消息
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");

        //3.发送消息
        producer.send(record);

        //4.关闭消息通道,必须关闭,否则消息发送不成功
        producer.close();
    }

}

(3)消费者接收消息

package com.heima.kafka.sample;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * 消费者
 */
public class ConsumerQuickStart {

    public static void main(String[] args) {
        //1.添加kafka的配置信息
        Properties properties = new Properties();
        //kafka的连接地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        //消息的反序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        //2.消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        //3.订阅主题
        consumer.subscribe(Collections.singletonList("itheima-topic"));

        //当前线程一直处于监听状态
        while (true) {
            //4.获取消息
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key());
                System.out.println(consumerRecord.value());
            }
        }

    }

}

kafka高可用设计

1、设计集群模式:

Kafka的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个Broker 组成。当一个机器宕机了,另外一个机器就会替补山
在这里插入图片描述

2、备份机制:

Kafka定义了两类副本

  1. 领导者副本(Leader Replica)
  2. 追随者副本 (Follower Replica)
    追随者副本分为两类:
    1、一种是ISR副本,同步保存
    2、普通的副本,异步保存
    出现主节点宕机,会先选ISR副本中的一个成为新的主节点,保证数据一致性,没有ISR节点,再从普通节点中挑选
    针对全部节点宕机的情况,有两种策略:
    1、等待第一个ISR副本,保证了数据的尽可能一致
    2、等待一个复活的追随者,无论是ISR还是普通,提高系统的高可用性。

kafka生产者详解

1发送类型

  • 同步发送

    使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功

RecordMetadata recordMetadata = producer.send(kvProducerRecord).get();
System.out.println(recordMetadata.offset());
  • 异步发送

    调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

//异步消息发送
producer.send(kvProducerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null){
            System.out.println("记录异常信息到日志表中");
        }
        System.out.println(recordMetadata.offset());
    }
});

2参数详解

  • ack

代码的配置方式:

//ack配置  消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");

参数的选择说明

确认机制说明
acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
  • retries

生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms

代码中配置方式:

//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);
  • 消息压缩

默认情况下, 消息发送时不会被压缩。

代码中配置方式:

//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
压缩算法说明
snappy占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用
lz4占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观
gzip占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。

kafka消费者

消息的有序性

方法:一个topic分区能保证自己的数据是按照先后消费的,但是不能保证跨分区消息处理的先后顺序。我么只能使用一个分区,在单分区种,消息可以保证严格顺序消费

提交和偏移量

在这里插入图片描述
自动提交:
当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll0方法接收的最大偏移量提交上去,这样只是记录了规定时间内的最大偏移量,其实与数据提交的偏移量存在偏差,因此可能会出现数据的重复提交或者丢失
手动提交
当enableauto.commit被设置为false可以有以下三种提交方式

  • 提交当前偏移量(同步提交)
  • 异步提交
  • 同步和异步组合提交

同步提交:commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
        try {
            consumer.commitSync();//同步提交当前最新的偏移量
        }catch (CommitFailedException e){
            System.out.println("记录提交失败的异常:"+e);
        }

    }
}

异步提交:手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。消息没有重试机制

while (true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.value());
        System.out.println(record.key());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e!=null){
                System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
            }
        }
    });
}

同步和异步组合提交

异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖

举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。

try {
    while (true){
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.value());
            System.out.println(record.key());
        }
        consumer.commitAsync();
    }
}catch (Exception e){+
    e.printStackTrace();
    System.out.println("记录错误信息:"+e);
}finally {
    try {
        consumer.commitSync();
    }finally {
        consumer.close();
    }
}

springboot整合kafka

1、在父类中的pop文件中导入依赖包

```xml
<!-- kafkfa -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

2、在需要用到kafka的微服务的naco中分别配置生产者和消费者配置

spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
spring:
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    consumer:
      group-id: ${spring.application.name}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

传递消息为对象

目前springboot整合后的kafka,因为序列化器是StringSerializer,这个时候如果需要传递对象可以有两种方式

方式一:可以自定义序列化器,对象类型众多,这种方式通用性不强,本章节不介绍

方式二:可以把要传递的对象进行转json字符串,接收消息后再转为对象即可,本项目采用这种方式

  • 发送消息
@GetMapping("/hello")
public String hello(){
    User user = new User();
    user.setUsername("xiaowang");
    user.setAge(18);

    kafkaTemplate.send("user-topic", JSON.toJSONString(user));

    return "ok";
}
  • 接收消息
package com.heima.kafka.listener;

import com.alibaba.fastjson.JSON;
import com.heima.kafka.pojo.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class HelloListener {

    @KafkaListener(topics = "user-topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            User user = JSON.parseObject(message, User.class);
            System.out.println(user);
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/126417.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Redis缓存架构实战常见问题剖析】

文章目录 一、Redis缓存架构实战剖析1.1、大规模的商品缓存数据冷热分离机制1.2、缓存击穿导致线上数据压力暴增解决方案1.3、缓存穿透及其解决方案剖析1.4、突发性的热点缓存数重建导致系统压力暴增问题分析1.5、Redis分布式锁解决缓存与数据库双写不一致问题剖析1.6、利用多级…

Python机器学习算法入门教程(第四部分)

接着Python机器学习算法入门教程&#xff08;第三部分&#xff09;&#xff0c;继续展开描述。 十九、信息熵是什么 通过前两节的学习&#xff0c;我们对于决策树算法有了大体的认识&#xff0c;本节我们将从数学角度解析如何选择合适的“特征做为判别条件”&#xff0c;这里…

微服务 Spring Cloud 5,一图说透Spring Cloud微服务架构

目录 一、域名系统DNS二、LVS&#xff08;Linux Virtual Server&#xff09;,Linux虚拟服务器三、CDN静态资源四、Nginx反向代理服务器1、Nginx的主要作用体现在以下几个方面&#xff1a;2、Nginx静态资源服务和CDN静态资源服务&#xff0c;如何选择&#xff1f; 五、Gateway网…

C#上位机序列10: Winform上位机通用框架

C#上位机序列1: 多线程&#xff08;线程同步&#xff0c;事件触发&#xff0c;信号量&#xff0c;互斥锁&#xff0c;共享内存&#xff0c;消息队列&#xff09; C#上位机序列2: 同步异步(async、await) C#上位机序列3: 流程控制&#xff08;串行&#xff0c;并行&#xff0c…

Panorama SCADA平台的警报通知功能配置详解

1. 前言 SCADA系统的主要目标是采集与监控工业过程数据&#xff0c;以确保工业生产正常运行。通过实时警报通知功能&#xff0c;操作人员可以立即获取有关潜在问题的信息&#xff0c;从而能够快速采取行动解决问题&#xff0c;防止进一步的损害或生产中断。因此&#xff0c;及…

三相电机的某些实测特性曲线

三相电机参数&#xff1a; 0.75KW&#xff0c;额定电流是2A&#xff0c;功率因数0.71&#xff0c;效率78.9%。制式S1. 1.负载不变时的线电压与线电流的关系 1.1相关数据与python代码&#xff1a; 这里记录了一系列的实验&#xff1a; 第一组实验&#xff1a;近乎空载&#xf…

企业微信开启接收消息+验证URL有效性

企业微信开启接收消息验证URL有效性 &#x1f4d4; 千寻简笔记介绍 千寻简笔记已开源&#xff0c;Gitee与GitHub搜索chihiro-notes&#xff0c;包含笔记源文件.md&#xff0c;以及PDF版本方便阅读&#xff0c;且是用了精美主题&#xff0c;阅读体验更佳&#xff0c;如果文章对…

[Framework] Android Handler 工作原理

作者&#xff1a;Tans5 Android 中的 Handler 都被人说烂了&#xff0c;但是还是想多说一次&#xff0c;因为在 Android 的系统中它真的非常重要而且它的机制并没有很复杂&#xff0c;无论是新手和老手都可以好好学习下&#xff0c;这对理解 Android 系统很重要&#xff0c;所以…

如何进行网站测试

随着市场和技术的快速发展&#xff0c;产品需要不断更新和改进以保持竞争力&#xff0c;如果产品停滞不前&#xff0c;很可能会被市场淘汰。通过持续发展&#xff0c;企业可以不断优化产品&#xff0c;提高用户体验&#xff0c;从而赢得市场份额和客户忠诚度。而数通在激烈的市…

计算机毕业设计项目选题推荐(免费领源码)Java+springboot+Mysql停车微信小程序小程序92714

摘 要 在信息飞速发展的今天&#xff0c;网络已成为人们重要的信息交流平台。每天都有大量的农产品需要通过网络发布&#xff0c;为此&#xff0c;本人开发了一个基于springboot停车微信小程序小程序。 对于本停车微信小程序的设计来说&#xff0c;它主要是采用后台采用java语…

Vue+OpenLayers 创建地图并显示鼠标所在经纬度

1、效果 2、创建地图 本文用的是高德地图 页面 <div class"map" id"map"></div><div id"mouse-position" class"position_coordinate"></div>初始化地图 var gaodeLayer new TileLayer({title: "高德地…

PDF有限制密码,不能复制怎么办?

大家现在接触PDF文件越来越多&#xff0c;有的时候在网上下载的PDF文件打开之后&#xff0c;发现选中文字之后无法复制。甚至其他功能也都无法使用&#xff0c;这是怎么回事&#xff1f;该怎么办&#xff1f; 当我们发现文件打开之后&#xff0c;编辑功能无法使用&#xff0c;很…

数据库数据迁移常见方式

数据库数据迁移常见方式 数据库数据迁移常见方式1、通过sql2、通过数据迁移工具3、云服务进行数据迁移什么是DRS服务如何使用DRS服务DRS云服务可以干什么 数据库数据迁移常见方式 1、通过sql 批量导入sql insert into tableName select * 2、通过数据迁移工具 在数据库里面…

19.9 Boost Asio 同步字典传输

这里所代指的字典是Python中的样子&#xff0c;本节内容我们将通过使用Boost中自带的Tokenizer分词器实现对特定字符串的切割功能&#xff0c;使用Boost Tokenizer&#xff0c;可以通过构建一个分隔符或正则表达式的实例来初始化tokenizer。然后&#xff0c;可以使用该实例对输…

网络工程师回顾学习(第一部分)

根据书本目录&#xff0c;写下需要记忆的地方&#xff1a; 参考之前的笔记&#xff1a; 网络工程师回答问题_one day321的博客-CSDN博客 重构第一部分需要记忆的&#xff1a; 第一章&#xff1a;计算机网络概论 计算机网络的定义和分类&#xff1a;计算机网络是指将地理位…

Azure - 机器学习:自动化机器学习中计算机视觉任务的超参数

Azure Machine Learning借助对计算机视觉任务的支持&#xff0c;可以控制模型算法和扫描超参数。 这些模型算法和超参数将作为参数空间传入以进行扫描。 关注TechLead&#xff0c;分享AI全维度知识。作者拥有10年互联网服务架构、AI产品研发经验、团队管理经验&#xff0c;同济…

MYSQL函数,一篇文章看完!

做程序员的谁会离得开数据库呢&#xff1f;今天就来分享一下我整理的MySQL的常用函数&#xff0c;基本上囊括了平时要用的函数&#xff0c;它们已经陪我走过了不少年头了&#xff0c;风里来雨里去&#xff0c;缝缝补补又几年&#xff0c;希望能帮到你们&#xff01; 如果数据库…

UltraEdit2024免费版文本编辑器

我们必须承认软件员使用的编辑器或代码编辑器是一款强大 IDE 的重要组成部分&#xff0c;它是任何 IDE 的核心基础。用户量向我们证明了UEStudio 基于著名的 UltraEdit 进行构建&#xff0c;同样&#xff0c;软件的主干非常成熟和稳定&#xff0c;并且已经被证实成为文本和软件…

Python 标准库 subprocess 模块详解

1. Subprocess模块介绍 1.1 基本功能 subprocess 模块&#xff0c;允许生成新的进程执行命令行指令&#xff0c;python程序&#xff0c;以及其它语言编写的应用程序, 如 java, c,rust 应用等。subprocess可连接多个进程的输入、输出、错误管道&#xff0c;并且获取它们的返回…

龙芯loongarch64服务器编译安装scipy

前言 根据我之前的文章介绍&#xff0c;龙芯loongarch64服务器中的很多python依赖包安装有问题&#xff0c;发现其中安装的"scikit-learn"就无法正常使用&#xff0c;所有这里在 pip3 install scikit-learn -U -i https://pypi.tuna.tsinghua.edu.cn/simple 的时候发…
最新文章