Skip to content

Commit

Permalink
[BACKPORT 2.20][#23700] CDCSDK: Use leader epoch instead of leader te…
Browse files Browse the repository at this point in the history
…rm in table removal bg task

Summary:
**Backport description:**
No merge conflicts

**Original description:**
Original commit: 7a4b409 / D38155
Catalog manager background thread processes tables that needs to be removed from CDC streams. As part of the removal, we modify the stream metadata and persist in sys-catalog table. While persisting, we were using the leader ready term instead of the leader epoch fetched at the start of the background thread which has been fixed now.
Jira: DB-12608

Test Plan:
Jenkins: urgent
Existing cdc ctests

Reviewers: xCluster, hsunder, skumar, sumukh.phalgaonkar

Reviewed By: skumar

Subscribers: ycdcxcluster, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D38216
  • Loading branch information
siddharth2411 committed Sep 20, 2024
1 parent 8d4f15a commit e1a3aca
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -3205,7 +3205,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
const std::unordered_set<TableId>& tables_in_stream_metadata);

Status RemoveTableFromCDCStreamMetadataAndMaps(
const CDCStreamInfoPtr stream, const TableId table_id);
const CDCStreamInfoPtr stream, const TableId table_id, const LeaderEpoch& epoch);

// Should be bumped up when tablet locations are changed.
std::atomic<uintptr_t> tablet_locations_version_{0};
Expand Down
7 changes: 3 additions & 4 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2279,7 +2279,7 @@ Status CatalogManager::ProcessTablesToBeRemovedFromCDCSDKStreams(
"ProcessTablesToBeRemovedFromCDCSDKStreams::StartRemovalFromQualifiedTableList");

if (!FLAGS_TEST_cdcsdk_skip_table_removal_from_qualified_list) {
Status status = RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id);
Status status = RemoveTableFromCDCStreamMetadataAndMaps(stream, table_id, epoch);
if (!status.ok()) {
LOG(WARNING) << "Encountered error while trying to remove table " << table_id
<< " from qualified table list of stream " << stream_id << " and maps. - "
Expand Down Expand Up @@ -8488,7 +8488,7 @@ Result<std::vector<cdc::CDCStateTableEntry>> CatalogManager::SyncCDCStateTableEn
}

Status CatalogManager::RemoveTableFromCDCStreamMetadataAndMaps(
const CDCStreamInfoPtr stream, const TableId table_id) {
const CDCStreamInfoPtr stream, const TableId table_id, const LeaderEpoch& epoch) {
// Remove the table from the CDC stream metadata & cdcsdk_tables_to_stream_map_ and persist
// the updated metadata.
{
Expand All @@ -8505,8 +8505,7 @@ Status CatalogManager::RemoveTableFromCDCStreamMetadataAndMaps(
LOG_WITH_FUNC(INFO) << "Removing table " << table_id
<< " from qualified table list of CDC stream " << stream->id();
RETURN_ACTION_NOT_OK(
sys_catalog_->Upsert(leader_ready_term(), stream),
"Updating CDC streams in system catalog");
sys_catalog_->Upsert(epoch, stream), "Updating CDC streams in system catalog");
}

ltm.Commit();
Expand Down

0 comments on commit e1a3aca

Please sign in to comment.