From 50b21f718e1356633d4eda90741b9169fa5a5963 Mon Sep 17 00:00:00 2001 From: Christian Clauss Date: Mon, 10 Feb 2025 17:55:56 +0100 Subject: [PATCH] ruff-rules-for-pyupgrade --- compiler_opt/benchmark/benchmark_chromium.py | 8 +- compiler_opt/benchmark/benchmark_report.py | 17 +- .../benchmark/benchmark_report_converter.py | 2 +- compiler_opt/benchmark/benchmarking_utils.py | 6 +- .../benchmark/gtest_executable_utils.py | 13 +- .../distributed/buffered_scheduler.py | 15 +- .../distributed/local/local_worker_manager.py | 13 +- compiler_opt/distributed/worker.py | 9 +- compiler_opt/es/blackbox_evaluator.py | 17 +- compiler_opt/es/blackbox_learner.py | 18 +- compiler_opt/es/blackbox_optimizers.py | 92 +++++----- compiler_opt/es/blackbox_test_utils.py | 7 +- ...gradient_ascent_optimization_algorithms.py | 13 +- compiler_opt/es/policy_utils.py | 5 +- .../regalloc_trace/regalloc_trace_worker.py | 21 +-- .../regalloc_trace_worker_test.py | 3 +- compiler_opt/rl/agent_config.py | 11 +- compiler_opt/rl/best_trajectory.py | 7 +- compiler_opt/rl/best_trajectory_test.py | 2 +- compiler_opt/rl/compilation_runner.py | 68 +++---- compiler_opt/rl/compilation_runner_test.py | 5 +- compiler_opt/rl/corpus.py | 51 +++--- compiler_opt/rl/data_collector.py | 8 +- compiler_opt/rl/data_reader.py | 10 +- compiler_opt/rl/distributed/agent.py | 18 +- compiler_opt/rl/distributed/learner.py | 13 +- .../rl/distributed/ppo_collect_lib.py | 10 +- compiler_opt/rl/distributed/ppo_eval_lib.py | 7 +- compiler_opt/rl/env.py | 42 ++--- compiler_opt/rl/env_test.py | 10 +- compiler_opt/rl/feature_ops.py | 8 +- .../generate_bc_trajectories_lib.py | 167 +++++++++--------- .../generate_bc_trajectories_test.py | 3 +- .../weighted_bc_trainer_lib.py | 47 +++-- compiler_opt/rl/inlining/env.py | 10 +- .../rl/inlining/imitation_learning_config.py | 3 +- compiler_opt/rl/inlining/inlining_runner.py | 3 +- compiler_opt/rl/local_data_collector.py | 24 +-- compiler_opt/rl/local_data_collector_test.py | 9 +- compiler_opt/rl/log_reader.py | 27 +-- compiler_opt/rl/policy_saver.py | 14 +- compiler_opt/rl/problem_configuration.py | 10 +- compiler_opt/rl/random_net_distillation.py | 2 +- compiler_opt/rl/regalloc/regalloc_network.py | 15 +- compiler_opt/rl/regalloc/regalloc_runner.py | 3 +- .../regalloc_priority_runner.py | 9 +- compiler_opt/rl/registry.py | 4 +- compiler_opt/rl/train_bc.py | 4 +- compiler_opt/rl/train_locally.py | 3 +- compiler_opt/rl/trainer.py | 13 +- .../tools/combine_tfa_policies_lib.py | 7 +- .../tools/feature_importance_graphs.py | 6 +- .../tools/feature_importance_utils.py | 4 +- compiler_opt/tools/generate_default_trace.py | 15 +- compiler_opt/tools/generate_vocab.py | 8 +- compiler_opt/type_map.py | 4 +- pyproject.toml | 3 +- 57 files changed, 448 insertions(+), 498 deletions(-) diff --git a/compiler_opt/benchmark/benchmark_chromium.py b/compiler_opt/benchmark/benchmark_chromium.py index 166ebea1..8602d2e3 100644 --- a/compiler_opt/benchmark/benchmark_chromium.py +++ b/compiler_opt/benchmark/benchmark_chromium.py @@ -52,8 +52,6 @@ from compiler_opt.benchmark import gtest_executable_utils from compiler_opt.benchmark import benchmarking_utils -from typing import List, Dict, Union - FLAGS = flags.FLAGS test_prefix = './compiler_opt/benchmark/chromium_test_descriptions/' @@ -110,7 +108,7 @@ def build_chromium_tests(regalloc_advisor: str, chromium_build_path: str, chromium_source_path: str, depot_tools_path: str, - llvm_build_path: str, tests_to_build: List[str]): + llvm_build_path: str, tests_to_build: list[str]): """Builds the 
chromium test suite This function will build the specified chromium tests using the specified @@ -170,9 +168,9 @@ def build_chromium_tests(regalloc_advisor: str, chromium_build_path: str, ninja_compile_process.wait() -def run_tests(tests_to_run: List[Dict[str, Union[str, List[str]]]], +def run_tests(tests_to_run: list[dict[str, str | list[str]]], chromium_absolute_build_path: str, num_threads: int, - perf_counters: List[str]): + perf_counters: list[str]): """A utility to run a set of chromium tests This function takes in a list of test descriptions containing the diff --git a/compiler_opt/benchmark/benchmark_report.py b/compiler_opt/benchmark/benchmark_report.py index b7d7d1f5..0a574149 100644 --- a/compiler_opt/benchmark/benchmark_report.py +++ b/compiler_opt/benchmark/benchmark_report.py @@ -18,31 +18,28 @@ import statistics from typing import Any -from typing import Dict -from typing import Iterable -from typing import List -from typing import Tuple +from collections.abc import Iterable from absl import logging # For each benchmark, and for each counter, capture the recorded values. -PerBenchmarkResults = Dict[str, Dict[str, List[float]]] +PerBenchmarkResults = dict[str, dict[str, list[float]]] # Benchmark data, as captured by the benchmark json output: a dictionary from # benchmark names to a list of run results. Each run result is a dictionary of # key-value pairs, e.g. counter name - value. -BenchmarkRunResults = Dict[str, List[Dict[str, Any]]] +BenchmarkRunResults = dict[str, list[dict[str, Any]]] # A comparison per benchmark, per counter, capturing the geomean and the stdev # of the base and experiment values. -ABComparison = Dict[str, Dict[str, Tuple[float, float, float]]] +ABComparison = dict[str, dict[str, tuple[float, float, float]]] -def _geomean(data: List[float]): +def _geomean(data: list[float]): return math.exp(sum(math.log(x) for x in data) / len(data)) -def _stdev(data: List[float]): +def _stdev(data: list[float]): assert data return 0.0 if len(data) == 1 else statistics.stdev(data) @@ -70,7 +67,7 @@ def counters(self): def raw_measurements(self): return self._raw_measurements - def counter_means(self, benchmark: str, counter: str) -> Tuple[float, float]: + def counter_means(self, benchmark: str, counter: str) -> tuple[float, float]: if counter not in self.counters(): raise ValueError('unknown counter') if benchmark not in self.names(): diff --git a/compiler_opt/benchmark/benchmark_report_converter.py b/compiler_opt/benchmark/benchmark_report_converter.py index 10812531..c9741612 100644 --- a/compiler_opt/benchmark/benchmark_report_converter.py +++ b/compiler_opt/benchmark/benchmark_report_converter.py @@ -32,7 +32,7 @@ import csv import json -from typing import Sequence +from collections.abc import Sequence from absl import app from absl import flags diff --git a/compiler_opt/benchmark/benchmarking_utils.py b/compiler_opt/benchmark/benchmarking_utils.py index 320515b3..67fd888d 100644 --- a/compiler_opt/benchmark/benchmarking_utils.py +++ b/compiler_opt/benchmark/benchmarking_utils.py @@ -20,11 +20,9 @@ import tensorflow import json -from typing import Optional, List - def build_llvm(model_path: str, use_existing_build: bool, llvm_build_path: str, - llvm_source_path: Optional[str]): + llvm_source_path: str | None): """Builds LLVM/clang with the specified model and the correct settings This function invokes CMake with all the correct build flags specified @@ -72,7 +70,7 @@ def build_llvm(model_path: str, use_existing_build: bool, llvm_build_path: str, 
cmake_compile_process.wait() -def run_microbenchmark(executable: str, perf_counters: List[str]): +def run_microbenchmark(executable: str, perf_counters: list[str]): """Runs all the tests in a specific google benchmark binary This function takes in an executable and performance counters according to the diff --git a/compiler_opt/benchmark/gtest_executable_utils.py b/compiler_opt/benchmark/gtest_executable_utils.py index e715789d..7ac196a0 100644 --- a/compiler_opt/benchmark/gtest_executable_utils.py +++ b/compiler_opt/benchmark/gtest_executable_utils.py @@ -18,11 +18,10 @@ import re from joblib import Parallel, delayed -from typing import Tuple, List, Optional, Dict from absl import logging -def run_test(test_executable: str, test_name: str, perf_counters: List[str]): +def run_test(test_executable: str, test_name: str, perf_counters: list[str]): """Runs a specific test This function executes a specific test in a gtest executable using @@ -55,7 +54,7 @@ def run_test(test_executable: str, test_name: str, perf_counters: List[str]): return decoded_stderr -def parse_perf_stat_output(perf_stat_output: str, perf_counters: List[str]): +def parse_perf_stat_output(perf_stat_output: str, perf_counters: list[str]): """Parses raw output from perf stat This function takes in the raw decoded output from perf stat @@ -77,7 +76,7 @@ def parse_perf_stat_output(perf_stat_output: str, perf_counters: List[str]): return counters_dict -def run_and_parse(test_description: Tuple[str, str, List[str]]): +def run_and_parse(test_description: tuple[str, str, list[str]]): """Runs a test and processes the output of an individual test This function takes in a description of an individual test, runs the test @@ -98,9 +97,9 @@ def run_and_parse(test_description: Tuple[str, str, List[str]]): return None -def run_test_suite(test_suite_description: Dict[str, List[str]], - test_executable: str, perf_counters: List[str], - num_threads: Optional[int]): +def run_test_suite(test_suite_description: dict[str, list[str]], + test_executable: str, perf_counters: list[str], + num_threads: int | None): """Runs an entire test suite This function takes in a test set description in the form of a path to a JSON diff --git a/compiler_opt/distributed/buffered_scheduler.py b/compiler_opt/distributed/buffered_scheduler.py index 997d983a..97797371 100644 --- a/compiler_opt/distributed/buffered_scheduler.py +++ b/compiler_opt/distributed/buffered_scheduler.py @@ -18,7 +18,8 @@ import concurrent.futures import threading -from typing import Any, Callable, Iterable, List, Optional, Tuple, TypeVar +from typing import Any, TypeVar +from collections.abc import Callable, Iterable from compiler_opt.distributed import worker @@ -26,9 +27,9 @@ W = TypeVar('W') -def schedule(work: List[Callable[[T], worker.WorkerFuture]], - workers: List[T], - buffer=2) -> List[concurrent.futures.Future]: +def schedule(work: list[Callable[[T], worker.WorkerFuture]], + workers: list[T], + buffer=2) -> list[concurrent.futures.Future]: """ Assigns work to workers once previous work of the worker are completed. @@ -86,8 +87,8 @@ def schedule_on_worker_pool( action: Callable[[W, T], Any], jobs: Iterable[T], worker_pool: worker.WorkerPool, - buffer_size: Optional[int] = None -) -> Tuple[List[W], List[concurrent.futures.Future]]: + buffer_size: int | None = None +) -> tuple[list[W], list[concurrent.futures.Future]]: """ Schedule the given action on workers from the given worker pool. 
Args: @@ -111,7 +112,7 @@ def work(w: worker.Worker): return work work = [work_factory(job) for job in jobs] - workers: List[W] = worker_pool.get_currently_active() + workers: list[W] = worker_pool.get_currently_active() return workers, schedule(work, workers, (worker_pool.get_worker_concurrency() if buffer_size is None else buffer_size)) diff --git a/compiler_opt/distributed/local/local_worker_manager.py b/compiler_opt/distributed/local/local_worker_manager.py index 0d900096..3346dd2f 100644 --- a/compiler_opt/distributed/local/local_worker_manager.py +++ b/compiler_opt/distributed/local/local_worker_manager.py @@ -40,7 +40,8 @@ from contextlib import AbstractContextManager from multiprocessing import connection -from typing import Any, Callable, Dict, List, Optional +from typing import Any +from collections.abc import Callable @dataclasses.dataclass(frozen=True) @@ -139,7 +140,7 @@ def __init__(self): # lock for the msgid -> reply future map. The map will be set to None # when we stop. self._lock = threading.Lock() - self._map: Dict[int, concurrent.futures.Future] = {} + self._map: dict[int, concurrent.futures.Future] = {} # thread draining the pipe self._pump = threading.Thread(target=self._msg_pump) @@ -164,7 +165,7 @@ def observer(): def _msg_pump(self): while True: - task_result: Optional[TaskResult] = self._pipe.recv() + task_result: TaskResult | None = self._pipe.recv() if task_result is None: # Poison pill fed by observer break with self._lock: @@ -229,7 +230,7 @@ def set_nice(self, val: int): """ psutil.Process(self._process.pid).nice(val) - def set_affinity(self, val: List[int]): + def set_affinity(self, val: list[int]): """Sets the CPU affinity of the process, this modifies which cores the OS schedules it on. """ @@ -247,7 +248,7 @@ def __dir__(self): def create_local_worker_pool(worker_cls: 'type[worker.Worker]', - count: Optional[int], *args, + count: int | None, *args, **kwargs) -> worker.FixedWorkerPool: """Create a local worker pool for worker_cls.""" if not count: @@ -271,7 +272,7 @@ def close_local_worker_pool(pool: worker.FixedWorkerPool): class LocalWorkerPoolManager(AbstractContextManager): """A pool of workers hosted on the local machines, each in its own process.""" - def __init__(self, worker_class: 'type[worker.Worker]', count: Optional[int], + def __init__(self, worker_class: 'type[worker.Worker]', count: int | None, *args, **kwargs): self._pool = create_local_worker_pool(worker_class, count, *args, **kwargs) diff --git a/compiler_opt/distributed/worker.py b/compiler_opt/distributed/worker.py index aca46d59..f54c7451 100644 --- a/compiler_opt/distributed/worker.py +++ b/compiler_opt/distributed/worker.py @@ -14,7 +14,8 @@ """Common abstraction for a worker contract.""" import abc -from typing import Any, List, Iterable, Optional, Protocol, TypeVar +from typing import Any, Protocol, TypeVar +from collections.abc import Iterable import gin @@ -35,7 +36,7 @@ class WorkerPool(metaclass=abc.ABCMeta): # Issue #155 would strongly-type the return type. 
@abc.abstractmethod - def get_currently_active(self) -> List[Any]: + def get_currently_active(self) -> list[Any]: raise NotImplementedError() @abc.abstractmethod @@ -47,7 +48,7 @@ class FixedWorkerPool(WorkerPool): """A WorkerPool built from a fixed list of workers.""" # Issue #155 would strongly-type `workers` - def __init__(self, workers: List[Any], worker_concurrency: int = 2): + def __init__(self, workers: list[Any], worker_concurrency: int = 2): self._workers = workers self._worker_concurrency = worker_concurrency @@ -80,7 +81,7 @@ def wait_for(futures: Iterable[WorkerFuture]): pass -def get_exception(worker_future: WorkerFuture) -> Optional[Exception]: +def get_exception(worker_future: WorkerFuture) -> Exception | None: assert worker_future.done() try: _ = worker_future.result() diff --git a/compiler_opt/es/blackbox_evaluator.py b/compiler_opt/es/blackbox_evaluator.py index 7222f737..59a6d4d4 100644 --- a/compiler_opt/es/blackbox_evaluator.py +++ b/compiler_opt/es/blackbox_evaluator.py @@ -15,7 +15,6 @@ import abc import concurrent.futures -from typing import List, Optional from absl import logging import gin @@ -36,8 +35,8 @@ def __init__(self, train_corpus: corpus.Corpus): @abc.abstractmethod def get_results( - self, pool: FixedWorkerPool, perturbations: List[policy_saver.Policy] - ) -> List[concurrent.futures.Future]: + self, pool: FixedWorkerPool, perturbations: list[policy_saver.Policy] + ) -> list[concurrent.futures.Future]: raise NotImplementedError() @abc.abstractmethod @@ -45,7 +44,7 @@ def set_baseline(self, pool: FixedWorkerPool) -> None: raise NotImplementedError() def get_rewards( - self, results: List[concurrent.futures.Future]) -> List[Optional[float]]: + self, results: list[concurrent.futures.Future]) -> list[float | None]: rewards = [None] * len(results) for i in range(len(results)): @@ -74,8 +73,8 @@ def __init__(self, train_corpus: corpus.Corpus, super().__init__(train_corpus) def get_results( - self, pool: FixedWorkerPool, perturbations: List[policy_saver.Policy] - ) -> List[concurrent.futures.Future]: + self, pool: FixedWorkerPool, perturbations: list[policy_saver.Policy] + ) -> list[concurrent.futures.Future]: if not self._samples: for _ in range(self._total_num_perturbations): sample = self._train_corpus.sample(self._num_ir_repeats_within_worker) @@ -118,11 +117,11 @@ def __init__(self, train_corpus: corpus.Corpus, self._bb_trace_path = bb_trace_path self._function_index_path = function_index_path - self._baseline: Optional[float] = None + self._baseline: float | None = None def get_results( - self, pool: FixedWorkerPool, perturbations: List[policy_saver.Policy] - ) -> List[concurrent.futures.Future]: + self, pool: FixedWorkerPool, perturbations: list[policy_saver.Policy] + ) -> list[concurrent.futures.Future]: job_args = [{ 'modules': self._train_corpus.module_specs, 'function_index_path': self._function_index_path, diff --git a/compiler_opt/es/blackbox_learner.py b/compiler_opt/es/blackbox_learner.py index c8e962f3..597affd1 100644 --- a/compiler_opt/es/blackbox_learner.py +++ b/compiler_opt/es/blackbox_learner.py @@ -22,7 +22,7 @@ import numpy.typing as npt import tempfile import tensorflow as tf -from typing import List, Optional, Protocol +from typing import Protocol from compiler_opt.distributed.worker import FixedWorkerPool from compiler_opt.es import blackbox_optimizers @@ -81,8 +81,8 @@ class BlackboxLearnerConfig: step_size: float -def _prune_skipped_perturbations(perturbations: List[npt.NDArray[np.float32]], - rewards: List[Optional[float]]): +def 
_prune_skipped_perturbations(perturbations: list[npt.NDArray[np.float32]], + rewards: list[float | None]): """Remove perturbations that were skipped during the training step. Perturbations may be skipped due to an early exit condition or a server error @@ -135,7 +135,7 @@ def __init__(self, config: BlackboxLearnerConfig, initial_step: int = 0, deadline: float = 30.0, - seed: Optional[int] = None): + seed: int | None = None): """Construct a BlackboxLeaner. Args: @@ -165,7 +165,7 @@ def __init__(self, self._evaluator = self._config.evaluator(self._train_corpus, self._config.estimator_type) - def _get_perturbations(self) -> List[npt.NDArray[np.float32]]: + def _get_perturbations(self) -> list[npt.NDArray[np.float32]]: """Get perturbations for the model weights.""" rng = np.random.default_rng(seed=self._seed) return [ @@ -174,8 +174,8 @@ def _get_perturbations(self) -> List[npt.NDArray[np.float32]]: for _ in range(self._config.total_num_perturbations) ] - def _update_model(self, perturbations: List[npt.NDArray[np.float32]], - rewards: List[float]) -> None: + def _update_model(self, perturbations: list[npt.NDArray[np.float32]], + rewards: list[float]) -> None: """Update the model given a list of perturbations and rewards.""" self._model_weights = self._blackbox_opt.run_step( perturbations=np.array(perturbations), @@ -183,11 +183,11 @@ def _update_model(self, perturbations: List[npt.NDArray[np.float32]], current_input=self._model_weights, current_value=np.mean(rewards)) - def _log_rewards(self, rewards: List[float]) -> None: + def _log_rewards(self, rewards: list[float]) -> None: """Log reward to console.""" logging.info('Train reward: [%f]', np.mean(rewards)) - def _log_tf_summary(self, rewards: List[float]) -> None: + def _log_tf_summary(self, rewards: list[float]) -> None: """Log tensorboard data.""" with self._summary_writer.as_default(): tf.summary.scalar( diff --git a/compiler_opt/es/blackbox_optimizers.py b/compiler_opt/es/blackbox_optimizers.py index 97fb9db2..a8cacb79 100644 --- a/compiler_opt/es/blackbox_optimizers.py +++ b/compiler_opt/es/blackbox_optimizers.py @@ -61,7 +61,8 @@ import scipy.optimize as sp_opt import scipy.version from sklearn import linear_model -from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple, Union +from typing import Any +from collections.abc import Callable, Mapping, Sequence from compiler_opt.es import gradient_ascent_optimization_algorithms @@ -75,10 +76,10 @@ # but numpy.typing does not allow for that indication FloatArray2D = Sequence[FloatArray] -SequenceOfFloats = Union[Sequence[float], FloatArray] +SequenceOfFloats = Sequence[float] | FloatArray -LinearModel = Union[linear_model.Ridge, linear_model.Lasso, - linear_model.LinearRegression] +LinearModel = ( + linear_model.Ridge | linear_model.Lasso | linear_model.LinearRegression) class CurrentPointEstimate(enum.Enum): @@ -124,7 +125,7 @@ class UpdateMethod(enum.Enum): def filter_top_directions( perturbations: FloatArray2D, function_values: FloatArray, estimator_type: EstimatorType, - num_top_directions: int) -> Tuple[FloatArray, FloatArray]: + num_top_directions: int) -> tuple[FloatArray, FloatArray]: """Select the subset of top-performing perturbations. 
Args: @@ -193,7 +194,7 @@ def run_step(self, perturbations: FloatArray2D, function_values: FloatArray, raise NotImplementedError('Abstract method') @abc.abstractmethod - def get_hyperparameters(self) -> List[float]: + def get_hyperparameters(self) -> list[float]: """Returns the list of hyperparameters for blackbox function runs. Returns the list of hyperparameters for blackbox function runs that can be @@ -205,7 +206,7 @@ def get_hyperparameters(self) -> List[float]: raise NotImplementedError('Abstract method') @abc.abstractmethod - def get_state(self) -> List[float]: + def get_state(self) -> list[float]: """Returns the state of the optimizer. Returns the state of the optimizer. @@ -246,7 +247,7 @@ class StatefulOptimizer(BlackboxOptimizer): def __init__(self, estimator_type: EstimatorType, normalize_fvalues: bool, hyperparameters_update_method: UpdateMethod, - extra_params: Optional[Sequence[int]]): + extra_params: Sequence[int] | None): self.estimator_type = estimator_type self.normalize_fvalues = normalize_fvalues @@ -260,13 +261,13 @@ def __init__(self, estimator_type: EstimatorType, normalize_fvalues: bool, self.std_state_vector = [1.0] * self.state_dim super().__init__() - def get_hyperparameters(self) -> List[float]: + def get_hyperparameters(self) -> list[float]: if self.hyperparameters_update_method == UpdateMethod.STATE_NORMALIZATION: return self.mean_state_vector + self.std_state_vector else: return [] - def get_state(self) -> List[float]: + def get_state(self) -> list[float]: if self.hyperparameters_update_method == UpdateMethod.STATE_NORMALIZATION: current_state = [self.nb_steps] current_state += self.sum_state_vector @@ -318,17 +319,17 @@ def set_state(self, state: SequenceOfFloats) -> None: class MonteCarloBlackboxOptimizer(StatefulOptimizer): """Class implementing GD optimizer with MC estimation of the gradient.""" - def __init__(self, - precision_parameter: float, - estimator_type: EstimatorType, - normalize_fvalues: bool, - hyperparameters_update_method: UpdateMethod, - extra_params: Optional[Sequence[int]], - step_size: Optional[float] = None, - num_top_directions: int = 0, - gradient_ascent_optimizer: Optional[ - gradient_ascent_optimization_algorithms - .GradientAscentOptimizer] = None): + def __init__( + self, + precision_parameter: float, + estimator_type: EstimatorType, + normalize_fvalues: bool, + hyperparameters_update_method: UpdateMethod, + extra_params: Sequence[int] | None, + step_size: float | None = None, + num_top_directions: int = 0, + gradient_ascent_optimizer: gradient_ascent_optimization_algorithms + .GradientAscentOptimizer | None = None): # Check step_size and gradient_ascent_optimizer if (step_size is None) == (gradient_ascent_optimizer is None): raise ValueError('Exactly one of step_size and \ @@ -380,7 +381,7 @@ def run_step(self, perturbations: FloatArray2D, function_values: FloatArray, # gradients return self.gradient_ascent_optimizer.run_step(current_input, gradient) - def get_state(self) -> List[float]: + def get_state(self) -> list[float]: gradient_ascent_state = self.gradient_ascent_optimizer.get_state() return super().get_state() + gradient_ascent_state @@ -392,17 +393,17 @@ def set_state(self, state: SequenceOfFloats) -> None: class SklearnRegressionBlackboxOptimizer(StatefulOptimizer): """Class implementing GD optimizer with regression for grad. 
estimation.""" - def __init__(self, - regression_method: RegressionType, - regularizer: float, - estimator_type: EstimatorType, - normalize_fvalues: bool, - hyperparameters_update_method: UpdateMethod, - extra_params: Optional[Sequence[int]], - step_size: Optional[float] = None, - gradient_ascent_optimizer: Optional[ - gradient_ascent_optimization_algorithms - .GradientAscentOptimizer] = None): + def __init__( + self, + regression_method: RegressionType, + regularizer: float, + estimator_type: EstimatorType, + normalize_fvalues: bool, + hyperparameters_update_method: UpdateMethod, + extra_params: Sequence[int] | None, + step_size: float | None = None, + gradient_ascent_optimizer: gradient_ascent_optimization_algorithms + .GradientAscentOptimizer | None = None): # Check step_size and gradient_ascent_optimizer if (step_size is None) == (gradient_ascent_optimizer is None): raise ValueError('Exactly one of step_size and gradient_ascent_optimizer \ @@ -456,7 +457,7 @@ def run_step(self, perturbations: FloatArray2D, function_values: FloatArray, # gradients return self.gradient_ascent_optimizer.run_step(current_input, gradient) - def get_state(self) -> List[float]: + def get_state(self) -> list[float]: gradient_ascent_state = self.gradient_ascent_optimizer.get_state() return super().get_state + gradient_ascent_state @@ -484,7 +485,7 @@ def set_state(self, state: SequenceOfFloats) -> None: def normalize_function_values( function_values: FloatArray, - current_value: float) -> Tuple[FloatArray, List[float]]: + current_value: float) -> tuple[FloatArray, list[float]]: values = function_values.tolist() values.append(current_value) mean = sum(values) / float(len(values)) @@ -498,7 +499,7 @@ def monte_carlo_gradient(precision_parameter: float, perturbations: FloatArray2D, function_values: FloatArray, current_value: float, - energy: Optional[float] = 0) -> FloatArray: + energy: float | None = 0) -> FloatArray: """Calculates Monte Carlo gradient. There are several ways of estimating the gradient. This is specified by the @@ -608,7 +609,7 @@ def sklearn_regression_gradient(clf: LinearModel, estimator_type: EstimatorType, """ -class QuadraticModel(object): +class QuadraticModel: """A class for quadratic functions. Presents an interface for evaluating functions of the form @@ -620,7 +621,7 @@ class QuadraticModel(object): def __init__(self, Av: Callable[[FloatArray], FloatArray], b: FloatArray, - c: Optional[float] = 0): + c: float | None = 0): """Initialize quadratic function. Args: @@ -657,7 +658,7 @@ def grad(self, x: FloatArray) -> FloatArray: return self.quad_v(x) + self.b -class ProjectedGradientOptimizer(object): +class ProjectedGradientOptimizer: r"""A class implementing the projected gradient algorithm. The update is given by @@ -747,7 +748,7 @@ def projector(w: FloatArray) -> FloatArray: return projector -class TrustRegionSubproblemOptimizer(object): +class TrustRegionSubproblemOptimizer: r"""Solves the trust region subproblem over the L2 ball. min_x f(x) s.t. 
\|x - p\| \leq R @@ -765,8 +766,8 @@ class TrustRegionSubproblemOptimizer(object): def __init__(self, model_function: QuadraticModel, - trust_region_params: Dict[str, Any], - x_init: Optional[FloatArray] = None): + trust_region_params: dict[str, Any], + x_init: FloatArray | None = None): self.mf = model_function self.params = trust_region_params self.center = x_init @@ -905,8 +906,8 @@ class TrustRegionOptimizer(StatefulOptimizer): def __init__(self, precision_parameter: float, estimator_type: EstimatorType, normalize_fvalues: bool, hyperparameters_update_method: UpdateMethod, - extra_params: Optional[Sequence[int]], tr_params: Mapping[str, - Any]): + extra_params: Sequence[int] | None, tr_params: Mapping[str, + Any]): self.precision_parameter = precision_parameter super().__init__(estimator_type, normalize_fvalues, hyperparameters_update_method, extra_params) @@ -976,8 +977,7 @@ def trust_region_test(self, current_input: FloatArray, abs_ratio > self.params['reject_threshold']) should_shrink = (not is_ascent and abs_ratio > self.params['shrink_neg_threshold']) - should_grow = ((is_ascent and - tr_imp_ratio > self.params['grow_threshold'])) + should_grow = (is_ascent and tr_imp_ratio > self.params['grow_threshold']) log_message = (' fval pct change: ' + str(abs_ratio) + ' tr_ratio: ' + str(tr_imp_ratio)) if should_reject: diff --git a/compiler_opt/es/blackbox_test_utils.py b/compiler_opt/es/blackbox_test_utils.py index d5aba49e..e539fb11 100644 --- a/compiler_opt/es/blackbox_test_utils.py +++ b/compiler_opt/es/blackbox_test_utils.py @@ -13,7 +13,7 @@ # limitations under the License. """Test facilities for Blackbox classes.""" -from typing import List, Collection, Optional +from collections.abc import Collection import gin @@ -34,7 +34,7 @@ def __init__(self, arg, *, kwarg): self.function_value = 0.0 def compile(self, policy: policy_saver.Policy, - samples: List[corpus.ModuleSpec]) -> float: + samples: list[corpus.ModuleSpec]) -> float: if policy and samples: self.function_value += 1.0 return self.function_value @@ -56,8 +56,7 @@ def __init__(self, arg, *, kwarg): def compile_corpus_and_evaluate( self, modules: Collection[corpus.ModuleSpec], function_index_path: str, - bb_trace_path: str, - tflite_policy: Optional[policy_saver.Policy]) -> float: + bb_trace_path: str, tflite_policy: policy_saver.Policy | None) -> float: if modules and function_index_path and bb_trace_path and tflite_policy: self._function_value += 1 return self._function_value diff --git a/compiler_opt/es/gradient_ascent_optimization_algorithms.py b/compiler_opt/es/gradient_ascent_optimization_algorithms.py index 75f0bcb7..9e94ff45 100644 --- a/compiler_opt/es/gradient_ascent_optimization_algorithms.py +++ b/compiler_opt/es/gradient_ascent_optimization_algorithms.py @@ -54,7 +54,6 @@ import numpy as np import numpy.typing as npt -from typing import List, Optional class GradientAscentOptimizer(metaclass=abc.ABCMeta): @@ -82,7 +81,7 @@ def run_step(self, current_input: npt.NDArray[np.float32], raise NotImplementedError("Abstract method") @abc.abstractmethod - def get_state(self) -> List[float]: + def get_state(self) -> list[float]: """Returns the state of the optimizer. Returns the state of the optimizer. 
@@ -139,7 +138,7 @@ def run_step(self, current_input: npt.NDArray[np.float32], return current_input + step - def get_state(self) -> List[float]: + def get_state(self) -> list[float]: return self.moving_average.tolist() def set_state(self, state: npt.NDArray[np.float32]) -> None: @@ -156,9 +155,9 @@ class AdamOptimizer(GradientAscentOptimizer): def __init__(self, step_size: float, - beta1: Optional[float] = 0.9, - beta2: Optional[float] = 0.999, - epsilon: Optional[float] = 1e-07): + beta1: float | None = 0.9, + beta2: float | None = 0.999, + epsilon: float | None = 1e-07): self.step_size = step_size self.beta1 = beta1 self.beta2 = beta2 @@ -198,7 +197,7 @@ def run_step(self, current_input: npt.NDArray[np.float32], return current_input + step - def get_state(self) -> List[float]: + def get_state(self) -> list[float]: return (self.first_moment_moving_average.tolist() + self.second_moment_moving_average.tolist() + [self.t]) diff --git a/compiler_opt/es/policy_utils.py b/compiler_opt/es/policy_utils.py index e9126544..ef83c4e7 100644 --- a/compiler_opt/es/policy_utils.py +++ b/compiler_opt/es/policy_utils.py @@ -13,7 +13,8 @@ # limitations under the License. """Util functions to create and edit a tf_agent policy.""" -from typing import Protocol, Sequence, Type +from typing import Protocol +from collections.abc import Sequence import gin import numpy as np @@ -35,7 +36,7 @@ class HasModelVariables(Protocol): # TODO(abenalaast): Issue #280 @gin.configurable(module='policy_utils') def create_actor_policy( - actor_network_ctor: Type[network.DistributionNetwork], + actor_network_ctor: type[network.DistributionNetwork], greedy: bool = False, ) -> tf_policy.TFPolicy: """Creates an actor policy.""" diff --git a/compiler_opt/es/regalloc_trace/regalloc_trace_worker.py b/compiler_opt/es/regalloc_trace/regalloc_trace_worker.py index 9717de5a..c9ad4728 100644 --- a/compiler_opt/es/regalloc_trace/regalloc_trace_worker.py +++ b/compiler_opt/es/regalloc_trace/regalloc_trace_worker.py @@ -19,7 +19,7 @@ other relevant data to produce an overall cost for the model being evaluated. 
""" -from typing import Optional, Collection +from collections.abc import Collection import os import pathlib import subprocess @@ -65,7 +65,7 @@ def __init__(self, clang_path: str, basic_block_trace_model_path: str, self._corpus_path = corpus_path def _compile_module(self, module_to_compile: corpus.ModuleSpec, - output_directory: str, tflite_policy_path: Optional[str]): + output_directory: str, tflite_policy_path: str | None): command_vector = [self._clang_path] context = corpus.Corpus.ReplaceContext( os.path.join(self._corpus_path, module_to_compile.name) + ".bc", @@ -94,15 +94,11 @@ def _compile_module(self, module_to_compile: corpus.ModuleSpec, parents=True, exist_ok=True) command_vector.extend(["-o", module_output_path]) - subprocess.run( - command_vector, - check=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + subprocess.run(command_vector, check=True, capture_output=True) def _build_corpus(self, modules: Collection[corpus.ModuleSpec], output_directory: str, - tflite_policy: Optional[policy_saver.Policy]): + tflite_policy: policy_saver.Policy | None): with tempfile.TemporaryDirectory() as tflite_policy_dir: if tflite_policy: tflite_policy.to_filesystem(tflite_policy_dir) @@ -147,11 +143,7 @@ def _evaluate_corpus(self, module_directory: str, function_index_path: str, f"--bb_trace_path={bb_trace_path}", "--model_type=mca" ] - output = subprocess.run( - command_vector, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True) + output = subprocess.run(command_vector, capture_output=True, check=True) segment_costs = [] for line in output.stdout.decode("utf-8").split("\n"): @@ -168,8 +160,7 @@ def _evaluate_corpus(self, module_directory: str, function_index_path: str, def compile_corpus_and_evaluate( self, modules: Collection[corpus.ModuleSpec], function_index_path: str, - bb_trace_path: str, - tflite_policy: Optional[policy_saver.Policy]) -> float: + bb_trace_path: str, tflite_policy: policy_saver.Policy | None) -> float: with tempfile.TemporaryDirectory() as compilation_dir: self._build_corpus(modules, compilation_dir, tflite_policy) diff --git a/compiler_opt/es/regalloc_trace/regalloc_trace_worker_test.py b/compiler_opt/es/regalloc_trace/regalloc_trace_worker_test.py index 29c73b73..7fb9eb60 100644 --- a/compiler_opt/es/regalloc_trace/regalloc_trace_worker_test.py +++ b/compiler_opt/es/regalloc_trace/regalloc_trace_worker_test.py @@ -13,7 +13,6 @@ # limitations under the License. """Test for RegallocTraceWorker.""" -from typing import List import os import json import stat @@ -27,7 +26,7 @@ from compiler_opt.rl import policy_saver -def _setup_corpus(corpus_dir: str) -> List[corpus.ModuleSpec]: +def _setup_corpus(corpus_dir: str) -> list[corpus.ModuleSpec]: modules = [ corpus.ModuleSpec("module_a", 1, ("-fmodule-a",), True), corpus.ModuleSpec("module_b", 1, ("-fmodule-b",), True) diff --git a/compiler_opt/rl/agent_config.py b/compiler_opt/rl/agent_config.py index b9be9f1c..524b3f97 100644 --- a/compiler_opt/rl/agent_config.py +++ b/compiler_opt/rl/agent_config.py @@ -13,7 +13,8 @@ # limitations under the License. 
"""util function to create a tf_agent.""" -from typing import Any, Callable, Dict +from typing import Any +from collections.abc import Callable import abc import gin @@ -53,13 +54,13 @@ def create_agent(self, preprocessing_layers: tf.keras.layers.Layer, raise NotImplementedError() def get_policy_info_parsing_dict( - self) -> Dict[str, tf.io.FixedLenSequenceFeature]: + self) -> dict[str, tf.io.FixedLenSequenceFeature]: """Return the parsing dict for the policy info.""" return {} # pylint: disable=unused-argument def process_parsed_sequence_and_get_policy_info( - self, parsed_sequence: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + self, parsed_sequence: dict[str, Any]) -> dict[str, dict[str, Any]]: """Function to process parsed_sequence and to return policy_info. Args: @@ -146,7 +147,7 @@ def create_agent(self, preprocessing_layers: tf.keras.layers.Layer, value_net=critic_network) def get_policy_info_parsing_dict( - self) -> Dict[str, tf.io.FixedLenSequenceFeature]: + self) -> dict[str, tf.io.FixedLenSequenceFeature]: if tensor_spec.is_discrete(self._action_spec): return { 'CategoricalProjectionNetwork_logits': @@ -164,7 +165,7 @@ def get_policy_info_parsing_dict( } def process_parsed_sequence_and_get_policy_info( - self, parsed_sequence: Dict[str, Any]) -> Dict[str, Dict[str, Any]]: + self, parsed_sequence: dict[str, Any]) -> dict[str, dict[str, Any]]: if tensor_spec.is_discrete(self._action_spec): policy_info = { 'dist_params': { diff --git a/compiler_opt/rl/best_trajectory.py b/compiler_opt/rl/best_trajectory.py index 754ce94f..acc848f5 100644 --- a/compiler_opt/rl/best_trajectory.py +++ b/compiler_opt/rl/best_trajectory.py @@ -15,7 +15,6 @@ import dataclasses import json -from typing import Dict, List import tensorflow as tf @@ -25,7 +24,7 @@ @dataclasses.dataclass(frozen=True) class BestTrajectory: reward: float - action_list: List[int] + action_list: list[int] class BestTrajectoryRepo: @@ -39,11 +38,11 @@ def __init__(self, action_name: str): list from tensorflow.SequenceExample. 
""" # {module_name: {identifier: best trajectory}} - self._best_trajectories: Dict[str, Dict[str, BestTrajectory]] = {} + self._best_trajectories: dict[str, dict[str, BestTrajectory]] = {} self._action_name: str = action_name @property - def best_trajectories(self) -> Dict[str, Dict[str, BestTrajectory]]: + def best_trajectories(self) -> dict[str, dict[str, BestTrajectory]]: return self._best_trajectories.copy() def sink_to_json_file(self, path: str): diff --git a/compiler_opt/rl/best_trajectory_test.py b/compiler_opt/rl/best_trajectory_test.py index e40e9b9a..ed084344 100644 --- a/compiler_opt/rl/best_trajectory_test.py +++ b/compiler_opt/rl/best_trajectory_test.py @@ -93,7 +93,7 @@ def test_sink_to_csv_file(self): path = self.create_tempfile().full_path repo = _get_test_repo_1() repo.sink_to_csv_file(path) - with open(path, 'r', encoding='utf-8') as f: + with open(path, encoding='utf-8') as f: text = f.read() self.assertEqual(text, diff --git a/compiler_opt/rl/compilation_runner.py b/compiler_opt/rl/compilation_runner.py index 8fae82a2..a030eef0 100644 --- a/compiler_opt/rl/compilation_runner.py +++ b/compiler_opt/rl/compilation_runner.py @@ -21,7 +21,7 @@ import subprocess import tempfile import threading -from typing import Callable, Dict, List, Optional, Tuple +from collections.abc import Callable from absl import flags from absl import logging @@ -62,9 +62,9 @@ class NonTemporaryDirectory: def __init__( self, - suffix: Optional[str] = None, - prefix: Optional[str] = None, - dir: Optional[str] = None, # pylint: disable=redefined-builtin + suffix: str | None = None, + prefix: str | None = None, + dir: str | None = None, # pylint: disable=redefined-builtin ignore_cleanup_errors: bool = False): _ = ignore_cleanup_errors # unused self.name = tempfile.mkdtemp(suffix, prefix, dir) @@ -79,7 +79,7 @@ def __exit__(self, exc, value, tb): pass -def get_workdir_context(explicit_temps_dir: Optional[str] = None): +def get_workdir_context(explicit_temps_dir: str | None = None): """Return a context which manages how the temperory directories are handled. When the flag explicit_temps_dir is specified temporary directories are @@ -208,11 +208,11 @@ def __del__(self): raise RuntimeError('Cancellation manager deleted while containing items.') -def start_cancellable_process( - cmdline: List[str], - timeout: float, - cancellation_manager: Optional[WorkerCancellationManager], - want_output: bool = False) -> Optional[bytes]: +def start_cancellable_process(cmdline: list[str], + timeout: float, + cancellation_manager: WorkerCancellationManager + | None, + want_output: bool = False) -> bytes | None: """Start a cancellable process. Args: @@ -283,18 +283,18 @@ class CompilationResult: 2) The keys in reward stats are those in the keys field. 
""" - sequence_examples: dataclasses.InitVar[List[tf.train.SequenceExample]] - serialized_sequence_examples: List[str] = dataclasses.field(init=False) + sequence_examples: dataclasses.InitVar[list[tf.train.SequenceExample]] + serialized_sequence_examples: list[str] = dataclasses.field(init=False) length: int = dataclasses.field(init=False) - reward_stats: Dict[str, RewardStat] - rewards: List[float] - policy_rewards: List[float] - keys: List[str] + reward_stats: dict[str, RewardStat] + rewards: list[float] + policy_rewards: list[float] + keys: list[str] # The id of the model used to generate this compilation result - model_id: Optional[int] + model_id: int | None - def __post_init__(self, sequence_examples: List[tf.train.SequenceExample]): + def __post_init__(self, sequence_examples: list[tf.train.SequenceExample]): object.__setattr__(self, 'serialized_sequence_examples', [x.SerializeToString() for x in sequence_examples]) lengths = [ @@ -316,9 +316,9 @@ class CompilationRunnerStub(metaclass=abc.ABCMeta): def collect_data( self, loaded_module_spec: corpus.LoadedModuleSpec, - policy: Optional[policy_saver.Policy] = None, - reward_stat: Optional[Dict[str, RewardStat]] = None, - model_id: Optional[int] = None) -> WorkerFuture[CompilationResult]: + policy: policy_saver.Policy | None = None, + reward_stat: dict[str, RewardStat] | None = None, + model_id: int | None = None) -> WorkerFuture[CompilationResult]: raise NotImplementedError() @abc.abstractmethod @@ -360,12 +360,13 @@ def is_priority_method(cls, method_name: str) -> bool: 'cancel_all_work', 'enable', 'pause_all_work', 'resume_all_work' } - def __init__(self, - clang_path: Optional[str] = None, - launcher_path: Optional[str] = None, - moving_average_decay_rate: float = 1, - create_observer_fns: Optional[List[Callable[ - [], CompilationResultObserver]]] = None): + def __init__( + self, + clang_path: str | None = None, + launcher_path: str | None = None, + moving_average_decay_rate: float = 1, + create_observer_fns: list[Callable[[], CompilationResultObserver]] + | None = None): """Initialization of CompilationRunner class. Args: @@ -403,9 +404,9 @@ def resume_all_work(self): def collect_data(self, loaded_module_spec: corpus.LoadedModuleSpec, - policy: Optional[policy_saver.Policy] = None, - reward_stat: Optional[Dict[str, RewardStat]] = None, - model_id: Optional[int] = None) -> CompilationResult: + policy: policy_saver.Policy | None = None, + reward_stat: dict[str, RewardStat] | None = None, + model_id: int | None = None) -> CompilationResult: """Collect data for the given IR file and policy. Args: @@ -458,9 +459,8 @@ def collect_data(self, sequence_example = v[0] policy_reward = v[1] if k not in reward_stat: - raise ValueError( - (f'Example {k} does not exist under default policy for ' - f'cmd line: {final_cmd_line}')) + raise ValueError(f'Example {k} does not exist under default policy for ' + f'cmd line: {final_cmd_line}') default_reward = reward_stat[k].default_reward moving_average_reward = reward_stat[k].moving_average_reward sequence_example = _overwrite_trajectory_reward( @@ -492,7 +492,7 @@ def collect_data(self, def compile_fn( self, command_line: corpus.FullyQualifiedCmdLine, tf_policy_path: str, reward_only: bool, - workdir: str) -> Dict[str, Tuple[tf.train.SequenceExample, float]]: + workdir: str) -> dict[str, tuple[tf.train.SequenceExample, float]]: """Compiles for the given IR file under the given policy. 
Args: diff --git a/compiler_opt/rl/compilation_runner_test.py b/compiler_opt/rl/compilation_runner_test.py index 1cf78587..ad9b0461 100644 --- a/compiler_opt/rl/compilation_runner_test.py +++ b/compiler_opt/rl/compilation_runner_test.py @@ -91,10 +91,9 @@ def _mock_compile_fn(file_paths, tf_policy_path, reward_only, workdir): # pylin return {'default': (sequence_example, native_size)} -_mock_policy = policy_saver.Policy(bytes(), bytes()) +_mock_policy = policy_saver.Policy(b'', b'') -_mock_loaded_module_spec = corpus.LoadedModuleSpec( - name='dummy', loaded_ir=bytes()) +_mock_loaded_module_spec = corpus.LoadedModuleSpec(name='dummy', loaded_ir=b'') class CompilationRunnerTest(tf.test.TestCase): diff --git a/compiler_opt/rl/corpus.py b/compiler_opt/rl/corpus.py index cf8f4440..f9419dfa 100644 --- a/compiler_opt/rl/corpus.py +++ b/compiler_opt/rl/corpus.py @@ -19,7 +19,8 @@ from absl import logging from dataclasses import dataclass -from typing import Any, Callable, Dict, List, Optional, Tuple, Type +from typing import Any +from collections.abc import Callable import json import os @@ -29,14 +30,14 @@ # Alias to better self-document APIs. Represents a complete, ready to use # command line, where all the flags reference existing, local files. -FullyQualifiedCmdLine = Tuple[str, ...] +FullyQualifiedCmdLine = tuple[str, ...] def _apply_cmdline_filters( - orig_options: Tuple[str, ...], - additional_flags: Tuple[str, ...] = (), - delete_flags: Tuple[str, ...] = (), - replace_flags: Optional[Dict[str, str]] = None) -> Tuple[str]: + orig_options: tuple[str, ...], + additional_flags: tuple[str, ...] = (), + delete_flags: tuple[str, ...] = (), + replace_flags: dict[str, str] | None = None) -> tuple[str]: option_iterator = iter(orig_options) matched_replace_flags = set() replace_flags = replace_flags if replace_flags is not None else {} @@ -84,8 +85,8 @@ class LoadedModuleSpec: """ name: str loaded_ir: bytes - loaded_thinlto_index: Optional[bytes] = None - orig_options: Tuple[str, ...] = () + loaded_thinlto_index: bytes | None = None + orig_options: tuple[str, ...] = () def _create_files_and_get_context(self, local_dir: str): root_dir = os.path.join(local_dir, self.name) @@ -117,7 +118,7 @@ class ModuleSpec: """ name: str size: int - command_line: Tuple[str, ...] = () + command_line: tuple[str, ...] = () has_thinlto: bool = False @@ -125,7 +126,7 @@ class Sampler(metaclass=abc.ABCMeta): """Corpus sampler abstraction.""" @abc.abstractmethod - def __init__(self, module_specs: Tuple[ModuleSpec]): + def __init__(self, module_specs: tuple[ModuleSpec]): self._module_specs = module_specs @abc.abstractmethod @@ -133,7 +134,7 @@ def reset(self): pass @abc.abstractmethod - def __call__(self, k: int, n: int = 20) -> List[ModuleSpec]: + def __call__(self, k: int, n: int = 20) -> list[ModuleSpec]: """ Args: k: number of modules to sample @@ -147,14 +148,14 @@ class SamplerBucketRoundRobin(Sampler): round-robin order. 
The buckets are sequential sections of module_specs of roughly equal lengths.""" - def __init__(self, module_specs: Tuple[ModuleSpec]): + def __init__(self, module_specs: tuple[ModuleSpec]): self._ranges = {} super().__init__(module_specs) def reset(self): pass - def __call__(self, k: int, n: int = 20) -> List[ModuleSpec]: + def __call__(self, k: int, n: int = 20) -> list[ModuleSpec]: """ Args: module_specs: list of module_specs to sample from @@ -193,7 +194,7 @@ class CorpusExhaustedError(Exception): class SamplerWithoutReplacement(Sampler): """Randomly samples the corpus, without replacement.""" - def __init__(self, module_specs: Tuple[ModuleSpec]): + def __init__(self, module_specs: tuple[ModuleSpec]): super().__init__(module_specs) self._idx = 0 self._shuffle_order() @@ -206,7 +207,7 @@ def reset(self): self._shuffle_order() self._idx = 0 - def __call__(self, k: int, n: int = 10) -> List[ModuleSpec]: + def __call__(self, k: int, n: int = 10) -> list[ModuleSpec]: """ Args: k: number of modules to sample @@ -262,16 +263,16 @@ class Corpus: class ReplaceContext: """Context for 'replace' rules.""" module_full_path: str - thinlto_full_path: Optional[str] = None + thinlto_full_path: str | None = None def __init__(self, *, data_path: str, - module_filter: Optional[Callable[[str], bool]] = None, - additional_flags: Tuple[str, ...] = (), - delete_flags: Tuple[str, ...] = (), - replace_flags: Optional[Dict[str, str]] = None, - sampler_type: Type[Sampler] = SamplerBucketRoundRobin): + module_filter: Callable[[str], bool] | None = None, + additional_flags: tuple[str, ...] = (), + delete_flags: tuple[str, ...] = (), + replace_flags: dict[str, str] | None = None, + sampler_type: type[Sampler] = SamplerBucketRoundRobin): """ Prepares the corpus by pre-loading all the CorpusElements and preparing for sampling. Command line origin (.cmd file or override) is decided, and final @@ -297,7 +298,7 @@ def __init__(self, # {additional|delete}_flags here with tf.io.gfile.GFile( os.path.join(data_path, 'corpus_description.json'), 'r') as f: - corpus_description: Dict[str, Any] = json.load(f) + corpus_description: dict[str, Any] = json.load(f) module_paths = corpus_description['modules'] if len(module_paths) == 0: @@ -382,7 +383,7 @@ def get_cmdline(name: str): def reset(self): self._sampler.reset() - def sample(self, k: int, sort: bool = False) -> List[ModuleSpec]: + def sample(self, k: int, sort: bool = False) -> list[ModuleSpec]: """Samples `k` module_specs, optionally sorting by size descending. Use load_module_spec to get LoadedModuleSpecs - this allows the user to @@ -423,8 +424,8 @@ def __len__(self): def create_corpus_for_testing(location: str, - elements: List[ModuleSpec], - cmdline: Tuple[str, ...] = ('-cc1',), + elements: list[ModuleSpec], + cmdline: tuple[str, ...] 
= ('-cc1',), cmdline_is_override=False, is_thinlto=False, **kwargs) -> Corpus: diff --git a/compiler_opt/rl/data_collector.py b/compiler_opt/rl/data_collector.py index ac5412a1..062504d4 100644 --- a/compiler_opt/rl/data_collector.py +++ b/compiler_opt/rl/data_collector.py @@ -15,7 +15,7 @@ import abc import time -from typing import Dict, Iterator, Tuple, Sequence +from collections.abc import Iterator, Sequence import numpy as np from compiler_opt.rl import policy_saver @@ -36,7 +36,7 @@ 60, 70, 80, 90, 95, 99, 99.5, 99.9) -def build_distribution_monitor(data: Sequence[float]) -> Dict[str, float]: +def build_distribution_monitor(data: Sequence[float]) -> dict[str, float]: if not data: return {} quantiles = np.percentile(data, REWARD_QUANTILE_MONITOR, method='lower') @@ -53,7 +53,7 @@ class DataCollector(metaclass=abc.ABCMeta): @abc.abstractmethod def collect_data( self, policy: policy_saver.Policy, model_id: int - ) -> Tuple[Iterator[trajectory.Trajectory], Dict[str, Dict[str, float]]]: + ) -> tuple[Iterator[trajectory.Trajectory], dict[str, dict[str, float]]]: """Collect data for a given policy. Args: @@ -85,7 +85,7 @@ def __init__( self, # pylint: disable=dangerous-default-value num_modules: int, deadline: float = DEADLINE_IN_SECONDS, - thresholds: Tuple[Tuple[float, float], ...] = WAIT_TERMINATION): + thresholds: tuple[tuple[float, float], ...] = WAIT_TERMINATION): """Initializes the early exit checker. Args: diff --git a/compiler_opt/rl/data_reader.py b/compiler_opt/rl/data_reader.py index 34e1eb8d..bf33b83c 100644 --- a/compiler_opt/rl/data_reader.py +++ b/compiler_opt/rl/data_reader.py @@ -13,7 +13,7 @@ # limitations under the License. """util function to create training datasets.""" -from typing import Callable, List +from collections.abc import Callable import tensorflow as tf from tf_agents.trajectories import trajectory @@ -87,7 +87,7 @@ def _parser_fn(serialized_proto): def create_flat_sequence_example_dataset_fn( agent_cfg: agent_config.AgentConfig -) -> Callable[[List[str]], tf.data.Dataset]: +) -> Callable[[list[str]], tf.data.Dataset]: """Get a function that creates a dataset from serialized sequence examples. The dataset is "flat" insofar as it does not batch for sequence length nor @@ -124,7 +124,7 @@ def _sequence_example_dataset_fn(sequence_examples): def create_sequence_example_dataset_fn( agent_cfg: agent_config.AgentConfig, batch_size: int, - train_sequence_length: int) -> Callable[[List[str]], tf.data.Dataset]: + train_sequence_length: int) -> Callable[[list[str]], tf.data.Dataset]: """Get a function that creates a dataset from serialized sequence examples. Args: @@ -163,7 +163,7 @@ def create_file_dataset_fn( agent_cfg: agent_config.AgentConfig, batch_size: int, train_sequence_length: int, - input_dataset) -> Callable[[List[str]], tf.data.Dataset]: + input_dataset) -> Callable[[list[str]], tf.data.Dataset]: """Get a function that creates an dataset from files. Args: @@ -214,7 +214,7 @@ def _file_dataset_fn(data_path): def create_tfrecord_dataset_fn( agent_cfg: agent_config.AgentConfig, batch_size: int, - train_sequence_length: int) -> Callable[[List[str]], tf.data.Dataset]: + train_sequence_length: int) -> Callable[[list[str]], tf.data.Dataset]: """Get a function that creates an dataset from tfrecord. Args: diff --git a/compiler_opt/rl/distributed/agent.py b/compiler_opt/rl/distributed/agent.py index f616f43a..c87bc2f1 100644 --- a/compiler_opt/rl/distributed/agent.py +++ b/compiler_opt/rl/distributed/agent.py @@ -13,8 +13,6 @@ # limitations under the License. 
"""PPO agent definition and utility functions.""" -from typing import Optional, Tuple - from absl import logging import tensorflow as tf @@ -51,22 +49,22 @@ class MLGOPPOAgent(ppo_agent.PPOAgent): def __init__(self, time_step_spec: ts.TimeStep, action_spec: types.NestedTensorSpec, - optimizer: Optional[types.Optimizer] = None, - actor_net: Optional[network.Network] = None, - value_net: Optional[network.Network] = None, + optimizer: types.Optimizer | None = None, + actor_net: network.Network | None = None, + value_net: network.Network | None = None, importance_ratio_clipping: types.Float = 0.2, discount_factor: types.Float = 1.0, entropy_regularization: types.Float = 0.01, value_pred_loss_coef: types.Float = 0.5, - gradient_clipping: Optional[types.Float] = 1.0, - value_clipping: Optional[types.Float] = None, + gradient_clipping: types.Float | None = 1.0, + value_clipping: types.Float | None = None, check_numerics: bool = False, debug_summaries: bool = False, summarize_grads_and_vars: bool = False, - train_step_counter: Optional[tf.Variable] = None, + train_step_counter: tf.Variable | None = None, aggregate_losses_across_replicas=False, loss_scaling_factor=1., - name: Optional[str] = 'PPOClipAgent'): + name: str | None = 'PPOClipAgent'): """Creates a PPO Agent implementing the clipped probability ratios. Args: @@ -153,7 +151,7 @@ def __init__(self, def compute_return_and_advantage( self, next_time_steps: ts.TimeStep, - value_preds: types.Tensor) -> Tuple[types.Tensor, types.Tensor]: + value_preds: types.Tensor) -> tuple[types.Tensor, types.Tensor]: """Compute the Monte Carlo return and advantage. Args: diff --git a/compiler_opt/rl/distributed/learner.py b/compiler_opt/rl/distributed/learner.py index 08998a2e..0145e93b 100644 --- a/compiler_opt/rl/distributed/learner.py +++ b/compiler_opt/rl/distributed/learner.py @@ -13,7 +13,7 @@ # limitations under the License. """Utility to create MLGO policy learner.""" -from typing import Callable, List, Optional, Tuple +from collections.abc import Callable from absl import logging @@ -25,11 +25,11 @@ # A function which processes a tuple of a nested tensor representing a TF-Agent # Trajectory and Reverb SampleInfo. -_SequenceParamsType = Tuple[types.NestedTensor, types.ReverbSampleInfo] +_SequenceParamsType = tuple[types.NestedTensor, types.ReverbSampleInfo] _SequenceFnType = Callable[[_SequenceParamsType], _SequenceParamsType] -class MLGOPPOLearner(object): +class MLGOPPOLearner: """Manages all the learning details needed. These include: @@ -56,12 +56,11 @@ def __init__(self, minibatch_size: int, shuffle_buffer_size: int, num_epochs: int = 1, - triggers: Optional[List[ - interval_trigger.IntervalTrigger]] = None, + triggers: list[interval_trigger.IntervalTrigger] | None = None, checkpoint_interval: int = 100000, summary_interval: int = 1000, - strategy: Optional[tf.distribute.Strategy] = None, - per_sequence_fn: Optional[_SequenceFnType] = None, + strategy: tf.distribute.Strategy | None = None, + per_sequence_fn: _SequenceFnType | None = None, allow_variable_length_episodes: bool = False) -> None: """Initializes a MLGOPPOLearner instance. 
diff --git a/compiler_opt/rl/distributed/ppo_collect_lib.py b/compiler_opt/rl/distributed/ppo_collect_lib.py index 828e71d9..f38469f6 100644 --- a/compiler_opt/rl/distributed/ppo_collect_lib.py +++ b/compiler_opt/rl/distributed/ppo_collect_lib.py @@ -15,7 +15,6 @@ import collections import os -from typing import List, Optional import tempfile import functools @@ -102,7 +101,7 @@ def observe(self, result: compilation_runner.CompilationResult) -> None: def collect(corpus_path: str, replay_buffer_server_address: str, - variable_container_server_address: str, num_workers: Optional[int], + variable_container_server_address: str, num_workers: int | None, worker_manager_class, sequence_length: int) -> None: """Collects experience using a policy updated after every episode. @@ -154,7 +153,7 @@ def collect(corpus_path: str, replay_buffer_server_address: str, dataset_fn = data_reader.create_flat_sequence_example_dataset_fn( agent_cfg=agent_cfg) - def sequence_example_iterator_fn(seq_ex: List[str]): + def sequence_example_iterator_fn(seq_ex: list[str]): return iter(dataset_fn(seq_ex).prefetch(tf.data.AUTOTUNE)) cps = corpus.Corpus( @@ -195,9 +194,8 @@ def sequence_example_iterator_fn(seq_ex: List[str]): @gin.configurable def run_collect(root_dir: str, corpus_path: str, replay_buffer_server_address: str, - variable_container_server_address: str, - num_workers: Optional[int], worker_manager_class, - sequence_length: int): + variable_container_server_address: str, num_workers: int | None, + worker_manager_class, sequence_length: int): """Collects experience using a policy updated after every episode. Waits for a policy to be saved in root_dir before beginning collection. diff --git a/compiler_opt/rl/distributed/ppo_eval_lib.py b/compiler_opt/rl/distributed/ppo_eval_lib.py index 3816ec6c..8205eb21 100644 --- a/compiler_opt/rl/distributed/ppo_eval_lib.py +++ b/compiler_opt/rl/distributed/ppo_eval_lib.py @@ -17,7 +17,6 @@ import collections import os import time -from typing import List, Optional from absl import logging @@ -39,7 +38,7 @@ def evaluate(root_dir: str, corpus_path: str, - variable_container_server_address: str, num_workers: Optional[int], + variable_container_server_address: str, num_workers: int | None, worker_manager_class): """Evaluate a given policy on the given corpus. @@ -86,7 +85,7 @@ def evaluate(root_dir: str, corpus_path: str, dataset_fn = data_reader.create_flat_sequence_example_dataset_fn( agent_cfg=agent_cfg) - def sequence_example_iterator_fn(seq_ex: List[str]): + def sequence_example_iterator_fn(seq_ex: list[str]): return iter(dataset_fn(seq_ex).prefetch(tf.data.AUTOTUNE)) cps = corpus.Corpus( @@ -176,7 +175,7 @@ def sequence_example_iterator_fn(seq_ex: List[str]): def run_evaluate(root_dir: str, corpus_path: str, variable_container_server_address: str, - num_workers: Optional[int], worker_manager_class): + num_workers: int | None, worker_manager_class): """Wait for the collect policy to be ready and run collect job.""" # Wait for the collect policy to become available, then load it. 
policy_dir = os.path.join(root_dir, learner.POLICY_SAVED_MODEL_DIR, diff --git a/compiler_opt/rl/env.py b/compiler_opt/rl/env.py index a858f25f..de4ca4a6 100644 --- a/compiler_opt/rl/env.py +++ b/compiler_opt/rl/env.py @@ -23,7 +23,7 @@ import contextlib import io import os -from typing import Callable, Generator, List, Optional, Tuple, Type +from collections.abc import Callable, Generator import numpy as np @@ -40,14 +40,14 @@ class StepType(Enum): @dataclasses.dataclass class TimeStep: - obs: Optional[dict[str, np.NDArray]] - reward: Optional[dict[str, float]] - score_policy: Optional[dict[str, float]] - score_default: Optional[dict[str, float]] - context: Optional[str] + obs: dict[str, np.NDArray] | None + reward: dict[str, float] | None + score_policy: dict[str, float] | None + score_default: dict[str, float] | None + context: str | None module_name: str working_dir: str - obs_id: Optional[int] + obs_id: int | None step_type: StepType @@ -67,9 +67,9 @@ class MLGOTask(metaclass=abc.ABCMeta): """ @abc.abstractmethod - def get_cmdline(self, clang_path: str, base_args: List[str], - interactive_base_path: Optional[str], - working_dir: str) -> List[str]: + def get_cmdline(self, clang_path: str, base_args: list[str], + interactive_base_path: str | None, + working_dir: str) -> list[str]: """Get the cmdline for building with this task. The resulting list[str] should be able to be passed to subprocess.run to @@ -122,7 +122,7 @@ def __init__(self, proc: subprocess.Popen, self._module_name = module_name self._working_dir = working_dir - def get_scores(self, timeout: Optional[int] = None): + def get_scores(self, timeout: int | None = None): self._proc.wait(timeout=timeout) return self._get_scores_fn() @@ -222,9 +222,9 @@ def _reward_fn(a: float, b: float) -> float: def clang_session( clang_path: str, module: corpus.LoadedModuleSpec, - task_type: Type[MLGOTask], + task_type: type[MLGOTask], *, - explicit_temps_dir: Optional[str] = None, + explicit_temps_dir: str | None = None, interactive: bool, ): """Context manager for clang session. @@ -292,11 +292,11 @@ def _get_scores() -> dict[str, float]: def _get_clang_generator( clang_path: str, - task_type: Type[MLGOTask], - explicit_temps_dir: Optional[str] = None, + task_type: type[MLGOTask], + explicit_temps_dir: str | None = None, interactive_only: bool = False, -) -> Generator[Optional[Tuple[ClangProcess, InteractiveClang]], - Optional[corpus.LoadedModuleSpec], None]: +) -> Generator[tuple[ClangProcess, InteractiveClang] | None, + corpus.LoadedModuleSpec | None, None]: """Returns a tuple of generators for creating InteractiveClang objects. 
Args: @@ -351,10 +351,10 @@ def __init__( self, *, clang_path: str, - task_type: Type[MLGOTask], + task_type: type[MLGOTask], obs_spec, action_spec, - explicit_temps_dir: Optional[str] = None, + explicit_temps_dir: str | None = None, interactive_only: bool = False, ): self._clang_generator = _get_clang_generator( @@ -365,8 +365,8 @@ def __init__( self._obs_spec = obs_spec self._action_spec = action_spec - self._iclang: Optional[InteractiveClang] = None - self._clang: Optional[ClangProcess] = None + self._iclang: InteractiveClang | None = None + self._clang: ClangProcess | None = None @property def obs_spec(self): diff --git a/compiler_opt/rl/env_test.py b/compiler_opt/rl/env_test.py index c7f499d7..24f6ec37 100644 --- a/compiler_opt/rl/env_test.py +++ b/compiler_opt/rl/env_test.py @@ -22,8 +22,6 @@ import tempfile from absl.testing import flagsaver -from typing import Dict, List, Optional - import tensorflow as tf import numpy as np @@ -45,9 +43,9 @@ class MockTask(env.MLGOTask): """Implementation of mock task for testing.""" - def get_cmdline(self, clang_path: str, base_args: List[str], - interactive_base_path: Optional[str], - working_dir: str) -> List[str]: + def get_cmdline(self, clang_path: str, base_args: list[str], + interactive_base_path: str | None, + working_dir: str) -> list[str]: if interactive_base_path: interactive_args = [ f'--interactive={interactive_base_path}', @@ -56,7 +54,7 @@ def get_cmdline(self, clang_path: str, base_args: List[str], interactive_args = [] return [clang_path] + base_args + interactive_args - def get_module_scores(self, working_dir: str) -> Dict[str, float]: + def get_module_scores(self, working_dir: str) -> dict[str, float]: return {'default': 47} diff --git a/compiler_opt/rl/feature_ops.py b/compiler_opt/rl/feature_ops.py index 6d39515a..94422631 100644 --- a/compiler_opt/rl/feature_ops.py +++ b/compiler_opt/rl/feature_ops.py @@ -16,7 +16,7 @@ import os import re -from typing import List, Callable, Optional +from collections.abc import Callable import numpy as np import tensorflow.compat.v2 as tf @@ -50,12 +50,12 @@ def identity_fn(obs: types.Float): return tf.cast(tf.expand_dims(obs, -1), tf.float32) -def get_normalize_fn(quantile: List[float], +def get_normalize_fn(quantile: list[float], with_sqrt: bool, with_z_score_normalization: bool, eps: float = 1e-8, - preprocessing_fn: Optional[Callable[[types.Tensor], - types.Float]] = None): + preprocessing_fn: Callable[[types.Tensor], types.Float] + | None = None): """Return a normalization function to normalize the input feature.""" if not preprocessing_fn: diff --git a/compiler_opt/rl/imitation_learning/generate_bc_trajectories_lib.py b/compiler_opt/rl/imitation_learning/generate_bc_trajectories_lib.py index 1abd2052..a5868841 100644 --- a/compiler_opt/rl/imitation_learning/generate_bc_trajectories_lib.py +++ b/compiler_opt/rl/imitation_learning/generate_bc_trajectories_lib.py @@ -16,7 +16,8 @@ import concurrent.futures import contextlib import gin -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Generator, Union +from typing import Any +from collections.abc import Callable, Generator import json from absl import flags @@ -53,7 +54,7 @@ FLAGS = flags.FLAGS -ProfilingDictValueType = Dict[str, Union[str, float, int]] +ProfilingDictValueType = dict[str, str | float | int] @dataclasses.dataclass @@ -140,7 +141,7 @@ def add_string_feature( def add_feature_list(seq_example: tf.train.SequenceExample, - feature_list: List[Any], feature_name: str): + feature_list: list[Any], feature_name: 
str): """Add the feature_list to the sequence example under feature name. Args: @@ -153,12 +154,12 @@ def add_feature_list(seq_example: tf.train.SequenceExample, np.dtype(np.float32), str, ]): - raise AssertionError((f'Unsupported type for feature {feature_name}' - f' of type {type(feature_list[0])}. ' - 'Supported types are np.int64, np.float32, str')) + raise AssertionError(f'Unsupported type for feature {feature_name}' + f' of type {type(feature_list[0])}. ' + 'Supported types are np.int64, np.float32, str') if isinstance(feature_list[0], np.float32): add_function = add_float_feature - elif isinstance(feature_list[0], (int, np.int64)): + elif isinstance(feature_list[0], int | np.int64): add_function = add_int_feature else: add_function = add_string_feature @@ -187,7 +188,7 @@ def wrap_function(*args, **kwargs): def policy_distr_wrapper( tf_policy: policies.TFPolicy -) -> Callable[[time_step.TimeStep, Optional[tf_types.NestedTensor]], +) -> Callable[[time_step.TimeStep, tf_types.NestedTensor | None], policy_step.PolicyStep]: """Return a wrapper for a loaded tf policy distribution. @@ -233,21 +234,21 @@ class ExplorationWithPolicy: def __init__( self, - replay_prefix: List[np.ndarray], + replay_prefix: list[np.ndarray], policy: Callable[[time_step.TimeStep], np.ndarray], explore_policy: Callable[[time_step.TimeStep], policy_step.PolicyStep], - explore_on_features: Optional[Dict[str, Callable[[tf.Tensor], - bool]]] = None, + explore_on_features: dict[str, Callable[[tf.Tensor], bool]] | None = None, ): self._explore_step: int = len(replay_prefix) - 1 - self._explore_state: Optional[time_step.TimeStep] = None + self._explore_state: time_step.TimeStep | None = None self._replay_prefix = replay_prefix self._policy = policy self._explore_policy = explore_policy self._curr_step = 0 self._gap = np.inf - self._explore_on_features: Optional[Dict[str, Callable[ - [tf.Tensor], bool]]] = explore_on_features + self._explore_on_features: dict[str, + Callable[[tf.Tensor], + bool]] | None = explore_on_features self._stop_exploration = False def _compute_gap(self, distr: np.ndarray) -> np.float32: @@ -259,7 +260,7 @@ def _compute_gap(self, distr: np.ndarray) -> np.float32: def get_explore_step(self) -> int: return self._explore_step - def get_explore_state(self) -> Optional[time_step.TimeStep]: + def get_explore_state(self) -> time_step.TimeStep | None: return self._explore_state def get_advice(self, state: time_step.TimeStep) -> np.ndarray: @@ -318,18 +319,15 @@ def __init__( self, loaded_module_spec: corpus.LoadedModuleSpec, clang_path: str, - mlgo_task_type: Type[env.MLGOTask], + mlgo_task_type: type[env.MLGOTask], exploration_frac: float = 1.0, max_exploration_steps: int = 10, max_horizon_to_explore=np.inf, - explore_on_features: Optional[Dict[str, Callable[[tf.Tensor], - bool]]] = None, - obs_action_specs: Optional[Tuple[ - time_step.TimeStep, - tensor_spec.BoundedTensorSpec, - ]] = None, + explore_on_features: dict[str, Callable[[tf.Tensor], bool]] | None = None, + obs_action_specs: tuple[time_step.TimeStep, tensor_spec.BoundedTensorSpec] + | None = None, reward_key: str = '', - explicit_temps_dir: Optional[str] = None, + explicit_temps_dir: str | None = None, **kwargs, ): self._loaded_module_spec = loaded_module_spec @@ -361,8 +359,8 @@ def __init__( if self._env.action_spec: if self._env.action_spec.dtype != tf.int64: raise TypeError( - ('Environment action_spec type ' - f'{self._env.action_spec.dtype} does not match tf.int64')) + 'Environment action_spec type ' + f'{self._env.action_spec.dtype} 
does not match tf.int64') self._exploration_frac = exploration_frac self._max_exploration_steps = max_exploration_steps self._max_horizon_to_explore = max_horizon_to_explore @@ -371,7 +369,7 @@ def __init__( def compile_module( self, - policy: Callable[[Optional[time_step.TimeStep]], np.ndarray], + policy: Callable[[time_step.TimeStep | None], np.ndarray], ) -> tf.train.SequenceExample: """Compiles the module with the given policy and outputs a seq. example. @@ -411,11 +409,11 @@ def compile_module( working_dir_head = os.path.split(self._working_dir)[0] shutil.rmtree(working_dir_head) if horizon <= 0: - raise ValueError(('Policy did not take any inlining decision for module ' - f'{self._loaded_module_spec.name}.')) + raise ValueError('Policy did not take any inlining decision for module ' + f'{self._loaded_module_spec.name}.') if curr_obs_dict.step_type != env.StepType.LAST: - raise ValueError(('Compilation loop exited at step type' - f'{curr_obs_dict.step_type} before last step')) + raise ValueError('Compilation loop exited at step type' + f'{curr_obs_dict.step_type} before last step') reward = curr_obs_dict.score_policy[self._reward_key] reward_list = np.float32(reward) * np.float32(np.ones(horizon)) add_feature_list(sequence_example, reward_list, @@ -427,10 +425,10 @@ def compile_module( def explore_function( self, - policy: Callable[[Optional[time_step.TimeStep]], np.ndarray], - explore_policy: Optional[Callable[[time_step.TimeStep], - policy_step.PolicyStep]] = None, - ) -> Tuple[List[tf.train.SequenceExample], List[str], int, float]: + policy: Callable[[time_step.TimeStep | None], np.ndarray], + explore_policy: Callable[[time_step.TimeStep], policy_step.PolicyStep] + | None = None, + ) -> tuple[list[tf.train.SequenceExample], list[str], int, float]: """Explores the module using the given policy and the exploration distr. Args: @@ -514,13 +512,13 @@ def explore_function( def explore_at_state_generator( self, - replay_prefix: List[np.ndarray], + replay_prefix: list[np.ndarray], explore_step: int, explore_state: time_step.TimeStep, - policy: Callable[[Optional[time_step.TimeStep]], np.ndarray], + policy: Callable[[time_step.TimeStep | None], np.ndarray], explore_policy: Callable[[time_step.TimeStep], policy_step.PolicyStep], num_samples: int = 1, - ) -> Generator[Tuple[tf.train.SequenceExample, ExplorationWithPolicy], None, + ) -> Generator[tuple[tf.train.SequenceExample, ExplorationWithPolicy], None, None]: """Generate sequence examples and next exploration policy while exploring. 
@@ -592,16 +590,16 @@ def _process_obs(self, curr_obs, sequence_example): else: if curr_obs_feature_name not in self._env.obs_spec.keys(): raise AssertionError( - (f'Feature name {curr_obs_feature_name} not in obs_spec {1}' - f'{self._env.obs_spec.keys()}')) + f'Feature name {curr_obs_feature_name} not in obs_spec {1}' + f'{self._env.obs_spec.keys()}') if curr_obs_feature_name in [ SequenceExampleFeatureNames.action, SequenceExampleFeatureNames.reward, SequenceExampleFeatureNames.module_name ]: raise AssertionError( - (f'Feature name {curr_obs_feature_name} part of ' - f'SequenceExampleFeatureNames {self._env.obs_spec.keys()}')) + f'Feature name {curr_obs_feature_name} part of ' + f'SequenceExampleFeatureNames {self._env.obs_spec.keys()}') obs_dtype = self._env.obs_spec[curr_obs_feature_name].dtype curr_obs_feature = curr_obs[curr_obs_feature_name] curr_obs[curr_obs_feature_name] = tf.convert_to_tensor( @@ -613,11 +611,11 @@ def _process_obs(self, curr_obs, sequence_example): class ModuleWorkerResultProcessor: """Utility class to process ModuleExplorer results for ModuleWorker.""" - def __init__(self, persistent_objects_path: Optional[str] = None): + def __init__(self, persistent_objects_path: str | None = None): self._persistent_objects_path = persistent_objects_path def _partition_for_loss(self, seq_example: tf.train.SequenceExample, - partitions: List[float], label_name: str): + partitions: list[float], label_name: str): """Adds a feature to seq_example to partition the examples into buckets. Given a tuple of partition limits (a_1, a_2, ..., a_n) we create n+1 @@ -638,11 +636,11 @@ def _partition_for_loss(self, seq_example: tf.train.SequenceExample, def process_succeeded( self, - succeeded: List[Tuple[List, List[str], int, float]], + succeeded: list[tuple[list, list[str], int, float]], spec_name: str, - partitions: List[float], + partitions: list[float], label_name: str = SequenceExampleFeatureNames.label_name - ) -> Tuple[tf.train.SequenceExample, ProfilingDictValueType, + ) -> tuple[tf.train.SequenceExample, ProfilingDictValueType, ProfilingDictValueType]: seq_example_list = [exploration_res[0] for exploration_res in succeeded] working_dir_list = [(exploration_res[1], exploration_res[2]) @@ -749,22 +747,19 @@ class ModuleWorker(worker.Worker): def __init__( # pylint: disable=dangerous-default-value self, - module_explorer_type: Type[ModuleExplorer] = ModuleExplorer, + module_explorer_type: type[ModuleExplorer] = ModuleExplorer, clang_path: str = gin.REQUIRED, - mlgo_task_type: Type[env.MLGOTask] = gin.REQUIRED, - policy_paths: List[Optional[str]] = [], + mlgo_task_type: type[env.MLGOTask] = gin.REQUIRED, + policy_paths: list[str | None] = [], exploration_frac: float = 1.0, max_exploration_steps: int = 7, - callable_policies: List[Optional[Callable[[Any], np.ndarray]]] = [], - exploration_policy_paths: Optional[str] = None, - explore_on_features: Optional[Dict[str, Callable[[tf.Tensor], - bool]]] = None, - obs_action_specs: Optional[Tuple[ - time_step.TimeStep, - tensor_spec.BoundedTensorSpec, - ]] = None, - persistent_objects_path: Optional[str] = None, - partitions: List[float] = [ + callable_policies: list[Callable[[Any], np.ndarray] | None] = [], + exploration_policy_paths: str | None = None, + explore_on_features: dict[str, Callable[[tf.Tensor], bool]] | None = None, + obs_action_specs: tuple[time_step.TimeStep, tensor_spec.BoundedTensorSpec] + | None = None, + persistent_objects_path: str | None = None, + partitions: list[float] = [ 0., ], **envargs, @@ -773,23 +768,24 @@ def 
__init__( raise AssertionError("""At least one policy needs to be specified in policy paths or callable_policies""") logging.info('Environment args: %s', envargs) - self._module_explorer_type: Type[ModuleExplorer] = module_explorer_type + self._module_explorer_type: type[ModuleExplorer] = module_explorer_type self._clang_path: str = clang_path - self._mlgo_task_type: Type[env.MLGOTask] = mlgo_task_type - self._policy_paths: List[Optional[str]] = policy_paths - self._exploration_policy_paths: Optional[str] = exploration_policy_paths + self._mlgo_task_type: type[env.MLGOTask] = mlgo_task_type + self._policy_paths: list[str | None] = policy_paths + self._exploration_policy_paths: str | None = exploration_policy_paths self._exploration_frac: float = exploration_frac self._max_exploration_steps: int = max_exploration_steps - self._tf_policy_action: List[Optional[Callable[[Any], np.ndarray]]] = [] - self._exploration_policy_distrs: List[Optional[Callable[ - [time_step.TimeStep, Optional[tf_types.NestedTensor]], - policy_step.PolicyStep]]] = [ + self._tf_policy_action: list[Callable[[Any], np.ndarray] | None] = [] + self._exploration_policy_distrs: list[Callable[ + [time_step.TimeStep, tf_types.NestedTensor | None], + policy_step.PolicyStep] | None] = [ None for _ in range(len(policy_paths) + len(callable_policies)) ] - self._explore_on_features: Optional[Dict[str, Callable[ - [tf.Tensor], bool]]] = explore_on_features - self._obs_action_specs: Optional[Tuple[ - time_step.TimeStep, tensor_spec.BoundedTensorSpec]] = obs_action_specs + self._explore_on_features: dict[str, + Callable[[tf.Tensor], + bool]] | None = explore_on_features + self._obs_action_specs: tuple[time_step.TimeStep, tensor_spec + .BoundedTensorSpec] | None = obs_action_specs self._mw_utility = ModuleWorkerResultProcessor(persistent_objects_path) self._persistent_objects_path = persistent_objects_path self._partitions = partitions @@ -804,9 +800,9 @@ def __init__( if len(exploration_policy_paths) > (len(policy_paths) + len(callable_policies)): raise AssertionError( - (f'Number of exploration policies: {len(exploration_policy_paths)},' - 'greater than number of policies: ' - f'{len(policy_paths) + len(callable_policies)}')) + f'Number of exploration policies: {len(exploration_policy_paths)},' + 'greater than number of policies: ' + f'{len(policy_paths) + len(callable_policies)}') self._exploration_policy_distrs = [] for exploration_policy_path in exploration_policy_paths: expl_policy = tf.saved_model.load( @@ -828,7 +824,7 @@ def __init__( def select_best_exploration( self, loaded_module_spec: corpus.LoadedModuleSpec, - ) -> Tuple[Tuple[int, ProfilingDictValueType, ProfilingDictValueType], + ) -> tuple[tuple[int, ProfilingDictValueType, ProfilingDictValueType], tf.train.SequenceExample]: num_calls = len(self._tf_policy_action) @@ -890,18 +886,17 @@ def select_best_exploration( def gen_trajectories( # pylint: disable=dangerous-default-value data_path: str = gin.REQUIRED, - delete_flags: Tuple[str, ...] = (), + delete_flags: tuple[str, ...] 
= (), output_file_name: str = gin.REQUIRED, output_path: str = gin.REQUIRED, - mlgo_task_type: Type[env.MLGOTask] = gin.REQUIRED, - callable_policies: List[Optional[Callable[[Any], np.ndarray]]] = [], - explore_on_features: Optional[Dict[str, Callable[[tf.Tensor], - bool]]] = None, - obs_action_spec: Optional[Tuple[time_step.TimeStep, - tensor_spec.BoundedTensorSpec]] = None, - num_workers: Optional[int] = None, + mlgo_task_type: type[env.MLGOTask] = gin.REQUIRED, + callable_policies: list[Callable[[Any], np.ndarray] | None] = [], + explore_on_features: dict[str, Callable[[tf.Tensor], bool]] | None = None, + obs_action_spec: tuple[time_step.TimeStep, tensor_spec.BoundedTensorSpec] + | None = None, + num_workers: int | None = None, num_output_files: int = 1, - profiling_file_path: Optional[str] = None, + profiling_file_path: str | None = None, worker_wait_sec: int = 100, worker_class_type=ModuleWorker, worker_manager_class=local_worker_manager.LocalWorkerPoolManager, @@ -953,8 +948,8 @@ def gen_trajectories( total_successful_examples = 0 total_work = len(corpus_elements) total_failed_examples = 0 - total_profiles_max: List[Optional[ProfilingDictValueType]] = [] - total_profiles_pol: List[Optional[ProfilingDictValueType]] = [] + total_profiles_max: list[ProfilingDictValueType | None] = [] + total_profiles_pol: list[ProfilingDictValueType | None] = [] size_per_file = total_work // num_output_files worker_count = ( diff --git a/compiler_opt/rl/imitation_learning/generate_bc_trajectories_test.py b/compiler_opt/rl/imitation_learning/generate_bc_trajectories_test.py index d4238b1c..f988c828 100644 --- a/compiler_opt/rl/imitation_learning/generate_bc_trajectories_test.py +++ b/compiler_opt/rl/imitation_learning/generate_bc_trajectories_test.py @@ -17,7 +17,6 @@ from absl import app import gin import json -from typing import List from unittest import mock import os @@ -40,7 +39,7 @@ _eps = 1e-5 -def _get_state_list() -> List[time_step.TimeStep]: +def _get_state_list() -> list[time_step.TimeStep]: state_0 = time_step.TimeStep( discount=tf.constant(np.array([0.]), dtype=tf.float32), diff --git a/compiler_opt/rl/imitation_learning/weighted_bc_trainer_lib.py b/compiler_opt/rl/imitation_learning/weighted_bc_trainer_lib.py index c72d23d8..8e0cca2e 100644 --- a/compiler_opt/rl/imitation_learning/weighted_bc_trainer_lib.py +++ b/compiler_opt/rl/imitation_learning/weighted_bc_trainer_lib.py @@ -15,8 +15,6 @@ from absl import flags -from typing import List, Optional - import bisect import copy import gin @@ -65,18 +63,18 @@ def __init__( # pylint: disable=dangerous-default-value self, partitions: list[float] = [0.], - weights: Optional[np.ndarray] = None): + weights: np.ndarray | None = None): self._weights = weights if not weights: self._weights = np.ones(len(partitions) + 1) self._probs: np.ndarray = np.exp(self._weights) / np.sum( np.exp(self._weights)) - self._partitions: List[float] = partitions + self._partitions: list[float] = partitions self._round: int = 1 def _bucket_by_feature( - self, data: List[ProfilingDictValueType], - feature_name: str) -> List[List[ProfilingDictValueType]]: + self, data: list[ProfilingDictValueType], + feature_name: str) -> list[list[ProfilingDictValueType]]: """Partitions the profiles according to the feature name. 
Partitions the profiles according to the feature name and the @@ -112,9 +110,9 @@ def _get_exp_gradient_step(self, loss, step_size) -> np.ndarray: return np.exp(self._weights) / np.sum(np.exp(self._weights)) def create_new_profile(self, - data_comparator: List[ProfilingDictValueType], - data_eval: List[ProfilingDictValueType], - eps: float = 1e-5) -> List[ProfilingDictValueType]: + data_comparator: list[ProfilingDictValueType], + data_eval: list[ProfilingDictValueType], + eps: float = 1e-5) -> list[ProfilingDictValueType]: """Create a new profile which contains the regret and relative reward. The regret is measured as the difference between the loss of the data_eval @@ -142,12 +140,12 @@ def create_new_profile(self, logging.error('KeyError: %s', k) continue if isinstance(prof[SequenceExampleFeatureNames.loss], str): - raise ValueError(('prof[SequenceExampleFeatureNames.loss] is a string' - 'but it should be numeric.')) + raise ValueError('prof[SequenceExampleFeatureNames.loss] is a string' + 'but it should be numeric.') if isinstance(new_prof[SequenceExampleFeatureNames.loss], str): raise ValueError( - ('new_prof[SequenceExampleFeatureNames.loss] is a string' - 'but it should be numeric.')) + 'new_prof[SequenceExampleFeatureNames.loss] is a string' + 'but it should be numeric.') new_prof[SequenceExampleFeatureNames .regret] = new_prof[SequenceExampleFeatureNames.loss] - prof[ SequenceExampleFeatureNames.loss] @@ -158,8 +156,8 @@ def create_new_profile(self, return list(func_key_dict.values()) def update_weights( - self, comparator_profile: List[ProfilingDictValueType], - policy_profile: List[ProfilingDictValueType]) -> np.ndarray: + self, comparator_profile: list[ProfilingDictValueType], + policy_profile: list[ProfilingDictValueType]) -> np.ndarray: """Constructs a new profile and uses the loss to update self._probs with EG. Args: @@ -209,13 +207,12 @@ def __init__( batch_size: int = 128, epochs: int = 1, log_interval: int = 1000, - optimizer: Optional[keras.optimizers.Optimizer] = None, - save_model_dir: Optional[str] = None, + optimizer: keras.optimizers.Optimizer | None = None, + save_model_dir: str | None = None, shuffle_size: int = 131072, - training_weights: Optional[TrainingWeights] = None, - features_to_remove: Optional[List[str]] = [ - 'policy_label', 'inlining_default' - ]): + training_weights: TrainingWeights | None = None, + features_to_remove: list[str] + | None = ['policy_label', 'inlining_default']): self._width = width self._layers = layers self._batch_size = batch_size @@ -305,7 +302,7 @@ def _make_feature_label(self, parsed_example, num_processors): logging.warning('Feature %s is nan', name) return tf.concat(concat_arr, -1), tf.concat([label, weight_label], -1) - def load_dataset(self, filepaths: List[str]) -> tf.data.TFRecordDataset: + def load_dataset(self, filepaths: list[str]) -> tf.data.TFRecordDataset: """Load datasets from specified filepaths for training. 
Args: @@ -380,7 +377,7 @@ def _train_step(self, example, label, weight_labels, weights_arr): self._update_metrics(y_true, y_pred, loss_value, weights) return loss_value - def train(self, filepaths: List[str]): + def train(self, filepaths: list[str]): """Train the model for number of the specified number of epochs.""" dataset = self.load_dataset(filepaths) logging.info('Datasets loaded from %s', str(filepaths)) @@ -427,7 +424,7 @@ def __init__( self, *args, keras_policy: tf.keras.Model, - features_to_remove: Optional[List[str]] = ['inlining_default'], + features_to_remove: list[str] | None = ['inlining_default'], **kwargs): super().__init__(*args, **kwargs) self._keras_policy = keras_policy @@ -463,7 +460,7 @@ def _create_action(self, inlining_prediction): def _action(self, time_step: ts.TimeStep, policy_state: types.NestedTensor, - seed: Optional[types.Seed] = None) -> policy_step.PolicyStep: + seed: types.Seed | None = None) -> policy_step.PolicyStep: new_observation = time_step.observation keras_model_input = self._process_observation(new_observation) inlining_predict = self._keras_policy(keras_model_input)[0] diff --git a/compiler_opt/rl/inlining/env.py b/compiler_opt/rl/inlining/env.py index 752d4620..80885090 100644 --- a/compiler_opt/rl/inlining/env.py +++ b/compiler_opt/rl/inlining/env.py @@ -21,8 +21,6 @@ from compiler_opt.rl import env from compiler_opt.rl.inlining import config -from typing import Dict, List, Optional - _COMPILED_MODULE_NAME = 'compiled_module' @@ -34,9 +32,9 @@ def __init__(self, llvm_size_path: str): super().__init__() self._llvm_size_path = llvm_size_path - def get_cmdline(self, clang_path: str, base_args: List[str], - interactive_base_path: Optional[str], - working_dir: str) -> List[str]: + def get_cmdline(self, clang_path: str, base_args: list[str], + interactive_base_path: str | None, + working_dir: str) -> list[str]: if interactive_base_path: interactive_args = [ '-mllvm', @@ -52,7 +50,7 @@ def get_cmdline(self, clang_path: str, base_args: List[str], return [clang_path ] + base_args + interactive_args + ['-o', compiled_module_path] - def get_module_scores(self, working_dir: str) -> Dict[str, float]: + def get_module_scores(self, working_dir: str) -> dict[str, float]: compiled_module_path = os.path.join(working_dir, _COMPILED_MODULE_NAME) cmdline = [self._llvm_size_path, compiled_module_path] completed_proc = subprocess.run(cmdline, capture_output=True, check=True) diff --git a/compiler_opt/rl/inlining/imitation_learning_config.py b/compiler_opt/rl/inlining/imitation_learning_config.py index ef0c4b03..14c460c4 100644 --- a/compiler_opt/rl/inlining/imitation_learning_config.py +++ b/compiler_opt/rl/inlining/imitation_learning_config.py @@ -14,7 +14,6 @@ """Module for collect data of inlining-for-size.""" import gin -from typing import Type import numpy as np import tensorflow as tf @@ -77,7 +76,7 @@ def get_input_signature(): @gin.register -def get_task_type() -> Type[env.InliningForSizeTask]: +def get_task_type() -> type[env.InliningForSizeTask]: """Returns the task type for the trajectory collection.""" return env.InliningForSizeTask diff --git a/compiler_opt/rl/inlining/inlining_runner.py b/compiler_opt/rl/inlining/inlining_runner.py index f6e5e6b8..8f247286 100644 --- a/compiler_opt/rl/inlining/inlining_runner.py +++ b/compiler_opt/rl/inlining/inlining_runner.py @@ -15,7 +15,6 @@ import os import tempfile -from typing import Dict, Tuple import gin import tensorflow as tf @@ -47,7 +46,7 @@ def __init__(self, llvm_size_path: str, *args, **kwargs): def 
compile_fn( self, command_line: corpus.FullyQualifiedCmdLine, tf_policy_path: str, reward_only: bool, - workdir: str) -> Dict[str, Tuple[tf.train.SequenceExample, float]]: + workdir: str) -> dict[str, tuple[tf.train.SequenceExample, float]]: """Run inlining for the given IR file under the given policy. Args: diff --git a/compiler_opt/rl/local_data_collector.py b/compiler_opt/rl/local_data_collector.py index b55e245c..116f9df2 100644 --- a/compiler_opt/rl/local_data_collector.py +++ b/compiler_opt/rl/local_data_collector.py @@ -16,7 +16,7 @@ import concurrent.futures import itertools import time -from typing import Callable, Dict, Iterator, List, Optional, Tuple +from collections.abc import Callable, Iterator from absl import logging from tf_agents.trajectories import trajectory @@ -38,10 +38,10 @@ def __init__( cps: corpus.Corpus, num_modules: int, worker_pool: worker.WorkerPool, - parser: Callable[[List[str]], Iterator[trajectory.Trajectory]], - reward_stat_map: Dict[str, Optional[Dict[str, - compilation_runner.RewardStat]]], - best_trajectory_repo: Optional[best_trajectory.BestTrajectoryRepo], + parser: Callable[[list[str]], Iterator[trajectory.Trajectory]], + reward_stat_map: dict[str, + dict[str, compilation_runner.RewardStat] | None], + best_trajectory_repo: best_trajectory.BestTrajectoryRepo | None, exit_checker_ctor=data_collector.EarlyExitChecker): # TODO(mtrofin): type exit_checker_ctor when we get typing.Protocol support super().__init__() @@ -50,7 +50,7 @@ def __init__( self._num_modules = num_modules self._parser = parser self._worker_pool = worker_pool - self._workers: List[ + self._workers: list[ compilation_runner .CompilationRunnerStub] = self._worker_pool.get_currently_active() self._reward_stat_map = reward_stat_map @@ -61,11 +61,11 @@ def __init__( # We remove this activity from the critical path by running it concurrently # with the training phase - i.e. whatever happens between successive data # collection calls. Subsequent runs will wait for these to finish. - self._reset_workers: Optional[concurrent.futures.Future] = None - self._current_futures: List[worker.WorkerFuture] = [] + self._reset_workers: concurrent.futures.Future | None = None + self._current_futures: list[worker.WorkerFuture] = [] self._pool = concurrent.futures.ThreadPoolExecutor() self._prefetch_pool = concurrent.futures.ThreadPoolExecutor() - self._next_sample: List[ + self._next_sample: list[ concurrent.futures.Future] = self._prefetch_next_sample() def _prefetch_next_sample(self): @@ -100,7 +100,7 @@ def _join_pending_jobs(self): time.time() - t1) def _schedule_jobs(self, policy: policy_saver.Policy, model_id: int, - sampled_modules: List[corpus.LoadedModuleSpec]) -> None: + sampled_modules: list[corpus.LoadedModuleSpec]) -> None: # by now, all the pending work, which was signaled to cancel, must've # finished self._join_pending_jobs() @@ -119,7 +119,7 @@ def _schedule_jobs(self, policy: policy_saver.Policy, model_id: int, def collect_data( self, policy: policy_saver.Policy, model_id: int - ) -> Tuple[Iterator[trajectory.Trajectory], Dict[str, Dict[str, float]]]: + ) -> tuple[Iterator[trajectory.Trajectory], dict[str, dict[str, float]]]: """Collect data for a given policy. Args: @@ -133,7 +133,7 @@ def collect_data( information is viewable in TensorBoard. 
""" time1 = time.time() - sampled_modules: List[corpus.LoadedModuleSpec] = [ + sampled_modules: list[corpus.LoadedModuleSpec] = [ s.result() for s in self._next_sample ] logging.info('resolving prefetched sample took: %d seconds', diff --git a/compiler_opt/rl/local_data_collector_test.py b/compiler_opt/rl/local_data_collector_test.py index 2d03d41d..95c42b7b 100644 --- a/compiler_opt/rl/local_data_collector_test.py +++ b/compiler_opt/rl/local_data_collector_test.py @@ -16,7 +16,6 @@ # pylint: disable=protected-access import collections import string -from typing import List, Tuple import tensorflow as tf from tf_agents.system import system_multiprocessing as multiprocessing @@ -30,9 +29,9 @@ from compiler_opt.rl import local_data_collector from compiler_opt.rl import policy_saver -_policy_str = 'policy'.encode(encoding='utf-8') +_policy_str = b'policy' -_mock_policy = policy_saver.Policy(output_spec=bytes(), policy=_policy_str) +_mock_policy = policy_saver.Policy(output_spec=b'', policy=_policy_str) def _get_sequence_example(feature_value): @@ -114,11 +113,11 @@ def collect_data(self, *args, **kwargs): class DeterministicSampler(corpus.Sampler): """A corpus sampler that returns modules in order, and can also be reset.""" - def __init__(self, module_specs: Tuple[corpus.ModuleSpec]): + def __init__(self, module_specs: tuple[corpus.ModuleSpec]): super().__init__(module_specs) self._cur_pos = 0 - def __call__(self, k: int, n: int = 20) -> List[corpus.ModuleSpec]: + def __call__(self, k: int, n: int = 20) -> list[corpus.ModuleSpec]: ret = [] for _ in range(k): ret.append(self._module_specs[self._cur_pos % len(self._module_specs)]) diff --git a/compiler_opt/rl/log_reader.py b/compiler_opt/rl/log_reader.py index 293ed991..3122cb37 100644 --- a/compiler_opt/rl/log_reader.py +++ b/compiler_opt/rl/log_reader.py @@ -61,7 +61,8 @@ import math from compiler_opt import type_map -from typing import Any, BinaryIO, Dict, Generator, List, Optional +from typing import Any, BinaryIO +from collections.abc import Generator import numpy as np import tensorflow as tf @@ -74,9 +75,9 @@ } -def create_tensorspec(d: Dict[str, Any]) -> tf.TensorSpec: +def create_tensorspec(d: dict[str, Any]) -> tf.TensorSpec: name: str = d['name'] - shape: List[int] = [int(e) for e in d['shape']] + shape: list[int] = [int(e) for e in d['shape']] element_type_str: str = d['type'] if element_type_str not in _element_type_name_to_dtype: raise ValueError(f'uknown type: {element_type_str}') @@ -136,21 +137,21 @@ def __getitem__(self, index: int): @dataclasses.dataclass(frozen=True) class _Header: - features: List[tf.TensorSpec] - score: Optional[tf.TensorSpec] + features: list[tf.TensorSpec] + score: tf.TensorSpec | None def _read_tensor(fs: BinaryIO, ts: tf.TensorSpec) -> LogReaderTensorValue: size = math.prod(ts.shape) * ctypes.sizeof(_dtype_to_ctype[ts.dtype]) data = fs.read(size) if len(data) != size: - raise IOError( + raise OSError( f'Expected to read a total of {size} bytes for tensors, got {len(data)}' ) return LogReaderTensorValue(ts, data) -def _read_header(f: BinaryIO) -> Optional[_Header]: +def _read_header(f: BinaryIO) -> _Header | None: header_raw = f.readline() if not header_raw: # This is the path taken by empty files @@ -165,8 +166,8 @@ def _read_header(f: BinaryIO) -> Optional[_Header]: class ObservationRecord: context: str observation_id: int - feature_values: List[LogReaderTensorValue] - score: Optional[LogReaderTensorValue] + feature_values: list[LogReaderTensorValue] + score: LogReaderTensorValue | None def 
_enumerate_log_from_stream( @@ -182,7 +183,7 @@ def _enumerate_log_from_stream( def expect_newline(): expected = f.readline().decode('utf-8') if '\n' != expected: - raise IOError(f'Expected newline in log stream, got {expected}') + raise OSError(f'Expected newline in log stream, got {expected}') while event_str := f.readline(): event = json.loads(event_str) @@ -196,7 +197,7 @@ def expect_newline(): if score_spec is not None: score_header = json.loads(f.readline()) if int(score_header['outcome']) != observation_id: - raise IOError(f'Expected observation ID {observation_id} \ + raise OSError(f'Expected observation ID {observation_id} \ got {score_header["outcome"]}') score = _read_tensor(f, score_spec) expect_newline() @@ -235,8 +236,8 @@ def _add_feature(se: tf.train.SequenceExample, spec: tf.TensorSpec, def read_log_as_sequence_examples( - fname: str) -> Dict[str, tf.train.SequenceExample]: - ret: Dict[str, tf.train.SequenceExample] = collections.defaultdict( + fname: str) -> dict[str, tf.train.SequenceExample]: + ret: dict[str, tf.train.SequenceExample] = collections.defaultdict( tf.train.SequenceExample) # a record is an observation: the features and score for one step. # the records are in time order diff --git a/compiler_opt/rl/policy_saver.py b/compiler_opt/rl/policy_saver.py index aa50f01f..33dc8f3e 100644 --- a/compiler_opt/rl/policy_saver.py +++ b/compiler_opt/rl/policy_saver.py @@ -24,8 +24,6 @@ from tf_agents.policies import policy_saver from tf_agents.typing import types as tf_agents_types -from typing import Dict, Tuple - OUTPUT_SIGNATURE = 'output_spec.json' TFLITE_MODEL_NAME = 'model.tflite' @@ -34,7 +32,7 @@ } -def _split_tensor_name(name: str) -> Tuple[str, int]: +def _split_tensor_name(name: str) -> tuple[str, int]: """Return tuple (op, port) with the op and int port for the tensor name.""" op_port = name.split(':', 2) if len(op_port) == 1: @@ -144,7 +142,7 @@ def from_filesystem(location: str): return Policy(output_spec=output_spec, policy=policy) -class PolicySaver(object): +class PolicySaver: """Object that saves policy and model config file required by inference. ```python @@ -153,13 +151,13 @@ class PolicySaver(object): ``` """ - def __init__(self, policy_dict: Dict[str, tf_policy.TFPolicy]): + def __init__(self, policy_dict: dict[str, tf_policy.TFPolicy]): """Initialize the PolicySaver object. Args: policy_dict: A dict mapping from policy name to policy. """ - self._policy_saver_dict: Dict[str, Tuple[ + self._policy_saver_dict: dict[str, tuple[ policy_saver.PolicySaver, tf_policy.TFPolicy]] = { policy_name: (policy_saver.PolicySaver( policy, batch_size=1, use_nest_path_signatures=False), policy @@ -189,8 +187,8 @@ def _write_output_signature( # First entry in output list is the decision (action) decision_spec = tf.nest.flatten(action_signature.action) if len(decision_spec) != 1: - raise ValueError(('Expected action decision to have 1 tensor, but ' - f'saw: {action_signature.action}')) + raise ValueError('Expected action decision to have 1 tensor, but ' + f'saw: {action_signature.action}') # Find the decision's tensor in the flattened output tensor list. 
sm_action_decision = ( diff --git a/compiler_opt/rl/problem_configuration.py b/compiler_opt/rl/problem_configuration.py index 844b84da..fd76e01a 100644 --- a/compiler_opt/rl/problem_configuration.py +++ b/compiler_opt/rl/problem_configuration.py @@ -68,7 +68,7 @@ import abc import gin -from typing import Callable, Dict, Iterable, Optional, Tuple +from collections.abc import Callable, Iterable import tensorflow as tf import tf_agents as tfa @@ -90,7 +90,7 @@ def get_env(self) -> env.MLGOEnvironmentBase: @abc.abstractmethod def get_signature_spec( - self) -> Tuple[types.NestedTensorSpec, types.NestedTensorSpec]: + self) -> tuple[types.NestedTensorSpec, types.NestedTensorSpec]: raise NotImplementedError @abc.abstractmethod @@ -110,13 +110,13 @@ def get_runner_type(self) -> 'type[compilation_runner.CompilationRunner]': # List of flags to add to clang compilation command. @gin.configurable(module='problem_config') - def flags_to_add(self, add_flags=()) -> Tuple[str, ...]: + def flags_to_add(self, add_flags=()) -> tuple[str, ...]: return add_flags # List of flags to remove from clang compilation command. The flag names # should match the actual flags provided to clang.' @gin.configurable(module='problem_config') - def flags_to_delete(self, delete_flags=()) -> Tuple[str, ...]: + def flags_to_delete(self, delete_flags=()) -> tuple[str, ...]: return delete_flags # List of flags to replace in the clang compilation command. The flag names @@ -128,5 +128,5 @@ def flags_to_delete(self, delete_flags=()) -> Tuple[str, ...]: # } # return replace_flags @gin.configurable(module='problem_config') - def flags_to_replace(self, replace_flags=None) -> Optional[Dict[str, str]]: + def flags_to_replace(self, replace_flags=None) -> dict[str, str] | None: return replace_flags diff --git a/compiler_opt/rl/random_net_distillation.py b/compiler_opt/rl/random_net_distillation.py index ca8ef33c..b46ded27 100644 --- a/compiler_opt/rl/random_net_distillation.py +++ b/compiler_opt/rl/random_net_distillation.py @@ -18,7 +18,7 @@ @gin.configurable -class RandomNetworkDistillation(): +class RandomNetworkDistillation: """The Random Network Distillation class.""" def __init__(self, diff --git a/compiler_opt/rl/regalloc/regalloc_network.py b/compiler_opt/rl/regalloc/regalloc_network.py index 98c77de4..a4590409 100644 --- a/compiler_opt/rl/regalloc/regalloc_network.py +++ b/compiler_opt/rl/regalloc/regalloc_network.py @@ -13,7 +13,8 @@ # limitations under the License. 
"""Actor network for Register Allocation.""" -from typing import Optional, Sequence, Callable, Any +from typing import Any +from collections.abc import Sequence, Callable import gin import tensorflow as tf @@ -67,14 +68,14 @@ def __init__( self, input_tensor_spec: types.NestedTensorSpec, output_tensor_spec: types.NestedTensorSpec, - preprocessing_layers: Optional[types.NestedLayer] = None, - preprocessing_combiner: Optional[tf.keras.layers.Layer] = None, - conv_layer_params: Optional[Sequence[Any]] = None, - fc_layer_params: Optional[Sequence[int]] = (200, 100), - dropout_layer_params: Optional[Sequence[float]] = None, + preprocessing_layers: types.NestedLayer | None = None, + preprocessing_combiner: tf.keras.layers.Layer | None = None, + conv_layer_params: Sequence[Any] | None = None, + fc_layer_params: Sequence[int] | None = (200, 100), + dropout_layer_params: Sequence[float] | None = None, activation_fn: Callable[[types.Tensor], types.Tensor] = tf.keras.activations.relu, - kernel_initializer: Optional[tf.keras.initializers.Initializer] = None, + kernel_initializer: tf.keras.initializers.Initializer | None = None, batch_squash: bool = True, dtype: tf.DType = tf.float32, name: str = 'RegAllocNetwork'): diff --git a/compiler_opt/rl/regalloc/regalloc_runner.py b/compiler_opt/rl/regalloc/regalloc_runner.py index 673655d3..799a2505 100644 --- a/compiler_opt/rl/regalloc/regalloc_runner.py +++ b/compiler_opt/rl/regalloc/regalloc_runner.py @@ -15,7 +15,6 @@ import os import tempfile -from typing import Dict, Tuple import gin import tensorflow as tf @@ -42,7 +41,7 @@ class RegAllocRunner(compilation_runner.CompilationRunner): def compile_fn( self, command_line: corpus.FullyQualifiedCmdLine, tf_policy_path: str, reward_only: bool, - workdir: str) -> Dict[str, Tuple[tf.train.SequenceExample, float]]: + workdir: str) -> dict[str, tuple[tf.train.SequenceExample, float]]: """Run the compiler for the given IR file under the given policy. 
Args: diff --git a/compiler_opt/rl/regalloc_priority/regalloc_priority_runner.py b/compiler_opt/rl/regalloc_priority/regalloc_priority_runner.py index 587eb730..8b7be240 100644 --- a/compiler_opt/rl/regalloc_priority/regalloc_priority_runner.py +++ b/compiler_opt/rl/regalloc_priority/regalloc_priority_runner.py @@ -18,7 +18,6 @@ import os import tempfile -from typing import Dict, Optional, Tuple from compiler_opt.rl import compilation_runner from compiler_opt.rl import log_reader @@ -29,10 +28,10 @@ class RegAllocPriorityRunner(compilation_runner.CompilationRunner): """Class for collecting data for regalloc-priority-prediction.""" def _compile_fn( - self, file_paths: Tuple[str, ...], tf_policy_path: str, reward_only: bool, - workdir: str, cancellation_manager: Optional[ - compilation_runner.WorkerCancellationManager] - ) -> Dict[str, Tuple[tf.train.SequenceExample, float]]: + self, file_paths: tuple[str, ...], tf_policy_path: str, reward_only: bool, + workdir: str, + cancellation_manager: compilation_runner.WorkerCancellationManager | None + ) -> dict[str, tuple[tf.train.SequenceExample, float]]: file_paths = file_paths[0].replace('.bc', '') working_dir = tempfile.mkdtemp(dir=workdir) diff --git a/compiler_opt/rl/registry.py b/compiler_opt/rl/registry.py index 2437e6bd..fda7d887 100644 --- a/compiler_opt/rl/registry.py +++ b/compiler_opt/rl/registry.py @@ -21,8 +21,6 @@ See also problem_configuration.py """ -from typing import Type - import gin import tf_agents as tfa @@ -38,6 +36,6 @@ @gin.configurable(module='config_registry') def get_configuration( - implementation: Type[problem_configuration.ProblemConfiguration] + implementation: type[problem_configuration.ProblemConfiguration] ) -> problem_configuration.ProblemConfiguration: return implementation() diff --git a/compiler_opt/rl/train_bc.py b/compiler_opt/rl/train_bc.py index f0efa020..7f763a98 100644 --- a/compiler_opt/rl/train_bc.py +++ b/compiler_opt/rl/train_bc.py @@ -37,8 +37,6 @@ from tf_agents.policies import tf_policy from tensorboard.plugins.hparams import api as hp -from typing import Dict - _ROOT_DIR = flags.DEFINE_string( 'root_dir', os.getenv('TEST_UNDECLARED_OUTPUTS_DIR'), 'Root directory for writing logs/summaries/checkpoints.') @@ -72,7 +70,7 @@ def train_eval(agent_config_type=agent_config.BCAgentConfig, agent: tf_policy.TFAgent = agent_config.create_agent( agent_cfg, preprocessing_layer_creator=preprocessing_layer_creator) llvm_trainer = trainer.Trainer(root_dir=root_dir, agent=agent) - policy_dict: Dict[str, tf_policy.TFPolicy] = { + policy_dict: dict[str, tf_policy.TFPolicy] = { 'saved_policy': agent.policy, 'saved_collect_policy': agent.collect_policy, } diff --git a/compiler_opt/rl/train_locally.py b/compiler_opt/rl/train_locally.py index dc9f377b..f26ca0f4 100644 --- a/compiler_opt/rl/train_locally.py +++ b/compiler_opt/rl/train_locally.py @@ -18,7 +18,6 @@ import json import os import time -from typing import List from absl import app from absl import flags @@ -115,7 +114,7 @@ def train_eval(worker_manager_class=LocalWorkerPoolManager, batch_size=batch_size, train_sequence_length=train_sequence_length) - def sequence_example_iterator_fn(seq_ex: List[str]): + def sequence_example_iterator_fn(seq_ex: list[str]): return iter(dataset_fn(seq_ex).repeat().prefetch(tf.data.AUTOTUNE)) reward_stat_map = collections.defaultdict(lambda: None) diff --git a/compiler_opt/rl/trainer.py b/compiler_opt/rl/trainer.py index 4a10ba8e..04959359 100644 --- a/compiler_opt/rl/trainer.py +++ b/compiler_opt/rl/trainer.py @@ -25,11 +25,10 
@@ from tf_agents import trajectories from tf_agents.utils import common as common_utils -from typing import Optional @gin.configurable -class Trainer(object): +class Trainer: """Object that trains LLVM policy. After initialization, the function 'train' can be called multiple times to @@ -46,9 +45,9 @@ def __init__( self, root_dir: str, agent: tf_agent.TFAgent, - random_network_distillation: Optional[ - random_net_distillation.RandomNetworkDistillation] = None, - warmstart_policy_dir: Optional[str] = None, + random_network_distillation: random_net_distillation + .RandomNetworkDistillation | None = None, + warmstart_policy_dir: str | None = None, # Params for summaries and logging checkpoint_interval=10000, log_interval=100, @@ -216,8 +215,8 @@ def train(self, dataset_iter, monitor_dict, num_iterations: int): experience = next(dataset_iter) except StopIteration: logging.warning( - ('Warning: skip training because do not have enough data to fill ' - 'in a batch, consider increase data or reduce batch size.')) + 'Warning: skip training because do not have enough data to fill ' + 'in a batch, consider increase data or reduce batch size.') break # random network distillation for intrinsic reward generation diff --git a/compiler_opt/tools/combine_tfa_policies_lib.py b/compiler_opt/tools/combine_tfa_policies_lib.py index d8f37f78..effc037a 100644 --- a/compiler_opt/tools/combine_tfa_policies_lib.py +++ b/compiler_opt/tools/combine_tfa_policies_lib.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. """Combines two tf-agent policies with the given state and action spec.""" -from typing import Dict, Optional, Tuple import tensorflow as tf import hashlib @@ -27,7 +26,7 @@ class CombinedTFPolicy(tf_agents.policies.TFPolicy): """Policy which combines two target policies.""" - def __init__(self, *args, tf_policies: Dict[str, tf_agents.policies.TFPolicy], + def __init__(self, *args, tf_policies: dict[str, tf_agents.policies.TFPolicy], **kwargs): super().__init__(*args, **kwargs) @@ -61,7 +60,7 @@ def __init__(self, *args, tf_policies: Dict[str, tf_agents.policies.TFPolicy], def _process_observation( self, observation: types.NestedSpecTensorOrArray - ) -> Tuple[types.NestedSpecTensorOrArray, types.TensorOrArray]: + ) -> tuple[types.NestedSpecTensorOrArray, types.TensorOrArray]: assert "model_selector" in self.sorted_keys high_low_tensor = self.high_low_tensor for name in self.sorted_keys: @@ -83,7 +82,7 @@ def _process_observation( def _action(self, time_step: ts.TimeStep, policy_state: types.NestedTensorSpec, - seed: Optional[types.Seed] = None) -> policy_step.PolicyStep: + seed: types.Seed | None = None) -> policy_step.PolicyStep: new_observation = time_step.observation new_observation, switch_tensor = self._process_observation(new_observation) updated_step = ts.TimeStep( diff --git a/compiler_opt/tools/feature_importance_graphs.py b/compiler_opt/tools/feature_importance_graphs.py index 6c77c3da..816a9652 100644 --- a/compiler_opt/tools/feature_importance_graphs.py +++ b/compiler_opt/tools/feature_importance_graphs.py @@ -19,9 +19,7 @@ import shap import json -from typing import Dict, List, Union, Optional - -DataType = Dict[str, Union[numpy.typing.ArrayLike, List[str]]] +DataType = dict[str, numpy.typing.ArrayLike | list[str]] def load_shap_values(file_name: str) -> DataType: @@ -49,7 +47,7 @@ def init_shap_for_notebook(): shap.initjs() -def graph_individual_example(data: DataType, index: Optional[int]): +def 
graph_individual_example(data: DataType, index: int | None): """Creates a force plot for an example Args: diff --git a/compiler_opt/tools/feature_importance_utils.py b/compiler_opt/tools/feature_importance_utils.py index 84e36b9a..d11da693 100644 --- a/compiler_opt/tools/feature_importance_utils.py +++ b/compiler_opt/tools/feature_importance_utils.py @@ -23,9 +23,9 @@ import numpy.typing from tf_agents.typing import types -from typing import Callable, Dict, Tuple +from collections.abc import Callable -SignatureType = Dict[str, Tuple[numpy.typing.ArrayLike, tf.dtypes.DType]] +SignatureType = dict[str, tuple[numpy.typing.ArrayLike, tf.dtypes.DType]] def get_input_signature(example_input: types.NestedTensorSpec) -> SignatureType: diff --git a/compiler_opt/tools/generate_default_trace.py b/compiler_opt/tools/generate_default_trace.py index a6c343c2..c05648ab 100644 --- a/compiler_opt/tools/generate_default_trace.py +++ b/compiler_opt/tools/generate_default_trace.py @@ -17,7 +17,6 @@ import contextlib import functools import re -from typing import Dict, List, Optional, Tuple from absl import app from absl import flags @@ -75,7 +74,7 @@ class FilteringWorker(worker.Worker): key_filter: regex filter for key names to include, or None to include all. """ - def __init__(self, policy_path: Optional[str], key_filter: Optional[str], + def __init__(self, policy_path: str | None, key_filter: str | None, runner_type: 'type[compilation_runner.CompilationRunner]', runner_kwargs): self._policy_path = policy_path @@ -86,7 +85,7 @@ def __init__(self, policy_path: Optional[str], key_filter: Optional[str], def compile_and_filter( self, loaded_module_spec: corpus.LoadedModuleSpec - ) -> Tuple[str, List[str], Dict[str, compilation_runner.RewardStat]]: + ) -> tuple[str, list[str], dict[str, compilation_runner.RewardStat]]: data = self._runner.collect_data( loaded_module_spec=loaded_module_spec, policy=self._policy, @@ -184,15 +183,15 @@ def generate_trace( if performance_writer: for key, value in reward_stat.items(): performance_writer.write( - (f'{module_name},{key},{value.default_reward},' - f'{value.moving_average_reward}\n')) + f'{module_name},{key},{value.default_reward},' + f'{value.moving_average_reward}\n') logging.info('%d success, %d failed out of %d', total_successful_examples, total_failed_examples, total_work) - print((f'{total_successful_examples} of {len(corpus_elements)} modules ' - f'succeeded, and {total_training_examples} trainining examples ' - 'written')) + print(f'{total_successful_examples} of {len(corpus_elements)} modules ' + f'succeeded, and {total_training_examples} trainining examples ' + 'written') if __name__ == '__main__': diff --git a/compiler_opt/tools/generate_vocab.py b/compiler_opt/tools/generate_vocab.py index 37657032..180fff17 100644 --- a/compiler_opt/tools/generate_vocab.py +++ b/compiler_opt/tools/generate_vocab.py @@ -19,7 +19,7 @@ import math import multiprocessing as mp import os -from typing import Callable, Dict, List, Iterable +from collections.abc import Callable, Iterable from absl import app from absl import flags @@ -53,7 +53,7 @@ def _get_feature_info( serialized_proto: tf.Tensor, - features_to_not_process: Iterable[str]) -> Dict[str, tf.io.RaggedFeature]: + features_to_not_process: Iterable[str]) -> dict[str, tf.io.RaggedFeature]: """Provides feature information by analyzing a single serialized example. 
Args: @@ -82,8 +82,8 @@ def _get_feature_info( def create_tfrecord_parser_fn( - sequence_features: Dict[str, tf.io.RaggedFeature] -) -> Callable[[str], List[tf.Tensor]]: + sequence_features: dict[str, tf.io.RaggedFeature] +) -> Callable[[str], list[tf.Tensor]]: """Create a parser function for reading serialized tf.data.TFRecordDataset. Args: diff --git a/compiler_opt/type_map.py b/compiler_opt/type_map.py index b35e1772..6e872a1f 100644 --- a/compiler_opt/type_map.py +++ b/compiler_opt/type_map.py @@ -13,7 +13,7 @@ # limitations under the License. """Map between tf, ctypes, and string names for scalar types.""" import ctypes -from typing import List, Tuple, Union +from typing import Union import tensorflow as tf ScalarCType = Union[ # pylint: disable=invalid-name @@ -21,7 +21,7 @@ 'type[ctypes.c_int16]', 'type[ctypes.c_uint16]', 'type[ctypes.c_int32]', 'type[ctypes.c_uint32]', 'type[ctypes.c_int64]', 'type[ctypes.c_uint64]'] -TYPE_ASSOCIATIONS: List[Tuple[str, ScalarCType, +TYPE_ASSOCIATIONS: list[tuple[str, ScalarCType, tf.DType]] = [ ('float', ctypes.c_float, tf.float32), ('double', ctypes.c_double, tf.float64), diff --git a/pyproject.toml b/pyproject.toml index 2ff17d88..1f7e2fc4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,8 +1,9 @@ [tool.ruff] line-length = 103 -lint.select = [ "C40", "C9", "E", "F", "PERF", "W", "YTT" ] +lint.select = [ "C40", "C9", "E", "F", "PERF", "UP", "W", "YTT" ] lint.ignore = [ "E722", "E731", "F401", "PERF203" ] lint.mccabe.max-complexity = 18 +target-version = "py310" [tool.pytest.ini_options] # DO always create a tracking issue when allow-listing warnings