Skip to content

Commit

Permalink
[fix][connector] JDBC sinks: verify key and nonKey fields are set wit…
Browse files Browse the repository at this point in the history
…h insertMode=UPSERT (apache#17950)
  • Loading branch information
nicoloboschi committed Oct 11, 2022
1 parent 9a8b68a commit 3208086
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public String generateUpsertQueryStatement() {
throw new IllegalStateException("UPSERT not supported");
}

@Override
public List<ColumnId> getColumnsForUpsert() {
throw new IllegalStateException("UPSERT not supported");
}

@Override
public void bindValue(PreparedStatement statement, Mutation mutation) throws Exception {
final List<ColumnId> columns = new ArrayList<>();
Expand All @@ -56,8 +61,7 @@ public void bindValue(PreparedStatement statement, Mutation mutation) throws Exc
columns.addAll(tableDefinition.getColumns());
break;
case UPSERT:
columns.addAll(tableDefinition.getColumns());
columns.addAll(tableDefinition.getNonKeyColumns());
columns.addAll(getColumnsForUpsert());
break;
case UPDATE:
columns.addAll(tableDefinition.getNonKeyColumns());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,30 +109,35 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
}

private void initStatement() throws Exception {
List<String> keyList = Lists.newArrayList();
String key = jdbcSinkConfig.getKey();
if (key != null && !key.isEmpty()) {
keyList = Arrays.asList(key.split(","));
}
List<String> nonKeyList = Lists.newArrayList();
String nonKey = jdbcSinkConfig.getNonKey();
if (nonKey != null && !nonKey.isEmpty()) {
nonKeyList = Arrays.asList(nonKey.split(","));
}
List<String> keyList = getListFromConfig(jdbcSinkConfig.getKey());
List<String> nonKeyList = getListFromConfig(jdbcSinkConfig.getNonKey());

tableDefinition = JdbcUtils.getTableDefinition(connection, tableId, keyList, nonKeyList);
insertStatement = JdbcUtils.buildInsertStatement(connection, generateInsertQueryStatement());
insertStatement = connection.prepareStatement(generateInsertQueryStatement());
if (jdbcSinkConfig.getInsertMode() == JdbcSinkConfig.InsertMode.UPSERT) {
upsertStatement = JdbcUtils.buildInsertStatement(connection, generateUpsertQueryStatement());
if (nonKeyList.isEmpty() || keyList.isEmpty()) {
throw new IllegalStateException("UPSERT mode is not configured if 'key' and 'nonKey' "
+ "config are not set.");
}
upsertStatement = connection.prepareStatement(generateUpsertQueryStatement());
}
if (!nonKeyList.isEmpty()) {
updateStatement = JdbcUtils.buildUpdateStatement(connection, generateUpdateQueryStatement());
updateStatement = connection.prepareStatement(generateUpdateQueryStatement());
}
if (!keyList.isEmpty()) {
deleteStatement = JdbcUtils.buildDeleteStatement(connection, generateDeleteQueryStatement());
deleteStatement = connection.prepareStatement(generateDeleteQueryStatement());
}
}

private static List<String> getListFromConfig(String jdbcSinkConfig) {
List<String> nonKeyList = Lists.newArrayList();
String nonKey = jdbcSinkConfig;
if (nonKey != null && !nonKey.isEmpty()) {
nonKeyList = Arrays.asList(nonKey.split(","));
}
return nonKeyList;
}

@Override
public void close() throws Exception {
if (flushExecutor != null) {
Expand Down Expand Up @@ -185,6 +190,8 @@ public String generateUpdateQueryStatement() {

public abstract String generateUpsertQueryStatement();

public abstract List<JdbcUtils.ColumnId> getColumnsForUpsert();

public String generateDeleteQueryStatement() {
return JdbcUtils.buildDeleteSql(tableDefinition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -173,10 +171,6 @@ public static String buildInsertSql(TableDefinition table) {
return builder.toString();
}

public static PreparedStatement buildInsertStatement(Connection connection, String insertSQL) throws SQLException {
return connection.prepareStatement(insertSQL);
}

public static String combationWhere(List<ColumnId> columnIds) {
StringBuilder builder = new StringBuilder();
if (!columnIds.isEmpty()) {
Expand Down Expand Up @@ -205,6 +199,9 @@ public static String buildUpdateSql(TableDefinition table) {
}

public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) {
if (table.nonKeyColumns.isEmpty()) {
throw new IllegalStateException("UPDATE operations are not supported if 'nonKey' config is not set.");
}
StringJoiner setJoiner = new StringJoiner(",");

table.nonKeyColumns.forEach((columnId) ->{
Expand All @@ -215,17 +212,9 @@ public static StringJoiner buildUpdateSqlSetPart(TableDefinition table) {
return setJoiner;
}

public static PreparedStatement buildUpdateStatement(Connection connection, String updateSQL) throws SQLException {
return connection.prepareStatement(updateSQL);
}

public static String buildDeleteSql(TableDefinition table) {
return "DELETE FROM "
+ table.tableId.getTableName()
+ combationWhere(table.keyColumns);
}

public static PreparedStatement buildDeleteStatement(Connection connection, String deleteSQL) throws SQLException {
return connection.prepareStatement(deleteSQL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.jdbc;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
Expand All @@ -32,10 +34,15 @@ public class MariadbJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {

@Override
public String generateUpsertQueryStatement() {
final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
.collect(Collectors.joining(","));
return JdbcUtils.buildInsertSql(tableDefinition)
+ "ON DUPLICATE KEY UPDATE " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
}

@Override
public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
columns.addAll(tableDefinition.getColumns());
columns.addAll(tableDefinition.getNonKeyColumns());
return columns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.jdbc;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
Expand All @@ -32,10 +34,22 @@ public class PostgresJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {

@Override
public String generateUpsertQueryStatement() {
final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
final List<JdbcUtils.ColumnId> keyColumns = tableDefinition.getKeyColumns();
if (keyColumns.isEmpty()) {
throw new IllegalStateException("UPSERT is not supported if 'key' config is not set.");
}
final String keys = keyColumns.stream().map(JdbcUtils.ColumnId::getName)
.collect(Collectors.joining(","));
return JdbcUtils.buildInsertSql(tableDefinition)
+ " ON CONFLICT(" + keys + ") "
+ "DO UPDATE SET " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
}

@Override
public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
columns.addAll(tableDefinition.getColumns());
columns.addAll(tableDefinition.getNonKeyColumns());
return columns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.jdbc;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
Expand All @@ -32,10 +34,22 @@ public class SqliteJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {

@Override
public String generateUpsertQueryStatement() {
final String keys = tableDefinition.getKeyColumns().stream().map(JdbcUtils.ColumnId::getName)
final List<JdbcUtils.ColumnId> keyColumns = tableDefinition.getKeyColumns();
if (keyColumns.isEmpty()) {
throw new IllegalStateException("UPSERT is not supported if 'key' config is not set.");
}
final String keys = keyColumns.stream().map(JdbcUtils.ColumnId::getName)
.collect(Collectors.joining(","));
return JdbcUtils.buildInsertSql(tableDefinition)
+ " ON CONFLICT(" + keys + ") "
+ "DO UPDATE SET " + JdbcUtils.buildUpdateSqlSetPart(tableDefinition);
}

@Override
public List<JdbcUtils.ColumnId> getColumnsForUpsert() {
final List<JdbcUtils.ColumnId> columns = new ArrayList<>();
columns.addAll(tableDefinition.getColumns());
columns.addAll(tableDefinition.getNonKeyColumns());
return columns;
}
}

0 comments on commit 3208086

Please sign in to comment.