Merge pull request #6086 from VadimPlh/fix-tx-group-compaction
[transaction] Fix group_*_tx events compaction
rystsov authored Aug 22, 2022
2 parents ff888d5 + 75463f1 commit a2cc309
Showing 3 changed files with 74 additions and 18 deletions.
77 changes: 63 additions & 14 deletions src/v/kafka/server/group.cc
@@ -1676,6 +1676,35 @@ group::commit_tx(cluster::commit_group_tx_request r) {

     // we commit only if a provided tx_seq matches prepared tx_seq

+    // This is a fix for https://github.com/redpanda-data/redpanda/issues/5163.
+    // The problem: group_*_tx events carry only a producer_id in their key,
+    // so compaction keeps just the last record for these events. All that
+    // must survive in the log after commit_tx are the consumed offsets, so
+    // we write a store_offset event together with the group_commit_tx event.
+    //
+    // Caveat: these two events have different record batch types, so they
+    // cannot be put into one batch and written to disk atomically. That is
+    // not a problem, because the client has already received an ok for the
+    // commit request (see tx_gateway_frontend), so redpanda will eventually
+    // finish the commit and complete the write of both events.
+    model::record_batch_reader::data_t batches;
+    batches.reserve(2);
+
+    cluster::simple_batch_builder store_offset_builder(
+      model::record_batch_type::raft_data, model::offset(0));
+    for (const auto& [tp, metadata] : prepare_it->second.offsets) {
+        update_store_offset_builder(
+          store_offset_builder,
+          tp.topic,
+          tp.partition,
+          metadata.offset,
+          metadata.committed_leader_epoch,
+          metadata.metadata,
+          model::timestamp{-1});
+    }
+
+    batches.push_back(std::move(store_offset_builder).build());
+
     group_log_commit_tx commit_tx;
     commit_tx.group_id = r.group_id;
     // TODO: https://app.clubhouse.io/vectorized/story/2200
@@ -1687,7 +1716,9 @@ group::commit_tx(cluster::commit_group_tx_request r) {
       r.pid,
       std::move(commit_tx));

-    auto reader = model::make_memory_record_batch_reader(std::move(batch));
+    batches.push_back(std::move(batch));
+
+    auto reader = model::make_memory_record_batch_reader(std::move(batches));

     auto e = co_await _partition->raft()->replicate(
       _term,
@@ -2062,6 +2093,29 @@ kafka::error_code map_store_offset_error_code(std::error_code ec) {
     return error_code::unknown_server_error;
 }

+void group::update_store_offset_builder(
+  cluster::simple_batch_builder& builder,
+  const model::topic& name,
+  model::partition_id partition,
+  model::offset committed_offset,
+  leader_epoch committed_leader_epoch,
+  const ss::sstring& metadata,
+  model::timestamp commit_timestamp) {
+    offset_metadata_key key{
+      .group_id = _id, .topic = name, .partition = partition};
+
+    offset_metadata_value value{
+      .offset = committed_offset,
+      .leader_epoch = committed_leader_epoch,
+      .metadata = metadata,
+      .commit_timestamp = commit_timestamp,
+    };
+
+    auto kv = _md_serializer.to_kv(
+      offset_metadata_kv{.key = std::move(key), .value = std::move(value)});
+    builder.add_raw_kv(std::move(kv.key), std::move(kv.value));
+}
+
 group::offset_commit_stages group::store_offsets(offset_commit_request&& r) {
     cluster::simple_batch_builder builder(
       model::record_batch_type::raft_data, model::offset(0));
@@ -2071,19 +2125,14 @@ group::offset_commit_stages group::store_offsets(offset_commit_request&& r) {

     for (const auto& t : r.data.topics) {
         for (const auto& p : t.partitions) {
-            offset_metadata_key key{
-              .group_id = _id, .topic = t.name, .partition = p.partition_index};
-
-            offset_metadata_value value{
-              .offset = p.committed_offset,
-              .leader_epoch = p.committed_leader_epoch,
-              .metadata = p.committed_metadata.value_or(""),
-              .commit_timestamp = model::timestamp(p.commit_timestamp),
-            };
-
-            auto kv = _md_serializer.to_kv(offset_metadata_kv{
-              .key = std::move(key), .value = std::move(value)});
-            builder.add_raw_kv(std::move(kv.key), std::move(kv.value));
+            update_store_offset_builder(
+              builder,
+              t.name,
+              p.partition_index,
+              p.committed_offset,
+              p.committed_leader_epoch,
+              p.committed_metadata.value_or(""),
+              model::timestamp(p.commit_timestamp));

             model::topic_partition tp(t.name, p.partition_index);
             offset_metadata md{
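The comment in the first hunk above captures the core failure mode: group_*_tx events are keyed only by producer_id, so key-based compaction keeps just the newest record per producer, and committed offsets recorded only inside earlier group_commit_tx events vanish from the log. A minimal standalone sketch of that behavior, using toy types rather than Redpanda's (record, compact, and the sample payloads here are hypothetical):

#include <iostream>
#include <map>
#include <string>
#include <vector>

// Toy model of key-based log compaction; not Redpanda's API.
struct record {
    std::string key;   // group_*_tx events key on producer_id only
    std::string value; // payload, e.g. which offsets a commit covered
};

// Compaction keeps only the last record seen for each key.
std::vector<record> compact(const std::vector<record>& log) {
    std::map<std::string, record> last;
    for (const auto& r : log) {
        last[r.key] = r; // later records overwrite earlier ones
    }
    std::vector<record> out;
    out.reserve(last.size());
    for (const auto& [key, rec] : last) {
        out.push_back(rec);
    }
    return out;
}

int main() {
    std::vector<record> log{
      {"pid-7", "commit_tx{tp-0 @ offset 10}"}, // lost after compaction
      {"pid-7", "commit_tx{tp-0 @ offset 42}"}, // only this one survives
    };
    for (const auto& r : compact(log)) {
        std::cout << r.key << " -> " << r.value << "\n";
    }
}

Writing the offsets as separate store_offset records sidesteps this: those records are keyed by group, topic, and partition, so compaction preserves the latest committed offset of every partition independently of the group_commit_tx record.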
10 changes: 10 additions & 0 deletions src/v/kafka/server/group.h
@@ -25,6 +25,7 @@
#include "model/fundamental.h"
#include "model/namespace.h"
#include "model/record.h"
#include "model/timestamp.h"
#include "seastarx.h"
#include "utils/mutex.h"

@@ -747,6 +748,15 @@ class group final : public ss::enable_lw_shared_from_this<group> {
         return false;
     }

+    void update_store_offset_builder(
+      cluster::simple_batch_builder& builder,
+      const model::topic& name,
+      model::partition_id partition,
+      model::offset committed_offset,
+      leader_epoch committed_leader_epoch,
+      const ss::sstring& metadata,
+      model::timestamp commit_timestamp);
+
     kafka::group_id _id;
     group_state _state;
     model::timestamp _state_timestamp;
5 changes: 1 addition & 4 deletions src/v/storage/segment_utils.h
@@ -199,10 +199,7 @@ struct clean_segment_value
 inline bool is_compactible(const model::record_batch& b) {
     return !(
       b.header().type == model::record_batch_type::raft_configuration
-      || b.header().type == model::record_batch_type::archival_metadata
-      || b.header().type == model::record_batch_type::group_abort_tx
-      || b.header().type == model::record_batch_type::group_commit_tx
-      || b.header().type == model::record_batch_type::group_prepare_tx);
+      || b.header().type == model::record_batch_type::archival_metadata);
 }

 } // namespace storage::internal
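With group_abort_tx, group_commit_tx, and group_prepare_tx dropped from the exclusion list, those batch types are compactible again, which is safe now that the offsets they imply are persisted as separately keyed store_offset records. A self-contained toy mirror of the predicate after this change (the batch_type enum is a hypothetical stand-in for model::record_batch_type):

#include <iostream>

// Hypothetical stand-in for model::record_batch_type, trimmed to the
// values relevant to this change.
enum class batch_type {
    raft_data,
    raft_configuration,
    archival_metadata,
    group_prepare_tx,
    group_commit_tx,
    group_abort_tx,
};

// Mirrors is_compactible after this commit: only raft configuration and
// archival metadata batches remain exempt from compaction.
bool is_compactible(batch_type t) {
    return !(
      t == batch_type::raft_configuration
      || t == batch_type::archival_metadata);
}

int main() {
    std::cout << std::boolalpha
              << is_compactible(batch_type::group_commit_tx) << "\n"     // true
              << is_compactible(batch_type::raft_configuration) << "\n"; // false
}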
