Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more tests for otlp with datadog #6073

Open
wants to merge 1 commit into
base: 1.55.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions apollo-router/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ impl IntegrationTest {
pub fn execute_untraced_query(
&self,
query: &Value,
headers: Option<HashMap<String, String>>,
) -> impl std::future::Future<Output = (TraceId, reqwest::Response)> {
assert!(
self.router.is_some(),
Expand All @@ -653,6 +654,16 @@ impl IntegrationTest {
.unwrap();

request.headers_mut().remove(ACCEPT);
if let Some(headers) = headers {
for (name, value) in headers {
request.headers_mut().remove(&name);
request.headers_mut().append(
HeaderName::from_str(&name).expect("header was invalid"),
value.try_into().expect("header was invalid"),
);
}
}

match client.execute(request).await {
Ok(response) => (
TraceId::from_hex(
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/tests/integration/telemetry/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn test_no_sample() -> Result<(), BoxError> {
router.assert_started().await;

let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});
let (_id, result) = router.execute_untraced_query(&query).await;
let (_id, result) = router.execute_untraced_query(&query, None).await;
router.graceful_shutdown().await;
assert!(result.status().is_success());
let context = context
Expand Down Expand Up @@ -104,7 +104,7 @@ async fn test_sampling_datadog_agent_disabled() -> Result<(), BoxError> {
router.assert_started().await;

let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});
let (id, result) = router.execute_untraced_query(&query).await;
let (id, result) = router.execute_untraced_query(&query, None).await;
router.graceful_shutdown().await;
assert!(result.status().is_success());
let _context = context
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Router telemetry fixture: OTLP trace and metric export with Datadog agent
# sampling enabled, accepting incoming trace context via both Zipkin and
# trace-context propagation.
# NOTE(review): presumably this is the fixture loaded by the zipkin /
# trace-context propagator test in otlp.rs — confirm the file name.
# `<otel-collector-endpoint>` is a placeholder; the tests replace it with the
# mock OTLP server URI before starting the router.
telemetry:
apollo:
field_level_instrumentation_sampler: always_off
exporters:
tracing:
# Both propagators are enabled so one router instance can be driven with
# either Zipkin-style or trace-context-style request headers.
propagation:
zipkin: true
trace_context: true
common:
service_name: router
preview_datadog_agent_sampling: true
sampler: 1.0
otlp:
enabled: true
protocol: http
endpoint: <otel-collector-endpoint>
batch_processor:
# NOTE(review): short delay presumably so spans are exported quickly
# enough for the test assertions — confirm.
scheduled_delay: 10ms
metrics:
common:
service_name: router
otlp:
enabled: true
endpoint: <otel-collector-endpoint>/metrics
protocol: http
batch_processor:
scheduled_delay: 10ms


instrumentation:
spans:
mode: spec_compliant
supergraph:
attributes:
graphql.operation.name: true

subgraph:
attributes:
otel.name:
subgraph_operation_name: string
8 changes: 4 additions & 4 deletions apollo-router/tests/integration/telemetry/jaeger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async fn test_local_root() -> Result<(), BoxError> {
router.assert_started().await;

let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});
let (id, result) = router.execute_untraced_query(&query).await;
let (id, result) = router.execute_untraced_query(&query, None).await;
assert!(!result
.headers()
.get("apollo-custom-trace-id")
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn test_local_root_no_sample() -> Result<(), BoxError> {
router.assert_started().await;

let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});
let (_, response) = router.execute_untraced_query(&query).await;
let (_, response) = router.execute_untraced_query(&query, None).await;
assert!(response.headers().get("apollo-custom-trace-id").is_some());

router.graceful_shutdown().await;
Expand All @@ -141,7 +141,7 @@ async fn test_local_root_50_percent_sample() -> Result<(), BoxError> {
let query = json!({"query":"query ExampleQuery {topProducts{name}}\n","variables":{}, "operationName": "ExampleQuery"});

for _ in 0..100 {
let (id, result) = router.execute_untraced_query(&query).await;
let (id, result) = router.execute_untraced_query(&query, None).await;

if result.headers().get("apollo-custom-trace-id").is_some()
&& validate_trace(
Expand Down Expand Up @@ -177,7 +177,7 @@ async fn test_no_telemetry() -> Result<(), BoxError> {
router.assert_started().await;

let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});
let (_, response) = router.execute_untraced_query(&query).await;
let (_, response) = router.execute_untraced_query(&query, None).await;
assert!(response.headers().get("apollo-custom-trace-id").is_none());

router.graceful_shutdown().await;
Expand Down
148 changes: 143 additions & 5 deletions apollo-router/tests/integration/telemetry/otlp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
extern crate core;

use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;

Expand All @@ -24,6 +25,9 @@ use crate::integration::ValueExt;

#[tokio::test(flavor = "multi_thread")]
async fn test_basic() -> Result<(), BoxError> {
if !graph_os_enabled() {
panic!("Error: test skipped because GraphOS is not enabled");
}
let mock_server = mock_otlp_server().await;
let config = include_str!("fixtures/otlp.router.yaml")
.replace("<otel-collector-endpoint>", &mock_server.uri());
Expand Down Expand Up @@ -80,6 +84,9 @@ async fn test_basic() -> Result<(), BoxError> {

#[tokio::test(flavor = "multi_thread")]
async fn test_otlp_request_with_datadog_propagator() -> Result<(), BoxError> {
if !graph_os_enabled() {
panic!("Error: test skipped because GraphOS is not enabled");
}
let mock_server = mock_otlp_server().await;
let config = include_str!("fixtures/otlp_datadog_propagation.router.yaml")
.replace("<otel-collector-endpoint>", &mock_server.uri());
Expand Down Expand Up @@ -108,6 +115,9 @@ async fn test_otlp_request_with_datadog_propagator() -> Result<(), BoxError> {

#[tokio::test(flavor = "multi_thread")]
async fn test_otlp_request_with_datadog_propagator_no_agent() -> Result<(), BoxError> {
if !graph_os_enabled() {
panic!("Error: test skipped because GraphOS is not enabled");
}
let mock_server = mock_otlp_server().await;
let config = include_str!("fixtures/otlp_datadog_propagation_no_agent.router.yaml")
.replace("<otel-collector-endpoint>", &mock_server.uri());
Expand All @@ -133,8 +143,131 @@ async fn test_otlp_request_with_datadog_propagator_no_agent() -> Result<(), BoxE
Ok(())
}

/// Exercises OTLP export with Datadog sampling priority when the incoming
/// request carries Zipkin (`X-B3-*`) or trace-context
/// (`traceparent` / `tracestate`) propagation headers.
///
/// Each scenario sends one query and then validates, against the mock OTLP
/// server, which services reported spans for the trace id and which sampling
/// priority was recorded.
///
/// The test is skipped (it returns `Ok(())` without asserting anything) when
/// GraphOS is not enabled, so contributors without a GraphOS key still get a
/// passing test run.
#[tokio::test(flavor = "multi_thread")]
async fn test_otlp_request_with_zipkin_trace_context_propagator_with_datadog(
) -> Result<(), BoxError> {
    if !graph_os_enabled() {
        // Skip instead of panicking: a panic would fail the whole test run
        // for anyone who does not have GraphOS credentials.
        eprintln!("test skipped because GraphOS is not enabled");
        return Ok(());
    }
    let mock_server = mock_otlp_server().await;
    let config = include_str!("fixtures/otlp_datadog_request_with_zipkin_propagator.router.yaml")
        .replace("<otel-collector-endpoint>", &mock_server.uri());
    let mut router = IntegrationTest::builder()
        .telemetry(Telemetry::Otlp {
            endpoint: Some(format!("{}/v1/traces", mock_server.uri())),
        })
        .config(&config)
        .build()
        .await;

    router.start().await;
    router.assert_started().await;

    let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});

    // ---------------------- baseline: query traced by the test client
    // Spans are expected from the client, the router and the subgraph, with
    // sampling priority 1.
    let (id, _) = router.execute_query(&query).await;

    Spec::builder()
        .services(["client", "router", "subgraph"].into())
        .priority_sampled("1")
        .build()
        .validate_trace(id, &mock_server)
        .await?;

    // ---------------------- zipkin propagator with an unsampled trace
    // The trace is unsampled, so it should still reach the OTLP exporter, with
    // sampling priority 0. It must not reach the subgraph because the trace was
    // not sampled at the origin; the goal is only to measure it at the Datadog
    // agent level.
    let id = TraceId::from_hex("80f198ee56343ba864fe8b2a57d3eff7").unwrap();
    let headers: HashMap<String, String> = [
        (
            "X-B3-TraceId".to_string(),
            "80f198ee56343ba864fe8b2a57d3eff7".to_string(),
        ),
        (
            "X-B3-ParentSpanId".to_string(),
            "05e3ac9a4f6e3b90".to_string(),
        ),
        ("X-B3-SpanId".to_string(), "e457b5a2e4d86bd1".to_string()),
        ("X-B3-Sampled".to_string(), "0".to_string()),
    ]
    .into();

    let (_id, _) = router.execute_untraced_query(&query, Some(headers)).await;
    Spec::builder()
        .services(["router"].into())
        .priority_sampled("0")
        .build()
        .validate_trace(id, &mock_server)
        .await?;

    // ---------------------- trace context propagation, sampled
    // The tracestate carries `m` and `psr` for Datadog and the trace is
    // sampled, so it should reach the OTLP exporter with sampling priority 1
    // and also be sent to the subgraph.
    let id = TraceId::from_hex("0af7651916cd43dd8448eb211c80319c").unwrap();
    let headers: HashMap<String, String> = [
        (
            "traceparent".to_string(),
            "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01".to_string(),
        ),
        ("tracestate".to_string(), "m=1,psr=1".to_string()),
    ]
    .into();

    let (_id, _) = router.execute_untraced_query(&query, Some(headers)).await;
    Spec::builder()
        .services(["router", "subgraph"].into())
        .priority_sampled("1")
        .build()
        .validate_trace(id, &mock_server)
        .await?;

    // ---------------------- trace context propagation, unsampled
    // The tracestate carries `m` and `psr=0` and the trace is unsampled, so it
    // should reach the OTLP exporter with sampling priority 0. It must not
    // reach the subgraph because the trace was not sampled at the origin; the
    // goal is only to measure it at the Datadog agent level.
    let id = TraceId::from_hex("0af7651916cd43dd8448eb211c80319d").unwrap();
    let headers: HashMap<String, String> = [
        (
            "traceparent".to_string(),
            "00-0af7651916cd43dd8448eb211c80319d-b7ad6b7169203331-00".to_string(),
        ),
        ("tracestate".to_string(), "m=1,psr=0".to_string()),
    ]
    .into();

    let (_id, _) = router.execute_untraced_query(&query, Some(headers)).await;
    Spec::builder()
        .services(["router"].into())
        .priority_sampled("0")
        .build()
        .validate_trace(id, &mock_server)
        .await?;

    // ---------------------- trace context propagation, unsampled flag but psr=1
    // The traceparent flags mark the trace as unsampled, but the tracestate
    // carries `psr=1` for Datadog, so it should reach the OTLP exporter with
    // sampling priority 1. And it should be sent to the subgraph, because the
    // tracestate contains a psr set to 1.
    let id = TraceId::from_hex("0af7651916cd43dd8448eb211c80319e").unwrap();
    let headers: HashMap<String, String> = [
        (
            "traceparent".to_string(),
            "00-0af7651916cd43dd8448eb211c80319e-b7ad6b7169203331-00".to_string(),
        ),
        ("tracestate".to_string(), "m=1,psr=1".to_string()),
    ]
    .into();

    let (_id, _) = router.execute_untraced_query(&query, Some(headers)).await;
    Spec::builder()
        .services(["router", "subgraph"].into())
        .priority_sampled("1")
        .build()
        .validate_trace(id, &mock_server)
        .await?;

    // When adding another scenario with a hand-crafted trace id, use a fresh id
    // (e.g. increment the previous one by 1). Reusing an id would make
    // validation pick up the spans already recorded for that id by the earlier
    // scenarios.
    router.graceful_shutdown().await;
    Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn test_untraced_request_no_sample_datadog_agent() -> Result<(), BoxError> {
if !graph_os_enabled() {
panic!("Error: test skipped because GraphOS is not enabled");
}
let mock_server = mock_otlp_server().await;
let config = include_str!("fixtures/otlp_datadog_agent_no_sample.router.yaml")
.replace("<otel-collector-endpoint>", &mock_server.uri());
Expand All @@ -144,7 +277,7 @@ async fn test_untraced_request_no_sample_datadog_agent() -> Result<(), BoxError>
router.assert_started().await;

let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});
let (id, _) = router.execute_untraced_query(&query).await;
let (id, _) = router.execute_untraced_query(&query, None).await;
Spec::builder()
.services(["router"].into())
.priority_sampled("0")
Expand All @@ -157,6 +290,9 @@ async fn test_untraced_request_no_sample_datadog_agent() -> Result<(), BoxError>

#[tokio::test(flavor = "multi_thread")]
async fn test_untraced_request_sample_datadog_agent() -> Result<(), BoxError> {
if !graph_os_enabled() {
panic!("Error: test skipped because GraphOS is not enabled");
}
let mock_server = mock_otlp_server().await;
let config = include_str!("fixtures/otlp_datadog_agent_sample.router.yaml")
.replace("<otel-collector-endpoint>", &mock_server.uri());
Expand All @@ -166,7 +302,7 @@ async fn test_untraced_request_sample_datadog_agent() -> Result<(), BoxError> {
router.assert_started().await;

let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});
let (id, _) = router.execute_untraced_query(&query).await;
let (id, _) = router.execute_untraced_query(&query, None).await;
Spec::builder()
.services(["router"].into())
.priority_sampled("1")
Expand All @@ -179,6 +315,9 @@ async fn test_untraced_request_sample_datadog_agent() -> Result<(), BoxError> {

#[tokio::test(flavor = "multi_thread")]
async fn test_untraced_request_sample_datadog_agent_unsampled() -> Result<(), BoxError> {
if !graph_os_enabled() {
panic!("Error: test skipped because GraphOS is not enabled");
}
let mock_server = mock_otlp_server().await;
let config = include_str!("fixtures/otlp_datadog_agent_sample_no_sample.router.yaml")
.replace("<otel-collector-endpoint>", &mock_server.uri());
Expand All @@ -194,7 +333,7 @@ async fn test_untraced_request_sample_datadog_agent_unsampled() -> Result<(), Bo
router.assert_started().await;

let query = json!({"query":"query ExampleQuery {topProducts{name}}","variables":{}});
let (id, _) = router.execute_untraced_query(&query).await;
let (id, _) = router.execute_untraced_query(&query, None).await;
Spec::builder()
.services(["router"].into())
.priority_sampled("0")
Expand All @@ -208,7 +347,7 @@ async fn test_untraced_request_sample_datadog_agent_unsampled() -> Result<(), Bo
#[tokio::test(flavor = "multi_thread")]
async fn test_priority_sampling_propagated() -> Result<(), BoxError> {
if !graph_os_enabled() {
return Ok(());
panic!("Error: test skipped because GraphOS is not enabled");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that users who don't have a GraphOS key will not be able to get a passing test run, because the test now panics instead of being skipped.

}
let mock_server = mock_otlp_server().await;
let config = include_str!("fixtures/otlp_datadog_propagation.router.yaml")
Expand Down Expand Up @@ -534,7 +673,6 @@ impl Spec {
.filter_map(|service| service.as_string())
.collect();
tracing::debug!("found services {:?}", actual_services);

let expected_services = self
.services
.iter()
Expand Down
Loading