Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

refactor tracing #492

Merged
merged 1 commit into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions rust/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
use std::{path::PathBuf, process};
use structopt::StructOpt;
use tracing_subscriber::*;
use xaynet::{
rest,
services,
settings::Settings,
state_machine::{requests::Request, StateMachine},
utils::trace::Traced,
};
use xaynet::{rest, services, settings::Settings, state_machine::StateMachine};

#[derive(Debug, StructOpt)]
#[structopt(name = "Coordinator")]
Expand Down Expand Up @@ -41,7 +35,7 @@ async fn main() {
sodiumoxide::init().unwrap();

let (state_machine, requests_tx, event_subscriber) =
StateMachine::<Traced<Request>>::new(pet_settings, mask_settings, model_settings).unwrap();
StateMachine::new(pet_settings, mask_settings, model_settings).unwrap();
let fetcher = services::fetcher(&event_subscriber);
let message_handler = services::message_handler(&event_subscriber, requests_tx);

Expand Down
2 changes: 1 addition & 1 deletion rust/src/client/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Proxy {
/// handling the message.
/// * Returns `NetworkErr` if a network error occurs while posting the PET
/// message.
pub async fn post_message(&self, msg: Vec<u8>) -> Result<(), ClientError> {
pub async fn post_message(&mut self, msg: Vec<u8>) -> Result<(), ClientError> {
match self {
InMem(_, hdl) => hdl
.handle_message(msg)
Expand Down
2 changes: 1 addition & 1 deletion rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ use self::crypto::{
/// An error related to insufficient system entropy for secrets at program startup.
pub struct InitError;

#[derive(Debug, PartialEq, Display)]
#[derive(Debug, Display, Error)]
/// Errors related to the PET protocol.
pub enum PetError {
InvalidMessage,
Expand Down
17 changes: 17 additions & 0 deletions rust/src/message/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use std::borrow::Borrow;

use anyhow::{anyhow, Context};
use tracing::Span;

use crate::{
certificate::Certificate,
Expand All @@ -28,6 +29,7 @@ use crate::{
traits::{FromBytes, ToBytes},
DecodeError,
},
utils::Traceable,
LocalSeedDict,
};

Expand Down Expand Up @@ -195,3 +197,18 @@ impl<'a, 'b> MessageOpen<'a, 'b> {
Ok(message)
}
}

impl Traceable for MessageOwned {
fn make_span(&self) -> Span {
let message_type = match self.payload {
PayloadOwned::Sum(_) => "sum",
PayloadOwned::Update(_) => "update",
PayloadOwned::Sum2(_) => "sum2",
};
error_span!(
"MessageOwned",
message_type = message_type,
message_length = self.buffer_length()
)
}
}
23 changes: 9 additions & 14 deletions rust/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ pub async fn serve<F, MH>(
pet_message_handler: MH,
) where
F: Fetcher + Sync + Send + 'static,
MH: PetMessageHandler + Sync + Send + 'static,
MH: PetMessageHandler + Sync + Send + 'static + Clone,
{
let fetcher = Arc::new(fetcher);
let message_handler = Arc::new(pet_message_handler);
let message = warp::path!("message")
.and(warp::post())
.and(warp::body::bytes())
.and(with_message_handler(message_handler.clone()))
.and(with_message_handler(pet_message_handler.clone()))
.and_then(handle_message);

let sum_dict = warp::path!("sums")
Expand Down Expand Up @@ -81,15 +80,11 @@ pub async fn serve<F, MH>(
/// Handles and responds to a PET message.
async fn handle_message<MH: PetMessageHandler>(
body: Bytes,
handler: Arc<MH>,
mut handler: MH,
) -> Result<impl warp::Reply, Infallible> {
let _ = handler
.as_ref()
.handle_message(body.to_vec())
.await
.map_err(|e| {
warn!("failed to handle message: {:?}", e);
});
let _ = handler.handle_message(body.to_vec()).await.map_err(|e| {
warn!("failed to handle message: {:?}", e);
});
Ok(warp::reply())
}

Expand Down Expand Up @@ -227,9 +222,9 @@ async fn handle_params<F: Fetcher>(fetcher: Arc<F>) -> Result<impl warp::Reply,
}

/// Converts a PET message handler into a `warp` filter.
fn with_message_handler<MH: PetMessageHandler + Send + Sync + 'static>(
handler: Arc<MH>,
) -> impl Filter<Extract = (Arc<MH>,), Error = Infallible> + Clone {
fn with_message_handler<MH: PetMessageHandler + Send + Sync + 'static + Clone>(
handler: MH,
) -> impl Filter<Extract = (MH,), Error = Infallible> + Clone {
warp::any().map(move || handler.clone())
}

Expand Down
50 changes: 31 additions & 19 deletions rust/src/services/messages/message_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use rayon::ThreadPool;
use thiserror::Error;
use tokio::sync::oneshot;
use tower::Service;
use tracing::Span;

use crate::{
crypto::{encrypt::EncryptKeyPair, ByteObject},
Expand All @@ -23,7 +24,7 @@ use crate::{
events::{EventListener, EventSubscriber},
phases::PhaseName,
},
utils::trace::{Traceable, Traced},
utils::{Request, Traceable},
Signature,
};

Expand Down Expand Up @@ -58,16 +59,18 @@ impl MessageParserService {
}
}

/// Request type for the [`MessageParserService`].
///
/// It contains the encrypted message.
/// A buffer that represents an encrypted message.
#[derive(From, Debug)]
pub struct MessageParserRequest(Vec<u8>);

/// Response type for the [`MessageParserService`].
///
/// It contains the parsed message.
pub type MessageParserResponse = Result<MessageOwned, MessageParserError>;
pub struct RawMessage<T: AsRef<[u8]>>(T);
little-dude marked this conversation as resolved.
Show resolved Hide resolved

impl<T> Traceable for RawMessage<T>
where
T: AsRef<[u8]>,
{
fn make_span(&self) -> Span {
error_span!("raw_message", payload_len = self.0.as_ref().len())
little-dude marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Error type for the [`MessageParserService`]
#[derive(Debug, Error)]
Expand All @@ -93,7 +96,16 @@ pub enum MessageParserError {
InternalError(String),
}

impl Service<Traced<MessageParserRequest>> for MessageParserService {
/// Response type for the [`MessageParserService`]
pub type MessageParserResponse = Result<MessageOwned, MessageParserError>;

/// Request type for the [`MessageParserService`]
pub type MessageParserRequest<T> = Request<RawMessage<T>>;

impl<T> Service<MessageParserRequest<T>> for MessageParserService
where
T: AsRef<[u8]> + Send + 'static,
{
type Response = MessageParserResponse;
type Error = std::convert::Infallible;

Expand All @@ -107,7 +119,7 @@ impl Service<Traced<MessageParserRequest>> for MessageParserService {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Traced<MessageParserRequest>) -> Self::Future {
fn call(&mut self, req: MessageParserRequest<T>) -> Self::Future {
debug!("retrieving the current keys and current phase");
let keys_ev = self.keys_events.get_latest();
let phase_ev = self.phase_events.get_latest();
Expand All @@ -130,9 +142,9 @@ impl Service<Traced<MessageParserRequest>> for MessageParserService {

trace!("spawning pre-processor handler on thread-pool");
self.thread_pool.spawn(move || {
let span = req.span().clone();
let _enter = span.enter();
let resp = handler.call(req.into_inner().0);
let span = req.span();
let _span_guard = span.enter();
let resp = handler.call(req.into_inner());
let _ = tx.send(resp);
});
Either::Right(Box::pin(async move {
Expand All @@ -156,9 +168,9 @@ struct Handler {
impl Handler {
/// Process the request. `data` is the encrypted PET message to
/// process.
fn call(self, data: Vec<u8>) -> Result<MessageOwned, MessageParserError> {
fn call<T: AsRef<[u8]>>(self, data: RawMessage<T>) -> MessageParserResponse {
info!("decrypting message");
let raw = self.decrypt(data)?;
let raw = self.decrypt(&data.0.as_ref())?;

info!("parsing message header");
let header = self.parse_header(raw.as_slice())?;
Expand All @@ -177,11 +189,11 @@ impl Handler {
}

/// Decrypt the given payload with the coordinator secret key
fn decrypt(&self, encrypted_message: Vec<u8>) -> Result<Vec<u8>, MessageParserError> {
fn decrypt(&self, encrypted_message: &[u8]) -> Result<Vec<u8>, MessageParserError> {
Ok(self
.keys
.secret
.decrypt(&encrypted_message.as_ref(), &self.keys.public)
.decrypt(&encrypted_message, &self.keys.public)
.map_err(|_| MessageParserError::Decrypt)?)
}

Expand Down
Loading