探究Kafka原理-4.API使用

  • 👏作者简介:大家好,我是爱吃芝士的土豆倪,24届校招生Java选手,很高兴认识大家
  • 📕系列专栏:Spring源码、JUC源码、Kafka原理
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2023计划中:源码溯源,一探究竟
  • 📝联系方式:nhs19990716,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

文章目录

  • 练习
    • 生产者
    • Ack 应答机制
    • 消费者重要参数
  • API 开发:topic 管理
    • 列出主题
    • 查看主题信息
    • 创建主题
    • 删除主题
    • 其他管理
  • 练习
    • 消费者
    • 消费者

练习

需求:

写一个生产者,不断地去生成“用户行为事件” 并写入kafka

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

写一个消费者,不断地从kafka种取消费如上 “ 用户行为事件” 数据,并做统计计算,每五分钟,输出一次截至到当时的数据中出现过的用户总数。

生产者

public class KafkaTest{
	public static void main(String[] args){
        MyDataGen myDataGen = new MyDataGen();
        myDataGen.genData();
    }
}


/*
业务数据生成器
*/
class MyDataGen{
    Producer<String, String> produce;
    
    public MyDataGen(){
        Properties props = new Properties();
        //设置 kafka 集群的地址
		props.put("bootstrap.servers", "doitedu01:9092,doitedu02:9092,doitedu03:9092");
        //序列化器
		props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
        //ack 模式,取值有 0,1,-1(all) , all 是最慢但最安全的
props.put("acks", "all")
        producer = new KafkaProducer<>(props);
    }
    
    
    public void genData(){
        UserEvent userEvent = new UserEvent();
        while(true){
            // 造一条随机用户行为事件数据对象
            userEvent.setGuid(RandomUtils.nextInt(1,10000));
            userEvent.setEventId(RandomStringUtils.randomAlphabetic(5,8));
            userEvent.setTimeStamp(System.currentTimeMills());
            
            // 转成json串
            String json = JSON.toJSONString(userEvent);
            
            // 讲业务数据封装成ProducerRecord对象
            ProducerRecord<String,String> record = new ProducerRecord<String,String>("test",json);
            
            // 用producer写入kafka
            producer.send(record);
            
            Thread.sleep(RandomUtils.nextInt(500,2000));
        }
    }
}

@NoArgsConStructor
@AllArgConstructor
@Getter
@Setter
class UserEvent{
    private long guid;
    private String eventId;
    private long timeStamp;
}

Ack 应答机制

Ack 应答机制参数配置

在这里插入图片描述

  • 0 : 生产者发出消息后不等待服务端的确认
  • 1 : 生产者发出消息后要求服务端的分区 leader 确保数据存储成功后发送一个确认信息
  • -1: 生产者发出消息后要求服务端的分区的 ISR 副本全部同步成功后发送一个确认信息

生产者的 ack=all,也不能完全保证数据发送的 100%可靠性

为什么?因为,如果服务端目标 partition 的同步副本只有 leader 自己了,此时,它收到数据就会给生产者反馈成功!

可以修改服务端的一个参数(分区最小 ISR 数[min.insync.replicas]>=2),来避免此问题;

消费者重要参数

fetch.min.bytes=1B 一次拉取的最小字节数

fetch.max.bytes=50M 一次拉取的最大数据量

fetch.max.wait.ms=500ms 拉取时的最大等待时长

max.partition.fetch.bytes = 1MB 每个分区一次拉取的最大数据量

max.poll.records=500 一次拉取的最大条数

connections.max.idle.ms=540000ms 网络连接的最大闲置时长

request.timeout.ms=30000ms 一次请求等待响应的最大超时时间 consumer 等待请求响应的最长时间

metadata.max.age.ms=300000 元数据在限定时间内没有进行更新,则会被强制更新

reconnect.backoff.ms=50ms 尝试重新连接指定主机之前的退避时间

retry.backoff.ms=100ms 尝试重新拉取数据的重试间隔

