Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 6168] Fix Unacked Message Tracker by Using Time Partition on C++ #6391

Merged
merged 7 commits into from
Mar 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
long getUnAckedMessagesTimeoutMs() const;

void setTickDurationInMs(const uint64_t milliSeconds);

long getTickDurationInMs() const;

/**
* Set the delay to wait before re-delivering messages that have failed to be process.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,4 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_property(pulsar_producer_co

#ifdef __cplusplus
}
#endif
#endif
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
impl_->unAckedMessagesTimeoutMs = milliSeconds;
}

long ConsumerConfiguration::getTickDurationInMs() const { return impl_->tickDurationInMs; }

void ConsumerConfiguration::setTickDurationInMs(const uint64_t milliSeconds) {
impl_->tickDurationInMs = milliSeconds;
}

void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis;
}
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace pulsar {
struct ConsumerConfigurationImpl {
SchemaInfo schemaInfo;
long unAckedMessagesTimeoutMs;
long tickDurationInMs;

long negativeAckRedeliveryDelayMs;
ConsumerType consumerType;
Expand All @@ -45,6 +46,7 @@ struct ConsumerConfigurationImpl {
ConsumerConfigurationImpl()
: schemaInfo(),
unAckedMessagesTimeoutMs(0),
tickDurationInMs(1000),
negativeAckRedeliveryDelayMs(60000),
consumerType(ConsumerExclusive),
messageListener(),
Expand Down
21 changes: 19 additions & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
if (conf.getTickDurationInMs() > 0) {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
} else {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
}
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
Expand Down Expand Up @@ -953,6 +958,18 @@ Result ConsumerImpl::resumeMessageListener() {
void ConsumerImpl::redeliverUnacknowledgedMessages() {
static std::set<MessageId> emptySet;
redeliverMessages(emptySet);
unAckedMessageTrackerPtr_->clear();
}

void ConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
if (messageIds.empty()) {
return;
}
if (config_.getConsumerType() != ConsumerShared && config_.getConsumerType() != ConsumerKeyShared) {
redeliverUnacknowledgedMessages();
return;
}
redeliverMessages(messageIds);
}

void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class ConsumerImpl : public ConsumerImplBase,
virtual bool isCumulativeAcknowledgementAllowed(ConsumerType consumerType);

virtual void redeliverMessages(const std::set<MessageId>& messageIds);
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
virtual void negativeAcknowledge(const MessageId& msgId);

virtual void closeAsync(ResultCallback callback);
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ConsumerImplBase {
virtual Result pauseMessageListener() = 0;
virtual Result resumeMessageListener() = 0;
virtual void redeliverUnacknowledgedMessages() = 0;
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) = 0;
virtual const std::string& getName() const = 0;
virtual int getNumOfPrefetchedMessages() const = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/LogUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,4 @@ std::string LogUtils::getLoggerName(const std::string& path) {
return path.substr(startIdx + 1, endIdx - startIdx - 1);
}

} // namespace pulsar
} // namespace pulsar
25 changes: 23 additions & 2 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
consumerStr_ = consumerStrStream.str();

if (conf.getUnAckedMessagesTimeoutMs() != 0) {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
if (conf.getTickDurationInMs() > 0) {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
} else {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
}
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
Expand Down Expand Up @@ -653,6 +658,22 @@ void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages() {
consumer++) {
(consumer->second)->redeliverUnacknowledgedMessages();
}
unAckedMessageTrackerPtr_->clear();
}

void MultiTopicsConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
if (messageIds.empty()) {
return;
}
if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
redeliverUnacknowledgedMessages();
return;
}
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
consumer++) {
(consumer->second)->redeliverUnacknowledgedMessages(messageIds);
}
}

int MultiTopicsConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
virtual Result pauseMessageListener();
virtual Result resumeMessageListener();
virtual void redeliverUnacknowledgedMessages();
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
virtual int getNumOfPrefetchedMessages() const;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
Expand Down
24 changes: 22 additions & 2 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,13 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << ","
<< numPartitions << "]";
if (conf.getUnAckedMessagesTimeoutMs() != 0) {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
if (conf.getTickDurationInMs() > 0) {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
} else {
unAckedMessageTrackerPtr_.reset(
new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
}
} else {
unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
}
Expand Down Expand Up @@ -426,6 +431,21 @@ void PartitionedConsumerImpl::redeliverUnacknowledgedMessages() {
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
(*i)->redeliverUnacknowledgedMessages();
}
unAckedMessageTrackerPtr_->clear();
}

void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
if (messageIds.empty()) {
return;
}
if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
redeliverUnacknowledgedMessages();
return;
}
LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
(*i)->redeliverUnacknowledgedMessages(messageIds);
}
}

const std::string& PartitionedConsumerImpl::getName() const { return partitionStr_; }
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/PartitionedConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
virtual Result pauseMessageListener();
virtual Result resumeMessageListener();
virtual void redeliverUnacknowledgedMessages();
virtual void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds);
virtual const std::string& getName() const;
virtual int getNumOfPrefetchedMessages() const;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
Expand Down
106 changes: 66 additions & 40 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
timeoutHandlerHelper();
ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
timer_ = executorService->createDeadlineTimer();
timer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs_));
timer_->expires_from_now(boost::posix_time::milliseconds(tickDurationInMs_));
timer_->async_wait([&](const boost::system::error_code& ec) {
if (ec) {
LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
Expand All @@ -42,86 +42,112 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
std::lock_guard<std::mutex> acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
<< consumerReference_.getName().c_str());
if (!oldSet_.empty()) {

std::set<MessageId> headPartition = timePartitions.front();
timePartitions.pop_front();

std::set<MessageId> msgIdsToRedeliver;
if (!headPartition.empty()) {
LOG_INFO(consumerReference_.getName().c_str()
<< ": " << oldSet_.size() << " Messages were not acked within " << timeoutMs_ << " time");
oldSet_.clear();
currentSet_.clear();
consumerReference_.redeliverUnacknowledgedMessages();
<< ": " << headPartition.size() << " Messages were not acked within "
<< timePartitions.size() * tickDurationInMs_ << " time");
for (auto it = headPartition.begin(); it != headPartition.end(); it++) {
msgIdsToRedeliver.insert(*it);
messageIdPartitionMap.erase(*it);
}
}
headPartition.clear();
timePartitions.push_back(headPartition);

if (msgIdsToRedeliver.size() > 0) {
consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
}
oldSet_.swap(currentSet_);
}

UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr client,
ConsumerImplBase& consumer)
: consumerReference_(consumer) {
UnAckedMessageTrackerEnabled(timeoutMs, timeoutMs, client, consumer);
}

UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long tickDurationInMs,
const ClientImplPtr client,
ConsumerImplBase& consumer)
: consumerReference_(consumer) {
timeoutMs_ = timeoutMs;
tickDurationInMs_ = (timeoutMs >= tickDurationInMs) ? tickDurationInMs : timeoutMs;
client_ = client;

int blankPartitions = (int)std::ceil((double)timeoutMs_ / tickDurationInMs_);
for (int i = 0; i < blankPartitions + 1; i++) {
std::set<MessageId> msgIds;
timePartitions.push_back(msgIds);
}

timeoutHandler();
}

bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
std::lock_guard<std::mutex> acquire(lock_);
oldSet_.erase(m);
return currentSet_.insert(m).second;
if (messageIdPartitionMap.count(m) == 0) {
bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second;
return insert && timePartitions.back().insert(m).second;
}
return false;
}

bool UnAckedMessageTrackerEnabled::isEmpty() {
std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.empty() && currentSet_.empty();
return messageIdPartitionMap.empty();
}

bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.erase(m) || currentSet_.erase(m);
bool removed = false;
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(m);
if (exist != messageIdPartitionMap.end()) {
removed = exist->second.erase(m);
}
return removed;
}

long UnAckedMessageTrackerEnabled::size() {
std::lock_guard<std::mutex> acquire(lock_);
return oldSet_.size() + currentSet_.size();
return messageIdPartitionMap.size();
}

void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
std::lock_guard<std::mutex> acquire(lock_);
for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
if (*it < msgId && it->partition() == msgId.partition()) {
oldSet_.erase(it++);
} else {
it++;
}
}
for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
if (*it < msgId && it->partition() == msgId.partition()) {
currentSet_.erase(it++);
} else {
it++;
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
MessageId msgIdInMap = it->first;
if (msgIdInMap < msgId) {
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgId);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgId);
}
}
}
}

// this is only for MultiTopicsConsumerImpl, when un-subscribe a single topic, should remove all it's message.
void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic) {
for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) {
const std::string& topicPartitionName = it->getTopicName();
if (topicPartitionName.find(topic) != std::string::npos) {
oldSet_.erase(it++);
} else {
it++;
}
}
for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) {
const std::string& topicPartitionName = it->getTopicName();
if (topicPartitionName.find(topic) != std::string::npos) {
currentSet_.erase(it++);
} else {
it++;
std::lock_guard<std::mutex> acquire(lock_);
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
MessageId msgIdInMap = it->first;
if (msgIdInMap.getTopicName().compare(topic) == 0) {
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgIdInMap);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgIdInMap);
}
}
}
}

void UnAckedMessageTrackerEnabled::clear() {
currentSet_.clear();
oldSet_.clear();
messageIdPartitionMap.clear();
for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
it->clear();
}
}

UnAckedMessageTrackerEnabled::~UnAckedMessageTrackerEnabled() {
Expand Down
6 changes: 4 additions & 2 deletions pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
public:
~UnAckedMessageTrackerEnabled();
UnAckedMessageTrackerEnabled(long timeoutMs, const ClientImplPtr, ConsumerImplBase&);
UnAckedMessageTrackerEnabled(long timeoutMs, long tickDuration, const ClientImplPtr, ConsumerImplBase&);
bool add(const MessageId& m);
bool remove(const MessageId& m);
void removeMessagesTill(const MessageId& msgId);
Expand All @@ -40,13 +41,14 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
void timeoutHandlerHelper();
bool isEmpty();
long size();
std::set<MessageId> currentSet_;
std::set<MessageId> oldSet_;
std::map<MessageId, std::set<MessageId>> messageIdPartitionMap;
std::deque<std::set<MessageId>> timePartitions;
std::mutex lock_;
DeadlineTimerPtr timer_;
ConsumerImplBase& consumerReference_;
ClientImplPtr client_;
long timeoutMs_;
long tickDurationInMs_;
};
} // namespace pulsar

Expand Down