DataX-Oracle新增writeMode支持update

目录

前言

第一步下载源码

第二步修改源码

1、Oraclewriter

2、WriterUtil

 2.1、修改getWriteTemplate方法

 2.2、新增onMergeIntoDoString与getStrings方法

3、CommonRdbmsWriter

 3.1、修改startWriteWithConnection

 3.2、修改doBatchInsert

 3.3、修改fillPreparedStatement

第三步打包

第四步脚本修改

修改后jar包地址 




前言

目前 DataX更新到datax_v202309版本还不能支持Oracle写入的update,只通过DataX只能修改源码。

原理:oracle 不支持类似 MySQL的 REPLACE INTO 和 INSERT … ON DUPLICATE KEY UPDATE,所以只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 merge 语句,先来看下 merge 语法。

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN
 [UPDATE sql] 
WHEN NOT MATCHED THEN 
 [INSERT sql]

第一步下载源码

 地址:datax_v202309。

第二步修改源码

一共修改3个文件

1、Oraclewriter

 

找到该代码直接注释掉就行。 

2、WriterUtil
 2.1、修改getWriteTemplate方法
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {
        boolean update = writeMode.trim().toLowerCase().startsWith("update");
        boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")
                || writeMode.trim().toLowerCase().startsWith("replace")
                || update;

        if (!isWriteModeLegal) {
            throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                    String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));
        }
        // && writeMode.trim().toLowerCase().startsWith("replace")
        String writeDataSqlTemplate;
        if (forceUseUpdate || update) {
            //update只在mysql下使用
            if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {
                writeDataSqlTemplate = new StringBuilder()
                        .append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ","))
                        .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                        .append(")")
                        .append(onDuplicateKeyUpdateString(columnHolders))
                        .toString();
            }
            //update在Oracle下使用
            else if (dataBaseType == DataBaseType.Oracle) {
                writeDataSqlTemplate = onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" +
                        StringUtils.join(columnHolders, ",") +
                        ") VALUES(" + StringUtils.join(valueHolders, ",") +")";
            }else {
                throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,
                        String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));
            }
        } else {

            //这里是保护,如果其他错误的使用了update,需要更换为replace
            if (update) {
                writeMode = "replace";
            }
            writeDataSqlTemplate = new StringBuilder().append(writeMode)
                    .append(" INTO %s (").append(StringUtils.join(columnHolders, ","))
                    .append(") VALUES(").append(StringUtils.join(valueHolders, ","))
                    .append(")").toString();
        }

        return writeDataSqlTemplate;
    }
 2.2、新增onMergeIntoDoString与getStrings方法

代码作用:对Oracle进行update的MERGE拼接

public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {
        String[] sArray = getStrings(merge);
        StringBuilder sb = new StringBuilder();
        sb.append("MERGE INTO %s A USING ( SELECT ");
        boolean first = true;
        boolean first1 = true;
        StringBuilder str = new StringBuilder();
        StringBuilder update = new StringBuilder();
        for (String columnHolder : columnHolders) {
            if (Arrays.asList(sArray).contains(columnHolder)) {
                if (!first) {
                    sb.append(",");
                    str.append(" AND ");
                } else {
                    first = false;
                }
                str.append("TMP.").append(columnHolder);
                sb.append("?");
                str.append(" = ");
                sb.append(" AS ");
                str.append("A.").append(columnHolder);
                sb.append(columnHolder);
            }
        }

        for (String columnHolder : columnHolders) {
            if (!Arrays.asList(sArray).contains(columnHolder)) {
                if (!first1) {
                    update.append(",");
                } else {
                    first1 = false;
                }
                update.append(columnHolder);
                update.append(" = ");
                update.append("?");
            }
        }

        sb.append(" FROM DUAL ) TMP ON (");
        sb.append(str);
        sb.append(" ) WHEN MATCHED THEN UPDATE SET ");
        sb.append(update);
        sb.append(" WHEN NOT MATCHED THEN ");
        return sb.toString();
    }

    public static String[] getStrings(String merge) {
        merge = merge.replace("update", "");
        merge = merge.replace("(", "");
        merge = merge.replace(")", "");
        merge = merge.replace(" ", "");
        return merge.split(",");
    }
