diff --git a/ydb/library/yql/yt/native/plugin.cpp b/ydb/library/yql/yt/native/plugin.cpp index 48fd34de6090..3be2cb05e227 100644 --- a/ydb/library/yql/yt/native/plugin.cpp +++ b/ydb/library/yql/yt/native/plugin.cpp @@ -93,30 +93,21 @@ std::optional MaybeToOptional(const TMaybe& maybeStr) //////////////////////////////////////////////////////////////////////////////// -struct TQueryPlan -{ - std::optional Plan; - YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, PlanSpinLock); -}; - -struct TActiveQuery -{ - NYql::TProgramPtr Program; - bool Compiled = false; - - TProgressMerger ProgressMerger; - std::optional Plan; -}; - -//////////////////////////////////////////////////////////////////////////////// - class TQueryPipelineConfigurator : public NYql::IPipelineConfigurator + , public TRefCounted { public: - TQueryPipelineConfigurator(NYql::TProgramPtr program, TQueryPlan& plan) + struct TQueryPlan + { + std::optional Plan; + YT_DECLARE_SPIN_LOCK(NThreading::TReaderWriterSpinLock, PlanSpinLock); + }; + NYql::TProgramPtr Program_; + mutable TQueryPlan Plan_; + + TQueryPipelineConfigurator(NYql::TProgramPtr program) : Program_(std::move(program)) - , Plan_(plan) { } void AfterCreate(NYql::TTransformationPipeline* /*pipeline*/) const override @@ -138,10 +129,20 @@ class TQueryPipelineConfigurator pipeline->Add(NYql::CreateFunctorTransformer(transformer), "PlanOutput"); } +}; +DECLARE_REFCOUNTED_TYPE(TQueryPipelineConfigurator) +DEFINE_REFCOUNTED_TYPE(TQueryPipelineConfigurator) -private: - NYql::TProgramPtr Program_; - TQueryPlan& Plan_; +//////////////////////////////////////////////////////////////////////////////// + +struct TActiveQuery +{ + NYql::TProgramPtr Program; + bool Compiled = false; + + TProgressMerger ProgressMerger; + TQueryPipelineConfiguratorPtr PipelineConfigurator; + std::optional Plan; }; //////////////////////////////////////////////////////////////////////////////// @@ -460,9 +461,11 @@ class TYqlPlugin int executeMode) { auto program = ProgramFactory_->Create("-memory-", queryText); + auto pipelineConfigurator = New(program); { auto guard = WriterGuard(ProgressSpinLock); ActiveQueriesProgress_[queryId].Program = program; + ActiveQueriesProgress_[queryId].PipelineConfigurator = pipelineConfigurator; } TVector> credentials; @@ -487,14 +490,11 @@ class TYqlPlugin auto userDataTable = FilesToUserTable(files); program->AddUserDataTable(userDataTable); - TQueryPlan queryPlan; - auto pipelineConfigurator = TQueryPipelineConfigurator(program, queryPlan); - program->SetProgressWriter([&] (const NYql::TOperationProgress& progress) { std::optional plan; { - auto guard = ReaderGuard(queryPlan.PlanSpinLock); - plan.swap(queryPlan.Plan); + auto guard = ReaderGuard(pipelineConfigurator->Plan_.PlanSpinLock); + plan.swap(pipelineConfigurator->Plan_.Plan); } auto guard = WriterGuard(ProgressSpinLock); @@ -543,10 +543,10 @@ class TYqlPlugin status = program->Validate(user, nullptr); break; case 1: // Optimize. - status = program->OptimizeWithConfig(user, pipelineConfigurator); + status = program->OptimizeWithConfig(user, *pipelineConfigurator); break; case 2: // Run. - status = program->RunWithConfig(user, pipelineConfigurator); + status = program->RunWithConfig(user, *pipelineConfigurator); break; default: // Unknown. return TQueryResult{ @@ -605,16 +605,13 @@ class TYqlPlugin std::vector files, int executeMode) noexcept override { - try { - auto result = GuardedRun(queryId, user, credentials, queryText, settings, files, executeMode); - if (result.YsonError) { - ExtractQuery(queryId); - } - - return result; - } catch (const std::exception& ex) { + auto finalCleaning = Finally([&] { ExtractQuery(queryId); + }); + try { + return GuardedRun(queryId, user, credentials, queryText, settings, files, executeMode); + } catch (const std::exception& ex) { return TQueryResult{ .YsonError = MessageToYtErrorYson(ex.what()), };