diff --git a/CONTRIBUTORS b/CONTRIBUTORS index 455051a9e..be6d7a39f 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -19,6 +19,7 @@ Luca Framba Luigi Bonassi Miguel Ramirez Roland Godet +Samuel Gobbi Sebastian Stock Selvakumar Hastham Sathiya Satchi Sadanandam Srajan Goyal diff --git a/docs/contributions.rst b/docs/contributions.rst index dad356e86..8b1056e8c 100644 --- a/docs/contributions.rst +++ b/docs/contributions.rst @@ -17,9 +17,10 @@ Contributors to the repository include: - Luca Framba - Luigi Bonassi - Roland Godet +- Samuel Gobbi - Selvakumar Hastham Sathiya Satchi Sadanandam - Srajan Goyal - Uwe Köckemann Alternate source for contributions: -https://github.com/aiplan4eu/unified-planning/graphs/contributors \ No newline at end of file +https://github.com/aiplan4eu/unified-planning/graphs/contributors diff --git a/docs/problem_representation.rst b/docs/problem_representation.rst index 61f155c7a..24194d66e 100644 --- a/docs/problem_representation.rst +++ b/docs/problem_representation.rst @@ -63,6 +63,12 @@ Problem Kinds * - - SELF_OVERLAPPING - The temporal planning problem allows actions self overlapping. + * - + - PROCESSES + - The problem contains processes, natural transitions that occur automatically over time as their pre-conditions are met. + * - + - EVENTS + - The problem contains events, natural transitions that occur automatically at a single time point when their pre-conditions are met. * - EXPRESSION_DURATION - STATIC_FLUENTS_IN_DURATION - The duration of at least one action uses static fluents (that may never change). @@ -111,6 +117,15 @@ Problem Kinds * - - DECREASE_EFFECTS - At least one effect uses the numeric decrement operator. + * - + - INCREASE_CONTINUOUS_EFFECTS + - At least one effect uses the continuous numeric increment operator. + * - + - DECREASE_CONTINUOUS_EFFECTS + - At least one effect uses the continuous numeric decrement operator. + * - + - NON_LINEAR_CONTINUOUS_EFFECTS + - At least one continuous effect is described by a differential equation that depends on a continuous variable. * - - STATIC_FLUENTS_IN_BOOLEAN_ASSIGNMENTS - At least one effect uses a static fluent in the expression of a boolean assignment. diff --git a/unified_planning/io/pddl_reader.py b/unified_planning/io/pddl_reader.py index 745d8ce4f..a20d6f99a 100644 --- a/unified_planning/io/pddl_reader.py +++ b/unified_planning/io/pddl_reader.py @@ -101,7 +101,7 @@ def __init__(self): + ":requirements" + OneOrMore( one_of( - ":strips :typing :negative-preconditions :disjunctive-preconditions :equality :existential-preconditions :universal-preconditions :quantified-preconditions :conditional-effects :fluents :numeric-fluents :adl :durative-actions :duration-inequalities :timed-initial-literals :timed-initial-effects :action-costs :hierarchy :method-preconditions :constraints :contingent :preferences" + ":strips :typing :negative-preconditions :disjunctive-preconditions :equality :existential-preconditions :universal-preconditions :quantified-preconditions :conditional-effects :fluents :numeric-fluents :adl :durative-actions :duration-inequalities :timed-initial-literals :timed-initial-effects :action-costs :hierarchy :method-preconditions :constraints :contingent :preferences :time" ) ) + Suppress(")") @@ -413,7 +413,7 @@ def __init__(self, environment: typing.Optional[Environment] = None): def _parse_exp( self, problem: up.model.Problem, - act: typing.Optional[Union[up.model.Action, htn.Method, htn.TaskNetwork]], + act: typing.Optional[Union[up.model.Transition, htn.Method, htn.TaskNetwork]], types_map: TypesMap, var: Dict[str, up.model.Variable], exp: CustomParseResults, @@ -655,7 +655,7 @@ def _add_effect( if isinstance(act, up.model.Process): eff1 = (eff[0], eff[1].simplify()) - act.add_derivative(*eff1) + act.add_increase_continuous_effect(*eff1) else: act.add_increase_effect(*eff if timing is None else (timing, *eff)) # type: ignore elif op == "decrease": @@ -669,8 +669,8 @@ def _add_effect( cond, ) if isinstance(act, up.model.Process): - eff1 = (eff[0], (eff[1] * (-1)).simplify()) - act.add_derivative(*eff1) + eff1 = (eff[0], eff[1].simplify()) + act.add_decrease_continuous_effect(*eff1) else: act.add_decrease_effect(*eff if timing is None else (timing, *eff)) # type: ignore elif op == "forall": @@ -1292,7 +1292,7 @@ def declare_type( CustomParseResults(a["eff"][0]), domain_str, ) - problem.add_action(proc) + problem.add_process(proc) for a in domain_res.get("events", []): n = a["name"] @@ -1317,7 +1317,7 @@ def declare_type( CustomParseResults(a["eff"][0]), domain_str, ) - problem.add_action(evt) + problem.add_event(evt) for a in domain_res.get("actions", []): n = a["name"] diff --git a/unified_planning/io/pddl_writer.py b/unified_planning/io/pddl_writer.py index 8110da3e2..c4f39f561 100644 --- a/unified_planning/io/pddl_writer.py +++ b/unified_planning/io/pddl_writer.py @@ -139,6 +139,7 @@ WithName = Union[ "up.model.Type", "up.model.Action", + "up.model.NaturalTransition", "up.model.Fluent", "up.model.Object", "up.model.Parameter", @@ -443,6 +444,8 @@ def _write_domain(self, out: IO[str]): out.write(" :hierarchy") # HTN / HDDL if self.problem_kind.has_method_preconditions(): out.write(" :method-preconditions") + if self.problem_kind.has_processes() or self.problem_kind.has_events(): + out.write(" :time") out.write(")\n") if self.problem_kind.has_hierarchical_typing(): @@ -530,7 +533,9 @@ def _write_domain(self, out: IO[str]): converter = ConverterToPDDLString( self.problem.environment, self._get_mangled_name ) - costs = {} + costs: Dict[ + Union[up.model.NaturalTransition, up.model.Action], Optional[up.model.FNode] + ] = {} metrics = self.problem.quality_metrics if len(metrics) == 1: metric = metrics[0] @@ -595,74 +600,15 @@ def _write_domain(self, out: IO[str]): out.write(")\n") for a in self.problem.actions: - if isinstance(a, up.model.InstantaneousAction) or isinstance( - a, up.model.Event - ): + if isinstance(a, up.model.InstantaneousAction): if any(p.simplify().is_false() for p in a.preconditions): continue - if isinstance(a, up.model.Event): - out.write(f" (:event {self._get_mangled_name(a)}") - else: - out.write(f" (:action {self._get_mangled_name(a)}") + out.write(f" (:action {self._get_mangled_name(a)}") out.write(f"\n :parameters (") self._write_parameters(out, a) out.write(")") - if len(a.preconditions) > 0: - precond_str = [] - for p in (c.simplify() for c in a.preconditions): - if not p.is_true(): - if p.is_and(): - precond_str.extend(map(converter.convert, p.args)) - else: - precond_str.append(converter.convert(p)) - out.write(f'\n :precondition (and {" ".join(precond_str)})') - elif len(a.preconditions) == 0 and self.empty_preconditions: - out.write(f"\n :precondition ()") - if len(a.effects) > 0: - out.write("\n :effect (and") - for e in a.effects: - _write_effect( - e, - None, - out, - converter, - self.rewrite_bool_assignments, - self._get_mangled_name, - ) - - if a in costs: - out.write( - f" (increase (total-cost) {converter.convert(costs[a])})" - ) - out.write(")") - out.write(")\n") - elif isinstance(a, up.model.Process): - if any(p.simplify().is_false() for p in a.preconditions): - continue - out.write(f" (:process {self._get_mangled_name(a)}") - out.write(f"\n :parameters (") - self._write_parameters(out, a) - out.write(")") - if len(a.preconditions) > 0: - precond_str = [] - for p in (c.simplify() for c in a.preconditions): - if not p.is_true(): - if p.is_and(): - precond_str.extend(map(converter.convert, p.args)) - else: - precond_str.append(converter.convert(p)) - out.write(f'\n :precondition (and {" ".join(precond_str)})') - elif len(a.preconditions) == 0 and self.empty_preconditions: - out.write(f"\n :precondition ()") - if len(a.effects) > 0: - out.write("\n :effect (and") - for e in a.effects: - _write_derivative( - e, - out, - converter, - ) - out.write(")") + self._write_untimed_preconditions(a, converter, out) + self._write_untimed_effects(a, converter, out, costs) out.write(")\n") elif isinstance(a, DurativeAction): if any( @@ -733,6 +679,36 @@ def _write_domain(self, out: IO[str]): out.write(")\n") else: raise NotImplementedError + for proc in self.problem.processes: + + if any(p.simplify().is_false() for p in proc.preconditions): + continue + out.write(f" (:process {self._get_mangled_name(proc)}") + out.write(f"\n :parameters (") + self._write_parameters(out, proc) + out.write(")") + self._write_untimed_preconditions(proc, converter, out) + if len(proc.effects) > 0: + out.write("\n :effect (and") + for e in proc.effects: + _write_derivative( + e, + out, + converter, + ) + out.write(")") + out.write(")\n") + for eve in self.problem.events: + + if any(p.simplify().is_false() for p in eve.preconditions): + continue + out.write(f" (:event {self._get_mangled_name(eve)}") + out.write(f"\n :parameters (") + self._write_parameters(out, eve) + out.write(")") + self._write_untimed_preconditions(eve, converter, out) + self._write_untimed_effects(eve, converter, out, costs) + out.write(")\n") out.write(")\n") def _write_problem(self, out: IO[str]): @@ -1066,6 +1042,36 @@ def format_subtask(t: up.model.htn.Subtask): "Task network constraints not supported by HDDL Writer yet" ) + def _write_untimed_preconditions(self, item, converter, out): + if len(item.preconditions) > 0: + precond_str: list[str] = [] + for p in (c.simplify() for c in item.preconditions): + if not p.is_true(): + if p.is_and(): + precond_str.extend(map(converter.convert, p.args)) + else: + precond_str.append(converter.convert(p)) + out.write(f'\n :precondition (and {" ".join(precond_str)})') + elif len(item.preconditions) == 0 and self.empty_preconditions: + out.write(f"\n :precondition ()") + + def _write_untimed_effects(self, item, converter, out, costs): + if len(item.effects) > 0: + out.write("\n :effect (and") + for e in item.effects: + _write_effect( + e, + None, + out, + converter, + self.rewrite_bool_assignments, + self._get_mangled_name, + ) + + if item in costs: + out.write(f" (increase (total-cost) {converter.convert(costs[item])})") + out.write(")") + def _get_pddl_name(item: Union[WithName, "up.model.AbstractProblem"]) -> str: """This function returns a pddl name for the chosen item""" @@ -1235,8 +1241,10 @@ def _write_derivative( out.write(f" (increase {fluent} (* #t {converter.convert(simplified_value)} ))") elif effect.is_decrease(): out.write(f" (decrease {fluent} (* #t {converter.convert(simplified_value)} ))") - elif effect.is_derivative(): + elif effect.is_continuous_increase(): out.write(f" (increase {fluent} (* #t {converter.convert(simplified_value)} ))") + elif effect.is_continuous_decrease(): + out.write(f" (decrease {fluent} (* #t {converter.convert(simplified_value)} ))") else: raise UPProblemDefinitionError( "Derivative can only be expressed as increase, decrease in processes", diff --git a/unified_planning/model/__init__.py b/unified_planning/model/__init__.py index d0f61f013..96d152094 100644 --- a/unified_planning/model/__init__.py +++ b/unified_planning/model/__init__.py @@ -19,9 +19,17 @@ InstantaneousAction, DurativeAction, SensingAction, +) + +from unified_planning.model.natural_transition import ( + NaturalTransition, Process, Event, ) + +from unified_planning.model.transition import ( + Transition, +) from unified_planning.model.effect import Effect, SimulatedEffect, EffectKind from unified_planning.model.expression import ( BoolExpression, diff --git a/unified_planning/model/action.py b/unified_planning/model/action.py index 47b25bce1..f01ed5396 100644 --- a/unified_planning/model/action.py +++ b/unified_planning/model/action.py @@ -13,9 +13,7 @@ # limitations under the License. # """ -This module defines the `Action` base class and some of his extensions. -An `Action` has a `name`, a `list` of `Parameter`, a `list` of `preconditions` -and a `list` of `effects`. +This module defines the `Action` class and some of his extensions. """ @@ -32,59 +30,15 @@ from typing import Any, Dict, List, Set, Union, Optional, Iterable from collections import OrderedDict +from unified_planning.model.transition import ( + UntimedEffectMixin, + PreconditionMixin, + Transition, +) -class Action(ABC): - """This is the `Action` interface.""" - - def __init__( - self, - _name: str, - _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, - _env: Optional[Environment] = None, - **kwargs: "up.model.types.Type", - ): - self._environment = get_environment(_env) - self._name = _name - self._parameters: "OrderedDict[str, up.model.parameter.Parameter]" = ( - OrderedDict() - ) - if _parameters is not None: - assert len(kwargs) == 0 - for n, t in _parameters.items(): - assert self._environment.type_manager.has_type( - t - ), "type of parameter does not belong to the same environment of the action" - self._parameters[n] = up.model.parameter.Parameter( - n, t, self._environment - ) - else: - for n, t in kwargs.items(): - assert self._environment.type_manager.has_type( - t - ), "type of parameter does not belong to the same environment of the action" - self._parameters[n] = up.model.parameter.Parameter( - n, t, self._environment - ) - - @abstractmethod - def __eq__(self, oth: object) -> bool: - raise NotImplementedError - def _print_parameters(self, s): - first = True - for p in self.parameters: - if first: - s.append("(") - first = False - else: - s.append(", ") - s.append(str(p)) - if not first: - s.append(")") - - @abstractmethod - def __hash__(self) -> int: - raise NotImplementedError +class Action(Transition): + """This is the `Action` interface.""" def __call__( self, @@ -99,75 +53,8 @@ def __call__( self, params, agent=agent, motion_paths=motion_paths ) - @abstractmethod - def clone(self): - raise NotImplementedError - - @property - def environment(self) -> Environment: - """Returns this `Action` `Environment`.""" - return self._environment - - @property - def name(self) -> str: - """Returns the `Action` `name`.""" - return self._name - - @name.setter - def name(self, new_name: str): - """Sets the `Action` `name`.""" - self._name = new_name - - @property - def parameters(self) -> List["up.model.parameter.Parameter"]: - """Returns the `list` of the `Action parameters`.""" - return list(self._parameters.values()) - - def parameter(self, name: str) -> "up.model.parameter.Parameter": - """ - Returns the `parameter` of the `Action` with the given `name`. - - Example - ------- - >>> from unified_planning.shortcuts import * - >>> location_type = UserType("Location") - >>> move = InstantaneousAction("move", source=location_type, target=location_type) - >>> move.parameter("source") # return the "source" parameter of the action, with type "Location" - Location source - >>> move.parameter("target") - Location target - - If a parameter's name (1) does not conflict with an existing attribute of `Action` and (2) does not start with '_' - it can also be accessed as if it was an attribute of the action. For instance: - - >>> move.source - Location source - - :param name: The `name` of the target `parameter`. - :return: The `parameter` of the `Action` with the given `name`. - """ - if name not in self._parameters: - raise ValueError(f"Action '{self.name}' has no parameter '{name}'") - return self._parameters[name] - - def __getattr__(self, parameter_name: str) -> "up.model.parameter.Parameter": - if parameter_name.startswith("_"): - # guard access as pickling relies on attribute error to be thrown even when - # no attributes of the object have been set. - # In this case accessing `self._name` or `self._parameters`, would re-invoke __getattr__ - raise AttributeError(f"Action has no attribute '{parameter_name}'") - if parameter_name not in self._parameters: - raise AttributeError( - f"Action '{self.name}' has no attribute or parameter '{parameter_name}'" - ) - return self._parameters[parameter_name] - - def is_conditional(self) -> bool: - """Returns `True` if the `Action` has `conditional effects`, `False` otherwise.""" - raise NotImplementedError - -class InstantaneousTransitionMixin(Action): +class InstantaneousAction(UntimedEffectMixin, Action, PreconditionMixin): """Represents an instantaneous action.""" def __init__( @@ -178,18 +65,38 @@ def __init__( **kwargs: "up.model.types.Type", ): Action.__init__(self, _name, _parameters, _env, **kwargs) - self._preconditions: List["up.model.fnode.FNode"] = [] - self._effects: List[up.model.effect.Effect] = [] - self._simulated_effect: Optional[up.model.effect.SimulatedEffect] = None - # fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment - self._fluents_assigned: Dict[ - "up.model.fnode.FNode", "up.model.fnode.FNode" - ] = {} - # fluent_inc_dec is the set of the fluents that have an unconditional increase or decrease - self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set() + PreconditionMixin.__init__(self, _env) + UntimedEffectMixin.__init__(self, _env) + + def __repr__(self) -> str: + s = [] + s.append(f"action {self.name}") + first = True + for p in self.parameters: + if first: + s.append("(") + first = False + else: + s.append(", ") + s.append(str(p)) + if not first: + s.append(")") + s.append(" {\n") + s.append(" preconditions = [\n") + for c in self.preconditions: + s.append(f" {str(c)}\n") + s.append(" ]\n") + s.append(" effects = [\n") + for e in self.effects: + s.append(f" {str(e)}\n") + s.append(" ]\n") + if self._simulated_effect is not None: + s.append(f" simulated effect = {self._simulated_effect}\n") + s.append(" }") + return "".join(s) def __eq__(self, oth: object) -> bool: - if isinstance(oth, InstantaneousTransitionMixin): + if isinstance(oth, InstantaneousAction): cond = ( self._environment == oth._environment and self._name == oth._name @@ -215,282 +122,6 @@ def __hash__(self) -> int: res += hash(self._simulated_effect) return res - def clone(self): - new_params = OrderedDict( - (param_name, param.type) for param_name, param in self._parameters.items() - ) - new_instantaneous_action = InstantaneousTransitionMixin( - self._name, new_params, self._environment - ) - new_instantaneous_action._preconditions = self._preconditions[:] - new_instantaneous_action._effects = [e.clone() for e in self._effects] - new_instantaneous_action._fluents_assigned = self._fluents_assigned.copy() - new_instantaneous_action._fluents_inc_dec = self._fluents_inc_dec.copy() - new_instantaneous_action._simulated_effect = self._simulated_effect - return new_instantaneous_action - - @property - def preconditions(self) -> List["up.model.fnode.FNode"]: - """Returns the `list` of the `Action` `preconditions`.""" - return self._preconditions - - def clear_preconditions(self): - """Removes all the `Action preconditions`""" - self._preconditions = [] - - @property - def effects(self) -> List["up.model.effect.Effect"]: - """Returns the `list` of the `Action effects`.""" - return self._effects - - def clear_effects(self): - """Removes all the `Action's effects`.""" - self._effects = [] - self._fluents_assigned = {} - self._fluents_inc_dec = set() - self._simulated_effect = None - - @property - def conditional_effects(self) -> List["up.model.effect.Effect"]: - """Returns the `list` of the `action conditional effects`. - - IMPORTANT NOTE: this property does some computation, so it should be called as - seldom as possible.""" - return [e for e in self._effects if e.is_conditional()] - - def is_conditional(self) -> bool: - """Returns `True` if the `action` has `conditional effects`, `False` otherwise.""" - return any(e.is_conditional() for e in self._effects) - - @property - def unconditional_effects(self) -> List["up.model.effect.Effect"]: - """Returns the `list` of the `action unconditional effects`. - - IMPORTANT NOTE: this property does some computation, so it should be called as - seldom as possible.""" - return [e for e in self._effects if not e.is_conditional()] - - def add_precondition( - self, - precondition: Union[ - "up.model.fnode.FNode", - "up.model.fluent.Fluent", - "up.model.parameter.Parameter", - bool, - ], - ): - """ - Adds the given expression to `action's preconditions`. - - :param precondition: The expression that must be added to the `action's preconditions`. - """ - (precondition_exp,) = self._environment.expression_manager.auto_promote( - precondition - ) - assert self._environment.type_checker.get_type(precondition_exp).is_bool_type() - if precondition_exp == self._environment.expression_manager.TRUE(): - return - free_vars = self._environment.free_vars_oracle.get_free_variables( - precondition_exp - ) - if len(free_vars) != 0: - raise UPUnboundedVariablesError( - f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}" - ) - if precondition_exp not in self._preconditions: - self._preconditions.append(precondition_exp) - - def add_effect( - self, - fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], - value: "up.model.expression.Expression", - condition: "up.model.expression.BoolExpression" = True, - forall: Iterable["up.model.variable.Variable"] = tuple(), - ): - """ - Adds the given `assignment` to the `action's effects`. - - :param fluent: The `fluent` of which `value` is modified by the `assignment`. - :param value: The `value` to assign to the given `fluent`. - :param condition: The `condition` in which this `effect` is applied; the default - value is `True`. - :param forall: The 'Variables' that are universally quantified in this - effect; the default value is empty. - """ - ( - fluent_exp, - value_exp, - condition_exp, - ) = self._environment.expression_manager.auto_promote(fluent, value, condition) - if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): - raise UPUsageError( - "fluent field of add_effect must be a Fluent or a FluentExp or a Dot." - ) - if not self._environment.type_checker.get_type(condition_exp).is_bool_type(): - raise UPTypeError("Effect condition is not a Boolean condition!") - if not fluent_exp.type.is_compatible(value_exp.type): - # Value is not assignable to fluent (its type is not a subset of the fluent's type). - raise UPTypeError( - f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" - ) - self._add_effect_instance( - up.model.effect.Effect(fluent_exp, value_exp, condition_exp, forall=forall) - ) - - def add_increase_effect( - self, - fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], - value: "up.model.expression.Expression", - condition: "up.model.expression.BoolExpression" = True, - forall: Iterable["up.model.variable.Variable"] = tuple(), - ): - """ - Adds the given `increase effect` to the `action's effects`. - - :param fluent: The `fluent` which `value` is increased. - :param value: The given `fluent` is incremented by the given `value`. - :param condition: The `condition` in which this `effect` is applied; the default - value is `True`. - :param forall: The 'Variables' that are universally quantified in this - effect; the default value is empty. - """ - ( - fluent_exp, - value_exp, - condition_exp, - ) = self._environment.expression_manager.auto_promote( - fluent, - value, - condition, - ) - if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): - raise UPUsageError( - "fluent field of add_increase_effect must be a Fluent or a FluentExp or a Dot." - ) - if not condition_exp.type.is_bool_type(): - raise UPTypeError("Effect condition is not a Boolean condition!") - if not fluent_exp.type.is_compatible(value_exp.type): - raise UPTypeError( - f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" - ) - if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type(): - raise UPTypeError("Increase effects can be created only on numeric types!") - self._add_effect_instance( - up.model.effect.Effect( - fluent_exp, - value_exp, - condition_exp, - kind=up.model.effect.EffectKind.INCREASE, - forall=forall, - ) - ) - - def add_decrease_effect( - self, - fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], - value: "up.model.expression.Expression", - condition: "up.model.expression.BoolExpression" = True, - forall: Iterable["up.model.variable.Variable"] = tuple(), - ): - """ - Adds the given `decrease effect` to the `action's effects`. - - :param fluent: The `fluent` which value is decreased. - :param value: The given `fluent` is decremented by the given `value`. - :param condition: The `condition` in which this `effect` is applied; the default - value is `True`. - :param forall: The 'Variables' that are universally quantified in this - effect; the default value is empty. - """ - ( - fluent_exp, - value_exp, - condition_exp, - ) = self._environment.expression_manager.auto_promote(fluent, value, condition) - if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): - raise UPUsageError( - "fluent field of add_decrease_effect must be a Fluent or a FluentExp or a Dot." - ) - if not condition_exp.type.is_bool_type(): - raise UPTypeError("Effect condition is not a Boolean condition!") - if not fluent_exp.type.is_compatible(value_exp.type): - raise UPTypeError( - f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" - ) - if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type(): - raise UPTypeError("Decrease effects can be created only on numeric types!") - self._add_effect_instance( - up.model.effect.Effect( - fluent_exp, - value_exp, - condition_exp, - kind=up.model.effect.EffectKind.DECREASE, - forall=forall, - ) - ) - - def _add_effect_instance(self, effect: "up.model.effect.Effect"): - assert ( - effect.environment == self._environment - ), "effect does not have the same environment of the action" - up.model.effect.check_conflicting_effects( - effect, - None, - self._simulated_effect, - self._fluents_assigned, - self._fluents_inc_dec, - "action", - ) - self._effects.append(effect) - - @property - def simulated_effect(self) -> Optional["up.model.effect.SimulatedEffect"]: - """Returns the `action` `simulated effect`.""" - return self._simulated_effect - - def set_simulated_effect(self, simulated_effect: "up.model.effect.SimulatedEffect"): - """ - Sets the given `simulated effect` as the only `action's simulated effect`. - - :param simulated_effect: The `SimulatedEffect` instance that must be set as this `action`'s only - `simulated effect`. - """ - up.model.effect.check_conflicting_simulated_effects( - simulated_effect, - None, - self._fluents_assigned, - self._fluents_inc_dec, - "action", - ) - if simulated_effect.environment != self.environment: - raise UPUsageError( - "The added SimulatedEffect does not have the same environment of the Action" - ) - self._simulated_effect = simulated_effect - - def _set_preconditions(self, preconditions: List["up.model.fnode.FNode"]): - self._preconditions = preconditions - - -class InstantaneousAction(InstantaneousTransitionMixin): - def __repr__(self) -> str: - s = [] - s.append(f"action {self.name}") - self._print_parameters(s) - s.append(" {\n") - s.append(" preconditions = [\n") - for c in self.preconditions: - s.append(f" {str(c)}\n") - s.append(" ]\n") - s.append(" effects = [\n") - for e in self.effects: - s.append(f" {str(e)}\n") - s.append(" ]\n") - if self._simulated_effect is not None: - s.append(f" simulated effect = {self._simulated_effect}\n") - s.append(" }") - return "".join(s) - def clone(self): new_params = OrderedDict( (param_name, param.type) for param_name, param in self._parameters.items() @@ -783,245 +414,3 @@ def __repr__(self) -> str: s.append(" ]\n") s.append(" }") return "".join(s) - - -""" -Below we have natural transitions. These are not controlled by the agent and would probably need a proper subclass. Natural transitions can be of two kinds: -Processes or Events. -Processes dictate how numeric variables evolve over time through the use of time-derivative functions -Events dictate the analogous of urgent transitions in timed automata theory -""" - - -class Process(Action): - """This is the `Process` class, which implements the abstract `Action` class.""" - - def __init__( - self, - _name: str, - _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, - _env: Optional[Environment] = None, - **kwargs: "up.model.types.Type", - ): - Action.__init__(self, _name, _parameters, _env, **kwargs) - self._preconditions: List["up.model.fnode.FNode"] = [] - self._effects: List[up.model.effect.Effect] = [] - self._simulated_effect: Optional[up.model.effect.SimulatedEffect] = None - # fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment - self._fluents_assigned: Dict[ - "up.model.fnode.FNode", "up.model.fnode.FNode" - ] = {} - # fluent_inc_dec is the set of the fluents that have an unconditional increase or decrease - self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set() - - def __repr__(self) -> str: - s = [] - s.append(f"process {self.name}") - self._print_parameters(s) - s.append(" {\n") - s.append(" preconditions = [\n") - for c in self.preconditions: - s.append(f" {str(c)}\n") - s.append(" ]\n") - s.append(" effects = [\n") - for e in self.effects: - s.append(f" {str(e)}\n") - s.append(" ]\n") - s.append(" }") - return "".join(s) - - def __eq__(self, oth: object) -> bool: - if isinstance(oth, Process): - cond = ( - self._environment == oth._environment - and self._name == oth._name - and self._parameters == oth._parameters - ) - return ( - cond - and set(self._preconditions) == set(oth._preconditions) - and set(self._effects) == set(oth._effects) - and self._simulated_effect == oth._simulated_effect - ) - else: - return False - - def __hash__(self) -> int: - res = hash(self._name) - for ap in self._parameters.items(): - res += hash(ap) - for p in self._preconditions: - res += hash(p) - for e in self._effects: - res += hash(e) - res += hash(self._simulated_effect) - return res - - def clone(self): - new_params = OrderedDict( - (param_name, param.type) for param_name, param in self._parameters.items() - ) - new_process = Process(self._name, new_params, self._environment) - new_process._preconditions = self._preconditions[:] - new_process._effects = [e.clone() for e in self._effects] - new_process._fluents_assigned = self._fluents_assigned.copy() - new_process._fluents_inc_dec = self._fluents_inc_dec.copy() - new_process._simulated_effect = self._simulated_effect - return new_process - - @property - def preconditions(self) -> List["up.model.fnode.FNode"]: - """Returns the `list` of the `Process` `preconditions`.""" - return self._preconditions - - def clear_preconditions(self): - """Removes all the `Process preconditions`""" - self._preconditions = [] - - @property - def effects(self) -> List["up.model.effect.Effect"]: - """Returns the `list` of the `Process effects`.""" - return self._effects - - def clear_effects(self): - """Removes all the `Process's effects`.""" - self._effects = [] - self._fluents_assigned = {} - self._fluents_inc_dec = set() - - def _add_effect_instance(self, effect: "up.model.effect.Effect"): - assert ( - effect.environment == self._environment - ), "effect does not have the same environment of the Process" - - self._effects.append(effect) - - def add_precondition( - self, - precondition: Union[ - "up.model.fnode.FNode", - "up.model.fluent.Fluent", - "up.model.parameter.Parameter", - bool, - ], - ): - """ - Adds the given expression to `Process's preconditions`. - - :param precondition: The expression that must be added to the `Process's preconditions`. - """ - (precondition_exp,) = self._environment.expression_manager.auto_promote( - precondition - ) - assert self._environment.type_checker.get_type(precondition_exp).is_bool_type() - if precondition_exp == self._environment.expression_manager.TRUE(): - return - free_vars = self._environment.free_vars_oracle.get_free_variables( - precondition_exp - ) - if len(free_vars) != 0: - raise UPUnboundedVariablesError( - f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}" - ) - if precondition_exp not in self._preconditions: - self._preconditions.append(precondition_exp) - - def add_derivative( - self, - fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], - value: "up.model.expression.Expression", - ): - """ - Adds the given `time derivative effect` to the `process's effects`. - - :param fluent: The `fluent` is the numeric state variable of which this process expresses its time derivative, which in Newton's notation would be over-dot(fluent). - :param value: This is the actual time derivative function. For instance, `fluent = 4` expresses that the time derivative of `fluent` is 4. - """ - ( - fluent_exp, - value_exp, - condition_exp, - ) = self._environment.expression_manager.auto_promote( - fluent, - value, - True, - ) - if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): - raise UPUsageError( - "fluent field of add_increase_effect must be a Fluent or a FluentExp or a Dot." - ) - if not fluent_exp.type.is_compatible(value_exp.type): - raise UPTypeError( - f"Process effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" - ) - if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type(): - raise UPTypeError("Derivative can be created only on numeric types!") - self._add_effect_instance( - up.model.effect.Effect( - fluent_exp, - value_exp, - condition_exp, - kind=up.model.effect.EffectKind.DERIVATIVE, - forall=tuple(), - ) - ) - - -class Event(InstantaneousTransitionMixin): - """This class represents an event.""" - - def __init__( - self, - _name: str, - _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, - _env: Optional[Environment] = None, - **kwargs: "up.model.types.Type", - ): - InstantaneousTransitionMixin.__init__(self, _name, _parameters, _env, **kwargs) - - def __eq__(self, oth: object) -> bool: - if isinstance(oth, Event): - return super().__eq__(oth) - else: - return False - - def __hash__(self) -> int: - res = hash(self._name) - for ap in self._parameters.items(): - res += hash(ap) - for p in self._preconditions: - res += hash(p) - for e in self._effects: - res += hash(e) - res += hash(self._simulated_effect) - return res - - def __repr__(self) -> str: - s = [] - s.append(f"event {self.name}") - self._print_parameters(s) - s.append(" {\n") - s.append(" preconditions = [\n") - for c in self.preconditions: - s.append(f" {str(c)}\n") - s.append(" ]\n") - s.append(" effects = [\n") - for e in self.effects: - s.append(f" {str(e)}\n") - s.append(" ]\n") - if self._simulated_effect is not None: - s.append(f" simulated effect = {self._simulated_effect}\n") - s.append(" }") - return "".join(s) - - def clone(self): - new_params = OrderedDict( - (param_name, param.type) for param_name, param in self._parameters.items() - ) - new_instantaneous_action = Event(self._name, new_params, self._environment) - new_instantaneous_action._preconditions = self._preconditions[:] - new_instantaneous_action._effects = [e.clone() for e in self._effects] - new_instantaneous_action._fluents_assigned = self._fluents_assigned.copy() - new_instantaneous_action._fluents_inc_dec = self._fluents_inc_dec.copy() - new_instantaneous_action._simulated_effect = self._simulated_effect - return new_instantaneous_action diff --git a/unified_planning/model/effect.py b/unified_planning/model/effect.py index 1a7d8f240..5996fd85b 100644 --- a/unified_planning/model/effect.py +++ b/unified_planning/model/effect.py @@ -38,13 +38,15 @@ class EffectKind(Enum): `ASSIGN` => `if C then F <= V` `INCREASE` => `if C then F <= F + V` `DECREASE` => `if C then F <= F - V` - `DERIVATIVE` => `dF/dt <= V` + `CONTINUOUS_INCREASE` => `dF/dt <= V` + `CONTINUOUS_DECREASE` => `dF/dt <= -V` """ ASSIGN = auto() INCREASE = auto() DECREASE = auto() - DERIVATIVE = auto() + CONTINUOUS_INCREASE = auto() + CONTINUOUS_DECREASE = auto() class Effect: @@ -111,7 +113,7 @@ def __repr__(self) -> str: s.append(f"forall {', '.join(str(v) for v in self._forall)}") if self.is_conditional(): s.append(f"if {str(self._condition)} then") - if not (self.is_derivative()): + if not (self.is_continuous_increase() or self.is_continuous_decrease()): s.append(f"{str(self._fluent)}") if self.is_assignment(): s.append(":=") @@ -119,8 +121,10 @@ def __repr__(self) -> str: s.append("+=") elif self.is_decrease(): s.append("-=") - elif self.is_derivative(): + elif self.is_continuous_increase(): s.append(f"d{str(self._fluent)}/dt =") + elif self.is_continuous_decrease(): + s.append(f"d{str(self._fluent)}/dt = -") s.append(f"{str(self._value)}") return " ".join(s) @@ -250,9 +254,13 @@ def is_decrease(self) -> bool: """Returns `True` if the :func:`kind ` of this `Effect` is a `decrease`, `False` otherwise.""" return self._kind == EffectKind.DECREASE - def is_derivative(self) -> bool: - """Returns `True` if the :func:`kind ` of this `Effect` is a `derivative`, `False` otherwise.""" - return self._kind == EffectKind.DERIVATIVE + def is_continuous_increase(self) -> bool: + """Returns `True` if the :func:`kind ` of this `Effect` is a `continuous increase`, `False` otherwise.""" + return self._kind == EffectKind.CONTINUOUS_INCREASE + + def is_continuous_decrease(self) -> bool: + """Returns `True` if the :func:`kind ` of this `Effect` is a `continuous decrease`, `False` otherwise.""" + return self._kind == EffectKind.CONTINUOUS_DECREASE class SimulatedEffect: diff --git a/unified_planning/model/mixins/__init__.py b/unified_planning/model/mixins/__init__.py index 3cd6ed8e7..38f96009b 100644 --- a/unified_planning/model/mixins/__init__.py +++ b/unified_planning/model/mixins/__init__.py @@ -14,6 +14,9 @@ # from unified_planning.model.mixins.actions_set import ActionsSetMixin +from unified_planning.model.mixins.natural_transitions_set import ( + NaturalTransitionsSetMixin, +) from unified_planning.model.mixins.time_model import TimeModelMixin from unified_planning.model.mixins.fluents_set import FluentsSetMixin from unified_planning.model.mixins.objects_set import ObjectsSetMixin @@ -24,6 +27,7 @@ __all__ = [ "ActionsSetMixin", + "NaturalTransitionsSetMixin", "TimeModelMixin", "FluentsSetMixin", "ObjectsSetMixin", diff --git a/unified_planning/model/mixins/actions_set.py b/unified_planning/model/mixins/actions_set.py index a2aeaabfb..2c5be9272 100644 --- a/unified_planning/model/mixins/actions_set.py +++ b/unified_planning/model/mixins/actions_set.py @@ -82,26 +82,6 @@ def durative_actions(self) -> Iterator["up.model.action.DurativeAction"]: if isinstance(a, up.model.action.DurativeAction): yield a - @property - def processes(self) -> Iterator["up.model.action.Process"]: - """Returs all the sensing actions of the problem. - - IMPORTANT NOTE: this property does some computation, so it should be called as - seldom as possible.""" - for a in self._actions: - if isinstance(a, up.model.action.Process): - yield a - - @property - def events(self) -> Iterator["up.model.action.Event"]: - """Returs all the sensing actions of the problem. - - IMPORTANT NOTE: this property does some computation, so it should be called as - seldom as possible.""" - for a in self._actions: - if isinstance(a, up.model.action.Event): - yield a - @property def conditional_actions(self) -> List["up.model.action.Action"]: """ diff --git a/unified_planning/model/mixins/natural_transitions_set.py b/unified_planning/model/mixins/natural_transitions_set.py new file mode 100644 index 000000000..e8cf24dfa --- /dev/null +++ b/unified_planning/model/mixins/natural_transitions_set.py @@ -0,0 +1,191 @@ +# Copyright 2021-2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from warnings import warn +import unified_planning as up +from unified_planning.exceptions import UPProblemDefinitionError, UPValueError +from typing import Iterator, List, Iterable, Union + + +class NaturalTransitionsSetMixin: + """ + This class is a mixin that contains a `set` of `natural_transitions` with some related methods. + + NOTE: when this mixin is used in combination with other mixins that share some + of the attributes (e.g. `environment`, `add_user_type_method`, `has_name_method`), it is required + to pass the very same arguments to the mixins constructors. + """ + + def __init__(self, environment, add_user_type_method, has_name_method): + self._env = environment + self._add_user_type_method = add_user_type_method + self._has_name_method = has_name_method + self._events: List["up.model.natural_transition.Event"] = [] + self._processes: List["up.model.natural_transition.Process"] = [] + + @property + def environment(self) -> "up.environment.Environment": + """Returns the `Problem` environment.""" + return self._env + + @property + def processes( + self, + ) -> List["up.model.natural_transition.Process"]: + """Returns the list of the `Processes` in the `Problem`.""" + return self._processes + + @property + def events( + self, + ) -> List["up.model.natural_transition.Event"]: + """Returns the list of the `Events` in the `Problem`.""" + return self._events + + @property + def natural_transitions( + self, + ) -> List["up.model.natural_transition.NaturalTransition"]: + """Returns the list of the `Processes` and `Events` in the `Problem`.""" + ntlist: List[up.model.natural_transition.NaturalTransition] = [] + ntlist.extend(self.processes) + ntlist.extend(self.events) + return ntlist + + def clear_events(self): + """Removes all the `Problem` `Events`.""" + self._events = [] + + def clear_processes(self): + """Removes all the `Problem` `Processes`.""" + self._processes = [] + + def process(self, name: str) -> "up.model.natural_transition.Process": + """ + Returns the `Process` with the given `name`. + + :param name: The `name` of the target `process`. + :return: The `process` in the `problem` with the given `name`. + """ + for a in self._processes: + if a.name == name: + return a + raise UPValueError(f"Process of name: {name} is not defined!") + + def event(self, name: str) -> "up.model.natural_transition.Event": + """ + Returns the `event` with the given `name`. + + :param name: The `name` of the target `event`. + :return: The `event` in the `problem` with the given `name`. + """ + for a in self._events: + if a.name == name: + return a + raise UPValueError(f"NaturalTransition of name: {name} is not defined!") + + def has_process(self, name: str) -> bool: + """ + Returns `True` if the `problem` has the `process` with the given `name`, + `False` otherwise. + + :param name: The `name` of the target `process`. + :return: `True` if the `problem` has an `process` with the given `name`, `False` otherwise. + """ + for a in self._processes: + if a.name == name: + return True + return False + + def has_event(self, name: str) -> bool: + """ + Returns `True` if the `problem` has the `event` with the given `name`, + `False` otherwise. + + :param name: The `name` of the target `event`. + :return: `True` if the `problem` has an `event` with the given `name`, `False` otherwise. + """ + for a in self._events: + if a.name == name: + return True + return False + + def add_process(self, process: "up.model.natural_transition.Process"): + """ + Adds the given `process` to the `problem`. + + :param natural_transition: The `process` that must be added to the `problem`. + """ + assert ( + process.environment == self._env + ), "Process does not have the same environment of the problem" + if self._has_name_method(process.name): + msg = f"Name {process.name} already defined! Different elements of a problem can have the same name if the environment flag error_used_name is disabled." + if self._env.error_used_name or any( + process.name == a.name for a in self._processes + ): + raise UPProblemDefinitionError(msg) + else: + warn(msg) + self._processes.append(process) + for param in process.parameters: + if param.type.is_user_type(): + self._add_user_type_method(param.type) + + def add_event(self, event: "up.model.natural_transition.Event"): + """ + Adds the given `event` to the `problem`. + + :param event: The `event` that must be added to the `problem`. + """ + assert ( + event.environment == self._env + ), "Event does not have the same environment of the problem" + if self._has_name_method(event.name): + msg = f"Name {event.name} already defined! Different elements of a problem can have the same name if the environment flag error_used_name is disabled." + if self._env.error_used_name or any( + event.name == a.name for a in self._events + ): + raise UPProblemDefinitionError(msg) + else: + warn(msg) + self._events.append(event) + for param in event.parameters: + if param.type.is_user_type(): + self._add_user_type_method(param.type) + + def add_processes( + self, + processes: Iterable["up.model.natural_transition.Process"], + ): + """ + Adds the given `processes` to the `problem`. + + :param processes: The `processes` that must be added to the `problem`. + """ + for process in processes: + self.add_process(process) + + def add_events( + self, + events: Iterable["up.model.natural_transition.Event"], + ): + """ + Adds the given `events` to the `problem`. + + :param events: The `events` that must be added to the `problem`. + """ + for event in events: + self.add_event(event) diff --git a/unified_planning/model/natural_transition.py b/unified_planning/model/natural_transition.py new file mode 100644 index 000000000..d783b785f --- /dev/null +++ b/unified_planning/model/natural_transition.py @@ -0,0 +1,271 @@ +# Copyright 2021-2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import unified_planning as up +from unified_planning.environment import get_environment, Environment +from unified_planning.exceptions import ( + UPTypeError, + UPUnboundedVariablesError, + UPProblemDefinitionError, + UPUsageError, +) +from unified_planning.model.mixins.timed_conds_effs import TimedCondsEffs +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Set, Union, Optional, Iterable +from collections import OrderedDict + +from unified_planning.model.transition import ( + UntimedEffectMixin, + PreconditionMixin, + Transition, +) + + +""" +Below we have natural transitions. These are not controlled by the agent. Natural transitions can be of two kinds: +Processes or Events. +Processes dictate how numeric variables evolve over time through the use of time-derivative functions +Events dictate the analogous of urgent transitions in timed automata theory +""" + + +class NaturalTransition(Transition, PreconditionMixin): + """This is the `NaturalTransition` interface""" + + +class Process(NaturalTransition): + """This is the `Process` class, which implements the abstract `NaturalTransition` class.""" + + def __init__( + self, + _name: str, + _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, + _env: Optional[Environment] = None, + **kwargs: "up.model.types.Type", + ): + Transition.__init__(self, _name, _parameters, _env, **kwargs) + PreconditionMixin.__init__(self, _env) + self._effects: List[up.model.effect.Effect] = [] + # fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment + self._fluents_assigned: Dict[ + "up.model.fnode.FNode", "up.model.fnode.FNode" + ] = {} + # fluent_inc_dec is the set of the fluents that have an unconditional increase or decrease + self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set() + + def __repr__(self) -> str: + s = [] + s.append(f"process {self.name}") + self._print_parameters(s) + s.append(" {\n") + s.append(" preconditions = [\n") + for c in self.preconditions: + s.append(f" {str(c)}\n") + s.append(" ]\n") + s.append(" effects = [\n") + for e in self.effects: + s.append(f" {str(e)}\n") + s.append(" ]\n") + s.append(" }") + return "".join(s) + + def __eq__(self, oth: object) -> bool: + if isinstance(oth, Process): + cond = ( + self._environment == oth._environment + and self._name == oth._name + and self._parameters == oth._parameters + ) + return ( + cond + and set(self._preconditions) == set(oth._preconditions) + and set(self._effects) == set(oth._effects) + ) + else: + return False + + def __hash__(self) -> int: + res = hash(self._name) + for ap in self._parameters.items(): + res += hash(ap) + for p in self._preconditions: + res += hash(p) + for e in self._effects: + res += hash(e) + return res + + def clone(self): + new_params = OrderedDict( + (param_name, param.type) for param_name, param in self._parameters.items() + ) + new_process = Process(self._name, new_params, self._environment) + new_process._preconditions = self._preconditions[:] + new_process._effects = [e.clone() for e in self._effects] + new_process._fluents_assigned = self._fluents_assigned.copy() + new_process._fluents_inc_dec = self._fluents_inc_dec.copy() + return new_process + + @property + def effects(self) -> List["up.model.effect.Effect"]: + """Returns the `list` of the `Process effects`.""" + return self._effects + + def clear_effects(self): + """Removes all the `Process's effects`.""" + self._effects = [] + self._fluents_assigned = {} + self._fluents_inc_dec = set() + + def _add_effect_instance(self, effect: "up.model.effect.Effect"): + assert ( + effect.environment == self._environment + ), "effect does not have the same environment of the Process" + + self._effects.append(effect) + + def _add_continuous_effect( + self, + fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], + value: "up.model.expression.Expression", + negative: bool, + ): + ( + fluent_exp, + value_exp, + condition_exp, + ) = self._environment.expression_manager.auto_promote( + fluent, + value, + True, + ) + if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): + raise UPUsageError( + "fluent field of add_increase_effect must be a Fluent or a FluentExp or a Dot." + ) + if not fluent_exp.type.is_compatible(value_exp.type): + raise UPTypeError( + f"Process effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" + ) + if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type(): + raise UPTypeError("Derivative can be created only on numeric types!") + e_kind = up.model.effect.EffectKind.CONTINUOUS_INCREASE + + if negative: + e_kind = up.model.effect.EffectKind.CONTINUOUS_DECREASE + self._add_effect_instance( + up.model.effect.Effect( + fluent_exp, + value_exp, + condition_exp, + kind=e_kind, + forall=tuple(), + ) + ) + + def add_increase_continuous_effect( + self, + fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], + value: "up.model.expression.Expression", + ): + """ + Adds the given `continuous increase effect` to the `process's effects`. + + NOTE description might be outdated + :param fluent: The `fluent` is the numeric state variable of which this process expresses its derivative, which in Newton's notation would be over-dot(fluent). + :param value: This is the actual time derivative function. For instance, `fluent = 4` expresses that the time derivative of `fluent` is 4. + """ + self._add_continuous_effect(fluent, value, False) + + def add_decrease_continuous_effect( + self, + fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], + value: "up.model.expression.Expression", + ): + """ + Adds the given `continuous decrease effect` to the `process's effects`. + + NOTE description might be outdated + :param fluent: The `fluent` is the numeric state variable of which this process expresses its derivative, which in Newton's notation would be over-dot(fluent). + :param value: This is the actual time derivative function. For instance, `fluent = 4` expresses that the time derivative of `fluent` is 4. + """ + self._add_continuous_effect(fluent, value, True) + + +class Event(UntimedEffectMixin, NaturalTransition): + """This class represents an event.""" + + def __init__( + self, + _name: str, + _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, + _env: Optional[Environment] = None, + **kwargs: "up.model.types.Type", + ): + Transition.__init__(self, _name, _parameters, _env, **kwargs) + PreconditionMixin.__init__(self, _env) + UntimedEffectMixin.__init__(self, _env) + + def __eq__(self, oth: object) -> bool: + if isinstance(oth, Event): + cond = ( + self._environment == oth._environment + and self._name == oth._name + and self._parameters == oth._parameters + ) + return ( + cond + and set(self._preconditions) == set(oth._preconditions) + and set(self._effects) == set(oth._effects) + ) + else: + return False + + def __hash__(self) -> int: + res = hash(self._name) + for ap in self._parameters.items(): + res += hash(ap) + for p in self._preconditions: + res += hash(p) + for e in self._effects: + res += hash(e) + return res + + def clone(self): + new_params = OrderedDict( + (param_name, param.type) for param_name, param in self._parameters.items() + ) + new_event = Event(self._name, new_params, self._environment) + new_event._preconditions = self._preconditions[:] + new_event._effects = [e.clone() for e in self._effects] + new_event._fluents_assigned = self._fluents_assigned.copy() + new_event._fluents_inc_dec = self._fluents_inc_dec.copy() + return new_event + + def __repr__(self) -> str: + s = [] + s.append(f"event {self.name}") + self._print_parameters(s) + s.append(" {\n") + s.append(" preconditions = [\n") + for c in self.preconditions: + s.append(f" {str(c)}\n") + s.append(" ]\n") + s.append(" effects = [\n") + for e in self.effects: + s.append(f" {str(e)}\n") + s.append(" ]\n") + s.append(" }") + return "".join(s) diff --git a/unified_planning/model/problem.py b/unified_planning/model/problem.py index 4a1934ece..fd6a3a2db 100644 --- a/unified_planning/model/problem.py +++ b/unified_planning/model/problem.py @@ -17,11 +17,13 @@ from itertools import chain, product import unified_planning as up +from unified_planning.model.effect import EffectKind import unified_planning.model.tamp from unified_planning.model import Fluent from unified_planning.model.abstract_problem import AbstractProblem from unified_planning.model.mixins import ( ActionsSetMixin, + NaturalTransitionsSetMixin, TimeModelMixin, FluentsSetMixin, ObjectsSetMixin, @@ -52,6 +54,7 @@ class Problem( # type: ignore[misc] TimeModelMixin, FluentsSetMixin, ActionsSetMixin, + NaturalTransitionsSetMixin, ObjectsSetMixin, InitialStateMixin, MetricsMixin, @@ -80,6 +83,9 @@ def __init__( ActionsSetMixin.__init__( self, self.environment, self._add_user_type, self.has_name ) + NaturalTransitionsSetMixin.__init__( + self, self.environment, self._add_user_type, self.has_name + ) ObjectsSetMixin.__init__( self, self.environment, self._add_user_type, self.has_name ) @@ -117,6 +123,14 @@ def __repr__(self) -> str: s.append("actions = [\n") s.extend(map(custom_str, self.actions)) s.append("]\n\n") + if len(self.processes) > 0: + s.append("processes = [\n") + s.extend(map(custom_str, self.processes)) + s.append("]\n\n") + if len(self.events) > 0: + s.append("events = [\n") + s.extend(map(custom_str, self.events)) + s.append("]\n\n") if len(self.user_types) > 0: s.append("objects = [\n") for ty in self.user_types: @@ -235,6 +249,8 @@ def clone(self): TimeModelMixin._clone_to(self, new_p) new_p._actions = [a.clone() for a in self._actions] + new_p._events = [a.clone() for a in self._events] + new_p._processes = [a.clone() for a in self._processes] new_p._timed_effects = { t: [e.clone() for e in el] for t, el in self._timed_effects.items() } @@ -311,7 +327,7 @@ def _get_static_and_unused_fluents( (f.fluent() for e in exps for f in fve.get(e)) ) for a in self._actions: - if isinstance(a, up.model.action.InstantaneousTransitionMixin): + if isinstance(a, up.model.action.InstantaneousAction): remove_used_fluents(*a.preconditions) for e in a.effects: remove_used_fluents(e.fluent, e.value, e.condition) @@ -332,12 +348,17 @@ def _get_static_and_unused_fluents( unused_fluents.clear() for f in se.fluents: static_fluents.discard(f.fluent()) - elif isinstance(a, up.model.action.Process): - for e in a.effects: - remove_used_fluents(e.fluent, e.value, e.condition) - static_fluents.discard(e.fluent.fluent()) else: raise NotImplementedError + for ev in self._events: + remove_used_fluents(*ev.preconditions) + for e in ev.effects: + remove_used_fluents(e.fluent, e.value, e.condition) + static_fluents.discard(e.fluent.fluent()) + for pro in self._processes: + for e in pro.effects: + remove_used_fluents(e.fluent, e.value, e.condition) + static_fluents.discard(e.fluent.fluent()) for el in self._timed_effects.values(): for e in el: remove_used_fluents(e.fluent, e.value, e.condition) @@ -674,6 +695,8 @@ def _kind_factory(self) -> "_KindFactory": if len(self._timed_effects) > 0: factory.kind.set_time("CONTINUOUS_TIME") factory.kind.set_time("TIMED_EFFECTS") + for process in self._processes: + factory.update_problem_kind_process(process) for effect in chain(*self._timed_effects.values()): factory.update_problem_kind_effect(effect) if len(self._timed_goals) > 0: @@ -689,6 +712,8 @@ def _kind_factory(self) -> "_KindFactory": factory.update_problem_kind_initial_state(self) if len(list(self.processes)) > 0: factory.kind.set_time("PROCESSES") + if len(list(self.events)) > 0: + factory.kind.set_time("EVENTS") return factory @@ -864,6 +889,10 @@ def update_problem_kind_effect( self.kind.set_effects_kind("STATIC_FLUENTS_IN_OBJECT_ASSIGNMENTS") if any(f not in self.static_fluents for f in fluents_in_value): self.kind.set_effects_kind("FLUENTS_IN_OBJECT_ASSIGNMENTS") + elif e.is_continuous_increase(): + self.kind.unset_problem_type("SIMPLE_NUMERIC_PLANNING") + elif e.is_continuous_decrease(): + self.kind.unset_problem_type("SIMPLE_NUMERIC_PLANNING") def update_problem_kind_expression( self, @@ -992,7 +1021,7 @@ def update_problem_kind_action( if isinstance(action, up.model.tamp.InstantaneousMotionAction): if len(action.motion_constraints) > 0: self.kind.set_problem_class("TAMP") - if isinstance(action, up.model.action.InstantaneousTransitionMixin): + if isinstance(action, up.model.action.InstantaneousAction): for c in action.preconditions: self.update_problem_kind_expression(c) for e in action.effects: @@ -1007,14 +1036,36 @@ def update_problem_kind_action( for t, le in action.effects.items(): for e in le: self.update_action_timed_effect(t, e) + if len(action.simulated_effects) > 0: self.kind.set_simulated_entities("SIMULATED_EFFECTS") self.kind.set_time("CONTINUOUS_TIME") - elif isinstance(action, up.model.action.Process): - pass else: raise NotImplementedError + def update_problem_kind_process( + self, + process: "up.model.natural_transition.Process", + ): + for param in process.parameters: + self.update_action_parameter(param) + + continuous_fluents = set() + fluents_in_rhs = set() + for e in process.effects: + if e.kind == EffectKind.CONTINUOUS_INCREASE: + self.kind.set_effects_kind("INCREASE_CONTINUOUS_EFFECTS") + if e.kind == EffectKind.CONTINUOUS_DECREASE: + self.kind.set_effects_kind("DECREASE_CONTINUOUS_EFFECTS") + + continuous_fluents.add(e.fluent.fluent) + rhs = self.simplifier.simplify(e.value) + for var in self.environment.free_vars_extractor.get(rhs): + if var.is_fluent_exp(): + fluents_in_rhs.add(var.fluent) + if any(variable in fluents_in_rhs for variable in continuous_fluents): + self.kind.set_effects_kind("NON_LINEAR_CONTINUOUS_EFFECTS") + def update_problem_kind_metric( self, ) -> Tuple[Set["up.model.Fluent"], Set["up.model.Fluent"]]: diff --git a/unified_planning/model/problem_kind.py b/unified_planning/model/problem_kind.py index 52ae19d9e..91482f570 100644 --- a/unified_planning/model/problem_kind.py +++ b/unified_planning/model/problem_kind.py @@ -47,6 +47,7 @@ "DURATION_INEQUALITIES", "SELF_OVERLAPPING", "PROCESSES", + "EVENTS", ], "EXPRESSION_DURATION": [ "STATIC_FLUENTS_IN_DURATIONS", @@ -70,6 +71,9 @@ "CONDITIONAL_EFFECTS", "INCREASE_EFFECTS", "DECREASE_EFFECTS", + "INCREASE_CONTINUOUS_EFFECTS", + "DECREASE_CONTINUOUS_EFFECTS", + "NON_LINEAR_CONTINUOUS_EFFECTS", "STATIC_FLUENTS_IN_BOOLEAN_ASSIGNMENTS", "STATIC_FLUENTS_IN_NUMERIC_ASSIGNMENTS", "STATIC_FLUENTS_IN_OBJECT_ASSIGNMENTS", diff --git a/unified_planning/model/transition.py b/unified_planning/model/transition.py new file mode 100644 index 000000000..deb26ee29 --- /dev/null +++ b/unified_planning/model/transition.py @@ -0,0 +1,415 @@ +# Copyright 2021-2023 AIPlan4EU project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +""" +This module defines the `Transition` base class and some of his extensions. +A `Transition` has a `name`, a `list` of `Parameter`, a `list` of `preconditions` +and a `list` of `effects`. +""" + + +import unified_planning as up +from unified_planning.environment import get_environment, Environment +from unified_planning.exceptions import ( + UPTypeError, + UPUnboundedVariablesError, + UPProblemDefinitionError, + UPUsageError, +) +from unified_planning.model.mixins.timed_conds_effs import TimedCondsEffs +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Set, Union, Optional, Iterable +from collections import OrderedDict + + +class Transition(ABC): + """This is the `Transition` interface.""" + + def __init__( + self, + _name: str, + _parameters: Optional["OrderedDict[str, up.model.types.Type]"] = None, + _env: Optional[Environment] = None, + **kwargs: "up.model.types.Type", + ): + self._environment = get_environment(_env) + self._name = _name + self._parameters: "OrderedDict[str, up.model.parameter.Parameter]" = ( + OrderedDict() + ) + if _parameters is not None: + assert len(kwargs) == 0 + for n, t in _parameters.items(): + assert self._environment.type_manager.has_type( + t + ), "type of parameter does not belong to the same environment of the transition" + self._parameters[n] = up.model.parameter.Parameter( + n, t, self._environment + ) + else: + for n, t in kwargs.items(): + assert self._environment.type_manager.has_type( + t + ), "type of parameter does not belong to the same environment of the transition" + self._parameters[n] = up.model.parameter.Parameter( + n, t, self._environment + ) + + @abstractmethod + def __eq__(self, oth: object) -> bool: + raise NotImplementedError + + def _print_parameters(self, s): + first = True + for p in self.parameters: + if first: + s.append("(") + first = False + else: + s.append(", ") + s.append(str(p)) + if not first: + s.append(")") + + @abstractmethod + def __hash__(self) -> int: + raise NotImplementedError + + @abstractmethod + def clone(self): + raise NotImplementedError + + def is_conditional(self) -> bool: + """Returns `True` if the `Transition` has `conditional effects`, `False` otherwise.""" + raise NotImplementedError + + @property + def environment(self) -> Environment: + """Returns this `Transition` `Environment`.""" + return self._environment + + @property + def name(self) -> str: + """Returns the `Transition `name`.""" + return self._name + + @name.setter + def name(self, new_name: str): + """Sets the `Transition` `name`.""" + self._name = new_name + + @property + def parameters(self) -> List["up.model.parameter.Parameter"]: + """Returns the `list` of the `Transition parameters`.""" + return list(self._parameters.values()) + + def parameter(self, name: str) -> "up.model.parameter.Parameter": + """ + Returns the `parameter` of the `Transition` with the given `name`. + + Example + ------- + >>> from unified_planning.shortcuts import * + >>> location_type = UserType("Location") + >>> move = InstantaneousAction("move", source=location_type, target=location_type) + >>> move.parameter("source") # return the "source" parameter of the transition, with type "Location" + Location source + >>> move.parameter("target") + Location target + + If a parameter's name (1) does not conflict with an existing attribute of `Transition` and (2) does not start with '_' + it can also be accessed as if it was an attribute of the transition. For instance: + + >>> move.source + Location source + + :param name: The `name` of the target `parameter`. + :return: The `parameter` of the `Transition` with the given `name`. + """ + if name not in self._parameters: + raise ValueError(f"Transition '{self.name}' has no parameter '{name}'") + return self._parameters[name] + + def __getattr__(self, parameter_name: str) -> "up.model.parameter.Parameter": + if parameter_name.startswith("_"): + # guard access as pickling relies on attribute error to be thrown even when + # no attributes of the object have been set. + # In this case accessing `self._name` or `self._parameters`, would re-invoke __getattr__ + raise AttributeError(f"Transition has no attribute '{parameter_name}'") + if parameter_name not in self._parameters: + raise AttributeError( + f"Transition '{self.name}' has no attribute or parameter '{parameter_name}'" + ) + return self._parameters[parameter_name] + + +class PreconditionMixin: + def __init__(self, _env): + self._preconditions: List["up.model.fnode.FNode"] = [] + self._environment = get_environment(_env) + + @property + def preconditions(self) -> List["up.model.fnode.FNode"]: + """Returns the `list` of the `Action` `preconditions`.""" + return self._preconditions + + def clear_preconditions(self): + """Removes all the `Action preconditions`""" + self._preconditions = [] + + def add_precondition( + self, + precondition: Union[ + "up.model.fnode.FNode", + "up.model.fluent.Fluent", + "up.model.parameter.Parameter", + bool, + ], + ): + """ + Adds the given expression to `action's preconditions`. + + :param precondition: The expression that must be added to the `action's preconditions`. + """ + (precondition_exp,) = self._environment.expression_manager.auto_promote( + precondition + ) + assert self._environment.type_checker.get_type(precondition_exp).is_bool_type() + if precondition_exp == self._environment.expression_manager.TRUE(): + return + free_vars = self._environment.free_vars_oracle.get_free_variables( + precondition_exp + ) + if len(free_vars) != 0: + raise UPUnboundedVariablesError( + f"The precondition {str(precondition_exp)} has unbounded variables:\n{str(free_vars)}" + ) + if precondition_exp not in self._preconditions: + self._preconditions.append(precondition_exp) + + def _set_preconditions(self, preconditions: List["up.model.fnode.FNode"]): + self._preconditions = preconditions + + +class UntimedEffectMixin: + def __init__(self, _env): + self._environment = get_environment(_env) + self._effects: List[up.model.effect.Effect] = [] + self._simulated_effect: Optional[up.model.effect.SimulatedEffect] = None + # fluent assigned is the mapping of the fluent to it's value if it is an unconditional assignment + self._fluents_assigned: Dict[ + "up.model.fnode.FNode", "up.model.fnode.FNode" + ] = {} + # fluent_inc_dec is the set of the fluents that have an unconditional increase or decrease + self._fluents_inc_dec: Set["up.model.fnode.FNode"] = set() + + @property + def effects(self) -> List["up.model.effect.Effect"]: + """Returns the `list` of the `Action effects`.""" + return self._effects + + def clear_effects(self): + """Removes all the `Action's effects`.""" + self._effects = [] + self._fluents_assigned = {} + self._fluents_inc_dec = set() + self._simulated_effect = None + + @property + def conditional_effects(self) -> List["up.model.effect.Effect"]: + """Returns the `list` of the `action conditional effects`. + + IMPORTANT NOTE: this property does some computation, so it should be called as + seldom as possible.""" + return [e for e in self._effects if e.is_conditional()] + + def is_conditional(self) -> bool: + """Returns `True` if the `action` has `conditional effects`, `False` otherwise.""" + return any(e.is_conditional() for e in self._effects) + + @property + def unconditional_effects(self) -> List["up.model.effect.Effect"]: + """Returns the `list` of the `action unconditional effects`. + + IMPORTANT NOTE: this property does some computation, so it should be called as + seldom as possible.""" + return [e for e in self._effects if not e.is_conditional()] + + def add_effect( + self, + fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], + value: "up.model.expression.Expression", + condition: "up.model.expression.BoolExpression" = True, + forall: Iterable["up.model.variable.Variable"] = tuple(), + ): + """ + Adds the given `assignment` to the `action's effects`. + + :param fluent: The `fluent` of which `value` is modified by the `assignment`. + :param value: The `value` to assign to the given `fluent`. + :param condition: The `condition` in which this `effect` is applied; the default + value is `True`. + :param forall: The 'Variables' that are universally quantified in this + effect; the default value is empty. + """ + ( + fluent_exp, + value_exp, + condition_exp, + ) = self._environment.expression_manager.auto_promote(fluent, value, condition) + if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): + raise UPUsageError( + "fluent field of add_effect must be a Fluent or a FluentExp or a Dot." + ) + if not self._environment.type_checker.get_type(condition_exp).is_bool_type(): + raise UPTypeError("Effect condition is not a Boolean condition!") + if not fluent_exp.type.is_compatible(value_exp.type): + # Value is not assignable to fluent (its type is not a subset of the fluent's type). + raise UPTypeError( + f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" + ) + self._add_effect_instance( + up.model.effect.Effect(fluent_exp, value_exp, condition_exp, forall=forall) + ) + + def add_increase_effect( + self, + fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], + value: "up.model.expression.Expression", + condition: "up.model.expression.BoolExpression" = True, + forall: Iterable["up.model.variable.Variable"] = tuple(), + ): + """ + Adds the given `increase effect` to the `action's effects`. + + :param fluent: The `fluent` which `value` is increased. + :param value: The given `fluent` is incremented by the given `value`. + :param condition: The `condition` in which this `effect` is applied; the default + value is `True`. + :param forall: The 'Variables' that are universally quantified in this + effect; the default value is empty. + """ + ( + fluent_exp, + value_exp, + condition_exp, + ) = self._environment.expression_manager.auto_promote( + fluent, + value, + condition, + ) + if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): + raise UPUsageError( + "fluent field of add_increase_effect must be a Fluent or a FluentExp or a Dot." + ) + if not condition_exp.type.is_bool_type(): + raise UPTypeError("Effect condition is not a Boolean condition!") + if not fluent_exp.type.is_compatible(value_exp.type): + raise UPTypeError( + f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" + ) + if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type(): + raise UPTypeError("Increase effects can be created only on numeric types!") + self._add_effect_instance( + up.model.effect.Effect( + fluent_exp, + value_exp, + condition_exp, + kind=up.model.effect.EffectKind.INCREASE, + forall=forall, + ) + ) + + def add_decrease_effect( + self, + fluent: Union["up.model.fnode.FNode", "up.model.fluent.Fluent"], + value: "up.model.expression.Expression", + condition: "up.model.expression.BoolExpression" = True, + forall: Iterable["up.model.variable.Variable"] = tuple(), + ): + """ + Adds the given `decrease effect` to the `action's effects`. + + :param fluent: The `fluent` which value is decreased. + :param value: The given `fluent` is decremented by the given `value`. + :param condition: The `condition` in which this `effect` is applied; the default + value is `True`. + :param forall: The 'Variables' that are universally quantified in this + effect; the default value is empty. + """ + ( + fluent_exp, + value_exp, + condition_exp, + ) = self._environment.expression_manager.auto_promote(fluent, value, condition) + if not fluent_exp.is_fluent_exp() and not fluent_exp.is_dot(): + raise UPUsageError( + "fluent field of add_decrease_effect must be a Fluent or a FluentExp or a Dot." + ) + if not condition_exp.type.is_bool_type(): + raise UPTypeError("Effect condition is not a Boolean condition!") + if not fluent_exp.type.is_compatible(value_exp.type): + raise UPTypeError( + f"InstantaneousAction effect has an incompatible value type. Fluent type: {fluent_exp.type} // Value type: {value_exp.type}" + ) + if not fluent_exp.type.is_int_type() and not fluent_exp.type.is_real_type(): + raise UPTypeError("Decrease effects can be created only on numeric types!") + self._add_effect_instance( + up.model.effect.Effect( + fluent_exp, + value_exp, + condition_exp, + kind=up.model.effect.EffectKind.DECREASE, + forall=forall, + ) + ) + + def _add_effect_instance(self, effect: "up.model.effect.Effect"): + assert ( + effect.environment == self._environment + ), "effect does not have the same environment of the action" + up.model.effect.check_conflicting_effects( + effect, + None, + self._simulated_effect, + self._fluents_assigned, + self._fluents_inc_dec, + "action", + ) + self._effects.append(effect) + + @property + def simulated_effect(self) -> Optional["up.model.effect.SimulatedEffect"]: + """Returns the `action` `simulated effect`.""" + return self._simulated_effect + + def set_simulated_effect(self, simulated_effect: "up.model.effect.SimulatedEffect"): + """ + Sets the given `simulated effect` as the only `action's simulated effect`. + + :param simulated_effect: The `SimulatedEffect` instance that must be set as this `action`'s only + `simulated effect`. + """ + up.model.effect.check_conflicting_simulated_effects( + simulated_effect, + None, + self._fluents_assigned, + self._fluents_inc_dec, + "action", + ) + if simulated_effect.environment != self._environment: + raise UPUsageError( + "The added SimulatedEffect does not have the same environment of the Action" + ) + self._simulated_effect = simulated_effect diff --git a/unified_planning/test/examples/processes.py b/unified_planning/test/examples/processes.py index 999e45eb1..74215144a 100644 --- a/unified_planning/test/examples/processes.py +++ b/unified_planning/test/examples/processes.py @@ -1,5 +1,4 @@ from unified_planning.shortcuts import * -from unified_planning.model.action import Process from unified_planning.test import TestCase @@ -18,14 +17,14 @@ def get_example_problems(): b = Process("moving") b.add_precondition(on) - b.add_derivative(d, 1) + b.add_increase_continuous_effect(d, 1) problem = Problem("1d_Movement") problem.add_fluent(on) problem.add_fluent(d) problem.add_action(a) - problem.add_action(b) - problem.add_action(evt) + problem.add_process(b) + problem.add_event(evt) problem.set_initial_value(on, False) problem.set_initial_value(d, 0) problem.add_goal(GE(d, 10)) @@ -40,4 +39,56 @@ def get_example_problems(): invalid_plans=[], ) problems["1d_movement"] = test_problem + + problem = Problem("boiling_water") + boiler_on = Fluent("boiler_on") + temperature = Fluent("temperature", RealType()) + water_level = Fluent("water_level", RealType()) + chimney_vent_open = Fluent("chimney_vent_open") + + turn_on_boiler = InstantaneousAction("turn_on_boiler") + turn_on_boiler.add_precondition(Not(boiler_on)) + turn_on_boiler.add_effect(boiler_on, True) + + water_heating = Process("water_heating") + water_heating.add_precondition(And(boiler_on, LE(temperature, 100))) + water_heating.add_increase_continuous_effect(temperature, 1) + + water_boiling = Process("water_boiling") + water_boiling.add_precondition(And(boiler_on, GE(temperature, 100))) + water_boiling.add_decrease_continuous_effect(water_level, 1) + + open_chimney_vent_auto = Event("open_chimney_vent_auto") + open_chimney_vent_auto.add_precondition( + And(Not(chimney_vent_open), GE(temperature, 100)) + ) + open_chimney_vent_auto.add_effect(chimney_vent_open, True) + + turn_off_boiler_auto = Event("turn_off_boiler_auto") + turn_off_boiler_auto.add_precondition(And(LE(water_level, 0), boiler_on)) + turn_off_boiler_auto.add_effect(boiler_on, False) + + problem.add_fluent(boiler_on) + problem.set_initial_value(boiler_on, False) + problem.add_fluent(chimney_vent_open) + problem.set_initial_value(chimney_vent_open, False) + problem.add_fluent(temperature) + problem.set_initial_value(temperature, 20) + problem.add_fluent(water_level) + problem.set_initial_value(water_level, 10) + problem.add_action(turn_on_boiler) + problem.add_process(water_heating) + problem.add_process(water_boiling) + problem.add_event(open_chimney_vent_auto) + problem.add_event(turn_off_boiler_auto) + problem.add_goal(And(Not(boiler_on), And(chimney_vent_open, LE(water_level, 2)))) + + test_problem = TestCase( + problem=problem, + solvable=True, + valid_plans=[], + invalid_plans=[], + ) + problems["boiling_water"] = test_problem + return problems diff --git a/unified_planning/test/test_model.py b/unified_planning/test/test_model.py index dca8aa511..dbb399668 100644 --- a/unified_planning/test/test_model.py +++ b/unified_planning/test/test_model.py @@ -22,7 +22,7 @@ ) from unified_planning.test.examples import get_example_problems from unified_planning.test import unittest_TestCase, main -from unified_planning.model.action import InstantaneousTransitionMixin +from unified_planning.model.action import InstantaneousAction class TestModel(unittest_TestCase): @@ -71,7 +71,7 @@ def test_clone_problem_and_action(self): for action_1, action_2 in zip( problem_clone_1.actions, problem_clone_2.actions ): - if isinstance(action_2, InstantaneousTransitionMixin): + if isinstance(action_2, InstantaneousAction): action_2._effects = [] action_1_clone = action_1.clone() action_1_clone._effects = [] @@ -83,6 +83,10 @@ def test_clone_problem_and_action(self): action_2._effects = [] action_1_clone = action_1.clone() action_1_clone._effects = [] + elif isinstance(action_2, Event): + action_2._effects = [] + action_1_clone = action_1.clone() + action_1_clone._effects = [] else: raise NotImplementedError self.assertEqual(action_2, action_1_clone) @@ -122,9 +126,12 @@ def test_process(self): x = Fluent("x", IntType()) move = Process("moving", car=Vehicle) move.add_precondition(a) - move.add_derivative(x, 1) + move.add_increase_continuous_effect(x, 1) e = Effect( - FluentExp(x), Int(1), TRUE(), unified_planning.model.EffectKind.DERIVATIVE + FluentExp(x), + Int(1), + TRUE(), + unified_planning.model.EffectKind.CONTINUOUS_INCREASE, ) self.assertEqual(move.effects[0], e) self.assertEqual(move.name, "moving") diff --git a/unified_planning/test/test_pddl_io.py b/unified_planning/test/test_pddl_io.py index b39984fcd..71fe65539 100644 --- a/unified_planning/test/test_pddl_io.py +++ b/unified_planning/test/test_pddl_io.py @@ -14,8 +14,8 @@ import os import tempfile -from typing import cast import pytest +from typing import cast import unified_planning from unified_planning.shortcuts import * from unified_planning.test import ( @@ -442,17 +442,18 @@ def test_non_linear_car(self): self.assertTrue(problem is not None) self.assertEqual(len(problem.fluents), 8) - self.assertEqual( - len(list([ele for ele in problem.actions if isinstance(ele, Process)])), 3 - ) - self.assertEqual( - len(list([ele for ele in problem.actions if isinstance(ele, Event)])), 1 - ) + n_proc = len(list([el for el in problem.processes if isinstance(el, Process)])) + n_eve = len(list([el for el in problem.events if isinstance(el, Event)])) + self.assertEqual(n_proc, 3) + self.assertEqual(n_eve, 1) found_drag_ahead = False - for ele in problem.actions: + for ele in problem.processes: if isinstance(ele, Process): for e in ele.effects: - self.assertEqual(e.kind, EffectKind.DERIVATIVE) + self.assertTrue( + (e.kind == EffectKind.CONTINUOUS_INCREASE) + or (e.kind == EffectKind.CONTINUOUS_DECREASE) + ) if ele.name == "drag_ahead": found_drag_ahead = True self.assertTrue("engine_running" in str(ele)) @@ -595,6 +596,14 @@ def test_examples_io(self): ) ) self.assertEqual(len(problem.actions), len(parsed_problem.actions)) + self.assertEqual( + len(problem.processes), + len(parsed_problem.processes), + ) + self.assertEqual( + len(problem.events), + len(parsed_problem.events), + ) for a in problem.actions: parsed_a = parsed_problem.action(w.get_pddl_name(a)) self.assertEqual(a, w.get_item_named(parsed_a.name)) diff --git a/unified_planning/test/test_problem.py b/unified_planning/test/test_problem.py index 2189f311c..c031853cf 100644 --- a/unified_planning/test/test_problem.py +++ b/unified_planning/test/test_problem.py @@ -531,6 +531,7 @@ def test_simple_numeric_planning_kind(self): "sched:resource_set", "sched:jobshop-ft06-operators", "1d_Movement", + "boiling_water", ] for example in self.problems.values(): problem = example.problem @@ -606,6 +607,21 @@ def test_undefined_initial_state(self): pb_name, ) + def test_natural_transitions(self): + p = self.problems["1d_movement"].problem + print(p) + self.assertTrue(p.has_process("moving")) + self.assertTrue(p.has_event("turn_off_automatically")) + print(p.process("moving")) + print(p.event("turn_off_automatically")) + p.clear_events() + p.clear_processes() + self.assertEqual(len(p.natural_transitions), 0) + p_boiling_water = self.problems["boiling_water"].problem + self.assertFalse(p_boiling_water.kind.has_non_linear_continuous_effects()) + self.assertTrue(p_boiling_water.kind.has_increase_continuous_effects()) + self.assertTrue(p_boiling_water.kind.has_decrease_continuous_effects()) + if __name__ == "__main__": main()