JavaWeb_LeadNews_Day6-Kafka

JavaWeb_LeadNews_Day6-Kafka

  • Kafka
    • 概述
    • 安装配置
    • kafka入门
    • kafka高可用方案
    • kafka详解
      • 生产者同步异步发送消息
      • 生产者参数配置
      • 消费者同步异步提交偏移量
    • SpringBoot集成kafka
  • 自媒体文章上下架
    • 实现思路
    • 具体实现
  • 来源
  • Gitee

Kafka

概述

  • 对比
  • 选择
  • 介绍
    • producer: 发布消息的对象称之为主题生产者 (Kafka topic producer)
    • topic: Kafka将消息分门别类,每一类的消息称之为一个主题 (Topic)
    • consumer:订阅消息并处理发布的消息的对象称之为主题消费者 (consumers)
    • broker:已发布的消息保存在一组服务器中,称之为Kafka集群,集群中的每一个服务器都是一个代理(Broker)。消费者可以订阅个或多个主题 (topic),并从Broker拉数据,从而消费这些已发布的消息

安装配置

  • 安装zookeeper
    // 下载zookeeper镜像
    docker pull zookeeper:3.4.14
    // 创建容器
    docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
    
  • 安装kafka
    // 下载kafka镜像
    docker pull wurstmeister/kafka:2.12-2.3.1
    // 创建容器
    docker run -d --name kafka \
    --env KAFKA_ADVERTISED_HOST_NAME=192.168.174.133 \
    --env KAFKA_ZOOKEEPER_CONNECT=192.168.174.133:2181 \
    --env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.174.133:9092 \
    --env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
    --env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
    --net=host wurstmeister/kafka:2.12-2.3.1
    
    // 解释
    --net=host,直接使用容器宿主机的网络命名空间,即没有独立的网络环境。它使用宿主机的ip和端口(云主机会不好使)
    

kafka入门

  • 依赖
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    
  • Producer
    public class ProducerQuickStart {
        public static void main(String[] args) {
            // 1. kafka链接配置信息
            Properties prop = new Properties();
            // 1.1 kafka链接地址
            prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");
            // 1.2 key和value的序列化
            prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // 2. 创建kafka生产者对象
            KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
            // 3. 发送信息
            // 参数列表: topic, key, value
            ProducerRecord<String, String> record = new ProducerRecord<>("topic-first", "key1", "Hello Kafka!");
            producer.send(record);
            // 4. 关闭消息通道
            // 必须关闭, 否则消息发送bucg
            producer.close();
        }
    }
    
  • Consumer
    public class ConsumerQuickStart {
        public static void main(String[] args) {
            // 1. kafka的配置信息
            Properties prop = new Properties();
            // 1.1 链接地址
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");
            // 1.2 key和value的反序列化器
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            // 1.3 设置消费者组
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            // 2. 创建消费者对象
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            // 3. 订阅主题
            consumer.subscribe(Collections.singleton("topic-first"));
            // 4. 拉取信息
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.key());
                    System.out.println(record.value());
                }
            }
        }
    }
    
  • 总结
    • 同一组只有一个消费者能够接收到消息, 如果需要所有消费者都能接收到消息, 需要消费者在不同的组

kafka高可用方案

  • 集群

  • 备份

    kafka定义了两类副本:

    • 领导者副本
    • 追随者副本

    数据在领导者副本存储后, 会同步到追随者副本

    同步方式
    leader失效后, 选择leader的原则

    1. 优先从ISR中选取, 因为ISR的数据和leader是同步的.
    2. ISR中的follower都不行了, 就从其他的follower中选取.
    3. 当所有的follower都失效了, 第一种是等待ISR中的follower活过来, 数据可靠, 但等待时间不确定, 第二种是等待任意follower活过来, 最快速度恢复可用性, 但数据不一定完整.

kafka详解

生产者同步异步发送消息

// 同步发送
RecordMetadata metadata = producer.send(record).get();
System.out.println(metadata.offset());

// 异步发送
producer.send(record, new Callback(){
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null) {
            System.out.println("记录异常信息到日志表中");
        }
        System.out.println(recordMetadata.offset());
    }
});

