Make SlotImpl detachable from its owner, and add a new runOnAllThreads interface to Slot. #8135

Merged: 15 commits, merged on Sep 11, 2019. (The diff below shows changes from 8 commits.)
11 changes: 11 additions & 0 deletions include/envoy/thread_local/thread_local.h
@@ -74,6 +74,17 @@ class Slot {
*/
using InitializeCb = std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)>;
virtual void set(InitializeCb cb) PURE;

/**
* UpdateCb takes the current stored data and returns an updated/new version of the data.
* TLS will run the callback and replace the stored data with the returned value *in each thread*.
*
* NOTE: The update callback must not capture the Slot or its owner, as the owner may be
* destroyed on the main thread before the update_cb is called on a worker thread.
*/
using UpdateCb = std::function<ThreadLocalObjectSharedPtr(ThreadLocalObjectSharedPtr)>;
virtual void runOnAllThreads(const UpdateCb& update_cb) PURE;
virtual void runOnAllThreads(const UpdateCb& update_cb, Event::PostCb complete_cb) PURE;
};

using SlotPtr = std::unique_ptr<Slot>;
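As a rough illustration (not part of this diff), a holder of a SlotPtr might use the new update-callback overload as sketched below; MyThreadLocalState and MyConfig are hypothetical types invented for the example:

// Hypothetical sketch of the new UpdateCb-based runOnAllThreads(); MyThreadLocalState and
// MyConfig are invented for illustration and are not part of this PR.
struct MyThreadLocalState : public ThreadLocal::ThreadLocalObject {
  std::shared_ptr<const MyConfig> config_;
};

void propagateConfig(ThreadLocal::Slot& slot, std::shared_ptr<const MyConfig> new_config) {
  slot.runOnAllThreads([new_config](ThreadLocal::ThreadLocalObjectSharedPtr previous)
                           -> ThreadLocal::ThreadLocalObjectSharedPtr {
    // Each worker receives its previously stored object and returns the object to store.
    // Per the NOTE above, the lambda captures only the new config, never the Slot or its owner.
    auto typed = std::dynamic_pointer_cast<MyThreadLocalState>(previous);
    typed->config_ = new_config;
    return previous;
  });
}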
13 changes: 9 additions & 4 deletions source/common/common/non_copyable.h
@@ -2,14 +2,19 @@

namespace Envoy {
/**
* Mixin class that makes derived classes not copyable. Like boost::noncopyable without boost.
* Mixin class that makes derived classes not copyable and not moveable. Like boost::noncopyable
* without boost.
*/
class NonCopyable {
protected:
NonCopyable() = default;

private:
NonCopyable(const NonCopyable&);
NonCopyable& operator=(const NonCopyable&);
// Non-moveable.
NonCopyable(NonCopyable&&) noexcept = delete;
NonCopyable& operator=(NonCopyable&&) noexcept = delete;

// Non-copyable.
NonCopyable(const NonCopyable&) = delete;
NonCopyable& operator=(const NonCopyable&) = delete;
};
} // namespace Envoy
10 changes: 10 additions & 0 deletions source/common/config/config_provider_impl.cc
@@ -23,6 +23,16 @@ ConfigSubscriptionCommonBase::~ConfigSubscriptionCommonBase() {
init_target_.ready();
config_provider_manager_.unbindSubscription(manager_identifier_);
}

void ConfigSubscriptionCommonBase::applyConfigUpdate(const ConfigUpdateCb& update_fn) {
tls_->runOnAllThreads([update_fn](ThreadLocal::ThreadLocalObjectSharedPtr previous)
-> ThreadLocal::ThreadLocalObjectSharedPtr {
auto prev_thread_local_config = std::dynamic_pointer_cast<ThreadLocalConfig>(previous);
prev_thread_local_config->config_ = update_fn(prev_thread_local_config->config_);
return previous;
});
}
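As a point of reference (not part of this diff), a caller's update_fn typically just swaps in a freshly built config and ignores the previous per-thread value, much as checkAndApplyConfigUpdate below does; the MyConfigImpl type and the ConfigConstSharedPtr spelling in this sketch are illustrative assumptions:

// Illustrative sketch only; MyConfigImpl is an invented Config implementation and
// ConfigConstSharedPtr is assumed to match the ConfigUpdateCb signature described above.
ConfigConstSharedPtr new_config = std::make_shared<const MyConfigImpl>(config_proto);
applyConfigUpdate([new_config](ConfigConstSharedPtr) -> ConfigConstSharedPtr {
  // Drop the previous per-thread config and store the new one on every worker.
  return new_config;
});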

bool ConfigSubscriptionInstance::checkAndApplyConfigUpdate(const Protobuf::Message& config_proto,
const std::string& config_name,
const std::string& version_info) {
13 changes: 1 addition & 12 deletions source/common/config/config_provider_impl.h
@@ -216,19 +216,8 @@ class ConfigSubscriptionCommonBase : protected Logger::Loggable<Logger::Id::conf
*
* @param update_fn the callback to run on each thread, it takes the previous version Config and
* returns an updated/new version Config.
* @param complete_cb the callback to run when the update propagation is done.
*/
void applyConfigUpdate(
const ConfigUpdateCb& update_fn, const Event::PostCb& complete_cb = []() {}) {
tls_->runOnAllThreads(
Member:
Can we remove tls_ as a member?

Contributor Author:
I am not sure I follow your question; may I ask why?
tls_ is per subscription in the config-provider framework; depending on the config type (delta/non-delta), it creates a per-worker or cross-worker-shared ConfigImpl, which is shared among providers.

[this, update_fn]() {
// NOTE: there is a known race condition between *this* subscription being teared down in
// main thread and the posted callback being executed before the destruction. See more
// details in https://github.com/envoyproxy/envoy/issues/7902
tls_->getTyped<ThreadLocalConfig>().config_ = update_fn(getConfig());
},
complete_cb);
}
void applyConfigUpdate(const ConfigUpdateCb& update_fn);

void setLastUpdated() { last_updated_ = time_source_.systemTime(); }

8 changes: 6 additions & 2 deletions source/common/router/rds_impl.cc
@@ -199,8 +199,12 @@ Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() {
void RdsRouteConfigProviderImpl::onConfigUpdate() {
ConfigConstSharedPtr new_config(
new ConfigImpl(config_update_info_->routeConfiguration(), factory_context_, false));
tls_->runOnAllThreads(
[this, new_config]() -> void { tls_->getTyped<ThreadLocalConfig>().config_ = new_config; });
tls_->runOnAllThreads([new_config](ThreadLocal::ThreadLocalObjectSharedPtr previous)
-> ThreadLocal::ThreadLocalObjectSharedPtr {
auto prev_config = std::dynamic_pointer_cast<ThreadLocalConfig>(previous);
prev_config->config_ = new_config;
return previous;
});
}

void RdsRouteConfigProviderImpl::validateConfig(
87 changes: 85 additions & 2 deletions source/common/thread_local/thread_local_impl.cc
@@ -27,8 +27,9 @@ SlotPtr InstanceImpl::allocateSlot() {

if (free_slot_indexes_.empty()) {
std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, slots_.size()));
slots_.push_back(slot.get());
return slot;
auto wrapper = std::make_unique<Bookkeeper>(*this, std::move(slot));
slots_.push_back(&wrapper->slot());
return wrapper;
}
const uint32_t idx = free_slot_indexes_.front();
free_slot_indexes_.pop_front();
@@ -42,11 +43,59 @@ bool InstanceImpl::SlotImpl::currentThreadRegistered() {
return thread_local_data_.data_.size() > index_;
}

void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); });
}

void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) {
parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }, complete_cb);
}

ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() {
ASSERT(currentThreadRegistered());
return thread_local_data_.data_[index_];
}

InstanceImpl::Bookkeeper::Bookkeeper(InstanceImpl& parent, std::unique_ptr<SlotImpl>&& slot)
: parent_(parent), holder_(std::make_unique<SlotHolder>(std::move(slot))) {}

ThreadLocalObjectSharedPtr InstanceImpl::Bookkeeper::get() { return slot().get(); }

void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) {
slot().runOnAllThreads(
[cb, ref_count = holder_->ref_count_](ThreadLocalObjectSharedPtr previous) {
return cb(std::move(previous));
},
complete_cb);
}

void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb) {
slot().runOnAllThreads(
[cb, ref_count = holder_->ref_count_](ThreadLocalObjectSharedPtr previous) {
return cb(std::move(previous));
});
}

bool InstanceImpl::Bookkeeper::currentThreadRegistered() {
return slot().currentThreadRegistered();
}

void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb) {
// Use holder_->ref_count_ to track how many on-the-fly callbacks are out there.
slot().runOnAllThreads([cb, ref_count = holder_->ref_count_]() { cb(); });
}

void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) {
// Use holder_->ref_count_ to track how many on-the-fly callbacks are out there.
slot().runOnAllThreads([cb, main_callback, ref_count = holder_->ref_count_]() { cb(); },
main_callback);
}

void InstanceImpl::Bookkeeper::set(InitializeCb cb) {
slot().set([cb, ref_count = holder_->ref_count_](Event::Dispatcher& dispatcher)
-> ThreadLocalObjectSharedPtr { return cb(dispatcher); });
}
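To make the bookkeeping above concrete (commentary, not part of the diff): every callback posted through the Bookkeeper captures a copy of holder_->ref_count_, so the shared_ptr's use_count stays above 1 until all posted callbacks have been destroyed; that is exactly what SlotHolder::isRecycleable() checks. A standalone, self-contained illustration of the pattern (invented names, not PR code):

// Standalone illustration of the ref-count trick used by Bookkeeper/SlotHolder (not PR code):
// a shared_ptr is captured by every in-flight callback, and use_count() == 1 means
// "no callbacks outstanding, safe to recycle".
#include <cassert>
#include <functional>
#include <memory>
#include <vector>

int main() {
  auto ref_count = std::make_shared<int>(0); // owned by the "holder"
  std::vector<std::function<void()>> in_flight;

  // Simulate posting a callback to two workers; each stored copy bumps the use count.
  for (int i = 0; i < 2; ++i) {
    in_flight.emplace_back([ref_count] { /* per-worker work */ });
  }
  assert(ref_count.use_count() == 3); // holder + two pending callbacks => not recyclable.

  in_flight.clear();                  // callbacks ran and were destroyed on the workers.
  assert(ref_count.use_count() == 1); // only the holder remains => isRecycleable() is true.
  return 0;
}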

void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(!shutdown_);
Expand All @@ -61,6 +110,40 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa
}
}

// Deletes a Slot if it's recyclable (no on-the-fly callbacks point to it); otherwise puts it into
// a deferred delete queue and schedules a cleanup task to collect the Slot once it's recyclable.
void InstanceImpl::recycle(std::unique_ptr<SlotHolder>&& holder) {
if (holder->isRecycleable()) {
holder.reset();
return;
}
deferred_deletes_.emplace_back(std::move(holder));
Member:
Nothing tracks when this holder is eventually recyclable, right? Meaning, in this case it will stick around the deferred delete list until something else gets scheduled for cleanup? (Since presumably this scheduled cleanup can race with the thing actually becoming recyclable.) Maybe this is fine? Or maybe you should be checking the use count on the worker threads and scheduling cleanup if it's detached and ready to be recycled (the last one)?

Contributor Author:
I think it's tracked:

  1. A cleanup task is scheduled when we put a new item into the deferred delete queue.
  2. The cleanup task will clean up recyclable items from the queue, and reschedule another cleanup task (the recursion) if there are any non-recyclable ones.

So once an item is put into the queue, the above event loop (1->2->2->2->....) will guarantee the items get cleaned up.

Contributor Author:
Stepping back though, can you refresh my memory on why this complexity is needed vs. just cancelling any potential updates before we start listener shutdown, which I think would be serialized correctly? Thank you!

/wait

I think the canceling/waiting approach requires more changes to the code base. Basically, if we want to be lock free, we'd have to split the LDS update (or Server shutdown) into 2 phases:

  1. Set a flag or broadcast canceling signals to workers.
  2. Wait to be signaled that the 'shutdown' message has been received by everyone.

In the meantime, if it's an LDS-update-caused ListenerImpl teardown, we have to bookkeep all the TLS resources owned by "this" ListenerImpl, and only wait on them to be cleaned up, as an RDS subscription (or any other subscription) is shared between Listeners.

On the other hand, this change limits changes to the tls module only, with the least disturbance to the code base.

Member:
Basically, if we want to be lock free, we'd have to split the LDS update (or Server shutdown) into 2 phases

To be clear, this is how it already works. See ListenerManagerImpl::drainListener which already handles draining and stopping a listener, including posting a completion back and forth. Assuming all xDS updates are stopped before removal (or drain), by definition there can be no destruction race condition. What I'm trying to get at is how hard is it really to just shut down LDS/RDS/etc. updates for a listener on the main thread before doing this? Naively it doesn't seem that hard.

Member:
I talked to @stevenzzzz offline and we are going to time box trying to implement explicit shutdown of resources prior to listener shutdown. This might be accomplished via the init manager or some similar structure. If it ends up being too hard or has complications we can come back to this.

/wait

Member:
I'd be in favor of not repeating something like init manager. Init manager has turned out to be hard to reason about, with a maze of callbacks, watchers, registration conditions, lifetime considerations and so on. History speaks for itself with all the bugs and races on this one.

I think there is a good argument to be made that the solution in this PR has benefits in that it simplifies the contract around slot ownership, e.g. runOnAllThreads() gets the threads out of the business of worrying about tls_ slot ownership and lifetime. It also centralizes all the complexity inside of the SlotImpl, whereas init manager style solutions diffuse this complexity.

So, my $0.02 is that something like the solution in this PR is a nice direction; basically getting away from having threads worrying about TLS slots directly.

Member:
Throwing out another idea that has come up when chatting with @stevenzzzz; we could have a centralized (owned by Server::Instance) TLS slot manager. This will be the actual owner of the memory backing all slots and it lives forever. It would hand out slot handles and then have them returned via RAII.

Member:
So, my $0.02 is that something like the solution in this PR is a nice direction; basically getting away from having threads worrying about TLS slots directly.

I think with this solution there are going to be potentially hard-to-reason-about thread interleavings, and I have no doubt we will have a different set of issues from this direction, though I agree it's a nice solution and it will work in the end. My reason for proposing the use of the existing init manager system is that it already exists. We must register things into the init manager, and I think it would be relatively trivial to also register a shutdown hook of some type at the same time.

With that said, I don't feel that strongly about it, so if you both want to continue with this solution we can do that.

we could have a centralized (owned by Server::Instance) TLS slot manager.

This is the way it already works?

Contributor Author:
Good discussion here.
Personally I am not a fan of the init-manager mechanism. It's essentially a barrier that's passed around, and it's hard to track what has been barriered and what has (or should) not.

The other problem I want to call out here is that no matter which way we go, we need to let contributors know about this problem: we should not distribute (by capturing some shared_ptr) ownership of a TLS slot or its owner to worker threads.
That has the side effect of disrupting the delicate dependency graph of objects in ServerImpl and can cause a crash during server shutdown (IIUC, per my experience, though we drain the libevent message queue when shutting down a worker, we don't prevent new callbacks from being posted to the Dispatcher's callback queue). An example would be posting a callback which captures a shared_ptr to workers. On destruction, the SingletonManager is destructed first, and then the RdsConfigSubscription owned by RdsRouteConfigProvider will try to remove itself from the destructed RdsConfigProviderManager.

Member:
The other problem I want to call out here is that no matter which way we go, we need to let contributors know about this problem, and we should not distribute (by capturing some shared_ptr) ownership of a TLS slot or its owner to worker threads.

This is my point exactly. Although I agree Init/ShutdownManager is problematic in certain ways, it's extremely explicit on what needs to happen. Basically, don't do any more updates. We are trading that for rules around how this can be used which are hard to understand and enforce. With that, I will stop there. @htuch I assume you want to move forward with this solution?

scheduleCleanup();
}

// Cleans up the deferred deletes queue.
// Removes all the items with ref count 1, and reschedules another cleanup task if any
// non-recyclable items are left.
// This ensures that, unless the server shuts down, an enqueued item will eventually be
// deleted.
void InstanceImpl::scheduleCleanup() {
ASSERT(main_thread_dispatcher_ != nullptr);
if (shutdown_) {
return;
}
if (deferred_deletes_.empty()) {
return;
}
main_thread_dispatcher_->post([this]() {
Contributor:
I feel like this ought to be avoidable, but perhaps I'm missing something. We're using ref_count_ not as an integer, but just to have an object with a use count, right?

If we made refcount_ a class, and had Bookkeeper have a reference to it (along with runOnAllThreads calls), and the refcount destructor did a dispatcher post(delete), couldn't we avoid this extra layer of cleanup complexity? We'd still delete when both the Bookkeeper and all thread references were gone, and we'd still delete in the original thread.

Contributor Author:
So the problem with "posting back to the main thread when the propagation is done" is that there is again a race window, where the posted callback on the main thread may get run first, and then the "copy of the lambda" in a worker thread may hold the last reference to the refcount_.

IMHO, we should also avoid moving the ownership of a SlotImpl or its owner by posting a callback/complete_cb which shares a portion of the object as well. That may cause complications when ServerImpl tears down. I ran into a crash with that, where I posted a shared_ptr back to the main thread dispatcher queue; during shutdown, the SingletonManager is torn down ahead of the DispatcherImpl in ServerImpl, which includes the RouteConfigProviderManager that's referenced by the RdsRouteConfigProvider in the dispatcher-msg-queue.

The Bookkeeper is inspiring WRT extending the ability of Slot.

I am not sure if the SlotHolder is too complex, since it is kind of re-inventing shared_ptr. What's more, the Bookkeeper combined with SlotHolder is duplicating the deleter of shared_ptr.

My understanding of the Bookkeeper is to track:

  • how many runOnAllThreads are on the fly
  • that the last of the Bookkeeper destruction and the runOnAllThreads calls should destruct the real slot

I propose to replace SlotHolder by shared_ptr, and the Bookkeeper should:
1. ref the shared_ptr until destruction (shared_ptr as a member)
2. increase the ref of the shared_ptr at runOnAllThreads (no more complex than capturing it)
3. choose one from 3.1 and 3.2:
3.1 Bookkeeper sets the deleter of the shared_ptr to move it to the deferred delete queue
3.2 rewrite the Bookkeeper so that runOnAllThreads enforces an oncomplete_cb to decrease the shared_ptr on the main thread. See https://github.com/envoyproxy/envoy/pull/7011/files . 3.2 doesn't even need a deferred delete queue.

My proposal does not decrease the complexity of SlotHolder.
With 3.2, Bookkeeper::runOnAllThreads could cut the overhead and the complexity of operations on deferred deletion.

WDYT?

Yeah, SlotHolder is basically the obj that holds a slot and its outgoing ref-count. I made a class instead of using a shared pointer as I thought it may be more "readable". the ferre

Contributor:
Oh, SingletonManager early destruction. With this restriction I don't have a good opinion...

Contributor Author:
Talked w/ Alyssa offline, got the idea now. I believe this matches what lamndai suggested as well.
The "schedule a cleanup callback" logic is removed and each Slot Bookkeeper fires a cleanup callback on the ref-count's destruction.

deferred_deletes_.remove_if(
[](std::unique_ptr<SlotHolder>& holder) -> bool { return holder->isRecycleable(); });
if (!deferred_deletes_.empty()) {
Member:
Prefer a loop vs. recursion.

Contributor Author:
Let me update the misleading comment.
This method will reschedule another cleanup task in the main thread dispatcher Q. Not exactly recursive.
We can't use a loop here, as it would basically mean a lock/wait mechanism (waiting on the ongoing update callbacks to finish).

Member:
Thanks this makes more sense now. Can you add a bunch more comments inside recycle and this function to describe what is going on and the various interleavings?

Member:
Ping on adding more comments.

Contributor Author:
Oops, missed this.
I added some more comments around the function header, and also recycle().

// Reschedule another cleanup task if there are still non-recyclable slots.
scheduleCleanup();
}
});
}

void InstanceImpl::removeSlot(SlotImpl& slot) {
ASSERT(std::this_thread::get_id() == main_thread_id_);

43 changes: 42 additions & 1 deletion source/common/thread_local/thread_local_impl.h
@@ -8,14 +8,15 @@
#include "envoy/thread_local/thread_local.h"

#include "common/common/logger.h"
#include "common/common/non_copyable.h"

namespace Envoy {
namespace ThreadLocal {

/**
* Implementation of ThreadLocal that relies on static thread_local objects.
*/
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, public Instance {
public:
InstanceImpl() : main_thread_id_(std::this_thread::get_id()) {}
~InstanceImpl() override;
@@ -35,6 +36,8 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
// ThreadLocal::Slot
ThreadLocalObjectSharedPtr get() override;
bool currentThreadRegistered() override;
void runOnAllThreads(const UpdateCb& cb) override;
void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override;
void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); }
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(cb, main_callback);
@@ -45,17 +48,55 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
const uint64_t index_;
};

// A helper class for holding a SlotImpl and its bookkeeping shared_ptr, which counts the number
// of on-the-fly update callbacks.
struct SlotHolder {
SlotHolder(std::unique_ptr<SlotImpl>&& slot) : slot_(std::move(slot)) {}
bool isRecycleable() { return ref_count_.use_count() == 1; }

const std::unique_ptr<SlotImpl> slot_;
const std::shared_ptr<int> ref_count_{new int(0)};
Contributor:
The shared_ptr as ref_count reminds me of the pattern I eliminated in envoy before. See #7011

};

// A wrapper of SlotImpl which, on destruction, returns the SlotImpl to the deferred delete queue
// (detaches it).
struct Bookkeeper : public Slot {
Bookkeeper(InstanceImpl& parent, std::unique_ptr<SlotImpl>&& slot);
~Bookkeeper() override { parent_.recycle(std::move(holder_)); }
SlotImpl& slot() { return *(holder_->slot_); }

// ThreadLocal::Slot
ThreadLocalObjectSharedPtr get() override;
void runOnAllThreads(const UpdateCb& cb) override;
void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override;
bool currentThreadRegistered() override;
void runOnAllThreads(Event::PostCb cb) override;
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override;
void set(InitializeCb cb) override;

InstanceImpl& parent_;
std::unique_ptr<SlotHolder> holder_;
};

struct ThreadLocalData {
Event::Dispatcher* dispatcher_{};
std::vector<ThreadLocalObjectSharedPtr> data_;
};

void recycle(std::unique_ptr<SlotHolder>&& holder);
// Cleans up the deferred deletes queue.
void scheduleCleanup();
void removeSlot(SlotImpl& slot);
void runOnAllThreads(Event::PostCb cb);
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback);
static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);

static thread_local ThreadLocalData thread_local_data_;

// A queue for Slots whose deletion has to be deferred because outgoing callbacks still
// point to them.
std::list<std::unique_ptr<SlotHolder>> deferred_deletes_;

std::vector<SlotImpl*> slots_;
// A list of indexes of freed slots.
std::list<uint32_t> free_slot_indexes_;
34 changes: 34 additions & 0 deletions test/common/thread_local/thread_local_impl_test.cc
@@ -85,6 +85,40 @@ TEST_F(ThreadLocalInstanceImplTest, All) {
tls_.shutdownThread();
}

// Test that the config passed into the update callback is the previous version stored in the slot.
TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) {
InSequence s;

SlotPtr slot = tls_.allocateSlot();

auto newer_version = std::make_shared<TestThreadLocalObject>();
bool update_called = false;

TestThreadLocalObject& object_ref = setObject(*slot);
auto update_cb = [&object_ref, &update_called,
newer_version](ThreadLocalObjectSharedPtr obj) -> ThreadLocalObjectSharedPtr {
// The unit test setup has two dispatchers registered but only one thread, so this lambda will be
// called twice in the same thread.
if (!update_called) {
EXPECT_EQ(obj.get(), &object_ref);
update_called = true;
} else {
EXPECT_EQ(obj.get(), newer_version.get());
}

return newer_version;
};
EXPECT_CALL(thread_dispatcher_, post(_));
EXPECT_CALL(object_ref, onDestroy());
EXPECT_CALL(*newer_version, onDestroy());
slot->runOnAllThreads(update_cb);

EXPECT_EQ(newer_version.get(), &slot->getTyped<TestThreadLocalObject>());

tls_.shutdownGlobalThreading();
tls_.shutdownThread();
}

// TODO(ramaraochavali): Run this test with real threads. The current issue in the unit
// testing environment is, the post to main_dispatcher is not working as expected.

8 changes: 8 additions & 0 deletions test/mocks/thread_local/mocks.h
@@ -63,6 +63,14 @@ class MockInstance : public Instance {
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(cb, main_callback);
}
void runOnAllThreads(const UpdateCb& cb) override {
parent_.runOnAllThreads([cb, this]() { parent_.data_[index_] = cb(parent_.data_[index_]); });
}
void runOnAllThreads(const UpdateCb& cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads([cb, this]() { parent_.data_[index_] = cb(parent_.data_[index_]); },
main_callback);
}

void set(InitializeCb cb) override { parent_.data_[index_] = cb(parent_.dispatcher_); }

MockInstance& parent_;