Skip to content

Commit

Permalink
feat(pubsub): Add opentelemetry trace support
Browse files Browse the repository at this point in the history
  • Loading branch information
quartzmo committed Dec 23, 2021
1 parent 711bb9b commit c6c31a5
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 198 deletions.
1 change: 1 addition & 0 deletions google-cloud-pubsub/google-cloud-pubsub.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Gem::Specification.new do |gem|
gem.add_dependency "concurrent-ruby", "~> 1.1"
gem.add_dependency "google-cloud-core", "~> 1.5"
gem.add_dependency "google-cloud-pubsub-v1", "~> 0.0"
gem.add_dependency "opentelemetry-sdk", "~> 1.0"

gem.add_development_dependency "autotest-suffix", "~> 1.1"
gem.add_development_dependency "avro", "~> 1.10"
Expand Down
92 changes: 63 additions & 29 deletions google-cloud-pubsub/lib/google/cloud/pubsub/async_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
require "google/cloud/pubsub/publish_result"
require "google/cloud/pubsub/service"
require "google/cloud/pubsub/convert"
require "opentelemetry"

module Google
module Cloud
Expand Down Expand Up @@ -105,6 +106,7 @@ def initialize topic_name,
@cond = new_cond
@flow_controller = FlowController.new(**@flow_control)
@thread = Thread.new { run_background }
@tracer = OpenTelemetry.tracer_provider.tracer "Google::Cloud::PubSub", Google::Cloud::PubSub::VERSION
end

##
Expand Down Expand Up @@ -133,35 +135,57 @@ def initialize topic_name,
#
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs
begin
@flow_controller.acquire msg.to_proto.bytesize
rescue FlowControlLimitError => e
stop_publish ordering_key, e if ordering_key
raise
end
span_attrs = Convert.span_attributes topic_name, msg
span = @tracer.start_span "#{topic_name} send",
attributes: span_attrs,
kind: OpenTelemetry::Trace::SpanKind::PRODUCER

# TODO: message size in this span will be incorrect after propagation, below.
@tracer.in_span "#{topic_name} add to batch",
attributes: span_attrs,
kind: OpenTelemetry::Trace::SpanKind::PRODUCER do
propagate_span_in_message span, msg
begin
@flow_controller.acquire msg.to_proto.bytesize
rescue FlowControlLimitError => e
stop_publish ordering_key, e if ordering_key
raise
end

synchronize do
raise AsyncPublisherStopped if @stopped
raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string
synchronize do
raise AsyncPublisherStopped if @stopped
raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string

batch = resolve_batch_for_message msg
if batch.canceled?
@flow_controller.release msg.to_proto.bytesize
raise OrderingKeyError, batch.ordering_key
end
batch_action = batch.add msg, callback
if batch_action == :full
publish_batches!
elsif @published_at.nil?
# Set initial time to now to start the background counter
@published_at = Time.now
batch = resolve_batch_for_message msg
if batch.canceled?
@flow_controller.release msg.to_proto.bytesize
raise OrderingKeyError, batch.ordering_key
end
batch_action = batch.add msg, callback, span
if batch_action == :full
publish_batches!
elsif @published_at.nil?
# Set initial time to now to start the background counter
@published_at = Time.now
end
@cond.signal
end
@cond.signal
end

nil
end

def propagate_span_in_message span, msg
# TODO: How do we keep traceparent out of the pubsub message unless OTEL is being used?
# Ensure body executes only conditional on active tracing.
return unless span.context.valid?
# Add span context to pubsub message attributes.
propagator = OpenTelemetry::Trace::Propagation::TraceContext.text_map_propagator
propagator.inject msg, setter: TextMapSetter.new
# Update the message size in the span attributes after adding span context to message attributes.
span.set_attribute "messaging.message_payload_size_bytes", msg.to_proto.bytesize
end

##
# Begins the process of stopping the publisher. Messages already in
# the queue will be published, but no new messages can be added. Use
Expand Down Expand Up @@ -296,6 +320,14 @@ def resume_publish ordering_key

protected

# Open Telemetry type used by TraceContext#inject to write "traceparent" context into the message attributes.
class TextMapSetter
# Writes key into a message protobuf.
def set msg, key, value
msg.attributes[key] = value
end
end

