Skip to content

Commit

Permalink
fix(queue): rename 'send' operation to 'publish'
Browse files Browse the repository at this point in the history
  • Loading branch information
michal-kazmierczak committed Sep 5, 2023
1 parent bd32c18 commit 61f23aa
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def enqueue(*args, job_options: {}, **arg_opts)
tracer = Que::Instrumentation.instance.tracer
otel_config = Que::Instrumentation.instance.config

tracer.in_span('send', kind: :producer) do |span|
tracer.in_span('publish', kind: :producer) do |span|
# Que doesn't have a good place to store metadata. There are
# basically two options: the job payload and the job tags.
#
Expand Down Expand Up @@ -57,7 +57,7 @@ def enqueue(*args, job_options: {}, **arg_opts)
job_attrs = job.que_attrs
end

span.name = "#{job_attrs[:job_class]} send"
span.name = "#{job_attrs[:job_class]} publish"
span.add_attributes(QueJob.job_attributes(job_attrs))

job
Expand All @@ -73,7 +73,7 @@ def self.job_attributes(job_attrs)
attributes = {
'messaging.system' => 'que',
'messaging.destination_kind' => 'queue',
'messaging.operation' => 'send',
'messaging.operation' => 'publish',
'messaging.destination' => job_attrs[:queue] || 'default',
'messaging.que.job_class' => job_attrs[:job_class],
'messaging.que.priority' => job_attrs[:priority] || 100
Expand Down
58 changes: 29 additions & 29 deletions instrumentation/que/test/opentelemetry/instrumentation/que_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
TestJobAsync.enqueue

span = finished_spans.last
_(span.name).must_equal('TestJobAsync send')
_(span.name).must_equal('TestJobAsync publish')
end

it 'records attributes' do
Expand All @@ -45,7 +45,7 @@
_(attributes['messaging.system']).must_equal('que')
_(attributes['messaging.destination']).must_equal('default')
_(attributes['messaging.destination_kind']).must_equal('queue')
_(attributes['messaging.operation']).must_equal('send')
_(attributes['messaging.operation']).must_equal('publish')
_(attributes['messaging.message_id']).must_be_instance_of(Integer)
_(attributes['messaging.que.job_class']).must_equal('TestJobAsync')
_(attributes['messaging.que.priority']).must_equal(100)
Expand Down Expand Up @@ -117,7 +117,7 @@
TestJobSync.enqueue

span1 = finished_spans.last
_(span1.name).must_equal('TestJobSync send')
_(span1.name).must_equal('TestJobSync publish')

span2 = finished_spans.first
_(span2.name).must_equal('TestJobSync process')
Expand Down Expand Up @@ -171,14 +171,14 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

send_span = finished_spans.first
publish_span = finished_spans.first
process_span = finished_spans.last

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(send_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(send_span.span_id)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end

Expand All @@ -191,11 +191,11 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

send_span = finished_spans.first
publish_span = finished_spans.first
process_span = finished_spans.last

_(send_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(send_span.span_id)
_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.total_recorded_links).must_equal(0)
end
end
Expand All @@ -221,11 +221,11 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

send_span = finished_spans.first
publish_span = finished_spans.first
process_span = finished_spans.last

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(send_span.total_recorded_links).must_equal(0)
_(publish_span.trace_id).wont_equal(process_span.trace_id)
_(publish_span.total_recorded_links).must_equal(0)
_(process_span.total_recorded_links).must_equal(0)
end
end
Expand Down Expand Up @@ -281,7 +281,7 @@ def self.run(first, second); end
end

span = finished_spans.last
_(span.name).must_equal('TestJobAsync send')
_(span.name).must_equal('TestJobAsync publish')
end

it 'links spans together' do
Expand All @@ -293,14 +293,14 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

send_span = finished_spans.first
publish_span = finished_spans.first
process_span = finished_spans.last

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(send_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(send_span.span_id)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end

Expand Down Expand Up @@ -376,7 +376,7 @@ def self.run(first, second); end
end

span1 = finished_spans.first
_(span1.name).must_equal('TestJobSync send')
_(span1.name).must_equal('TestJobSync publish')

span2 = finished_spans.last
_(span2.name).must_equal('TestJobSync process')
Expand Down Expand Up @@ -429,14 +429,14 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

send_span = finished_spans.first
publish_span = finished_spans.first
process_span = finished_spans.last

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(publish_span.trace_id).wont_equal(process_span.trace_id)

_(process_span.total_recorded_links).must_equal(1)
_(process_span.links[0].span_context.trace_id).must_equal(send_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(send_span.span_id)
_(process_span.links[0].span_context.trace_id).must_equal(publish_span.trace_id)
_(process_span.links[0].span_context.span_id).must_equal(publish_span.span_id)
end
end

Expand All @@ -452,11 +452,11 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

send_span = finished_spans.first
publish_span = finished_spans.first
process_span = finished_spans.last

_(send_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(send_span.span_id)
_(publish_span.trace_id).must_equal(process_span.trace_id)
_(process_span.parent_span_id).must_equal(publish_span.span_id)
_(process_span.total_recorded_links).must_equal(0)
end
end
Expand All @@ -482,11 +482,11 @@ def self.run(first, second); end

_(finished_spans.size).must_equal(2)

send_span = finished_spans.first
publish_span = finished_spans.first
process_span = finished_spans.last

_(send_span.trace_id).wont_equal(process_span.trace_id)
_(send_span.total_recorded_links).must_equal(0)
_(publish_span.trace_id).wont_equal(process_span.trace_id)
_(publish_span.total_recorded_links).must_equal(0)
_(process_span.total_recorded_links).must_equal(0)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def push(queue, item)
}

span_name = case config[:span_naming]
when :job_class then "#{job_class} send"
else "#{queue} send"
when :job_class then "#{job_class} publish"
else "#{queue} publish"
end

tracer.in_span(span_name, attributes: attributes, kind: :producer) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
it 'traces' do
Resque.enqueue(DummyJob)

_(enqueue_span.name).must_equal('super_urgent send')
_(enqueue_span.name).must_equal('super_urgent publish')
_(enqueue_span.attributes['messaging.system']).must_equal('resque')
_(enqueue_span.attributes['messaging.resque.job_class']).must_equal('DummyJob')
_(enqueue_span.attributes['messaging.destination']).must_equal('super_urgent')
Expand All @@ -35,7 +35,7 @@

it 'traces when enqueued with Active Job' do
DummyJobWithActiveJob.perform_later(1, 2)
_(enqueue_span.name).must_equal('super_urgent send')
_(enqueue_span.name).must_equal('super_urgent publish')
_(enqueue_span.attributes['messaging.system']).must_equal('resque')
_(enqueue_span.attributes['messaging.resque.job_class']).must_equal('DummyJobWithActiveJob')
_(enqueue_span.attributes['messaging.destination']).must_equal('super_urgent')
Expand All @@ -48,12 +48,12 @@
it 'uses the job class name for the span name' do
Resque.enqueue(DummyJob)

_(enqueue_span.name).must_equal('DummyJob send')
_(enqueue_span.name).must_equal('DummyJob publish')
end

it 'uses the job class name when enqueued with Active Job' do
DummyJobWithActiveJob.perform_later(1, 2)
_(enqueue_span.name).must_equal('DummyJobWithActiveJob send')
_(enqueue_span.name).must_equal('DummyJobWithActiveJob publish')
end
end
end
Expand Down

0 comments on commit 61f23aa

Please sign in to comment.