Skip to content

Commit

Permalink
data-store graph window efficiency/refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Jan 22, 2023
1 parent fb63772 commit c20d9b5
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 98 deletions.
216 changes: 120 additions & 96 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
TASK_STATUSES_ORDERED
)
from cylc.flow.task_state_prop import extract_group_state
from cylc.flow.taskdef import generate_graph_parents
from cylc.flow.taskdef import generate_graph_parents, generate_graph_children
from cylc.flow.task_state import TASK_STATUSES_FINAL
from cylc.flow.util import (
serialise,
Expand Down Expand Up @@ -651,8 +651,16 @@ def generate_definition_elements(self):
self.parents = parents

def increment_graph_window(
self, itask, edge_distance=0, active_id=None,
descendant=False, is_parent=False):
self,
name,
point,
flow_nums,
edge_distance=0,
active_id=None,
descendant=False,
is_parent=False,
is_manual_submit=False
):
"""Generate graph window about given origin to n-edge-distance.
Args:
Expand All @@ -671,12 +679,15 @@ def increment_graph_window(
"""
# Create this source node
s_tokens = self.id_.duplicate(itask.tokens)
s_tokens = self.id_.duplicate(
cycle=str(point),
task=name,
)
if active_id is None:
active_id = s_tokens.id

# flag manual triggers for pruning on deletion.
if itask.is_manual_submit:
if is_manual_submit:
self.prune_trigger_nodes.setdefault(active_id, set()).add(
s_tokens.id
)
Expand All @@ -700,8 +711,21 @@ def increment_graph_window(
active_id
].setdefault(edge_distance, set()).add(s_tokens.id)
return

# Generate task proxy node
is_orphan, graph_children = self.generate_ghost_task(
s_tokens,
point,
flow_nums,
is_parent
)

tdef = self.schd.config.taskdefs[name]
if graph_children is None:
graph_children = generate_graph_children(tdef, point)

if (
(not any(itask.graph_children.values()) and descendant)
(not any(graph_children.values()) and descendant)
or self.n_edge_distance == 0
):
self.n_window_boundary_nodes[
Expand All @@ -710,40 +734,63 @@ def increment_graph_window(

self.n_window_nodes[active_id].add(s_tokens.id)

# Generate task proxy node
is_orphan = self.generate_ghost_task(s_tokens.id, itask, is_parent)

edge_distance += 1

# Don't expand window about orphan task.
if not is_orphan:
# TODO: xtrigger is workflow_state edges too
# Reference set for workflow relations
for items in itask.graph_children.values():
if edge_distance == 1:
descendant = True
self._expand_graph_window(
s_tokens,
items,
active_id,
itask.flow_nums,
edge_distance,
descendant,
False,
)
final_point = self.schd.config.final_point
if edge_distance == 1:
descendant = True
# Children/downstream nodes
for items in graph_children.values():
for t_name, t_point, _ in items:
if t_point > final_point:
continue
t_tokens = self.id_.duplicate(
cycle=str(t_point),
task=t_name,
)
# We still increment the graph one further to find
# boundary nodes, but don't create elements.
if edge_distance <= self.n_edge_distance:
self.generate_edge(s_tokens, t_tokens, active_id)
if t_tokens.id in self.n_window_nodes[active_id]:
continue
self.increment_graph_window(
t_name,
t_point,
flow_nums,
edge_distance,
active_id,
descendant,
False
)

for items in generate_graph_parents(
itask.tdef, itask.point
).values():
self._expand_graph_window(
s_tokens,
items,
active_id,
itask.flow_nums,
edge_distance,
False,
True,
)
# Parents/upstream nodes
for items in generate_graph_parents(tdef, point).values():
for t_name, t_point, _ in items:
if t_point > final_point:
continue
t_tokens = self.id_.duplicate(
cycle=str(t_point),
task=t_name,
)
if edge_distance <= self.n_edge_distance:
# reverse for parent
self.generate_edge(t_tokens, s_tokens, active_id)
if t_tokens.id in self.n_window_nodes[active_id]:
continue
self.increment_graph_window(
t_name,
t_point,
flow_nums,
edge_distance,
active_id,
False,
True
)

# If this is the active task (edge_distance has been incremented),
# then add the most distant child as a trigger to prune it.
Expand All @@ -762,65 +809,29 @@ def increment_graph_window(
getattr(self.updated[WORKFLOW], EDGES).edges.extend(
self.n_window_edges[active_id])

def _expand_graph_window(
self,
s_tokens,
items,
active_id,
flow_nums,
edge_distance,
descendant=False,
is_parent=False
):
def generate_edge(self, s_tokens, t_tokens, active_id):
"""Construct nodes/edges for children/parents of source node."""
final_point = self.schd.config.final_point
for t_name, t_point, _ in items:
if t_point > final_point:
continue
t_tokens = self.id_.duplicate(
cycle=str(t_point),
task=t_name,
# Initiate edge element.
e_id = self.edge_id(s_tokens, t_tokens)
if e_id in self.n_window_edges[active_id]:
return
if (
e_id not in self.data[self.workflow_id][EDGES]
and e_id not in self.added[EDGES]
):
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=s_tokens.id,
target=t_tokens.id
)
# Initiate edge element.
if is_parent:
e_id = self.edge_id(t_tokens, s_tokens)
else:
e_id = self.edge_id(s_tokens, t_tokens)
if e_id in self.n_window_edges[active_id]:
continue
if (
e_id not in self.data[self.workflow_id][EDGES]
and e_id not in self.added[EDGES]
and edge_distance <= self.n_edge_distance
):
if is_parent:
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=t_tokens.id,
target=s_tokens.id
)
else:
self.added[EDGES][e_id] = PbEdge(
id=e_id,
source=s_tokens.id,
target=t_tokens.id
)
# Add edge id to node field for resolver reference
self.updated[TASK_PROXIES].setdefault(
t_tokens.id,
PbTaskProxy(id=t_tokens.id)).edges.append(e_id)
self.updated[TASK_PROXIES].setdefault(
s_tokens.id,
PbTaskProxy(id=s_tokens.id)).edges.append(e_id)
self.n_window_edges[active_id].add(e_id)
if t_tokens.id in self.n_window_nodes[active_id]:
continue
self.increment_graph_window(
TaskProxy(
self.schd.config.get_taskdef(t_name),
t_point, flow_nums, submit_num=0
),
edge_distance, active_id, descendant, is_parent)
# Add edge id to node field for resolver reference
self.updated[TASK_PROXIES].setdefault(
t_tokens.id,
PbTaskProxy(id=t_tokens.id)).edges.append(e_id)
self.updated[TASK_PROXIES].setdefault(
s_tokens.id,
PbTaskProxy(id=s_tokens.id)).edges.append(e_id)
self.n_window_edges[active_id].add(e_id)

def remove_pool_node(self, name, point):
"""Remove ID reference and flag isolate node/branch for pruning."""
Expand Down Expand Up @@ -854,7 +865,7 @@ def add_pool_node(self, name, point):
).id
self.all_task_pool.add(tp_id)

def generate_ghost_task(self, tp_id, itask, is_parent=False):
def generate_ghost_task(self, tokens, point, flow_nums, is_parent=False):
"""Create task-point element populated with static data.
Args:
Expand All @@ -870,17 +881,30 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
True/False
"""
name = itask.tdef.name
name = tokens['task']
point_string = tokens['cycle']
t_id = self.definition_id(name)
point_string = f'{itask.point}'
tp_id = tokens.id
task_proxies = self.data[self.workflow_id][TASK_PROXIES]

is_orphan = False
if name not in self.schd.config.taskdefs:
is_orphan = True

itask = self.schd.pool.get_task(point_string, name)
if tp_id in task_proxies or tp_id in self.added[TASK_PROXIES]:
return is_orphan
if itask is None:
return is_orphan, None
return is_orphan, itask.graph_children

if itask is None:
itask = TaskProxy(
self.schd.config.get_taskdef(name),
point,
flow_nums,
submit_num=0,
data_mode=True
)

if is_orphan:
self.generate_orphan_task(itask)
Expand All @@ -893,7 +917,7 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
task_def = self.added[TASKS][t_id]
except KeyError:
# Task removed from workflow definition.
return False
return False, itask.graph_children

update_time = time()
tp_stamp = f'{tp_id}@{update_time}'
Expand Down Expand Up @@ -961,7 +985,7 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):

