From 527dd5ffa201f9a1f862ddf6850db17103f977f7 Mon Sep 17 00:00:00 2001 From: lrdossan Date: Wed, 10 Jul 2024 15:59:54 +0200 Subject: [PATCH] added methods to read and parse dynamic occupancy. Probabilistic exposure and expected new cases were removed from results --- .../calculator/report/virus_report_data.py | 20 +++-- .../validators/virus/virus_validator.py | 85 ++++++++++++++++--- 2 files changed, 87 insertions(+), 18 deletions(-) diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py index bf4f5733..b93bf7d4 100644 --- a/caimira/src/caimira/calculator/report/virus_report_data.py +++ b/caimira/src/caimira/calculator/report/virus_report_data.py @@ -171,13 +171,19 @@ def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures) prob = np.array(model.infection_probability()) - prob_dist_count, prob_dist_bins = np.histogram( - prob/100, bins=100, density=True) - prob_probabilistic_exposure = np.array( - model.total_probability_rule()).mean() - expected_new_cases = np.array(model.expected_new_cases()).mean() - exposed_presence_intervals = [ - list(interval) for interval in model.exposed.presence_interval().boundaries()] + prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) + + if form.exposure_option == "p_probabilistic_exposure": + prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() + else: prob_probabilistic_exposure = None + + if ((isinstance(form.dynamic_infected_occupancy, typing.List) and len(form.dynamic_infected_occupancy) > 0) or + (isinstance(form.dynamic_exposed_occupancy, typing.List) and len(form.dynamic_exposed_occupancy) > 0)): + expected_new_cases = None + else: + expected_new_cases = np.array(model.expected_new_cases()).mean() + + exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] conditional_probability_data = None uncertainties_plot_src = None diff --git a/caimira/src/caimira/calculator/validators/virus/virus_validator.py b/caimira/src/caimira/calculator/validators/virus/virus_validator.py index 9b68f460..513486de 100644 --- a/caimira/src/caimira/calculator/validators/virus/virus_validator.py +++ b/caimira/src/caimira/calculator/validators/virus/virus_validator.py @@ -439,6 +439,56 @@ class VirusFormData(FormData): ] = respiratory_activity['percentage'] return (self.precise_activity['physical_activity'], respiratory_dict) + + def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]): + ### Data format validation ### + for occupancy in dynamic_occupancy: + # Check if each occupancy entry is a dictionary + if not isinstance(occupancy, typing.Dict): + raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}."') + + # Check for required keys in each occupancy entry + dict_keys = list(occupancy.keys()) + if "total_people" not in dict_keys: + raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys[0]}".') + else: + value = occupancy["total_people"] + # Check if the total_people value is a non-negative integer + if not isinstance(value, int): + raise ValueError(f"Total number of people should be integer. Got {value}.") + elif not value >= 0: + raise ValueError(f"Total number of people should be non-negative. Got {value}.") + + if "start_time" not in dict_keys: + raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[1]}".') + if "finish_time" not in dict_keys: + raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[2]}".') + + # Validate time format for start_time and finish_time + for time_key in ["start_time", "finish_time"]: + time = occupancy[time_key] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): + raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') + + transition_times = [] + values = [] + for occupancy in dynamic_occupancy: + start_time = time_string_to_minutes(occupancy['start_time'])/60 + finish_time = time_string_to_minutes(occupancy['finish_time'])/60 + transition_times.extend([start_time, finish_time]) + values.append(occupancy['total_people']) + + unique_transition_times_sorted = np.array(sorted(set(transition_times))) + + if len(values) != len(unique_transition_times_sorted) - 1: + raise ValueError("Cannot compute dynamic occupancy with the inputs provided.") + + population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant( + transition_times=tuple(unique_transition_times_sorted), + values=tuple(values) + ) + population_presence: typing.Union[None, models.Interval] = None + return population_occupancy, population_presence def infected_population(self) -> mc.InfectedPopulation: # Initializes the virus @@ -453,18 +503,26 @@ class VirusFormData(FormData): expiration_defn = {'Speaking': 1, 'Breathing': self.total_people - 1} elif (self.activity_type == 'precise'): - activity_defn, expiration_defn = self.generate_precise_activity_expiration() + activity_defn, expiration_defn = self.generate_precise_activity_expiration() # TODO: what to do here? activity = activity_distributions(self.data_registry)[activity_defn] expiration = build_expiration(self.data_registry, expiration_defn) - infected_occupants = self.infected_people + if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0: + # If dynamic occupancy is defined, the generator will parse and validate the + # respective input to a format readable by the model - IntPiecewiseConstant. + infected_occupancy, infected_presence = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) + else: + # The number of exposed occupants is the total number of occupants + # minus the number of infected occupants. + infected_occupancy = self.infected_people + infected_presence = self.infected_present_interval() infected = mc.InfectedPopulation( data_registry=self.data_registry, - number=infected_occupants, + number=infected_occupancy, + presence=infected_presence, virus=virus, - presence=self.infected_present_interval(), mask=self.mask(), activity=activity, expiration=expiration, @@ -478,11 +536,16 @@ class VirusFormData(FormData): if self.activity_type == 'precise' else str(self.data_registry.population_scenario_activity[self.activity_type]['activity'])) activity = activity_distributions(self.data_registry)[activity_defn] - - infected_occupants = self.infected_people - # The number of exposed occupants is the total number of occupants - # minus the number of infected occupants. - exposed_occupants = self.total_people - infected_occupants + + if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0: + # If dynamic occupancy is defined, the generator will parse and validate the + # respective input to a format readable by the model - IntPiecewiseConstant. + exposed_occupancy, exposed_presence = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy) + else: + # The number of exposed occupants is the total number of occupants + # minus the number of infected occupants. + exposed_occupancy = self.total_people - self.infected_people + exposed_presence = self.exposed_present_interval() if (self.vaccine_option): if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'): @@ -495,8 +558,8 @@ class VirusFormData(FormData): host_immunity = 0. exposed = mc.Population( - number=exposed_occupants, - presence=self.exposed_present_interval(), + number=exposed_occupancy, + presence=exposed_presence, activity=activity, mask=self.mask(), host_immunity=host_immunity,