Skip to content

Commit

Permalink
fix flaky tests in ydb workload (#14035)
Browse files Browse the repository at this point in the history
  • Loading branch information
a-serebryanskiy authored Feb 3, 2025
1 parent c16cc5d commit 3f816c5
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 118 deletions.
8 changes: 5 additions & 3 deletions ydb/apps/ydb/ut/workload-topic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <util/system/env.h>
#include <util/system/shellcommand.h>

namespace {
class TFixture : public NUnitTest::TBaseFixture {

public:
Expand Down Expand Up @@ -184,9 +185,9 @@ Y_UNIT_TEST(Full_Statistics_UseTx)
const TVector<TString> expectedSubheaders = {"#", "msg/s", "MB/s", "percentile,ms", "percentile,msg", "msg/s", "MB/s", "percentile,ms", "percentile,ms", "percentile,ms", "percentile,ms"};
TVector<TString> values = ParseStatisticsLine(lines[2]);
UnitAssertColumnsOrder(lines[1], expectedSubheaders);
// assert there are correct values in output
// messages per second
UNIT_ASSERT_EQUAL_C(values[1], "5", "Messages per second differs from expected: expected = " << 5 << ", got = " << values[1]);
// assert there are correct values in output messages per second
UNIT_ASSERT_GT(std::stoi(values[1]), 1);
UNIT_ASSERT_LE(std::stoi(values[1]), 5);
}

Y_UNIT_TEST(WriteInTx)
Expand Down Expand Up @@ -242,3 +243,4 @@ Y_UNIT_TEST(WriteProducesToAllPartitionsEvenly)
}

}
} // anonymous namespace
278 changes: 163 additions & 115 deletions ydb/apps/ydb/ut/workload-transfer-topic-to-table.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "run_ydb.h"
#include <iostream>

#include <util/string/split.h>

Expand All @@ -8,97 +9,160 @@
#include <ydb-cpp-sdk/client/topic/client.h>
#include <ydb-cpp-sdk/client/table/table.h>

Y_UNIT_TEST_SUITE(YdbWorkloadTransferTopicToTable) {

struct TTopicConfigMatcher {
TString Name = "transfer-topic";
ui32 Partitions = 128;
ui32 Consumers = 1;
};

struct TTableConfigMatcher {
TString Name = "transfer-table";
ui32 Partitions = 128;
namespace {
class TFixture : public NUnitTest::TBaseFixture {


public:
TString TopicName;
TString TableName;

public:
void SetUp(NUnitTest::TTestContext&) override {
// we need to generate random topic name and table name, cause in CI tests are executed parallely against one common database
TopicName = GenerateTopicName();;
TableName = GenerateTableName();

try {
ExecYdb({"init", "--topic", TopicName, "--table", TableName});
} catch (const yexception) {
// ignore errors
}
}

void TearDown(NUnitTest::TTestContext&) override {
try {
ExecYdb({"clean", "--topic", TopicName, "--table", TableName});
} catch (const yexception) {
// ignore errors
}
}

struct TTopicConfigMatcher {
TString Name;
ui32 Partitions;
ui32 Consumers;
};

struct TTableConfigMatcher {
TString Name;
ui32 Partitions;
};

NYdb::NTable::TSession GetSession(NYdb::NTable::TTableClient& client)
{
auto result = client.GetSession().GetValueSync();
return result.GetSession();
}

void ExpectTopic(const TTopicConfigMatcher& matcher)
{
NYdb::TDriverConfig config;
config.SetEndpoint(GetYdbEndpoint());
config.SetDatabase(GetYdbDatabase());

NYdb::TDriver driver(config);
NYdb::NTopic::TTopicClient client(driver);

auto result = client.DescribeTopic(matcher.Name).GetValueSync();
if (result.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
UNIT_ASSERT_VALUES_EQUAL(0, matcher.Partitions);
UNIT_ASSERT_VALUES_EQUAL(0, matcher.Consumers);
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);

auto& description = result.GetTopicDescription();

UNIT_ASSERT_VALUES_EQUAL(description.GetPartitions().size(), matcher.Partitions);
UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), matcher.Consumers);
}
}

void ExpectTable(const TTableConfigMatcher& matcher)
{
NYdb::TDriverConfig config;
config.SetEndpoint(GetYdbEndpoint());
config.SetDatabase(GetYdbDatabase());

NYdb::TDriver driver(config);
NYdb::NTable::TTableClient client(driver);
auto session = GetSession(client);

NYdb::NTable::TDescribeTableSettings options;
options.WithTableStatistics(true);

auto result = session.DescribeTable("/" + GetYdbDatabase() + "/" + matcher.Name,
options).GetValueSync();
if (result.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
UNIT_ASSERT_VALUES_EQUAL(0, matcher.Partitions);
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);

auto description = result.GetTableDescription();

UNIT_ASSERT_VALUES_EQUAL(description.GetPartitionsCount(), matcher.Partitions);
}
}

