kafka消息队列的初步探索

消息队列的作用就是提高运行速度,防止线程堵塞。

kafka的作用

    异步

 通过在消息队列发送消息的方式,将对应的业务作为监听者,此时我们只需要考虑发送消息的时间即可,大大提高了运行的速度。

    解耦

 

如果使用原来的直接调用对应业务的方式,在被调用业务发生修改是,调用业务也需要修改代码,存在很大的耦合,所以使用消息队列的方式,后续我们只需要关注消息的发送,无需关注业务的内部实现,大大的降低了耦合性。 

     削峰

在一些业务场景小(如:限时秒杀),此时在同一个时间内会有大量的请求发向服务器,这就会导致服务器瘫痪,所以这里引入的消息队列的方式,这些请求会一一的给消息队列发送消息,服务器通过一次处理对应个数的消息来处理对应的请求,最终实现削峰,防止服务器瘫痪。

     缓冲

 和削峰类似就是通过消息队列的形式处理请求,防止服务器瘫痪。

消息模式 

1.消息点对点模式

 一对一的形式,消费者每次从消息队列中接收一个消息,在确定接收后,消息队列就会将刚刚被接收的消息从消息队列中删除。

2.消息发布订阅模式

 在消息队列中存储的消息会被分为不同的主题里(其实就是将这些消息进行分组), 消费者就是去订阅对应的topic,消费者也可以组成对应的消费者组,此时消费者就从对应的topic中获取对应的消息,在其中存在偏移量这个数据(offset),通过该偏移量获取对应的位置的消息。重点来了,在该消息队列中的消息在被使用后是不会被移除的

kafka工作原理 

 在未来的项目中,我们大多都是已微服务的形式进行开发,此时消息队列中同个topic中的消息可能会存在于不用的服务器上,这就是进行分区。为了防止其中某太服务器发生宕机后影响项目的运行,我们可以在对应分区中存储其他分区中的消息,实现备份在宕机时不影响项目的运行,此过程就是创建副本

 springboot整合kafka

导入kafka整合springboot依赖

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency><dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

 发布消息

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
class KafkatestApplicationTests {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @Test
    void test1() {
//设置默认的主题
        kafkaTemplate.setDefaultTopic("tiktop");
//在对应主题中添加消息,此消息以键值对的形式
        kafkaTemplate.send("tiktop", "抖音消息", "你好,秃狼");

    }

}

 测试结果为下:

特殊情况(无法识别到主机)

 解决方法(在hosts中设置主机地址)

 通过火绒修改hosts。

 设置消息的value为实体类类型

我们通过application.properties进行设置。

#设置消息值的类型,这里设置为json类型,这样我们就可以在消息中传入实体类
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

默认的value的序列化类型是: 

可以设置的序列化类型为下: 

 测试结果为下:

创建消费者 

 创建消费者

group表示该消费者的分组。

topicPartitions是监听的所有的topic和分区,@TopicPartition就是设置对应topic区的topic名字,和对应偏移量和分区(在监听中可能会同时监听多个topic)。

partitionOffsets就是设置所有分区和偏移量(在监听中可能同时监听多个分区中,在该分区中会有不同的偏移量)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class Listener {
    /**
     * group表示该消费者的分组
     * topicPartitions是监听的所有的topic和分区,@TopicPartition就是设置对应topic区的topic名字,和对应偏移量和分区(在监听中可能会同时监听多个topic)
     * partitionOffsets就是设置所有分区和偏移量(在监听中可能同时监听多个分区中,在该分区中会有不同的偏移量)
    */
    @KafkaListener(groupId = "toktop-server", topicPartitions = {
            @TopicPartition(topic = "tiktop", partitionOffsets = {
                    @PartitionOffset(partition = "0", initialOffset = "0")
            })
    })
    public void listen(ConsumerRecord consumerRecord) {
        //ConsumerRecord就是整个消费者的信息
        Object key = consumerRecord.key();
        System.out.println("key=" + key);
        Object value = consumerRecord.value();
        System.out.println("value=" + value);
    }
}

在启动类上添加kafka的注解驱动,这样@KafkaListener才会被识别。(@Enablekafka)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

@SpringBootApplication
//开启kafka的注解驱动
@EnableKafka
public class KafkatestApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkatestApplication.class, args);
    }

}

