new envs #1

Open · wants to merge 3 commits into main
9 changes: 7 additions & 2 deletions pyproject.toml
@@ -6,14 +6,18 @@ requires-python =">=3.9"
license = {file = "LICENSE"}
authors = [
{name = "Alexander Nikulin", email = "[email protected]"},
{name = "Viacheslav Sinii", email = "[email protected]"}
]
dynamic = ["version"]

keywords = [
"reinforcement learning",
"meta-reinforcement learning",
"gridworld",
"dark room"
"dark room",
"bandit",
"bernoulli bandit",
"contextual bandit"
]
classifiers = [
"Development Status :: 4 - Beta",
@@ -29,7 +33,8 @@ classifiers = [
]

dependencies = [
"gymnasium>=0.29.0"
"gymnasium>=0.29.0",
"numba>=0.59.0"
]

[project.optional-dependencies]
16 changes: 16 additions & 0 deletions src/toymeta/__init__.py
@@ -50,3 +50,19 @@
max_episode_steps=20,
kwargs={"size": 3, "random_start": True},
)

register(
id="BernoulliBandit",
entry_point="toymeta.bernoulli_bandit:MultiArmedBanditBernoulli",
)

register(
id="HAD-Dark-Room",
entry_point="toymeta.had_dark_room:HAD_DarkRoom",
max_episode_steps=20,
kwargs={"terminate_on_goal": True},
)

register(
id="ContextualBandit", entry_point="toymeta.contextual_bandit:ContextualBandit"
)
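
For context, a minimal usage sketch of the new registrations (constructor arguments are required at make() time; the specific values below are illustrative, not part of this PR):

import gymnasium as gym
import numpy as np

import toymeta  # noqa: F401  (importing the package runs the register() calls above)

# Illustrative arm means; MultiArmedBanditBernoulli requires arms_mean and num_arms.
env = gym.make("BernoulliBandit", arms_mean=np.array([0.1, 0.5, 0.9]), num_arms=3)
obs, info = env.reset(seed=0)
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
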
37 changes: 37 additions & 0 deletions src/toymeta/bernoulli_bandit.py
@@ -0,0 +1,37 @@
import gymnasium as gym
import numpy as np


class MultiArmedBanditBernoulli(gym.Env):
def __init__(self, arms_mean: np.ndarray, num_arms: int):
self.arms_mean = arms_mean
self.num_arms = num_arms

self.action_space = gym.spaces.Discrete(len(arms_mean))
# the only obs is 0
self.observation_space = gym.spaces.Discrete(1)

self.regret = 0

def reset(self, seed=None, options=None):
super().reset(seed=seed, options=options)
# we need to reset regret manually
self.regret = 0

return 0, {}

def step(self, action: int):
assert action < self.num_arms, (action, self.num_arms)

        # sample a Bernoulli reward from the chosen arm's mean
mean = self.arms_mean[action]
reward = self.np_random.binomial(n=1, p=mean)

        # info for computing the regret
opt_mean = self.arms_mean[: self.num_arms].max()
opt_act = self.arms_mean[: self.num_arms].argmax()

self.regret += opt_mean - mean
info = {"regret": self.regret, "opt_act": opt_act}

return 0, reward, False, False, info
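
A small sanity-check sketch of the regret bookkeeping above (the arm means are illustrative): pulling the optimal arm adds nothing to the regret, while a suboptimal arm adds opt_mean - mean per step.

import numpy as np
from toymeta.bernoulli_bandit import MultiArmedBanditBernoulli

env = MultiArmedBanditBernoulli(arms_mean=np.array([0.2, 0.8]), num_arms=2)
env.reset(seed=0)
for _ in range(5):
    _, _, _, _, info = env.step(1)  # arm 1 is optimal here, so regret stays 0
assert info["regret"] == 0 and info["opt_act"] == 1

env.reset(seed=0)
for _ in range(5):
    _, _, _, _, info = env.step(0)  # arm 0 loses 0.8 - 0.2 = 0.6 per pull
assert np.isclose(info["regret"], 5 * 0.6)
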
56 changes: 56 additions & 0 deletions src/toymeta/contextual_bandit.py
@@ -0,0 +1,56 @@
import gymnasium as gym
import numpy as np


# https://courses.cs.washington.edu/courses/cse599i/18wi/resources/lecture10/lecture10.pdf
class ContextualBandit(gym.Env):
def __init__(
self,
context_dim: int,
arm_embeds: np.ndarray,
num_arms: int,
):
self.arm_embeds = arm_embeds
self.num_arms = num_arms
self.context_dim = context_dim

self.action_space = gym.spaces.Discrete(len(arm_embeds))

self.observation_space = gym.spaces.Box(
low=-1e20, high=1e20, shape=(context_dim,), dtype=np.float64
)

self.regret = 0

def reset(self, seed=None, options=None):
super().reset(seed=seed, options=options)
self.context = self._get_new_context()
# we also need to reset regret manually
self.regret = 0

return self.context, {}

def _get_new_context(self):
return self.np_random.normal(size=(self.context_dim,)) / np.sqrt(
self.context_dim
)

def step(self, action: int):
assert action < self.num_arms, (action, self.num_arms)

all_means = (self.arm_embeds @ self.context)[: self.num_arms]

        # sample a Gaussian reward around the chosen arm's mean
mean = all_means[action]
reward = self.np_random.normal(loc=mean, scale=1)

        # info for computing the regret
opt_mean = all_means.max()
opt_act = all_means.argmax()

self.regret += opt_mean - mean
info = {"regret": self.regret, "opt_act": opt_act, "mean": mean}

self.context = self._get_new_context()

return self.context, reward, False, False, info
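
For reference, a minimal sketch of the linear-reward structure implemented above (the arm embeddings are illustrative): the expected reward of arm a in context c is arm_embeds[a] @ c, and picking the arm with the largest true mean keeps the tracked regret at zero.

import numpy as np
from toymeta.contextual_bandit import ContextualBandit

arm_embeds = np.random.default_rng(0).normal(size=(4, 8))
env = ContextualBandit(context_dim=8, arm_embeds=arm_embeds, num_arms=4)
context, info = env.reset(seed=0)
action = int(np.argmax(arm_embeds @ context))  # greedy w.r.t. the true means
context, reward, terminated, truncated, info = env.step(action)
assert info["regret"] == 0 and info["opt_act"] == action
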
218 changes: 218 additions & 0 deletions src/toymeta/had_dark_room.py
@@ -0,0 +1,218 @@
"""
This file implements a DarkRoom environment with sequential actions
from Headless Algorithm Distillation https://arxiv.org/pdf/2312.13327.pdf.

Here each action consists of a sequence of atomic actions:
"noop", "down", "up", "right" and "left".
"""

import warnings
from itertools import product
from typing import Optional, Tuple

import gymnasium as gym
import numpy as np
from gymnasium import spaces
from numba import njit


def get_action_sequences(num_actions: int, seq_len: int):
seqs = list(product(np.arange(num_actions), repeat=seq_len))
seqs = np.vstack(seqs)

return seqs


@njit()
def pos_to_state(pos: Tuple[int, int], size: int):
return int(pos[0] * size + pos[1])


@njit()
def single_step(
action: int,
agent_pos: np.ndarray,
action_to_direction: np.ndarray,
size: int,
goal_pos: np.ndarray,
terminate_on_goal: bool,
) -> Tuple[np.ndarray, Tuple[int, float, bool, bool]]:
"""
    This function performs an atomic step in the environment. Returns the new agent position and
    the usual gym step tuple (observation, reward, terminated, truncated).

:param action: index of an atomic action.
:param agent_pos: the current position of an agent.
:param action_to_direction: a list of transitions corresponding to each atomic action.
:param size: the size of the grid.
:param goal_pos: the goal's coordinates.
:param terminate_on_goal: whether the episode ends upon reaching the goal.
"""

agent_pos = np.clip(agent_pos + action_to_direction[action], 0, size - 1)

reward = 1.0 if np.array_equal(agent_pos, goal_pos) else 0.0
terminated = True if reward and terminate_on_goal else False

gym_output = pos_to_state(agent_pos, size), reward, terminated, False

return agent_pos, gym_output


@njit()
def multi_step(
action: int,
action_sequences: np.ndarray,
agent_pos: np.ndarray,
action_to_direction: np.ndarray,
size: int,
goal_pos: np.ndarray,
terminate_on_goal: bool,
) -> Tuple[np.ndarray, Tuple[int, float, bool, bool]]:
"""
    This function performs a sequential step in the environment. Returns the new agent position and
    the usual gym step tuple (observation, reward, terminated, truncated).

:param action: index of a sequential action.
    :param action_sequences: for each sequential action, the corresponding sequence of atomic action indices.
:param agent_pos: the current position of an agent.
:param action_to_direction: a list of transitions corresponding to each atomic action.
:param size: the size of the grid.
:param goal_pos: the goal's coordinates.
:param terminate_on_goal: whether the episode ends upon reaching the goal.
"""

# Choose a sequence of atomic actions
action_seq = action_sequences[action]

    # Perform each atomic action one after another
rewards = np.zeros(len(action_seq))
terms = np.zeros(len(action_seq))
for i, act in enumerate(action_seq):
agent_pos, gym_output = single_step(
act, agent_pos, action_to_direction, size, goal_pos, terminate_on_goal
)
obs, rew, term, _ = gym_output
rewards[i] = rew
terms[i] = term

    # The reward equals 1 if the sequence's trajectory passed through the goal cell
reward = int(np.any(rewards == 1))
    # The episode terminates if the trajectory passed through the goal cell
    # (and terminate_on_goal is set)
term = np.any(terms)

gym_output = obs, reward, term, False

return agent_pos, gym_output


class HAD_DarkRoom(gym.Env):
"""
This is a darkroom environment where an agent operates in a grid and must reach a goal cell.
A single action is a sequence of atomic actions 'noop', 'up', 'down', 'left' and 'right'.

:param available_actions: indices of action sequences that the environment will use.
    :param action_seq_len: the number of atomic actions in each action sequence.
:param size: the size of the grid.
:param goal_pos: the goal position. If None, will be chosen randomly.
    :param render_mode: same as in gymnasium.
:param terminate_on_goal: whether the episode ends upon reaching the goal.
"""

def __init__(
self,
available_actions: np.ndarray,
action_seq_len: int = 1,
size: int = 9,
goal_pos: Optional[np.ndarray] = None,
render_mode=None,
terminate_on_goal: bool = False,
):
self.action_seq_len = action_seq_len
        # 5 is the number of atomic actions
self.action_sequences = get_action_sequences(5, self.action_seq_len)
self.action_sequences = self.action_sequences[available_actions]

self.size = size
self.observation_space = spaces.Discrete(self.size**2)
self.action_space = spaces.Discrete(len(available_actions))

self.action_to_direction = np.array([[0, 0], [-1, 0], [0, 1], [1, 0], [0, -1]])

# the agent will start here
self.center_pos = (self.size // 2, self.size // 2)

# set the goal cell
if goal_pos is not None:
self.goal_pos = np.asarray(goal_pos)
assert self.goal_pos.ndim == 1
else:
self.goal_pos = self.generate_goal_pos()

self.terminate_on_goal = terminate_on_goal
self.render_mode = render_mode

def generate_goal_pos(self):
"""
Generates random coordinates for the goal.
"""
return self.np_random.integers(0, self.size, size=2)

def state_to_pos(self, state):
"""
        Converts the index of a cell into 2-component (row, column) coordinates.
"""
return np.array(divmod(state, self.size))

def reset(self, seed=None, options=None):
super().reset(seed=seed, options=options)
self.agent_pos = np.array(self.center_pos, dtype=np.float32)

return pos_to_state(self.agent_pos, self.size), {}

def _single_step(self, action):
"""
An atomic step in the environment.

:param action: index of atomic action.
"""
self.agent_pos, gym_output = single_step(
action,
agent_pos=self.agent_pos,
action_to_direction=self.action_to_direction,
size=self.size,
goal_pos=self.goal_pos,
terminate_on_goal=self.terminate_on_goal,
)

return gym_output + ({},)

def step(self, action):
"""
        A 'sequential' step in the environment.

:param action: index of a sequential action.
"""
self.agent_pos, gym_output = multi_step(
action,
action_sequences=self.action_sequences,
agent_pos=self.agent_pos,
action_to_direction=self.action_to_direction,
size=self.size,
goal_pos=self.goal_pos,
terminate_on_goal=self.terminate_on_goal,
)

return gym_output + ({},)

def render(self) -> Optional[np.ndarray]:
if self.render_mode == "rgb_array":
# Create a grid representing the dark room
grid = np.full(
(self.size, self.size, 3), fill_value=(255, 255, 255), dtype=np.uint8
)
grid[self.goal_pos[0], self.goal_pos[1]] = (255, 0, 0)
grid[int(self.agent_pos[0]), int(self.agent_pos[1])] = (0, 255, 0)
return grid
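
A minimal construction sketch for the class above (the choice of available_actions is illustrative): with action_seq_len=2 there are 5**2 = 25 possible atomic-action sequences, and available_actions selects which of them form the action space.

import numpy as np
from toymeta.had_dark_room import HAD_DarkRoom

env = HAD_DarkRoom(
    available_actions=np.arange(25),  # use all 25 two-step sequences; a subset would restrict the action set
    action_seq_len=2,
    size=9,
    terminate_on_goal=True,
)
obs, info = env.reset(seed=0)
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())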