3、CommonRdbmsWriter
 3.1、修改startWriteWithConnection
        // 替换原先的代码块
        public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {
            this.taskPluginCollector = taskPluginCollector;
            List<String> columns = new LinkedList<>();
            if (this.dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) {
                String merge = this.writeMode;
                String[] sArray = WriterUtil.getStrings(merge);
                this.columns.forEach(column->{
                    if (Arrays.asList(sArray).contains(column)) {
                        columns.add(column);
                    }
                });
                this.columns.forEach(column->{
                    if (!Arrays.asList(sArray).contains(column)) {
                        columns.add(column);
                    }
                });
            }
            columns.addAll(this.columns);
            // 用于写入数据的时候的类型根据目的表字段类型转换
            this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ","));
            // 写数据库的SQL语句
            calcWriteRecordSql();
 
            List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);
            int bufferBytes = 0;
            try {
                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    if (record.getColumnNumber() != this.columnNumber) {
                        // 源头读取字段列数与目的表字段写入列数不相等,直接报错
                        throw DataXException
                                .asDataXException(
                                        DBUtilErrorCode.CONF_ERROR,
                                        String.format(
                                                "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
                                                record.getColumnNumber(),
                                                this.columnNumber));
                    }
 
                    writeBuffer.add(record);
                    bufferBytes += record.getMemorySize();
 
                    if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {
                        doBatchInsert(connection, writeBuffer);
                        writeBuffer.clear();
                        bufferBytes = 0;
                    }
                }
                if (!writeBuffer.isEmpty()) {
                    doBatchInsert(connection, writeBuffer);
                    writeBuffer.clear();
                    bufferBytes = 0;
                }
            } catch (Exception e) {
                throw DataXException.asDataXException(
                        DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                writeBuffer.clear();
                bufferBytes = 0;
                DBUtil.closeDBResources(null, null, connection);
            }
        }
 3.2、修改doBatchInsert
 protected void doBatchInsert(Connection connection, List<Record> buffer)
                throws SQLException
        {
            PreparedStatement preparedStatement = null;
            try {
                connection.setAutoCommit(false);
                preparedStatement = connection
                        .prepareStatement(this.writeRecordSql);
                if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {
                    String merge = this.writeMode;
                    String[] sArray = WriterUtil.getStrings(merge);
                    for (Record record : buffer) {
                        List<Column> recordOne = new ArrayList<>();
                        for (int j = 0; j < this.columns.size(); j++) {
                            if (Arrays.asList(sArray).contains(this.columns.get(j))) {
                                recordOne.add(record.getColumn(j));
                            }
                        }
                        for (int j = 0; j < this.columns.size(); j++) {
                            if (!Arrays.asList(sArray).contains(this.columns.get(j))) {
                                recordOne.add(record.getColumn(j));
                            }
                        }
                        for (int j = 0; j < this.columns.size(); j++) {
                            recordOne.add(record.getColumn(j));
                        }
                        for (int j = 0; j < recordOne.size(); j++) {
                            record.setColumn(j, recordOne.get(j));
                        }
                        preparedStatement = fillPreparedStatement(
                                preparedStatement, record);
                        preparedStatement.addBatch();
                    }
                }
                else {
                    for (Record record : buffer) {
                        preparedStatement = fillPreparedStatement(
                                preparedStatement, record);
                        preparedStatement.addBatch();
                    }
                }
                preparedStatement.executeBatch();
                connection.commit();
            }
            catch (SQLException e) {
                LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());
                connection.rollback();
                doOneInsert(connection, buffer);
            }
            catch (Exception e) {
                throw DataXException.asDataXException(
                        DBUtilErrorCode.WRITE_DATA_ERROR, e);
            }
            finally {
                DBUtil.closeDBResources(preparedStatement, null);
            }
        }
 3.3、修改fillPreparedStatement
  protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)
                throws SQLException
        {
            for (int i = 0; i < record.getColumnNumber(); i++) {
                int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
                String typeName = this.resultSetMetaData.getRight().get(i);
                preparedStatement = fillPreparedStatementColumnType(preparedStatement, i,columnSqltype, typeName,record.getColumn(i));
            }
            return preparedStatement;
        }

第三步打包

1、只需要在idea里面打包修改的两个程序就可以

 2、打包成功后获取两个jar包

 3、将包替换到datax的插件里面

 将oraclewriter-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter

 将plugin-rdbms-util-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter\libs

第四步脚本修改

{
    "job": {
        "setting": {
            "speed": {
                 "byte": 1048576
            },
                "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "${r_username}",
                        "password": "${r_password}",
                        "connection": [
                            {	   
                                "querySql": ["SELECT f_year,f_code,f_name,f_order FROM tableName"],
                                "jdbcUrl": ["${r_jdbcUrl}"]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "oraclewriter",
                    "parameter": {
						"writeMode": "update(f_year,f_code)",
                        "username": "${w_username}",
                        "password": "${w_password}",
                        "column": [
                         "f_year","f_code","f_name","f_order"
                        ],
                        "session": [],
                        "preSql": [],
                        "connection": [
                            {
                                "jdbcUrl": "${w_jdbcUrl}",
                                "table": ["tableName"]
                            }
                        ]
                    }
			   }		   
            }
        ]
    }
}

