Skip to content

Commit

Permalink
pp: trigger eviction when a client is removed
Browse files Browse the repository at this point in the history
Previously client.stop() was not called on the evicted client until
client garbage collection kicked in. This meant that resources were not
freed until some later time. This commit instead triggers the eviction
process when a client is removed from the cache.

Closes #6607
  • Loading branch information
NyaliaLui committed Oct 5, 2022
1 parent 8dd23f7 commit 7ccc5a0
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 18 deletions.
12 changes: 10 additions & 2 deletions src/v/pandaproxy/kafka_client_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
namespace pandaproxy {

kafka_client_cache::kafka_client_cache(
YAML::Node const& cfg, size_t max_size, std::chrono::milliseconds keep_alive)
YAML::Node const& cfg,
size_t max_size,
std::chrono::milliseconds keep_alive,
ss::timer<ss::lowres_clock>& evict_timer)
: _config{cfg}
, _cache_max_size{max_size}
, _keep_alive{keep_alive} {}
, _keep_alive{keep_alive}
, _evict_timer{evict_timer} {}

client_ptr kafka_client_cache::make_client(
credential_t user, config::rest_authn_method authn_method) {
Expand Down Expand Up @@ -59,6 +63,8 @@ client_ptr kafka_client_cache::fetch_or_insert(
vlog(plog.debug, "Cache size reached, evicting {}", item.key);
inner_list.pop_back();
_evicted_items.push_back(std::move(item));
// Trigger the eviction process
_evict_timer.rearm(ss::lowres_clock::now());
}
}

Expand Down Expand Up @@ -129,7 +135,9 @@ ss::future<> kafka_client_cache::clean_stale_clients() {
};
auto& inner_list = _cache.get<underlying_list>();
co_await remove_client_if(inner_list, is_expired(_keep_alive));
}

ss::future<> kafka_client_cache::evict_clients() {
constexpr auto always = [](auto&&) { return true; };
co_await remove_client_if(_evicted_items, always);
}
Expand Down
7 changes: 6 additions & 1 deletion src/v/pandaproxy/kafka_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "config/rest_authn_endpoint.h"
#include "pandaproxy/types.h"

#include <seastar/core/timer.hh>

#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/sequenced_index.hpp>
Expand All @@ -34,7 +36,8 @@ class kafka_client_cache {
kafka_client_cache(
YAML::Node const& cfg,
size_t max_size,
std::chrono::milliseconds keep_alive);
std::chrono::milliseconds keep_alive,
ss::timer<ss::lowres_clock>& evict_timer);

~kafka_client_cache() = default;

Expand All @@ -52,6 +55,7 @@ class kafka_client_cache {
fetch_or_insert(credential_t user, config::rest_authn_method authn_method);

ss::future<> clean_stale_clients();
ss::future<> evict_clients();

size_t size() const;
size_t max_size() const;
Expand Down Expand Up @@ -80,5 +84,6 @@ class kafka_client_cache {
std::chrono::milliseconds _keep_alive;
underlying_t _cache;
std::list<timestamped_user> _evicted_items;
ss::timer<ss::lowres_clock>& _evict_timer;
};
} // namespace pandaproxy
15 changes: 14 additions & 1 deletion src/v/pandaproxy/sharded_client_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,26 @@ ss::future<> sharded_client_cache::start(
});
});

_evict_timer.set_callback([this] {
ssx::spawn_with_gate(_gate, [this] {
return _cache.invoke_on_all(
_smp_opts,
[](kafka_client_cache& cache) { return cache.evict_clients(); });
});
});

return _cache
.start(proxy_client_cfg, client_cache_max_size, client_keep_alive)
.start(
proxy_client_cfg,
client_cache_max_size,
client_keep_alive,
std::reference_wrapper(_evict_timer))
.then([this] { _clean_timer.arm(clean_timer_period); });
}

ss::future<> sharded_client_cache::stop() {
_clean_timer.cancel();
_evict_timer.cancel();
co_await _gate.close();
co_await _cache.stop();
}
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/sharded_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ class sharded_client_cache {
ss::gate _gate;
ss::sharded<kafka_client_cache> _cache;
ss::timer<ss::lowres_clock> _clean_timer;
ss::timer<ss::lowres_clock> _evict_timer;
};
} // namespace pandaproxy
47 changes: 33 additions & 14 deletions src/v/pandaproxy/test/kafka_client_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
#include "config/rest_authn_endpoint.h"
#include "kafka/client/configuration.h"
#include "pandaproxy/types.h"
#include "ssx/future-util.h"

#include <seastar/core/gate.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/timer.hh>
#include <seastar/testing/thread_test_case.hh>

