Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit memory used while fetching many partitions #10905

Merged
merged 12 commits into from
Jun 7, 2023
102 changes: 85 additions & 17 deletions src/v/config/bounded_property.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
#include "config/base_property.h"
#include "config/property.h"

#include <optional>

namespace config {

/**
Expand All @@ -24,10 +26,21 @@ namespace detail {
* Traits required for a type to be usable with `numeric_bounds`
*/
template<typename T>
concept numeric = requires(const T& x) {
{ x % x };
{ x < x } -> std::same_as<bool>;
{ x > x } -> std::same_as<bool>;
concept numeric = requires(const T& x, const T& y) {
x - y;
x + y;
x / 2;
{ x < y } -> std::same_as<bool>;
{ x > y } -> std::same_as<bool>;
};

/**
* Traits required for a type to be usable with `numeric_integral_bounds`
*/
template<typename T>
concept numeric_integral = requires(const T& x, const T& y) {
requires numeric<T>;
x % y;
};

/**
Expand Down Expand Up @@ -56,20 +69,36 @@ struct inner_type<T> {
using inner = typename T::value_type;
};

/**
* Traits for the bounds types
*/
template<typename T>
concept bounds = requires(T bounds, const typename T::underlying_t& value) {
{
bounds.min
} -> std::convertible_to<std::optional<typename T::underlying_t>>;
{
bounds.max
} -> std::convertible_to<std::optional<typename T::underlying_t>>;
{ bounds.validate(value) } -> std::same_as<std::optional<ss::sstring>>;
{ bounds.clamp(value) } -> std::same_as<typename T::underlying_t>;
};

} // namespace detail

/**
* Define valid bounds for a numeric configuration property.
* Define valid bounds for an integral numeric configuration property.
*/
template<typename T>
requires detail::numeric<T>
struct numeric_bounds {
requires detail::numeric_integral<T>
struct numeric_integral_bounds {
using underlying_t = T;
std::optional<T> min = std::nullopt;
std::optional<T> max = std::nullopt;
std::optional<T> align = std::nullopt;
std::optional<odd_even_constraint> oddeven = std::nullopt;

T clamp(T& original) {
T clamp(const T& original) const {
T result = original;

if (align.has_value()) {
Expand All @@ -88,7 +117,7 @@ struct numeric_bounds {
return result;
}

std::optional<ss::sstring> validate(T& value) {
std::optional<ss::sstring> validate(const T& value) const {
if (min.has_value() && value < min.value()) {
return fmt::format("too small, must be at least {}", min.value());
} else if (max.has_value() && value > max.value()) {
Expand All @@ -109,7 +138,48 @@ struct numeric_bounds {
}
};

template<typename T, typename I = typename detail::inner_type<T>::inner>
/**
* Define valid bounds for any numeric configuration property.
*/
template<detail::numeric T>
struct numeric_bounds {
using underlying_t = T;
std::optional<T> min = std::nullopt;
std::optional<T> max = std::nullopt;

T clamp(T value) const {
if (min.has_value()) {
value = std::max(min.value(), value);
}
if (max.has_value()) {
value = std::min(max.value(), value);
}
return value;
}

std::optional<ss::sstring> validate(const T& value) const {
if (min.has_value() && value < min.value()) {
return fmt::format("too small, must be at least {}", min.value());
} else if (max.has_value() && value > max.value()) {
return fmt::format("too large, must be at most {}", max.value());
}
return std::nullopt;
}
};

/**
* A property that validates its value against the constraints defined by \p B
*
* \tparam T Underlying property value type
* \tparam B Bounds class, like \ref numeric_integral_bounds or
* \ref numeric_bounds, or user defined that satisfies \ref detail::bounds
* \tparam I Always default
*/
template<
typename T,
template<typename> typename B = numeric_integral_bounds,
typename I = typename detail::inner_type<T>::inner>
requires detail::bounds<B<I>>
class bounded_property : public property<T> {
public:
bounded_property(
Expand All @@ -118,7 +188,7 @@ class bounded_property : public property<T> {
std::string_view desc,
base_property::metadata meta,
T def,
numeric_bounds<I> bounds,
B<I> bounds,
std::optional<legacy_default<T>> legacy = std::nullopt)
: property<T>(
conf,
Expand Down Expand Up @@ -199,11 +269,9 @@ class bounded_property : public property<T> {

if (_bounds.min.has_value() && _bounds.max.has_value()) {
// Take midpoint of min/max and align it.
guess = _bounds.min.value()
+ (_bounds.max.value() - _bounds.min.value()) / 2;
if (_bounds.align.has_value()) {
guess -= guess % _bounds.align.value();
}
dotnwat marked this conversation as resolved.
Show resolved Hide resolved
guess = _bounds.clamp(
_bounds.min.value()
+ (_bounds.max.value() - _bounds.min.value()) / 2);
} else {
if constexpr (reflection::is_std_optional<T>) {
if (property<T>::_default.has_value()) {
Expand All @@ -220,7 +288,7 @@ class bounded_property : public property<T> {
return fmt::format("{}", guess);
}

numeric_bounds<I> _bounds;
B<I> _bounds;

// The example value is stored rather than generated on the fly, to
// satisfy the example() interface that wants a string_view: it
Expand Down
19 changes: 18 additions & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2074,7 +2074,24 @@ configuration::configuration()
"kafka_schema_id_validation_cache_capacity",
"Per-shard capacity of the cache for validating schema IDs.",
{.needs_restart = needs_restart::no, .visibility = visibility::tunable},
128) {}
128)
, kafka_memory_share_for_fetch(
*this,
"kafka_memory_share_for_fetch",
"The share of kafka subsystem memory that can be used for fetch read "
"buffers, as a fraction of kafka subsystem memory amount",
{.needs_restart = needs_restart::yes, .visibility = visibility::user},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.needs_restart = needs_restart::yes

is this something that we should be able to relax?

Copy link
Contributor Author

@dlex dlex Jun 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can. kafka_memory_sem is never waited upon, so we can increase and decrease it when the setting changes, provided the previous value is known. Maybe this can be left for future improvements?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can definitely do this as a follow up ticket. just wondering if it would work alright because stuff like this feels like it could be an important property to change at runtime on occasion.

0.5,
{.min = 0.0, .max = 1.0})
, kafka_memory_batch_size_estimate_for_fetch(
*this,
"kafka_memory_batch_size_estimate_for_fetch",
"The size of the batch used to estimate memory consumption for Fetch "
"requests, in bytes. Smaller sizes allow more concurrent fetch requests "
"per shard, larger sizes prevent running out of memory because of too "
"many concurrent fetch requests.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
1_MiB) {}

configuration::error_map_t configuration::load(const YAML::Node& root_node) {
if (!root_node["redpanda"]) {
Expand Down
3 changes: 3 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,9 @@ struct configuration final : public config_store {
// schema id validation
config::property<size_t> kafka_schema_id_validation_cache_capacity;

bounded_property<double, numeric_bounds> kafka_memory_share_for_fetch;
property<size_t> kafka_memory_batch_size_estimate_for_fetch;

configuration();

error_map_t load(const YAML::Node& root_node);
Expand Down
70 changes: 67 additions & 3 deletions src/v/config/tests/bounded_property_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,20 @@

#include <seastar/testing/thread_test_case.hh>

#include <optional>

static_assert(config::detail::bounds<config::numeric_integral_bounds<int>>);
static_assert(config::detail::bounds<config::numeric_bounds<double>>);

namespace {

struct test_config : public config::config_store {
config::bounded_property<int32_t> bounded_int;
config::bounded_property<std::optional<int32_t>> bounded_int_opt;
config::bounded_property<int16_t> odd_constraint;
config::bounded_property<double, config::numeric_bounds> bounded_double;
config::bounded_property<std::optional<double>, config::numeric_bounds>
bounded_double_opt;

test_config()
: bounded_int(
Expand All @@ -40,10 +48,24 @@ struct test_config : public config::config_store {
"Property value has to be odd",
{},
1,
{.oddeven = config::odd_even_constraint::odd}) {}
{.oddeven = config::odd_even_constraint::odd})
, bounded_double(
*this,
"bounded_double",
"A float with some bounds set",
{},
1.618033988749,
{.min = -1, .max = 2.236067977})
, bounded_double_opt(
*this,
"bounded_double_opt",
"An options float with some bounds set",
{},
std::nullopt,
{.min = -1, .max = 2.236067977}) {}
};

SEASTAR_THREAD_TEST_CASE(numeric_bounds) {
SEASTAR_THREAD_TEST_CASE(numeric_integral_bounds) {
auto cfg = test_config();

// We are checking numeric bounds, not YAML syntax/type validation. The
Expand Down Expand Up @@ -101,4 +123,46 @@ SEASTAR_THREAD_TEST_CASE(numeric_bounds) {
BOOST_CHECK(cfg.bounded_int() == 8192);
}

} // namespace
SEASTAR_THREAD_TEST_CASE(numeric_fp_bounds) {
auto cfg = test_config();

// We are checking numeric bounds, not YAML syntax/type validation. The
// latter shows up as exceptions from validate/set_value, rather than
// as structured errors, and is not covered by this test.
auto valid_values = {"-1", "-0.1", "1.618033988749", "2", "2.236067977"};
auto invalid_values = {
"-1000.9", "-1.0001", "3", "4095", "4097", "32769", "2000000000"};

std::optional<config::validation_error> verr;

for (const auto& v : valid_values) {
verr = cfg.bounded_double.validate(YAML::Load(v));
BOOST_TEST(!verr.has_value());

verr = cfg.bounded_double_opt.validate(YAML::Load(v));
BOOST_TEST(!verr.has_value());
}

for (const auto& v : invalid_values) {
verr = cfg.bounded_double.validate(YAML::Load(v));
BOOST_TEST(verr.has_value());

verr = cfg.bounded_double_opt.validate(YAML::Load(v));
BOOST_TEST(verr.has_value());
}

// Optional variant should also always consider nullopt to be valid.
verr = cfg.bounded_double_opt.validate(std::nullopt);
BOOST_TEST(!verr.has_value());

// # Invalid values should be clamped by set_value
// Too low: clamp to minimum
cfg.bounded_double.set_value(YAML::Load("-2"));
BOOST_TEST(cfg.bounded_double() == -1);

// Too high: clamp to maximum
cfg.bounded_double.set_value(YAML::Load("1000000"));
BOOST_TEST(cfg.bounded_double() == 2.236067977);
}

} // namespace
1 change: 1 addition & 0 deletions src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class connection_context final
connection_context& operator=(const connection_context&) = delete;
connection_context& operator=(connection_context&&) = delete;

/// The instance of \ref kafka::server on the shard serving the connection
server& server() { return _server; }
const ss::sstring& listener() const { return conn->name(); }
std::optional<security::sasl_server>& sasl() { return _sasl; }
Expand Down
Loading