From 259975af3dfac3b601c071c3d6a2c976dec3b3a0 Mon Sep 17 00:00:00 2001 From: Gaiejj <524339208@qq.com> Date: Wed, 8 May 2024 23:13:40 +0800 Subject: [PATCH] chore: update pytest --- omnisafe/envs/__init__.py | 5 +- .../envs_from_cbf.py => cbf_env.py} | 0 omnisafe/envs/classic_control/__init__.py | 4 +- .../envs/classic_control/envs_from_rcbf.py | 168 +--------------- omnisafe/envs/rcbf_env.py | 187 ++++++++++++++++++ omnisafe/evaluator.py | 4 +- 6 files changed, 193 insertions(+), 175 deletions(-) rename omnisafe/envs/{classic_control/envs_from_cbf.py => cbf_env.py} (100%) create mode 100644 omnisafe/envs/rcbf_env.py diff --git a/omnisafe/envs/__init__.py b/omnisafe/envs/__init__.py index fb1bf03b1..095a1134c 100644 --- a/omnisafe/envs/__init__.py +++ b/omnisafe/envs/__init__.py @@ -15,14 +15,13 @@ """Environment API for OmniSafe.""" from omnisafe.envs import classic_control -from omnisafe.envs.classic_control.envs_from_cbf import BarrierFunctionEnv -from omnisafe.envs.classic_control.envs_from_rcbf import RobustBarrierFunctionEnv +from omnisafe.envs.cbf_env import BarrierFunctionEnv from omnisafe.envs.core import CMDP, env_register, make, support_envs from omnisafe.envs.crabs_env import CRABSEnv from omnisafe.envs.custom_env import CustomEnv from omnisafe.envs.meta_drive_env import SafetyMetaDriveEnv -from omnisafe.envs.barrier_function_env import BarrierFunctionEnv from omnisafe.envs.mujoco_env import MujocoEnv +from omnisafe.envs.rcbf_env import RobustBarrierFunctionEnv from omnisafe.envs.safety_gymnasium_env import SafetyGymnasiumEnv from omnisafe.envs.safety_gymnasium_modelbased import SafetyGymnasiumModelBased from omnisafe.envs.safety_isaac_gym_env import SafetyIsaacGymEnv diff --git a/omnisafe/envs/classic_control/envs_from_cbf.py b/omnisafe/envs/cbf_env.py similarity index 100% rename from omnisafe/envs/classic_control/envs_from_cbf.py rename to omnisafe/envs/cbf_env.py diff --git a/omnisafe/envs/classic_control/__init__.py b/omnisafe/envs/classic_control/__init__.py index 9d5a3ba99..9c8e7b35a 100644 --- a/omnisafe/envs/classic_control/__init__.py +++ b/omnisafe/envs/classic_control/__init__.py @@ -14,6 +14,4 @@ # ============================================================================== """Environment implementations from papers.""" -from omnisafe.envs.classic_control import envs_from_crabs -from omnisafe.envs.classic_control.envs_from_cbf import BarrierFunctionEnv -from omnisafe.envs.classic_control.envs_from_rcbf import RobustBarrierFunctionEnv +from omnisafe.envs.classic_control import envs_from_crabs, envs_from_rcbf diff --git a/omnisafe/envs/classic_control/envs_from_rcbf.py b/omnisafe/envs/classic_control/envs_from_rcbf.py index e97aaaea5..bdf469876 100644 --- a/omnisafe/envs/classic_control/envs_from_rcbf.py +++ b/omnisafe/envs/classic_control/envs_from_rcbf.py @@ -20,16 +20,12 @@ from __future__ import annotations from collections.abc import Iterable -from typing import Any, Callable, ClassVar +from typing import Any, Callable import gymnasium import numpy as np -import torch from gymnasium import spaces -from omnisafe.envs.core import CMDP, env_register -from omnisafe.typing import Box - def to_pixel(meas_cm: list[float] | float, shift: int = 0) -> float: if isinstance(meas_cm, Iterable): @@ -171,10 +167,7 @@ def get_obs(self) -> np.ndarray: def obs_compass(self) -> np.ndarray: """Return a robot-centric compass observation of a list of positions.""" - - # Get ego vector in world frame vec = self.goal_pos - self.state[:2] - # Rotate into frame R = np.array( [ 
[np.cos(self.state[2]), -np.sin(self.state[2])], @@ -182,7 +175,6 @@ def obs_compass(self) -> np.ndarray: ], ) vec = np.matmul(vec, R) - # Normalize vec /= np.sqrt(np.sum(np.square(vec))) + 0.001 return vec @@ -208,161 +200,3 @@ def close(self) -> None: if self.viewer: self.viewer.close() self.viewer = None - - -@env_register -class RobustBarrierFunctionEnv(CMDP): - """Interface of control barrier function-based environments. - - .. warning:: - Since environments based on control barrier functions require special judgment and control - of environmental dynamics, they do not support the use of vectorized environments for - parallelization. - - Attributes: - need_auto_reset_wrapper (bool): Whether to use auto reset wrapper. - need_time_limit_wrapper (bool): Whether to use time limit wrapper. - """ - - need_auto_reset_wrapper = True - need_time_limit_wrapper = False - _support_envs: ClassVar[list[str]] = [ - 'Unicycle', - ] - - def __init__( - self, - env_id: str, - num_envs: int = 1, - device: str = 'cpu', - **kwargs: Any, - ) -> None: - """Initialize the environment. - - Args: - env_id (str): Environment id. - num_envs (int, optional): Number of environments. Defaults to 1. - device (torch.device, optional): Device to store the data. Defaults to 'cpu'. - - Keyword Args: - render_mode (str, optional): The render mode, ranging from ``human``, ``rgb_array``, ``rgb_array_list``. - Defaults to ``rgb_array``. - camera_name (str, optional): The camera name. - camera_id (int, optional): The camera id. - width (int, optional): The width of the rendered image. Defaults to 256. - height (int, optional): The height of the rendered image. Defaults to 256. - """ - super().__init__(env_id) - self._env_id = env_id - if num_envs == 1: - if self._env_id == 'Unicycle': - self._env = UnicycleEnv() - else: - raise NotImplementedError('Only support Unicycle now.') - assert isinstance(self._env.action_space, Box), 'Only support Box action space.' - assert isinstance( - self._env.observation_space, - Box, - ), 'Only support Box observation space.' - self._action_space = self._env.action_space - self._observation_space = self._env.observation_space - else: - raise NotImplementedError('Only support num_envs=1 now.') - self._device = torch.device(device) - - self._num_envs = num_envs - self._metadata = self._env.metadata - - def step( - self, - action: torch.Tensor, - ) -> tuple[ - torch.Tensor, - torch.Tensor, - torch.Tensor, - torch.Tensor, - torch.Tensor, - dict[str, Any], - ]: - """Step the environment. - - .. note:: - - OmniSafe use auto reset wrapper to reset the environment when the episode is - terminated. So the ``obs`` will be the first observation of the next episode. - And the true ``final_observation`` in ``info`` will be stored in the ``final_observation`` key of ``info``. - - Args: - action (torch.Tensor): Action to take. - - Returns: - observation: Agent's observation of the current environment. - reward: Amount of reward returned after previous action. - cost: Amount of cost returned after previous action. - terminated: Whether the episode has ended. - truncated: Whether the episode has been truncated due to a time limit. - info: Auxiliary diagnostic information (helpful for debugging, and sometimes learning). 
- """ - obs, reward, cost, terminated, truncated, info = self._env.step( - action.detach().cpu().numpy(), - ) - obs, reward, cost, terminated, truncated = ( - torch.as_tensor(x, dtype=torch.float32, device=self._device) - for x in (obs, reward, cost, terminated, truncated) - ) - if 'final_observation' in info: - info['final_observation'] = np.array( - [ - array if array is not None else np.zeros(obs.shape[-1]) - for array in info['final_observation'] - ], - ) - info['final_observation'] = torch.as_tensor( - info['final_observation'], - dtype=torch.float32, - device=self._device, - ) - - return obs, reward, cost, terminated, truncated, info - - def reset( - self, - seed: int | None = None, - options: dict[str, Any] | None = None, - ) -> tuple[torch.Tensor, dict]: - """Reset the environment. - - Args: - seed (int, optional): The random seed. Defaults to None. - options (dict[str, Any], optional): The options for the environment. Defaults to None. - - Returns: - observation: Agent's observation of the current environment. - info: Auxiliary diagnostic information (helpful for debugging, and sometimes learning). - """ - obs, info = self._env.reset(seed=seed, options=options) - return torch.as_tensor(obs, dtype=torch.float32, device=self._device), info - - def set_seed(self, seed: int) -> None: - """Set the seed for the environment. - - Args: - seed (int): Seed to set. - """ - self.reset(seed=seed) - - def render(self) -> Any: - """Render the environment. - - Returns: - Rendered environment. - """ - return self._env.render() - - def close(self) -> None: - """Close the environment.""" - self._env.close() - - def __getattr__(self, name: str) -> Any: - """Return the unwrapped environment attributes.""" - return getattr(self._env, name) diff --git a/omnisafe/envs/rcbf_env.py b/omnisafe/envs/rcbf_env.py new file mode 100644 index 000000000..f97586dc3 --- /dev/null +++ b/omnisafe/envs/rcbf_env.py @@ -0,0 +1,187 @@ +# Copyright 2023 OmniSafe Team. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Interface of control barrier function-based environments.""" + +# mypy: ignore-errors +# pylint: disable=all + +from __future__ import annotations + +from typing import Any, ClassVar + +import numpy as np +import torch + +from omnisafe.envs.classic_control.envs_from_rcbf import UnicycleEnv +from omnisafe.envs.core import CMDP, env_register +from omnisafe.typing import Box + + +@env_register +class RobustBarrierFunctionEnv(CMDP): + """Interface of control barrier function-based environments. + + .. warning:: + Since environments based on control barrier functions require special judgment and control + of environmental dynamics, they do not support the use of vectorized environments for + parallelization. + + Attributes: + need_auto_reset_wrapper (bool): Whether to use auto reset wrapper. + need_time_limit_wrapper (bool): Whether to use time limit wrapper. 
+ """ + + need_auto_reset_wrapper = True + need_time_limit_wrapper = False + _support_envs: ClassVar[list[str]] = [ + 'Unicycle', + ] + + def __init__( + self, + env_id: str, + num_envs: int = 1, + device: str = 'cpu', + **kwargs: Any, + ) -> None: + """Initialize the environment. + + Args: + env_id (str): Environment id. + num_envs (int, optional): Number of environments. Defaults to 1. + device (torch.device, optional): Device to store the data. Defaults to 'cpu'. + + Keyword Args: + render_mode (str, optional): The render mode, ranging from ``human``, ``rgb_array``, ``rgb_array_list``. + Defaults to ``rgb_array``. + camera_name (str, optional): The camera name. + camera_id (int, optional): The camera id. + width (int, optional): The width of the rendered image. Defaults to 256. + height (int, optional): The height of the rendered image. Defaults to 256. + """ + super().__init__(env_id) + self._env_id = env_id + if num_envs == 1: + if self._env_id == 'Unicycle': + self._env = UnicycleEnv() + else: + raise NotImplementedError('Only support Unicycle now.') + assert isinstance(self._env.action_space, Box), 'Only support Box action space.' + assert isinstance( + self._env.observation_space, + Box, + ), 'Only support Box observation space.' + self._action_space = self._env.action_space + self._observation_space = self._env.observation_space + else: + raise NotImplementedError('Only support num_envs=1 now.') + self._device = torch.device(device) + + self._num_envs = num_envs + self._metadata = self._env.metadata + + def step( + self, + action: torch.Tensor, + ) -> tuple[ + torch.Tensor, + torch.Tensor, + torch.Tensor, + torch.Tensor, + torch.Tensor, + dict[str, Any], + ]: + """Step the environment. + + .. note:: + + OmniSafe use auto reset wrapper to reset the environment when the episode is + terminated. So the ``obs`` will be the first observation of the next episode. + And the true ``final_observation`` in ``info`` will be stored in the ``final_observation`` key of ``info``. + + Args: + action (torch.Tensor): Action to take. + + Returns: + observation: Agent's observation of the current environment. + reward: Amount of reward returned after previous action. + cost: Amount of cost returned after previous action. + terminated: Whether the episode has ended. + truncated: Whether the episode has been truncated due to a time limit. + info: Auxiliary diagnostic information (helpful for debugging, and sometimes learning). + """ + obs, reward, cost, terminated, truncated, info = self._env.step( + action.detach().cpu().numpy(), + ) + obs, reward, cost, terminated, truncated = ( + torch.as_tensor(x, dtype=torch.float32, device=self._device) + for x in (obs, reward, cost, terminated, truncated) + ) + if 'final_observation' in info: + info['final_observation'] = np.array( + [ + array if array is not None else np.zeros(obs.shape[-1]) + for array in info['final_observation'] + ], + ) + info['final_observation'] = torch.as_tensor( + info['final_observation'], + dtype=torch.float32, + device=self._device, + ) + + return obs, reward, cost, terminated, truncated, info + + def reset( + self, + seed: int | None = None, + options: dict[str, Any] | None = None, + ) -> tuple[torch.Tensor, dict]: + """Reset the environment. + + Args: + seed (int, optional): The random seed. Defaults to None. + options (dict[str, Any], optional): The options for the environment. Defaults to None. + + Returns: + observation: Agent's observation of the current environment. 
+ info: Auxiliary diagnostic information (helpful for debugging, and sometimes learning). + """ + obs, info = self._env.reset(seed=seed, options=options) + return torch.as_tensor(obs, dtype=torch.float32, device=self._device), info + + def set_seed(self, seed: int) -> None: + """Set the seed for the environment. + + Args: + seed (int): Seed to set. + """ + self.reset(seed=seed) + + def render(self) -> Any: + """Render the environment. + + Returns: + Rendered environment. + """ + return self._env.render() + + def close(self) -> None: + """Close the environment.""" + self._env.close() + + def __getattr__(self, name: str) -> Any: + """Return the unwrapped environment attributes.""" + return getattr(self._env, name) diff --git a/omnisafe/evaluator.py b/omnisafe/evaluator.py index a1240a334..90535d931 100644 --- a/omnisafe/evaluator.py +++ b/omnisafe/evaluator.py @@ -39,6 +39,8 @@ SafeARCPlanner, ) from omnisafe.common import Normalizer +from omnisafe.common.barrier_comp import BarrierCompensator +from omnisafe.common.barrier_solver import PendulumSolver from omnisafe.common.control_barrier_function.crabs.models import ( AddGaussianNoise, CrabsCore, @@ -49,8 +51,6 @@ from omnisafe.common.control_barrier_function.crabs.optimizers import Barrier from omnisafe.common.control_barrier_function.crabs.utils import Normalizer as CRABSNormalizer from omnisafe.common.control_barrier_function.crabs.utils import create_model_and_trainer -from omnisafe.common.barrier_comp import BarrierCompensator -from omnisafe.common.barrier_solver import PendulumSolver from omnisafe.common.robust_barrier_solver import CBFQPLayer from omnisafe.common.robust_gp_model import DynamicsModel from omnisafe.envs.core import CMDP, make
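
Since the commit touches the test setup, a minimal pytest-style smoke test sketch for the relocated RobustBarrierFunctionEnv is shown below. It only relies on the import path and the step/reset/close API introduced in omnisafe/envs/rcbf_env.py above; the test name, seed, and direct class instantiation (rather than the registered make() entry point) are illustrative assumptions, and it presumes the underlying Unicycle dynamics are importable in the test environment.

# Hypothetical smoke test; not part of the patch.
import torch

from omnisafe.envs.rcbf_env import RobustBarrierFunctionEnv


def test_unicycle_step_returns_tensors() -> None:
    # Only num_envs=1 and the 'Unicycle' task are supported by this wrapper.
    env = RobustBarrierFunctionEnv('Unicycle', num_envs=1, device='cpu')
    obs, info = env.reset(seed=0)
    assert isinstance(obs, torch.Tensor)
    assert isinstance(info, dict)

    # step() expects a torch.Tensor action and converts it to numpy internally.
    action = torch.as_tensor(env.action_space.sample(), dtype=torch.float32)
    obs, reward, cost, terminated, truncated, info = env.step(action)
    for value in (obs, reward, cost, terminated, truncated):
        assert isinstance(value, torch.Tensor)

    env.close()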