#####################################################################################################################################################
######################################################################## INFO #######################################################################
#####################################################################################################################################################
"""
This program aims to train the rat to beat a greedy opponent.
To do so, we use reinforcement learning, and more precisely deep Q-learning with a deep Q-network (DQN).
If you set the TRAIN_MODEL constant to True, the rat will be training.
Otherwise it will just try to get the cheese using what it has learned.
In this lab, we will work on a full maze with no mud.
The settings have been set to a 10x10 maze with 15 pieces of cheese.
You can change these settings if you want.
This file already provides you with a functional RL algorithm.
However, it is not very efficient, and you will have to improve it.
Before you do so, you should understand how it works.
The file makes use of Weights and Biases (WandB) to monitor the training.
You can disable its use by setting the USE_WANDB constant to False.
If you want to use it, you will have to create an account on https://wandb.ai/ and get an API key.
Then, you will have to store that key in a file named "wandb.key" and set the WANDB_KEY_PATH variable accordingly (by default, it is set to the same directory as this file).
Here is your mission:
1 - Have a look at the code and make sure you understand it.
Also, make sure the code runs for training (TRAIN_MODEL=True) and playing (TRAIN_MODEL=False).
If not, you may have to install some packages or change some paths.
2 - Improve the RL algorithm.
There are multiple ways to do so, and you are free to choose the ones you want.
Some ideas:
* Improve the data representation.
* Change the reward function.
* Improve the exploration strategy (keywords to look for include "epsilon-greedy" and "softmax policy").
* Change the hyperparameters of the algorithm (discount factor, batch size, etc.).
* Improve the model architecture.
* Change the optimizer or its parameters.
* Change the loss function.
* Etc.
"""
#####################################################################################################################################################
###################################################################### IMPORTS ######################################################################
#####################################################################################################################################################
# Import PyRat
from pyrat import *
# External imports
import torch
import random
import numpy
import shutil
import tqdm
import wandb
import sys
import os
# Previously developed functions
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "lab4"))
import greedy as opponent
from utils import get_opponent_name
#####################################################################################################################################################
############################################################### CONSTANTS & VARIABLES ###############################################################
#####################################################################################################################################################
"""
Paths where to store the data that must be shared from one game to the next.
"""
OUTPUT_DIRECTORY = os.path.join(os.path.dirname(os.path.realpath(__file__)), "RL")
MODEL_FILE_NAME = os.path.join(OUTPUT_DIRECTORY, "RL_model.pt")
OPTIMIZER_FILE_NAME = os.path.join(OUTPUT_DIRECTORY, "RL_optimizer.pt")
EXPERIENCE_FILE_NAME = os.path.join(OUTPUT_DIRECTORY, "RL_experience.pt")
#####################################################################################################################################################
"""
Indicates if we are in train or test mode.
"""
TRAIN_MODEL = True
RESET_TRAINING = True
#####################################################################################################################################################
"""
RL algorithm parameters.
"""
MAX_EXPERIENCE_SIZE = 1000
EXPERIENCE_MIN_SIZE_FOR_TRAINING = 1000
NB_BATCHES = 16
BATCH_SIZE = 32
DISCOUNT_FACTOR = 0.9
#####################################################################################################################################################
"""
Parameters of the optimizer used to train the model.
"""
LEARNING_RATE = 0.1
#####################################################################################################################################################
"""
Number of PyRat games from which to learn.
"""
NB_EPISODES = 1000
#####################################################################################################################################################
"""
Wandb parameters.
"""
USE_WANDB = True
WANDB_KEY_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "wandb.key")
#####################################################################################################################################################
############################################################### DEEP Q LEARNING MODEL ###############################################################
#####################################################################################################################################################
class DQN (torch.nn.Module):
#############################################################################################################################################
# CONSTRUCTOR #
#############################################################################################################################################
def __init__ ( self: Self,
data_shape: Tuple[int, ...],
actions_dimension: int
) -> Self:
"""
This function is the constructor of the class.
In:
* self: Reference to the current object.
* data_shape: Shape of the input data.
* actions_dimension: Number of possible actions.
Out:
* self: Reference to the current object.
"""
# Inherit from parent class
super(DQN, self).__init__()
# Define the layers
self.layers = torch.nn.Sequential(torch.nn.Flatten(),
torch.nn.Linear(torch.prod(torch.tensor(data_shape)), 32),
torch.nn.Linear(32, actions_dimension))
#############################################################################################################################################
# PUBLIC METHODS #
#############################################################################################################################################
def forward ( self: Self,
x: torch.tensor
) -> torch.tensor:
"""
This function performs a forward pass of the data through the model.
In:
* self: Reference to the current object.
* x: Input data.
Out:
* y: Output data.
"""
# Forward pass
x = x.float()
y = self.layers(x)
return y
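#####################################################################################################################################################
# NOTE (illustrative addition, not part of the original lab code): the baseline DQN above chains two linear layers with no activation in between,
# so it can only represent a linear function of its input. As one possible answer to the "improve the model architecture" suggestion, here is a
# minimal sketch of a deeper network with ReLU activations. The class name DeeperDQN and the hidden size of 128 are arbitrary choices made for
# this example; to try it, you would instantiate DeeperDQN instead of DQN in the preprocessing function.
class DeeperDQN (torch.nn.Module):

    def __init__ ( self:              Self,
                   data_shape:        Tuple[int, ...],
                   actions_dimension: int
                 ) -> Self:
        # Inherit from parent class
        super(DeeperDQN, self).__init__()
        # Flatten the input, then apply two hidden layers with ReLU nonlinearities
        input_dimension = int(torch.prod(torch.tensor(data_shape)).item())
        self.layers = torch.nn.Sequential(torch.nn.Flatten(),
                                          torch.nn.Linear(input_dimension, 128),
                                          torch.nn.ReLU(),
                                          torch.nn.Linear(128, 128),
                                          torch.nn.ReLU(),
                                          torch.nn.Linear(128, actions_dimension))

    def forward ( self: Self,
                  x:    torch.tensor
                ) -> torch.tensor:
        # Cast to float and pass through the layers
        x = x.float()
        y = self.layers(x)
        return y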
#####################################################################################################################################################
##################################################################### FUNCTIONS #####################################################################
#####################################################################################################################################################
def build_state ( maze: Union[numpy.ndarray, Dict[int, Dict[int, int]]],
maze_width: int,
maze_height: int,
name: str,
teams: Dict[str, List[str]],
player_locations: Dict[str, int],
cheese: List[int]
) -> torch.tensor:
"""
This function builds a state tensor to use as an input for the DQN.
Here we assume a 2-player game.
In:
* maze: Map of the maze, as data type described by PyRat's "maze_representation" option.
* maze_width: Width of the maze in number of cells.
* maze_height: Height of the maze in number of cells.
* name: Name of the player being trained.
* teams: Recap of the teams of players.
* player_locations: Locations for all players in the game.
* cheese: List of available pieces of cheese in the maze.
Out:
* state: Tensor representing the state of the game.
"""
# Check if a GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Function to return a channel with the cheese locations, centered on the given player location
def _center_maze (location):
channel = torch.zeros(maze_height * 2 - 1, maze_width * 2 - 1, device=device)
location_row, location_col = location // maze_width, location % maze_width
for c in cheese:
c_row, c_col = c // maze_width, c % maze_width
channel[maze_height - 1 - location_row + c_row, maze_width - 1 - location_col + c_col] = 1
return channel.unsqueeze(0)
# A channel centered on the player
player_channel = _center_maze(player_locations[name])
# A channel centered on the opponent
opponent_name = get_opponent_name(teams, name)
opponent_channel = _center_maze(player_locations[opponent_name])
# Concatenate channels
state = torch.cat([player_channel, opponent_channel], dim=0).double()
return state
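#####################################################################################################################################################
# NOTE (illustrative addition, not part of the original lab code): the state built above only encodes cheese locations relative to each player;
# it says nothing about where the opponent stands relative to us. As one possible answer to the "improve the data representation" suggestion,
# here is a minimal sketch that appends a third channel containing a single 1 at the opponent's position, centered on the player. Using it instead
# of build_state changes the state shape, so a model trained with the original representation cannot be reused.
def build_state_with_opponent_channel ( maze:             Union[numpy.ndarray, Dict[int, Dict[int, int]]],
                                         maze_width:       int,
                                         maze_height:      int,
                                         name:             str,
                                         teams:            Dict[str, List[str]],
                                         player_locations: Dict[str, int],
                                         cheese:           List[int]
                                       ) -> torch.tensor:
    # Start from the baseline two-channel state
    state = build_state(maze, maze_width, maze_height, name, teams, player_locations, cheese)
    # Build a channel with a single 1 at the opponent's location, centered on the player
    opponent_channel = torch.zeros(1, maze_height * 2 - 1, maze_width * 2 - 1, device=state.device)
    player_row, player_col = player_locations[name] // maze_width, player_locations[name] % maze_width
    opponent_location = player_locations[get_opponent_name(teams, name)]
    opponent_row, opponent_col = opponent_location // maze_width, opponent_location % maze_width
    opponent_channel[0, maze_height - 1 - player_row + opponent_row, maze_width - 1 - player_col + opponent_col] = 1
    # Concatenate with the baseline channels (cast to double to match their dtype)
    return torch.cat([state, opponent_channel.double()], dim=0)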
#####################################################################################################################################################
def select_action ( state: torch.tensor,
possible_actions: List[str],
model: DQN
) -> str:
"""
This function returns one of the possible actions, given the state of the game.
In:
* state: State of the game in which we are now.
* possible_actions: Actions that the agent can perform.
* model: Deep Q learning model used to predict the next action.
Out:
* action: One action chosen from possible_actions.
"""
# We use the model's predicted Q-values and always pick the best action (greedy policy, no exploration)
with torch.no_grad():
model_outputs = model(state.unsqueeze(0))
# We get the best action
action_index = torch.argmax(model_outputs).item()
action = possible_actions[action_index]
return action
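#####################################################################################################################################################
# NOTE (illustrative addition, not part of the original lab code): select_action above is purely greedy with respect to the predicted Q-values, so
# the agent never explores. A classic improvement is an epsilon-greedy policy: with some probability a random action is chosen, otherwise the
# greedy one. The EXPLORATION_RATE constant below is a hypothetical hyperparameter introduced for this sketch (a common refinement is to decay it
# over episodes); to try the policy, call select_action_epsilon_greedy instead of select_action in the turn function.
EXPLORATION_RATE = 0.1

def select_action_epsilon_greedy ( state:            torch.tensor,
                                   possible_actions: List[str],
                                   model:            DQN
                                 ) -> str:
    # During training, explore with probability EXPLORATION_RATE by picking a uniformly random action
    if TRAIN_MODEL and random.random() < EXPLORATION_RATE:
        return random.choice(possible_actions)
    # Otherwise, exploit the model's current Q-value estimates
    return select_action(state, possible_actions, model)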
#####################################################################################################################################################
def compute_reward ( state: torch.tensor,
scores_at_state: Dict[str, float],
new_state: torch.tensor,
scores_at_new_state: Dict[str, float],
name: str,
teams: Dict[str, List[str]],
initial_cheese_count: int
) -> float:
"""
This function returns a reward for going from one state to another.
Here, the reward depends on how the scores evolved between the two states (cheese caught, game won, lost or tied, etc.).
In:
* state: State of the game before moving.
* scores_at_state: Scores of the players before moving.
* new_state: State of the game after moving.
* scores_at_new_state: Scores of the players after moving.
* name: Name of the player being trained.
* teams: Recap of the teams of players.
* initial_cheese_count: Number of cheese in the maze at the beginning of the game.
Out:
* reward: Value of the reward to return.
"""
# If there is no change in scores, we give no reward
opponent_name = get_opponent_name(teams, name)
if scores_at_state[name] == scores_at_new_state[name] and scores_at_state[opponent_name] == scores_at_new_state[opponent_name]:
return 0.0
# If we are tied with the opponent, and at least half of the cheese has already been collected, we give no reward
if scores_at_new_state[name] == scores_at_new_state[opponent_name] and scores_at_new_state[name] + scores_at_new_state[opponent_name] >= initial_cheese_count / 2:
return 0.0
# If we are tied with the opponent at the beginning of the game, we give a positive reward
if scores_at_new_state[name] == scores_at_new_state[opponent_name] and scores_at_new_state[name] + scores_at_new_state[opponent_name] < initial_cheese_count / 2:
return 1.0
# If the opponent wins the game, we give a negative reward
if scores_at_new_state[opponent_name] > initial_cheese_count / 2:
return -1.0
# If we win the game, we give a positive reward
if scores_at_new_state[name] > initial_cheese_count / 2:
return 1.0
# If we caught a cheese alone, we give a positive reward
if scores_at_new_state[name] > scores_at_state[name]:
return 1.0
# Otherwise, the opponent caught a cheese alone, and we give no reward
return 0.0
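#####################################################################################################################################################
# NOTE (illustrative addition, not part of the original lab code): as one possible answer to the "change the reward function" suggestion, here is a
# minimal sketch that rewards the change in score difference with the opponent, normalized by the initial cheese count, minus a small per-turn
# penalty to encourage short games. The 0.01 penalty is an arbitrary value chosen for this example.
def compute_reward_score_difference ( state:                torch.tensor,
                                      scores_at_state:      Dict[str, float],
                                      new_state:            torch.tensor,
                                      scores_at_new_state:  Dict[str, float],
                                      name:                 str,
                                      teams:                Dict[str, List[str]],
                                      initial_cheese_count: int
                                    ) -> float:
    # Reward how the score gap with the opponent evolved during the move, with a small penalty per turn
    opponent_name = get_opponent_name(teams, name)
    difference_before = scores_at_state[name] - scores_at_state[opponent_name]
    difference_after = scores_at_new_state[name] - scores_at_new_state[opponent_name]
    return (difference_after - difference_before) / initial_cheese_count - 0.01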
#####################################################################################################################################################
def make_batch ( model: DQN,
experience: List[Dict[str, Any]],
possible_actions: List[str]
) -> Tuple[torch.tensor, torch.tensor]:
"""
This function builds a random batch from the experience replay memory to train the model on.
The batch is a pair (data, targets), where each element has the batch size as first dimension.
In:
* model: Model to train.
* experience: List of experience situations encountered across games.
* possible_actions: Actions mapped with the output of the model.
Out:
* data: Batch of data from the memory.
* targets: Targets associated with the sampled data.
"""
# Get indices
batch_size = min(BATCH_SIZE, len(experience))
indices = random.sample(range(len(experience)), batch_size)
# Create the batch
data = torch.zeros(batch_size, *experience[0]["state"].shape)
targets = torch.zeros(batch_size, len(possible_actions))
for i in range(batch_size):
# Data is the sampled state
data[i] = experience[indices[i]]["state"]
# Target is the model's current prediction, corrected on the chosen action using the Q-learning target (reward + discounted best value of the next state)
with torch.no_grad():
targets[i] = model(data[i].unsqueeze(0))
if experience[indices[i]]["over"]:
targets[i, possible_actions.index(experience[indices[i]]["action"])] = experience[indices[i]]["reward"]
else:
model_outputs = model(experience[indices[i]]["new_state"].unsqueeze(0))
targets[i, possible_actions.index(experience[indices[i]]["action"])] = experience[indices[i]]["reward"] + DISCOUNT_FACTOR * torch.max(model_outputs).item()
# Done
return data, targets
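#####################################################################################################################################################
# NOTE (illustrative addition, not part of the original lab code): make_batch above bootstraps its targets with the very network being trained,
# which can make training unstable. Standard DQN uses a separate "target network", a periodically refreshed copy of the model, to compute the
# bootstrap term. A minimal sketch of the synchronization step is given below; using it would also require keeping such a copy (for instance in
# memory.target_model), refreshing it every few games in postprocessing, and evaluating new_state with it instead of model in make_batch.
def update_target_model ( model:        DQN,
                          target_model: DQN
                        ) -> None:
    # Copy the weights of the online model into the target model
    target_model.load_state_dict(model.state_dict())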
#####################################################################################################################################################
def train_model ( model: DQN,
optimizer: torch.optim.Optimizer,
experience: List[Dict[str, Any]],
possible_actions: List[str]
) -> None:
"""
This function trains the model on the experience.
In:
* model: Model to train.
* optimizer: Optimizer used to train the model.
* experience: List of experience situations encountered across games.
* possible_actions: Actions mapped with the output of the model.
Out:
* None.
"""
# Define the loss function
loss_function = torch.nn.MSELoss()
# Ensure model is in train mode
model.train()
# Train loop
total_loss = 0
for b in range(NB_BATCHES):
# Create a random batch
data, targets = make_batch(model, experience, possible_actions)
# Reset gradients
optimizer.zero_grad()
# Forward + backward + optimize
outputs = model(data)
loss = loss_function(outputs, targets)
loss.backward()
optimizer.step()
# Accumulate total loss for debug
total_loss += loss.item()
# Log loss
if USE_WANDB:
wandb.log({"total_loss": total_loss})
#####################################################################################################################################################
##################################################### EXECUTED ONCE AT THE BEGINNING OF THE GAME ####################################################
#####################################################################################################################################################
def preprocessing ( maze: Union[numpy.ndarray, Dict[int, Dict[int, int]]],
maze_width: int,
maze_height: int,
name: str,
teams: Dict[str, List[str]],
player_locations: Dict[str, int],
cheese: List[int],
possible_actions: List[str],
memory: threading.local
) -> None:
"""
This function is called once at the beginning of the game.
It is typically given more time than the turn function, to perform complex computations.
Store the results of these computations in the provided memory to reuse them later during turns.
To do so, you can create entries in the memory object as memory.my_key = my_value.
In:
* maze: Map of the maze, as data type described by PyRat's "maze_representation" option.
* maze_width: Width of the maze in number of cells.
* maze_height: Height of the maze in number of cells.
* name: Name of the player controlled by this function.
* teams: Recap of the teams of players.
* player_locations: Locations for all players in the game.
* cheese: List of available pieces of cheese in the maze.
* possible_actions: List of possible actions.
* memory: Local memory to share information between preprocessing, turn and postprocessing.
Out:
* None.
"""
# Instantiate a DQN model with weights loaded from file (if any)
state_dimension = build_state(maze, maze_width, maze_height, name, teams, player_locations, cheese).shape
actions_dimension = len(possible_actions)
memory.model = DQN(state_dimension, actions_dimension)
if os.path.exists(MODEL_FILE_NAME):
memory.model.load_state_dict(torch.load(MODEL_FILE_NAME))
# Instantiate an optimizer, initialized in its previous state to preserve momentum (if any)
memory.optimizer = torch.optim.SGD(memory.model.parameters(), lr=LEARNING_RATE)
if os.path.exists(OPTIMIZER_FILE_NAME):
memory.optimizer.load_state_dict(torch.load(OPTIMIZER_FILE_NAME))
# In train mode, load the experience from previous games (if any)
memory.experience = []
if TRAIN_MODEL:
if os.path.exists(EXPERIENCE_FILE_NAME):
memory.experience = torch.load(EXPERIENCE_FILE_NAME)
# We remember the initial cheese count and previous turn scores
memory.initial_cheese_count = len(cheese)
memory.previous_scores = {p: 0 for p in player_locations}
#####################################################################################################################################################
######################################################### EXECUTED AT EACH TURN OF THE GAME #########################################################
#####################################################################################################################################################
def turn ( maze: Union[numpy.ndarray, Dict[int, Dict[int, int]]],
maze_width: int,
maze_height: int,
name: str,
teams: Dict[str, List[str]],
player_locations: Dict[str, int],
player_scores: Dict[str, float],
player_muds: Dict[str, Dict[str, Union[None, int]]],
cheese: List[int],
possible_actions: List[str],
memory: threading.local
) -> str:
"""
This function is called at every turn of the game and should return an action within the set of possible actions.
You can access the memory you stored during the preprocessing function by doing memory.my_key.
You can also update the existing memory with new information, or create new entries as memory.my_key = my_value.
In:
* maze: Map of the maze, as data type described by PyRat's "maze_representation" option.
* maze_width: Width of the maze in number of cells.
* maze_height: Height of the maze in number of cells.
* name: Name of the player controlled by this function.
* teams: Recap of the teams of players.
* player_locations: Locations for all players in the game.
* player_scores: Scores for all players in the game.
* player_muds: Indicates which player is currently crossing mud.
* cheese: List of available pieces of cheese in the maze.
* possible_actions: List of possible actions.
* memory: Local memory to share information between preprocessing, turn and postprocessing.
Out:
* action: One of the possible actions, as given in possible_actions.
"""
# Select an action to perform
state = build_state(maze, maze_width, maze_height, name, teams, player_locations, cheese)
action = select_action(state, possible_actions, memory.model)
# Stuff to do if training the model
if TRAIN_MODEL:
# Remove old experience entries if needed
if len(memory.experience) >= MAX_EXPERIENCE_SIZE:
del memory.experience[0]
# Complement the previous turn and initialize the current one
if len(memory.experience) > 0 and "over" not in memory.experience[-1]:
memory.experience[-1]["reward"] = compute_reward(memory.experience[-1]["state"], memory.previous_scores, state, player_scores, name, teams, memory.initial_cheese_count)
memory.experience[-1]["new_state"] = state
memory.experience[-1]["over"] = False
memory.experience.append({"state": state, "action": action})
# Save scores for next turn
memory.previous_scores = player_scores
# Done
return action
#####################################################################################################################################################
######################################################## EXECUTED ONCE AT THE END OF THE GAME #######################################################
#####################################################################################################################################################
def postprocessing ( maze: Union[numpy.ndarray, Dict[int, Dict[int, int]]],
maze_width: int,
maze_height: int,
name: str,
teams: Dict[str, List[str]],
player_locations: Dict[str, int],
player_scores: Dict[str, float],
player_muds: Dict[str, Dict[str, Union[None, int]]],
cheese: List[int],
possible_actions: List[str],
memory: threading.local,
stats: Dict[str, Any],
) -> None:
"""
This function is called once at the end of the game.
It is not timed, and can be used to perform some cleanup, analyze the completed game, train the model, etc.
In:
* maze: Map of the maze, as data type described by PyRat's "maze_representation" option.
* maze_width: Width of the maze in number of cells.
* maze_height: Height of the maze in number of cells.
* name: Name of the player controlled by this function.
* teams: Recap of the teams of players.
* player_locations: Locations for all players in the game.
* player_scores: Scores for all players in the game.
* player_muds: Indicates which player is currently crossing mud.
* cheese: List of available pieces of cheese in the maze.
* possible_actions: List of possible actions.
* memory: Local memory to share information between preprocessing, turn and postprocessing.
Out:
* None.
"""
# Stuff to do if training the model
if TRAIN_MODEL:
# Complement the experience and save it
state = build_state(maze, maze_width, maze_height, name, teams, player_locations, cheese)
if len(memory.experience) > 0 and "over" not in memory.experience[-1]:
memory.experience[-1]["reward"] = compute_reward(memory.experience[-1]["state"], memory.previous_scores, state, player_scores, name, teams, memory.initial_cheese_count)
memory.experience[-1]["new_state"] = state
memory.experience[-1]["over"] = True
torch.save(memory.experience, EXPERIENCE_FILE_NAME)
# Train the model if we have enough memory
if len(memory.experience) >= EXPERIENCE_MIN_SIZE_FOR_TRAINING:
train_model(memory.model, memory.optimizer, memory.experience, possible_actions)
torch.save(memory.model.state_dict(), MODEL_FILE_NAME)
torch.save(memory.optimizer.state_dict(), OPTIMIZER_FILE_NAME)
# Log the final scores
if USE_WANDB:
opponent_name = get_opponent_name(teams, name)
wandb.log({"final_score[player]": player_scores[name],
"final_score[opponent]": player_scores[opponent_name],
"final_score_difference": player_scores[name] - player_scores[opponent_name]})
#####################################################################################################################################################
######################################################################## GO! ########################################################################
#####################################################################################################################################################
if __name__ == "__main__":
# Map the functions to the character
players = [{"name": "RL",
"team": "You",
"skin": "rat",
"preprocessing_function": preprocessing,
"turn_function": turn,
"postprocessing_function": postprocessing},
{"name": "Greedy",
"team": "Opponent",
"skin": "python",
"preprocessing_function": opponent.preprocessing if "preprocessing" in dir(opponent) else None,
"turn_function": opponent.turn,
"postprocessing_function": opponent.postprocessing if "postprocessing" in dir(opponent) else None}]
# Customize the game elements
config = {"maze_width": 10,
"maze_height": 10,
"cell_percentage": 100.0,
"wall_percentage": 0.0,
"mud_percentage": 0.0,
"nb_cheese": 15}
# Train mode
if TRAIN_MODEL:
# Remove old files if needed
if RESET_TRAINING:
if os.path.exists(OUTPUT_DIRECTORY):
shutil.rmtree(OUTPUT_DIRECTORY, ignore_errors=True)
os.mkdir(OUTPUT_DIRECTORY)
# Connect to WandB for monitoring
if USE_WANDB:
wandb.login(key=open(WANDB_KEY_PATH).read().strip(), force=True)
wandb.init(project="PyRat_RL", dir=OUTPUT_DIRECTORY)
# Run multiple games with no GUI
config["game_mode"] = "synchronous"
config["preprocessing_time"] = 0.0
config["turn_time"] = 0.0
config["render_mode"] = "no_rendering"
for i in tqdm.tqdm(range(NB_EPISODES), desc="Episode", position=0, leave=False):
game = PyRat(players, **config)
stats = game.start()
if stats == {}:
break
# Test mode
else:
# Make a single game with GUI
game = PyRat(players, **config)
stats = game.start()
print(stats)
#####################################################################################################################################################
#####################################################################################################################################################