TString ExecYdb(const TList<TString>& args, bool checkExitCode = true)
{
//
// ydb -e grpc://${YDB_ENDPOINT} -d /${YDB_DATABASE} workload transfer topic-to-table ${args}
//
return RunYdb({"workload", "transfer", "topic-to-table"}, args, checkExitCode);
}

void RunYdbAndAssertTopicAndTableProps(const TList<TString>& args,
const TString& topic, ui32 topicPartitions, ui32 consumers,
const TString& table, ui32 tablePartitions)
{
ExecYdb(args);
ExpectTopic({.Name=topic, .Partitions=topicPartitions, .Consumers=consumers});
ExpectTable({.Name=table, .Partitions=tablePartitions});
}

void EnsureStatisticsColumns(const TList<TString>& args,
const TVector<TString>& columns1,
const TVector<TString>& columns2)
{
ExecYdb({"init"});
auto output = ExecYdb(args, false);

TVector<TString> lines;
Split(output, "\n", lines);

UnitAssertColumnsOrder(lines[0], columns1);
UnitAssertColumnsOrder(lines[1], columns2);

ExecYdb({"clean"});
}

TString GenerateTopicName()
{
return "transfer-topic-" + GetRandomString(4);
}

TString GenerateTableName()
{
return "transfer-table-" + GetRandomString(4);
}
private:

TString GetRandomString(int len)
{
TString str;
for (int i = 0; i < len; ++i) {
str.push_back('a' + rand() % 26);
}
return str;
}
};

NYdb::NTable::TSession GetSession(NYdb::NTable::TTableClient& client)
{
auto result = client.GetSession().GetValueSync();
return result.GetSession();
}

void ExpectTopic(const TTopicConfigMatcher& matcher)
{
NYdb::TDriverConfig config;
config.SetEndpoint(GetYdbEndpoint());
config.SetDatabase(GetYdbDatabase());

NYdb::TDriver driver(config);
NYdb::NTopic::TTopicClient client(driver);

auto result = client.DescribeTopic(matcher.Name).GetValueSync();
if (result.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
UNIT_ASSERT_VALUES_EQUAL(0, matcher.Partitions);
UNIT_ASSERT_VALUES_EQUAL(0, matcher.Consumers);
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);

auto& description = result.GetTopicDescription();

UNIT_ASSERT_VALUES_EQUAL(description.GetPartitions().size(), matcher.Partitions);
UNIT_ASSERT_VALUES_EQUAL(description.GetConsumers().size(), matcher.Consumers);
}
}

void ExpectTable(const TTableConfigMatcher& matcher)
{
NYdb::TDriverConfig config;
config.SetEndpoint(GetYdbEndpoint());
config.SetDatabase(GetYdbDatabase());

NYdb::TDriver driver(config);
NYdb::NTable::TTableClient client(driver);
auto session = GetSession(client);

NYdb::NTable::TDescribeTableSettings options;
options.WithTableStatistics(true);

auto result = session.DescribeTable("/" + GetYdbDatabase() + "/" + matcher.Name,
options).GetValueSync();
if (result.GetStatus() == NYdb::EStatus::SCHEME_ERROR) {
UNIT_ASSERT_VALUES_EQUAL(0, matcher.Partitions);
} else {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);

auto description = result.GetTableDescription();

UNIT_ASSERT_VALUES_EQUAL(description.GetPartitionsCount(), matcher.Partitions);
}
}

