Delta lake with Java--使用stream同步数据

今天继续学习Delta lake Up and Running 的第8章,处理流数据,要实现的效果就是在一个delta表(名为:YellowTaxiStreamSource)插入一条数据,然后通过流的方式能同步到另外一个delta表 (名为:YellowTaxiStreamTarget)。接着在YellowTaxiStreamSource更新数据YellowTaxiStreamTarget也能更新。至于删除也尝试过了,发现删除是没有办法同步的。

一、先上代码,今天的代码分3份

第1份:用来启动流

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class DeltaLakeStream {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("delta_lake")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();

        String targetPath="D:\\bigdata\\detla-lake-with-java\\YellowTaxiStreamTarget";
        spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");
        //定义源数据表
        spark.sql("CREATE TABLE IF NOT EXISTS taxidb.YellowTaxiSource(" +
                "RideID INT," +
                "PickupTime TIMESTAMP," +
                "CabNumber STRING)" +
                "USING DELTA LOCATION 'file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiStreamSource'"
        );

        //定义目标数据表
        spark.sql("CREATE TABLE IF NOT EXISTS taxidb.YellowTaxiTarget(" +
                "RideID INT," +
                "PickupTime TIMESTAMP," +
                "CabNumber STRING)" +
                "USING DELTA LOCATION 'file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiStreamTarget'"
        );

        //通过流的方式读取元数据表,记得要option("ignoreChanges", "true")否则报错
        var stream_df=spark.readStream().option("ignoreChanges", "true").format("delta").load("file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiStreamSource");

        //打开目标表,用于后面同步数据
        var deltaTable = DeltaTable.forPath(spark, targetPath);
  //   var streamQuery=stream_df.writeStream().format("delta").option("checkpointLocation", targetPath+"\\_checkpoint").start(targetPath);

        //定义同步流,如果目标表的记录与更新记录的RideID相等则更新,没有找到则插入新记录
        var streamQuery=stream_df.writeStream().format("delta").foreachBatch((batchDf,batchId)->{
         deltaTable.as("t").merge(batchDf.as("s"),"t.RideID==s.RideID")
                 .whenMatched().updateAll()
                 .whenNotMatched().insertAll()
                 .execute();


      }).outputMode("Update").start(targetPath);


     try {
            System.out.println("启动stream监听");
            streamQuery.awaitTermination(); //启动流
        } catch (StreamingQueryException e) {
            throw new RuntimeException(e);
        }
    }
}

第2份:用来操作源数据表

import org.apache.spark.sql.SparkSession;

public class DeltaLakeStreamSource {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("delta_lake")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();


        spark.sql("CREATE DATABASE IF NOT EXISTS taxidb");
        spark.sql("CREATE TABLE IF NOT EXISTS taxidb.YellowTaxiSource(" +
                "RideID INT," +
                "PickupTime TIMESTAMP," +
                "CabNumber STRING)" +
                "USING DELTA LOCATION 'file:///D:\\\\bigdata\\\\detla-lake-with-java\\\\YellowTaxiStreamSource'"
        );
        //验证插入
        spark.sql("INSERT INTO taxidb.YellowTaxiSource (RideID,PickupTime,CabNumber) values (1,'2013-10-13 10:13:15','11-96')").show(false);
        //验证更新  
       //spark.sql("UPDATE taxidb.YellowTaxiSource SET CabNumber='199-99' WHERE RideID=1").show(false);
        //验证删除,不过无效  
        //spark.sql("DELETE FROM taxidb.YellowTaxiSource WHERE RideID=1").show(false);
        spark.sql("SELECT RideID,PickupTime,CabNumber FROM taxidb.YellowTaxiSource").show(false);
        spark.close();
    }
}

第3份:用来验证目标数据表的同步结果

import org.apache.spark.sql.SparkSession;


public class DeltaLakeStreamTarget {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("delta_lake")
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();

        String targetPath="D:\\bigdata\\detla-lake-with-java\\YellowTaxiStreamTarget";
        spark.read().format("delta").load(targetPath).show();

    }
}