isolation.level=read_uncommitted 隔离级别! 决定消费者能读到什么样的数据
read_uncommitted:可以消费到 LSO(LastStableOffset)位置;
read_committed:可以消费到 HW(High Watermark)位置

max.poll.interval.ms 超过时限没有发起 poll 操作,则消费组认为该消费者已离开消费组

enable.auto.commit=true 开启消费位移的自动提交

auto.commit.interval.ms=5000 自动提交消费位移的时间间隔

API 开发:topic 管理

如果希望将管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 方式去实现。如下所示:

在这里插入图片描述

为什么在网页上,或者用命令,都能做到topic的创建、删除、列出、查看详情,其底层逻辑是什么?

其本质上是通过不同方式调用了kafka提供的创建topic api

在这里插入图片描述

底层逻辑:web的后端java程序中,去调用相关的api

工具类 KafkaAdminClient 可以用来管理 broker、配置和 ACL (Access Control List),管理 topic

在这里插入图片描述

构造一个 KafkaAdminClient

AdminClient adminClient = KafkaAdminClient.create(props);

列出主题

ListTopicsResult listTopicsResult = adminClient.listTopics();
Set<String> topics = listTopicsResult.names().get();
System.out.println(topics);

查看主题信息

DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(Arrays.asList("tpc_4", "tpc_3"));
Map<String, TopicDescription> res = describeTopicsResult.all().get();
Set<String> ksets = res.keySet();
for (String k : ksets) {
	System.out.println(res.get(k));
}

创建主题

// 参数配置
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092,doit02:9092,doit03:
9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);
          
// 创建 admin client 对象
AdminClient adminClient = KafkaAdminClient.create(props);
// 由服务端 controller 自行分配分区及副本所在 broker
NewTopic tpc_3 = new NewTopic("tpc_3", 2, (short) 1);
// 手动指定分区及副本的 broker 分配
HashMap<Integer, List<Integer>> replicaAssignments = new HashMap<>();
// 分区 0,分配到 broker0,broker1
replicaAssignments.put(0,Arrays.asList(0,1));
// 分区 1,分配到 broker0,broker2
replicaAssignments.put(1,Arrays.asList(0,1));

NewTopic tpc_4 = new NewTopic("tpc_4", replicaAssignments);
CreateTopicsResult result = adminClient.createTopics(Arrays.asList(tpc_3,tpc_4));

// 从 future 中等待服务端返回
try {
	result.all().get();
} catch (Exception e) {
	e.printStackTrace();
}
adminClient.close();

删除主题

DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("tpc_1",
"tpc_1"));
Map<String, KafkaFuture<Void>> values = deleteTopicsResult.values();
System.out.println(values);

其他管理

除了进行 topic 管理外,KafkaAdminClient 也可进行诸如动态参数管理,分区管理等各类管理操作;

练习

需求:

写一个生产者,不断地去生成“用户行为事件” 并写入kafka

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789}

写一个消费者,不断地从kafka种取消费如上 “ 用户行为事件” 数据,并做统计计算,每五分钟,输出一次截至到当时的数据中出现过的用户总数。 本质 去重guid数

消费者

首先分析思路,可以使用hashmap来进行统计思路,如下所示:

在这里插入图片描述

如果存在线程安全的问题,可以使用ConcurrentHashMap。

还有一个业务需求就是 每五分钟去输出一个结果,这个时机、节奏,如何去把控。

实际上就是一个线程源源不断的拉数据,一个线程定期的输出结果。

public class consumer{
    public static void main(String[] args){
        ConcurrentHashMap<Long, String> guidMap = new ConcurrentHashMap<>();
        
        // 启动数据消费线程
        new Thread(new ConsumeRunnable(guidMap)).start();
        
        
        // 启动统计及消费输出结果的线程(每5s输出一次)
        // 优雅一点实现定时调度,可以用各种定时调度器(有第三方的,也可以用JDK自己的,Timer)
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new StatisticTask(guidMap),0,1000);
        
    }
}

class ConsumeRunnable implements Runnable{
    
    ConcurrentHashMap<Long, String> guidMap;
    
