Skip to content

Commit

Permalink
[yugabyte#11729][DocDB][xCluster] Fix for replication not working if …
Browse files Browse the repository at this point in the history
…user upgrades to a branch with CDCSDK code changes

Summary:
With the changes for CDCSDK, we have separate `source_type` values i.e. `XCLUSTER` for xCluster replication and `CDCSDK` for the new changes. Similarly there is another option i.e. `checkpoint_type` which can have `IMPLICIT` and `EXPLICIT` values.

If a stream for replication has been created before upgrading, it was unable to continue replication after upgrading to the latest version since the `source_type` and `checkpoint_type` options were missing from it as it has only been introduced with the CDCSDK changes only.

Test Plan:
* Manually tested with a custom build on dev portal

Reviewers: sergei, jhe, mkantimath, skumar, rahuldesirazu

Reviewed By: skumar, rahuldesirazu

Subscribers: rahuldesirazu, iamoncar, sdash, ybase, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D15989
  • Loading branch information
vaibhav-yb authored and nathanhjli committed Mar 31, 2022
1 parent 360d35a commit f9843dd
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 0 deletions.
17 changes: 17 additions & 0 deletions ent/src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,20 @@ CHECKED_STATUS VerifyArg(const SetCDCCheckpointRequestPB& req) {
return Status::OK();
}

// This function is to handle the upgrade scenario where the DB is upgraded from a version
// without CDCSDK changes to the one with it. So in case, some required options are missing,
// the default values will be added for the same.
void AddDefaultOptionsIfMissing(std::unordered_map<std::string, std::string>* options) {
if ((*options).find(cdc::kSourceType) == (*options).end()) {
(*options).emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));
}

if ((*options).find(cdc::kCheckpointType) == (*options).end()) {
(*options).emplace(cdc::kCheckpointType,
CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT));
}
}

} // namespace

template <class ReqType, class RespType>
Expand Down Expand Up @@ -2107,6 +2121,9 @@ Result<std::shared_ptr<StreamMetadata>> CDCServiceImpl::GetStream(const std::str
RETURN_NOT_OK(client()->GetCDCStream(stream_id, &ns_id, &object_ids, &options));

auto stream_metadata = std::make_shared<StreamMetadata>();

AddDefaultOptionsIfMissing(&options);

for (const auto& option : options) {
if (option.first == kRecordType) {
SCHECK(CDCRecordType_Parse(option.second, &stream_metadata->record_type),
Expand Down
32 changes: 32 additions & 0 deletions ent/src/yb/master/catalog_manager_ent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,34 @@ class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
public:
explicit CDCStreamLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {}

void AddDefaultValuesIfMissing(const SysCDCStreamEntryPB& metadata,
CDCStreamInfo::WriteLock* l) {
bool source_type_present = false;
bool checkpoint_type_present = false;

// Iterate over all the options to check if checkpoint_type and source_type are present.
for (auto option : metadata.options()) {
if (option.key() == cdc::kSourceType) {
source_type_present = true;
}
if (option.key() == cdc::kCheckpointType) {
checkpoint_type_present = true;
}
}

if (!source_type_present) {
auto source_type_opt = l->mutable_data()->pb.add_options();
source_type_opt->set_key(cdc::kSourceType);
source_type_opt->set_value(cdc::CDCRequestSource_Name(cdc::XCLUSTER));
}

if (!checkpoint_type_present) {
auto checkpoint_type_opt = l->mutable_data()->pb.add_options();
checkpoint_type_opt->set_key(cdc::kCheckpointType);
checkpoint_type_opt->set_value(cdc::CDCCheckpointType_Name(cdc::IMPLICIT));
}
}

Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata)
REQUIRES(catalog_manager_->mutex_) {
DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id))
Expand Down Expand Up @@ -245,6 +273,10 @@ class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
auto l = stream->LockForWrite();
l.mutable_data()->pb.CopyFrom(metadata);

// If no source_type and checkpoint_type is present, that means the stream was created in
// a previous version where these options were not present.
AddDefaultValuesIfMissing(metadata, &l);

// If the table has been deleted, then mark this stream as DELETING so it can be deleted by the
// catalog manager background thread. Otherwise if this stream is missing an entry
// for state, then mark its state as Active.
Expand Down

0 comments on commit f9843dd

Please sign in to comment.