Skip to content

Commit

Permalink
[Issue 652] Quick fixes to the documentation for the main building bl…
Browse files Browse the repository at this point in the history
…ocks of the library (#667)

* fix documentation

* fix linter errors
  • Loading branch information
reugn committed Nov 14, 2021
1 parent 1576868 commit 7773d27
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 98 deletions.
83 changes: 45 additions & 38 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"time"
)

// Pair of a Consumer and Message
// ConsumerMessage represents a pair of a Consumer and Message.
type ConsumerMessage struct {
Consumer
Message
Expand Down Expand Up @@ -53,113 +53,120 @@ const (
type SubscriptionInitialPosition int

const (
// Latest position which means the start consuming position will be the last message
// SubscriptionPositionLatest is the latest position which means the start consuming position
// will be the last message
SubscriptionPositionLatest SubscriptionInitialPosition = iota

// Earliest position which means the start consuming position will be the first message
// SubscriptionPositionEarliest is the earliest position which means the start consuming position
// will be the first message
SubscriptionPositionEarliest
)

// Configuration for Dead Letter Queue consumer policy
// DLQPolicy represents the configuration for the Dead Letter Queue consumer policy.
type DLQPolicy struct {
// Maximum number of times that a message will be delivered before being sent to the dead letter queue.
// MaxDeliveries specifies the maximum number of times that a message will be delivered before being
// sent to the dead letter queue.
MaxDeliveries uint32

// Name of the topic where the failing messages will be sent.
// DeadLetterTopic specifies the name of the topic where the failing messages will be sent.
DeadLetterTopic string

// Name of the topic where the retry messages will be sent.
// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
RetryLetterTopic string
}

// ConsumerOptions is used to configure and create instances of Consumer
// ConsumerOptions is used to configure and create instances of Consumer.
type ConsumerOptions struct {
// Specify the topic this consumer will subscribe on.
// Topic specifies the topic this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topic string

// Specify a list of topics this consumer will subscribe on.
// Topics specifies a list of topics this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topics []string

// Specify a regular expression to subscribe to multiple topics under the same namespace.
// TopicsPattern specifies a regular expression to subscribe to multiple topics under the same namespace.
// Either a topic, a list of topics or a topics pattern are required when subscribing
TopicsPattern string

// Specify the interval in which to poll for new partitions or new topics if using a TopicsPattern.
// AutoDiscoveryPeriod specifies the interval in which to poll for new partitions or new topics
// if using a TopicsPattern.
AutoDiscoveryPeriod time.Duration

// Specify the subscription name for this consumer
// SubscriptionName specifies the subscription name for this consumer
// This argument is required when subscribing
SubscriptionName string

// Attach a set of application defined properties to the consumer
// This properties will be visible in the topic stats
// Properties represents a set of application defined properties for the consumer.
// Those properties will be visible in the topic stats
Properties map[string]string

// Select the subscription type to be used when subscribing to the topic.
// Type specifies the subscription type to be used when subscribing to a topic.
// Default is `Exclusive`
Type SubscriptionType

// InitialPosition at which the cursor will be set when subscribe
// SubscriptionInitialPosition is the initial position at which the cursor will be set when subscribe
// Default is `Latest`
SubscriptionInitialPosition

// Configuration for Dead Letter Queue consumer policy.
// DLQ represents the configuration for Dead Letter Queue consumer policy.
// eg. route the message to topic X after N failed attempts at processing it
// By default is nil and there's no DLQ
DLQ *DLQPolicy

// Configuration for Key Shared consumer policy.
// KeySharedPolicy represents the configuration for Key Shared consumer policy.
KeySharedPolicy *KeySharedPolicy

// Auto retry send messages to default filled DLQPolicy topics
// RetryEnable determines whether to automatically retry sending messages to default filled DLQPolicy topics.
// Default is false
RetryEnable bool

// Sets a `MessageChannel` for the consumer
// MessageChannel sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ConsumerMessage

// Sets the size of the consumer receive queue.
// ReceiverQueueSize sets the size of the consumer receive queue.
// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize int

// The delay after which to redeliver the messages that failed to be
// processed. Default is 1min. (See `Consumer.Nack()`)
// NackRedeliveryDelay specifies the delay after which to redeliver the messages that failed to be
// processed. Default is 1 min. (See `Consumer.Nack()`)
NackRedeliveryDelay time.Duration

// Set the consumer name.
// Name specifies the consumer name.
Name string

// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
// point, the messages will be sent as normal.
// ReadCompacted, if enabled, the consumer will read messages from the compacted topic rather than reading the
// full message backlog of the topic. This means that, if the topic has been compacted, the consumer will only
// see the latest value for each key in the topic, up until the point in the topic message backlog that has been
// compacted. Beyond that point, the messages will be sent as normal.
//
// ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
// failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
// shared subscription, will lead to the subscription call throwing a PulsarClientException.
ReadCompacted bool

// Mark the subscription as replicated to keep it in sync across clusters
// ReplicateSubscriptionState marks the subscription as replicated to keep it in sync across clusters
ReplicateSubscriptionState bool

// A chain of interceptors, These interceptors will be called at some points defined in ConsumerInterceptor interface.
// Interceptors is a chain of interceptors. These interceptors will be called at some points defined in
// ConsumerInterceptor interface.
Interceptors ConsumerInterceptors

// Schema represents the schema implementation.
Schema Schema

// MaxReconnectToBroker set the maximum retry number of reconnectToBroker. (default: ultimate)
// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint

// Decryption decryption related fields to decrypt the encrypted message
// Decryption represents the encryption related fields required by the consumer to decrypt a message.
Decryption *MessageDecryptionInfo

// If enabled, the default implementation of NackBackoffPolicy will be used to calculate the delay time of
// EnableDefaultNackBackoffPolicy, if enabled, the default implementation of NackBackoffPolicy will be used
// to calculate the delay time of
// nack backoff, Default: false.
EnableDefaultNackBackoffPolicy bool

Expand Down Expand Up @@ -195,7 +202,7 @@ type Consumer interface {
// ReconsumeLater mark a message for redelivery after custom delay
ReconsumeLater(msg Message, delay time.Duration)

// Acknowledge the failure to process a single message.
// Nack acknowledges the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
Expand All @@ -204,7 +211,7 @@ type Consumer interface {
// This call is not blocking.
Nack(Message)

// Acknowledge the failure to process a single message.
// NackID acknowledges the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
Expand All @@ -216,14 +223,14 @@ type Consumer interface {
// Close the consumer and stop the broker to push more messages
Close()

// Reset the subscription associated with this consumer to a specific message id.
// Seek resets the subscription associated with this consumer to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(MessageID) error

// Reset the subscription associated with this consumer to a specific message publish time.
// SeekByTime resets the subscription associated with this consumer to a specific message publish time.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the seek() on
// the individual partitions.
Expand Down
49 changes: 25 additions & 24 deletions pulsar/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type ProducerMessage struct {
// Payload for the message
Payload []byte

//Value and payload is mutually exclusive, `Value interface{}` for schema message.
// Value and payload is mutually exclusive, `Value interface{}` for schema message.
Value interface{}

// Key sets the key of the message for routing policy
Expand All @@ -48,19 +48,19 @@ type ProducerMessage struct {
// ReplicationClusters override the replication clusters for this message.
ReplicationClusters []string

// Disable the replication for this message
// DisableReplication disables the replication for this message
DisableReplication bool

// SequenceID set the sequence id to assign to the current message
// SequenceID sets the sequence id to assign to the current message
SequenceID *int64

// Request to deliver the message only after the specified relative delay.
// DeliverAfter requests to deliver the message only after the specified relative delay.
// Note: messages are only delivered with delay when a consumer is consuming
// through a `SubscriptionType=Shared` subscription. With other subscription
// types, the messages will still be delivered immediately.
DeliverAfter time.Duration

// Deliver the message only at or after the specified absolute timestamp.
// DeliverAt delivers the message only at or after the specified absolute timestamp.
// Note: messages are only delivered with delay when a consumer is consuming
// through a `SubscriptionType=Shared` subscription. With other subscription
// types, the messages will still be delivered immediately.
Expand All @@ -69,56 +69,57 @@ type ProducerMessage struct {

// Message abstraction used in Pulsar
type Message interface {
// Topic get the topic from which this message originated from
// Topic returns the topic from which this message originated from.
Topic() string

// ProducerName returns the name of the producer that has published the message.
ProducerName() string

// Properties are application defined key/value pairs that will be attached to the message.
// Return the properties attached to the message.
// Returns the properties attached to the message.
Properties() map[string]string

// Payload get the payload of the message
// Payload returns the payload of the message
Payload() []byte

// ID get the unique message ID associated with this message.
// ID returns the unique message ID associated with this message.
// The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
ID() MessageID

// PublishTime get the publish time of this message. The publish time is the timestamp that a client
// PublishTime returns the publish time of this message. The publish time is the timestamp that a client
// publish the message.
PublishTime() time.Time

// EventTime get the event time associated with this message. It is typically set by the applications via
// EventTime returns the event time associated with this message. It is typically set by the applications via
// `ProducerMessage.EventTime`.
// If EventTime is 0, it means there isn't any event time associated with this message.
EventTime() time.Time

// Key get the key of the message, if any
// Key returns the key of the message, if any
Key() string

// OrderingKey get the ordering key of the message, if any
// OrderingKey returns the ordering key of the message, if any
OrderingKey() string

// Get message redelivery count, redelivery count maintain in pulsar broker. When client nack acknowledge messages,
// RedeliveryCount returns message redelivery count, redelivery count maintain in pulsar broker.
// When client nack acknowledge messages,
// broker will dispatch message again with message redelivery count in CommandMessage defined.
//
// Message redelivery increases monotonically in a broker, when topic switch ownership to a another broker
// redelivery count will be recalculated.
RedeliveryCount() uint32

// Check whether the message is replicated from other cluster.
// IsReplicated determines whether the message is replicated from another cluster.
IsReplicated() bool

// Get name of cluster, from which the message is replicated.
// GetReplicatedFrom returns the name of the cluster, from which the message is replicated.
GetReplicatedFrom() string

//Get the de-serialized value of the message, according the configured
// GetSchemaValue returns the de-serialized value of the message, according to the configuration.
GetSchemaValue(v interface{}) error

// GetEncryptionContext get the ecryption context of message
// It will be used by the application to parse undecrypted message
// GetEncryptionContext returns the ecryption context of the message.
// It will be used by the application to parse the undecrypted message.
GetEncryptionContext() *EncryptionContext
}

Expand All @@ -127,16 +128,16 @@ type MessageID interface {
// Serialize the message id into a sequence of bytes that can be stored somewhere else
Serialize() []byte

// Get the message ledgerID
// LedgerID returns the message ledgerID
LedgerID() int64

// Get the message entryID
// EntryID returns the message entryID
EntryID() int64

// Get the message batchIdx
// BatchIdx returns the message batchIdx
BatchIdx() int32

// Get the message partitionIdx
// PartitionIdx returns the message partitionIdx
PartitionIdx() int32
}

Expand All @@ -150,7 +151,7 @@ func EarliestMessageID() MessageID {
return earliestMessageID
}

// LatestMessage returns a messageID that points to the latest message
// LatestMessageID returns a messageID that points to the latest message
func LatestMessageID() MessageID {
return latestMessageID
}
Loading

0 comments on commit 7773d27

Please sign in to comment.