Basic auth scale tests2 #7111
Conversation
After removing the gate and lock from the sharded client cache, I'm seeing node crashes and connection timeouts from the Python Requests library in tests with 1k and 2k users.
I think we still need a gate and lock for concurrency.
Doesn't the webserver use worker threads to handle requests? If so, then we need to control access to the client cache, which is a sharded service. I'll look into this more. We use the Seastar HTTP server, not Apache, so this comment does not apply.
ss::smp_submit_to_options{context().smp_sg},
[func{std::forward<Func>(func)}](
  kafka_client_cache& cache) mutable {
    return std::invoke(std::move(func), cache);
Gate or lock here. Or, really, a gate/lock on the sharded service. The method invoke_on_cache was meant to be the wrapper for the gate and lock.
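For illustration, a wrapper-level guard along those lines could look roughly like the sketch below. This is only a sketch: the class layout, the member names (_gate, _lock, _cache), and the explicit shard argument are assumptions, not the actual invoke_on_cache implementation.

// Rough sketch only: member names and the shard-selection argument are
// illustrative assumptions, not the actual Redpanda code.
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#include <functional>
#include <utility>

namespace ss = seastar;

class kafka_client_cache {}; // stand-in for the real per-shard service

class sharded_client_cache {
public:
    explicit sharded_client_cache(ss::sharded<kafka_client_cache>& cache)
      : _cache(cache) {}

    template<typename Func>
    auto invoke_on_cache(unsigned shard, Func&& func) {
        // Gate: keeps the wrapper alive while a cross-shard call is in flight.
        return ss::with_gate(
          _gate, [this, shard, func{std::forward<Func>(func)}]() mutable {
              // Lock: serializes callers so concurrent request handlers do
              // not interleave operations on the cache.
              return ss::with_semaphore(
                _lock, 1, [this, shard, func{std::move(func)}]() mutable {
                    return _cache.invoke_on(
                      shard,
                      [func{std::move(func)}](kafka_client_cache& c) mutable {
                          return std::invoke(std::move(func), c);
                      });
                });
          });
    }

private:
    ss::gate _gate;
    ss::semaphore _lock{1};
    ss::sharded<kafka_client_cache>& _cache;
};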
You don't need a gate; kafka_client_cache encapsulates a gate that protects async calls.
You don't need a lock; kafka_client_cache encapsulates a lock that protects against reentrancy of clean_stale_clients.
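For reference, the pattern being described, a gate tracking in-flight async work plus a lock that keeps clean_stale_clients from re-entering itself, looks roughly like the sketch below. The private member names and the do_clean_stale_clients helper are assumptions for illustration, not the real implementation.

// Illustrative sketch of the gate + reentrancy lock the comment refers to;
// the real kafka_client_cache members and clean-up logic differ.
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/semaphore.hh>

namespace ss = seastar;

class kafka_client_cache {
public:
    ss::future<> clean_stale_clients() {
        // Gate: every async call is tracked so stop() can wait for it.
        return ss::with_gate(_gate, [this] {
            // Lock: a second clean-up cannot re-enter while the previous one
            // is still awaiting client shutdowns.
            return ss::with_semaphore(_clean_lock, 1, [this] {
                return do_clean_stale_clients();
            });
        });
    }

    ss::future<> stop() {
        // Closing the gate drains all in-flight async calls.
        return _gate.close();
    }

private:
    // Assumed helper that actually evicts idle clients on this shard.
    ss::future<> do_clean_stale_clients();

    ss::gate _gate;
    ss::semaphore _clean_lock{1};
};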
Without the gate and lock, the test fails with 2k users (and with 1k users in 2 of 10 runs) with a crash:
rptest.services.utils.NodeCrash: <NodeCrash docker-rp-1: redpanda: /home/nyalia/redpanda/vbuild/debug/clang/rp_deps_install/include/seastar/core/iostream.hh:391: seastar::output_stream<char>::~output_stream() [CharType = char]: Assertion `!_in_batch && "Was this stream properly closed?"' failed.
There are also many connection errors due to timeouts, likely because the broker crashed:
41ea10>: Failed to establish a new connection: [Errno 111] Connection refused')) raise ConnectionError(e, request=request) requests.exceptions.ConnectionError: HTTPConnectionPool(host='docker-rp-1', port=8082): Max retries exceeded with url: /topics (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f4834380220>: Failed to establish a new connection: [Errno 111] Connection refused'))
The assert failure, !_in_batch && "Was this stream properly closed?", indicates that an output_stream was destroyed while it still held bytes to transport.
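For context, the contract that assert enforces is that an output_stream must be flushed and closed before its destructor runs, even on error paths. A minimal sketch of the safe pattern follows; the handler name and payload are made up for illustration and this is not Redpanda code.

// Minimal sketch of the output_stream contract behind the assert: close()
// must run before the destructor, even on error paths.
#include <seastar/core/do_with.hh>
#include <seastar/core/future.hh>
#include <seastar/core/iostream.hh>
#include <seastar/net/api.hh>

namespace ss = seastar;

ss::future<> reply_and_close(ss::connected_socket socket) {
    auto out = socket.output();
    return ss::do_with(
      std::move(socket), std::move(out),
      [](ss::connected_socket&, ss::output_stream<char>& out) {
          return out.write("pong", 4)
            .then([&out] { return out.flush(); })
            // Without this close(), destroying the stream while bytes are
            // still batched trips the "!_in_batch" assertion.
            .finally([&out] { return out.close(); });
      });
}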
OK, so what's happening is that the client is being evicted during do_connect, and its lifetime isn't extended. Try something like this:
diff --git a/src/v/kafka/client/client.cc b/src/v/kafka/client/client.cc
index 38abfdbd2..931963e46 100644
--- a/src/v/kafka/client/client.cc
+++ b/src/v/kafka/client/client.cc
@@ -63,9 +63,9 @@ ss::future<> client::do_connect(net::unresolved_address addr) {
     return make_broker(unknown_node_id, addr, _config)
       .then([this](shared_broker_t broker) {
           return broker->dispatch(metadata_request{.list_all_topics = true})
-            .then([this, broker](metadata_response res) {
-                return apply(std::move(res));
-            });
+            .then(
+              [this](metadata_response res) { return apply(std::move(res)); })
+            .finally([broker]() {});
       });
 }
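For what it's worth, the only purpose of the added .finally([broker]() {}) is to capture the shared_broker_t by value: that keeps the broker, and presumably the output_stream that was tripping the assert, alive until the metadata dispatch and apply() complete, so an eviction during do_connect can no longer destroy it mid-flight.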
Done.
And this makes sense. I think Noah also mentioned something about this scenario; I thought I had accounted for it already, but I did not think about the broker!
Force-pushed from 9f00864 to ee98958.
This commit reduces cross-shard communication by moving the garbage collection timer into the individual sharded instances. Otherwise we risk a Seastar assert failure on the shared timer, which happens when two or more sharded instances evict and then trigger garbage collection at the same time.
Prior to this commit, all sharded state was handled in the sharded_client_cache wrapper. That wrapper lived on a single core, which led to concurrency failures because of the cross-core communication between the wrapper and the sharded services. This commit reduces cross-core communication by moving the concurrency mechanisms into the sharded service. It also serves as the prerequisite for removing the sharded_client_cache wrapper. Finally, the auth_ctx_server is the new "frontend" for the kafka client cache: the ctx server can pass function handles to the sharded instances directly, so invoke_on_cache is no longer needed.
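Roughly, the per-shard timer arrangement described above can be pictured like the sketch below; the member names, the garbage_collect() helper, and the start()/stop() wiring are assumptions for illustration, not the actual commit.

// Sketch of a per-shard eviction timer using plain Seastar primitives.
// garbage_collect() and the member names are assumptions, not Redpanda code.
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/lowres_clock.hh>
#include <seastar/core/timer.hh>
#include <chrono>
#include <exception>

namespace ss = seastar;

class kafka_client_cache {
public:
    explicit kafka_client_cache(std::chrono::milliseconds gc_period)
      : _gc_period(gc_period) {
        // Each shard owns its own timer, so evictions never hop cores or
        // race on a timer shared across shards.
        _gc_timer.set_callback([this] {
            // Fire-and-forget clean-up, tracked by the gate so stop() can
            // drain it; errors are swallowed in this sketch.
            (void)ss::with_gate(_gate, [this] { return garbage_collect(); })
              .handle_exception([](std::exception_ptr) {});
        });
    }

    ss::future<> start() {
        _gc_timer.arm_periodic(_gc_period);
        return ss::make_ready_future<>();
    }

    ss::future<> stop() {
        _gc_timer.cancel();
        return _gate.close();
    }

private:
    // Assumed helper: evicts stale clients owned by this shard.
    ss::future<> garbage_collect();

    std::chrono::milliseconds _gc_period;
    ss::timer<ss::lowres_clock> _gc_timer;
    ss::gate _gate;
};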
The wrapper is obsolete now since the auth_ctx_server calls the sharded client cache directly.
Force-pushed from ee98958 to a232c79.
Closing now since #6781 has the updated code.
Cover letter
Draft of changes proposed in #6781
Backport Required
UX changes
Release notes