Skip to content

Commit

Permalink
Add option to consider initial contact points during reconnection
Browse files Browse the repository at this point in the history
When control connection tries to reconnect usually it considers only nodes
provided by load balancing policy. Usually those do not include what was
initially passed to the driver but the recently seen alive nodes. In some
setups the IPs can keep changing so it may be useful to have an option to
try initial contact points as one of the options during reconnection.
Mainly if the contact point is a hostname.

This change adds the option to the `QueryOptions` to control that behaviour
and adds necessary logic to `ControlConnection` class. It is disabled
by default, meaning that default behaviour remains unchanged.

Additionally adds org.burningwave tools dependency.
This dependency has features that allow for easier host resolution mocking.
  • Loading branch information
Bouncheck committed Sep 23, 2024
1 parent ec66feb commit 0723236
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 0 deletions.
7 changes: 7 additions & 0 deletions driver-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@
<version>1.78.1</version>
</dependency>

<!-- added for easier DNS hostname resolution mocking -->
<dependency>
<groupId>org.burningwave</groupId>
<artifactId>tools</artifactId>
<scope>test</scope>
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.datastax.driver.core.utils.MoreFutures;
import com.datastax.driver.core.utils.MoreObjects;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -160,6 +161,15 @@ protected Connection tryReconnect() throws ConnectionException {
if (isShutdown) throw new ConnectionException(null, "Control connection was shut down");

try {
if (cluster
.configuration
.getQueryOptions()
.shouldAddOriginalContactsToReconnectionPlan()) {
List<Host> initialContacts = cluster.metadata.getContactPoints();
Collections.shuffle(initialContacts);
return reconnectInternal(
Iterators.concat(queryPlan(), initialContacts.iterator()), false);
}
return reconnectInternal(queryPlan(), false);
} catch (NoHostAvailableException e) {
throw new ConnectionException(null, e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class QueryOptions {

private volatile boolean schemaQueriesPaged = true;

private volatile boolean addOriginalContactsToReconnectionPlan = false;

/**
* Creates a new {@link QueryOptions} instance using the {@link #DEFAULT_CONSISTENCY_LEVEL},
* {@link #DEFAULT_SERIAL_CONSISTENCY_LEVEL} and {@link #DEFAULT_FETCH_SIZE}.
Expand Down Expand Up @@ -499,6 +501,26 @@ public int getMaxPendingRefreshNodeRequests() {
return maxPendingRefreshNodeRequests;
}

/**
* Whether the driver should use original contact points when reconnecting to a control node. In
* practice this forces driver to manually add original contact points to the end of the query
* plan. It is possible that it may introduce duplicates (but under differnet Host class
* instances) in the query plan. If this is set to false it does not mean that original contact
* points will be excluded.
*
* <p>One use case of this feature is that if the original contact point is defined by hostname
* and its IP address changes then setting this to {@code true} allows trying reconnecting to the
* new IP if all connection was lost.
*/
public QueryOptions setAddOriginalContactsToReconnectionPlan(boolean enabled) {
this.addOriginalContactsToReconnectionPlan = enabled;
return this;
}

public boolean shouldAddOriginalContactsToReconnectionPlan() {
return this.addOriginalContactsToReconnectionPlan;
}

@Override
public boolean equals(Object that) {
if (that == null || !(that instanceof QueryOptions)) {
Expand Down
12 changes: 12 additions & 0 deletions driver-core/src/test/java/com/datastax/driver/core/CCMBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ public static class Builder {
private static final Pattern RANDOM_PORT_PATTERN = Pattern.compile(RANDOM_PORT);

private String ipPrefix = TestUtils.IP_PREFIX;
private String providedClusterName = null;
int[] nodes = {1};
private int[] jmxPorts = {};
private boolean start = true;
Expand Down Expand Up @@ -991,6 +992,15 @@ public Builder withSniProxy() {
return this;
}

/**
* Builder takes care of naming and numbering clusters on its own. Use if you really need a
* specific name
*/
public Builder withClusterName(String clusterName) {
this.providedClusterName = clusterName;
return this;
}

/** Enables SSL encryption. */
public Builder withSSL() {
cassandraConfiguration.put("client_encryption_options.enabled", "true");
Expand Down Expand Up @@ -1115,6 +1125,8 @@ public CCMBridge build() {
// be careful NOT to alter internal state (hashCode/equals) during build!
String clusterName = TestUtils.generateIdentifier("ccm_");

if (providedClusterName != null) clusterName = providedClusterName;

VersionNumber dseVersion;
VersionNumber cassandraVersion;
boolean versionConfigured = this.version != null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.datastax.driver.core;

import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;
import org.burningwave.tools.net.DefaultHostResolver;
import org.burningwave.tools.net.HostResolutionRequestInterceptor;
import org.burningwave.tools.net.MappedHostResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

public class HostResolutionReconnectionTest {

private static final Logger logger =
LoggerFactory.getLogger(HostResolutionReconnectionTest.class);

@Test(groups = "isolated")
public void should_reconnect_to_different_cluster() {
// Configure host resolution
Map<String, String> hostAliasesA = new LinkedHashMap<>();
hostAliasesA.put("control.reconnect.test", "127.1.1.1");
HostResolutionRequestInterceptor.INSTANCE.install(
new MappedHostResolver(hostAliasesA), DefaultHostResolver.INSTANCE);

Cluster cluster = null;
Session session = null;
CCMBridge bridgeA = null;
try {
bridgeA =
CCMBridge.builder()
.withNodes(1)
.withIpPrefix("127.1.1.")
.withBinaryPort(9042)
.withClusterName("same_name")
.build();
bridgeA.start();

cluster =
Cluster.builder()
.addContactPointsWithPorts(
InetSocketAddress.createUnresolved("control.reconnect.test", 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.withQueryOptions(new QueryOptions().setAddOriginalContactsToReconnectionPlan(true))
.build();
session = cluster.connect();

ResultSet rs = session.execute("select * from system.local");
Row row = rs.one();
String address = row.getInet("broadcast_address").toString();
logger.info("Queried node has broadcast_address: {}}", address);
System.out.flush();
} finally {
assert bridgeA != null;
bridgeA.close();
}

CCMBridge bridgeB = null;
// Overwrite host resolution
Map<String, String> hostAliasesB = new LinkedHashMap<>();
hostAliasesB.put("control.reconnect.test", "127.2.2.1");
HostResolutionRequestInterceptor.INSTANCE.install(
new MappedHostResolver(hostAliasesB), DefaultHostResolver.INSTANCE);
try {
bridgeB =
CCMBridge.builder()
.withNodes(1)
.withIpPrefix("127.2.2.")
.withBinaryPort(9042)
.withClusterName("same_name")
.build();
bridgeB.start();
Thread.sleep(1000 * 92);
ResultSet rs = session.execute("select * from system.local");
Row row = rs.one();
String address = row.getInet("broadcast_address").toString();
logger.info("Queried node has broadcast_address: {}}", address);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
assert bridgeB != null;
bridgeB.close();
}
}
}
4 changes: 4 additions & 0 deletions driver-core/src/test/resources/burningwave.static.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
managed-logger.repository=autodetect
managed-logger.repository.enabled=false
banner.hide=true
priority-of-this-configuration=1000
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
<scassandra.version>1.1.2</scassandra.version>
<logback.version>1.2.13</logback.version>
<byteman.version>3.0.8</byteman.version>
<burningwave.tools.version>0.26.2</burningwave.tools.version>
<ipprefix>127.0.1.</ipprefix>
<!-- defaults below are overridden by profiles and/or submodules -->
<test.groups>unit</test.groups>
Expand Down Expand Up @@ -398,6 +399,12 @@
<version>${groovy.version}</version>
</dependency>

<dependency>
<groupId>org.burningwave</groupId>
<artifactId>tools</artifactId>
<version>${burningwave.tools.version}</version>
</dependency>

</dependencies>

</dependencyManagement>
Expand Down

0 comments on commit 0723236

Please sign in to comment.