Skip to content

Refactor methods #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion demo_COVID.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from matplotlib.animation import FuncAnimation

from infection import infect, recover_or_die, compute_mortality
from motion import update_positions, out_of_bounds, update_randoms
from motionHelper import update_positions, out_of_bounds, update_randoms
from path_planning import set_destination, check_at_destination, keep_at_destination
from population import initialize_population, initialize_destination_matrix

Expand Down
259 changes: 121 additions & 138 deletions motion.py
Original file line number Diff line number Diff line change
@@ -1,159 +1,142 @@
'''
file that contains all function related to population mobility
and related computations
'''

import numpy as np
from path_planning import go_to_location, set_destination, check_at_destination,\
keep_at_destination, reset_destinations
from motionHelper import out_of_bounds, update_positions, update_randoms

def update_positions(population):
'''update positions of all people

Uses heading and speed to update all positions for
the next time step

Keyword arguments
-----------------
population : ndarray
the array containing all the population information
'''
from infection import find_nearby, infect, recover_or_die, compute_mortality,\
healthcare_infection_correction

#update positions
#x
population[:,1] = population[:,1] + (population[:,3] * population[:,5])
#y
population[:,2] = population[:,2] + (population [:,4] * population[:,5])
class Motion:

return population
#This class is responsible for managing all behavior in the simulator,
#including population changes, virus proliferation, etc.


def out_of_bounds(population, xbounds, ybounds):
'''checks which people are about to go out of bounds and corrects

Function that updates headings of individuals that are about to
go outside of the world boundaries.
def __init__(self, sub_motion1, sub_motion2) -> None:
# human behavior will init here
self._human_behavior = sub_motion1
# virus behavior will init here
self._virus_behavior = sub_motion2

Keyword arguments
-----------------
population : ndarray
the array containing all the population information
def simulation_motion(self,population,destinations,Config,pop_tracker, frame):

xbounds, ybounds : list or tuple
contains the lower and upper bounds of the world [min, max]
'''
#update headings and positions where out of bounds
#update x heading
#determine number of elements that need to be updated

shp = population[:,3][(population[:,1] <= xbounds[:,0]) &
(population[:,3] < 0)].shape
population[:,3][(population[:,1] <= xbounds[:,0]) &
(population[:,3] < 0)] = np.clip(np.random.normal(loc = 0.5,
scale = 0.5/3,
size = shp),
a_min = 0.05, a_max = 1)

shp = population[:,3][(population[:,1] >= xbounds[:,1]) &
(population[:,3] > 0)].shape
population[:,3][(population[:,1] >= xbounds[:,1]) &
(population[:,3] > 0)] = np.clip(-np.random.normal(loc = 0.5,
scale = 0.5/3,
size = shp),
a_min = -1, a_max = -0.05)

#update y heading
shp = population[:,4][(population[:,2] <= ybounds[:,0]) &
(population[:,4] < 0)].shape
population[:,4][(population[:,2] <= ybounds[:,0]) &
(population[:,4] < 0)] = np.clip(np.random.normal(loc = 0.5,
scale = 0.5/3,
size = shp),
a_min = 0.05, a_max = 1)

shp = population[:,4][(population[:,2] >= ybounds[:,1]) &
(population[:,4] > 0)].shape
population[:,4][(population[:,2] >= ybounds[:,1]) &
(population[:,4] > 0)] = np.clip(-np.random.normal(loc = 0.5,
scale = 0.5/3,
size = shp),
a_min = -1, a_max = -0.05)

return population


def update_randoms(population, pop_size, speed=0.01, heading_update_chance=0.02,
speed_update_chance=0.02, heading_multiplication=1,
speed_multiplication=1):
'''updates random states such as heading and speed

Function that randomized the headings and speeds for population members
with settable odds.
#Monitor the basic actions of all people such as moving, out of bounding, or dead.
self._human_behavior.set_general_rule_motion(population, destinations, Config)

Keyword arguments
-----------------
population : ndarray
the array containing all the population information

pop_size : int
the size of the population
#special event happen(such as lockdown) and set randoms
self._human_behavior.special_event(population,destinations,Config,pop_tracker)

#The spread of the virus and whether humans infected with the virus can heal themselves
self._virus_behavior.infection(population, destinations, Config, frame)

heading_update_chance : float
the odds of updating the heading of each member, each time step

speed_update_chance : float
the oodds of updating the speed of each member, each time step

heading_multiplication : int or float
factor to multiply heading with (default headings are between -1 and 1)

speed_multiplication : int or float
factor to multiply speed with (default speeds are between 0.0001 and 0.05

speed : int or float
mean speed of population members, speeds will be taken from gaussian distribution
with mean 'speed' and sd 'speed / 3'
class Human_behavior:
'''
This class aggregates all human behaviors.

