Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add temporality state to update message #567

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions console-api/proto/instrument.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,17 @@ message Update {

// Any new span metadata that was registered since the last update.
common.RegisterMetadata new_metadata = 5;

// The time "state" of the aggregator, such as paused or live.
Temporality temporality = 6;
}

// The time "state" of the aggregator.
enum Temporality {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would make sense to add another rpc for console subscriber state updates. Right now temporality would be the only thing in there, but this would allow us to do "out of band" state updates such as this case, where no data is being sent.

This would also allow us to solve the issue with already connected clients, as we could send this new server state update without needing to send a real update, so all connected tokio console instances would get the pause update.

What do you think?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! It would be a good solution.

// The aggregator is currently live.
LIVE = 0;
// The aggregator is currently paused.
PAUSED = 1;
}

// `PauseResponse` is the value returned after a pause request.
Expand Down
32 changes: 32 additions & 0 deletions console-api/src/generated/rs.tokio.console.instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ pub struct Update {
/// Any new span metadata that was registered since the last update.
#[prost(message, optional, tag = "5")]
pub new_metadata: ::core::option::Option<super::common::RegisterMetadata>,
/// The time "state" of the aggregator, such as paused or live.
#[prost(enumeration = "Temporality", tag = "6")]
pub temporality: i32,
}
/// `PauseResponse` is the value returned after a pause request.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand All @@ -63,6 +66,35 @@ pub struct PauseResponse {}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct ResumeResponse {}
/// The time "state" of the aggregator.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum Temporality {
/// The aggregator is currently live.
Live = 0,
/// The aggregator is currently paused.
Paused = 1,
}
impl Temporality {
/// String value of the enum field names used in the ProtoBuf definition.
///
/// The values are not transformed in any way and thus are considered stable
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
pub fn as_str_name(&self) -> &'static str {
match self {
Temporality::Live => "LIVE",
Temporality::Paused => "PAUSED",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
match value {
"LIVE" => Some(Self::Live),
"PAUSED" => Some(Self::Paused),
_ => None,
}
}
}
/// Generated client implementations.
pub mod instrument_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
Expand Down
34 changes: 34 additions & 0 deletions console-subscriber/examples/grpc_web/app/src/gen/instrument_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@ import { TaskUpdate } from "./tasks_pb.js";
import { ResourceUpdate } from "./resources_pb.js";
import { AsyncOpUpdate } from "./async_ops_pb.js";

/**
* The time "state" of the aggregator.
*
* @generated from enum rs.tokio.console.instrument.Temporality
*/
export enum Temporality {
/**
* The aggregator is currently live.
*
* @generated from enum value: LIVE = 0;
*/
LIVE = 0,

/**
* The aggregator is currently paused.
*
* @generated from enum value: PAUSED = 1;
*/
PAUSED = 1,
}
// Retrieve enum metadata with: proto3.getEnumType(Temporality)
proto3.util.setEnumType(Temporality, "rs.tokio.console.instrument.Temporality", [
{ no: 0, name: "LIVE" },
{ no: 1, name: "PAUSED" },
]);

/**
* InstrumentRequest requests the stream of updates
* to observe the async runtime state over time.
Expand Down Expand Up @@ -207,6 +233,13 @@ export class Update extends Message<Update> {
*/
newMetadata?: RegisterMetadata;

/**
* The time "state" of the aggregator, such as paused or live.
*
* @generated from field: rs.tokio.console.instrument.Temporality temporality = 6;
*/
temporality = Temporality.LIVE;

constructor(data?: PartialMessage<Update>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -220,6 +253,7 @@ export class Update extends Message<Update> {
{ no: 3, name: "resource_update", kind: "message", T: ResourceUpdate },
{ no: 4, name: "async_op_update", kind: "message", T: AsyncOpUpdate },
{ no: 5, name: "new_metadata", kind: "message", T: RegisterMetadata },
{ no: 6, name: "temporality", kind: "enum", T: proto3.getEnumType(Temporality) },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): Update {
Expand Down
8 changes: 3 additions & 5 deletions console-subscriber/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use console_api as proto;
use console_api::instrument::Temporality;
use prost::Message;
use proto::resources::resource;
use tokio::sync::{mpsc, Notify};
Expand Down Expand Up @@ -102,11 +103,6 @@ pub(crate) struct Flush {
triggered: AtomicBool,
}

#[derive(Debug)]
enum Temporality {
Live,
Paused,
}
// Represent static data for resources
struct Resource {
id: Id,
Expand Down Expand Up @@ -293,6 +289,7 @@ impl Aggregator {
new_metadata: Some(proto::RegisterMetadata {
metadata: (*self.all_metadata).clone(),
}),
temporality: self.temporality.into(),
};
let message_size = update.encoded_len();
if message_size < MAX_MESSAGE_SIZE {
Expand Down Expand Up @@ -417,6 +414,7 @@ impl Aggregator {
task_update,
resource_update,
async_op_update,
temporality: self.temporality.into(),
};

self.watchers
Expand Down
21 changes: 9 additions & 12 deletions tokio-console/src/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
warnings::Linter,
};
use console_api as proto;
use console_api::instrument::Temporality;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all other items used from the console_api crate, we reference by the "full" path (but starting with proto), I think we should do the same here to not make Temporality different.

Suggested change
use console_api::instrument::Temporality;

use ratatui::{
style::{Color, Modifier},
text::Span,
Expand Down Expand Up @@ -71,12 +72,6 @@ pub(crate) enum FieldValue {
Debug(String),
}

#[derive(Debug)]
enum Temporality {
Live,
Paused,
}

#[derive(Debug, Eq, PartialEq)]
pub(crate) struct Attribute {
field: Field,
Expand Down Expand Up @@ -107,6 +102,14 @@ impl State {
current_view: &view::ViewState,
update: proto::instrument::Update,
) {
match Temporality::try_from(update.temporality) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps best to stick to the way that we reference all other types from the console_api crate.

Suggested change
match Temporality::try_from(update.temporality) {
match proto::instrument::Temporality::try_from(update.temporality) {

Ok(temporality) => {
self.temporality = temporality;
}
Err(..) => {
tracing::warn!(?update.temporality, "invalid temporality");
}
}
if let Some(now) = update.now.map(|v| v.try_into().unwrap()) {
self.last_updated_at = Some(now);
}
Expand Down Expand Up @@ -250,12 +253,6 @@ impl State {
}
}

impl Default for Temporality {
fn default() -> Self {
Self::Live
}
}

impl Metadata {
fn from_proto(pb: proto::Metadata, id: u64, strings: &mut intern::Strings) -> Self {
Self {
Expand Down
Loading