Skip to content

Commit

Permalink
Update Cassandra Java Client to 4.16.0 and DSBulk to 1.10.0, and Mess…
Browse files Browse the repository at this point in the history
…aging Connectors Core to 1.0.15 in order to support Vector DataType (#59)
  • Loading branch information
eolivelli authored Jun 16, 2023
1 parent 589ea55 commit 95b897b
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 57 deletions.
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@
<module>tests</module>
</modules>
<properties>
<messaging.connectors.commons.version>1.0.11</messaging.connectors.commons.version>
<messaging.connectors.commons.version>1.0.15</messaging.connectors.commons.version>
<org.json.version>20230227</org.json.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<java.release.version>8</java.release.version>
<caffeine.version>2.6.2</caffeine.version>
<oss.driver.version>4.6.0</oss.driver.version>
<dsbulk.version>1.6.0</dsbulk.version>
<oss.driver.version>4.16.0</oss.driver.version>
<dsbulk.version>1.10.0</dsbulk.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<pulsar.version>2.8.2</pulsar.version>
<slf4j.version>1.7.25</slf4j.version>
Expand All @@ -56,7 +56,7 @@
<wiremock-junit.version>1.3.1</wiremock-junit.version>
<surefire.version>2.22.2</surefire.version>
<antlr4.version>4.7.1</antlr4.version>
<simulacron.version>0.10.0</simulacron.version>
<simulacron.version>0.11.0</simulacron.version>
<max.simulacron.clusters>4</max.simulacron.clusters>
<max.ccm.clusters>2</max.ccm.clusters>
<commons-exec.version>1.3</commons-exec.version>
Expand Down
2 changes: 1 addition & 1 deletion pulsar-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<relocations>
<relocation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -80,7 +81,7 @@
@ExtendWith(StreamInterceptingExtension.class)
@ExtendWith(LogInterceptingExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SimulacronConfig(dseVersion = "5.0.8")
@SimulacronConfig()
class SimpleEndToEndSimulacronIT {

private static final String INSERT_STATEMENT =
Expand All @@ -94,23 +95,29 @@ class SimpleEndToEndSimulacronIT {
+ "+ AND TTL :"
+ SinkUtil.TTL_VARNAME;
private static final String DELETE_STATEMENT = "DELETE FROM ks1.table1 WHERE a = :a AND b = :b";
private static final ImmutableMap<String, String> PARAM_TYPES =
ImmutableMap.<String, String>builder()
.put("a", "int")
.put("b", "varchar")
.put(SinkUtil.TIMESTAMP_VARNAME, "bigint")
.build();

private static final ImmutableMap<String, String> PARAM_TYPES_TTL =
ImmutableMap.<String, String>builder()
.put("a", "int")
.put("b", "varchar")
.put(SinkUtil.TIMESTAMP_VARNAME, "bigint")
.put(SinkUtil.TTL_VARNAME, "bigint")
.build();

private static final ImmutableMap<String, String> PARAM_TYPES_CUSTOM_QUERY =
ImmutableMap.<String, String>builder().put("some1", "int").put("some2", "varchar").build();
private static final LinkedHashMap<String, String> PARAM_TYPES =
new LinkedHashMap<>(
ImmutableMap.<String, String>builder()
.put("a", "int")
.put("b", "varchar")
.put(SinkUtil.TIMESTAMP_VARNAME, "bigint")
.build());

private static final LinkedHashMap<String, String> PARAM_TYPES_TTL =
new LinkedHashMap(
ImmutableMap.<String, String>builder()
.put("a", "int")
.put("b", "varchar")
.put(SinkUtil.TIMESTAMP_VARNAME, "bigint")
.put(SinkUtil.TTL_VARNAME, "bigint")
.build());

private static final LinkedHashMap<String, String> PARAM_TYPES_CUSTOM_QUERY =
new LinkedHashMap(
ImmutableMap.<String, String>builder()
.put("some1", "int")
.put("some2", "varchar")
.build());

private static final String INSTANCE_NAME = "myinstance";
private final BoundCluster simulacron;
Expand Down Expand Up @@ -163,6 +170,10 @@ class SimpleEndToEndSimulacronIT {
.put("contactPoints", hostname)
.put("port", port)
.put("loadBalancing.localDc", "dc1")
// since we upgraded to Driver 4.16.x, we need to explicitly set the protocol version
// otherwise it will try only DSE_v1 and DSE_v2 because they are not considered "BETA"
// https://github.com/datastax/java-driver/blob/4270f93277249abb513bc2abf2ff7a7c481b1d0d/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java#L163
.put("datastax-java-driver.advanced.protocol.version", "V4")
.put("topic.mytopic.ks1.table1.mapping", "a=key, b=value.field1")
.put("topic.mytopic_with_ttl.ks1.table1_with_ttl.mapping", "a=key, b=value, __ttl=key")
.put("topic.yourtopic.ks1.table2.mapping", "a=key, b=value.field1")
Expand Down Expand Up @@ -237,25 +248,29 @@ private static Query makeTtlQuery(int a, String b, long timestamp, long ttl) {
PARAM_TYPES_TTL);
}

private static Map<String, Object> makeParamsTtl(int a, String b, long timestamp, long ttl) {
return ImmutableMap.<String, Object>builder()
.put("a", a)
.put("b", b)
.put(SinkUtil.TIMESTAMP_VARNAME, timestamp)
.put(SinkUtil.TTL_VARNAME, ttl)
.build();
private static LinkedHashMap<String, Object> makeParamsTtl(
int a, String b, long timestamp, long ttl) {
return new LinkedHashMap<>(
ImmutableMap.<String, Object>builder()
.put("a", a)
.put("b", b)
.put(SinkUtil.TIMESTAMP_VARNAME, timestamp)
.put(SinkUtil.TTL_VARNAME, ttl)
.build());
}

private static Map<String, Object> makeParams(int a, String b, long timestamp) {
return ImmutableMap.<String, Object>builder()
.put("a", a)
.put("b", b)
.put(SinkUtil.TIMESTAMP_VARNAME, timestamp)
.build();
private static LinkedHashMap<String, Object> makeParams(int a, String b, long timestamp) {
return new LinkedHashMap<>(
ImmutableMap.<String, Object>builder()
.put("a", a)
.put("b", b)
.put(SinkUtil.TIMESTAMP_VARNAME, timestamp)
.build());
}

private static Map<String, Object> makeParamsCustomQuery(int a, String b) {
return ImmutableMap.<String, Object>builder().put("some1", a).put("some2", b).build();
private static LinkedHashMap<String, Object> makeParamsCustomQuery(int a, String b) {
return new LinkedHashMap(
ImmutableMap.<String, Object>builder().put("some1", a).put("some2", b).build());
}

@BeforeEach
Expand Down Expand Up @@ -320,15 +335,17 @@ void fail_prepare_counter_table() {
.build();

String query = "UPDATE ks1.mycounter SET c = c + :c WHERE a = :a AND b = :b";
Query bad1 = new Query(query, Collections.emptyList(), makeParams(32, "fail", 2), paramTypes);
Query bad1 =
new Query(
query,
Collections.emptyList(),
makeParams(32, "fail", 2),
new LinkedHashMap<>(paramTypes));
simulacron.prime(when(bad1).then(serverError("bad thing")).applyToPrepare());

ImmutableMap<String, Object> props =
ImmutableMap.<String, Object>builder()
.put("name", INSTANCE_NAME)
.put("contactPoints", (String) connectorProperties.get("contactPoints"))
.put("port", (String) connectorProperties.get("port"))
.put("loadBalancing.localDc", "dc1")
.putAll(connectorProperties)
.put("topic.mytopic.ks1.mycounter.mapping", "a=key, b=value, c=value.f2")
.build();
assertThatThrownBy(() -> task.open(props, sinkContext))
Expand All @@ -347,14 +364,15 @@ void fail_delete() {
new Query(
"DELETE FROM ks1.mycounter WHERE a = :a AND b = :b",
Collections.emptyList(),
ImmutableMap.<String, Object>builder().put("a", 37).put("b", "delete").build(),
ImmutableMap.<String, String>builder().put("a", "int").put("b", "varchar").build());
new LinkedHashMap<>(
ImmutableMap.<String, Object>builder().put("a", 37).put("b", "delete").build()),
new LinkedHashMap<>(
ImmutableMap.<String, String>builder()
.put("a", "int")
.put("b", "varchar")
.build()));
simulacron.prime(when(bad1).then(serverError("bad thing")));
Map<String, Object> connProps = new HashMap<>();
connProps.put("name", INSTANCE_NAME);
connProps.put("contactPoints", hostname);
connProps.put("port", port);
connProps.put("loadBalancing.localDc", "dc1");
Map<String, Object> connProps = new HashMap<>(connectorProperties);
connProps.put(
"topic.mytopic.ks1.mycounter.mapping", "a=value.bigint, b=value.text, c=value.int");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.internal.core.util.DependencyCheck;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.dsbulk.tests.ccm.CCMCluster;
import com.datastax.oss.dsbulk.tests.ccm.CCMExtension;
Expand Down Expand Up @@ -286,11 +285,6 @@ void should_build_session_with_application_version_name_and_client_id() {
}
}

@Test
void tinkerpop_should_be_excluded() {
assertFalse(DependencyCheck.TINKERPOP.isPresent());
}

@NonNull
private EndPoint getEndPoint(CqlSession session) {
return session.getMetadata().getNodes().values().iterator().next().getEndPoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.CQL_DURATION;
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.CQL_VARINT;
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.DATE;
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.TIME_MICROS;
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.TIMESTAMP_MILLIS;
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.TIME_MICROS;

import java.nio.ByteBuffer;
import java.util.HashMap;
Expand Down

0 comments on commit 95b897b

Please sign in to comment.