diff --git a/pz_risk/common/__init__.py b/pz_risk/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pz_risk/common/envs/__init__.py b/pz_risk/common/envs/__init__.py new file mode 100644 index 0000000..cbfd154 --- /dev/null +++ b/pz_risk/common/envs/__init__.py @@ -0,0 +1,9 @@ +from stable_baselines3.common.envs.bit_flipping_env import BitFlippingEnv +from stable_baselines3.common.envs.identity_env import ( + FakeImageEnv, + IdentityEnv, + IdentityEnvBox, + IdentityEnvMultiBinary, + IdentityEnvMultiDiscrete, +) +from stable_baselines3.common.envs.multi_input_envs import SimpleMultiObsEnv diff --git a/pz_risk/common/envs/bit_flipping_env.py b/pz_risk/common/envs/bit_flipping_env.py new file mode 100644 index 0000000..ba9b220 --- /dev/null +++ b/pz_risk/common/envs/bit_flipping_env.py @@ -0,0 +1,204 @@ +from collections import OrderedDict +from typing import Any, Dict, Optional, Union + +import numpy as np +from gym import GoalEnv, spaces +from gym.envs.registration import EnvSpec + +from stable_baselines3.common.type_aliases import GymStepReturn + + +class BitFlippingEnv(GoalEnv): + """ + Simple bit flipping env, useful to test HER. + The goal is to flip all the bits to get a vector of ones. + In the continuous variant, if the ith action component has a value > 0, + then the ith bit will be flipped. + + :param n_bits: Number of bits to flip + :param continuous: Whether to use the continuous actions version or not, + by default, it uses the discrete one + :param max_steps: Max number of steps, by default, equal to n_bits + :param discrete_obs_space: Whether to use the discrete observation + version or not, by default, it uses the ``MultiBinary`` one + :param image_obs_space: Use image as input instead of the ``MultiBinary`` one. + :param channel_first: Whether to use channel-first or last image. + """ + + spec = EnvSpec("BitFlippingEnv-v0") + + def __init__( + self, + n_bits: int = 10, + continuous: bool = False, + max_steps: Optional[int] = None, + discrete_obs_space: bool = False, + image_obs_space: bool = False, + channel_first: bool = True, + ): + super(BitFlippingEnv, self).__init__() + # Shape of the observation when using image space + self.image_shape = (1, 36, 36) if channel_first else (36, 36, 1) + # The achieved goal is determined by the current state + # here, it is a special where they are equal + if discrete_obs_space: + # In the discrete case, the agent act on the binary + # representation of the observation + self.observation_space = spaces.Dict( + { + "observation": spaces.Discrete(2 ** n_bits), + "achieved_goal": spaces.Discrete(2 ** n_bits), + "desired_goal": spaces.Discrete(2 ** n_bits), + } + ) + elif image_obs_space: + # When using image as input, + # one image contains the bits 0 -> 0, 1 -> 255 + # and the rest is filled with zeros + self.observation_space = spaces.Dict( + { + "observation": spaces.Box( + low=0, + high=255, + shape=self.image_shape, + dtype=np.uint8, + ), + "achieved_goal": spaces.Box( + low=0, + high=255, + shape=self.image_shape, + dtype=np.uint8, + ), + "desired_goal": spaces.Box( + low=0, + high=255, + shape=self.image_shape, + dtype=np.uint8, + ), + } + ) + else: + self.observation_space = spaces.Dict( + { + "observation": spaces.MultiBinary(n_bits), + "achieved_goal": spaces.MultiBinary(n_bits), + "desired_goal": spaces.MultiBinary(n_bits), + } + ) + + self.obs_space = spaces.MultiBinary(n_bits) + + if continuous: + self.action_space = spaces.Box(-1, 1, shape=(n_bits,), dtype=np.float32) + else: + self.action_space = spaces.Discrete(n_bits) + self.continuous = continuous + self.discrete_obs_space = discrete_obs_space + self.image_obs_space = image_obs_space + self.state = None + self.desired_goal = np.ones((n_bits,)) + if max_steps is None: + max_steps = n_bits + self.max_steps = max_steps + self.current_step = 0 + + def seed(self, seed: int) -> None: + self.obs_space.seed(seed) + + def convert_if_needed(self, state: np.ndarray) -> Union[int, np.ndarray]: + """ + Convert to discrete space if needed. + + :param state: + :return: + """ + if self.discrete_obs_space: + # The internal state is the binary representation of the + # observed one + return int(sum([state[i] * 2 ** i for i in range(len(state))])) + + if self.image_obs_space: + size = np.prod(self.image_shape) + image = np.concatenate((state * 255, np.zeros(size - len(state), dtype=np.uint8))) + return image.reshape(self.image_shape).astype(np.uint8) + return state + + def convert_to_bit_vector(self, state: Union[int, np.ndarray], batch_size: int) -> np.ndarray: + """ + Convert to bit vector if needed. + + :param state: + :param batch_size: + :return: + """ + # Convert back to bit vector + if isinstance(state, int): + state = np.array(state).reshape(batch_size, -1) + # Convert to binary representation + state = (((state[:, :] & (1 << np.arange(len(self.state))))) > 0).astype(int) + elif self.image_obs_space: + state = state.reshape(batch_size, -1)[:, : len(self.state)] / 255 + else: + state = np.array(state).reshape(batch_size, -1) + + return state + + def _get_obs(self) -> Dict[str, Union[int, np.ndarray]]: + """ + Helper to create the observation. + + :return: The current observation. + """ + return OrderedDict( + [ + ("observation", self.convert_if_needed(self.state.copy())), + ("achieved_goal", self.convert_if_needed(self.state.copy())), + ("desired_goal", self.convert_if_needed(self.desired_goal.copy())), + ] + ) + + def reset(self) -> Dict[str, Union[int, np.ndarray]]: + self.current_step = 0 + self.state = self.obs_space.sample() + return self._get_obs() + + def step(self, action: Union[np.ndarray, int]) -> GymStepReturn: + if self.continuous: + self.state[action > 0] = 1 - self.state[action > 0] + else: + self.state[action] = 1 - self.state[action] + obs = self._get_obs() + reward = float(self.compute_reward(obs["achieved_goal"], obs["desired_goal"], None)) + done = reward == 0 + self.current_step += 1 + # Episode terminate when we reached the goal or the max number of steps + info = {"is_success": done} + done = done or self.current_step >= self.max_steps + return obs, reward, done, info + + def compute_reward( + self, achieved_goal: Union[int, np.ndarray], desired_goal: Union[int, np.ndarray], _info: Optional[Dict[str, Any]] + ) -> np.float32: + # As we are using a vectorized version, we need to keep track of the `batch_size` + if isinstance(achieved_goal, int): + batch_size = 1 + elif self.image_obs_space: + batch_size = achieved_goal.shape[0] if len(achieved_goal.shape) > 3 else 1 + else: + batch_size = achieved_goal.shape[0] if len(achieved_goal.shape) > 1 else 1 + + desired_goal = self.convert_to_bit_vector(desired_goal, batch_size) + achieved_goal = self.convert_to_bit_vector(achieved_goal, batch_size) + + # Deceptive reward: it is positive only when the goal is achieved + # Here we are using a vectorized version + distance = np.linalg.norm(achieved_goal - desired_goal, axis=-1) + return -(distance > 0).astype(np.float32) + + def render(self, mode: str = "human") -> Optional[np.ndarray]: + if mode == "rgb_array": + return self.state.copy() + print(self.state) + + def close(self) -> None: + pass diff --git a/pz_risk/common/envs/identity_env.py b/pz_risk/common/envs/identity_env.py new file mode 100644 index 0000000..aef85b1 --- /dev/null +++ b/pz_risk/common/envs/identity_env.py @@ -0,0 +1,150 @@ +from typing import Optional, Union + +import numpy as np +from gym import Env, Space +from gym.spaces import Box, Discrete, MultiBinary, MultiDiscrete + +from stable_baselines3.common.type_aliases import GymObs, GymStepReturn + + +class IdentityEnv(Env): + def __init__(self, dim: Optional[int] = None, space: Optional[Space] = None, ep_length: int = 100): + """ + Identity environment for testing purposes + + :param dim: the size of the action and observation dimension you want + to learn. Provide at most one of ``dim`` and ``space``. If both are + None, then initialization proceeds with ``dim=1`` and ``space=None``. + :param space: the action and observation space. Provide at most one of + ``dim`` and ``space``. + :param ep_length: the length of each episode in timesteps + """ + if space is None: + if dim is None: + dim = 1 + space = Discrete(dim) + else: + assert dim is None, "arguments for both 'dim' and 'space' provided: at most one allowed" + + self.action_space = self.observation_space = space + self.ep_length = ep_length + self.current_step = 0 + self.num_resets = -1 # Becomes 0 after __init__ exits. + self.reset() + + def reset(self) -> GymObs: + self.current_step = 0 + self.num_resets += 1 + self._choose_next_state() + return self.state + + def step(self, action: Union[int, np.ndarray]) -> GymStepReturn: + reward = self._get_reward(action) + self._choose_next_state() + self.current_step += 1 + done = self.current_step >= self.ep_length + return self.state, reward, done, {} + + def _choose_next_state(self) -> None: + self.state = self.action_space.sample() + + def _get_reward(self, action: Union[int, np.ndarray]) -> float: + return 1.0 if np.all(self.state == action) else 0.0 + + def render(self, mode: str = "human") -> None: + pass + + +class IdentityEnvBox(IdentityEnv): + def __init__(self, low: float = -1.0, high: float = 1.0, eps: float = 0.05, ep_length: int = 100): + """ + Identity environment for testing purposes + + :param low: the lower bound of the box dim + :param high: the upper bound of the box dim + :param eps: the epsilon bound for correct value + :param ep_length: the length of each episode in timesteps + """ + space = Box(low=low, high=high, shape=(1,), dtype=np.float32) + super().__init__(ep_length=ep_length, space=space) + self.eps = eps + + def step(self, action: np.ndarray) -> GymStepReturn: + reward = self._get_reward(action) + self._choose_next_state() + self.current_step += 1 + done = self.current_step >= self.ep_length + return self.state, reward, done, {} + + def _get_reward(self, action: np.ndarray) -> float: + return 1.0 if (self.state - self.eps) <= action <= (self.state + self.eps) else 0.0 + + +class IdentityEnvMultiDiscrete(IdentityEnv): + def __init__(self, dim: int = 1, ep_length: int = 100): + """ + Identity environment for testing purposes + + :param dim: the size of the dimensions you want to learn + :param ep_length: the length of each episode in timesteps + """ + space = MultiDiscrete([dim, dim]) + super().__init__(ep_length=ep_length, space=space) + + +class IdentityEnvMultiBinary(IdentityEnv): + def __init__(self, dim: int = 1, ep_length: int = 100): + """ + Identity environment for testing purposes + + :param dim: the size of the dimensions you want to learn + :param ep_length: the length of each episode in timesteps + """ + space = MultiBinary(dim) + super().__init__(ep_length=ep_length, space=space) + + +class FakeImageEnv(Env): + """ + Fake image environment for testing purposes, it mimics Atari games. + + :param action_dim: Number of discrete actions + :param screen_height: Height of the image + :param screen_width: Width of the image + :param n_channels: Number of color channels + :param discrete: Create discrete action space instead of continuous + :param channel_first: Put channels on first axis instead of last + """ + + def __init__( + self, + action_dim: int = 6, + screen_height: int = 84, + screen_width: int = 84, + n_channels: int = 1, + discrete: bool = True, + channel_first: bool = False, + ): + self.observation_shape = (screen_height, screen_width, n_channels) + if channel_first: + self.observation_shape = (n_channels, screen_height, screen_width) + self.observation_space = Box(low=0, high=255, shape=self.observation_shape, dtype=np.uint8) + if discrete: + self.action_space = Discrete(action_dim) + else: + self.action_space = Box(low=-1, high=1, shape=(5,), dtype=np.float32) + self.ep_length = 10 + self.current_step = 0 + + def reset(self) -> np.ndarray: + self.current_step = 0 + return self.observation_space.sample() + + def step(self, action: Union[np.ndarray, int]) -> GymStepReturn: + reward = 0.0 + self.current_step += 1 + done = self.current_step >= self.ep_length + return self.observation_space.sample(), reward, done, {} + + def render(self, mode: str = "human") -> None: + pass diff --git a/pz_risk/common/envs/multi_input_envs.py b/pz_risk/common/envs/multi_input_envs.py new file mode 100644 index 0000000..0a8aec6 --- /dev/null +++ b/pz_risk/common/envs/multi_input_envs.py @@ -0,0 +1,180 @@ +from typing import Dict, Union + +import gym +import numpy as np + +from stable_baselines3.common.type_aliases import GymStepReturn + + +class SimpleMultiObsEnv(gym.Env): + """ + Base class for GridWorld-based MultiObs Environments 4x4 grid world. + + .. code-block:: text + + ____________ + | 0 1 2 3| + | 4|¯5¯¯6¯| 7| + | 8|_9_10_|11| + |12 13 14 15| + ¯¯¯¯¯¯¯¯¯¯¯¯¯¯ + + start is 0 + states 5, 6, 9, and 10 are blocked + goal is 15 + actions are = [left, down, right, up] + + simple linear state env of 15 states but encoded with a vector and an image observation: + each column is represented by a random vector and each row is + represented by a random image, both sampled once at creation time. + + :param num_col: Number of columns in the grid + :param num_row: Number of rows in the grid + :param random_start: If true, agent starts in random position + :param channel_last: If true, the image will be channel last, else it will be channel first + """ + + def __init__( + self, + num_col: int = 4, + num_row: int = 4, + random_start: bool = True, + discrete_actions: bool = True, + channel_last: bool = True, + ): + super(SimpleMultiObsEnv, self).__init__() + + self.vector_size = 5 + if channel_last: + self.img_size = [64, 64, 1] + else: + self.img_size = [1, 64, 64] + + self.random_start = random_start + self.discrete_actions = discrete_actions + if discrete_actions: + self.action_space = gym.spaces.Discrete(4) + else: + self.action_space = gym.spaces.Box(0, 1, (4,)) + + self.observation_space = gym.spaces.Dict( + spaces={ + "vec": gym.spaces.Box(0, 1, (self.vector_size,), dtype=np.float64), + "img": gym.spaces.Box(0, 255, self.img_size, dtype=np.uint8), + } + ) + self.count = 0 + # Timeout + self.max_count = 100 + self.log = "" + self.state = 0 + self.action2str = ["left", "down", "right", "up"] + self.init_possible_transitions() + + self.num_col = num_col + self.state_mapping = [] + self.init_state_mapping(num_col, num_row) + + self.max_state = len(self.state_mapping) - 1 + + def init_state_mapping(self, num_col: int, num_row: int) -> None: + """ + Initializes the state_mapping array which holds the observation values for each state + + :param num_col: Number of columns. + :param num_row: Number of rows. + """ + # Each column is represented by a random vector + col_vecs = np.random.random((num_col, self.vector_size)) + # Each row is represented by a random image + row_imgs = np.random.randint(0, 255, (num_row, 64, 64), dtype=np.uint8) + + for i in range(num_col): + for j in range(num_row): + self.state_mapping.append({"vec": col_vecs[i], "img": row_imgs[j].reshape(self.img_size)}) + + def get_state_mapping(self) -> Dict[str, np.ndarray]: + """ + Uses the state to get the observation mapping. + + :return: observation dict {'vec': ..., 'img': ...} + """ + return self.state_mapping[self.state] + + def init_possible_transitions(self) -> None: + """ + Initializes the transitions of the environment + The environment exploits the cardinal directions of the grid by noting that + they correspond to simple addition and subtraction from the cell id within the grid + + - up => means moving up a row => means subtracting the length of a column + - down => means moving down a row => means adding the length of a column + - left => means moving left by one => means subtracting 1 + - right => means moving right by one => means adding 1 + + Thus one only needs to specify in which states each action is possible + in order to define the transitions of the environment + """ + self.left_possible = [1, 2, 3, 13, 14, 15] + self.down_possible = [0, 4, 8, 3, 7, 11] + self.right_possible = [0, 1, 2, 12, 13, 14] + self.up_possible = [4, 8, 12, 7, 11, 15] + + def step(self, action: Union[int, float, np.ndarray]) -> GymStepReturn: + """ + Run one timestep of the environment's dynamics. When end of + episode is reached, you are responsible for calling `reset()` + to reset this environment's state. + Accepts an action and returns a tuple (observation, reward, done, info). + + :param action: + :return: tuple (observation, reward, done, info). + """ + if not self.discrete_actions: + action = np.argmax(action) + else: + action = int(action) + + self.count += 1 + + prev_state = self.state + + reward = -0.1 + # define state transition + if self.state in self.left_possible and action == 0: # left + self.state -= 1 + elif self.state in self.down_possible and action == 1: # down + self.state += self.num_col + elif self.state in self.right_possible and action == 2: # right + self.state += 1 + elif self.state in self.up_possible and action == 3: # up + self.state -= self.num_col + + got_to_end = self.state == self.max_state + reward = 1 if got_to_end else reward + done = self.count > self.max_count or got_to_end + + self.log = f"Went {self.action2str[action]} in state {prev_state}, got to state {self.state}" + + return self.get_state_mapping(), reward, done, {"got_to_end": got_to_end} + + def render(self, mode: str = "human") -> None: + """ + Prints the log of the environment. + + :param mode: + """ + print(self.log) + + def reset(self) -> Dict[str, np.ndarray]: + """ + Resets the environment state and step count and returns reset observation. + + :return: observation dict {'vec': ..., 'img': ...} + """ + self.count = 0 + if not self.random_start: + self.state = 0 + else: + self.state = np.random.randint(0, self.max_state) + return self.state_mapping[self.state] diff --git a/pz_risk/common/logger.py b/pz_risk/common/logger.py new file mode 100644 index 0000000..06b8693 --- /dev/null +++ b/pz_risk/common/logger.py @@ -0,0 +1,615 @@ +import datetime +import json +import os +import sys +import tempfile +import warnings +from collections import defaultdict +from typing import Any, Dict, List, Optional, Sequence, TextIO, Tuple, Union + +import numpy as np +import pandas +import torch as th +from matplotlib import pyplot as plt + +try: + from torch.utils.tensorboard import SummaryWriter +except ImportError: + SummaryWriter = None + +DEBUG = 10 +INFO = 20 +WARN = 30 +ERROR = 40 +DISABLED = 50 + + +class Video(object): + """ + Video data class storing the video frames and the frame per seconds + + :param frames: frames to create the video from + :param fps: frames per second + """ + + def __init__(self, frames: th.Tensor, fps: Union[float, int]): + self.frames = frames + self.fps = fps + + +class Figure(object): + """ + Figure data class storing a matplotlib figure and whether to close the figure after logging it + + :param figure: figure to log + :param close: if true, close the figure after logging it + """ + + def __init__(self, figure: plt.figure, close: bool): + self.figure = figure + self.close = close + + +class Image(object): + """ + Image data class storing an image and data format + + :param image: image to log + :param dataformats: Image data format specification of the form NCHW, NHWC, CHW, HWC, HW, WH, etc. + More info in add_image method doc at https://pytorch.org/docs/stable/tensorboard.html + Gym envs normally use 'HWC' (channel last) + """ + + def __init__(self, image: Union[th.Tensor, np.ndarray, str], dataformats: str): + self.image = image + self.dataformats = dataformats + + +class FormatUnsupportedError(NotImplementedError): + def __init__(self, unsupported_formats: Sequence[str], value_description: str): + if len(unsupported_formats) > 1: + format_str = f"formats {', '.join(unsupported_formats)} are" + else: + format_str = f"format {unsupported_formats[0]} is" + super(FormatUnsupportedError, self).__init__( + f"The {format_str} not supported for the {value_description} value logged.\n" + f"You can exclude formats via the `exclude` parameter of the logger's `record` function." + ) + + +class KVWriter(object): + """ + Key Value writer + """ + + def write(self, key_values: Dict[str, Any], key_excluded: Dict[str, Union[str, Tuple[str, ...]]], step: int = 0) -> None: + """ + Write a dictionary to file + + :param key_values: + :param key_excluded: + :param step: + """ + raise NotImplementedError + + def close(self) -> None: + """ + Close owned resources + """ + raise NotImplementedError + + +class SeqWriter(object): + """ + sequence writer + """ + + def write_sequence(self, sequence: List) -> None: + """ + write_sequence an array to file + + :param sequence: + """ + raise NotImplementedError + + +class HumanOutputFormat(KVWriter, SeqWriter): + def __init__(self, filename_or_file: Union[str, TextIO]): + """ + log to a file, in a human readable format + + :param filename_or_file: the file to write the log to + """ + if isinstance(filename_or_file, str): + self.file = open(filename_or_file, "wt") + self.own_file = True + else: + assert hasattr(filename_or_file, "write"), f"Expected file or str, got {filename_or_file}" + self.file = filename_or_file + self.own_file = False + + def write(self, key_values: Dict, key_excluded: Dict, step: int = 0) -> None: + # Create strings for printing + key2str = {} + tag = None + for (key, value), (_, excluded) in zip(sorted(key_values.items()), sorted(key_excluded.items())): + + if excluded is not None and ("stdout" in excluded or "log" in excluded): + continue + + elif isinstance(value, Video): + raise FormatUnsupportedError(["stdout", "log"], "video") + + elif isinstance(value, Figure): + raise FormatUnsupportedError(["stdout", "log"], "figure") + + elif isinstance(value, Image): + raise FormatUnsupportedError(["stdout", "log"], "image") + + elif isinstance(value, float): + # Align left + value_str = f"{value:<8.3g}" + else: + value_str = str(value) + + if key.find("/") > 0: # Find tag and add it to the dict + tag = key[: key.find("/") + 1] + key2str[self._truncate(tag)] = "" + # Remove tag from key + if tag is not None and tag in key: + key = str(" " + key[len(tag) :]) + + key2str[self._truncate(key)] = self._truncate(value_str) + + # Find max widths + if len(key2str) == 0: + warnings.warn("Tried to write empty key-value dict") + return + else: + key_width = max(map(len, key2str.keys())) + val_width = max(map(len, key2str.values())) + + # Write out the data + dashes = "-" * (key_width + val_width + 7) + lines = [dashes] + for key, value in key2str.items(): + key_space = " " * (key_width - len(key)) + val_space = " " * (val_width - len(value)) + lines.append(f"| {key}{key_space} | {value}{val_space} |") + lines.append(dashes) + self.file.write("\n".join(lines) + "\n") + + # Flush the output to the file + self.file.flush() + + @classmethod + def _truncate(cls, string: str, max_length: int = 23) -> str: + return string[: max_length - 3] + "..." if len(string) > max_length else string + + def write_sequence(self, sequence: List) -> None: + sequence = list(sequence) + for i, elem in enumerate(sequence): + self.file.write(elem) + if i < len(sequence) - 1: # add space unless this is the last one + self.file.write(" ") + self.file.write("\n") + self.file.flush() + + def close(self) -> None: + """ + closes the file + """ + if self.own_file: + self.file.close() + + +def filter_excluded_keys( + key_values: Dict[str, Any], key_excluded: Dict[str, Union[str, Tuple[str, ...]]], _format: str +) -> Dict[str, Any]: + """ + Filters the keys specified by ``key_exclude`` for the specified format + + :param key_values: log dictionary to be filtered + :param key_excluded: keys to be excluded per format + :param _format: format for which this filter is run + :return: dict without the excluded keys + """ + + def is_excluded(key: str) -> bool: + return key in key_excluded and key_excluded[key] is not None and _format in key_excluded[key] + + return {key: value for key, value in key_values.items() if not is_excluded(key)} + + +class JSONOutputFormat(KVWriter): + def __init__(self, filename: str): + """ + log to a file, in the JSON format + + :param filename: the file to write the log to + """ + self.file = open(filename, "wt") + + def write(self, key_values: Dict[str, Any], key_excluded: Dict[str, Union[str, Tuple[str, ...]]], step: int = 0) -> None: + def cast_to_json_serializable(value: Any): + if isinstance(value, Video): + raise FormatUnsupportedError(["json"], "video") + if isinstance(value, Figure): + raise FormatUnsupportedError(["json"], "figure") + if isinstance(value, Image): + raise FormatUnsupportedError(["json"], "image") + if hasattr(value, "dtype"): + if value.shape == () or len(value) == 1: + # if value is a dimensionless numpy array or of length 1, serialize as a float + return float(value) + else: + # otherwise, a value is a numpy array, serialize as a list or nested lists + return value.tolist() + return value + + key_values = { + key: cast_to_json_serializable(value) + for key, value in filter_excluded_keys(key_values, key_excluded, "json").items() + } + self.file.write(json.dumps(key_values) + "\n") + self.file.flush() + + def close(self) -> None: + """ + closes the file + """ + + self.file.close() + + +class CSVOutputFormat(KVWriter): + def __init__(self, filename: str): + """ + log to a file, in a CSV format + + :param filename: the file to write the log to + """ + + self.file = open(filename, "w+t") + self.keys = [] + self.separator = "," + self.quotechar = '"' + + def write(self, key_values: Dict[str, Any], key_excluded: Dict[str, Union[str, Tuple[str, ...]]], step: int = 0) -> None: + # Add our current row to the history + key_values = filter_excluded_keys(key_values, key_excluded, "csv") + extra_keys = key_values.keys() - self.keys + if extra_keys: + self.keys.extend(extra_keys) + self.file.seek(0) + lines = self.file.readlines() + self.file.seek(0) + for (i, key) in enumerate(self.keys): + if i > 0: + self.file.write(",") + self.file.write(key) + self.file.write("\n") + for line in lines[1:]: + self.file.write(line[:-1]) + self.file.write(self.separator * len(extra_keys)) + self.file.write("\n") + for i, key in enumerate(self.keys): + if i > 0: + self.file.write(",") + value = key_values.get(key) + + if isinstance(value, Video): + raise FormatUnsupportedError(["csv"], "video") + + elif isinstance(value, Figure): + raise FormatUnsupportedError(["csv"], "figure") + + elif isinstance(value, Image): + raise FormatUnsupportedError(["csv"], "image") + + elif isinstance(value, str): + # escape quotechars by prepending them with another quotechar + value = value.replace(self.quotechar, self.quotechar + self.quotechar) + + # additionally wrap text with quotechars so that any delimiters in the text are ignored by csv readers + self.file.write(self.quotechar + value + self.quotechar) + + elif value is not None: + self.file.write(str(value)) + self.file.write("\n") + self.file.flush() + + def close(self) -> None: + """ + closes the file + """ + self.file.close() + + +class TensorBoardOutputFormat(KVWriter): + def __init__(self, folder: str): + """ + Dumps key/value pairs into TensorBoard's numeric format. + + :param folder: the folder to write the log to + """ + assert SummaryWriter is not None, "tensorboard is not installed, you can use " "pip install tensorboard to do so" + self.writer = SummaryWriter(log_dir=folder) + + def write(self, key_values: Dict[str, Any], key_excluded: Dict[str, Union[str, Tuple[str, ...]]], step: int = 0) -> None: + + for (key, value), (_, excluded) in zip(sorted(key_values.items()), sorted(key_excluded.items())): + + if excluded is not None and "tensorboard" in excluded: + continue + + if isinstance(value, np.ScalarType): + if isinstance(value, str): + # str is considered a np.ScalarType + self.writer.add_text(key, value, step) + else: + self.writer.add_scalar(key, value, step) + + if isinstance(value, th.Tensor): + self.writer.add_histogram(key, value, step) + + if isinstance(value, Video): + self.writer.add_video(key, value.frames, step, value.fps) + + if isinstance(value, Figure): + self.writer.add_figure(key, value.figure, step, close=value.close) + + if isinstance(value, Image): + self.writer.add_image(key, value.image, step, dataformats=value.dataformats) + + # Flush the output to the file + self.writer.flush() + + def close(self) -> None: + """ + closes the file + """ + if self.writer: + self.writer.close() + self.writer = None + + +def make_output_format(_format: str, log_dir: str, log_suffix: str = "") -> KVWriter: + """ + return a logger for the requested format + + :param _format: the requested format to log to ('stdout', 'log', 'json' or 'csv' or 'tensorboard') + :param log_dir: the logging directory + :param log_suffix: the suffix for the log file + :return: the logger + """ + os.makedirs(log_dir, exist_ok=True) + if _format == "stdout": + return HumanOutputFormat(sys.stdout) + elif _format == "log": + return HumanOutputFormat(os.path.join(log_dir, f"log{log_suffix}.txt")) + elif _format == "json": + return JSONOutputFormat(os.path.join(log_dir, f"progress{log_suffix}.json")) + elif _format == "csv": + return CSVOutputFormat(os.path.join(log_dir, f"progress{log_suffix}.csv")) + elif _format == "tensorboard": + return TensorBoardOutputFormat(log_dir) + else: + raise ValueError(f"Unknown format specified: {_format}") + + +# ================================================================ +# Backend +# ================================================================ + + +class Logger(object): + """ + The logger class. + + :param folder: the logging location + :param output_formats: the list of output formats + """ + + def __init__(self, folder: Optional[str], output_formats: List[KVWriter]): + self.name_to_value = defaultdict(float) # values this iteration + self.name_to_count = defaultdict(int) + self.name_to_excluded = defaultdict(str) + self.level = INFO + self.dir = folder + self.output_formats = output_formats + + def record(self, key: str, value: Any, exclude: Optional[Union[str, Tuple[str, ...]]] = None) -> None: + """ + Log a value of some diagnostic + Call this once for each diagnostic quantity, each iteration + If called many times, last value will be used. + + :param key: save to log this key + :param value: save to log this value + :param exclude: outputs to be excluded + """ + self.name_to_value[key] = value + self.name_to_excluded[key] = exclude + + def record_mean(self, key: str, value: Any, exclude: Optional[Union[str, Tuple[str, ...]]] = None) -> None: + """ + The same as record(), but if called many times, values averaged. + + :param key: save to log this key + :param value: save to log this value + :param exclude: outputs to be excluded + """ + if value is None: + self.name_to_value[key] = None + return + old_val, count = self.name_to_value[key], self.name_to_count[key] + self.name_to_value[key] = old_val * count / (count + 1) + value / (count + 1) + self.name_to_count[key] = count + 1 + self.name_to_excluded[key] = exclude + + def dump(self, step: int = 0) -> None: + """ + Write all of the diagnostics from the current iteration + """ + if self.level == DISABLED: + return + for _format in self.output_formats: + if isinstance(_format, KVWriter): + _format.write(self.name_to_value, self.name_to_excluded, step) + + self.name_to_value.clear() + self.name_to_count.clear() + self.name_to_excluded.clear() + + def log(self, *args, level: int = INFO) -> None: + """ + Write the sequence of args, with no separators, + to the console and output files (if you've configured an output file). + + level: int. (see logger.py docs) If the global logger level is higher than + the level argument here, don't print to stdout. + + :param args: log the arguments + :param level: the logging level (can be DEBUG=10, INFO=20, WARN=30, ERROR=40, DISABLED=50) + """ + if self.level <= level: + self._do_log(args) + + def debug(self, *args) -> None: + """ + Write the sequence of args, with no separators, + to the console and output files (if you've configured an output file). + Using the DEBUG level. + + :param args: log the arguments + """ + self.log(*args, level=DEBUG) + + def info(self, *args) -> None: + """ + Write the sequence of args, with no separators, + to the console and output files (if you've configured an output file). + Using the INFO level. + + :param args: log the arguments + """ + self.log(*args, level=INFO) + + def warn(self, *args) -> None: + """ + Write the sequence of args, with no separators, + to the console and output files (if you've configured an output file). + Using the WARN level. + + :param args: log the arguments + """ + self.log(*args, level=WARN) + + def error(self, *args) -> None: + """ + Write the sequence of args, with no separators, + to the console and output files (if you've configured an output file). + Using the ERROR level. + + :param args: log the arguments + """ + self.log(*args, level=ERROR) + + # Configuration + # ---------------------------------------- + def set_level(self, level: int) -> None: + """ + Set logging threshold on current logger. + + :param level: the logging level (can be DEBUG=10, INFO=20, WARN=30, ERROR=40, DISABLED=50) + """ + self.level = level + + def get_dir(self) -> str: + """ + Get directory that log files are being written to. + will be None if there is no output directory (i.e., if you didn't call start) + + :return: the logging directory + """ + return self.dir + + def close(self) -> None: + """ + closes the file + """ + for _format in self.output_formats: + _format.close() + + # Misc + # ---------------------------------------- + def _do_log(self, args) -> None: + """ + log to the requested format outputs + + :param args: the arguments to log + """ + for _format in self.output_formats: + if isinstance(_format, SeqWriter): + _format.write_sequence(map(str, args)) + + +def configure(folder: Optional[str] = None, format_strings: Optional[List[str]] = None) -> Logger: + """ + Configure the current logger. + + :param folder: the save location + (if None, $SB3_LOGDIR, if still None, tempdir/SB3-[date & time]) + :param format_strings: the output logging format + (if None, $SB3_LOG_FORMAT, if still None, ['stdout', 'log', 'csv']) + :return: The logger object. + """ + if folder is None: + folder = os.getenv("SB3_LOGDIR") + if folder is None: + folder = os.path.join(tempfile.gettempdir(), datetime.datetime.now().strftime("SB3-%Y-%m-%d-%H-%M-%S-%f")) + assert isinstance(folder, str) + os.makedirs(folder, exist_ok=True) + + log_suffix = "" + if format_strings is None: + format_strings = os.getenv("SB3_LOG_FORMAT", "stdout,log,csv").split(",") + + format_strings = list(filter(None, format_strings)) + output_formats = [make_output_format(f, folder, log_suffix) for f in format_strings] + + logger = Logger(folder=folder, output_formats=output_formats) + # Only print when some files will be saved + if len(format_strings) > 0 and format_strings != ["stdout"]: + logger.log(f"Logging to {folder}") + return logger + + +# ================================================================ +# Readers +# ================================================================ + + +def read_json(filename: str) -> pandas.DataFrame: + """ + read a json file using pandas + + :param filename: the file path to read + :return: the data in the json + """ + data = [] + with open(filename, "rt") as file_handler: + for line in file_handler: + data.append(json.loads(line)) + return pandas.DataFrame(data) + + +def read_csv(filename: str) -> pandas.DataFrame: + """ + read a csv file using pandas + + :param filename: the file path to read + :return: the data in the csv + """ + return pandas.read_csv(filename, index_col=None, comment="#") diff --git a/pz_risk/common/preprocessing.py b/pz_risk/common/preprocessing.py new file mode 100644 index 0000000..dc0bc5c --- /dev/null +++ b/pz_risk/common/preprocessing.py @@ -0,0 +1,216 @@ +import warnings +from typing import Dict, Tuple, Union + +import numpy as np +import torch as th +from gym import spaces +from torch.nn import functional as F + + +def is_image_space_channels_first(observation_space: spaces.Box) -> bool: + """ + Check if an image observation space (see ``is_image_space``) + is channels-first (CxHxW, True) or channels-last (HxWxC, False). + + Use a heuristic that channel dimension is the smallest of the three. + If second dimension is smallest, raise an exception (no support). + + :param observation_space: + :return: True if observation space is channels-first image, False if channels-last. + """ + smallest_dimension = np.argmin(observation_space.shape).item() + if smallest_dimension == 1: + warnings.warn("Treating image space as channels-last, while second dimension was smallest of the three.") + return smallest_dimension == 0 + + +def is_image_space( + observation_space: spaces.Space, + check_channels: bool = False, +) -> bool: + """ + Check if a observation space has the shape, limits and dtype + of a valid image. + The check is conservative, so that it returns False if there is a doubt. + + Valid images: RGB, RGBD, GrayScale with values in [0, 255] + + :param observation_space: + :param check_channels: Whether to do or not the check for the number of channels. + e.g., with frame-stacking, the observation space may have more channels than expected. + :return: + """ + if isinstance(observation_space, spaces.Box) and len(observation_space.shape) == 3: + # Check the type + if observation_space.dtype != np.uint8: + return False + + # Check the value range + if np.any(observation_space.low != 0) or np.any(observation_space.high != 255): + return False + + # Skip channels check + if not check_channels: + return True + # Check the number of channels + if is_image_space_channels_first(observation_space): + n_channels = observation_space.shape[0] + else: + n_channels = observation_space.shape[-1] + # RGB, RGBD, GrayScale + return n_channels in [1, 3, 4] + return False + + +def maybe_transpose(observation: np.ndarray, observation_space: spaces.Space) -> np.ndarray: + """ + Handle the different cases for images as PyTorch use channel first format. + + :param observation: + :param observation_space: + :return: channel first observation if observation is an image + """ + # Avoid circular import + from stable_baselines3.common.vec_env import VecTransposeImage + + if is_image_space(observation_space): + if not (observation.shape == observation_space.shape or observation.shape[1:] == observation_space.shape): + # Try to re-order the channels + transpose_obs = VecTransposeImage.transpose_image(observation) + if transpose_obs.shape == observation_space.shape or transpose_obs.shape[1:] == observation_space.shape: + observation = transpose_obs + return observation + + +def preprocess_obs( + obs: th.Tensor, + observation_space: spaces.Space, + normalize_images: bool = True, +) -> Union[th.Tensor, Dict[str, th.Tensor]]: + """ + Preprocess observation to be to a neural network. + For images, it normalizes the values by dividing them by 255 (to have values in [0, 1]) + For discrete observations, it create a one hot vector. + + :param obs: Observation + :param observation_space: + :param normalize_images: Whether to normalize images or not + (True by default) + :return: + """ + if isinstance(observation_space, spaces.Box): + if is_image_space(observation_space) and normalize_images: + return obs.float() / 255.0 + return obs.float() + + elif isinstance(observation_space, spaces.Discrete): + # One hot encoding and convert to float to avoid errors + return F.one_hot(obs.long(), num_classes=observation_space.n).float() + + elif isinstance(observation_space, spaces.MultiDiscrete): + # Tensor concatenation of one hot encodings of each Categorical sub-space + return th.cat( + [ + F.one_hot(obs_.long(), num_classes=int(observation_space.nvec[idx])).float() + for idx, obs_ in enumerate(th.split(obs.long(), 1, dim=1)) + ], + dim=-1, + ).view(obs.shape[0], sum(observation_space.nvec)) + + elif isinstance(observation_space, spaces.MultiBinary): + return obs.float() + + elif isinstance(observation_space, spaces.Dict): + # Do not modify by reference the original observation + preprocessed_obs = {} + for key, _obs in obs.items(): + preprocessed_obs[key] = preprocess_obs(_obs, observation_space[key], normalize_images=normalize_images) + return preprocessed_obs + + else: + raise NotImplementedError(f"Preprocessing not implemented for {observation_space}") + + +def get_obs_shape( + observation_space: spaces.Space, +) -> Union[Tuple[int, ...], Dict[str, Tuple[int, ...]]]: + """ + Get the shape of the observation (useful for the buffers). + + :param observation_space: + :return: + """ + if isinstance(observation_space, spaces.Box): + return observation_space.shape + elif isinstance(observation_space, spaces.Discrete): + # Observation is an int + return (1,) + elif isinstance(observation_space, spaces.MultiDiscrete): + # Number of discrete features + return (int(len(observation_space.nvec)),) + elif isinstance(observation_space, spaces.MultiBinary): + # Number of binary features + return (int(observation_space.n),) + elif isinstance(observation_space, spaces.Dict): + return {key: get_obs_shape(subspace) for (key, subspace) in observation_space.spaces.items()} + + else: + raise NotImplementedError(f"{observation_space} observation space is not supported") + + +def get_flattened_obs_dim(observation_space: spaces.Space) -> int: + """ + Get the dimension of the observation space when flattened. + It does not apply to image observation space. + + Used by the ``FlattenExtractor`` to compute the input shape. + + :param observation_space: + :return: + """ + # See issue https://github.com/openai/gym/issues/1915 + # it may be a problem for Dict/Tuple spaces too... + if isinstance(observation_space, spaces.MultiDiscrete): + return sum(observation_space.nvec) + else: + # Use Gym internal method + return spaces.utils.flatdim(observation_space) + + +def get_action_dim(action_space: spaces.Space) -> int: + """ + Get the dimension of the action space. + + :param action_space: + :return: + """ + if isinstance(action_space, spaces.Box): + return int(np.prod(action_space.shape)) + elif isinstance(action_space, spaces.Discrete): + # Action is an int + return 1 + elif isinstance(action_space, spaces.MultiDiscrete): + # Number of discrete actions + return int(len(action_space.nvec)) + elif isinstance(action_space, spaces.MultiBinary): + # Number of binary actions + return int(action_space.n) + else: + raise NotImplementedError(f"{action_space} action space is not supported") + + +def check_for_nested_spaces(obs_space: spaces.Space): + """ + Make sure the observation space does not have nested spaces (Dicts/Tuples inside Dicts/Tuples). + If so, raise an Exception informing that there is no support for this. + + :param obs_space: an observation space + :return: + """ + if isinstance(obs_space, (spaces.Dict, spaces.Tuple)): + sub_spaces = obs_space.spaces.values() if isinstance(obs_space, spaces.Dict) else obs_space.spaces + for sub_space in sub_spaces: + if isinstance(sub_space, (spaces.Dict, spaces.Tuple)): + raise NotImplementedError( + "Nested observation spaces are not supported (Tuple/Dict space inside Tuple/Dict space)." + ) diff --git a/pz_risk/common/running_mean_std.py b/pz_risk/common/running_mean_std.py new file mode 100644 index 0000000..700e7b5 --- /dev/null +++ b/pz_risk/common/running_mean_std.py @@ -0,0 +1,39 @@ +from typing import Tuple + +import numpy as np + + +class RunningMeanStd(object): + def __init__(self, epsilon: float = 1e-4, shape: Tuple[int, ...] = ()): + """ + Calulates the running mean and std of a data stream + https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm + + :param epsilon: helps with arithmetic issues + :param shape: the shape of the data stream's output + """ + self.mean = np.zeros(shape, np.float64) + self.var = np.ones(shape, np.float64) + self.count = epsilon + + def update(self, arr: np.ndarray) -> None: + batch_mean = np.mean(arr, axis=0) + batch_var = np.var(arr, axis=0) + batch_count = arr.shape[0] + self.update_from_moments(batch_mean, batch_var, batch_count) + + def update_from_moments(self, batch_mean: np.ndarray, batch_var: np.ndarray, batch_count: int) -> None: + delta = batch_mean - self.mean + tot_count = self.count + batch_count + + new_mean = self.mean + delta * batch_count / tot_count + m_a = self.var * self.count + m_b = batch_var * batch_count + m_2 = m_a + m_b + np.square(delta) * self.count * batch_count / (self.count + batch_count) + new_var = m_2 / (self.count + batch_count) + + new_count = batch_count + self.count + + self.mean = new_mean + self.var = new_var + self.count = new_count diff --git a/pz_risk/common/sb2_compat/__init__.py b/pz_risk/common/sb2_compat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pz_risk/common/sb2_compat/rmsprop_tf_like.py b/pz_risk/common/sb2_compat/rmsprop_tf_like.py new file mode 100644 index 0000000..04edf37 --- /dev/null +++ b/pz_risk/common/sb2_compat/rmsprop_tf_like.py @@ -0,0 +1,136 @@ +from typing import Any, Callable, Dict, Iterable, Optional + +import torch +from torch.optim import Optimizer + + +class RMSpropTFLike(Optimizer): + r"""Implements RMSprop algorithm with closer match to Tensorflow version. + + For reproducibility with original stable-baselines. Use this + version with e.g. A2C for stabler learning than with the PyTorch + RMSProp. Based on the PyTorch v1.5.0 implementation of RMSprop. + + See a more throughout conversion in pytorch-image-models repository: + https://github.com/rwightman/pytorch-image-models/blob/master/timm/optim/rmsprop_tf.py + + Changes to the original RMSprop: + - Move epsilon inside square root + - Initialize squared gradient to ones rather than zeros + + Proposed by G. Hinton in his + `course `_. + + The centered version first appears in `Generating Sequences + With Recurrent Neural Networks `_. + + The implementation here takes the square root of the gradient average before + adding epsilon (note that TensorFlow interchanges these two operations). The effective + learning rate is thus :math:`\alpha/(\sqrt{v} + \epsilon)` where :math:`\alpha` + is the scheduled learning rate and :math:`v` is the weighted moving average + of the squared gradient. + + :params: iterable of parameters to optimize or dicts defining + parameter groups + :param lr: learning rate (default: 1e-2) + :param momentum: momentum factor (default: 0) + :param alpha: smoothing constant (default: 0.99) + :param eps: term added to the denominator to improve + numerical stability (default: 1e-8) + :param centered: if ``True``, compute the centered RMSProp, + the gradient is normalized by an estimation of its variance + :param weight_decay: weight decay (L2 penalty) (default: 0) + + """ + + def __init__( + self, + params: Iterable[torch.nn.Parameter], + lr: float = 1e-2, + alpha: float = 0.99, + eps: float = 1e-8, + weight_decay: float = 0, + momentum: float = 0, + centered: bool = False, + ): + if not 0.0 <= lr: + raise ValueError("Invalid learning rate: {}".format(lr)) + if not 0.0 <= eps: + raise ValueError("Invalid epsilon value: {}".format(eps)) + if not 0.0 <= momentum: + raise ValueError("Invalid momentum value: {}".format(momentum)) + if not 0.0 <= weight_decay: + raise ValueError("Invalid weight_decay value: {}".format(weight_decay)) + if not 0.0 <= alpha: + raise ValueError("Invalid alpha value: {}".format(alpha)) + + defaults = dict(lr=lr, momentum=momentum, alpha=alpha, eps=eps, centered=centered, weight_decay=weight_decay) + super(RMSpropTFLike, self).__init__(params, defaults) + + def __setstate__(self, state: Dict[str, Any]) -> None: + super(RMSpropTFLike, self).__setstate__(state) + for group in self.param_groups: + group.setdefault("momentum", 0) + group.setdefault("centered", False) + + @torch.no_grad() + def step(self, closure: Optional[Callable[[], None]] = None) -> Optional[torch.Tensor]: + """Performs a single optimization step. + + :param closure: A closure that reevaluates the model + and returns the loss. + :return: loss + """ + loss = None + if closure is not None: + with torch.enable_grad(): + loss = closure() + + for group in self.param_groups: + for p in group["params"]: + if p.grad is None: + continue + grad = p.grad + if grad.is_sparse: + raise RuntimeError("RMSpropTF does not support sparse gradients") + state = self.state[p] + + # State initialization + if len(state) == 0: + state["step"] = 0 + # PyTorch initialized to zeros here + state["square_avg"] = torch.ones_like(p, memory_format=torch.preserve_format) + if group["momentum"] > 0: + state["momentum_buffer"] = torch.zeros_like(p, memory_format=torch.preserve_format) + if group["centered"]: + state["grad_avg"] = torch.zeros_like(p, memory_format=torch.preserve_format) + + square_avg = state["square_avg"] + alpha = group["alpha"] + + state["step"] += 1 + + if group["weight_decay"] != 0: + grad = grad.add(p, alpha=group["weight_decay"]) + + square_avg.mul_(alpha).addcmul_(grad, grad, value=1 - alpha) + + if group["centered"]: + grad_avg = state["grad_avg"] + grad_avg.mul_(alpha).add_(grad, alpha=1 - alpha) + # PyTorch added epsilon after square root + # avg = square_avg.addcmul(grad_avg, grad_avg, value=-1).sqrt_().add_(group['eps']) + avg = square_avg.addcmul(grad_avg, grad_avg, value=-1).add_(group["eps"]).sqrt_() + else: + # PyTorch added epsilon after square root + # avg = square_avg.sqrt().add_(group['eps']) + avg = square_avg.add(group["eps"]).sqrt_() + + if group["momentum"] > 0: + buf = state["momentum_buffer"] + buf.mul_(group["momentum"]).addcdiv_(grad, avg) + p.add_(buf, alpha=-group["lr"]) + else: + p.addcdiv_(grad, avg, value=-group["lr"]) + + return loss diff --git a/pz_risk/common/type_aliases.py b/pz_risk/common/type_aliases.py new file mode 100644 index 0000000..fd3e3a0 --- /dev/null +++ b/pz_risk/common/type_aliases.py @@ -0,0 +1,71 @@ +"""Common aliases for type hints""" + +from enum import Enum +from typing import Any, Callable, Dict, List, NamedTuple, Tuple, Union + +import gym +import numpy as np +import torch as th + +from common import vec_env + +GymEnv = Union[gym.Env, vec_env.VecEnv] +GymObs = Union[Tuple, Dict[str, Any], np.ndarray, int] +GymStepReturn = Tuple[GymObs, float, bool, Dict] +TensorDict = Dict[Union[str, int], th.Tensor] +OptimizerStateDict = Dict[str, Any] + +# A schedule takes the remaining progress as input +# and ouputs a scalar (e.g. learning rate, clip range, ...) +Schedule = Callable[[float], float] + + +class RolloutBufferSamples(NamedTuple): + observations: th.Tensor + actions: th.Tensor + old_values: th.Tensor + old_log_prob: th.Tensor + advantages: th.Tensor + returns: th.Tensor + + +class DictRolloutBufferSamples(RolloutBufferSamples): + observations: TensorDict + actions: th.Tensor + old_values: th.Tensor + old_log_prob: th.Tensor + advantages: th.Tensor + returns: th.Tensor + + +class ReplayBufferSamples(NamedTuple): + observations: th.Tensor + actions: th.Tensor + next_observations: th.Tensor + dones: th.Tensor + rewards: th.Tensor + + +class DictReplayBufferSamples(ReplayBufferSamples): + observations: TensorDict + actions: th.Tensor + next_observations: th.Tensor + dones: th.Tensor + rewards: th.Tensor + + +class RolloutReturn(NamedTuple): + episode_reward: float + episode_timesteps: int + n_episodes: int + continue_training: bool + + +class TrainFrequencyUnit(Enum): + STEP = "step" + EPISODE = "episode" + + +class TrainFreq(NamedTuple): + frequency: int + unit: TrainFrequencyUnit # either "step" or "episode" diff --git a/pz_risk/common/utils.py b/pz_risk/common/utils.py new file mode 100644 index 0000000..24ea465 --- /dev/null +++ b/pz_risk/common/utils.py @@ -0,0 +1,463 @@ +import glob +import os +import random +from collections import deque +from itertools import zip_longest +from typing import Dict, Iterable, Optional, Union + +import gym +import numpy as np +import torch as th + +# Check if tensorboard is available for pytorch +try: + from torch.utils.tensorboard import SummaryWriter +except ImportError: + SummaryWriter = None + +from .logger import Logger, configure +from .type_aliases import GymEnv, Schedule, TensorDict, TrainFreq, TrainFrequencyUnit + + +def set_random_seed(seed: int, using_cuda: bool = False) -> None: + """ + Seed the different random generators. + + :param seed: + :param using_cuda: + """ + # Seed python RNG + random.seed(seed) + # Seed numpy RNG + np.random.seed(seed) + # seed the RNG for all devices (both CPU and CUDA) + th.manual_seed(seed) + + if using_cuda: + # Deterministic operations for CuDNN, it may impact performances + th.backends.cudnn.deterministic = True + th.backends.cudnn.benchmark = False + + +# From stable baselines +def explained_variance(y_pred: np.ndarray, y_true: np.ndarray) -> np.ndarray: + """ + Computes fraction of variance that ypred explains about y. + Returns 1 - Var[y-ypred] / Var[y] + + interpretation: + ev=0 => might as well have predicted zero + ev=1 => perfect prediction + ev<0 => worse than just predicting zero + + :param y_pred: the prediction + :param y_true: the expected value + :return: explained variance of ypred and y + """ + assert y_true.ndim == 1 and y_pred.ndim == 1 + var_y = np.var(y_true) + return np.nan if var_y == 0 else 1 - np.var(y_true - y_pred) / var_y + + +def update_learning_rate(optimizer: th.optim.Optimizer, learning_rate: float) -> None: + """ + Update the learning rate for a given optimizer. + Useful when doing linear schedule. + + :param optimizer: + :param learning_rate: + """ + for param_group in optimizer.param_groups: + param_group["lr"] = learning_rate + + +def get_schedule_fn(value_schedule: Union[Schedule, float, int]) -> Schedule: + """ + Transform (if needed) learning rate and clip range (for PPO) + to callable. + + :param value_schedule: + :return: + """ + # If the passed schedule is a float + # create a constant function + if isinstance(value_schedule, (float, int)): + # Cast to float to avoid errors + value_schedule = constant_fn(float(value_schedule)) + else: + assert callable(value_schedule) + return value_schedule + + +def get_linear_fn(start: float, end: float, end_fraction: float) -> Schedule: + """ + Create a function that interpolates linearly between start and end + between ``progress_remaining`` = 1 and ``progress_remaining`` = ``end_fraction``. + This is used in DQN for linearly annealing the exploration fraction + (epsilon for the epsilon-greedy strategy). + + :params start: value to start with if ``progress_remaining`` = 1 + :params end: value to end with if ``progress_remaining`` = 0 + :params end_fraction: fraction of ``progress_remaining`` + where end is reached e.g 0.1 then end is reached after 10% + of the complete training process. + :return: + """ + + def func(progress_remaining: float) -> float: + if (1 - progress_remaining) > end_fraction: + return end + else: + return start + (1 - progress_remaining) * (end - start) / end_fraction + + return func + + +def constant_fn(val: float) -> Schedule: + """ + Create a function that returns a constant + It is useful for learning rate schedule (to avoid code duplication) + + :param val: + :return: + """ + + def func(_): + return val + + return func + + +def get_device(device: Union[th.device, str] = "auto") -> th.device: + """ + Retrieve PyTorch device. + It checks that the requested device is available first. + For now, it supports only cpu and cuda. + By default, it tries to use the gpu. + + :param device: One for 'auto', 'cuda', 'cpu' + :return: + """ + # Cuda by default + if device == "auto": + device = "cuda" + # Force conversion to th.device + device = th.device(device) + + # Cuda not available + if device.type == th.device("cuda").type and not th.cuda.is_available(): + return th.device("cpu") + + return device + + +def get_latest_run_id(log_path: Optional[str] = None, log_name: str = "") -> int: + """ + Returns the latest run number for the given log name and log path, + by finding the greatest number in the directories. + + :return: latest run number + """ + max_run_id = 0 + for path in glob.glob(f"{log_path}/{log_name}_[0-9]*"): + file_name = path.split(os.sep)[-1] + ext = file_name.split("_")[-1] + if log_name == "_".join(file_name.split("_")[:-1]) and ext.isdigit() and int(ext) > max_run_id: + max_run_id = int(ext) + return max_run_id + + +def configure_logger( + verbose: int = 0, + tensorboard_log: Optional[str] = None, + tb_log_name: str = "", + reset_num_timesteps: bool = True, +) -> Logger: + """ + Configure the logger's outputs. + + :param verbose: the verbosity level: 0 no output, 1 info, 2 debug + :param tensorboard_log: the log location for tensorboard (if None, no logging) + :param tb_log_name: tensorboard log + :param reset_num_timesteps: Whether the ``num_timesteps`` attribute is reset or not. + It allows to continue a previous learning curve (``reset_num_timesteps=False``) + or start from t=0 (``reset_num_timesteps=True``, the default). + :return: The logger object + """ + save_path, format_strings = None, ["stdout"] + + if tensorboard_log is not None and SummaryWriter is None: + raise ImportError("Trying to log data to tensorboard but tensorboard is not installed.") + + if tensorboard_log is not None and SummaryWriter is not None: + latest_run_id = get_latest_run_id(tensorboard_log, tb_log_name) + if not reset_num_timesteps: + # Continue training in the same directory + latest_run_id -= 1 + save_path = os.path.join(tensorboard_log, f"{tb_log_name}_{latest_run_id + 1}") + if verbose >= 1: + format_strings = ["stdout", "tensorboard"] + else: + format_strings = ["tensorboard"] + elif verbose == 0: + format_strings = [""] + return configure(save_path, format_strings=format_strings) + + +def check_for_correct_spaces(env: GymEnv, observation_space: gym.spaces.Space, action_space: gym.spaces.Space) -> None: + """ + Checks that the environment has same spaces as provided ones. Used by BaseAlgorithm to check if + spaces match after loading the model with given env. + Checked parameters: + - observation_space + - action_space + + :param env: Environment to check for valid spaces + :param observation_space: Observation space to check against + :param action_space: Action space to check against + """ + if observation_space != env.observation_space: + raise ValueError(f"Observation spaces do not match: {observation_space} != {env.observation_space}") + if action_space != env.action_space: + raise ValueError(f"Action spaces do not match: {action_space} != {env.action_space}") + + +def is_vectorized_box_observation(observation: np.ndarray, observation_space: gym.spaces.Box) -> bool: + """ + For box observation type, detects and validates the shape, + then returns whether or not the observation is vectorized. + + :param observation: the input observation to validate + :param observation_space: the observation space + :return: whether the given observation is vectorized or not + """ + if observation.shape == observation_space.shape: + return False + elif observation.shape[1:] == observation_space.shape: + return True + else: + raise ValueError( + f"Error: Unexpected observation shape {observation.shape} for " + + f"Box environment, please use {observation_space.shape} " + + "or (n_env, {}) for the observation shape.".format(", ".join(map(str, observation_space.shape))) + ) + + +def is_vectorized_discrete_observation(observation: np.ndarray, observation_space: gym.spaces.Discrete) -> bool: + """ + For discrete observation type, detects and validates the shape, + then returns whether or not the observation is vectorized. + + :param observation: the input observation to validate + :param observation_space: the observation space + :return: whether the given observation is vectorized or not + """ + if observation.shape == (): # A numpy array of a number, has shape empty tuple '()' + return False + elif len(observation.shape) == 1: + return True + else: + raise ValueError( + f"Error: Unexpected observation shape {observation.shape} for " + + "Discrete environment, please use (1,) or (n_env, 1) for the observation shape." + ) + + +def is_vectorized_multidiscrete_observation(observation: np.ndarray, observation_space: gym.spaces.MultiDiscrete) -> bool: + """ + For multidiscrete observation type, detects and validates the shape, + then returns whether or not the observation is vectorized. + + :param observation: the input observation to validate + :param observation_space: the observation space + :return: whether the given observation is vectorized or not + """ + if observation.shape == (len(observation_space.nvec),): + return False + elif len(observation.shape) == 2 and observation.shape[1] == len(observation_space.nvec): + return True + else: + raise ValueError( + f"Error: Unexpected observation shape {observation.shape} for MultiDiscrete " + + f"environment, please use ({len(observation_space.nvec)},) or " + + f"(n_env, {len(observation_space.nvec)}) for the observation shape." + ) + + +def is_vectorized_multibinary_observation(observation: np.ndarray, observation_space: gym.spaces.MultiBinary) -> bool: + """ + For multibinary observation type, detects and validates the shape, + then returns whether or not the observation is vectorized. + + :param observation: the input observation to validate + :param observation_space: the observation space + :return: whether the given observation is vectorized or not + """ + if observation.shape == (observation_space.n,): + return False + elif len(observation.shape) == 2 and observation.shape[1] == observation_space.n: + return True + else: + raise ValueError( + f"Error: Unexpected observation shape {observation.shape} for MultiBinary " + + f"environment, please use ({observation_space.n},) or " + + f"(n_env, {observation_space.n}) for the observation shape." + ) + + +def is_vectorized_dict_observation(observation: np.ndarray, observation_space: gym.spaces.Dict) -> bool: + """ + For dict observation type, detects and validates the shape, + then returns whether or not the observation is vectorized. + + :param observation: the input observation to validate + :param observation_space: the observation space + :return: whether the given observation is vectorized or not + """ + for key, subspace in observation_space.spaces.items(): + if observation[key].shape == subspace.shape: + return False + + all_good = True + + for key, subspace in observation_space.spaces.items(): + if observation[key].shape[1:] != subspace.shape: + all_good = False + break + + if all_good: + return True + else: + raise ValueError( + f"Error: Unexpected observation shape {observation.shape} for " + + f"Tuple environment, please use {(obs.shape for obs in observation_space.spaces)} " + ) + + +def is_vectorized_observation(observation: np.ndarray, observation_space: gym.spaces.Space) -> bool: + """ + For every observation type, detects and validates the shape, + then returns whether or not the observation is vectorized. + + :param observation: the input observation to validate + :param observation_space: the observation space + :return: whether the given observation is vectorized or not + """ + + is_vec_obs_func_dict = { + gym.spaces.Box: is_vectorized_box_observation, + gym.spaces.Discrete: is_vectorized_discrete_observation, + gym.spaces.MultiDiscrete: is_vectorized_multidiscrete_observation, + gym.spaces.MultiBinary: is_vectorized_multibinary_observation, + gym.spaces.Dict: is_vectorized_dict_observation, + } + + try: + is_vec_obs_func = is_vec_obs_func_dict[type(observation_space)] + return is_vec_obs_func(observation, observation_space) + except KeyError: + raise ValueError( + "Error: Cannot determine if the observation is vectorized " + f" with the space type {observation_space}." + ) + + +def safe_mean(arr: Union[np.ndarray, list, deque]) -> np.ndarray: + """ + Compute the mean of an array if there is at least one element. + For empty array, return NaN. It is used for logging only. + + :param arr: + :return: + """ + return np.nan if len(arr) == 0 else np.mean(arr) + + +def zip_strict(*iterables: Iterable) -> Iterable: + r""" + ``zip()`` function but enforces that iterables are of equal length. + Raises ``ValueError`` if iterables not of equal length. + Code inspired by Stackoverflow answer for question #32954486. + + :param \*iterables: iterables to ``zip()`` + """ + # As in Stackoverflow #32954486, use + # new object for "empty" in case we have + # Nones in iterable. + sentinel = object() + for combo in zip_longest(*iterables, fillvalue=sentinel): + if sentinel in combo: + raise ValueError("Iterables have different lengths") + yield combo + + +def polyak_update( + params: Iterable[th.nn.Parameter], + target_params: Iterable[th.nn.Parameter], + tau: float, +) -> None: + """ + Perform a Polyak average update on ``target_params`` using ``params``: + target parameters are slowly updated towards the main parameters. + ``tau``, the soft update coefficient controls the interpolation: + ``tau=1`` corresponds to copying the parameters to the target ones whereas nothing happens when ``tau=0``. + The Polyak update is done in place, with ``no_grad``, and therefore does not create intermediate tensors, + or a computation graph, reducing memory cost and improving performance. We scale the target params + by ``1-tau`` (in-place), add the new weights, scaled by ``tau`` and store the result of the sum in the target + params (in place). + See https://github.com/DLR-RM/stable-baselines3/issues/93 + + :param params: parameters to use to update the target params + :param target_params: parameters to update + :param tau: the soft update coefficient ("Polyak update", between 0 and 1) + """ + with th.no_grad(): + # zip does not raise an exception if length of parameters does not match. + for param, target_param in zip_strict(params, target_params): + target_param.data.mul_(1 - tau) + th.add(target_param.data, param.data, alpha=tau, out=target_param.data) + + +def obs_as_tensor( + obs: Union[np.ndarray, Dict[Union[str, int], np.ndarray]], device: th.device +) -> Union[th.Tensor, TensorDict]: + """ + Moves the observation to the given device. + + :param obs: + :param device: PyTorch device + :return: PyTorch tensor of the observation on a desired device. + """ + if isinstance(obs, np.ndarray): + return th.as_tensor(obs).to(device) + elif isinstance(obs, dict): + return {key: th.as_tensor(_obs).to(device) for (key, _obs) in obs.items()} + else: + raise Exception(f"Unrecognized type of observation {type(obs)}") + + +def should_collect_more_steps( + train_freq: TrainFreq, + num_collected_steps: int, + num_collected_episodes: int, +) -> bool: + """ + Helper used in ``collect_rollouts()`` of off-policy algorithms + to determine the termination condition. + + :param train_freq: How much experience should be collected before updating the policy. + :param num_collected_steps: The number of already collected steps. + :param num_collected_episodes: The number of already collected episodes. + :return: Whether to continue or not collecting experience + by doing rollouts of the current policy. + """ + if train_freq.unit == TrainFrequencyUnit.STEP: + return num_collected_steps < train_freq.frequency + + elif train_freq.unit == TrainFrequencyUnit.EPISODE: + return num_collected_episodes < train_freq.frequency + + else: + raise ValueError( + "The unit of the `train_freq` must be either TrainFrequencyUnit.STEP " + f"or TrainFrequencyUnit.EPISODE not '{train_freq.unit}'!" + ) diff --git a/pz_risk/common/vec_env/__init__.py b/pz_risk/common/vec_env/__init__.py new file mode 100644 index 0000000..deec691 --- /dev/null +++ b/pz_risk/common/vec_env/__init__.py @@ -0,0 +1,66 @@ +# flake8: noqa F401 +import typing +from copy import deepcopy +from typing import Optional, Type, Union + +from .base_vec_env import CloudpickleWrapper, VecEnv, VecEnvWrapper +from .dummy_vec_env import DummyVecEnv +from .subproc_vec_env import SubprocVecEnv +from .vec_normalize import VecNormalize + + +# Avoid circular import +if typing.TYPE_CHECKING: + from stable_baselines3.common.type_aliases import GymEnv + + +def unwrap_vec_wrapper(env: Union["GymEnv", VecEnv], vec_wrapper_class: Type[VecEnvWrapper]) -> Optional[VecEnvWrapper]: + """ + Retrieve a ``VecEnvWrapper`` object by recursively searching. + + :param env: + :param vec_wrapper_class: + :return: + """ + env_tmp = env + while isinstance(env_tmp, VecEnvWrapper): + if isinstance(env_tmp, vec_wrapper_class): + return env_tmp + env_tmp = env_tmp.venv + return None + + +def unwrap_vec_normalize(env: Union["GymEnv", VecEnv]) -> Optional[VecNormalize]: + """ + :param env: + :return: + """ + return unwrap_vec_wrapper(env, VecNormalize) # pytype:disable=bad-return-type + + +def is_vecenv_wrapped(env: Union["GymEnv", VecEnv], vec_wrapper_class: Type[VecEnvWrapper]) -> bool: + """ + Check if an environment is already wrapped by a given ``VecEnvWrapper``. + + :param env: + :param vec_wrapper_class: + :return: + """ + return unwrap_vec_wrapper(env, vec_wrapper_class) is not None + + +# Define here to avoid circular import +def sync_envs_normalization(env: "GymEnv", eval_env: "GymEnv") -> None: + """ + Sync eval env and train env when using VecNormalize + + :param env: + :param eval_env: + """ + env_tmp, eval_env_tmp = env, eval_env + while isinstance(env_tmp, VecEnvWrapper): + if isinstance(env_tmp, VecNormalize): + eval_env_tmp.obs_rms = deepcopy(env_tmp.obs_rms) + eval_env_tmp.ret_rms = deepcopy(env_tmp.ret_rms) + env_tmp = env_tmp.venv + eval_env_tmp = eval_env_tmp.venv diff --git a/pz_risk/common/vec_env/base_vec_env.py b/pz_risk/common/vec_env/base_vec_env.py new file mode 100644 index 0000000..08c60fa --- /dev/null +++ b/pz_risk/common/vec_env/base_vec_env.py @@ -0,0 +1,374 @@ +import inspect +import warnings +from abc import ABC, abstractmethod +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Type, Union + +import cloudpickle +import gym +import numpy as np + +# Define type aliases here to avoid circular import +# Used when we want to access one or more VecEnv +VecEnvIndices = Union[None, int, Iterable[int]] +# VecEnvObs is what is returned by the reset() method +# it contains the observation for each env +VecEnvObs = Union[np.ndarray, Dict[str, np.ndarray], Tuple[np.ndarray, ...]] +# VecEnvStepReturn is what is returned by the step() method +# it contains the observation, reward, done, info for each env +VecEnvStepReturn = Tuple[VecEnvObs, np.ndarray, np.ndarray, List[Dict]] + + +def tile_images(img_nhwc: Sequence[np.ndarray]) -> np.ndarray: # pragma: no cover + """ + Tile N images into one big PxQ image + (P,Q) are chosen to be as close as possible, and if N + is square, then P=Q. + + :param img_nhwc: list or array of images, ndim=4 once turned into array. img nhwc + n = batch index, h = height, w = width, c = channel + :return: img_HWc, ndim=3 + """ + img_nhwc = np.asarray(img_nhwc) + n_images, height, width, n_channels = img_nhwc.shape + # new_height was named H before + new_height = int(np.ceil(np.sqrt(n_images))) + # new_width was named W before + new_width = int(np.ceil(float(n_images) / new_height)) + img_nhwc = np.array(list(img_nhwc) + [img_nhwc[0] * 0 for _ in range(n_images, new_height * new_width)]) + # img_HWhwc + out_image = img_nhwc.reshape((new_height, new_width, height, width, n_channels)) + # img_HhWwc + out_image = out_image.transpose(0, 2, 1, 3, 4) + # img_Hh_Ww_c + out_image = out_image.reshape((new_height * height, new_width * width, n_channels)) + return out_image + + +class VecEnv(ABC): + """ + An abstract asynchronous, vectorized environment. + + :param num_envs: the number of environments + :param observation_space: the observation space + :param action_space: the action space + """ + + metadata = {"render.modes": ["human", "rgb_array"]} + + def __init__(self, num_envs: int, observation_space: gym.spaces.Space, action_space: gym.spaces.Space): + self.num_envs = num_envs + self.observation_space = observation_space + self.action_space = action_space + + @abstractmethod + def reset(self) -> VecEnvObs: + """ + Reset all the environments and return an array of + observations, or a tuple of observation arrays. + + If step_async is still doing work, that work will + be cancelled and step_wait() should not be called + until step_async() is invoked again. + + :return: observation + """ + raise NotImplementedError() + + @abstractmethod + def step_async(self, actions: np.ndarray) -> None: + """ + Tell all the environments to start taking a step + with the given actions. + Call step_wait() to get the results of the step. + + You should not call this if a step_async run is + already pending. + """ + raise NotImplementedError() + + @abstractmethod + def step_wait(self) -> VecEnvStepReturn: + """ + Wait for the step taken with step_async(). + + :return: observation, reward, done, information + """ + raise NotImplementedError() + + @abstractmethod + def close(self) -> None: + """ + Clean up the environment's resources. + """ + raise NotImplementedError() + + @abstractmethod + def get_attr(self, attr_name: str, indices: VecEnvIndices = None) -> List[Any]: + """ + Return attribute from vectorized environment. + + :param attr_name: The name of the attribute whose value to return + :param indices: Indices of envs to get attribute from + :return: List of values of 'attr_name' in all environments + """ + raise NotImplementedError() + + @abstractmethod + def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None: + """ + Set attribute inside vectorized environments. + + :param attr_name: The name of attribute to assign new value + :param value: Value to assign to `attr_name` + :param indices: Indices of envs to assign value + :return: + """ + raise NotImplementedError() + + @abstractmethod + def env_method(self, method_name: str, *method_args, indices: VecEnvIndices = None, **method_kwargs) -> List[Any]: + """ + Call instance methods of vectorized environments. + + :param method_name: The name of the environment method to invoke. + :param indices: Indices of envs whose method to call + :param method_args: Any positional arguments to provide in the call + :param method_kwargs: Any keyword arguments to provide in the call + :return: List of items returned by the environment's method call + """ + raise NotImplementedError() + + @abstractmethod + def env_is_wrapped(self, wrapper_class: Type[gym.Wrapper], indices: VecEnvIndices = None) -> List[bool]: + """ + Check if environments are wrapped with a given wrapper. + + :param method_name: The name of the environment method to invoke. + :param indices: Indices of envs whose method to call + :param method_args: Any positional arguments to provide in the call + :param method_kwargs: Any keyword arguments to provide in the call + :return: True if the env is wrapped, False otherwise, for each env queried. + """ + raise NotImplementedError() + + def step(self, actions: np.ndarray) -> VecEnvStepReturn: + """ + Step the environments with the given action + + :param actions: the action + :return: observation, reward, done, information + """ + self.step_async(actions) + return self.step_wait() + + def get_images(self) -> Sequence[np.ndarray]: + """ + Return RGB images from each environment + """ + raise NotImplementedError + + def render(self, mode: str = "human") -> Optional[np.ndarray]: + """ + Gym environment rendering + + :param mode: the rendering type + """ + try: + imgs = self.get_images() + except NotImplementedError: + warnings.warn(f"Render not defined for {self}") + return + + # Create a big image by tiling images from subprocesses + bigimg = tile_images(imgs) + if mode == "human": + import cv2 # pytype:disable=import-error + + cv2.imshow("vecenv", bigimg[:, :, ::-1]) + cv2.waitKey(1) + elif mode == "rgb_array": + return bigimg + else: + raise NotImplementedError(f"Render mode {mode} is not supported by VecEnvs") + + @abstractmethod + def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]: + """ + Sets the random seeds for all environments, based on a given seed. + Each individual environment will still get its own seed, by incrementing the given seed. + + :param seed: The random seed. May be None for completely random seeding. + :return: Returns a list containing the seeds for each individual env. + Note that all list elements may be None, if the env does not return anything when being seeded. + """ + pass + + @property + def unwrapped(self) -> "VecEnv": + if isinstance(self, VecEnvWrapper): + return self.venv.unwrapped + else: + return self + + def getattr_depth_check(self, name: str, already_found: bool) -> Optional[str]: + """Check if an attribute reference is being hidden in a recursive call to __getattr__ + + :param name: name of attribute to check for + :param already_found: whether this attribute has already been found in a wrapper + :return: name of module whose attribute is being shadowed, if any. + """ + if hasattr(self, name) and already_found: + return f"{type(self).__module__}.{type(self).__name__}" + else: + return None + + def _get_indices(self, indices: VecEnvIndices) -> Iterable[int]: + """ + Convert a flexibly-typed reference to environment indices to an implied list of indices. + + :param indices: refers to indices of envs. + :return: the implied list of indices. + """ + if indices is None: + indices = range(self.num_envs) + elif isinstance(indices, int): + indices = [indices] + return indices + + +class VecEnvWrapper(VecEnv): + """ + Vectorized environment base class + + :param venv: the vectorized environment to wrap + :param observation_space: the observation space (can be None to load from venv) + :param action_space: the action space (can be None to load from venv) + """ + + def __init__( + self, + venv: VecEnv, + observation_space: Optional[gym.spaces.Space] = None, + action_space: Optional[gym.spaces.Space] = None, + ): + self.venv = venv + VecEnv.__init__( + self, + num_envs=venv.num_envs, + observation_space=observation_space or venv.observation_space, + action_space=action_space or venv.action_space, + ) + self.class_attributes = dict(inspect.getmembers(self.__class__)) + + def step_async(self, actions: np.ndarray) -> None: + self.venv.step_async(actions) + + @abstractmethod + def reset(self) -> VecEnvObs: + pass + + @abstractmethod + def step_wait(self) -> VecEnvStepReturn: + pass + + def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]: + return self.venv.seed(seed) + + def close(self) -> None: + return self.venv.close() + + def render(self, mode: str = "human") -> Optional[np.ndarray]: + return self.venv.render(mode=mode) + + def get_images(self) -> Sequence[np.ndarray]: + return self.venv.get_images() + + def get_attr(self, attr_name: str, indices: VecEnvIndices = None) -> List[Any]: + return self.venv.get_attr(attr_name, indices) + + def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None: + return self.venv.set_attr(attr_name, value, indices) + + def env_method(self, method_name: str, *method_args, indices: VecEnvIndices = None, **method_kwargs) -> List[Any]: + return self.venv.env_method(method_name, *method_args, indices=indices, **method_kwargs) + + def env_is_wrapped(self, wrapper_class: Type[gym.Wrapper], indices: VecEnvIndices = None) -> List[bool]: + return self.venv.env_is_wrapped(wrapper_class, indices=indices) + + def __getattr__(self, name: str) -> Any: + """Find attribute from wrapped venv(s) if this wrapper does not have it. + Useful for accessing attributes from venvs which are wrapped with multiple wrappers + which have unique attributes of interest. + """ + blocked_class = self.getattr_depth_check(name, already_found=False) + if blocked_class is not None: + own_class = f"{type(self).__module__}.{type(self).__name__}" + error_str = ( + f"Error: Recursive attribute lookup for {name} from {own_class} is " + "ambiguous and hides attribute from {blocked_class}" + ) + raise AttributeError(error_str) + + return self.getattr_recursive(name) + + def _get_all_attributes(self) -> Dict[str, Any]: + """Get all (inherited) instance and class attributes + + :return: all_attributes + """ + all_attributes = self.__dict__.copy() + all_attributes.update(self.class_attributes) + return all_attributes + + def getattr_recursive(self, name: str) -> Any: + """Recursively check wrappers to find attribute. + + :param name: name of attribute to look for + :return: attribute + """ + all_attributes = self._get_all_attributes() + if name in all_attributes: # attribute is present in this wrapper + attr = getattr(self, name) + elif hasattr(self.venv, "getattr_recursive"): + # Attribute not present, child is wrapper. Call getattr_recursive rather than getattr + # to avoid a duplicate call to getattr_depth_check. + attr = self.venv.getattr_recursive(name) + else: # attribute not present, child is an unwrapped VecEnv + attr = getattr(self.venv, name) + + return attr + + def getattr_depth_check(self, name: str, already_found: bool) -> str: + """See base class. + + :return: name of module whose attribute is being shadowed, if any. + """ + all_attributes = self._get_all_attributes() + if name in all_attributes and already_found: + # this venv's attribute is being hidden because of a higher venv. + shadowed_wrapper_class = f"{type(self).__module__}.{type(self).__name__}" + elif name in all_attributes and not already_found: + # we have found the first reference to the attribute. Now check for duplicates. + shadowed_wrapper_class = self.venv.getattr_depth_check(name, True) + else: + # this wrapper does not have the attribute. Keep searching. + shadowed_wrapper_class = self.venv.getattr_depth_check(name, already_found) + + return shadowed_wrapper_class + + +class CloudpickleWrapper: + """ + Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle) + + :param var: the variable you wish to wrap for pickling with cloudpickle + """ + + def __init__(self, var: Any): + self.var = var + + def __getstate__(self) -> Any: + return cloudpickle.dumps(self.var) + + def __setstate__(self, var: Any) -> None: + self.var = cloudpickle.loads(var) diff --git a/pz_risk/common/vec_env/dummy_vec_env.py b/pz_risk/common/vec_env/dummy_vec_env.py new file mode 100644 index 0000000..051ed84 --- /dev/null +++ b/pz_risk/common/vec_env/dummy_vec_env.py @@ -0,0 +1,126 @@ +from collections import OrderedDict +from copy import deepcopy +from typing import Any, Callable, List, Optional, Sequence, Type, Union + +import gym +import pettingzoo +import numpy as np + +from .base_vec_env import VecEnv, VecEnvIndices, VecEnvObs, VecEnvStepReturn +from .util import copy_obs_dict, dict_to_obs, obs_space_info + + +class DummyVecEnv(VecEnv): + """ + Creates a simple vectorized wrapper for multiple environments, calling each environment in sequence on the current + Python process. This is useful for computationally simple environment such as ``cartpole-v1``, + as the overhead of multiprocess or multithread outweighs the environment computation time. + This can also be used for RL methods that + require a vectorized environment, but that you want a single environments to train with. + + :param env_fns: a list of functions + that return environments to vectorize + """ + + def __init__(self, env_fns: List[Callable[[], pettingzoo.AECEnv]]): + self.envs = [fn() for fn in env_fns] + env = self.envs[0] + VecEnv.__init__(self, len(env_fns), env.observation_spaces, env.action_spaces) + obs_space = env.observation_spaces + self.keys, shapes, dtypes = obs_space_info(obs_space) + + self.buf_obs = OrderedDict([(k, np.zeros((self.num_envs,) + tuple(shapes[k]), dtype=dtypes[k])) for k in self.keys]) + self.buf_dones = np.zeros((self.num_envs,), dtype=bool) + self.buf_rews = np.zeros((self.num_envs,), dtype=np.float32) + self.buf_infos = [{} for _ in range(self.num_envs)] + self.actions = None + self.metadata = env.metadata + + def step_async(self, actions: np.ndarray) -> None: + self.actions = actions + + def step_wait(self) -> VecEnvStepReturn: + for env_idx in range(self.num_envs): + obs, self.buf_rews[env_idx], self.buf_dones[env_idx], self.buf_infos[env_idx] = self.envs[env_idx].step( + self.actions[env_idx] + ) + if self.buf_dones[env_idx]: + # save final observation where user can get it, then reset + self.buf_infos[env_idx]["terminal_observation"] = obs + obs = self.envs[env_idx].reset() + self._save_obs(env_idx, obs) + return (self._obs_from_buf(), np.copy(self.buf_rews), np.copy(self.buf_dones), deepcopy(self.buf_infos)) + + def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]: + seeds = list() + for idx, env in enumerate(self.envs): + seeds.append(env.seed(seed + idx)) + return seeds + + def reset(self) -> VecEnvObs: + for env_idx in range(self.num_envs): + obs = self.envs[env_idx].reset() + self._save_obs(env_idx, obs) + return self._obs_from_buf() + + def close(self) -> None: + for env in self.envs: + env.close() + + def get_images(self) -> Sequence[np.ndarray]: + return [env.render(mode="rgb_array") for env in self.envs] + + def render(self, mode: str = "human") -> Optional[np.ndarray]: + """ + Gym environment rendering. If there are multiple environments then + they are tiled together in one image via ``BaseVecEnv.render()``. + Otherwise (if ``self.num_envs == 1``), we pass the render call directly to the + underlying environment. + + Therefore, some arguments such as ``mode`` will have values that are valid + only when ``num_envs == 1``. + + :param mode: The rendering type. + """ + if self.num_envs == 1: + return self.envs[0].render(mode=mode) + else: + return super().render(mode=mode) + + def _save_obs(self, env_idx: int, obs: VecEnvObs) -> None: + for key in self.keys: + if key is None: + self.buf_obs[key][env_idx] = obs + else: + self.buf_obs[key][env_idx] = obs[key] + + def _obs_from_buf(self) -> VecEnvObs: + return dict_to_obs(self.observation_space, copy_obs_dict(self.buf_obs)) + + def get_attr(self, attr_name: str, indices: VecEnvIndices = None) -> List[Any]: + """Return attribute from vectorized environment (see base class).""" + target_envs = self._get_target_envs(indices) + return [getattr(env_i, attr_name) for env_i in target_envs] + + def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None: + """Set attribute inside vectorized environments (see base class).""" + target_envs = self._get_target_envs(indices) + for env_i in target_envs: + setattr(env_i, attr_name, value) + + def env_method(self, method_name: str, *method_args, indices: VecEnvIndices = None, **method_kwargs) -> List[Any]: + """Call instance methods of vectorized environments.""" + target_envs = self._get_target_envs(indices) + return [getattr(env_i, method_name)(*method_args, **method_kwargs) for env_i in target_envs] + + def env_is_wrapped(self, wrapper_class: Type[gym.Wrapper], indices: VecEnvIndices = None) -> List[bool]: + """Check if worker environments are wrapped with a given wrapper""" + target_envs = self._get_target_envs(indices) + # Import here to avoid a circular import + from stable_baselines3.common import env_util + + return [env_util.is_wrapped(env_i, wrapper_class) for env_i in target_envs] + + def _get_target_envs(self, indices: VecEnvIndices) -> List[gym.Env]: + indices = self._get_indices(indices) + return [self.envs[i] for i in indices] diff --git a/pz_risk/common/vec_env/subproc_vec_env.py b/pz_risk/common/vec_env/subproc_vec_env.py new file mode 100644 index 0000000..cfe238c --- /dev/null +++ b/pz_risk/common/vec_env/subproc_vec_env.py @@ -0,0 +1,220 @@ +import multiprocessing as mp +from collections import OrderedDict +from typing import Any, Callable, List, Optional, Sequence, Tuple, Type, Union + +import gym +import numpy as np + +from .base_vec_env import ( + CloudpickleWrapper, + VecEnv, + VecEnvIndices, + VecEnvObs, + VecEnvStepReturn, +) + + +def _worker( + remote: mp.connection.Connection, parent_remote: mp.connection.Connection, env_fn_wrapper: CloudpickleWrapper +) -> None: + # Import here to avoid a circular import + from stable_baselines3.common.env_util import is_wrapped + + parent_remote.close() + env = env_fn_wrapper.var() + while True: + try: + cmd, data = remote.recv() + if cmd == "step": + observation, reward, done, info = env.step(data) + if done: + # save final observation where user can get it, then reset + info["terminal_observation"] = observation + observation = env.reset() + remote.send((observation, reward, done, info)) + elif cmd == "seed": + remote.send(env.seed(data)) + elif cmd == "reset": + observation = env.reset() + remote.send(observation) + elif cmd == "render": + remote.send(env.render(data)) + elif cmd == "close": + env.close() + remote.close() + break + elif cmd == "get_spaces": + remote.send((env.observation_space, env.action_space)) + elif cmd == "env_method": + method = getattr(env, data[0]) + remote.send(method(*data[1], **data[2])) + elif cmd == "get_attr": + remote.send(getattr(env, data)) + elif cmd == "set_attr": + remote.send(setattr(env, data[0], data[1])) + elif cmd == "is_wrapped": + remote.send(is_wrapped(env, data)) + else: + raise NotImplementedError(f"`{cmd}` is not implemented in the worker") + except EOFError: + break + + +class SubprocVecEnv(VecEnv): + """ + Creates a multiprocess vectorized wrapper for multiple environments, distributing each environment to its own + process, allowing significant speed up when the environment is computationally complex. + + For performance reasons, if your environment is not IO bound, the number of environments should not exceed the + number of logical cores on your CPU. + + .. warning:: + + Only 'forkserver' and 'spawn' start methods are thread-safe, + which is important when TensorFlow sessions or other non thread-safe + libraries are used in the parent (see issue #217). However, compared to + 'fork' they incur a small start-up cost and have restrictions on + global variables. With those methods, users must wrap the code in an + ``if __name__ == "__main__":`` block. + For more information, see the multiprocessing documentation. + + :param env_fns: Environments to run in subprocesses + :param start_method: method used to start the subprocesses. + Must be one of the methods returned by multiprocessing.get_all_start_methods(). + Defaults to 'forkserver' on available platforms, and 'spawn' otherwise. + """ + + def __init__(self, env_fns: List[Callable[[], gym.Env]], start_method: Optional[str] = None): + self.waiting = False + self.closed = False + n_envs = len(env_fns) + + if start_method is None: + # Fork is not a thread safe method (see issue #217) + # but is more user friendly (does not require to wrap the code in + # a `if __name__ == "__main__":`) + forkserver_available = "forkserver" in mp.get_all_start_methods() + start_method = "forkserver" if forkserver_available else "spawn" + ctx = mp.get_context(start_method) + + self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(n_envs)]) + self.processes = [] + for work_remote, remote, env_fn in zip(self.work_remotes, self.remotes, env_fns): + args = (work_remote, remote, CloudpickleWrapper(env_fn)) + # daemon=True: if the main process crashes, we should not cause things to hang + process = ctx.Process(target=_worker, args=args, daemon=True) # pytype:disable=attribute-error + process.start() + self.processes.append(process) + work_remote.close() + + self.remotes[0].send(("get_spaces", None)) + observation_space, action_space = self.remotes[0].recv() + VecEnv.__init__(self, len(env_fns), observation_space, action_space) + + def step_async(self, actions: np.ndarray) -> None: + for remote, action in zip(self.remotes, actions): + remote.send(("step", action)) + self.waiting = True + + def step_wait(self) -> VecEnvStepReturn: + results = [remote.recv() for remote in self.remotes] + self.waiting = False + obs, rews, dones, infos = zip(*results) + return _flatten_obs(obs, self.observation_space), np.stack(rews), np.stack(dones), infos + + def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]: + for idx, remote in enumerate(self.remotes): + remote.send(("seed", seed + idx)) + return [remote.recv() for remote in self.remotes] + + def reset(self) -> VecEnvObs: + for remote in self.remotes: + remote.send(("reset", None)) + obs = [remote.recv() for remote in self.remotes] + return _flatten_obs(obs, self.observation_space) + + def close(self) -> None: + if self.closed: + return + if self.waiting: + for remote in self.remotes: + remote.recv() + for remote in self.remotes: + remote.send(("close", None)) + for process in self.processes: + process.join() + self.closed = True + + def get_images(self) -> Sequence[np.ndarray]: + for pipe in self.remotes: + # gather images from subprocesses + # `mode` will be taken into account later + pipe.send(("render", "rgb_array")) + imgs = [pipe.recv() for pipe in self.remotes] + return imgs + + def get_attr(self, attr_name: str, indices: VecEnvIndices = None) -> List[Any]: + """Return attribute from vectorized environment (see base class).""" + target_remotes = self._get_target_remotes(indices) + for remote in target_remotes: + remote.send(("get_attr", attr_name)) + return [remote.recv() for remote in target_remotes] + + def set_attr(self, attr_name: str, value: Any, indices: VecEnvIndices = None) -> None: + """Set attribute inside vectorized environments (see base class).""" + target_remotes = self._get_target_remotes(indices) + for remote in target_remotes: + remote.send(("set_attr", (attr_name, value))) + for remote in target_remotes: + remote.recv() + + def env_method(self, method_name: str, *method_args, indices: VecEnvIndices = None, **method_kwargs) -> List[Any]: + """Call instance methods of vectorized environments.""" + target_remotes = self._get_target_remotes(indices) + for remote in target_remotes: + remote.send(("env_method", (method_name, method_args, method_kwargs))) + return [remote.recv() for remote in target_remotes] + + def env_is_wrapped(self, wrapper_class: Type[gym.Wrapper], indices: VecEnvIndices = None) -> List[bool]: + """Check if worker environments are wrapped with a given wrapper""" + target_remotes = self._get_target_remotes(indices) + for remote in target_remotes: + remote.send(("is_wrapped", wrapper_class)) + return [remote.recv() for remote in target_remotes] + + def _get_target_remotes(self, indices: VecEnvIndices) -> List[Any]: + """ + Get the connection object needed to communicate with the wanted + envs that are in subprocesses. + + :param indices: refers to indices of envs. + :return: Connection object to communicate between processes. + """ + indices = self._get_indices(indices) + return [self.remotes[i] for i in indices] + + +def _flatten_obs(obs: Union[List[VecEnvObs], Tuple[VecEnvObs]], space: gym.spaces.Space) -> VecEnvObs: + """ + Flatten observations, depending on the observation space. + + :param obs: observations. + A list or tuple of observations, one per environment. + Each environment observation may be a NumPy array, or a dict or tuple of NumPy arrays. + :return: flattened observations. + A flattened NumPy array or an OrderedDict or tuple of flattened numpy arrays. + Each NumPy array has the environment index as its first axis. + """ + assert isinstance(obs, (list, tuple)), "expected list or tuple of observations per environment" + assert len(obs) > 0, "need observations from at least one environment" + + if isinstance(space, gym.spaces.Dict): + assert isinstance(space.spaces, OrderedDict), "Dict space must have ordered subspaces" + assert isinstance(obs[0], dict), "non-dict observation for environment with Dict observation space" + return OrderedDict([(k, np.stack([o[k] for o in obs])) for k in space.spaces.keys()]) + elif isinstance(space, gym.spaces.Tuple): + assert isinstance(obs[0], tuple), "non-tuple observation for environment with Tuple observation space" + obs_len = len(space.spaces) + return tuple((np.stack([o[i] for o in obs]) for i in range(obs_len))) + else: + return np.stack(obs) diff --git a/pz_risk/common/vec_env/util.py b/pz_risk/common/vec_env/util.py new file mode 100644 index 0000000..2c2f990 --- /dev/null +++ b/pz_risk/common/vec_env/util.py @@ -0,0 +1,76 @@ +""" +Helpers for dealing with vectorized environments. +""" +from collections import OrderedDict +from typing import Any, Dict, List, Tuple + +import gym +import numpy as np + +from ..preprocessing import check_for_nested_spaces +from .base_vec_env import VecEnvObs + + +def copy_obs_dict(obs: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: + """ + Deep-copy a dict of numpy arrays. + + :param obs: a dict of numpy arrays. + :return: a dict of copied numpy arrays. + """ + assert isinstance(obs, OrderedDict), f"unexpected type for observations '{type(obs)}'" + return OrderedDict([(k, np.copy(v)) for k, v in obs.items()]) + + +def dict_to_obs(obs_space: gym.spaces.Space, obs_dict: Dict[Any, np.ndarray]) -> VecEnvObs: + """ + Convert an internal representation raw_obs into the appropriate type + specified by space. + + :param obs_space: an observation space. + :param obs_dict: a dict of numpy arrays. + :return: returns an observation of the same type as space. + If space is Dict, function is identity; if space is Tuple, converts dict to Tuple; + otherwise, space is unstructured and returns the value raw_obs[None]. + """ + if isinstance(obs_space, gym.spaces.Dict): + return obs_dict + elif isinstance(obs_space, gym.spaces.Tuple): + assert len(obs_dict) == len(obs_space.spaces), "size of observation does not match size of observation space" + return tuple((obs_dict[i] for i in range(len(obs_space.spaces)))) + else: + assert set(obs_dict.keys()) == {None}, "multiple observation keys for unstructured observation space" + return obs_dict[None] + + +def obs_space_info(obs_space: gym.spaces.Space) -> Tuple[List[str], Dict[Any, Tuple[int, ...]], Dict[Any, np.dtype]]: + """ + Get dict-structured information about a gym.Space. + + Dict spaces are represented directly by their dict of subspaces. + Tuple spaces are converted into a dict with keys indexing into the tuple. + Unstructured spaces are represented by {None: obs_space}. + + :param obs_space: an observation space + :return: A tuple (keys, shapes, dtypes): + keys: a list of dict keys. + shapes: a dict mapping keys to shapes. + dtypes: a dict mapping keys to dtypes. + """ + check_for_nested_spaces(obs_space) + if isinstance(obs_space, gym.spaces.Dict): + assert isinstance(obs_space.spaces, OrderedDict), "Dict space must have ordered subspaces" + subspaces = obs_space.spaces + elif isinstance(obs_space, gym.spaces.Tuple): + subspaces = {i: space for i, space in enumerate(obs_space.spaces)} + else: + assert not hasattr(obs_space, "spaces"), f"Unsupported structured space '{type(obs_space)}'" + subspaces = {None: obs_space} + keys = [] + shapes = {} + dtypes = {} + for key, box in subspaces.items(): + keys.append(key) + shapes[key] = box.shape + dtypes[key] = box.dtype + return keys, shapes, dtypes diff --git a/pz_risk/common/vec_env/vec_normalize.py b/pz_risk/common/vec_env/vec_normalize.py new file mode 100644 index 0000000..2236b74 --- /dev/null +++ b/pz_risk/common/vec_env/vec_normalize.py @@ -0,0 +1,260 @@ +import pickle +import warnings +from copy import deepcopy +from typing import Any, Dict, Union + +import gym +import numpy as np + +from common import utils +from ..running_mean_std import RunningMeanStd +from .base_vec_env import VecEnv, VecEnvStepReturn, VecEnvWrapper + + +class VecNormalize(VecEnvWrapper): + """ + A moving average, normalizing wrapper for vectorized environment. + has support for saving/loading moving average, + + :param venv: the vectorized environment to wrap + :param training: Whether to update or not the moving average + :param norm_obs: Whether to normalize observation or not (default: True) + :param norm_reward: Whether to normalize rewards or not (default: True) + :param clip_obs: Max absolute value for observation + :param clip_reward: Max value absolute for discounted reward + :param gamma: discount factor + :param epsilon: To avoid division by zero + """ + + def __init__( + self, + venv: VecEnv, + training: bool = True, + norm_obs: bool = True, + norm_reward: bool = True, + clip_obs: float = 10.0, + clip_reward: float = 10.0, + gamma: float = 0.99, + epsilon: float = 1e-8, + ): + VecEnvWrapper.__init__(self, venv) + + if norm_obs: + if not isinstance(self.observation_space, (gym.spaces.Box, gym.spaces.Dict)): + raise ValueError("VecNormalize only supports `gym.spaces.Box` and `gym.spaces.Dict` observation spaces") + + if isinstance(self.observation_space, gym.spaces.Dict): + self.obs_keys = set(self.observation_space.spaces.keys()) + self.obs_spaces = self.observation_space.spaces + self.obs_rms = {key: RunningMeanStd(shape=space.shape) for key, space in self.obs_spaces.items()} + else: + self.obs_keys, self.obs_spaces = None, None + self.obs_rms = RunningMeanStd(shape=self.observation_space.shape) + + self.ret_rms = RunningMeanStd(shape=()) + self.clip_obs = clip_obs + self.clip_reward = clip_reward + # Returns: discounted rewards + self.returns = np.zeros(self.num_envs) + self.gamma = gamma + self.epsilon = epsilon + self.training = training + self.norm_obs = norm_obs + self.norm_reward = norm_reward + self.old_obs = np.array([]) + self.old_reward = np.array([]) + + def __getstate__(self) -> Dict[str, Any]: + """ + Gets state for pickling. + + Excludes self.venv, as in general VecEnv's may not be pickleable.""" + state = self.__dict__.copy() + # these attributes are not pickleable + del state["venv"] + del state["class_attributes"] + # these attributes depend on the above and so we would prefer not to pickle + del state["returns"] + return state + + def __setstate__(self, state: Dict[str, Any]) -> None: + """ + Restores pickled state. + + User must call set_venv() after unpickling before using. + + :param state:""" + self.__dict__.update(state) + assert "venv" not in state + self.venv = None + + def set_venv(self, venv: VecEnv) -> None: + """ + Sets the vector environment to wrap to venv. + + Also sets attributes derived from this such as `num_env`. + + :param venv: + """ + if self.venv is not None: + raise ValueError("Trying to set venv of already initialized VecNormalize wrapper.") + VecEnvWrapper.__init__(self, venv) + + # Check only that the observation_space match + utils.check_for_correct_spaces(venv, self.observation_space, venv.action_space) + self.returns = np.zeros(self.num_envs) + + def step_wait(self) -> VecEnvStepReturn: + """ + Apply sequence of actions to sequence of environments + actions -> (observations, rewards, dones) + + where ``dones`` is a boolean vector indicating whether each element is new. + """ + obs, rewards, dones, infos = self.venv.step_wait() + self.old_obs = obs + self.old_reward = rewards + + if self.training: + if isinstance(obs, dict) and isinstance(self.obs_rms, dict): + for key in self.obs_rms.keys(): + self.obs_rms[key].update(obs[key]) + else: + self.obs_rms.update(obs) + + obs = self.normalize_obs(obs) + + if self.training: + self._update_reward(rewards) + rewards = self.normalize_reward(rewards) + + # Normalize the terminal observations + for idx, done in enumerate(dones): + if not done: + continue + if "terminal_observation" in infos[idx]: + infos[idx]["terminal_observation"] = self.normalize_obs(infos[idx]["terminal_observation"]) + + self.returns[dones] = 0 + return obs, rewards, dones, infos + + def _update_reward(self, reward: np.ndarray) -> None: + """Update reward normalization statistics.""" + self.returns = self.returns * self.gamma + reward + self.ret_rms.update(self.returns) + + def _normalize_obs(self, obs: np.ndarray, obs_rms: RunningMeanStd) -> np.ndarray: + """ + Helper to normalize observation. + :param obs: + :param obs_rms: associated statistics + :return: normalized observation + """ + return np.clip((obs - obs_rms.mean) / np.sqrt(obs_rms.var + self.epsilon), -self.clip_obs, self.clip_obs) + + def _unnormalize_obs(self, obs: np.ndarray, obs_rms: RunningMeanStd) -> np.ndarray: + """ + Helper to unnormalize observation. + :param obs: + :param obs_rms: associated statistics + :return: unnormalized observation + """ + return (obs * np.sqrt(obs_rms.var + self.epsilon)) + obs_rms.mean + + def normalize_obs(self, obs: Union[np.ndarray, Dict[str, np.ndarray]]) -> Union[np.ndarray, Dict[str, np.ndarray]]: + """ + Normalize observations using this VecNormalize's observations statistics. + Calling this method does not update statistics. + """ + # Avoid modifying by reference the original object + obs_ = deepcopy(obs) + if self.norm_obs: + if isinstance(obs, dict) and isinstance(self.obs_rms, dict): + for key in self.obs_rms.keys(): + obs_[key] = self._normalize_obs(obs[key], self.obs_rms[key]).astype(np.float32) + else: + obs_ = self._normalize_obs(obs, self.obs_rms).astype(np.float32) + return obs_ + + def normalize_reward(self, reward: np.ndarray) -> np.ndarray: + """ + Normalize rewards using this VecNormalize's rewards statistics. + Calling this method does not update statistics. + """ + if self.norm_reward: + reward = np.clip(reward / np.sqrt(self.ret_rms.var + self.epsilon), -self.clip_reward, self.clip_reward) + return reward + + def unnormalize_obs(self, obs: Union[np.ndarray, Dict[str, np.ndarray]]) -> Union[np.ndarray, Dict[str, np.ndarray]]: + # Avoid modifying by reference the original object + obs_ = deepcopy(obs) + if self.norm_obs: + if isinstance(obs, dict) and isinstance(self.obs_rms, dict): + for key in self.obs_rms.keys(): + obs_[key] = self._unnormalize_obs(obs[key], self.obs_rms[key]) + else: + obs_ = self._unnormalize_obs(obs, self.obs_rms) + return obs_ + + def unnormalize_reward(self, reward: np.ndarray) -> np.ndarray: + if self.norm_reward: + return reward * np.sqrt(self.ret_rms.var + self.epsilon) + return reward + + def get_original_obs(self) -> Union[np.ndarray, Dict[str, np.ndarray]]: + """ + Returns an unnormalized version of the observations from the most recent + step or reset. + """ + return deepcopy(self.old_obs) + + def get_original_reward(self) -> np.ndarray: + """ + Returns an unnormalized version of the rewards from the most recent step. + """ + return self.old_reward.copy() + + def reset(self) -> Union[np.ndarray, Dict[str, np.ndarray]]: + """ + Reset all environments + :return: first observation of the episode + """ + obs = self.venv.reset() + self.old_obs = obs + self.returns = np.zeros(self.num_envs) + if self.training: + if isinstance(obs, dict) and isinstance(self.obs_rms, dict): + for key in self.obs_rms.keys(): + self.obs_rms[key].update(obs[key]) + else: + self.obs_rms.update(obs) + return self.normalize_obs(obs) + + @staticmethod + def load(load_path: str, venv: VecEnv) -> "VecNormalize": + """ + Loads a saved VecNormalize object. + + :param load_path: the path to load from. + :param venv: the VecEnv to wrap. + :return: + """ + with open(load_path, "rb") as file_handler: + vec_normalize = pickle.load(file_handler) + vec_normalize.set_venv(venv) + return vec_normalize + + def save(self, save_path: str) -> None: + """ + Save current VecNormalize object with + all running statistics and settings (e.g. clip_obs) + + :param save_path: The path to save to + """ + with open(save_path, "wb") as file_handler: + pickle.dump(self, file_handler) + + @property + def ret(self) -> np.ndarray: + warnings.warn("`VecNormalize` `ret` attribute is deprecated. Please use `returns` instead.", DeprecationWarning) + return self.returns