From a2c2b5582f951eb1fd8d0b65a635944941d900c6 Mon Sep 17 00:00:00 2001 From: k2la Date: Fri, 21 Feb 2020 11:50:11 +0900 Subject: [PATCH 1/6] add redeliverUnacknowledgedMessages with messageIds --- pulsar-client-cpp/lib/ConsumerImpl.cc | 12 ++++++++++++ pulsar-client-cpp/lib/ConsumerImpl.h | 1 + pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 16 ++++++++++++++++ pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h | 1 + pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 15 +++++++++++++++ pulsar-client-cpp/lib/PartitionedConsumerImpl.h | 1 + 6 files changed, 46 insertions(+) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index e90aef9b29fcf..27f87320dda62 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -953,6 +953,18 @@ Result ConsumerImpl::resumeMessageListener() { void ConsumerImpl::redeliverUnacknowledgedMessages() { static std::set emptySet; redeliverMessages(emptySet); + unAckedMessageTrackerPtr_->clear(); +} + +void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set& messageIds) { + if (messageIds.empty()) { + return; + } + if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) { + redeliverUnacknowledgedMessages(); + return; + } + redeliverMessages(messageIds); } void ConsumerImpl::redeliverMessages(const std::set& messageIds) { diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 8a25b499c1b1d..6d81fd0ab4361 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -98,6 +98,7 @@ class ConsumerImpl : public ConsumerImplBase, virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType); virtual void redeliverMessages(const std::set& messageIds); + virtual void redeliverUnacknowledgedMessages(const std::set& messageIds); virtual void negativeAcknowledge(const MessageId& msgId); virtual void closeAsync(ResultCallback callback); diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index c1ee3e945753b..069852772ae40 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -653,6 +653,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() { consumer++) { (consumer->second)->redeliverUnacknowledgedMessages(); } + unAckedMessageTrackerPtr_->clear(); +} + +void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set& messageIds) { + if (messageIds.empty()) { + return; + } + if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) { + redeliverUnacknowledgedMessages(); + return; + } + LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer."); + for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); + consumer++) { + (consumer->second)->redeliverUnacknowledgedMessages(messageIds); + } } int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); } diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h index d190664c33a99..fa271febd4db4 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h @@ -69,6 +69,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase, virtual Result pauseMessageListener(); virtual Result resumeMessageListener(); virtual void redeliverUnacknowledgedMessages(); + virtual void redeliverUnacknowledgedMessages(const std::set& messageIds); virtual int getNumOfPrefetchedMessages() const; virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback); void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr, diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 03fd2d2b9ca02..fddaddddd3ade 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -426,6 +426,21 @@ void PartitionedConsumerImpl::redeliverUnacknowledgedMessages() { for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { (*i)->redeliverUnacknowledgedMessages(); } + unAckedMessageTrackerPtr_->clear(); +} + +void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const std::set& messageIds) { + if (messageIds.empty()) { + return; + } + if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) { + redeliverUnacknowledgedMessages(); + return; + } + LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer."); + for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) { + (*i)->redeliverUnacknowledgedMessages(messageIds); + } } const std::string& PartitionedConsumerImpl::getName() const { return partitionStr_; } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 71b2a30442e63..fb4b04742bbae 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -64,6 +64,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase, virtual Result pauseMessageListener(); virtual Result resumeMessageListener(); virtual void redeliverUnacknowledgedMessages(); + virtual void redeliverUnacknowledgedMessages(const std::set& messageIds); virtual const std::string& getName() const; virtual int getNumOfPrefetchedMessages() const; virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback); From 80b70c739ace3a9a52bf82eb6754b1c6d080ccc8 Mon Sep 17 00:00:00 2001 From: k2la Date: Fri, 21 Feb 2020 12:02:48 +0900 Subject: [PATCH 2/6] add tickDurationInMs --- pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h | 4 ++++ pulsar-client-cpp/lib/ConsumerConfiguration.cc | 6 ++++++ pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 2 ++ 3 files changed, 12 insertions(+) diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index 08b7c545eea3c..5468b378b7f92 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -155,6 +155,10 @@ class PULSAR_PUBLIC ConsumerConfiguration { */ long getUnAckedMessagesTimeoutMs() const; + void setTickDurationInMs(const uint64_t milliSeconds); + + long getTickDurationInMs() const; + /** * Set the delay to wait before re-delivering messages that have failed to be process. *

diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc index 38fa1fe8b10b1..546b8b97cb2fb 100644 --- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc +++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc @@ -98,6 +98,12 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco impl_->unAckedMessagesTimeoutMs = milliSeconds; } +long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; } + +void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) { + impl_->tickDurationInMs = milliSeconds; +} + void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) { impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis; } diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h index 55dafd38aaa6b..8dd12633a79fc 100644 --- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h @@ -27,6 +27,7 @@ namespace pulsar { struct ConsumerConfigurationImpl { SchemaInfo schemaInfo; long unAckedMessagesTimeoutMs; + long tickDurationInMs; long negativeAckRedeliveryDelayMs; ConsumerType consumerType; @@ -45,6 +46,7 @@ struct ConsumerConfigurationImpl { ConsumerConfigurationImpl() : schemaInfo(), unAckedMessagesTimeoutMs(0), + tickDurationInMs(1000), negativeAckRedeliveryDelayMs(60000), consumerType(ConsumerExclusive), messageListener(), From fb5a84dee3aa5c4b2df970eb959cea87ee63e69d Mon Sep 17 00:00:00 2001 From: k2la Date: Fri, 21 Feb 2020 12:42:44 +0900 Subject: [PATCH 3/6] time partition --- pulsar-client-cpp/lib/ConsumerImpl.cc | 9 +- pulsar-client-cpp/lib/ConsumerImplBase.h | 1 + .../lib/MultiTopicsConsumerImpl.cc | 9 +- .../lib/PartitionedConsumerImpl.cc | 9 +- .../lib/UnAckedMessageTrackerEnabled.cc | 102 +++++++++++------- .../lib/UnAckedMessageTrackerEnabled.h | 6 +- 6 files changed, 89 insertions(+), 47 deletions(-) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 27f87320dda62..b856c63183465 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -66,8 +66,13 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; consumerStr_ = consumerStrStream.str(); if (conf.getUnAckedMessagesTimeoutMs() != 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + if (conf.getTickDurationInMs() > 0) { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + } else { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + } } else { unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); } diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index ab6ed9f14bb83..fc1506643ca1d 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -48,6 +48,7 @@ class ConsumerImplBase { virtual Result pauseMessageListener() = 0; virtual Result resumeMessageListener() = 0; virtual void redeliverUnacknowledgedMessages() = 0; + virtual void redeliverUnacknowledgedMessages(const std::set& messageIds) = 0; virtual const std::string& getName() const = 0; virtual int getNumOfPrefetchedMessages() const = 0; virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0; diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 069852772ae40..8f9c93920fc38 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -45,8 +45,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std consumerStr_ = consumerStrStream.str(); if (conf.getUnAckedMessagesTimeoutMs() != 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + if (conf.getTickDurationInMs() > 0) { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + } else { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + } } else { unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index fddaddddd3ade..6d9b5bb8f6ee4 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -43,8 +43,13 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << "," << numPartitions << "]"; if (conf.getUnAckedMessagesTimeoutMs() != 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + if (conf.getTickDurationInMs() > 0) { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + } else { + unAckedMessageTrackerPtr_.reset( + new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); + } } else { unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled()); } diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc index 2c768b2e561cd..e212e0dbd5ec0 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc @@ -42,86 +42,110 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() { std::lock_guard acquire(lock_); LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ " << consumerReference_.getName().c_str()); - if (!oldSet_.empty()) { + + std::set headPartition = timePartitions.front(); + timePartitions.pop_front(); + + std::set msgIdsToRedeliver; + if (!headPartition.empty()) { LOG_INFO(consumerReference_.getName().c_str() - << ": " << oldSet_.size() << " Messages were not acked within " << timeoutMs_ << " time"); - oldSet_.clear(); - currentSet_.clear(); - consumerReference_.redeliverUnacknowledgedMessages(); + << ": " << headPartition.size() << " Messages were not acked within " << timePartitions.size() * tickDurationInMs_ << " time"); + for (auto it = headPartition.begin(); it != headPartition.end(); it++) { + msgIdsToRedeliver.insert(*it); + messageIdPartitionMap.erase(*it); + } + } + headPartition.clear(); + timePartitions.push_back(headPartition); + + if (msgIdsToRedeliver.size() > 0) { + consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver); } - oldSet_.swap(currentSet_); } UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr client, ConsumerImplBase& consumer) + : consumerReference_(consumer){ + UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer); +} + +UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long tickDurationInMs, const ClientImplPtr client, + ConsumerImplBase& consumer) : consumerReference_(consumer) { timeoutMs_ = timeoutMs; + tickDurationInMs_ = (timeoutMs >= tickDurationInMs) ? tickDurationInMs : timeoutMs; client_ = client; + + int blankPartitions = (int)std::ceil((double)timeoutMs_ / tickDurationInMs_); + for (int i = 0; i < blankPartitions + 1; i++) { + std::set msgIds; + timePartitions.push_back(msgIds); + } + timeoutHandler(); } bool UnAckedMessageTrackerEnabled::add(const MessageId& m) { std::lock_guard acquire(lock_); - oldSet_.erase(m); - return currentSet_.insert(m).second; + if (messageIdPartitionMap.count(m) == 0) { + bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second; + return insert && timePartitions.back().insert(m).second; + } + return false; } bool UnAckedMessageTrackerEnabled::isEmpty() { std::lock_guard acquire(lock_); - return oldSet_.empty() && currentSet_.empty(); + return messageIdPartitionMap.empty(); } bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) { std::lock_guard acquire(lock_); - return oldSet_.erase(m) || currentSet_.erase(m); + bool removed = false; + std::map>::iterator exist = messageIdPartitionMap.find(m); + if (exist != messageIdPartitionMap.end()) { + removed = exist->second.erase(m); + } + return removed; } long UnAckedMessageTrackerEnabled::size() { std::lock_guard acquire(lock_); - return oldSet_.size() + currentSet_.size(); + return messageIdPartitionMap.size(); } void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) { std::lock_guard acquire(lock_); - for (std::set::iterator it = oldSet_.begin(); it != oldSet_.end();) { - if (*it < msgId && it->partition() == msgId.partition()) { - oldSet_.erase(it++); - } else { - it++; - } - } - for (std::set::iterator it = currentSet_.begin(); it != currentSet_.end();) { - if (*it < msgId && it->partition() == msgId.partition()) { - currentSet_.erase(it++); - } else { - it++; + for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) { + MessageId msgIdInMap = it->first; + if (msgIdInMap < msgId) { + std::map>::iterator exist = messageIdPartitionMap.find(msgId); + if (exist != messageIdPartitionMap.end()) { + exist->second.erase(msgId); + } } } } // this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message. void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) { - for (std::set::iterator it = oldSet_.begin(); it != oldSet_.end();) { - const std::string& topicPartitionName = it->getTopicName(); - if (topicPartitionName.find(topic) != std::string::npos) { - oldSet_.erase(it++); - } else { - it++; - } - } - for (std::set::iterator it = currentSet_.begin(); it != currentSet_.end();) { - const std::string& topicPartitionName = it->getTopicName(); - if (topicPartitionName.find(topic) != std::string::npos) { - currentSet_.erase(it++); - } else { - it++; + std::lock_guard acquire(lock_); + for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) { + MessageId msgIdInMap = it->first; + if (msgIdInMap.getTopicName().compare(topic) == 0) { + std::map>::iterator exist = messageIdPartitionMap.find(msgIdInMap); + if (exist != messageIdPartitionMap.end()) { + exist->second.erase(msgIdInMap); + } } } } void UnAckedMessageTrackerEnabled::clear() { - currentSet_.clear(); - oldSet_.clear(); + messageIdPartitionMap.clear(); + for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) { + it->clear(); + } } UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() { diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h index 921e74725d0ed..c2b4012adb184 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h @@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface { public: ~UnAckedMessageTrackerEnabled(); UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&); + UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&); bool add(const MessageId& m); bool remove(const MessageId& m); void removeMessagesTill(const MessageId& msgId); @@ -40,13 +41,14 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface { void timeoutHandlerHelper(); bool isEmpty(); long size(); - std::set currentSet_; - std::set oldSet_; + std::map> messageIdPartitionMap; + std::deque> timePartitions; std::mutex lock_; DeadlineTimerPtr timer_; ConsumerImplBase& consumerReference_; ClientImplPtr client_; long timeoutMs_; + long tickDurationInMs_; }; } // namespace pulsar From 7e0b75c0dfd2b2bd4b624c71d4ad2f48364c5397 Mon Sep 17 00:00:00 2001 From: k2la Date: Fri, 21 Feb 2020 13:03:05 +0900 Subject: [PATCH 4/6] fix expires time --- pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc index e212e0dbd5ec0..65f9fde6072cc 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc @@ -28,7 +28,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() { timeoutHandlerHelper(); ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get(); timer_ = executorService->createDeadlineTimer(); - timer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs_)); + timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_)); timer_->async_wait([&](const boost::system::error_code& ec) { if (ec) { LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]"); From 042abcc809e92fd8580b8f5a1e4ec25af4f38531 Mon Sep 17 00:00:00 2001 From: k2la Date: Tue, 25 Feb 2020 13:12:48 +0900 Subject: [PATCH 5/6] make format --- .../include/pulsar/c/client_configuration.h | 8 +++++++- .../include/pulsar/c/consumer_configuration.h | 6 ++++-- .../include/pulsar/c/producer_configuration.h | 16 ++++++++++++---- pulsar-client-cpp/include/pulsar/c/result.h | 3 ++- pulsar-client-cpp/lib/ConsumerImpl.cc | 4 ++-- pulsar-client-cpp/lib/LogUtils.cc | 2 +- pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc | 4 ++-- pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 4 ++-- .../lib/UnAckedMessageTrackerEnabled.cc | 16 +++++++++------- 9 files changed, 41 insertions(+), 22 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/c/client_configuration.h b/pulsar-client-cpp/include/pulsar/c/client_configuration.h index a42164247a7d8..a140de6fd8ae6 100644 --- a/pulsar-client-cpp/include/pulsar/c/client_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/client_configuration.h @@ -25,7 +25,13 @@ extern "C" { #endif -typedef enum { pulsar_DEBUG = 0, pulsar_INFO = 1, pulsar_WARN = 2, pulsar_ERROR = 3 } pulsar_logger_level_t; +typedef enum +{ + pulsar_DEBUG = 0, + pulsar_INFO = 1, + pulsar_WARN = 2, + pulsar_ERROR = 3 +} pulsar_logger_level_t; typedef void (*pulsar_logger)(pulsar_logger_level_t level, const char *file, int line, const char *message, void *ctx); diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h index 7f7cfcc954069..5db193d75bdd5 100644 --- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h @@ -28,7 +28,8 @@ extern "C" { typedef struct _pulsar_consumer_configuration pulsar_consumer_configuration_t; -typedef enum { +typedef enum +{ /** * There can be only 1 consumer on the same topic with the same consumerName */ @@ -52,7 +53,8 @@ typedef enum { pulsar_ConsumerKeyShared } pulsar_consumer_type; -typedef enum { +typedef enum +{ /** * the latest position which means the start consuming position will be the last message */ diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h index c846451f9d8a3..0b11fa6571792 100644 --- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h @@ -28,21 +28,29 @@ extern "C" { #endif -typedef enum { +typedef enum +{ pulsar_UseSinglePartition, pulsar_RoundRobinDistribution, pulsar_CustomPartition } pulsar_partitions_routing_mode; -typedef enum { pulsar_Murmur3_32Hash, pulsar_BoostHash, pulsar_JavaStringHash } pulsar_hashing_scheme; +typedef enum +{ + pulsar_Murmur3_32Hash, + pulsar_BoostHash, + pulsar_JavaStringHash +} pulsar_hashing_scheme; -typedef enum { +typedef enum +{ pulsar_CompressionNone = 0, pulsar_CompressionLZ4 = 1, pulsar_CompressionZLib = 2 } pulsar_compression_type; -typedef enum { +typedef enum +{ pulsar_None = 0, pulsar_String = 1, pulsar_Json = 2, diff --git a/pulsar-client-cpp/include/pulsar/c/result.h b/pulsar-client-cpp/include/pulsar/c/result.h index 66a8718192b1e..d8f8f7f3d3e92 100644 --- a/pulsar-client-cpp/include/pulsar/c/result.h +++ b/pulsar-client-cpp/include/pulsar/c/result.h @@ -25,7 +25,8 @@ extern "C" { #endif -typedef enum { +typedef enum +{ pulsar_result_Ok, /// Operation successful pulsar_result_UnknownError, /// Unknown error happened on broker diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index b856c63183465..6e6b1058b712f 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -67,8 +67,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, consumerStr_ = consumerStrStream.str(); if (conf.getUnAckedMessagesTimeoutMs() != 0) { if (conf.getTickDurationInMs() > 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); } else { unAckedMessageTrackerPtr_.reset( new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); diff --git a/pulsar-client-cpp/lib/LogUtils.cc b/pulsar-client-cpp/lib/LogUtils.cc index e2615a54e6b80..f9c9c9a5d65bd 100644 --- a/pulsar-client-cpp/lib/LogUtils.cc +++ b/pulsar-client-cpp/lib/LogUtils.cc @@ -26,7 +26,7 @@ namespace pulsar { void LogUtils::init(const std::string& logfilePath) { -// If this is called explicitely, we fallback to Log4cxx config, if enabled + // If this is called explicitely, we fallback to Log4cxx config, if enabled #ifdef USE_LOG4CXX if (!logfilePath.empty()) { diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 8f9c93920fc38..e7102f7a31685 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -46,8 +46,8 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std if (conf.getUnAckedMessagesTimeoutMs() != 0) { if (conf.getTickDurationInMs() > 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); } else { unAckedMessageTrackerPtr_.reset( new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 6d9b5bb8f6ee4..0241a5439bf64 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -44,8 +44,8 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std << numPartitions << "]"; if (conf.getUnAckedMessagesTimeoutMs() != 0) { if (conf.getTickDurationInMs() > 0) { - unAckedMessageTrackerPtr_.reset( - new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); + unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled( + conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this)); } else { unAckedMessageTrackerPtr_.reset( new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this)); diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc index 65f9fde6072cc..7894e64a874f6 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc @@ -49,7 +49,8 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() { std::set msgIdsToRedeliver; if (!headPartition.empty()) { LOG_INFO(consumerReference_.getName().c_str() - << ": " << headPartition.size() << " Messages were not acked within " << timePartitions.size() * tickDurationInMs_ << " time"); + << ": " << headPartition.size() << " Messages were not acked within " + << timePartitions.size() * tickDurationInMs_ << " time"); for (auto it = headPartition.begin(); it != headPartition.end(); it++) { msgIdsToRedeliver.insert(*it); messageIdPartitionMap.erase(*it); @@ -65,11 +66,12 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() { UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr client, ConsumerImplBase& consumer) - : consumerReference_(consumer){ + : consumerReference_(consumer) { UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer); } -UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long tickDurationInMs, const ClientImplPtr client, +UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long tickDurationInMs, + const ClientImplPtr client, ConsumerImplBase& consumer) : consumerReference_(consumer) { timeoutMs_ = timeoutMs; @@ -88,7 +90,7 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long bool UnAckedMessageTrackerEnabled::add(const MessageId& m) { std::lock_guard acquire(lock_); if (messageIdPartitionMap.count(m) == 0) { - bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second; + bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second; return insert && timePartitions.back().insert(m).second; } return false; @@ -103,9 +105,9 @@ bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) { std::lock_guard acquire(lock_); bool removed = false; std::map>::iterator exist = messageIdPartitionMap.find(m); - if (exist != messageIdPartitionMap.end()) { - removed = exist->second.erase(m); - } + if (exist != messageIdPartitionMap.end()) { + removed = exist->second.erase(m); + } return removed; } From 99cd71cd8c4b1403dd782c4d06fdd0e12aa04a05 Mon Sep 17 00:00:00 2001 From: k2la Date: Wed, 26 Feb 2020 09:58:57 +0900 Subject: [PATCH 6/6] make format --- .../include/pulsar/c/client_configuration.h | 8 +------- .../include/pulsar/c/consumer_configuration.h | 6 ++---- .../include/pulsar/c/producer_configuration.h | 18 +++++------------- pulsar-client-cpp/include/pulsar/c/result.h | 3 +-- pulsar-client-cpp/lib/LogUtils.cc | 4 ++-- 5 files changed, 11 insertions(+), 28 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/c/client_configuration.h b/pulsar-client-cpp/include/pulsar/c/client_configuration.h index a140de6fd8ae6..a42164247a7d8 100644 --- a/pulsar-client-cpp/include/pulsar/c/client_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/client_configuration.h @@ -25,13 +25,7 @@ extern "C" { #endif -typedef enum -{ - pulsar_DEBUG = 0, - pulsar_INFO = 1, - pulsar_WARN = 2, - pulsar_ERROR = 3 -} pulsar_logger_level_t; +typedef enum { pulsar_DEBUG = 0, pulsar_INFO = 1, pulsar_WARN = 2, pulsar_ERROR = 3 } pulsar_logger_level_t; typedef void (*pulsar_logger)(pulsar_logger_level_t level, const char *file, int line, const char *message, void *ctx); diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h index 5db193d75bdd5..7f7cfcc954069 100644 --- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h @@ -28,8 +28,7 @@ extern "C" { typedef struct _pulsar_consumer_configuration pulsar_consumer_configuration_t; -typedef enum -{ +typedef enum { /** * There can be only 1 consumer on the same topic with the same consumerName */ @@ -53,8 +52,7 @@ typedef enum pulsar_ConsumerKeyShared } pulsar_consumer_type; -typedef enum -{ +typedef enum { /** * the latest position which means the start consuming position will be the last message */ diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h index 0b11fa6571792..1fe44e082fc62 100644 --- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h @@ -28,29 +28,21 @@ extern "C" { #endif -typedef enum -{ +typedef enum { pulsar_UseSinglePartition, pulsar_RoundRobinDistribution, pulsar_CustomPartition } pulsar_partitions_routing_mode; -typedef enum -{ - pulsar_Murmur3_32Hash, - pulsar_BoostHash, - pulsar_JavaStringHash -} pulsar_hashing_scheme; +typedef enum { pulsar_Murmur3_32Hash, pulsar_BoostHash, pulsar_JavaStringHash } pulsar_hashing_scheme; -typedef enum -{ +typedef enum { pulsar_CompressionNone = 0, pulsar_CompressionLZ4 = 1, pulsar_CompressionZLib = 2 } pulsar_compression_type; -typedef enum -{ +typedef enum { pulsar_None = 0, pulsar_String = 1, pulsar_Json = 2, @@ -186,4 +178,4 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_property(pulsar_producer_co #ifdef __cplusplus } -#endif \ No newline at end of file +#endif diff --git a/pulsar-client-cpp/include/pulsar/c/result.h b/pulsar-client-cpp/include/pulsar/c/result.h index d8f8f7f3d3e92..66a8718192b1e 100644 --- a/pulsar-client-cpp/include/pulsar/c/result.h +++ b/pulsar-client-cpp/include/pulsar/c/result.h @@ -25,8 +25,7 @@ extern "C" { #endif -typedef enum -{ +typedef enum { pulsar_result_Ok, /// Operation successful pulsar_result_UnknownError, /// Unknown error happened on broker diff --git a/pulsar-client-cpp/lib/LogUtils.cc b/pulsar-client-cpp/lib/LogUtils.cc index f9c9c9a5d65bd..e4f6a1767f873 100644 --- a/pulsar-client-cpp/lib/LogUtils.cc +++ b/pulsar-client-cpp/lib/LogUtils.cc @@ -26,7 +26,7 @@ namespace pulsar { void LogUtils::init(const std::string& logfilePath) { - // If this is called explicitely, we fallback to Log4cxx config, if enabled +// If this is called explicitely, we fallback to Log4cxx config, if enabled #ifdef USE_LOG4CXX if (!logfilePath.empty()) { @@ -55,4 +55,4 @@ std::string LogUtils::getLoggerName(const std::string& path) { return path.substr(startIdx + 1, endIdx - startIdx - 1); } -} // namespace pulsar \ No newline at end of file +} // namespace pulsar