Skip to content

Commit

Permalink
compression/async_stream_zstd: switch to using object pool
Browse files Browse the repository at this point in the history
  • Loading branch information
ballard26 committed Dec 1, 2022
1 parent 2922bd5 commit 13223b6
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 44 deletions.
122 changes: 94 additions & 28 deletions src/v/compression/async_stream_zstd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ static void throw_if_error(size_t rc) {

static constexpr size_t default_decompression_size = 2_MiB;
static constexpr int default_compression_level = 3;
static constexpr int default_contexts = 2;

static thread_local std::unique_ptr<async_stream_zstd> zstd_instance;

Expand All @@ -57,7 +58,8 @@ void initialize_async_stream_zstd(size_t decompression_size) {
return;
}

zstd_instance = std::make_unique<async_stream_zstd>(decompression_size);
zstd_instance = std::make_unique<async_stream_zstd>(
decompression_size, default_contexts);
}

async_stream_zstd& async_stream_zstd_instance() {
Expand All @@ -68,48 +70,103 @@ async_stream_zstd& async_stream_zstd_instance() {
return *zstd_instance;
}

async_stream_zstd::async_stream_zstd(size_t decompression_size) {
_decompression_size = decompression_size;

_d_buffer = ss::temporary_buffer<char>(ZSTD_DStreamOutSize());
async_stream_zstd::static_cctx::static_cctx(int compression_level) {
_c_buffer = ss::temporary_buffer<char>(ZSTD_CStreamOutSize());

auto d_workspace_size = ZSTD_estimateDStreamSize(decompression_size);
auto c_workspace_size = ZSTD_estimateCStreamSize(default_compression_level);

// zstd requires alignment on both of the buffer below.
_d_workspace = ss::allocate_aligned_buffer<char>(d_workspace_size, 8);
auto c_workspace_size = ZSTD_estimateCStreamSize(compression_level);
// zstd requires alignment for the buffer below.
_c_workspace = ss::allocate_aligned_buffer<char>(c_workspace_size, 8);

vassert(
_c_workspace,
"Failed to allocate zstd workspace of size {}",
c_workspace_size);

_compress_ctx = ZSTD_initStaticCCtx(_c_workspace.get(), c_workspace_size);

throw_if_error(ZSTD_CCtx_setParameter(
_compress_ctx, ZSTD_c_compressionLevel, default_compression_level));
}

async_stream_zstd::static_cctx::static_cctx(static_cctx&& ctx) noexcept
: _compress_ctx(ctx._compress_ctx)
, _c_workspace(std::move(ctx._c_workspace))
, _c_buffer(std::move(ctx._c_buffer)) {
ctx._compress_ctx = nullptr;
}

async_stream_zstd::static_cctx&
async_stream_zstd::static_cctx::operator=(static_cctx&& x) noexcept {
if (this != &x) {
_compress_ctx = x._compress_ctx;
_c_workspace = std::move(x._c_workspace);
_c_buffer = std::move(x._c_buffer);

x._compress_ctx = nullptr;
}

return *this;
}

async_stream_zstd::static_dctx::static_dctx(size_t decompression_size) {
_d_buffer = ss::temporary_buffer<char>(ZSTD_DStreamOutSize());
auto d_workspace_size = ZSTD_estimateDStreamSize(decompression_size);
// zstd requires alignment for the buffer below.
_d_workspace = ss::allocate_aligned_buffer<char>(d_workspace_size, 8);

vassert(
_d_workspace,
"Failed to allocate zstd workspace of size {}",
d_workspace_size);

_compress_ctx = ZSTD_initStaticCCtx(_c_workspace.get(), c_workspace_size);
_decompress_ctx = ZSTD_initStaticDCtx(_d_workspace.get(), d_workspace_size);
}

throw_if_error(ZSTD_CCtx_setParameter(
_compress_ctx, ZSTD_c_compressionLevel, default_compression_level));
async_stream_zstd::static_dctx::static_dctx(static_dctx&& ctx) noexcept
: _decompress_ctx(ctx._decompress_ctx)
, _d_workspace(std::move(ctx._d_workspace))
, _d_buffer(std::move(ctx._d_buffer)) {
ctx._decompress_ctx = nullptr;
}

async_stream_zstd::static_dctx&
async_stream_zstd::static_dctx::operator=(static_dctx&& x) noexcept {
if (this != &x) {
_decompress_ctx = x._decompress_ctx;
_d_workspace = std::move(x._d_workspace);
_d_buffer = std::move(x._d_buffer);

x._decompress_ctx = nullptr;
}

return *this;
}

async_stream_zstd::async_stream_zstd(
size_t decompression_size, int number_of_contexts) {
_decompression_size = decompression_size;

for (auto i = 0; i < number_of_contexts; i++) {
_compression_ctx_pool.release_object(
static_cctx(default_compression_level));
_decompression_ctx_pool.release_object(static_dctx(decompression_size));
}
}

size_t async_stream_zstd::decompression_size() const {
return _decompression_size;
}

ss::future<iobuf> async_stream_zstd::compress(iobuf i_buf) {
auto lock = co_await _compress_mtx.get_units();
ss::abort_source as;
auto ctx_scope = co_await _compression_ctx_pool.allocate_scoped_object(as);
auto& ctx = *ctx_scope;

// reset compressor ctx
ZSTD_CCtx_reset(_compress_ctx, ZSTD_reset_session_only);
ZSTD_CCtx_reset(ctx._compress_ctx, ZSTD_reset_session_only);

// NOTE: always enable content size. **decompression** depends on this
throw_if_error(
ZSTD_CCtx_setPledgedSrcSize(_compress_ctx, i_buf.size_bytes()));
ZSTD_CCtx_setPledgedSrcSize(ctx._compress_ctx, i_buf.size_bytes()));

iobuf ret_buf;

Expand All @@ -119,10 +176,12 @@ ss::future<iobuf> async_stream_zstd::compress(iobuf i_buf) {

do {
ZSTD_outBuffer out = {
.dst = _c_buffer.get_write(), .size = _c_buffer.size(), .pos = 0};
remaining = ZSTD_endStream(_compress_ctx, &out);
.dst = ctx._c_buffer.get_write(),
.size = ctx._c_buffer.size(),
.pos = 0};
remaining = ZSTD_endStream(ctx._compress_ctx, &out);
throw_if_error(remaining);
ret_buf.append(_c_buffer.get(), out.pos);
ret_buf.append(ctx._c_buffer.get(), out.pos);
} while (remaining > 0);

co_return std::move(ret_buf);
Expand All @@ -138,13 +197,15 @@ ss::future<iobuf> async_stream_zstd::compress(iobuf i_buf) {
int finished;
do {
ZSTD_outBuffer out = {
.dst = _c_buffer.get_write(), .size = _c_buffer.size(), .pos = 0};
.dst = ctx._c_buffer.get_write(),
.size = ctx._c_buffer.size(),
.pos = 0};

const auto remaining = ZSTD_compressStream2(
_compress_ctx, &out, &in, mode);
ctx._compress_ctx, &out, &in, mode);
throw_if_error(remaining);

ret_buf.append(_c_buffer.get(), out.pos);
ret_buf.append(ctx._c_buffer.get(), out.pos);
finished = (mode == ZSTD_e_end) ? (remaining == 0)
: (in.pos == in.size);

Expand All @@ -161,9 +222,12 @@ ss::future<iobuf> async_stream_zstd::uncompress(iobuf i_buf) {
"Asked to stream_zstd::uncompress empty buffer");
}

auto lock = co_await _decompress_mtx.get_units();
ss::abort_source as;
auto ctx_scope = co_await _decompression_ctx_pool.allocate_scoped_object(
as);
auto& ctx = *ctx_scope;

ZSTD_DCtx_reset(_decompress_ctx, ZSTD_reset_session_only);
ZSTD_DCtx_reset(ctx._decompress_ctx, ZSTD_reset_session_only);

iobuf ret_buf;
size_t last_zstd_ret = 0;
Expand All @@ -173,13 +237,15 @@ ss::future<iobuf> async_stream_zstd::uncompress(iobuf i_buf) {

while (in.pos < in.size) {
ZSTD_outBuffer out = {
.dst = _d_buffer.get_write(), .size = _d_buffer.size(), .pos = 0};
.dst = ctx._d_buffer.get_write(),
.size = ctx._d_buffer.size(),
.pos = 0};

const auto zstd_ret = ZSTD_decompressStream(
_decompress_ctx, &out, &in);
ctx._decompress_ctx, &out, &in);
throw_if_error(zstd_ret);

ret_buf.append(_d_buffer.get(), out.pos);
ret_buf.append(ctx._d_buffer.get(), out.pos);
last_zstd_ret = zstd_ret;

co_await ss::coroutine::maybe_yield();
Expand Down
34 changes: 23 additions & 11 deletions src/v/compression/async_stream_zstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
*/

#pragma once

#include "bytes/iobuf.h"
#include "utils/mutex.h"
#include "utils/object_pool.h"

#include <seastar/core/aligned_buffer.hh>

Expand All @@ -29,27 +30,38 @@ namespace compression {
class async_stream_zstd {
public:
async_stream_zstd() = delete;
async_stream_zstd(size_t);
async_stream_zstd(size_t, int);

ss::future<iobuf> compress(iobuf);
ss::future<iobuf> uncompress(iobuf);

size_t decompression_size() const;

private:
size_t _decompression_size;
struct static_cctx {
static_cctx(int);
static_cctx(static_cctx&&) noexcept;
static_cctx& operator=(static_cctx&& x) noexcept;

mutex _compress_mtx;
mutex _decompress_mtx;
ZSTD_CCtx* _compress_ctx;
std::unique_ptr<char[], ss::free_deleter> _c_workspace;
ss::temporary_buffer<char> _c_buffer;
};

ZSTD_CCtx* _compress_ctx;
ZSTD_DCtx* _decompress_ctx;
struct static_dctx {
static_dctx(size_t);
static_dctx(static_dctx&&) noexcept;
static_dctx& operator=(static_dctx&& x) noexcept;

std::unique_ptr<char[], ss::free_deleter> _d_workspace;
std::unique_ptr<char[], ss::free_deleter> _c_workspace;
ZSTD_DCtx* _decompress_ctx;
std::unique_ptr<char[], ss::free_deleter> _d_workspace;
ss::temporary_buffer<char> _d_buffer;
};

size_t _decompression_size;

ss::temporary_buffer<char> _d_buffer;
ss::temporary_buffer<char> _c_buffer;
object_pool<static_cctx> _compression_ctx_pool;
object_pool<static_dctx> _decompression_ctx_pool;
};

void initialize_async_stream_zstd(size_t);
Expand Down
4 changes: 2 additions & 2 deletions src/v/compression/tests/zstd_stream_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ inline void uncompress_test(size_t data_size) {

inline ss::future<> async_compress_test(size_t data_size) {
auto o = gen(data_size);
compression::async_stream_zstd fn(2_MiB);
compression::async_stream_zstd fn(2_MiB, 1);

perf_tests::start_measuring_time();
perf_tests::do_not_optimize(co_await fn.compress(std::move(o)));
perf_tests::stop_measuring_time();
}

inline ss::future<> async_uncompress_test(size_t data_size) {
compression::async_stream_zstd fn(2_MiB);
compression::async_stream_zstd fn(2_MiB, 1);
auto o = co_await fn.compress(gen(data_size));

perf_tests::start_measuring_time();
Expand Down
6 changes: 3 additions & 3 deletions src/v/compression/tests/zstd_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ SEASTAR_THREAD_TEST_CASE(stream_zstd_test) {
}

SEASTAR_THREAD_TEST_CASE(async_stream_zstd_test) {
compression::async_stream_zstd fn(default_decompression_size);
compression::async_stream_zstd fn(default_decompression_size, 1);
auto test_sizes = get_test_sizes();
for (size_t i : sizes) {
iobuf buf = gen(i);
Expand Down Expand Up @@ -131,7 +131,7 @@ SEASTAR_THREAD_TEST_CASE(interleaved_async_stream_zstd_test) {
}

SEASTAR_THREAD_TEST_CASE(async_stream_to_stream_test) {
compression::async_stream_zstd fn(default_decompression_size);
compression::async_stream_zstd fn(default_decompression_size, 1);
compression::stream_zstd fn_s;
auto test_sizes = get_test_sizes();
for (size_t i : sizes) {
Expand All @@ -145,7 +145,7 @@ SEASTAR_THREAD_TEST_CASE(async_stream_to_stream_test) {
}

SEASTAR_THREAD_TEST_CASE(stream_to_async_stream_test) {
compression::async_stream_zstd fn(default_decompression_size);
compression::async_stream_zstd fn(default_decompression_size, 1);
compression::stream_zstd fn_s;
auto test_sizes = get_test_sizes();
for (size_t i : sizes) {
Expand Down

0 comments on commit 13223b6

Please sign in to comment.