二、运行验证

1、先运行DeltaLakeStream,具体运行结果如下图:

2、验证插入数据同步

运行DeltaLakeStreamSource,插入一条RideID=1的数据,具体运行结果如下图:

接着运行 DeltaLakeStreamTarget,看一下数据是否已经通过流的方式同步到目标表,具体运行结果如下图:

3、验证更新数据同步

将DeltaLakeStreamSource的插入数据代码注释掉,同时将更新代码打开,然后运行,将RideID=1的记录的CabNumber值得从11-96修改成199-99,具体运行结果如下图:

接着运行 DeltaLakeStreamTarget,看一下数据是否已经通过流的方式同步到目标表,具体运行结果如下图:

至于删除也尝试过,没有成功,不知道是不是不支持,还望高手指教。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/603430.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

机器人系统可以支持对接人工系统吗?

​ 随着科技的飞速发展,机器人系统在各行各业都扮演着越来越重要的角色。它们可以高效地处理大量数据,执行繁琐的任务,甚至在某些领域超越了人类的能力。然而,机器人系统也有其局限性,特别是在处理复杂的人际交往…

自动驾驶主流芯片及平台架构(四)华为、​高通、英伟达高算力平台

上一章节有提到低算力的自动驾驶平台,本章内容重点介绍高算力的自动驾驶平台,华为、高通、英伟达。 华为自动驾驶MDC平台方案介绍 以整车数字架构为基础,全面管理软硬件的复杂性,并确保整车的可靠性:我们提出华为CCAVe…

批量图片重命名及汇总

又一堆图片文件需要处理... 源文件分布: 有N个文件夹,每个文件夹下又有M个子文件夹,每个子文件夹下有X张图片。 例如文件夹A下有子文件夹A1,A2,A3,子文件夹A1下有图片a-1,a-2,a-3...... 处理目标: 1、将所有图片汇…

算法提高之树的中心

算法提高之树的中心 核心思想:树形dp 换根dp 每个点作为根节点 找其子树的最大距离和父节点的最大距离 dfs1:求子树对于当前根节点的最大距离和次大距离 求次大距离原因:如果当前节点是其父节点子树的最大路径上的点,最大距离不…

基于Linux中的 进程相关知识 综合讲解

目录 一、进程的基本概念 二、pid,ppid,fork函数 三、进程的状态讲解 四、进程的优先级 五、完结撒❀ 一、进程的基本概念 概念: ● 课本概念:程序的一个执行实例,正在执行的程序等 ● 内核观点:担当…

深度学习实例2_车牌识别分割——自学笔记

