Kafka模拟器产生数据仿真-集成StructuredStreaming做到”毫秒“级实时响应StreamData落地到mysql

          这是仿真过程某图:

仿真场景kafkaStream
仿真实战kafka
 

 kafka消费sink端和StructuredStreaming集成通信成功 , 数据接收全部接收

数据落地情况: 

全部接收到并all存入mysql

下面就简单分享一下StructuredStreaming代码吧

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{ OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType,  StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


    val spark: SparkSession = SparkSession.builder()
      .appName("kafkaConsumer")
      .master("local[3]")
      .getOrCreate()

    import spark.implicits._


    // 定义json字段类型格式
    val Jsonschmea: StructType = new StructType()
      .add("id", dataType = IntegerType)
      .add("name", dataType = StringType)
      .add("sorce", dataType = IntegerType)


    val message: DataFrame = spark.readStream // message为从kafka读到的原数据
      .format("kafka")
      .option("kafka.bootstrap.servers", "xxxxx:9092,xxxx:9092,xxxx:9092")
      .option("subscribe", "xxxx")
      .option("startingOffsets", "latest")
      .load()



    // 将json字符串转化为结构化数据

    val streamData: DataFrame = message.selectExpr("cast(value as String) as message") 
      .select(from_json($"message", Jsonschmea).alias("data"))
    // 将json结构化为新的df


    // 预加载mysql驱动

    // 实时写入 第二个参数预占位,want给每一批次加入唯一表示, but本次仅占位没有传参数
    def writeToMysql(batchDF: DataFrame, epochId: Long): Unit = {
      val sqlurl = "jdbc:mysql://localhost:xxxx/xxxx"
      val sqluser = "xxxx"
      val sqlpass = "xxxxx"

      Class.forName("com.mysql.cj.jdbc.Driver")  // mysql 8.0后得驱动,旧版本去掉cj

      batchDF.foreachPartition {
        partitionOfRecords =>
          val connection = DriverManager.getConnection(sqlurl, sqluser, sqlpass)

          // 关闭自动提交以支持增量写入
          connection.setAutoCommit(false)
          // 创建预编译的插入语句
          val insertsql = "insert into jsonstream(id,name,sorce) values(?,?,?)"
          val preparedStatement = connection.prepareStatement(insertsql)

          partitionOfRecords.foreach {
            row =>
//              val id = row.getAs[Int]("data.id")
//              val name = row.getAs[String]("data.name")
//              val score = row.getAs[Int]("data.sorce")

              
              val id = row.getAs[Row]("data").getAs[Int]("id")
              val name = row.getAs[Row]("data").getAs[String]("name")
              val sorce = row.getAs[Row]("data").getAs[Int]("sorce")

            // 设置参数到预处理sql函数中
              preparedStatement.setInt(1, id)
              preparedStatement.setString(2, name)
              preparedStatement.setInt(3, sorce)

              // 执行添加到批次操作
              preparedStatement.addBatch()
          }
          preparedStatement.executeBatch()
          connection.commit() // 执行批处理后手动提交事务


          preparedStatement.close()  // 手动GC
          connection.close()
      }
    }

// 数据落地到数据库
      streamData.writeStream
        .outputMode(OutputMode.Append())
        .foreachBatch(writeToMysql _)
        .trigger(Trigger.ProcessingTime("1 millisecond")) // 1 毫秒每个batch
        .start()
        .awaitTermination()

存储按照一定批次量做存储   

友情提示 : 上述程序是经过脱敏处理的哦

----彩蛋----

如果你看到者你会知道scala在11更新之后也就是12版本如下:

batchDF.foreachPartition {
  partitionOfRecords => ... 这个位置

 Dataset的foreachPartition 里面不能处理 Row的Iterator, 所以需要转为rdd在做处理

所以更改后为

batchDF.rdd.foreachPartition { partitionOfRecords => ...

而且这里不能用foreach , 否则无法序列化就能存储到mysql, 不能被序列化的数据是不能在网络中进行传输的,通过二进制流的形式传出,在被反序列化回来转化为对象的形式存储

ok -----

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/457419.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

SpringMVC请求、响应和拦截器的使用

SpringMVC请求 RequestMapping注解 RequestMapping注解的作用是建立请求URL和处理方法之间的对应关系 RequestMapping注解可以作用在方法和类上 1. 作用在类上:第一级的访问目录 2. 作用在方法上:第二级的访问目录 3. 细节:路径可以不编写…

Gitee 服务器

Git 服务器集成 1. 创建仓库 2. 远程仓库简易操作指令 # Git 全局设置,修改成自己的信息 git config --global user.name "Muko" git config --global user.email "txk0x7d2163.com" # 创建 git 仓库,基本操作指令和其他远程仓库一…

【STL】set容器、pair队组与map容器

目录 1.修改set容器排序规则 2. set容器的各种函数 3.set构造函数multiset 4.创建pair队组 5.map容器 1.修改set容器排序规则 set容器会自动以升序的方式进行排序&#xff0c;想要改变可以制定排序规则&#xff0c;set<int,排序规则> s&#xff1b; 但需要注意&am…

Mybatis(搭建,CRUD,方法参数,XML映射文件,动态SQL)【详解】

目录 一.准备基础代码 Mybatis的通用配置 二. 基本CURD操作 1.查询-根据id查询一条 2.查询-查询数量 3.删除 4.新增 获取主键值 5.修改 6.查询-模糊查询 预编译SQL #{}与${}的区别【面试题】 三. Mybatis的方法参数与结果集 1.SQL里取方法参数的值 2.查询结果集…

mac解决brew install报错“fatal: not in a git directory“

在macbook上使用brew安装软件时&#xff0c;可能会遇到问题&#xff0c;报错如下&#xff1a; fatal: not in a git directory Error: Command failed with exit 128: git 使用brew -v&#xff0c;仔细看&#xff0c;可以发现有两个fatal(致命错误)提示: 解决方案&#xff1a;…

下载chromedrive,使用自动化

1、先看一下自己浏览器的版本 2、访问 https://googlechromelabs.github.io/chrome-for-testing/

Nginx、LVS、HAProxy工作原理和负载均衡架构

当前大多数的互联网系统都使用了服务器集群技术&#xff0c;集群是将相同服务部署在多台服务器上构成一个集群整体对外提供服务&#xff0c;这些集群可以是 Web 应用服务器集群&#xff0c;也可以是数据库服务器集群&#xff0c;还可以是分布式缓存服务器集群等等。 在实际应用…

ChatGPT提问技巧——对抗性提示

ChatGPT提问技巧——对抗性提示 对抗性提示是一种允许模型生成能够抵御某些类型的攻击或偏差的文本的技术。这种技术可用于训练更健壮、更能抵御某些类型的攻击或偏差的模型。 要在 ChatGPT 中使用对抗性提示&#xff0c;应为模型提供一个提示&#xff0c;该提示的设计应使模…

微信小程序之tabBar

1、tabBar 如果小程序是一个多 tab 应用&#xff08;客户端窗口的底部或顶部有 tab 栏可以切换页面&#xff09;&#xff0c;可以通过 tabBar 配置项指定 tab 栏的表现&#xff0c;以及 tab 切换时显示的对应页面。 属性类型必填默认值描述colorHexColor是tab 上的文字默认颜色…

Web框架开发-Django的视图层

一、视图函数 一个视图函数,简称视图,是一个简单的Python 函数,它接受Web请求并且返回Web响应。响应可以是一张网页的HTML内容,一个重定向,一个404错误,一个XML文档,或者一张图片. . . 是任何东西都可以。无论视图本身包含什么逻辑,都要返回响应。代码写在哪里也无所谓…

【QT】TCP简易聊天框

我们首先复习一下TCP通信的流程 基于linuxTCP客户端和服务器 QT下的TCP处理流程 服务器先启动&#xff08;处于监听状态&#xff09; 各函数的意义和使用 QTcpServer Class *QTcpServer*类提供了一个基于TCP的服务器。这个类可以接受传入的TCP连接。您可以指定端口或让QTcpS…

解决无法登录到 ArcGIS Server Administrator

目录 问题复现原因分析解决办法 问题复现 今天在访问arcgisserver后台准备设置arcgis api for js请求路径时&#xff0c;登录之后出现500错误。Services Directoryhttp://xxx.xxx.xxx.xxx:6080/arcgis/admin/system/handlers/rest/servicesdirectory 原因分析 我实在两台虚拟机…

信号与系统学习笔记——信号的分类

目录 一、确定与随机 二、连续与离散 三、周期与非周期 判断是否为周期函数 离散信号的周期 结论 四、能量与功率 定义 结论 五、因果与反因果 六、阶跃函数 定义 性质 七、冲激函数 定义 重要关系 作用 一、确定与随机 确定信号&#xff1a;可以确定时间函数…

提升运营效率,探索运营中台架构的力量

随着数字化转型的加速推进&#xff0c;企业需要更高效地管理和运营各项业务&#xff0c;而运营中台架构作为一种新型的业务架构设计理念&#xff0c;正在逐渐受到关注和应用。本篇博客将深入探讨运营中台架构的概念、优势和实践&#xff0c;帮助企业了解如何通过构建运营中台实…

CVPR2023 | 3D Data Augmentation for Driving Scenes on Camera

3D Data Augmentation for Driving Scenes on Camera 摄像机驾驶场景的 3D 数据增强 摘要翻译 驾驶场景极其多样和复杂&#xff0c;仅靠人力不可能收集到所有情况。虽然数据扩增是丰富训练数据的有效技术&#xff0c;但自动驾驶应用中现有的摄像头数据扩增方法仅限于二维图像…

[蓝桥杯]-最大的通过数-CPP-二分查找、前缀和

目录 一、题目描述&#xff1a; 二、整体思路&#xff1a; 三、代码&#xff1a; 一、题目描述&#xff1a; 二、整体思路&#xff1a; 首先要知道不是他们同时选择序号一样的关卡通关&#xff0c;而是两人同时进行两个入口闯关。就是说两条通道存在相同关卡编号的的关卡被通…

PlantUML Integration 编写短信服务类图

PlantUML Integration 写一个类图&#xff0c;主要功能为 1、编写一个serviceSms短信服务类&#xff1b; 2、需要用到短信的地方统一调用基建层的服务即可&#xff1b; 3、可以随意切换、增加短信厂商&#xff0c;不需要更改场景代码&#xff0c;只需要更改application.yml 里面…

SQLiteC/C++接口详细介绍之sqlite3类(七)

上一篇&#xff1a;SQLiteC/C接口详细介绍之sqlite3类&#xff08;六&#xff09; 下一篇&#xff1a; SQLiteC/C接口详细介绍之sqlite3类&#xff08;八&#xff09;&#xff08;未发表&#xff09; 22.sqlite3_create_collation、sqlite3_create_collation16和sqlite3_creat…

JavaEE--小Demo

目录 下载包 配置 修改文件 pom.xml application.properties 创建文件 HelloApi.java GreetingController.java Greeting.java DemoApplication.java 运行包 运行命令 mvn package cd target dir java -jar demo-0.0.1-SNAPSHOT.jar 浏览器测试结果 下载包 …

网络安全专题第一篇:网络安全的来源

目录 一.网络安全的由来。 二.网络安全漏洞在哪里 三.网络安全规范操作 1.从业务入手 2.从安全体系入手 3.从管理入手 四.可能遇到的网络攻击 1.DDOS 2.勒索攻击 3.单包攻击 4.员工删库跑路 5.熊猫烧香 五.应对方法 1.清洗 2.提高服务器的承受能力 3.防火墙 4…
最新文章