From 03356b9e72fb3a3b83029cbc881f5c5be98c8c28 Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Mon, 16 Sep 2024 03:00:00 +0000 Subject: [PATCH 1/3] Make metadata pod lookups more resilient to short lived processes Signed-off-by: Dom Del Nano --- src/carnot/funcs/metadata/metadata_ops.cc | 1 + src/carnot/funcs/metadata/metadata_ops.h | 19 +++ .../analyzer/convert_metadata_rule.cc | 130 +++++++++++++++++- .../compiler/analyzer/convert_metadata_rule.h | 26 ++++ .../analyzer/convert_metadata_rule_test.cc | 91 +++++++++++- src/carnot/planner/compiler/test_utils.h | 8 ++ .../planner/distributed/splitter/splitter.cc | 25 ++-- src/carnot/planner/logical_planner_test.cc | 128 +++++++++++++++++ src/carnot/planner/test_utils.h | 102 ++++++++++++++ 9 files changed, 516 insertions(+), 14 deletions(-) diff --git a/src/carnot/funcs/metadata/metadata_ops.cc b/src/carnot/funcs/metadata/metadata_ops.cc index 76220e0bf6b..e3714b13aba 100644 --- a/src/carnot/funcs/metadata/metadata_ops.cc +++ b/src/carnot/funcs/metadata/metadata_ops.cc @@ -45,6 +45,7 @@ void RegisterMetadataOpsOrDie(px::carnot::udf::Registry* registry) { registry->RegisterOrDie("has_service_name"); registry->RegisterOrDie("has_value"); registry->RegisterOrDie("ip_to_pod_id"); + registry->RegisterOrDie("_ip_to_pod_id_pem_exec"); registry->RegisterOrDie("ip_to_pod_id"); registry->RegisterOrDie("pod_id_to_pod_name"); registry->RegisterOrDie("pod_id_to_pod_labels"); diff --git a/src/carnot/funcs/metadata/metadata_ops.h b/src/carnot/funcs/metadata/metadata_ops.h index 6bd3db98ea8..d9fb295aaa7 100644 --- a/src/carnot/funcs/metadata/metadata_ops.h +++ b/src/carnot/funcs/metadata/metadata_ops.h @@ -2977,6 +2977,25 @@ class IPToPodIDUDF : public ScalarUDF { static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_KELVIN; } }; +/** + * This UDF is a compiler internal function. It should only be used when the IP address is + * guaranteed to be a local address since this function is forced to run on PEMs. In cases + * where the IP could be a remote address, then it is more correct to have the function run on + * Kelvin (IPToPodIDUDF or IPToPodIDAtTimeUDF). + */ +class IPToPodIDAtTimePEMExecUDF : public ScalarUDF { + public: + /** + * @brief Gets the pod id of pod with given pod_ip and time + */ + StringValue Exec(FunctionContext* ctx, StringValue pod_ip, Time64NSValue time) { + auto md = GetMetadataState(ctx); + return md->k8s_metadata_state().PodIDByIPAtTime(pod_ip, time.val); + } + + static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_PEM; } +}; + class IPToPodIDAtTimeUDF : public ScalarUDF { public: /** diff --git a/src/carnot/planner/compiler/analyzer/convert_metadata_rule.cc b/src/carnot/planner/compiler/analyzer/convert_metadata_rule.cc index bdcc7462161..9df76fe6426 100644 --- a/src/carnot/planner/compiler/analyzer/convert_metadata_rule.cc +++ b/src/carnot/planner/compiler/analyzer/convert_metadata_rule.cc @@ -31,6 +31,58 @@ namespace carnot { namespace planner { namespace compiler { +namespace { +std::string GetUniquePodNameCol(std::shared_ptr parent_type, + absl::flat_hash_set* used_column_names) { + auto col_name_counter = 0; + do { + auto new_col = absl::StrCat("pod_name_", col_name_counter++); + if (!used_column_names->contains(new_col) && !parent_type->HasColumn(new_col)) { + used_column_names->insert(new_col); + return new_col; + } + } while (true); +} +} // namespace + +Status ConvertMetadataRule::AddPodNameConversionMapsWithFallback( + IR* graph, IRNode* container, ExpressionIR* metadata_expr, ExpressionIR* fallback_expr, + const std::pair& col_names) const { + if (Match(container, Func())) { + for (int64_t parent_id : graph->dag().ParentsOf(container->id())) { + PX_RETURN_IF_ERROR(AddPodNameConversionMapsWithFallback( + graph, graph->Get(parent_id), metadata_expr, fallback_expr, col_names)); + } + } else if (Match(container, Operator())) { + auto container_op = static_cast(container); + for (auto parent_op : container_op->parents()) { + auto metadata_col_expr = ColumnExpression(col_names.first, metadata_expr); + auto fallback_col_expr = ColumnExpression(col_names.second, fallback_expr); + PX_ASSIGN_OR_RETURN( + auto md_map_ir, + graph->CreateNode(container->ast(), parent_op, + std::vector{metadata_col_expr}, true)); + PX_ASSIGN_OR_RETURN( + auto fallback_md_map_ir, + graph->CreateNode(container->ast(), static_cast(md_map_ir), + std::vector{fallback_col_expr}, true)); + PX_RETURN_IF_ERROR(container_op->ReplaceParent(parent_op, fallback_md_map_ir)); + + for (auto child : parent_op->Children()) { + if (child == md_map_ir) { + continue; + } + PX_RETURN_IF_ERROR(child->ReplaceParent(parent_op, md_map_ir)); + } + + PX_RETURN_IF_ERROR(PropagateTypeChangesFromNode(graph, md_map_ir, compiler_state_)); + PX_RETURN_IF_ERROR(PropagateTypeChangesFromNode(graph, fallback_md_map_ir, compiler_state_)); + } + } + + return Status::OK(); +} + Status ConvertMetadataRule::UpdateMetadataContainer(IRNode* container, MetadataIR* metadata, ExpressionIR* metadata_expr) const { if (Match(container, Func())) { @@ -70,6 +122,12 @@ StatusOr ConvertMetadataRule::FindKeyColumn(std::shared_ptrColumnNames(), ",")); } +bool CheckBackupConversionAvailable(std::shared_ptr parent_type, + const std::string& func_name) { + return parent_type->HasColumn("time_") && parent_type->HasColumn("local_addr") && + func_name == "upid_to_pod_name"; +} + StatusOr ConvertMetadataRule::Apply(IRNode* ir_node) { if (!Match(ir_node, Metadata())) { return false; @@ -85,8 +143,9 @@ StatusOr ConvertMetadataRule::Apply(IRNode* ir_node) { PX_ASSIGN_OR_RETURN(auto parent, metadata->ReferencedOperator()); PX_ASSIGN_OR_RETURN(auto containing_ops, metadata->ContainingOperators()); + auto resolved_table_type = parent->resolved_table_type(); PX_ASSIGN_OR_RETURN(std::string key_column_name, - FindKeyColumn(parent->resolved_table_type(), md_property, ir_node)); + FindKeyColumn(resolved_table_type, md_property, ir_node)); PX_ASSIGN_OR_RETURN(ColumnIR * key_column, graph->CreateNode(ir_node->ast(), key_column_name, parent_op_idx)); @@ -96,10 +155,74 @@ StatusOr ConvertMetadataRule::Apply(IRNode* ir_node) { FuncIR * conversion_func, graph->CreateNode(ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", func_name}, std::vector{key_column})); + FuncIR* orig_conversion_func = conversion_func; + ExpressionIR* conversion_expr = static_cast(conversion_func); + + // TODO(ddelnano): Until the short lived process issue (gh#1638) is resolved, add a fallback + // conversion function that uses local_addr for pod lookups when the upid based default + // (upid_to_pod_name) fails. This turns the `df.ctx["pod"]` lookup into the following pseudo code: + // + // fallback = px.pod_id_to_pod_name(px.ip_to_pod_id(df.ctx["local_addr"])) + // df.pod = px.select(px.upid_to_pod_name(df.upid) == "", fallback, px.upid_to_pod_name(df.upid)) + FuncIR* backup_conversion_func = nullptr; + auto backup_conversion_available = CheckBackupConversionAvailable(resolved_table_type, func_name); + std::pair col_names; + if (backup_conversion_available) { + absl::flat_hash_set used_column_names; + col_names = std::make_pair(GetUniquePodNameCol(resolved_table_type, &used_column_names), + GetUniquePodNameCol(resolved_table_type, &used_column_names)); + PX_ASSIGN_OR_RETURN(ColumnIR * local_addr_col, + graph->CreateNode(ir_node->ast(), "local_addr", parent_op_idx)); + PX_ASSIGN_OR_RETURN(ColumnIR * time_col, + graph->CreateNode(ir_node->ast(), "time_", parent_op_idx)); + PX_ASSIGN_OR_RETURN( + ColumnIR * md_expr_col, + graph->CreateNode(ir_node->ast(), col_names.first, parent_op_idx)); + PX_ASSIGN_OR_RETURN( + FuncIR * ip_conversion_func, + graph->CreateNode(ir_node->ast(), + FuncIR::Op{FuncIR::Opcode::non_op, "", "_ip_to_pod_id_pem_exec"}, + std::vector{local_addr_col, time_col})); + + // This doesn't need to have a "pem exec" equivalent function as long as the metadata + // annotation is set as done below + PX_ASSIGN_OR_RETURN( + backup_conversion_func, + graph->CreateNode( + ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", "pod_id_to_pod_name"}, + std::vector{static_cast(ip_conversion_func)})); + + backup_conversion_func->set_annotations(ExpressionIR::Annotations(md_type)); + + PX_ASSIGN_OR_RETURN(ExpressionIR * empty_string, + graph->CreateNode(ir_node->ast(), "")); + PX_ASSIGN_OR_RETURN( + FuncIR * select_expr, + graph->CreateNode( + ir_node->ast(), FuncIR::Op{FuncIR::Opcode::eq, "==", "equal"}, + std::vector{static_cast(md_expr_col), empty_string})); + PX_ASSIGN_OR_RETURN(auto duplicate_md_expr_col, graph->CopyNode(md_expr_col)); + PX_ASSIGN_OR_RETURN( + FuncIR * select_func, + graph->CreateNode( + ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", "select"}, + std::vector{static_cast(select_expr), + backup_conversion_func, duplicate_md_expr_col})); + + conversion_func = select_func; + PX_ASSIGN_OR_RETURN(conversion_expr, graph->CreateNode( + ir_node->ast(), col_names.second, parent_op_idx)); + } + for (int64_t parent_id : graph->dag().ParentsOf(metadata->id())) { // For each container node of the metadata expression, update it to point to the // new conversion func instead. - PX_RETURN_IF_ERROR(UpdateMetadataContainer(graph->Get(parent_id), metadata, conversion_func)); + auto container = graph->Get(parent_id); + PX_RETURN_IF_ERROR(UpdateMetadataContainer(container, metadata, conversion_expr)); + if (backup_conversion_available) { + PX_RETURN_IF_ERROR(AddPodNameConversionMapsWithFallback( + graph, container, orig_conversion_func, conversion_func, col_names)); + } } // Propagate type changes from the new conversion_func. @@ -107,8 +230,7 @@ StatusOr ConvertMetadataRule::Apply(IRNode* ir_node) { DCHECK_EQ(conversion_func->EvaluatedDataType(), column_type) << "Expected the parent key column type and metadata property type to match."; - conversion_func->set_annotations(ExpressionIR::Annotations(md_type)); - + orig_conversion_func->set_annotations(ExpressionIR::Annotations(md_type)); return true; } diff --git a/src/carnot/planner/compiler/analyzer/convert_metadata_rule.h b/src/carnot/planner/compiler/analyzer/convert_metadata_rule.h index a027c582d8b..2cf69e517c4 100644 --- a/src/carnot/planner/compiler/analyzer/convert_metadata_rule.h +++ b/src/carnot/planner/compiler/analyzer/convert_metadata_rule.h @@ -20,6 +20,7 @@ #include #include +#include #include "src/carnot/planner/compiler_state/compiler_state.h" #include "src/carnot/planner/rules/rules.h" @@ -46,6 +47,31 @@ class ConvertMetadataRule : public Rule { ExpressionIR* metadata_expr) const; StatusOr FindKeyColumn(std::shared_ptr parent_type, MetadataProperty* property, IRNode* node_for_error) const; + + /** + * + * This function aids in applying a fallback to a metadata conversion expression. + * It works by adding two conversion maps to the graph. The first map contains a column expression + * that contains the metadata expression result. The second map contains a column expression that + * contains the fallback expression result. It is intended to be used as a short term workaround + * for gh#1638 where pod name lookups fail for short lived processes. This fallback expression + * allows for an alternative mechanism if the primary lookup fails. See the example below: + * + * Before applying the rule: + * + * MemorySource -> MapIR (containing MetadataIR) + * + * After applying the rule: + * + * MemorySource + * -> MapIR (col_names[0]: metadata_expr) + * -> MapIR (col_names[1]: fallback_expr) + * -> MapIR (col_names[1] & existing cols) + */ + + Status AddPodNameConversionMapsWithFallback( + IR* graph, IRNode* container, ExpressionIR* metadata_expr, ExpressionIR* fallback_expr, + const std::pair& col_names) const; }; } // namespace compiler diff --git a/src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc b/src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc index b52469f9964..f5313d2c0e5 100644 --- a/src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc +++ b/src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc @@ -31,7 +31,7 @@ using table_store::schema::Relation; using ConvertMetadataRuleTest = RulesTest; -TEST_F(ConvertMetadataRuleTest, multichild) { +TEST_F(ConvertMetadataRuleTest, multichild_without_fallback_func) { auto relation = Relation(cpu_relation); MetadataType conversion_column = MetadataType::UPID; std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column); @@ -114,6 +114,95 @@ TEST_F(ConvertMetadataRuleTest, missing_conversion_column) { skip_check_stray_nodes_ = true; } +TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) { + auto relation = Relation(http_events_relation); + MetadataType conversion_column = MetadataType::UPID; + std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column); + relation.AddColumn(types::DataType::UINT128, conversion_column_str); + compiler_state_->relation_map()->emplace("table", relation); + + auto metadata_name = "pod_name"; + MetadataProperty* property = md_handler->GetProperty(metadata_name).ValueOrDie(); + MetadataIR* metadata_ir = MakeMetadataIR(metadata_name, /* parent_op_idx */ 0); + metadata_ir->set_property(property); + + auto src = MakeMemSource(relation); + auto map = MakeMap(src, {{"md", metadata_ir}}); + + ResolveTypesRule type_rule(compiler_state_.get()); + ASSERT_OK(type_rule.Execute(graph.get())); + + ConvertMetadataRule rule(compiler_state_.get()); + auto result = rule.Execute(graph.get()); + ASSERT_OK(result); + EXPECT_TRUE(result.ValueOrDie()); + + EXPECT_EQ(0, graph->FindNodesThatMatch(Metadata()).size()); + + EXPECT_EQ(1, src->Children().size()); + auto md_map = static_cast(src->Children()[0]); + EXPECT_NE(md_map, map); + + FuncIR* upid_to_pod_name = nullptr; + for (auto col_expr : md_map->col_exprs()) { + if (col_expr.name == "pod_name_0") { + EXPECT_MATCH(col_expr.node, Func()); + upid_to_pod_name = static_cast(col_expr.node); + } + } + EXPECT_NE(upid_to_pod_name, nullptr); + EXPECT_EQ(absl::Substitute("upid_to_$0", metadata_name), upid_to_pod_name->func_name()); + EXPECT_EQ(1, upid_to_pod_name->all_args().size()); + auto input_col = upid_to_pod_name->all_args()[0]; + EXPECT_MATCH(input_col, ColumnNode("upid")); + EXPECT_MATCH(upid_to_pod_name, ResolvedExpression()); + EXPECT_MATCH(input_col, ResolvedExpression()); + + EXPECT_EQ(1, md_map->Children().size()); + auto fallback_map = static_cast(md_map->Children()[0]); + FuncIR* fallback_func_select = nullptr; + for (auto col_expr : fallback_map->col_exprs()) { + if (col_expr.name == "pod_name_1") { + EXPECT_MATCH(col_expr.node, Func()); + fallback_func_select = static_cast(col_expr.node); + } + } + + EXPECT_NE(fallback_func_select, nullptr); + EXPECT_EQ("select", fallback_func_select->func_name()); + EXPECT_EQ(3, fallback_func_select->all_args().size()); + + auto orig_func_check = fallback_func_select->all_args()[0]; + EXPECT_MATCH(orig_func_check, Func()); + auto equals_func = static_cast(orig_func_check); + EXPECT_EQ("equal", equals_func->func_name()); + EXPECT_EQ(2, equals_func->all_args().size()); + EXPECT_MATCH(equals_func->all_args()[0], ColumnNode("pod_name_0")); + EXPECT_MATCH(equals_func->all_args()[1], String("")); + EXPECT_MATCH(orig_func_check, ResolvedExpression()); + + EXPECT_MATCH(fallback_func_select->all_args()[1], Func()); + auto fallback_func = static_cast(fallback_func_select->all_args()[1]); + EXPECT_EQ("pod_id_to_pod_name", fallback_func->func_name()); + EXPECT_EQ(1, fallback_func->all_args().size()); + EXPECT_MATCH(fallback_func->all_args()[0], Func()); + EXPECT_MATCH(fallback_func, ResolvedExpression()); + + auto ip_func = static_cast(fallback_func->all_args()[0]); + EXPECT_EQ("_ip_to_pod_id_pem_exec", ip_func->func_name()); + EXPECT_EQ(2, ip_func->all_args().size()); + EXPECT_MATCH(ip_func->all_args()[0], ColumnNode("local_addr")); + EXPECT_MATCH(ip_func->all_args()[1], ColumnNode("time_")); + EXPECT_MATCH(ip_func, ResolvedExpression()); + + // Check that the semantic type of the conversion func is propagated properly. + auto type_or_s = map->resolved_table_type()->GetColumnType("md"); + ASSERT_OK(type_or_s); + auto type = std::static_pointer_cast(type_or_s.ConsumeValueOrDie()); + EXPECT_EQ(types::STRING, type->data_type()); + EXPECT_EQ(types::ST_POD_NAME, type->semantic_type()); +} + } // namespace compiler } // namespace planner } // namespace carnot diff --git a/src/carnot/planner/compiler/test_utils.h b/src/carnot/planner/compiler/test_utils.h index 2f65f616b50..99156633f61 100644 --- a/src/carnot/planner/compiler/test_utils.h +++ b/src/carnot/planner/compiler/test_utils.h @@ -860,10 +860,15 @@ class RulesTest : public OperatorTests { std::vector({types::DataType::INT64, types::DataType::FLOAT64, types::DataType::FLOAT64, types::DataType::FLOAT64}), std::vector({"count", "cpu0", "cpu1", "cpu2"})); + http_events_relation = table_store::schema::Relation( + std::vector( + {types::DataType::TIME64NS, types::DataType::STRING, types::DataType::INT64}), + std::vector({"time_", "local_addr", "local_port"})); semantic_rel = Relation({types::INT64, types::FLOAT64, types::STRING}, {"bytes", "cpu", "str_col"}, {types::ST_BYTES, types::ST_PERCENT, types::ST_NONE}); rel_map->emplace("cpu", cpu_relation); + rel_map->emplace("http_events", http_events_relation); rel_map->emplace("semantic_table", semantic_rel); compiler_state_ = std::make_unique( @@ -935,6 +940,7 @@ class RulesTest : public OperatorTests { std::unique_ptr info_; int64_t time_now = 1552607213931245000; table_store::schema::Relation cpu_relation; + table_store::schema::Relation http_events_relation; table_store::schema::Relation semantic_rel; std::unique_ptr md_handler; }; @@ -1110,6 +1116,8 @@ class ASTVisitorTest : public OperatorTests { Relation http_events_relation; http_events_relation.AddColumn(types::TIME64NS, "time_"); http_events_relation.AddColumn(types::UINT128, "upid"); + http_events_relation.AddColumn(types::STRING, "local_addr"); + http_events_relation.AddColumn(types::INT64, "local_port"); http_events_relation.AddColumn(types::STRING, "remote_addr"); http_events_relation.AddColumn(types::INT64, "remote_port"); http_events_relation.AddColumn(types::INT64, "major_version"); diff --git a/src/carnot/planner/distributed/splitter/splitter.cc b/src/carnot/planner/distributed/splitter/splitter.cc index f073d6ed053..c8761726f57 100644 --- a/src/carnot/planner/distributed/splitter/splitter.cc +++ b/src/carnot/planner/distributed/splitter/splitter.cc @@ -23,6 +23,7 @@ #include #include "src/carnot/planner/distributed/distributed_rules.h" +#include "src/carnot/planner/distributed/splitter/executor_utils.h" #include "src/carnot/planner/distributed/splitter/presplit_analyzer/presplit_analyzer.h" #include "src/carnot/planner/distributed/splitter/presplit_optimizer/presplit_optimizer.h" #include "src/carnot/planner/distributed/splitter/scalar_udfs_run_on_executor_rule.h" @@ -195,42 +196,48 @@ bool SparseFilter(OperatorIR* op) { return MatchSparseFilterExpr(static_cast(op)->filter_expr()); } -bool MatchMetadataOrSubExpression(ExpressionIR* expr) { +bool MatchMetadataOrSubExpression(ExpressionIR* expr, CompilerState* state) { if (Match(expr, MetadataExpression())) { return true; } else if (Match(expr, Func())) { auto func = static_cast(expr); for (const auto& arg : func->args()) { - if (MatchMetadataOrSubExpression(arg)) { + if (MatchMetadataOrSubExpression(arg, state)) { return true; } } + auto pem_only_exec = IsFuncWithExecutor(state, func, udfspb::UDFSourceExecutor::UDF_PEM); + if (pem_only_exec.ok() && pem_only_exec.ValueOrDie()) { + return true; + } } return false; } -bool MustBeOnPemMap(OperatorIR* op) { +bool MustBeOnPemMap(OperatorIR* op, CompilerState* state) { if (!Match(op, Map())) { return false; } MapIR* map = static_cast(op); for (const auto& expr : map->col_exprs()) { - if (MatchMetadataOrSubExpression(expr.node)) { + if (MatchMetadataOrSubExpression(expr.node, state)) { return true; } } return false; } -bool MustBeOnPemFilter(OperatorIR* op) { +bool MustBeOnPemFilter(OperatorIR* op, CompilerState* state) { if (!Match(op, Filter())) { return false; } FilterIR* filter = static_cast(op); - return MatchMetadataOrSubExpression(filter->filter_expr()); + return MatchMetadataOrSubExpression(filter->filter_expr(), state); } -bool MustBeOnPem(OperatorIR* op) { return MustBeOnPemMap(op) || MustBeOnPemFilter(op); } +bool MustBeOnPem(OperatorIR* op, CompilerState* state) { + return MustBeOnPemMap(op, state) || MustBeOnPemFilter(op, state); +} bool Splitter::CanBeGRPCBridgeTree(OperatorIR* op) { // We can generalize this beyond sparse filters in the future as needed. @@ -254,14 +261,14 @@ void Splitter::ConstructGRPCBridgeTree( } for (OperatorIR* child : op->Children()) { if (ignore_children.contains(child)) { - if (MustBeOnPem(child)) { + if (MustBeOnPem(child, compiler_state_)) { LOG(ERROR) << "must be on PEM, but not found: " << child->DebugString(); } continue; } // Certain operators must be on PEM, ie because of metadatas locality. If an operator // is on a PEM then we must make sure we place it there, regardless of GRPCBridges. - if (MustBeOnPem(child)) { + if (MustBeOnPem(child, compiler_state_)) { GRPCBridgeTree child_bridge_node; child_bridge_node.must_be_on_pem = true; child_bridge_node.starting_op = child; diff --git a/src/carnot/planner/logical_planner_test.cc b/src/carnot/planner/logical_planner_test.cc index b15cf201484..2fc232a4839 100644 --- a/src/carnot/planner/logical_planner_test.cc +++ b/src/carnot/planner/logical_planner_test.cc @@ -54,6 +54,19 @@ class LogicalPlannerTest : public ::testing::Test { *query_request.mutable_logical_planner_state() = state; return query_request; } + plannerpb::QueryRequest MakeQueryRequestWithExecArgs( + const distributedpb::LogicalPlannerState& state, const std::string& query, + const std::vector& exec_funcs) { + plannerpb::QueryRequest query_request; + query_request.set_query_str(query); + *query_request.mutable_logical_planner_state() = state; + for (const auto& exec_func : exec_funcs) { + auto f = query_request.add_exec_funcs(); + f->set_func_name(exec_func); + f->set_output_table_prefix(exec_func); + } + return query_request; + } udfspb::UDFInfo info_; }; @@ -770,6 +783,121 @@ TEST_F(LogicalPlannerTest, filter_pushdown_bug) { ASSERT_OK(plan->ToProto()); } +const char kDuplicateColNameError[] = R"pxl( +import px + +df = px.DataFrame(table='http_events', start_time='-6m') +df.pod_name = px.select(df.ctx['pod'] != "", df.ctx['pod'], "pod") + +px.display(df) +)pxl"; +TEST_F(LogicalPlannerTest, duplicate_col_error) { + auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie(); + auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema); + ASSERT_OK_AND_ASSIGN(auto plan, planner->Plan(MakeQueryRequest(state, kDuplicateColNameError))); + ASSERT_OK(plan->ToProto()); +} + +const char kPodNameFallbackConversion[] = R"pxl( +import px + +df = px.DataFrame(table='http_events', start_time='-6m') +df.pod = df.ctx['pod'] + +px.display(df) +)pxl"; +TEST_F(LogicalPlannerTest, pod_name_fallback_conversion) { + auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie(); + auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema); + ASSERT_OK_AND_ASSIGN(auto plan, + planner->Plan(MakeQueryRequest(state, kPodNameFallbackConversion))); + ASSERT_OK(plan->ToProto()); +} + +const char kPodNameFallbackConversionWithFilter[] = R"pxl( +import px + +df = px.DataFrame(table='http_events', start_time='-6m') +df[df.ctx['pod'] != ""] + +px.display(df) +)pxl"; +TEST_F(LogicalPlannerTest, pod_name_fallback_conversion_with_filter) { + auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie(); + auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema); + ASSERT_OK_AND_ASSIGN( + auto plan, planner->Plan(MakeQueryRequest(state, kPodNameFallbackConversionWithFilter))); + ASSERT_OK(plan->ToProto()); +} + +// Use a data table that doesn't contain local_addr to test df.ctx['pod'] conversion without +// the fallback conversion. +const char kPodNameConversionWithoutFallback[] = R"pxl( +import px + +def cql_flow_graph(): + df = px.DataFrame('cql_events', start_time='-5m') + df.pod = df.ctx['pod'] + + df.ra_pod = px.pod_id_to_pod_name(px.ip_to_pod_id(df.remote_addr)) + df.is_ra_pod = df.ra_pod != '' + df.ra_name = px.select(df.is_ra_pod, df.ra_pod, df.remote_addr) + + df.is_server_tracing = df.trace_role == 2 + + df.source = px.select(df.is_server_tracing, df.ra_name, df.pod) + df.destination = px.select(df.is_server_tracing, df.pod, df.ra_name) + + return df + + +def cql_summary_with_links(): + df = cql_flow_graph() + + return df +)pxl"; +TEST_F(LogicalPlannerTest, pod_name_conversion_without_fallback) { + auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie(); + auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema); + ASSERT_OK_AND_ASSIGN(auto plan, planner->Plan(MakeQueryRequestWithExecArgs( + state, kPodNameConversionWithoutFallback, + {"cql_flow_graph", "cql_summary_with_links"}))); + ASSERT_OK(plan->ToProto()); +} + +// PxL query that contains a GRPC bridge with 2 branches where branch 1 is a blocking node +// and branch 2 contains a PEM only UDF func (not a metadata expression). This triggers a previous +// splitter bug where a PEM only UDF is incorrectly placed on the after blocking side of the bridge. +// Metadata expressions didn't have this problem since md annotations force PEM only scheduling, +// however, calling UDF only UDFs directly triggers this problem. +// +// The PxL below is roughly equivalent to the following problematic bridge: +// +// MemSrc -> Map -> Map (w/ PEM only UDF) -> GRPC Sink (df return) +// \ +// -> GRPC Sink (px.debug) +const char kBrokenUpidToPodNameQuery[] = R"pxl( +import px +def dns_flow_graph(): + df = px.DataFrame('http_events', start_time='-5m') + df.pod = df.ctx['pod'] + + # Create table in drawer. + px.debug(df, "dns_events") + + df.to_entity = px.select(df.remote_addr == '127.0.0.1', + px.upid_to_pod_name(df.upid), + px.Service(px.nslookup(df.remote_addr))) + return df +)pxl"; +TEST_F(LogicalPlannerTest, broken_upid_to_pod_name_query) { + auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie(); + auto state = testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema); + ASSERT_OK_AND_ASSIGN(auto plan, planner->Plan(MakeQueryRequestWithExecArgs( + state, kBrokenUpidToPodNameQuery, {"dns_flow_graph"}))); + ASSERT_OK(plan->ToProto()); +} + const char kHttpDataScript[] = R"pxl( import px diff --git a/src/carnot/planner/test_utils.h b/src/carnot/planner/test_utils.h index 23216f167a1..84a5f94c8fe 100644 --- a/src/carnot/planner/test_utils.h +++ b/src/carnot/planner/test_utils.h @@ -102,6 +102,16 @@ relation_map { column_type: UINT128 column_semantic_type: ST_UPID } + columns { + column_name: "local_addr" + column_type: STRING + column_semantic_type: ST_NONE + } + columns { + column_name: "local_port" + column_type: INT64 + column_semantic_type: ST_NONE + } columns { column_name: "remote_addr" column_type: STRING @@ -174,6 +184,46 @@ relation_map { } } } +relation_map { + key: "cql_events" + value { + columns { + column_name: "time_" + column_type: TIME64NS + column_semantic_type: ST_NONE + } + columns { + column_name: "upid" + column_type: UINT128 + column_semantic_type: ST_UPID + } + columns { + column_name: "remote_addr" + column_type: STRING + column_semantic_type: ST_NONE + } + columns { + column_name: "remote_port" + column_type: INT64 + column_semantic_type: ST_NONE + } + columns { + column_name: "trace_role" + column_type: INT64 + column_semantic_type: ST_NONE + } + columns { + column_name: "major_version" + column_type: INT64 + column_semantic_type: ST_NONE + } + columns { + column_name: "latency" + column_type: INT64 + column_semantic_type: ST_DURATION_NS + } + } +} relation_map { key: "cpu" value { @@ -967,6 +1017,53 @@ schema_info { low_bits: 0x0000000000000003 } } +schema_info { + name: "cql_events" + relation { + columns { + column_name: "time_" + column_type: TIME64NS + column_semantic_type: ST_NONE + } + columns { + column_name: "upid" + column_type: UINT128 + column_semantic_type: ST_NONE + } + columns { + column_name: "remote_addr" + column_type: STRING + column_semantic_type: ST_NONE + } + columns { + column_name: "remote_port" + column_type: INT64 + column_semantic_type: ST_NONE + } + columns { + column_name: "trace_role" + column_type: INT64 + column_semantic_type: ST_NONE + } + columns { + column_name: "latency" + column_type: INT64 + column_semantic_type: ST_NONE + } + } + agent_list { + high_bits: 0x0000000100000000 + low_bits: 0x0000000000000001 + } + agent_list { + high_bits: 0x0000000100000000 + low_bits: 0x0000000000000002 + } + agent_list { + high_bits: 0x0000000100000000 + low_bits: 0x0000000000000003 + } +} schema_info { name: "http_events" relation { @@ -980,6 +1077,11 @@ schema_info { column_type: UINT128 column_semantic_type: ST_NONE } + columns { + column_name: "local_addr" + column_type: STRING + column_semantic_type: ST_NONE + } } agent_list { high_bits: 0x0000000100000000 From cb6a070f613323196fe70c069437ae8bd4a7c7c1 Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Thu, 16 Jan 2025 06:19:57 +0000 Subject: [PATCH 2/3] Use multi line comment to avoid backslash line continuation Signed-off-by: Dom Del Nano --- src/carnot/planner/logical_planner_test.cc | 24 ++++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/carnot/planner/logical_planner_test.cc b/src/carnot/planner/logical_planner_test.cc index 2fc232a4839..e551c2bf540 100644 --- a/src/carnot/planner/logical_planner_test.cc +++ b/src/carnot/planner/logical_planner_test.cc @@ -865,17 +865,19 @@ TEST_F(LogicalPlannerTest, pod_name_conversion_without_fallback) { ASSERT_OK(plan->ToProto()); } -// PxL query that contains a GRPC bridge with 2 branches where branch 1 is a blocking node -// and branch 2 contains a PEM only UDF func (not a metadata expression). This triggers a previous -// splitter bug where a PEM only UDF is incorrectly placed on the after blocking side of the bridge. -// Metadata expressions didn't have this problem since md annotations force PEM only scheduling, -// however, calling UDF only UDFs directly triggers this problem. -// -// The PxL below is roughly equivalent to the following problematic bridge: -// -// MemSrc -> Map -> Map (w/ PEM only UDF) -> GRPC Sink (df return) -// \ -// -> GRPC Sink (px.debug) +/* + * PxL query that contains a GRPC bridge with 2 branches where branch 1 is a blocking node + * and branch 2 contains a PEM only UDF func (not a metadata expression). This triggers a previous + * splitter bug where a PEM only UDF is incorrectly placed on the after blocking side of the bridge. + * Metadata expressions didn't have this problem since md annotations force PEM only scheduling, + * however, calling UDF only UDFs directly triggers this problem. + * + * The PxL below is roughly equivalent to the following problematic bridge: + * + * MemSrc -> Map -> Map (w/ PEM only UDF) -> GRPC Sink (df return) + * \ + * -> GRPC Sink (px.debug) + */ const char kBrokenUpidToPodNameQuery[] = R"pxl( import px def dns_flow_graph(): From e166c9544b1c7b5eae4bb768e9eb34622556e12d Mon Sep 17 00:00:00 2001 From: Dom Del Nano Date: Thu, 16 Jan 2025 16:33:54 +0000 Subject: [PATCH 3/3] Fix tests for gcc Signed-off-by: Dom Del Nano --- .../compiler/analyzer/convert_metadata_rule_test.cc | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc b/src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc index f5313d2c0e5..119622118b0 100644 --- a/src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc +++ b/src/carnot/planner/compiler/analyzer/convert_metadata_rule_test.cc @@ -115,6 +115,7 @@ TEST_F(ConvertMetadataRuleTest, missing_conversion_column) { } TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) { + std::string pod_name_col_prefix = "pod_name_"; auto relation = Relation(http_events_relation); MetadataType conversion_column = MetadataType::UPID; std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column); @@ -144,10 +145,12 @@ TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) { EXPECT_NE(md_map, map); FuncIR* upid_to_pod_name = nullptr; + std::string upid_to_pod_name_col_name; for (auto col_expr : md_map->col_exprs()) { - if (col_expr.name == "pod_name_0") { + if (absl::StartsWith(col_expr.name, pod_name_col_prefix)) { EXPECT_MATCH(col_expr.node, Func()); upid_to_pod_name = static_cast(col_expr.node); + upid_to_pod_name_col_name = col_expr.name; } } EXPECT_NE(upid_to_pod_name, nullptr); @@ -161,10 +164,13 @@ TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) { EXPECT_EQ(1, md_map->Children().size()); auto fallback_map = static_cast(md_map->Children()[0]); FuncIR* fallback_func_select = nullptr; + std::string fallback_func_select_col_name; for (auto col_expr : fallback_map->col_exprs()) { - if (col_expr.name == "pod_name_1") { + if (absl::StartsWith(col_expr.name, pod_name_col_prefix) && + col_expr.name != upid_to_pod_name_col_name) { EXPECT_MATCH(col_expr.node, Func()); fallback_func_select = static_cast(col_expr.node); + fallback_func_select_col_name = col_expr.name; } } @@ -177,7 +183,7 @@ TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) { auto equals_func = static_cast(orig_func_check); EXPECT_EQ("equal", equals_func->func_name()); EXPECT_EQ(2, equals_func->all_args().size()); - EXPECT_MATCH(equals_func->all_args()[0], ColumnNode("pod_name_0")); + EXPECT_MATCH(equals_func->all_args()[0], ColumnNode(upid_to_pod_name_col_name)); EXPECT_MATCH(equals_func->all_args()[1], String("")); EXPECT_MATCH(orig_func_check, ResolvedExpression());