diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java index 17cfe4cd97b26..45e71a5f0cedf 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java @@ -48,6 +48,11 @@ public String generateUpsertQueryStatement() { throw new IllegalStateException("UPSERT not supported"); } + @Override + public List getColumnsForUpsert() { + throw new IllegalStateException("UPSERT not supported"); + } + @Override public void bindValue(PreparedStatement statement, Mutation mutation) throws Exception { final List columns = new ArrayList<>(); @@ -56,8 +61,7 @@ public void bindValue(PreparedStatement statement, Mutation mutation) throws Exc columns.addAll(tableDefinition.getColumns()); break; case UPSERT: - columns.addAll(tableDefinition.getColumns()); - columns.addAll(tableDefinition.getNonKeyColumns()); + columns.addAll(getColumnsForUpsert()); break; case UPDATE: columns.addAll(tableDefinition.getNonKeyColumns()); diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 3f7f62e3abb5e..95f66edf7a72a 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -109,30 +109,35 @@ public void open(Map config, SinkContext sinkContext) throws Exc } private void initStatement() throws Exception { - List keyList = Lists.newArrayList(); - String key = jdbcSinkConfig.getKey(); - if (key != null && !key.isEmpty()) { - keyList = Arrays.asList(key.split(",")); - } - List nonKeyList = Lists.newArrayList(); - String nonKey = jdbcSinkConfig.getNonKey(); - if (nonKey != null && !nonKey.isEmpty()) { - nonKeyList = Arrays.asList(nonKey.split(",")); - } + List keyList = getListFromConfig(jdbcSinkConfig.getKey()); + List nonKeyList = getListFromConfig(jdbcSinkConfig.getNonKey()); tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList); - insertStatement = JdbcUtils.buildInsertStatement(connection, generateInsertQueryStatement()); + insertStatement = connection.prepareStatement(generateInsertQueryStatement()); if (jdbcSinkConfig.getInsertMode() == JdbcSinkConfig.InsertMode.UPSERT) { - upsertStatement = JdbcUtils.buildInsertStatement(connection, generateUpsertQueryStatement()); + if (nonKeyList.isEmpty() || keyList.isEmpty()) { + throw new IllegalStateException("UPSERT mode is not configured if 'key' and 'nonKey' " + + "config are not set."); + } + upsertStatement = connection.prepareStatement(generateUpsertQueryStatement()); } if (!nonKeyList.isEmpty()) { - updateStatement = JdbcUtils.buildUpdateStatement(connection, generateUpdateQueryStatement()); + updateStatement = connection.prepareStatement(generateUpdateQueryStatement()); } if (!keyList.isEmpty()) { - deleteStatement = JdbcUtils.buildDeleteStatement(connection, generateDeleteQueryStatement()); + deleteStatement = connection.prepareStatement(generateDeleteQueryStatement()); } } + private static List getListFromConfig(String jdbcSinkConfig) { + List nonKeyList = Lists.newArrayList(); + String nonKey = jdbcSinkConfig; + if (nonKey != null && !nonKey.isEmpty()) { + nonKeyList = Arrays.asList(nonKey.split(",")); + } + return nonKeyList; + } + @Override public void close() throws Exception { if (flushExecutor != null) { @@ -185,6 +190,8 @@ public String generateUpdateQueryStatement() { public abstract String generateUpsertQueryStatement(); + public abstract List getColumnsForUpsert(); + public String generateDeleteQueryStatement() { return JdbcUtils.buildDeleteSql(tableDefinition); } diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java index 327f1db7c6423..5fceea2754728 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java @@ -23,9 +23,7 @@ import com.google.common.collect.Lists; import java.sql.Connection; import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; import java.util.List; import java.util.StringJoiner; import java.util.stream.IntStream; @@ -173,10 +171,6 @@ public static String buildInsertSql(TableDefinition table) { return builder.toString(); } - public static PreparedStatement buildInsertStatement(Connection connection, String insertSQL) throws SQLException { - return connection.prepareStatement(insertSQL); - } - public static String combationWhere(List columnIds) { StringBuilder builder = new StringBuilder(); if (!columnIds.isEmpty()) { @@ -205,6 +199,9 @@ public static String buildUpdateSql(TableDefinition table) { } public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) { + if (table.nonKeyColumns.isEmpty()) { + throw new IllegalStateException("UPDATE operations are not supported if 'nonKey' config is not set."); + } StringJoiner setJoiner = new StringJoiner(","); table.nonKeyColumns.forEach((columnId) ->{ @@ -215,17 +212,9 @@ public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) { return setJoiner; } - public static PreparedStatement buildUpdateStatement(Connection connection, String updateSQL) throws SQLException { - return connection.prepareStatement(updateSQL); - } - public static String buildDeleteSql(TableDefinition table) { return "DELETE FROM " + table.tableId.getTableName() + combationWhere(table.keyColumns); } - - public static PreparedStatement buildDeleteStatement(Connection connection, String deleteSQL) throws SQLException { - return connection.prepareStatement(deleteSQL); - } } diff --git a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java index 5c232db1aa06a..d0f44e4c77c63 100644 --- a/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/mariadb/src/main/java/org/apache/pulsar/io/jdbc/MariadbJdbcAutoSchemaSink.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.io.jdbc; +import java.util.ArrayList; +import java.util.List; import java.util.stream.Collectors; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; @@ -32,10 +34,15 @@ public class MariadbJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink { @Override public String generateUpsertQueryStatement() { - final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName) - .collect(Collectors.joining(",")); return JdbcUtils.buildInsertSql(tableDefinition) + "ON DUPLICATE KEY UPDATE " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition); } + @Override + public List getColumnsForUpsert() { + final List columns = new ArrayList<>(); + columns.addAll(tableDefinition.getColumns()); + columns.addAll(tableDefinition.getNonKeyColumns()); + return columns; + } } diff --git a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java index d29e3e3aaca47..803f13724376a 100644 --- a/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/postgres/src/main/java/org/apache/pulsar/io/jdbc/PostgresJdbcAutoSchemaSink.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.io.jdbc; +import java.util.ArrayList; +import java.util.List; import java.util.stream.Collectors; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; @@ -32,10 +34,22 @@ public class PostgresJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink { @Override public String generateUpsertQueryStatement() { - final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName) + final List keyColumns = tableDefinition.getKeyColumns(); + if (keyColumns.isEmpty()) { + throw new IllegalStateException("UPSERT is not supported if 'key' config is not set."); + } + final String keys = keyColumns.stream().map(JdbcUtils.ColumnId::getName) .collect(Collectors.joining(",")); return JdbcUtils.buildInsertSql(tableDefinition) + " ON CONFLICT(" + keys + ") " + "DO UPDATE SET " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition); } + + @Override + public List getColumnsForUpsert() { + final List columns = new ArrayList<>(); + columns.addAll(tableDefinition.getColumns()); + columns.addAll(tableDefinition.getNonKeyColumns()); + return columns; + } } diff --git a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java index 76655397003ca..f67f0f1afbfb8 100644 --- a/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/sqlite/src/main/java/org/apache/pulsar/io/jdbc/SqliteJdbcAutoSchemaSink.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.io.jdbc; +import java.util.ArrayList; +import java.util.List; import java.util.stream.Collectors; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; @@ -32,10 +34,22 @@ public class SqliteJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink { @Override public String generateUpsertQueryStatement() { - final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName) + final List keyColumns = tableDefinition.getKeyColumns(); + if (keyColumns.isEmpty()) { + throw new IllegalStateException("UPSERT is not supported if 'key' config is not set."); + } + final String keys = keyColumns.stream().map(JdbcUtils.ColumnId::getName) .collect(Collectors.joining(",")); return JdbcUtils.buildInsertSql(tableDefinition) + " ON CONFLICT(" + keys + ") " + "DO UPDATE SET " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition); } + + @Override + public List getColumnsForUpsert() { + final List columns = new ArrayList<>(); + columns.addAll(tableDefinition.getColumns()); + columns.addAll(tableDefinition.getNonKeyColumns()); + return columns; + } }