大数据技术之Kafka集成

一、集成Flume

 1.1 Flume生产者

(1)启动Kafka集群

zkServer.sh start

nohup kafka-server-start.sh /opt/soft/kafka212/config/server.properties &

(2)启动Kafka消费者

kafka-console-consumer.sh --bootstrap-server 192.168.153.139:9092 --topic first

(3)配置flume

配置flume

(4)启动flume

mkdir /opt/soft/kafka212/conf/jobs
vim  /opt/soft/kafka212/conf/jobs/file_to_kafka.conf

# 1. 定义组件
a1.sources = r1
a1.sink2 = k1
a1.channels = c1

# 2. 配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json

# 3. 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4. 配置sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers
hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# 5. 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(5)启动flume

cd /opt/soft/flume190/
./bin/flume-ng agent --name a1 --conf ./conf --conf-file ./conf ./jobs/file_to_kafka.conf

(6)追加数据,查看Kafka消费者消费情况 

1.2 Flume消费者

(1)配置flume

mkdir /opt/soft/kafka212/conf/jobs
vim  /opt/soft/kafka212/conf/jobs/kafka_to_file.conf

# 1. 定义组件
a1.sources = r1
a1.sink2 = k1
a1.channels = c1

# 2. 配置source
a1.sources.r1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id

# 3. 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 4. 配置sink
a1.sinks.k1.type = logger

# 5. 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(2)启动flume

cd /opt/soft/flume190/
./bin/flume-ng agent --name a1 --conf ./conf --conf-file ./conf ./jobs/kafka_to_file.conf

(3)启动Kafka生产者

bin/kafka-console-producer.sh --bootstrap-server hadoop02:9092 --topic first

(4)输入数据并监控 

二、集成SpringBoot

 

1)在IDEA中安装lombok插件

2)SpringBoot环境准备

(1)创建一个Spring Initializr

(2)添加项目依赖

 (3)检查自动生成的配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.atguigu</groupId>
    <artifactId>springboot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3.1 SpringBoot生产者

(1)修改SpringBoot核心配置文件application.properties,添加生产者相关信息

# 应用名称
    spring.application.name=atguigu_springboot_kafka

# 指定 kafka 的地址
    spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

#指定 key 和 value 的序列化器
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

(2)创建controller从浏览器接收数据,并写入指定的topic

package com.atguigu.springboot;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {
 
    // Kafka 模板用来向 kafka 发送数据
    @Autowired
    KafkaTemplate<String, String> kafka;
 
    @RequestMapping("/atguigu")
    public String data(String msg) {
        kafka.send("first", msg);
        return "ok";
    }
}

(3)在浏览器中给/atguigu接口发送数据

http://localhost:8080/atguigu?msg=hello

3.2 SpringBoot消费者

(1)修改SpringBoot核心配置文件application.properties

# =========消费者配置开始=========
# 指定 kafka 的地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 指定 key 和 value 的反序列化器
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#指定消费者组的 group_id
spring.kafka.consumer.group-id=atguigu
# =========消费者配置结束=========

(2)创建类消费Kafka中指定topic的数据

package com.atguigu.springboot;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {
 
    // 指定要监听的 topic
    @KafkaListener(topics = "first")
    public void consumeTopic(String msg) { // 参数: 收到的 value
        System.out.println("收到的信息: " + msg);
    }
}

(3)向first主题发送数据

 

[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap=server hadoop102:9092 --topic first

三、集成Spark

1)scala环境准备

2)spark环境准备

(1)创建一个maven项目spark-kafka

(2)在项目spark-kafka上添加框架支持Add Framework Support,选择sacla

(3)在main下创建scala文件夹,添加为源码包

(4)添加配置文件

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

(5)将log4j.properties文件添加到resources中,更改打印日志级别为error

log4j.rootLogger=error, stdout,R
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%5L) : %m%n

log4j.appender.R=org.apache.log4j.RollingFileAppender
log4j.appender.R.File=../log/agent.log
log4j.appender.R.MaxFileSize=1024KB
log4j.appender.R.MaxBackupIndex=1

log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %5p --- [%50t] %-80c(line:%6L) : %m%n

3.1 spark生产者

(1)在当前包下创建scala object:SparkKafkaProducer

package com.atguigu.spark

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SparkKafkaProducer {
    def main(args: Array[String]): Unit = {

        // 0 kafka 配置信息
        val properties = new Properties()

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092")
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

        // 1 创建 kafka 生产者
        var producer = new KafkaProducer[String, String](properties)

        // 2 发送数据
        for (i <- 1 to 5){
            producer.send(new ProducerRecord[String,String]("first","atguigu" + i))
        }

        // 3 关闭资源
        producer.close()
    }
}

(2)启动Kafka消费者

