diff --git a/demo_COVID.py b/demo_COVID.py index 2443544..d8d949f 100644 --- a/demo_COVID.py +++ b/demo_COVID.py @@ -6,7 +6,7 @@ from matplotlib.animation import FuncAnimation from infection import infect, recover_or_die, compute_mortality -from motion import update_positions, out_of_bounds, update_randoms +from motionHelper import update_positions, out_of_bounds, update_randoms from path_planning import set_destination, check_at_destination, keep_at_destination from population import initialize_population, initialize_destination_matrix diff --git a/motion.py b/motion.py index 57821f9..2bc6831 100644 --- a/motion.py +++ b/motion.py @@ -1,159 +1,142 @@ -''' -file that contains all function related to population mobility -and related computations -''' - import numpy as np +from path_planning import go_to_location, set_destination, check_at_destination,\ +keep_at_destination, reset_destinations +from motionHelper import out_of_bounds, update_positions, update_randoms -def update_positions(population): - '''update positions of all people - - Uses heading and speed to update all positions for - the next time step - - Keyword arguments - ----------------- - population : ndarray - the array containing all the population information - ''' +from infection import find_nearby, infect, recover_or_die, compute_mortality,\ +healthcare_infection_correction - #update positions - #x - population[:,1] = population[:,1] + (population[:,3] * population[:,5]) - #y - population[:,2] = population[:,2] + (population [:,4] * population[:,5]) +class Motion: - return population + #This class is responsible for managing all behavior in the simulator, + #including population changes, virus proliferation, etc. -def out_of_bounds(population, xbounds, ybounds): - '''checks which people are about to go out of bounds and corrects - - Function that updates headings of individuals that are about to - go outside of the world boundaries. + def __init__(self, sub_motion1, sub_motion2) -> None: + # human behavior will init here + self._human_behavior = sub_motion1 + # virus behavior will init here + self._virus_behavior = sub_motion2 - Keyword arguments - ----------------- - population : ndarray - the array containing all the population information + def simulation_motion(self,population,destinations,Config,pop_tracker, frame): - xbounds, ybounds : list or tuple - contains the lower and upper bounds of the world [min, max] - ''' - #update headings and positions where out of bounds - #update x heading - #determine number of elements that need to be updated - - shp = population[:,3][(population[:,1] <= xbounds[:,0]) & - (population[:,3] < 0)].shape - population[:,3][(population[:,1] <= xbounds[:,0]) & - (population[:,3] < 0)] = np.clip(np.random.normal(loc = 0.5, - scale = 0.5/3, - size = shp), - a_min = 0.05, a_max = 1) - - shp = population[:,3][(population[:,1] >= xbounds[:,1]) & - (population[:,3] > 0)].shape - population[:,3][(population[:,1] >= xbounds[:,1]) & - (population[:,3] > 0)] = np.clip(-np.random.normal(loc = 0.5, - scale = 0.5/3, - size = shp), - a_min = -1, a_max = -0.05) - - #update y heading - shp = population[:,4][(population[:,2] <= ybounds[:,0]) & - (population[:,4] < 0)].shape - population[:,4][(population[:,2] <= ybounds[:,0]) & - (population[:,4] < 0)] = np.clip(np.random.normal(loc = 0.5, - scale = 0.5/3, - size = shp), - a_min = 0.05, a_max = 1) - - shp = population[:,4][(population[:,2] >= ybounds[:,1]) & - (population[:,4] > 0)].shape - population[:,4][(population[:,2] >= ybounds[:,1]) & - (population[:,4] > 0)] = np.clip(-np.random.normal(loc = 0.5, - scale = 0.5/3, - size = shp), - a_min = -1, a_max = -0.05) - - return population - - -def update_randoms(population, pop_size, speed=0.01, heading_update_chance=0.02, - speed_update_chance=0.02, heading_multiplication=1, - speed_multiplication=1): - '''updates random states such as heading and speed - - Function that randomized the headings and speeds for population members - with settable odds. + #Monitor the basic actions of all people such as moving, out of bounding, or dead. + self._human_behavior.set_general_rule_motion(population, destinations, Config) - Keyword arguments - ----------------- - population : ndarray - the array containing all the population information - - pop_size : int - the size of the population + #special event happen(such as lockdown) and set randoms + self._human_behavior.special_event(population,destinations,Config,pop_tracker) + + #The spread of the virus and whether humans infected with the virus can heal themselves + self._virus_behavior.infection(population, destinations, Config, frame) - heading_update_chance : float - the odds of updating the heading of each member, each time step - speed_update_chance : float - the oodds of updating the speed of each member, each time step - heading_multiplication : int or float - factor to multiply heading with (default headings are between -1 and 1) - speed_multiplication : int or float - factor to multiply speed with (default speeds are between 0.0001 and 0.05 - speed : int or float - mean speed of population members, speeds will be taken from gaussian distribution - with mean 'speed' and sd 'speed / 3' +class Human_behavior: + ''' + This class aggregates all human behaviors. + + TODO: The Judgment based on age whether someone should go to school and work, could + design in here. ''' - #randomly update heading - #x - update = np.random.random(size=(pop_size,)) - shp = update[update <= heading_update_chance].shape - population[:,3][update <= heading_update_chance] = np.random.normal(loc = 0, - scale = 1/3, - size = shp) * heading_multiplication - #y - update = np.random.random(size=(pop_size,)) - shp = update[update <= heading_update_chance].shape - population[:,4][update <= heading_update_chance] = np.random.normal(loc = 0, - scale = 1/3, - size = shp) * heading_multiplication - #randomize speeds - update = np.random.random(size=(pop_size,)) - shp = update[update <= heading_update_chance].shape - population[:,5][update <= heading_update_chance] = np.random.normal(loc = speed, - scale = speed / 3, - size = shp) * speed_multiplication - - population[:,5] = np.clip(population[:,5], a_min=0.0001, a_max=0.05) - return population - - -def get_motion_parameters(xmin, ymin, xmax, ymax): - '''gets destination center and wander ranges - - Function that returns geometric parameters of the destination - that the population members have set. - - Keyword arguments: - ------------------ - xmin, ymin, xmax, ymax : int or float - lower and upper bounds of the destination area set. + #This function will set basic movement information for the crowd at each frame + def set_general_rule_motion(self,population, destinations, Config): + + #check destinations if active + #define motion vectors if destinations active and not everybody is at destination + active_dests = len(population[population[:,11] != 0]) # look op this only once + if active_dests > 0: + if len(population[population[:,12] == 0]) > 0: + population = set_destination(population, destinations) + population = check_at_destination(population, destinations, + wander_factor = Config.wander_factor_dest, + speed = Config.speed) + else: + #keep them at destination + population = keep_at_destination(population, destinations, + Config.wander_factor) + + #out of bounds + #define bounds arrays, excluding those who are marked as having a custom destination + if len(population[:,11] == 0) > 0: + _xbounds = np.array([[Config.xbounds[0] + 0.02, Config.xbounds[1] - 0.02]] * len(population[population[:,11] == 0])) + _ybounds = np.array([[Config.ybounds[0] + 0.02, Config.ybounds[1] - 0.02]] * len(population[population[:,11] == 0])) + population[population[:,11] == 0] = out_of_bounds(population[population[:,11] == 0], + _xbounds, _ybounds) + + #for dead ones: set speed and heading to 0 + population[:,3:5][population[:,6] == 3] = 0 + + #special event happen *here we just have lockdown + def special_event(self,population, destinations,Config,pop_tracker): + + if Config.lockdown: + if len(pop_tracker.infectious) == 0: + mx = 0 + else: + mx = np.max(pop_tracker.infectious) + + if len(population[population[:,6] == 1]) >= len(population) * Config.lockdown_percentage or\ + mx >= (len(population) * Config.lockdown_percentage): + #reduce speed of all members of society + population[:,5] = np.clip(population[:,5], a_min = None, a_max = 0.001) + #set speeds of complying people to 0 + population[:,5][Config.lockdown_vector == 0] = 0 + else: + #update randoms + population = update_randoms(population, Config.pop_size, Config.speed) + else: + #update randoms + population = update_randoms(population, Config.pop_size, Config.speed) + + #TODO If the person has other special behaviors, we can add them here + + update_positions(population) + + + def update_positions(self, population): + '''update positions of all people + + Uses heading and speed to update all positions for + the next time step + + Keyword arguments + ----------------- + population : ndarray + the array containing all the population information + ''' + + #update positions + #x + population[:,1] = population[:,1] + (population[:,3] * population[:,5]) + #y + population[:,2] = population[:,2] + (population [:,4] * population[:,5]) + + return population + + +class COVID_19_behavior: + ''' + This class aggregates all virus behaviors.If in the future there are functions + based on temperature, vaccines, etc. that can affect the spread of the virus + and the lethality, the relevant functions can be defined here ''' - x_center = xmin + ((xmax - xmin) / 2) - y_center = ymin + ((ymax - ymin) / 2) + #Base infection + def infection(self, population, destinations, Config, frame): + + #find new infections + population, destinations = infect(population, Config, frame, + send_to_location = Config.self_isolate, + location_bounds = Config.isolation_bounds, + destinations = destinations, + location_no = 1, + location_odds = Config.self_isolate_proportion) + + #recover and die + population = recover_or_die(population, frame, Config) - x_wander = (xmax - xmin) / 2 - y_wander = (ymax - ymin) / 2 - return x_center, y_center, x_wander, y_wander diff --git a/motionHelper.py b/motionHelper.py new file mode 100644 index 0000000..57821f9 --- /dev/null +++ b/motionHelper.py @@ -0,0 +1,159 @@ +''' +file that contains all function related to population mobility +and related computations +''' + +import numpy as np + +def update_positions(population): + '''update positions of all people + + Uses heading and speed to update all positions for + the next time step + + Keyword arguments + ----------------- + population : ndarray + the array containing all the population information + ''' + + #update positions + #x + population[:,1] = population[:,1] + (population[:,3] * population[:,5]) + #y + population[:,2] = population[:,2] + (population [:,4] * population[:,5]) + + return population + + +def out_of_bounds(population, xbounds, ybounds): + '''checks which people are about to go out of bounds and corrects + + Function that updates headings of individuals that are about to + go outside of the world boundaries. + + Keyword arguments + ----------------- + population : ndarray + the array containing all the population information + + xbounds, ybounds : list or tuple + contains the lower and upper bounds of the world [min, max] + ''' + #update headings and positions where out of bounds + #update x heading + #determine number of elements that need to be updated + + shp = population[:,3][(population[:,1] <= xbounds[:,0]) & + (population[:,3] < 0)].shape + population[:,3][(population[:,1] <= xbounds[:,0]) & + (population[:,3] < 0)] = np.clip(np.random.normal(loc = 0.5, + scale = 0.5/3, + size = shp), + a_min = 0.05, a_max = 1) + + shp = population[:,3][(population[:,1] >= xbounds[:,1]) & + (population[:,3] > 0)].shape + population[:,3][(population[:,1] >= xbounds[:,1]) & + (population[:,3] > 0)] = np.clip(-np.random.normal(loc = 0.5, + scale = 0.5/3, + size = shp), + a_min = -1, a_max = -0.05) + + #update y heading + shp = population[:,4][(population[:,2] <= ybounds[:,0]) & + (population[:,4] < 0)].shape + population[:,4][(population[:,2] <= ybounds[:,0]) & + (population[:,4] < 0)] = np.clip(np.random.normal(loc = 0.5, + scale = 0.5/3, + size = shp), + a_min = 0.05, a_max = 1) + + shp = population[:,4][(population[:,2] >= ybounds[:,1]) & + (population[:,4] > 0)].shape + population[:,4][(population[:,2] >= ybounds[:,1]) & + (population[:,4] > 0)] = np.clip(-np.random.normal(loc = 0.5, + scale = 0.5/3, + size = shp), + a_min = -1, a_max = -0.05) + + return population + + +def update_randoms(population, pop_size, speed=0.01, heading_update_chance=0.02, + speed_update_chance=0.02, heading_multiplication=1, + speed_multiplication=1): + '''updates random states such as heading and speed + + Function that randomized the headings and speeds for population members + with settable odds. + + Keyword arguments + ----------------- + population : ndarray + the array containing all the population information + + pop_size : int + the size of the population + + heading_update_chance : float + the odds of updating the heading of each member, each time step + + speed_update_chance : float + the oodds of updating the speed of each member, each time step + + heading_multiplication : int or float + factor to multiply heading with (default headings are between -1 and 1) + + speed_multiplication : int or float + factor to multiply speed with (default speeds are between 0.0001 and 0.05 + + speed : int or float + mean speed of population members, speeds will be taken from gaussian distribution + with mean 'speed' and sd 'speed / 3' + ''' + + #randomly update heading + #x + update = np.random.random(size=(pop_size,)) + shp = update[update <= heading_update_chance].shape + population[:,3][update <= heading_update_chance] = np.random.normal(loc = 0, + scale = 1/3, + size = shp) * heading_multiplication + #y + update = np.random.random(size=(pop_size,)) + shp = update[update <= heading_update_chance].shape + population[:,4][update <= heading_update_chance] = np.random.normal(loc = 0, + scale = 1/3, + size = shp) * heading_multiplication + #randomize speeds + update = np.random.random(size=(pop_size,)) + shp = update[update <= heading_update_chance].shape + population[:,5][update <= heading_update_chance] = np.random.normal(loc = speed, + scale = speed / 3, + size = shp) * speed_multiplication + + population[:,5] = np.clip(population[:,5], a_min=0.0001, a_max=0.05) + return population + + +def get_motion_parameters(xmin, ymin, xmax, ymax): + '''gets destination center and wander ranges + + Function that returns geometric parameters of the destination + that the population members have set. + + Keyword arguments: + ------------------ + xmin, ymin, xmax, ymax : int or float + lower and upper bounds of the destination area set. + + ''' + + x_center = xmin + ((xmax - xmin) / 2) + y_center = ymin + ((ymax - ymin) / 2) + + x_wander = (xmax - xmin) / 2 + y_wander = (ymax - ymin) / 2 + + return x_center, y_center, x_wander, y_wander diff --git a/path_planning.py b/path_planning.py index 11804f7..b1d72dd 100644 --- a/path_planning.py +++ b/path_planning.py @@ -5,7 +5,7 @@ import numpy as np -from motion import get_motion_parameters, update_randoms +from motionHelper import update_randoms,get_motion_parameters def go_to_location(patient, destination, location_bounds, dest_no=1): '''sends patient to defined location diff --git a/population.py b/population.py index 2879a8b..32eda6d 100644 --- a/population.py +++ b/population.py @@ -8,7 +8,7 @@ import numpy as np -from motion import get_motion_parameters +from motionHelper import get_motion_parameters from utils import check_folder def initialize_population(Config, mean_age=45, max_age=105, diff --git a/simulation.py b/simulation.py index 9f6055f..2ee1b5f 100644 --- a/simulation.py +++ b/simulation.py @@ -9,10 +9,7 @@ from environment import build_hospital from infection import find_nearby, infect, recover_or_die, compute_mortality,\ healthcare_infection_correction -from motion import update_positions, out_of_bounds, update_randoms,\ -get_motion_parameters -from path_planning import go_to_location, set_destination, check_at_destination,\ -keep_at_destination, reset_destinations +from motion import Motion,Human_behavior,COVID_19_behavior from population import initialize_population, initialize_destination_matrix,\ set_destination_bounds, save_data, save_population, Population_trackers from visualiser import build_fig, draw_tstep, set_style, plot_sir @@ -38,6 +35,15 @@ def __init__(self, *args, **kwargs): #initalise destinations vector self.destinations = initialize_destination_matrix(self.Config.pop_size, 1) + #initalise human behavior + human_behavior= Human_behavior() + + #initalise virus behavior + virus_behavior = COVID_19_behavior() + + #Control all behavior here + self.motion_control = Motion(human_behavior,virus_behavior) + def reinitialise(self): '''reset the simulation''' @@ -64,65 +70,8 @@ def tstep(self): #initialize figure self.fig, self.spec, self.ax1, self.ax2 = build_fig(self.Config) - #check destinations if active - #define motion vectors if destinations active and not everybody is at destination - active_dests = len(self.population[self.population[:,11] != 0]) # look op this only once - - if active_dests > 0 and len(self.population[self.population[:,12] == 0]) > 0: - self.population = set_destination(self.population, self.destinations) - self.population = check_at_destination(self.population, self.destinations, - wander_factor = self.Config.wander_factor_dest, - speed = self.Config.speed) - - if active_dests > 0 and len(self.population[self.population[:,12] == 1]) > 0: - #keep them at destination - self.population = keep_at_destination(self.population, self.destinations, - self.Config.wander_factor) - - #out of bounds - #define bounds arrays, excluding those who are marked as having a custom destination - if len(self.population[:,11] == 0) > 0: - _xbounds = np.array([[self.Config.xbounds[0] + 0.02, self.Config.xbounds[1] - 0.02]] * len(self.population[self.population[:,11] == 0])) - _ybounds = np.array([[self.Config.ybounds[0] + 0.02, self.Config.ybounds[1] - 0.02]] * len(self.population[self.population[:,11] == 0])) - self.population[self.population[:,11] == 0] = out_of_bounds(self.population[self.population[:,11] == 0], - _xbounds, _ybounds) - - #set randoms - if self.Config.lockdown: - if len(self.pop_tracker.infectious) == 0: - mx = 0 - else: - mx = np.max(self.pop_tracker.infectious) - - if len(self.population[self.population[:,6] == 1]) >= len(self.population) * self.Config.lockdown_percentage or\ - mx >= (len(self.population) * self.Config.lockdown_percentage): - #reduce speed of all members of society - self.population[:,5] = np.clip(self.population[:,5], a_min = None, a_max = 0.001) - #set speeds of complying people to 0 - self.population[:,5][self.Config.lockdown_vector == 0] = 0 - else: - #update randoms - self.population = update_randoms(self.population, self.Config.pop_size, self.Config.speed) - else: - #update randoms - self.population = update_randoms(self.population, self.Config.pop_size, self.Config.speed) - - #for dead ones: set speed and heading to 0 - self.population[:,3:5][self.population[:,6] == 3] = 0 - - #update positions - self.population = update_positions(self.population) - - #find new infections - self.population, self.destinations = infect(self.population, self.Config, self.frame, - send_to_location = self.Config.self_isolate, - location_bounds = self.Config.isolation_bounds, - destinations = self.destinations, - location_no = 1, - location_odds = self.Config.self_isolate_proportion) - - #recover and die - self.population = recover_or_die(self.population, self.frame, self.Config) + self.motion_control.simulation_motion(self.population, self.destinations, \ + self.Config, self.pop_tracker, self.frame) #send cured back to population if self isolation active #perhaps put in recover or die class