Skip to content

Commit

Permalink
[BACKPORT 2.20][#19212] CDCSDK: Support deleting a CDCSDK stream usin…
Browse files Browse the repository at this point in the history
…g replication slot name

Summary:
**Backport Description**
The merge was clean. The only difference here is that the changes are under a TEST flag.

**Original Description**
Original commit: a8e2b04 / D29148
Add support for deleting a CDCSDK stream via its replication slot name. This is the second step in supporting a SQL syntax for CDC via the PG logical replication model.

The `DeleteCDCStreamRequestPB` proto now accepts a repeated list of `cdcsdk_ysql_replication_slot_name` as well. All the CDCSDK streams with these replication slot names are also deleted as part of this request.

**Follow Ups**
1. YSQL layer changes: See #18724 as the tracking issue

**Upgrade/Rollback safety**
This diff modifies the sys-catalog entry stored in yb-master; the changes are not rollback safe. As a result, these changes will be disabled during an upgrade via an autoflag (yb_enable_replication_commands - `LocalPersisted`) and only enabled after the upgrade is finalized.

The responsibility for checking the autoflag `yb_enable_replication_commands` lies with the clients of the `DeleteCDCStream` RPC. The clients are the YSQL-layer replication slot commands, which will be added in future diffs.
Jira: DB-8009

Test Plan:
`./yb_build.sh --cxx-test master_xrepl-test --gtest_filter MasterTestXRepl.TestDeleteCDCStreamWithReplicationSlotName`
`./yb_build.sh --cxx-test master_xrepl-test --gtest_filter MasterTestXRepl.TestDeleteCDCStreamWithStreamIdAndReplicationSlotName`
`./yb_build.sh --cxx-test master_xrepl-test --gtest_filter MasterTestXRepl.TestDeleteCDCStreamNotFound`

Reviewers: hsunder, skumar, asrinivasan, xCluster

Reviewed By: skumar

Subscribers: bogdan, ybase, ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D30995
  • Loading branch information
dr0pdb committed Dec 13, 2023
1 parent 8ca55ae commit cf238ca
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 38 deletions.
3 changes: 3 additions & 0 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -2707,6 +2707,9 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
std::vector<CDCStreamInfoPtr> FindCDCStreamsForTablesToDeleteMetadata(
const std::unordered_set<TableId>& table_ids) const REQUIRES_SHARED(mutex_);

Result<std::optional<CDCStreamInfoPtr>> GetStreamIfValidForDelete(
const xrepl::StreamId& stream_id, bool force_delete) REQUIRES_SHARED(mutex_);

Status FillHeartbeatResponseEncryption(
const SysClusterConfigEntryPB& cluster_config,
const TSHeartbeatRequestPB* req,
Expand Down
2 changes: 2 additions & 0 deletions src/yb/master/master_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ message DeleteCDCStreamRequestPB {
repeated bytes stream_id = 1;
optional bool ignore_errors = 2 [default = false];
optional bool force_delete = 3 [default = false];
repeated string cdcsdk_ysql_replication_slot_name = 4;
}

// Response to a DeleteCDCStream request.
message DeleteCDCStreamResponsePB {
// The error, if an error occurred with this request.
optional MasterErrorPB error = 1;
// Requested stream ids for which no deletable stream was found (the id is unknown to the
// master, or the stream is already being deleted).
repeated bytes not_found_stream_ids = 2;
// Requested YSQL replication slot names for which no deletable stream was found.
repeated string not_found_cdcsdk_ysql_replication_slot_names = 3;
}

enum IdTypePB {
Expand Down
112 changes: 104 additions & 8 deletions src/yb/master/master_xrepl-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class MasterTestXRepl : public MasterTestBase {
Result<GetCDCStreamResponsePB> GetCDCStream(const xrepl::StreamId& stream_id);
Result<GetCDCStreamResponsePB> GetCDCStream(const std::string& cdcsdk_ysql_replication_slot_name);
Status DeleteCDCStream(const xrepl::StreamId& stream_id);
Result<DeleteCDCStreamResponsePB> DeleteCDCStream(
const std::vector<xrepl::StreamId>& stream_ids,
const std::vector<std::string>& cdcsdk_ysql_replication_slot_name);
Result<ListCDCStreamsResponsePB> ListCDCStreams();
Result<ListCDCStreamsResponsePB> ListCDCSDKStreams();
Result<bool> IsObjectPartOfXRepl(const TableId& table_id);
Expand All @@ -77,6 +80,7 @@ class MasterTestXRepl : public MasterTestBase {
Status DeleteUniverseReplication(const std::string& producer_id);
Result<GetUniverseReplicationResponsePB> GetUniverseReplication(const std::string& producer_id);

Status CreateTableWithTableId(TableId* table_id);
};

Result<xrepl::StreamId> MasterTestXRepl::CreateCDCStream(const TableId& table_id) {
Expand Down Expand Up @@ -181,6 +185,25 @@ Status MasterTestXRepl::DeleteCDCStream(const xrepl::StreamId& stream_id) {
return Status::OK();
}

// Sends a single DeleteCDCStream RPC that targets streams both by stream id and by YSQL
// replication slot name.
//
// Returns the RPC failure, or the status carried in the response error, when either is not OK;
// otherwise returns the full response so callers can inspect the not-found lists.
Result<DeleteCDCStreamResponsePB> MasterTestXRepl::DeleteCDCStream(
    const std::vector<xrepl::StreamId>& stream_ids,
    const std::vector<std::string>& cdcsdk_ysql_replication_slot_names) {
  DeleteCDCStreamRequestPB request;
  for (const auto& id : stream_ids) {
    request.add_stream_id(id.ToString());
  }
  for (const auto& slot_name : cdcsdk_ysql_replication_slot_names) {
    request.add_cdcsdk_ysql_replication_slot_name(slot_name);
  }

  DeleteCDCStreamResponsePB response;
  RETURN_NOT_OK(
      proxy_replication_->DeleteCDCStream(request, &response, ResetAndGetController()));
  if (response.has_error()) {
    // Surface the master-side error as a Status rather than handing back an errored response.
    RETURN_NOT_OK(StatusFromPB(response.error().status()));
  }
  return response;
}

Result<ListCDCStreamsResponsePB> MasterTestXRepl::ListCDCStreams() {
ListCDCStreamsRequestPB req;
ListCDCStreamsResponsePB resp;
Expand Down Expand Up @@ -259,10 +282,14 @@ Status MasterTestXRepl::DeleteUniverseReplication(const std::string& producer_id
return Status::OK();
}

// Creates the default test table (kTableName with kTableSchema) and passes `table_id` through
// to CreateTable as its output argument.
Status MasterTestXRepl::CreateTableWithTableId(TableId* table_id) {
  Status create_status = CreateTable(kTableName, kTableSchema, table_id);
  return create_status;
}

// Verifies that truncating a table by id is rejected with NotSupported while
// FLAGS_disable_truncate_table is set.
TEST_F(MasterTestXRepl, TestDisableTruncation) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_disable_truncate_table) = true;
TableId table_id;
// NOTE(review): the next two statements both create the kTableName table. They look like the
// old and new form of the same line from the commit diff -- confirm only one is meant to run.
ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
ASSERT_OK(CreateTableWithTableId(&table_id));
auto s = TruncateTableById(table_id);
EXPECT_TRUE(s.IsNotSupported());
}
Expand All @@ -280,7 +307,7 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamInvalidTable) {

TEST_F(MasterTestXRepl, TestCreateCDCStream) {
TableId table_id;
ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
ASSERT_OK(CreateTableWithTableId(&table_id));

ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_table_num_tablets) = 1;
auto stream_id = ASSERT_RESULT(CreateCDCStream(table_id));
Expand Down Expand Up @@ -453,7 +480,7 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceMissingReplicationSlotNam

TEST_F(MasterTestXRepl, TestDeleteCDCStream) {
TableId table_id;
ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
ASSERT_OK(CreateTableWithTableId(&table_id));

ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_table_num_tablets) = 1;
auto stream_id = ASSERT_RESULT(CreateCDCStream(table_id));
Expand All @@ -463,15 +490,84 @@ TEST_F(MasterTestXRepl, TestDeleteCDCStream) {

ASSERT_OK(DeleteCDCStream(stream_id));

resp.Clear();
resp = ASSERT_RESULT(GetCDCStream(stream_id));
ASSERT_TRUE(resp.has_error());
ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code());
}

// Creates a CDCSDK stream on a YSQL namespace with a replication slot name, deletes it using
// only the slot name, and verifies the stream can no longer be fetched by its stream id.
TEST_F(MasterTestXRepl, TestDeleteCDCStreamWithReplicationSlotName) {
  CreateNamespaceResponsePB create_namespace_resp;
  ASSERT_OK(CreatePgsqlNamespace(kNamespaceName, kPgsqlNamespaceId, &create_namespace_resp));
  auto ns_id = create_namespace_resp.id();

  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_table_num_tablets) = 1;
  auto stream_id = ASSERT_RESULT(CreateCDCStreamForNamespace(ns_id, kPgReplicationSlotName));

  // Check the pre-delete state so the post-delete OBJECT_NOT_FOUND below is meaningful: the
  // stream must exist and be registered under the replication slot name.
  auto resp = ASSERT_RESULT(GetCDCStream(stream_id));
  ASSERT_EQ(resp.stream().namespace_id(), ns_id);
  ASSERT_EQ(resp.stream().cdcsdk_ysql_replication_slot_name(), kPgReplicationSlotName);

  // No stream ids are passed; the stream is addressed by its slot name only.
  ASSERT_OK(DeleteCDCStream({} /*stream_ids*/, {kPgReplicationSlotName}));

  resp = ASSERT_RESULT(GetCDCStream(stream_id));
  ASSERT_TRUE(resp.has_error());
  ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, resp.error().code());
}

// Deletes two streams in one request -- one addressed by stream id, the other by replication
// slot name -- and verifies both are gone and nothing is reported as not found.
TEST_F(MasterTestXRepl, TestDeleteCDCStreamWithStreamIdAndReplicationSlotName) {
  // Fixture: a table-level stream (no slot name) and a namespace-level stream with a slot name.
  TableId table_id;
  ASSERT_OK(CreateTableWithTableId(&table_id));

  CreateNamespaceResponsePB namespace_resp;
  ASSERT_OK(CreatePgsqlNamespace(kNamespaceName, kPgsqlNamespaceId, &namespace_resp));
  auto ns_id = namespace_resp.id();

  ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_table_num_tablets) = 1;
  auto table_stream_id = ASSERT_RESULT(CreateCDCStream(table_id));
  auto slot_stream_id =
      ASSERT_RESULT(CreateCDCStreamForNamespace(ns_id, kPgReplicationSlotName));

  // Both streams must be retrievable before the delete.
  auto table_stream_resp = ASSERT_RESULT(GetCDCStream(table_stream_id));
  ASSERT_EQ(table_stream_resp.stream().table_id().Get(0), table_id);

  auto slot_stream_resp = ASSERT_RESULT(GetCDCStream(slot_stream_id));
  ASSERT_EQ(slot_stream_resp.stream().namespace_id(), ns_id);
  ASSERT_EQ(
      slot_stream_resp.stream().cdcsdk_ysql_replication_slot_name(), kPgReplicationSlotName);

  // One request: the first stream by its id, the second by its replication slot name.
  auto delete_resp =
      ASSERT_RESULT(DeleteCDCStream({table_stream_id}, {kPgReplicationSlotName}));

  // Neither stream should be fetchable any more.
  table_stream_resp = ASSERT_RESULT(GetCDCStream(table_stream_id));
  ASSERT_TRUE(table_stream_resp.has_error());
  ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, table_stream_resp.error().code());

  slot_stream_resp = ASSERT_RESULT(GetCDCStream(slot_stream_id));
  ASSERT_TRUE(slot_stream_resp.has_error());
  ASSERT_EQ(MasterErrorPB::OBJECT_NOT_FOUND, slot_stream_resp.error().code());

  // Everything requested was found, so both not-found lists stay empty.
  ASSERT_EQ(delete_resp.not_found_stream_ids_size(), 0);
  ASSERT_EQ(delete_resp.not_found_cdcsdk_ysql_replication_slot_names_size(), 0);
}