#include <boost/test/unit_test.hpp>
Expand All @@ -27,24 +30,40 @@ using namespace std::chrono_literals;
namespace pandaproxy {
class test_client_cache : public kafka_client_cache {
public:
explicit test_client_cache(size_t max_size)
explicit test_client_cache(
size_t max_size, ss::timer<ss::lowres_clock>& evict_timer)
: kafka_client_cache(
to_yaml(kafka::client::configuration{}, config::redact_secrets::no),
max_size,
1000ms) {}
1000ms,
evict_timer) {}
};

struct cache_wrapper {
ss::timer<ss::lowres_clock> evict_timer;
ss::gate g;
test_client_cache client_cache;

cache_wrapper(size_t max_size)
: client_cache{max_size, std::reference_wrapper(evict_timer)} {
evict_timer.set_callback([this] {
ssx::spawn_with_gate(
g, [this] { return client_cache.evict_clients(); });
});
}
};
} // namespace pandaproxy

namespace pp = pandaproxy;

SEASTAR_THREAD_TEST_CASE(cache_make_client) {
pp::test_client_cache client_cache{10};
pp::cache_wrapper cache_wrap{10};
pp::credential_t user{"red", "panda"};

{
// Creating a client with no authn methods results in a kafka
// client without a principal
pp::client_ptr client = client_cache.make_client(
pp::client_ptr client = cache_wrap.client_cache.make_client(
user, config::rest_authn_method::none);
BOOST_TEST(client->config().sasl_mechanism.value() == "");
BOOST_TEST(client->config().scram_username.value() == "");
Expand All @@ -54,7 +73,7 @@ SEASTAR_THREAD_TEST_CASE(cache_make_client) {
{
// Creating a client with http_basic authn type results
// in a kafka client with a principal
pp::client_ptr client = client_cache.make_client(
pp::client_ptr client = cache_wrap.client_cache.make_client(
user, config::rest_authn_method::http_basic);
BOOST_TEST(
client->config().sasl_mechanism.value()
Expand All @@ -67,21 +86,21 @@ SEASTAR_THREAD_TEST_CASE(cache_make_client) {
SEASTAR_THREAD_TEST_CASE(cache_fetch_or_insert) {
size_t s = 1, max_s = 1;
pp::credential_t user{"red", "panda"};
pp::test_client_cache client_cache{s};
BOOST_TEST(client_cache.size() == 0);
BOOST_TEST(client_cache.max_size() == max_s);
pp::cache_wrapper cache_wrap{s};
BOOST_TEST(cache_wrap.client_cache.size() == 0);
BOOST_TEST(cache_wrap.client_cache.max_size() == max_s);

// First fetch tests not-found path: cache.size > cache.max_size and cache
// is empty
pp::client_ptr client = client_cache.fetch_or_insert(
pp::client_ptr client = cache_wrap.client_cache.fetch_or_insert(
user, config::rest_authn_method::http_basic);
BOOST_TEST(
client->config().sasl_mechanism.value() == ss::sstring{"SCRAM-SHA-256"});
BOOST_TEST(client->config().scram_username.value() == user.name);
BOOST_TEST(client->config().scram_password.value() == user.pass);

// Second fetch tests found path: user password did not change
client = client_cache.fetch_or_insert(
client = cache_wrap.client_cache.fetch_or_insert(
user, config::rest_authn_method::http_basic);
BOOST_TEST(
client->config().sasl_mechanism.value() == ss::sstring{"SCRAM-SHA-256"});
Expand All @@ -92,7 +111,7 @@ SEASTAR_THREAD_TEST_CASE(cache_fetch_or_insert) {
user2.pass = "parrot";
// Third fetch tests found path: user password did change
// so any refs will have the updated password.
pp::client_ptr client2 = client_cache.fetch_or_insert(
pp::client_ptr client2 = cache_wrap.client_cache.fetch_or_insert(
user2, config::rest_authn_method::http_basic);
BOOST_TEST(
client2->config().sasl_mechanism.value() == ss::sstring{"SCRAM-SHA-256"});
Expand All @@ -107,12 +126,12 @@ SEASTAR_THREAD_TEST_CASE(cache_fetch_or_insert) {
// Fourth fetch tests not-found path: cache.size == cache.max_size and cache
// is not empty The LRU replacement policy takes affect and an element is
// evicted
client2 = client_cache.fetch_or_insert(
client2 = cache_wrap.client_cache.fetch_or_insert(
user2, config::rest_authn_method::http_basic);
BOOST_TEST(
client2->config().sasl_mechanism.value() == ss::sstring{"SCRAM-SHA-256"});
BOOST_TEST(client2->config().scram_username.value() == user2.name);
BOOST_TEST(client2->config().scram_password.value() == user2.pass);
BOOST_TEST(client_cache.size() == s);
BOOST_TEST(client_cache.max_size() == max_s);
BOOST_TEST(cache_wrap.client_cache.size() == s);
BOOST_TEST(cache_wrap.client_cache.max_size() == max_s);
}

0 comments on commit 7ccc5a0

Please sign in to comment.