diff --git a/braincraft/challenge.py b/braincraft/challenge.py index f4e0edf..178ed4d 100644 --- a/braincraft/challenge.py +++ b/braincraft/challenge.py @@ -5,6 +5,7 @@ import numpy as np from tqdm import tqdm from camera import Camera +from nonlinearities import NonlinearityType, get_nonlinearity_function # from bot import Bot # from environment import Environment @@ -112,6 +113,12 @@ def evaluate(model, Bot, Environment, runs=10, seed=None, debug=False): # Unfold model W_in, W, W_out, warmup, leak, f, g = model + + # Convert enum types to functions if needed + if isinstance(f, NonlinearityType): + f = get_nonlinearity_function(f) + if isinstance(g, NonlinearityType): + g = get_nonlinearity_function(g) scores = [] seeds = np.random.randint(0, 1_000_000, runs) diff --git a/braincraft/env1_player_evolution.py b/braincraft/env1_player_evolution.py index 50cdb0d..99bcf00 100644 --- a/braincraft/env1_player_evolution.py +++ b/braincraft/env1_player_evolution.py @@ -11,6 +11,8 @@ from multiprocessing import Pool, cpu_count from typing import Tuple, List, Callable, Generator +from nonlinearities import NonlinearityType, get_nonlinearity_function + # ------------------------------------------------------------------------------ # Custom reward and evalution functions # ------------------------------------------------------------------------------ @@ -196,7 +198,7 @@ def evaluate_bot_with_rewards( def evaluate_individual( args: Tuple[ - np.ndarray, np.ndarray, np.ndarray, int, float, Callable, Callable, int, int + np.ndarray, np.ndarray, np.ndarray, int, float, NonlinearityType, NonlinearityType, int, int ], ) -> Tuple[float, float]: """Worker function to evaluate a single wout""" @@ -209,7 +211,11 @@ def evaluate_individual( from bot import Bot from environment_1 import Environment - model = win, w, wout, warmup, leak, f, g + # Convert enum types to functions + f_func = get_nonlinearity_function(f) + g_func = get_nonlinearity_function(g) + + model = win, w, wout, warmup, leak, f_func, g_func score_mean, score_std = evaluate_bot_with_rewards( model, Bot, Environment, runs=3, debug=False ) @@ -223,8 +229,8 @@ def evaluate_population_parallel( w: np.ndarray, warmup: int, leak: float, - f: Callable, - g: Callable, + f: NonlinearityType, + g: NonlinearityType, seed: int, num_processes: int = None, ) -> List[float]: @@ -249,7 +255,7 @@ def evaluate_population_parallel( def evolutionary_player() -> Generator[ - Tuple[np.ndarray, np.ndarray, np.ndarray, int, float, Callable, Callable] + Tuple[np.ndarray, np.ndarray, np.ndarray, int, float, NonlinearityType, NonlinearityType] ]: """Evolutionary algorithm player - optimizes only Wout with multiprocessing""" @@ -265,8 +271,8 @@ def evolutionary_player() -> Generator[ n = 1000 p = bot.camera.resolution warmup = 0 - f = np.tanh # activation function for 'reservoir' - g = np.tanh # activation function for 'reservoir output' + f = NonlinearityType.TANH # activation function for 'reservoir' + g = NonlinearityType.TANH # activation function for 'reservoir output' leak = 0.8 spectral_radius = 0.95 # Desired spectral radius for W density = 0.1 # Density for Win and W diff --git a/braincraft/env1_player_random.py b/braincraft/env1_player_random.py index fa13cb0..5575270 100644 --- a/braincraft/env1_player_random.py +++ b/braincraft/env1_player_random.py @@ -4,11 +4,11 @@ """ Example and evaluation of the performances of a random player. """ +import numpy as np from bot import Bot from environment_1 import Environment - -def identity(x): - return x +from nonlinearities import NonlinearityType, get_nonlinearity_function +from challenge import evaluate def random_player(): """Random players building""" @@ -20,8 +20,8 @@ def random_player(): n = 1000 p = bot.camera.resolution warmup = 0 - f = np.tanh - g = np.tanh + f = NonlinearityType.TANH + g = NonlinearityType.TANH leak = 0.85 # Random search for best model for 5 tries (not efficient at all) @@ -48,7 +48,6 @@ def random_player(): # ----------------------------------------------------------------------------- if __name__ == "__main__": import time - import numpy as np from challenge import train, evaluate seed = 12345 diff --git a/braincraft/env1_player_simple.py b/braincraft/env1_player_simple.py index 8fdbe4d..6930b79 100644 --- a/braincraft/env1_player_simple.py +++ b/braincraft/env1_player_simple.py @@ -4,139 +4,152 @@ Example and evaluation of the performances of a random player. """ +import numpy as np from bot import Bot from environment_1 import Environment - -def identity(x): - return x +from nonlinearities import NonlinearityType, get_nonlinearity_function + def custom_eval(Environment, Bot, model, n_min, n_evals=1, t_max=300): """Agent evaluation with early stopping if to many wall hits happen""" - + score_list, hit_list, energy_list = [], [], [] for n in range(n_evals): - environment = Environment() bot = Bot() - + W_in, W, W_out, warmup, leak, f, g = model - + + # Convert enum types to functions if needed + if isinstance(f, NonlinearityType): + f = get_nonlinearity_function(f) + if isinstance(g, NonlinearityType): + g = get_nonlinearity_function(g) + n_cam = bot.camera.resolution n_inp = W_in.shape[1] n_rec = W.shape[0] - + # prepare simulation I = np.zeros(n_inp) X = np.zeros(n_rec) O = 0 - + t = 0 score = 0 hits = 0 - for t in range(t_max): + for t in range(t_max): if bot.hit: score -= 1 hits += 1 else: score += 1 - - I[ : n_cam] = 1 - bot.camera.depths - I[ n_cam :] = bot.hit, bot.energy, 1.0 - + + I[:n_cam] = 1 - bot.camera.depths + I[n_cam:] = bot.hit, bot.energy, 1.0 + X = (1 - leak) * X + leak * f(np.dot(W_in, I) + np.dot(W, X)) O = g(np.dot(W_out, X)) O = np.clip(O, -5, 5) bot.forward(O, environment) - + if bot.energy < 0.001 or hits >= 30: break - + score_list.append(score) hit_list.append(hits) energy_list.append(bot.energy) - + if hits >= 30: break - + return np.mean(score_list) - + + def simple_player(): """Aims to build the most simplistic controller which avoids hitting walls. Drives only along the outer lane.""" bot = Bot() - + # network hyperparameters leak = 0.95 - def act(x): - x = np.tanh(x) - return np.where(x > 0, x, 0) - + f = NonlinearityType.TANH_RELU + g = NonlinearityType.LINEAR + # model size n_cam = bot.camera.resolution n_inp = n_cam + 3 - n_rec = 1000 - + n_rec = 1000 + n_min = n_cam - 1 # only using every n_min'th sensor node - + score_best = -np.inf model_best = None while True: - model_values = np.random.uniform(0, 1, 2) - + # input parameters exc_input = model_values[0] - thresh_inp = model_values[1] - + thresh_inp = model_values[1] + # construct the model W_in = np.zeros((n_rec, n_inp)) W = np.zeros((n_rec, n_rec)) W_out = np.zeros(n_rec) - - model = W_in, W, W_out, 0, leak, act, identity - + + model = W_in, W, W_out, 0, leak, f, g + # input weights for connecting sensor to steering pop - for i in range(0, n_cam, n_min): W_in[i, i] = exc_input + for i in range(0, n_cam, n_min): + W_in[i, i] = exc_input # constant threshold for "ignoring" walls beyond a certain depth W_in[:n_cam:n_min, -1] = -thresh_inp - + # output weights W_out[:n_cam:n_min] = n_min * np.linspace(-1, 1, len(W_out[:n_cam:n_min])) - + # Evaluate model - score_mean = custom_eval(Environment, Bot, model, n_min) - - if score_mean >= 300: break - + score_mean = custom_eval(Environment, Bot, model, n_min) + + if score_mean >= 300: + break + if score_mean > score_best: score_best = score_mean model_best = model yield model_best - + yield model - + + if __name__ == "__main__": import time - import numpy as np - from challenge import train, evaluate - + + import numpy as np + from challenge import evaluate, train + seed = 78 np.random.seed(seed) - + # Training (100 seconds) - print(f"Starting training for 100 seconds (user time)") + print("Starting training for 100 seconds (user time)") model = train(simple_player, timeout=100) - - W_all = np.concatenate([np.concatenate([np.array([0]), model[2], np.zeros(67)])[None, :], - np.concatenate([np.zeros((1000, 1)), model[1], model[0]], axis=1), - np.zeros((67, 1068))]) - print("Number of 'neurons' used in network (including inputs and output):", - sum(((np.sum(np.abs(W_all), 0) > 0) + (np.sum(np.abs(W_all), 1) > 0)) > 0)) - print("Non-zero weights in network:", np.sum(W_all!=0)) - + + W_all = np.concatenate( + [ + np.concatenate([np.array([0]), model[2], np.zeros(67)])[None, :], + np.concatenate([np.zeros((1000, 1)), model[1], model[0]], axis=1), + np.zeros((67, 1068)), + ] + ) + print( + "Number of 'neurons' used in network (including inputs and output):", + sum(((np.sum(np.abs(W_all), 0) > 0) + (np.sum(np.abs(W_all), 1) > 0)) > 0), + ) + print("Non-zero weights in network:", np.sum(W_all != 0)) + # Evaluation start_time = time.time() score, std = evaluate(model, Bot, Environment, runs=10, debug=False, seed=seed) elapsed = time.time() - start_time print(f"Evaluation completed after {elapsed:.2f} seconds") print(f"Final score: {score:.2f} ± {std:.2f}") - diff --git a/braincraft/env1_player_switcher.py b/braincraft/env1_player_switcher.py index 0ca83a4..46b2bc1 100644 --- a/braincraft/env1_player_switcher.py +++ b/braincraft/env1_player_switcher.py @@ -4,15 +4,10 @@ Example and evaluation of the performances of a handcrafter switcher player. """ +import numpy as np from bot import Bot from environment_1 import Environment - -# ** activation functions ****************************************************** -def ReLU(x): - return np.clip(x, a_min=0, a_max=None) - -def identity(x): - return x +from nonlinearities import NonlinearityType, get_nonlinearity_function # ** define a model *********************************************************** def switcher_player(): @@ -100,8 +95,8 @@ def switcher_player(): # finaly, apply to output W_out[0, i_turn_mid+2] = 100 - f = lambda x: ReLU( x ) - g = identity + f = NonlinearityType.RELU + g = NonlinearityType.LINEAR model = W_in, W, W_out, warmup, leak, f, g yield model @@ -110,7 +105,6 @@ def switcher_player(): if __name__ == "__main__": import time - import numpy as np from challenge import train, evaluate seed = 78 diff --git a/braincraft/env1_player_switcher_alt.py b/braincraft/env1_player_switcher_alt.py index 59d6074..59431b2 100644 --- a/braincraft/env1_player_switcher_alt.py +++ b/braincraft/env1_player_switcher_alt.py @@ -4,15 +4,10 @@ Example and evaluation of the performances of a handcrafter switcher player. """ +import numpy as np from bot import Bot from environment_1 import Environment - -# ** activation functions ****************************************************** -def ReLU(x): - return np.clip(x, a_min=0, a_max=None) - -def identity(x): - return x +from nonlinearities import NonlinearityType, get_nonlinearity_function # ** define a model *********************************************************** def switcher_player(): @@ -23,9 +18,8 @@ def switcher_player(): # network hyperparameters leak = 0.95 - def act(x): - x = np.tanh(x) - return np.where(x > 0, x, 0) + f = NonlinearityType.TANH_RELU # Custom tanh + ReLU combination + g = NonlinearityType.LINEAR # model size n_cam = bot.camera.resolution @@ -55,7 +49,7 @@ def act(x): W = np.zeros((n_rec, n_rec)) W_out = np.zeros(n_rec) - model = W_in, W, W_out, 0, leak, act, identity + model = W_in, W, W_out, 0, leak, f, g # input weights for connecting sensor to steering pop for i in range(0, n_cam, n_min): W_in[i, i] = exc_input @@ -100,7 +94,6 @@ def act(x): if __name__ == "__main__": import time - import numpy as np from challenge import train, evaluate seed = 78 diff --git a/braincraft/nonlinearities.py b/braincraft/nonlinearities.py new file mode 100644 index 0000000..8b6b0f2 --- /dev/null +++ b/braincraft/nonlinearities.py @@ -0,0 +1,43 @@ +""" +Nonlinearity functions for neural network activation. +""" + +import numpy as np +from enum import Enum +from typing import Callable + + +class NonlinearityType(Enum): + """Enum for different nonlinearity activation functions.""" + TANH = "tanh" + SIGMOID = "sigmoid" + RELU = "relu" + HEAVISIDE = "heaviside" + LINEAR = "linear" + TANH_RELU = "tanh_relu" # Custom: tanh followed by ReLU (positive part only) + + +def get_nonlinearity_function(nonlinearity_type: NonlinearityType) -> Callable[[np.ndarray], np.ndarray]: + """ + Get the activation function corresponding to the nonlinearity type. + + Args: + nonlinearity_type: The type of nonlinearity to use + + Returns: + The activation function + """ + if nonlinearity_type == NonlinearityType.TANH: + return np.tanh + elif nonlinearity_type == NonlinearityType.SIGMOID: + return lambda x: 1 / (1 + np.exp(-np.clip(x, -500, 500))) + elif nonlinearity_type == NonlinearityType.RELU: + return lambda x: np.maximum(0, x) + elif nonlinearity_type == NonlinearityType.HEAVISIDE: + return lambda x: (x > 0).astype(float) + elif nonlinearity_type == NonlinearityType.LINEAR: + return lambda x: x + elif nonlinearity_type == NonlinearityType.TANH_RELU: + return lambda x: np.where(np.tanh(x) > 0, np.tanh(x), 0) + else: + raise ValueError(f"Unknown nonlinearity type: {nonlinearity_type}") \ No newline at end of file