self.updates_pending = True

return is_orphan
return is_orphan, itask.graph_children

def generate_orphan_task(self, itask):
"""Generate orphan task definition."""
Expand Down
15 changes: 14 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,12 @@ def create_data_store_elements(self, itask):
# Register pool node reference
self.data_store_mgr.add_pool_node(itask.tdef.name, itask.point)
# Create new data-store n-distance graph window about this task
self.data_store_mgr.increment_graph_window(itask)
self.data_store_mgr.increment_graph_window(
itask.tdef.name,
itask.point,
itask.flow_nums,
is_manual_submit=itask.is_manual_submit
)
self.data_store_mgr.delta_task_state(itask)
self.data_store_mgr.delta_task_held(itask)
self.data_store_mgr.delta_task_queued(itask)
Expand Down Expand Up @@ -741,6 +746,14 @@ def get_tasks_by_point(self):

return point_itasks

def get_task(self, point, name):
    """Return the task proxy for the given cycle point and task name.

    Searches the main pool first, then the hidden (runahead) pool.
    Returns None if no matching task proxy is found in either pool.
    """
    rel_id = f'{point}/{name}'
    # Each pool maps cycle point -> {relative id: task proxy}.
    for pool in (self.main_pool, self.hidden_pool):
        itask = pool.get(point, {}).get(rel_id)
        if itask is not None:
            return itask

def _get_hidden_task_by_id(self, id_: str) -> Optional[TaskProxy]:
"""Return runahead pool task by ID if it exists, or None."""
for itask_ids in list(self.hidden_pool.values()):
Expand Down
7 changes: 6 additions & 1 deletion cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ class TaskProxy:
is_held: True if the task is held, else False.
submit_num: Number of times the task has attempted job submission.
is_late: Is the task late?
data_mode: Reduced store reference data.
"""

# Memory optimization - constrain possible attributes to this list.
Expand Down Expand Up @@ -193,6 +194,7 @@ def __init__(
is_late: bool = False,
is_manual_submit: bool = False,
flow_wait: bool = False,
data_mode: bool = False,
) -> None:

self.tdef = tdef
Expand Down Expand Up @@ -235,7 +237,10 @@ def __init__(

self.local_job_file_path: Optional[str] = None

self.platform = get_platform()
if data_mode:
self.platform = {}
else:
self.platform = get_platform()

self.job_vacated = False
self.poll_timer: Optional['TaskActionTimer'] = None
Expand Down

0 comments on commit c20d9b5

Please sign in to comment.