参数 "writeMode": "update(f_year,f_code)" 里面f_year,f_code就是主键, 参数上不要加/"

update(\"f_year\",\"f_code\")这样是拼不上sql的,这个问题调试了好久才解决。

这时候运行就成功了

参考文章DataX 二次开发支持 Oracle 更新数据icon-default.png?t=N7T8https://blog.csdn.net/xch_yang/article/details/128250190?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-128250190-blog-106881907.235%5Ev43%5Epc_blog_bottom_relevance_base8&spm=1001.2101.3001.4242.1&utm_relevant_index=3Datax oracle 支持增量并且支持全量更新icon-default.png?t=N7T8https://blog.csdn.net/weixin_41250031/article/details/122615271?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&utm_relevant_index=7

修改后jar包地址 

懒得修改可以直接下载两个jar替换到你们的datax对应目录。

https://download.csdn.net/download/qq_36802726/89046154icon-default.png?t=N7T8https://download.csdn.net/download/qq_36802726/89046154

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/498522.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

红酒:红酒分类与消费者教育的重要性

在红酒的世界里&#xff0c;品种繁多&#xff0c;口感各异。对于消费者而言&#xff0c;了解红酒的分类以及接受相关的消费者教育至关重要。云仓酒庄雷盛红酒作为业界的持续发展者&#xff0c;深知这一点&#xff0c;致力于为消费者提供品质的教育内容&#xff0c;帮助他们更好…

Verilog语法之case语句学习

case分支语句是一种实现多路分支控制的分支语句。与使用if-else条件分支语句相比&#xff0c;采用case分支语句来实现多路控制会变得更加的方便直观。 case分支语句通常用于对微处理器指令译码功能的描述以及对有限状态机的描述。Case分支语句有“case”、“casez”、“casex”…

MybatisPlus学习总结

MybatisPlus.xmind 一、MybatisPlus快速入门 1.基本介绍 官网: 简介 | MyBatis-Plus MyBatis Plus是一个基于MyBatis的增强工具&#xff0c;它简化了MyBatis的使用&#xff0c;提供了一系列的增强功能&#xff0c;使开发更加方便快捷。 MyBatis Plus的主要特点包括&#xff…

3月23日笔记

广播域与泛洪范围是相同的 广播&#xff1a;在同一个泛洪范围内&#xff0c;强迫交换机泛洪&#xff08;主动&#xff09; 泛洪&#xff08;被动&#xff09; ARP的工作原理&#xff1a;ARP先通过广播发送请求包&#xff0c;所有收到该广播包的设备都会将其中的源IP和源MAC相…

《Vision mamba》论文笔记

原文出处&#xff1a; [2401.09417] Vision Mamba: Efficient Visual Representation Learning with Bidirectional State Space Model (arxiv.org) 原文笔记&#xff1a; What&#xff1a; Vision Mamba: Efficient Visual Representation Learning with Bidirectional St…

【Python】#2 基本数据类型

文章目录 一、数字类型1. 整数类型2. 浮点数类型tips&#xff1a;为什么浮点数计算的小数部分经常“错误”&#xff1f;如 为什么0.10.20.3在计算机中不为真 3. 复数形式<classcomplex>4. 数字类型的操作符与部分函数tips: 数字类型的类型提升tips:Python中除法 基本数据…

电商控价的效果有哪些

品牌在做价格治理时&#xff0c;肯定是不再希望线上平台出现低价、窜货链接&#xff0c;但现实却难如品牌所愿&#xff0c;有几个难以实现的原因&#xff0c;首先&#xff0c;电商平台链接上架下架是很容易的&#xff0c;此刻将链接治理下架&#xff0c;下一刻店铺可能又会再上…

《QT实用小工具·二》图片文字转base64编码

1、概述 源码放在文章末尾 base64编码转换类 图片转base64字符串。base64字符串转图片。字符转base64字符串。base64字符串转字符。后期增加数据压缩。Qt6对base64编码转换进行了重写效率提升至少200%。 下面是demo演示&#xff1a; 项目部分代码如下所示&#xff1a; #ifn…

python pytz是什么

pytz模块常用于时区的转换&#xff0c;常常配合datetime一起使用。我们知道datetime除了data方法生成的时间是没有时区概念&#xff0c;其他如time、datetime等都是有时区概念&#xff0c;即指定了tzinfo信息。 >>> import datetime >>> datetime.datetime.n…

【机器学习】深入探讨基于实例的学习及K-最近邻算法

深入探讨基于实例的学习及K-最近邻算法 在机器学习的众多策略中&#xff0c;基于实例的学习方法因其简单性和高效性而备受关注。这种方法的核心理念在于利用已知的数据实例来预测新数据的标签或属性。本文将深入探讨其中的两个重要概念&#xff1a;最近邻算法和K-最近邻算法&a…

浏览器工作原理与实践--块级作用域:var缺陷以及为什么要引入let和const

在前面《07 | 变量提升&#xff1a;JavaScript代码是按顺序执行的吗&#xff1f;》这篇文章中&#xff0c;我们已经讲解了JavaScript中变量提升的相关内容&#xff0c;正是由于JavaScript存在变量提升这种特性&#xff0c;从而导致了很多与直觉不符的代码&#xff0c;这也是Jav…

考研数学|高效刷透汤家凤《1800》经验分享

当然不需要换老师&#xff0c;如果你在基础阶段连汤老师的课都听不进去&#xff0c;那么换其他老师的话&#xff0c;很大可能也是白搭。 如果你现在对于1800还是一筹莫展的话&#xff0c;那么很明显&#xff0c;这反映出前期基础不扎实&#xff0c;没有真正理解和掌握这部分内…

【NOI】树的初步认识

文章目录 前言一、树1.什么是树&#xff1f;2.树的基本概念3.树的基本术语3.1 节点3.1.1 根节点3.1.2 父节点、子节点3.1.3 兄弟节点、堂兄弟节点3.1.4 祖先节点、子孙节点3.1.5 叶子节点/终端节点3.1.6 分支节点/非终端节点 3.2 边3.3 度3.3.1 树的度 3.4 层次3.4.1 树的深度3…

学习JavaEE的日子 Day32 线程池

Day32 线程池 1.引入 一个线程完成一项任务所需时间为&#xff1a; 创建线程时间 - Time1线程中执行任务的时间 - Time2销毁线程时间 - Time3 2.为什么需要线程池(重要) 线程池技术正是关注如何缩短或调整Time1和Time3的时间&#xff0c;从而提高程序的性能。项目中可以把Time…

磐启微PAN1020低功耗SOC芯片

PAN1020低功耗蓝牙芯片 典型应用 ⚫ 电视和机顶盒遥控器 ⚫ 无线游戏手柄 ⚫ 无线鼠键 ⚫ 智能家居 需要此物料&#xff0c;可联系周小姐 主要特性 ⚫ RF - 2.4GHz 射频收发机&#xff08;兼容 BLE4.2&#xff09; - 接收灵敏度&#xff1a;-90 dBm1Mbps - 接收信号&a…

智慧公厕解决方案打造更加智能的卫生空间

一、智慧公厕方案概述 智慧公厕方案旨在解决现有公厕存在的诸多问题&#xff0c;包括民众用厕困难、环境卫生状况不佳、管理效率低下等方面。针对民众的需求和管理方面的挑战&#xff0c;智慧公厕提供了一套综合解决方案&#xff0c;包括智能导航、环境监测、资源管理等功能&a…

jvm(虚拟机)运行时数据区域介绍

Java虚拟机&#xff08;JVM&#xff09;运行时数据区域是Java程序在运行过程中使用的内存区域&#xff0c;它主要包括以下几个部分&#xff1a; 程序计数器&#xff08;Program Counter Register&#xff09;&#xff1a; 程序计数器是一块较小的内存区域&#xff0c;是线程私有…

25G SFP28 AOC线缆最新数据传输解决方案

在当今云计算、大数据、人工智能等领域&#xff0c;对高速数据传输的需求不断增加。传统的1G和10G网络已经无法满足数据中心日益增长的流量&#xff0c;因此迫切需要更高速的解决方案。25G SFP28 AOC有源光缆迎合了这一需求&#xff0c;成为连接数据中心、服务器、存储等25G设备…

RPA使用Native Messaging协议实现浏览器自动化

RPA 即机器人流程自动化&#xff0c;是一种利用软件机器人或人工智能来自动化业务流程中规则性、重复性任务的技术。RPA 技术可以模拟和执行人类在计算机上的交互操作&#xff0c;从而实现自动化处理数据、处理交易、触发通知等任务。帮助企业或个人实现业务流程的自动化和优化…

速通数据结构第二站 顺序表

文章目录 速通数据结构与算法系列 1 速通数据结构与算法第一站 复杂度 http://t.csdnimg.cn/sxEGF 感谢佬们支持&#xff01; 目录 系列文章目录 前言一、顺序表 0 结构体 1 接口声明 2 初始化和销毁 3 扩容函数 4 打印和判空 5 尾插 …