Merge pull request #24 from dfinity/igornovg/deps
fix(BOUN-1209): fix vector, log backend, disable signed queries
blind-oracle committed Aug 19, 2024
2 parents 32e00ef + 11bbe9a commit f9e4627
Showing 12 changed files with 246 additions and 239 deletions.
160 changes: 80 additions & 80 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,4 +1,4 @@
[toolchain]
channel = "1.79.0"
channel = "1.80.0"
targets = ["x86_64-unknown-linux-musl"]
profile = "minimal"
15 changes: 10 additions & 5 deletions src/cli.rs
@@ -173,17 +173,18 @@ pub struct Ic {
#[clap(env, long)]
pub ic_root_key: Option<PathBuf>,

/// Maximum mumber of request retries in the transport layer for both network- and http-related failures.
/// Maximum number of request retries for connection failures.
#[clap(env, long, default_value = "5")]
pub ic_max_request_retries: u32,

/// Disable response verification for the IC requests.
#[clap(env, long, default_value = "false")]
#[clap(env, long)]
pub ic_unsafe_disable_response_verification: bool,

/// Disable replica-signed queries in the agent.
#[clap(env, long, default_value = "false")]
pub ic_unsafe_disable_replica_signed_queries: bool,
/// Enable replica-signed queries in the agent.
/// Since the responses' certificates are validated anyway, signed queries are redundant.
#[clap(env, long)]
pub ic_enable_replica_signed_queries: bool,
}

#[derive(Args)]
@@ -401,6 +402,10 @@ pub struct Vector {
/// If the buffer is full then new events will be dropped.
#[clap(env, long, default_value = "131072")]
pub log_vector_buffer: usize,

/// Vector HTTP request timeout for a batch flush
#[clap(env, long, default_value = "30s", value_parser = parse_duration)]
pub log_vector_timeout: Duration,
}

#[derive(Args)]
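For reference, the `parse_duration` value parser used by the new `--log-vector-timeout` flag is a helper from this repository and is not shown in the diff. A minimal sketch of such a parser, assuming it simply delegates to the humantime crate, could look like this:

use std::time::Duration;

// Hypothetical stand-in for the repo's `parse_duration` helper: parses
// human-readable durations such as "30s" or "500ms" into a Duration,
// returning a String error so clap can display it to the user.
fn parse_duration(s: &str) -> Result<Duration, String> {
    humantime::parse_duration(s).map_err(|e| format!("invalid duration '{s}': {e}"))
}

With a parser of this shape, `#[clap(value_parser = parse_duration)]` lets the flag accept values like "30s" directly as a `Duration`.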
4 changes: 2 additions & 2 deletions src/core.rs
@@ -26,8 +26,8 @@ pub async fn main(cli: &Cli) -> Result<(), Error> {
ENV.set(cli.misc.env.clone()).unwrap();
HOSTNAME.set(cli.misc.hostname.clone()).unwrap();

if cli.ic.ic_unsafe_disable_replica_signed_queries {
warn!("Replica-signed queries are disabled");
if cli.ic.ic_enable_replica_signed_queries {
warn!("Replica-signed queries are enabled");
}

if cli.ic.ic_unsafe_disable_response_verification {
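The flag above only toggles a warning in this hunk; the actual agent wiring lives elsewhere. Assuming the ic-agent version in use exposes `AgentBuilder::with_verify_query_signatures` (an assumption, not confirmed by this diff), a hedged sketch of how the CLI flag could be applied when building the agent:

use ic_agent::Agent;

// Hypothetical wiring of the CLI flag into the agent builder; the method
// name `with_verify_query_signatures` is an assumption about the ic-agent
// API and may differ between versions.
fn build_agent(url: &str, enable_signed_queries: bool) -> Result<Agent, ic_agent::AgentError> {
    Agent::builder()
        .with_url(url)
        .with_verify_query_signatures(enable_signed_queries)
        .build()
}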
118 changes: 42 additions & 76 deletions src/metrics/mod.rs
@@ -35,7 +35,7 @@ use crate::{
},
routing::{
error_cause::ErrorCause,
ic::{BNResponseMetadata, IcResponseStatus},
ic::{BNRequestMetadata, BNResponseMetadata, IcResponseStatus},
middleware::{geoip::CountryCode, request_id::RequestId},
CanisterId, RequestCtx, RequestType, RequestTypeApi,
},
@@ -244,8 +244,14 @@ pub async fn middleware(
.unwrap_or_default();
let status = response.status().as_u16();

// IC request metadata
let req_meta = response
.extensions_mut()
.remove::<BNRequestMetadata>()
.unwrap_or_default();

// IC response metadata
let meta = response
let resp_meta = response
.extensions_mut()
.remove::<BNResponseMetadata>()
.unwrap_or_default();
@@ -288,7 +294,8 @@
let response_size = rx.await.unwrap_or(Ok(0)).unwrap_or(0);

let duration_full = start.elapsed();
let meta = meta.clone();
let req_meta = req_meta.clone();
let resp_meta = resp_meta.clone();

let (tls_version, tls_cipher, tls_handshake) =
tls_info.as_ref().map_or(("", "", Duration::ZERO), |x| {
@@ -365,16 +372,16 @@
canister_id,
ic_streaming,
ic_upgrade,
ic_node_id = meta.node_id,
ic_subnet_id = meta.subnet_id,
ic_subnet_type = meta.subnet_type,
ic_method_name = meta.method_name,
ic_sender = meta.sender,
ic_canister_id_cbor = meta.canister_id_cbor,
ic_error_cause = meta.error_cause,
ic_retries = meta.retries,
ic_cache_status = meta.cache_status,
ic_cache_bypass_reason = meta.cache_bypass_reason,
ic_node_id = resp_meta.node_id,
ic_subnet_id = resp_meta.subnet_id,
ic_subnet_type = resp_meta.subnet_type,
ic_method_name = resp_meta.method_name,
ic_sender = resp_meta.sender,
ic_canister_id_cbor = resp_meta.canister_id_cbor,
ic_error_cause = resp_meta.error_cause,
ic_retries = resp_meta.retries,
ic_cache_status = resp_meta.cache_status,
ic_cache_bypass_reason = resp_meta.cache_bypass_reason,
error = error_cause,
req_size = request_size,
resp_size = response_size,
@@ -386,11 +393,12 @@
conn_reqs = conn_req_count,
cache_status = cache_status_str,
cache_bypass_reason = cache_bypass_reason_str,
backend = req_meta.backend,
);
}

if let Some(v) = &state.clickhouse {
let meta = meta.clone();
let resp_meta = resp_meta.clone();

let row = Row {
env: ENV.get().unwrap().as_str(),
@@ -408,16 +416,16 @@
canister_id: canister_id.clone(),
ic_streaming,
ic_upgrade,
ic_node_id: meta.node_id,
ic_subnet_id: meta.subnet_id,
ic_subnet_type: meta.subnet_type,
ic_method_name: meta.method_name,
ic_sender: meta.sender,
ic_canister_id_cbor: meta.canister_id_cbor,
ic_error_cause: meta.error_cause,
ic_retries: meta.retries.parse().unwrap_or(0),
ic_cache_status: meta.cache_status,
ic_cache_bypass_reason: meta.cache_bypass_reason,
ic_node_id: resp_meta.node_id,
ic_subnet_id: resp_meta.subnet_id,
ic_subnet_type: resp_meta.subnet_type,
ic_method_name: resp_meta.method_name,
ic_sender: resp_meta.sender,
ic_canister_id_cbor: resp_meta.canister_id_cbor,
ic_error_cause: resp_meta.error_cause,
ic_retries: resp_meta.retries.parse().unwrap_or(0),
ic_cache_status: resp_meta.cache_status,
ic_cache_bypass_reason: resp_meta.cache_bypass_reason,
error_cause: error_cause.clone(),
tls_version: tls_version.into(),
tls_cipher: tls_cipher.into(),
@@ -438,48 +446,6 @@
if let Some(v) = &state.vector {
// TODO use proper names when the DB is updated

// let val = json!({
// "env": ENV.get().unwrap().as_str(),
// "hostname": HOSTNAME.get().unwrap().as_str(),
// "date": timestamp.unix_timestamp(),
// "request_id": request_id.to_string(),
// "conn_id": conn_info.id.to_string(),
// "method": method,
// "http_version": http_version,
// "request_type": request_type,
// "geo_country_code": country_code,
// "status": status,
// "domain": domain,
// "host": host,
// "path": path,
// "canister_id": canister_id,
// "ic_streaming": ic_streaming,
// "ic_upgrade": ic_upgrade,
// "ic_node_id": meta.node_id,
// "ic_subnet_id": meta.subnet_id,
// "ic_subnet_type": meta.subnet_type,
// "ic_method_name": meta.method_name,
// "ic_sender": meta.sender,
// "ic_canister_id_cbor": meta.canister_id_cbor,
// "ic_error_cause": meta.error_cause,
// "ic_retries": meta.retries,
// "ic_cache_status": meta.cache_status,
// "ic_cache_bypass_reason": meta.cache_bypass_reason,
// "error_cause": error_cause,
// "tls_version": tls_version,
// "tls_cipher": tls_cipher,
// "remote_addr": conn_info.remote_addr.to_string(),
// "req_rcvd": request_size,
// "req_sent": response_size,
// "conn_rcvd": conn_rcvd,
// "conn_sent": conn_sent,
// "duration": duration.as_secs_f64(),
// "duration_full": duration_full.as_secs_f64(),
// "duration_conn": conn_info.accepted_at.elapsed().as_secs_f64(),
// "cache_status": cache_status_str,
// "cache_bypass_reason": cache_bypass_reason_str,
// });

// Nginx-compatible log entry
let val = json!({
"env": ENV.get().unwrap().as_str(),
@@ -498,15 +464,15 @@
"geo_country_code": country_code,
"request_uri": uri.path_and_query().map(|x| x.as_str()).unwrap_or_default(),
"query_string": uri.query().unwrap_or_default(),
"ic_node_id": meta.node_id,
"ic_subnet_id": meta.subnet_id,
"ic_method_name": meta.method_name,
"ic_node_id": resp_meta.node_id,
"ic_subnet_id": resp_meta.subnet_id,
"ic_method_name": resp_meta.method_name,
"ic_request_type": request_type,
"ic_sender": meta.sender,
"ic_sender": resp_meta.sender,
"ic_canister_id": canister_id,
"ic_canister_id_cbor": meta.canister_id_cbor,
"ic_error_cause": meta.error_cause,
"retries": meta.retries,
"ic_canister_id_cbor": resp_meta.canister_id_cbor,
"ic_error_cause": resp_meta.error_cause,
"retries": resp_meta.retries,
"error_cause": error_cause,
"ssl_protocol": tls_version,
"ssl_cipher": tls_cipher,
@@ -516,10 +482,10 @@
"remote_addr": conn_info.remote_addr.ip().to_string(),
"request_time": duration_full.as_secs_f64(),
"request_time_headers": 0,
"cache_status": meta.cache_status,
"cache_status": resp_meta.cache_status,
"cache_status_nginx": cache_status_str,
"cache_bypass_reason": meta.cache_bypass_reason,
"upstream": "127.0.0.1",
"cache_bypass_reason": resp_meta.cache_bypass_reason,
"upstream": req_meta.backend.unwrap_or_default(),
});

v.send(val);
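The new `backend`/`upstream` fields are read out of the response extensions via `BNRequestMetadata`. A minimal sketch of how such a type could be defined and attached by the routing layer; the single `backend` field is the only one visible in this diff, and anything beyond it would be an assumption:

use http::Response;

// Hypothetical shape of the request-side metadata; the real struct lives
// in routing::ic and may carry additional fields.
#[derive(Clone, Default)]
pub struct BNRequestMetadata {
    pub backend: Option<String>,
}

// Extensions form a typed map keyed by the Rust type, so the metrics
// middleware can later do `extensions_mut().remove::<BNRequestMetadata>()`.
pub fn attach_backend<B>(response: &mut Response<B>, backend: &str) {
    response.extensions_mut().insert(BNRequestMetadata {
        backend: Some(backend.to_string()),
    });
}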
66 changes: 31 additions & 35 deletions src/metrics/vector.rs
@@ -9,7 +9,7 @@ use reqwest::{
use tokio::{
select,
sync::mpsc::{channel, Receiver, Sender},
time::interval,
time::{interval, sleep},
};
use tokio_util::{
codec::{Encoder, LengthDelimitedCodec},
@@ -56,6 +56,7 @@ impl EventEncoder {
Ok(())
}

/// Encodes the provided batch into wire format, leaving the provided Vec empty.
fn encode_batch(&mut self, batch: &mut Vec<Event>) -> Result<Bytes, Error> {
let mut body = BytesMut::new();
for event in batch.drain(..) {
@@ -91,6 +92,7 @@ impl Vector {
rx,
token: token.child_token(),
encoder: EventEncoder::new(),
timeout: cli.log_vector_timeout,
};

let tracker = TaskTracker::new();
@@ -120,6 +122,7 @@ struct VectorActor {
batch: Vec<Event>,

client: Arc<dyn http::Client>,
timeout: Duration,
url: Url,
auth: Option<HeaderValue>,

@@ -153,6 +156,7 @@ impl VectorActor {
}

*request.body_mut() = Some(body.into());
*request.timeout_mut() = Some(self.timeout);

let response = self
.client
@@ -174,46 +178,37 @@

// Encode the batch
let mut encoder = self.encoder.clone();
let body = encoder
.encode_batch(&mut self.batch)
.context("unable to encode batch")?;
let Ok(body) = encoder.encode_batch(&mut self.batch) else {
self.batch.clear();
return Err(anyhow!("unable to encode batch, dropping it"));
};

// Retry until we succeed or token is cancelled
// TODO make configurable
let mut interval = interval(Duration::from_secs(1));
let mut retries = 3;
let drain = self.token.is_cancelled();
// Retry
// TODO make configurable?
let mut interval = Duration::from_millis(200);
let mut retries = 5;

while retries > 0 {
// Bytes is cheap to clone
if let Err(e) = self.send(body.clone()).await {
warn!("Vector: unable to flush batch: {e:#}");
} else {
return Ok(());
}

loop {
select! {
biased;

// If we're draining then the token is already cancelled, so exclude this branch
() = self.token.cancelled(), if !drain => {
warn!("Vector: exiting, aborting batch sending");
_ = self.token.cancelled() => {
return Ok(());
}

_ = interval.tick() => {
// Bytes is cheap to clone
if let Err(e) = self.send(body.clone()).await {
warn!("Vector: unable to flush batch: {e:#}");

// Limit the number of retries when draining
if drain {
retries -= 1;
if retries == 0 {
return Err(e);
}
}

continue;
}

return Ok(());
}
},
_ = sleep(interval) => {}
}

// Back off a bit
retries -= 1;
interval *= 2;
}

Err(anyhow!("unable to flush batch: retries exhausted"))
}

async fn run(mut self, flush_interval: Duration) {
@@ -226,6 +221,7 @@

() = self.token.cancelled() => {
warn!("Vector: stopping, draining");

// Close the channel
self.rx.close();

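The reworked flush loop above swaps the fixed one-second tick for a bounded retry with exponential backoff that still reacts to cancellation. A self-contained sketch of that pattern, with a generic `send` closure standing in for the HTTP call:

use std::time::Duration;
use tokio::{select, time::sleep};
use tokio_util::sync::CancellationToken;

// Hypothetical stand-alone version of the retry loop used in flush():
// try up to `retries` times, doubling the delay between attempts, and
// bail out early if the token is cancelled.
async fn send_with_backoff<F, Fut>(token: &CancellationToken, mut send: F) -> Result<(), String>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), String>>,
{
    let mut interval = Duration::from_millis(200);
    let mut retries = 5;

    while retries > 0 {
        if send().await.is_ok() {
            return Ok(());
        }

        select! {
            biased;
            _ = token.cancelled() => return Ok(()),
            _ = sleep(interval) => {}
        }

        retries -= 1;
        interval *= 2;
    }

    Err("retries exhausted".into())
}

Compared with the previous interval-based loop, this caps the total number of attempts at five and doubles the wait (200 ms, 400 ms, and so on) between them.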