From a1552b98c45861dbe316eb8bd3d6c8ec0fb5921b Mon Sep 17 00:00:00 2001 From: cjx10 Date: Thu, 14 Mar 2024 05:00:40 +0000 Subject: [PATCH] Retry on broken pipe and refactor error handling (#154) --- experiment/builder_runner.py | 89 ++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 39 deletions(-) diff --git a/experiment/builder_runner.py b/experiment/builder_runner.py index e562157c59..8c7f056bc3 100644 --- a/experiment/builder_runner.py +++ b/experiment/builder_runner.py @@ -322,14 +322,6 @@ def get_coverage_local( class CloudBuilderRunner(BuilderRunner): """Cloud BuilderRunner.""" - _RETRYABLE_ERRORS = [ - # As mentioned in pr #100. - 'RESOURCE_EXHAUSTED', - # Temp workaround for issue #12. - 'You do not currently have an active account selected', - # Workaround for issue #85. - 'gcloud crashed (OSError): unexpected end of data', - ] def __init__(self, *args, experiment_name: str, experiment_bucket: str, **kwargs): @@ -337,6 +329,51 @@ def __init__(self, *args, experiment_name: str, experiment_bucket: str, self.experiment_bucket = experiment_bucket super().__init__(*args, **kwargs) + @staticmethod + def _run_with_retry_control(target_path: str, *args, **kwargs) -> bool: + """sp.run() with controllable retry and customized exponential backoff.""" + # List of (error_str, exp_backoff_func). + retryable_errors = [ + # As mentioned in pr #100. + ('RESOURCE_EXHAUSTED', lambda x: 5 * 2**x + random.randint(50, 90)), + # As mentioned in pr #151. + ('BrokenPipeError: [Errno 32] Broken pipe', + lambda x: 5 * 2**x + random.randint(1, 5)), + # Temp workaround for issue #12. + ('You do not currently have an active account selected', + lambda x: 5 * 2**x), + # Workaround for issue #85. + ('gcloud crashed (OSError): unexpected end of data', lambda x: 5 * 2**x + ), + ] + + for attempt_id in range(1, CLOUD_EXP_MAX_ATTEMPT + 1): + try: + sp.run(*args, capture_output=True, check=True, **kwargs) + return True + except sp.CalledProcessError as e: + # Replace \n for single log entry on cloud. + stdout = e.stdout.decode('utf-8').replace('\n', '\t') + stderr = e.stderr.decode('utf-8').replace('\n', '\t') + + delay = next((delay_f(attempt_id) + for err, delay_f in retryable_errors + if err in stdout + stderr), 0) + + if not delay or attempt_id == CLOUD_EXP_MAX_ATTEMPT: + logging.error('Failed to evaluate %s on cloud, attempt %d:\n%s\n%s', + os.path.realpath(target_path), attempt_id, stdout, + stderr) + break + + logging.warning( + 'Failed to evaluate %s on cloud, attempt %d, retry in %ds:\n' + '%s\n%s', os.path.realpath(target_path), attempt_id, delay, stdout, + stderr) + time.sleep(delay) + + return False + def build_and_run(self, generated_project: str, target_path: str, iteration: int) -> tuple[BuildResult, Optional[RunResult]]: build_result = BuildResult() @@ -358,9 +395,9 @@ def build_and_run(self, generated_project: str, target_path: str, coverage_name = f'{uid}.coverage' coverage_path = f'gs://{self.experiment_bucket}/{coverage_name}' - for attempt_id in range(1, CLOUD_EXP_MAX_ATTEMPT + 1): - try: - sp.run([ + if not self._run_with_retry_control( + os.path.realpath(target_path), + [ f'./{oss_fuzz_checkout.VENV_DIR}/bin/python3', 'infra/build/functions/target_experiment.py', f'--project={generated_project}', @@ -371,34 +408,8 @@ def build_and_run(self, generated_project: str, target_path: str, f'--upload_coverage={coverage_path}', f'--experiment_name={self.experiment_name}', '--' ] + self._libfuzzer_args(), - capture_output=True, - check=True, - cwd=oss_fuzz_checkout.OSS_FUZZ_DIR) - break - except sp.CalledProcessError as e: - # Replace \n for single log entry on cloud. - stdout = e.stdout.decode('utf-8').replace('\n', '\t') - stderr = e.stderr.decode('utf-8').replace('\n', '\t') - - captured_error = next( - (err for err in self._RETRYABLE_ERRORS if err in stdout + stderr), - '') - if captured_error and attempt_id < CLOUD_EXP_MAX_ATTEMPT: - delay = 5 * 2**attempt_id - if captured_error == 'RESOURCE_EXHAUSTED': - # Add random jitter in case of exceeding request per minute quota. - delay += random.randint(50, 90) - - logging.warning( - 'Failed to evaluate %s on cloud, attempt %d:\n%s\n%s\n' - 'Retry in %ds...', os.path.realpath(target_path), attempt_id, - stdout, stderr, delay) - time.sleep(delay) - else: - logging.error('Failed to evaluate %s on cloud, attempt %d:\n%s\n%s', - os.path.realpath(target_path), attempt_id, stdout, - stderr) - return build_result, None + cwd=oss_fuzz_checkout.OSS_FUZZ_DIR): + return build_result, None print(f'Evaluated {os.path.realpath(target_path)} on cloud.')