TODO: The Judgment based on age whether someone should go to school and work, could
design in here.
'''

#randomly update heading
#x
update = np.random.random(size=(pop_size,))
shp = update[update <= heading_update_chance].shape
population[:,3][update <= heading_update_chance] = np.random.normal(loc = 0,
scale = 1/3,
size = shp) * heading_multiplication
#y
update = np.random.random(size=(pop_size,))
shp = update[update <= heading_update_chance].shape
population[:,4][update <= heading_update_chance] = np.random.normal(loc = 0,
scale = 1/3,
size = shp) * heading_multiplication
#randomize speeds
update = np.random.random(size=(pop_size,))
shp = update[update <= heading_update_chance].shape
population[:,5][update <= heading_update_chance] = np.random.normal(loc = speed,
scale = speed / 3,
size = shp) * speed_multiplication

population[:,5] = np.clip(population[:,5], a_min=0.0001, a_max=0.05)
return population


def get_motion_parameters(xmin, ymin, xmax, ymax):
'''gets destination center and wander ranges

Function that returns geometric parameters of the destination
that the population members have set.

Keyword arguments:
------------------
xmin, ymin, xmax, ymax : int or float
lower and upper bounds of the destination area set.

#This function will set basic movement information for the crowd at each frame
def set_general_rule_motion(self,population, destinations, Config):

#check destinations if active
#define motion vectors if destinations active and not everybody is at destination
active_dests = len(population[population[:,11] != 0]) # look op this only once
if active_dests > 0:
if len(population[population[:,12] == 0]) > 0:
population = set_destination(population, destinations)
population = check_at_destination(population, destinations,
wander_factor = Config.wander_factor_dest,
speed = Config.speed)
else:
#keep them at destination
population = keep_at_destination(population, destinations,
Config.wander_factor)

#out of bounds
#define bounds arrays, excluding those who are marked as having a custom destination
if len(population[:,11] == 0) > 0:
_xbounds = np.array([[Config.xbounds[0] + 0.02, Config.xbounds[1] - 0.02]] * len(population[population[:,11] == 0]))
_ybounds = np.array([[Config.ybounds[0] + 0.02, Config.ybounds[1] - 0.02]] * len(population[population[:,11] == 0]))
population[population[:,11] == 0] = out_of_bounds(population[population[:,11] == 0],
_xbounds, _ybounds)

#for dead ones: set speed and heading to 0
population[:,3:5][population[:,6] == 3] = 0

#special event happen *here we just have lockdown
def special_event(self,population, destinations,Config,pop_tracker):

if Config.lockdown:
if len(pop_tracker.infectious) == 0:
mx = 0
else:
mx = np.max(pop_tracker.infectious)

if len(population[population[:,6] == 1]) >= len(population) * Config.lockdown_percentage or\
mx >= (len(population) * Config.lockdown_percentage):
#reduce speed of all members of society
population[:,5] = np.clip(population[:,5], a_min = None, a_max = 0.001)
#set speeds of complying people to 0
population[:,5][Config.lockdown_vector == 0] = 0
else:
#update randoms
population = update_randoms(population, Config.pop_size, Config.speed)
else:
#update randoms
population = update_randoms(population, Config.pop_size, Config.speed)

#TODO If the person has other special behaviors, we can add them here

update_positions(population)


def update_positions(self, population):
'''update positions of all people

Uses heading and speed to update all positions for
the next time step

Keyword arguments
-----------------
population : ndarray
the array containing all the population information
'''

#update positions
#x
population[:,1] = population[:,1] + (population[:,3] * population[:,5])
#y
population[:,2] = population[:,2] + (population [:,4] * population[:,5])

return population


class COVID_19_behavior:
'''
This class aggregates all virus behaviors.If in the future there are functions
based on temperature, vaccines, etc. that can affect the spread of the virus
and the lethality, the relevant functions can be defined here
'''

x_center = xmin + ((xmax - xmin) / 2)
y_center = ymin + ((ymax - ymin) / 2)
#Base infection
def infection(self, population, destinations, Config, frame):

#find new infections
population, destinations = infect(population, Config, frame,
send_to_location = Config.self_isolate,
location_bounds = Config.isolation_bounds,
destinations = destinations,
location_no = 1,
location_odds = Config.self_isolate_proportion)

#recover and die
population = recover_or_die(population, frame, Config)

x_wander = (xmax - xmin) / 2
y_wander = (ymax - ymin) / 2

return x_center, y_center, x_wander, y_wander
Loading