Add an asynchronous zstd stream compression class #5565
Conversation
Force-pushed from ad24b22 to d7487ed
Is the gist to use thread-local storage of 2 MB as the scratch space? Can the cover letter have more deets?
Updated the cover letter to explain what's going on in the PR. Sorry for the delay! This new class will allocate about 4 MB of scratch space for compression/decompression on each thread at the start of RP: 2 MB for decompression and 2 MB for compression. By doing this we can guarantee that Zstd won't internally allocate any more space during any compression/decompression operations.
```cpp
SEASTAR_THREAD_TEST_CASE(async_stream_zstd_test) {
    compression::async_stream_zstd fn;
    auto test_sizes = get_test_sizes();
    for (size_t i : test_sizes) {
        iobuf buf = gen(i);

        auto cbuf = fn.compress(buf.share(0, i)).get();
        auto dbuf = fn.uncompress(std::move(cbuf)).get();

        BOOST_CHECK_EQUAL(dbuf, buf);
    }
}
```
How about also expanding zstd_stream_bench? It would be nice, too, if it were set up to trigger a reactor stall without the async version (now that seastar has a 25 ms threshold it should be easier to trigger). Seeing the difference in performance will be interesting.
Will do
src/v/redpanda/application.cc
Outdated
```cpp
ss::smp::invoke_on_all([] {
    compression::async_stream_zstd::init_workspace(
      config::shard_local_cfg().zstd_decompress_workspace_bytes());
}).get0();
```
is this a workspace that is now separate from the non-async version? should one be replaced or can they share? if they are separate what's the future of them in terms of being combined?
It is separate since they can't be shared without a mutex. Though I suppose we could have a compress/decompress mutex both classes share. The hope is that we can replace `stream_zstd` with `async_stream_zstd` in `v/kafka`, at which point we can remove `stream_zstd`.
The hope is that we can replace `stream_zstd` with `async_stream_zstd` in `v/kafka`, at which point we can remove `stream_zstd`.
yeh i think this makes sense as the preferred path forward. assuming we don't find any issues with performance we can probably make that switch soon
Force-pushed from 78b6baf to 59dbe87
Results of performance testing vs the existing `stream_zstd` class are posted below.
@ballard26 we should make this the default in our internal RPC
Force-pushed from 59dbe87 to 773ae30
Force-pushed from 773ae30 to 228345d
Force-pushed from acb8c62 to ee1fc49
Force-pushed from db5bea6 to 3e6512f
…end` Adding additional scheduling points in `dispatch_send` to account for the newly futurized `as_scattered` results in a race condition where messages are sent out of order. As such the easiest solution is to call `as_scattered` outside of `dispatch_send` where adding a scheduling point won't result in this race condition. Then pass the result to `dispatch_send`.
Results of performance testing vs the existing `stream_zstd` class are as follows:

```
test                               iterations  median     mad        min        max        allocs    tasks    inst
streaming_zstd_1mb.compress        6385        130.290us  1.513us    126.861us  137.173us  16.000    0.000    0.0
streaming_zstd_1mb.uncompress      4185        78.486us   2.100us    75.342us   80.586us   108.616   0.000    0.0
streaming_zstd_10mb.compress       653         1.118ms    24.680us   1.041ms    1.142ms    942.830   0.000    0.0
streaming_zstd_10mb.uncompress     429         844.631us  80.656us   760.340us  976.986us  2095.301  0.000    0.0
async_stream_zstd.1mb_compress     7616        100.022us  262.374ns  99.547us   100.598us  148.278   2.082    0.0
async_stream_zstd.1mb_uncompress   4804        73.661us   108.821ns  73.542us   73.819us   312.587   6.156    0.0
async_stream_zstd.10mb_compress    772         979.403us  1.092us    978.266us  985.029us  2176.762  56.239   0.0
async_stream_zstd.10mb_uncompress  490         728.405us  720.304ns  726.690us  731.632us  4089.282  109.756  0.0
```
Force-pushed from 3e6512f to 13223b6
LGTM, thanks for all the changes!

Is this worth backporting?

Might be a lot of effort (it's a large-ish PR and might not be an easy backport), and I would honestly prefer we recommend users upgrade instead, @BenPope

I understand. This isn't in any released version, so that advice will have to wait until at least v23.1.1.

Ah, I see what you mean; it isn't backported even to v22.3.x. Yeah, that we might want to do. @ballard26 ?

/backport v22.3.x
Cover letter

This PR adds a new Zstd streaming interface, `async_stream_zstd`, that differs from the existing `stream_zstd` in a few ways.

Firstly, all compression/decompression methods return futures and allow seastar to interrupt them if they are close to hitting the reactor stall timeout. This interface change is the main justification for a new class instead of modifying the existing one, as a lot of non-futurized code in `v/kafka` expects compress/uncompress to be a blocking call that returns the actual result right away. Hopefully we'll be able to migrate users of `stream_zstd` to `async_stream_zstd` in time.

Secondly, in `compress`, `stream_zstd` currently allocates one large contiguous buffer for the entire expected size of the compressed data block; this has been shown to cause OOM errors when the data block is too large (see #5566). Instead of allocating a large contiguous buffer for the compression output, `async_stream_zstd` emits one Zstd block at a time (128 KiB max size) and appends each block to an iobuf. This change should probably be ported over to `stream_zstd` as well, in case any kafka API messages prove to be large enough to cause OOM errors.

Fixes #5116, #5566
Release notes