16、Flink 的广播状态 (Broadcast State) 示例

1、Broadcast State 案例

规则流:1,a,b [规则名1 规则为 a 或 b]
图形流:green,a [绿色 a]

问题:如果规则流先于数据流则匹配不上=>此时缓冲数据流中的数据【如果规则流为null】

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.*;

public class _06_BroadcastState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 接收广播的规则数据
        SingleOutputStreamOperator<_06_Rule> ruleStream = env.socketTextStream("localhost", 8888)
                .map(new MapFunction<String, _06_Rule>() {
                    @Override
                    public _06_Rule map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new _06_Rule(fields[0], new Tuple2<>(fields[1], fields[2]));
                    }
                });

        // 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
        MapStateDescriptor<String, _06_Rule> ruleStateDescriptor = new MapStateDescriptor<>(
                "RulesBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<_06_Rule>() {
                }));

        // 广播流,广播规则并且创建 broadcast state
        BroadcastStream<_06_Rule> ruleBroadcastStream = ruleStream
                .broadcast(ruleStateDescriptor);


        // 接收图形数据
        SingleOutputStreamOperator<_06_Shape> shapeStream = env.socketTextStream("localhost", 9999)
                .map(new MapFunction<String, _06_Shape>() {
                    @Override
                    public _06_Shape map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new _06_Shape(fields[0], fields[1]);
                    }
                });

        shapeStream.keyBy(_06_Shape::getColour)
                .connect(ruleBroadcastStream)
                .process(new KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>() {
//                    private transient ValueState<List<_06_Shape>> dataBuffer;
                    private transient ListState<_06_Shape> dataBuffer;

                    @Override
                    public void open(Configuration parameters) throws Exception {
//                        ValueStateDescriptor<List<_06_Shape>> dataListStateDescriptor = new ValueStateDescriptor<>("dataBuffer", TypeInformation.of(new TypeHint<List<_06_Shape>>() {
//                        }));
//
//                        dataBuffer = getRuntimeContext().getState(dataListStateDescriptor);

                        ListStateDescriptor<_06_Shape> listStateDescriptor = new ListStateDescriptor<>("dataBuffer", _06_Shape.class);
                        dataBuffer = getRuntimeContext().getListState(listStateDescriptor);
                    }

                    @Override
                    public void processElement(_06_Shape value, KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // 获取广播的规则数据
                        System.out.println("输入的数据颜色为=>" + value.getColour() + ",类型为=>" + value.getType());
                        ReadOnlyBroadcastState<String, _06_Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);

                        Iterator<Map.Entry<String, _06_Rule>> iterator = broadcastState.immutableEntries().iterator();

                        if (iterator.hasNext()) {
                            // 使用 ValueState
                            // 先从缓存中读取数据进行匹配
//                            List<_06_Shape> shapeList = dataBuffer.value();
                            // 多并行度时,防止某个并行度无数据导致报错
//                            if (shapeList != null) {
//                                if (!shapeList.isEmpty()) {
//                                    for (_06_Shape shape : shapeList) {
//                                        System.out.println("被缓冲的数据开始进行处理=>" + shape);
//                                        // 从事件数据中继续匹配
//                                        while (iterator.hasNext()) {
//                                            Map.Entry<String, _06_Rule> rule = iterator.next();
//                                            if (Objects.equals(rule.getValue().getRule().f0, shape.getType()) || Objects.equals(rule.getValue().getRule().f1, shape.getType())) {
//                                                out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());
//                                            }
//                                        }
//                                    }
//
//                                    shapeList.clear();
//                                }
//                            }

                            // 使用 ListState
                            Iterator<_06_Shape> dataIterator = dataBuffer.get().iterator();
                            while (dataIterator.hasNext()){
                                _06_Shape shape = dataIterator.next();
                                System.out.println("被缓冲的数据开始进行处理=>" + shape);

                                while (iterator.hasNext()) {
                                    Map.Entry<String, _06_Rule> rule = iterator.next();
                                    if (Objects.equals(rule.getValue().getRule().f0, value.getType()) || Objects.equals(rule.getValue().getRule().f1, value.getType())) {
                                        out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());
                                    }
                                }

                                dataIterator.remove();
                            }

                            // 从事件数据中继续匹配
                            while (iterator.hasNext()) {
                                Map.Entry<String, _06_Rule> rule = iterator.next();
                                if (Objects.equals(rule.getValue().getRule().f0, value.getType()) || Objects.equals(rule.getValue().getRule().f1, value.getType())) {
                                    out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());
                                }
                            }
                        } else {
                            System.out.println("此时规则流中无规则,先缓冲数据流");
                            // 使用 listState
                            dataBuffer.add(value);

                            // 使用 valueState
//                            List<_06_Shape> shapeList = dataBuffer.value();
//                            if (shapeList == null) {
//                                shapeList = new ArrayList<>();
//                            }
//                            shapeList.add(value);
//                            dataBuffer.update(shapeList);
                        }
                    }

                    @Override
                    public void processBroadcastElement(_06_Rule value, KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>.Context ctx, Collector<String> out) throws Exception {
                        // 获取广播流输入的数据,存入广播状态
                        System.out.println("输入的规则名称为=>" + value.getRuleName() + ",规则为=>" + value.getRule());
                        BroadcastState<String, _06_Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
                        broadcastState.put(value.getRuleName(), value);
                    }
                })
                .print();

        env.execute();
    }
}

