Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the WatchState API #582

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions console-api/proto/instrument.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ service Instrument {
rpc WatchUpdates(InstrumentRequest) returns (stream Update) {}
// Produces a stream of updates describing the activity of a specific task.
rpc WatchTaskDetails(TaskDetailsRequest) returns (stream tasks.TaskDetails) {}
// Produces a stream of state of the aggregator.
rpc WatchState(StateRequest) returns (stream State) {}
// Registers that the console observer wants to pause the stream.
rpc Pause(PauseRequest) returns (PauseResponse) {}
// Registers that the console observer wants to resume the stream.
Expand Down Expand Up @@ -72,6 +74,23 @@ message Update {
common.RegisterMetadata new_metadata = 5;
}

// StateRequest requests the current state of the aggregator.
message StateRequest {
}

// State carries the current state of the aggregator.
message State {
Temporality temporality = 1;
}

// The time "state" of the aggregator.
enum Temporality {
// The aggregator is currently live.
LIVE = 0;
// The aggregator is currently paused.
PAUSED = 1;
}

// `PauseResponse` is the value returned after a pause request.
message PauseResponse {
}
Expand Down
128 changes: 128 additions & 0 deletions console-api/src/generated/rs.tokio.console.instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ pub struct Update {
#[prost(message, optional, tag = "5")]
pub new_metadata: ::core::option::Option<super::common::RegisterMetadata>,
}
/// StateRequest requests the current state of the aggregator.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct StateRequest {}
/// State carries the current state of the aggregator.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct State {
#[prost(enumeration = "Temporality", tag = "1")]
pub temporality: i32,
}
/// `PauseResponse` is the value returned after a pause request.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
Expand All @@ -63,6 +74,35 @@ pub struct PauseResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ResumeResponse {}
/// The time "state" of the aggregator.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Temporality {
/// The aggregator is currently live.
Live = 0,
/// The aggregator is currently paused.
Paused = 1,
}
impl Temporality {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Temporality::Live => "LIVE",
Temporality::Paused => "PAUSED",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"LIVE" => Some(Self::Live),
"PAUSED" => Some(Self::Paused),
_ => None,
}
}
}
/// Generated client implementations.
pub mod instrument_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
Expand Down Expand Up @@ -211,6 +251,37 @@ pub mod instrument_client {
);
self.inner.server_streaming(req, path, codec).await
}
/// Produces a stream of state of the aggregator.
pub async fn watch_state(
&mut self,
request: impl tonic::IntoRequest<super::StateRequest>,
) -> std::result::Result<
tonic::Response<tonic::codec::Streaming<super::State>>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/rs.tokio.console.instrument.Instrument/WatchState",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(
GrpcMethod::new(
"rs.tokio.console.instrument.Instrument",
"WatchState",
),
);
self.inner.server_streaming(req, path, codec).await
}
/// Registers that the console observer wants to pause the stream.
pub async fn pause(
&mut self,
Expand Down Expand Up @@ -301,6 +372,17 @@ pub mod instrument_server {
tonic::Response<Self::WatchTaskDetailsStream>,
tonic::Status,
>;
/// Server streaming response type for the WatchState method.
type WatchStateStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::State, tonic::Status>,
>
+ Send
+ 'static;
/// Produces a stream of state of the aggregator.
async fn watch_state(
&self,
request: tonic::Request<super::StateRequest>,
) -> std::result::Result<tonic::Response<Self::WatchStateStream>, tonic::Status>;
/// Registers that the console observer wants to pause the stream.
async fn pause(
&self,
Expand Down Expand Up @@ -481,6 +563,52 @@ pub mod instrument_server {
};
Box::pin(fut)
}
"/rs.tokio.console.instrument.Instrument/WatchState" => {
#[allow(non_camel_case_types)]
struct WatchStateSvc<T: Instrument>(pub Arc<T>);
impl<
T: Instrument,
> tonic::server::ServerStreamingService<super::StateRequest>
for WatchStateSvc<T> {
type Response = super::State;
type ResponseStream = T::WatchStateStream;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::StateRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Instrument>::watch_state(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = WatchStateSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/rs.tokio.console.instrument.Instrument/Pause" => {
#[allow(non_camel_case_types)]
struct PauseSvc<T: Instrument>(pub Arc<T>);
Expand Down
23 changes: 12 additions & 11 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub struct Aggregator {
poll_ops: Vec<proto::resources::PollOp>,

/// The time "state" of the aggregator, such as paused or live.
temporality: Temporality,
temporality: proto::instrument::Temporality,

/// Used to anchor monotonic timestamps to a base `SystemTime`, to produce a
/// timestamp that can be sent over the wire.
Expand All @@ -102,11 +102,6 @@ pub(crate) struct Flush {
triggered: AtomicBool,
}

#[derive(Debug)]
enum Temporality {
Live,
Paused,
}
// Represent static data for resources
struct Resource {
id: Id,
Expand Down Expand Up @@ -162,7 +157,7 @@ impl Aggregator {
async_ops: IdData::default(),
async_op_stats: IdData::default(),
poll_ops: Default::default(),
temporality: Temporality::Live,
temporality: proto::instrument::Temporality::Live,
base_time,
}
}
Expand All @@ -179,8 +174,8 @@ impl Aggregator {
// if the flush interval elapses, flush data to the client
_ = publish.tick() => {
match self.temporality {
Temporality::Live => true,
Temporality::Paused => false,
proto::instrument::Temporality::Live => true,
proto::instrument::Temporality::Paused => false,
}
}

Expand All @@ -199,11 +194,17 @@ impl Aggregator {
Some(Command::WatchTaskDetail(watch_request)) => {
self.add_task_detail_subscription(watch_request);
},
Some(Command::WatchState(subscription)) => {
let state = proto::instrument::State {
temporality: self.temporality.into(),
};
subscription.update(&state);
},
Some(Command::Pause) => {
self.temporality = Temporality::Paused;
self.temporality = proto::instrument::Temporality::Paused;
}
Some(Command::Resume) => {
self.temporality = Temporality::Live;
self.temporality = proto::instrument::Temporality::Live;
}
None => {
tracing::debug!("rpc channel closed, terminating");
Expand Down
15 changes: 15 additions & 0 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ struct Watch<T>(mpsc::Sender<Result<T, tonic::Status>>);
enum Command {
Instrument(Watch<proto::instrument::Update>),
WatchTaskDetail(WatchRequest<proto::tasks::TaskDetails>),
WatchState(Watch<proto::instrument::State>),
Pause,
Resume,
}
Expand Down Expand Up @@ -1190,6 +1191,8 @@ impl proto::instrument::instrument_server::Instrument for Server {
tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::Update, tonic::Status>>;
type WatchTaskDetailsStream =
tokio_stream::wrappers::ReceiverStream<Result<proto::tasks::TaskDetails, tonic::Status>>;
type WatchStateStream =
tokio_stream::wrappers::ReceiverStream<Result<proto::instrument::State, tonic::Status>>;
async fn watch_updates(
&self,
req: tonic::Request<proto::instrument::InstrumentRequest>,
Expand Down Expand Up @@ -1245,6 +1248,18 @@ impl proto::instrument::instrument_server::Instrument for Server {
Ok(tonic::Response::new(stream))
}

async fn watch_state(
&self,
_req: tonic::Request<proto::instrument::StateRequest>,
) -> Result<tonic::Response<Self::WatchStateStream>, tonic::Status> {
let (stream_sender, stream_recv) = mpsc::channel(self.client_buffer);
self.subscribe.send(Command::WatchState(Watch(stream_sender))).await.map_err(|_| {
tonic::Status::internal("cannot get state, aggregation task is not running")
})?;
let stream = tokio_stream::wrappers::ReceiverStream::new(stream_recv);
Ok(tonic::Response::new(stream))
}

async fn pause(
&self,
_req: tonic::Request<proto::instrument::PauseRequest>,
Expand Down
Loading