Skip to content

Commit

Permalink
cloud_storage: Log client address in batch parser
Browse files Browse the repository at this point in the history
The client address is added to the remote segment batch parser logs. The
address cannot be set as a member because a remote segment reader may be
cached and have its configuration reset. The client address is always
read from the configuration to make sure we get the latest address.

(cherry picked from commit 0996d4f)
  • Loading branch information
abhijat committed Nov 14, 2023
1 parent 9dd7649 commit e4de1ce
Showing 1 changed file with 29 additions and 10 deletions.
39 changes: 29 additions & 10 deletions src/v/cloud_storage/remote_segment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1117,14 +1117,17 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
const model::record_batch_header& header) const override {
vlog(
_ctxlog.trace,
"accept_batch_start {}, current delta: {}",
"[{}] accept_batch_start {}, current delta: {}",
_config.client_address,
header,
_parent._cur_delta);

if (rp_to_kafka(header.base_offset) > _config.max_offset) {
vlog(
_ctxlog.debug,
"accept_batch_start stop parser because {} > {}(kafka offset)",
"[{}] accept_batch_start stop parser because {} > {}(kafka "
"offset)",
_config.client_address,
header.base_offset(),
_config.max_offset);
return batch_consumer::consume_result::stop_parser;
Expand All @@ -1136,7 +1139,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
if (model::record_batch_type::raft_data != header.type) {
vlog(
_ctxlog.debug,
"accept_batch_start skip because record batch type is {}",
"[{}] accept_batch_start skip because record batch type is {}",
_config.client_address,
header.type);
return batch_consumer::consume_result::skip_batch;
}
Expand All @@ -1147,9 +1151,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
rp_to_kafka(header.last_offset()) < _config.start_offset)) {
vlog(
_ctxlog.debug,
"accept_batch_start skip because "
"[{}] accept_batch_start skip because "
"last_kafka_offset {} (last_rp_offset: {}) < "
"config.start_offset: {}",
_config.client_address,
rp_to_kafka(header.last_offset()),
header.last_offset(),
_config.start_offset);
Expand All @@ -1159,15 +1164,19 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
if (
(_config.strict_max_bytes || _config.bytes_consumed)
&& (_config.bytes_consumed + header.size_bytes) > _config.max_bytes) {
vlog(_ctxlog.debug, "accept_batch_start stop because overbudget");
vlog(
_ctxlog.debug,
"[{}] accept_batch_start stop because overbudget",
_config.client_address);
_config.over_budget = true;
return batch_consumer::consume_result::stop_parser;
}

if (_config.first_timestamp > header.max_timestamp) {
vlog(
_ctxlog.debug,
"accept_batch_start skip because header timestamp is {}",
"[{}] accept_batch_start skip because header timestamp is {}",
_config.client_address,
header.first_timestamp);
return batch_consumer::consume_result::skip_batch;
}
Expand All @@ -1182,7 +1191,8 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
size_t /*size_on_disk*/) override {
vlog(
_ctxlog.trace,
"consume_batch_start called for {}",
"[{}] consume_batch_start called for {}",
_config.client_address,
header.base_offset);
_header = header;
_header.ctx.term = _term;
Expand All @@ -1197,7 +1207,10 @@ class remote_segment_batch_consumer : public storage::batch_consumer {
// changing the _cur_delta. The _cur_delta that is be used for current
// record batch can only account record batches in all previous batches.
vlog(
_ctxlog.debug, "skip_batch_start called for {}", header.base_offset);
_ctxlog.debug,
"[{}] skip_batch_start called for {}",
_config.client_address,
header.base_offset);
advance_config_offsets(header);
if (
std::count(
Expand Down Expand Up @@ -1390,10 +1403,16 @@ ss::future<> remote_segment_batch_reader::stop() {
co_return;
}

vlog(_ctxlog.debug, "remote_segment_batch_reader::stop");
vlog(
_ctxlog.debug,
"[{}] remote_segment_batch_reader::stop",
_config.client_address);
co_await _gate.close();
if (_parser) {
vlog(_ctxlog.debug, "remote_segment_batch_reader::stop - parser-close");
vlog(
_ctxlog.debug,
"[{}] remote_segment_batch_reader::stop - parser-close",
_config.client_address);
co_await _parser->close();
_parser.reset();
}
Expand Down

0 comments on commit e4de1ce

Please sign in to comment.