Flink CDC -Sqlserver to Sqlserver java 模版编写

1.基本环境

     <flink.version>1.17.0</flink.version>

2. 类文件

package com.flink.tablesql;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.io.File;
import java.util.List;

public class Main2 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

        String path = "E:\\test\\flinktestsql\\orders.sql";
//        String path = "/data/flink/flinksql/orders.sql";
        List<String> list = FileUtils.readLines(new File(path),"UTF-8");
        StringBuilder stringBuilder = new StringBuilder("");
        String sql = "";
        for(String var : list){
            if(StringUtils.isNotBlank(var)){
                stringBuilder.append(var);
                if(var.contains("$")){
                    sql = stringBuilder.toString().replace("$","");
                    System.out.println(sql);
                    System.out.println("end-----");
                    tabEnv.executeSql(sql);
                    stringBuilder = new StringBuilder("");
                }else{
                    stringBuilder.append("\n");
                }
            }
        }
    }
}

3.pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flinktestsql</artifactId>
    <version>1.0-SNAPSHOT</version>

<!--    &lt;!&ndash; 指定仓库位置,依次为aliyun、apache和cloudera仓库 &ndash;&gt;-->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
        <repository>
            <id>apache</id>
            <url>https://maven.aliyun.com/repository/apache-snapshots</url>
        </repository>

        <repository>
            <id>cloudera</id>
            <url>https://maven.aliyun.com/repository/gradle-plugin</url>
        </repository>
        <repository>
            <id>central</id>
            <url>https://maven.aliyun.com/repository/central</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--版本-->
    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.17.0</flink.version>
        <slf4j.version>2.0.5</slf4j.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.12</scala.version>
    </properties>

    <dependencies>
        <!-- Flink相关依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>mssql-jdbc</artifactId>
            <version>11.2.1.jre8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>2.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.1-1.17</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-test-utils</artifactId>
            <version>1.17.1</version>
            <scope>test</scope>
        </dependency>

        <!-- 日志管理相关依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.19.0</version>
        </dependency>

    </dependencies>

</project>

4.后续只需要修改增加 .sql文件即可

       String path = "E:\\test\\flinktestsql\\orders.sql";

        // String path = "/data/flink/flinksql/orders.sql";

5.以上在本地运行未通过报错,在flink web 界面配置运行可以。我看网上可能是需要修改本地jdk配置,没有修改。

Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 驱动程序无法通过使用安全套接字层(SSL)加密与 SQL Server 建立安全连接。错误:“sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target”。 ClientConnectionId:d5fb789f-e4e9-416f-b47c-57dc09429ebf
	at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:3806)
	at com.microsoft.sqlserver.jdbc.TDSChannel.enableSSL(IOBuffer.java:1906)

6.flink web 端配置如下:只上传jar不行,还需要配置配置类,才可以

7.以上遇到很多问题,总算解决了,

如增加:

<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/180058.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

oracle数据库巡检常见脚本-系列二

简介 作为数据库管理员&#xff08;DBA&#xff09;&#xff0c;定期进行数据库的日常巡检是非常重要的。以下是一些原因&#xff1a; 保证系统的稳定性&#xff1a;通过定期巡检&#xff0c;DBA可以发现并及时解决可能导致系统不稳定的问题&#xff0c;如性能瓶颈、资源利用率…

MEMS制造的基本工艺——晶圆键合工艺

晶圆键合是一种晶圆级封装技术&#xff0c;用于制造微机电系统 (MEMS)、纳米机电系统 (NEMS)、微电子学和光电子学&#xff0c;确保机械稳定和气密密封。用于 MEMS/NEMS 的晶圆直径范围为 100 毫米至 200 毫米&#xff08;4 英寸至 8 英寸&#xff09;&#xff0c;用于生产微电…

提升企业财务管理效率的首选软件——Manager for Mac

Manager for Mac作为一款功能强大且用户友好的企业会计软件&#xff0c;不仅能够提升企业的财务管理效率&#xff0c;还能够保护企业财务数据的安全性和稳定性。如果您正在寻找一款适合Mac平台的企业会计软件&#xff0c;不妨选择Manager for Mac&#xff0c;让您的企业财务管理…

企业微信身份验证

本篇主要是在上一篇获取第三方凭证基础上&#xff0c;用户通过三方网站自定义授权登录后获取用户信息&#xff0c;以实现用户绑定登录功能。 构造第三方应用授权链接 如果第三方应用需要在打开的网页里面携带用户的身份信息&#xff0c; 第一步需要构造如下的链接来获取授权c…

opencv-Hough 直线变换

Hough 直线变换是一种在图像中检测直线的技术。它通过在极坐标空间中表示图像中的直线&#xff0c;将直线检测问题转换为参数空间的累加问题。OpenCV 提供了 cv2.HoughLines() 和 cv2.HoughLinesP() 函数来执行 Hough 直线变换。 cv2.HoughLines() lines cv2.HoughLines(ima…

iOS APP包分析工具 | 京东云技术团队

介绍 分享一款用于分析iOSipa包的脚本工具&#xff0c;使用此工具可以自动扫描发现可修复的包体积问题&#xff0c;同时可以生成包体积数据用于查看。这块工具我们团队内部已经使用很长一段时间&#xff0c;希望可以帮助到更多的开发同学更加效率的优化包体积问题。 工具下载…

深度学习之六(自编码器--Autoencoder)

概念 自编码器(Autoencoder)是一种神经网络架构,用于无监督学习和数据的降维表示。它由两部分组成:编码器(Encoder)和解码器(Decoder)。 结构: 编码器(Encoder): 接收输入数据并将其压缩为潜在表示(latent representation),通常比输入数据的维度要低。编码器的…

