forked from NTT123/a0-jax
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtree_search.py
69 lines (61 loc) · 1.91 KB
/
tree_search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
Monte Carlo tree search.
"""
from functools import partial
import chex
import jax
import jax.numpy as jnp
import mctx
from env import Enviroment as E
from utils import batched_policy, env_step
def recurrent_fn(params, rng_key: chex.Array, action: chex.Array, embedding):
"""One simulation step in MCTS."""
del rng_key
agent = params
env = embedding
env, reward = jax.vmap(env_step)(env, action)
state = jax.vmap(lambda e: e.canonical_observation())(env)
prior_logits, value = jax.vmap(lambda a, s: a(s), in_axes=(None, 0))(agent, state)
discount = -1.0 * jnp.ones_like(reward)
terminated = env.is_terminated()
assert value.shape == terminated.shape
value = jnp.where(terminated, 0.0, value)
assert discount.shape == terminated.shape
discount = jnp.where(terminated, 0.0, discount)
recurrent_fn_output = mctx.RecurrentFnOutput(
reward=reward,
discount=discount,
prior_logits=prior_logits,
value=value,
)
return recurrent_fn_output, env
def improve_policy_with_mcts(
agent,
env: E,
rng_key: chex.Array,
rec_fn,
num_simulations: int,
):
"""Improve agent policy using MCTS.
Returns:
An improved policy.
"""
state = jax.vmap(lambda e: e.canonical_observation())(env)
_, (prior_logits, value) = batched_policy(agent, state)
root = mctx.RootFnOutput(prior_logits=prior_logits, value=value, embedding=env)
policy_output = mctx.gumbel_muzero_policy(
params=agent,
rng_key=rng_key,
root=root,
recurrent_fn=rec_fn,
num_simulations=num_simulations,
invalid_actions=jax.vmap(lambda e: e.invalid_actions())(env),
qtransform=partial(
mctx.qtransform_completed_by_mix_value,
value_scale=0.1,
maxvisit_init=50,
rescale_values=False,
),
gumbel_scale=1.0,
)
return policy_output