    public ConsumeRunnable(ConcurrentHashMap<Long, String> guidMap){
        this.guidMap = guidMap;
    }
    
    
    public void run(){
        Properties props = new Properties();
        // 定义 kakfa 服务的地址,不需要将所有 broker 指定上
		props.put("bootstrap.servers", "doitedu01);
        // key 的反序列化类
		props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
		// value 的反序列化类
		props.put("value.deserializer",
					"org.apache.kafka.common.serialization.StringDeserializer");
                  
        // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
		props.put("auto.offset.reset","latest");
                  
        // 制定 consumer group
		props.put("group.id", "g1");
                  
                  
                  
        while (true) {
			// 读取数据,读取超时时间为 5000ms
			ConsumerRecords<String, String> records = consumer.poll(5000);
			for (ConsumerRecord<String, String> record : records)
				String eventJson = record.value();
            	UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);
            
            	guidMap.put(userEvent.getGuid(), "");
			}
    	}
}
                  
class StatisticTask extends TimerTask{
    
    ConcurrentHashMap<Long, String> guidMap;
    
    public StatisticTask(ConcurrentHashMap<Long, String> guidMap){
        this.guidMap = guidMap;
    }
    
    public void run(){
        System.out.println("截止到当前的用户总数为:"+guidMap.size());
    }
}

ConcurrentHashMap 中无论 key 还是 Value都不能填 null值,具体在源码中有体现:

其 put流程如下:

	public V put(K key, V value) {
        return putVal(key, value, false);
        // onlyIfAbsent如果是false,那么每次都会用新值替换掉旧值
    }
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        // 其中 spread 方法会综合高位低位, 具有更好的 hash 性
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 死循环
        for (Node<K,V>[] tab = table;;) {
            // f 是链表头节点
            // fh 是链表头结点的 hash
            // i 是链表在 table 中的下标
            Node<K,V> f; int n, i, fh;
            // 要创建 table
            if (tab == null || (n = tab.length) == 0)
                // 初始化 table 使用了 cas, 无需 synchronized 创建成功, 进入下一轮循环
                // 因为是懒惰初始化的,所以直到现在才开始创建 初始化使用cas 创建,其它失败得再次进入循环,没有用syn 我们得线程并没有被阻塞住
                tab = initTable();
                // 要创建链表头节点
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 添加链表头使用了 cas, 无需 synchronized
                // 用cas将头节点加进去,如果加入失败了,继续循环
                if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                    break;
            }
            // 帮忙扩容
            // 其实就是看你的头结点是不是 ForwardingNode,其对应得MOVED是一个负数
            else if ((fh = f.hash) == MOVED)
                // 帮忙之后, 进入下一轮循环
                // 锁住当前的链表,帮助去扩容
                tab = helpTransfer(tab, f);
            // 能进入这个else,说明 table既不处于扩容中,也不是处于table的初始化过程中,而且这时肯定发生了锁下标的冲突
            else {
                V oldVal = null;
                // 锁住链表头节点
                // 并没有锁住整个tab,而是锁住这个桶链表的头节点
                synchronized (f) {
                    // 再次确认链表头节点没有被移动
                    if (tabAt(tab, i) == f) {
                        // 链表
                        // 链表的头节点hash码大于等于 0 
                        if (fh >= 0) {
                            binCount = 1;
                            // 遍历链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
// 找到相同的 key
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    // 更新
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                // 已经是最后的节点了, 新增 Node, 追加至链表尾
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        }
                        // 红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            // putTreeVal 会看 key 是否已经在树中, 是, 则返回对应的 TreeNode
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                    // 释放链表头节点的锁
                }

                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        // 如果链表长度 >= 树化阈值(8), 进行链表转为红黑树
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 增加 size 计数
        addCount(1L, binCount);
        return null;
    }
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        // 这个hash有没有被创建
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                // 让出cpu的使用权,如果cpu的时间片没有其它线程了,那么还是会分给这个线程,只是让他不至于充分利用cpu,少占用一点cpu的时间。
                Thread.yield();
                // 尝试将 sizeCtl 设置为 -1(表示初始化 table)
            
            // 而其它的线程,再次进入循环,首先 不小于0了,其次,之前的 sc也已经变了,cas失败,再次循环的时候,发现 tab已经不为空了,结束循环
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                // 获得锁, 创建 table, 这时其它线程会在 while() 循环中 yield 直至 table 创建
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 计算出下一次要扩容的阈值
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 计算出下一次要扩容的阈值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }


    // check 是之前 binCount 的个数
	// 运用了 longadder 的思想
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        if (
            // 已经有了 counterCells, 向 cell 累加
            // 累加单元数组不为空
                (as = counterCells) != null ||
                        // 还没有, 向 baseCount 累加
            // 一个基础数值累加
                        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)
        ) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (
                // 还没有 counterCells
                    as == null || (m = as.length - 1) < 0 ||
                            // 还没有 cell
                            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                            // cell cas 增加计数失败
                            !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
            ) {
                // 创建累加单元数组和cell, 累加重试
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            // 获取元素个数
            s = sumCount();
        }
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // 看看元素的个数是否大于扩容的阈值
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    // newtable 已经创建了,帮忙扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        // 首次调用,因为是懒惰初始化的,所以还没有创建
                        transfer(tab, nt);
                }
                // 需要扩容,这时 newtable 未创建
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

