From 0585c935ab8ad2e61086448a45bc918c9019e6fd Mon Sep 17 00:00:00 2001 From: Siddharth Shah Date: Wed, 4 Dec 2024 11:34:14 +0530 Subject: [PATCH] [BACKPORT 2024.2.0][#24829] Revert "[BACKPORT 2024.2.0][#24829] CDC: Fix GetChanges WAL segment reading and computation of safe hybrid time for GetChanges resposne" Summary: This reverts commit 60174a3ca10e01a16024856bd5f4a05f1aaf78c5 on 2024.2.0 as it is planned for 2024.2.0.1. Jira: DB-13935 Test Plan: Jenkins: compile only Reviewers: skumar, sumukh.phalgaonkar Reviewed By: skumar Subscribers: ycdcxcluster Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D40449 --- src/yb/cdc/cdcsdk_producer.cc | 82 ++++------------- src/yb/consensus/consensus_queue.cc | 91 ++++++++----------- src/yb/consensus/consensus_queue.h | 3 +- src/yb/consensus/raft_consensus.cc | 5 +- src/yb/consensus/raft_consensus.h | 3 +- src/yb/integration-tests/cdcsdk_ysql-test.cc | 82 ----------------- .../integration-tests/cdcsdk_ysql_test_base.h | 1 - 7 files changed, 59 insertions(+), 208 deletions(-) diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index 02edcfa9e0ba..cca1899820f1 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -92,7 +92,7 @@ DEFINE_NON_RUNTIME_int64( "ConsistentStreamSafeTime for CDCSDK by resolving all committed intetns"); DEFINE_RUNTIME_bool(cdc_read_wal_segment_by_segment, - true, + false, "When this flag is set to true, GetChanges will read the WAL segment by " "segment. If valid records are found in the first segment, GetChanges will " "return these records in response. If no valid records are found then next " @@ -2007,14 +2007,13 @@ Status GetConsistentWALRecords( bool* wait_for_wal_update, OpId* last_seen_op_id, int64_t& last_readable_opid_index, const int64_t& safe_hybrid_time_req, const CoarseTimePoint& deadline, std::vector>* consistent_wal_records, - std::vector>* all_checkpoints, - HybridTime* last_read_wal_op_record_time, bool* is_entire_wal_read) { + std::vector>* all_checkpoints) { VLOG(2) << "Getting consistent WAL records. safe_hybrid_time_req: " << safe_hybrid_time_req << ", consistent_safe_time: " << *consistent_safe_time << ", last_seen_op_id: " << last_seen_op_id->ToString() << ", historical_max_op_id: " << historical_max_op_id; auto raft_consensus = VERIFY_RESULT(tablet_peer->GetRaftConsensus()); - HybridTime last_read_segment_footer_safe_time = HybridTime::kInvalid; + bool read_entire_wal = false; do { consensus::ReadOpsResult read_ops; @@ -2028,10 +2027,16 @@ Status GetConsistentWALRecords( // end of the WAL. read_ops = VERIFY_RESULT(raft_consensus->ReadReplicatedMessagesInSegmentForCDC( *last_seen_op_id, deadline, /* fetch_single_entry */ false, &last_readable_opid_index, - &consistent_stream_safe_time_footer, is_entire_wal_read)); + &consistent_stream_safe_time_footer)); + if (!consistent_stream_safe_time_footer) { + // HybridTime::kInvalid in consistent_stream_safe_time_footer indicates that we have read + // the currently active segment. + read_entire_wal = true; + consistent_stream_safe_time_footer = HybridTime(*consistent_safe_time); + } } else { - *is_entire_wal_read = true; + read_entire_wal = true; // Read all the committed WAL messages with hybrid time <= consistent_stream_safe_time. If // there exist messages in the WAL which are replicated but not yet committed, // ReadReplicatedMessagesForConsistentCDC waits for them to get committed and eventually @@ -2048,7 +2053,6 @@ Status GetConsistentWALRecords( for (const auto& msg : read_ops.messages) { last_seen_op_id->term = msg->id().term(); last_seen_op_id->index = msg->id().index(); - *last_read_wal_op_record_time = HybridTime(msg->hybrid_time()); if (IsIntent(msg) || (IsUpdateTransactionOp(msg) && msg->transaction_state().status() != TransactionStatus::APPLYING)) { @@ -2065,8 +2069,7 @@ Status GetConsistentWALRecords( << ", commit_time: " << GetTransactionCommitTime(msg) << ", consistent safe_time: " << *consistent_safe_time << ", consistent_stream_safe_time_footer: " << consistent_stream_safe_time_footer - << ", safe_hybrid_time_req: " << safe_hybrid_time_req - << ", is_entire_wal_read: " << *is_entire_wal_read; + << ", safe_hybrid_time_req: " << safe_hybrid_time_req; } else if (VLOG_IS_ON(3)) { VLOG(3) << "Read WAL msg on " << "tablet_id: " << tablet_peer->tablet_id() << ", op_type: " << msg->op_type() @@ -2074,8 +2077,7 @@ Status GetConsistentWALRecords( << ", commit_time: " << GetTransactionCommitTime(msg) << ", consistent safe_time: " << *consistent_safe_time << ", consistent_stream_safe_time_footer: " << consistent_stream_safe_time_footer - << ", safe_hybrid_time_req: " << safe_hybrid_time_req - << ", is_entire_wal_read: " << *is_entire_wal_read; + << ", safe_hybrid_time_req: " << safe_hybrid_time_req; } all_checkpoints->push_back(msg); @@ -2089,7 +2091,7 @@ Status GetConsistentWALRecords( // Handle the case where WAL doesn't have the apply record for all the committed transactions. if (historical_max_op_id.valid() && historical_max_op_id > *last_seen_op_id && - *is_entire_wal_read) { + read_entire_wal) { *wait_for_wal_update = true; break; } @@ -2103,36 +2105,24 @@ Status GetConsistentWALRecords( SortConsistentWALRecords(consistent_wal_records); - // For closed segments, consistent_stream_safe_time_footer corresponds to the value read from - // segment footer. For active segment, it will be Invalid. - if (FLAGS_cdc_read_wal_segment_by_segment && consistent_stream_safe_time_footer.is_valid()) { - last_read_segment_footer_safe_time = consistent_stream_safe_time_footer; - } - if (!consistent_wal_records->empty()) { auto record = consistent_wal_records->front(); if (FLAGS_cdc_read_wal_segment_by_segment && GetTransactionCommitTime(record) <= consistent_stream_safe_time_footer.ToUint64()) { // Since there exists atleast one message with commit_time <= consistent_stream_safe_time, // we don't need to read the next segment. + *consistent_safe_time = consistent_stream_safe_time_footer.ToUint64(); break; } } // No need for another iteration if we have read the entire WAL. - if (*is_entire_wal_read) { + if (read_entire_wal) { break; } } while (last_seen_op_id->index < last_readable_opid_index); - // Skip updating consistent safe time when entire WAL is read and we can ship all records - // till the consistent safe time computed in cdc producer. - if (FLAGS_cdc_read_wal_segment_by_segment && !(*is_entire_wal_read) && - last_read_segment_footer_safe_time.is_valid()) { - *consistent_safe_time = last_read_segment_footer_safe_time.ToUint64(); - } - VLOG_WITH_FUNC(1) << "Got a total of " << consistent_wal_records->size() << " WAL records " << "in the current segment"; return Status::OK(); @@ -2495,22 +2485,6 @@ bool IsReplicationSlotStream(const StreamMetadata& stream_metadata) { !stream_metadata.GetReplicationSlotName()->empty(); } -// Response safe time follows the invaraint: -// Request safe time <= Response safe time <= value from GetConsistentStreamSafeTime(). -// If response safe time is set to GetConsistentStreamSafeTime()'s value, then it implies that we -// have read the entire WAL. In any other case, the response safe time can either be the last read -// WAL segment's footer safe time ('min_start_time_running_txns') or commit time of the last -// transaction being shipped in the current response. Both these values (footer safe time or commit -// time of last txn) will be <= last read WAL OP's record time. -bool CheckResponseSafeTimeCorrectness( - HybridTime last_read_wal_op_record_time, HybridTime resp_safe_time, bool is_entire_wal_read) { - if (!last_read_wal_op_record_time.is_valid() || resp_safe_time <= last_read_wal_op_record_time) { - return true; - } - - return is_entire_wal_read; -} - // CDC get changes is different from xCluster as it doesn't need // to read intents from WAL. @@ -2571,8 +2545,6 @@ Status GetChangesForCDCSDK( auto safe_hybrid_time_resp = HybridTime::kInvalid; HaveMoreMessages have_more_messages(false); - HybridTime last_read_wal_op_record_time = HybridTime::kInvalid; - bool is_entire_wal_read = false; // It is snapshot call. if (from_op_id.write_id() == -1) { snapshot_operation = true; @@ -2600,8 +2572,7 @@ Status GetChangesForCDCSDK( RETURN_NOT_OK(GetConsistentWALRecords( tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time, historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index, - safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints, - &last_read_wal_op_record_time, &is_entire_wal_read)); + safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints)); else // 'skip_intents' is true here because we want the first transaction to be the partially // streamed transaction. @@ -2695,8 +2666,7 @@ Status GetChangesForCDCSDK( RETURN_NOT_OK(GetConsistentWALRecords( tablet_peer, mem_tracker, msgs_holder, &consumption, &consistent_stream_safe_time, historical_max_op_id, &wait_for_wal_update, &last_seen_op_id, *last_readable_opid_index, - safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints, - &last_read_wal_op_record_time, &is_entire_wal_read)); + safe_hybrid_time_req, deadline, &wal_records, &all_checkpoints)); else // 'skip_intents' is false otherwise in case the complete wal segment is filled with // intents we will break the loop thinking that WAL has no more records. @@ -3080,25 +3050,11 @@ Status GetChangesForCDCSDK( // If we need to wait for WAL to get up to date with all committed transactions, we will send the // request safe in the response as well. - auto computed_safe_hybrid_time_req = - HybridTime((safe_hybrid_time_req > 0) ? safe_hybrid_time_req : 0); auto safe_time = wait_for_wal_update - ? computed_safe_hybrid_time_req + ? HybridTime((safe_hybrid_time_req > 0) ? safe_hybrid_time_req : 0) : GetCDCSDKSafeTimeForTarget( leader_safe_time.get(), safe_hybrid_time_resp, have_more_messages, consistent_stream_safe_time, snapshot_operation); - - if (!snapshot_operation && !CheckResponseSafeTimeCorrectness( - last_read_wal_op_record_time, safe_time, is_entire_wal_read)) { - LOG(WARNING) << "Stream_id: " << stream_id << ", tablet_id: " << tablet_id - << ", response safe time: " << safe_time - << " is greater than last read WAL OP's record time: " - << last_read_wal_op_record_time - << ", req_safe_time: " << computed_safe_hybrid_time_req - << ", consistent stream safe time: " << HybridTime(consistent_stream_safe_time) - << ", leader safe time: " << leader_safe_time.get() - << ", is_entire_wal_read: " << is_entire_wal_read; - } resp->set_safe_hybrid_time(safe_time.ToUint64()); // It is possible in case of a partially streamed transaction. diff --git a/src/yb/consensus/consensus_queue.cc b/src/yb/consensus/consensus_queue.cc index 0b714f4bf980..783d6b576ce6 100644 --- a/src/yb/consensus/consensus_queue.cc +++ b/src/yb/consensus/consensus_queue.cc @@ -877,77 +877,58 @@ Result PeerMessageQueue::ReadReplicatedMessagesForConsistentCDC( Result PeerMessageQueue::ReadReplicatedMessagesInSegmentForCDC( const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry, - int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer, - bool* read_entire_wal) { + int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer) { auto read_ops = ReadOpsResult(); - int64_t start_op_id_index; - int64_t current_segment_num; - int64_t segment_last_index; - int64_t committed_op_id_index; - int64_t last_replicated_op_id_index; - do { - // We wait till committed_op_id_index becomes >= last index of the segment that contains the - // from_op_id. If we reach deadline, then return have_more_messages as true to wait for wal - // update. - if (deadline - CoarseMonoClock::Now() <= FLAGS_cdcsdk_wal_reads_deadline_buffer_secs * 1s) { - read_ops.have_more_messages = HaveMoreMessages(true); - return read_ops; - } - - std::tie(committed_op_id_index, last_replicated_op_id_index) = - GetCommittedAndMajorityReplicatedIndex(); + auto [committed_op_id_index, last_replicated_op_id_index] = + GetCommittedAndMajorityReplicatedIndex(); - // Determine if there are pending operations in RAFT but not yet LogCache. - auto pending_messages = committed_op_id_index != last_replicated_op_id_index; + // Determine if there are pending operations in RAFT but not yet LogCache. + auto pending_messages = committed_op_id_index != last_replicated_op_id_index; - if (last_committed_index) { - *last_committed_index = committed_op_id_index; - } + if (last_committed_index) { + *last_committed_index = committed_op_id_index; + } - if (from_op_id.index >= committed_op_id_index && !fetch_single_entry) { - // Nothing to read. - return ReadOpsResult{ - .messages = ReplicateMsgs(), - .preceding_op = OpId(), - .have_more_messages = HaveMoreMessages(pending_messages)}; - } + if (from_op_id.index >= committed_op_id_index && !fetch_single_entry) { + // Nothing to read. + return ReadOpsResult { + .messages = ReplicateMsgs(), + .preceding_op = OpId(), + .have_more_messages = HaveMoreMessages(pending_messages) + }; + } - start_op_id_index = GetStartOpIdIndex(from_op_id.index); + auto start_op_id_index = GetStartOpIdIndex(from_op_id.index); - VLOG(1) << "Will read Ops from a WAL segment for tablet: " << tablet_id_ + VLOG(1) << "Will read Ops from a WAL segment. " << " start_op_id_index = " << start_op_id_index << " committed_op_id_index = " << committed_op_id_index << " last_replicated_op_id_index = " << last_replicated_op_id_index; - auto current_segment_num_result = log_cache_.LookupOpWalSegmentNumber(start_op_id_index); - if (!current_segment_num_result.ok() || - (*current_segment_num_result == log_cache_.GetActiveSegmentNumber())) { + auto current_segment_num_result = log_cache_.LookupOpWalSegmentNumber(start_op_id_index); + if (!current_segment_num_result.ok() || + (*current_segment_num_result == log_cache_.GetActiveSegmentNumber())) { + // Read entire WAL. + return ReadReplicatedMessagesForConsistentCDC( + from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry); + } + + auto current_segment_num = *current_segment_num_result; + auto segment_last_index = + VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num)); + + // Nothing to read in this segment, read the next segment. + if (start_op_id_index == segment_last_index) { + current_segment_num = VERIFY_RESULT(log_cache_.LookupOpWalSegmentNumber(start_op_id_index + 1)); + if (current_segment_num == log_cache_.GetActiveSegmentNumber()) { // Read entire WAL. - *read_entire_wal = true; return ReadReplicatedMessagesForConsistentCDC( from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry); } - - current_segment_num = *current_segment_num_result; segment_last_index = VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num)); - - // Nothing to read in this segment, read the next segment. - if (start_op_id_index == segment_last_index) { - current_segment_num = - VERIFY_RESULT(log_cache_.LookupOpWalSegmentNumber(start_op_id_index + 1)); - if (current_segment_num == log_cache_.GetActiveSegmentNumber()) { - // Read entire WAL. - *read_entire_wal = true; - return ReadReplicatedMessagesForConsistentCDC( - from_op_id, HybridTime::kInvalid.ToUint64(), deadline, fetch_single_entry); - } - segment_last_index = - VERIFY_RESULT(log_cache_.GetMaxReplicateIndexFromSegmentFooter(current_segment_num)); - } - } while (segment_last_index > committed_op_id_index); - + } auto consistent_stream_safe_time = VERIFY_RESULT(log_cache_.GetMinStartTimeRunningTxnsFromSegmentFooter(current_segment_num)); @@ -955,7 +936,7 @@ Result PeerMessageQueue::ReadReplicatedMessagesInSegmentForCDC( *consistent_stream_safe_time_footer = consistent_stream_safe_time; } - VLOG(1) << "Reading a new WAL segment for tablet: " << tablet_id_ << " Segment info:" + VLOG(1) << "Reading a new WAL segment. Segment info:" << " current_segment_num = " << current_segment_num << " active_segment_num = " << log_cache_.GetActiveSegmentNumber() << " segment_last_index = " << segment_last_index diff --git a/src/yb/consensus/consensus_queue.h b/src/yb/consensus/consensus_queue.h index a1d8f830b1e7..2617085fd694 100644 --- a/src/yb/consensus/consensus_queue.h +++ b/src/yb/consensus/consensus_queue.h @@ -408,8 +408,7 @@ class PeerMessageQueue { Result ReadReplicatedMessagesInSegmentForCDC( const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry = false, int64_t* last_committed_index = nullptr, - HybridTime* consistent_stream_safe_time_footer = nullptr, - bool* read_entire_wal = nullptr); + HybridTime* consistent_stream_safe_time_footer = nullptr); void UpdateCDCConsumerOpId(const yb::OpId& op_id); diff --git a/src/yb/consensus/raft_consensus.cc b/src/yb/consensus/raft_consensus.cc index b4a53181dec9..d1f8d99e43c0 100644 --- a/src/yb/consensus/raft_consensus.cc +++ b/src/yb/consensus/raft_consensus.cc @@ -3653,11 +3653,10 @@ Result RaftConsensus::ReadReplicatedMessagesForConsistentCDC( Result RaftConsensus::ReadReplicatedMessagesInSegmentForCDC( const OpId& from_op_id, CoarseTimePoint deadline, bool fetch_single_entry, - int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer, - bool* read_entire_wal) { + int64_t* last_committed_index, HybridTime* consistent_stream_safe_time_footer) { return queue_->ReadReplicatedMessagesInSegmentForCDC( from_op_id, deadline, fetch_single_entry, last_committed_index, - consistent_stream_safe_time_footer, read_entire_wal); + consistent_stream_safe_time_footer); } void RaftConsensus::UpdateCDCConsumerOpId(const yb::OpId& op_id) { diff --git a/src/yb/consensus/raft_consensus.h b/src/yb/consensus/raft_consensus.h index cfef2bc6dd29..5ffbb0240926 100644 --- a/src/yb/consensus/raft_consensus.h +++ b/src/yb/consensus/raft_consensus.h @@ -293,8 +293,7 @@ class RaftConsensus : public std::enable_shared_from_this, CoarseTimePoint deadline, bool fetch_single_entry = false, int64_t* last_committed_index = nullptr, - HybridTime* consistent_stream_safe_time_footer = nullptr, - bool* read_entire_wal = nullptr); + HybridTime* consistent_stream_safe_time_footer = nullptr); void UpdateCDCConsumerOpId(const yb::OpId& op_id) override; diff --git a/src/yb/integration-tests/cdcsdk_ysql-test.cc b/src/yb/integration-tests/cdcsdk_ysql-test.cc index 4f8187e300d2..c92c76f79632 100644 --- a/src/yb/integration-tests/cdcsdk_ysql-test.cc +++ b/src/yb/integration-tests/cdcsdk_ysql-test.cc @@ -10043,88 +10043,6 @@ TEST_F(CDCSDKYsqlTest, TestWithMajorityReplicatedButNonCommittedSingleShardTxn) ASSERT_GE(total, 12); } -TEST_F(CDCSDKYsqlTest, TestWithMajorityReplicatedButNonCommittedMultiShardTxn) { - ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true; - constexpr int num_tservers = 1; - ASSERT_OK(SetUpWithParams(num_tservers, /* num_masters */ 1, false)); - - constexpr auto num_tablets = 1; - auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); - ASSERT_OK(conn.ExecuteFormat("CREATE TABLE test1(id1 int primary key) SPLIT INTO 1 tablets;")); - auto table = ASSERT_RESULT(GetTable(&test_cluster_, kNamespaceName, "test1")); - google::protobuf::RepeatedPtrField tablets; - ASSERT_OK(test_client()->GetTablets( - table, 0, &tablets, - /* partition_list_version =*/nullptr)); - ASSERT_EQ(tablets.size(), num_tablets); - - const auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream()); - const auto& tablet_id = tablets.Get(0).tablet_id(); - - auto checkpoint_result = ASSERT_RESULT(GetCDCSDKSnapshotCheckpoint(stream_id, tablet_id)); - // Switch to streaming directly. - checkpoint_result.set_write_id(0); - - constexpr int num_inserts = 10; - LOG(INFO) << "Starting txn"; - ASSERT_OK(conn.Execute("BEGIN")); - for (int i = 0; i < num_inserts; i++) { - ASSERT_OK(conn.ExecuteFormat("INSERT INTO test1 VALUES ($0)", i)); - } - - // Explicitly rollover so that the UPDATE_TXN_OP of txn1 goes into the next segment. - log::SegmentSequence segments; - for (const auto& peer : test_cluster()->GetTabletPeers(num_tservers - 1)) { - if (peer->tablet_id() != tablet_id) { - continue; - } - auto tablet = ASSERT_RESULT(peer->shared_tablet_safe()); - ASSERT_OK(peer->log()->AllocateSegmentAndRollOver()); - ASSERT_OK(peer->log()->GetSegmentsSnapshot(&segments)); - ASSERT_EQ(segments.size(), 2); - } - - uint64_t min_start_time_running_txns; - const log::ReadableLogSegmentPtr& last_segment = ASSERT_RESULT(segments.back()); - for (const auto& segment : segments) { - // All segments except for the last should have a footer. - if (&segment == &last_segment) { - continue; - } - ASSERT_TRUE(segment->HasFooter()); - ASSERT_TRUE(segment->footer().has_min_start_time_running_txns()); - min_start_time_running_txns = segment->footer().min_start_time_running_txns(); - } - - ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_stop_committed_op_id_updation) = true; - - ASSERT_OK(conn.Execute("COMMIT")); - - // DDL record will be read but it will be filtered due to commit time threshold. - auto change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &checkpoint_result)); - ASSERT_EQ(change_resp.cdc_sdk_proto_records_size(), 0); - - // Segment-1 will be read till the end but no WAL OPs relevant for CDC would be found. - auto change_resp2 = ASSERT_RESULT(GetChangesFromCDC( - stream_id, tablets, &change_resp.cdc_sdk_checkpoint(), 0, change_resp.safe_hybrid_time(), - change_resp.wal_segment_index())); - ASSERT_EQ(change_resp2.cdc_sdk_proto_records_size(), 0); - // safe time received in the response should match with the footer value of segment-1. - ASSERT_EQ(change_resp2.safe_hybrid_time(), min_start_time_running_txns); - - ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_stop_committed_op_id_updation) = false; - // Perform another txn so that committed_op_id gets updated. - ASSERT_OK(conn.Execute("BEGIN")); - ASSERT_OK(conn.ExecuteFormat("INSERT INTO test1 VALUES (10)")); - ASSERT_OK(conn.Execute("COMMIT")); - - auto change_resp3 = ASSERT_RESULT(GetChangesFromCDC( - stream_id, tablets, &change_resp2.cdc_sdk_checkpoint(), 0, change_resp2.safe_hybrid_time(), - change_resp2.wal_segment_index())); - // 1 DDL + Txn1 (B + 10 inserts + C) + Txn2 (B + 1 insert + C) - ASSERT_EQ(change_resp3.cdc_sdk_proto_records_size(), 16); -} - TEST_F(CDCSDKYsqlTest, TestCleanupOfTableNotOfInterest) { ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_cdc_consistent_snapshot_streams) = true; diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 3aea6bd21bb6..f60987846b6c 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -26,7 +26,6 @@ #include "yb/client/schema.h" #include "yb/client/table_handle.h" #include "yb/client/transaction.h" -#include "yb/consensus/log.h" #include "yb/master/catalog_manager_if.h" #include "yb/tablet/transaction_participant.h"