[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

(3)执行SparkKafkaProducer程序,观察Kafka消费者控制台情况

3.2 spark消费者

(1)添加配置文件

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

(2)在当前包下创建scala object:SparkKafkaConsumer

package com.atguigu.spark

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkKafkaConsumer {
    def main(args: Array[String]): Unit = {

        //1.创建 SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("sparkstreaming").setMaster("local[*]")

        //2.创建 StreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(3))

        //3.定义 Kafka 参数:kafka 集群地址、消费者组名称、key 序列化、value 序列化
        val kafkaPara: Map[String, Object] = Map[String, Object](
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
            ConsumerConfig.GROUP_ID_CONFIG -> "atguiguGroup",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
        )

        //4.读取 Kafka 数据创建 DStream
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent, //优先位置
            ConsumerStrategies.Subscribe[String, String](Set("first"), kafkaPara)// 消费策略:(订阅多个主题,配置参数)
        )

        //5.将每条消息的 KV 取出
        val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

         //6.计算 WordCount
        valueDStream.print()

        //7.开启任务
        ssc.start()
        ssc.awaitTermination()
    }
}

(3)启动SparkKafkaConsumer消费者

(4)启动Kafka生产者

[atguigu@hadoop103 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first

(5)观察idea控制台数据打印

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/14782.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

动态内存管理

文章目录 1.动态内存函数1.1free1.2malloc1.3calloc1.4realloc 2.动态内存错误2.1解引用空指针--非法访问内存2.2越界访问动态空间2.3free释放非动态空间2.4free释放部分动态空间2.5free多次释放动态空间2.6未释放动态内存 3.动态内存题目3.1形参不影响实参3.2地址返回&#xf…

APP渗透—查脱壳、反编译、重打包签名

APP渗透—查脱壳、反编译、重打包签名 1. 前言1.1. 其它 2. 安装工具2.1. 下载jadx工具2.1.1. 下载链接2.1.2. 执行文件 2.2. 下载apktool工具2.2.1. 下载链接2.2.2. 测试 2.3. 下载dex2jar工具2.3.1. 下载链接 3. 查壳脱壳3.1. 查壳3.1.1. 探探查壳3.1.2. 棋牌查壳 3.2. 脱壳3…

FVM初启,Filecoin生态爆发着力点在哪?

Filecoin 小高潮 2023年初&#xff0c;Filecoin发文分享了今年的三项重大变更&#xff0c;分别是FVM、数据计算和检索市场的更新&#xff0c;这些更新消息在发布后迅速吸引了市场的广泛关注。 特别是在3月14日&#xff0c;Filecoin正式推出了FVM&#xff0c;这一变革使得Filec…

多维时序 | MATLAB实现BO-CNN-GRU贝叶斯优化卷积门控循环单元多变量时间序列预测

多维时序 | MATLAB实现BO-CNN-GRU贝叶斯优化卷积门控循环单元多变量时间序列预测 目录 多维时序 | MATLAB实现BO-CNN-GRU贝叶斯优化卷积门控循环单元多变量时间序列预测效果一览基本介绍模型描述程序设计参考资料 效果一览 基本介绍 基于贝叶斯(bayes)优化卷积神经网络-门控循环…

python 基础语法

标准库 语言参考手册 abstract base class – 抽象基类 abc annotation – 标注 类型注解 argument – 函数参数 关键字参数 xxx / **{xxx: , xx: }位置参数 3, 5 / *(3, 5) 基础语法 使用严格缩进 代替 大括号{} 框定代码块 使用反斜杠 \ 将一行的语句分为多行显示 三引…

ENVI 国产高分2号(GF-2)卫星数据辐射定标 大气校正 影像融合

1.数据 高分2号卫星数据&#xff0c;包含&#xff1a; MSS-1\2多光谱数据&#xff0c;4m分辨率&#xff1b; Pan-1\2全色波段数据&#xff0c;0.8m分辨率。 2.处理软件 ENVI5.3 国产插件下载地址&#xff1a;ENVI App Store (geoscene.cn) 首先下载插件文件&#xff1b; …

【STL十四】函数对象(function object)_仿函数(functor)——lambda表达式

【STL十四】函数对象&#xff08;function object&#xff09;_仿函数&#xff08;functor&#xff09;——lambda表达式 一、函数对象&#xff08;function object&#xff09;二、函数对象优点三、分类四、头文件五、用户定义函数对象demo六、std::内建函数对象1、 算术运算函…

【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的Redis延时队列的功能组件

手把手教你如何开发一个属于自己的延时队列的功能组件 前提介绍解决痛点延时队列组件的架构延时队列组件的初始化流程延时队列组件的整体核心类架构延时队列组件的整体核心类功能 延时队列的开发组件延迟队列的机制配置初始化类源码 - DelayedQueueConfigurationRedission客户端…

网络基础,InetAddress,Socket,TCP,UDP

概念&#xff1a;两台设备之间通过网络实现数据运输网络通信&#xff1a;将数据通过网络从一台设备传输到另一台设备java.net包下提供了一系列的类或接口&#xff0c;供程序员使用&#xff0c;完成网络通信网络&#xff1a;两台或多台设备通过一定物理设备连接起来构成了网络根…

文件和用户管理

Linux基础 提示&#xff1a;个人学习总结&#xff0c;仅供参考。 一、Linux系统部署 二、服务器初始化 三、文件和用户管理 四、用户的权限 提示&#xff1a;文档陆续更新整理 文件和用户管理 Linux基础一、Linux目录结构二、文件管理1.文件类型2.文件管理命令 三、用户管理…

为什么医疗保健需要MFT来帮助保护EHR文件传输

毫无疑问&#xff0c;医疗保健行业需要EHR技术来处理患者&#xff0c;设施&#xff0c;提供者等之间的敏感患者信息。但是&#xff0c;如果没有安全的MFT解决方案&#xff0c;您将无法安全地传输患者文件&#xff0c;从而使您的运营面临遭受数据泄露&#xff0c;尴尬&#xff0…

如何平衡倾斜摄影的三维模型轻量化数据文件大小和质量效果?

如何平衡倾斜摄影的三维模型轻量化数据文件大小和质量效果&#xff1f; 倾斜摄影超大场景的三维模型数据文件大小的具体范围取决于多种因素&#xff0c;如原始数据的复杂度、轻量化处理的方式和压缩算法等。一般而言&#xff0c;经过轻量化处理后&#xff0c;数据文件大小可以减…

c/c++:栈帧,传值,传址,实参传值给形参,传地址指针给形参

c/c&#xff1a;栈帧&#xff0c;传值&#xff0c;传址&#xff0c;实参传值给形参&#xff0c;传地址指针给形参 2022找工作是学历、能力和运气的超强结合体&#xff0c;遇到寒冬&#xff0c;大厂不招人&#xff0c;此时学会c的话&#xff0c; 我所知道的周边的会c的同学&…

WuxioLin 反锯齿算法(反走样算法,Xiaolin Wu Anti-aliasing algorithm) C# 代码实现

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、锯齿和反锯齿二、Xiaolin Wu 算法代码1.C#完整代码如下2.举例和测试 总结 前言 笔者前几日自己写了个佳明手表表盘的的一个入门级App&#xff0c;模拟指针…

甘肃vr全景数字化展厅提高企业品牌认知度和销售效果

相比传统式展厅给观众们呈现的是静态的视觉体会&#xff0c;缺乏实时交互水平。而720VR全景虚拟展厅能够提供高度真实的展览体验&#xff0c;融合视、听、触等各种感官享受&#xff0c;带来颠覆的沉浸式体验。 即便社恐的你也能在虚拟现实的世界游刃有余&#xff0c;想看哪里点…

AD9208子卡设计资料: 2 路 2.6GSPS/3GSPS AD 采集、2 路 12.6G DA 回放、高性能时钟发生器HMC7044 -FMC 子卡模块

板卡概述 FMC123 是一款基于 FMC 标准规范&#xff0c;实现 2 路 14-bit、3GSPSADC 采集功能、2 路 16-bit 12.6GSPS 回放子卡模块。该模块遵循 VITA57.1 标准&#xff0c;可直接与 FPGA 载卡配合使用&#xff0c;板卡 ADC 器件采用 ADI 公司的 AD9208 芯片&#xff0c;&…

《中学科技》期刊简介及投稿邮箱

《中学科技》期刊简介及投稿邮箱 《中学科技》以传播科技知识、启迪智慧、培养才能为宗旨&#xff0c;提供电子技术、计算机、陆海空模型、数学、物理、化学、生物、天文等方面的科技活动资料&#xff0c;特别注意通过科学观察&#xff0c;实验和制作实践的途径&#xff0c;培…

CCGNet用于发现共晶材料中的coformer

共晶工程&#xff08;cocrystal engineering&#xff09;在制药&#xff0c;化学和材料领域有广泛应用。然而&#xff0c;如何有效选择coformer一直是一个挑战性课题。因此&#xff0c;作者开发了一个基于GNN的深度学习框架用于快速预测共晶的形成。为了从现有报告的6819个正样…

Java项目上线之云服务器环境篇(二)——Tomcat的安装与配置

Java项目上线之云服务器环境篇&#xff08;二&#xff09;——Tomcat的安装与配置 Tomcat的选择&#xff1a; 云服务器tomcat的选择最好与本机项目运行的tomcat版本号一致&#xff0c;避免一些不必要的问题。 配置步骤&#xff1a; 1、首先进入云服务器创建好放置tomcat的文件…

重大剧透:你不用ChatGPT,它砸你饭碗

早晨看到路透社报道&#xff0c;盖茨说&#xff0c;与其争论技术的未来&#xff0c;不如专注于如何更好地利用人工智能。 这可能是他对马斯克他们呼吁暂停AI研发6个月的一种回应吧。 有种古语说&#xff1a;天下大势&#xff0c;浩浩汤汤&#xff0c;顺之者昌&#xff0c;逆之者…