Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: apply limits to segment.bytes #6492

Merged
merged 2 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,31 @@ configuration::configuration()
: log_segment_size(
*this,
"log_segment_size",
"How large in bytes should each log segment be (default 1G)",
"Default log segment size in bytes for topics which do not set "
"segment.bytes",
{.needs_restart = needs_restart::no,
.example = "2147483648",
.visibility = visibility::tunable},
1_GiB,
{.min = 1_MiB})
, log_segment_size_min(
*this,
"log_segment_size_min",
"Lower bound on topic segment.bytes: lower values will be clamped to "
"this limit",
{.needs_restart = needs_restart::no,
.example = "16777216",
.visibility = visibility::tunable},
std::nullopt)
, log_segment_size_max(
*this,
"log_segment_size_max",
"Upper bound on topic segment.bytes: higher values will be clamped to "
"this limit",
{.needs_restart = needs_restart::no,
.example = "268435456",
.visibility = visibility::tunable},
std::nullopt)
, compacted_log_segment_size(
*this,
"compacted_log_segment_size",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace config {
struct configuration final : public config_store {
// WAL
bounded_property<uint64_t> log_segment_size;
property<std::optional<uint64_t>> log_segment_size_min;
property<std::optional<uint64_t>> log_segment_size_max;
bounded_property<uint64_t> compacted_log_segment_size;
property<std::chrono::milliseconds> readers_cache_eviction_timeout_ms;
// Network
Expand Down
28 changes: 23 additions & 5 deletions src/v/storage/disk_log_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ namespace storage {
disk_log_impl::disk_log_impl(
ntp_config cfg, log_manager& manager, segment_set segs, kvstore& kvstore)
: log::impl(std::move(cfg))
, _segment_size_jitter(storage::internal::random_jitter())
, _manager(manager)
, _segment_size_jitter(
internal::random_jitter(_manager.config().segment_size_jitter))
, _segs(std::move(segs))
, _kvstore(kvstore)
, _start_offset(read_start_offset())
Expand Down Expand Up @@ -828,12 +829,29 @@ ss::future<> disk_log_impl::flush() {

size_t disk_log_impl::max_segment_size() const {
// override for segment size
size_t result;
if (config().has_overrides() && config().get_overrides().segment_size) {
return *config().get_overrides().segment_size;
result = *config().get_overrides().segment_size;
} else {
// no overrides use defaults
result = config().is_compacted()
? _manager.config().compacted_segment_size()
: _manager.config().max_segment_size();
}

// Clamp to safety limits on segment sizes, in case the
// property was set without proper validation (e.g. on
// an older version or before limits were set)
auto min_limit = config::shard_local_cfg().log_segment_size_min();
auto max_limit = config::shard_local_cfg().log_segment_size_max();
if (min_limit) {
result = std::max(*min_limit, result);
}
// no overrides use defaults
return config().is_compacted() ? _manager.config().compacted_segment_size()
: _manager.config().max_segment_size();
if (max_limit) {
result = std::min(*max_limit, result);
}

return result;
}

size_t disk_log_impl::bytes_left_before_roll() const {
Expand Down
2 changes: 1 addition & 1 deletion src/v/storage/disk_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ class disk_log_impl final : public log::impl {
ss::promise<model::offset> promise;
ss::abort_source::subscription subscription;
};
float _segment_size_jitter;
bool _closed{false};
ss::gate _compaction_gate;
log_manager& _manager;
float _segment_size_jitter;
segment_set _segs;
kvstore& _kvstore;
model::offset _start_offset;
Expand Down
4 changes: 4 additions & 0 deletions src/v/storage/log_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "storage/log_housekeeping_meta.h"
#include "storage/ntp_config.h"
#include "storage/segment.h"
#include "storage/segment_utils.h"
#include "storage/storage_resources.h"
#include "storage/types.h"
#include "storage/version.h"
Expand Down Expand Up @@ -123,6 +124,9 @@ struct log_config {
ss::sstring base_dir;
config::binding<size_t> max_segment_size;

// Default 5% jitter on segment size thresholds
internal::jitter_percents segment_size_jitter{5};

// compacted segment size
config::binding<size_t> compacted_segment_size;
config::binding<size_t> max_compacted_segment_size;
Expand Down
3 changes: 1 addition & 2 deletions src/v/storage/segment_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,10 @@ ss::future<> do_swap_data_file_handles(
std::filesystem::path compacted_index_path(std::filesystem::path segment_path);

using jitter_percents = named_type<int, struct jitter_percents_tag>;
static constexpr jitter_percents default_segment_size_jitter(5);

// Generates a random jitter percentage [as a fraction] with in the passed
// percents range.
float random_jitter(jitter_percents = default_segment_size_jitter);
float random_jitter(jitter_percents);

// key types used to store data in key-value store
enum class kvstore_key_type : int8_t {
Expand Down
12 changes: 10 additions & 2 deletions src/v/storage/tests/segment_size_jitter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,24 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "storage/log_manager.h"
#include "storage/segment_utils.h"

#include <seastar/testing/thread_test_case.hh>

SEASTAR_THREAD_TEST_CASE(test_segment_size_jitter_calculation) {
storage::log_config cfg(
storage::log_config::storage_type::disk,
"/tmp",
100_MiB,
storage::debug_sanitize_files::yes);

std::array<size_t, 5> sizes = {1_GiB, 2_GiB, 100_MiB, 300_MiB, 10_GiB};
for (auto original_size : sizes) {
for (int i = 0; i < 100; ++i) {
auto new_sz = original_size
* (1 + storage::internal::random_jitter());
auto new_sz
= original_size
* (1 + storage::internal::random_jitter(cfg.segment_size_jitter));
BOOST_REQUIRE_GE(new_sz, 0.95f * original_size);
BOOST_REQUIRE_LE(new_sz, 1.05f * original_size);
}
Expand Down
79 changes: 74 additions & 5 deletions src/v/storage/tests/storage_e2e_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1177,6 +1177,74 @@ FIXTURE_TEST(check_max_segment_size, storage_test_fixture) {
BOOST_REQUIRE_EQUAL(size3 - size2, 2);
}

FIXTURE_TEST(check_max_segment_size_limits, storage_test_fixture) {
auto cfg = default_log_config(test_dir);

// Apply limits to the effective segment size: we may configure
// something different per-topic, but at runtime the effective
// segment size will be clamped to this range.
config::shard_local_cfg().log_segment_size_min.set_value(
std::make_optional(50_KiB));
config::shard_local_cfg().log_segment_size_max.set_value(
std::make_optional(200_KiB));

std::exception_ptr ex;
try {
// Initially 100KiB configured segment size, it is within the range
auto mock = config::mock_property<size_t>(100_KiB);

cfg.max_segment_size = mock.bind();
cfg.stype = storage::log_config::storage_type::disk;

ss::abort_source as;
storage::log_manager mgr = make_log_manager(cfg);
auto deferred = ss::defer([&mgr]() mutable { mgr.stop().get0(); });
auto ntp = model::ntp("default", "test", 0);
storage::ntp_config ntp_cfg(ntp, mgr.config().base_dir);
auto log = mgr.manage(std::move(ntp_cfg)).get0();
auto disk_log = get_disk_log(log);

BOOST_REQUIRE_EQUAL(disk_log->segments().size(), 0);

// Write 100 * 1_KiB batches, should yield 1 full segment
auto result = append_exactly(log, 50, 1_KiB).get0(); // 100*1_KiB
BOOST_REQUIRE_EQUAL(disk_log->segments().size(), 1);
result = append_exactly(log, 100, 1_KiB).get0(); // 100*1_KiB
BOOST_REQUIRE_EQUAL(disk_log->segments().size(), 2);

// A too-low segment size: should be clamped to the lower bound
mock.update(1_KiB);
disk_log->force_roll(ss::default_priority_class()).get();
BOOST_REQUIRE_EQUAL(disk_log->segments().size(), 3);

// Exceeding the apparent segment size doesn't roll, because it was
// clamped
result = append_exactly(log, 5, 1_KiB).get0();
BOOST_REQUIRE_EQUAL(disk_log->segments().size(), 3);
// Exceeding the lower bound segment size does cause a roll
result = append_exactly(log, 55, 1_KiB).get0();
BOOST_REQUIRE_EQUAL(disk_log->segments().size(), 4);

// A too-high segment size: should be clamped to the upper bound
mock.update(2000_KiB);
disk_log->force_roll(ss::default_priority_class()).get();
BOOST_REQUIRE_EQUAL(disk_log->segments().size(), 5);
// Exceeding the upper bound causes a roll, even if we didn't reach
// the user-configured segment size
result = append_exactly(log, 201, 1_KiB).get0();
BOOST_REQUIRE_EQUAL(disk_log->segments().size(), 6);
} catch (...) {
ex = std::current_exception();
}

config::shard_local_cfg().log_segment_size_min.reset();
config::shard_local_cfg().log_segment_size_max.reset();

if (ex) {
throw ex;
}
}

FIXTURE_TEST(partition_size_while_cleanup, storage_test_fixture) {
auto cfg = default_log_config(test_dir);
// make sure segments are small
Expand Down Expand Up @@ -1237,8 +1305,7 @@ FIXTURE_TEST(partition_size_while_cleanup, storage_test_fixture) {
as);

// Compact 10 times, with a configuration calling for 60kiB max log size.
// This results in approximately prefix truncating at offset 50, although
// this is nondeterministic due to max_segment_size jitter.
// This results in prefix truncating at offset 50.
for (int i = 0; i < 10; ++i) {
log.compact(ccfg).get0();
}
Expand All @@ -1248,9 +1315,7 @@ FIXTURE_TEST(partition_size_while_cleanup, storage_test_fixture) {
auto lstats_after = log.offsets();
BOOST_REQUIRE_EQUAL(
lstats_after.committed_offset, lstats_before.committed_offset);

// Cannot assert on lstats_after.start_offset: its value depends on
// the jitter applied in disk_log_impl::_max_segment_size
BOOST_REQUIRE_EQUAL(lstats_after.start_offset, model::offset{50});

auto batches = read_and_validate_all_batches(log);
auto total_batch_size = std::accumulate(
Expand All @@ -1276,6 +1341,10 @@ FIXTURE_TEST(partition_size_while_cleanup, storage_test_fixture) {
FIXTURE_TEST(check_segment_size_jitter, storage_test_fixture) {
auto cfg = default_log_config(test_dir);

// Switch on jitter: it is off by default in default_log_config because
// for most tests randomness is undesirable.
cfg.segment_size_jitter = storage::internal::jitter_percents{5};

// defaults
cfg.max_segment_size = config::mock_binding<size_t>(100_KiB);
cfg.stype = storage::log_config::storage_type::disk;
Expand Down
7 changes: 6 additions & 1 deletion src/v/storage/tests/storage_test_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,18 @@ class storage_test_fixture {

auto cache = i > 50 ? storage::with_cache::yes
: storage::with_cache::no;
return storage::log_config(
auto cfg = storage::log_config(
stype,
std::move(test_dir),
200_MiB,
storage::debug_sanitize_files::yes,
ss::default_priority_class(),
cache);

// Disable jitter for unit tests
cfg.segment_size_jitter = storage::internal::jitter_percents{0};

return cfg;
}

void
Expand Down