Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Sep 20, 2024
1 parent e9ca7ad commit 032c181
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 9 deletions.
26 changes: 23 additions & 3 deletions src/frontend/src/monitor/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,21 @@ impl FrontendMetrics {
}
}

pub static GLOBAL_CURSOR_METRICS: LazyLock<CursorMetrics> =
LazyLock::new(|| CursorMetrics::new(&GLOBAL_METRICS_REGISTRY));

#[derive(Clone)]
pub struct CursorMetrics {
pub subscription_cursor_error_count: GenericCounter<AtomicU64>,
pub subscription_cursor_query_duration: HistogramVec,
pub subscription_cursor_declare_duration: HistogramVec,
pub subscription_cursor_fetch_duration: HistogramVec,
_cursor_metrics_collector: Arc<CursorMetricsCollector>,
registry: Registry,
_cursor_metrics_collector: Option<Arc<CursorMetricsCollector>>,
}

impl CursorMetrics {
pub fn new(registry: &Registry, session_map: SessionMapRef) -> Self {
pub fn new(registry: &Registry) -> Self {
let subscription_cursor_error_count = register_int_counter_with_registry!(
"subscription_cursor_error_count",
"The subscription error num of cursor",
Expand Down Expand Up @@ -128,13 +132,29 @@ impl CursorMetrics {
let subscription_cursor_fetch_duration =
register_histogram_vec_with_registry!(opts, &["subscription_name"], registry).unwrap();
Self {
_cursor_metrics_collector: Arc::new(CursorMetricsCollector::new(session_map, registry)),
// _cursor_metrics_collector: Arc::new(CursorMetricsCollector::new(session_map, registry)),
_cursor_metrics_collector: None,
subscription_cursor_error_count,
subscription_cursor_query_duration,
subscription_cursor_declare_duration,
subscription_cursor_fetch_duration,
registry: registry.clone(),
}
}

pub fn for_test() -> Self {
GLOBAL_CURSOR_METRICS.clone()
}

pub fn start_with_session_map(&mut self, session_map: SessionMapRef) {
self._cursor_metrics_collector = Some(Arc::new(CursorMetricsCollector::new(session_map, &self.registry)));
}

pub fn init(session_map: SessionMapRef) -> Self {
let mut cursor_metrics = GLOBAL_CURSOR_METRICS.clone();
cursor_metrics.start_with_session_map(session_map);
cursor_metrics
}
}

pub struct PeriodicCursorMetrics {
Expand Down
8 changes: 2 additions & 6 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ use risingwave_common::config::{
load_config, BatchConfig, MetaConfig, MetricLevel, StreamingConfig,
};
use risingwave_common::memory::MemoryContext;
use risingwave_common::monitor::GLOBAL_METRICS_REGISTRY;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
use risingwave_common::system_param::local_manager::{
Expand Down Expand Up @@ -236,7 +235,7 @@ impl FrontendEnv {
client_pool,
sessions_map: sessions_map.clone(),
frontend_metrics: Arc::new(FrontendMetrics::for_test()),
cursor_metrics: Arc::new(CursorMetrics::new(&GLOBAL_METRICS_REGISTRY, sessions_map)),
cursor_metrics: Arc::new(CursorMetrics::for_test()),
batch_config: BatchConfig::default(),
meta_config: MetaConfig::default(),
streaming_config: StreamingConfig::default(),
Expand Down Expand Up @@ -415,10 +414,7 @@ impl FrontendEnv {
));

let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
let cursor_metrics = Arc::new(CursorMetrics::new(
&GLOBAL_METRICS_REGISTRY,
sessions_map.clone(),
));
let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
let sessions = sessions_map.clone();

// Idle transaction background monitor
Expand Down

0 comments on commit 032c181

Please sign in to comment.