diff --git a/test_quick.py b/test_ttp_to_stn.py similarity index 95% rename from test_quick.py rename to test_ttp_to_stn.py index 9db4bc789..d196caef1 100644 --- a/test_quick.py +++ b/test_ttp_to_stn.py @@ -33,7 +33,7 @@ u, v, ), - [float(d["interval"][0]), float(d["interval"][0])], + [float(d["interval"][0]), float(d["interval"][1])], ) for u, v, d in ttp_to_stn.stn.edges(data=True) ] diff --git a/unified_planning/plans/ttp_to_stn.py b/unified_planning/plans/ttp_to_stn.py index e7acd740b..36115a1a8 100644 --- a/unified_planning/plans/ttp_to_stn.py +++ b/unified_planning/plans/ttp_to_stn.py @@ -11,6 +11,13 @@ from unified_planning.plans.partial_order_plan import PartialOrderPlan from unified_planning.plot.plan_plot import plot_partial_order_plan +EPSILON = Fraction(1, 1000) +MAX_TIME = Fraction(5000, 1) + +import operator + +ops = {"+": operator.add, "-": operator.sub} + def is_time_in_interv( start: Fraction, @@ -18,6 +25,9 @@ def is_time_in_interv( timing: "up.model.timing.Timing", interval: "up.model.timing.TimeInterval", ) -> bool: + """ + Return if the is in the interval. + """ time_pt = _absolute_time(timing, start=start, duration=duration) upper_time = _absolute_time(interval._upper, start=start, duration=duration) lower_time = _absolute_time(interval._lower, start=start, duration=duration) @@ -81,7 +91,8 @@ def get_table_event( ], ]: """ - Return table : For each action (the tuple) and each timepoint of this action the couple conditions effects""" + Return table : For each action (the tuple) and each timepoint of this action the couple conditions effects + """ for start, ai, duration in self.ttp.timed_actions: action = ai.action action_cpl = (start, ai, duration) @@ -119,7 +130,7 @@ def table_to_events(self) -> List[Tuple[Fraction, InstantaneousAction]]: time = _absolute_time(time_pt, start, duration) inst_action = InstantaneousAction(str(action) + str(time_pt)) - # set condition and effect to each instant action + # set conditions and effects to each Instantaneous Action if len(cond_eff_couple[0]) > 0: for cond in cond_eff_couple[0]: inst_action.add_precondition(cond) @@ -132,6 +143,9 @@ def table_to_events(self) -> List[Tuple[Fraction, InstantaneousAction]]: return self.events def sort_events(self) -> List[Tuple[Fraction, InstantaneousAction]]: + """ + Return the table of instant action sorted by time. + """ self.table_to_events() self.events_sorted = sorted(self.events, key=lambda acts: acts[0]) @@ -142,6 +156,9 @@ def sort_events(self) -> List[Tuple[Fraction, InstantaneousAction]]: return self.events_sorted def events_to_partial_order_plan(self): + """ + Return the partial order plan from the list of events sorted. + """ self.sort_events() list_act = [ActionInstance(i[1]) for i in self.events_sorted] seqplan = SequentialPlan(list_act) @@ -151,6 +168,10 @@ def events_to_partial_order_plan(self): return partial_order_plan def partial_order_plan_to_stn(self) -> nx.DiGraph: + """ + Stn with Start and End nodes + Return the stn from the partial order plan. + """ graph = self.partial_order_plan._graph for ai_current, l_next_ai in self.partial_order_plan.get_adjacency_list.items(): @@ -158,14 +179,26 @@ def partial_order_plan_to_stn(self) -> nx.DiGraph: item[0] for item in self.events_sorted if item[1] == ai_current.action ][0] for ai_next in l_next_ai: - ai_next_time = [ - item[0] for item in self.events_sorted if item[1] == ai_next.action - ][0] - time_edge = ai_next_time - ai_current_time - graph[ai_current][ai_next]["interval"] = [ - time_edge, - time_edge, - ] # TODO get time with interval correctly + name_current = re.findall( + r"(.+)(start|end) *(\+|\-)* *(\d*)", str(ai_current.action._name) + ) + name_next = re.findall( + r"(.+)(start|end) *(\+|\-)* *(\d*)", str(ai_next.action._name) + ) + # Time between two differents actions depend on epsilon + if name_current[0][0] != name_next[0][0]: + graph[ai_current][ai_next]["interval"] = [ + EPSILON, + MAX_TIME, + ] + else: + ai_next_time = [ + item[0] + for item in self.events_sorted + if item[1] == ai_next.action + ][0] + time_edge = ai_next_time - ai_current_time + graph[ai_current][ai_next]["interval"] = [time_edge, time_edge] # Add start and end nodes @@ -173,11 +206,14 @@ def partial_order_plan_to_stn(self) -> nx.DiGraph: end = ActionInstance(InstantaneousAction("END")) graph.add_node(start) graph.add_node(end) + + # Add edge between start and first node graph.add_edge( start, list(self.partial_order_plan.get_adjacency_list.keys())[0], interval=[0.0, 0.0], ) + # Add edge between end and last node graph.add_edge( [ item @@ -194,6 +230,10 @@ def partial_order_plan_to_stn(self) -> nx.DiGraph: return graph def stn_clean(self, graph: nx.DiGraph): + """ + Delete nodes of actions in between start and end. And create new edges from edges deleted. + It clean the graph given in place. + """ nodes_list = list(graph.nodes(data=True)) to_jump = [] @@ -202,7 +242,9 @@ def stn_clean(self, graph: nx.DiGraph): continue neighbors_node = copy.copy(graph.neighbors(node)) for next_node in neighbors_node: - if bool(re.match("(.*)(end|start) \+ [0-9]+$", next_node.action._name)): + if bool( + re.match("(.*)(end|start) (\+|\-) [0-9]+$", next_node.action._name) + ): neighbors_next_node = copy.copy(graph.neighbors(next_node)) for next_next_node in neighbors_next_node: if next_next_node == node: @@ -214,13 +256,9 @@ def stn_clean(self, graph: nx.DiGraph): a[0] + b[0], a[1] + b[1], ] - graph.remove_edge( - next_node, next_next_node - ) # is it usefull ? because delete node at the end + graph.remove_edge(next_node, next_next_node) - graph.remove_edge( - node, next_node - ) # is it usefull ? because delete node at the end + graph.remove_edge(node, next_node) graph.remove_node(next_node) to_jump += [item[0] for item in nodes_list if item[0] == next_node]