diff --git a/gymnasium/wrappers/__init__.py b/gymnasium/wrappers/__init__.py index 93062d1d1..9876a3f73 100644 --- a/gymnasium/wrappers/__init__.py +++ b/gymnasium/wrappers/__init__.py @@ -58,7 +58,13 @@ RecordEpisodeStatistics, TimeLimit, ) -from gymnasium.wrappers.rendering import HumanRendering, RecordVideo, RenderCollection +from gymnasium.wrappers.rendering import ( + AddWhiteNoise, + HumanRendering, + ObstructView, + RecordVideo, + RenderCollection, +) from gymnasium.wrappers.stateful_action import StickyAction from gymnasium.wrappers.stateful_observation import ( DelayObservation, @@ -122,6 +128,8 @@ "OrderEnforcing", "RecordEpisodeStatistics", # --- Rendering --- + "AddWhiteNoise", + "ObstructView", "RenderCollection", "RecordVideo", "HumanRendering", diff --git a/gymnasium/wrappers/rendering.py b/gymnasium/wrappers/rendering.py index 7291df06d..231c676fc 100644 --- a/gymnasium/wrappers/rendering.py +++ b/gymnasium/wrappers/rendering.py @@ -3,6 +3,8 @@ * ``RenderCollection`` - Collects rendered frames into a list * ``RecordVideo`` - Records a video of the environments * ``HumanRendering`` - Provides human rendering of environments with ``"rgb_array"`` +* ``AddWhiteNoise`` - Randomly replaces pixels with white noise +* ``ObstructView`` - Randomly places patches of white noise to obstruct the pixel rendering """ from __future__ import annotations @@ -16,13 +18,15 @@ import gymnasium as gym from gymnasium import error, logger from gymnasium.core import ActType, ObsType, RenderFrame -from gymnasium.error import DependencyNotInstalled +from gymnasium.error import DependencyNotInstalled, InvalidProbability __all__ = [ "RenderCollection", "RecordVideo", "HumanRendering", + "AddWhiteNoise", + "ObstructView", ] @@ -562,3 +566,175 @@ def close(self): pygame.display.quit() pygame.quit() super().close() + + +class AddWhiteNoise( + gym.Wrapper[ObsType, ActType, ObsType, ActType], gym.utils.RecordConstructorArgs +): + """Randomly replaces pixels with white noise. + + If used with ``render_mode="rgb_array"`` and ``AddRenderObservation``, it will + make observations noisy. + The environment may also become partially-observable, turning the MDP into a POMDP. + + Example - Every pixel will be replaced by white noise with probability 0.5: + >>> env = gym.make("LunarLander-v3", render_mode="rgb_array") + >>> env = AddWhiteNoise(env, probability_of_noise_per_pixel=0.5) + >>> env = HumanRendering(env) + >>> obs, _ = env.reset(seed=123) + >>> obs, *_ = env.step(env.action_space.sample()) + """ + + def __init__( + self, + env: gym.Env[ObsType, ActType], + probability_of_noise_per_pixel: float, + is_noise_grayscale: bool = False, + ): + """Wrapper replaces random pixels with white noise. + + Args: + env: The environment that is being wrapped + probability_of_noise_per_pixel: the probability that a pixel is white noise + is_noise_grayscale: if True, RGB noise is converted to grayscale + """ + if not 0 <= probability_of_noise_per_pixel < 1: + raise InvalidProbability( + f"probability_of_noise_per_pixel should be in the interval [0,1). Received {probability_of_noise_per_pixel}" + ) + + gym.utils.RecordConstructorArgs.__init__( + self, + probability_of_noise_per_pixel=probability_of_noise_per_pixel, + is_noise_grayscale=is_noise_grayscale, + ) + gym.Wrapper.__init__(self, env) + + self.probability_of_noise_per_pixel = probability_of_noise_per_pixel + self.is_noise_grayscale = is_noise_grayscale + + def render(self) -> RenderFrame: + """Compute the render frames as specified by render_mode attribute during initialization of the environment, then add white noise.""" + render_out = super().render() + + if self.is_noise_grayscale: + noise = ( + self.np_random.integers( + (0, 0, 0), + 255 * np.array([0.2989, 0.5870, 0.1140]), + size=render_out.shape, + dtype=np.uint8, + ) + .sum(-1, keepdims=True) + .repeat(3, -1) + ) + else: + noise = self.np_random.integers( + 0, + 255, + size=render_out.shape, + dtype=np.uint8, + ) + + mask = ( + self.np_random.random(render_out.shape[0:2]) + < self.probability_of_noise_per_pixel + ) + + return np.where(mask[..., None], noise, render_out) + + +class ObstructView( + gym.Wrapper[ObsType, ActType, ObsType, ActType], gym.utils.RecordConstructorArgs +): + """Randomly obstructs rendering with white noise patches. + + If used with ``render_mode="rgb_array"`` and ``AddRenderObservation``, it will + make observations noisy. + The number of patches depends on how many pixels we want to obstruct. + Depending on the size of the patches, the environment may become + partially-observable, turning the MDP into a POMDP. + + Example - Obstruct 50% of the pixels with patches of size 50x50 pixels: + >>> env = gym.make("LunarLander-v3", render_mode="rgb_array") + >>> env = ObstructView(env, obstructed_pixels_ratio=0.5, obstruction_width=50) + >>> env = HumanRendering(env) + >>> obs, _ = env.reset(seed=123) + >>> obs, *_ = env.step(env.action_space.sample()) + """ + + def __init__( + self, + env: gym.Env[ObsType, ActType], + obstructed_pixels_ratio: float, + obstruction_width: int, + is_noise_grayscale: bool = False, + ): + """Wrapper obstructs pixels with white noise patches. + + Args: + env: The environment that is being wrapped + obstructed_pixels_ratio: the percentage of pixels obstructed with white noise + obstruction_width: the width of the obstruction patches + is_noise_grayscale: if True, RGB noise is converted to grayscale + """ + if not 0 <= obstructed_pixels_ratio < 1: + raise ValueError( + f"obstructed_pixels_ratio should be in the interval [0,1). Received {obstructed_pixels_ratio}" + ) + + if obstruction_width < 1: + raise ValueError( + f"obstruction_width should be larger or equal than 1. Received {obstruction_width}" + ) + + gym.utils.RecordConstructorArgs.__init__( + self, + obstructed_pixels_ratio=obstructed_pixels_ratio, + obstruction_width=obstruction_width, + is_noise_grayscale=is_noise_grayscale, + ) + gym.Wrapper.__init__(self, env) + + self.obstruction_centers_ratio = obstructed_pixels_ratio / obstruction_width**2 + self.obstruction_width = obstruction_width + self.is_noise_grayscale = is_noise_grayscale + + def render(self) -> RenderFrame: + """Compute the render frames as specified by render_mode attribute during initialization of the environment, then add white noise patches.""" + render_out = super().render() + + render_shape = render_out.shape + n_pixels = render_shape[0] * render_shape[1] + n_obstructions = int(n_pixels * self.obstruction_centers_ratio) + centers = self.np_random.integers(0, n_pixels, n_obstructions) + centers = np.unravel_index(centers, (render_shape[0], render_shape[1])) + mask = np.zeros((render_shape[0], render_shape[1]), dtype=bool) + low = self.obstruction_width // 2 + high = self.obstruction_width - low + for x, y in zip(*centers): + mask[ + max(x - low, 0) : min(x + high, render_shape[0]), + max(y - low, 0) : min(y + high, render_shape[1]), + ] = True + + if self.is_noise_grayscale: + noise = ( + self.np_random.integers( + (0, 0, 0), + 255 * np.array([0.2989, 0.5870, 0.1140]), + size=render_out.shape, + dtype=np.uint8, + ) + .sum(-1, keepdims=True) + .repeat(3, -1) + ) + else: + noise = self.np_random.integers( + 0, + 255, + size=render_out.shape, + dtype=np.uint8, + ) + + return np.where(mask[..., None], noise, render_out) diff --git a/tests/wrappers/test_white_noise_rendering.py b/tests/wrappers/test_white_noise_rendering.py new file mode 100644 index 000000000..56a8fa57c --- /dev/null +++ b/tests/wrappers/test_white_noise_rendering.py @@ -0,0 +1,35 @@ +"""Test suite of AddWhiteNoise and ObstructView wrapper.""" + +import gymnasium as gym +from gymnasium.wrappers import AddWhiteNoise, HumanRendering, ObstructView + + +def test_white_noise_rendering(): + for mode in ["rgb_array"]: + env = gym.make("CartPole-v1", render_mode=mode, disable_env_checker=True) + env = AddWhiteNoise(env, probability_of_noise_per_pixel=0.5) + env = HumanRendering(env) + + assert env.render_mode == "human" + env.reset() + + for _ in range(75): + _, _, terminated, truncated, _ = env.step(env.action_space.sample()) + if terminated or truncated: + env.reset() + + env.close() + + env = gym.make("CartPole-v1", render_mode=mode, disable_env_checker=True) + env = ObstructView(env, obstructed_pixels_ratio=0.5, obstruction_width=100) + env = HumanRendering(env) + + assert env.render_mode == "human" + env.reset() + + for _ in range(75): + _, _, terminated, truncated, _ = env.step(env.action_space.sample()) + if terminated or truncated: + env.reset() + + env.close()