main.py (forked from Arker123/Q-learning)
# Authors: Arnav Kharbanda, Yashasav Prajapati, Shobhit Juglan, Arpit Kumar Gautam
# References: https://www.analyticsvidhya.com/blog/2021/04/q-learning-algorithm-with-step-by-step-implementation-using-python/
#             https://link.springer.com/article/10.1007/BF00992698
import numpy as np
# numeric action codes: 0 = up, 1 = right, 2 = down, 3 = left
actions = ["up", "right", "down", "left"]
environment_rows = 11
environment_columns = 11
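# create a 3D numpy array to hold the current Q-values for each state and action pair:
# q_values[row, column, action] is the learned value of taking `action` from the state
# at (row, column), with actions indexed 0-3 as listed above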
q_values = np.zeros((environment_rows, environment_columns, 4))
# Create a 2D numpy array to hold the rewards for each state.
# The array contains 11 rows and 11 columns (to match the shape of the environment), and each value is initialized to -100.
rewards = np.full((environment_rows, environment_columns), -100.0)
rewards[0, 5] = 100.0 # set the reward for the packaging area (i.e., the goal) to 100
# define aisle locations (i.e., white squares) for rows 1 through 9
aisles = {} # store locations in a dictionary
aisles[1] = [i for i in range(1, 10)]
aisles[2] = [1, 7, 9]
aisles[3] = [i for i in range(1, 8)]
aisles[3].append(9)
aisles[4] = [3, 7]
aisles[5] = [i for i in range(11)]
aisles[6] = [5]
aisles[7] = [i for i in range(1, 10)]
aisles[8] = [3, 7]
aisles[9] = [i for i in range(11)]
# set the rewards for all aisle locations (i.e., white squares)
for row_index in range(1, 10):
    for column_index in aisles[row_index]:
        rewards[row_index, column_index] = -1.0

# print rewards matrix
for row in rewards:
    print(row)
# define a function that determines if the specified location is a terminal state
def is_terminal_state(current_row_index, current_column_index):
    # if the reward for this location is -1, then it is not a terminal state (i.e., it is a 'white square')
    if rewards[current_row_index, current_column_index] == -1.0:
        return False
    else:
        return True
# define a function that will choose a random, non-terminal starting location
def get_starting_location():
    # get a random row and column index
    current_row_index = np.random.randint(environment_rows)
    current_column_index = np.random.randint(environment_columns)
    # continue choosing random row and column indexes until a non-terminal state is identified
    # (i.e., until the chosen state is a 'white square')
    while is_terminal_state(current_row_index, current_column_index):
        current_row_index = np.random.randint(environment_rows)
        current_column_index = np.random.randint(environment_columns)
    return current_row_index, current_column_index
# define an epsilon greedy algorithm that will choose which action to take next (i.e., where to move next)
def get_next_action(current_row_index, current_column_index, epsilon):
    # if a randomly chosen value between 0 and 1 is less than epsilon,
    # then exploit: choose the most promising action from the Q-table for this state.
    # (Note: epsilon here is the probability of acting greedily, so epsilon = 0.9 means
    # 90% exploitation / 10% exploration, the reverse of the usual epsilon-greedy convention.)
    if np.random.random() < epsilon:
        return np.argmax(q_values[current_row_index, current_column_index])
    else:  # otherwise explore: choose a random action
        return np.random.randint(4)
# define a function that will get the next location based on the chosen action
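# Note: an action that would move off the grid leaves the agent where it is; an action that
# moves onto an item storage square lands on a -100 terminal state and ends the episode.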
def get_next_location(current_row_index, current_column_index, action_index):
    new_row_index = current_row_index
    new_column_index = current_column_index
    if actions[action_index] == "up" and current_row_index > 0:
        new_row_index -= 1
    elif (
        actions[action_index] == "right"
        and current_column_index < environment_columns - 1
    ):
        new_column_index += 1
    elif actions[action_index] == "down" and current_row_index < environment_rows - 1:
        new_row_index += 1
    elif actions[action_index] == "left" and current_column_index > 0:
        new_column_index -= 1
    return new_row_index, new_column_index
# Define a function that will get the shortest path between any location within the warehouse that
# the robot is allowed to travel and the item packaging location.
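# Note: this simply follows the greedy policy encoded in the learned Q-table, so it only
# yields a (near-)shortest path once training has converged.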
def get_shortest_path(start_row_index, start_column_index):
    # return immediately if this is an invalid starting location
    if is_terminal_state(start_row_index, start_column_index):
        return []
    else:  # if this is a 'legal' starting location
        current_row_index, current_column_index = start_row_index, start_column_index
        shortest_path = []
        shortest_path.append([current_row_index, current_column_index])
        # continue moving along the path until we reach the goal (i.e., the item packaging location)
        while not is_terminal_state(current_row_index, current_column_index):
            # get the best action to take (epsilon = 1.0 here means always act greedily)
            action_index = get_next_action(current_row_index, current_column_index, 1.0)
            # move to the next location on the path, and add the new location to the list
            current_row_index, current_column_index = get_next_location(
                current_row_index, current_column_index, action_index
            )
            shortest_path.append([current_row_index, current_column_index])
        return shortest_path
# define training parameters
epsilon = 0.9 # the percentage of time when we should take the best action (instead of a random action)
discount_factor = 0.9 # discount factor for future rewards
learning_rate = 0.9 # the rate at which the agent should learn
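# Each step of the training loop below applies the Q-learning update (Watkins & Dayan, 1992;
# the Springer reference above):
#   Q(s, a) <- Q(s, a) + learning_rate * (reward + discount_factor * max_a' Q(s', a') - Q(s, a))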
# run through 1000 training episodes
for episode in range(1000):
    # get the starting location for this episode
    row_index, column_index = get_starting_location()
    # continue taking actions (i.e., moving) until we reach a terminal state
    # (i.e., until we reach the item packaging area or crash into an item storage location)
    while not is_terminal_state(row_index, column_index):
        # choose which action to take (i.e., where to move next)
        action_index = get_next_action(row_index, column_index, epsilon)
        # perform the chosen action, and transition to the next state (i.e., move to the next location)
        old_row_index, old_column_index = row_index, column_index  # store the old row and column indexes
        row_index, column_index = get_next_location(row_index, column_index, action_index)
        # receive the reward for moving to the new state, and calculate the temporal difference
        reward = rewards[row_index, column_index]
        old_q_value = q_values[old_row_index, old_column_index, action_index]
        temporal_difference = (
            reward
            + (discount_factor * np.max(q_values[row_index, column_index]))
            - old_q_value
        )
        # update the Q-value for the previous state and action pair
        new_q_value = old_q_value + (learning_rate * temporal_difference)
        q_values[old_row_index, old_column_index, action_index] = new_q_value

print("Training complete!")
print(get_shortest_path(3, 9))
print(get_shortest_path(5, 0))
print(get_shortest_path(9, 5))
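# Optional illustrative sketch (not part of the original script; the helper name
# print_path_on_grid is ours): render the warehouse grid and a computed path as ASCII,
# with '#' for item storage squares, 'G' for the packaging area, '.' for aisles,
# and '*' for squares on the path.
def print_path_on_grid(path):
    grid = [["#"] * environment_columns for _ in range(environment_rows)]
    for r in range(environment_rows):
        for c in range(environment_columns):
            if rewards[r, c] == -1.0:
                grid[r][c] = "."
    grid[0][5] = "G"  # the packaging area (goal)
    for r, c in path:
        if grid[r][c] == ".":
            grid[r][c] = "*"
    for grid_row in grid:
        print(" ".join(grid_row))


print_path_on_grid(get_shortest_path(5, 0))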