Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make metadata pod lookups more resilient to short lived processes #2078

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/carnot/funcs/metadata/metadata_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ void RegisterMetadataOpsOrDie(px::carnot::udf::Registry* registry) {
registry->RegisterOrDie<HasServiceNameUDF>("has_service_name");
registry->RegisterOrDie<HasValueUDF>("has_value");
registry->RegisterOrDie<IPToPodIDUDF>("ip_to_pod_id");
registry->RegisterOrDie<IPToPodIDAtTimePEMExecUDF>("_ip_to_pod_id_pem_exec");
registry->RegisterOrDie<IPToPodIDAtTimeUDF>("ip_to_pod_id");
registry->RegisterOrDie<PodIDToPodNameUDF>("pod_id_to_pod_name");
registry->RegisterOrDie<PodIDToPodLabelsUDF>("pod_id_to_pod_labels");
Expand Down
19 changes: 19 additions & 0 deletions src/carnot/funcs/metadata/metadata_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -2977,6 +2977,25 @@ class IPToPodIDUDF : public ScalarUDF {
static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_KELVIN; }
};

/**
* This UDF is a compiler internal function. It should only be used when the IP address is
* guaranteed to be a local address since this function is forced to run on PEMs. In cases
* where the IP could be a remote address, then it is more correct to have the function run on
* Kelvin (IPToPodIDUDF or IPToPodIDAtTimeUDF).
*/
class IPToPodIDAtTimePEMExecUDF : public ScalarUDF {
public:
/**
* @brief Gets the pod id of pod with given pod_ip and time
*/
StringValue Exec(FunctionContext* ctx, StringValue pod_ip, Time64NSValue time) {
auto md = GetMetadataState(ctx);
return md->k8s_metadata_state().PodIDByIPAtTime(pod_ip, time.val);
}

static udfspb::UDFSourceExecutor Executor() { return udfspb::UDFSourceExecutor::UDF_PEM; }
};