探索结构体的奥秘

目录 &#x1f342;结构体 1&#xff0c;结构体的声明 1.1 结构的基础知识 1.2 结构的声明 1.3 特殊的声明 1.4 结构的自引用 1.5 结构体变量的定义和初始化 1.6 结构体内存对齐 1.6.1 如何计算 1.6.2 为什么存在内存对齐 1.7 修改默认对齐数 1.8 结构体传参 2&am…

小型内衣裤洗衣机哪个牌子好?性价比小型洗衣机推荐

内衣内裤应该如何清洗才能实现在不伤衣的同时有能够洗干净呢&#xff1f;其实除了使用温水搭配手洗以外&#xff0c;还有一些清洗方式&#xff0c;那就是选择一台专门为内衣定制的内衣洗衣机。目前内衣洗衣机由于精致小巧&#xff0c;方便安装&#xff0c;方便使用&#xff0c;…

常见网络安全防护

1 阻断服务攻击&#xff08;DOS&#xff09; 阻断服务攻击&#xff0c;想办法目标网络资源用尽变种&#xff1a;分布式阻断服务攻击 影响&#xff1a; 宽带消耗性&#xff08;消耗目标的带宽&#xff09;资源消耗型&#xff08;消耗目标的计算资源&#xff09; 解决方案&am…

【OpenCV+OCR】计算机视觉:识别图像验证码中指定颜色文字

文章目录 1. 写在前面2. 读取验证码图像3. 生成颜色掩码4. 生成黑白结果图5. OCR文字识别6. 测试结果 【作者主页】&#xff1a;吴秋霖 【作者介绍】&#xff1a;Python领域优质创作者、阿里云博客专家、华为云享专家。长期致力于Python与爬虫领域研究与开发工作&#xff01; 【…

易点易动设备管理系统提升设备能耗管理和设备状态监控效率

如今&#xff0c;能源效率和设备状态监控对于企业来说变得越发重要。传统的设备管理方式往往存在能耗浪费和难以实时监控设备状态的问题。为了解决这些问题&#xff0c;易点易动设备管理系统应运而生。本文将介绍易点易动设备管理系统的功能和优势&#xff0c;以及如何通过它提…

FreeRTOS学习之路,以STM32F103C8T6为实验MCU(2-2:中断)

学习之路主要为FreeRTOS操作系统在STM32F103&#xff08;STM32F103C8T6&#xff09;上的运用&#xff0c;采用的是标准库编程的方式&#xff0c;使用的IDE为KEIL5。 注意&#xff01;&#xff01;&#xff01;本学习之路可以通过购买STM32最小系统板以及部分配件的方式进行学习…

Python中使用pyzbar实现二维码生成和识别功能

目录 一、引言 二、pyzbar库介绍 三、安装pyzbar库 四、使用pyzbar生成二维码 五、使用pyzbar识别二维码 一、引言 随着二维码的普及和应用&#xff0c;二维码生成和识别功能在日常生活和工作中越来越重要。在Python中&#xff0c;我们可以使用pyzbar库来实现二维码生成和…

9.9 Windows驱动开发:内核远程线程实现DLL注入

在笔者上一篇文章《内核RIP劫持实现DLL注入》介绍了通过劫持RIP指针控制程序执行流实现插入DLL的目的&#xff0c;本章将继续探索全新的注入方式&#xff0c;通过NtCreateThreadEx这个内核函数实现注入DLL的目的&#xff0c;需要注意的是该函数在微软系统中未被导出使用时需要首…

TikTok与精神健康:社交媒体在压力时代的作用

在当今数字化和社交化的时代&#xff0c;社交媒体已成为人们生活中不可或缺的一部分。其中&#xff0c;TikTok作为一款备受欢迎的短视频应用&#xff0c;不仅改变了人们的娱乐方式&#xff0c;也对精神健康产生了深远的影响。 本文将深入探讨TikTok在压力时代对精神健康的作用…

【VScode】安装配置、插件及远程SSH连接

一、VSCode安装 二、配置安装插件 三、配置远程连接SSH 四、MinGW 一、VSCode安装 VS官网 Visual Studio Code - Code Editing. Redefined下载安装包&#xff1a; 二、配置安装插件 安装中文插件 配置字体为20 配置文件–>首选项->设置->Font Size为20 设置 VSC…

【2021集创赛】基于ARM-M3的双目立体视觉避障系统 SOC设计

本作品参与极术社区组织的有奖征集|秀出你的集创赛作品风采,免费电子产品等你拿~活动。 团队介绍 参赛单位&#xff1a;上海电力大学 队伍名称&#xff1a;骇行队 总决赛奖项&#xff1a;二等奖 1.摘要 随着信息技术的发展&#xff0c;AGV&#xff08;Automated Guided Vehic…

oracle的debjob挂載查詢

背景 有一個需求需要定時去執行一個produce&#xff0c;可以使用oracle的dbjob定時執行&#xff0c;相比較之前的vbs更加絲滑 --傳遞produce 開始的時間 頻率 declarea number;beginDBMS_JOB.SUBMIT(a,xx_warehouse_daliy_record_p;,to_date(202311230800,yyyymmddhh24mi),…

第二证券:北证50指数一枝独秀 短剧游戏概念股持续活跃

周三&#xff0c;沪深两市三大指数颤动调整&#xff0c;北证50指数“鹤立鸡群”&#xff0c;大涨超8%。到收盘&#xff0c;上证综指报3043.61点&#xff0c;跌0.79%&#xff1b;深证成指报9855.66点&#xff0c;跌1.41%&#xff1b;创业板指报1950.01点&#xff0c;跌1.73%。沪…
最新文章