2、Pojo 类

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;

import java.io.Serializable;

@NoArgsConstructor
@AllArgsConstructor
@Data
public class _06_Rule implements Serializable {
    private String ruleName;
    private Tuple2<String,String> rule;
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@AllArgsConstructor
@NoArgsConstructor
@Data
public class _06_Shape implements Serializable {
    private String colour;
    private String type;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/594611.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

安全数据交换系统哪个好?该如何选型?

安全数据交换系统是用于在不同网络或组织之间安全、高效地传输和共享数据的解决方案。安全数据交换系统对于任何需要处理敏感数据、确保数据安全、并满足合规要求的组织来说都是至关重要的。 这种系统通常用于以下目的&#xff1a; 1&#xff09;数据传输&#xff1a;允许用户…

项目经理【人】原则

系列文章目录 【引论一】项目管理的意义 【引论二】项目管理的逻辑 【环境】概述 【环境】原则 【环境】任务 【环境】绩效 【人】概述 【人】原则 一、共创模式 1.1 共创模式 二、干系人的影响力强度和态度 2.1 干系人影响力 2.2 干系人态度 2.3 干系人管理 三、干系人权力…

自动驾驶融合定位系列教程五:惯性导航误差分析

自动驾驶融合定位系列教程五&#xff1a;惯性导航误差分析 一、概述 在定位领域的几乎所有多传感器融合系统中&#xff0c;都有IMU存在&#xff0c;而且&#xff0c;IMU是定位系统的主线与核心&#xff08;对此可能很多人并不同意&#xff0c;但是我仍然坚定地坚持这一观点&a…

spring中的bean是线程安全的嘛

在Spring框架中&#xff0c;bean默认情况下不是线程安全的。Spring容器在初始化bean时&#xff0c;会为其创建一个单例实例&#xff0c;这个实例在整个应用中是唯一的&#xff0c;并且只会被初始化一次。由于这个特性&#xff0c;bean在默认情况下不是线程安全的。 然而&#…

亚马逊测评工作室如何轻松实现高收益,跨境电商揭秘汇率差赚钱术

随着跨境电商在国内市场的持续繁荣&#xff0c;众多电商卖家纷纷将目光投向了这一充满活力的领域。面对国内市场的激烈竞争&#xff0c;许多卖家选择向外拓展&#xff0c;寻求更广阔的发展空间。其中&#xff0c;亚马逊成为了众多卖家的不二选择&#xff0c;毕竟老外的市场还是…

吴恩达2022机器学习专项课程C2(高级学习算法)W1(神经网络):2.3 案例图像识别

目录 电脑如何表示一张图像1.像素2.像素亮度值3.展开像素亮度值 神经网络构建人脸识别1.需求2.整体过程3.隐藏层识别图像4.小结 神经网络构建识别汽车神经网络在计算机视觉应用中的工作原理 电脑如何表示一张图像 1.像素 像素是图像最小单位&#xff0c;用于表示图像中的点或…

Android Ant编译环境配置(Win)

1、 载ant包: 2、设置环境变量&#xff1a; 3、检查是否设置成功及版本 4、执行命令&#xff1a; android update project -p . -n “projectname”&#xff08;例如&#xff1a;android update project --target 1 -p . -n “Couplet”&#xff09;(只输入红色部分也是可以的…

Unity3D DOTween

简单介绍一下 DOTween 插件的使用。 导入插件 先到 Asset Store 获取 DOTween 插件&#xff0c;然后在 Package Manager 的 My Assets 中搜索&#xff0c;下载并导入插件。 导入后&#xff0c;会自动弹出一个窗口&#xff0c;提示需要先对插件进行配置。 点击上图中的按钮&am…

Oracle 23ai 发布,国产数据库们都沉默了

几天前&#xff0c;全球最大的数据库软件公司 Oracle 发布了最新版的 Oracle Database 23ai &#xff0c;集成了最新的 AI Vector Search&#xff08;AI 向量搜索引擎&#xff09;&#xff0c;允许根据概念内容轻松搜索存储在任务关键型数据库中的文档、图像和关系数据&#xf…

Web安全:SQL注入漏洞详解,SQL注入常见功能、危害、分类、判断注入点、注入方式

「作者简介」&#xff1a;2022年北京冬奥会网络安全中国代表队&#xff0c;CSDN Top100&#xff0c;就职奇安信多年&#xff0c;以实战工作为基础对安全知识体系进行总结与归纳&#xff0c;著作适用于快速入门的 《网络安全自学教程》&#xff0c;内容涵盖系统安全、信息收集等…

我国碳酸甲乙酯需求量较大 市场集中度有望不断提升

我国碳酸甲乙酯需求量较大 市场集中度有望不断提升 碳酸甲乙酯&#xff08;EMC&#xff09;又称为碳酸乙基甲酯&#xff0c;是一种有机化合物。碳酸甲乙酯分子式为C4H8O3&#xff0c;多表现为一种具有果香味道的无色透明液体。碳酸甲乙酯具有毒性较低、溶解性优良等特点&#x…

书生浦语训练营第2期-第7节笔记

一、为什么要研究大模型的评测&#xff1f; 首先&#xff0c;研究评测对于我们全面了解大型语言模型的优势和限制至关重要。尽管许多研究表明大型语言模型在多个通用任务上已经达到或超越了人类水平&#xff0c;但仍然存在质疑&#xff0c;即这些模型的能力是否只是对训练数据的…

【Docker】docker compose服务编排

docker compose 简介 Dockerfile模板文件可以定义一个单独的应用容器&#xff0c;如果需要定义多个容器就需要服务编排。 docker swarm&#xff08;管理跨节点&#xff09; Dockerfile可以让用户管理一个单独的应用容器&#xff1b;而Compose则允许用户在一个模板&#xff08…

CentOS常用命令有哪些?

目录 一、CentOS常用命令有哪些&#xff1f; 二、不熟悉命令怎么办&#xff1f; 场景一&#xff1a;如果是文件操作&#xff0c;可以使用FileZilla工具来完成 场景二&#xff1a;安装CentOS桌面 一、CentOS常用命令有哪些&#xff1f; CentOS 系统中有许多常用命令及其用法…

老人摔倒监测识别摄像机

随着社会老龄化程度的不断加深&#xff0c;老年人的健康和安全问题日益凸显。在家中独居的老人&#xff0c;一旦发生意外摔倒等情况&#xff0c;往往难以及时得到帮助&#xff0c;造成了严重的安全隐患。为了解决这一问题&#xff0c;近年来&#xff0c;老人摔倒监测识别摄像机…

高效、便捷的重复文件查找与清理工具—4DDiG Duplicate File Deleter

在数字化时代&#xff0c;我们的电脑、手机、云盘等存储设备中&#xff0c;往往堆积着大量的文件。这些文件中有许多是重复的&#xff0c;它们不仅占用了宝贵的存储空间&#xff0c;还可能导致文件管理的混乱。为了解决这个问题&#xff0c;我们急需一款高效、便捷的重复文件查…

亚马逊、沃尔玛新店如何提升转化率?自养号测评的重要作用。

亚马逊作为全球最大的电商平台&#xff0c;每天都有成千上万的卖家在这里开设新店。然而&#xff0c;对于新店来说&#xff0c;如何在激烈的市场竞争中脱颖而出&#xff0c;实现高效的流量转化&#xff0c;是每位卖家都面临的挑战。 一、亚马逊新店怎么转化? 1、优化产品详情…

nginx 启动,查看,停止

nginx 启动&#xff0c;查看&#xff0c;停止 启动 start nginx 查看是否启动成功 tasklist | findstr nginx 停止 nginx -s stop 测试配置文件的语法是否有误 nginx -t 重启nginx nginx-s reload

AI算力提高,高能耗和难散热问题如何突破?

随着AI技术的广泛应用&#xff0c;从智能手机到自动驾驶汽车&#xff0c;从智能家居到工业自动化&#xff0c;AI供电芯片的需求量正呈爆炸式增长。它不仅为AI系统提供稳定的电力供应&#xff0c;确保系统的正常运行&#xff0c;而且还肩负着节能减排、降低能耗的重任。 然而随…

GaussDB数据库SQL系列-复合查询

目录 一、前言 二、复合查询基础 三、实际应用示例 1、使用UNION合并查询结果 2、使用INTERSECT找出共同元素 3、使用EXCEPT排除特定结果 四、高级技巧 1、子查询实例 2、JOIN的应用 五、总结 一、前言 GaussDB是华为自主创新研发的分布式关系型数据库&#xff0c;具…
最新文章