-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rds: split subscription out from RdsRouteConfigProviderImpl #3960
Changes from all commits
813116e
4b2680d
842d08d
4fa2562
6d433fe
3c2ea5d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,23 +47,19 @@ StaticRouteConfigProviderImpl::StaticRouteConfigProviderImpl( | |
|
||
// TODO(htuch): If support for multiple clusters is added per #1170 cluster_name_ | ||
// initialization needs to be fixed. | ||
RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( | ||
RdsRouteConfigSubscription::RdsRouteConfigSubscription( | ||
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds, | ||
const std::string& manager_identifier, Server::Configuration::FactoryContext& factory_context, | ||
const std::string& stat_prefix, RouteConfigProviderManagerImpl& route_config_provider_manager) | ||
: factory_context_(factory_context), tls_(factory_context.threadLocal().allocateSlot()), | ||
route_config_name_(rds.route_config_name()), | ||
const std::string& stat_prefix, | ||
Envoy::Router::RouteConfigProviderManagerImpl& route_config_provider_manager) | ||
: route_config_name_(rds.route_config_name()), | ||
scope_(factory_context.scope().createScope(stat_prefix + "rds." + route_config_name_ + ".")), | ||
stats_({ALL_RDS_STATS(POOL_COUNTER(*scope_))}), | ||
route_config_provider_manager_(route_config_provider_manager), | ||
manager_identifier_(manager_identifier), | ||
manager_identifier_(manager_identifier), time_source_(factory_context.systemTimeSource()), | ||
last_updated_(factory_context.systemTimeSource().currentTime()) { | ||
::Envoy::Config::Utility::checkLocalInfo("rds", factory_context.localInfo()); | ||
|
||
ConfigConstSharedPtr initial_config(new NullConfigImpl()); | ||
tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { | ||
return std::make_shared<ThreadLocalConfig>(initial_config); | ||
}); | ||
subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource< | ||
envoy::api::v2::RouteConfiguration>( | ||
rds.config_source(), factory_context.localInfo().node(), factory_context.dispatcher(), | ||
|
@@ -77,27 +73,22 @@ RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( | |
}, | ||
"envoy.api.v2.RouteDiscoveryService.FetchRoutes", | ||
"envoy.api.v2.RouteDiscoveryService.StreamRoutes"); | ||
config_source_ = MessageUtil::getJsonStringFromMessage(rds.config_source(), true); | ||
} | ||
|
||
RdsRouteConfigProviderImpl::~RdsRouteConfigProviderImpl() { | ||
RdsRouteConfigSubscription::~RdsRouteConfigSubscription() { | ||
// If we get destroyed during initialization, make sure we signal that we "initialized". | ||
runInitializeCallbackIfAny(); | ||
|
||
// The ownership of RdsRouteConfigProviderImpl is shared among all HttpConnectionManagers that | ||
// hold a shared_ptr to it. The RouteConfigProviderManager holds weak_ptrs to the | ||
// RdsRouteConfigProviders. Therefore, the map entry for the RdsRouteConfigProvider has to get | ||
// cleaned by the RdsRouteConfigProvider's destructor. | ||
route_config_provider_manager_.route_config_providers_.erase(manager_identifier_); | ||
route_config_provider_manager_.route_config_subscriptions_.erase(manager_identifier_); | ||
} | ||
|
||
Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() { | ||
return tls_->getTyped<ThreadLocalConfig>().config_; | ||
} | ||
|
||
void RdsRouteConfigProviderImpl::onConfigUpdate(const ResourceVector& resources, | ||
void RdsRouteConfigSubscription::onConfigUpdate(const ResourceVector& resources, | ||
const std::string& version_info) { | ||
last_updated_ = factory_context_.systemTimeSource().currentTime(); | ||
last_updated_ = time_source_.currentTime(); | ||
|
||
if (resources.empty()) { | ||
ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_); | ||
|
@@ -115,35 +106,79 @@ void RdsRouteConfigProviderImpl::onConfigUpdate(const ResourceVector& resources, | |
throw EnvoyException(fmt::format("Unexpected RDS configuration (expecting {}): {}", | ||
route_config_name_, route_config.name())); | ||
} | ||
|
||
const uint64_t new_hash = MessageUtil::hash(route_config); | ||
if (!config_info_ || new_hash != config_info_.value().last_config_hash_) { | ||
ConfigConstSharedPtr new_config(new ConfigImpl(route_config, factory_context_, false)); | ||
config_info_ = {new_hash, version_info}; | ||
route_config_proto_ = route_config; | ||
stats_.config_reload_.inc(); | ||
ENVOY_LOG(debug, "rds: loading new configuration: config_name={} hash={}", route_config_name_, | ||
new_hash); | ||
tls_->runOnAllThreads( | ||
[this, new_config]() -> void { tls_->getTyped<ThreadLocalConfig>().config_ = new_config; }); | ||
route_config_proto_ = route_config; | ||
for (auto* provider : route_config_providers_) { | ||
provider->onConfigUpdate(); | ||
} | ||
} | ||
|
||
runInitializeCallbackIfAny(); | ||
} | ||
|
||
void RdsRouteConfigProviderImpl::onConfigUpdateFailed(const EnvoyException*) { | ||
void RdsRouteConfigSubscription::onConfigUpdateFailed(const EnvoyException*) { | ||
// We need to allow server startup to continue, even if we have a bad | ||
// config. | ||
runInitializeCallbackIfAny(); | ||
} | ||
|
||
void RdsRouteConfigProviderImpl::runInitializeCallbackIfAny() { | ||
void RdsRouteConfigSubscription::registerInitTarget(Init::Manager& init_manager) { | ||
init_manager.registerTarget(*this); | ||
} | ||
|
||
void RdsRouteConfigSubscription::runInitializeCallbackIfAny() { | ||
if (initialize_callback_) { | ||
initialize_callback_(); | ||
initialize_callback_ = nullptr; | ||
} | ||
} | ||
|
||
void RdsRouteConfigProviderImpl::registerInitTarget(Init::Manager& init_manager) { | ||
init_manager.registerTarget(*this); | ||
RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( | ||
RdsRouteConfigSubscriptionSharedPtr&& subscription, | ||
Server::Configuration::FactoryContext& factory_context) | ||
: subscription_(std::move(subscription)), factory_context_(factory_context), | ||
tls_(factory_context.threadLocal().allocateSlot()) { | ||
ConfigConstSharedPtr initial_config; | ||
if (subscription_->config_info_.has_value()) { | ||
initial_config = | ||
std::make_shared<ConfigImpl>(subscription_->route_config_proto_, factory_context_, false); | ||
} else { | ||
initial_config = std::make_shared<NullConfigImpl>(); | ||
} | ||
tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { | ||
return std::make_shared<ThreadLocalConfig>(initial_config); | ||
}); | ||
subscription_->route_config_providers_.insert(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Subscription defines a OnUpdate interface, it only stores the interface pointers. It should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No it won't work, you will need a pointer to RdsRouteConfigProviderImpl to implement All the onUpdate is only used in |
||
} | ||
|
||
RdsRouteConfigProviderImpl::~RdsRouteConfigProviderImpl() { | ||
subscription_->route_config_providers_.erase(this); | ||
} | ||
|
||
Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() { | ||
return tls_->getTyped<ThreadLocalConfig>().config_; | ||
} | ||
|
||
absl::optional<RouteConfigProvider::ConfigInfo> RdsRouteConfigProviderImpl::configInfo() const { | ||
if (!subscription_->config_info_) { | ||
return {}; | ||
} else { | ||
return ConfigInfo{subscription_->route_config_proto_, | ||
subscription_->config_info_.value().last_config_version_}; | ||
} | ||
} | ||
|
||
void RdsRouteConfigProviderImpl::onConfigUpdate() { | ||
ConfigConstSharedPtr new_config( | ||
new ConfigImpl(subscription_->route_config_proto_, factory_context_, false)); | ||
tls_->runOnAllThreads( | ||
[this, new_config]() -> void { tls_->getTyped<ThreadLocalConfig>().config_ = new_config; }); | ||
} | ||
|
||
RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& admin) { | ||
|
@@ -157,14 +192,15 @@ RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& ad | |
std::vector<RouteConfigProviderSharedPtr> | ||
RouteConfigProviderManagerImpl::getRdsRouteConfigProviders() { | ||
std::vector<RouteConfigProviderSharedPtr> ret; | ||
ret.reserve(route_config_providers_.size()); | ||
for (const auto& element : route_config_providers_) { | ||
ret.reserve(route_config_subscriptions_.size()); | ||
for (const auto& element : route_config_subscriptions_) { | ||
// Because the RouteConfigProviderManager's weak_ptrs only get cleaned up | ||
// in the RdsRouteConfigProviderImpl destructor, and the single threaded nature | ||
// in the RdsRouteConfigSubscription destructor, and the single threaded nature | ||
// of this code, locking the weak_ptr will not fail. | ||
RouteConfigProviderSharedPtr provider = element.second.lock(); | ||
ASSERT(provider); | ||
ret.push_back(provider); | ||
auto subscription = element.second.lock(); | ||
ASSERT(subscription); | ||
ASSERT(subscription->route_config_providers_.size() > 0); | ||
ret.push_back((*subscription->route_config_providers_.begin())->shared_from_this()); | ||
} | ||
return ret; | ||
}; | ||
|
@@ -190,31 +226,34 @@ Router::RouteConfigProviderSharedPtr RouteConfigProviderManagerImpl::getRdsRoute | |
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds, | ||
Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix) { | ||
|
||
// RdsRouteConfigProviders are unique based on their serialized RDS config. | ||
// RdsRouteConfigSubscriptions are unique based on their serialized RDS config. | ||
// TODO(htuch): Full serialization here gives large IDs, could get away with a | ||
// strong hash instead. | ||
const std::string manager_identifier = rds.SerializeAsString(); | ||
|
||
auto it = route_config_providers_.find(manager_identifier); | ||
if (it == route_config_providers_.end()) { | ||
RdsRouteConfigSubscriptionSharedPtr subscription; | ||
|
||
auto it = route_config_subscriptions_.find(manager_identifier); | ||
if (it == route_config_subscriptions_.end()) { | ||
// std::make_shared does not work for classes with private constructors. There are ways | ||
// around it. However, since this is not a performance critical path we err on the side | ||
// of simplicity. | ||
std::shared_ptr<RdsRouteConfigProviderImpl> new_provider{new RdsRouteConfigProviderImpl( | ||
rds, manager_identifier, factory_context, stat_prefix, *this)}; | ||
|
||
new_provider->registerInitTarget(factory_context.initManager()); | ||
subscription.reset(new RdsRouteConfigSubscription(rds, manager_identifier, factory_context, | ||
stat_prefix, *this)); | ||
|
||
route_config_providers_.insert({manager_identifier, new_provider}); | ||
subscription->registerInitTarget(factory_context.initManager()); | ||
|
||
return new_provider; | ||
route_config_subscriptions_.insert({manager_identifier, subscription}); | ||
} else { | ||
// Because the RouteConfigProviderManager's weak_ptrs only get cleaned up | ||
// in the RdsRouteConfigSubscription destructor, and the single threaded nature | ||
// of this code, locking the weak_ptr will not fail. | ||
subscription = it->second.lock(); | ||
} | ||
ASSERT(subscription); | ||
|
||
// Because the RouteConfigProviderManager's weak_ptrs only get cleaned up | ||
// in the RdsRouteConfigProviderImpl destructor, and the single threaded nature | ||
// of this code, locking the weak_ptr will not fail. | ||
Router::RouteConfigProviderSharedPtr new_provider = it->second.lock(); | ||
ASSERT(new_provider); | ||
Router::RouteConfigProviderSharedPtr new_provider{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be unique_ptr? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will involving interface changes and make them consistent with StaticRouteConfigProviders, and update There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMHO, it is important to use unique_ptr to show Provider object is not shared, only HttpConnectionManager owns it. |
||
new RdsRouteConfigProviderImpl(std::move(subscription), factory_context)}; | ||
return new_provider; | ||
}; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
#include <functional> | ||
#include <string> | ||
#include <unordered_map> | ||
#include <unordered_set> | ||
|
||
#include "envoy/api/v2/rds.pb.h" | ||
#include "envoy/api/v2/route/route.pb.h" | ||
|
@@ -80,36 +81,25 @@ struct RdsStats { | |
}; | ||
|
||
class RouteConfigProviderManagerImpl; | ||
class RdsRouteConfigProviderImpl; | ||
|
||
/** | ||
* Implementation of RouteConfigProvider that fetches the route configuration dynamically using | ||
* the RDS API. | ||
* A class that fetches the route configuration dynamically using the RDS API and updates them to | ||
* RDS config providers. | ||
*/ | ||
class RdsRouteConfigProviderImpl | ||
: public RouteConfigProvider, | ||
public Init::Target, | ||
class RdsRouteConfigSubscription | ||
: public Init::Target, | ||
Envoy::Config::SubscriptionCallbacks<envoy::api::v2::RouteConfiguration>, | ||
Logger::Loggable<Logger::Id::router> { | ||
public: | ||
~RdsRouteConfigProviderImpl(); | ||
~RdsRouteConfigSubscription(); | ||
|
||
// Init::Target | ||
void initialize(std::function<void()> callback) override { | ||
initialize_callback_ = callback; | ||
subscription_->start({route_config_name_}, *this); | ||
} | ||
|
||
// Router::RouteConfigProvider | ||
Router::ConfigConstSharedPtr config() override; | ||
absl::optional<ConfigInfo> configInfo() const override { | ||
if (!config_info_) { | ||
return {}; | ||
} else { | ||
return ConfigInfo{route_config_proto_, config_info_.value().last_config_version_}; | ||
} | ||
} | ||
SystemTime lastUpdated() const override { return last_updated_; } | ||
|
||
// Config::SubscriptionCallbacks | ||
void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override; | ||
void onConfigUpdateFailed(const EnvoyException* e) override; | ||
|
@@ -118,18 +108,12 @@ class RdsRouteConfigProviderImpl | |
} | ||
|
||
private: | ||
struct ThreadLocalConfig : public ThreadLocal::ThreadLocalObject { | ||
ThreadLocalConfig(ConfigConstSharedPtr initial_config) : config_(initial_config) {} | ||
|
||
ConfigConstSharedPtr config_; | ||
}; | ||
|
||
struct LastConfigInfo { | ||
uint64_t last_config_hash_; | ||
std::string last_config_version_; | ||
}; | ||
|
||
RdsRouteConfigProviderImpl( | ||
RdsRouteConfigSubscription( | ||
const envoy::config::filter::network::http_connection_manager::v2::Rds& rds, | ||
const std::string& manager_identifier, Server::Configuration::FactoryContext& factory_context, | ||
const std::string& stat_prefix, | ||
|
@@ -138,19 +122,57 @@ class RdsRouteConfigProviderImpl | |
void registerInitTarget(Init::Manager& init_manager); | ||
void runInitializeCallbackIfAny(); | ||
|
||
Server::Configuration::FactoryContext& factory_context_; | ||
std::unique_ptr<Envoy::Config::Subscription<envoy::api::v2::RouteConfiguration>> subscription_; | ||
ThreadLocal::SlotPtr tls_; | ||
std::string config_source_; | ||
std::function<void()> initialize_callback_; | ||
const std::string route_config_name_; | ||
absl::optional<LastConfigInfo> config_info_; | ||
Stats::ScopePtr scope_; | ||
RdsStats stats_; | ||
std::function<void()> initialize_callback_; | ||
RouteConfigProviderManagerImpl& route_config_provider_manager_; | ||
const std::string manager_identifier_; | ||
envoy::api::v2::RouteConfiguration route_config_proto_; | ||
SystemTimeSource& time_source_; | ||
SystemTime last_updated_; | ||
absl::optional<LastConfigInfo> config_info_; | ||
envoy::api::v2::RouteConfiguration route_config_proto_; | ||
std::unordered_set<RdsRouteConfigProviderImpl*> route_config_providers_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not to store the whole provider object, define OnUpdate interface, only store the interface pointers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto above, you need pointer to ProviderImpl to get shared_ptr of it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is what I mean:
Here RdsConfigSubscription uses it as: std::unordered_set<RdsSubscriptionCallback*> update_callbacks_; ProviderImpl will also implement such Callback There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I know what you meant, then you won't be able to do https://github.com/lizan/envoy/blob/6d433fef72eebc37b63acef63fd492cbbb9ff659/source/common/router/rds_impl.cc#L204 without dynamic_cast. It doesn't make sense to make an interface which is completely implementation detail. |
||
|
||
friend class RouteConfigProviderManagerImpl; | ||
friend class RdsRouteConfigProviderImpl; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we not use friend class? define some getter functions. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the point of define more getter functions? No other classes should access them via getter than friend classes listed here. |
||
}; | ||
|
||
typedef std::shared_ptr<RdsRouteConfigSubscription> RdsRouteConfigSubscriptionSharedPtr; | ||
|
||
/** | ||
* Implementation of RouteConfigProvider that fetches the route configuration dynamically using | ||
* the subscription. | ||
*/ | ||
class RdsRouteConfigProviderImpl : public RouteConfigProvider, | ||
public std::enable_shared_from_this<RdsRouteConfigProviderImpl>, | ||
Logger::Loggable<Logger::Id::router> { | ||
public: | ||
~RdsRouteConfigProviderImpl(); | ||
|
||
RdsRouteConfigSubscription& subscription() { return *subscription_; } | ||
void onConfigUpdate(); | ||
|
||
// Router::RouteConfigProvider | ||
Router::ConfigConstSharedPtr config() override; | ||
absl::optional<ConfigInfo> configInfo() const override; | ||
SystemTime lastUpdated() const override { return subscription_->last_updated_; } | ||
|
||
private: | ||
struct ThreadLocalConfig : public ThreadLocal::ThreadLocalObject { | ||
ThreadLocalConfig(ConfigConstSharedPtr initial_config) : config_(initial_config) {} | ||
|
||
ConfigConstSharedPtr config_; | ||
}; | ||
|
||
RdsRouteConfigProviderImpl(RdsRouteConfigSubscriptionSharedPtr&& subscription, | ||
Server::Configuration::FactoryContext& factory_context); | ||
|
||
RdsRouteConfigSubscriptionSharedPtr subscription_; | ||
Server::Configuration::FactoryContext& factory_context_; | ||
ThreadLocal::SlotPtr tls_; | ||
const std::string route_config_name_; | ||
|
||
friend class RouteConfigProviderManagerImpl; | ||
}; | ||
|
@@ -180,12 +202,12 @@ class RouteConfigProviderManagerImpl : public RouteConfigProviderManager, | |
// as in ConfigTracker. I.e. the ProviderImpls would have an EntryOwner for these lists | ||
// Then the lifetime management stuff is centralized and opaque. Plus the copypasta | ||
// in getRdsRouteConfigProviders()/getStaticRouteConfigProviders() goes away. | ||
std::unordered_map<std::string, std::weak_ptr<RdsRouteConfigProviderImpl>> | ||
route_config_providers_; | ||
std::unordered_map<std::string, std::weak_ptr<RdsRouteConfigSubscription>> | ||
route_config_subscriptions_; | ||
std::vector<std::weak_ptr<RouteConfigProvider>> static_route_config_providers_; | ||
Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_; | ||
|
||
friend class RdsRouteConfigProviderImpl; | ||
friend class RdsRouteConfigSubscription; | ||
}; | ||
|
||
} // namespace Router | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we define a getter function? not to use subscription member directly.