DQN_Breakout.py
import gym
import tensorflow as tf
import numpy as np
import pylab as pl
"""
Deep Q Learning (and additional) steps
Main loop:
1. Estimate Q values based on the observed state and sample an action
2. Perform an action and observe next state
4. Save state, action, reward, next state, done (append to existing memory)
5. Randomize a mini-batch from collected experiences (memory)
6. Use randomized mini-batch to estimate target Q values and train
Q_target = neural net output using mini-batch of states as input and then
Q_target[action] = reward (from state) + gamma * reward (from next state)
Helpful sources:
https://keon.io/deep-q-learning/
"""
class DQNagent():
    def __init__(self, s_size, a_size, m_size, lr):
        self.memory = []      # object: replay memory
        self.history = []     # object: training history for plotting
        self.m_size = m_size  # parameter: memory size
        self.s_size = s_size  # parameter: state size
        self.a_size = a_size  # parameter: action size
        self.lr = lr          # parameter: learning rate
        self.Q_values, self.optimization, self.sess, self.cost, self.y, self.x = self.tf_sess()  # tf session
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))
        if len(self.memory) > self.m_size:
            self.memory = self.memory[-self.m_size:]  # keep only the newest m_size experiences
    def act(self, state, epsilon):
        # Epsilon-greedy: with probability epsilon return a random one-hot action vector,
        # otherwise return the network's Q values; the caller decodes both with np.argmax
        if np.random.uniform(0, 1) < epsilon:
            action = np.zeros(self.a_size)
            action[np.random.randint(0, self.a_size)] = 1
        else:
            action = self.sess.run(self.Q_values, feed_dict={self.x: [state]})
        return action
    def memory_batch(self, batch_size):
        # Sample a random mini-batch of experiences (with replacement); use the whole
        # memory if it does not yet contain batch_size experiences
        if len(self.memory) > batch_size:
            batch_id = np.random.choice(len(self.memory), batch_size)
            mini_batch = [self.memory[index] for index in batch_id]
        else:
            mini_batch = self.memory
        return mini_batch
    def train(self, training_batch, discount, return_Q_targets):
        # Un-group the stacked experience tuples into separate arrays
        state, action, reward, next_state, done = map(np.array, zip(*training_batch))
        # Calculate target Q values
        Q_target = self.sess.run(self.Q_values, feed_dict={self.x: np.stack(state)})  # Q values predicted for state
        Q_target_next = np.amax(self.sess.run(self.Q_values, feed_dict={self.x: np.stack(next_state)}), axis=1)  # max Q value predicted for next_state
        game_not_done = 1 - done.astype(np.float32)  # done = True marks the end of an episode; keep the bootstrap term only while the game continues
        Q_target_one = reward + discount * Q_target_next * game_not_done  # Bellman targets for the taken actions
        for i in range(len(Q_target_one)):  # assign each target to the action that was actually taken
            Q_target[i, action[i]] = Q_target_one[i]
        # Train
        _ = self.sess.run(self.optimization, feed_dict={self.x: np.stack(state), self.y: Q_target})
        if return_Q_targets: return Q_target  # return the targets if curious to see what they look like
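    # Shape note for train() above (an illustrative assumption, not from a real run):
    # with mini_batch_size = 512 and a_size = 4 (Breakout's four actions), the returned
    # Q_target has shape (512, 4), and only the column of the action that was actually
    # taken in each transition holds a freshly computed Bellman target.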
    def tf_sess(self):
        # Create tf placeholders and weights
        x = tf.placeholder(shape=[None, self.s_size], dtype=tf.float32)
        y = tf.placeholder(shape=[None, self.a_size], dtype=tf.float32)
        O = {'w1': tf.Variable(tf.truncated_normal(shape=[self.s_size, 128], mean=0, stddev=0.1, dtype=tf.float32)),
             'w2': tf.Variable(tf.truncated_normal(shape=[128, 32], mean=0, stddev=0.1, dtype=tf.float32)),
             'w3': tf.Variable(tf.truncated_normal(shape=[32, self.a_size], mean=0, stddev=0.1, dtype=tf.float32))}
        B = {'b1': tf.Variable(tf.truncated_normal(shape=[1, 128], mean=0, stddev=0.1, dtype=tf.float32)),
             'b2': tf.Variable(tf.truncated_normal(shape=[1, 32], mean=0, stddev=0.1, dtype=tf.float32))}
        # Estimate the net's output: two hidden ReLU layers, linear output of one Q value per action
        l1 = tf.nn.relu(tf.matmul(x, O['w1']) + B['b1'])
        l2 = tf.nn.relu(tf.matmul(l1, O['w2']) + B['b2'])
        Q_values = tf.matmul(l2, O['w3'])
        # Estimate the cost and create the optimizer
        cost = tf.losses.huber_loss(y, Q_values)  # https://www.tensorflow.org/api_docs/python/tf/losses/huber_loss
        optimization = tf.train.AdamOptimizer(learning_rate=self.lr).minimize(cost)
        # Initialize the tf session
        init = tf.global_variables_initializer()
        sess = tf.Session()
        sess.run(init)
        return Q_values, optimization, sess, cost, y, x
# Hyperparameters
learning_rate = 0.00025
discount = 0.999
epsilon_max = 1
epsilon_min = 0.1   # exploration floor
epsilon_d = 0.0003  # linear decay: epsilon = epsilon_max - epsilon_d * games_played
memory_size = 16192 * 2
mini_batch_size = 512
train_freq = 2      # train once every train_freq frames
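# Note (a derived observation, not part of the original comments): with this linear
# schedule epsilon reaches the epsilon_min floor after roughly
# (epsilon_max - epsilon_min) / epsilon_d = (1 - 0.1) / 0.0003 = 3000 games,
# counted only from the point where the replay memory has filled up.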
# Initialize the game environment, the agent and the bookkeeping variables
tf.reset_default_graph()
env = gym.make('Breakout-ram-v0')
agent = DQNagent(s_size=len(env.reset()), a_size=len(env.unwrapped.get_action_meanings()), m_size=memory_size, lr=learning_rate)
pl.ioff()  # turn off interactive plotting in the IDE (figures are saved to a file instead); pl.ion() to reverse
# Reset variables
state = env.reset() / 255  # RAM observations are bytes, so divide by 255 to normalize to [0, 1]
epsilon = epsilon_max
frame = 0
score = 0
running_score = 0
games_played = 0
memory_is_full = False
# Main loop
while True:
    # Choose and perform an action, save the experience
    action = np.argmax(agent.act(state, epsilon))
    next_state, reward, done, _ = env.step(action)
    agent.remember(state, action, reward, next_state / 255, done)
    # Update other variables
    state = next_state / 255
    score += reward
    frame += 1
    memory_is_full = (len(agent.memory) == memory_size)
    # Sample a training batch and train
    if memory_is_full and frame % train_freq == 0:
        training_batch = agent.memory_batch(mini_batch_size)
        _ = agent.train(training_batch, discount, return_Q_targets=True)
    if done:  # the game is finished
        # Reset and update
        state = env.reset() / 255
        running_score = running_score * 0.99 + 0.01 * score
        agent.history.append(running_score)
        # Print and save
        print('Game: %d, R: %.2f, Running R: %.2f, Eps: %.2f, Memory: %d' % (games_played, score, running_score, epsilon, len(agent.memory)))
        pl.clf()  # clear the previous curve before re-plotting the full history
        pl.plot(agent.history)
        pl.savefig('DQN Breakout test.png')
        # Update other variables
        if memory_is_full:
            epsilon = max(epsilon_max - epsilon_d * games_played, epsilon_min)  # linear decay with a floor at epsilon_min
            games_played += 1
        frame = 0
        score = 0