Skip to content

Commit

Permalink
Fix deadlock (#14101)
Browse files Browse the repository at this point in the history
  • Loading branch information
Krisha11 authored Feb 3, 2025
1 parent 3f816c5 commit 74ff898
Showing 1 changed file with 34 additions and 37 deletions.
71 changes: 34 additions & 37 deletions ydb/library/yql/yt/native/plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,30 +93,21 @@ std::optional<TString> MaybeToOptional(const TMaybe<TString>& maybeStr)

////////////////////////////////////////////////////////////////////////////////

struct TQueryPlan
{
std::optional<TString> Plan;
YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, PlanSpinLock);
};

struct TActiveQuery
{
NYql::TProgramPtr Program;
bool Compiled = false;

TProgressMerger ProgressMerger;
std::optional<TString> Plan;
};

////////////////////////////////////////////////////////////////////////////////

class TQueryPipelineConfigurator
: public NYql::IPipelineConfigurator
, public TRefCounted
{
public:
TQueryPipelineConfigurator(NYql::TProgramPtr program, TQueryPlan& plan)
struct TQueryPlan
{
std::optional<TString> Plan;
YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, PlanSpinLock);
};
NYql::TProgramPtr Program_;
mutable TQueryPlan Plan_;

TQueryPipelineConfigurator(NYql::TProgramPtr program)
: Program_(std::move(program))
, Plan_(plan)
{ }

void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const override
Expand All @@ -138,10 +129,20 @@ class TQueryPipelineConfigurator

pipeline->Add(NYql::CreateFunctorTransformer(transformer), "PlanOutput");
}
};
DECLARE_REFCOUNTED_TYPE(TQueryPipelineConfigurator)
DEFINE_REFCOUNTED_TYPE(TQueryPipelineConfigurator)

private:
NYql::TProgramPtr Program_;
TQueryPlan& Plan_;
////////////////////////////////////////////////////////////////////////////////

struct TActiveQuery
{
NYql::TProgramPtr Program;
bool Compiled = false;

TProgressMerger ProgressMerger;
TQueryPipelineConfiguratorPtr PipelineConfigurator;
std::optional<TString> Plan;
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -460,9 +461,11 @@ class TYqlPlugin
int executeMode)
{
auto program = ProgramFactory_->Create("-memory-", queryText);
auto pipelineConfigurator = New<TQueryPipelineConfigurator>(program);
{
auto guard = WriterGuard(ProgressSpinLock);
ActiveQueriesProgress_[queryId].Program = program;
ActiveQueriesProgress_[queryId].PipelineConfigurator = pipelineConfigurator;
}

TVector<std::pair<TString, NYql::TCredential>> credentials;
Expand All @@ -487,14 +490,11 @@ class TYqlPlugin
auto userDataTable = FilesToUserTable(files);
program->AddUserDataTable(userDataTable);

TQueryPlan queryPlan;
auto pipelineConfigurator = TQueryPipelineConfigurator(program, queryPlan);

program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) {
std::optional<TString> plan;
{
auto guard = ReaderGuard(queryPlan.PlanSpinLock);
plan.swap(queryPlan.Plan);
auto guard = ReaderGuard(pipelineConfigurator->Plan_.PlanSpinLock);
plan.swap(pipelineConfigurator->Plan_.Plan);
}

auto guard = WriterGuard(ProgressSpinLock);
Expand Down Expand Up @@ -543,10 +543,10 @@ class TYqlPlugin
status = program->Validate(user, nullptr);
break;
case 1: // Optimize.
status = program->OptimizeWithConfig(user, pipelineConfigurator);
status = program->OptimizeWithConfig(user, *pipelineConfigurator);
break;
case 2: // Run.
status = program->RunWithConfig(user, pipelineConfigurator);
status = program->RunWithConfig(user, *pipelineConfigurator);
break;
default: // Unknown.
return TQueryResult{
Expand Down Expand Up @@ -605,16 +605,13 @@ class TYqlPlugin
std::vector<TQueryFile> files,
int executeMode) noexcept override
{
try {
auto result = GuardedRun(queryId, user, credentials, queryText, settings, files, executeMode);
if (result.YsonError) {
ExtractQuery(queryId);
}

return result;
} catch (const std::exception& ex) {
auto finalCleaning = Finally([&] {
ExtractQuery(queryId);
});

try {
return GuardedRun(queryId, user, credentials, queryText, settings, files, executeMode);
} catch (const std::exception& ex) {
return TQueryResult{
.YsonError = MessageToYtErrorYson(ex.what()),
};
Expand Down

0 comments on commit 74ff898

Please sign in to comment.