diff --git a/resources/healthsystem/ResourceFile_HealthSystem_parameters.csv b/resources/healthsystem/ResourceFile_HealthSystem_parameters.csv index 2828a9376c..2d73d5c789 100644 --- a/resources/healthsystem/ResourceFile_HealthSystem_parameters.csv +++ b/resources/healthsystem/ResourceFile_HealthSystem_parameters.csv @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:e36cbd225191f893f2de5f0f34adc251046af5ec58daaf7c86b09a6c83c1e31d -size 379 +oid sha256:a2e9d24736e254c3eabba58e83612c8f7615d65e52402b42c07b3e7f6b26e85d +size 424 diff --git a/src/tlo/methods/healthsystem.py b/src/tlo/methods/healthsystem.py index 470242262c..3ce2e43159 100644 --- a/src/tlo/methods/healthsystem.py +++ b/src/tlo/methods/healthsystem.py @@ -1,10 +1,11 @@ import datetime import heapq as hp import itertools +import re import warnings from collections import Counter, defaultdict from collections.abc import Iterable -from itertools import repeat +from itertools import combinations, product, repeat from pathlib import Path from typing import Dict, List, NamedTuple, Optional, Tuple, Union @@ -563,6 +564,15 @@ class HealthSystem(Module): Types.BOOL, "Decide whether to scale HR capabilities by population size every year. Can be used as well as" " the dynamic_HR_scaling_factor" ), + + 'global_task_shifting_mode': Parameter( + Types.STRING, "Name of global task-shifting mode adopted"), + + 'global_task_shifting': Parameter( + Types.DICT, " Global task-shifting policy adopted by the health care system. It includes a list of the officers" + " eligible for task shifting. For each of those officers, it specifies which officers" + " can take over the officer's tasks, and a factor explaining how the originally scheduled time " + " should be scaled if performed by a different officer."), 'tclose_overwrite': Parameter( Types.INT, "Decide whether to overwrite tclose variables assigned by disease modules"), @@ -599,6 +609,7 @@ def __init__( beds_availability: Optional[str] = None, randomise_queue: bool = True, ignore_priority: bool = False, + global_task_shifting_mode: Optional[str] = None, policy_name: Optional[str] = None, capabilities_coefficient: Optional[float] = None, use_funded_or_actual_staffing: Optional[str] = None, @@ -673,7 +684,7 @@ def __init__( self.randomise_queue = randomise_queue self.ignore_priority = ignore_priority - + # This default value will be overwritten if assumed policy is not None self.lowest_priority_considered = 2 @@ -684,6 +695,8 @@ def __init__( 'VerticalProgrammes', 'ClinicallyVulnerable', 'EHP_III', 'LCOA_EHP'] self.arg_policy_name = policy_name + + self.arg_global_task_shifting_mode = global_task_shifting_mode self.tclose_overwrite = None self.tclose_days_offset_overwrite = None @@ -816,6 +829,11 @@ def read_parameters(self, data_folder): self.parameters['priority_rank'] = pd.read_excel(path_to_resourcefiles_for_healthsystem / 'priority_policies' / 'ResourceFile_PriorityRanking_ALLPOLICIES.xlsx', sheet_name=None) + + self.parameters['global_task_shifting'] = pd.read_excel(path_to_resourcefiles_for_healthsystem / 'human_resources'/ + 'task_shifting'/'ResourceFile_GlobalTaskShifting.xlsx', + sheet_name=None) + self.parameters['const_HR_scaling_table']: Dict = pd.read_excel( path_to_resourcefiles_for_healthsystem / @@ -870,6 +888,9 @@ def pre_initialise_population(self): # Set up framework for considering a priority policy self.setup_priority_policy() + + # Set up framework for considering a global task-shifting policy + self.setup_global_task_shifting() def initialise_population(self, population): @@ -971,12 +992,151 @@ def setup_priority_policy(self): self.list_fasttrack.append(('hv_diagnosed', 'FT_if_Hivdiagnosed')) if 'Tb' in self.sim.modules: self.list_fasttrack.append(('tb_diagnosed', 'FT_if_tbdiagnosed')) + + def setup_global_task_shifting(self): + + # Select the global task shifting mode to be considered + self.global_task_shifting_mode = self.get_global_task_shifting_mode_initial() + + # Load relevant sheet from resource file + df = self.parameters['global_task_shifting'][self.global_task_shifting_mode] + + self.global_task_shifting = {} + + # Iterate through the rows of the DataFrame and populate the dictionary + for index, row in df.iterrows(): + officer = row['Officer'] + alternative_officers = row['Alternative_officer'].split(',') + time_factor_str = str(row['Time_factor']) + + # Split the string into a list of floats + time_factors = [float(factor) for factor in time_factor_str.split(',')] + + # Create the entry in the dictionary + self.global_task_shifting[officer] = (alternative_officers, time_factors) + + # If task-shifting is considered, expand the number of possible appt_footprints + # to include potential task-shifting + if self.global_task_shifting: + + _appt_times_expand = {_facility_level: defaultdict(list) for _facility_level in + self._facility_levels} + + for level in self._facility_levels: + + for appt_footprint in self._appt_times[level]: + # Get all officers required for this appointment + officers_required = [subunit.officer_type for subunit in self._appt_times[level][appt_footprint]] + # Among these, select those that could be eligible for task-shifting if need should arise + officers_eligible_for_ts = [item for item in officers_required if item in self.global_task_shifting] + + tags_dictionary = {} + for officer in officers_eligible_for_ts: + tags = [] + # Find all potential task-shifting options for this officer + for officer_ts in self.global_task_shifting[officer][0]: + tags.append(officer[:4] + "-" + officer_ts[:4]) + tags_dictionary[officer] = tags + + # Calculate all possible appt footprints that may result from task-shifting, accounting + # for the fact that more than one requested officer may be eligible for task-shifting. + new_footprints = [] + for r in range(1, len(officers_eligible_for_ts)+1): + # Each combination illustrates potentially unavailable officers + for combination in combinations(officers_eligible_for_ts, r): + product_options = product(*(map(str, tags_dictionary[key]) for key in combination)) + result_strings = [appt_footprint + '_withTS_' + '_and_'.join(option) for option in product_options] + new_footprints.extend(result_strings) + + # For all these new footprints now obtain an updated call. + + # Task-shifting is logged in appt_footprint name as "xxxx-yyyy", where xxxx + # are the first four letters of officer-type being task-shifted, and yyyy + # are the first four letters of officer-type taking over tasks + pattern = re.compile(r'(?<=[-_])(\w{4}-\w{4})') + + # Initially assume this original call. This will be updated + # depending on task-shifting considered. + original_call = self._appt_times[level][appt_footprint] + # Get the officers and times associated with the original footprint + appt_footprint_times = Counter() + for appt_info in original_call: + appt_footprint_times[ + f"{appt_info.officer_type}" + ] += appt_info.time_taken + + for new_footprint in new_footprints: + + # Store all instances of task-shifting taking place in this appt footprint + task_shifting_adopted = {} + + # Find all instances of task-shifting in string + matches = pattern.findall(new_footprint) + + for match in matches: + short_original_officer = match[:4] + short_new_officer = match[5:] + + # Get the full name of the original medical officer + original_officer = (next((subunit for subunit in original_call if subunit.officer_type.startswith(short_original_officer)), None)).officer_type + + # Iterate through officers eligible for task shiting and find the appropriate time factor + # linked to this task-shifting + officer_types, factors = self.global_task_shifting[original_officer] + + for i, officer_type in enumerate(officer_types): + if officer_type.startswith(short_new_officer): + new_officer = officer_type + factor = factors[i] + break # Stop iterating once a match is found + + # Add this task shifting and factor by which will scale to dictionary + task_shifting_adopted[original_officer] = (new_officer,factor) + + if task_shifting_adopted: + updated_call_inc_task_shift = {} + # Go over all officers required for this appointment + for k in appt_footprint_times.keys(): + + # If task-shifting was requested for this officer, change name + # of officer and rescale original task by relevant factor + if k in task_shifting_adopted.keys(): + + task_for_officer = appt_footprint_times[k]*task_shifting_adopted[k][1] + j = task_shifting_adopted[k][0] + + if j in updated_call_inc_task_shift.keys(): + # If officer is already included in updated_call_inc_task_shift + # (e.g. because it was already performing own tasks as well as + # taking over that of officer not available) add to original task + updated_call_inc_task_shift[j] += task_for_officer + else: + updated_call_inc_task_shift[j] = task_for_officer + + # Else simply add original requirement to new call + else: + if k in updated_call_inc_task_shift.keys(): + # Ensure that if this officer already present in dictionary + # this task is added, not overwritten + updated_call_inc_task_shift[k] += appt_footprint_times[k] + else: + updated_call_inc_task_shift[k] = appt_footprint_times[k] + + for k in updated_call_inc_task_shift.keys(): + _appt_times_expand[level][new_footprint].append( + AppointmentSubunit( + officer_type=k, + time_taken= updated_call_inc_task_shift[k] + ) + ) + + # Add new footprints to the original dictionary at this level + self._appt_times[level].update(_appt_times_expand[level]) + def process_human_resources_files(self, use_funded_or_actual_staffing: str): """Create the data-structures needed from the information read into the parameters.""" - - # * Define Facility Levels self._facility_levels = set(self.parameters['Master_Facilities_List']['Facility_Level']) - {'5'} assert self._facility_levels == {'0', '1a', '1b', '2', '3', '4'} # todo soft code this? @@ -1010,6 +1170,9 @@ def process_human_resources_files(self, use_funded_or_actual_staffing: str): ) == len(appt_time_data) ) self._appt_times = appt_times_per_level_and_type + + + # * Define Which Appointments Are Possible At Each Facility Level appt_type_per_level_data = self.parameters['Appt_Offered_By_Facility_Level'] @@ -1307,6 +1470,14 @@ def get_priority_policy_initial(self) -> str: return self.parameters['policy_name'] \ if self.arg_policy_name is None \ else self.arg_policy_name + + def get_global_task_shifting_mode_initial(self) -> str: + """Returns `priority_policy`. (Should be equal to what is specified by the parameter, but + overwrite with what was provided in argument if an argument was specified -- provided for backward + compatibility/debugging.)""" + return self.parameters['global_task_shifting_mode'] \ + if self.arg_global_task_shifting_mode is None \ + else self.arg_global_task_shifting_mode def load_priority_policy(self, policy): @@ -1655,6 +1826,7 @@ def get_appt_footprint_as_time_request(self, facility_info: FacilityInfo, appt_f :return: A Counter that gives the times required for each officer-type in each facility_ID, where this time is non-zero. """ + # Accumulate appointment times for specified footprint using times from appointment times table. appt_footprint_times = Counter() for appt_type in appt_footprint: @@ -2367,7 +2539,7 @@ def process_events_mode_0_and_1(self, hold_over: List[HSIEventQueueItem]) -> Non hold_over.extend(_to_be_held_over) def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: - + capabilities_monitor = Counter(self.module.capabilities_today.to_dict()) set_capabilities_still_available = {k for k, v in capabilities_monitor.items() if v > 0.0} @@ -2441,21 +2613,67 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: # Retrieve officers&facility required for HSI original_call = next_event_tuple.hsi_event.expected_time_requests _priority = next_event_tuple.priority + # In this version of mode_appt_constraints = 2, do not have access to squeeze # based on queue information, and we assume no squeeze ever takes place. squeeze_factor = 0. - # Check if any of the officers required have run out. + # Check if any of the officers required have run out. If including task-shifting, + # this will involve checking if alternative capabilities are available for officers + # that are no longer available. out_of_resources = False + + # This string will store all TS taking place + task_shifting_tag = "" + for officer, call in original_call.items(): - # If any of the officers are not available, then out of resources + # If any of the officers are not available, then out of resources, unless these can be + # task-shifted if officer not in set_capabilities_still_available: + + # Set this to True for now, however if: + # 1. Task-shifting is included, + # 2. Task-shifting is available for this officer/treatment, + # 3. Alternative officers are still available + # then will reset to False out_of_resources = True + + if len(self.sim.modules['HealthSystem'].global_task_shifting) > 0: + + # Get officer type only + officer_no_facility = officer.split("Officer_")[1] + + # Extract task-shifting options for this officer + task_shift_options_for_officer = self.sim.modules['HealthSystem'].global_task_shifting.get(officer_no_facility) + + # Check if possible alternatives to officer are available + # If they are, must register that we'll be using alternative. + if task_shift_options_for_officer: + + # Unpack values + new_officer_no_facility, time_scaling = task_shift_options_for_officer + + for new_officer_no_facility, time_scaling in zip(new_officer_no_facility, time_scaling): + # Get the task-shifting officer + shift_target_officer = officer.replace(officer_no_facility, new_officer_no_facility) + + # Check if this task-shifting officer is available, and save task_shift + if shift_target_officer in set_capabilities_still_available: + # Record task-shifting + if task_shifting_tag == "": + task_shifting_tag += "_withTS_" + officer_no_facility[:4] + "-" + new_officer_no_facility[:4] + "_and_" + else: + task_shifting_tag += officer_no_facility[:4] + "-" + new_officer_no_facility[:4] + "_and_" + out_of_resources = False + + # Once we've found available officer to replace, no need to go through other + # options + break + # If officers still available, run event. Note: in current logic, a little # overtime is allowed to run last event of the day. This seems more realistic # than medical staff leaving earlier than # planned if seeing another patient would take them into overtime. - if out_of_resources: # Do not run, @@ -2499,6 +2717,7 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: f"Cannot run HSI {event.TREATMENT_ID} without facility_info being defined." # Expected appt footprint before running event + # NOTE-TO-ADD: This appt footprint needs to reflect that a different officer was used _appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT # Run event & get actual footprint actual_appt_footprint = event.run(squeeze_factor=squeeze_factor) @@ -2509,19 +2728,36 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: # check its formatting: assert self.module.appt_footprint_is_valid(actual_appt_footprint) + # Add task-shifting tag when recalculating updated_call if not empty. + if task_shifting_tag != "": + if task_shifting_tag.endswith('_and_'): + task_shifting_tag = task_shifting_tag[:-5] + actual_appt_footprint = Counter({item + task_shifting_tag: count for item, count in actual_appt_footprint.items()}) + # Update call that will be used to compute capabilities used updated_call = self.module.get_appt_footprint_as_time_request( facility_info=event.facility_info, appt_footprint=actual_appt_footprint ) else: - actual_appt_footprint = _appt_footprint_before_running - updated_call = original_call + # If no task-shifting was required, go ahead as usual + if task_shifting_tag == "": + actual_appt_footprint = _appt_footprint_before_running + updated_call = original_call + # Else need to update footprint and recalculate call + else: + if task_shifting_tag.endswith('_and_'): + task_shifting_tag = task_shifting_tag[:-5] + actual_appt_footprint= Counter({item + task_shifting_tag: count for item, count in _appt_footprint_before_running.items()}) + updated_call = self.module.get_appt_footprint_as_time_request( + facility_info=event.facility_info, + appt_footprint=actual_appt_footprint + ) # Recalculate call on officers based on squeeze factor. for k in updated_call.keys(): updated_call[k] = updated_call[k]/(squeeze_factor + 1.) - + # Subtract this from capabilities used so-far today capabilities_monitor.subtract(updated_call) @@ -2542,7 +2778,7 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: self.module.running_total_footprint -= original_call self.module.running_total_footprint += updated_call - # Write to the log + # Write to the log. Appt footprint now includes info on task-shifting self.module.record_hsi_event( hsi_event=event, actual_appt_footprint=actual_appt_footprint, @@ -2550,6 +2786,7 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: did_run=True, priority=_priority ) + # Don't have any capabilities at all left for today, no # point in going through the queue to check what's left to do today. diff --git a/tests/test_healthsystem.py b/tests/test_healthsystem.py index e07a2d5889..2a423c8cbc 100644 --- a/tests/test_healthsystem.py +++ b/tests/test_healthsystem.py @@ -1,5 +1,6 @@ import heapq as hp import os +import re from pathlib import Path from typing import Set, Tuple @@ -1952,6 +1953,142 @@ def apply(self, person_id, squeeze_factor): assert (Nran_w_priority2 == int(tot_population/4)) & (Nran_w_priority3 == 0) +def test_task_shifting_in_mode_2(seed, tmpdir): + """Test that in mode 2 task-shifting takes place as expected even if capabilities for + required officer are no longer available, provided an alternative officer still is. + By "as expected" we mean that the first alternative officer listed is chosen preferentially. + """ + + # Create Dummy Module to host the HSI + class DummyModule(Module): + METADATA = {Metadata.DISEASE_MODULE, Metadata.USES_HEALTHSYSTEM} + + def read_parameters(self, data_folder): + pass + + def initialise_population(self, population): + pass + + def initialise_simulation(self, sim): + pass + + # Create a dummy HSI event class + class DummyHSIEvent(HSI_Event, IndividualScopeEventMixin): + def __init__(self, module, person_id, appt_type, level): + super().__init__(module, person_id=person_id) + self.TREATMENT_ID = 'DummyHSIEvent' + self.EXPECTED_APPT_FOOTPRINT = self.make_appt_footprint({appt_type: 1}) + self.ACCEPTED_FACILITY_LEVEL = level + + self.this_hsi_event_ran = False + + def apply(self, person_id, squeeze_factor): + self.this_hsi_event_ran = True + + log_config = { + "filename": "log", + "directory": tmpdir, + "custom_levels": {"tlo.methods.healthsystem": logging.DEBUG}, + } + + def simulate(task_shifting_option, Ntarget): + + sim = Simulation(start_date=start_date, seed=seed, log_config=log_config) + + # Register the core modules and simulate for 0 days + sim.register(demography.Demography(resourcefilepath=resourcefilepath), + healthsystem.HealthSystem(resourcefilepath=resourcefilepath, + capabilities_coefficient=1.0, + mode_appt_constraints=2, + global_task_shifting_mode=task_shifting_option, + ignore_priority=False, + randomise_queue=True, + policy_name="", + use_funded_or_actual_staffing='funded_plus'), + DummyModule() + ) + + tot_population = 100 + sim.make_initial_population(n=tot_population) + sim.simulate(end_date=sim.start_date) + + # Get pointer to the HealthSystemScheduler event + healthsystemscheduler = sim.modules['HealthSystem'].healthsystemscheduler + + # Force entire population in one district (keys_district[0]) and get facility ID + person_for_district = {d: i for i, d in enumerate(sim.population.props['district_of_residence'].cat.categories)} + keys_district = list(person_for_district.keys()) + + for i in range(0, int(tot_population)): + sim.population.props.at[i, 'district_of_residence'] = keys_district[0] + + + # Schedule an identical appointment for all individuals + for i in range(0, tot_population): + + hsi = DummyHSIEvent(module=sim.modules['DummyModule'], + person_id=i, + appt_type='MinorSurg', + level='1a') + + sim.modules['HealthSystem'].schedule_hsi_event( + hsi, + topen=sim.date, + tclose=sim.date + pd.DateOffset(days=1), + # Assign equal priority + priority=0 + ) + + hsi1 = DummyHSIEvent(module=sim.modules['DummyModule'], + person_id=0, # Ensures call is on officers in first district + appt_type='MinorSurg', + level='1a') + hsi1.initialise() + + facID = int((re.search(r'\d+', next(iter(hsi1.expected_time_requests)))).group()) + + pharmacy_task_time = hsi1.expected_time_requests['FacilityID_' + str(facID) + '_Officer_Pharmacy'] + nursing_task_time = hsi1.expected_time_requests['FacilityID_' + str(facID) + '_Officer_Nursing_and_Midwifery'] + clinical_task_time = hsi1.expected_time_requests['FacilityID_' + str(facID) + '_Officer_Clinical'] + + if task_shifting_option == 'default': + nursing_task_shift_factor = 0 + else: + assert 'Nursing_and_Midwifery' == sim.modules['HealthSystem'].global_task_shifting.get('Pharmacy')[0][0] + nursing_task_shift_factor = sim.modules['HealthSystem'].global_task_shifting.get('Pharmacy')[1][0] + + # Always set Pharmacy capabilities to zero. Set Clinical and Nursing capabilities such that Ntarget appointments + # can be delivered by relying on task-shifting from Nursing only. Setting Clinical time to Ntarget x clinical_task_time + # checks that when task-shifting first option (nurses) where always preferentially chosen over second. + # one (clinicians) + sim.modules['HealthSystem']._daily_capabilities['FacilityID_' + str(facID) + '_Officer_Pharmacy'] = 0.0 + sim.modules['HealthSystem']._daily_capabilities['FacilityID_' + str(facID) + '_Officer_Clinical'] = Ntarget*(clinical_task_time) + sim.modules['HealthSystem']._daily_capabilities['FacilityID_' + str(facID) + '_Officer_Nursing_and_Midwifery'] = Ntarget*(nursing_task_time + nursing_task_shift_factor*pharmacy_task_time) + + # Run healthsystemscheduler + healthsystemscheduler.apply(sim.population) + + # read the results + output = parse_log_file(sim.log_filepath, level=logging.DEBUG) + hs_output = output['tlo.methods.healthsystem']['HSI_Event'] + + return hs_output + + # Pharmacy capabilities set to zero. Clinical and Nursing capabilities initialised such that Ntarget + # appointments can be performed iff using task-shifting + Ntarget = 50 + + # Allow for task-shifting to take place (by adopting the naive mode), and check that all Ntarget events could run, even if Pharmacy + # capabilities were set to zero, + hs_output = simulate(task_shifting_option='naive', Ntarget=Ntarget) + assert hs_output['did_run'].sum() == Ntarget + + # Do not allow for task-shifting to take place (by adopting the default mode), and check that as a + # result no events could run, since Pharmacy capabilities were set to zero. + hs_output = simulate(task_shifting_option='default', Ntarget=Ntarget) + assert hs_output['did_run'].sum() == 0 + + @pytest.mark.slow def test_which_hsi_can_run(seed): """This test confirms whether, and how, HSI with each Appointment Type can run at each facility, under the