diff --git a/caimira/pyproject.toml b/caimira/pyproject.toml index b442e11f..93d1f1e4 100644 --- a/caimira/pyproject.toml +++ b/caimira/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "caimira" -version = "4.17.0a1" +version = "4.17.0a2" description = "CAiMIRA - CERN Airborne Model for Indoor Risk Assessment" license = { text = "Apache-2.0" } authors = [ diff --git a/caimira/src/caimira/calculator/models/models.py b/caimira/src/caimira/calculator/models/models.py index 79577b67..9af2a8cf 100644 --- a/caimira/src/caimira/calculator/models/models.py +++ b/caimira/src/caimira/calculator/models/models.py @@ -1867,20 +1867,22 @@ class ExposureModel: return 0 def expected_new_cases(self) -> _VectorisedFloat: - if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or - isinstance(self.exposed.number, IntPiecewiseConstant)): - raise NotImplementedError("Cannot compute expected new cases " - "with dynamic occupancy") - """ The expected_new_cases may provide one or two different outputs: 1) Long-range exposure: take the infection_probability and multiply by the occupants exposed to long-range. 2) Short- and long-range exposure: take the infection_probability of long-range multiplied by the occupants exposed to long-range only, plus the infection_probability of short- and long-range multiplied by the occupants exposed to short-range only. + + Currently disabled when dynamic occupancy is defined for the exposed population. """ + if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or + isinstance(self.exposed.number, IntPiecewiseConstant)): + raise NotImplementedError("Cannot compute expected new cases " + "with dynamic occupancy") + if self.short_range != (): - new_cases_long_range = nested_replace(self, {'short_range': (),}).infection_probability() * (self.exposed.number - self.exposed_to_short_range) + new_cases_long_range = nested_replace(self, {'short_range': [],}).infection_probability() * (self.exposed.number - self.exposed_to_short_range) return (new_cases_long_range + (self.infection_probability() * self.exposed_to_short_range)) / 100 return self.infection_probability() * self.exposed.number / 100 @@ -1890,7 +1892,9 @@ class ExposureModel: The reproduction number can be thought of as the expected number of cases directly generated by one infected case in a population. + Currently disabled when dynamic occupancy is defined for both the infected and exposed population. """ + if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or isinstance(self.exposed.number, IntPiecewiseConstant)): raise NotImplementedError("Cannot compute reproduction number " @@ -1907,3 +1911,4 @@ class ExposureModel: ) return single_exposure_model.expected_new_cases() + \ No newline at end of file diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py index bf4f5733..6c4d62e6 100644 --- a/caimira/src/caimira/calculator/report/virus_report_data.py +++ b/caimira/src/caimira/calculator/report/virus_report_data.py @@ -171,13 +171,17 @@ def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures) prob = np.array(model.infection_probability()) - prob_dist_count, prob_dist_bins = np.histogram( - prob/100, bins=100, density=True) - prob_probabilistic_exposure = np.array( - model.total_probability_rule()).mean() - expected_new_cases = np.array(model.expected_new_cases()).mean() - exposed_presence_intervals = [ - list(interval) for interval in model.exposed.presence_interval().boundaries()] + prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) + + # Probabilistic exposure and expected new cases (only for static occupancy) + prob_probabilistic_exposure = None + expected_new_cases = None + if form.occupancy_format == "static": + if form.exposure_option == "p_probabilistic_exposure": + prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() + expected_new_cases = np.array(model.expected_new_cases()).mean() + + exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] conditional_probability_data = None uncertainties_plot_src = None @@ -207,9 +211,9 @@ def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable "prob_hist_bins": list(prob_dist_bins), "prob_probabilistic_exposure": prob_probabilistic_exposure, "expected_new_cases": expected_new_cases, + "uncertainties_plot_src": uncertainties_plot_src, "CO2_concentrations": CO2_concentrations, "conditional_probability_data": conditional_probability_data, - "uncertainties_plot_src": uncertainties_plot_src, } @@ -399,8 +403,14 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, m scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model() else: - no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], - total_people=form.total_people - form.short_range_occupants) + # When dynamic occupancy is defined, the replace of total people is useless - the expected number of new cases is not calculated. + if form.occupancy_format == 'static': + no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], total_people=form.total_people - form.short_range_occupants) + elif form.occupancy_format == 'dynamic': + for occ in form.dynamic_exposed_occupancy: # Update the number of exposed people with long-range exposure + if occ['total_people'] > form.short_range_occupants: occ['total_people'] = max(0, occ['total_people'] - form.short_range_occupants) + no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], dynamic_exposed_occupancy=form.dynamic_exposed_occupancy) + scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model() return scenarios @@ -409,24 +419,20 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, m def scenario_statistics( mc_model: mc.ExposureModel, sample_times: typing.List[float], - compute_prob_exposure: bool + static_occupancy: bool, + compute_prob_exposure: bool, ): model = mc_model.build_model( size=mc_model.data_registry.monte_carlo['sample_size']) - if (compute_prob_exposure): - # It means we have data to calculate the total_probability_rule - prob_probabilistic_exposure = model.total_probability_rule() - else: - prob_probabilistic_exposure = 0. return { 'probability_of_infection': np.mean(model.infection_probability()), - 'expected_new_cases': np.mean(model.expected_new_cases()), + 'expected_new_cases': np.mean(model.expected_new_cases()) if static_occupancy else None, 'concentrations': [ np.mean(model.concentration(time)) for time in sample_times ], - 'prob_probabilistic_exposure': prob_probabilistic_exposure, + 'prob_probabilistic_exposure': model.total_probability_rule() if compute_prob_exposure else None, } @@ -447,16 +453,15 @@ def comparison_report( else: statistics = {} - if (form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure"): - compute_prob_exposure = True - else: - compute_prob_exposure = False - + static_occupancy = form.occupancy_format == "static" + compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and static_occupancy + with executor_factory() as executor: results = executor.map( scenario_statistics, scenarios.values(), [report_data['times']] * len(scenarios), + [static_occupancy] * len(scenarios), [compute_prob_exposure] * len(scenarios), timeout=60, ) diff --git a/caimira/src/caimira/calculator/validators/co2/co2_validator.py b/caimira/src/caimira/calculator/validators/co2/co2_validator.py index ba865a7c..36ed80fa 100644 --- a/caimira/src/caimira/calculator/validators/co2/co2_validator.py +++ b/caimira/src/caimira/calculator/validators/co2/co2_validator.py @@ -44,11 +44,14 @@ class CO2FormData(FormData): 'infected_lunch_option': True, 'infected_lunch_start': '12:30', 'infected_people': 1, + 'dynamic_infected_occupancy': '[]', 'infected_start': '08:30', 'room_capacity': None, 'room_volume': NO_DEFAULT, 'specific_breaks': '{}', 'total_people': NO_DEFAULT, + 'dynamic_exposed_occupancy': '[]', + 'occupancy_format': 'static', } def __init__(self, **kwargs): @@ -60,7 +63,7 @@ class CO2FormData(FormData): self.data_registry = DataRegistry() def validate(self): - # Validate population parameters + # Validate population parameters self.validate_population_parameters() # Validate room capacity @@ -194,20 +197,36 @@ class CO2FormData(FormData): size = size or self.data_registry.monte_carlo['sample_size'] # Build a simple infected and exposed population for the case when presence # intervals and number of people are dynamic. Activity type is not needed. - infected_presence = self.infected_present_interval() + if self.occupancy_format == 'dynamic': + if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0: + infected_people = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) + infected_presence = None + else: + raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".') + if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0: + exposed_people = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy) + exposed_presence = None + else: + raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".') + else: + infected_people = self.infected_people + exposed_people = self.total_people - self.infected_people + infected_presence = self.infected_present_interval() + exposed_presence = self.exposed_present_interval() + infected_population = models.SimplePopulation( - number=self.infected_people, + number=infected_people, presence=infected_presence, activity=None, # type: ignore ) - exposed_presence = self.exposed_present_interval() exposed_population=models.SimplePopulation( - number=self.total_people - self.infected_people, + number=exposed_people, presence=exposed_presence, activity=None, # type: ignore ) - - all_state_changes = self.population_present_changes(infected_presence, exposed_presence) + + all_state_changes=self.population_present_changes(infected_population.presence_interval(), + exposed_population.presence_interval()) total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop) for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])] diff --git a/caimira/src/caimira/calculator/validators/defaults.py b/caimira/src/caimira/calculator/validators/defaults.py index 30cb928f..bd14db38 100644 --- a/caimira/src/caimira/calculator/validators/defaults.py +++ b/caimira/src/caimira/calculator/validators/defaults.py @@ -37,6 +37,7 @@ DEFAULTS = { 'infected_lunch_option': True, 'infected_lunch_start': '12:30', 'infected_people': 1, + 'dynamic_infected_occupancy': NO_DEFAULT, 'infected_start': '08:30', 'inside_temp': NO_DEFAULT, 'location_latitude': NO_DEFAULT, @@ -49,12 +50,14 @@ DEFAULTS = { 'mask_type': 'Type I', 'mask_wearing_option': 'mask_off', 'mechanical_ventilation_type': 'not-applicable', + 'occupancy_format': 'static', 'opening_distance': 0., 'room_heating_option': False, 'room_number': NO_DEFAULT, 'room_volume': 0., 'simulation_name': NO_DEFAULT, 'total_people': NO_DEFAULT, + 'dynamic_exposed_occupancy': NO_DEFAULT, 'vaccine_option': False, 'vaccine_booster_option': False, 'vaccine_type': 'AZD1222_(AstraZeneca)', @@ -71,7 +74,7 @@ DEFAULTS = { 'window_opening_regime': 'windows_open_permanently', 'sensor_in_use': '', 'short_range_option': 'short_range_no', - 'short_range_interactions': '[]', + 'short_range_interactions': NO_DEFAULT, 'short_range_occupants': 0, } diff --git a/caimira/src/caimira/calculator/validators/form_validator.py b/caimira/src/caimira/calculator/validators/form_validator.py index 233cdbf6..074ebf20 100644 --- a/caimira/src/caimira/calculator/validators/form_validator.py +++ b/caimira/src/caimira/calculator/validators/form_validator.py @@ -4,6 +4,7 @@ import logging import typing import ast import json +import re import numpy as np @@ -18,7 +19,7 @@ minutes_since_midnight = typing.NewType('minutes_since_midnight', int) @dataclasses.dataclass class FormData: - specific_breaks: dict + # Static occupancy inputs exposed_coffee_break_option: str exposed_coffee_duration: int exposed_finish: minutes_since_midnight @@ -26,21 +27,24 @@ class FormData: exposed_lunch_option: bool exposed_lunch_start: minutes_since_midnight exposed_start: minutes_since_midnight - # Used if infected_dont_have_breaks_with_exposed - infected_coffee_break_option: str - infected_coffee_duration: int # Used if infected_dont_have_breaks_with_exposed + infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed + infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed infected_dont_have_breaks_with_exposed: bool infected_finish: minutes_since_midnight - # Used if infected_dont_have_breaks_with_exposed - infected_lunch_finish: minutes_since_midnight - infected_lunch_option: bool # Used if infected_dont_have_breaks_with_exposed - # Used if infected_dont_have_breaks_with_exposed - infected_lunch_start: minutes_since_midnight - infected_people: int + infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed + infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed + infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed infected_start: minutes_since_midnight + infected_people: int + occupancy_format: str room_volume: float + specific_breaks: dict total_people: int + # Dynamic occupancy inputs + dynamic_exposed_occupancy: list + dynamic_infected_occupancy: list + data_registry: DataRegistry _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS @@ -94,76 +98,110 @@ class FormData: return form_dict def validate_population_parameters(self): - # Validate number of infected <= number of total people - if self.infected_people >= self.total_people: - raise ValueError( - 'Number of infected people cannot be greater or equal to the number of total people.') - - # Validate time intervals selected by user - time_intervals = [ - ['exposed_start', 'exposed_finish'], - ['infected_start', 'infected_finish'], - ] - if self.exposed_lunch_option: - time_intervals.append( - ['exposed_lunch_start', 'exposed_lunch_finish']) - if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: - time_intervals.append( - ['infected_lunch_start', 'infected_lunch_finish']) - - for start_name, end_name in time_intervals: - start = getattr(self, start_name) - end = getattr(self, end_name) - if start > end: + # Static occupancy is defined. + if self.occupancy_format == 'static': + # Validate number of infected <= number of total people + if self.infected_people >= self.total_people: raise ValueError( - f"{start_name} must be less than {end_name}. Got {start} and {end}.") + 'Number of infected people cannot be greater or equal to the number of total people.') - def validate_lunch(start, finish): - lunch_start = getattr(self, f'{population}_lunch_start') - lunch_finish = getattr(self, f'{population}_lunch_finish') - return (start <= lunch_start <= finish and - start <= lunch_finish <= finish) + # Validate time intervals selected by user + time_intervals = [ + ['exposed_start', 'exposed_finish'], + ['infected_start', 'infected_finish'], + ] + if self.exposed_lunch_option: + time_intervals.append( + ['exposed_lunch_start', 'exposed_lunch_finish']) + if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: + time_intervals.append( + ['infected_lunch_start', 'infected_lunch_finish']) - def get_lunch_mins(population): - lunch_mins = 0 - if getattr(self, f'{population}_lunch_option'): - lunch_mins = getattr( - self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') - return lunch_mins - - def get_coffee_mins(population): - coffee_mins = 0 - if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': - coffee_mins = COFFEE_OPTIONS_INT[getattr( - self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') - return coffee_mins - - def get_activity_mins(population): - return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') - - populations = [ - 'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] - for population in populations: - # Validate lunch time within the activity times. - if (getattr(self, f'{population}_lunch_option') and - not validate_lunch(getattr(self, f'{population}_start'), getattr( - self, f'{population}_finish')) - ): - raise ValueError( - f"{population} lunch break must be within presence times." - ) - - # Length of breaks < length of activity - if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): - raise ValueError( - f"Length of breaks >= Length of {population} presence." - ) - - for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), - ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: - if getattr(self, attr_name) not in valid_set: + for start_name, end_name in time_intervals: + start = getattr(self, start_name) + end = getattr(self, end_name) + if start > end: raise ValueError( - f"{getattr(self, attr_name)} is not a valid value for {attr_name}") + f"{start_name} must be less than {end_name}. Got {start} and {end}.") + + def validate_lunch(start, finish): + lunch_start = getattr(self, f'{population}_lunch_start') + lunch_finish = getattr(self, f'{population}_lunch_finish') + return (start <= lunch_start <= finish and + start <= lunch_finish <= finish) + + def get_lunch_mins(population): + lunch_mins = 0 + if getattr(self, f'{population}_lunch_option'): + lunch_mins = getattr( + self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') + return lunch_mins + + def get_coffee_mins(population): + coffee_mins = 0 + if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': + coffee_mins = COFFEE_OPTIONS_INT[getattr( + self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') + return coffee_mins + + def get_activity_mins(population): + return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') + + populations = [ + 'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] + for population in populations: + # Validate lunch time within the activity times. + if (getattr(self, f'{population}_lunch_option') and + not validate_lunch(getattr(self, f'{population}_start'), getattr( + self, f'{population}_finish')) + ): + raise ValueError( + f"{population} lunch break must be within presence times." + ) + + # Length of breaks < length of activity + if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): + raise ValueError( + f"Length of breaks >= Length of {population} presence." + ) + + for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), + ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: + if getattr(self, attr_name) not in valid_set: + raise ValueError( + f"{getattr(self, attr_name)} is not a valid value for {attr_name}") + # Dynamic occupancy is defined. + elif self.occupancy_format == 'dynamic': + for dynamic_format in (self.dynamic_infected_occupancy, self.dynamic_exposed_occupancy): + for occupancy in dynamic_format: + # Check if each occupancy entry is a dictionary + if not isinstance(occupancy, typing.Dict): + raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}".') + + # Check for required keys in each occupancy entry + dict_keys = list(occupancy.keys()) + if "total_people" not in dict_keys: + raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys}".') + else: + value = occupancy["total_people"] + # Check if the value is a non-negative integer + if not isinstance(value, int): + raise ValueError(f'Total number of people should be integer. Got "{type(value)}".') + elif not value >= 0: + raise ValueError(f'Total number of people should be non-negative. Got "{value}".') + + if "start_time" not in dict_keys: + raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys}".') + if "finish_time" not in dict_keys: + raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys}".') + + # Validate time format for start_time and finish_time + for time_key in ["start_time", "finish_time"]: + time = occupancy[time_key] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): + raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') + else: + raise ValueError(f"'{self.occupancy_format}' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.") def validate(self): raise NotImplementedError("Subclass must implement") @@ -369,14 +407,6 @@ class FormData: breaks=breaks, ) - def population_present_interval(self) -> models.Interval: - state_change_times = set( - self.infected_present_interval().transition_times()) - state_change_times.update( - self.exposed_present_interval().transition_times()) - all_state_changes = sorted(state_change_times) - return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:]))) - def exposed_present_interval(self) -> models.Interval: if self.specific_breaks != {}: # It means the breaks are specific and not predefined breaks = self.generate_specific_break_times( @@ -387,6 +417,26 @@ class FormData: self.exposed_start, self.exposed_finish, breaks=breaks, ) + + def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]): + transition_times = [] + values = [] + for occupancy in dynamic_occupancy: + start_time = time_string_to_minutes(occupancy['start_time'])/60 + finish_time = time_string_to_minutes(occupancy['finish_time'])/60 + transition_times.extend([start_time, finish_time]) + values.append(occupancy['total_people']) + + unique_transition_times_sorted = np.array(sorted(set(transition_times))) + + if len(values) != len(unique_transition_times_sorted) - 1: + raise ValueError("Cannot compute dynamic occupancy with the provided inputs.") + + population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant( + transition_times=tuple(unique_transition_times_sorted), + values=tuple(values) + ) + return population_occupancy def _hours2timestring(hours: float): diff --git a/caimira/src/caimira/calculator/validators/virus/virus_validator.py b/caimira/src/caimira/calculator/validators/virus/virus_validator.py index 323c8106..ca491b7f 100644 --- a/caimira/src/caimira/calculator/validators/virus/virus_validator.py +++ b/caimira/src/caimira/calculator/validators/virus/virus_validator.py @@ -73,6 +73,7 @@ class VirusFormData(FormData): _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS def validate(self): + # Validate population parameters self.validate_population_parameters() validation_tuples = [('activity_type', self.data_registry.population_scenario_activity.keys()), @@ -202,11 +203,29 @@ class VirusFormData(FormData): f'The sum of all respiratory activities should be 100. Got {total_percentage}.') # Validate number of people with short-range interactions - max_occupants_for_sr = self.total_people - self.infected_people + if self.occupancy_format == "static": max_occupants_for_sr = self.total_people - self.infected_people + else: max_occupants_for_sr = np.max(np.array([entry["total_people"] for entry in self.dynamic_exposed_occupancy])) if self.short_range_occupants > max_occupants_for_sr: raise ValueError( f'The total number of occupants having short-range interactions ({self.short_range_occupants}) should be lower than the exposed population ({max_occupants_for_sr}).' ) + + # Validate short-range interactions interval + if self.short_range_option == "short_range_yes": + for interaction in self.short_range_interactions: + # Check if presence is within long-range exposure + presence = self.short_range_interval(interaction) + if (self.occupancy_format == 'dynamic'): + long_range_start = min(time_string_to_minutes(self.dynamic_infected_occupancy[0]['start_time']), + time_string_to_minutes(self.dynamic_exposed_occupancy[0]['start_time'])) + long_range_stop = max(time_string_to_minutes(self.dynamic_infected_occupancy[-1]['finish_time']), + time_string_to_minutes(self.dynamic_exposed_occupancy[-1]['finish_time'])) + else: + long_range_start = min(self.infected_start, self.exposed_start) + long_range_stop = max(self.infected_finish, self.exposed_finish) + if not (long_range_start/60 <= presence.present_times[0][0] <= long_range_stop/60 and + long_range_start/60 <= presence.present_times[0][-1] <= long_range_stop/60): + raise ValueError(f"Short-range interactions should be defined during simulation time. Got {interaction}") def initialize_room(self) -> models.Room: # Initializes room with volume either given directly or as product of area and height @@ -230,7 +249,7 @@ class VirusFormData(FormData): def build_mc_model(self) -> mc.ExposureModel: room = self.initialize_room() ventilation: models._VentilationBase = self.ventilation() - infected_population = self.infected_population() + infected_population: models.InfectedPopulation = self.infected_population() short_range = [] if self.short_range_option == "short_range_yes": for interaction in self.short_range_interactions: @@ -444,27 +463,39 @@ class VirusFormData(FormData): # Initializes the virus virus = virus_distributions(self.data_registry)[self.virus_type] - activity_defn = self.data_registry.population_scenario_activity[ - self.activity_type]['activity'] - expiration_defn = self.data_registry.population_scenario_activity[ - self.activity_type]['expiration'] + # Occupancy + if self.occupancy_format == 'dynamic': + if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0: + # If dynamic occupancy is defined, the generator will parse and validate the + # respective input to a format readable by the model - `IntPiecewiseConstant`. + infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) + infected_presence = None + else: + raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".') + else: + # The number of exposed occupants is the total number of occupants + # minus the number of infected occupants. + infected_occupancy = self.infected_people + infected_presence = self.infected_present_interval() + + # Activity and expiration + activity_defn = self.data_registry.population_scenario_activity[self.activity_type]['activity'] + expiration_defn = self.data_registry.population_scenario_activity[self.activity_type]['expiration'] if (self.activity_type == 'smallmeeting'): # Conversation of N people is approximately 1/N% of the time speaking. - expiration_defn = {'Speaking': 1, - 'Breathing': self.total_people - 1} + total_people: int = max(infected_occupancy.values) if self.occupancy_format == 'dynamic' else self.total_people + expiration_defn = {'Speaking': 1, 'Breathing': total_people - 1} elif (self.activity_type == 'precise'): activity_defn, expiration_defn = self.generate_precise_activity_expiration() activity = activity_distributions(self.data_registry)[activity_defn] expiration = build_expiration(self.data_registry, expiration_defn) - infected_occupants = self.infected_people - infected = mc.InfectedPopulation( data_registry=self.data_registry, - number=infected_occupants, + number=infected_occupancy, + presence=infected_presence, virus=virus, - presence=self.infected_present_interval(), mask=self.mask(), activity=activity, expiration=expiration, @@ -479,10 +510,19 @@ class VirusFormData(FormData): else str(self.data_registry.population_scenario_activity[self.activity_type]['activity'])) activity = activity_distributions(self.data_registry)[activity_defn] - infected_occupants = self.infected_people - # The number of exposed occupants is the total number of occupants - # minus the number of infected occupants. - exposed_occupants = self.total_people - infected_occupants + if self.occupancy_format == 'dynamic': + if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0: + # If dynamic occupancy is defined, the generator will parse and validate the + # respective input to a format readable by the model - IntPiecewiseConstant. + exposed_occupancy = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy) + exposed_presence = None + else: + raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".') + else: + # The number of exposed occupants is the total number of occupants + # minus the number of infected occupants. + exposed_occupancy = self.total_people - self.infected_people + exposed_presence = self.exposed_present_interval() if (self.vaccine_option): if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'): @@ -495,8 +535,8 @@ class VirusFormData(FormData): host_immunity = 0. exposed = mc.Population( - number=exposed_occupants, - presence=self.exposed_present_interval(), + number=exposed_occupancy, + presence=exposed_presence, activity=activity, mask=self.mask(), host_immunity=host_immunity, @@ -528,8 +568,12 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'activity_type': 'office', 'air_changes': '', 'air_supply': '', + 'ascertainment_bias': 'confidence_low', 'ceiling_height': '', 'conditional_probability_viral_loads': '0', + 'dynamic_exposed_occupancy': '[]', + 'dynamic_infected_occupancy': '[]', + 'event_month': 'January', 'exposed_coffee_break_option': 'coffee_break_4', 'exposed_coffee_duration': '10', 'exposed_finish': '18:00', @@ -538,6 +582,8 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'exposed_lunch_start': '12:30', 'exposed_start': '09:00', 'floor_area': '', + 'geographic_cases': 0, + 'geographic_population': 0, 'hepa_amount': '250', 'hepa_option': '0', 'humidity': '0.5', @@ -554,36 +600,33 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'location_latitude': 46.20833, 'location_longitude': 6.14275, 'location_name': 'Geneva', - 'geographic_population': 0, - 'geographic_cases': 0, - 'ascertainment_bias': 'confidence_low', 'mask_type': 'Type I', 'mask_wearing_option': 'mask_off', 'mechanical_ventilation_type': '', 'calculator_version': calculator_version, 'opening_distance': '0.2', - 'event_month': 'January', + 'occupancy_format': 'static', 'room_heating_option': '0', 'room_number': '123', 'room_volume': '75', + 'short_range_interactions': '[]', + 'short_range_option': 'short_range_no', 'simulation_name': 'Test', 'total_people': '10', - 'vaccine_option': '0', 'vaccine_booster_option': '0', - 'vaccine_type': 'Ad26.COV2.S_(Janssen)', 'vaccine_booster_type': 'AZD1222_(AstraZeneca)', + 'vaccine_option': '0', + 'vaccine_type': 'Ad26.COV2.S_(Janssen)', 'ventilation_type': 'natural_ventilation', 'virus_type': 'SARS_CoV_2', 'volume_type': 'room_volume_explicit', + 'window_height': '2', + 'window_opening_regime': 'windows_open_permanently', 'windows_duration': '10', 'windows_frequency': '60', - 'window_height': '2', + 'windows_number': '1', 'window_type': 'window_sliding', 'window_width': '2', - 'windows_number': '1', - 'window_opening_regime': 'windows_open_permanently', - 'short_range_option': 'short_range_no', - 'short_range_interactions': '[]', } diff --git a/caimira/tests/apps/calculator/test_model_generator.py b/caimira/tests/apps/calculator/test_model_generator.py index 435ecd39..5feb19ef 100644 --- a/caimira/tests/apps/calculator/test_model_generator.py +++ b/caimira/tests/apps/calculator/test_model_generator.py @@ -1,5 +1,5 @@ import dataclasses -import typing +import re import numpy as np import numpy.testing as npt @@ -588,3 +588,53 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m name, offset = form.tz_name_and_utc_offset() assert name == expected_tz_name assert offset == expected_offset + + +@pytest.mark.parametrize( + ["occupancy_format_input", "error"], + [ + ['dynamc', "'dynamc' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",], + ['stact', "'stact' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",], + ['random', "'random' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",] + ] +) +def test_dynamic_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = occupancy_format_input + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["dynamic_occupancy_input", "error"], + [ + [[["total_people", 10, "start_time", "10:00", "finish_time", "11:00"]], "Each occupancy entry should be in a dictionary format. Got \"\"."], + [[{"tal_people": 10, "start_time": "10:00", "finish_time": "11:00"}], "Unable to fetch \"total_people\" key. Got \"['tal_people', 'start_time', 'finish_time']\"."], + [[{"total_people": 10, "art_time": "10:00", "finish_time": "11:00"}], "Unable to fetch \"start_time\" key. Got \"['total_people', 'art_time', 'finish_time']\"."], + [[{"total_people": 10, "start_time": "10:00", "ish_time": "11:00"}], "Unable to fetch \"finish_time\" key. Got \"['total_people', 'start_time', 'ish_time']\"."], + [[{"total_people": 10, "start_time": "10", "finish_time": "11:00"}], "Wrong time format - \"HH:MM\". Got \"10\"."], + [[{"total_people": 10, "start_time": "10:00", "finish_time": "11"}], "Wrong time format - \"HH:MM\". Got \"11\"."], + ] +) +def test_dynamic_occupancy_structure(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input + baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input + with pytest.raises(TypeError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["dynamic_occupancy_input", "error"], + [ + [[{"total_people": "10", "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], + [[{"total_people": 9.8, "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], + [[{"total_people": [10], "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], + [[{"total_people": -1, "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be non-negative. Got \"-1\"."], + ] +) +def test_dynamic_occupancy_total_people(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input + baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() diff --git a/caimira/tests/apps/calculator/test_specific_model_generator.py b/caimira/tests/apps/calculator/test_specific_model_generator.py index 6945a2c1..513d908c 100644 --- a/caimira/tests/apps/calculator/test_specific_model_generator.py +++ b/caimira/tests/apps/calculator/test_specific_model_generator.py @@ -1,5 +1,3 @@ -from typing import Type -import numpy as np import pytest from caimira.calculator.validators.virus import virus_validator diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index d7de725b..a6377094 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -230,6 +230,7 @@ def test_dynamic_total_probability_rule( "(including incidence rate) with dynamic occupancy")): dynamic_population_exposure_model.total_probability_rule() + def test_dynamic_expected_new_cases( dynamic_infected_single_exposure_model: models.ExposureModel, dynamic_exposed_single_exposure_model: models.ExposureModel, @@ -245,11 +246,12 @@ def test_dynamic_expected_new_cases( "with dynamic occupancy")): dynamic_population_exposure_model.expected_new_cases() + def test_dynamic_reproduction_number( dynamic_infected_single_exposure_model: models.ExposureModel, dynamic_exposed_single_exposure_model: models.ExposureModel, dynamic_population_exposure_model: models.ExposureModel): - + with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " "with dynamic occupancy")): dynamic_infected_single_exposure_model.reproduction_number() diff --git a/cern_caimira/pyproject.toml b/cern_caimira/pyproject.toml index 98d101bd..8eedd26d 100644 --- a/cern_caimira/pyproject.toml +++ b/cern_caimira/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "cern-caimira" -version = "4.17.0a1" +version = "4.17.1a1" description = "CAiMIRA - CERN Airborne Model for Indoor Risk Assessment" license = { text = "Apache-2.0" } authors = [ diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js index 16df7060..9f491418 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js @@ -17,11 +17,14 @@ const CO2_data_form = [ "infected_lunch_option", "infected_lunch_start", "infected_people", + "dynamic_infected_occupancy", "infected_start", "room_capacity", "room_volume", "specific_breaks", "total_people", + "dynamic_exposed_occupancy", + "occupancy_format", ]; // Method to upload a valid data file (accepted formats: .xls and .xlsx) diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 index cabc5adf..ed1e24ba 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 @@ -447,6 +447,10 @@ ?
+ {# "static" vs. "dynamic" #} + + +
diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 index 0868683e..fadd5e79 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 @@ -104,7 +104,7 @@
{% endblock long_range_warning_animation %} -
Expected new cases: {{ long_range_expected_cases | float_format }}
+ {% if form.occupancy_format == "static" %}
Expected new cases: {{ long_range_expected_cases | float_format }}
{% endif %}
{% if form.short_range_option == "short_range_yes" %} @@ -126,38 +126,50 @@ {% endblock warning_animation %} -
Expected new cases: {{ expected_new_cases | float_format }}
+ {% if form.occupancy_format == "static" %} +
Expected new cases: {{ expected_new_cases | float_format }}
+ {% endif %} {% endif %}
{% block report_summary %}
{% if form.short_range_option == "short_range_yes" %}
{% endif %} {% block probabilistic_exposure_probability %} {% if form.exposure_option == "p_probabilistic_exposure" %}
{% endif %} @@ -298,7 +310,7 @@ Scenario P(I) - Expected new cases + {% if form.occupancy_format == "static" %}Expected new cases{% endif %} @@ -306,7 +318,7 @@ {{ scenario_name }} {{ scenario_stats.probability_of_infection | non_zero_percentage }} - {{ scenario_stats.expected_new_cases | float_format }} + {% if form.occupancy_format == "static" %}{{ scenario_stats.expected_new_cases | float_format }}{% endif %} {% endfor %} @@ -626,11 +638,11 @@
    {% for interaction in form.short_range_interactions %}
  • Interaction no. {{ loop.index }}: -
      -
    • Expiratory activity: {{ "Shouting/Singing" if interaction.expiration == "Shouting" else interaction.expiration }}
    • -
    • Start time: {{ interaction.start_time }}
    • -
    • Duration: {{ interaction.duration }} {{ "minutes" if interaction.duration|float > 1 else "minute" }}
    • -
    +
      +
    • Expiratory activity: {{ "Shouting/Singing" if interaction.expiration == "Shouting" else interaction.expiration }}
    • +
    • Start time: {{ interaction.start_time }}
    • +
    • Duration: {{ interaction.duration }} {{ "minutes" if interaction.duration|float > 1 else "minute" }}
    • +
  • {% endfor %}
diff --git a/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 index 701c2ab0..4248b65b 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 @@ -11,7 +11,7 @@ {% set long_range_prob_inf = prob_inf %} {% endif %} -{% if ((long_range_prob_inf > red_prob_lim) or (expected_new_cases >= 1)) %} +{% if ((long_range_prob_inf > red_prob_lim) or (form.occupancy_format == "static" and expected_new_cases >= 1)) %} {% set long_range_scale_warning = 'red' %} {% set long_range_warning_color= 'bg-danger' %} {% elif (orange_prob_lim <= long_range_prob_inf <= red_prob_lim) %} @@ -22,7 +22,7 @@ {% set long_range_warning_color = 'bg-success' %} {% endif %} -{% if ((prob_inf > red_prob_lim) or (expected_new_cases >= 1)) %} {% set scale_warning = 'red' %} +{% if ((prob_inf > red_prob_lim) or (form.occupancy_format == "static" and expected_new_cases >= 1)) %} {% set scale_warning = 'red' %} {% elif (orange_prob_lim <= prob_inf <= red_prob_lim) %} {% set scale_warning = 'orange' %} {% else %} {% set scale_warning = 'green' %} {% endif %} @@ -70,7 +70,10 @@ {% if form.short_range_option == "short_range_yes" %}
@@ -84,7 +87,10 @@ {% endif %} @@ -137,12 +143,12 @@ Scenario P(i) - Expected new cases + {% if form.occupancy_format == "static" %}Expected new cases{% endif %} {% for scenario_name, scenario_stats in alternative_scenarios.stats.items() %} - {%if (( scenario_stats.probability_of_infection > red_prob_lim) or (scenario_stats.expected_new_cases >= 1)) %} + {%if (( scenario_stats.probability_of_infection > red_prob_lim) or (form.occupancy_format == "static" and scenario_stats.expected_new_cases >= 1)) %} {% elif (orange_prob_lim <= scenario_stats.probability_of_infection <= red_prob_lim) %} @@ -151,7 +157,7 @@ {% endif%} {{ scenario_name }} {{ scenario_stats.probability_of_infection | non_zero_percentage }} - {{ scenario_stats.expected_new_cases | float_format }} + {% if form.occupancy_format == "static" %}{{ scenario_stats.expected_new_cases | float_format }}{% endif %} {% endfor %} diff --git a/cern_caimira/tests/test_report_generator.py b/cern_caimira/tests/test_report_generator.py index 6e257f61..f15833a2 100644 --- a/cern_caimira/tests/test_report_generator.py +++ b/cern_caimira/tests/test_report_generator.py @@ -2,6 +2,7 @@ import concurrent.futures from functools import partial import os import time +import json import numpy as np import pytest @@ -123,4 +124,62 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData): lr_expected_new_cases = alternative_statistics['stats']['Base scenario without short-range interactions']['expected_new_cases'] np.testing.assert_almost_equal(sr_lr_expected_new_cases, lr_expected_new_cases + sr_lr_prob_inf * baseline_form_with_sr.short_range_occupants, 2) + + +def test_static_vs_dynamic_occupancy_from_form(baseline_form_data, data_registry): + """ + Assert that the results between a static and dynamic occupancy model (from form inputs) are similar. + """ + executor_factory = partial( + concurrent.futures.ThreadPoolExecutor, 1, + ) + + # By default the baseline form accepts static occupancy + static_occupancy_baseline_form: VirusFormData = VirusFormData.from_dict(baseline_form_data, data_registry) + static_occupancy_model = static_occupancy_baseline_form.build_model() + static_occupancy_report_data = rep_gen.calculate_report_data(static_occupancy_baseline_form, executor_factory) + + # Update the initial form data to include dynamic occupancy (please note the 4 coffee and 1 lunch breaks) + baseline_form_data['occupancy_format'] = 'dynamic' + baseline_form_data['dynamic_infected_occupancy'] = json.dumps([ + {'total_people': 1, 'start_time': '09:00', 'finish_time': '10:03'}, + {'total_people': 0, 'start_time': '10:03', 'finish_time': '10:13'}, + {'total_people': 1, 'start_time': '10:13', 'finish_time': '11:16'}, + {'total_people': 0, 'start_time': '11:16', 'finish_time': '11:26'}, + {'total_people': 1, 'start_time': '11:26', 'finish_time': '12:30'}, + {'total_people': 0, 'start_time': '12:30', 'finish_time': '13:30'}, + {'total_people': 1, 'start_time': '13:30', 'finish_time': '14:53'}, + {'total_people': 0, 'start_time': '14:53', 'finish_time': '15:03'}, + {'total_people': 1, 'start_time': '15:03', 'finish_time': '16:26'}, + {'total_people': 0, 'start_time': '16:26', 'finish_time': '16:36'}, + {'total_people': 1, 'start_time': '16:36', 'finish_time': '18:00'}, + ]) + baseline_form_data['dynamic_exposed_occupancy'] = json.dumps([ + {'total_people': 9, 'start_time': '09:00', 'finish_time': '10:03'}, + {'total_people': 0, 'start_time': '10:03', 'finish_time': '10:13'}, + {'total_people': 9, 'start_time': '10:13', 'finish_time': '11:16'}, + {'total_people': 0, 'start_time': '11:16', 'finish_time': '11:26'}, + {'total_people': 9, 'start_time': '11:26', 'finish_time': '12:30'}, + {'total_people': 0, 'start_time': '12:30', 'finish_time': '13:30'}, + {'total_people': 9, 'start_time': '13:30', 'finish_time': '14:53'}, + {'total_people': 0, 'start_time': '14:53', 'finish_time': '15:03'}, + {'total_people': 9, 'start_time': '15:03', 'finish_time': '16:26'}, + {'total_people': 0, 'start_time': '16:26', 'finish_time': '16:36'}, + {'total_people': 9, 'start_time': '16:36', 'finish_time': '18:00'}, + ]) + baseline_form_data['total_people'] = 0 + baseline_form_data['infected_people'] = 0 + + dynamic_occupancy_baseline_form: VirusFormData = VirusFormData.from_dict(baseline_form_data, data_registry) + dynamic_occupancy_model = dynamic_occupancy_baseline_form.build_model() + dynamic_occupancy_report_data = rep_gen.calculate_report_data(dynamic_occupancy_baseline_form, executor_factory) + + assert (list(sorted(static_occupancy_model.concentration_model.infected.presence.transition_times())) == + list(dynamic_occupancy_model.concentration_model.infected.number.transition_times)) + assert (list(sorted(static_occupancy_model.exposed.presence.transition_times())) == + list(dynamic_occupancy_model.exposed.number.transition_times)) + + np.testing.assert_almost_equal(static_occupancy_report_data['prob_inf'], dynamic_occupancy_report_data['prob_inf'], 1) + assert dynamic_occupancy_report_data['expected_new_cases'] == None + assert dynamic_occupancy_report_data['prob_probabilistic_exposure'] == None \ No newline at end of file