Skip to content

Commit

Permalink
fix references to file_id_gen
Browse files Browse the repository at this point in the history
  • Loading branch information
Forthoney committed Nov 25, 2023
1 parent 0191365 commit bd155d1
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 183 deletions.
4 changes: 2 additions & 2 deletions compiler/ast_to_ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ def compile_node_command(ast_node, fileIdGen, config):

try:
## If the command is not compilable to a DFG, the following call will fail
ir = fileIdGen.compile_command_to_DFG(
command_name, options, redirections=compiled_redirections
ir = compile_command_to_DFG(
fileIdGen, command_name, options, redirections=compiled_redirections
)
compiled_ast = ir
except ValueError as err:
Expand Down
22 changes: 6 additions & 16 deletions compiler/dspash/ir_helper.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
import argparse
import sys
import socket
import pickle
import traceback
from datetime import datetime
from typing import List, Set, Tuple, Dict, Callable
from typing import List, Tuple, Dict, Callable
from uuid import uuid4

sys.path.append("/pash/compiler")

import config
from ir import *
from ast_to_ir import compile_asts
from ir_to_ast import to_shell
from util import *
from dspash.hdfs_utils import HDFSFileConfig
Expand All @@ -21,13 +17,7 @@
from definitions.ir.nodes.eager import *
from definitions.ir.nodes.pash_split import *

import definitions.ir.nodes.r_merge as r_merge
import definitions.ir.nodes.r_split as r_split
import definitions.ir.nodes.r_unwrap as r_unwrap
import definitions.ir.nodes.dgsh_tee as dgsh_tee
import definitions.ir.nodes.remote_pipe as remote_pipe
import shlex
import subprocess
import pash_compiler
from collections import deque, defaultdict

Expand Down Expand Up @@ -140,7 +130,7 @@ def split_ir(graph: IR) -> Tuple[List[IR], Dict[int, IR]]:
# If subgraph is empty and edge isn't ephemeral the edge needs to be added
if not input_fid.get_ident() in subgraph.edges:
new_fid = input_fid
subgraph.add_to_edge(new_fid, node_id)
subgraph.add_edge(new_fid, to_edge=node_id)
input_edge_id = new_fid.get_ident()
else:
input_edge_id = input_fid.get_ident()
Expand All @@ -150,13 +140,13 @@ def split_ir(graph: IR) -> Tuple[List[IR], Dict[int, IR]]:

# Add edges coming out of the node
for output_fid in output_fids:
subgraph.add_from_edge(node_id, output_fid)
subgraph.add_edge(node_id, from_edge=output_fid)
visited_edges.add(output_fid)

# Add edges coming into the node
for input_fid in input_fids:
if input_fid.get_ident() not in subgraph.edges:
subgraph.add_to_edge(input_fid, node_id)
subgraph.add_edge(input_fid, to_edge=node_id)

# Add the node
subgraph.add_node(node)
Expand All @@ -173,7 +163,7 @@ def split_ir(graph: IR) -> Tuple[List[IR], Dict[int, IR]]:
return subgraphs, input_fifo_map


def add_stdout_fid(graph: IR, file_id_gen: FileIdGen) -> FileId:
def add_stdout_fid(graph: IR, file_id_gen: FileIdGenerator) -> FileId:
stdout = file_id_gen.next_file_id()
stdout.set_resource(FileDescriptorResource(("fd", 1)))
graph.add_edge(stdout)
Expand All @@ -182,7 +172,7 @@ def add_stdout_fid(graph: IR, file_id_gen: FileIdGen) -> FileId:

def assign_workers_to_subgraphs(
subgraphs: List[IR],
file_id_gen: FileIdGen,
file_id_gen: FileIdGenerator,
input_fifo_map: Dict[int, IR],
get_worker: Callable,
) -> (IR, Tuple):
Expand Down
95 changes: 9 additions & 86 deletions compiler/file_id_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,9 @@
StdDescriptorWithIOInfo,
OptionWithIO,
)
from pash_annotations.annotation_generation.datatypes.ParallelizabilityInfo import (
ParallelizabilityInfo,
)
from pash_annotations.annotation_generation.datatypes.CommandProperties import (
CommandProperties,
)
from pash_annotations.datatypes.CommandInvocationWithIOVars import (
CommandInvocationWithIOVars,
)
from annotations_utils.util_parsing import parse_arg_list_to_command_invocation
from annotations_utils.util_cmd_invocations import (
get_input_output_info_from_cmd_invocation_util,
get_parallelizability_info_from_cmd_invocation_util,
)
from ir import IR


class FileIdGenerator:
Expand Down Expand Up @@ -58,15 +46,7 @@ def next_ephemeral_file_id(self):
file_id.make_ephemeral()
return file_id

