diff --git a/pyproject.toml b/pyproject.toml
index 221f1a7..9365f1e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -6,6 +6,7 @@ requires-python = ">=3.9"
 license = {file = "LICENSE"}
 authors = [
     {name = "Alexander Nikulin", email = "a.p.nikulin@tinkoff.ai"},
+    {name = "Viacheslav Sinii", email = "v.siniy@tinkoff.ai"}
 ]
 dynamic = ["version"]
 
@@ -13,7 +14,10 @@ keywords = [
     "reinforcement learning",
     "meta-reinforcement learning",
     "gridworld",
-    "dark room"
+    "dark room",
+    "bandit",
+    "bernoulli bandit",
+    "contextual bandit"
 ]
 classifiers = [
     "Development Status :: 4 - Beta",
@@ -29,7 +33,8 @@ classifiers = [
 ]
 
 dependencies = [
-    "gymnasium>=0.29.0"
+    "gymnasium>=0.29.0",
+    "numba>=0.59.0"
 ]
 
 [project.optional-dependencies]
diff --git a/src/toymeta/__init__.py b/src/toymeta/__init__.py
index 8fde849..ac5c83e 100644
--- a/src/toymeta/__init__.py
+++ b/src/toymeta/__init__.py
@@ -50,3 +50,19 @@
     max_episode_steps=20,
     kwargs={"size": 3, "random_start": True},
 )
