Skip to content

Commit

Permalink
Fix Cassandra target (open-telemetry#10357)
Browse files Browse the repository at this point in the history
Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
  • Loading branch information
2 people authored and steverao committed Feb 16, 2024
1 parent a902030 commit 66f8f3b
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.cassandra.v3_0;

import com.datastax.driver.core.ExecutionInfo;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.SemanticAttributes;
import javax.annotation.Nullable;

public class CassandraAttributesExtractor
implements AttributesExtractor<CassandraRequest, ExecutionInfo> {
@Override
public void onStart(AttributesBuilder attributes, Context context, CassandraRequest request) {}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
CassandraRequest request,
@Nullable ExecutionInfo executionInfo,
@Nullable Throwable error) {
if (executionInfo == null) {
return;
}
attributes.put(
SemanticAttributes.SERVER_ADDRESS,
executionInfo.getQueriedHost().getSocketAddress().getHostString());
attributes.put(
SemanticAttributes.SERVER_PORT,
executionInfo.getQueriedHost().getSocketAddress().getPort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public final class CassandraSingletons {
.build())
.addAttributesExtractor(
NetworkAttributesExtractor.create(new CassandraNetworkAttributesGetter()))
.addAttributesExtractor(new CassandraAttributesExtractor())
.buildInstrumenter(SpanKindExtractor.alwaysClient());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ void syncTest(Parameter parameter) {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand All @@ -99,6 +101,8 @@ void syncTest(Parameter parameter) {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand All @@ -116,6 +120,8 @@ void syncTest(Parameter parameter) {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand Down Expand Up @@ -153,6 +159,8 @@ void asyncTest(Parameter parameter) {
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand All @@ -167,6 +175,8 @@ void asyncTest(Parameter parameter) {
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand All @@ -189,6 +199,8 @@ void asyncTest(Parameter parameter) {
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.NETWORK_TYPE, "ipv4"),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, cassandraPort),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(SemanticAttributes.DB_SYSTEM, "cassandra"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import static io.opentelemetry.semconv.SemanticAttributes.DB_STATEMENT;
import static io.opentelemetry.semconv.SemanticAttributes.DB_SYSTEM;
import static io.opentelemetry.semconv.SemanticAttributes.NETWORK_TYPE;
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Named.named;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
Expand Down Expand Up @@ -90,8 +94,20 @@ void syncTest(Parameter parameter) {
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(NETWORK_TYPE, "ipv4"),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
satisfies(
NETWORK_TYPE,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("ipv4"),
v -> assertThat(v).isEqualTo("ipv6"))),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, cassandraPort),
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("127.0.0.1"),
v -> assertThat(v).isEqualTo("0:0:0:0:0:0:0:1"))),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
Expand Down Expand Up @@ -137,8 +153,20 @@ void asyncTest(Parameter parameter) throws Exception {
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(NETWORK_TYPE, "ipv4"),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
satisfies(
NETWORK_TYPE,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("ipv4"),
v -> assertThat(v).isEqualTo("ipv6"))),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, cassandraPort),
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("127.0.0.1"),
v -> assertThat(v).isEqualTo("0:0:0:0:0:0:0:1"))),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
Expand Down Expand Up @@ -302,11 +330,15 @@ protected CqlSession getSession(String keyspace) {
.withDuration(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofSeconds(10))
.build();
return wrap(
CqlSession.builder()
.addContactPoint(new InetSocketAddress("localhost", cassandraPort))
addContactPoint(CqlSession.builder())
.withConfigLoader(configLoader)
.withLocalDatacenter("datacenter1")
.withKeyspace(keyspace)
.build());
}

protected CqlSessionBuilder addContactPoint(CqlSessionBuilder sessionBuilder) {
sessionBuilder.addContactPoint(new InetSocketAddress("localhost", cassandraPort));
return sessionBuilder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.SemanticAttributes;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import javax.annotation.Nullable;

final class CassandraAttributesExtractor
Expand All @@ -36,6 +38,12 @@ public void onEnd(

Node coordinator = executionInfo.getCoordinator();
if (coordinator != null) {
SocketAddress address = coordinator.getEndPoint().resolve();
if (address instanceof InetSocketAddress) {
attributes.put(
SemanticAttributes.SERVER_ADDRESS, ((InetSocketAddress) address).getHostString());
attributes.put(SemanticAttributes.SERVER_PORT, ((InetSocketAddress) address).getPort());
}
if (coordinator.getDatacenter() != null) {
attributes.put(SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,28 @@
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.SemanticAttributes;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

final class CassandraAttributesExtractor
implements AttributesExtractor<CassandraRequest, ExecutionInfo> {

private static final Logger logger =
Logger.getLogger(CassandraAttributesExtractor.class.getName());

private static final Field proxyAddressField = getProxyAddressField();

@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, CassandraRequest request) {}
Expand All @@ -36,6 +48,8 @@ public void onEnd(

Node coordinator = executionInfo.getCoordinator();
if (coordinator != null) {
updateServerAddressAndPort(attributes, coordinator);

if (coordinator.getDatacenter() != null) {
attributes.put(SemanticAttributes.DB_CASSANDRA_COORDINATOR_DC, coordinator.getDatacenter());
}
Expand Down Expand Up @@ -74,4 +88,40 @@ public void onEnd(
}
attributes.put(SemanticAttributes.DB_CASSANDRA_IDEMPOTENCE, idempotent);
}

private static void updateServerAddressAndPort(AttributesBuilder attributes, Node coordinator) {
EndPoint endPoint = coordinator.getEndPoint();
if (endPoint instanceof DefaultEndPoint) {
InetSocketAddress address = ((DefaultEndPoint) endPoint).resolve();
attributes.put(SemanticAttributes.SERVER_ADDRESS, address.getHostString());
attributes.put(SemanticAttributes.SERVER_PORT, address.getPort());
} else if (endPoint instanceof SniEndPoint && proxyAddressField != null) {
SniEndPoint sniEndPoint = (SniEndPoint) endPoint;
Object object = null;
try {
object = proxyAddressField.get(sniEndPoint);
} catch (Exception e) {
logger.log(
Level.FINE,
"Error when accessing the private field proxyAddress of SniEndPoint using reflection.",
e);
}
if (object instanceof InetSocketAddress) {
InetSocketAddress address = (InetSocketAddress) object;
attributes.put(SemanticAttributes.SERVER_ADDRESS, address.getHostString());
attributes.put(SemanticAttributes.SERVER_PORT, address.getPort());
}
}
}

@Nullable
private static Field getProxyAddressField() {
try {
Field field = SniEndPoint.class.getDeclaredField("proxyAddress");
field.setAccessible(true);
return field;
} catch (Exception e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
package io.opentelemetry.instrumentation.cassandra.v4_4;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
import io.opentelemetry.instrumentation.api.semconv.network.NetworkAttributesGetter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import javax.annotation.Nullable;

final class CassandraNetworkAttributesGetter
Expand All @@ -27,8 +29,12 @@ public InetSocketAddress getNetworkPeerInetSocketAddress(
return null;
}
// resolve() returns an existing InetSocketAddress, it does not do a dns resolve,
// at least in the only current EndPoint implementation (DefaultEndPoint)
SocketAddress address = coordinator.getEndPoint().resolve();
return address instanceof InetSocketAddress ? (InetSocketAddress) address : null;
EndPoint endPoint = coordinator.getEndPoint();
if (endPoint instanceof DefaultEndPoint) {
return (InetSocketAddress) coordinator.getEndPoint().resolve();
} else if (endPoint instanceof SniEndPoint) {
return ((SniEndPoint) endPoint).resolve();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@
import static io.opentelemetry.semconv.SemanticAttributes.DB_STATEMENT;
import static io.opentelemetry.semconv.SemanticAttributes.DB_SYSTEM;
import static io.opentelemetry.semconv.SemanticAttributes.NETWORK_TYPE;
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.SemanticAttributes.SERVER_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Named.named;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.internal.core.metadata.SniEndPoint;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.cassandra.v4.common.AbstractCassandraTest;
import io.opentelemetry.instrumentation.api.semconv.network.internal.NetworkAttributes;
import java.net.InetSocketAddress;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -56,8 +62,20 @@ void reactiveTest(Parameter parameter) {
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(NETWORK_TYPE, "ipv4"),
equalTo(NetworkAttributes.NETWORK_PEER_ADDRESS, "127.0.0.1"),
satisfies(
NETWORK_TYPE,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("ipv4"),
v -> assertThat(v).isEqualTo("ipv6"))),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, cassandraPort),
satisfies(
NetworkAttributes.NETWORK_PEER_ADDRESS,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo("127.0.0.1"),
v -> assertThat(v).isEqualTo("0:0:0:0:0:0:0:1"))),
equalTo(NetworkAttributes.NETWORK_PEER_PORT, cassandraPort),
equalTo(DB_SYSTEM, "cassandra"),
equalTo(DB_NAME, parameter.keyspace),
Expand Down Expand Up @@ -135,4 +153,11 @@ private static Stream<Arguments> provideReactiveParameters() {
"SELECT",
"users"))));
}

@Override
protected CqlSessionBuilder addContactPoint(CqlSessionBuilder sessionBuilder) {
InetSocketAddress address = new InetSocketAddress("localhost", cassandraPort);
sessionBuilder.addContactEndPoint(new SniEndPoint(address, "localhost"));
return sessionBuilder;
}
}

0 comments on commit 66f8f3b

Please sign in to comment.