
Commit

Merge pull request #697 from huangworld/dspash-future
Naive Fault Tolerance Support
huangworld authored Nov 29, 2023
2 parents 5dee7dd + d4512b0 commit 5b924ec
Showing 5 changed files with 80 additions and 23 deletions.
6 changes: 6 additions & 0 deletions compiler/config.py
@@ -174,6 +174,12 @@ def add_common_arguments(parser):
parser.add_argument("--version",
action='version',
version='%(prog)s {version}'.format(version=__version__))
parser.add_argument("--worker_timeout",
help="determines if we will mock a timeout for worker node.",
default="")
parser.add_argument("--worker_timeout_choice",
help="determines which worker node will be timed out.",
default="")
return


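For orientation, here is a minimal sketch (not part of the commit) of how the two new flags flow into scheduling; the fallback to worker1 mirrors the crashed_worker logic added in worker_manager.py below:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--worker_timeout", default="")
parser.add_argument("--worker_timeout_choice", default="")

# e.g. --worker_timeout 10 --worker_timeout_choice worker1
args = parser.parse_args(["--worker_timeout", "10", "--worker_timeout_choice", "worker1"])

# worker_manager.py falls back to "worker1" when no choice is given:
crashed_worker = args.worker_timeout_choice if args.worker_timeout_choice != '' else "worker1"
print(crashed_worker, args.worker_timeout)  # -> worker1 10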
12 changes: 9 additions & 3 deletions compiler/dspash/worker.py
@@ -6,6 +6,7 @@
import os
import argparse
import requests
import time

DISH_TOP = os.environ['DISH_TOP']
PASH_TOP = os.environ['PASH_TOP']
@@ -128,26 +129,31 @@ def manage_connection(conn, addr):
            if not data:
                break

            print("got new request")
            request = decode_request(data)
            if request['type'] == 'Exec-Graph':
                graph, shell_vars, functions = parse_exec_graph(request)
                debug = True if request['debug'] else False
                save_configs(graph, dfs_configs_paths)
                # Sleep to mock a timeout on this worker before executing the subgraph.
                time.sleep(int(request['worker_timeout']))
                rc = exec_graph(graph, shell_vars, functions, debug)
                rcs.append((rc, request))
                body = {}
            elif request['type'] == 'Done':
                print("Received 'Done' signal. Closing connection from the worker.")
                break
            else:
                print(f"Unsupported request {request}")
            send_success(conn, body)
    print("connection ended")

    # Ensure subprocesses have finished and release the corresponding resources
    for rc, request in rcs:
        if request['debug']:
            send_log(rc, request)
        else:
            rc.wait()


def parse_args():
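The new 'Done' branch above pairs with the WorkerConnection.close() change in worker_manager.py below. A minimal round-trip sketch, assuming encode_request/decode_request are JSON-based (an assumption; the real helpers live in dspash's socket utilities):

import json

def encode_request(obj: dict) -> bytes:   # assumed shape, not the real implementation
    return json.dumps(obj).encode("utf-8")

def decode_request(data: bytes) -> dict:  # assumed shape, not the real implementation
    return json.loads(data.decode("utf-8"))

msg = encode_request({"type": "Done"})    # manager side: self._socket.send(...)
request = decode_request(msg)             # worker side: decode the received bytes
assert request["type"] == "Done"          # manage_connection breaks its loop on this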
1 change: 0 additions & 1 deletion compiler/dspash/worker.sh
@@ -1,5 +1,4 @@
#!/bin/bash

# trap ctrl-c and call cleanup()
trap cleanup INT

74 changes: 55 additions & 19 deletions compiler/dspash/worker_manager.py
@@ -45,21 +45,34 @@ def get_running_processes(self):
        # answer = self.socket.recv(1024)
        return self._running_processes

    def send_graph_exec_request(self, graph, shell_vars, functions, debug=False, worker_timeout=0) -> bool:
        request_dict = { 'type': 'Exec-Graph',
                         'graph': graph,
                         'functions': functions,
                         'shell_variables': None, # Doesn't seem needed for now
                         'debug': None,
                         'worker_timeout': worker_timeout
                       }
        if debug:
            request_dict['debug'] = {'name': self.name, 'url': f'{DEBUG_URL}/putlog'}

        request = encode_request(request_dict)
        # TODO: do I need to open and close the connection?
        log(self._socket, self._port)
        send_msg(self._socket, request)
        # TODO: wait until the command exec finishes and run this in parallel?
        retries = 0
        MAX_RETRIES = 2
        RETRY_DELAY = 1
        self._socket.settimeout(5)
        response_data = None
        while retries < MAX_RETRIES and not response_data:
            try:
                response_data = recv_msg(self._socket)
            except socket.timeout:
                log(f"Timeout encountered. Retry {retries + 1} of {MAX_RETRIES}.")
                retries += 1
                time.sleep(RETRY_DELAY)
        if not response_data or decode_request(response_data)['status'] != "OK":
            raise Exception(f"didn't receive ack on request {response_data}")
        else:
@@ -68,8 +81,9 @@ def send_graph_exec_request(self, graph, shell_vars, functions, debug=False) ->
            return True

    def close(self):
        self._socket.send(encode_request({"type": "Done"}))
        self._socket.close()
        self._online = False

    def _wait_ack(self):
        confirmation = self._socket.recv(4096)
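For context on the retry loop above: recv_msg can raise socket.timeout only because settimeout(5) puts the socket into timeout mode. A plausible length-prefixed recv_msg, assuming the common 4-byte-header pattern (an assumption; the real helper lives in dspash's socket utilities):

import socket
import struct

def recv_msg(sock: socket.socket):
    header = _recvall(sock, 4)            # read the 4-byte big-endian length prefix
    if header is None:
        return None
    (length,) = struct.unpack('>I', header)
    return _recvall(sock, length)         # then read exactly that many payload bytes

def _recvall(sock: socket.socket, n: int):
    data = b''
    while len(data) < n:
        chunk = sock.recv(n - len(data))  # raises socket.timeout once settimeout() is set
        if not chunk:
            return None
        data += chunk
    return data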
@@ -147,22 +161,44 @@ def run(self):
            elif request.startswith("Exec-Graph"):
                args = request.split(':', 1)[1].strip()
                filename, declared_functions_file = args.split()
                numExecutedSubgraphs = 0
                numTotalSubgraphs = None
                crashed_worker = workers_manager.args.worker_timeout_choice if workers_manager.args.worker_timeout_choice != '' else "worker1" # default to worker1
                try:
                    while not numTotalSubgraphs or numExecutedSubgraphs < numTotalSubgraphs:
                        # In naive fault tolerance, we want every worker to receive its subgraph(s) without crashing.
                        # If a crash happens, we re-split the IR and retry until scheduling completes without any crash.
                        numExecutedSubgraphs = 0
                        worker_subgraph_pairs, shell_vars, main_graph = prepare_graph_for_remote_exec(filename, workers_manager.get_worker)
                        if numTotalSubgraphs is None:
                            numTotalSubgraphs = len(worker_subgraph_pairs)
                        script_fname = to_shell_file(main_graph, workers_manager.args)
                        log("Master node graph stored in ", script_fname)

                        # Read functions
                        log("Functions stored in ", declared_functions_file)
                        declared_functions = read_file(declared_functions_file)

                        # Execute subgraphs on workers
                        for worker, subgraph in worker_subgraph_pairs:
                            worker_timeout = workers_manager.args.worker_timeout if worker.name == crashed_worker and workers_manager.args.worker_timeout else 0

                            try:
                                worker.send_graph_exec_request(subgraph, shell_vars, declared_functions, workers_manager.args.debug, worker_timeout)
                                numExecutedSubgraphs += 1
                            except Exception as e:
                                # worker timeout
                                worker.close()
                                log(f"{worker} closed")
                    # Report to the main shell a script to execute
                    # Delay this until the very end, when every worker has received its subgraph
                    response_msg = f"OK {script_fname}"
                    dspash_socket.respond(response_msg, conn)
                except Exception as e:
                    print(e)
            else:
                raise Exception(f"Unknown request: {request}")

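To make the control flow above concrete, here is a self-contained toy model of the naive fault-tolerance loop (stand-in classes, illustrative only, not the real DiSh code): a crashed worker is closed, the subgraphs are re-split over the remaining workers, and scheduling repeats until every subgraph lands on a live worker.

class Worker:
    def __init__(self, name, crashes=False):
        self.name, self.crashes, self.online = name, crashes, True

    def send_graph_exec_request(self, subgraph):
        if self.crashes:
            raise TimeoutError(f"{self.name} timed out")

def schedule(subgraphs, workers):
    executed, total = 0, len(subgraphs)
    while executed < total:
        executed = 0
        live = [w for w in workers if w.online]
        # "Re-split": here, simply round-robin the subgraphs over the live workers.
        for i, subgraph in enumerate(subgraphs):
            worker = live[i % len(live)]
            try:
                worker.send_graph_exec_request(subgraph)
                executed += 1
            except TimeoutError:
                worker.online = False   # close the crashed worker, then retry from scratch
                break

workers = [Worker("worker1", crashes=True), Worker("worker2")]
schedule(["subgraph-a", "subgraph-b"], workers)
print([w.name for w in workers if w.online])  # -> ['worker2']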
10 changes: 10 additions & 0 deletions compiler/pash_init_setup.sh
@@ -23,6 +23,8 @@ export pash_parallel_pipelines=0
export pash_daemon_communicates_through_unix_pipes_flag=0
export show_version=0
export distributed_exec=0
export worker_timeout=0
export worker_timeout_choice=0

for item in "$@"
do
@@ -102,6 +104,14 @@ do
if [ "--distributed_exec" == "$item" ]; then
export distributed_exec=1
fi

if [ "--worker_timeout" == "$item" ]; then
export worker_timeout=1
fi

if [ "--worker_timeout_choice" == "$item" ]; then
export worker_timeout_choice=1
fi
done

## `pash_redir_output` and `pash_redir_all_output` are strictly for logging.
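A hypothetical end-to-end invocation exercising both flags (pa.sh and script.sh are placeholder names; only the flag names come from this commit):

import subprocess

# Run a script under distributed execution while mocking a 10-second
# timeout on worker1.
subprocess.run([
    "./pa.sh", "--distributed_exec",
    "--worker_timeout", "10",
    "--worker_timeout_choice", "worker1",
    "script.sh",
])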
