diff --git a/src/v/pandaproxy/kafka_client_cache.cc b/src/v/pandaproxy/kafka_client_cache.cc index 93167236adfb1..84ab039bf6224 100644 --- a/src/v/pandaproxy/kafka_client_cache.cc +++ b/src/v/pandaproxy/kafka_client_cache.cc @@ -20,10 +20,14 @@ namespace pandaproxy { kafka_client_cache::kafka_client_cache( - YAML::Node const& cfg, size_t max_size, std::chrono::milliseconds keep_alive) + YAML::Node const& cfg, + size_t max_size, + std::chrono::milliseconds keep_alive, + ss::timer& evict_timer) : _config{cfg} , _cache_max_size{max_size} - , _keep_alive{keep_alive} {} + , _keep_alive{keep_alive} + , _evict_timer{evict_timer} {} client_ptr kafka_client_cache::make_client( credential_t user, config::rest_authn_method authn_method) { @@ -59,6 +63,8 @@ client_ptr kafka_client_cache::fetch_or_insert( vlog(plog.debug, "Cache size reached, evicting {}", item.key); inner_list.pop_back(); _evicted_items.push_back(std::move(item)); + // Trigger the eviction process + _evict_timer.rearm(ss::lowres_clock::now()); } } @@ -129,7 +135,9 @@ ss::future<> kafka_client_cache::clean_stale_clients() { }; auto& inner_list = _cache.get(); co_await remove_client_if(inner_list, is_expired(_keep_alive)); +} +ss::future<> kafka_client_cache::evict_clients() { constexpr auto always = [](auto&&) { return true; }; co_await remove_client_if(_evicted_items, always); } diff --git a/src/v/pandaproxy/kafka_client_cache.h b/src/v/pandaproxy/kafka_client_cache.h index 87ed9b75f6537..64db6bda27337 100644 --- a/src/v/pandaproxy/kafka_client_cache.h +++ b/src/v/pandaproxy/kafka_client_cache.h @@ -13,6 +13,8 @@ #include "config/rest_authn_endpoint.h" #include "pandaproxy/types.h" +#include + #include #include #include @@ -34,7 +36,8 @@ class kafka_client_cache { kafka_client_cache( YAML::Node const& cfg, size_t max_size, - std::chrono::milliseconds keep_alive); + std::chrono::milliseconds keep_alive, + ss::timer& evict_timer); ~kafka_client_cache() = default; @@ -52,6 +55,7 @@ class kafka_client_cache { fetch_or_insert(credential_t user, config::rest_authn_method authn_method); ss::future<> clean_stale_clients(); + ss::future<> evict_clients(); size_t size() const; size_t max_size() const; @@ -80,5 +84,6 @@ class kafka_client_cache { std::chrono::milliseconds _keep_alive; underlying_t _cache; std::list _evicted_items; + ss::timer& _evict_timer; }; } // namespace pandaproxy diff --git a/src/v/pandaproxy/sharded_client_cache.cc b/src/v/pandaproxy/sharded_client_cache.cc index 15413d4225b61..a18790686b9fa 100644 --- a/src/v/pandaproxy/sharded_client_cache.cc +++ b/src/v/pandaproxy/sharded_client_cache.cc @@ -43,13 +43,26 @@ ss::future<> sharded_client_cache::start( }); }); + _evict_timer.set_callback([this] { + ssx::spawn_with_gate(_gate, [this] { + return _cache.invoke_on_all( + _smp_opts, + [](kafka_client_cache& cache) { return cache.evict_clients(); }); + }); + }); + return _cache - .start(proxy_client_cfg, client_cache_max_size, client_keep_alive) + .start( + proxy_client_cfg, + client_cache_max_size, + client_keep_alive, + std::reference_wrapper(_evict_timer)) .then([this] { _clean_timer.arm(clean_timer_period); }); } ss::future<> sharded_client_cache::stop() { _clean_timer.cancel(); + _evict_timer.cancel(); co_await _gate.close(); co_await _cache.stop(); } diff --git a/src/v/pandaproxy/sharded_client_cache.h b/src/v/pandaproxy/sharded_client_cache.h index 4efe36a4fc14a..d315b774304f1 100644 --- a/src/v/pandaproxy/sharded_client_cache.h +++ b/src/v/pandaproxy/sharded_client_cache.h @@ -58,5 +58,6 @@ class sharded_client_cache { ss::gate _gate; ss::sharded _cache; ss::timer _clean_timer; + ss::timer _evict_timer; }; } // namespace pandaproxy diff --git a/src/v/pandaproxy/test/kafka_client_cache.cc b/src/v/pandaproxy/test/kafka_client_cache.cc index 81fa5456e837c..e13b61cd1bca4 100644 --- a/src/v/pandaproxy/test/kafka_client_cache.cc +++ b/src/v/pandaproxy/test/kafka_client_cache.cc @@ -13,8 +13,11 @@ #include "config/rest_authn_endpoint.h" #include "kafka/client/configuration.h" #include "pandaproxy/types.h" +#include "ssx/future-util.h" +#include #include +#include #include #include @@ -27,24 +30,40 @@ using namespace std::chrono_literals; namespace pandaproxy { class test_client_cache : public kafka_client_cache { public: - explicit test_client_cache(size_t max_size) + explicit test_client_cache( + size_t max_size, ss::timer& evict_timer) : kafka_client_cache( to_yaml(kafka::client::configuration{}, config::redact_secrets::no), max_size, - 1000ms) {} + 1000ms, + evict_timer) {} +}; + +struct cache_wrapper { + ss::timer evict_timer; + ss::gate g; + test_client_cache client_cache; + + cache_wrapper(size_t max_size) + : client_cache{max_size, std::reference_wrapper(evict_timer)} { + evict_timer.set_callback([this] { + ssx::spawn_with_gate( + g, [this] { return client_cache.evict_clients(); }); + }); + } }; } // namespace pandaproxy namespace pp = pandaproxy; SEASTAR_THREAD_TEST_CASE(cache_make_client) { - pp::test_client_cache client_cache{10}; + pp::cache_wrapper cache_wrap{10}; pp::credential_t user{"red", "panda"}; { // Creating a client with no authn methods results in a kafka // client without a principal - pp::client_ptr client = client_cache.make_client( + pp::client_ptr client = cache_wrap.client_cache.make_client( user, config::rest_authn_method::none); BOOST_TEST(client->config().sasl_mechanism.value() == ""); BOOST_TEST(client->config().scram_username.value() == ""); @@ -54,7 +73,7 @@ SEASTAR_THREAD_TEST_CASE(cache_make_client) { { // Creating a client with http_basic authn type results // in a kafka client with a principal - pp::client_ptr client = client_cache.make_client( + pp::client_ptr client = cache_wrap.client_cache.make_client( user, config::rest_authn_method::http_basic); BOOST_TEST( client->config().sasl_mechanism.value() @@ -67,13 +86,13 @@ SEASTAR_THREAD_TEST_CASE(cache_make_client) { SEASTAR_THREAD_TEST_CASE(cache_fetch_or_insert) { size_t s = 1, max_s = 1; pp::credential_t user{"red", "panda"}; - pp::test_client_cache client_cache{s}; - BOOST_TEST(client_cache.size() == 0); - BOOST_TEST(client_cache.max_size() == max_s); + pp::cache_wrapper cache_wrap{s}; + BOOST_TEST(cache_wrap.client_cache.size() == 0); + BOOST_TEST(cache_wrap.client_cache.max_size() == max_s); // First fetch tests not-found path: cache.size > cache.max_size and cache // is empty - pp::client_ptr client = client_cache.fetch_or_insert( + pp::client_ptr client = cache_wrap.client_cache.fetch_or_insert( user, config::rest_authn_method::http_basic); BOOST_TEST( client->config().sasl_mechanism.value() == ss::sstring{"SCRAM-SHA-256"}); @@ -81,7 +100,7 @@ SEASTAR_THREAD_TEST_CASE(cache_fetch_or_insert) { BOOST_TEST(client->config().scram_password.value() == user.pass); // Second fetch tests found path: user password did not change - client = client_cache.fetch_or_insert( + client = cache_wrap.client_cache.fetch_or_insert( user, config::rest_authn_method::http_basic); BOOST_TEST( client->config().sasl_mechanism.value() == ss::sstring{"SCRAM-SHA-256"}); @@ -92,7 +111,7 @@ SEASTAR_THREAD_TEST_CASE(cache_fetch_or_insert) { user2.pass = "parrot"; // Third fetch tests found path: user password did change // so any refs will have the updated password. - pp::client_ptr client2 = client_cache.fetch_or_insert( + pp::client_ptr client2 = cache_wrap.client_cache.fetch_or_insert( user2, config::rest_authn_method::http_basic); BOOST_TEST( client2->config().sasl_mechanism.value() == ss::sstring{"SCRAM-SHA-256"}); @@ -107,12 +126,12 @@ SEASTAR_THREAD_TEST_CASE(cache_fetch_or_insert) { // Fourth fetch tests not-found path: cache.size == cache.max_size and cache // is not empty The LRU replacement policy takes affect and an element is // evicted - client2 = client_cache.fetch_or_insert( + client2 = cache_wrap.client_cache.fetch_or_insert( user2, config::rest_authn_method::http_basic); BOOST_TEST( client2->config().sasl_mechanism.value() == ss::sstring{"SCRAM-SHA-256"}); BOOST_TEST(client2->config().scram_username.value() == user2.name); BOOST_TEST(client2->config().scram_password.value() == user2.pass); - BOOST_TEST(client_cache.size() == s); - BOOST_TEST(client_cache.max_size() == max_s); + BOOST_TEST(cache_wrap.client_cache.size() == s); + BOOST_TEST(cache_wrap.client_cache.max_size() == max_s); }