KAFKA第二课之生产者(面试重点)

生产者学习

1.1 生产者消息发送流程

在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
生产者如何发送的?
现在Main线程中将数据进行处理,处理成IO型数据,然后调用sender进行发送
Main:
1.读取生产者配置
2.产生数据
3.过滤数据(校验什么的)
4.序列化
5.放入缓冲区 RecordAccumulator
6.发送Sender

细节: 考虑的问题 1.生产者配置的读取和修改 2.数据的过滤与分区, 3.缓冲区是如何设置的,大小
4.发送(发送失败怎么样,请求区的大小)
这里注意一下,可以在缓冲区对数据进行压缩,这样就提高缓冲区的容量和发送的数据量,提高吞吐量

1.2 同步发送与异步发送

1.什么是同步和异步

同步就是,串行,一条龙 异步 一起运行
举例: 餐馆点餐
同步: 需要等服务员过来,让服务员记录,
异步: 点餐APP直接点餐,交给队列,让他自己运行

2.发送的同步异步

同步:需要得到返回值
异步:发送过去不管了

3. 分区好处

啥是分区?
将一个数据块分成多个数据块
将数据分布式处理了
存储: 可以分在多个机器上, 也可以整多个副本。便于存储,同时提高健壮性
IO:多个数据块可以同时进行发送接收消费。生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费

4. 默认分区器

前提条件: 1.分区 2.key值
规则:

  • 1存在,按1分区
  • 1不存在,按2.key值对分区数取余得到的值分区
  • 1.2都不存在 随机选个分区,等这个批次发送完了,再换

3 就是粘性分区
那么粘性分区的缺点是什么?
因为缓冲区溢出的条件是,大小和时间双重判断,如果大小不够,但是时间够了,还是会发走,这样,最后导致,分区上产生数据倾斜
如何解决的?
3.3.1 Kafka去掉粘性分区的时间控制,批次只由大小判断

1.3.自定义分区器

1.思路

  • 1.实现接口Parititoner,重写相关方法
  • 2.修改配置 将partitioner设置为默认配置

2.1 自定义分区器代码

public class MyPartitioner implements Partitioner {
    //  自定义分区器 实现partitioner接口

    // 1.分区方法
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取消息
        String data = value.toString();
        // 创建partition 作为最后的分区标识
        int partitions;

        // 分区逻辑
        // 根据含有的字符串进行判断 判断进入哪个分区
        if (data.contains("atguigu")){
             partitions = 0;
        } else if (data.contains("shangguigu")){
            partitions = 1;
        } else {
            partitions = 2;
        }
        return partitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

2.2 主类

package com.atguigu.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ProducerClientAsync {
    public static void main(String[] args) {
        // 0 配置对象
        Properties properties = new Properties();

        //  --指定kafka的Broker地址
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
        //  -- 1.指定序列化器 序列化器的全限定类名
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());


        //.setProperty(ProducerConfig.LINGER_MS_CONFIG,"0");
        // -- 2.设置分区器
        properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());
        // -- 3.获取客户端连接对象
        KafkaProducer<String,String> kafkaProducer= new KafkaProducer<String,String>(properties);
        //  key是主题  v是发送内容  这里注意一下
        // -- 4.发送数据
        String[] str= {"atguigu","111","atguigu","shangguigu","222"};
        for (int i =0; i < str.length; i++) {
            System.out.println(str[i]);
            try {
                kafkaProducer.send(new ProducerRecord<>("first", str[i]), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){

                            System.out.println("主题:" + metadata.topic() + "->"  + "分区:" + metadata.partition());
                        }else {
                            // 出现异常打印
                            exception.printStackTrace();
                        }
                    }
                }).get();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        kafkaProducer.close();
    }
}

在这里插入图片描述

3.面试细节

1.如何提高生产者的吞吐量

  • 批次大小调到16
  • 将等待时间改成50-100ms 默认是0
  • 压缩数据量,这样每次发送的数据就多了
  • 加大缓冲区大小,进来的数据变多,发送也能提上去

2.生产者如何保证数据可靠性的

主要通过ack机制

1.什么是ACK机制?

根据ack值来决定Kafka集群服务端的存储应答

  • ack=0 最低 生产者只管发送,不用接收
  • ack=1 中等 生产者发送完需要等待Leader保存后回应,
  • ack=-1 最高 生产者发送完需要等待所有副本保存后回应

2.分析ACK机制

性能与安全是成反比的
所以,-1虽然最安全,但是效率最低

3.如果将ACK调到-1会出现什么问题?

有可能出现数据重复发送与接收
比如,在同步的瞬间,Leader死掉,但是其他副本已经落盘,这时候,就是问题了。
因为Leader死掉了,所以会直接更换Leader,选出一个副本作为Leader,注意,这时显示没有收到内容,所以,send重新发送,这时候,每个副本上,收到的就是2份该数据了。

4.应用场景

acks=0 几乎不用
acks=1 传输普通日志,允许丢失
acks=-1 传输高可靠性数据,一般与钱有关

5.ACK=-1一定可靠么?

不一定
如果分区副本数设置为1 ,或者ISR里应答的最小副本数设置为1(默认也是1),这时候,ack=1效果相同了。
也就是说,应答一个,就能走,就没意义了
所以需要完全可靠就需要配置一下
ACK=-1 & 分区副本大于等于2 & ISR应答最小副本数量大于等于2

3. 数据去重

1.概念

至少一次:一次或者多次 完全可靠
在这里插入图片描述
最多一次:直接不管回复只管发送 ack=0

至少:保证数据不丢失,但是无法保证数据不重复
最多: 无法保证数据不丢失

1.如何解决数据的重复发送与接收的问题,同时保证数据的不丢失

注意,这里解决的是sender和服务端的重复发送与接收,而不是生产者本身发送多个重复消息的问题,这个要搞清楚。
一般重复问题,都是通过标识来判别,从而去重的
Kafka 0.11 引入 幂等性和事务
精确一次: 幂等性 +至少一次(ack=-1 & 分区副本>=2 & ISR最小副本>=2)

4.幂等性

1.概念

啥是幂等性,标识一个消息的唯一标识
<pid,partition,Seqnumber>
Pid 是会话ID,每次重新生成会话,就会重新生成PID
partition是分区 标识 消息是哪个分区的
Seqnumber是单调递增的标识,注意,这是每个分区独享的
这三个在一起,才是唯一标识。

2.如何使用幂等性

开启参数enable.idempotence 默认为true,false关闭。
开启开关就行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/69082.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

泰国的区块链和NFT市场调研

泰国的区块链和NFT市场调研 基本介绍 参考&#xff1a; https://zh.wikipedia.org/zh-hans/%E6%B3%B0%E5%9B%BD参考&#xff1a; https://hktdc.infogram.com/thsc–1h7k2303zo75v2x zz制度&#xff1a; 君主立宪制&#xff08;议会制&#xff09; 国王&#xff1a; 玛哈哇集拉…

基于vue3+webpack5+qiankun实现微前端

一 主应用改造&#xff08;又称基座改造&#xff09; 1 在主应用中安装qiankun(npm i qiankun -S) 2 在src下新建micro-app.js文件&#xff0c;用于存放所有子应用。 const microApps [// 当匹配到activeRule 的时候&#xff0c;请求获取entry资源&#xff0c;渲染到containe…

JVM内存管理

文章目录 1、运行时数据区域1.1 程序计数器&#xff08;线程私有&#xff09;1.2 JAVA虚拟机栈&#xff08;线程私有&#xff09;1.3 本地方法栈1.4 Java堆&#xff08;线程共享&#xff09;1.5 方法区&#xff08;线程共享&#xff09;1.6 直接内存&#xff08;非运行时数据区…

拥抱AIGC浪潮,亚信科技将如何把握时代新增量?

去年底&#xff0c;由ChatGPT带起的AIGC浪潮以迅雷不及掩耳之势席卷全球。 当互联网技术的人口红利逐渐消退之际&#xff0c;AIGC就像打开通用人工智能大门的那把秘钥&#xff0c;加速开启数智化时代的到来。正如OpenAI CEO Sam Altman所言&#xff1a;一个全新的摩尔定律可能…

560. 和为 K 的子数组

思路 本题的主要思路为创建一个哈希表记录每个0~i的和&#xff0c;在遍历这个数组的时候查询有没有sum-k的值在哈希表中&#xff0c;如果有&#xff0c;说明有个位置到当前位置的和为k。   有可能不止一个&#xff0c;哈希表负责记录有几个sum-k&#xff0c;将和记录下来。这…

10个问题,带你重新认识smardaten企业级无代码

很多新客户在接触数睿数据&#xff0c;或者在初步认识smardaten企业级无代码的时候&#xff0c;大家更多地以为只是个普通的无代码工具。在交流过程中&#xff0c;大家也提出了很多疑惑&#xff1a; smardaten无代码平台包括哪些能力&#xff1f; 适合开发哪些应用&#xff1f…

AI自动驾驶

AI自动驾驶 一、自动驾驶的原理二、自动驾驶的分类三、自动驾驶的挑战四、自动驾驶的前景五、关键技术六、自动驾驶的安全问题七、AI数据与自动驾驶八、自动驾驶的AI算法总结 自动驾驶技术是近年来备受关注的热门话题。它代表了人工智能和机器学习在汽车行业的重要应用。本文将…

web集群学习:源码安装nginx配置启动服务脚本、IP、端口、域名的虚拟主机

目录 1、源码安装nginx&#xff0c;并提供服务脚本。 2、配置基于ip地址的虚拟主机 3、配置基于端口的虚拟主机 4、配置基于域名的虚拟主机 1、源码安装nginx&#xff0c;并提供服务脚本。 1、源码安装会有一些软件依赖 &#xff08;1&#xff09;检查并安装 Nginx 基础依赖…