def _create_file_id_for_resource(self, resource):
    """
    Allocate the next file id and attach ``resource`` to it.

    Obtains a new id through ``self.next_file_id()``, calls
    ``set_resource(resource)`` on it, and returns that same object.
    """
    fresh_fid = self.next_file_id()
    fresh_fid.set_resource(resource)
    return fresh_fid

def _add_file_id_vars(self, cmd_with_io):
def add_file_id_vars(self, cmd_with_io):
new_flagoption_list = self._make_flag_opt_list(cmd_with_io.flag_option_list)

new_operand_list = self._make_operand_list(cmd_with_io.operand_list)
Expand All @@ -91,6 +71,14 @@ def _add_file_id_vars(self, cmd_with_io):
)
return command_invocation_with_io_vars, self.dfg_edges

def _create_file_id_for_resource(self, resource):
    """
    Allocate the next file id and attach ``resource`` to it.

    Obtains a new id through ``self.next_file_id()``, calls
    ``set_resource(resource)`` on it, and returns that same object.
    """
    fresh_fid = self.next_file_id()
    fresh_fid.set_resource(resource)
    return fresh_fid

def _make_operand_list(self, operand_list):
new_op_list = []
for operand in operand_list:
Expand Down Expand Up @@ -123,68 +111,3 @@ def _add_var_for_descriptor(self, operand):
self.dfg_edges[fid_id] = (file_id, None, None)
self.access_map[fid_id] = operand.get_access()
return fid_id

def compile_command_to_DFG(self, command, options, redirections=None):
    """Compile one command invocation into a single-node dataflow graph.

    Args:
        command: The command name, parsed together with ``options`` by
            ``parse_arg_list_to_command_invocation``.
        options: The arguments of the command.
        redirections: Redirections forwarded to the ``DFGNode`` as
            ``com_redirs``. When omitted, a fresh empty list is used for
            each call (the previous ``[]`` default was one list object
            shared by every call).

    Returns:
        An ``IR`` holding the one ``DFGNode`` built for the command and the
        edges returned by ``self._add_file_id_vars``.

    Raises:
        Exception: If no input/output information is available for the
            command, or if it has outputs other than streaming.
    """
    command_invocation = parse_arg_list_to_command_invocation(command, options)
    io_info = get_input_output_info_from_cmd_invocation_util(command_invocation)
    # NOTE(review): the call site in compiler/ast_to_ir.py that is visible
    # next to this code catches ValueError, which these plain Exceptions do
    # not match -- confirm which exception type is intended.
    if io_info is None:
        raise Exception(
            f"InputOutputInformation for {format_arg_chars(command)} not provided so considered side-effectful."
        )
    elif io_info.has_other_outputs():
        raise Exception(
            f"Command {format_arg_chars(command)} has outputs other than streaming."
        )
    para_info = get_parallelizability_info_from_cmd_invocation_util(
        command_invocation
    )
    if para_info is None:
        # Default: no parallelizers and all properties False
        para_info = ParallelizabilityInfo()
    cmd_with_io = io_info.apply_input_output_info_to_command_invocation(
        command_invocation
    )

    parallelizer_lst, round_robin, commutative = para_info.unpack_info()
    property_dict = [
        {
            "round_robin_compatible_with_cat": round_robin,
            "is_commutative": commutative,
        }
    ]
    cmd_related_properties = CommandProperties(property_dict)

    ## TODO: Make an empty IR and add edges and nodes incrementally (using the methods defined in IR).

    ## Add all inputs and outputs to the DFG edges
    cmd_with_io_vars, dfg_edges = self._add_file_id_vars(cmd_with_io)
    # Never hand a shared default list to the node; build a new one per call.
    com_redirs = [] if redirections is None else redirections
    ## TODO: Add assignments
    com_assignments = []

    ## Assume: Everything must be completely expanded
    ## TODO: Add an assertion about that.
    dfg_node = DFGNode(
        cmd_with_io_vars,
        com_redirs=com_redirs,
        com_assignments=com_assignments,
        parallelizer_list=parallelizer_lst,
        cmd_related_properties=cmd_related_properties,
    )
    node_id = dfg_node.get_id()

    ## Assign the from, to node in edges
    # Each edge is a (file_id, from_node, to_node) triple; the new node
    # becomes the consumer of its inputs and the producer of its outputs.
    for fid_id in dfg_node.get_input_list():
        fid, from_node, to_node = dfg_edges[fid_id]
        assert to_node is None
        dfg_edges[fid_id] = (fid, from_node, node_id)

    for fid_id in dfg_node.get_output_list():
        fid, from_node, to_node = dfg_edges[fid_id]
        assert from_node is None
        dfg_edges[fid_id] = (fid, node_id, to_node)

    dfg_nodes = {node_id: dfg_node}
    return IR(dfg_nodes, dfg_edges)
Loading

0 comments on commit bd155d1

Please sign in to comment.