// Requests deletion of a stream id and a replication slot name that were never created, and
// verifies the response carries an error and echoes each back in its not-found list.
TEST_F(MasterTestXRepl, TestDeleteCDCStreamNotFound) {
  const std::string missing_stream_id = "00000000000000000000000000000000";
  const std::string missing_slot_name = "non_existent_replication_slot";

  DeleteCDCStreamRequestPB request;
  request.add_stream_id(missing_stream_id);
  request.add_cdcsdk_ysql_replication_slot_name(missing_slot_name);

  // Call the proxy directly: the not-found lists have to be read from an errored response.
  DeleteCDCStreamResponsePB response;
  ASSERT_OK(proxy_replication_->DeleteCDCStream(request, &response, ResetAndGetController()));
  ASSERT_TRUE(response.has_error());

  ASSERT_EQ(response.not_found_stream_ids_size(), 1);
  ASSERT_EQ(response.not_found_stream_ids().Get(0), missing_stream_id);

  ASSERT_EQ(response.not_found_cdcsdk_ysql_replication_slot_names_size(), 1);
  ASSERT_EQ(response.not_found_cdcsdk_ysql_replication_slot_names().Get(0), missing_slot_name);
}

TEST_F(MasterTestXRepl, TestDeleteTableWithCDCStream) {
TableId table_id;
ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
ASSERT_OK(CreateTableWithTableId(&table_id));

ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_table_num_tablets) = 1;
auto stream_id = ASSERT_RESULT(CreateCDCStream(table_id));
Expand All @@ -490,7 +586,7 @@ TEST_F(MasterTestXRepl, TestDeleteTableWithCDCStream) {
TEST_F(MasterTestXRepl, YB_DISABLE_TEST_IN_SANITIZERS(TestDeleteCDCStreamNoForceDelete)) {
// #12255. Added 'force_delete' flag, but only run this check if the client code specifies it.
TableId table_id;
ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
ASSERT_OK(CreateTableWithTableId(&table_id));

auto stream_id = xrepl::StreamId::Nil();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_table_num_tablets) = 1;
Expand Down Expand Up @@ -525,7 +621,7 @@ TEST_F(MasterTestXRepl, YB_DISABLE_TEST_IN_SANITIZERS(TestDeleteCDCStreamNoForce

TEST_F(MasterTestXRepl, TestListCDCStreams) {
TableId table_id;
ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
ASSERT_OK(CreateTableWithTableId(&table_id));

ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_table_num_tablets) = 1;
auto stream_id = ASSERT_RESULT(CreateCDCStream(table_id));
Expand Down Expand Up @@ -571,7 +667,7 @@ TEST_F(MasterTestXRepl, TestListCDCStreamsCDCSDKWithReplicationSlot) {

TEST_F(MasterTestXRepl, TestIsObjectPartOfXRepl) {
TableId table_id;
ASSERT_OK(CreateTable(kTableName, kTableSchema, &table_id));
ASSERT_OK(CreateTableWithTableId(&table_id));

FLAGS_cdc_state_table_num_tablets = 1;
ASSERT_RESULT(CreateCDCStream(table_id));
Expand Down
89 changes: 59 additions & 30 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1044,54 +1044,58 @@ Status CatalogManager::DeleteCDCStream(
LOG(INFO) << "Servicing DeleteCDCStream request from " << RequestorString(rpc) << ": "
<< req->ShortDebugString();

if (req->stream_id_size() < 1) {
if (req->stream_id_size() == 0 && req->cdcsdk_ysql_replication_slot_name_size() == 0) {
return STATUS(
InvalidArgument, "No CDC Stream ID given", req->ShortDebugString(),
InvalidArgument, "No CDC Stream ID or YSQL Replication Slot Name given",
MasterError(MasterErrorPB::INVALID_REQUEST));
}

std::vector<CDCStreamInfoPtr> streams;
{
SharedLock lock(mutex_);
for (const auto& stream_id : req->stream_id()) {
auto stream =
FindPtrOrNull(cdc_stream_map_, VERIFY_RESULT(xrepl::StreamId::FromString(stream_id)));

if (stream == nullptr || stream->LockForRead()->is_deleting()) {
for (const auto& stream_id : req->stream_id()) {
auto stream_opt = VERIFY_RESULT(GetStreamIfValidForDelete(
VERIFY_RESULT(xrepl::StreamId::FromString(stream_id)), req->force_delete()));
if (stream_opt) {
streams.emplace_back(std::move(*stream_opt));
} else {
resp->add_not_found_stream_ids(stream_id);
LOG(WARNING) << "CDC stream does not exist: " << stream_id;
}
}

for (const auto& replication_slot_name : req->cdcsdk_ysql_replication_slot_name()) {
auto slot_name = ReplicationSlotName(replication_slot_name);
auto stream_it = FindOrNull(cdcsdk_replication_slots_to_stream_map_, slot_name);
auto stream_id = stream_it ? *stream_it : xrepl::StreamId::Nil();
auto stream_opt =
VERIFY_RESULT(GetStreamIfValidForDelete(std::move(stream_id), req->force_delete()));
if (stream_opt) {
streams.emplace_back(std::move(*stream_opt));
} else {
auto ltm = stream->LockForRead();
if (req->has_force_delete() && req->force_delete() == false) {
bool active = (ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE);
bool is_WAL = false;
for (const auto& option : ltm->pb.options()) {
if (option.key() == "record_format" && option.value() == "WAL") {
is_WAL = true;
}
}
if (is_WAL && active) {
return STATUS(
NotSupported,
"Cannot delete an xCluster Stream in replication. "
"Use 'force_delete' to override",
req->ShortDebugString(),
MasterError(MasterErrorPB::INVALID_REQUEST));
}
}
streams.push_back(stream);
resp->add_not_found_cdcsdk_ysql_replication_slot_names(replication_slot_name);
}
}
}

if (!resp->not_found_stream_ids().empty() && !req->ignore_errors()) {
string missing_streams;
JoinElements(resp->not_found_stream_ids(), ",", &missing_streams);
const auto& not_found_stream_ids = resp->not_found_stream_ids();
const auto& not_found_cdcsdk_ysql_replication_slot_names =
resp->not_found_cdcsdk_ysql_replication_slot_names();
if ((!not_found_stream_ids.empty() || !not_found_cdcsdk_ysql_replication_slot_names.empty()) &&
!req->ignore_errors()) {
// Start from an empty vector and only reserve capacity. The sized constructor would pre-fill
// the vector with that many empty strings, which the inserts below would append after, so the
// joined error message would begin with a run of stray commas.
std::vector<std::string> missing_streams;
missing_streams.reserve(
    resp->not_found_stream_ids_size() +
    resp->not_found_cdcsdk_ysql_replication_slot_names_size());
missing_streams.insert(
    missing_streams.end(), not_found_stream_ids.begin(), not_found_stream_ids.end());
missing_streams.insert(
    missing_streams.end(), not_found_cdcsdk_ysql_replication_slot_names.begin(),
    not_found_cdcsdk_ysql_replication_slot_names.end());
return STATUS(
NotFound,
Format(
"Did not find all requested CDC streams. Missing streams: [$0]. Request: $1",
missing_streams, req->ShortDebugString()),
JoinStrings(missing_streams, ","), req->ShortDebugString()),
MasterError(MasterErrorPB::OBJECT_NOT_FOUND));
}

Expand All @@ -1111,6 +1115,31 @@ Status CatalogManager::DeleteCDCStream(
return Status::OK();
}

// Looks up `stream_id` in cdc_stream_map_ and decides whether that stream may be deleted.
//
// Returns:
//   - std::nullopt when the id is not in the map or the stream is already marked as deleting.
//   - A NotSupported status (MasterErrorPB::INVALID_REQUEST) when `force_delete` is false and
//     the stream is ACTIVE with a "record_format" option equal to "WAL".
//   - The stream itself otherwise.
Result<std::optional<CDCStreamInfoPtr>> CatalogManager::GetStreamIfValidForDelete(
    const xrepl::StreamId& stream_id, bool force_delete) {
  auto stream = FindPtrOrNull(cdc_stream_map_, stream_id);
  if (stream == nullptr) {
    return std::nullopt;
  }

  // Acquire the read lock once and use it for every check below, instead of locking separately
  // for is_deleting() and again for the state/options inspection.
  auto ltm = stream->LockForRead();
  if (ltm->is_deleting()) {
    return std::nullopt;
  }

  if (!force_delete && ltm->pb.state() == SysCDCStreamEntryPB::ACTIVE) {
    for (const auto& option : ltm->pb.options()) {
      if (option.key() != "record_format") {
        continue;
      }
      if (option.value() == "WAL") {
        return STATUS(
            NotSupported,
            "Cannot delete an xCluster Stream in replication. "
            "Use 'force_delete' to override",
            MasterError(MasterErrorPB::INVALID_REQUEST));
      }
      // Only the first "record_format" option is consulted.
      break;
    }
  }
  return stream;
}

Status CatalogManager::MarkCDCStreamsForMetadataCleanup(
const std::vector<CDCStreamInfoPtr>& streams, SysCDCStreamEntryPB::State state) {
if (streams.empty()) {
Expand Down

0 comments on commit cf238ca

Please sign in to comment.