From f0be03e9fef44493dd8a13020388a7b45cc05cb9 Mon Sep 17 00:00:00 2001 From: Hari Krishna Sunder Date: Fri, 3 Jan 2025 18:55:37 -0800 Subject: [PATCH] [BACKPORT 2024.1][#25480] xCluster: Keep running xcluster task even if leader temporarily loses the lease Summary: Original commit: 91a5d5144fe82875480104d9d9c35c13e8f8e7b8 / D40988 On an xCluster target universe the yb-master leader computes the xCluster safe time every second. This task stops if the leadership is lost, and restarts again when a new leader is elected and sys catalog is reloaded. In some rare cases the master can lose the raft lease, but still retain its leadership. In this case we should keep running the xCluster task until we are certain that the leadership is lost. `SCOPED_LEADER_SHARED_LOCK` checks if we are the leader and lease is valid. This change uses `CatalogManager::CheckIsLeaderAndReady` to check if the leadership is still valid. Removing unnecessary `leader_term_` member. Jira: DB-14730 Test Plan: Jenkins ./yb_build.sh asan --cxx-test integration-tests_xcluster_ysql-test --gtest_filter XClusterYSqlTestConsistentTransactionsTest.AddServerIntraTransaction -n 40 --tp 1 Reviewers: sergei, jhe, slingam, xCluster Reviewed By: jhe Subscribers: ybase Differential Revision: https://phorge.dev.yugabyte.com/D41014 --- .../xcluster_safe_time_service-test.cc | 5 --- .../xcluster/xcluster_safe_time_service.cc | 45 +++++++++++-------- .../xcluster/xcluster_safe_time_service.h | 1 - 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/yb/master/xcluster/xcluster_safe_time_service-test.cc b/src/yb/master/xcluster/xcluster_safe_time_service-test.cc index bfec9a449a50..954d6953e649 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service-test.cc +++ b/src/yb/master/xcluster/xcluster_safe_time_service-test.cc @@ -295,11 +295,6 @@ TEST_F(XClusterSafeTimeServiceTest, ComputeSafeTimeWithFilters) { ASSERT_EQ(safe_time_service.safe_time_map_[db2], ht_invalid); ASSERT_EQ(safe_time_service.entries_to_delete_.size(), 0); - ASSERT_NOK(safe_time_service.GetXClusterSafeTimeForNamespace( - dummy_leader_term + 1, db1, XClusterSafeTimeFilter::NONE)); - ASSERT_NOK(safe_time_service.GetXClusterSafeTimeForNamespace( - dummy_leader_term - 1, db1, XClusterSafeTimeFilter::NONE)); - auto db1_none = ASSERT_RESULT(GetXClusterSafeTimeWithNoFilter(safe_time_service, db1)); auto db1_ddlqueue = ASSERT_RESULT(GetXClusterSafeTimeFilterOutDdlQueue(safe_time_service, db1)); auto db2_none = ASSERT_RESULT(GetXClusterSafeTimeWithNoFilter(safe_time_service, db2)); diff --git a/src/yb/master/xcluster/xcluster_safe_time_service.cc b/src/yb/master/xcluster/xcluster_safe_time_service.cc index 090e5d4d8dd1..fcb8b2c6376c 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service.cc +++ b/src/yb/master/xcluster/xcluster_safe_time_service.cc @@ -138,26 +138,35 @@ void XClusterSafeTimeService::ProcessTaskPeriodically() { } auto leader_term_result = GetLeaderTermFromCatalogManager(); - if (!leader_term_result.ok()) { - VLOG_WITH_FUNC(1) << "Going into idle mode due to master leader change"; - EnterIdleMode("master leader change"); - return; - } - int64_t leader_term = leader_term_result.get(); + if (leader_term_result.ok()) { + // Compute safe time now and also update the metrics. + bool further_computation_needed = true; + auto result = ComputeSafeTime(*leader_term_result, /* update_metrics */ true); + if (result.ok()) { + further_computation_needed = result.get(); + } else { + LOG(WARNING) << "Failure in XClusterSafeTime task: " << result; + } - // Compute safe time now and also update the metrics. - bool further_computation_needed = true; - auto result = ComputeSafeTime(leader_term, /* update_metrics */ true); - if (result.ok()) { - further_computation_needed = result.get(); + if (!further_computation_needed) { + VLOG_WITH_FUNC(1) << "Going into idle mode due to lack of work"; + EnterIdleMode("no more work left"); + return; + } } else { - LOG(WARNING) << "Failure in XClusterSafeTime task: " << result; - } + // We can fail to get the term due to transient errors like a stale lease. In these cases we can + // recover without losing the leadership or changing the term. So check again to see if we are + // the leader. + auto leader_status = catalog_manager_->CheckIsLeaderAndReady(); + if (!leader_status.ok()) { + LOG_WITH_FUNC(INFO) << "Going into idle mode due to master leader change: " << leader_status; + EnterIdleMode("master leader change"); + return; + } - if (!further_computation_needed) { - VLOG_WITH_FUNC(1) << "Going into idle mode due to lack of work"; - EnterIdleMode("no more work left"); - return; + YB_LOG_EVERY_N_SECS_OR_VLOG(INFO, 60, 1) + << "Skip xCluster safe time computation since this is not a healthy master leader: " + << leader_term_result.status(); } // Delay before before running the task again. @@ -229,7 +238,6 @@ Result XClusterSafeTimeService::GetXClusterSafeTimeForNamespace( const XClusterSafeTimeFilter& filter) { SharedLock lock(mutex_); SCHECK(safe_time_table_ready_, IllegalState, "Safe time table is not ready yet."); - SCHECK_EQ(leader_term_, leader_term, IllegalState, "Received unexpected leader term"); const XClusterNamespaceToSafeTimeMap& safe_time_map = VERIFY_RESULT(GetFilteredXClusterSafeTimeMap(filter)); @@ -477,7 +485,6 @@ Result XClusterSafeTimeService::ComputeSafeTime( // and setting the new config. Its important to make sure that the config we persist is accurate // as only that protects the safe time from going backwards. RETURN_NOT_OK(SetXClusterSafeTime(leader_term, namespace_safe_time_map)); - leader_term_ = leader_term; if (update_metrics) { // Update the metrics using the newly computed maps. diff --git a/src/yb/master/xcluster/xcluster_safe_time_service.h b/src/yb/master/xcluster/xcluster_safe_time_service.h index 068f5eb47918..d6ab0ed2b13d 100644 --- a/src/yb/master/xcluster/xcluster_safe_time_service.h +++ b/src/yb/master/xcluster/xcluster_safe_time_service.h @@ -154,7 +154,6 @@ class XClusterSafeTimeService { std::unique_ptr safe_time_table_; - int64_t leader_term_ GUARDED_BY(mutex_); int32_t cluster_config_version_ GUARDED_BY(mutex_); std::map producer_tablet_namespace_map_ GUARDED_BY(mutex_);