def run_background
synchronize do
until @stopped
Expand Down Expand Up @@ -342,6 +374,7 @@ def stop_publish ordering_key, err
end

def publish_batches! stop: nil
# @tracer.in_span "#{name} add to batch" do
@batches.reject! { |_ordering_key, batch| batch.empty? }
@batches.each_value do |batch|
ready = batch.publish! stop: stop
Expand All @@ -355,9 +388,9 @@ def publish_batch_async topic_name, batch
# TODO: raise unless @publish_thread_pool.running?
return unless @publish_thread_pool.running?

Concurrent::Promises.future_on(
@publish_thread_pool, topic_name, batch
) { |t, b| publish_batch_sync t, b }
Concurrent::Promises.future_on @publish_thread_pool, topic_name, batch do |t, b|
publish_batch_sync t, b
end
end

# rubocop:disable Metrics/AbcSize
Expand All @@ -373,11 +406,12 @@ def publish_batch_sync topic_name, batch
grpc = @service.publish topic_name, items.map(&:msg)
items.zip Array(grpc.message_ids) do |item, id|
@flow_controller.release item.bytesize
next unless item.callback

item.msg.message_id = id
publish_result = PublishResult.from_grpc item.msg
execute_callback_async item.callback, publish_result
if item.callback
item.msg.message_id = id
publish_result = PublishResult.from_grpc item.msg
execute_callback_async item.callback, publish_result
end
item.span.finish
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,18 @@ def initialize publisher, ordering_key
# * `:full` - Batch is full and ready to be published, and the
# message is queued.
#
def add msg, callback
def add msg, callback, span
synchronize do
raise AsyncPublisherStopped if @stopping
raise OrderingKeyError, @ordering_key if @canceled

if @publishing
queue_add msg, callback
queue_add msg, callback, span
:queued
elsif try_add msg, callback
elsif try_add msg, callback, span
:added
else
queue_add msg, callback
queue_add msg, callback, span
:full
end
end
Expand Down Expand Up @@ -141,7 +141,7 @@ def rebalance!

until @queue.empty?
item = @queue.first
if try_add item.msg, item.callback
if try_add item.msg, item.callback, item.span
@queue.shift
next
end
Expand Down Expand Up @@ -180,7 +180,7 @@ def reset!

until @queue.empty?
item = @queue.first
added = try_add item.msg, item.callback
added = try_add item.msg, item.callback, item.span
break unless added
@queue.shift
end
Expand Down Expand Up @@ -257,16 +257,16 @@ def empty?

protected

def items_add msg, callback
item = Item.new msg, callback
def items_add msg, callback, span
item = Item.new msg, callback, span
@items << item
@total_message_bytes += item.bytesize + 2
end

def try_add msg, callback
def try_add msg, callback, span
if @items.empty?
# Always add when empty, even if bytesize is bigger than total
items_add msg, callback
items_add msg, callback, span
return true
end
new_message_count = total_message_count + 1
Expand All @@ -275,12 +275,12 @@ def try_add msg, callback
new_message_bytes >= @publisher.max_bytes
return false
end
items_add msg, callback
items_add msg, callback, span
true
end

def queue_add msg, callback
item = Item.new msg, callback
def queue_add msg, callback, span
item = Item.new msg, callback, span
@queue << item
end

Expand All @@ -292,7 +292,7 @@ def total_message_bytes
@total_message_bytes
end

Item = Struct.new :msg, :callback do
Item = Struct.new :msg, :callback, :span do
def bytesize
msg.to_proto.bytesize
end
Expand Down
11 changes: 11 additions & 0 deletions google-cloud-pubsub/lib/google/cloud/pubsub/convert.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@ def pubsub_message data, attributes, ordering_key, extra_attrs
ordering_key: ordering_key
)
end

def span_attributes topic_name, msg
{
"messaging.system" => "pubsub",
"messaging.destination" => topic_name,
"messaging.destination_kind" => "topic",
"messaging.message_id" => msg.message_id,
"messaging.message_payload_size_bytes" => msg.to_proto.bytesize,
"pubsub.ordering_key" => msg.ordering_key
}
end
end

extend ClassMethods
Expand Down
Loading

0 comments on commit c6c31a5

Please sign in to comment.