PHP智能人才招聘网站mysql数据库web结构apache计算机软件工程网页wamp

一、源码特点 PHP智能人才招聘网站 是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 下载地址 https://download.csdn.net/download/qq_41221322/88199392 视频演示 PH…

TextBrewer:融合并改进了NLP和CV中的多种知识蒸馏技术、提供便捷快速的知识蒸馏框架、提升模型的推理速度,减少内存占用

TextBrewer:融合并改进了NLP和CV中的多种知识蒸馏技术、提供便捷快速的知识蒸馏框架、提升模型的推理速度&#xff0c;减少内存占用 TextBrewer是一个基于PyTorch的、为实现NLP中的知识蒸馏任务而设计的工具包&#xff0c; 融合并改进了NLP和CV中的多种知识蒸馏技术&#xff0…

GB28181智慧可视化指挥控制系统之执法记录仪设计探讨

什么是智慧可视化指挥控制系统&#xff1f; 智慧可视化指挥控制平台通过4G/5G网络、WIFI实时传输视音频数据至指挥中心&#xff0c;特别是在有突发情况时&#xff0c;可以指定一台执法仪为现场视频监控器&#xff0c;实时传输当前画面到指挥中心&#xff0c;指挥中心工作人员可…

支持对接鸿蒙系统的无线模块及其常见应用介绍

近距离的无线通信得益于万物互联网的快速发展&#xff0c;基于集成部近距离无线连接&#xff0c;为固定和移动设备建立通信的蓝牙技术也已经广泛应用于汽车领域、工业生产及医疗领域。为协助物联网企业终端产品能快速接入鸿蒙生态系统&#xff0c;SKYLAB联手国产芯片厂家研发推…

VR家装提升用户信任度,线上体验家装空间感

近些年&#xff0c;VR家装逐渐被各大装修公司引入&#xff0c;VR全景装修的盛行&#xff0c;大大增加了客户“所见即所得”的沉浸式体验感&#xff0c;不再是传统二维平面的看房模式&#xff0c;而是让客户通过视觉、听觉、交互等功能更加真实的体验家装后的效果。 对于传统家装…

造个轮子-任务调度执行小框架-任务执行器代理实现

文章目录 前言执行器代理代理对象任务清单代理对象任务清单执项对象处理handler对象状态对象代理工厂任务清单代理工厂清单任务项代理工厂总结前言 不知道为啥,今天好像学不进去,没办法,那就继续编码吧。那么今天的话,加更一篇文章,那就是咱们这个任务执行器的实现。先前…

opencv基础57-模板匹配cv2.matchTemplate()->(目标检测、图像识别、特征提取)

OpenCV 提供了模板匹配&#xff08;Template Matching&#xff09;的功能&#xff0c;它允许你在图像中寻找特定模板&#xff08;小图像&#xff09;在目标图像中的匹配位置。模板匹配在计算机视觉中用于目标检测、图像识别、特征提取等领域。 以下是 OpenCV 中使用模板匹配的基…

Java SPI介绍

SPI Java SPI : Service Provider Interface 是Java平台提供的一种机制&#xff0c;用于动态的加载和扩展功能的机制&#xff0c;它为框架和库提供了一种松耦合的扩展方式&#xff0c;核心是解耦。 例如JDBC驱动&#xff0c;日志框架&#xff0c;等应用&#xff0c;它为开发…

Linux——基础IO(1)

目录 0. 文件先前理解 1. C文件接口 1.1 写文件 1.2 读文件 1.3 输出信息到显示器 1.4 总结 and stdin & stdout & stderr 2. 系统调用文件I/O 2.1 系统接口使用示例 2.2 接口介绍 2.3 open函数返回值 3. 文件描述符fd及重定向 3.1 0 & 1 & 2 3.2…

记录--前端重新部署如何通知用户

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 1. 场景 前端构建完上线&#xff0c;用户还停留还在老页面&#xff0c;用户不知道网页重新部署了&#xff0c;跳转页面的时候有时候js连接hash变了导致报错跳不过去&#xff0c;并且用户体验不到新功能…

SAP Fiori 问题收集

事务代码篇 启动工作台&#xff1a;/N/UI2/FLP 错误日志&#xff1a; /n/IWFND/ERROR_LOG 服务清单&#xff1a; /n/IWFND/MAINT_SERVICE 创建语义对象&#xff1a;/N/UI2/SEMOBJ 创建目录&#xff1a;/N/UI2/FLPD_CONF&#xff08;cross-client&#xff09;或 /N/UI2…

【ES】笔记-箭头函数的实践于应用场景

箭头函数的实践于应用场景 需求-1 点击 div 2s后颜色变成[粉色]从数组中返回偶数的元素 需求-1 点击 div 2s后颜色变成[粉色] html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport…