diff --git a/source/common/router/rds_impl.cc b/source/common/router/rds_impl.cc index 88ac8d4ed3eb..4586845b2691 100644 --- a/source/common/router/rds_impl.cc +++ b/source/common/router/rds_impl.cc @@ -47,23 +47,19 @@ StaticRouteConfigProviderImpl::StaticRouteConfigProviderImpl( // TODO(htuch): If support for multiple clusters is added per #1170 cluster_name_ // initialization needs to be fixed. -RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( +RdsRouteConfigSubscription::RdsRouteConfigSubscription( const envoy::config::filter::network::http_connection_manager::v2::Rds& rds, const std::string& manager_identifier, Server::Configuration::FactoryContext& factory_context, - const std::string& stat_prefix, RouteConfigProviderManagerImpl& route_config_provider_manager) - : factory_context_(factory_context), tls_(factory_context.threadLocal().allocateSlot()), - route_config_name_(rds.route_config_name()), + const std::string& stat_prefix, + Envoy::Router::RouteConfigProviderManagerImpl& route_config_provider_manager) + : route_config_name_(rds.route_config_name()), scope_(factory_context.scope().createScope(stat_prefix + "rds." + route_config_name_ + ".")), stats_({ALL_RDS_STATS(POOL_COUNTER(*scope_))}), route_config_provider_manager_(route_config_provider_manager), - manager_identifier_(manager_identifier), + manager_identifier_(manager_identifier), time_source_(factory_context.systemTimeSource()), last_updated_(factory_context.systemTimeSource().currentTime()) { ::Envoy::Config::Utility::checkLocalInfo("rds", factory_context.localInfo()); - ConfigConstSharedPtr initial_config(new NullConfigImpl()); - tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { - return std::make_shared(initial_config); - }); subscription_ = Envoy::Config::SubscriptionFactory::subscriptionFromConfigSource< envoy::api::v2::RouteConfiguration>( rds.config_source(), factory_context.localInfo().node(), factory_context.dispatcher(), @@ -77,10 +73,9 @@ RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( }, "envoy.api.v2.RouteDiscoveryService.FetchRoutes", "envoy.api.v2.RouteDiscoveryService.StreamRoutes"); - config_source_ = MessageUtil::getJsonStringFromMessage(rds.config_source(), true); } -RdsRouteConfigProviderImpl::~RdsRouteConfigProviderImpl() { +RdsRouteConfigSubscription::~RdsRouteConfigSubscription() { // If we get destroyed during initialization, make sure we signal that we "initialized". runInitializeCallbackIfAny(); @@ -88,16 +83,12 @@ RdsRouteConfigProviderImpl::~RdsRouteConfigProviderImpl() { // hold a shared_ptr to it. The RouteConfigProviderManager holds weak_ptrs to the // RdsRouteConfigProviders. Therefore, the map entry for the RdsRouteConfigProvider has to get // cleaned by the RdsRouteConfigProvider's destructor. - route_config_provider_manager_.route_config_providers_.erase(manager_identifier_); + route_config_provider_manager_.route_config_subscriptions_.erase(manager_identifier_); } -Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() { - return tls_->getTyped().config_; -} - -void RdsRouteConfigProviderImpl::onConfigUpdate(const ResourceVector& resources, +void RdsRouteConfigSubscription::onConfigUpdate(const ResourceVector& resources, const std::string& version_info) { - last_updated_ = factory_context_.systemTimeSource().currentTime(); + last_updated_ = time_source_.currentTime(); if (resources.empty()) { ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_); @@ -115,35 +106,79 @@ void RdsRouteConfigProviderImpl::onConfigUpdate(const ResourceVector& resources, throw EnvoyException(fmt::format("Unexpected RDS configuration (expecting {}): {}", route_config_name_, route_config.name())); } + const uint64_t new_hash = MessageUtil::hash(route_config); if (!config_info_ || new_hash != config_info_.value().last_config_hash_) { - ConfigConstSharedPtr new_config(new ConfigImpl(route_config, factory_context_, false)); config_info_ = {new_hash, version_info}; + route_config_proto_ = route_config; stats_.config_reload_.inc(); ENVOY_LOG(debug, "rds: loading new configuration: config_name={} hash={}", route_config_name_, new_hash); - tls_->runOnAllThreads( - [this, new_config]() -> void { tls_->getTyped().config_ = new_config; }); - route_config_proto_ = route_config; + for (auto* provider : route_config_providers_) { + provider->onConfigUpdate(); + } } + runInitializeCallbackIfAny(); } -void RdsRouteConfigProviderImpl::onConfigUpdateFailed(const EnvoyException*) { +void RdsRouteConfigSubscription::onConfigUpdateFailed(const EnvoyException*) { // We need to allow server startup to continue, even if we have a bad // config. runInitializeCallbackIfAny(); } -void RdsRouteConfigProviderImpl::runInitializeCallbackIfAny() { +void RdsRouteConfigSubscription::registerInitTarget(Init::Manager& init_manager) { + init_manager.registerTarget(*this); +} + +void RdsRouteConfigSubscription::runInitializeCallbackIfAny() { if (initialize_callback_) { initialize_callback_(); initialize_callback_ = nullptr; } } -void RdsRouteConfigProviderImpl::registerInitTarget(Init::Manager& init_manager) { - init_manager.registerTarget(*this); +RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( + RdsRouteConfigSubscriptionSharedPtr&& subscription, + Server::Configuration::FactoryContext& factory_context) + : subscription_(std::move(subscription)), factory_context_(factory_context), + tls_(factory_context.threadLocal().allocateSlot()) { + ConfigConstSharedPtr initial_config; + if (subscription_->config_info_.has_value()) { + initial_config = + std::make_shared(subscription_->route_config_proto_, factory_context_, false); + } else { + initial_config = std::make_shared(); + } + tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { + return std::make_shared(initial_config); + }); + subscription_->route_config_providers_.insert(this); +} + +RdsRouteConfigProviderImpl::~RdsRouteConfigProviderImpl() { + subscription_->route_config_providers_.erase(this); +} + +Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() { + return tls_->getTyped().config_; +} + +absl::optional RdsRouteConfigProviderImpl::configInfo() const { + if (!subscription_->config_info_) { + return {}; + } else { + return ConfigInfo{subscription_->route_config_proto_, + subscription_->config_info_.value().last_config_version_}; + } +} + +void RdsRouteConfigProviderImpl::onConfigUpdate() { + ConfigConstSharedPtr new_config( + new ConfigImpl(subscription_->route_config_proto_, factory_context_, false)); + tls_->runOnAllThreads( + [this, new_config]() -> void { tls_->getTyped().config_ = new_config; }); } RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& admin) { @@ -157,14 +192,15 @@ RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& ad std::vector RouteConfigProviderManagerImpl::getRdsRouteConfigProviders() { std::vector ret; - ret.reserve(route_config_providers_.size()); - for (const auto& element : route_config_providers_) { + ret.reserve(route_config_subscriptions_.size()); + for (const auto& element : route_config_subscriptions_) { // Because the RouteConfigProviderManager's weak_ptrs only get cleaned up - // in the RdsRouteConfigProviderImpl destructor, and the single threaded nature + // in the RdsRouteConfigSubscription destructor, and the single threaded nature // of this code, locking the weak_ptr will not fail. - RouteConfigProviderSharedPtr provider = element.second.lock(); - ASSERT(provider); - ret.push_back(provider); + auto subscription = element.second.lock(); + ASSERT(subscription); + ASSERT(subscription->route_config_providers_.size() > 0); + ret.push_back((*subscription->route_config_providers_.begin())->shared_from_this()); } return ret; }; @@ -190,31 +226,34 @@ Router::RouteConfigProviderSharedPtr RouteConfigProviderManagerImpl::getRdsRoute const envoy::config::filter::network::http_connection_manager::v2::Rds& rds, Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix) { - // RdsRouteConfigProviders are unique based on their serialized RDS config. + // RdsRouteConfigSubscriptions are unique based on their serialized RDS config. // TODO(htuch): Full serialization here gives large IDs, could get away with a // strong hash instead. const std::string manager_identifier = rds.SerializeAsString(); - auto it = route_config_providers_.find(manager_identifier); - if (it == route_config_providers_.end()) { + RdsRouteConfigSubscriptionSharedPtr subscription; + + auto it = route_config_subscriptions_.find(manager_identifier); + if (it == route_config_subscriptions_.end()) { // std::make_shared does not work for classes with private constructors. There are ways // around it. However, since this is not a performance critical path we err on the side // of simplicity. - std::shared_ptr new_provider{new RdsRouteConfigProviderImpl( - rds, manager_identifier, factory_context, stat_prefix, *this)}; - - new_provider->registerInitTarget(factory_context.initManager()); + subscription.reset(new RdsRouteConfigSubscription(rds, manager_identifier, factory_context, + stat_prefix, *this)); - route_config_providers_.insert({manager_identifier, new_provider}); + subscription->registerInitTarget(factory_context.initManager()); - return new_provider; + route_config_subscriptions_.insert({manager_identifier, subscription}); + } else { + // Because the RouteConfigProviderManager's weak_ptrs only get cleaned up + // in the RdsRouteConfigSubscription destructor, and the single threaded nature + // of this code, locking the weak_ptr will not fail. + subscription = it->second.lock(); } + ASSERT(subscription); - // Because the RouteConfigProviderManager's weak_ptrs only get cleaned up - // in the RdsRouteConfigProviderImpl destructor, and the single threaded nature - // of this code, locking the weak_ptr will not fail. - Router::RouteConfigProviderSharedPtr new_provider = it->second.lock(); - ASSERT(new_provider); + Router::RouteConfigProviderSharedPtr new_provider{ + new RdsRouteConfigProviderImpl(std::move(subscription), factory_context)}; return new_provider; }; diff --git a/source/common/router/rds_impl.h b/source/common/router/rds_impl.h index 60f39aa2a809..261759248f9d 100644 --- a/source/common/router/rds_impl.h +++ b/source/common/router/rds_impl.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "envoy/api/v2/rds.pb.h" #include "envoy/api/v2/route/route.pb.h" @@ -80,18 +81,18 @@ struct RdsStats { }; class RouteConfigProviderManagerImpl; +class RdsRouteConfigProviderImpl; /** - * Implementation of RouteConfigProvider that fetches the route configuration dynamically using - * the RDS API. + * A class that fetches the route configuration dynamically using the RDS API and updates them to + * RDS config providers. */ -class RdsRouteConfigProviderImpl - : public RouteConfigProvider, - public Init::Target, +class RdsRouteConfigSubscription + : public Init::Target, Envoy::Config::SubscriptionCallbacks, Logger::Loggable { public: - ~RdsRouteConfigProviderImpl(); + ~RdsRouteConfigSubscription(); // Init::Target void initialize(std::function callback) override { @@ -99,17 +100,6 @@ class RdsRouteConfigProviderImpl subscription_->start({route_config_name_}, *this); } - // Router::RouteConfigProvider - Router::ConfigConstSharedPtr config() override; - absl::optional configInfo() const override { - if (!config_info_) { - return {}; - } else { - return ConfigInfo{route_config_proto_, config_info_.value().last_config_version_}; - } - } - SystemTime lastUpdated() const override { return last_updated_; } - // Config::SubscriptionCallbacks void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override; void onConfigUpdateFailed(const EnvoyException* e) override; @@ -118,18 +108,12 @@ class RdsRouteConfigProviderImpl } private: - struct ThreadLocalConfig : public ThreadLocal::ThreadLocalObject { - ThreadLocalConfig(ConfigConstSharedPtr initial_config) : config_(initial_config) {} - - ConfigConstSharedPtr config_; - }; - struct LastConfigInfo { uint64_t last_config_hash_; std::string last_config_version_; }; - RdsRouteConfigProviderImpl( + RdsRouteConfigSubscription( const envoy::config::filter::network::http_connection_manager::v2::Rds& rds, const std::string& manager_identifier, Server::Configuration::FactoryContext& factory_context, const std::string& stat_prefix, @@ -138,19 +122,57 @@ class RdsRouteConfigProviderImpl void registerInitTarget(Init::Manager& init_manager); void runInitializeCallbackIfAny(); - Server::Configuration::FactoryContext& factory_context_; std::unique_ptr> subscription_; - ThreadLocal::SlotPtr tls_; - std::string config_source_; + std::function initialize_callback_; const std::string route_config_name_; - absl::optional config_info_; Stats::ScopePtr scope_; RdsStats stats_; - std::function initialize_callback_; RouteConfigProviderManagerImpl& route_config_provider_manager_; const std::string manager_identifier_; - envoy::api::v2::RouteConfiguration route_config_proto_; + SystemTimeSource& time_source_; SystemTime last_updated_; + absl::optional config_info_; + envoy::api::v2::RouteConfiguration route_config_proto_; + std::unordered_set route_config_providers_; + + friend class RouteConfigProviderManagerImpl; + friend class RdsRouteConfigProviderImpl; +}; + +typedef std::shared_ptr RdsRouteConfigSubscriptionSharedPtr; + +/** + * Implementation of RouteConfigProvider that fetches the route configuration dynamically using + * the subscription. + */ +class RdsRouteConfigProviderImpl : public RouteConfigProvider, + public std::enable_shared_from_this, + Logger::Loggable { +public: + ~RdsRouteConfigProviderImpl(); + + RdsRouteConfigSubscription& subscription() { return *subscription_; } + void onConfigUpdate(); + + // Router::RouteConfigProvider + Router::ConfigConstSharedPtr config() override; + absl::optional configInfo() const override; + SystemTime lastUpdated() const override { return subscription_->last_updated_; } + +private: + struct ThreadLocalConfig : public ThreadLocal::ThreadLocalObject { + ThreadLocalConfig(ConfigConstSharedPtr initial_config) : config_(initial_config) {} + + ConfigConstSharedPtr config_; + }; + + RdsRouteConfigProviderImpl(RdsRouteConfigSubscriptionSharedPtr&& subscription, + Server::Configuration::FactoryContext& factory_context); + + RdsRouteConfigSubscriptionSharedPtr subscription_; + Server::Configuration::FactoryContext& factory_context_; + ThreadLocal::SlotPtr tls_; + const std::string route_config_name_; friend class RouteConfigProviderManagerImpl; }; @@ -180,12 +202,12 @@ class RouteConfigProviderManagerImpl : public RouteConfigProviderManager, // as in ConfigTracker. I.e. the ProviderImpls would have an EntryOwner for these lists // Then the lifetime management stuff is centralized and opaque. Plus the copypasta // in getRdsRouteConfigProviders()/getStaticRouteConfigProviders() goes away. - std::unordered_map> - route_config_providers_; + std::unordered_map> + route_config_subscriptions_; std::vector> static_route_config_providers_; Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_; - friend class RdsRouteConfigProviderImpl; + friend class RdsRouteConfigSubscription; }; } // namespace Router diff --git a/test/common/router/rds_impl_test.cc b/test/common/router/rds_impl_test.cc index 2d649ca392fe..a918767606e7 100644 --- a/test/common/router/rds_impl_test.cc +++ b/test/common/router/rds_impl_test.cc @@ -513,9 +513,10 @@ TEST_F(RouteConfigProviderManagerImplTest, Basic) { RouteConfigProviderSharedPtr provider2 = route_config_provider_manager_->getRdsRouteConfigProvider(rds_, factory_context_, "foo_prefix"); - // So this means that both shared_ptrs should be the same. - EXPECT_EQ(provider_, provider2); - EXPECT_EQ(2UL, provider_.use_count()); + // So this means that both provider have same subscription. + EXPECT_NE(provider_, provider2); + EXPECT_EQ(&dynamic_cast(*provider_).subscription(), + &dynamic_cast(*provider2).subscription()); std::string config_json2 = R"EOF( { @@ -541,13 +542,11 @@ TEST_F(RouteConfigProviderManagerImplTest, Basic) { route_config_provider_manager_->getRdsRouteConfigProvider(rds2, factory_context_, "foo_prefix"); EXPECT_NE(provider3, provider_); - EXPECT_EQ(2UL, provider_.use_count()); EXPECT_EQ(1UL, provider3.use_count()); std::vector configured_providers = route_config_provider_manager_->getRdsRouteConfigProviders(); EXPECT_EQ(2UL, configured_providers.size()); - EXPECT_EQ(3UL, provider_.use_count()); EXPECT_EQ(2UL, provider3.use_count()); provider_.reset(); @@ -575,7 +574,8 @@ TEST_F(RouteConfigProviderManagerImplTest, ValidateFail) { auto* route_config = route_configs.Add(); route_config->set_name("foo_route_config"); route_config->mutable_virtual_hosts()->Add(); - EXPECT_THROW(provider_impl.onConfigUpdate(route_configs, ""), ProtoValidationException); + EXPECT_THROW(provider_impl.subscription().onConfigUpdate(route_configs, ""), + ProtoValidationException); } TEST_F(RouteConfigProviderManagerImplTest, onConfigUpdateEmpty) { @@ -583,7 +583,7 @@ TEST_F(RouteConfigProviderManagerImplTest, onConfigUpdateEmpty) { factory_context_.init_manager_.initialize(); auto& provider_impl = dynamic_cast(*provider_.get()); EXPECT_CALL(factory_context_.init_manager_.initialized_, ready()); - provider_impl.onConfigUpdate({}, ""); + provider_impl.subscription().onConfigUpdate({}, ""); EXPECT_EQ( 1UL, factory_context_.scope_.counter("foo_prefix.rds.foo_route_config.update_empty").value()); } @@ -596,8 +596,8 @@ TEST_F(RouteConfigProviderManagerImplTest, onConfigUpdateWrongSize) { route_configs.Add(); route_configs.Add(); EXPECT_CALL(factory_context_.init_manager_.initialized_, ready()); - EXPECT_THROW_WITH_MESSAGE(provider_impl.onConfigUpdate(route_configs, ""), EnvoyException, - "Unexpected RDS resource length: 2"); + EXPECT_THROW_WITH_MESSAGE(provider_impl.subscription().onConfigUpdate(route_configs, ""), + EnvoyException, "Unexpected RDS resource length: 2"); } } // namespace diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index d0a72ed610d5..e25eaa1902d2 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -198,9 +198,10 @@ class AdsIntegrationTest : public AdsIntegrationBaseTest, fake_upstreams_[0]->localAddress()->ip()->port())); } - envoy::api::v2::Listener buildListener(const std::string& name, const std::string& route_config) { - return TestUtility::parseYaml( - fmt::format(R"EOF( + envoy::api::v2::Listener buildListener(const std::string& name, const std::string& route_config, + const std::string& stat_prefix = "ads_test") { + return TestUtility::parseYaml(fmt::format( + R"EOF( name: {} address: socket_address: @@ -210,14 +211,14 @@ class AdsIntegrationTest : public AdsIntegrationBaseTest, filters: - name: envoy.http_connection_manager config: - stat_prefix: ads_test + stat_prefix: {} codec_type: HTTP2 rds: route_config_name: {} config_source: {{ ads: {{}} }} http_filters: [{{ name: envoy.router }}] )EOF", - name, Network::Test::getLoopbackAddressString(ipVersion()), route_config)); + name, Network::Test::getLoopbackAddressString(ipVersion()), stat_prefix, route_config)); } envoy::api::v2::RouteConfiguration buildRouteConfig(const std::string& name, @@ -471,6 +472,79 @@ TEST_P(AdsIntegrationTest, Failure) { makeSingleRequest(); } +// Regression test for the use-after-free crash when processing RDS update (#3953). +TEST_P(AdsIntegrationTest, RdsAfterLdsWithNoRdsChanges) { + initialize(); + + // Send initial configuration. + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {buildCluster("cluster_0")}, "1"); + sendDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, {buildClusterLoadAssignment("cluster_0")}, "1"); + sendDiscoveryResponse( + Config::TypeUrl::get().Listener, {buildListener("listener_0", "route_config_0")}, "1"); + sendDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, {buildRouteConfig("route_config_0", "cluster_0")}, + "1"); + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + + // Validate that we can process a request. + makeSingleRequest(); + + // Update existing LDS (change stat_prefix). + sendDiscoveryResponse( + Config::TypeUrl::get().Listener, {buildListener("listener_0", "route_config_0", "rds_crash")}, + "2"); + test_server_->waitForCounterGe("listener_manager.listener_create_success", 2); + + // Update existing RDS (no changes). + sendDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, {buildRouteConfig("route_config_0", "cluster_0")}, + "2"); + + // Validate that we can process a request again + makeSingleRequest(); +} + +// Regression test for the use-after-free crash when processing RDS update (#3953). +TEST_P(AdsIntegrationTest, RdsAfterLdsWithRdsChange) { + initialize(); + + // Send initial configuration. + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {buildCluster("cluster_0")}, "1"); + sendDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, {buildClusterLoadAssignment("cluster_0")}, "1"); + sendDiscoveryResponse( + Config::TypeUrl::get().Listener, {buildListener("listener_0", "route_config_0")}, "1"); + sendDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, {buildRouteConfig("route_config_0", "cluster_0")}, + "1"); + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + + // Validate that we can process a request. + makeSingleRequest(); + + // Update existing LDS (change stat_prefix). + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, + {buildCluster("cluster_1")}, "2"); + sendDiscoveryResponse( + Config::TypeUrl::get().ClusterLoadAssignment, {buildClusterLoadAssignment("cluster_1")}, "2"); + sendDiscoveryResponse( + Config::TypeUrl::get().Listener, {buildListener("listener_0", "route_config_0", "rds_crash")}, + "2"); + test_server_->waitForCounterGe("listener_manager.listener_create_success", 2); + + // Update existing RDS (migrate traffic to cluster_1). + sendDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, {buildRouteConfig("route_config_0", "cluster_1")}, + "2"); + + // Validate that we can process a request after RDS update + test_server_->waitForCounterGe("http.ads_test.rds.route_config_0.config_reload", 2); + makeSingleRequest(); +} + class AdsFailIntegrationTest : public AdsIntegrationBaseTest, public Grpc::GrpcClientIntegrationParamTest { public: