Skip to content

Commit

Permalink
Fix ranges & points mix in read actor (#13925)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 authored Jan 30, 2025
1 parent bb6255c commit ef83724
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 5 deletions.
9 changes: 7 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_executer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
}

std::sort(std::begin(shardsRanges), std::end(shardsRanges), [&](const TShardRangesWithShardId& lhs, const TShardRangesWithShardId& rhs) {
return CompareBorders<false, false>(
return CompareBorders<true, true>(
lhs.Ranges->GetRightBorder().first->GetCells(),
rhs.Ranges->GetRightBorder().first->GetCells(),
lhs.Ranges->GetRightBorder().second,
Expand Down Expand Up @@ -1312,8 +1312,13 @@ class TKqpExecuterBase : public TActor<TDerived> {
settings->SetShardIdHint(*shardsRangesForTask[0].ShardId);
}

bool hasRanges = false;
for (const auto& shardRanges : shardsRangesForTask) {
shardRanges.Ranges->SerializeTo(settings);
hasRanges |= shardRanges.Ranges->HasRanges();
}

for (const auto& shardRanges : shardsRangesForTask) {
shardRanges.Ranges->SerializeTo(settings, !hasRanges);
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,18 @@ TString TShardKeyRanges::ToString(const TVector<NScheme::TTypeInfo>& keyTypes, c
return sb;
}

bool TShardKeyRanges::HasRanges() const {
if (IsFullRange()) {
return true;
}
for (const auto& range : Ranges) {
if (std::holds_alternative<TSerializedTableRange>(range)) {
return true;
}
}
return false;
}

void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange* proto) const {
if (IsFullRange()) {
auto& protoRange = *proto->MutableFullRange();
Expand Down Expand Up @@ -724,12 +736,12 @@ void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpTransaction_TScanTaskM
}
}

void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettings* proto) const {
void TShardKeyRanges::SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettings* proto, bool allowPoints) const {
if (IsFullRange()) {
auto& protoRange = *proto->MutableRanges()->AddKeyRanges();
FullRange->Serialize(protoRange);
} else {
bool usePoints = true;
bool usePoints = allowPoints;
for (auto& range : Ranges) {
if (std::holds_alternative<TSerializedTableRange>(range)) {
usePoints = false;
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_tasks_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ struct TShardKeyRanges {
void MakeFullPoint(TSerializedCellVec&& range);
void MakeFull(TSerializedPointOrRange&& pointOrRange);

bool HasRanges() const;

bool IsFullRange() const { return FullRange.has_value(); }
TVector<TSerializedPointOrRange>& GetRanges() { return Ranges; }

Expand All @@ -164,7 +166,7 @@ struct TShardKeyRanges {
TString ToString(const TVector<NScheme::TTypeInfo>& keyTypes, const NScheme::TTypeRegistry& typeRegistry) const;
void SerializeTo(NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange* proto) const;
void SerializeTo(NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta_TReadOpMeta* proto) const;
void SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettings* proto) const;
void SerializeTo(NKikimrTxDataShard::TKqpReadRangesSourceSettings* proto, bool allowPoints = true) const;

std::pair<const TSerializedCellVec*, bool> GetRightBorder() const;
};
Expand Down
52 changes: 52 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5720,6 +5720,58 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
CompareYson(R"([[2u]])", FormatResultSetYson(result.GetResultSet(0)));
}
}

Y_UNIT_TEST(ReadManyRangesAndPoints) {
NKikimrConfig::TAppConfig appConfig;
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);

TKikimrRunner kikimr(settings);
Tests::NCommon::TLoggerInit(kikimr).Initialize();

auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();

const TString query = R"(
CREATE TABLE `/Root/DataShard` (
Col1 String,
Col2 String,
PRIMARY KEY (Col1)
)
WITH (
STORE = ROW,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
PARTITION_AT_KEYS = (("a"), ("b"), ("c"), ("d"), ("e"), ("f"), ("g"), ("h"), ("i"))
);
)";

auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto client = kikimr.GetQueryClient();

{
auto prepareResult = client.ExecuteQuery(R"(
UPSERT INTO `/Root/DataShard` (Col1, Col2) VALUES ("a", "a") , ("c", "c"), ("e", "e");
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}

{
auto result = client.ExecuteQuery(R"(
SELECT Col2 FROM `/Root/DataShard` WHERE
('f' <= Col1 AND Col1 <= 'g') OR
('h' <= Col1 AND Col1 <= 'i') OR
('j' <= Col1 AND Col1 <= 'k') OR
('l' <= Col1 AND Col1 <= 'm') OR
Col1 == "a" OR
Col1 == "c" OR
Col1 == "e" OR
Col1 == "";
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}
}

} // namespace NKqp
Expand Down
28 changes: 28 additions & 0 deletions ydb/core/scheme/scheme_ranges_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,34 @@ Y_UNIT_TEST_SUITE(SchemeRanges) {
UNIT_ASSERT_EQUAL(CompareRanges(first, second, types), 0);
UNIT_ASSERT_EQUAL(CompareRanges(second, first, types), 0);
}

{
TTableRange first(secondLeft, true, secondLeft, true);
TTableRange second(secondLeft, true, sedondRight, false);
UNIT_ASSERT_EQUAL(CompareRanges(first, second, types), 0);
UNIT_ASSERT_EQUAL(CompareRanges(second, first, types), 0);
}

{
TTableRange first(secondLeft, true, secondLeft, true);
TTableRange second(secondLeft, false, sedondRight, true);
UNIT_ASSERT_EQUAL(CompareRanges(first, second, types), -1);
UNIT_ASSERT_EQUAL(CompareRanges(second, first, types), 1);
}

{
TTableRange first(sedondRight, true, sedondRight, true);
TTableRange second(secondLeft, false, sedondRight, true);
UNIT_ASSERT_EQUAL(CompareRanges(first, second, types), 0);
UNIT_ASSERT_EQUAL(CompareRanges(second, first, types), 0);
}

{
TTableRange first(sedondRight, true, sedondRight, true);
TTableRange second(secondLeft, true, sedondRight, false);
UNIT_ASSERT_EQUAL(CompareRanges(first, second, types), 1);
UNIT_ASSERT_EQUAL(CompareRanges(second, first, types), -1);
}
}

Y_UNIT_TEST(CmpBorders) {
Expand Down

0 comments on commit ef83724

Please sign in to comment.