Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attach topic and custom labels to Prometheus metrics #410

Merged
merged 5 commits into from
Dec 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion oauth2/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"github.com/apache/pulsar-client-go/oauth2"
"github.com/apache/pulsar-client-go/oauth2/store"

xoauth2 "golang.org/x/oauth2"
"github.com/apache/pulsar-client-go/oauth2/clock"
xoauth2 "golang.org/x/oauth2"
)

// A CachingTokenSource is anything that can return a token, and is backed by a cache.
Expand Down
3 changes: 3 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ type ClientOptions struct {
// log.NewLoggerWithLogrus(logrus.StandardLogger())
// FIXME: use `logger` as internal field name instead of `log` as it's more idiomatic
Logger log.Logger

// Add custom labels to all the metrics reported by this client instance
CustomMetricsLabels map[string]string
}

type Client interface {
Expand Down
17 changes: 14 additions & 3 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type client struct {
rpcClient internal.RPCClient
handlers internal.ClientHandlers
lookupService internal.LookupService
metrics *internal.Metrics

log log.Logger
}
Expand Down Expand Up @@ -110,13 +111,23 @@ func newClient(options ClientOptions) (Client, error) {
maxConnectionsPerHost = 1
}

var metrics *internal.Metrics
if options.CustomMetricsLabels != nil {
metrics = internal.NewMetricsProvider(options.CustomMetricsLabels)
} else {
metrics = internal.NewMetricsProvider(map[string]string{})
}

c := &client{
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost, logger),
cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, connectionTimeout, maxConnectionsPerHost, logger,
metrics),
log: logger,
metrics: metrics,
}
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger)
c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics)
c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig != nil, logger, metrics)
c.handlers = internal.NewClientHandlers()

return c, nil
}

Expand Down
37 changes: 8 additions & 29 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,11 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)

var (
consumersOpened = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_consumers_opened",
Help: "Counter of consumers created by the client",
ConstLabels: constLabels(),
})

consumersClosed = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_consumers_closed",
Help: "Counter of consumers closed by the client",
ConstLabels: constLabels(),
})

consumersPartitions = promauto.NewGauge(prometheus.GaugeOpts{
Name: "pulsar_client_consumers_partitions_active",
Help: "Counter of individual partitions the consumers are currently active",
ConstLabels: constLabels(),
})
)

var ErrConsumerClosed = errors.New("consumer closed")

const defaultNackRedeliveryDelay = 1 * time.Minute
Expand Down Expand Up @@ -82,7 +59,8 @@ type consumer struct {
errorCh chan error
ticker *time.Ticker

log log.Logger
log log.Logger
metrics *internal.TopicMetrics
}

func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
Expand Down Expand Up @@ -221,6 +199,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
rlq: rlq,
log: client.log.SubLogger(log.Fields{"topic": topic}),
consumerName: options.Name,
metrics: client.metrics.GetTopicMetrics(topic),
}

err := consumer.internalTopicSubscribeToPartitions()
Expand Down Expand Up @@ -327,7 +306,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
err: err,
partition: idx,
Expand Down Expand Up @@ -360,15 +339,15 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
return err
}

consumersPartitions.Add(float64(partitionsToAdd))
c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
return nil
}

func topicSubscribe(client *client, options ConsumerOptions, topic string,
messageCh chan ConsumerMessage, dlqRouter *dlqRouter, retryRouter *retryRouter) (Consumer, error) {
c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, retryRouter, false)
if err == nil {
consumersOpened.Inc()
c.metrics.ConsumersOpened.Inc()
}
return c, err
}
Expand Down Expand Up @@ -519,8 +498,8 @@ func (c *consumer) Close() {
c.client.handlers.Del(c)
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
consumersPartitions.Sub(float64(len(c.consumers)))
c.metrics.ConsumersClosed.Inc()
c.metrics.ConsumersPartitions.Sub(float64(len(c.consumers)))
})
}

Expand Down
2 changes: 0 additions & 2 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []str
return nil, errs
}

consumersOpened.Inc()
return mtc, nil
}

Expand Down Expand Up @@ -178,7 +177,6 @@ func (c *multiTopicConsumer) Close() {
close(c.closeCh)
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
})
}

Expand Down
77 changes: 14 additions & 63 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/gogo/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal"
Expand All @@ -35,55 +32,6 @@ import (
)

var (
messagesReceived = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_messages_received",
Help: "Counter of messages received by the client",
ConstLabels: constLabels(),
})

bytesReceived = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_bytes_received",
Help: "Counter of bytes received by the client",
ConstLabels: constLabels(),
})

prefetchedMessages = promauto.NewGauge(prometheus.GaugeOpts{
Name: "pulsar_client_consumer_prefetched_messages",
Help: "Number of messages currently sitting in the consumer pre-fetch queue",
ConstLabels: constLabels(),
})