+
+register(
+    id="BernoulliBandit",
+    entry_point="toymeta.bernoulli_bandit:MultiArmedBanditBernoulli",
+)
+
+register(
+    id="HAD-Dark-Room",
+    entry_point="toymeta.had_dark_room:HAD_DarkRoom",
+    max_episode_steps=20,
+    kwargs={"terminate_on_goal": True},
+)
+
+register(
+    id="ContextualBandit", entry_point="toymeta.contextual_bandit:ContextualBandit"
+)
diff --git a/src/toymeta/bernoulli_bandit.py b/src/toymeta/bernoulli_bandit.py
new file mode 100644
index 0000000..8643268
--- /dev/null
+++ b/src/toymeta/bernoulli_bandit.py
@@ -0,0 +1,37 @@
+import gymnasium as gym
+import numpy as np
+
+
+class MultiArmedBanditBernoulli(gym.Env):
+    def __init__(self, arms_mean: np.ndarray, num_arms: int):
+        self.arms_mean = arms_mean
+        self.num_arms = num_arms
+
+        self.action_space = gym.spaces.Discrete(len(arms_mean))
+        # the only obs is 0
+        self.observation_space = gym.spaces.Discrete(1)
+
+        self.regret = 0
+
+    def reset(self, seed=None, options=None):
+        super().reset(seed=seed, options=options)
+        # we need to reset regret manually
+        self.regret = 0
+
+        return 0, {}
+
+    def step(self, action: int):
+        assert action < self.num_arms, (action, self.num_arms)
+
+        # calc reward
+        mean = self.arms_mean[action]
+        reward = self.np_random.binomial(n=1, p=mean)
+
+        # info for calculation of the regret
+        opt_mean = self.arms_mean[: self.num_arms].max()
+        opt_act = self.arms_mean[: self.num_arms].argmax()
+
+        self.regret += opt_mean - mean
+        info = {"regret": self.regret, "opt_act": opt_act}
+
+        return 0, reward, False, False, info
diff --git a/src/toymeta/contextual_bandit.py b/src/toymeta/contextual_bandit.py
new file mode 100644
index 0000000..460ca5c
--- /dev/null
+++ b/src/toymeta/contextual_bandit.py
@@ -0,0 +1,56 @@
+import gymnasium as gym
+import numpy as np
+
+
+# https://courses.cs.washington.edu/courses/cse599i/18wi/resources/lecture10/lecture10.pdf
+class ContextualBandit(gym.Env):
+    def __init__(
+        self,
+        context_dim: int,
+        arm_embeds: np.ndarray,
+        num_arms: int,
+    ):
+        self.arm_embeds = arm_embeds
+        self.num_arms = num_arms
+        self.context_dim = context_dim
+
+        self.action_space = gym.spaces.Discrete(len(arm_embeds))
+
+        self.observation_space = gym.spaces.Box(
+            low=-1e20, high=1e20, shape=(context_dim,), dtype=np.float64
+        )
+
+        self.regret = 0
+
+    def reset(self, seed=None, options=None):
+        super().reset(seed=seed, options=options)
+        self.context = self._get_new_context()
+        # we also need to reset regret manually
+        self.regret = 0
+
+        return self.context, {}
+
+    def _get_new_context(self):
+        return self.np_random.normal(size=(self.context_dim,)) / np.sqrt(
+            self.context_dim
+        )
+
+    def step(self, action: int):
+        assert action < self.num_arms, (action, self.num_arms)
+
+        all_means = (self.arm_embeds @ self.context)[: self.num_arms]
+
+        # calc reward
+        mean = all_means[action]
+        reward = self.np_random.normal(loc=mean, scale=1)
+
+        # info for calculation of the regret
+        opt_mean = all_means.max()
+        opt_act = all_means.argmax()
+
+        self.regret += opt_mean - mean
+        info = {"regret": self.regret, "opt_act": opt_act, "mean": mean}
+
+        self.context = self._get_new_context()
+
+        return self.context, reward, False, False, info
diff --git a/src/toymeta/had_dark_room.py b/src/toymeta/had_dark_room.py
new file mode 100644
index 0000000..64807cb
--- /dev/null
+++ b/src/toymeta/had_dark_room.py
@@ -0,0 +1,218 @@
+"""
+This file implements a DarkRoom environment with sequential actions
+from Headless Algorithm Distillation, https://arxiv.org/pdf/2312.13327.pdf.
+
+Here each action consists of a sequence of atomic actions:
+"noop", "down", "up", "right" and "left".
+"""
+
+import warnings
+from itertools import product
+from typing import Optional, Tuple
+
+import gymnasium as gym
+import numpy as np
+from gymnasium import spaces
+from numba import njit
+
+
+def get_action_sequences(num_actions: int, seq_len: int):
+    seqs = list(product(np.arange(num_actions), repeat=seq_len))
+    seqs = np.vstack(seqs)
+
+    return seqs
+
+
+@njit()
+def pos_to_state(pos: Tuple[int, int], size: int):
+    return int(pos[0] * size + pos[1])
+
+
+@njit()
+def single_step(
+    action: int,
+    agent_pos: np.ndarray,
+    action_to_direction: np.ndarray,
+    size: int,
+    goal_pos: np.ndarray,
+    terminate_on_goal: bool,
+) -> Tuple[np.ndarray, Tuple[int, float, bool, bool]]:
+    """
+    This function makes an atomic step in the environment. Returns the new agent
+    position and the usual gym step tuple.
+
+    :param action: index of an atomic action.
+    :param agent_pos: the current position of the agent.
+    :param action_to_direction: an array of grid offsets corresponding to each atomic action.
+    :param size: the size of the grid.
+    :param goal_pos: the goal's coordinates.
+    :param terminate_on_goal: whether the episode ends upon reaching the goal.
+    """
+
+    agent_pos = np.clip(agent_pos + action_to_direction[action], 0, size - 1)
+
+    reward = 1.0 if np.array_equal(agent_pos, goal_pos) else 0.0
+    terminated = True if reward and terminate_on_goal else False
+
+    gym_output = pos_to_state(agent_pos, size), reward, terminated, False
+
+    return agent_pos, gym_output
+
+
+@njit()
+def multi_step(
+    action: int,
+    action_sequences: np.ndarray,
+    agent_pos: np.ndarray,
+    action_to_direction: np.ndarray,
+    size: int,
+    goal_pos: np.ndarray,
+    terminate_on_goal: bool,
+) -> Tuple[np.ndarray, Tuple[int, float, bool, bool]]:
+    """
+    This function makes a sequential step in the environment. Returns the new agent
+    position and the usual gym step tuple.
+
+    :param action: index of a sequential action.
+    :param action_sequences: for each sequential action, the sequence of atomic action indices.
+    :param agent_pos: the current position of the agent.
+    :param action_to_direction: an array of grid offsets corresponding to each atomic action.
+    :param size: the size of the grid.
+    :param goal_pos: the goal's coordinates.
+    :param terminate_on_goal: whether the episode ends upon reaching the goal.
+ """ + + # Choose a sequence of atomic actions + action_seq = action_sequences[action] + + # Perf each atomic action one after another + rewards = np.zeros(len(action_seq)) + terms = np.zeros(len(action_seq)) + for i, act in enumerate(action_seq): + agent_pos, gym_output = single_step( + act, agent_pos, action_to_direction, size, goal_pos, terminate_on_goal + ) + obs, rew, term, _ = gym_output + rewards[i] = rew + terms[i] = term + + # The reward will equal to 1 if the sequence's trajectory has passed + # through a goal cell + reward = int(np.any(rewards == 1)) + # The episode is finished if the sequence's trajectory has passed + # through a goal cell + term = np.any(terms) + + gym_output = obs, reward, term, False + + return agent_pos, gym_output + + +class HAD_DarkRoom(gym.Env): + """ + This is a darkroom environment where an agent operates in a grid and must reach a goal cell. + A single action is a sequence of atomic actions 'noop', 'up', 'down', 'left' and 'right'. + + :param available_actions: indices of action sequences that the environment will use. + :param action_seq_len: the amount of atomic actions constituting the action sequence. + :param size: the size of the grid. + :param goal_pos: the goal position. If None, will be chosen randomly. + :param render_mode: same as in openai gym. + :param terminate_on_goal: whether the episode ends upon reaching the goal. + """ + + def __init__( + self, + available_actions: np.ndarray, + action_seq_len: int = 1, + size: int = 9, + goal_pos: Optional[np.ndarray] = None, + render_mode=None, + terminate_on_goal: bool = False, + ): + self.action_seq_len = action_seq_len + # 5 is amount of atomic actions + self.action_sequences = get_action_sequences(5, self.action_seq_len) + self.action_sequences = self.action_sequences[available_actions] + + self.size = size + self.observation_space = spaces.Discrete(self.size**2) + self.action_space = spaces.Discrete(len(available_actions)) + + self.action_to_direction = np.array([[0, 0], [-1, 0], [0, 1], [1, 0], [0, -1]]) + + # the agent will start here + self.center_pos = (self.size // 2, self.size // 2) + + # set the goal cell + if goal_pos is not None: + self.goal_pos = np.asarray(goal_pos) + assert self.goal_pos.ndim == 1 + else: + self.goal_pos = self.generate_goal_pos() + + self.terminate_on_goal = terminate_on_goal + self.render_mode = render_mode + + def generate_goal_pos(self): + """ + Generates random coordinates for the goal. + """ + return self.np_random.integers(0, self.size, size=2) + + def state_to_pos(self, state): + """ + Converts an index of a cell into 2-component coordinates + """ + return np.array(divmod(state, self.size)) + + def reset(self, seed=None, options=None): + super().reset(seed=seed, options=options) + self.agent_pos = np.array(self.center_pos, dtype=np.float32) + + return pos_to_state(self.agent_pos, self.size), {} + + def _single_step(self, action): + """ + An atomic step in the environment. + + :param action: index of atomic action. + """ + self.agent_pos, gym_output = single_step( + action, + agent_pos=self.agent_pos, + action_to_direction=self.action_to_direction, + size=self.size, + goal_pos=self.goal_pos, + terminate_on_goal=self.terminate_on_goal, + ) + + return gym_output + ({},) + + def step(self, action): + """ + A 'sequential' step in an environment. + + :param action: index of a sequential action. 
+ """ + self.agent_pos, gym_output = multi_step( + action, + action_sequences=self.action_sequences, + agent_pos=self.agent_pos, + action_to_direction=self.action_to_direction, + size=self.size, + goal_pos=self.goal_pos, + terminate_on_goal=self.terminate_on_goal, + ) + + return gym_output + ({},) + + def render(self) -> Optional[np.ndarray]: + if self.render_mode == "rgb_array": + # Create a grid representing the dark room + grid = np.full( + (self.size, self.size, 3), fill_value=(255, 255, 255), dtype=np.uint8 + ) + grid[self.goal_pos[0], self.goal_pos[1]] = (255, 0, 0) + grid[int(self.agent_pos[0]), int(self.agent_pos[1])] = (0, 255, 0) + return grid diff --git a/tests/test.py b/tests/test.py new file mode 100644 index 0000000..a725da5 --- /dev/null +++ b/tests/test.py @@ -0,0 +1,44 @@ +import unittest + +import gymnasium as gym +import numpy as np +import toymeta + + +class TestEnvs(unittest.TestCase): + def test_contextual(self): + try: + env = gym.make( + "ContextualBandit", + context_dim=2, + arm_embeds=np.random.randn(2, 2), + num_arms=2, + ) + env.reset() + env.step(1) + except Exception as e: + self.fail(f"Some error occured: {e}") + + def test_bernoulli(self): + try: + env = gym.make( + "BernoulliBandit", arms_mean=np.array([0.5, 0.9]), num_arms=2 + ) + env.reset() + env.step(1) + except Exception as e: + self.fail(f"Some error occured: {e}") + + def test_had_dark_room(self): + try: + env = gym.make( + "HAD-Dark-Room", available_actions=np.array([1, 2, 3]), action_seq_len=3 + ) + env.reset() + env.step(1) + except Exception as e: + self.fail(f"Some error occured: {e}") + + +if __name__ == "__main__": + unittest.main()