diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 08b7c545eea3c..5468b378b7f92 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -155,6 +155,10 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
long getUnAckedMessagesTimeoutMs() const;
+ void setTickDurationInMs(const uint64_t milliSeconds);
+
+ long getTickDurationInMs() const;
+
/**
* Set the delay to wait before re-delivering messages that have failed to be process.
*
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index c846451f9d8a3..1fe44e082fc62 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -178,4 +178,4 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_property(pulsar_producer_co
#ifdef __cplusplus
}
-#endif
\ No newline at end of file
+#endif
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 38fa1fe8b10b1..546b8b97cb2fb 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -98,6 +98,12 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
impl_->unAckedMessagesTimeoutMs = milliSeconds;
}
+long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }
+
+void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
+ impl_->tickDurationInMs = milliSeconds;
+}
+
void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis;
}
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 55dafd38aaa6b..8dd12633a79fc 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -27,6 +27,7 @@ namespace pulsar {
struct ConsumerConfigurationImpl {
SchemaInfo schemaInfo;
long unAckedMessagesTimeoutMs;
+ long tickDurationInMs;
long negativeAckRedeliveryDelayMs;
ConsumerType consumerType;
@@ -45,6 +46,7 @@ struct ConsumerConfigurationImpl {
ConsumerConfigurationImpl()
: schemaInfo(),
unAckedMessagesTimeoutMs(0),
+ tickDurationInMs(1000),
negativeAckRedeliveryDelayMs(60000),
consumerType(ConsumerExclusive),
messageListener(),
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index e90aef9b29fcf..6e6b1058b712f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -66,8 +66,13 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
- unAckedMessageTrackerPtr_.reset(
- new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+ if (conf.getTickDurationInMs() > 0) {
+ unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+ conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
+ } else {
+ unAckedMessageTrackerPtr_.reset(
+ new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+ }
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
@@ -953,6 +958,18 @@ Result ConsumerImpl::resumeMessageListener() {
void ConsumerImpl::redeliverUnacknowledgedMessages() {
static std::set emptySet;
redeliverMessages(emptySet);
+ unAckedMessageTrackerPtr_->clear();
+}
+
+void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set& messageIds) {
+ if (messageIds.empty()) {
+ return;
+ }
+ if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) {
+ redeliverUnacknowledgedMessages();
+ return;
+ }
+ redeliverMessages(messageIds);
}
void ConsumerImpl::redeliverMessages(const std::set& messageIds) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 8a25b499c1b1d..6d81fd0ab4361 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -98,6 +98,7 @@ class ConsumerImpl : public ConsumerImplBase,
virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);
virtual void redeliverMessages(const std::set& messageIds);
+ virtual void redeliverUnacknowledgedMessages(const std::set& messageIds);
virtual void negativeAcknowledge(const MessageId& msgId);
virtual void closeAsync(ResultCallback callback);
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index ab6ed9f14bb83..fc1506643ca1d 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -48,6 +48,7 @@ class ConsumerImplBase {
virtual Result pauseMessageListener() = 0;
virtual Result resumeMessageListener() = 0;
virtual void redeliverUnacknowledgedMessages() = 0;
+ virtual void redeliverUnacknowledgedMessages(const std::set& messageIds) = 0;
virtual const std::string& getName() const = 0;
virtual int getNumOfPrefetchedMessages() const = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
diff --git a/pulsar-client-cpp/lib/LogUtils.cc b/pulsar-client-cpp/lib/LogUtils.cc
index e2615a54e6b80..e4f6a1767f873 100644
--- a/pulsar-client-cpp/lib/LogUtils.cc
+++ b/pulsar-client-cpp/lib/LogUtils.cc
@@ -55,4 +55,4 @@ std::string LogUtils::getLoggerName(const std::string& path) {
return path.substr(startIdx + 1, endIdx - startIdx - 1);
}
-} // namespace pulsar
\ No newline at end of file
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index c1ee3e945753b..e7102f7a31685 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -45,8 +45,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
consumerStr_ = consumerStrStream.str();
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
- unAckedMessageTrackerPtr_.reset(
- new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+ if (conf.getTickDurationInMs() > 0) {
+ unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+ conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
+ } else {
+ unAckedMessageTrackerPtr_.reset(
+ new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+ }
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
@@ -653,6 +658,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
consumer++) {
(consumer->second)->redeliverUnacknowledgedMessages();
}
+ unAckedMessageTrackerPtr_->clear();
+}
+
+void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set& messageIds) {
+ if (messageIds.empty()) {
+ return;
+ }
+ if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
+ redeliverUnacknowledgedMessages();
+ return;
+ }
+ LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
+ for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
+ consumer++) {
+ (consumer->second)->redeliverUnacknowledgedMessages(messageIds);
+ }
}
int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index d190664c33a99..fa271febd4db4 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -69,6 +69,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
virtual Result pauseMessageListener();
virtual Result resumeMessageListener();
virtual void redeliverUnacknowledgedMessages();
+ virtual void redeliverUnacknowledgedMessages(const std::set& messageIds);
virtual int getNumOfPrefetchedMessages() const;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 03fd2d2b9ca02..0241a5439bf64 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -43,8 +43,13 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << ","
<< numPartitions << "]";
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
- unAckedMessageTrackerPtr_.reset(
- new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+ if (conf.getTickDurationInMs() > 0) {
+ unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
+ conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
+ } else {
+ unAckedMessageTrackerPtr_.reset(
+ new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
+ }
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
@@ -426,6 +431,21 @@ void PartitionedConsumerImpl::redeliverUnacknowledgedMessages() {
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
(*i)->redeliverUnacknowledgedMessages();
}
+ unAckedMessageTrackerPtr_->clear();
+}
+
+void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const std::set& messageIds) {
+ if (messageIds.empty()) {
+ return;
+ }
+ if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
+ redeliverUnacknowledgedMessages();
+ return;
+ }
+ LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
+ for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
+ (*i)->redeliverUnacknowledgedMessages(messageIds);
+ }
}
const std::string& PartitionedConsumerImpl::getName() const { return partitionStr_; }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index 71b2a30442e63..fb4b04742bbae 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -64,6 +64,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
virtual Result pauseMessageListener();
virtual Result resumeMessageListener();
virtual void redeliverUnacknowledgedMessages();
+ virtual void redeliverUnacknowledgedMessages(const std::set& messageIds);
virtual const std::string& getName() const;
virtual int getNumOfPrefetchedMessages() const;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 2c768b2e561cd..7894e64a874f6 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -28,7 +28,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
timeoutHandlerHelper();
ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
timer_ = executorService->createDeadlineTimer();
- timer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs_));
+ timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
timer_->async_wait([&](const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
@@ -42,86 +42,112 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
std::lock_guard acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
<< consumerReference_.getName().c_str());
- if (!oldSet_.empty()) {
+
+ std::set headPartition = timePartitions.front();
+ timePartitions.pop_front();
+
+ std::set msgIdsToRedeliver;
+ if (!headPartition.empty()) {
LOG_INFO(consumerReference_.getName().c_str()
- << ": " << oldSet_.size() << " Messages were not acked within " << timeoutMs_ << " time");
- oldSet_.clear();
- currentSet_.clear();
- consumerReference_.redeliverUnacknowledgedMessages();
+ << ": " << headPartition.size() << " Messages were not acked within "
+ << timePartitions.size() * tickDurationInMs_ << " time");
+ for (auto it = headPartition.begin(); it != headPartition.end(); it++) {
+ msgIdsToRedeliver.insert(*it);
+ messageIdPartitionMap.erase(*it);
+ }
+ }
+ headPartition.clear();
+ timePartitions.push_back(headPartition);
+
+ if (msgIdsToRedeliver.size() > 0) {
+ consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
}
- oldSet_.swap(currentSet_);
}
UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr client,
ConsumerImplBase& consumer)
: consumerReference_(consumer) {
+ UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer);
+}
+
+UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long tickDurationInMs,
+ const ClientImplPtr client,
+ ConsumerImplBase& consumer)
+ : consumerReference_(consumer) {
timeoutMs_ = timeoutMs;
+ tickDurationInMs_ = (timeoutMs >= tickDurationInMs) ? tickDurationInMs : timeoutMs;
client_ = client;
+
+ int blankPartitions = (int)std::ceil((double)timeoutMs_ / tickDurationInMs_);
+ for (int i = 0; i < blankPartitions + 1; i++) {
+ std::set msgIds;
+ timePartitions.push_back(msgIds);
+ }
+
timeoutHandler();
}
bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
std::lock_guard acquire(lock_);
- oldSet_.erase(m);
- return currentSet_.insert(m).second;
+ if (messageIdPartitionMap.count(m) == 0) {
+ bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second;
+ return insert && timePartitions.back().insert(m).second;
+ }
+ return false;
}
bool UnAckedMessageTrackerEnabled::isEmpty() {
std::lock_guard acquire(lock_);
- return oldSet_.empty() && currentSet_.empty();
+ return messageIdPartitionMap.empty();
}
bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
std::lock_guard acquire(lock_);
- return oldSet_.erase(m) || currentSet_.erase(m);
+ bool removed = false;
+ std::map>::iterator exist = messageIdPartitionMap.find(m);
+ if (exist != messageIdPartitionMap.end()) {
+ removed = exist->second.erase(m);
+ }
+ return removed;
}
long UnAckedMessageTrackerEnabled::size() {
std::lock_guard acquire(lock_);
- return oldSet_.size() + currentSet_.size();
+ return messageIdPartitionMap.size();
}
void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
std::lock_guard acquire(lock_);
- for (std::set::iterator it = oldSet_.begin(); it != oldSet_.end();) {
- if (*it < msgId && it->partition() == msgId.partition()) {
- oldSet_.erase(it++);
- } else {
- it++;
- }
- }
- for (std::set::iterator it = currentSet_.begin(); it != currentSet_.end();) {
- if (*it < msgId && it->partition() == msgId.partition()) {
- currentSet_.erase(it++);
- } else {
- it++;
+ for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
+ MessageId msgIdInMap = it->first;
+ if (msgIdInMap < msgId) {
+ std::map>::iterator exist = messageIdPartitionMap.find(msgId);
+ if (exist != messageIdPartitionMap.end()) {
+ exist->second.erase(msgId);
+ }
}
}
}
// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
- for (std::set::iterator it = oldSet_.begin(); it != oldSet_.end();) {
- const std::string& topicPartitionName = it->getTopicName();
- if (topicPartitionName.find(topic) != std::string::npos) {
- oldSet_.erase(it++);
- } else {
- it++;
- }
- }
- for (std::set::iterator it = currentSet_.begin(); it != currentSet_.end();) {
- const std::string& topicPartitionName = it->getTopicName();
- if (topicPartitionName.find(topic) != std::string::npos) {
- currentSet_.erase(it++);
- } else {
- it++;
+ std::lock_guard acquire(lock_);
+ for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
+ MessageId msgIdInMap = it->first;
+ if (msgIdInMap.getTopicName().compare(topic) == 0) {
+ std::map>::iterator exist = messageIdPartitionMap.find(msgIdInMap);
+ if (exist != messageIdPartitionMap.end()) {
+ exist->second.erase(msgIdInMap);
+ }
}
}
}
void UnAckedMessageTrackerEnabled::clear() {
- currentSet_.clear();
- oldSet_.clear();
+ messageIdPartitionMap.clear();
+ for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
+ it->clear();
+ }
}
UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() {
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index 921e74725d0ed..c2b4012adb184 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
public:
~UnAckedMessageTrackerEnabled();
UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&);
+ UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&);
bool add(const MessageId& m);
bool remove(const MessageId& m);
void removeMessagesTill(const MessageId& msgId);
@@ -40,13 +41,14 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
void timeoutHandlerHelper();
bool isEmpty();
long size();
- std::set currentSet_;
- std::set oldSet_;
+ std::map> messageIdPartitionMap;
+ std::deque> timePartitions;
std::mutex lock_;
DeadlineTimerPtr timer_;
ConsumerImplBase& consumerReference_;
ClientImplPtr client_;
long timeoutMs_;
+ long tickDurationInMs_;
};
} // namespace pulsar