Skip to content

Commit

Permalink
pp: immediately stop the client upon eviction
Browse files Browse the repository at this point in the history
Previously client.stop() was not called on the evicted client until the
client garbage collection kicked in. This meant that resources were not
freed until some later time. This commit instead forces client.stop
immediately upon eviction.

Fixes #6607
  • Loading branch information
NyaliaLui committed Oct 4, 2022
1 parent 60ee93b commit 5a7bfff
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 29 deletions.
70 changes: 41 additions & 29 deletions src/v/pandaproxy/kafka_client_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,46 @@

namespace pandaproxy {

template<typename List, typename Pred>
ss::future<> remove_client_if(List& list, Pred pred) {
std::list<typename List::value_type> remove;
auto first = list.begin();
auto last = list.end();
while (first != last) {
if (pred(*first)) {
remove.push_back(std::move(*first));
list.erase(first++);
} else {
++first;
}
}
for (auto& item : remove) {
co_await item.client->stop().handle_exception(
[&item](std::exception_ptr ex) {
// The stop failed
vlog(
plog.debug,
"Stale client {} stop already happened {}",
item.key,
ex);
});
}
}

static constexpr auto evict_timer_period = 1s;

kafka_client_cache::kafka_client_cache(
YAML::Node const& cfg, size_t max_size, std::chrono::milliseconds keep_alive)
: _config{cfg}
, _cache_max_size{max_size}
, _keep_alive{keep_alive} {}
, _keep_alive{keep_alive} {
_eviction_timer.set_callback([this] {
constexpr auto always = [](auto&&) { return true; };
ssx::background = remove_client_if(_evicted_items, always);
_eviction_timer.arm(evict_timer_period);
});
_eviction_timer.arm(evict_timer_period);
}

client_ptr kafka_client_cache::make_client(
credential_t user, config::rest_authn_method authn_method) {
Expand Down Expand Up @@ -59,6 +94,9 @@ client_ptr kafka_client_cache::fetch_or_insert(
vlog(plog.debug, "Cache size reached, evicting {}", item.key);
inner_list.pop_back();
_evicted_items.push_back(std::move(item));
// Immediately run the eviction process
_eviction_timer.rearm(
ss::timer<ss::lowres_clock>::time_point(0s));
}
}

Expand Down Expand Up @@ -92,34 +130,6 @@ client_ptr kafka_client_cache::fetch_or_insert(
return client;
}

namespace {
template<typename List, typename Pred>
ss::future<> remove_client_if(List& list, Pred pred) {
std::list<typename List::value_type> remove;
auto first = list.begin();
auto last = list.end();
while (first != last) {
if (pred(*first)) {
remove.push_back(std::move(*first));
list.erase(first++);
} else {
++first;
}
}
for (auto& item : remove) {
co_await item.client->stop().handle_exception(
[&item](std::exception_ptr ex) {
// The stop failed
vlog(
plog.debug,
"Stale client {} stop already happened {}",
item.key,
ex);
});
}
}
} // namespace

ss::future<> kafka_client_cache::clean_stale_clients() {
constexpr auto is_expired = [](std::chrono::milliseconds keep_alive) {
auto now = timestamped_user::clock::now();
Expand All @@ -135,6 +145,8 @@ ss::future<> kafka_client_cache::clean_stale_clients() {
}

ss::future<> kafka_client_cache::stop() {
_eviction_timer.cancel();

constexpr auto always = [](auto&&) { return true; };
auto& inner_list = _cache.get<underlying_list>();
co_await remove_client_if(inner_list, always);
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/kafka_client_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "config/rest_authn_endpoint.h"
#include "pandaproxy/types.h"

#include <seastar/core/gate.hh>
#include <seastar/core/timer.hh>

#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/sequenced_index.hpp>
Expand Down Expand Up @@ -80,5 +83,6 @@ class kafka_client_cache {
std::chrono::milliseconds _keep_alive;
underlying_t _cache;
std::list<timestamped_user> _evicted_items;
ss::timer<ss::lowres_clock> _eviction_timer;
};
} // namespace pandaproxy

0 comments on commit 5a7bfff

Please sign in to comment.