diff --git a/test_quick.py b/test_quick.py new file mode 100644 index 000000000..8396c3f4c --- /dev/null +++ b/test_quick.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python + +from unified_planning.shortcuts import * +up.shortcuts.get_environment().credits_stream = None + +from unified_planning.plans.ttp_to_stn import * +from unified_planning.test.examples import get_example_problems + + +if __name__ == '__main__': + + problems = get_example_problems() + problem = problems["matchcellar"].problem + with OneshotPlanner(problem_kind=problem.kind) as planner: + plan = planner.solve(problem).plan + ttp_to_stn = TTP_to_STN(plan, problem) + ttp_to_stn.get_table_event() + print("Done...") \ No newline at end of file diff --git a/unified_planning/plans/ttp_to_stn.py b/unified_planning/plans/ttp_to_stn.py index e69de29bb..784f4825f 100644 --- a/unified_planning/plans/ttp_to_stn.py +++ b/unified_planning/plans/ttp_to_stn.py @@ -0,0 +1,91 @@ + +from typing import List +import unified_planning as up +from unified_planning.plans.time_triggered_plan import _absolute_time +from unified_planning.plans.time_triggered_plan import * + + +def is_time_in_interv( + start, duration + ,timing: "up.model.timing.Timing", interval: "up.model.timing.TimeInterval" + )-> bool: + time_pt = _absolute_time(timing, start=start, duration=duration) + upper_time = _absolute_time(interval._upper, start=start, duration=duration) + lower_time = _absolute_time(interval._lower, start=start, duration=duration) + if (time_pt > lower_time if interval._is_left_open else time_pt >= lower_time) and (time_pt < upper_time if interval._is_right_open else time_pt <= upper_time): + return True + return False + + +class TTP_to_STN(): + + """ Create a STN from a TimeTriggeredPlan""" + def __init__(self, plan: TimeTriggeredPlan, problem: "up.model.mixins.ObjectsSetMixin"): + self.ttp = plan + self.problem = problem + self.table = {} + self.events = {} + + + def sort_condition(self, start, duration, conditions:{"up.model.timing.TimeInterval", List["up.model.fnode.FNode"]}, effect: ("up.model.timing.Timing", List["up.model.effect.Effect"]) + ) -> ("up.model.timing.Timing", List["up.model.fnode.FNode"]): + """ + From the dict of condition get all conditions for the specific timepoint from the couple timepoint effect. + Return the coresponding timepoint conditions couple. + """ + time_point = effect[0] + cond_result = [] + for time_interval, condition in conditions.items(): + if is_time_in_interv(start, duration, time_point, time_interval): # Time point in interval + cond_result += condition + result = (time_point, cond_result) + return result + + + def get_table_event( + self + ) -> {DurativeAction : {"up.model.timing.Timing" : ("up.model.fnode.FNode", "up.model.effect.Effect")}}: + """ + Return table : For each action (the tuple) and each timepoint of this action the couple conditions effects""" + for start, action, duration in self.ttp.timed_actions: + action_cpl = (start, action, duration) + self.table[action_cpl] = {} + for effect_time, effects in action.action._effects.items(): + pconditions = self.sort_condition(start, duration, action.action._conditions, (effect_time, effects)) + self.table[action_cpl].update({effect_time: (pconditions[1], effects)}) + + + def table_to_events(self) -> {"up.model.timing.TimXepoint": InstantaneousAction}: + """ + Return a list where each time is paired with an Instantaneous Action. + Each Instantaneous Action is created from preconditions and effects from get_table_event() + """ + result = {} + table = self.get_table_event() + for action_cpl, c_e_dict in table: + start = action_cpl[0] + duration = action_cpl[2] + action = action_cpl[1] + for time_pt, cond_eff_couple in c_e_dict: + time = 0 #TODO with time_pt start and duration + inst_action = InstantaneousAction(str(action)+str(time_pt)) + #inst_action.preconditions = cond_eff_couple[0] + inst_action.effects = cond_eff_couple[1] + ## One or another + inst_action.add_precondition(cond_eff_couple[0]) + #inst_action.add_effect(cond_eff_couple[1]) # does not accept list of effects... + result.update({time: inst_action}) + self.events = result + + def sort_events(self) -> {"up.model.timing.Timepoint": InstantaneousAction}: + self.table_to_events() + return self.events + + def events_to_partial_order_plan(self): + self.sort_events() + return + + def run(self): + self.events_to_partial_order_plan() + #partial order plan to STN + return \ No newline at end of file