diff --git a/jaeger_client/reporter.py b/jaeger_client/reporter.py index 5b14242c..11c1fb8f 100644 --- a/jaeger_client/reporter.py +++ b/jaeger_client/reporter.py @@ -174,6 +174,7 @@ def _consume_queue(self): for _ in spans: self.queue.task_done() spans = spans[:0] + self.metrics.reporter_queue_length(self.queue.qsize()) self.logger.info('Span publisher exists') # method for protocol factory @@ -198,7 +199,7 @@ def _submit(self, spans): yield self._send(batch) self.metrics.reporter_success(len(spans)) except socket.error as e: - self.metrics.reporter_socket(len(spans)) + self.metrics.reporter_failure(len(spans)) self.error_reporter.error( 'Failed to submit traces to jaeger-agent socket: %s', e) except Exception as e: @@ -231,15 +232,17 @@ def _flush(self): class ReporterMetrics(object): + """Reporter specific metrics.""" + def __init__(self, metrics_factory): self.reporter_success = \ - metrics_factory.create_counter(name='jaeger.spans', tags={'reported': 'true'}) + metrics_factory.create_counter(name='jaeger:reporter_spans', tags={'result': 'ok'}) self.reporter_failure = \ - metrics_factory.create_counter(name='jaeger.spans', tags={'reported': 'false'}) + metrics_factory.create_counter(name='jaeger:reporter_spans', tags={'result': 'err'}) self.reporter_dropped = \ - metrics_factory.create_counter(name='jaeger.spans', tags={'dropped': 'true'}) - self.reporter_socket = \ - metrics_factory.create_counter(name='jaeger.spans', tags={'socket_error': 'true'}) + metrics_factory.create_counter(name='jaeger:reporter_spans', tags={'result': 'dropped'}) + self.reporter_queue_length = \ + metrics_factory.create_gauge(name='jaeger:reporter_queue_length') class CompositeReporter(NullReporter): diff --git a/jaeger_client/sampler.py b/jaeger_client/sampler.py index e77834de..75dd7afa 100644 --- a/jaeger_client/sampler.py +++ b/jaeger_client/sampler.py @@ -343,8 +343,7 @@ def __init__(self, channel, service_name, **kwargs): kwargs.get('sampling_refresh_interval', DEFAULT_SAMPLING_INTERVAL) self.metrics_factory = kwargs.get('metrics_factory', None) \ or LegacyMetricsFactory(kwargs.get('metrics', None) or Metrics()) - self.sampler_errors = \ - self.metrics_factory.create_counter('jaeger.sampler', {'error': 'true'}) + self.metrics = SamplerMetrics(self.metrics_factory) self.error_reporter = kwargs.get('error_reporter') or \ ErrorReporter(Metrics()) self.max_operations = kwargs.get('max_operations', DEFAULT_MAX_OPERATIONS) @@ -408,7 +407,7 @@ def _create_periodic_callback(self): def _sampling_request_callback(self, future): exception = future.exception() if exception: - self.sampler_errors(1) + self.metrics.sampler_query_failure(1) self.error_reporter.error( 'Fail to get sampling strategy from jaeger-agent: %s', exception) @@ -417,8 +416,9 @@ def _sampling_request_callback(self, future): response = future.result() try: sampling_strategies_response = json.loads(response.body) + self.metrics.sampler_retrieved(1) except Exception as e: - self.sampler_errors(1) + self.metrics.sampler_query_failure(1) self.error_reporter.error( 'Fail to parse sampling strategy ' 'from jaeger-agent: %s [%s]', e, response.body) @@ -435,7 +435,7 @@ def _update_sampler(self, response): else: self._update_rate_limiting_or_probabilistic_sampler(response) except Exception as e: - self.sampler_errors(1) + self.metrics.sampler_update_failure(1) self.error_reporter.error( 'Fail to update sampler' 'from jaeger-agent: %s [%s]', e, response) @@ -445,6 +445,7 @@ def _update_adaptive_sampler(self, per_operation_strategies): self.sampler.update(per_operation_strategies) else: self.sampler = AdaptiveSampler(per_operation_strategies, self.max_operations) + self.metrics.sampler_updated(1) def _update_rate_limiting_or_probabilistic_sampler(self, response): s_type = response.get(STRATEGY_TYPE_STR) @@ -463,6 +464,7 @@ def _update_rate_limiting_or_probabilistic_sampler(self, response): if self.sampler != new_sampler: self.sampler = new_sampler + self.metrics.sampler_updated(1) def _poll_sampling_manager(self): self.logger.debug('Requesting tracing sampler refresh') @@ -493,3 +495,17 @@ def get_rate_limit(strategy=None): if not rate_limit_strategy: return DEFAULT_LOWER_BOUND return rate_limit_strategy.get(MAX_TRACES_PER_SECOND_STR, DEFAULT_LOWER_BOUND) + + +class SamplerMetrics(object): + """Sampler specific metrics.""" + + def __init__(self, metrics_factory): + self.sampler_retrieved = \ + metrics_factory.create_counter(name='jaeger:sampler_queries', tags={'result': 'ok'}) + self.sampler_query_failure = \ + metrics_factory.create_counter(name='jaeger:sampler_queries', tags={'result': 'err'}) + self.sampler_updated = \ + metrics_factory.create_counter(name='jaeger:sampler_updates', tags={'result': 'ok'}) + self.sampler_update_failure = \ + metrics_factory.create_counter(name='jaeger:sampler_updates', tags={'result': 'err'}) diff --git a/jaeger_client/tracer.py b/jaeger_client/tracer.py index a40f6ae6..075e98eb 100644 --- a/jaeger_client/tracer.py +++ b/jaeger_client/tracer.py @@ -205,9 +205,9 @@ def close(self): def _emit_span_metrics(self, span, join=False): if span.is_sampled(): - self.metrics.spans_sampled(1) + self.metrics.spans_started_sampled(1) else: - self.metrics.spans_not_sampled(1) + self.metrics.spans_started_not_sampled(1) if not span.context.parent_id: if span.is_sampled(): if join: @@ -223,6 +223,7 @@ def _emit_span_metrics(self, span, join=False): def report_span(self, span): self.reporter.report_span(span) + self.metrics.spans_finished(1) def random_id(self): return self.random.getrandbits(constants.MAX_ID_BITS) @@ -233,14 +234,20 @@ class TracerMetrics(object): def __init__(self, metrics_factory): self.traces_started_sampled = \ - metrics_factory.create_counter(name='jaeger.traces-started', tags={'sampled': 'true'}) + metrics_factory.create_counter(name='jaeger:traces', + tags={'state': 'started', 'sampled': 'y'}) self.traces_started_not_sampled = \ - metrics_factory.create_counter(name='jaeger.traces-started', tags={'sampled': 'false'}) + metrics_factory.create_counter(name='jaeger:traces', + tags={'state': 'started', 'sampled': 'n'}) self.traces_joined_sampled = \ - metrics_factory.create_counter(name='jaeger.traces-joined', tags={'sampled': 'true'}) + metrics_factory.create_counter(name='jaeger:traces', + tags={'state': 'joined', 'sampled': 'y'}) self.traces_joined_not_sampled = \ - metrics_factory.create_counter(name='jaeger.traces-joined', tags={'sampled': 'false'}) - self.spans_sampled = \ - metrics_factory.create_counter(name='jaeger.spans', tags={'sampled': 'true'}) - self.spans_not_sampled = \ - metrics_factory.create_counter(name='jaeger.spans', tags={'sampled': 'false'}) + metrics_factory.create_counter(name='jaeger:traces', + tags={'state': 'joined', 'sampled': 'n'}) + self.spans_started_sampled = \ + metrics_factory.create_counter(name='jaeger:started_spans', tags={'sampled': 'y'}) + self.spans_started_not_sampled = \ + metrics_factory.create_counter(name='jaeger:started_spans', tags={'sampled': 'n'}) + self.spans_finished = \ + metrics_factory.create_counter(name='jaeger:finished_spans') diff --git a/tests/test_reporter.py b/tests/test_reporter.py index 99bb4ada..f6958ec9 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -185,7 +185,7 @@ def test_submit_batch_size_1(self): assert 1 == len(sender.futures) # send after close - span_dropped_key = 'jaeger.spans.dropped_true' + span_dropped_key = 'jaeger:reporter_spans.result_dropped' assert span_dropped_key not in reporter.metrics_factory.counters reporter.report_span(self._new_span('1')) assert 1 == reporter.metrics_factory.counters[span_dropped_key] @@ -196,7 +196,7 @@ def test_submit_failure(self): reporter.error_reporter = ErrorReporter( metrics=Metrics(), logger=logging.getLogger()) - reporter_failure_key = 'jaeger.spans.reported_false' + reporter_failure_key = 'jaeger:reporter_spans.result_err' assert reporter_failure_key not in reporter.metrics_factory.counters # simulate exception in send @@ -219,7 +219,7 @@ def test_submit_queue_full_batch_size_1(self): assert 1 == len(sender.futures) # the consumer is blocked on a future, so won't drain the queue reporter.report_span(self._new_span('2')) - span_dropped_key = 'jaeger.spans.dropped_true' + span_dropped_key = 'jaeger:reporter_spans.result_dropped' assert span_dropped_key not in reporter.metrics_factory.counters reporter.report_span(self._new_span('3')) yield self._wait_for(