From ee14d6bd616c19e273a354068cb83a9f7b2bc03b Mon Sep 17 00:00:00 2001 From: "dongchenghao.hao" Date: Wed, 24 May 2023 17:55:11 +0800 Subject: [PATCH] fix: https://github.com/alibaba/canal/issues/3538 --- .vscode/settings.json | 4 ++ .../resources/example/instance.properties | 2 + .../inbound/mysql/dbsync/TableMetaCache.java | 47 +++++++++---------- 3 files changed, 29 insertions(+), 24 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000000..b84f89c384 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,4 @@ +{ + "java.configuration.updateBuildConfiguration": "interactive", + "java.compile.nullAnalysis.mode": "automatic" +} \ No newline at end of file diff --git a/deployer/src/main/resources/example/instance.properties b/deployer/src/main/resources/example/instance.properties index c41affe0ab..548a5a0295 100644 --- a/deployer/src/main/resources/example/instance.properties +++ b/deployer/src/main/resources/example/instance.properties @@ -52,7 +52,9 @@ canal.mq.topic=example #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config +#canal.mq.enableDynamicQueuePartition=false #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6 +#canal.mq.partitionHash=test.table:id^name,.*\\..* ################################################# diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java index 6afdefdb33..8bab726dbf 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/dbsync/TableMetaCache.java @@ -31,21 +31,21 @@ */ public class TableMetaCache { - public static final String COLUMN_NAME = "COLUMN_NAME"; - public static final String COLUMN_TYPE = "COLUMN_TYPE"; - public static final String IS_NULLABLE = "IS_NULLABLE"; - public static final String COLUMN_KEY = "COLUMN_KEY"; - public static final String COLUMN_DEFAULT = "COLUMN_DEFAULT"; - public static final String EXTRA = "EXTRA"; - private MysqlConnection connection; - private boolean isOnRDS = false; - private boolean isOnTSDB = false; - - private TableMetaTSDB tableMetaTSDB; + public static final String COLUMN_NAME = "Field"; + public static final String COLUMN_TYPE = "Type"; + public static final String IS_NULLABLE = "Null"; + public static final String COLUMN_KEY = "Key"; + public static final String COLUMN_DEFAULT = "Default"; + public static final String EXTRA = "Extra"; + private MysqlConnection connection; + private boolean isOnRDS = false; + private boolean isOnTSDB = false; + + private TableMetaTSDB tableMetaTSDB; // 第一层tableId,第二层schema.table,解决tableId重复,对应多张表 private LoadingCache tableMetaDB; - public TableMetaCache(MysqlConnection con, TableMetaTSDB tableMetaTSDB){ + public TableMetaCache(MysqlConnection con, TableMetaTSDB tableMetaTSDB) { this.connection = con; this.tableMetaTSDB = tableMetaTSDB; // 如果持久存储的表结构为空,从db里面获取下 @@ -116,7 +116,7 @@ public static List parseTableMetaByDesc(ResultSetPacket packet) { Map nameMaps = new HashMap<>(6, 1f); int index = 0; for (FieldPacket fieldPacket : packet.getFieldDescriptors()) { - nameMaps.put(fieldPacket.getOriginalName(), index++); + nameMaps.put(fieldPacket.getName(), index++); } int size = packet.getFieldDescriptors().size(); @@ -128,13 +128,13 @@ public static List parseTableMetaByDesc(ResultSetPacket packet) { meta.setColumnName(packet.getFieldValues().get(nameMaps.get(COLUMN_NAME) + i * size).intern()); meta.setColumnType(packet.getFieldValues().get(nameMaps.get(COLUMN_TYPE) + i * size)); meta.setNullable(StringUtils.equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(IS_NULLABLE) + i - * size), - "YES")); + * size), + "YES")); meta.setKey("PRI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size))); meta.setUnique("UNI".equalsIgnoreCase(packet.getFieldValues().get(nameMaps.get(COLUMN_KEY) + i * size))); // 特殊处理引号 meta.setDefaultValue(DruidDdlParser.unescapeQuotaName(packet.getFieldValues() - .get(nameMaps.get(COLUMN_DEFAULT) + i * size))); + .get(nameMaps.get(COLUMN_DEFAULT) + i * size))); meta.setExtra(packet.getFieldValues().get(nameMaps.get(EXTRA) + i * size)); result.add(meta); @@ -245,16 +245,15 @@ public boolean apply(EntryPosition position, String schema, String ddl, String e private String getFullName(String schema, String table) { StringBuilder builder = new StringBuilder(); return builder.append('`') - .append(schema) - .append('`') - .append('.') - .append('`') - .append(table) - .append('`') - .toString(); + .append(schema) + .append('`') + .append('.') + .append('`') + .append(table) + .append('`') + .toString(); } - public boolean isOnTSDB() { return isOnTSDB; }