From c6242674a6ca66ce83efdf8538e667858e75c1fd Mon Sep 17 00:00:00 2001 From: Zhicheng Huang Date: Sun, 19 Nov 2023 16:03:38 -0500 Subject: [PATCH] Naive Fault Tolerance Support 1. Added naive mock of worker crashes through controlled sleep via worker_timeout and worker_timeout_choice flags. 2. Added naive detection of worker crashes in scheduler based on retry-and-backoff logic. 3. Added naive fault tolerance logic in scheduler - re-split the original IR and distribute to healthy workers to re-execute. 4. Updated benchmarks based on the data branch (though they require some additional modifications to make these updates suitable for the distributed context; these will be made in a separate PR in the dish repo). --- compiler/config.py | 6 ++ compiler/dspash/worker.py | 12 ++- compiler/dspash/worker.sh | 1 - compiler/dspash/worker_manager.py | 74 ++++++++++++++----- compiler/pash_init_setup.sh | 11 +++ .../benchmarks/bio/bio-align/genome-diff.sh | 2 +- .../benchmarks/bio/bio-align/genquality.sh | 2 +- evaluation/benchmarks/bio/bio1/setup.sh | 2 +- .../max-temp/max-temp-preprocess.sh | 4 +- evaluation/benchmarks/max-temp/max-temp.sh | 2 +- .../benchmarks/max-temp/temp-analytics.sh | 2 +- evaluation/benchmarks/nlp/input/setup.sh | 2 +- .../benchmarks/oneliners/input/setup.sh | 8 +- .../benchmarks/web-index/input/setup.sh | 3 +- evaluation/intro/input/setup.sh | 4 +- evaluation/other/more-scripts/page-count.sh | 2 +- evaluation/other/more-scripts/spell.sh | 2 +- evaluation/tests/input/setup.sh | 9 ++- evaluation/tests/sed-test.sh | 6 +- 19 files changed, 107 insertions(+), 47 deletions(-) diff --git a/compiler/config.py b/compiler/config.py index 3479cb069..ed0c9eb71 100644 --- a/compiler/config.py +++ b/compiler/config.py @@ -174,6 +174,12 @@ def add_common_arguments(parser): parser.add_argument("--version", action='version', version='%(prog)s {version}'.format(version=__version__)) + parser.add_argument("--worker_timeout", + help="determines if we will mock a timeout for worker 
node.", + default="") + parser.add_argument("--worker_timeout_choice", + help="determines which worker node will be timed out.", + default="") return diff --git a/compiler/dspash/worker.py b/compiler/dspash/worker.py index 65a61755e..83bfca522 100644 --- a/compiler/dspash/worker.py +++ b/compiler/dspash/worker.py @@ -6,6 +6,7 @@ import os import argparse import requests +import time DISH_TOP = os.environ['DISH_TOP'] PASH_TOP = os.environ['PASH_TOP'] @@ -128,26 +129,31 @@ def manage_connection(conn, addr): if not data: break - print("got new request") - request = decode_request(data) + print("got new request") + request = decode_request(data) if request['type'] == 'Exec-Graph': graph, shell_vars, functions = parse_exec_graph(request) debug = True if request['debug'] else False save_configs(graph, dfs_configs_paths) + time.sleep(int(request['worker_timeout'])) rc = exec_graph(graph, shell_vars, functions, debug) rcs.append((rc, request)) body = {} + elif request['type'] == 'Done': + print("Received 'Done' signal. 
Closing connection from the worker.") + break else: print(f"Unsupported request {request}") send_success(conn, body) - print("connection ended") + # Ensure subprocesses have finished, and releasing corresponding resources for rc, request in rcs: if request['debug']: send_log(rc, request) else: rc.wait() + print("connection ended") def parse_args(): diff --git a/compiler/dspash/worker.sh b/compiler/dspash/worker.sh index b19ddcdb9..c10875d9d 100644 --- a/compiler/dspash/worker.sh +++ b/compiler/dspash/worker.sh @@ -1,5 +1,4 @@ #!/bin/bash - # trap ctrl-c and call ctrl_c() trap cleanup INT diff --git a/compiler/dspash/worker_manager.py b/compiler/dspash/worker_manager.py index 4fcbbe496..f75ad7d6e 100644 --- a/compiler/dspash/worker_manager.py +++ b/compiler/dspash/worker_manager.py @@ -45,21 +45,34 @@ def get_running_processes(self): # answer = self.socket.recv(1024) return self._running_processes - def send_graph_exec_request(self, graph, shell_vars, functions, debug=False) -> bool: + def send_graph_exec_request(self, graph, shell_vars, functions, debug=False, worker_timeout=0) -> bool: request_dict = { 'type': 'Exec-Graph', 'graph': graph, 'functions': functions, 'shell_variables': None, # Doesn't seem needed for now - 'debug': None + 'debug': None, + 'worker_timeout': worker_timeout } if debug: request_dict['debug'] = {'name': self.name, 'url': f'{DEBUG_URL}/putlog'} request = encode_request(request_dict) #TODO: do I need to open and close connection? + log(self._socket, self._port) send_msg(self._socket, request) # TODO wait until the command exec finishes and run this in parallel? - response_data = recv_msg(self._socket) + retries = 0 + MAX_RETRIES = 2 + RETRY_DELAY = 1 + self._socket.settimeout(5) + response_data = None + while retries < MAX_RETRIES and not response_data: + try: + response_data = recv_msg(self._socket) + except socket.timeout: + log(f"Timeout encountered. 
Retry {retries + 1} of {MAX_RETRIES}.") + retries += 1 + time.sleep(RETRY_DELAY) if not response_data or decode_request(response_data)['status'] != "OK": raise Exception(f"didn't recieved ack on request {response_data}") else: @@ -68,8 +81,9 @@ def send_graph_exec_request(self, graph, shell_vars, functions, debug=False) -> return True def close(self): - self._socket.send("Done") + self._socket.send(encode_request({"type": "Done"})) self._socket.close() + self._online = False def _wait_ack(self): confirmation = self._socket.recv(4096) @@ -147,22 +161,44 @@ def run(self): elif request.startswith("Exec-Graph"): args = request.split(':', 1)[1].strip() filename, declared_functions_file = args.split() + numExecutedSubgraphs = 0 + numTotalSubgraphs = None + crashed_worker = workers_manager.args.worker_timeout_choice if workers_manager.args.worker_timeout_choice != '' else "worker1" # default to be worker1 + try: + while not numTotalSubgraphs or numExecutedSubgraphs < numTotalSubgraphs: + # In the naive fault tolerance, we want all workers to receive its subgraph(s) without crashing + # if a crash happens, we'll re-split the IR and do it again until scheduling is done without any crash. 
+ numExecutedSubgraphs = 0 + worker_subgraph_pairs, shell_vars, main_graph = prepare_graph_for_remote_exec(filename, workers_manager.get_worker) + if numTotalSubgraphs == None: + numTotalSubgraphs = len(worker_subgraph_pairs) + script_fname = to_shell_file(main_graph, workers_manager.args) + log("Master node graph stored in ", script_fname) + + # Read functions + log("Functions stored in ", declared_functions_file) + declared_functions = read_file(declared_functions_file) + + # Execute subgraphs on workers + for worker, subgraph in worker_subgraph_pairs: + worker_timeout = workers_manager.args.worker_timeout if worker.name == crashed_worker and workers_manager.args.worker_timeout else 0 + + try: + worker.send_graph_exec_request(subgraph, shell_vars, declared_functions, workers_manager.args.debug, worker_timeout) + numExecutedSubgraphs += 1 + except Exception as e: + # worker timeout + worker.close() + log(f"{worker} closed") + # Report to main shell a script to execute + # Delay this to the very end when every worker has received the subgraph + response_msg = f"OK {script_fname}" + dspash_socket.respond(response_msg, conn) + except Exception as e: + print(e) + + - worker_subgraph_pairs, shell_vars, main_graph = prepare_graph_for_remote_exec(filename, workers_manager.get_worker) - script_fname = to_shell_file(main_graph, workers_manager.args) - log("Master node graph stored in ", script_fname) - - # Read functions - log("Functions stored in ", declared_functions_file) - declared_functions = read_file(declared_functions_file) - - # Report to main shell a script to execute - response_msg = f"OK {script_fname}" - dspash_socket.respond(response_msg, conn) - - # Execute subgraphs on workers - for worker, subgraph in worker_subgraph_pairs: - worker.send_graph_exec_request(subgraph, shell_vars, declared_functions, workers_manager.args.debug) else: raise Exception(f"Unknown request: {request}") diff --git a/compiler/pash_init_setup.sh b/compiler/pash_init_setup.sh index 
a2d41c9e8..7cf326594 100644 --- a/compiler/pash_init_setup.sh +++ b/compiler/pash_init_setup.sh @@ -23,6 +23,8 @@ export pash_parallel_pipelines=0 export pash_daemon_communicates_through_unix_pipes_flag=0 export show_version=0 export distributed_exec=0 +export worker_timeout=0 +export worker_timeout_choice=0 for item in "$@" do @@ -102,6 +104,14 @@ do if [ "--distributed_exec" == "$item" ]; then export distributed_exec=1 fi + + if [ "--worker_timeout" == "$item" ]; then + export worker_timeout=1 + fi + + if [ "--worker_timeout_choice" == "$item" ]; then + export worker_timeout_choice=1 + fi done ## `pash_redir_output` and `pash_redir_all_output` are strictly for logging. @@ -235,4 +245,5 @@ if [ "$distributed_exec" -eq 1 ]; then fi export -f pash_communicate_daemon export -f pash_communicate_daemon_just_send +export -f pash_communicate_worker_manager export -f pash_wait_until_daemon_listening diff --git a/evaluation/benchmarks/bio/bio-align/genome-diff.sh b/evaluation/benchmarks/bio/bio-align/genome-diff.sh index a269f9e95..c82061797 100755 --- a/evaluation/benchmarks/bio/bio-align/genome-diff.sh +++ b/evaluation/benchmarks/bio/bio-align/genome-diff.sh @@ -11,7 +11,7 @@ # bacteria), and any regions with less than 10 supporting reads. 
# Requires: samtools, minimap2, bcftools -# Data: http://ndr.md/data/bio/R1.fastq.gz http://ndr.md/data/bio/R2.fastq.gz http://ndr.md/data/bio/ref.fa +# Data: atlas-group.cs.brown.edu/data/bio/R1.fastq.gz atlas-group.cs.brown.edu/data/bio/R2.fastq.gz atlas-group.cs.brown.edu/data/bio/ref.fa # https://github.com/samtools/samtools/releases/latest # https://github.com/lh3/minimap2 diff --git a/evaluation/benchmarks/bio/bio-align/genquality.sh b/evaluation/benchmarks/bio/bio-align/genquality.sh index 64c777fdd..62c731960 100755 --- a/evaluation/benchmarks/bio/bio-align/genquality.sh +++ b/evaluation/benchmarks/bio/bio-align/genquality.sh @@ -6,7 +6,7 @@ # http://thegenomefactory.blogspot.com/2019/09/25-reasons-assemblies-dont-make-it-into.html # Require: csvkit -# Data: http://ndr.md/data/bio/genbank.txt +# Data: atlas-group.cs.brown.edu/data/bio/genbank.txt IN=./input/genbank.txt OUT=./output/out.txt diff --git a/evaluation/benchmarks/bio/bio1/setup.sh b/evaluation/benchmarks/bio/bio1/setup.sh index 40bdd47cb..9c2bb1629 100644 --- a/evaluation/benchmarks/bio/bio1/setup.sh +++ b/evaluation/benchmarks/bio/bio1/setup.sh @@ -8,7 +8,7 @@ mkdir -p input mkdir -p output cd input if [[ ! 
-f R1.fastq ]]; then - wget ndr.md/data/bio/{R1.fastq.gz,R2.fastq.gz,ref.fa} + wget atlas-group.cs.brown.edu/data/bio/{R1.fastq.gz,R2.fastq.gz,ref.fa} gunzip R1.fastq.gz gunzip R2.fastq.gz diff --git a/evaluation/benchmarks/max-temp/max-temp-preprocess.sh b/evaluation/benchmarks/max-temp/max-temp-preprocess.sh index e3d4b98c5..8d0719049 100755 --- a/evaluation/benchmarks/max-temp/max-temp-preprocess.sh +++ b/evaluation/benchmarks/max-temp/max-temp-preprocess.sh @@ -1,12 +1,12 @@ #!/bin/bash -sed 's;^;http://ndr.md/data/noaa/;' | +sed 's;^;atlas-group.cs.brown.edu/data/noaa/;' | sed 's;$;/;' | xargs -r -n 1 curl -s | grep gz | tr -s ' \n' | cut -d ' ' -f9 | sed 's;^\(.*\)\(20[0-9][0-9]\).gz;\2/\1\2\.gz;' | - sed 's;^;http://ndr.md/data/noaa/;' | + sed 's;^;atlas-group.cs.brown.edu/data/noaa/;' | xargs -n1 curl -s | gunzip diff --git a/evaluation/benchmarks/max-temp/max-temp.sh b/evaluation/benchmarks/max-temp/max-temp.sh index b0c18aaa8..b74f72b10 100755 --- a/evaluation/benchmarks/max-temp/max-temp.sh +++ b/evaluation/benchmarks/max-temp/max-temp.sh @@ -2,7 +2,7 @@ FROM=${FROM:-2015} TO=${TO:-2015} -IN=${IN:-'http://ndr.md/data/noaa/'} +IN=${IN:-'atlas-group.cs.brown.edu/data/noaa/'} fetch=${fetch:-"curl -s"} seq $FROM $TO | diff --git a/evaluation/benchmarks/max-temp/temp-analytics.sh b/evaluation/benchmarks/max-temp/temp-analytics.sh index 319a8f0e4..a1399fa7d 100755 --- a/evaluation/benchmarks/max-temp/temp-analytics.sh +++ b/evaluation/benchmarks/max-temp/temp-analytics.sh @@ -2,7 +2,7 @@ FROM=${FROM:-2015} TO=${TO:-2015} -IN=${IN:-'http://ndr.md/data/noaa/'} +IN=${IN:-'atlas-group.cs.brown.edu/data/noaa/'} fetch=${fetch:-"curl -s"} data_file=temperatures.txt diff --git a/evaluation/benchmarks/nlp/input/setup.sh b/evaluation/benchmarks/nlp/input/setup.sh index 5486b39f2..a26a9cf19 100755 --- a/evaluation/benchmarks/nlp/input/setup.sh +++ b/evaluation/benchmarks/nlp/input/setup.sh @@ -20,7 +20,7 @@ setup_dataset() { cd pg if [[ "$1" == "--full" ]]; then echo 
'N.b.: download/extraction will take about 10min' - wget ndr.md/data/pg.tar.xz + wget atlas-group.cs.brown.edu/data/pg.tar.xz # FIXME: moving to PG soon if [ $? -ne 0 ]; then cat <<-'EOF' | sed 's/^ *//' Downloading input dataset failed, thus need to manually rsync all books from project gutenberg: diff --git a/evaluation/benchmarks/oneliners/input/setup.sh b/evaluation/benchmarks/oneliners/input/setup.sh index 96388980d..eb8a00317 100755 --- a/evaluation/benchmarks/oneliners/input/setup.sh +++ b/evaluation/benchmarks/oneliners/input/setup.sh @@ -26,7 +26,7 @@ setup_dataset() { fi if [ ! -f ./1M.txt ]; then - curl -sf 'http://ndr.md/data/dummy/1M.txt' > 1M.txt + curl -sf 'atlas-group.cs.brown.edu/data/dummy/1M.txt' > 1M.txt if [ $? -ne 0 ]; then echo 'cannot find 1M.txt -- please contact the developers of pash' exit 1 @@ -51,7 +51,7 @@ setup_dataset() { fi if [ ! -f ./1G.txt ]; then - curl -sf 'http://ndr.md/data/dummy/1G.txt' > 1G.txt + curl -sf 'atlas-group.cs.brown.edu/data/dummy/1G.txt' > 1G.txt if [ $? -ne 0 ]; then echo 'cannot find 1G.txt -- please contact the developers of pash' exit 1 @@ -61,7 +61,7 @@ setup_dataset() { # download wamerican-insane dictionary and sort according to machine if [ ! -f ./dict.txt ]; then - curl -sf 'http://ndr.md/data/dummy/dict.txt' | sort > dict.txt + curl -sf 'atlas-group.cs.brown.edu/data/dummy/dict.txt' | sort > dict.txt if [ $? -ne 0 ]; then echo 'cannot find dict.txt -- please contact the developers of pash' exit 1 @@ -70,7 +70,7 @@ setup_dataset() { fi if [ ! -f ./all_cmds.txt ]; then - curl -sf 'http://ndr.md/data/dummy/all_cmds.txt' > all_cmds.txt + curl -sf 'atlas-group.cs.brown.edu/data/dummy/all_cmds.txt' > all_cmds.txt if [ $? 
-ne 0 ]; then # This should be OK for tests, no need for abort ls /usr/bin/* > all_cmds.txt diff --git a/evaluation/benchmarks/web-index/input/setup.sh b/evaluation/benchmarks/web-index/input/setup.sh index 72a4fd8f9..79a77276a 100755 --- a/evaluation/benchmarks/web-index/input/setup.sh +++ b/evaluation/benchmarks/web-index/input/setup.sh @@ -17,8 +17,7 @@ setup_dataset() { wget $wiki_archive || eexit "cannot fetch wikipedia" 7za x wikipedia-en-html.tar.7z tar -xvf wikipedia-en-html.tar - wget http://ndr.md/data/wikipedia/index.txt # || eexit "cannot fetch wiki indices" - # It is actually OK if we don't have this index since we download the 500/1000 below + wget atlas-group.cs.brown.edu/data/wikipedia/index.txt # FIXME: we download index below? fi if [ "$1" = "--small" ]; then diff --git a/evaluation/intro/input/setup.sh b/evaluation/intro/input/setup.sh index a524e9e56..e1c253dd7 100755 --- a/evaluation/intro/input/setup.sh +++ b/evaluation/intro/input/setup.sh @@ -7,7 +7,7 @@ cd $(dirname $0) [ "$1" = "-c" ] && rm-files 100M.txt words sorted_words if [ ! -f ./100M.txt ]; then - curl -f 'ndr.md/data/dummy/100M.txt' > 100M.txt + curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/100M.txt' > 100M.txt if [ $? -ne 0 ]; then curl -f 'http://www.gutenberg.org/files/2600/2600-0.txt' | head -c 1M > 1M.txt [ $? -ne 0 ] && eexit 'cannot find 1M.txt' @@ -20,7 +20,7 @@ if [ ! -f ./100M.txt ]; then fi if [ ! -f ./words ]; then - curl -f 'http://ndr.md/data/dummy/words' > words + curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/words' > words if [ $? 
-ne 0 ]; then if [ $(uname) = 'Darwin' ]; then cp /usr/share/dict/web2 words || eexit "cannot find dict file" diff --git a/evaluation/other/more-scripts/page-count.sh b/evaluation/other/more-scripts/page-count.sh index b4a3326e5..c4d89ecfd 100755 --- a/evaluation/other/more-scripts/page-count.sh +++ b/evaluation/other/more-scripts/page-count.sh @@ -5,7 +5,7 @@ # Require: libimage-exiftool-perl, bc # Data: -# http://ndr.md/data/dummy/large.pdf +# atlas-group.cs.brown.edu/data/large.pdf # More data: # https://arxiv.org/help/bulk_data diff --git a/evaluation/other/more-scripts/spell.sh b/evaluation/other/more-scripts/spell.sh index 1d4a9f330..9fd5e7384 100755 --- a/evaluation/other/more-scripts/spell.sh +++ b/evaluation/other/more-scripts/spell.sh @@ -6,7 +6,7 @@ # TODO: `groff is an interesting "pure", whose wrapper only needs split input # TODO: files carefully. -# Data: http://ndr.md/data/dummy/ronn.1 +# Data: atlas-group.cs.brown.edu/data/dummy/ronn.1 # dict depends on the system (and has to be sorted), so we assume it exists dict=./input/dict.txt diff --git a/evaluation/tests/input/setup.sh b/evaluation/tests/input/setup.sh index ac78afd20..88b332f1c 100755 --- a/evaluation/tests/input/setup.sh +++ b/evaluation/tests/input/setup.sh @@ -16,7 +16,7 @@ esac [ "$1" = "-c" ] && rm-files 1M.txt all_cmds.txt words sorted_words 10M.txt if [ ! -f ./1M.txt ]; then - curl -sf 'http://ndr.md/data/dummy/1M.txt' > 1M.txt + curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/1M.txt' > 1M.txt if [ $? -ne 0 ]; then curl -sf 'http://www.gutenberg.org/files/2600/2600-0.txt' | head -c 1${head_sz} > 1M.txt [ $? -ne 0 ] && eexit 'cannot find 1M.txt' @@ -26,7 +26,10 @@ fi if [ ! -f ./all_cmds.txt ]; then if [ "$(hostname)" = "deathstar" ]; then - curl -sf 'http://ndr.md/data/dummy/all_cmds.txt' > all_cmds.txt || eexit "all_cmds not found" + curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/all_cmds.txt' > all_cmds.txt + if [ $? 
-ne 0 ]; then + curl -f 'https://zenodo.org/record/7650885/files/all_cmds.txt' > all_cmds.txt || eexit "all_cmds not found" + fi else ls /usr/bin/* > all_cmds.txt fi @@ -34,7 +37,7 @@ if [ ! -f ./all_cmds.txt ]; then fi if [ ! -f ./words ]; then - curl -sf 'http://ndr.md/data/dummy/words' > words + curl -sf --connect-timeout 10 'atlas-group.cs.brown.edu/data/dummy/words' > words if [ $? -ne 0 ]; then if [ $(uname) = 'Darwin' ]; then cp /usr/share/dict/web2 words || eexit "cannot find dict file" diff --git a/evaluation/tests/sed-test.sh b/evaluation/tests/sed-test.sh index f5ba0ac85..38d1cc855 100644 --- a/evaluation/tests/sed-test.sh +++ b/evaluation/tests/sed-test.sh @@ -1,11 +1,11 @@ cat $PASH_TOP/evaluation/tests/input/1M.txt | sed 's;^d;da;' | - sed 's;^;http://ndr.md/data/noaa/;' | + sed 's;^;atlas-group.cs.brown.edu/data/noaa/;' | sed 's;$;/;' | sed 's;^\(.*\)\(20[0-9][0-9]\).gz;\2/\1\2\.gz;' | - sed 's;^;http://ndr.md/data/noaa/;' | + sed 's;^;atlas-group.cs.brown.edu/data/noaa/;' | sed "s#^#$WIKI#" | sed s/\$/'0s'/ | sed 1d | sed 4d | - sed "\$d" \ No newline at end of file + sed "\$d"