chore: event count throttle for squashed commands #4924
base: main
Conversation
Signed-off-by: kostas <kostas@dragonflydb.io>
src/server/multi_command_squasher.cc
Outdated
thread_local size_t MultiCommandSquasher::throttle_size_limit_ =
    absl::GetFlag(FLAGS_throttle_squashed);

thread_local util::fb2::EventCount MultiCommandSquasher::ec_;

MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
This is used not only from the async fiber but also directly from the connection. If we preempt, the connection will also "freeze". I guess this is fine, just mentioning it here for completeness.
There are 3 call sites of this, and all of them should be OK if we preempt from these flows.
tests/dragonfly/memory_test.py
Outdated
await cl.execute_command("exec")

# With the current approach this will overshoot
# await client.execute_command("multi")
I wish we handled this case as well.
What is the difference? Why does it not handle this case?
src/server/multi_command_squasher.h
Outdated
@@ -94,6 +104,9 @@ class MultiCommandSquasher {

  // we increase size in one thread and decrease in another
  static atomic_uint64_t current_reply_size_;
  static thread_local size_t throttle_size_limit_;
  // Used to throttle when memory is tight
  static thread_local util::fb2::EventCount ec_;
We need this to avoid ThisFiber::Yield / ThisFiber::SleepFor in a while(true) loop.
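For context, a minimal sketch of the pattern being discussed, assuming helio's util::fb2::EventCount API (await/notifyAll); the globals and names below are illustrative, not the PR's code:

#include <atomic>
#include <cstdint>

#include "util/fibers/synchronization.h"

// Hypothetical globals for illustration only.
std::atomic<uint64_t> current_reply_size{0};
util::fb2::EventCount ec;
constexpr uint64_t kReplyLimit = 128u << 20;  // assumed 128 MiB budget

void ThrottleIfNeeded() {
  // The fiber preempts until the predicate holds; no Yield()/SleepFor() polling loop.
  ec.await([] { return current_reply_size.load(std::memory_order_relaxed) <= kReplyLimit; });
}

void OnReplyReleased(uint64_t bytes) {
  current_reply_size.fetch_sub(bytes, std::memory_order_relaxed);
  ec.notifyAll();  // wake any fibers throttled in ThrottleIfNeeded()
}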
since it's thread local, it's more efficient to use NoOpLock together with CondVarAny
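A rough sketch of that suggestion, assuming helio's util::fb2::CondVarAny and util::fb2::NoOpLock (the same primitives the connection code uses later in this PR); the counter and function names are illustrative:

#include <cstddef>

#include "util/fibers/synchronization.h"

// Illustrative per-thread state; not the actual member names.
thread_local size_t tl_reply_bytes = 0;
thread_local util::fb2::CondVarAny tl_cnd;
constexpr size_t kReplyLimit = 128u << 20;

void ThrottleOnThisThread() {
  util::fb2::NoOpLock noop_lk;  // waiter and notifier share a thread, so no real mutex is needed
  tl_cnd.wait(noop_lk, [] { return tl_reply_bytes <= kReplyLimit; });
}

void ReleaseOnThisThread(size_t bytes) {
  tl_reply_bytes -= bytes;
  tl_cnd.notify_one();  // cheap same-thread wakeup
}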
@romange nice!
Actually, I added a bug here:
static atomic_uint64_t current_reply_size_;
current_reply_size_ is not thread local. So what can happen is:
Core 0 -> starts multi/exec
Core 1 -> starts multi/exec but needs to throttle, so it goes to sleep waiting on the thread-local cond variable
Core 0 -> is done, notifies its thread-local
Core 1 -> the fiber never wakes up even though we decremented current_reply_size_.
Since current_reply_size_ is global, so should ec_ be.
P.S. not very happy with this extra synchronization, but we only pay it when we are under memory pressure.
it should be thread local.
@adiholden pinging for an early discussion here
src/server/multi_command_squasher.cc
Outdated
@@ -15,6 +16,8 @@
#include "server/transaction.h"
#include "server/tx_base.h"

ABSL_FLAG(size_t, throttle_squashed, 0, "");
@adiholden I will adjust as we said f2f. Looking for some early feedback based on our discussion
src/server/multi_command_squasher.cc
Outdated
@@ -63,6 +66,10 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
}  // namespace

atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
thread_local size_t MultiCommandSquasher::throttle_size_limit_ =
    absl::GetFlag(FLAGS_throttle_squashed);
As discussed this morning, multiply by the thread count. The limit should be per thread, while current_reply_size_ is a global counter.
Yes I know, I even wrote a comment above that I will follow up with this 😄
I wanted to know if you have anything else to add 😄
src/server/multi_command_squasher.cc
Outdated
@@ -63,6 +66,9 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
}  // namespace

atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
thread_local size_t MultiCommandSquasher::throttle_size_limit_ =
I believe we should multiply throttle_squashed by the number of IO threads, not by the shard number.
We should not do this at all. The limit should be per thread. And in general, it's not well defined to initialize a thread local by using another thread local that is initialized to nullptr.
src/server/multi_command_squasher.h
Outdated
@@ -37,6 +38,15 @@ class MultiCommandSquasher {
    return current_reply_size_.load(std::memory_order_relaxed);
  }

  static bool IsMultiCommandSquasherOverLimit() {
maybe rename to IsReplySizeOverLimit?
src/server/multi_command_squasher.h
Outdated
// Used to throttle when memory is tight
static util::fb2::EventCount ec_;

static thread_local size_t throttle_size_limit_;
maybe reply_size_limit_ ?
src/server/multi_command_squasher.cc
Outdated
@@ -15,6 +16,8 @@
#include "server/transaction.h"
#include "server/tx_base.h"

ABSL_FLAG(size_t, throttle_squashed, 0, "");
Maybe squashed_reply_size_limit?
Add a flag description.
Also, I think we should have a default limit here, maybe 128_MB?
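A sketch of what the suggested flag could look like; the name, default, and description below follow this comment's suggestion and are not the merged definition:

#include <cstddef>

#include "absl/flags/flag.h"

ABSL_FLAG(size_t, squashed_reply_size_limit, 128u << 20 /* 128 MiB */,
          "Maximum accumulated reply size of squashed commands before the "
          "squashing flow is throttled");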
tests/dragonfly/memory_test.py
Outdated
# At any point we should not cross this limit
assert df.rss < 1_500_000_000
cl = df.client()
await cl.execute_command("multi")
I see that the flow you are testing is the multi/exec flow, which I did not think about. When I suggested this throttling I was thinking about the pipeline flow.
Reviewing the multi/exec flow now, I am not 100% sure about applying this logic there: when you await in the code to wait for the size to decrease, we have already scheduled the transaction, and I am not sure whether this can lead to a deadlock in some cases.
src/server/server_state.h
Outdated
@@ -270,6 +270,10 @@ class ServerState {  // public struct - to allow initialization.

  bool ShouldLogSlowCmd(unsigned latency_usec) const;

  size_t GetTotalShards() const {
Not needed - you can use shard_set->size() everywhere.
src/server/multi_command_squasher.cc
Outdated
@@ -215,6 +222,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
  if (order_.empty())
    return true;

  MultiCommandSquasher::ec_.await(
All our current approaches to limiting memory are "per-thread"; this is consistent and works nicely with shared-nothing. What is the reason for not using per-thread limits? In addition, we already have per-thread throttling inside the dragonfly_connection code, see IsPipelineBufferOverLimit. Did you consider piggybacking on that mechanism instead?
As long as current_reply_size_ is global, I don't see how we can do this per thread.
src/server/multi_command_squasher.cc
Outdated
for (auto idx : order_) {
  auto& replies = sharded_[idx].replies;
  CHECK(!replies.empty());

  aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(replies.back());

  current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed);
I said nothing when current_reply_size_ was added; it was a mistake. I do not want anyone introducing global state into the Dragonfly codebase.
@BorysTheDev FYI
But @romange, we discussed this when current_reply_size_ was added. Because the multi command squasher adds replies in different threads, you said it makes sense.
Would something like this work?
https://github.com/dragonflydb/dragonfly/compare/RemoveAtomic?expand=1
@romange The change in your branch linked above counts the reply size after we have executed all the squashed commands. So it does use the thread-local approach correctly, but what we expose to metrics is not accurate: while the capturing reply builders grow, we do not expose the size until we finish with all the squashed commands. Applying such logic will delay the throttling on reply size; we will still throttle, but only some time after we actually cross the threshold. I guess we can do this change.
There is no "correct" solution, because I can show you a scenario where a single central atomic won't work either:
we throttle before we send commands to shards, but maybe there are tons of squashed commands in flight that have not filled their replies yet, so you let the next command pass and only then the reply buffer increases.
I would rather have a less accurate metric than have all our threads contend on atomics and now on a single condvar; this kills performance. I won't be surprised if even now squashing performance is worse because of the "reply buffer size" atomic being hammered by multiple threads.
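To make the trade-off concrete, here is a loose sketch (illustrative names, not the code in the linked branch) of the deferred per-thread accounting being discussed: sizes are summed into a plain thread-local counter only after the squashed batch completes, so the hot path touches no shared atomics, at the cost of the exposed value lagging the real peak:

#include <cstddef>
#include <vector>

thread_local size_t tl_squashed_reply_bytes = 0;  // hypothetical per-thread stat

void AccountBatchReplies(const std::vector<size_t>& reply_sizes) {
  size_t total = 0;
  for (size_t sz : reply_sizes)
    total += sz;
  // Updated once per batch, after execution: cheap, but the exposed value
  // trails the actual in-flight reply size while the batch is running.
  tl_squashed_reply_bytes += total;
}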
src/server/multi_command_squasher.cc
Outdated
// This is not true for `multi/exec` which uses `Execute()` but locks ahead before it
// calls `ScheduleSingleHop` below.
// TODO: Investigate the side effects of allowing `lock ahead` mode.
if (opts_.is_mult_non_atomic) {
I don't remember if we discussed this - why do you need to throttle here instead of using the same mechanism we have today for pipeline backpressure, so that we also throttle when squashing_current_reply_size is above the limit?
That's a good comment. We discussed this; back then reply_size was defined in multi_command_squasher, which is part of /server and not facade, so the connection had no access to it. After Roman's changes, however, this moved to facade_types, so now facade has access to it. I moved everything to dragonfly_connection. However, I chose not to throttle on the queue backpressure (within DispatchSingle) and to do it a little later within AsyncFiber because:
- throttling on reply size should only be relevant when we try to squash the pipeline. I don't want to introduce delays for commands that execute standalone, as I don't believe they will increase RSS that much for the workloads we try to "fix"
- throttling before we dispatch async to the queue is not a great idea because the async fiber might sleep between iterations, and in between the dispatch queue will aggregate a bunch of commands. Once the async fiber wakes up, it will try to squash the pipeline, bypassing the protective mechanism we just added (the reply size might already be over the limit, but we don't know it because we made that decision at the dispatch level and not at the async fiber level)
@@ -2075,6 +2098,16 @@ void Connection::DecrNumConns() {
  --stats_->num_conns_other;
}

bool Connection::IsReplySizeOverLimit() const {
  std::atomic<size_t>& reply_sz = tl_facade_stats->reply_stats.squashing_current_reply_size;
.load(memory_order_relaxed)
Oh, why did you split this into 2 lines?
.load(memory_order_relaxed)
We should synchronize with acquire/release semantics; otherwise the load might observe an older value in the modification order of the atomic variable.
I split it into two lines because otherwise the expression was too long.
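A standalone illustration of the ordering question (not Dragonfly code): an acquire load pairs with a release decrement done on another thread, so the reader also sees the writes that preceded the decrement, while a relaxed load only guarantees atomicity of the access itself:

#include <atomic>
#include <cstddef>

std::atomic<size_t> reply_size{0};  // stand-in for squashing_current_reply_size

void OnRepliesFlushed(size_t bytes) {
  // Release: writes done before this decrement become visible to acquire loaders.
  reply_size.fetch_sub(bytes, std::memory_order_release);
}

bool IsOverLimit(size_t limit) {
  size_t current = reply_size.load(std::memory_order_acquire);
  return limit != 0 && current > limit;
}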
std::atomic<size_t>& reply_sz = tl_facade_stats->reply_stats.squashing_current_reply_size;
size_t current = reply_sz.load(std::memory_order_acquire);
const bool over_limit = reply_size_limit != 0 && current > 0 && current > reply_size_limit;
LOG(INFO) << "current: " << current << "/" << reply_size_limit;
A LOG(INFO) here will flood the log file.
It was accidental; I changed that when I wanted to debug something. Will fix.
size_t current = reply_sz.load(std::memory_order_acquire);
const bool over_limit = reply_size_limit != 0 && current > 0 && current > reply_size_limit;
LOG(INFO) << "current: " << current << "/" << reply_size_limit;
VLOG_IF(2, over_limit) << "MultiCommandSquasher overlimit: " << current << "/"
Actually, if we are over the limit we want to see it in the logs: make this a warning and print it at most once per second.
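One possible shape for that, sketched with a hand-rolled once-per-second guard (the actual PR may use a different helper; the logging include is the glog-style one used elsewhere in the codebase):

#include <chrono>
#include <cstddef>

#include "base/logging.h"  // glog-style LOG macros

void WarnIfOverLimit(bool over_limit, size_t current, size_t limit) {
  using Clock = std::chrono::steady_clock;
  static thread_local Clock::time_point last_warn;  // default-constructed to the epoch
  if (!over_limit)
    return;
  Clock::time_point now = Clock::now();
  if (now - last_warn >= std::chrono::seconds(1)) {
    last_warn = now;
    LOG(WARNING) << "Squashed reply size over limit: " << current << "/" << limit;
  }
}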
@@ -2105,7 +2138,7 @@ void Connection::EnsureMemoryBudget(unsigned tid) {

 Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
                              uint32_t client_id)
-    : ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} {
+    : ptr_{std::move(ptr)}, thread_id_{thread_id}, client_id_{client_id} {
?
Deep copying a shared_ptr increments the atomic reference counter, so we pay for an atomic operation. Moving avoids that by transferring the control block instead. I just saw the misuse and used move 😄
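A tiny standalone example of the point (WeakRefLike is a hypothetical stand-in, not Connection::WeakRef): taking the shared_ptr by value and moving it into the member transfers the control block without bumping the reference count, whereas copying would cost one extra atomic increment.

#include <memory>
#include <utility>

struct Connection {};

class WeakRefLike {
 public:
  explicit WeakRefLike(std::shared_ptr<Connection> ptr)
      : ptr_{std::move(ptr)} {}  // move: no atomic ref-count increment here

 private:
  std::shared_ptr<Connection> ptr_;
};

int main() {
  auto conn = std::make_shared<Connection>();
  WeakRefLike ref{conn};  // one increment when copying into the parameter, none in the ctor
  return 0;
}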
@@ -14,6 +14,7 @@

#include "base/iterator.h"
#include "facade/op_status.h"
#include "util/fibers/synchronization.h"
needed?
nope
// We need to first set async_dispatch before we preempt. Otherwise, when the AsyncFiber
// wakes up, sync_dispatch might be true, violating the precondition of this flow
// (when we block earlier in the body of AsyncFiber at cnd_.wait(noop_lk, [this])...)
fb2::NoOpLock noop;
Still, I don't see you using the QueueBackpressure class today. Why not use the same class to throttle on the squashed pipeline reply size?
Yes, I did that on purpose:
Throttling before we dispatch async to the queue is not a great idea because the async fiber might sleep between iterations, and in between the dispatch queue will aggregate a bunch of commands. Once the async fiber wakes up, it will try to squash the pipeline, bypassing the protective mechanism we just added (the reply size might already be over the limit, but we don't know it because we made that decision at the dispatch level and not at the async fiber level).
see #4924 (comment)
Can you please describe a scenario in which squashing_current_reply_size can be greater than zero at this point in time?
Yes, squashing_current_reply_size is an atomic thread-local variable that is shared and can be incremented from different threads. Different connections from thread 1 can schedule a series of squashed pipelines on thread 2 (and will increment the same squashing_current_reply_size). When an async fiber on thread 1 wakes up after its squashed pipeline finished executing (and decremented squashing_current_reply_size), it will try to squash another one, but in parallel the same variable got incremented on thread 2 (because of another tx) and potentially crossed the hard limit.
However, your question raises an interesting point. Is it reasonable to expect that squashing_current_reply_size is usually close to 0 even when multiple connections dispatch squashed pipelines? I would say maybe, because the rates at which we increment and decrement this variable on separate threads should be close. By the time we decrement it on thread 1, another tx increments it on thread 2, and the same cycle repeats in such a way that the variable stays close to 0.
OK, you are right, squashing_current_reply_size can be higher than the threshold due to other connections on this thread. In that case I suggest that instead of throttling the connection, we just avoid calling SquashPipeline and choose the usual route of sending a single command (lines 1644-1647).
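A sketch of how that alternative could look; the function and parameter names are illustrative, not the actual connection.cc code:

#include <cstddef>

// Decide whether to squash the pipeline. If the squashed reply size already
// exceeds the limit, report false so the caller drains the pipeline with the
// regular single-command path instead of blocking on a condition variable.
bool ShouldSquashPipeline(size_t pipeline_len, size_t min_batch_for_squash,
                          size_t squashed_reply_size, size_t reply_size_limit) {
  if (reply_size_limit != 0 && squashed_reply_size > reply_size_limit)
    return false;
  return pipeline_len >= min_batch_for_squash;
}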
Yeah, and that would remove the throttling/extra synchronization altogether +1
Throttle/preempt flows that use the multi command squasher when the crb (capturing reply builder) size crosses the limit.