prefetchedBytes = promauto.NewGauge(prometheus.GaugeOpts{
Name: "pulsar_client_consumer_prefetched_bytes",
Help: "Total number of bytes currently sitting in the consumer pre-fetch queue",
ConstLabels: constLabels(),
})

acksCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_consumer_acks",
Help: "Counter of messages acked by client",
ConstLabels: constLabels(),
})

nacksCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_consumer_nacks",
Help: "Counter of messages nacked by client",
ConstLabels: constLabels(),
})

dlqCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_consumer_dlq_messages",
Help: "Counter of messages sent to Dead letter queue",
ConstLabels: constLabels(),
})

processingTime = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "pulsar_client_consumer_processing_time_seconds",
Help: "Time it takes for application to process messages",
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
ConstLabels: constLabels(),
})

lastestMessageID = LatestMessageID()
)

Expand Down Expand Up @@ -172,10 +120,12 @@ type partitionConsumer struct {
log log.Logger

compressionProviders map[pb.CompressionType]compression.Provider
metrics *internal.TopicMetrics
}

func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
messageCh chan ConsumerMessage, dlq *dlqRouter) (*partitionConsumer, error) {
messageCh chan ConsumerMessage, dlq *dlqRouter,
metrics *internal.TopicMetrics) (*partitionConsumer, error) {
pc := &partitionConsumer{
state: consumerInit,
parentConsumer: parent,
Expand All @@ -196,6 +146,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders: make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
metrics: metrics,
}
pc.log = client.log.SubLogger(log.Fields{
"name": pc.name,
Expand Down Expand Up @@ -308,8 +259,8 @@ func (pc *partitionConsumer) requestGetLastMessageID() (trackingMessageID, error

func (pc *partitionConsumer) AckID(msgID trackingMessageID) {
if !msgID.Undefined() && msgID.ack() {
acksCounter.Inc()
processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
pc.metrics.AcksCounter.Inc()
pc.metrics.ProcessingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9)
req := &ackRequest{
msgID: msgID,
}
Expand All @@ -321,7 +272,7 @@ func (pc *partitionConsumer) AckID(msgID trackingMessageID) {

func (pc *partitionConsumer) NackID(msgID trackingMessageID) {
pc.nackTracker.Add(msgID.messageID)
nacksCounter.Inc()
pc.metrics.NacksCounter.Inc()
}

func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
Expand Down Expand Up @@ -492,8 +443,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
ackTracker = newAckTracker(numMsgs)
}

messagesReceived.Add(float64(numMsgs))
prefetchedMessages.Add(float64(numMsgs))
pc.metrics.MessagesReceived.Add(float64(numMsgs))
pc.metrics.PrefetchedMessages.Add(float64(numMsgs))

for i := 0; i < numMsgs; i++ {
smm, payload, err := reader.ReadMessage()
Expand All @@ -502,8 +453,8 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
return err
}

bytesReceived.Add(float64(len(payload)))
prefetchedBytes.Add(float64(len(payload)))
pc.metrics.BytesReceived.Add(float64(len(payload)))
pc.metrics.PrefetchedBytes.Add(float64(len(payload)))

msgID := newTrackingMessageID(
int64(pbMsgID.GetLedgerId()),
Expand Down Expand Up @@ -623,15 +574,15 @@ func (pc *partitionConsumer) dispatcher() {

if pc.dlq.shouldSendToDlq(&nextMessage) {
// pass the message to the DLQ router
dlqCounter.Inc()
pc.metrics.DlqCounter.Inc()
messageCh = pc.dlq.Chan()
} else {
// pass the message to application channel
messageCh = pc.messageCh
}

prefetchedMessages.Dec()
prefetchedBytes.Sub(float64(len(messages[0].payLoad)))
pc.metrics.PrefetchedMessages.Dec()
pc.metrics.PrefetchedBytes.Sub(float64(len(messages[0].payLoad)))
} else {
// we are ready for more messages
queueCh = pc.queueCh
Expand Down
3 changes: 3 additions & 0 deletions pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
}

headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
Expand Down Expand Up @@ -65,6 +66,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
}

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
Expand Down Expand Up @@ -95,6 +97,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
eventsCh: eventsCh,
compressionProviders: make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
}

headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
Expand Down
2 changes: 0 additions & 2 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p

go rc.monitor()

consumersOpened.Inc()
return rc, nil
}

Expand Down Expand Up @@ -221,7 +220,6 @@ func (c *regexConsumer) Close() {
wg.Wait()
c.dlq.close()
c.rlq.close()
consumersClosed.Inc()
})
}

Expand Down
Loading