import cv2 from matplotlib import pyplot as plt import os import numpy as np from PIL import ImageFont, ImageDraw, Image彩色图片显示 def plt_show0(img):b,g,r = cv2.split(img)img = cv2.merge([r, g, b])plt.imshow(img)plt.show()灰度图片显示 def plt_show(img…

暗区突围PC测试资格获取 Twitch老鼠台一键领取测试资格教程

Twitch平台,这个广受欢迎的直播巨头,不仅是游戏文化的直播聚集地,还常与各类游戏携手合作,为观众带来独特的互动体验,观看直播即可解锁游戏内奖励。正值热门游戏《暗区突围》PC版测试阶段,Twitch再次发力&a…

Ai时代使用语音笔记整理文稿提高创作效率

其实传统的创作方式是用钢笔或者圆珠笔手写草稿。成稿后花钱誊抄数份邮寄给出版商。 计算机普及后,有人开始直接使用打字机或计算机创做,打字其实要比手写的速度快数倍,这样效率的提升,加上文创平台基本上都是按字数给收益&#…

去哪找高清视频素材?哪个网站有视频素材?

在这个视觉表达日益重要的时代,获取高品质的视频素材变得尤为关键。4K和无水印视频素材特别受到创作者的青睐,因为它们能极大地提升视觉作品的吸引力和专业度。接下来,我将介绍几个国内外的优秀视频素材网站,助您在创作旅程上一帆…

为什么现在越来越多的人会选择陪诊

现在越来越多的人选择陪诊的原因有多方面。 首先,随着人口老龄化、医疗资源分配不均等问题的日益突出,许多老年人和病患在就医过程中面临诸多困难,如挂号、排队、取药等繁琐的手续和流程。陪诊服务能够为他们提供极大的便利,帮助…

[初阶数据结构】单链表

前言 📚作者简介:爱编程的小马,正在学习C/C,Linux及MySQL。 📚本文收录于初阶数据结构系列,本专栏主要是针对时间、空间复杂度,顺序表和链表、栈和队列、二叉树以及各类排序算法,持…

添砖Java之路其三——自增自减运算符,数据转换与原码反码补码。

目录 运算符: 转换: 隐式转换: 小范围数据可以直接可以给大范围数据: 这里做了一张图范围向下兼容表​编辑 运算时,数据范围小的和数据范围大的,需要讲运算范围小的提升为运算范围大的同类&#xff0c…

软考系列必过资料分享-系统架构师-系统分析师-信息系统项目管理师

建议,写在前面 知识点是公用的,原则上不分新旧。每年会有少部分的题目切合当前时间段(也是通过旧的知识演变的) 信息系统项目管理师证书 系统架构师证书 系统分析师证书 资料分享 关注公众号 回复 信息系统项目管理师资料 即可获取信息系统项目管理师资…

如何使用香草看涨期权进行投机?

如何使用香草看涨期权进行投机? 香草看涨期权,通常也称为香草期权,是金融市场上的一种金融衍生品,由券商或金融机构推出。使用香草看涨期权进行投机,主要依赖于对市场走势的预测和对杠杆效应的运用。以下是一些关键步…

【前端】-【前端文件操作与文件上传】-【前端接受后端传输文件指南】

目录 前端文件操作与文件上传前端接受后端传输文件指南 前端文件操作与文件上传 一、前端文件上传有两种思路: 二进制blob传输:典型案例是formData传输,相当于用formData搭载二进制的blob传给后端base64传输:转为base64传输&…

力扣HOT100 - 35. 搜索插入位置

解题思路&#xff1a; 二分法模板 class Solution {public int searchInsert(int[] nums, int target) {int left 0;int right nums.length - 1;while (left < right) {int mid left ((right - left) >> 1);if (nums[mid] target)return mid;else if (nums[mid…

spring模块(六)spring监听器(1)ApplicationListener

一、介绍 1、简介 当某个事件触发的时候&#xff0c;就会执行的方法块。 当然&#xff0c;springboot很贴心地提供了一个 EventListener 注解来实现监听。 2、源码&#xff1a; package org.springframework.context;import java.util.EventListener; import java.util.fu…

深入解析智能指针:从实践到原理

&#x1f466;个人主页&#xff1a;晚风相伴 &#x1f440;如果觉得内容对你有所帮助的话&#xff0c;还请一键三连&#xff08;点赞、关注、收藏&#xff09;哦 如果内容有错或者不足的话&#xff0c;还望你能指出。 目录 智能指针的引入 内存泄漏 RAII 智能指针的使用及原…

安卓LayoutParams浅析

目录 前言一、使用 LayoutParams 设置宽高二、不设置 LayoutParams2.1 TextView 的 LayoutParams2.2 LinearLayout 的 LayoutParams 三、getLayoutParams 的使用四、setLayoutParams 的作用五、使用 setWidth/setHeight 设置宽高 前言 先来看一个简单的布局&#xff0c;先用 x…

百元挂耳式耳机哪款好?五款高品质一流机型不容错过

开放式耳机以其独特的不入耳设计&#xff0c;大大提升了佩戴的舒适度。相较于传统的入耳式耳机&#xff0c;它巧妙地避免了对耳朵的压迫&#xff0c;降低了中耳炎等潜在风险。不仅如此&#xff0c;开放式耳机还能让你保持对周边声音的灵敏度&#xff0c;无论是户外跑步还是骑行…
最新文章