1. Error output:
Caused by: io.debezium.connector.oracle.logminer.parser.DmlParserException: Failed to parse insert DML: 'insert into "HIS_DATA".
at io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser.parseInsert(LogMinerDmlParser.java:109)
at io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser.parse(LogMinerDmlParser.java:73)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.parseDmlStatement(AbstractLogMinerEventProcessor.java:1078)
... 16 common frames omitted
Caused by: java.lang.ArrayIndexOutOfBoundsException: 74
at io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser.parseColumnListClause(LogMinerDmlParser.java:239)
at io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser.parseInsert(LogMinerDmlParser.java:100)
... 18 common frames omitted
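2. Root cause:
Reading the source shows that the parseColumnListClause method in debezium-connector-oracle-1.9.7.Final can write past the end of an array. The typical scenario: a column is added to an Oracle table, but the Flink CDC job is still holding the old column metadata for that table. When the application then writes to the monitored table, the captured SQL already contains the new column while the Table object still has only the old columns, so parsing fails. The source code is as follows: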
private int parseColumnListClause(String sql, int start, String[] columnNames) {
    int index = start;
    boolean inQuote = false;
    for(int var6 = 0; index < sql.length(); ++index) {
        char c = sql.charAt(index);
        if (c == '(' && !inQuote) {
            start = index + 1;
        } else {
            if (c == ')' && !inQuote) {
                ++index;
                break;
            }
            if (c == '"') {
                if (inQuote) {
                    inQuote = false;
                    columnNames[var6++] = sql.substring(start + 1, index);
                    start = index + 2;
                } else {
                    inQuote = true;
                }
            }
        }
    }
    return index;
}
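To make the failure mode concrete, the loop can be run on its own outside Debezium. The sketch below copies the decompiled method (only the counter is renamed) and feeds it a made-up column list; the class name ColumnListOverflowDemo, the column names, and the array size of 2 are illustrative assumptions. It also assumes that the caller sizes columnNames from the cached table's column count, as the description above implies.

```java
final class ColumnListOverflowDemo {

    // Copy of the decompiled loop shown above; only the counter is renamed.
    static int parseColumnListClause(String sql, int start, String[] columnNames) {
        int index = start;
        boolean inQuote = false;
        for (int columnIndex = 0; index < sql.length(); ++index) {
            char c = sql.charAt(index);
            if (c == '(' && !inQuote) {
                start = index + 1;
            } else {
                if (c == ')' && !inQuote) {
                    ++index;
                    break;
                }
                if (c == '"') {
                    if (inQuote) {
                        inQuote = false;
                        // No bounds check: throws once columnIndex reaches columnNames.length.
                        columnNames[columnIndex++] = sql.substring(start + 1, index);
                        start = index + 2;
                    } else {
                        inQuote = true;
                    }
                }
            }
        }
        return index;
    }

    public static void main(String[] args) {
        // Hypothetical input: the table was cached with two columns,
        // but the captured SQL already carries a third one.
        String columnList = "(\"ID\",\"NAME\",\"NEW_COL\") values ('1','a','x')";
        String[] cachedColumns = new String[2];
        try {
            parseColumnListClause(columnList, 0, cachedColumns);
            System.out.println("parsed without error");
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
```

With an array of three elements the same call completes normally, which is consistent with the error appearing only after a column has been added to the table.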
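3. Solution:
Approach: override the method in the LogMinerDmlParser class under io.debezium.connector.oracle.logminer.parser. There are two options:
Option 1: if the content of the new column is not needed (for example, you only need the primary key and its value), ignore the new column so that it is not included in the parsed data. This takes only a small code change.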
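As a rough illustration of the idea behind Option 1, the sketch below adds a bounds check to the same loop so that column names beyond the cached column count are skipped instead of written. This is only a sketch under assumptions, not necessarily the exact change to make: the class name GuardedColumnListParser is hypothetical, it assumes new columns appear at the end of the column list, and the values clause would presumably need matching handling, which is not shown here.

```java
import java.util.Arrays;

final class GuardedColumnListParser {

    // Same scan as the original loop, but extra column names are dropped
    // once the array sized from the cached table metadata is full.
    static int parseColumnListClause(String sql, int start, String[] columnNames) {
        int index = start;
        boolean inQuote = false;
        int columnIndex = 0;
        for (; index < sql.length(); ++index) {
            char c = sql.charAt(index);
            if (c == '(' && !inQuote) {
                start = index + 1;
            } else if (c == ')' && !inQuote) {
                ++index;
                break;
            } else if (c == '"') {
                if (inQuote) {
                    inQuote = false;
                    // Guard: only store names that fit the known columns.
                    if (columnIndex < columnNames.length) {
                        columnNames[columnIndex] = sql.substring(start + 1, index);
                    }
                    columnIndex++;
                    start = index + 2;
                } else {
                    inQuote = true;
                }
            }
        }
        return index;
    }

    public static void main(String[] args) {
        // Hypothetical input: two cached columns, three columns in the SQL.
        String columnList = "(\"ID\",\"NAME\",\"NEW_COL\") values ('1','a','x')";
        String[] cachedColumns = new String[2];
        int end = parseColumnListClause(columnList, 0, cachedColumns);
        System.out.println(Arrays.toString(cachedColumns) + " end=" + end);
    }
}
```

The returned index still points just past the closing parenthesis, so whatever parses the rest of the statement starts from the same position as before.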
In your project, create a package named io.debezium.connector.oracle.logminer.parser,
and copy into it the contents of the LogMinerDmlParser class from the source:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package io.debezium.connector.oracle.logminer.parser;
import io.debezium.DebeziumException;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
public class LogMinerDmlParser implements DmlParser {
private static final String NULL_SENTINEL = "${DBZ_NULL}";
private static final String NULL = "NULL";
private static final String INSERT_INTO = "insert into ";
private static final String UPDATE = "update ";
private static final String DELETE_FROM = "delete from ";
private static final String AND = "and ";
private static final String OR = "or ";
private static final String SET = " set ";
private static final String WHERE = " where ";
private static final String VALUES = " values ";
private static final String IS_NULL = "IS NULL";
private static final String UNSUPPORTED = "Unsupported";
private static final String UNSUPPORTED_TYPE = "Unsupported Type";
private static final int INSERT_INTO_LENGTH = "insert into ".length();
private static final int UPDATE_LENGTH = "update ".length();
private static final int DELETE_FROM_LENGTH = "delete from ".length();
private static final int VALUES_LENGTH = " values ".length();
private static final int SET_LENGTH = " set ".length();
private static final int WHERE_LENGTH = " where ".length();
public LogMinerDmlParser() {
}
public LogMinerDmlEntry parse(String sql, Table table) {
if (table == null) {
throw new DmlParserException("DML parser requires a non-null table");
} else {
if (sql != null && sql.length() > 0) {
switch(sql.charAt(0)) {
case 'd':
return this.parseDelete(sql, table);
case 'i':
return this.pa