Skip to content
This repository has been archived by the owner on Jul 11, 2022. It is now read-only.

Make Metrics consistent with Go client. #129

Merged
merged 8 commits into from
Mar 6, 2018
15 changes: 9 additions & 6 deletions jaeger_client/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def _consume_queue(self):
for _ in spans:
self.queue.task_done()
spans = spans[:0]
self.metrics.reporter_queue_length(self.queue.qsize())
self.logger.info('Span publisher exists')

# method for protocol factory
Expand All @@ -198,7 +199,7 @@ def _submit(self, spans):
yield self._send(batch)
self.metrics.reporter_success(len(spans))
except socket.error as e:
self.metrics.reporter_socket(len(spans))
self.metrics.reporter_failure(len(spans))
self.error_reporter.error(
'Failed to submit traces to jaeger-agent socket: %s', e)
except Exception as e:
Expand Down Expand Up @@ -231,15 +232,17 @@ def _flush(self):


class ReporterMetrics(object):
"""Reporter specific metrics."""

def __init__(self, metrics_factory):
self.reporter_success = \
metrics_factory.create_counter(name='jaeger.spans', tags={'reported': 'true'})
metrics_factory.create_counter(name='jaeger:reporter_spans', tags={'result': 'ok'})
self.reporter_failure = \
metrics_factory.create_counter(name='jaeger.spans', tags={'reported': 'false'})
metrics_factory.create_counter(name='jaeger:reporter_spans', tags={'result': 'err'})
self.reporter_dropped = \
metrics_factory.create_counter(name='jaeger.spans', tags={'dropped': 'true'})
self.reporter_socket = \
metrics_factory.create_counter(name='jaeger.spans', tags={'socket_error': 'true'})
metrics_factory.create_counter(name='jaeger:reporter_spans', tags={'result': 'dropped'})
self.reporter_queue_length = \
metrics_factory.create_gauge(name='jaeger:reporter_queue_length')


class CompositeReporter(NullReporter):
Expand Down
26 changes: 21 additions & 5 deletions jaeger_client/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,7 @@ def __init__(self, channel, service_name, **kwargs):
kwargs.get('sampling_refresh_interval', DEFAULT_SAMPLING_INTERVAL)
self.metrics_factory = kwargs.get('metrics_factory', None) \
or LegacyMetricsFactory(kwargs.get('metrics', None) or Metrics())
self.sampler_errors = \
self.metrics_factory.create_counter('jaeger.sampler', {'error': 'true'})
self.metrics = SamplerMetrics(self.metrics_factory)
self.error_reporter = kwargs.get('error_reporter') or \
ErrorReporter(Metrics())
self.max_operations = kwargs.get('max_operations', DEFAULT_MAX_OPERATIONS)
Expand Down Expand Up @@ -408,7 +407,7 @@ def _create_periodic_callback(self):
def _sampling_request_callback(self, future):
exception = future.exception()
if exception:
self.sampler_errors(1)
self.metrics.sampler_query_failure(1)
self.error_reporter.error(
'Fail to get sampling strategy from jaeger-agent: %s',
exception)
Expand All @@ -417,8 +416,9 @@ def _sampling_request_callback(self, future):
response = future.result()
try:
sampling_strategies_response = json.loads(response.body)
self.metrics.sampler_retrieved(1)
except Exception as e:
self.sampler_errors(1)
self.metrics.sampler_query_failure(1)
self.error_reporter.error(
'Fail to parse sampling strategy '
'from jaeger-agent: %s [%s]', e, response.body)
Expand All @@ -435,14 +435,15 @@ def _update_sampler(self, response):
else:
self._update_rate_limiting_or_probabilistic_sampler(response)
except Exception as e:
self.sampler_errors(1)
self.metrics.sampler_update_failure(1)
self.error_reporter.error(
'Fail to update sampler'
'from jaeger-agent: %s [%s]', e, response)

