Limit memory used while fetching many partitions #10905
Conversation
Force-pushed from 1d26c57 to 484db77
/ci-repeat 3
CI failures in https://buildkite.com/redpanda/redpanda/builds/29714#01884a44-7df4-4a4f-8a28-16a952c0ee31 and in https://buildkite.com/redpanda/redpanda/builds/29714#01884a37-0b3d-4a6c-9a29-322f4cb9d22d
@dlex - I couldn't quite determine how this works from the description in the PR. Can you expand it a bit? What isn't clear to me is exactly the relationship between the new and existing (kafka RPC) semaphores on all shards involved in a request: the "connection" shard where the planner is running, and the shards targeted in the fetch. Which semaphores are decremented, when, and by how much? Maybe a small worked example would help, if you're willing to provide it.
Force-pushed from 484db77 to acb23eb: rebased upstream
Force-pushed from acb23eb to 6fbfe0a: linter errors
@dlex could you summarize the changes between the previous reverted change and this one? I'm curious what the issue was.
Force-pushed from 6fbfe0a to 4021c51: rebased upstream
Force-pushed from 4021c51 to bd02089: some comments revisited
Force-pushed from bd02089 to 93af53b: test improvements
Force-pushed from 93af53b to 9be0d3b:
Kafka server now stores a per-shard memory semaphore that limits memory usage by the fetch request handler. The semaphore count is configured based on the "kafka_memory_share_for_fetch" property and the kafka RPC service memory size. The metric `vectorized_kafka_rpc_fetch_avail_mem_bytes` is added to monitor the semaphore level. A sharded `server` accessor in `request_context` reaches the local shard instance of the new semaphore, as well as the local instance of the `net::memory` semaphore.
Access `replica_selector` via the newly exposed `sharded<server>` to reach the local shard instance of `kafka::server` and its `replica_selector`. This prevents cross-shard access to `metadata_cache` and to future objects as replica selectors evolve.
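As a rough illustration of the sizing step described above, here is a minimal sketch of how the per-shard fetch semaphore count could be derived from the share property and the kafka RPC service memory. The struct and function names are hypothetical, not the actual `kafka::server` constructor code:

```cpp
#include <cstddef>

// Illustrative-only model of sizing the per-shard fetch memory semaphore.
// Field names mirror the properties mentioned above; the arithmetic is an
// assumption about its shape, not the real implementation.
struct fetch_mem_config {
    std::size_t max_service_memory_per_core; // kafka RPC service memory, bytes
    double kafka_memory_share_for_fetch;     // fraction, default 0.5
};

inline std::size_t fetch_semaphore_count(const fetch_mem_config& cfg) {
    return static_cast<std::size_t>(
      cfg.kafka_memory_share_for_fetch
      * static_cast<double>(cfg.max_service_memory_per_core));
}
```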
Force-pushed from d81dad6 to d17be67:
Consult the memory semaphores on whether there is enough memory available to perform the fetch while concurrently fetching from ntps. Both the general kafka memory semaphore and the new kafka fetch memory semaphore are used. For the former, the amount already consumed from it by the request memory estimator is taken into account. Since the batch size is not known ahead of time, it is estimated at 1 MiB. The first partition in the list is fetched regardless of the semaphore values, to satisfy the requirement that at least a single partition from the fetch request must advance. The number of units held is adjusted to the actual size used as soon as it is known (see the sketch after this description). The acquired units of the memory semaphores are held with `read_result` until it is destroyed at the end of the fetch request processing. When `read_result` is destroyed in the connection shard, the semaphore units are returned in the shard where they were acquired. If the request's max_size is more than either semaphore holds, max_size is reduced to the memory actually available, also considering the minimum batch size.
In kafka_server_rpfixture, an extra `kafka::server` is created using a barely initialized `server_configuration` instance. Garbage in `max_service_memory_per_core` now causes issues because of the new arithmetic done with it in the `kafka::server` ctor.
Test the algorithm that decides whether a fetch request can proceed for an ntp based on the resources available. Move the existing testing-only symbols into the `testing` ns.
RAM is increased to 512M because redpanda was failing at 256M for unrelated reasons. Tests run with different values of "kafka_memory_share_for_fetch".
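A minimal, self-contained model of the consume-then-adjust flow described in this force push follows. A signed counter stands in for a seastar-style semaphore that is consumed from without waiting (and may therefore go negative); all names here are illustrative, not the actual fetch-path code:

```cpp
#include <cstdint>

constexpr std::int64_t batch_size_estimate = 1 << 20; // 1 MiB default estimate

// Stand-in for a memory semaphore that is only consumed from, never waited
// upon, so its level may go negative.
struct mem_semaphore {
    std::int64_t available;
    void consume(std::int64_t n) { available -= n; }
    void release(std::int64_t n) { available += n; }
};

// Whether one more partition read may proceed on this shard. The first
// partition always advances, per the requirement described above.
bool may_read_more(const mem_semaphore& fetch_sem,
                   const mem_semaphore& kafka_sem,
                   bool first_partition_on_shard) {
    if (first_partition_on_shard) {
        return true;
    }
    return fetch_sem.available >= batch_size_estimate
        && kafka_sem.available >= batch_size_estimate;
}

// Once the real batch size is known, trim the held units down to it by
// returning the over-estimated portion.
void adjust_to_actual(mem_semaphore& sem, std::int64_t held,
                      std::int64_t actual) {
    if (actual < held) {
        sem.release(held - actual);
    }
}
```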
Force-pushed from d17be67 to 06a38b9: release semaphore units in the shard where they have been acquired.
/ci-repeat 10
/backport v23.1.x |
/backport v22.3.x |
Failed to run cherry-pick command. I executed the commands below:
Failed to run cherry-pick command. I executed the commands below:
A semaphore is added to `kafka::server` to control the memory used for the buffers in the fetch path. The semaphore is set to a configurable fraction of kafka shard memory (default 0.5). As the fetch work is delegated to different shards, the first partition requested from a shard is always served; other partitions may be skipped if the memory semaphore on the shard does not have enough units available. Memory reservation is based on an estimate of 1 MiB per batch (default, configurable), which is adjusted as soon as the real size used is known. The semaphore is local to the partition's home shard.
The overall kafka server memory semaphore is used the same way, so sizes of fetch responses are also limited from the overall kafka server perspective. Here too it is the partition home shard's semaphore, not the one associated with the connection that received the fetch request, because the memory for the buffers is acquired on the partition home shard.
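A worked example of the skip rule above, with made-up numbers: a shard whose fetch semaphore has 2.5 MiB left serves a 4-partition slice of a fetch plan under the 1 MiB per-batch estimate. This is purely illustrative:

```cpp
#include <cstdint>
#include <cstdio>

int main() {
    std::int64_t fetch_avail = 5ll * (1 << 19); // 2.5 MiB available on shard
    const std::int64_t estimate = 1 << 20;      // 1 MiB per-batch estimate
    for (int partition = 0; partition < 4; ++partition) {
        // the first partition always proceeds; others need a full estimate
        bool proceed = partition == 0 || fetch_avail >= estimate;
        if (proceed) {
            fetch_avail -= estimate; // consuming may drive the count negative
        }
        std::printf("partition %d: %s (avail now %lld)\n", partition,
                    proceed ? "read" : "skipped",
                    static_cast<long long>(fetch_avail));
    }
}
```

Under these numbers, partitions 0 and 1 are read (leaving 1.5 MiB, then 0.5 MiB), while partitions 2 and 3 are skipped until memory is returned.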
The fetch request memory estimator is not changed, because it only works with the memory semaphore associated with the connection shard, and at estimation time we don't even know which shards will be affected.
Neither of the semaphores is waited upon; units are only consumed from them, so both semaphores can go negative as a result. When overused, `read_from_ntp`s will only perform the minimal read (one batch per shard's part of the plan).
Actually, the fetch memory semaphore is never waited upon and could have been a plain integer instead; the only reason it is a semaphore is the convenience of `semaphore_units`.
As a side effect, this PR adds support for floating point bounded properties.
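A hedged sketch of the "floating point bounded property" idea mentioned above: a property whose value is clamped into a [min, max] range on update. The actual redpanda config machinery is more involved; only the bounding behavior is shown, and all names are hypothetical:

```cpp
#include <algorithm>

// Illustrative model of a bounded property; not the real config API.
template <typename T>
struct bounded_property {
    T min;
    T max;
    T value;
    void update(T v) { value = std::clamp(v, min, max); }
};

// e.g. a share-like double property constrained to (0, 1]:
// bounded_property<double> share{0.000001, 1.0, 0.5};
```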
The differences from the reverted #10371 are:
- Changes to `fetch_memory_estimator` are not done in this PR, because the memory estimator only affects the connection shard memory semaphore, while the idea of this PR is to account memory against the memory semaphores of the shards where the memory is actually allocated.

Fixes #3409.
Backports Required
- v23.1.x
- v22.3.x
Release Notes
Improvements
- Memory control is improved in the Fetch request handler, covering the cases when a client tries to fetch too many partitions led by the same shard. Some of the requested partitions will not be fetched if the broker does not have enough memory for that, or if that would violate the constraints set by `kafka_max_bytes_per_fetch` and `fetch_max_bytes`.
- If `kafka_max_bytes_per_fetch` is not configured properly, redpanda is now more robust against huge Fetch requests by controlling memory consumption dynamically.