但是这里使用一个hashmap去记录去重的guid是有弊端的。

利用map或者set来记录guid本身,比较占用内存还有性能。

guid是一个Long值,8个byte,如果咱们的用户规模达到了10亿,那你这个set或者map中存储的guid就将达到10亿个。

相当于8 * 1000000000 /1024/1024 = 7629MB相当于一个Hashmap中存储了7G数据

那么有一些经典的数据结构,bitmap、bloomfilter、hyperloglog都可以解决去统计个数判重的场景。

在这里插入图片描述

输出个数:有几个1,就是出现过几种 guid:

之前使用hashmap存,直接存guid,8个byte一个guid == 64bit 现在bitmap里面,记一个guid,只要一个bit

并且hashmap是按需扩容的,这个过程是很浪费性能的,而bigmap要一开始初始化10亿个bit,有点浪费空间。

但是api会对最原始的bitmap进行一些优化,比如说稀疏型的向量的场景中,例如roaringbitmap工具包进行了优化,慢慢的进行了扩容。

public class consumer{
    public static void main(String[] args){

        // 使用roaringbitmap来记录
        RoaringBitmap bitmap = RoaringBitmap.bigmapOf();
        
        // 启动数据消费线程
        new Thread(new ConsumeRunnable(bitmap)).start();
        
        
        // 启动统计及消费输出结果的线程(每5s输出一次)
        // 优雅一点实现定时调度,可以用各种定时调度器(有第三方的,也可以用JDK自己的,Timer)
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new StatisticTask(bitmap),0,1000);
        
    }
}

class ConsumeRunnable implements Runnable{
    
    RoaringBitmap bitmap;
    
    public ConsumeRunnable(RoaringBitmap bitmap){
        this.bitmap = bitmap;
    }
    
    
    public void run(){
        Properties props = new Properties();
        // 定义 kakfa 服务的地址,不需要将所有 broker 指定上
		props.put("bootstrap.servers", "doitedu01);
        // key 的反序列化类
		props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
		// value 的反序列化类
		props.put("value.deserializer",
					"org.apache.kafka.common.serialization.StringDeserializer");
                  
        // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
		props.put("auto.offset.reset","latest");
                  
        // 制定 consumer group
		props.put("group.id", "g1");
                  
                  
                  
        while (true) {
			// 读取数据,读取超时时间为 5000ms
			ConsumerRecords<String, String> records = consumer.poll(5000);
			for (ConsumerRecord<String, String> record : records)
				String eventJson = record.value();
            	UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);
            
            	bitmap.add(userEvent.getGuid());
			}
    	}
}
                  
class StatisticTask extends TimerTask{
    
    RoaringBitmap bitmap;
    
    public StatisticTask(RoaringBitmap bitmap){
        this.bitmap = bitmap;
    }
    