def _update_adaptive_sampler(self, per_operation_strategies):
if isinstance(self.sampler, AdaptiveSampler):
self.sampler.update(per_operation_strategies)
self.metrics.sampler_updated(1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should always run, for else: part as well

else:
self.sampler = AdaptiveSampler(per_operation_strategies, self.max_operations)

Expand All @@ -463,6 +464,7 @@ def _update_rate_limiting_or_probabilistic_sampler(self, response):

if self.sampler != new_sampler:
self.sampler = new_sampler
self.metrics.sampler_updated(1)

def _poll_sampling_manager(self):
self.logger.debug('Requesting tracing sampler refresh')
Expand Down Expand Up @@ -493,3 +495,17 @@ def get_rate_limit(strategy=None):
if not rate_limit_strategy:
return DEFAULT_LOWER_BOUND
return rate_limit_strategy.get(MAX_TRACES_PER_SECOND_STR, DEFAULT_LOWER_BOUND)


class SamplerMetrics(object):
"""Sampler specific metrics."""

def __init__(self, metrics_factory):
self.sampler_retrieved = \
metrics_factory.create_counter(name='jaeger:sampler_queries', tags={'result': 'ok'})
self.sampler_query_failure = \
metrics_factory.create_counter(name='jaeger:sampler_queries', tags={'result': 'err'})
self.sampler_updated = \
metrics_factory.create_counter(name='jaeger:sampler_updates', tags={'result': 'ok'})
self.sampler_update_failure = \
metrics_factory.create_counter(name='jaeger:sampler_updates', tags={'result': 'err'})
27 changes: 17 additions & 10 deletions jaeger_client/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ def close(self):

def _emit_span_metrics(self, span, join=False):
if span.is_sampled():
self.metrics.spans_sampled(1)
self.metrics.spans_started_sampled(1)
else:
self.metrics.spans_not_sampled(1)
self.metrics.spans_started_not_sampled(1)
if not span.context.parent_id:
if span.is_sampled():
if join:
Expand All @@ -224,6 +224,7 @@ def _emit_span_metrics(self, span, join=False):

def report_span(self, span):
self.reporter.report_span(span)
self.metrics.spans_finished(1)

def random_id(self):
return self.random.getrandbits(constants.MAX_ID_BITS)
Expand All @@ -234,14 +235,20 @@ class TracerMetrics(object):

def __init__(self, metrics_factory):
self.traces_started_sampled = \
metrics_factory.create_counter(name='jaeger.traces-started', tags={'sampled': 'true'})
metrics_factory.create_counter(name='jaeger:traces',
tags={'state': 'started', 'sampled': 'y'})
self.traces_started_not_sampled = \
metrics_factory.create_counter(name='jaeger.traces-started', tags={'sampled': 'false'})
metrics_factory.create_counter(name='jaeger:traces',
tags={'state': 'started', 'sampled': 'n'})
self.traces_joined_sampled = \
metrics_factory.create_counter(name='jaeger.traces-joined', tags={'sampled': 'true'})
metrics_factory.create_counter(name='jaeger:traces',
tags={'state': 'joined', 'sampled': 'y'})
self.traces_joined_not_sampled = \
metrics_factory.create_counter(name='jaeger.traces-joined', tags={'sampled': 'false'})
self.spans_sampled = \
metrics_factory.create_counter(name='jaeger.spans', tags={'sampled': 'true'})
self.spans_not_sampled = \
metrics_factory.create_counter(name='jaeger.spans', tags={'sampled': 'false'})
metrics_factory.create_counter(name='jaeger:traces',
tags={'state': 'joined', 'sampled': 'n'})
self.spans_started_sampled = \
metrics_factory.create_counter(name='jaeger:started_spans', tags={'sampled': 'y'})
self.spans_started_not_sampled = \
metrics_factory.create_counter(name='jaeger:started_spans', tags={'sampled': 'n'})
self.spans_finished = \
metrics_factory.create_counter(name='jaeger:finished_spans')
6 changes: 3 additions & 3 deletions tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def test_submit_batch_size_1(self):
assert 1 == len(sender.futures)

# send after close
span_dropped_key = 'jaeger.spans.dropped_true'
span_dropped_key = 'jaeger:reporter_spans.result_dropped'
assert span_dropped_key not in reporter.metrics_factory.counters
reporter.report_span(self._new_span('1'))
assert 1 == reporter.metrics_factory.counters[span_dropped_key]
Expand All @@ -195,7 +195,7 @@ def test_submit_failure(self):
reporter.error_reporter = ErrorReporter(
metrics=Metrics(), logger=logging.getLogger())

reporter_failure_key = 'jaeger.spans.reported_false'
reporter_failure_key = 'jaeger:reporter_spans.result_err'
assert reporter_failure_key not in reporter.metrics_factory.counters

# simulate exception in send
Expand All @@ -218,7 +218,7 @@ def test_submit_queue_full_batch_size_1(self):
assert 1 == len(sender.futures)
# the consumer is blocked on a future, so won't drain the queue
reporter.report_span(self._new_span('2'))
span_dropped_key = 'jaeger.spans.dropped_true'
span_dropped_key = 'jaeger:reporter_spans.result_dropped'
assert span_dropped_key not in reporter.metrics_factory.counters
reporter.report_span(self._new_span('3'))
yield self._wait_for(
Expand Down