diff --git a/src/v/config/bounded_property.h b/src/v/config/bounded_property.h
index 095e4087375d..83a0f2eb2ed5 100644
--- a/src/v/config/bounded_property.h
+++ b/src/v/config/bounded_property.h
@@ -12,6 +12,8 @@
 #include "config/base_property.h"
 #include "config/property.h"
 
+#include <algorithm>
+
 namespace config {
 
 /**
@@ -24,10 +26,21 @@ namespace detail {
  * Traits required for a type to be usable with `numeric_bounds`
  */
 template<typename T>
-concept numeric = requires(const T& x) {
-    { x % x };
-    { x < x } -> std::same_as<bool>;
-    { x > x } -> std::same_as<bool>;
+concept numeric = requires(const T& x, const T& y) {
+    x - y;
+    x + y;
+    x / 2;
+    { x < y } -> std::same_as<bool>;
+    { x > y } -> std::same_as<bool>;
+};
+
+/**
+ * Traits required for a type to be usable with `numeric_integral_bounds`
+ */
+template<typename T>
+concept numeric_integral = requires(const T& x, const T& y) {
+    requires numeric<T>;
+    x % y;
 };
 
 /**
@@ -56,20 +69,36 @@ struct inner_type {
     using inner = typename T::value_type;
 };
 
+/**
+ * Traits for the bounds types
+ */
+template<typename T>
+concept bounds = requires(T bounds, const typename T::underlying_t& value) {
+    {
+        bounds.min
+    } -> std::convertible_to<std::optional<typename T::underlying_t>>;
+    {
+        bounds.max
+    } -> std::convertible_to<std::optional<typename T::underlying_t>>;
+    { bounds.validate(value) } -> std::same_as<std::optional<ss::sstring>>;
+    { bounds.clamp(value) } -> std::same_as<typename T::underlying_t>;
+};
+
 } // namespace detail
 
 /**
- * Define valid bounds for a numeric configuration property.
+ * Define valid bounds for an integral numeric configuration property.
  */
 template<typename T>
-requires detail::numeric<T>
-struct numeric_bounds {
+requires detail::numeric_integral<T>
+struct numeric_integral_bounds {
+    using underlying_t = T;
     std::optional<T> min = std::nullopt;
     std::optional<T> max = std::nullopt;
     std::optional<T> align = std::nullopt;
     std::optional<odd_even_constraint> oddeven = std::nullopt;
 
-    T clamp(T& original) {
+    T clamp(const T& original) const {
         T result = original;
 
         if (align.has_value()) {
@@ -88,7 +117,7 @@
         return result;
     }
 
-    std::optional<ss::sstring> validate(T& value) {
+    std::optional<ss::sstring> validate(const T& value) const {
         if (min.has_value() && value < min.value()) {
             return fmt::format("too small, must be at least {}", min.value());
         } else if (max.has_value() && value > max.value()) {
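For reference, the new `detail::bounds` concept also admits user-supplied bounds types. A minimal sketch of one, assuming `ss::sstring` is visible via the config headers; the `power_of_two_bounds` name and its behavior are hypothetical, not part of this patch:

```cpp
#include "config/bounded_property.h"

#include <bit>
#include <concepts>
#include <cstdint>
#include <optional>

// Hypothetical bounds type admitting only powers of two. It satisfies
// config::detail::bounds because it exposes underlying_t, min, max,
// validate() and clamp() with the shapes the concept requires.
template<std::unsigned_integral T>
struct power_of_two_bounds {
    using underlying_t = T;
    std::optional<T> min = std::nullopt;
    std::optional<T> max = std::nullopt;

    std::optional<ss::sstring> validate(const T& value) const {
        if (!std::has_single_bit(value)) {
            return ss::sstring("must be a power of two");
        }
        return std::nullopt;
    }

    T clamp(const T& value) const {
        // rounds up to the next power of two (std::bit_ceil(0) == 1)
        return std::bit_ceil(value);
    }
};

static_assert(config::detail::bounds<power_of_two_bounds<uint32_t>>);
```

Because it is a single-parameter template, such a type could also be plugged into `bounded_property` as the `B` argument introduced below.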
@@ -109,7 +138,48 @@
     }
 };
 
-template<typename T, typename I = typename detail::inner_type<T>::inner>
+/**
+ * Define valid bounds for any numeric configuration property.
+ */
+template<detail::numeric T>
+struct numeric_bounds {
+    using underlying_t = T;
+    std::optional<T> min = std::nullopt;
+    std::optional<T> max = std::nullopt;
+
+    T clamp(T value) const {
+        if (min.has_value()) {
+            value = std::max(min.value(), value);
+        }
+        if (max.has_value()) {
+            value = std::min(max.value(), value);
+        }
+        return value;
+    }
+
+    std::optional<ss::sstring> validate(const T& value) const {
+        if (min.has_value() && value < min.value()) {
+            return fmt::format("too small, must be at least {}", min.value());
+        } else if (max.has_value() && value > max.value()) {
+            return fmt::format("too large, must be at most {}", max.value());
+        }
+        return std::nullopt;
+    }
+};
+
+/**
+ * A property that validates its value against the constraints defined by \p B
+ *
+ * \tparam T Underlying property value type
+ * \tparam B Bounds class, like \ref numeric_integral_bounds or
+ *   \ref numeric_bounds, or user defined that satisfies \ref detail::bounds
+ * \tparam I Always default
+ */
+template<
+  typename T,
+  template<typename> typename B = numeric_integral_bounds,
+  typename I = typename detail::inner_type<T>::inner>
+requires detail::bounds<B<I>>
 class bounded_property : public property<T> {
 public:
     bounded_property(
@@ -118,7 +188,7 @@ class bounded_property : public property<T> {
       std::string_view desc,
       base_property::metadata meta,
       T def,
-      numeric_bounds<I> bounds,
+      B<I> bounds,
       std::optional<legacy_default<T>> legacy = std::nullopt)
       : property<T>(
           conf,
@@ -199,11 +269,9 @@ class bounded_property : public property<T> {
 
         if (_bounds.min.has_value() && _bounds.max.has_value()) {
             // Take midpoint of min/max and align it.
-            guess = _bounds.min.value()
-                    + (_bounds.max.value() - _bounds.min.value()) / 2;
-            if (_bounds.align.has_value()) {
-                guess -= guess % _bounds.align.value();
-            }
+            guess = _bounds.clamp(
+              _bounds.min.value()
+              + (_bounds.max.value() - _bounds.min.value()) / 2);
         } else {
             if constexpr (reflection::is_std_optional<T>) {
                 if (property<T>::_default.has_value()) {
@@ -220,7 +288,7 @@ class bounded_property : public property<T> {
         return fmt::format("{}", guess);
     }
 
-    numeric_bounds<I> _bounds;
+    B<I> _bounds;
 
     // The example value is stored rather than generated on the fly, to
     // satisfy the example() interface that wants a string_view: it
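Declaration-level usage of the two bounds flavors, as a sketch; store and property names are invented, and the updated unit tests further down exercise the same shapes:

```cpp
#include "config/bounded_property.h"
#include "config/config_store.h"

// Sketch only: names are illustrative, not from the patch.
struct example_store : public config::config_store {
    // Default B = numeric_integral_bounds: min/max/align/oddeven available.
    config::bounded_property<int32_t> reader_count;
    // Floating-point properties select numeric_bounds explicitly: min/max only.
    config::bounded_property<double, config::numeric_bounds> memory_share;

    example_store()
      : reader_count(
          *this, "reader_count", "An int with bounds", {}, 16,
          {.min = 1, .max = 128, .align = 4})
      , memory_share(
          *this, "memory_share", "A double with bounds", {}, 0.5,
          {.min = 0.0, .max = 1.0}) {}
};
```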
diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc
index 17f209180aa3..285aba31a92e 100644
--- a/src/v/config/configuration.cc
+++ b/src/v/config/configuration.cc
@@ -2074,7 +2074,24 @@ configuration::configuration()
     "kafka_schema_id_validation_cache_capacity",
     "Per-shard capacity of the cache for validating schema IDs.",
     {.needs_restart = needs_restart::no, .visibility = visibility::tunable},
-    128) {}
+    128)
+  , kafka_memory_share_for_fetch(
+      *this,
+      "kafka_memory_share_for_fetch",
+      "The share of kafka subsystem memory that can be used for fetch read "
+      "buffers, as a fraction of the kafka subsystem memory amount",
+      {.needs_restart = needs_restart::yes, .visibility = visibility::user},
+      0.5,
+      {.min = 0.0, .max = 1.0})
+  , kafka_memory_batch_size_estimate_for_fetch(
+      *this,
+      "kafka_memory_batch_size_estimate_for_fetch",
+      "The size of the batch used to estimate memory consumption for Fetch "
+      "requests, in bytes. Smaller sizes allow more concurrent fetch requests "
+      "per shard; larger sizes prevent running out of memory because of too "
+      "many concurrent fetch requests.",
+      {.needs_restart = needs_restart::no, .visibility = visibility::user},
+      1_MiB) {}
 
 configuration::error_map_t configuration::load(const YAML::Node& root_node) {
     if (!root_node["redpanda"]) {
diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h
index af88b0a271cc..2636b1a4a43a 100644
--- a/src/v/config/configuration.h
+++ b/src/v/config/configuration.h
@@ -420,6 +420,9 @@ struct configuration final : public config_store {
     // schema id validation
     config::property<size_t> kafka_schema_id_validation_cache_capacity;
 
+    bounded_property<double, numeric_bounds> kafka_memory_share_for_fetch;
+    property<size_t> kafka_memory_batch_size_estimate_for_fetch;
+
     configuration();
 
     error_map_t load(const YAML::Node& root_node);
diff --git a/src/v/config/tests/bounded_property_test.cc b/src/v/config/tests/bounded_property_test.cc
index 22ba37f8c18e..85100043de2a 100644
--- a/src/v/config/tests/bounded_property_test.cc
+++ b/src/v/config/tests/bounded_property_test.cc
@@ -12,12 +12,20 @@
 
 #include <seastar/testing/thread_test_case.hh>
 
+#include <optional>
+
+static_assert(config::detail::bounds<config::numeric_integral_bounds<int>>);
+static_assert(config::detail::bounds<config::numeric_bounds<double>>);
+
 namespace {
 
 struct test_config : public config::config_store {
     config::bounded_property<int16_t> bounded_int;
     config::bounded_property<std::optional<int16_t>> bounded_int_opt;
     config::bounded_property<int32_t> odd_constraint;
+    config::bounded_property<double, config::numeric_bounds> bounded_double;
+    config::bounded_property<std::optional<double>, config::numeric_bounds>
+      bounded_double_opt;
 
     test_config()
       : bounded_int(
@@ -40,10 +48,24 @@ struct test_config : public config::config_store {
         "Property value has to be odd",
         {},
         1,
-        {.oddeven = config::odd_even_constraint::odd}) {}
+        {.oddeven = config::odd_even_constraint::odd})
+      , bounded_double(
+          *this,
+          "bounded_double",
+          "A float with some bounds set",
+          {},
+          1.618033988749,
+          {.min = -1, .max = 2.236067977})
+      , bounded_double_opt(
+          *this,
+          "bounded_double_opt",
+          "An optional float with some bounds set",
+          {},
+          std::nullopt,
+          {.min = -1, .max = 2.236067977}) {}
 };
 
-SEASTAR_THREAD_TEST_CASE(numeric_bounds) {
+SEASTAR_THREAD_TEST_CASE(numeric_integral_bounds) {
     auto cfg = test_config();
 
     // We are checking numeric bounds, not YAML syntax/type validation. The
@@ -101,4 +123,46 @@
     BOOST_CHECK(cfg.bounded_int() == 8192);
 }
 
-} // namespace
\ No newline at end of file
+SEASTAR_THREAD_TEST_CASE(numeric_fp_bounds) {
+    auto cfg = test_config();
+
+    // We are checking numeric bounds, not YAML syntax/type validation. The
+    // latter shows up as exceptions from validate/set_value, rather than
+    // as structured errors, and is not covered by this test.
+    auto valid_values = {"-1", "-0.1", "1.618033988749", "2", "2.236067977"};
+    auto invalid_values = {
+      "-1000.9", "-1.0001", "3", "4095", "4097", "32769", "2000000000"};
+
+    std::optional<ss::sstring> verr;
+
+    for (const auto& v : valid_values) {
+        verr = cfg.bounded_double.validate(YAML::Load(v));
+        BOOST_TEST(!verr.has_value());
+
+        verr = cfg.bounded_double_opt.validate(YAML::Load(v));
+        BOOST_TEST(!verr.has_value());
+    }
+
+    for (const auto& v : invalid_values) {
+        verr = cfg.bounded_double.validate(YAML::Load(v));
+        BOOST_TEST(verr.has_value());
+
+        verr = cfg.bounded_double_opt.validate(YAML::Load(v));
+        BOOST_TEST(verr.has_value());
+    }
+
+    // The optional variant should also always consider nullopt to be valid.
+    verr = cfg.bounded_double_opt.validate(std::nullopt);
+    BOOST_TEST(!verr.has_value());
+
+    // Invalid values should be clamped by set_value.
+    // Too low: clamp to minimum
+    cfg.bounded_double.set_value(YAML::Load("-2"));
+    BOOST_TEST(cfg.bounded_double() == -1);
+
+    // Too high: clamp to maximum
+    cfg.bounded_double.set_value(YAML::Load("1000000"));
+    BOOST_TEST(cfg.bounded_double() == 2.236067977);
+}
+
+} // namespace
diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h
index 0d55c9778761..5e9850b6526a 100644
--- a/src/v/kafka/server/connection_context.h
+++ b/src/v/kafka/server/connection_context.h
@@ -136,6 +136,7 @@ class connection_context final
     connection_context& operator=(const connection_context&) = delete;
     connection_context& operator=(connection_context&&) = delete;
 
+    /// The instance of \ref kafka::server on the shard serving the connection
     server& server() { return _server; }
     const ss::sstring& listener() const { return conn->name(); }
     std::optional<security::sasl_server>& sasl() { return _sasl; }
diff --git a/src/v/kafka/server/handlers/fetch.cc b/src/v/kafka/server/handlers/fetch.cc
index a3a5826dd8d9..3b8a62f24898 100644
--- a/src/v/kafka/server/handlers/fetch.cc
+++ b/src/v/kafka/server/handlers/fetch.cc
@@ -31,6 +31,7 @@
 #include "model/timeout_clock.h"
 #include "random/generators.h"
 #include "resource_mgmt/io_priority.h"
+#include "ssx/semaphore.h"
 #include "storage/parser_utils.h"
 #include "utils/to_string.h"
 
@@ -142,6 +143,100 @@ static ss::future<read_result> read_from_partition(
       std::move(aborted_transactions));
 }
 
+read_result::memory_units_t::~memory_units_t() noexcept {
+    if (shard == ss::this_shard_id()) {
+        return;
+    }
+    auto f = ss::smp::submit_to(
+      shard, [uk = std::move(kafka), uf = std::move(fetch)]() mutable noexcept {
+          uk.return_all();
+          uf.return_all();
+      });
+    if (!f.available()) {
+        ss::engine().run_in_background(std::move(f));
+    }
+}
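A standalone restatement of the cross-shard release pattern used by this destructor, assuming a Seastar reactor is running; the helper name and signature are illustrative, not part of the patch:

```cpp
#include "ssx/semaphore.h"

#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>

// Units must be returned on the shard whose semaphore issued them; if the
// holder ends up on another shard, submit the release back to the owner.
ss::future<> release_on_owner(ss::shard_id owner, ssx::semaphore_units units) {
    if (owner == ss::this_shard_id()) {
        units.return_all();
        return ss::now();
    }
    return ss::smp::submit_to(
      owner, [u = std::move(units)]() mutable noexcept { u.return_all(); });
}
```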
+
+/**
+ * Consume proper amounts of units from memory semaphores and return them as
+ * semaphore_units. Fetch semaphore units returned are the indication of
+ * available resources: if none, there is no memory for the operation;
+ * if less than \p max_bytes, the fetch should be capped to that size.
+ *
+ * \param max_bytes The limit of how much data is going to be fetched
+ * \param obligatory_batch_read Set to true for the first ntp in the fetch
+ *   request: at least one batch must be fetched for that ntp. Also it
+ *   is assumed that a batch size has already been consumed from the kafka
+ *   memory semaphore for it.
+ */
+static read_result::memory_units_t reserve_memory_units(
+  ssx::semaphore& memory_sem,
+  ssx::semaphore& memory_fetch_sem,
+  const size_t max_bytes,
+  const bool obligatory_batch_read) {
+    read_result::memory_units_t memory_units;
+    const size_t memory_kafka_now = memory_sem.current();
+    const size_t memory_fetch = memory_fetch_sem.current();
+    const size_t batch_size_estimate
+      = config::shard_local_cfg().kafka_memory_batch_size_estimate_for_fetch();
+
+    if (obligatory_batch_read) {
+        // cap what we want at what we have, but no further down than a single
+        // batch size - with \ref obligatory_batch_read, it must be fetched
+        // regardless
+        const size_t fetch_size = std::max(
+          batch_size_estimate,
+          std::min({max_bytes, memory_kafka_now, memory_fetch}));
+        memory_units.fetch = ss::consume_units(memory_fetch_sem, fetch_size);
+        memory_units.kafka = ss::consume_units(memory_sem, fetch_size);
+    } else {
+        // max_bytes is how much we prepare to read from this ntp, but no less
+        // than one full batch
+        const size_t requested_fetch_size = std::max(
+          max_bytes, batch_size_estimate);
+        // cap what we want at what we have
+        const size_t fetch_size = std::min(
+          {requested_fetch_size, memory_kafka_now, memory_fetch});
+        // only reserve memory if we have space for at least one batch,
+        // otherwise this ntp will be skipped
+        if (fetch_size >= batch_size_estimate) {
+            memory_units.fetch = ss::consume_units(
+              memory_fetch_sem, fetch_size);
+            memory_units.kafka = ss::consume_units(memory_sem, fetch_size);
+        }
+    }
+
+    return memory_units;
+}
+
+/**
+ * Make the \p units hold exactly \p target_bytes, by consuming more units
+ * from \p sem or by returning extra units back.
+ */
+static void adjust_semaphore_units(
+  ssx::semaphore& sem,
+  ssx::semaphore_units& units,
+  const size_t target_bytes) {
+    if (target_bytes < units.count()) {
+        units.return_units(units.count() - target_bytes);
+    }
+    if (target_bytes > units.count()) {
+        units.adopt(ss::consume_units(sem, target_bytes - units.count()));
+    }
+}
+
+/**
+ * Memory units have been reserved before the read op based on an estimation.
+ * Now that we know how much data has actually been read, return any extra
+ * amount.
+ */
+static void adjust_memory_units(
+  ssx::semaphore& memory_sem,
+  ssx::semaphore& memory_fetch_sem,
+  read_result::memory_units_t& memory_units,
+  const size_t read_bytes) {
+    adjust_semaphore_units(memory_sem, memory_units.kafka, read_bytes);
+    adjust_semaphore_units(memory_fetch_sem, memory_units.fetch, read_bytes);
+}
+
 /**
  * Entry point for reading from an ntp. This is executed on NTP home core and
  * builds error responses if anything goes wrong.
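A worked restatement of the sizing rules above, with the semaphores replaced by plain numbers (sketch only, not patch code):

```cpp
#include <algorithm>
#include <cstddef>

constexpr size_t mib = 1024 * 1024;

// Regular read: ask for at least one estimated batch, capped at availability;
// reserving less than one batch is pointless, so return 0 to skip the ntp.
constexpr size_t non_obligatory_fetch_size(
  size_t max_bytes, size_t kafka_avail, size_t fetch_avail, size_t batch_est) {
    const size_t requested = std::max(max_bytes, batch_est);
    const size_t fetch_size = std::min({requested, kafka_avail, fetch_avail});
    return fetch_size >= batch_est ? fetch_size : 0;
}

// Obligatory read: never below one batch, even if the semaphores are nearly
// drained.
constexpr size_t obligatory_fetch_size(
  size_t max_bytes, size_t kafka_avail, size_t fetch_avail, size_t batch_est) {
    return std::max(
      batch_est, std::min({max_bytes, kafka_avail, fetch_avail}));
}

// With 0.5 MiB of kafka memory left: a regular read is skipped, while an
// obligatory read still reserves a full estimated batch (this can drive the
// semaphore negative, which ss::consume_units() permits without waiting).
static_assert(non_obligatory_fetch_size(4 * mib, mib / 2, 8 * mib, mib) == 0);
static_assert(obligatory_fetch_size(4 * mib, mib / 2, 8 * mib, mib) == mib);
```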
@@ -151,7 +246,25 @@ static ss::future<read_result> do_read_from_ntp(
   const replica_selector& replica_selector,
   ntp_fetch_config ntp_config,
   bool foreign_read,
-  std::optional<model::timeout_clock::time_point> deadline) {
+  std::optional<model::timeout_clock::time_point> deadline,
+  const bool obligatory_batch_read,
+  ssx::semaphore& memory_sem,
+  ssx::semaphore& memory_fetch_sem) {
+    // control available memory
+    read_result::memory_units_t memory_units;
+    if (!ntp_config.cfg.skip_read) {
+        memory_units = reserve_memory_units(
+          memory_sem,
+          memory_fetch_sem,
+          ntp_config.cfg.max_bytes,
+          obligatory_batch_read);
+        if (!memory_units.fetch) {
+            ntp_config.cfg.skip_read = true;
+        } else if (ntp_config.cfg.max_bytes > memory_units.fetch.count()) {
+            ntp_config.cfg.max_bytes = memory_units.fetch.count();
+        }
+    }
+
     /*
      * lookup the ntp's partition
      */
@@ -235,25 +348,49 @@ static ss::future<read_result> do_read_from_ntp(
           preferred_replica);
         }
     }
-    co_return co_await read_from_partition(
+    read_result result = co_await read_from_partition(
       std::move(*kafka_partition), ntp_config.cfg, foreign_read, deadline);
+
+    adjust_memory_units(
+      memory_sem, memory_fetch_sem, memory_units, result.data_size_bytes());
+    result.memory_units = std::move(memory_units);
+    co_return result;
 }
 
+namespace testing {
+
 ss::future<read_result> read_from_ntp(
   cluster::partition_manager& cluster_pm,
   const replica_selector& replica_selector,
   const model::ktp& ktp,
   fetch_config config,
   bool foreign_read,
-  std::optional<model::timeout_clock::time_point> deadline) {
+  std::optional<model::timeout_clock::time_point> deadline,
+  const bool obligatory_batch_read,
+  ssx::semaphore& memory_sem,
+  ssx::semaphore& memory_fetch_sem) {
     return do_read_from_ntp(
       cluster_pm,
       replica_selector,
       {ktp, std::move(config)},
      foreign_read,
-      deadline);
+      deadline,
+      obligatory_batch_read,
+      memory_sem,
+      memory_fetch_sem);
 }
 
+read_result::memory_units_t reserve_memory_units(
+  ssx::semaphore& memory_sem,
+  ssx::semaphore& memory_fetch_sem,
+  const size_t max_bytes,
+  const bool obligatory_batch_read) {
+    return kafka::reserve_memory_units(
+      memory_sem, memory_fetch_sem, max_bytes, obligatory_batch_read);
+}
+
+} // namespace testing
+
 static void fill_fetch_responses(
   op_context& octx,
   std::vector<read_result> results,
@@ -348,14 +485,19 @@ static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
   const replica_selector& replica_selector,
   std::vector<ntp_fetch_config> ntp_fetch_configs,
   bool foreign_read,
-  std::optional<model::timeout_clock::time_point> deadline) {
+  std::optional<model::timeout_clock::time_point> deadline,
+  const size_t bytes_left,
+  ssx::semaphore& memory_sem,
+  ssx::semaphore& memory_fetch_sem) {
     size_t total_max_bytes = 0;
     for (const auto& c : ntp_fetch_configs) {
         total_max_bytes += c.cfg.max_bytes;
     }
 
-    auto max_bytes_per_fetch
-      = config::shard_local_cfg().kafka_max_bytes_per_fetch();
+    // bytes_left comes from the fetch plan and also accounts for the max_bytes
+    // field in the fetch request
+    const size_t max_bytes_per_fetch = std::min<size_t>(
+      config::shard_local_cfg().kafka_max_bytes_per_fetch(), bytes_left);
     if (total_max_bytes > max_bytes_per_fetch) {
         auto per_partition = max_bytes_per_fetch / ntp_fetch_configs.size();
         vlog(
@@ -370,17 +512,26 @@ static ss::future<std::vector<read_result>> fetch_ntps_in_parallel(
         }
     }
 
+    const auto first_p_id = ntp_fetch_configs.front().ktp().get_partition();
     auto results = co_await ssx::parallel_transform(
       std::move(ntp_fetch_configs),
-      [&cluster_pm, &replica_selector, deadline, foreign_read](
-        const ntp_fetch_config& ntp_cfg) {
+      [&cluster_pm,
+       &replica_selector,
+       deadline,
+       foreign_read,
+       first_p_id,
+       &memory_sem,
+       &memory_fetch_sem](const ntp_fetch_config& ntp_cfg) {
          auto p_id = ntp_cfg.ktp().get_partition();
          return do_read_from_ntp(
            cluster_pm,
            replica_selector,
            ntp_cfg,
            foreign_read,
-           deadline)
+           deadline,
+           first_p_id == p_id,
+           memory_sem,
+           memory_fetch_sem)
            .then([p_id](read_result res) {
                res.partition = p_id;
                return res;
@@ -429,23 +580,27 @@ handle_shard_fetch(ss::shard_id shard, op_context& octx, shard_fetch fetch) {
         return ss::now();
     }
 
-    bool foreign_read = shard != ss::this_shard_id();
+    const bool foreign_read = shard != ss::this_shard_id();
 
     // dispatch to remote core
     return octx.rctx.partition_manager()
       .invoke_on(
        shard,
        octx.ssg,
-        [foreign_read,
-         deadline = octx.deadline,
-         configs = std::move(fetch.requests),
-         &octx](cluster::partition_manager& mgr) mutable {
+        [foreign_read, configs = std::move(fetch.requests), &octx](
+          cluster::partition_manager& mgr) mutable {
+            // &octx is captured only to immediately use its accessors here so
+            // that there is a list of all objects accessed next to
+            // `invoke_on`. This is meant to help avoid unintended cross-shard
+            // access.
             return fetch_ntps_in_parallel(
               mgr,
-              octx.rctx.replica_selector(),
+              octx.rctx.server().local().get_replica_selector(),
              std::move(configs),
              foreign_read,
-              deadline);
+              octx.deadline,
+              octx.bytes_left,
+              octx.rctx.server().local().memory(),
+              octx.rctx.server().local().memory_fetch_sem());
        })
      .then([responses = std::move(fetch.responses),
            start_time = fetch.start_time,
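Two review notes on this file: only the first partition in a shard's plan passes `obligatory_batch_read = true` (the `first_p_id == p_id` argument above), which is what keeps a memory-starved fetch returning at least one batch; and when the summed per-partition `max_bytes` exceeds the per-fetch budget, partitions are capped to an even split. The latter, restated as a standalone sketch (simplified; the real code logs and mutates the fetch configs in place):

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Cap each partition's byte budget to an even share when the total exceeds
// the per-fetch budget. Hypothetical helper, mirrors fetch_ntps_in_parallel.
std::vector<size_t>
cap_partition_budgets(std::vector<size_t> budgets, size_t per_fetch_budget) {
    size_t total = 0;
    for (size_t b : budgets) {
        total += b;
    }
    if (total > per_fetch_budget && !budgets.empty()) {
        const size_t per_partition = per_fetch_budget / budgets.size();
        for (size_t& b : budgets) {
            b = std::min(b, per_partition);
        }
    }
    return budgets;
}
```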
diff --git a/src/v/kafka/server/handlers/fetch.h b/src/v/kafka/server/handlers/fetch.h
index 84efab7dea25..0b14470abbad 100644
--- a/src/v/kafka/server/handlers/fetch.h
+++ b/src/v/kafka/server/handlers/fetch.h
@@ -207,6 +207,23 @@ struct read_result {
     using foreign_data_t = ss::foreign_ptr<std::unique_ptr<iobuf>>;
     using data_t = std::unique_ptr<iobuf>;
     using variant_t = std::variant<data_t, foreign_data_t>;
+
+    /// Holds semaphore units from memory semaphores. Can be passed across
+    /// shards; the units will be released on the shard where the instance
+    /// of this class was created.
+    struct memory_units_t {
+        ssx::semaphore_units kafka;
+        ssx::semaphore_units fetch;
+        ss::shard_id shard = ss::this_shard_id();
+
+        ~memory_units_t() noexcept;
+        memory_units_t() noexcept = default;
+        memory_units_t(memory_units_t&&) noexcept = default;
+        memory_units_t& operator=(memory_units_t&&) noexcept = default;
+        memory_units_t(const memory_units_t&) = delete;
+        memory_units_t& operator=(const memory_units_t&) = delete;
+    };
+
     explicit read_result(error_code e)
       : error(e) {}
 
@@ -284,6 +301,7 @@ struct read_result {
     error_code error;
     model::partition_id partition;
     std::vector<cluster::rm_stm::tx_range> aborted_transactions;
+    memory_units_t memory_units;
 };
 
 // struct aggregating fetch requests and corresponding response iterators for
 // the same shard
@@ -349,21 +367,34 @@ struct fetch_plan {
     }
 };
 
+/*
+ * Exposed for unit tests
+ */
+namespace testing {
+
 ss::future<read_result> read_from_ntp(
   cluster::partition_manager&,
   const replica_selector&,
   const model::ktp&,
   fetch_config,
   bool,
-  std::optional<model::timeout_clock::time_point>);
+  std::optional<model::timeout_clock::time_point>,
+  bool obligatory_batch_read,
+  ssx::semaphore& memory_sem,
+  ssx::semaphore& memory_fetch_sem);
 
-namespace testing {
 /**
  * Create a fetch plan with the simple fetch planner.
 *
 * Exposed for testing/benchmarking only.
 */
 kafka::fetch_plan make_simple_fetch_plan(op_context& octx);
-} // namespace testing
+
+read_result::memory_units_t reserve_memory_units(
+  ssx::semaphore& memory_sem,
+  ssx::semaphore& memory_fetch_sem,
+  const size_t max_bytes,
+  const bool obligatory_batch_read);
+
+} // namespace testing
 
 } // namespace kafka
diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h
index 10c0da017356..f5e17c9918b5 100644
--- a/src/v/kafka/server/request_context.h
+++ b/src/v/kafka/server/request_context.h
@@ -242,9 +242,7 @@ class request_context {
         return _conn->server().controller_api();
     }
 
-    const replica_selector& replica_selector() const {
-        return _conn->server().get_replica_selector();
-    }
+    ss::sharded<server>& server() { return _conn->server().container(); }
 
 private:
     template
diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc
index edbab0eee642..a507dd69c952 100644
--- a/src/v/kafka/server/server.cc
+++ b/src/v/kafka/server/server.cc
@@ -133,17 +133,44 @@ server::server(
   , _gssapi_principal_mapper(
       config::shard_local_cfg().sasl_kerberos_principal_mapping.bind())
   , _krb_configurator(config::shard_local_cfg().sasl_kerberos_config.bind())
+  , _memory_fetch_sem(
+      static_cast<size_t>(
+        cfg->local().max_service_memory_per_core
+        * config::shard_local_cfg().kafka_memory_share_for_fetch()),
+      "kafka/server-mem-fetch")
   , _thread_worker(tw)
   , _replica_selector(
       std::make_unique<rack_aware_replica_selector>(_metadata_cache.local()))
   , _schema_registry(sr) {
+    vlog(
+      klog.debug,
+      "Starting kafka server with {} byte limit on fetch requests",
+      _memory_fetch_sem.available_units());
     if (qdc_config) {
         _qdc_mon.emplace(*qdc_config);
     }
+    setup_metrics();
     _probe.setup_metrics();
     _probe.setup_public_metrics();
 }
 
+void server::setup_metrics() {
+    namespace sm = ss::metrics;
+    if (config::shard_local_cfg().disable_metrics()) {
+        return;
+    }
+
+    _metrics.add_group(
+      prometheus_sanitize::metrics_name(cfg.name),
+      {
+        sm::make_total_bytes(
+          "fetch_avail_mem_bytes",
+          [this] { return _memory_fetch_sem.current(); },
+          sm::description(ssx::sformat(
+            "{}: Memory available for fetch request processing", cfg.name))),
+      });
+}
+
 ss::scheduling_group server::fetch_scheduling_group() const {
     return config::shard_local_cfg().use_fetch_scheduler_group()
              ? _fetch_scheduling_group
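The fetch semaphore sizing above, restated with assumed numbers:

```cpp
#include <cstddef>

// Sizing rule used in the server constructor above: semaphore capacity =
// per-core kafka service memory times kafka_memory_share_for_fetch.
constexpr size_t fetch_sem_capacity(size_t per_core_mem, double share) {
    return static_cast<size_t>(static_cast<double>(per_core_mem) * share);
}

// e.g. 256 MiB of kafka service memory per core at the default share of 0.5
// yields a 128 MiB fetch budget per shard (values assumed for illustration).
static_assert(
  fetch_sem_capacity(256 * 1024 * 1024, 0.5) == 128 * 1024 * 1024);
```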
diff --git a/src/v/kafka/server/server.h b/src/v/kafka/server/server.h
index 02348c71a448..4d2599e9f83b 100644
--- a/src/v/kafka/server/server.h
+++ b/src/v/kafka/server/server.h
@@ -38,7 +38,9 @@
 
 namespace kafka {
 
-class server final : public net::server {
+class server final
+  : public net::server
+  , public ss::peering_sharded_service<server> {
 public:
     server(
       ss::sharded<net::server_configuration>*,
@@ -181,7 +183,11 @@ class server final : public net::server {
         return _handler_probes.get_probe(key);
     }
 
+    ssx::semaphore& memory_fetch_sem() noexcept { return _memory_fetch_sem; }
+
 private:
+    void setup_metrics();
+
     ss::smp_service_group _smp_group;
     ss::scheduling_group _fetch_scheduling_group;
     ss::sharded<cluster::topics_frontend>& _topics_frontend;
@@ -209,10 +215,11 @@ class server final : public net::server {
     security::tls::principal_mapper _mtls_principal_mapper;
     security::gssapi_principal_mapper _gssapi_principal_mapper;
     security::krb5::configurator _krb_configurator;
+    ssx::semaphore _memory_fetch_sem;
 
     handler_probe_manager _handler_probes;
 
-    class latency_probe _probe;
+    class latency_probe _probe;
+    ss::metrics::metric_groups _metrics;
 
     ssx::thread_worker& _thread_worker;
     std::unique_ptr<replica_selector> _replica_selector;
     const std::unique_ptr<pandaproxy::schema_registry::api>& _schema_registry;
diff --git a/src/v/kafka/server/tests/CMakeLists.txt b/src/v/kafka/server/tests/CMakeLists.txt
index a7bdbb59bea7..257e3ab29a0e 100644
--- a/src/v/kafka/server/tests/CMakeLists.txt
+++ b/src/v/kafka/server/tests/CMakeLists.txt
@@ -9,6 +9,7 @@ rp_test(
     handler_interface_test.cc
     quota_managers_test.cc
     validator_tests.cc
+    fetch_unit_test.cc
   DEFINITIONS BOOST_TEST_DYN_LINK
   LIBRARIES Boost::unit_test_framework v::kafka v::coproc
   LABELS kafka
diff --git a/src/v/kafka/server/tests/fetch_test.cc b/src/v/kafka/server/tests/fetch_test.cc
index caa8e4c7d589..0ca145c6aff5 100644
--- a/src/v/kafka/server/tests/fetch_test.cc
+++ b/src/v/kafka/server/tests/fetch_test.cc
@@ -168,19 +168,25 @@ FIXTURE_TEST(read_from_ntp_max_bytes, redpanda_thread_fixture) {
         auto octx = kafka::op_context(
           std::move(rctx), ss::default_smp_service_group());
         auto shard = octx.rctx.shards().shard_for(ktp).value();
-        return octx.rctx.partition_manager()
-          .invoke_on(
-            shard,
-            [ktp, config, &octx](cluster::partition_manager& pm) {
-                return kafka::read_from_ntp(
-                  pm,
-                  octx.rctx.replica_selector(),
-                  ktp,
-                  config,
-                  true,
-                  model::no_timeout);
-            })
-          .get0();
+        kafka::read_result res
+          = octx.rctx.partition_manager()
+              .invoke_on(
+                shard,
+                [&octx, ktp, config](cluster::partition_manager& pm) {
+                    return kafka::testing::read_from_ntp(
+                      pm,
+                      octx.rctx.server().local().get_replica_selector(),
+                      ktp,
+                      config,
+                      true,
+                      model::no_timeout,
+                      false,
+                      octx.rctx.server().local().memory(),
+                      octx.rctx.server().local().memory_fetch_sem());
+                })
+              .get0();
+        BOOST_TEST_REQUIRE(res.has_data());
+        return res;
     };
     wait_for_controller_leadership().get0();
     auto ntp = make_data(get_next_partition_revision_id().get());
diff --git a/src/v/kafka/server/tests/fetch_unit_test.cc b/src/v/kafka/server/tests/fetch_unit_test.cc
new file mode 100644
index 000000000000..34ca55eed0a9
--- /dev/null
+++ b/src/v/kafka/server/tests/fetch_unit_test.cc
@@ -0,0 +1,147 @@
+// Copyright 2023 Redpanda Data, Inc.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.md
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0
+
+#include "kafka/server/handlers/fetch.h"
+#include "seastarx.h"
+#include "ssx/semaphore.h"
+
+#include <boost/test/unit_test.hpp>
+
+#include <chrono>
+
+struct reserve_mem_units_test_result {
+    size_t kafka, fetch;
+    explicit reserve_mem_units_test_result(size_t size)
+      : kafka(size)
+      , fetch(size) {}
+    reserve_mem_units_test_result(size_t kafka_, size_t fetch_)
+      : kafka(kafka_)
+      , fetch(fetch_) {}
+    friend bool operator==(
+      const reserve_mem_units_test_result&,
+      const reserve_mem_units_test_result&)
+      = default;
+    friend std::ostream&
+    operator<<(std::ostream& s, const reserve_mem_units_test_result& v) {
+        return s << "{kafka: " << v.kafka << ", fetch: " << v.fetch << "}";
+    }
+};
+
+BOOST_AUTO_TEST_CASE(reserve_memory_units_test) {
+    using namespace kafka;
+    using namespace std::chrono_literals;
+    using r = reserve_mem_units_test_result;
+
+    // reserve memory units, return how many memory units have been reserved
+    // from each memory semaphore
+    ssx::semaphore memory_sem{100_MiB, "test_memory_sem"};
+    ssx::semaphore memory_fetch_sem{50_MiB, "test_memory_fetch_sem"};
+    const auto test_case =
+      [&memory_sem, &memory_fetch_sem](
+        size_t max_bytes,
+        bool obligatory_batch_read) -> reserve_mem_units_test_result {
+        auto mu = kafka::testing::reserve_memory_units(
+          memory_sem, memory_fetch_sem, max_bytes, obligatory_batch_read);
+        return {mu.kafka.count(), mu.fetch.count()};
+    };
+
+    static constexpr size_t batch_size = 1_MiB;
+
+    // below are test prerequisites; tests are done based on these assumptions.
+    // if these do not hold, the test needs a change
+    size_t kafka_mem = memory_sem.available_units();
+    size_t fetch_mem = memory_fetch_sem.available_units();
+    BOOST_TEST(fetch_mem > batch_size * 3);
+    BOOST_TEST_REQUIRE(kafka_mem > fetch_mem);
+    BOOST_TEST_REQUIRE(batch_size > 100);
+
+    // *** plenty of memory cases
+    // kafka_mem > fetch_mem > batch_size
+    // Reserved memory is limited by the fetch memory semaphore
+    BOOST_TEST(test_case(batch_size / 100, false) == r(batch_size));
+    BOOST_TEST(test_case(batch_size / 100, true) == r(batch_size));
+    BOOST_TEST(test_case(batch_size, false) == r(batch_size));
+    BOOST_TEST(test_case(batch_size, true) == r(batch_size));
+    BOOST_TEST(test_case(batch_size * 3, false) == r(batch_size * 3));
+    BOOST_TEST(test_case(batch_size * 3, true) == r(batch_size * 3));
+    BOOST_TEST(test_case(fetch_mem, false) == r(fetch_mem));
+    BOOST_TEST(test_case(fetch_mem, true) == r(fetch_mem));
+    BOOST_TEST(test_case(fetch_mem + 1, false) == r(fetch_mem));
+    BOOST_TEST(test_case(fetch_mem + 1, true) == r(fetch_mem));
+    BOOST_TEST(test_case(kafka_mem, false) == r(fetch_mem));
+    BOOST_TEST(test_case(kafka_mem, true) == r(fetch_mem));
+
+    // *** still a lot of mem but kafka mem somewhat used:
+    // fetch_mem > kafka_mem > batch_size (fetch_mem - kafka_mem < batch_size)
+    // Obligatory reads do not come into play yet because we still have more
+    // memory than a single batch, but the amount of memory reserved is limited
+    // by the smaller semaphore, which is kafka_mem in this case
+    auto memsemunits = ss::consume_units(
+      memory_sem, kafka_mem - fetch_mem + 1000);
+    kafka_mem = memory_sem.available_units();
+    BOOST_TEST_REQUIRE(kafka_mem < fetch_mem);
+    BOOST_TEST_REQUIRE(kafka_mem > batch_size + 1000);
+
+    BOOST_TEST(test_case(batch_size, false) == r(batch_size));
+    BOOST_TEST(test_case(batch_size, true) == r(batch_size));
+    BOOST_TEST(test_case(kafka_mem - 100, false) == r(kafka_mem - 100));
+    BOOST_TEST(test_case(kafka_mem - 100, true) == r(kafka_mem - 100));
+    BOOST_TEST(test_case(kafka_mem + 100, false) == r(kafka_mem));
+    BOOST_TEST(test_case(kafka_mem + 100, true) == r(kafka_mem));
+    BOOST_TEST(test_case(fetch_mem + 100, false) == r(kafka_mem));
+    BOOST_TEST(test_case(fetch_mem + 100, true) == r(kafka_mem));
+
+    memsemunits.return_all();
+    kafka_mem = memory_sem.available_units();
+
+    // *** low on fetch memory tests
+    // kafka_mem > batch_size > fetch_mem
+    // Under this condition, unless obligatory_batch_read, we cannot reserve
+    // memory as it's not enough for at least a single batch.
+    // If obligatory_batch_read, the reserved amount will always be a single
+    // batch.
+    memsemunits = ss::consume_units(
+      memory_fetch_sem, fetch_mem - batch_size + 1000);
+    fetch_mem = memory_fetch_sem.available_units();
+    BOOST_TEST_REQUIRE(kafka_mem > batch_size);
+    BOOST_TEST_REQUIRE(fetch_mem < batch_size);
+
+    BOOST_TEST(test_case(fetch_mem - 100, false) == r(0));
+    BOOST_TEST(test_case(fetch_mem - 100, true) == r(batch_size));
+    BOOST_TEST(test_case(batch_size - 100, false) == r(0));
+    BOOST_TEST(test_case(batch_size - 100, true) == r(batch_size));
+    BOOST_TEST(test_case(kafka_mem - 100, false) == r(0));
+    BOOST_TEST(test_case(kafka_mem - 100, true) == r(batch_size));
+    BOOST_TEST(test_case(kafka_mem + 100, false) == r(0));
+    BOOST_TEST(test_case(kafka_mem + 100, true) == r(batch_size));
+
+    memsemunits.return_all();
+    fetch_mem = memory_fetch_sem.available_units();
+
+    // *** low on kafka memory tests
+    // fetch_mem > batch_size > kafka_mem
+    // Essentially the same behaviour as in the low fetch memory cases
+    memsemunits = ss::consume_units(memory_sem, kafka_mem - batch_size + 1000);
+    kafka_mem = memory_sem.available_units();
+    BOOST_TEST_REQUIRE(kafka_mem < batch_size);
+    BOOST_TEST_REQUIRE(fetch_mem > batch_size);
+
+    BOOST_TEST(test_case(kafka_mem - 100, false) == r(0));
+    BOOST_TEST(test_case(kafka_mem - 100, true) == r(batch_size));
+    BOOST_TEST(test_case(batch_size - 100, false) == r(0));
+    BOOST_TEST(test_case(batch_size - 100, true) == r(batch_size));
+    BOOST_TEST(test_case(batch_size + 100, false) == r(0));
+    BOOST_TEST(test_case(batch_size + 100, true) == r(batch_size));
+    BOOST_TEST(test_case(fetch_mem - 100, false) == r(0));
+    BOOST_TEST(test_case(fetch_mem - 100, true) == r(batch_size));
+    BOOST_TEST(test_case(fetch_mem + 100, false) == r(0));
+    BOOST_TEST(test_case(fetch_mem + 100, true) == r(batch_size));
+
+    memsemunits.return_all();
+    kafka_mem = memory_sem.available_units();
+}
diff --git a/src/v/net/server.h b/src/v/net/server.h
index f1129e7c72f6..56b5d155ace3 100644
--- a/src/v/net/server.h
+++ b/src/v/net/server.h
@@ -73,7 +73,7 @@ struct config_connection_rate_bindings {
 
 struct server_configuration {
     std::vector<listener_configuration> addrs;
-    int64_t max_service_memory_per_core;
+    int64_t max_service_memory_per_core = 0;
     std::optional<int> listen_backlog;
     std::optional<int> tcp_recv_buf;
     std::optional<int> tcp_send_buf;
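Why the `= 0` default matters: the test fixture below now builds a `net::server_configuration` directly, and a member initializer keeps an unset limit at a well-defined zero rather than an indeterminate value. A hypothetical check, not part of the patch:

```cpp
#include "net/server.h"

// With the member initializer above, a freshly constructed configuration
// reports zero service memory until a fixture or the application assigns it.
int main() {
    net::server_configuration cfg("example");
    return cfg.max_service_memory_per_core == 0 ? 0 : 1;
}
```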
"storage/directories.h" #include "storage/tests/utils/disk_log_builder.h" #include "test_utils/async.h" @@ -122,33 +123,39 @@ class redpanda_thread_fixture { app.check_environment(); app.wire_up_and_start(*app_signal, true); - configs.start(ss::sstring("fixture_config")).get(); + net::server_configuration scfg("fixture_config"); + scfg.max_service_memory_per_core = memory_groups::rpc_total_memory(); + scfg.disable_metrics = net::metrics_disabled::yes; + scfg.disable_public_metrics = net::public_metrics_disabled::yes; + configs.start(scfg).get(); // used by request context builder - proto = std::make_unique( - &configs, - app.smp_service_groups.kafka_smp_sg(), - app.sched_groups.fetch_sg(), - app.metadata_cache, - app.controller->get_topics_frontend(), - app.controller->get_config_frontend(), - app.controller->get_feature_table(), - app.quota_mgr, - app.snc_quota_mgr, - app.group_router, - app.usage_manager, - app.shard_table, - app.partition_manager, - app.id_allocator_frontend, - app.controller->get_credential_store(), - app.controller->get_authorizer(), - app.controller->get_security_frontend(), - app.controller->get_api(), - app.tx_gateway_frontend, - app.tx_registry_frontend, - std::nullopt, - *app.thread_worker, - app.schema_registry()); + proto + .start( + &configs, + app.smp_service_groups.kafka_smp_sg(), + app.sched_groups.fetch_sg(), + std::ref(app.metadata_cache), + std::ref(app.controller->get_topics_frontend()), + std::ref(app.controller->get_config_frontend()), + std::ref(app.controller->get_feature_table()), + std::ref(app.quota_mgr), + std::ref(app.snc_quota_mgr), + std::ref(app.group_router), + std::ref(app.usage_manager), + std::ref(app.shard_table), + std::ref(app.partition_manager), + std::ref(app.id_allocator_frontend), + std::ref(app.controller->get_credential_store()), + std::ref(app.controller->get_authorizer()), + std::ref(app.controller->get_security_frontend()), + std::ref(app.controller->get_api()), + std::ref(app.tx_gateway_frontend), + std::ref(app.tx_registry_frontend), + std::nullopt, + std::ref(*app.thread_worker), + std::ref(app.schema_registry())) + .get(); configs.stop().get(); } @@ -226,6 +233,7 @@ class redpanda_thread_fixture { ~redpanda_thread_fixture() { shutdown(); + proto.stop().get(); if (remove_on_shutdown) { std::filesystem::remove_all(data_dir); } @@ -645,7 +653,7 @@ class redpanda_thread_fixture { conn_ptr make_connection_context() { security::sasl_server sasl(security::sasl_server::sasl_state::complete); return ss::make_lw_shared( - *proto, + proto.local(), nullptr, std::move(sasl), false, @@ -689,7 +697,7 @@ class redpanda_thread_fixture { uint16_t schema_reg_port; std::filesystem::path data_dir; ss::sharded configs; - std::unique_ptr proto; + ss::sharded proto; bool remove_on_shutdown; std::unique_ptr<::stop_signal> app_signal; }; diff --git a/tests/rptest/tests/memory_stress_test.py b/tests/rptest/tests/memory_stress_test.py index 3c0304e52ce2..0ff1700d34e7 100644 --- a/tests/rptest/tests/memory_stress_test.py +++ b/tests/rptest/tests/memory_stress_test.py @@ -9,7 +9,7 @@ from enum import Enum -from ducktape.mark import ok_to_fail +from ducktape.mark import ok_to_fail, parametrize from rptest.clients.types import TopicSpec from rptest.services.cluster import cluster from rptest.services.kaf_consumer import KafConsumer @@ -36,10 +36,12 @@ def setUp(self): # enabling each test case to customize its ResourceSettings pass - @ok_to_fail # until the fix is delivered @cluster(num_nodes=5) @skip_debug_mode - def 
-    def test_fetch_with_many_partitions(self):
+    @parametrize(memory_share_for_fetch=0.05)
+    @parametrize(memory_share_for_fetch=0.5)
+    @parametrize(memory_share_for_fetch=0.8)
+    def test_fetch_with_many_partitions(self, memory_share_for_fetch: float):
         """
         Exhaust memory by consuming from too many partitions in a single
         Fetch API request.
@@ -47,11 +49,15 @@ def test_fetch_with_many_partitions(self):
         # memory_mb does not work with debug redpanda build, therefore the test
         # only makes sense with release redpanda, hence @skip_debug_mode
         self.redpanda.set_resource_settings(
-            ResourceSettings(memory_mb=256, num_cpus=1))
+            ResourceSettings(memory_mb=512, num_cpus=1))
         self.redpanda.set_seed_servers(self.redpanda.nodes)
+        self.redpanda.add_extra_rp_conf({
+            "kafka_batch_max_bytes":
+            10 * 1024 * 1024,
+            "kafka_memory_share_for_fetch":
+            memory_share_for_fetch
+        })
         self.redpanda.start(omit_seeds_on_idx_one=False)
-        self.redpanda.set_cluster_config(
-            {"kafka_batch_max_bytes": 10 * 1024 * 1024})
 
         # the maximum message size that does not make redpanda OOM with all
         # the other params as they are is 64 MiB
@@ -68,8 +74,9 @@ def test_fetch_with_many_partitions(self):
                                 150 * 500_000)
         produce_timeout = msg_count * msg_size // 2184533
         self.logger.info(
-            f"Starting producer. msg_size={msg_size}, msg_count={msg_count}, partiton_count={partition_count}, rpk_response_timeout={rpk_response_timeout}, produce_timeout={produce_timeout}"
-        )
+            f"Starting producer. msg_size={msg_size}, msg_count={msg_count}, "
+            f"partition_count={partition_count}, rpk_response_timeout={rpk_response_timeout}, "
+            f"produce_timeout={produce_timeout}")
 
         producer = RpkProducer(self.test_context,
                                self.redpanda,