Skip to content

Commit

Permalink
config: make SlotImpl detachable from its owner, and add a new runOnA…
Browse files Browse the repository at this point in the history
…llThreads interface to Slot. (#8135)

See the issue in #7902, this PR is to make the SlotImpl detachable from its owner, by introducing a Booker object wraps around a SlotImpl, which bookkeeps all the on-the-fly update callbacks. And on its destruction, if there are still on-the-fly callbacks, move the SlotImpl to an deferred-delete queue, instead of destructing the SlotImpl which may cause an SEGV error.

More importantly, introduce a new runOnAllThreads(ThreadLocal::UpdateCb cb) API to Slot, which requests a Slot Owner to not assume that the Slot or its owner will out-live (in Main thread) the fired on-the-fly update callbacks, and should not capture the Slot or its owner in the update_cb.

Picked RDS and config-providers-framework as examples to demonstrate that this change works. {i.e., changed from the runOnAllThreads(Event::PostCb) to the new runOnAllThreads(TLS::UpdateCb) interface. }

Risk Level: Medium
Testing: unit test
Docs Changes: N/A
Release Notes: N/A
[Optional Fixes #Issue] #7902

Signed-off-by: Xin Zhuang <stevenzzz@google.com>
  • Loading branch information
stevenzzzz authored and alyssawilk committed Sep 11, 2019
1 parent 0ee3cc3 commit c5ffdda
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 21 deletions.
11 changes: 11 additions & 0 deletions include/envoy/thread_local/thread_local.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ class Slot {
*/
using InitializeCb = std::function<ThreadLocalObjectSharedPtr(Event::Dispatcher& dispatcher)>;
virtual void set(InitializeCb cb) PURE;

/**
* UpdateCb takes the current stored data, and returns an updated/new version data.
* TLS will run the callback and replace the stored data with the returned value *in each thread*.
*
* NOTE: The update callback is not supposed to capture the Slot, or its owner. As the owner may
* be destructed in main thread before the update_cb gets called in a worker thread.
**/
using UpdateCb = std::function<ThreadLocalObjectSharedPtr(ThreadLocalObjectSharedPtr)>;
virtual void runOnAllThreads(const UpdateCb& update_cb) PURE;
virtual void runOnAllThreads(const UpdateCb& update_cb, Event::PostCb complete_cb) PURE;
};

using SlotPtr = std::unique_ptr<Slot>;
Expand Down
13 changes: 9 additions & 4 deletions source/common/common/non_copyable.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@

namespace Envoy {
/**
* Mixin class that makes derived classes not copyable. Like boost::noncopyable without boost.
* Mixin class that makes derived classes not copyable and not moveable. Like boost::noncopyable
* without boost.
*/
class NonCopyable {
protected:
NonCopyable() = default;

private:
NonCopyable(const NonCopyable&);
NonCopyable& operator=(const NonCopyable&);
// Non-moveable.
NonCopyable(NonCopyable&&) noexcept = delete;
NonCopyable& operator=(NonCopyable&&) noexcept = delete;

// Non-copyable.
NonCopyable(const NonCopyable&) = delete;
NonCopyable& operator=(const NonCopyable&) = delete;
};
} // namespace Envoy
10 changes: 10 additions & 0 deletions source/common/config/config_provider_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ ConfigSubscriptionCommonBase::~ConfigSubscriptionCommonBase() {
init_target_.ready();
config_provider_manager_.unbindSubscription(manager_identifier_);
}

void ConfigSubscriptionCommonBase::applyConfigUpdate(const ConfigUpdateCb& update_fn) {
tls_->runOnAllThreads([update_fn](ThreadLocal::ThreadLocalObjectSharedPtr previous)
-> ThreadLocal::ThreadLocalObjectSharedPtr {
auto prev_thread_local_config = std::dynamic_pointer_cast<ThreadLocalConfig>(previous);
prev_thread_local_config->config_ = update_fn(prev_thread_local_config->config_);
return previous;
});
}

bool ConfigSubscriptionInstance::checkAndApplyConfigUpdate(const Protobuf::Message& config_proto,
const std::string& config_name,
const std::string& version_info) {
Expand Down
13 changes: 1 addition & 12 deletions source/common/config/config_provider_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,19 +216,8 @@ class ConfigSubscriptionCommonBase : protected Logger::Loggable<Logger::Id::conf
*
* @param update_fn the callback to run on each thread, it takes the previous version Config and
* returns a updated/new version Config.
* @param complete_cb the callback to run when the update propagation is done.
*/
void applyConfigUpdate(
const ConfigUpdateCb& update_fn, const Event::PostCb& complete_cb = []() {}) {
tls_->runOnAllThreads(
[this, update_fn]() {
// NOTE: there is a known race condition between *this* subscription being teared down in
// main thread and the posted callback being executed before the destruction. See more
// details in https://github.com/envoyproxy/envoy/issues/7902
tls_->getTyped<ThreadLocalConfig>().config_ = update_fn(getConfig());
},
complete_cb);
}
void applyConfigUpdate(const ConfigUpdateCb& update_fn);

void setLastUpdated() { last_updated_ = time_source_.systemTime(); }

Expand Down
8 changes: 6 additions & 2 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() {
void RdsRouteConfigProviderImpl::onConfigUpdate() {
ConfigConstSharedPtr new_config(
new ConfigImpl(config_update_info_->routeConfiguration(), factory_context_, false));
tls_->runOnAllThreads(
[this, new_config]() -> void { tls_->getTyped<ThreadLocalConfig>().config_ = new_config; });
tls_->runOnAllThreads([new_config](ThreadLocal::ThreadLocalObjectSharedPtr previous)
-> ThreadLocal::ThreadLocalObjectSharedPtr {
auto prev_config = std::dynamic_pointer_cast<ThreadLocalConfig>(previous);
prev_config->config_ = new_config;
return previous;
});
}

void RdsRouteConfigProviderImpl::validateConfig(
Expand Down
90 changes: 88 additions & 2 deletions source/common/thread_local/thread_local_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ SlotPtr InstanceImpl::allocateSlot() {

if (free_slot_indexes_.empty()) {
std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, slots_.size()));
slots_.push_back(slot.get());
return slot;
auto wrapper = std::make_unique<Bookkeeper>(*this, std::move(slot));
slots_.push_back(wrapper->slot_.get());
return wrapper;
}
const uint32_t idx = free_slot_indexes_.front();
free_slot_indexes_.pop_front();
Expand All @@ -42,11 +43,64 @@ bool InstanceImpl::SlotImpl::currentThreadRegistered() {
return thread_local_data_.data_.size() > index_;
}

void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); });
}

void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) {
parent_.runOnAllThreads([this, cb]() { setThreadLocal(index_, cb(get())); }, complete_cb);
}

ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() {
ASSERT(currentThreadRegistered());
return thread_local_data_.data_[index_];
}

InstanceImpl::Bookkeeper::Bookkeeper(InstanceImpl& parent, std::unique_ptr<SlotImpl>&& slot)
: parent_(parent), slot_(std::move(slot)),
ref_count_(/*not used.*/ nullptr,
[slot = slot_.get(), &parent = this->parent_](uint32_t* /* not used */) {
// On destruction, post a cleanup callback on main thread, this could happen on
// any thread.
parent.scheduleCleanup(slot);
}) {}

ThreadLocalObjectSharedPtr InstanceImpl::Bookkeeper::get() { return slot_->get(); }

void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) {
slot_->runOnAllThreads(
[cb, ref_count = this->ref_count_](ThreadLocalObjectSharedPtr previous) {
return cb(std::move(previous));
},
complete_cb);
}

void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb) {
slot_->runOnAllThreads([cb, ref_count = this->ref_count_](ThreadLocalObjectSharedPtr previous) {
return cb(std::move(previous));
});
}

bool InstanceImpl::Bookkeeper::currentThreadRegistered() {
return slot_->currentThreadRegistered();
}

void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb) {
// Use ref_count_ to bookkeep how many on-the-fly callback are out there.
slot_->runOnAllThreads([cb, ref_count = this->ref_count_]() { cb(); });
}

void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) {
// Use ref_count_ to bookkeep how many on-the-fly callback are out there.
slot_->runOnAllThreads([cb, main_callback, ref_count = this->ref_count_]() { cb(); },
main_callback);
}

void InstanceImpl::Bookkeeper::set(InitializeCb cb) {
slot_->set([cb, ref_count = this->ref_count_](Event::Dispatcher& dispatcher)
-> ThreadLocalObjectSharedPtr { return cb(dispatcher); });
}

void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(!shutdown_);
Expand All @@ -61,6 +115,38 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa
}
}

// Puts the slot into a deferred delete container, the slot will be destructed when its out-going
// callback reference count goes to 0.
void InstanceImpl::recycle(std::unique_ptr<SlotImpl>&& slot) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(slot != nullptr);
auto* slot_addr = slot.get();
deferred_deletes_.insert({slot_addr, std::move(slot)});
}

// Called by the Bookkeeper ref_count destructor, the SlotImpl in the deferred deletes map can be
// destructed now.
void InstanceImpl::scheduleCleanup(SlotImpl* slot) {
if (shutdown_) {
// If server is shutting down, do nothing here.
// The destruction of Bookkeeper has already transferred the SlotImpl to the deferred_deletes_
// queue. No matter if this method is called from a Worker thread, the SlotImpl will be
// destructed on main thread when InstanceImpl destructs.
return;
}
if (std::this_thread::get_id() == main_thread_id_) {
// If called from main thread, save a callback.
ASSERT(deferred_deletes_.contains(slot));
deferred_deletes_.erase(slot);
return;
}
main_thread_dispatcher_->post([slot, this]() {
ASSERT(deferred_deletes_.contains(slot));
// The slot is guaranteed to be put into the deferred_deletes_ map by Bookkeeper destructor.
deferred_deletes_.erase(slot);
});
}

