diff --git a/compiler/config.py b/compiler/config.py index 29d9ef419..34d853f0e 100644 --- a/compiler/config.py +++ b/compiler/config.py @@ -127,6 +127,10 @@ def add_common_arguments(parser): help="Run multiple pipelines in parallel if they are safe to run", action="store_true", default=False) + parser.add_argument("--parallel_pipelines_limit", + type=int, + help="Configure the limit for the number of parallel pipelines at one time (default: cpu count, 0 is turns parallel pipelines off)", + default=os.cpu_count()) parser.add_argument("--r_split", help="(experimental) use round robin split, merge, wrap, and unwrap", action="store_true") @@ -197,6 +201,8 @@ def pass_common_arguments(pash_arguments): arguments.append(string_to_argument("--distributed_exec")) if (pash_arguments.parallel_pipelines): arguments.append(string_to_argument("--parallel_pipelines")) + arguments.append(string_to_argument("--parallel_pipelines_limit")) + arguments.append(string_to_argument(str(pash_arguments.parallel_pipelines_limit))) if (pash_arguments.daemon_communicates_through_unix_pipes): arguments.append(string_to_argument("--daemon_communicates_through_unix_pipes")) arguments.append(string_to_argument("--r_split_batch_size")) diff --git a/compiler/pash_runtime_daemon.py b/compiler/pash_runtime_daemon.py index 54733b68c..56ef75673 100644 --- a/compiler/pash_runtime_daemon.py +++ b/compiler/pash_runtime_daemon.py @@ -327,6 +327,7 @@ def compile_and_add(self, compiled_script_file, var_file, input_ir_file): self.wait_for_all() if compile_success: + self.wait_for_limit(config.pash_args.parallel_pipelines_limit) response = success_response( f'{process_id} {compiled_script_file} {var_file} {input_ir_file}') else: @@ -357,9 +358,9 @@ def get_next_id(self): self.next_id += 1 return self.next_id - def wait_for_all(self): - log("Waiting for all processes to finish. There are", self.running_procs, "processes remaining.") - while self.running_procs > 0: + def wait_for_limit(self, limit): + log("Waiting for for number of processes to be less than limit") + while self.running_procs > limit: input_cmd = self.get_input() # must be exit command or something is wrong if (input_cmd.startswith("Exit:")): @@ -367,6 +368,10 @@ def wait_for_all(self): else: raise Exception( f"Command should be exit but it was {input_cmd}") + + def wait_for_all(self): + log("Waiting for all processes to finish. There are", self.running_procs, "processes remaining.") + self.wait_for_limit(0) self.unsafe_running = False def handle_exit(self, input_cmd):