class IPToPodIDAtTimeUDF : public ScalarUDF {
public:
/**
Expand Down
130 changes: 126 additions & 4 deletions src/carnot/planner/compiler/analyzer/convert_metadata_rule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,58 @@ namespace carnot {
namespace planner {
namespace compiler {

namespace {
std::string GetUniquePodNameCol(std::shared_ptr<TableType> parent_type,
absl::flat_hash_set<std::string>* used_column_names) {
auto col_name_counter = 0;
do {
auto new_col = absl::StrCat("pod_name_", col_name_counter++);
if (!used_column_names->contains(new_col) && !parent_type->HasColumn(new_col)) {
used_column_names->insert(new_col);
return new_col;
}
} while (true);
}
} // namespace

Status ConvertMetadataRule::AddPodNameConversionMapsWithFallback(
IR* graph, IRNode* container, ExpressionIR* metadata_expr, ExpressionIR* fallback_expr,
const std::pair<std::string, std::string>& col_names) const {
if (Match(container, Func())) {
for (int64_t parent_id : graph->dag().ParentsOf(container->id())) {
PX_RETURN_IF_ERROR(AddPodNameConversionMapsWithFallback(
graph, graph->Get(parent_id), metadata_expr, fallback_expr, col_names));
}
} else if (Match(container, Operator())) {
auto container_op = static_cast<OperatorIR*>(container);
for (auto parent_op : container_op->parents()) {
auto metadata_col_expr = ColumnExpression(col_names.first, metadata_expr);
auto fallback_col_expr = ColumnExpression(col_names.second, fallback_expr);
PX_ASSIGN_OR_RETURN(
auto md_map_ir,
graph->CreateNode<MapIR>(container->ast(), parent_op,
std::vector<ColumnExpression>{metadata_col_expr}, true));
PX_ASSIGN_OR_RETURN(
auto fallback_md_map_ir,
graph->CreateNode<MapIR>(container->ast(), static_cast<OperatorIR*>(md_map_ir),
std::vector<ColumnExpression>{fallback_col_expr}, true));
PX_RETURN_IF_ERROR(container_op->ReplaceParent(parent_op, fallback_md_map_ir));

for (auto child : parent_op->Children()) {
if (child == md_map_ir) {
continue;
}
PX_RETURN_IF_ERROR(child->ReplaceParent(parent_op, md_map_ir));
}

PX_RETURN_IF_ERROR(PropagateTypeChangesFromNode(graph, md_map_ir, compiler_state_));
PX_RETURN_IF_ERROR(PropagateTypeChangesFromNode(graph, fallback_md_map_ir, compiler_state_));
}
}

return Status::OK();
}

Status ConvertMetadataRule::UpdateMetadataContainer(IRNode* container, MetadataIR* metadata,
ExpressionIR* metadata_expr) const {
if (Match(container, Func())) {
Expand Down Expand Up @@ -70,6 +122,12 @@ StatusOr<std::string> ConvertMetadataRule::FindKeyColumn(std::shared_ptr<TableTy
absl::StrJoin(parent_type->ColumnNames(), ","));
}

bool CheckBackupConversionAvailable(std::shared_ptr<TableType> parent_type,
const std::string& func_name) {
return parent_type->HasColumn("time_") && parent_type->HasColumn("local_addr") &&
func_name == "upid_to_pod_name";
}

StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
if (!Match(ir_node, Metadata())) {
return false;
Expand All @@ -85,8 +143,9 @@ StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
PX_ASSIGN_OR_RETURN(auto parent, metadata->ReferencedOperator());
PX_ASSIGN_OR_RETURN(auto containing_ops, metadata->ContainingOperators());

auto resolved_table_type = parent->resolved_table_type();
PX_ASSIGN_OR_RETURN(std::string key_column_name,
FindKeyColumn(parent->resolved_table_type(), md_property, ir_node));
FindKeyColumn(resolved_table_type, md_property, ir_node));

PX_ASSIGN_OR_RETURN(ColumnIR * key_column,
graph->CreateNode<ColumnIR>(ir_node->ast(), key_column_name, parent_op_idx));
Expand All @@ -96,19 +155,82 @@ StatusOr<bool> ConvertMetadataRule::Apply(IRNode* ir_node) {
FuncIR * conversion_func,
graph->CreateNode<FuncIR>(ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", func_name},
std::vector<ExpressionIR*>{key_column}));
FuncIR* orig_conversion_func = conversion_func;
ExpressionIR* conversion_expr = static_cast<ExpressionIR*>(conversion_func);

// TODO(ddelnano): Until the short lived process issue (gh#1638) is resolved, add a fallback
// conversion function that uses local_addr for pod lookups when the upid based default
// (upid_to_pod_name) fails. This turns the `df.ctx["pod"]` lookup into the following pseudo code:
//
// fallback = px.pod_id_to_pod_name(px.ip_to_pod_id(df.ctx["local_addr"]))
// df.pod = px.select(px.upid_to_pod_name(df.upid) == "", fallback, px.upid_to_pod_name(df.upid))
FuncIR* backup_conversion_func = nullptr;
auto backup_conversion_available = CheckBackupConversionAvailable(resolved_table_type, func_name);
std::pair<std::string, std::string> col_names;
if (backup_conversion_available) {
absl::flat_hash_set<std::string> used_column_names;
col_names = std::make_pair(GetUniquePodNameCol(resolved_table_type, &used_column_names),
GetUniquePodNameCol(resolved_table_type, &used_column_names));
PX_ASSIGN_OR_RETURN(ColumnIR * local_addr_col,
graph->CreateNode<ColumnIR>(ir_node->ast(), "local_addr", parent_op_idx));
PX_ASSIGN_OR_RETURN(ColumnIR * time_col,
graph->CreateNode<ColumnIR>(ir_node->ast(), "time_", parent_op_idx));
PX_ASSIGN_OR_RETURN(
ColumnIR * md_expr_col,
graph->CreateNode<ColumnIR>(ir_node->ast(), col_names.first, parent_op_idx));
PX_ASSIGN_OR_RETURN(
FuncIR * ip_conversion_func,
graph->CreateNode<FuncIR>(ir_node->ast(),
FuncIR::Op{FuncIR::Opcode::non_op, "", "_ip_to_pod_id_pem_exec"},
std::vector<ExpressionIR*>{local_addr_col, time_col}));

// This doesn't need to have a "pem exec" equivalent function as long as the metadata
// annotation is set as done below
PX_ASSIGN_OR_RETURN(
backup_conversion_func,
graph->CreateNode<FuncIR>(
ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", "pod_id_to_pod_name"},
std::vector<ExpressionIR*>{static_cast<ExpressionIR*>(ip_conversion_func)}));

backup_conversion_func->set_annotations(ExpressionIR::Annotations(md_type));

PX_ASSIGN_OR_RETURN(ExpressionIR * empty_string,
graph->CreateNode<StringIR>(ir_node->ast(), ""));
PX_ASSIGN_OR_RETURN(
FuncIR * select_expr,
graph->CreateNode<FuncIR>(
ir_node->ast(), FuncIR::Op{FuncIR::Opcode::eq, "==", "equal"},
std::vector<ExpressionIR*>{static_cast<ExpressionIR*>(md_expr_col), empty_string}));
PX_ASSIGN_OR_RETURN(auto duplicate_md_expr_col, graph->CopyNode<ColumnIR>(md_expr_col));
PX_ASSIGN_OR_RETURN(
FuncIR * select_func,
graph->CreateNode<FuncIR>(
ir_node->ast(), FuncIR::Op{FuncIR::Opcode::non_op, "", "select"},
std::vector<ExpressionIR*>{static_cast<ExpressionIR*>(select_expr),
backup_conversion_func, duplicate_md_expr_col}));

conversion_func = select_func;
PX_ASSIGN_OR_RETURN(conversion_expr, graph->CreateNode<ColumnIR>(
ir_node->ast(), col_names.second, parent_op_idx));
}

for (int64_t parent_id : graph->dag().ParentsOf(metadata->id())) {
// For each container node of the metadata expression, update it to point to the
// new conversion func instead.
PX_RETURN_IF_ERROR(UpdateMetadataContainer(graph->Get(parent_id), metadata, conversion_func));
auto container = graph->Get(parent_id);
PX_RETURN_IF_ERROR(UpdateMetadataContainer(container, metadata, conversion_expr));
if (backup_conversion_available) {
PX_RETURN_IF_ERROR(AddPodNameConversionMapsWithFallback(
graph, container, orig_conversion_func, conversion_func, col_names));
}
}

// Propagate type changes from the new conversion_func.
PX_RETURN_IF_ERROR(PropagateTypeChangesFromNode(graph, conversion_func, compiler_state_));

DCHECK_EQ(conversion_func->EvaluatedDataType(), column_type)
<< "Expected the parent key column type and metadata property type to match.";
conversion_func->set_annotations(ExpressionIR::Annotations(md_type));

orig_conversion_func->set_annotations(ExpressionIR::Annotations(md_type));
return true;
}

Expand Down
26 changes: 26 additions & 0 deletions src/carnot/planner/compiler/analyzer/convert_metadata_rule.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <memory>
#include <string>
#include <utility>

#include "src/carnot/planner/compiler_state/compiler_state.h"
#include "src/carnot/planner/rules/rules.h"
Expand All @@ -46,6 +47,31 @@ class ConvertMetadataRule : public Rule {
ExpressionIR* metadata_expr) const;
StatusOr<std::string> FindKeyColumn(std::shared_ptr<TableType> parent_type,
MetadataProperty* property, IRNode* node_for_error) const;

/**
*
* This function aids in applying a fallback to a metadata conversion expression.
* It works by adding two conversion maps to the graph. The first map contains a column expression
* that contains the metadata expression result. The second map contains a column expression that
* contains the fallback expression result. It is intended to be used as a short term workaround
* for gh#1638 where pod name lookups fail for short lived processes. This fallback expression
* allows for an alternative mechanism if the primary lookup fails. See the example below:
*
* Before applying the rule:
*
* MemorySource -> MapIR (containing MetadataIR)
*
* After applying the rule:
*
* MemorySource
* -> MapIR (col_names[0]: metadata_expr)
* -> MapIR (col_names[1]: fallback_expr)
* -> MapIR (col_names[1] & existing cols)
*/

Status AddPodNameConversionMapsWithFallback(
IR* graph, IRNode* container, ExpressionIR* metadata_expr, ExpressionIR* fallback_expr,
const std::pair<std::string, std::string>& col_names) const;
};

} // namespace compiler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using table_store::schema::Relation;

using ConvertMetadataRuleTest = RulesTest;

TEST_F(ConvertMetadataRuleTest, multichild) {
TEST_F(ConvertMetadataRuleTest, multichild_without_fallback_func) {
auto relation = Relation(cpu_relation);
MetadataType conversion_column = MetadataType::UPID;
std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column);
Expand Down Expand Up @@ -114,6 +114,101 @@ TEST_F(ConvertMetadataRuleTest, missing_conversion_column) {
skip_check_stray_nodes_ = true;
}

TEST_F(ConvertMetadataRuleTest, multichild_with_fallback_func) {
std::string pod_name_col_prefix = "pod_name_";
auto relation = Relation(http_events_relation);
MetadataType conversion_column = MetadataType::UPID;
std::string conversion_column_str = MetadataProperty::GetMetadataString(conversion_column);
relation.AddColumn(types::DataType::UINT128, conversion_column_str);
compiler_state_->relation_map()->emplace("table", relation);

auto metadata_name = "pod_name";
MetadataProperty* property = md_handler->GetProperty(metadata_name).ValueOrDie();
MetadataIR* metadata_ir = MakeMetadataIR(metadata_name, /* parent_op_idx */ 0);
metadata_ir->set_property(property);

auto src = MakeMemSource(relation);
auto map = MakeMap(src, {{"md", metadata_ir}});

ResolveTypesRule type_rule(compiler_state_.get());
ASSERT_OK(type_rule.Execute(graph.get()));

ConvertMetadataRule rule(compiler_state_.get());
auto result = rule.Execute(graph.get());
ASSERT_OK(result);
EXPECT_TRUE(result.ValueOrDie());

EXPECT_EQ(0, graph->FindNodesThatMatch(Metadata()).size());

EXPECT_EQ(1, src->Children().size());
auto md_map = static_cast<MapIR*>(src->Children()[0]);
EXPECT_NE(md_map, map);

FuncIR* upid_to_pod_name = nullptr;
std::string upid_to_pod_name_col_name;
for (auto col_expr : md_map->col_exprs()) {
if (absl::StartsWith(col_expr.name, pod_name_col_prefix)) {
EXPECT_MATCH(col_expr.node, Func());
upid_to_pod_name = static_cast<FuncIR*>(col_expr.node);
upid_to_pod_name_col_name = col_expr.name;
}
}
EXPECT_NE(upid_to_pod_name, nullptr);
EXPECT_EQ(absl::Substitute("upid_to_$0", metadata_name), upid_to_pod_name->func_name());
EXPECT_EQ(1, upid_to_pod_name->all_args().size());
auto input_col = upid_to_pod_name->all_args()[0];
EXPECT_MATCH(input_col, ColumnNode("upid"));
EXPECT_MATCH(upid_to_pod_name, ResolvedExpression());
EXPECT_MATCH(input_col, ResolvedExpression());

EXPECT_EQ(1, md_map->Children().size());
auto fallback_map = static_cast<MapIR*>(md_map->Children()[0]);
FuncIR* fallback_func_select = nullptr;
std::string fallback_func_select_col_name;
for (auto col_expr : fallback_map->col_exprs()) {
if (absl::StartsWith(col_expr.name, pod_name_col_prefix) &&
col_expr.name != upid_to_pod_name_col_name) {
EXPECT_MATCH(col_expr.node, Func());
fallback_func_select = static_cast<FuncIR*>(col_expr.node);
fallback_func_select_col_name = col_expr.name;
}
}

EXPECT_NE(fallback_func_select, nullptr);
EXPECT_EQ("select", fallback_func_select->func_name());
EXPECT_EQ(3, fallback_func_select->all_args().size());

auto orig_func_check = fallback_func_select->all_args()[0];
EXPECT_MATCH(orig_func_check, Func());
auto equals_func = static_cast<FuncIR*>(orig_func_check);
EXPECT_EQ("equal", equals_func->func_name());
EXPECT_EQ(2, equals_func->all_args().size());
EXPECT_MATCH(equals_func->all_args()[0], ColumnNode(upid_to_pod_name_col_name));
EXPECT_MATCH(equals_func->all_args()[1], String(""));
EXPECT_MATCH(orig_func_check, ResolvedExpression());

EXPECT_MATCH(fallback_func_select->all_args()[1], Func());
auto fallback_func = static_cast<FuncIR*>(fallback_func_select->all_args()[1]);
EXPECT_EQ("pod_id_to_pod_name", fallback_func->func_name());
EXPECT_EQ(1, fallback_func->all_args().size());
EXPECT_MATCH(fallback_func->all_args()[0], Func());
EXPECT_MATCH(fallback_func, ResolvedExpression());

auto ip_func = static_cast<FuncIR*>(fallback_func->all_args()[0]);
EXPECT_EQ("_ip_to_pod_id_pem_exec", ip_func->func_name());
EXPECT_EQ(2, ip_func->all_args().size());
EXPECT_MATCH(ip_func->all_args()[0], ColumnNode("local_addr"));
EXPECT_MATCH(ip_func->all_args()[1], ColumnNode("time_"));
EXPECT_MATCH(ip_func, ResolvedExpression());

// Check that the semantic type of the conversion func is propagated properly.
auto type_or_s = map->resolved_table_type()->GetColumnType("md");
ASSERT_OK(type_or_s);
auto type = std::static_pointer_cast<ValueType>(type_or_s.ConsumeValueOrDie());
EXPECT_EQ(types::STRING, type->data_type());
EXPECT_EQ(types::ST_POD_NAME, type->semantic_type());
}

} // namespace compiler
} // namespace planner
} // namespace carnot
Expand Down
8 changes: 8 additions & 0 deletions src/carnot/planner/compiler/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -860,10 +860,15 @@ class RulesTest : public OperatorTests {
std::vector<types::DataType>({types::DataType::INT64, types::DataType::FLOAT64,
types::DataType::FLOAT64, types::DataType::FLOAT64}),
std::vector<std::string>({"count", "cpu0", "cpu1", "cpu2"}));
http_events_relation = table_store::schema::Relation(
std::vector<types::DataType>(
{types::DataType::TIME64NS, types::DataType::STRING, types::DataType::INT64}),
std::vector<std::string>({"time_", "local_addr", "local_port"}));
semantic_rel =
Relation({types::INT64, types::FLOAT64, types::STRING}, {"bytes", "cpu", "str_col"},
{types::ST_BYTES, types::ST_PERCENT, types::ST_NONE});
rel_map->emplace("cpu", cpu_relation);
rel_map->emplace("http_events", http_events_relation);
rel_map->emplace("semantic_table", semantic_rel);

compiler_state_ = std::make_unique<CompilerState>(
Expand Down Expand Up @@ -935,6 +940,7 @@ class RulesTest : public OperatorTests {
std::unique_ptr<RegistryInfo> info_;
int64_t time_now = 1552607213931245000;
table_store::schema::Relation cpu_relation;
table_store::schema::Relation http_events_relation;
table_store::schema::Relation semantic_rel;
std::unique_ptr<MetadataHandler> md_handler;
};
Expand Down Expand Up @@ -1110,6 +1116,8 @@ class ASTVisitorTest : public OperatorTests {
Relation http_events_relation;
http_events_relation.AddColumn(types::TIME64NS, "time_");
http_events_relation.AddColumn(types::UINT128, "upid");
http_events_relation.AddColumn(types::STRING, "local_addr");
http_events_relation.AddColumn(types::INT64, "local_port");
http_events_relation.AddColumn(types::STRING, "remote_addr");
http_events_relation.AddColumn(types::INT64, "remote_port");
http_events_relation.AddColumn(types::INT64, "major_version");
Expand Down
Loading
Loading