void InstanceImpl::removeSlot(SlotImpl& slot) {
ASSERT(std::this_thread::get_id() == main_thread_id_);

Expand Down
37 changes: 36 additions & 1 deletion source/common/thread_local/thread_local_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@
#include "envoy/thread_local/thread_local.h"

#include "common/common/logger.h"
#include "common/common/non_copyable.h"

#include "absl/container/flat_hash_map.h"

namespace Envoy {
namespace ThreadLocal {

/**
* Implementation of ThreadLocal that relies on static thread_local objects.
*/
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, public Instance {
public:
InstanceImpl() : main_thread_id_(std::this_thread::get_id()) {}
~InstanceImpl() override;
Expand All @@ -35,6 +38,8 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
// ThreadLocal::Slot
ThreadLocalObjectSharedPtr get() override;
bool currentThreadRegistered() override;
void runOnAllThreads(const UpdateCb& cb) override;
void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override;
void runOnAllThreads(Event::PostCb cb) override { parent_.runOnAllThreads(cb); }
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(cb, main_callback);
Expand All @@ -45,17 +50,47 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public Instance {
const uint64_t index_;
};

// A Wrapper of SlotImpl which on destruction returns the SlotImpl to the deferred delete queue
// (detaches it).
struct Bookkeeper : public Slot {
Bookkeeper(InstanceImpl& parent, std::unique_ptr<SlotImpl>&& slot);
~Bookkeeper() override { parent_.recycle(std::move(slot_)); }

// ThreadLocal::Slot
ThreadLocalObjectSharedPtr get() override;
void runOnAllThreads(const UpdateCb& cb) override;
void runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) override;
bool currentThreadRegistered() override;
void runOnAllThreads(Event::PostCb cb) override;
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override;
void set(InitializeCb cb) override;

InstanceImpl& parent_;
std::unique_ptr<SlotImpl> slot_;
std::shared_ptr<uint32_t> ref_count_;
};

struct ThreadLocalData {
Event::Dispatcher* dispatcher_{};
std::vector<ThreadLocalObjectSharedPtr> data_;
};

void recycle(std::unique_ptr<SlotImpl>&& slot);
// Cleanup the deferred deletes queue.
void scheduleCleanup(SlotImpl* slot);

void removeSlot(SlotImpl& slot);
void runOnAllThreads(Event::PostCb cb);
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback);
static void setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object);

static thread_local ThreadLocalData thread_local_data_;

// A indexed container for Slots that has to be deferred to delete due to out-going callbacks
// pointing to the Slot. To let the ref_count_ deleter find the SlotImpl by address, the container
// is defined as a map of SlotImpl address to the unique_ptr<SlotImpl>.
absl::flat_hash_map<SlotImpl*, std::unique_ptr<SlotImpl>> deferred_deletes_;

std::vector<SlotImpl*> slots_;
// A list of index of freed slots.
std::list<uint32_t> free_slot_indexes_;
Expand Down
34 changes: 34 additions & 0 deletions test/common/thread_local/thread_local_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,40 @@ TEST_F(ThreadLocalInstanceImplTest, All) {
tls_.shutdownThread();
}

// Test that the config passed into the update callback is the previous version stored in the slot.
TEST_F(ThreadLocalInstanceImplTest, UpdateCallback) {
InSequence s;

SlotPtr slot = tls_.allocateSlot();

auto newer_version = std::make_shared<TestThreadLocalObject>();
bool update_called = false;

TestThreadLocalObject& object_ref = setObject(*slot);
auto update_cb = [&object_ref, &update_called,
newer_version](ThreadLocalObjectSharedPtr obj) -> ThreadLocalObjectSharedPtr {
// The unit test setup have two dispatchers registered, but only one thread, this lambda will be
// called twice in the same thread.
if (!update_called) {
EXPECT_EQ(obj.get(), &object_ref);
update_called = true;
} else {
EXPECT_EQ(obj.get(), newer_version.get());
}

return newer_version;
};
EXPECT_CALL(thread_dispatcher_, post(_));
EXPECT_CALL(object_ref, onDestroy());
EXPECT_CALL(*newer_version, onDestroy());
slot->runOnAllThreads(update_cb);

EXPECT_EQ(newer_version.get(), &slot->getTyped<TestThreadLocalObject>());

tls_.shutdownGlobalThreading();
tls_.shutdownThread();
}

// TODO(ramaraochavali): Run this test with real threads. The current issue in the unit
// testing environment is, the post to main_dispatcher is not working as expected.

Expand Down
8 changes: 8 additions & 0 deletions test/mocks/thread_local/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ class MockInstance : public Instance {
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads(cb, main_callback);
}
void runOnAllThreads(const UpdateCb& cb) override {
parent_.runOnAllThreads([cb, this]() { parent_.data_[index_] = cb(parent_.data_[index_]); });
}
void runOnAllThreads(const UpdateCb& cb, Event::PostCb main_callback) override {
parent_.runOnAllThreads([cb, this]() { parent_.data_[index_] = cb(parent_.data_[index_]); },
main_callback);
}

void set(InitializeCb cb) override { parent_.data_[index_] = cb(parent_.dispatcher_); }

MockInstance& parent_;
Expand Down

0 comments on commit c5ffdda

Please sign in to comment.