# dqn_agent.py
import os

# Set before importing TensorFlow so its native logger picks the level up at
# load time; assigning it after the import is too late to silence startup logs.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import functools
import multiprocessing
import random
import time
from collections import deque

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import GRU, Dense, Dropout, Flatten
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import load_model
from tensorflow.keras.optimizers import Adam
class DQNAgent:
    """Epsilon-greedy Q-learning agent backed by a stacked-GRU Keras model.

    The agent keeps a small replay memory, picks actions either at random
    (with probability ``epsilon``) or greedily from the model's output, and
    fits the model one stored transition at a time in ``replay``.
    """

    # Number of features per time step expected at the network input.
    NUM_FEATURES = 5

    def __init__(
        self,
        action_size,
        window_size,
        is_model=False,
        current_iter=0,
        current_step=0,
        model_name="",
        loss=0,
        epsilon=1.0,
        learning_rate=0.001,
    ):
        """Create an agent with a fresh or a previously saved model.

        Args:
            action_size: Number of discrete actions (width of the output layer).
            window_size: Number of time steps in one input state.
            is_model: When true, load the model from ``model_name`` instead of
                building a new one.
            current_iter: Iteration counter to resume from.
            current_step: Step counter to resume from.
            model_name: Path handed to Keras ``load_model`` when ``is_model``
                is true.
            loss: Initial value of the running loss average.
            epsilon: Initial exploration probability.
            learning_rate: Learning rate for the Adam optimizer.
        """
        self.action_size = action_size
        self.memory = deque(maxlen=100)
        self.window_size = window_size
        self.gamma = 0.95
        self.epsilon = epsilon
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = learning_rate
        # Stored for callers; nothing inside this class reads it.
        self.learning_rate_decay = 0.9995
        self.step = current_step
        self.current_iter = current_iter
        self.loss = 0
        self.loss_avg = loss
        if is_model:
            self._load_model(model_name)
        else:
            self.model = self._build_model()

    def timer_decorator(func):
        """Wrap ``func`` so that every call prints how long it took.

        Used as a plain decorator inside the class body, so it takes the
        decorated function rather than ``self``.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            started = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed_time = time.perf_counter() - started
            print(f"\nFunction '{func.__name__}' took {elapsed_time} seconds to complete")
            return result
        return wrapper

    def _compile_model(self, model):
        """Compile ``model`` with the optimizer and loss settings of this agent."""
        model.compile(
            optimizer=Adam(learning_rate=self.learning_rate, clipnorm=1.0),
            loss='mse',
            run_eagerly=True,
        )

    def _load_model(self, model_name):
        """Load a saved model from ``model_name`` and recompile it."""
        self.model = load_model(model_name)
        self._compile_model(self.model)

    def _build_model(self):
        """Build and compile the three-layer GRU network.

        Returns:
            The compiled Keras model; the output layer has one linear unit per
            action.
        """
        model = Sequential()
        model.add(GRU(128, input_shape=(self.window_size, self.NUM_FEATURES), return_sequences=True, name="gru_1"))
        model.add(Dropout(0.2))
        model.add(GRU(256, return_sequences=True, name="gru_2"))
        model.add(Dropout(0.2))
        model.add(GRU(512, name="gru_3"))
        model.add(Dropout(0.2))
        model.add(Dense(self.action_size, activation='linear'))
        self._compile_model(model)
        return model

    def remember(self, state, action, reward, done, n_rewards, best_reward, prediction):
        """Append one transition tuple to the replay memory."""
        self.memory.append((state, action, reward, done, n_rewards, best_reward, prediction))

    def act(self, state, prediction=False):
        """Choose an action for ``state``.

        Args:
            state: Array that can be reshaped to
                ``(1, window_size, NUM_FEATURES)``.
            prediction: When true, skip exploration and always use the model.

        Returns:
            A pair ``(act_values, predictions)``. ``act_values`` is a
            one-element list holding the chosen action index. ``predictions``
            is the model output for the greedy path, or the integer ``0`` when
            the action was drawn at random.
        """
        # Follow the configured window instead of a fixed length of 50 so the
        # input always matches the shape the network was built with.
        state = state.reshape(1, self.window_size, self.NUM_FEATURES)
        predictions = 0
        if np.random.rand() <= self.epsilon and not prediction:
            return [random.randrange(self.action_size)], predictions
        predictions = self.model.predict(state, verbose=0)
        return [np.argmax(predictions)], predictions

    def minibatch_process(self, minibatch):
        """Fit the model on a single stored transition.

        Args:
            minibatch: One replay tuple ``(state, action, reward, done,
                n_rewards, best_reward, predictions)`` as stored by
                ``remember``. ``reward`` and ``best_reward`` are not used here.

        Returns:
            The loss returned by ``train_on_batch``.
        """
        state, action, reward, done, n_rewards, best_reward, predictions = minibatch
        # Work on copies: the tuple lives in replay memory and may be sampled
        # again, so neither the state nor the stored predictions may be
        # modified in place.
        state = np.array(state, copy=True)
        state[np.isnan(state)] = 0
        state = np.expand_dims(state, axis=0)
        # ``act`` stores the integer 0 when the action was random; in that
        # case the Q-values have to be computed now.
        if isinstance(predictions, int):
            predictions = self.model.predict(state, verbose=0)
        target_f = np.array(predictions, copy=True)
        # NOTE(review): the bootstrap term uses the Q-values of the sampled
        # state itself rather than of a successor state, and a finished
        # transition is trained toward its own maximum Q-value -- confirm that
        # this is the intended target.
        max_q = np.amax(predictions)
        target = max_q
        if not done:
            discounted_rewards = sum(
                self.gamma ** k * r for k, r in enumerate(n_rewards)
            )
            target = discounted_rewards + self.gamma ** len(n_rewards) * max_q
        target_f[0][int(action[0])] = target
        _loss = self.model.train_on_batch(state, target_f)
        self.loss = _loss
        return _loss

    @timer_decorator
    def replay(self, minibatch):
        """Train on every transition in ``minibatch`` and decay ``epsilon``.

        Updates ``loss_avg`` to the mean of its previous value and the mean
        loss of this call. An empty ``minibatch`` leaves the agent unchanged.
        """
        losses = []
        for i, sample in enumerate(minibatch):
            print(f"\rMinibatch iter {i}", end="")
            losses.append(self.minibatch_process(sample))
        if not losses:
            # Nothing was trained, so there is no mean loss to fold in.
            return
        self.loss_avg = (self.loss_avg + sum(losses) / len(losses)) / 2
        if self.epsilon > self.epsilon_min:
            # Clamp so the decay step cannot undershoot the floor.
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def save_model(self, model_name):
        """Save the current model to ``model_name``."""
        self.model.save(model_name)