TString ExecYdb(const TList<TString>& args, bool checkExitCode = true)
{
//
// ydb -e grpc://${YDB_ENDPOINT} -d /${YDB_DATABASE} workload transfer topic-to-table ${args}
//
return RunYdb({"workload", "transfer", "topic-to-table"}, args,
checkExitCode);
}

void RunYdb(const TList<TString>& args,
const TString& topic, ui32 topicPartitions, ui32 consumers,
const TString& table, ui32 tablePartitions)
{
ExecYdb(args);
ExpectTopic({.Name=topic, .Partitions=topicPartitions, .Consumers=consumers});
ExpectTable({.Name=table, .Partitions=tablePartitions});
}
Y_UNIT_TEST_SUITE_F(YdbWorkloadTransferTopicToTable, TFixture) {

Y_UNIT_TEST(Default_Run)
{
ExecYdb({"init"});
auto output = ExecYdb({"run", "-s", "10"});
ExecYdb({"clean"});
auto output = ExecYdb({"run", "-s", "10", "--topic", TopicName});

ui64 fullTime = GetFullTimeValue(output);

Expand All @@ -108,24 +172,25 @@ Y_UNIT_TEST(Default_Run)

Y_UNIT_TEST(Default_Init_Clean)
{
const TString topic = "transfer-topic";
const TString table = "transfer-table";
// setip and tear down also executed but ignored
TopicName = GenerateTopicName();;
TableName = GenerateTableName();

RunYdb({"init"}, topic, 128, 1, table, 128);
RunYdb({"clean"}, topic, 0, 0, table, 0);
ExecYdb({"init", "--topic", TopicName, "--table", TableName});
ExecYdb({"clean", "--topic", TopicName, "--table", TableName});
}

Y_UNIT_TEST(Specific_Init_Clean)
{
const TString topic = "my-topic";
const TString table = "my-table";
const TString topic = GenerateTopicName();
const TString table = GenerateTableName();

RunYdb({"init",
RunYdbAndAssertTopicAndTableProps({"init",
"--topic", topic, "--topic-partitions", "3", "--consumers", "5",
"--table", table, "--table-partitions", "8"},
topic, 3, 5,
table, 8);
RunYdb({"clean",
RunYdbAndAssertTopicAndTableProps({"clean",
"--topic", topic,
"--table", table},
topic, 0, 0,
Expand All @@ -134,37 +199,20 @@ Y_UNIT_TEST(Specific_Init_Clean)

Y_UNIT_TEST(Clean_Without_Init)
{
UNIT_ASSERT_EXCEPTION(ExecYdb({"clean"}), yexception);
UNIT_ASSERT_EXCEPTION(ExecYdb({"clean", "--topic", GenerateTopicName(), "--table", GenerateTableName()}), yexception);
}

Y_UNIT_TEST(Double_Init)
{
ExecYdb({"init"});
UNIT_ASSERT_EXCEPTION(ExecYdb({"init"}), yexception);
ExecYdb({"clean"});
}

void EnsureStatisticsColumns(const TList<TString>& args,
const TVector<TString>& columns1,
const TVector<TString>& columns2)
{
ExecYdb({"init"});
auto output = ExecYdb(args, false);

TVector<TString> lines;
Split(output, "\n", lines);

UnitAssertColumnsOrder(lines[0], columns1);
UnitAssertColumnsOrder(lines[1], columns2);

ExecYdb({"clean"});
UNIT_ASSERT_EXCEPTION(ExecYdb({"init", "--topic", TopicName, "--table", TableName}), yexception);
}

Y_UNIT_TEST(Statistics)
{
EnsureStatisticsColumns({"run", "-s", "1", "--warmup", "0"},
EnsureStatisticsColumns({"run", "-s", "1", "--warmup", "0", "--topic", TopicName, "--table", TableName},
{"Window", "Write speed", "Write time", "Inflight", "Read speed", "Topic time", "Select time", "Upsert time", "Commit time"},
{"#", "msg/s", "MB/s", "percentile,ms", "percentile,msg", "msg/s", "MB/s", "percentile,ms", "percentile,ms", "percentile,ms", "percentile,ms"});
}

}
} // anonymous namespace

0 comments on commit 3f816c5

Please sign in to comment.