生产者参数配置

  • 消息确认
    确认机制说明
    acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
    acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
    acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应
    prop.put(ProducerConfig.ACKS_CONFIG, "all");
    
  • 消息重传
    设置消息重传次数, 默认每次重试之间等待100ms
    prop.put(ProducerConfig.RETRIES_CONFIG, 10);
    
  • 消息压缩
    默认情况, 消息发送不会压缩
    使用压缩可以降低网络传输开销和存储开销, 而这往往是向kafka发送消息的瓶颈所在
    压缩算法说明
    snappy占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果看重性能和网络带宽,建议采用
    lz4占用较少的 CPU,压缩和解压缩速度较快,压缩比也很客观
    gzip占用较多的CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法
    prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    

消费者同步异步提交偏移量

// 同步提交偏移量
consumer.commitSync();

// 异步提交偏移量
consumer.commitAsync(new OffsetCommitCallback(){
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
        if(e!=null){
            System.out.println("记录错误的提交偏移量"+map+", 异常信息为"+e);
        }
    }
});

// 同步异步提交
try {
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key());
            System.out.println(record.value());
            System.out.println(record.partition());
            System.out.println(record.offset());
        }
        // 异步提交偏移量
        consumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
    System.out.println("记录错误的信息:"+e);
}finally {
    // 同步
    consumer.commitSync();
}

SpringBoot集成kafka

  • 依赖
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
    
  • 配置
    server:
      port: 9991
    spring:
      application:
        name: kafka-demo
      kafka:
        bootstrap-servers: 192.168.174.133:9092
        producer:
          retries: 10
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: ${spring.application.name}-test
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
  • Producer
    @RestController
    public class HelloController {
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @GetMapping("/hello")
        public String hello()
        {
            kafkaTemplate.send("itcast-topic", "黑马程序员");
            return "ok";
        }
    }
    
  • Consumer
    @Component
    public class HelloListener {
    
        @KafkaListener(topics = "itcast-topic")
        public void onMessage(String message)
        {
            if(!StringUtils.isEmpty(message)){
                System.out.println(message);
            }
        }
    }
    
  • 传递对象
    // Producer
    User user = new User();
    user.setName("tom");
    user.setAge(18);
    kafkaTemplate.send("itcast-topic", JSON.toJSONString(user));
    
    // Consumer
    System.out.println(JSON.parseObject(message, User.class));
    

自媒体文章上下架

实现思路

具体实现

  • Producer
    public ResponseResult downOrUp(WmNewsDto dto) {
        // 1. 检验参数
        // 1.0 检查文章dto是否为空
        if(dto == null){
            return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, "不可缺少");
        }
        // 1.1 检查文章上架参数是否合法
        if(dto.getEnable() != 0 && dto.getEnabl!= 1){
            // 默认上架
            dto.setEnable((short) 1);
        }
        // 2. 查询文章
        WmNews news = getById(dto.getId());
        if(news == null){
            return ResponseResult.errorRe(AppHttpCodeEnum.DATA_NOT_EXIST, 存在");
        }
        // 3. 查询文章状态
        if(news.getStatus() != WmNews.StaPUBLISHED.getCode()){
            return ResponseResult.errorRe(AppHttpCodeEnum.PARAM_INVALID, 章不是发布状态, 不能上下架");
        }
        // 4. 上下架
        news.setEnable(dto.getEnable());
        updateById(new
        // 5. 发送消息, 通知article修改文章的配置
        if(news.getArticleId() != null){
            HashMap<String, Object> map = HashMap<>();
            map.put("articleId", news.getArtic());
            map.put("enable", news.getEnable());
            kafkaTemplate.(WmNewsMessageConstaWM_NEWS_UP_OR_DOWN_TOPIC, JtoJSONString(map));
    
        return ResponseResult.okRe(AppHttpCodeEnum.SUCCESS);
    }
    
  • Consumer
// Listener
@KafkaListener(topics = WmNewsMessageConstants.WM_NEWS_UP_OR_DOWN_TOPIC)
public void onMessage(String message)
{
    if(StringUtils.isNotBlank(message)){
        Map map = JSON.parseObject(message, Map.class);
        apArticleConfigService.updateByMap(map);
    }
}

// Service
public void updateByMap(Map map) {
    // 0 下架, 1 上架
    Object enable = map.get("enable");
    boolean isDown = true;
    if(enable.equals(1)){
        isDown = false;
    }
    // 修改文章
    update(Wrappers.<ApArticleConfig>lambdaUpdate().eq(ApArticleConfig::getArticleId, map.get("articleId")).
            set(ApArticleConfig::getIsDown, isDown));
}

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/82219.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

开源远程控制硬件 BliKVM v4测试 1000公里外远程重装系统

测试准备 测试时间&#xff1a;20230818 测试硬件&#xff1a;BliKVM v4 文档 BliKVM v4是一款生产就绪、即插即用的 KVM-over-IP 设备&#xff0c;为专业用户提供了远程服务器或工作站管理的便捷解决方案。 它基于Linux并且完全开源。 借助 BliKVM&#xff0c;您可以轻松打…

19-普通组件的注册使用

普通组件的注册使用-局部注册 一. 组件注册的两种方式:1.局部注册:只能在注册的组件内使用 (1) 创建 vue 文件(单文件组件) (2) 在使用的组件内导入,并注册 components:{ 组件名: 组件对象 } // 导入需要注册的组件 import 组件对象 from.vue文件路径 import HmHeader from ./…

深入学习前端开发,掌握HTML、CSS、JavaScript等技术

课程链接&#xff1a; 链接: https://pan.baidu.com/s/1WECwJ4T8UQfs2FyjUMbxig?pwdi654 提取码: i654 复制这段内容后打开百度网盘手机App&#xff0c;操作更方便哦 --来自百度网盘超级会员v4的分享 课程介绍&#xff1a; 第1周&#xff1a;HTML5基础语法与标签 &#x1f…

基于Java+SpringBoot+vue前后端分离在线BLOG网站系统设计实现

基于JavaSpringBootvue前后端分离在线BLOG网站系统设计实现&#xff08;程序源码毕业论文&#xff09; 大家好&#xff0c;今天给大家介绍基于JavaSpringBootvue前后端分离在线BLOG网站系统设计与实现&#xff0c;本论文只截取部分文章重点&#xff0c;文章末尾附有本毕业设计完…

基于Python的高校学生成绩分析系统

随着计算机技术发展&#xff0c;计算机系统的应用已延伸到社会的各个领域&#xff0c;大量基于网络的广泛应用给生活带来了十分的便利。所以把高校成绩分析与现在网络相结合&#xff0c;利用计算机搭建高校成绩分析系统&#xff0c;实现高校成绩分析的信息化。则对于进一步提高…

Idea中隐藏指定文件或指定类型文件

Setting ->Editor ->Code Style->File Types → Ignored Files and Folders输入要隐藏的文件名&#xff0c;支持*号通配符回车确认添加

通过爬虫抓取上市企业利润表并在睿思BI中展示

睿思BI从v5.3开始支持网络爬虫&#xff0c;可以从指定URL抓取表格数据&#xff0c;本示例实现从网络上抓取上市企业招商银行的利润表数据&#xff0c;并在睿思BI中进行展现。 功能演示URL&#xff1a;https://www.ruisitech.com/rsbi-ultimate/#/dashboard/ShareView?token31…

Maven介绍_下载_安装_使用_原理

文章目录 1 Maven介绍1.1 Maven是介绍1.2 Maven的作用 2 Maven下载与安装2.1 官网下载2.2 文件目录2.3 环境配置 3 Maven基础概念3.1 仓库分类3.2 依赖坐标3.3 坐标组成 4 Maven配置4.1 本地仓库配置4.2 远程仓库的设置4.3 镜像仓库配置4.4 IDEA配置Maven 5 Maven项目创建5.1 M…

Dockers搭建个人网盘、私有仓库,Dockerfile制作Nginx、Lamp镜像

目录 1、使用mysql:5.6和 owncloud 镜像&#xff0c;构建一个个人网盘。 &#xff08;1&#xff09;下载mysql:5.6和owncloud镜像 &#xff08;2&#xff09;创建启动mysql:5.6和owncloud容器 &#xff08;3&#xff09;在浏览器中输入网盘服务器的IP地址&#xff0c;进行账…

CentOS6.8图形界面安装Oracle11.2.0.1.0

Oracle11下载地址 https://edelivery.oracle.com/osdc/faces/SoftwareDelivery 一、环境 CentOS release 6.8 (Final)&#xff0c;测试环境&#xff1a;内存2G&#xff0c;硬盘20G&#xff0c;SWAP空间4G Oracle版本&#xff1a;Release 11.2.0.1.0 安装包&#xff1a;V175…

kafka--kafka的基本概念-副本概念replica

三、kafka的基本概念-副本概念replica Broker 表示实际的物理机器节点 Broker1中的绿色P1表示主分片Broker2中的蓝色P1表示副本分片&#xff0c;其余类似&#xff0c;就是主从的概念&#xff0c;如果一个Broker挂掉了&#xff0c;还有其它的节点来保证数据的完整性 P可以看做分…

小程序体验版不存在 无法体验

1、权限问题&#xff1a; 1、开发者有所有权限。 2、小程序访问路径也是正确的。 该有的权限都有了。 2、解决办法&#xff1a; 打开微信公众平台&#xff0c;左侧菜单【设置】- 【第三方设置】&#xff0c;取消授权即可。

VS2022远程Linux使用cmake开发c++工程配置方法

文章目录 远程连接CMakePresets.json的配置Task.vs.json配置launch.vs.json配置最近使用别人在VS2015上使用visualgdb搭建的linux开发环境,各种不顺手,一会代码不能调转了,一会行号没了,调试的时候断不到正确的位置,取消的断点仍然会进。因此重新摸索了一套使用vs的远程开…

mysql中的窗口函数

MySQL中的窗口函数&#xff08;Window Functions&#xff09;是一种用于在查询结果集内执行计算的功能。窗口函数可以在查询中进行分析和聚合操作&#xff0c;而无需将查询结果分组。它们可以用于计算排名、行号、累积值等各种分析操作。窗口函数通常与OVER子句一起使用&#x…

去除UI切图边缘上多余的线条

最近接到UI切图&#xff0c;放进项目&#xff0c;显示边缘有多余线条&#xff0c;影响UI美观。开始以为切图没切好&#xff0c;实则不是。如图&#xff1a; ->解决&#xff1a; 将该图片资源WrapMode改为Clamp

LRU 算法

LRU 缓存淘汰算法就是一种常用策略。LRU 的全称是 Least Recently Used&#xff0c;也就是说我们认为最近使用过的数据应该是是「有用的」&#xff0c;很久都没用过的数据应该是无用的&#xff0c;内存满了就优先删那些很久没用过的数据。 力扣&#xff08;LeetCode&#xff09…

使用 PyTorch 进行高效图像分割:第 4 部分

一、说明 在这个由 4 部分组成的系列中&#xff0c;我们将使用 PyTorch 中的深度学习技术从头开始逐步实现图像分割。本部分将重点介绍如何实现基于视觉转换器的图像分割模型。 图 1&#xff1a;使用视觉转换器模型架构运行图像分割的结果。 从上到下&#xff0c;输入图像、地面…

【机器学习】处理不平衡的数据集

一、介绍 假设您在一家给定的公司工作&#xff0c;并要求您创建一个模型&#xff0c;该模型根据您可以使用的各种测量来预测产品是否有缺陷。您决定使用自己喜欢的分类器&#xff0c;根据数据对其进行训练&#xff0c;瞧&#xff1a;您将获得96.2%的准确率&#xff01; …

css学习3(三种样式表与样式控制优先级)

1、外部样式表&#xff1a;当样式需要应用于很多页面时&#xff0c;外部样式表将是理想的选择。在使用外部样式表的情况下&#xff0c;你可以通过改变一个文件来改变整个站点的外观。每个页面使用 <link> 标签链接到样式表&#xff0c;也要放到<head>中。 2、外部…

LeetCode 周赛上分之旅 #40 结合特征压缩的数位 DP 问题

⭐️ 本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 和 BaguTree Pro 知识星球提问。 学习数据结构与算法的关键在于掌握问题背后的算法思维框架&#xff0c;你的思考越抽象&#xff0c;它能覆盖的问题域就越广&#xff0c;理解难度…
最新文章