diff --git a/unified_planning/model/mixins/metrics.py b/unified_planning/model/mixins/metrics.py index 0dc2f817f..7e74d9e54 100644 --- a/unified_planning/model/mixins/metrics.py +++ b/unified_planning/model/mixins/metrics.py @@ -13,13 +13,11 @@ # limitations under the License. # -from typing import Dict, List, Optional, Set, Tuple +from typing import Dict, List, Optional import unified_planning as up from unified_planning.model.metrics import PlanQualityMetric -from unified_planning.model.problem_kind import ProblemKind from unified_planning.model.mixins import ActionsSetMixin -from unified_planning.exceptions import UPProblemDefinitionError class MetricsMixin: @@ -78,88 +76,3 @@ def _clone_to(self, other: "MetricsMixin", new_actions: Optional[ActionsSetMixin else: cloned.append(m) other._metrics = cloned - - def _update_kind_metric( - self, - kind: ProblemKind, - linear_checker: "up.model.walkers.linear_checker.LinearChecker", - static_fluents: Set["up.model.Fluent"], - ) -> Tuple[Set["up.model.Fluent"], Set["up.model.Fluent"]]: - """Updates the kind object passed as parameter to account for given metrics. - Return a pair for fluent sets that should respectively be only increased/decreased - (necessary for checking numeric problem kind properties). - """ - fluents_to_only_increase = set() - fluents_to_only_decrease = set() - fve = self._env.free_vars_extractor - for metric in self._metrics: - if metric.is_minimize_expression_on_final_state(): - assert isinstance( - metric, up.model.metrics.MinimizeExpressionOnFinalState - ) - kind.set_quality_metrics("FINAL_VALUE") - ( - is_linear, - fnode_to_only_increase, # positive fluents in minimize can only be increased - fnode_to_only_decrease, # negative fluents in minimize can only be decreased - ) = linear_checker.get_fluents(metric.expression) - if is_linear: - fluents_to_only_increase = { - e.fluent() for e in fnode_to_only_increase - } - fluents_to_only_decrease = { - e.fluent() for e in fnode_to_only_decrease - } - else: - kind.unset_problem_type("SIMPLE_NUMERIC_PLANNING") - elif metric.is_maximize_expression_on_final_state(): - assert isinstance( - metric, up.model.metrics.MaximizeExpressionOnFinalState - ) - kind.set_quality_metrics("FINAL_VALUE") - ( - is_linear, - fnode_to_only_decrease, # positive fluents in maximize can only be decreased - fnode_to_only_increase, # negative fluents in maximize can only be increased - ) = linear_checker.get_fluents(metric.expression) - if is_linear: - fluents_to_only_increase = { - e.fluent() for e in fnode_to_only_increase - } - fluents_to_only_decrease = { - e.fluent() for e in fnode_to_only_decrease - } - else: - kind.unset_problem_type("SIMPLE_NUMERIC_PLANNING") - elif metric.is_minimize_action_costs(): - assert isinstance(metric, up.model.metrics.MinimizeActionCosts) - kind.set_quality_metrics("ACTIONS_COST") - for cost in metric.costs.values(): - if cost is None: - raise UPProblemDefinitionError( - "The cost of an Action can't be None." - ) - if metric.default is not None: - for f in fve.get(metric.default): - if f.fluent() in static_fluents: - kind.set_actions_cost_kind( - "STATIC_FLUENTS_IN_ACTIONS_COST" - ) - else: - kind.set_actions_cost_kind("FLUENTS_IN_ACTIONS_COST") - for f in fve.get(cost): - if f.fluent() in static_fluents: - kind.set_actions_cost_kind("STATIC_FLUENTS_IN_ACTIONS_COST") - else: - kind.set_actions_cost_kind("FLUENTS_IN_ACTIONS_COST") - elif metric.is_minimize_makespan(): - kind.set_quality_metrics("MAKESPAN") - elif metric.is_minimize_sequential_plan_length(): - kind.set_quality_metrics("PLAN_LENGTH") - elif metric.is_oversubscription(): - kind.set_quality_metrics("OVERSUBSCRIPTION") - elif metric.is_temporal_oversubscription(): - kind.set_quality_metrics("TEMPORAL_OVERSUBSCRIPTION") - else: - assert False, "Unknown quality metric" - return fluents_to_only_increase, fluents_to_only_decrease diff --git a/unified_planning/model/problem.py b/unified_planning/model/problem.py index dfe81d85d..7f68ff45e 100644 --- a/unified_planning/model/problem.py +++ b/unified_planning/model/problem.py @@ -742,9 +742,7 @@ def __init__( ( fluents_to_only_increase, fluents_to_only_decrease, - ) = self.pb._update_kind_metric( - self.kind, self.linear_checker, self.static_fluents - ) + ) = self.update_problem_kind_metric() # fluents that can only be increased (resp. decreased) for the problem to be SIMPLE_NUMERIC_PLANNING self.fluents_to_only_increase: Set[Fluent] = fluents_to_only_increase self.fluents_to_only_decrease: Set[Fluent] = fluents_to_only_decrease @@ -1005,6 +1003,101 @@ def update_problem_kind_action( else: raise NotImplementedError + def update_problem_kind_metric( + self, + ) -> Tuple[Set["up.model.Fluent"], Set["up.model.Fluent"]]: + """Updates the kind object passed as parameter to account for given metrics. + Return a pair for fluent sets that should respectively be only increased/decreased + (necessary for checking numeric problem kind properties). + """ + fluents_to_only_increase = set() + fluents_to_only_decrease = set() + fve = self.pb.environment.free_vars_extractor + for metric in self.pb.quality_metrics: + ovsb_goals: Iterable["up.model.fnode.FNode"] = tuple() + if metric.is_minimize_expression_on_final_state(): + assert isinstance( + metric, up.model.metrics.MinimizeExpressionOnFinalState + ) + self.kind.set_quality_metrics("FINAL_VALUE") + self.update_problem_kind_expression(metric.expression) + ( + is_linear, + fnode_to_only_increase, # positive fluents in minimize can only be increased + fnode_to_only_decrease, # negative fluents in minimize can only be decreased + ) = self.linear_checker.get_fluents(metric.expression) + if is_linear: + fluents_to_only_increase = { + e.fluent() for e in fnode_to_only_increase + } + fluents_to_only_decrease = { + e.fluent() for e in fnode_to_only_decrease + } + else: + self.kind.unset_problem_type("SIMPLE_NUMERIC_PLANNING") + elif metric.is_maximize_expression_on_final_state(): + assert isinstance( + metric, up.model.metrics.MaximizeExpressionOnFinalState + ) + self.kind.set_quality_metrics("FINAL_VALUE") + self.update_problem_kind_expression(metric.expression) + ( + is_linear, + fnode_to_only_decrease, # positive fluents in maximize can only be decreased + fnode_to_only_increase, # negative fluents in maximize can only be increased + ) = self.linear_checker.get_fluents(metric.expression) + if is_linear: + fluents_to_only_increase = { + e.fluent() for e in fnode_to_only_increase + } + fluents_to_only_decrease = { + e.fluent() for e in fnode_to_only_decrease + } + else: + self.kind.unset_problem_type("SIMPLE_NUMERIC_PLANNING") + elif metric.is_minimize_action_costs(): + assert isinstance(metric, up.model.metrics.MinimizeActionCosts) + self.kind.set_quality_metrics("ACTIONS_COST") + for cost in metric.costs.values(): + if cost is None: + raise UPProblemDefinitionError( + "The cost of an Action can't be None." + ) + if metric.default is not None: + for f in fve.get(metric.default): + if f.fluent() in self.static_fluents: + self.kind.set_actions_cost_kind( + "STATIC_FLUENTS_IN_ACTIONS_COST" + ) + else: + self.kind.set_actions_cost_kind( + "FLUENTS_IN_ACTIONS_COST" + ) + for f in fve.get(cost): + if f.fluent() in self.static_fluents: + self.kind.set_actions_cost_kind( + "STATIC_FLUENTS_IN_ACTIONS_COST" + ) + else: + self.kind.set_actions_cost_kind("FLUENTS_IN_ACTIONS_COST") + elif metric.is_minimize_makespan(): + self.kind.set_quality_metrics("MAKESPAN") + elif metric.is_minimize_sequential_plan_length(): + self.kind.set_quality_metrics("PLAN_LENGTH") + elif metric.is_oversubscription(): + assert isinstance(metric, up.model.Oversubscription) + self.kind.set_quality_metrics("OVERSUBSCRIPTION") + ovsb_goals = metric.goals.keys() + elif metric.is_temporal_oversubscription(): + assert isinstance(metric, up.model.TemporalOversubscription) + self.kind.set_quality_metrics("TEMPORAL_OVERSUBSCRIPTION") + ovsb_goals = map(lambda x: x[1], metric.goals.keys()) + else: + assert False, "Unknown quality metric" + for goal in ovsb_goals: + self.update_problem_kind_expression(goal) + return fluents_to_only_increase, fluents_to_only_decrease + def generate_causal_graph( problem: Problem,