Skip to content

Commit

Permalink
Expose bundler metrics via otlp exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
garryod committed Jan 3, 2024
1 parent 17cab20 commit cc6e36d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ services:
DATABASE_URL: mysql://root:rootpassword@ispyb/ispyb_build
BUNDLER_DATABASE_URL: mysql://root:rootpassword@ispyb/ispyb_build
BUNDLER_LOG_LEVEL: DEBUG
BUNDLER_TRACING_URL: http://collector:4317
BUNDLER_OTEL_COLLECTOR_URL: http://collector:4317

opa:
image: docker.io/openpolicyagent/opa:0.59.0
Expand Down
1 change: 1 addition & 0 deletions bundler/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bundler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ flate2 = { version = "1.0.28" }
headers = { version = "0.4.0" }
humantime = { version = "2.1.0" }
opentelemetry = { version = "0.21.0" }
opentelemetry-otlp = { version = "0.14.0", features = ["tokio"] }
opentelemetry-otlp = { version = "0.14.0", features = ["metrics", "tokio"] }
opentelemetry-semantic-conventions = { version = "0.13.0" }
opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"] }
serde = { version = "1.0.193", features = ["derive"] }
serde_json = { version = "1.0.108" }
Expand Down
100 changes: 65 additions & 35 deletions bundler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ use tokio::{
sync::RwLock,
time::{sleep_until, Instant},
};
use tower_http::trace::TraceLayer;
use tower_http::trace::{DefaultOnFailure, DefaultOnRequest, DefaultOnResponse, TraceLayer};
use tracing::instrument;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, Layer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use url::Url;

/// A wrapper containing a [`Bundle`] and the serialzied gzipped archive
Expand Down Expand Up @@ -94,24 +94,29 @@ struct Cli {
#[arg(long, env = "BUNDLER_POLLING_INTERVAL", default_value_t=humantime::Duration::from(Duration::from_secs(60)))]
polling_interval: humantime::Duration,
/// The URL of the OpenTelemetry collector to send traces to
#[arg(long, env = "BUNDLER_TRACING_URL")]
tracing_url: Option<Url>,
#[arg(long, env = "BUNDLER_OTEL_COLLECTOR_URL")]
otel_collector_url: Option<Url>,
}

#[tokio::main]
async fn main() {
dotenvy::dotenv().ok();
let args = Cli::parse();

setup_telemetry(args.log_level, args.tracing_url).unwrap();
setup_telemetry(args.log_level, args.otel_collector_url).unwrap();

let ispyb_pool = connect_ispyb(args.database_url).await.unwrap();
let current_bundle = fetch_initial_bundle(&ispyb_pool).await.unwrap();
let app = Router::new()
.route("/bundle.tar.gz", get(bundle_endpoint))
.route_layer(RequireBearerLayer::new(args.require_token))
.fallback(fallback_endpoint)
.layer(TraceLayer::new_for_http())
.layer(
TraceLayer::new_for_http()
.on_request(DefaultOnRequest::default().level(tracing::Level::INFO))
.on_response(DefaultOnResponse::new().level(tracing::Level::INFO))
.on_failure(DefaultOnFailure::new().level(tracing::Level::INFO)),
)
.with_state(current_bundle.clone());

let mut tasks = tokio::task::JoinSet::new();
Expand All @@ -127,37 +132,61 @@ async fn main() {
/// Sets up Logging & Tracing using jaeger if available
fn setup_telemetry(
log_level: tracing::Level,
tracing_url: Option<Url>,
otel_collector_url: Option<Url>,
) -> Result<(), anyhow::Error> {
let log_layer = tracing_subscriber::fmt::layer().with_filter(
tracing_subscriber::filter::LevelFilter::from_level(log_level),
);
if let Some(tracing_url) = tracing_url {
let tracer_layer = tracing_opentelemetry::layer().with_tracer(
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(tracing_url),
)
.with_trace_config(opentelemetry_sdk::trace::config().with_resource(
opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new(
"service.name",
"bundler",
)]),
))
.install_batch(opentelemetry_sdk::runtime::Tokio)?,
);
tracing_subscriber::Registry::default()
.with(log_layer)
.with(tracer_layer)
.init();
let level_filter = tracing_subscriber::filter::LevelFilter::from_level(log_level);
let log_layer = tracing_subscriber::fmt::layer();
let service_name_resource = opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
built_info::PKG_NAME,
),
opentelemetry::KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
built_info::PKG_VERSION,
),
]);
let (metrics_layer, tracing_layer) = if let Some(otel_collector_url) = otel_collector_url {
(
Some(tracing_opentelemetry::MetricsLayer::new(
opentelemetry_otlp::new_pipeline()
.metrics(opentelemetry_sdk::runtime::Tokio)
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otel_collector_url.clone()),
)
.with_resource(service_name_resource.clone())
.with_period(Duration::from_secs(10))
.build()?,
)),
Some(
tracing_opentelemetry::layer().with_tracer(
opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(otel_collector_url),
)
.with_trace_config(
opentelemetry_sdk::trace::config().with_resource(service_name_resource),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)?,
),
),
)
} else {
tracing_subscriber::Registry::default()
.with(log_layer)
.init();
}
(None, None)
};

tracing_subscriber::Registry::default()
.with(level_filter)
.with(log_layer)
.with(metrics_layer)
.with(tracing_layer)
.init();

Ok(())
}

Expand Down Expand Up @@ -208,6 +237,7 @@ async fn bundle_endpoint(
State(current_bundle): State<CurrentBundle>,
if_none_match: Option<TypedHeader<IfNoneMatch>>,
) -> impl IntoResponse {
tracing::info!(counter.bundle_requests = 1);
let etag = ETag::from_str(&format!(
r#""{}""#,
current_bundle.as_ref().read().await.bundle.revision()
Expand Down

0 comments on commit cc6e36d

Please sign in to comment.