# minDQN.py (forked from mswang12/minDQN)
"""
A Minimal Deep Q-Learning Implementation (minDQN)
Running this code will render the agent solving the CartPole environment using OpenAI gym. Our Minimal Deep Q-Network is approximately 150 lines of code. In addition, this implementation uses Tensorflow and Keras and should generally run in less than 15 minutes.
Usage: python3 minDQN.py
"""
import gym
import tensorflow as tf
import numpy as np
from tensorflow import keras
from collections import deque
import time
import random
RANDOM_SEED = 5
tf.random.set_seed(RANDOM_SEED)
env = gym.make('CartPole-v1')
env.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
print("Action Space: {}".format(env.action_space))
print("State space: {}".format(env.observation_space))
# An episode is one full game
train_episodes = 300
test_episodes = 100
def agent(state_shape, action_shape):
    """The agent maps X-states to Y-actions.

    e.g. if the neural network output is [.1, .7, .1, .3], the highest Q-value
    is 0.7 at index 1, so the greedy choice is action #1.
    """
    learning_rate = 0.001
    init = tf.keras.initializers.HeUniform()
    model = keras.Sequential()
    model.add(keras.layers.Dense(24, input_shape=state_shape, activation='relu', kernel_initializer=init))
    model.add(keras.layers.Dense(12, activation='relu', kernel_initializer=init))
    model.add(keras.layers.Dense(action_shape, activation='linear', kernel_initializer=init))
    model.compile(loss=tf.keras.losses.Huber(), optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate), metrics=['accuracy'])
    return model
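# For CartPole-v1 this builds a 4 -> 24 -> 12 -> 2 network, i.e. one linear Q-value output per action.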
def get_qs(model, state, step):
    """Return the Q-values for every action, given a single (unbatched) state."""
    return model.predict(state.reshape([1, state.shape[0]]))[0]
def train(env, replay_memory, model, target_model, done):
    learning_rate = 0.7  # alpha in the Q-learning update (not the optimizer's learning rate)
    discount_factor = 0.618

    MIN_REPLAY_SIZE = 1000
    if len(replay_memory) < MIN_REPLAY_SIZE:
        return

    batch_size = 64 * 2
    mini_batch = random.sample(replay_memory, batch_size)
    current_states = np.array([encode_observation(transition[0], env.observation_space.shape) for transition in mini_batch])
    current_qs_list = model.predict(current_states)
    new_current_states = np.array([encode_observation(transition[3], env.observation_space.shape) for transition in mini_batch])
    future_qs_list = target_model.predict(new_current_states)

    X = []
    Y = []
    for index, (observation, action, reward, new_observation, done) in enumerate(mini_batch):
        if not done:
            max_future_q = reward + discount_factor * np.max(future_qs_list[index])
        else:
            max_future_q = reward

        current_qs = current_qs_list[index]
        current_qs[action] = (1 - learning_rate) * current_qs[action] + learning_rate * max_future_q

        X.append(encode_observation(observation, env.observation_space.shape))
        Y.append(current_qs)
    model.fit(np.array(X), np.array(Y), batch_size=batch_size, verbose=0, shuffle=True)
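# Note: the per-sample target built in train() is the tabular Q-learning update applied
# to the network outputs:
#   Q(s, a) <- (1 - alpha) * Q(s, a) + alpha * (r + gamma * max_a' Q_target(s', a'))
# with alpha = 0.7, gamma = 0.618, and the max taken over the target network's predictions.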
def encode_observation(observation, n_dims):
    return observation
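# encode_observation is an identity mapping here: CartPole observations are already
# 4-dimensional float vectors. It presumably serves as a hook for environments whose
# observations need encoding (e.g. one-hot vectors for discrete state spaces).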
def main():
    epsilon = 1          # Epsilon-greedy exploration is initialized at 1, meaning every step is random at the start
    max_epsilon = 1      # You can't explore more than 100% of the time
    min_epsilon = 0.01   # At a minimum, we'll always explore 1% of the time
    decay = 0.01
    # 1. Initialize the Target and Main models
    # Main Model (updated every 4 steps)
    model = agent(env.observation_space.shape, env.action_space.n)
    # Target Model (updated every 100 steps)
    target_model = agent(env.observation_space.shape, env.action_space.n)
    target_model.set_weights(model.get_weights())
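    # The target network is a lagged copy of the main network. Computing the bootstrapped
    # max-Q term with these frozen weights (refreshed every ~100 steps below) keeps the
    # regression targets from chasing a constantly moving network.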
    replay_memory = deque(maxlen=50_000)

    target_update_counter = 0

    # X = states, y = actions
    X = []
    y = []

    steps_to_update_target_model = 0
    for episode in range(train_episodes):
        total_training_rewards = 0
        observation = env.reset()
        done = False
        while not done:
            steps_to_update_target_model += 1
            if True:  # render every step; set to False to train without a display
                env.render()

            random_number = np.random.rand()
            # 2. Explore using the Epsilon Greedy Exploration Strategy
            if random_number <= epsilon:
                # Explore
                action = env.action_space.sample()
            else:
                # Exploit best known action
                # model input dims are (batch, env.observation_space.shape[0])
                encoded = encode_observation(observation, env.observation_space.shape[0])
                encoded_reshaped = encoded.reshape([1, encoded.shape[0]])
                predicted = model.predict(encoded_reshaped).flatten()
                action = np.argmax(predicted)
            new_observation, reward, done, info = env.step(action)
            replay_memory.append([observation, action, reward, new_observation, done])
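            # Transitions (s, a, r, s', done) accumulate in the replay memory; train()
            # samples random minibatches from it, which decorrelates consecutive steps
            # and reuses past experience.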
            # 3. Update the Main Network using the Bellman Equation
            if steps_to_update_target_model % 4 == 0 or done:
                train(env, replay_memory, model, target_model, done)

            observation = new_observation
            total_training_rewards += reward

            if done:
                print('Total training rewards: {} in episode {} with final reward = {}'.format(total_training_rewards, episode, reward))
                total_training_rewards += 1

                if steps_to_update_target_model >= 100:
                    print('Copying main network weights to the target network weights')
                    target_model.set_weights(model.get_weights())
                    steps_to_update_target_model = 0
                break

        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay * episode)
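        # Exponential epsilon decay: with decay = 0.01, epsilon falls from 1.0 to roughly
        # 0.06 by episode 300 (0.01 + 0.99 * exp(-3)).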
    env.close()


if __name__ == '__main__':
    main()