Showing 18 changed files with 3,205 additions and 0 deletions.
stable_baselines3/common/envs/__init__.py
@@ -0,0 +1,9 @@
from stable_baselines3.common.envs.bit_flipping_env import BitFlippingEnv
from stable_baselines3.common.envs.identity_env import (
    FakeImageEnv,
    IdentityEnv,
    IdentityEnvBox,
    IdentityEnvMultiBinary,
    IdentityEnvMultiDiscrete,
)
from stable_baselines3.common.envs.multi_input_envs import SimpleMultiObsEnv
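These re-exports make the test environments importable directly from the package; for example (illustrative, not part of the diff):

from stable_baselines3.common.envs import BitFlippingEnv, IdentityEnv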
stable_baselines3/common/envs/bit_flipping_env.py
@@ -0,0 +1,204 @@
from collections import OrderedDict
from typing import Any, Dict, Optional, Union

import numpy as np
from gym import GoalEnv, spaces
from gym.envs.registration import EnvSpec

from stable_baselines3.common.type_aliases import GymStepReturn

class BitFlippingEnv(GoalEnv):
    """
    Simple bit flipping env, useful to test HER.
    The goal is to flip all the bits to get a vector of ones.
    In the continuous variant, if the ith action component has a value > 0,
    then the ith bit will be flipped.

    :param n_bits: Number of bits to flip
    :param continuous: Whether to use the continuous actions version or not;
        by default, it uses the discrete one
    :param max_steps: Max number of steps; by default, equal to ``n_bits``
    :param discrete_obs_space: Whether to use the discrete observation
        version or not; by default, it uses the ``MultiBinary`` one
    :param image_obs_space: Use an image as input instead of the ``MultiBinary`` one.
    :param channel_first: Whether to use channel-first or channel-last images.
    """

    spec = EnvSpec("BitFlippingEnv-v0")

    def __init__(
        self,
        n_bits: int = 10,
        continuous: bool = False,
        max_steps: Optional[int] = None,
        discrete_obs_space: bool = False,
        image_obs_space: bool = False,
        channel_first: bool = True,
    ):
        super(BitFlippingEnv, self).__init__()
        # Shape of the observation when using image space
        self.image_shape = (1, 36, 36) if channel_first else (36, 36, 1)
        # The achieved goal is determined by the current state;
        # here, it is a special case where they are equal
        if discrete_obs_space:
            # In the discrete case, the agent acts on the binary
            # representation of the observation
            self.observation_space = spaces.Dict(
                {
                    "observation": spaces.Discrete(2 ** n_bits),
                    "achieved_goal": spaces.Discrete(2 ** n_bits),
                    "desired_goal": spaces.Discrete(2 ** n_bits),
                }
            )
        elif image_obs_space:
            # When using an image as input,
            # one image encodes the bits (0 -> 0, 1 -> 255)
            # and the rest is filled with zeros
            self.observation_space = spaces.Dict(
                {
                    "observation": spaces.Box(
                        low=0,
                        high=255,
                        shape=self.image_shape,
                        dtype=np.uint8,
                    ),
                    "achieved_goal": spaces.Box(
                        low=0,
                        high=255,
                        shape=self.image_shape,
                        dtype=np.uint8,
                    ),
                    "desired_goal": spaces.Box(
                        low=0,
                        high=255,
                        shape=self.image_shape,
                        dtype=np.uint8,
                    ),
                }
            )
        else:
            self.observation_space = spaces.Dict(
                {
                    "observation": spaces.MultiBinary(n_bits),
                    "achieved_goal": spaces.MultiBinary(n_bits),
                    "desired_goal": spaces.MultiBinary(n_bits),
                }
            )

        self.obs_space = spaces.MultiBinary(n_bits)

        if continuous:
            self.action_space = spaces.Box(-1, 1, shape=(n_bits,), dtype=np.float32)
        else:
            self.action_space = spaces.Discrete(n_bits)
        self.continuous = continuous
        self.discrete_obs_space = discrete_obs_space
        self.image_obs_space = image_obs_space
        self.state = None
        self.desired_goal = np.ones((n_bits,))
        if max_steps is None:
            max_steps = n_bits
        self.max_steps = max_steps
        self.current_step = 0

    def seed(self, seed: int) -> None:
        self.obs_space.seed(seed)

    def convert_if_needed(self, state: np.ndarray) -> Union[int, np.ndarray]:
        """
        Convert to discrete space if needed.

        :param state: The bit vector to convert
        :return: The state in the format expected by the observation space
        """
        if self.discrete_obs_space:
            # The internal state is the binary representation of the
            # observed one
            return int(sum([state[i] * 2 ** i for i in range(len(state))]))

        if self.image_obs_space:
            size = np.prod(self.image_shape)
            image = np.concatenate((state * 255, np.zeros(size - len(state), dtype=np.uint8)))
            return image.reshape(self.image_shape).astype(np.uint8)
        return state

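    # Illustrative example (not from the source): with n_bits=3 and state
    # [1, 0, 1], the discrete observation is 1 * 2**0 + 0 * 2**1 + 1 * 2**2 = 5,
    # and the image observation is a 36x36 uint8 frame whose first three
    # pixels are 255, 0, 255, with the remaining pixels set to zero.
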
    def convert_to_bit_vector(self, state: Union[int, np.ndarray], batch_size: int) -> np.ndarray:
        """
        Convert to bit vector if needed.

        :param state: The state (discrete, image, or bit vector)
        :param batch_size: How many states are batched together
        :return: The state(s) as a batch of bit vectors
        """
        # Convert back to bit vector
        if isinstance(state, int):
            state = np.array(state).reshape(batch_size, -1)
            # Convert to binary representation
            state = (((state[:, :] & (1 << np.arange(len(self.state))))) > 0).astype(int)
        elif self.image_obs_space:
            state = state.reshape(batch_size, -1)[:, : len(self.state)] / 255
        else:
            state = np.array(state).reshape(batch_size, -1)

        return state

    def _get_obs(self) -> Dict[str, Union[int, np.ndarray]]:
        """
        Helper to create the observation.

        :return: The current observation.
        """
        return OrderedDict(
            [
                ("observation", self.convert_if_needed(self.state.copy())),
                ("achieved_goal", self.convert_if_needed(self.state.copy())),
                ("desired_goal", self.convert_if_needed(self.desired_goal.copy())),
            ]
        )

    def reset(self) -> Dict[str, Union[int, np.ndarray]]:
        self.current_step = 0
        self.state = self.obs_space.sample()
        return self._get_obs()

    def step(self, action: Union[np.ndarray, int]) -> GymStepReturn:
        if self.continuous:
            self.state[action > 0] = 1 - self.state[action > 0]
        else:
            self.state[action] = 1 - self.state[action]
        obs = self._get_obs()
        reward = float(self.compute_reward(obs["achieved_goal"], obs["desired_goal"], None))
        done = reward == 0
        self.current_step += 1
        # The episode terminates when we reach the goal or the max number of steps
        info = {"is_success": done}
        done = done or self.current_step >= self.max_steps
        return obs, reward, done, info

    def compute_reward(
        self, achieved_goal: Union[int, np.ndarray], desired_goal: Union[int, np.ndarray], _info: Optional[Dict[str, Any]]
    ) -> np.float32:
        # As we are using a vectorized version, we need to keep track of the `batch_size`
        if isinstance(achieved_goal, int):
            batch_size = 1
        elif self.image_obs_space:
            batch_size = achieved_goal.shape[0] if len(achieved_goal.shape) > 3 else 1
        else:
            batch_size = achieved_goal.shape[0] if len(achieved_goal.shape) > 1 else 1

        desired_goal = self.convert_to_bit_vector(desired_goal, batch_size)
        achieved_goal = self.convert_to_bit_vector(achieved_goal, batch_size)

        # Deceptive reward: it is positive only when the goal is achieved
        # Here we are using a vectorized version
        distance = np.linalg.norm(achieved_goal - desired_goal, axis=-1)
        return -(distance > 0).astype(np.float32)

    def render(self, mode: str = "human") -> Optional[np.ndarray]:
        if mode == "rgb_array":
            return self.state.copy()
        print(self.state)

    def close(self) -> None:
        pass
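As a quick orientation (illustrative, not part of the diff): the environment follows the gym ``GoalEnv`` dict-observation API, so a random rollout looks roughly like this.

from stable_baselines3.common.envs import BitFlippingEnv

env = BitFlippingEnv(n_bits=4, continuous=False)
obs = env.reset()  # dict with "observation", "achieved_goal", "desired_goal"
done = False
while not done:
    action = env.action_space.sample()  # flip one randomly chosen bit
    obs, reward, done, info = env.step(action)
# reward is 0.0 on success and -1.0 otherwise (sparse, "deceptive" reward)
print(info["is_success"])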
stable_baselines3/common/envs/identity_env.py
@@ -0,0 +1,150 @@
from typing import Optional, Union

import numpy as np
from gym import Env, Space
from gym.spaces import Box, Discrete, MultiBinary, MultiDiscrete

from stable_baselines3.common.type_aliases import GymObs, GymStepReturn

class IdentityEnv(Env):
    def __init__(self, dim: Optional[int] = None, space: Optional[Space] = None, ep_length: int = 100):
        """
        Identity environment for testing purposes

        :param dim: the size of the action and observation dimension you want
            to learn. Provide at most one of ``dim`` and ``space``. If both are
            None, then initialization proceeds with ``dim=1`` and ``space=None``.
        :param space: the action and observation space. Provide at most one of
            ``dim`` and ``space``.
        :param ep_length: the length of each episode in timesteps
        """
        if space is None:
            if dim is None:
                dim = 1
            space = Discrete(dim)
        else:
            assert dim is None, "arguments for both 'dim' and 'space' provided: at most one allowed"

        self.action_space = self.observation_space = space
        self.ep_length = ep_length
        self.current_step = 0
        self.num_resets = -1  # Becomes 0 after __init__ exits.
        self.reset()

    def reset(self) -> GymObs:
        self.current_step = 0
        self.num_resets += 1
        self._choose_next_state()
        return self.state

    def step(self, action: Union[int, np.ndarray]) -> GymStepReturn:
        reward = self._get_reward(action)
        self._choose_next_state()
        self.current_step += 1
        done = self.current_step >= self.ep_length
        return self.state, reward, done, {}

    def _choose_next_state(self) -> None:
        self.state = self.action_space.sample()

    def _get_reward(self, action: Union[int, np.ndarray]) -> float:
        return 1.0 if np.all(self.state == action) else 0.0

    def render(self, mode: str = "human") -> None:
        pass

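# Note: the "identity" task is to echo the current observation back as the
# action; `_get_reward` returns 1.0 only on an exact match, and a fresh state
# is drawn after every step.
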
class IdentityEnvBox(IdentityEnv):
    def __init__(self, low: float = -1.0, high: float = 1.0, eps: float = 0.05, ep_length: int = 100):
        """
        Identity environment for testing purposes

        :param low: the lower bound of the box dimension
        :param high: the upper bound of the box dimension
        :param eps: the epsilon bound for a correct value
        :param ep_length: the length of each episode in timesteps
        """
        space = Box(low=low, high=high, shape=(1,), dtype=np.float32)
        super().__init__(ep_length=ep_length, space=space)
        self.eps = eps

    def step(self, action: np.ndarray) -> GymStepReturn:
        reward = self._get_reward(action)
        self._choose_next_state()
        self.current_step += 1
        done = self.current_step >= self.ep_length
        return self.state, reward, done, {}

    def _get_reward(self, action: np.ndarray) -> float:
        # The continuous variant rewards any action within +/- eps of the state
        return 1.0 if (self.state - self.eps) <= action <= (self.state + self.eps) else 0.0

class IdentityEnvMultiDiscrete(IdentityEnv):
    def __init__(self, dim: int = 1, ep_length: int = 100):
        """
        Identity environment for testing purposes

        :param dim: the size of the dimensions you want to learn
        :param ep_length: the length of each episode in timesteps
        """
        space = MultiDiscrete([dim, dim])
        super().__init__(ep_length=ep_length, space=space)


class IdentityEnvMultiBinary(IdentityEnv):
    def __init__(self, dim: int = 1, ep_length: int = 100):
        """
        Identity environment for testing purposes

        :param dim: the size of the dimensions you want to learn
        :param ep_length: the length of each episode in timesteps
        """
        space = MultiBinary(dim)
        super().__init__(ep_length=ep_length, space=space)

class FakeImageEnv(Env):
    """
    Fake image environment for testing purposes, it mimics Atari games.

    :param action_dim: Number of discrete actions
    :param screen_height: Height of the image
    :param screen_width: Width of the image
    :param n_channels: Number of color channels
    :param discrete: Create a discrete action space instead of a continuous one
    :param channel_first: Put channels on the first axis instead of the last
    """

    def __init__(
        self,
        action_dim: int = 6,
        screen_height: int = 84,
        screen_width: int = 84,
        n_channels: int = 1,
        discrete: bool = True,
        channel_first: bool = False,
    ):
        self.observation_shape = (screen_height, screen_width, n_channels)
        if channel_first:
            self.observation_shape = (n_channels, screen_height, screen_width)
        self.observation_space = Box(low=0, high=255, shape=self.observation_shape, dtype=np.uint8)
        if discrete:
            self.action_space = Discrete(action_dim)
        else:
            self.action_space = Box(low=-1, high=1, shape=(5,), dtype=np.float32)
        self.ep_length = 10
        self.current_step = 0

    def reset(self) -> np.ndarray:
        self.current_step = 0
        return self.observation_space.sample()

    def step(self, action: Union[np.ndarray, int]) -> GymStepReturn:
        reward = 0.0
        self.current_step += 1
        done = self.current_step >= self.ep_length
        return self.observation_space.sample(), reward, done, {}

    def render(self, mode: str = "human") -> None:
        pass
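A brief usage sketch (illustrative, not part of the diff): the identity environments reward a policy for echoing the observation, and ``FakeImageEnv`` produces random Atari-shaped frames.

from stable_baselines3.common.envs import FakeImageEnv, IdentityEnvBox

env = IdentityEnvBox(eps=0.05)
obs = env.reset()
# Echoing the observation back as the action yields the maximum reward
obs, reward, done, _ = env.step(obs)
assert reward == 1.0

atari_like = FakeImageEnv(screen_height=84, screen_width=84, n_channels=1)
assert atari_like.reset().shape == (84, 84, 1)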