
Commit

fix(ruby_kafka): rename 'send' operation to 'publish'
michal-kazmierczak committed Sep 5, 2023
1 parent c3e6893 commit 54e73b8
Showing 5 changed files with 26 additions and 26 deletions.
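
The diff below renames the producer span operation from "send" to "publish" in the ruby_kafka instrumentation and its tests. As a minimal sketch of what this means for an application (not part of the commit; the broker address, topic name, and the require/registration names used here are assumptions for illustration):

require 'opentelemetry/sdk'
require 'kafka'
require 'opentelemetry-instrumentation-ruby_kafka' # assumed require path for the instrumentation gem

OpenTelemetry::SDK.configure do |c|
  # Assumes the ruby_kafka instrumentation registers under this name.
  c.use 'OpenTelemetry::Instrumentation::RubyKafka'
end

kafka = Kafka.new(['localhost:9092'], client_id: 'example-app') # assumed broker address
kafka.deliver_message('hello', topic: 'greetings')
# The producer span for this delivery was previously named "greetings send";
# with this change it is named "greetings publish".
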
Changed file 1 of 5
@@ -24,7 +24,7 @@ def deliver_message(value, topic:, key: nil, headers: {}, partition: nil, partit

 attributes['messaging.kafka.partition'] = partition if partition

-tracer.in_span("#{topic} send", attributes: attributes, kind: :producer) do
+tracer.in_span("#{topic} publish", attributes: attributes, kind: :producer) do
 OpenTelemetry.propagation.inject(headers)
 super
 end

Changed file 2 of 5
@@ -22,7 +22,7 @@ def produce(value, topic:, key: nil, headers: {}, partition: nil, partition_key:
 # Thread's context, so the next two lines preserve the correct Thread-local context.
 ctx = OpenTelemetry.propagation.extract(headers)
 OpenTelemetry::Context.with_current(ctx) do
-tracer.in_span("#{topic} send", attributes: attributes, kind: :producer) do
+tracer.in_span("#{topic} publish", attributes: attributes, kind: :producer) do
 OpenTelemetry.propagation.inject(headers)
 super
 end

Changed file 3 of 5
@@ -40,7 +40,7 @@
 kafka.each_message(topic: topic) { |_msg| break } # rubocop:disable Lint/UnreachableLoop

 _(spans.size).must_equal(2)
-_(spans[0].name).must_equal("#{topic} send")
+_(spans[0].name).must_equal("#{topic} publish")
 _(spans[0].kind).must_equal(:producer)

 _(spans[1].name).must_equal("#{topic} process")
@@ -57,9 +57,9 @@
 break if counter >= 2
 end

-send_spans = spans.select { |s| s.name == "#{topic} send" }
-_(send_spans[0].attributes).wont_include('messaging.kafka.message_key')
-_(send_spans[1].attributes['messaging.kafka.message_key']).must_equal('foobarbaz')
+publish_spans = spans.select { |s| s.name == "#{topic} publish" }
+_(publish_spans[0].attributes).wont_include('messaging.kafka.message_key')
+_(publish_spans[1].attributes['messaging.kafka.message_key']).must_equal('foobarbaz')

 process_spans = spans.select { |s| s.name == "#{topic} process" }
 _(process_spans[0].attributes).wont_include('messaging.kafka.message_key')

Changed file 4 of 5
@@ -58,7 +58,7 @@

 process_spans = spans.select { |s| s.name == "#{topic} process" }

-# First pair for send and process spans
+# First pair for publish and process spans
 first_process_span = process_spans[0]
 _(first_process_span.name).must_equal("#{topic} process")
 _(first_process_span.kind).must_equal(:consumer)
@@ -68,23 +68,23 @@
 first_process_span_link = first_process_span.links[0]
 linked_span_context = first_process_span_link.span_context

-linked_send_span = spans.find { |s| s.span_id == linked_span_context.span_id }
-_(linked_send_span.name).must_equal("#{topic} send")
-_(linked_send_span.trace_id).must_equal(first_process_span.trace_id)
-_(linked_send_span.trace_id).must_equal(linked_span_context.trace_id)
+linked_publish_span = spans.find { |s| s.span_id == linked_span_context.span_id }
+_(linked_publish_span.name).must_equal("#{topic} publish")
+_(linked_publish_span.trace_id).must_equal(first_process_span.trace_id)
+_(linked_publish_span.trace_id).must_equal(linked_span_context.trace_id)

-# Second pair of send and process spans
+# Second pair of publish and process spans
 second_process_span = process_spans[1]
 _(second_process_span.name).must_equal("#{topic} process")
 _(second_process_span.kind).must_equal(:consumer)

 second_process_span_link = second_process_span.links[0]
 linked_span_context = second_process_span_link.span_context

-linked_send_span = spans.find { |s| s.span_id == linked_span_context.span_id }
-_(linked_send_span.name).must_equal("#{topic} send")
-_(linked_send_span.trace_id).must_equal(second_process_span.trace_id)
-_(linked_send_span.trace_id).must_equal(linked_span_context.trace_id)
+linked_publish_span = spans.find { |s| s.span_id == linked_span_context.span_id }
+_(linked_publish_span.name).must_equal("#{topic} publish")
+_(linked_publish_span.trace_id).must_equal(second_process_span.trace_id)
+_(linked_publish_span.trace_id).must_equal(linked_span_context.trace_id)

 event = second_process_span.events.first
 _(event.name).must_equal('exception')
@@ -106,7 +106,7 @@

 process_spans = spans.select { |s| s.name == "#{topic} process" }

-# First pair for send and process spans
+# First pair for publish and process spans
 first_process_span = process_spans[0]
 _(first_process_span.attributes).wont_include('messaging.kafka.message_key')

Changed file 5 of 5
@@ -24,8 +24,8 @@
 let(:producer) { kafka.producer }
 let(:consumer) { kafka.consumer(group_id: SecureRandom.uuid, fetcher_max_queue_size: 1) }
 let(:async_producer) { kafka.async_producer(delivery_threshold: 1000) }
-let(:send_span) { EXPORTER.finished_spans.find { |sp| sp.name == "#{topic} send" } }
-let(:async_send_span) { EXPORTER.finished_spans.find { |sp| sp.name == "#{async_topic} send" } }
+let(:publish_span) { EXPORTER.finished_spans.find { |sp| sp.name == "#{topic} publish" } }
+let(:async_publish_span) { EXPORTER.finished_spans.find { |sp| sp.name == "#{async_topic} publish" } }

 before do
 kafka.create_topic(topic)
@@ -54,7 +54,7 @@
 producer.produce('hello', topic: topic)
 producer.deliver_messages

-_(spans.first.name).must_equal("#{topic} send")
+_(spans.first.name).must_equal("#{topic} publish")
 _(spans.first.kind).must_equal(:producer)

 _(spans.first.attributes['messaging.system']).must_equal('kafka')
@@ -68,7 +68,7 @@
 # Wait for the async calls to produce spans
 wait_for(error_message: 'Max wait time exceeded for async producer') { EXPORTER.finished_spans.size.positive? }

-_(spans.first.name).must_equal("#{async_topic} send")
+_(spans.first.name).must_equal("#{async_topic} publish")
 _(spans.first.kind).must_equal(:producer)

 _(spans.first.attributes['messaging.system']).must_equal('kafka')
@@ -83,8 +83,8 @@
 producer.deliver_messages
 end

-_(send_span.hex_parent_span_id).must_equal(span_id)
-_(send_span.hex_trace_id).must_equal(trace_id)
+_(publish_span.hex_parent_span_id).must_equal(span_id)
+_(publish_span.hex_trace_id).must_equal(trace_id)
 end

 it 'propagates context when tracing async produce calls' do
@@ -98,8 +98,8 @@
 # Wait for the async calls to produce spans
 wait_for(error_message: 'Max wait time exceeded for async producer') { EXPORTER.finished_spans.size == 2 }

-_(async_send_span.trace_id).must_equal(sp.context.trace_id)
-_(async_send_span.parent_span_id).must_equal(sp.context.span_id)
+_(async_publish_span.trace_id).must_equal(sp.context.trace_id)
+_(async_publish_span.parent_span_id).must_equal(sp.context.span_id)
 end

 it 'propagates context for nonrecording spans' do
@@ -122,7 +122,7 @@
 end
 sp.finish
 _(EXPORTER.finished_spans.size).must_equal(2)
-_(send_span.hex_parent_span_id).must_equal(sp.context.hex_span_id)
+_(publish_span.hex_parent_span_id).must_equal(sp.context.hex_span_id)
 end
 end
 end unless ENV['OMIT_SERVICES']