    public void run(){
        System.out.println("截止到当前的用户总数为:"+bitmap.getCardinality());
    }
}

但是如果涉及到了多维聚合,那么单纯的使用bitmap可能还不够

比如 省市区 用户数 、 省市 用户数

江苏省,南京市,鼓楼区,5

江苏省,南京市,下关区,6

从用户访问记录中统计各区域的用户数

江苏省,南京市,鼓楼区,[0,1,0,1,1,1,0,1,0,0,0,0] 5

江苏省,南京市,下关区,[1,1,0,0,0,0,0,0,1,1,1,1] 6

要到的:江苏省,南京市 [1,1,0,1,1,1,0,1,1,1,1,1] 10

这也就是使用bitmap来实现高效率的,维度再均衡

所以这也就是数据结构与应用解耦

消费者

需求2:写一个消费者,不断地从kafka中消费如上"用户行为事件",并做出如下加工处理

给每一条数据,添加一个字段来标识,该条数据所属的用户今天是否第一次出现,如是,则标注1,否则标注0

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:1}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:0}

{“guid”:1,“eventId”:“pageview”,“timestamp”:163786834789,“flag”:1}

这个需求可以用HashMap来做

也可以用bitmap来做,启动bitmap.contain()能够判断是是否包含

如果是判重字符串,用bitmap就不太方便了,可以使用bloomfilter,但是bloomfilter会牺牲一定的准确率(因为bloomfilter会误判)

在判重方面,布隆过滤器比bitmap.contain()更适合。

布隆过滤器具有以下优势:

  1. 内存占用更小:布隆过滤器使用位数组和哈希函数来表示数据集合,相比于bitmap.contain()方法,它可以以较小的内存占用来存储大规模的数据集合。
  2. 高效的查询性能:布隆过滤器可以在常数时间内完成判重操作,即使数据集合非常庞大,也能以较高的速度进行查询。而bitmap.contain()方法可能需要遍历整个位图数组,时间复杂度较高。
  3. 可扩展性:布隆过滤器可以动态地添加新的元素,并且支持删除操作。相比之下,bitmap.contain()方法需要预先确定数据范围,不便于动态扩展。

虽然布隆过滤器具有以上优势,但也需要注意它存在一定的误判率(即可能将不存在的元素判断为存在)。因此,在判重的关键场景中,如果对结果的准确性要求非常高,可以使用其他更精确的去重方法进行验证。

public class consumer{
    public static void main(String[] args){  
        // 启动数据消费线程
        new Thread(new ConsumeRunnable()).start();
    }
}

class ConsumeRunnable implements Runnable{
    
    BloomFilter<Long> bloomFilter;
    
    Properties props;
    
