KafkaStream:Springboot中集成

1、在kafka-demo中创建配置类

        配置kafka参数

package com.heima.kafkademo.config;

import lombok.Data;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Data
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

2、在application.yml中配置上面配置类需要的参数

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: ${spring.application.name}-test
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

3、新增配置类,创建KStream对象,进行聚合

package com.heima.kafkademo.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {
                        return Arrays.asList(value.split(" "));
                    }
                })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}

4、启动kafka-demo服务测试

        使用生产者发送消息可以看到控制台接收成功

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/72566.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

怎么做Tik Tok海外娱乐公会呢?新加坡市场怎么样?

一、为什么选择TikTok直播 1. 海外市场潜力巨大 • 自2016年始&#xff0c;多家直播平台陆续拓展至东南亚、中东、俄罗斯、日韩、欧美、拉美等地区。 • 海外市场作为直播发展新蓝海&#xff0c;2021年直播行业整申请cmxyci体规模达百亿美元&#xff0c;并维持高速增长。 &a…

【数据结构系列】链表

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kuan 的首页,持续学…

异常(下)Java常见异常,异常的使用原则

文章目录 前言一、Java常见异常 1.常见异常2.实例展示二、异常的使用原则总结 前言 该文介绍了Java的一些常见异常&#xff0c;并给出对应的例子进行解释。介绍异常的使用原则&#xff0c;即创建&#xff0c;抛出异常的编程规范。 一、Java常见异常 前要&#xff1a;Java API中…

sklearn机器学习库(一)sklearn中的决策树

sklearn机器学习库(一)sklearn中的决策树 sklearn中决策树的类都在”tree“这个模块之下。 tree.DecisionTreeClassifier分类树tree.DecisionTreeRegressor回归树tree.export_graphviz将生成的决策树导出为DOT格式&#xff0c;画图专用tree.export_text以文字形式输出树tree.…

Jmeter(六) - 从入门到精通 - 建立数据库测试计划(详解教程)

1.简介 在实际工作中&#xff0c;我们经常会听到数据库的性能和稳定性等等&#xff0c;这些有时候也需要测试工程师去评估和测试&#xff0c;因此这篇文章主要介绍了jmeter连接和创建数据库测试计划的过程,在文中通过示例和代码非常详细地介绍给大家&#xff0c;希望对各位小伙…

基于YOLOv8+PyQt5开发的行人过马路危险行为检测告警系统(附数据集和源码下载)

系列文章目录 文章目录 系列文章目录前言欢迎来到我的博客&#xff01;我很高兴能与大家分享关于基于YOLOv8的行人过马路危险行为检测告警系统的内容。 一、系统特点1. 采用最新最优秀的目标检测算法YOLOv82. 系统分别基于PyQt5开发了两种GUI图形界面&#xff0c;供大家学习使用…

consul安装启动流程

普通软件包安装 首先cd /opt &#xff0c;将安装包放到该目录下 下载consul安装包 进入consul官网找到自己开发平台对应的安装包下载 https://www.consul.io/downloads.html 或使用命令 wget https://releases.hashicorp.com/consul/1.6.2/consul_1.6.2_linux_amd64.zip (如果…

解决lldb调试时可能出现的personality set failed: Function not implemented

最近在尝试使用Visual Studio 2022远程连接Linux进行C/C的开发&#xff0c;由于CentOS风波不断&#xff0c;所以现在的开发基本上都是使用ubuntu了&#xff0c;但是目前VS2022有一些BUG&#xff0c;就是远程调试时&#xff0c;如果目标系统是ubuntu则会出现启动调试器很慢的问题…

js设置css变量控制页面一行展示指定个数的元素

前置知识&#xff1a; CSS变量之var()函数的应用——动态修改样式 & root的使用 flex相关知识 场景&#xff1a; 动态设置给父元素内子元素设置每行排列几个 通过 document.body.style.setProperty(--itemNum, 5)设置样式变量&#xff0c;然后通过给父元素设置display: f…

PyQt5的信号与槽函数

目录 一、介绍 二、一个信号连接一个槽 三、一个信号连接多个槽 四、多个信号连接一个槽 五、自定义信号 1、创建自定义信号 2、让自定义信号携带值 一、介绍 在下图中 &#xff08;1&#xff09;widget就是PyQt中的控件对象。其实就是组件&#xff08;2&#xff09;…

uniapp 用 hbuilderx下载 uview

uView2.0重磅发布&#xff0c;利剑出鞘&#xff0c;一统江湖 - DCloud 插件市场 1.uniapp官网下载资源 2按下载 3.官网安装文档 要按 这个红色圈错了 然后看他的配置步骤 第四easycom 就可以 不用配了

Linux MQTT智能家居(温度,湿度,环境监测,摄像头等界面布局设置)

文章目录 前言一、温度湿度曲线布局二、环境监测界面布局三、摄像头界面布局总结 前言 本篇文章来完成另外三个界面的布局设置。 这里会使用到 feiyangqingyun的一些控件库。 一、温度湿度曲线布局 TempHumtiy.h: #ifndef TEMPHUMTIY_H #define TEMPHUMTIY_H#include <…

Java-运算符和控制语句(上)(基于c语言的补充)

算术运算符 关于求余 不管分子&#xff0c;分母是正还是负&#xff0c;对于分母&#xff0c;直接取正&#xff1b;对于分子若有负号&#xff0c;则先提取出来&#xff1b;剩下两个正的分子分母运算&#xff1b;最后&#xff0c;若刚才的分子有负号&#xff0c;对最后的结果添加…

[C语言] 指针

1. 指针是什么 2. 指针和指针类型 3. 野指针 4. 指针运算 5. 指针和数组 6. 二级指针 7. 指针数组 目录 1. 指针是什么&#xff1f; 2. 指针和指针类型 2.1 指针-整数 2.2 指针的解引用 3. 野指针 3.1 野指针成因 3.2 如何规避野指针 4. 指针运算 4.1 指针…

第三课-界面介绍SD-Stable Diffusion 教程

前言 我们已经安装好了SD&#xff0c;这篇文章不介绍难以理解的原理&#xff0c;说使用。以后再介绍原理。 我的想法是&#xff0c;先学会画&#xff0c;然后明白原理&#xff0c;再去提高技术。 我失败过&#xff0c;知道三天打鱼两天晒网的痛苦&#xff0c;和很多人一样试了…

Windows - UWP - 网络不好的情况下安装(微软商店)MicrosoftStore的应用

Windows - UWP - 网络不好的情况下安装&#xff08;微软商店&#xff09;MicrosoftStore的应用 前言 UWP虽然几乎被微软抛弃了&#xff0c;但不得不否认UWP应用给用户带来的体验。沙箱的运行方式加上微软的审核&#xff0c;用户使用起来非常放心&#xff0c;并且完美契合Wind…

一百四十九、Kettle——Linux上安装的kettle8.2创建共享资源库时遇到的问题(持续更新中)

一、目的 在kettle8.2在Linux上安装好可以启动界面、并且可以连接MySQL、Hive、ClickHouse等数据库后开始创建共享资源库&#xff0c;但是遇到了一些问题 二、Linux系统以及kettle版本 &#xff08;一&#xff09;Linux&#xff1a;CentOS 7 英文的图形化界面模式 &#…

加载并绘制时间域内的心电图信号,并实施Q因子为1的陷波滤波器以去除50 Hz频率研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

国产数据库-内核特性-低基数全局字典

国产数据库-内核特性-StarRocks低基数全局字典 StarRocks2.0引入了低基数全局字典&#xff0c;可以通过全局字典将字符串的相关操作转换成整型相关操作&#xff0c;大大提升查询性能。 1、低基数字典 对于利用整型替代字符串进行处理&#xff0c;通常使用字典编码进行优化。Sta…

UML—浅谈常用九种图

目录 概述: 1.用例图 2.静态图 3.行为图&#xff1a; 4.交互图&#xff1a; 5.实现图&#xff1a; 概述: UML的视图是由九种视图组成的&#xff0c;分别是用例图、类图、对象图、状态图、活动图、序列图、协作图、构件图、实施图。我们可以根据这9种图的功能和实现的目的…
最新文章