进行启动测试,测试结果为下

 在启动后消费者会直接监听消息队列,测试我们将偏移量设置为0,也就是从头部开始,此时消费者监听到消息队列中的两个消息,最终将通过的信息输出。(注意使用的模式是:发布和订阅模式,所以接收到消息后不会将消息删除,而是改变偏移量

kafka自动配置

kafka 自动配置在KafkaAutoConfiguration

  1. 容器中放了 KafkaTemplate 可以进行消息收发
  2. 容器中放了KafkaAdmin 可以进行 Kafka 的管理,比如创建 topic 等
  3. kafka 的配置在KafkaProperties中
  4. @EnableKafka可以开启基于注解的模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/31289.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring高手之路6——Bean生命周期的扩展点:BeanPostProcessor

文章目录 1. 探索Spring的后置处理器&#xff08;BeanPostProcessor&#xff09;1.1 BeanPostProcessor的设计理念1.2 BeanPostProcessor的文档说明 2. BeanPostProcessor的使用2.1 BeanPostProcessor的基础使用示例2.2 利用BeanPostProcessor修改Bean的初始化结果的返回值2.3 …

Nacos配置中心交互模型是push还是pull?

对于Nacos大家应该都不太陌生&#xff0c;出身阿里名声在外&#xff0c;能做动态服务发现、配置管理&#xff0c;非常好用的一个工具。然而这样的技术用的人越多面试被问的概率也就越大&#xff0c;如果只停留在使用层面&#xff0c;那面试可能要吃大亏。 比如我们今天要讨论的…

leetcode216. 组合总和 III(回溯算法-java)

组合总和 III leetcode216. 组合总和 III题目描述解题思路代码演示 回溯算法专题 leetcode216. 组合总和 III 来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 链接&#xff1a;https://leetcode.cn/problems/combination-sum-iii 题目描述 找出所有相加之和为 n 的 k 个…

ldsc python程序安装以及测试

教程参考&#xff1a; https://zhuanlan.zhihu.com/p/379628546https://github.com/bulik/ldsc 1. 软件安装 1.1 windows安装教程 首先配置&#xff1a; anaconda&#xff0c;为了需要conda环境git&#xff0c;为了下载github中的ldsc程序 打开windows电脑中的promote&am…

阿里云服务器价格如何?与其他云服务提供商的价格对比如何?

阿里云服务器价格如何&#xff1f;与其他云服务提供商的价格对比如何&#xff1f;   阿里云服务器价格概述   作为全球领先的云计算服务提供商&#xff0c;阿里云在确保服务器性能和安全性的同时&#xff0c;也非常注重产品的价格竞争力。阿里云服务器&#xff08;ECS&…

基于STM32 ARM+FPGA的电能质量分析仪方案(一)硬件设计

本章主要给出了本系统的设计目标和硬件设计方案&#xff0c;后面详细介绍了硬件电路的设计 过程&#xff0c;包括数据采集板、 FPGAARM 控制板。 3.1系统设计目标 本系统的主要目的是实现电能质量指标的高精度测量和数据分析&#xff0c;其具体技术指标如 下所示&#xff1…

微服务中常见问题

Spring Cloud 组件 Spring Cloud五大组件有哪些&#xff1f; Eureka&#xff1a;注册中心 Ribbon&#xff1a;负载均衡 Feign&#xff1a;远程调用 Hystrix&#xff1a;服务熔断 Zuul/Gateway&#xff1a;服务网关 随着SpringCloud Alibaba在国内兴起&#xff0c;我们项目中…

C语言/C++ 之 打飞机游戏

【项目简介】 1、设计思想&#xff1a;本项目主要是为了实现打飞机游戏&#xff0c;主要包括5个函数模块&#xff0c;和1个主函数框架。分别是chu_shi_hua();、you_cao_zuo&#xff1b;、wu_cao_zuo();、show()&#xff1b;、main();等。项目完成过程中主要运用了C/C中的输入输…

网络爬虫是什么

网络爬虫又称网络蜘蛛、网络机器人&#xff0c;它是一种按照一定的规则自动浏览、检索网页信息的程序或者脚本。网络爬虫能够自动请求网页&#xff0c;并将所需要的数据抓取下来。通过对抓取的数据进行处理&#xff0c;从而提取出有价值的信息。 认识爬虫 我们所熟悉的一系列…

3 python进阶篇

文章目录 面向对象类属性和类方法类属性类方法静态方法 单例模式__new__ 方法类实现单例模式 异常 、模块和包异常自定义异常 模块和包模块的搜索顺序包的init文件发布模块&#xff08;了解&#xff09; 文件seek文件/目录的常用管理操作eval函数 补充性知识位运算小技巧 参考我…

Python入门教程:掌握for循环、while循环、字符串操作、文件读写与异常处理等基础知识

文章目录 for循环while循环字符串操作访问字符串中的字符切片总结字符串拼接 文件读写try...except 异常处理函数模块和包类和面向对象编程完结 for循环 在 Python 中&#xff0c;for 循环用于遍历序列&#xff08;list、tuple、range 对象等&#xff09;或其他可迭代对象。for…

Java中反射机制,枚举,Lambda的使用

目录 一、反射机制 1、含义 2、作用 3、※反射相关的几个类 3.1、Class类&#xff08;Class对象是反射的基石&#xff09; 3.2、Class类中相关的方法 3.2.1 (※重要)常用获得类相关的方法 3.2.2 (※重要)常用获得类中属性、变量Field相关的方法 3.2.3 获得类中注解相…

N-Gram语言模型工具kenlm的详细安装教程

【本配置过程基于Linux系统】 下载源代码&#xff1a; wget -O - https://kheafield.com/code/kenlm.tar.gz |tar xz 编译&#xff1a; makdir kenlm/build cd kenlm/build cmake .. && make -j4 发现报错&#xff1a; 系统中没有cmake&#xff0c;按照错误提示&am…

ChatGPT 指南:角色扮演让回答问题更专业

让 ChatGPT 进行角色扮演 Act as ...&#xff0c;比如&#xff0c;律师、内科医生、心理医生、运动教练、哲学家、翻译、平面设计师、IT 工程师等等&#xff0c;从而才能让 ChatGPT 从这个角色角度来分析我们的问题&#xff0c;不然&#xff0c;它的回答可能会过于广泛。 下面以…

实在智能RPA亮相2023全球人工智能技术博览会,“能对话的数字员工”引领智能自动化新篇章

随着ChatGPT火爆全网&#xff0c;人工智能再次成为学术界和科技领域“新宠”&#xff0c;一场“智能革命”的序幕悄然掀开。 6月13日&#xff0c;“智能驱动 砥砺前行”为主题的2023全球人工智能技术博览会在杭州未来科技城学术交流中心圆满落下帷幕。此次博览会以展示智能科技…

51单片机 - 期末复习重要图

AT89S51片内硬件结构 1.内部硬件结构图 2.内部部件简单介绍 3. 26个特殊功能寄存器分类 按照定时器、串口、通用I/O口和CPU 中断相关寄存器&#xff1a;3IE - 中断使能寄存器IP - 中断优先级寄存器 定时器相关寄存器6TCON - 定时器/计数器控制寄存器TMOD - 定时器/计数器模…

【Leetcode60天带刷】day07哈希表——454.四数相加II , 383. 赎金信 ,15. 三数之和 , 18. 四数之和

题目&#xff1a;454.四数相加II 454. 四数相加 II 给你四个整数数组 nums1、nums2、nums3 和 nums4 &#xff0c;数组长度都是 n &#xff0c;请你计算有多少个元组 (i, j, k, l) 能满足&#xff1a; 0 < i, j, k, l < nnums1[i] nums2[j] nums3[k] nums4[l] 0 …

神经网络:参数更新

在计算机视觉中&#xff0c;参数更新是指通过使用梯度信息来调整神经网络模型中的参数&#xff0c;从而逐步优化模型的性能。参数更新的作用、原理和意义如下&#xff1a; 1. 作用&#xff1a; 改进模型性能&#xff1a;参数更新可以使模型更好地适应训练数据&#xff0c;提高…

I/O多路复用+高性能网络模式

前言&#xff1a; 本篇文章将介绍客户端-服务端之间从最简单的Socket模型到I/O多路复用的模式演变过程&#xff0c;并介绍Reactor和Proactor两种高性能网络模式 文章内容摘自&#xff1a;小林Coding I/O多路复用高性能网络模式 . 传统Socket模型传统Socket模型的性能瓶颈多进程…

SpringCloud Alibaba入门5之使用OpenFegin调用服务

我们继续在上一章的基础上进行开发 SpringCloud Alibaba入门4之nacos注册中心管理_qinxun2008081的博客-CSDN博客 Feign是一种声明式、模板化的HTTP客户端。使用Feign&#xff0c;可以做到声明式调用。Feign是在RestTemplate和Ribbon的基础上进一步封装&#xff0c;使用RestT…
最新文章