    public ConsumeRunnable(){
        bloomFilter = BloomFilter.create(Funnels.longFunnel(),1000000000,0.01);
        props = new Properties();
        // 定义 kakfa 服务的地址,不需要将所有 broker 指定上
		props.put("bootstrap.servers", "doitedu01);
        // key 的反序列化类
		props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
		// value 的反序列化类
		props.put("value.deserializer",
					"org.apache.kafka.common.serialization.StringDeserializer");
                  
        // 如果没有消费偏移量记录,则自动重设为起始 offset:latest, earliest, none
		props.put("auto.offset.reset","latest");
                  
        // 制定 consumer group
		props.put("group.id", "g1");
    }
    
    
    public void run(){
   
        while (true) {
			// 读取数据,读取超时时间为 5000ms
			ConsumerRecords<String, String> records = consumer.poll(5000);
			for (ConsumerRecord<String, String> record : records)
				String eventJson = record.value();
            	UserEvent userEvent = JSON.parseObject(eventJson,UserEvent.class);
            
            	// 去布隆过滤器判断一下
            	boolean mightContain = bloomFilter.mightContain(userEvent.getGuid());
            	if(mightContain){
                    userEvent.setFlag(0);
                }else{
                    userEvent.setFlag(1);
                    bloomFilter.put(userEvent.getGuid());
                }
			}
    	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/207936.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

程序员成了teamleader

在职场中,你是否遇到过这样的领导或同事,他可能是自恋狂,自吹自擂自我标榜;可能是团队合作的绊脚石,对团队合作态度消极并频繁拖后腿;可能是抱怨专家,满满负能量;可能是完美主义者,对细节过度挑剔;可能是技术白痴,对技术一窍不通或总是犯低级错误;可能是抢功劳者,…

消息中间件介绍

概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能&#xff0c;成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件&#xff0c;如ActiveMQ、RabbitMQ&#xff0c;Kafka&#xff0c;还有阿里…

【工具分享】| 阅读论文神器 使用技巧 AI润色 AI翻译

文章目录 1 使用技巧1.1 功能一 即时翻译1.2 功能二 文献跳转1.3 功能三 多设备阅读1.4 功能四 小组讨论笔记共享1.5 功能五 个人文献管理 2 其他功能 超级喜欢Readpaper这一款论文阅读软件&#xff0c;吹爆他哈哈 为什么&#xff1f; 当然是他可以解决我们传统阅读论文的种种…

Python列表切片操作详解:提取、复制、反转等应用示例

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 在Python中&#xff0c;列表切片是处理列表数据非常强大且灵活的方法。本文将全面探讨Python中列表切片的多种用法&#xff0c;包括提取子列表、复制列表、反转列表等操作&#xff0c;结合丰富的示例代码进行详细…

模糊C均值(Fuzzy C-means,FCM)聚类的可运行的python程序代码,复制即可用!!切记需要安装库 scikit-fuzzy

文章目录 前言一、安装库 scikit-fuzzy二、具体程序代码&#xff08;复制可运行&#xff09;三、结果展示总结 前言 模糊C均值&#xff08;Fuzzy C-means&#xff0c;FCM&#xff09;聚类是一种软聚类方法&#xff0c;它允许数据点属于多个聚类&#xff0c;每个数据点对所有聚…

c语言练习13周(1~5)

输入任意整数n求以下公式和的平方根。 读取一系列的整数 X&#xff0c;对于每个 X&#xff0c;输出一个 1,2,…,X 的序列。 编写double fun(int a[M][M])函数&#xff0c;返回二维数组周边元素的平均值&#xff0c;M为定义好的符号常量。 编写double fun(int a[M])函…

DAPP开发【02】Remix使用

系列文章目录 系列文章在DAPP开发专栏 文章目录 系列文章目录使用部署测试网上本地项目连接remix本地项目连接remix 使用 创建一个新的工作空间 部署测试网上 利用metaMask连接测试网络 添加成功&#xff0c;添加时需要签名 即可进行编译 即可部署 本地项目连接remix 方…

赛氪受邀参加“CCF走进高校”,助力计算机学科发展

赛氪受邀参加“CCF走进高校”&#xff0c;助力计算机学科发展。 12月1日&#xff0c;由中国计算机学会计算机应用专业委员会组织的第十二届第十六次常务委员(扩大)会议顺利召开&#xff0c;赛氪受邀参加。 本次会议“CCF走进东北师大以及长春工业大学”&#xff0c;围绕《水声…

【tailwind CSS ml 不生效】

tailwind官方文档中需要注意的一点是&#xff0c;margin或者padding的值最大就到96&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01; 附上官方文档链接tailwind官方文档

云计算如何创芯:“逆向工作法”的性感之处

在整个云计算领域&#xff0c;能让芯片规模化的用起来&#xff0c;是决定造芯是否成功的天花板。在拉斯维加斯的亚马逊云科技2023 re:Invent则是完美诠释了这一论调。 亚马逊云科技2023 re:Invent开幕前两个小时&#xff0c;有一场小型的欢迎晚宴&#xff0c;《星期日泰晤士报》…

带你手搓阻塞队列——自定义实现

&#x1f308;&#x1f308;&#x1f308;今天给大家分享的是——阻塞队列的自定义实现&#xff0c;通过自定义实现一个阻塞队列&#xff0c;可以帮助我们更清晰、更透彻的理解阻塞队列的底层原理。 清风的CSDN博客 &#x1f6e9;️&#x1f6e9;️&#x1f6e9;️希望我的文章…

喜报 | 通付盾WAAP解决方案入选国家工业信息安全发展研究中心“2023年数字化转型自主创新解决方案优选案例”

为提升自主创新产品质量和技术创新能力&#xff0c;助力重点行业自主可控基础设施建设&#xff0c;加速重点行业数字化转型工作进程&#xff0c;促进重点行业产业链数字化升级&#xff0c;推动重点行业数字化、网络化、智能化发展。国家工业信息安全发展研究中心联合中国交通建…

SQL手工注入漏洞测试(MySQL数据库-字符型)-墨者

———靶场专栏——— 声明&#xff1a;文章由作者weoptions学习或练习过程中的步骤及思路&#xff0c;非正式答案&#xff0c;仅供学习和参考。 靶场背景&#xff1a; 来源&#xff1a; 墨者学院 简介&#xff1a; 安全工程师"墨者"最近在练习SQL手工注入漏洞&#…

SpringBoot——Quartz 定时任务

优质博文&#xff1a;IT-BLOG-CN 一、Scheduled 定时任务 【1】添加Scheduled相关依赖&#xff0c;它是Spring自带的一个jar包因此引入Spring的依赖&#xff1a; <dependency><groupId>org.springframework</groupId><artifactId>spring-context-su…

对于Web标准以及W3C的理解、对viewport的理解、xhtml和html有什么区别?

1、对于Web标准以及W3C的理解 Web标准 Web标准简单来说可以分为结构、表现、行为。 其中结构是由HTML各种标签组成&#xff0c;简单来说就是body里面写入标签是为了页面的结构。 表现指的是CSS层叠样式表&#xff0c;通过CSS可以让我们的页面结构标签更具美感。 行为指的是…

2023年12月02日新闻简报(国内国际)

新闻简报 每天三分钟&#xff0c;朝闻天下事。今天是&#xff1a;2023年12月02日&#xff0c;星期六&#xff0c;农历十月廿十&#xff0c;祝工作愉快&#xff0c;身体健康&#xff0c;生活喜乐&#xff1a;&#xff1a; 国内新闻 1、商务部&#xff1a;对原产于澳大利亚的进…

【C指针】深入理解指针(最终篇)数组指针指针运算题解析(一)

&#x1f308;write in front :&#x1f50d;个人主页 &#xff1a; 啊森要自信的主页 ✏️真正相信奇迹的家伙&#xff0c;本身和奇迹一样了不起啊&#xff01; 欢迎大家关注&#x1f50d;点赞&#x1f44d;收藏⭐️留言&#x1f4dd;>希望看完我的文章对你有小小的帮助&am…

电商营销场景的RocketMQ实战01-RocketMQ原理

架构图 Broker主从架构与集群模式 RocketMQ原理深入剖析 Broker主从架构原理 HAConnection与HAClient Broker基于raft协议的主从架构 Consumer运行原理 基础知识 001_RocketMQ架构设计与运行流程分析 RocketMQ这一块&#xff0c;非常关键的一个重要的技术&#xff0c;面试的时候…

【Vue3+Ts项目】硅谷甄选 — 搭建后台管理系统模板

一、 项目初始化 一个项目要有统一的规范&#xff0c;需要使用eslintstylelintprettier来对我们的代码质量做检测和修复&#xff0c;需要使用husky来做commit拦截&#xff0c;需要使用commitlint来统一提交规范&#xff08;即统一提交信息&#xff09;&#xff0c;需要使用pre…

Day04:每日一题:2661. 找出叠涂元素

2661. 找出叠涂元素 给你一个下标从 0 开始的整数数组 arr 和一个 m x n 的整数 矩阵 mat 。 arr 和 mat 都包含范围 [1&#xff0c;m * n] 内的 所有 整数。从下标 0 开始遍历 arr 中的每个下标 i &#xff0c;并将包含整数 arr[i] 的 mat 单元格涂色。请你找出 arr 中在 mat…
最新文章