From cd3df90c4ca982622ee85f568588d0be64930f20 Mon Sep 17 00:00:00 2001 From: lrdossan Date: Thu, 3 Oct 2024 16:44:28 +0200 Subject: [PATCH 01/21] Added dynamic groups in backend and propagated required changes to requests - Correction of typos - Changes do model to accommodate group of exposure models - Handled virus validator to accommodate group of exposure models - Modifications to accomodate generation of results for exposure model group - Added data registry attribute to exposuremodelgroups class - Fixed test for dynamic models - Defined ExposureModelGroup with required methods - Adapted virus report data to generate results for groups of exposed population - Build same distributions for different models - Fixed bug with defaults - Handled definition of a single ExposureModel root obj when only one group is defined - Added short-range expirations per group - Added full validation on short_range interactions with dynamic exposure model - Added full set of tests - Updated docstrings - Added support for number of people in exposed population, which should be identical within each group - Added type checks - Added UI adjustments for expected new cases - Adapted exposed population data format to accommodate constant total_people --- .../src/caimira/calculator/models/models.py | 113 ++++-- .../calculator/report/virus_report_data.py | 279 +++++++++---- .../validators/co2/co2_validator.py | 50 +-- .../caimira/calculator/validators/defaults.py | 7 +- .../calculator/validators/form_validator.py | 374 +++++++++++++++--- .../validators/virus/virus_validator.py | 230 ++++++----- .../apps/calculator/test_model_generator.py | 269 ++++++++++++- .../tests/models/test_dynamic_population.py | 50 +-- .../apps/calculator/static/js/co2_form.js | 8 +- .../apps/calculator/static/js/form.js | 99 +++-- .../apps/calculator/static/js/report.js | 4 +- .../templates/base/calculator.form.html.j2 | 4 +- .../templates/base/calculator.report.html.j2 | 35 +- .../templates/cern/calculator.report.html.j2 | 10 +- cern_caimira/tests/conftest.py | 2 +- cern_caimira/tests/test_report_generator.py | 59 --- 16 files changed, 1118 insertions(+), 475 deletions(-) diff --git a/caimira/src/caimira/calculator/models/models.py b/caimira/src/caimira/calculator/models/models.py index 8a250907..9d8c073a 100644 --- a/caimira/src/caimira/calculator/models/models.py +++ b/caimira/src/caimira/calculator/models/models.py @@ -799,7 +799,7 @@ Activity.types = { @dataclass(frozen=True) class SimplePopulation: """ - Represents a group of people all with exactly the same behaviour and + Represents a group of people all with exactly the same behavior and situation. """ @@ -844,7 +844,7 @@ class SimplePopulation: @dataclass(frozen=True) class Population(SimplePopulation): """ - Represents a group of people all with exactly the same behaviour and + Represents a group of people all with exactly the same behavior and situation, considering the usage of mask and a certain host immunity. """ @@ -1324,6 +1324,9 @@ class ShortRangeModel: #: Interpersonal distances distance: _VectorisedFloat + #: Expiration definition + expiration_def: typing.Optional[str] = None + def dilution_factor(self) -> _VectorisedFloat: ''' The dilution factor for the respective expiratory activity type. @@ -1653,6 +1656,9 @@ class ExposureModel: In other words, the air exchange rate from the ventilation, and the virus decay constant, must not be given as arrays. + + It also checks that the number of exposed is + static during the simulation time. """ c_model = self.concentration_model # Check if the diameter is vectorised. @@ -1663,6 +1669,11 @@ class ExposureModel: c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))): raise ValueError("If the diameter is an array, none of the ventilation parameters " "or virus decay constant can be arrays at the same time.") + + # Check if exposed population is static + if not isinstance(self.exposed.number, int) or not isinstance(self.exposed.presence, Interval): + raise TypeError("The exposed number must be an int and presence an Interval. " + f"Got {type(self.exposed.number)} and {type(self.exposed.presence)}.") @method_cache def population_state_change_times(self) -> typing.List[float]: @@ -1809,11 +1820,9 @@ class ExposureModel: The number of virus per m^3 deposited on the respiratory tract. """ population_change_times = self.population_state_change_times() - deposited_exposure = [] for start, stop in zip(population_change_times[:-1], population_change_times[1:]): deposited_exposure.append(self.deposited_exposure_between_bounds(start, stop)) - return deposited_exposure def deposited_exposure(self) -> _VectorisedFloat: @@ -1838,8 +1847,7 @@ class ExposureModel: return (1 - np.prod([1 - prob for prob in self._infection_probability_list()], axis = 0)) * 100 def total_probability_rule(self) -> _VectorisedFloat: - if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or - isinstance(self.exposed.number, IntPiecewiseConstant)): + if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant)): raise NotImplementedError("Cannot compute total probability " "(including incidence rate) with dynamic occupancy") @@ -1847,9 +1855,9 @@ class ExposureModel: sum_probability = 0.0 # Create an equivalent exposure model but changing the number of infected cases. - total_people = self.concentration_model.infected.number + self.exposed.number + total_people = self.concentration_model.infected.number + self.exposed.number # type: ignore max_num_infected = (total_people if total_people < 10 else 10) - # The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability. + # The influence of a higher number of simultaneous infected people (> 4 - 5) yields an almost negligible contribution to the total probability. # To be on the safe side, a hard coded limit with a safety margin of 2x was set. # Therefore we decided a hard limit of 10 infected people. for num_infected in range(1, max_num_infected + 1): @@ -1872,43 +1880,88 @@ class ExposureModel: 1) Long-range exposure: take the infection_probability and multiply by the occupants exposed to long-range. 2) Short- and long-range exposure: take the infection_probability of long-range multiplied by the occupants exposed to long-range only, plus the infection_probability of short- and long-range multiplied by the occupants exposed to short-range only. - - Currently disabled when dynamic occupancy is defined for the exposed population. """ - - if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or - isinstance(self.exposed.number, IntPiecewiseConstant)): - raise NotImplementedError("Cannot compute expected new cases " - "with dynamic occupancy") - + number = self.exposed.number if self.short_range != (): - new_cases_long_range = nested_replace(self, {'short_range': [],}).infection_probability() * (self.exposed.number - self.exposed_to_short_range) + new_cases_long_range = nested_replace(self, {'short_range': [],}).infection_probability() * (number - self.exposed_to_short_range) # type: ignore return (new_cases_long_range + (self.infection_probability() * self.exposed_to_short_range)) / 100 - return self.infection_probability() * self.exposed.number / 100 + return self.infection_probability() * number / 100 def reproduction_number(self) -> _VectorisedFloat: """ The reproduction number can be thought of as the expected number of cases directly generated by one infected case in a population. - - Currently disabled when dynamic occupancy is defined for both the infected and exposed population. """ - - if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or - isinstance(self.exposed.number, IntPiecewiseConstant)): - raise NotImplementedError("Cannot compute reproduction number " - "with dynamic occupancy") - - if self.concentration_model.infected.number == 1: + infected_population: InfectedPopulation = self.concentration_model.infected + if isinstance(infected_population.number, int) and infected_population.number == 1: return self.expected_new_cases() # Create an equivalent exposure model but with precisely - # one infected case. + # one infected case, respecting the presence interval. single_exposure_model = nested_replace( self, { - 'concentration_model.infected.number': 1} + 'concentration_model.infected.number': 1, + 'concentration_model.infected.presence': infected_population.presence_interval(), + } ) - return single_exposure_model.expected_new_cases() + + +@dataclass(frozen=True) +class ExposureModelGroup: + """ + Represents a group of exposure models. This is to handle the case + when different groups of people come and go in the room at different + times. These groups are then handled fully independently, with + exposure dose and probability of infection defined for each of them. + """ + data_registry: DataRegistry + + #: The set of exposure models for each exposed population + exposure_models: typing.Tuple[ExposureModel, ...] + + @method_cache + def _deposited_exposure_list(self) -> typing.List[_VectorisedFloat]: + """ + List of doses absorbed by each member of the groups. + """ + return [model.deposited_exposure() for model in self.exposure_models] + + @method_cache + def _infection_probability_list(self): + """ + List of the probability of infection for each group. + """ + return [model.infection_probability() for model in self.exposure_models] # type: ignore + + def expected_new_cases(self) -> _VectorisedFloat: + """ + Final expected number of new cases considering the + contribution of each individual probability of infection. + """ + return np.sum([model.expected_new_cases() for model in self.exposure_models], axis=0) # type: ignore + + def reproduction_number(self) -> _VectorisedFloat: + """ + Reproduction number considering the contribution + of each individual probability of infection and + a single infected occupant. + """ + single_exposure_models = [] + for model in self.exposure_models: + if model.concentration_model.infected.number != 1: + model = nested_replace( + self, { + 'model.concentration_model.infected.number': 1 + } + ) + single_exposure_models.append(model) + + single_exposure_model_group = nested_replace( + self, { + 'exposure_models': single_exposure_models, + } + ) + return single_exposure_model_group.expected_new_cases() \ No newline at end of file diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py index 2071fb2a..1cf96595 100644 --- a/caimira/src/caimira/calculator/report/virus_report_data.py +++ b/caimira/src/caimira/calculator/report/virus_report_data.py @@ -12,13 +12,37 @@ from caimira.calculator.validators.virus.virus_validator import VirusFormData def model_start_end(model: models.ExposureModel): + """ + Calculates the start and end times for a single ExposureModel. + + Determines the boundary times of an ExposureModel by comparing + the presence intervals of both the exposed and the infected people. + """ t_start = min(model.exposed.presence_interval().boundaries()[0][0], model.concentration_model.infected.presence_interval().boundaries()[0][0]) t_end = max(model.exposed.presence_interval().boundaries()[-1][1], model.concentration_model.infected.presence_interval().boundaries()[-1][1]) + return t_start, t_end +def model_boundary_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel]): + """ + Calculates the boundary times for an ExposureModel or ExposureModelGroup. + + For a single ExposureModel, determines its boundary times by comparing + the presence intervals of both the exposed and the infected people. + For an ExposureModelGroup, finds the earliest start time and the latest end time + across all models in the group. + """ + if isinstance(model, models.ExposureModelGroup): + t_start = min((model_start_end(nth_model)[0] for nth_model in model.exposure_models)) + t_end = max((model_start_end(nth_model)[1] for nth_model in model.exposure_models)) + return t_start, t_end + else: + return model_start_end(model) + + def fill_big_gaps(array, gap_size): """ Insert values into the given sorted list if there is a gap of more than ``gap_size``. @@ -42,7 +66,7 @@ def fill_big_gaps(array, gap_size): return result -def non_temp_transition_times(model: models.ExposureModel): +def non_temp_transition_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel]): """ Return the non-temperature (and PiecewiseConstant) based transition times. @@ -60,10 +84,10 @@ def non_temp_transition_times(model: models.ExposureModel): else: yield name, obj - t_start, t_end = model_start_end(model) + t_start, t_end = model_boundary_times(model) change_times = {t_start, t_end} - for name, obj in walk_model(model, name="exposure"): + for _, obj in walk_model(model, name="exposure"): if isinstance(obj, models.Interval): change_times |= obj.transition_times() @@ -72,7 +96,8 @@ def non_temp_transition_times(model: models.ExposureModel): return sorted(time for time in change_times if (t_start <= time <= t_end)) -def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]: +def interesting_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], + approx_n_pts: typing.Optional[int] = None) -> typing.List[float]: """ Pick approximately ``approx_n_pts`` time points which are interesting for the given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times @@ -94,113 +119,195 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional return nice_times -def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]: +def process_short_range_interactions(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], + times: typing.List[float]): + """ + Process both ExposureModel and ExposureModelGroup for short-range + expirations, intervals and concentrations. Returns a tuple containing + lower concentrations, short-range expirations, and short-range intervals. + """ + if isinstance(model, models.ExposureModelGroup): + model_list = model.exposure_models + elif isinstance(model, models.ExposureModel): + model_list = (model,) + else: + raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'") + + # Collect short-range expirations and intervals + short_range_expirations: typing.List[str] = [] + short_range_intervals: typing.List[models.BoundarySequence_t] = [] + for model in model_list: + for short_range_model in model.short_range: + short_range_expirations.append(short_range_model.expiration_def) # type: ignore + short_range_intervals.extend(short_range_model.presence.boundaries()) + + # Collect lower concentrations (including Breathing) lower_concentrations = [] for time in times: - for index, (start, stop) in enumerate(short_range_intervals): - # For visualization issues, add short-range breathing activity to the initial long-range concentrations - if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing': - lower_concentrations.append( - np.array(model.concentration(float(time))).mean()) + breathing_found = False + for model in model_list: + for short_range_model in model.short_range: + ((start, stop),) = short_range_model.presence.boundaries() + + # Check if the expiration is "Breathing" and the if time is within boundaries + if short_range_model.expiration_def == 'Breathing' and (start <= time <= stop): + lower_concentrations.append(np.sum([np.array(model.concentration(float(time))).mean() for model in model_list])) + breathing_found = True + break + + if breathing_found: break - lower_concentrations.append( - np.array(model.concentration_model.concentration(float(time))).mean()) - return lower_concentrations + + lower_concentrations.append(np.sum([np.array(model.concentration_model.concentration(float(time))).mean() for model in model_list])) + + return lower_concentrations, short_range_expirations, short_range_intervals -def _calculate_deposited_exposure(model, time1, time2, fn_name=None): - return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name +def _calculate_deposited_exposure(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], + time1: float, time2: float, fn_name: typing.Optional[str] = None): + if isinstance(model, models.ExposureModelGroup): + return np.sum([np.array(nth_model.deposited_exposure_between_bounds(float(time1), float(time2))).mean() for nth_model in model.exposure_models]), fn_name + else: + return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name -def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None): - return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name +def _calculate_long_range_deposited_exposure(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], + time1: float, time2: float, fn_name: typing.Optional[str] = None): + if isinstance(model, models.ExposureModelGroup): + return np.sum([np.array(nth_model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean() for nth_model in model.exposure_models]), fn_name + else: + return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name -def _calculate_co2_concentration(CO2_model, time, fn_name=None): +def _calculate_concentration(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], + time: float, fn_name: typing.Optional[str] = None): + if isinstance(model, models.ExposureModelGroup): + return np.sum([np.array(nth_model.concentration(float(time))).mean() for nth_model in model.exposure_models]), fn_name + else: + return np.array(model.concentration(float(time))).mean(), fn_name + + +def _calculate_co2_concentration(CO2_model: models.CO2ConcentrationModel, time: float, fn_name: typing.Optional[str] = None): return np.array(CO2_model.concentration(float(time))).mean(), fn_name @profiler.profile def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: - model: models.ExposureModel = form.build_model() - + """ + General output data of a test scenario. + """ + model: typing.Union[models.ExposureModel, models.ExposureModelGroup] = form.build_model() times = interesting_times(model) - short_range_intervals = [interaction.presence.boundaries()[0] - for interaction in model.short_range] - short_range_expirations = [interaction['expiration'] - for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else [] + + if isinstance(model, models.ExposureModelGroup): + exposed_presence_intervals = [] + probabilities_of_infection = [] + for nth_model in model.exposure_models: + exposed_presence_intervals.extend(list(nth_model.exposed.presence_interval().boundaries())) + probabilities_of_infection.append(nth_model.infection_probability()) + index_of_max_mean = max( + range(len(probabilities_of_infection)), + key=lambda i: probabilities_of_infection[i].mean() + ) + probability_of_infection = probabilities_of_infection[index_of_max_mean] + elif isinstance(model, models.ExposureModel): + exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] + probability_of_infection = model.infection_probability() + else: + raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'") + + # Handle short-range related outputs + lower_concentrations, short_range_expirations, short_range_intervals = None, None, None + # Short-range related data: + if (form.short_range_option == "short_range_yes"): + lower_concentrations, short_range_expirations, short_range_intervals = process_short_range_interactions(model, times) + + # Probability of infection + prob = probability_of_infection + prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) - concentrations = [ - np.array(model.concentration(float(time))).mean() - for time in times - ] - lower_concentrations = concentrations_with_sr_breathing( - form, model, times, short_range_intervals) + # Expected new cases + expected_new_cases = np.array(model.expected_new_cases()).mean() + # Expected number of new cases per group + # expected_new_cases_per_group = [np.array(model.expected_new_cases()).mean() for model in models_set.exposure_models] + + # CO2 concentration CO2_model: models.CO2ConcentrationModel = form.build_CO2_model() - # compute deposited exposures and CO2 concentrations in parallel to increase performance + # Compute deposited exposures and virus/CO2 concentrations in parallel to increase performance deposited_exposures = [] long_range_deposited_exposures = [] CO2_concentrations = [] + concentrations = [] tasks = [] - with executor_factory() as executor: + with executor_factory() as executor: # TODO: parallelism in the models for time1, time2 in zip(times[:-1], times[1:]): tasks.append(executor.submit( _calculate_deposited_exposure, model, time1, time2, fn_name="de")) tasks.append(executor.submit( - _calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr")) - # co2 concentration: takes each time as param, not the interval + _calculate_long_range_deposited_exposure, model, time1, time2, fn_name="de_lr")) + tasks.append(executor.submit( + _calculate_concentration, model, time1, fn_name="cn")) + # virus and co2 concentration: takes each time as param, not the interval tasks.append(executor.submit( _calculate_co2_concentration, CO2_model, time1, fn_name="co2")) - # co2 concentration: calculate the last time too + # virus and co2 concentration: calculate the last time too + tasks.append(executor.submit( _calculate_concentration, + model, times[-1], fn_name="cn")) tasks.append(executor.submit(_calculate_co2_concentration, - CO2_model, times[-1], fn_name="co2")) - + CO2_model, times[-1], fn_name="co2")) + for task in tasks: result, fn_name = task.result() if fn_name == "de": deposited_exposures.append(result) - elif fn_name == "lr": + elif fn_name == "de_lr": long_range_deposited_exposures.append(result) + elif fn_name == "cn": + concentrations.append(result) elif fn_name == "co2": CO2_concentrations.append(result) cumulative_doses = np.cumsum(deposited_exposures) long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures) - prob = np.array(model.infection_probability()) - prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) - - # Probabilistic exposure and expected new cases (only for static occupancy) prob_probabilistic_exposure = None - expected_new_cases = None - if form.occupancy_format == "static": - if form.exposure_option == "p_probabilistic_exposure": - prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() - expected_new_cases = np.array(model.expected_new_cases()).mean() - - exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] + if isinstance(model, models.ExposureModel) and form.exposure_option == "p_probabilistic_exposure": + prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() conditional_probability_data = None uncertainties_plot_src = None - if (form.conditional_probability_viral_loads and - model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore - # Generate all the required data for the conditional probability plot - conditional_probability_data = manufacture_conditional_probability_data( - model, prob) - # Generate the matplotlib image based on the received data - uncertainties_plot_src = img2base64(_figure2bytes( - uncertainties_plot(prob, conditional_probability_data))) + if form.conditional_probability_viral_loads: + if isinstance(model, models.ExposureModelGroup): + all_the_same_virus = True + for nth_model in model.exposure_models: + if nth_model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] != ViralLoads.COVID_OVERALL.value: # type: ignore + all_the_same_virus = False + if all_the_same_virus: + # Given the similarities, pick the first exposure model + the_model: models.ExposureModel = model.exposure_models[0] + # Generate all the required data for the conditional probability plot + conditional_probability_data = manufacture_conditional_probability_data(the_model, prob) + # Generate the matplotlib image based on the received data + uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data))) + elif isinstance(model, models.ExposureModel): + if model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] != ViralLoads.COVID_OVERALL.value: # type: ignore + # Generate all the required data for the conditional probability plot + conditional_probability_data = manufacture_conditional_probability_data(model, prob) + # Generate the matplotlib image based on the received data + uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data))) + else: + raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'") return { - "model": model, + "model": model.exposure_models[0] if isinstance(model, models.ExposureModelGroup) else model, # TODO: which model do we want to show info about? "times": list(times), "exposed_presence_intervals": exposed_presence_intervals, "short_range_intervals": short_range_intervals, "short_range_expirations": short_range_expirations, - "concentrations": concentrations, + "concentrations": list(concentrations), "concentrations_zoomed": lower_concentrations, "cumulative_doses": list(cumulative_doses), "long_range_cumulative_doses": list(long_range_cumulative_doses), @@ -349,7 +456,7 @@ def calculate_vl_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[s } -def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]: +def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, typing.Union[mc.ExposureModel, mc.ExposureModelGroup]]: scenarios = {} if (form.short_range_option == "short_range_no"): # Two special option cases - HEPA and/or FFP2 masks. @@ -401,45 +508,60 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, m scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model() if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'): scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model() - else: - # When dynamic occupancy is defined, the replace of total people is useless - the expected number of new cases is not calculated. + # Adjust the number of exposed people with long-range exposure based on short-range interactions if form.occupancy_format == 'static': - no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], total_people=form.total_people - form.short_range_occupants) + no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, total_people=form.total_people - form.short_range_occupants) elif form.occupancy_format == 'dynamic': - for occ in form.dynamic_exposed_occupancy: # Update the number of exposed people with long-range exposure - if occ['total_people'] > form.short_range_occupants: occ['total_people'] = max(0, occ['total_people'] - form.short_range_occupants) - no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], dynamic_exposed_occupancy=form.dynamic_exposed_occupancy) - + for group_name, occupancy_list in form.dynamic_exposed_occupancy.items(): + # Check if the group exists in short-range interactions + if group_name in form.short_range_interactions: + short_range_count = form.short_range_occupants + total_people = occupancy_list['total_people'] + if total_people > short_range_count > 0: + # Update the total_people with the adjusted value + occupancy_list['total_people'] = max(0, total_people - short_range_count) + no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, dynamic_exposed_occupancy=form.dynamic_exposed_occupancy) scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model() return scenarios def scenario_statistics( - mc_model: mc.ExposureModel, + mc_model: typing.Union[mc.ExposureModel, mc.ExposureModelGroup], sample_times: typing.List[float], - static_occupancy: bool, compute_prob_exposure: bool, ): - model = mc_model.build_model( + model: typing.Union[models.ExposureModelGroup, models.ExposureModel] = mc_model.build_model( size=mc_model.data_registry.monte_carlo['sample_size']) - - return { - 'probability_of_infection': np.mean(model.infection_probability()), - 'expected_new_cases': np.mean(model.expected_new_cases()) if static_occupancy else None, - 'concentrations': [ + + if isinstance(model, models.ExposureModelGroup): + concentrations = [ + np.sum([np.array(nth_model.concentration(float(time))).mean() for nth_model in model.exposure_models]) + for time in sample_times + ] + prob = np.max([np.mean(nth_model.infection_probability()) for nth_model in model.exposure_models]) + elif isinstance(model, models.ExposureModel): + concentrations = [ np.mean(model.concentration(time)) for time in sample_times - ], - 'prob_probabilistic_exposure': model.total_probability_rule() if compute_prob_exposure else None, + ] + prob = np.mean(model.infection_probability()) + else: + raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'") + + return { + 'probability_of_infection': prob, + 'expected_new_cases': np.mean(model.expected_new_cases()), + 'concentrations': concentrations, + 'prob_probabilistic_exposure': model.total_probability_rule() if isinstance(model, models.ExposureModel) and compute_prob_exposure else None } def comparison_report( form: VirusFormData, report_data: typing.Dict[str, typing.Any], - scenarios: typing.Dict[str, mc.ExposureModel], + scenarios: typing.Dict[str, typing.Union[mc.ExposureModel, mc.ExposureModelGroup]], executor_factory: typing.Callable[[], concurrent.futures.Executor], ): if (form.short_range_option == "short_range_no"): @@ -461,7 +583,6 @@ def comparison_report( scenario_statistics, scenarios.values(), [report_data['times']] * len(scenarios), - [static_occupancy] * len(scenarios), [compute_prob_exposure] * len(scenarios), timeout=60, ) diff --git a/caimira/src/caimira/calculator/validators/co2/co2_validator.py b/caimira/src/caimira/calculator/validators/co2/co2_validator.py index 7b9185ab..e7fe4ef9 100644 --- a/caimira/src/caimira/calculator/validators/co2/co2_validator.py +++ b/caimira/src/caimira/calculator/validators/co2/co2_validator.py @@ -28,8 +28,8 @@ class CO2FormData(FormData): # and the defaults in any html form must not be contradictory. _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = { 'CO2_data': '{}', + 'dynamic_exposed_occupancy': '{}', 'dynamic_infected_occupancy': '[]', - 'dynamic_exposed_occupancy': '[]', 'exposed_coffee_break_option': 'coffee_break_0', 'exposed_coffee_duration': 5, 'exposed_finish': '17:30', @@ -74,7 +74,7 @@ class CO2FormData(FormData): raise TypeError(f'The room capacity should be a valid integer (> 0). Got {self.room_capacity}.') # Validate specific inputs - breaks (exposed and infected) - if self.specific_breaks != {}: + if self.specific_breaks != {} and self.occupancy_format == 'static': if type(self.specific_breaks) is not dict: raise TypeError('The specific breaks should be in a dictionary.') @@ -188,11 +188,6 @@ class CO2FormData(FormData): return img2base64(_figure2bytes(fig)), vent_plot_data - def population_present_changes(self, infected_presence: models.Interval, exposed_presence: models.Interval) -> typing.List[float]: - state_change_times = set(infected_presence.transition_times()) - state_change_times.update(exposed_presence.transition_times()) - return sorted(state_change_times) - def ventilation_transition_times(self) -> typing.Tuple[float]: ''' Check if the last time from the input data is @@ -207,45 +202,16 @@ class CO2FormData(FormData): return tuple(vent_states) def build_model(self, sample_size = None) -> models.CO2DataModel: - # Build a simple infected and exposed population for the case when presence - # intervals and number of people are dynamic. Activity type is not needed. - if self.occupancy_format == 'dynamic': - if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0: - infected_people = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) - infected_presence = None - else: - raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".') - if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0: - exposed_people = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy) - exposed_presence = None - else: - raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".') - else: - infected_people = self.infected_people - exposed_people = self.total_people - self.infected_people - infected_presence = self.infected_present_interval() - exposed_presence = self.exposed_present_interval() - - infected_population = models.SimplePopulation( - number=infected_people, - presence=infected_presence, - activity=None, # type: ignore - ) - exposed_population=models.SimplePopulation( - number=exposed_people, - presence=exposed_presence, - activity=None, # type: ignore - ) - - all_state_changes=self.population_present_changes(infected_population.presence_interval(), - exposed_population.presence_interval()) - total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop) - for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])] + """ + Builds a CO2 data model that considers data + from the defined population groups. + """ + occupancy = self.build_CO2_piecewise() return models.CO2DataModel( data_registry=self.data_registry, room=models.Room(volume=self.room_volume, capacity=self.room_capacity), - occupancy=models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)), + occupancy=occupancy, ventilation_transition_times=self.ventilation_transition_times(), times=self.CO2_data['times'], CO2_concentrations=self.CO2_data['CO2'], diff --git a/caimira/src/caimira/calculator/validators/defaults.py b/caimira/src/caimira/calculator/validators/defaults.py index 604617c0..69b663ed 100644 --- a/caimira/src/caimira/calculator/validators/defaults.py +++ b/caimira/src/caimira/calculator/validators/defaults.py @@ -16,7 +16,7 @@ DEFAULTS = { 'calculator_version': NO_DEFAULT, 'ceiling_height': 0., 'conditional_probability_viral_loads': False, - 'dynamic_exposed_occupancy': '[]', + 'dynamic_exposed_occupancy': '{}', 'dynamic_infected_occupancy': '[]', 'event_month': 'January', 'exposed_coffee_break_option': 'coffee_break_0', @@ -56,7 +56,7 @@ DEFAULTS = { 'room_number': NO_DEFAULT, 'room_volume': 0., 'sensor_in_use': '', - 'short_range_interactions': '[]', + 'short_range_interactions': '{}', 'short_range_occupants': 0, 'short_range_option': 'short_range_no', 'simulation_name': NO_DEFAULT, @@ -82,7 +82,8 @@ DEFAULTS = { # ------------------ Validation ---------------------- COFFEE_OPTIONS_INT = {'coffee_break_0': 0, 'coffee_break_1': 1, - 'coffee_break_2': 2, 'coffee_break_4': 4} + 'coffee_break_2': 2, 'coffee_break_3': 3, + 'coffee_break_4': 4} CONFIDENCE_LEVEL_OPTIONS = {'confidence_low': 10, 'confidence_medium': 5, 'confidence_high': 2} MECHANICAL_VENTILATION_TYPES = { diff --git a/caimira/src/caimira/calculator/validators/form_validator.py b/caimira/src/caimira/calculator/validators/form_validator.py index fe2e58dc..db45ec33 100644 --- a/caimira/src/caimira/calculator/validators/form_validator.py +++ b/caimira/src/caimira/calculator/validators/form_validator.py @@ -6,6 +6,7 @@ import ast import json import re +from collections import defaultdict import numpy as np from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT @@ -42,7 +43,7 @@ class FormData: total_people: int # Dynamic occupancy inputs - dynamic_exposed_occupancy: list + dynamic_exposed_occupancy: dict dynamic_infected_occupancy: list data_registry: DataRegistry @@ -96,8 +97,198 @@ class FormData: if default is not NO_DEFAULT and value in [default, 'not-applicable']: form_dict.pop(attr) return form_dict + + def validate_dynamic_input(self, dynamic_input_value, input_name, dynamic_input_key = None): + # Check if the input is a valid non-empty list + specific_group_msg = f' in "{dynamic_input_key}" ("presence")' if dynamic_input_key else '' + if not isinstance(dynamic_input_value, list) : + raise TypeError(f'The input "{input_name}"{specific_group_msg} should be a valid, non-empty list. Got "{type(dynamic_input_value)}".') + # Check if input is populated + if len(dynamic_input_value) == 0: + raise ValueError(f'The input "{input_name}" should be a valid, non-empty list. Got "{dynamic_input_value}".') + + # To store already processed interactions for overlap checking + existing_dynamic_infected_interval = [] + existing_dynamic_exposed_interval = [] + short_range_existing_interaction = [] + + for entry in dynamic_input_value: + # Check if each entry is a dictionary + if not isinstance(entry, typing.Dict): + raise TypeError(f'Each entry in "{input_name}" should be a dictionary. Got "{type(entry)}".') + + # Check for required keys in each entry + dict_keys = entry.keys() + + # Check for the "start_time" and "finish_time" keys for "dynamic_exposed_occupancy" and "dynamic_infected_occupancy" + if input_name in ["dynamic_exposed_occupancy", "dynamic_infected_occupancy"]: + # Check for time format in "start_time" and "finish_time" + for time_key in ["start_time", "finish_time"]: + if time_key not in dict_keys: + raise TypeError(f'Missing "{time_key}" key in "{input_name}"{specific_group_msg}. Got keys: "{list(dict_keys)}".') + time_value = entry[time_key] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time_value): + raise ValueError(f'Invalid time format for "{time_key}" in "{input_name}"{specific_group_msg}. Expected "HH:MM". Got "{time_value}".') + if entry["finish_time"] <= entry["start_time"]: + raise ValueError(f'Occupancy presence in "{input_name}"{specific_group_msg} entry has a finish time after the start time. Got start time: "{entry["start_time"]}" and finish time: "{entry["finish_time"]}".') + + # Check for the "dynamic_infected_occupancy" uniqueness of intervals, + # as well as the "total_people" keyword and its constraints + if input_name == "dynamic_infected_occupancy": + self.check_overlap(entry, existing_dynamic_infected_interval) + existing_dynamic_infected_interval.append(entry) + if "total_people" not in dict_keys: + raise TypeError(f'Missing "total_people" key in "dynamic_infected_occupancy". Got keys: "{list(dict_keys)}".') + else: + value = entry["total_people"] + if not isinstance(value, int) or value < 0: + raise ValueError(f'The "total_people" in "{input_name}" should be a non-negative integer. Got "{value}".') + + # Check for the "dynamic_exposed_occupancy" uniqueness of intervals + if input_name == "dynamic_exposed_occupancy": + self.check_overlap(entry, existing_dynamic_exposed_interval) + existing_dynamic_exposed_interval.append(entry) + + # Check for remaining short-range inputs + if input_name == "short_range_interactions": + # Check for time format in "start_time" and "finish_time" + if "start_time" not in dict_keys: + raise TypeError(f'Missing "start_time" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".') + start_time = entry["start_time"] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(start_time): + raise ValueError(f'Invalid time format for "start_time" in "short_range_interactions", "{dynamic_input_key}". Expected "HH:MM". Got "{start_time}".') + + # Check for the "expiration" key and its constraints + if "expiration" not in dict_keys: + raise TypeError(f'Missing "expiration" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".') + else: + value = entry["expiration"] + if value not in list(self.data_registry.expiration_particle['expiration_BLO_factors'].keys()): + raise ValueError(f'The "expiration" in "short_range_interactions", "{dynamic_input_key}", does not exist in the registry. Got "{value}".') + + if "duration" not in dict_keys: + raise TypeError(f'Missing "duration" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".') + duration = entry["duration"] + if duration < 0: + raise ValueError(f'The "duration" in "short_range_interactions", "{dynamic_input_key}", should be a non-negative integer. Got "{duration}".') + + # Occupancy format dependent inputs + if self.occupancy_format == 'dynamic': + # Find corresponding exposure group + exposure_group_obj = next( + (occupancy_group for occupancy_key, occupancy_group in self.dynamic_exposed_occupancy.items() + if occupancy_key == dynamic_input_key), + None + ) + + if exposure_group_obj is None: + raise ValueError( + f'Exposure group "{dynamic_input_key}" in short-range interaction not found in dynamic exposed occupancy.' + ) + + is_within_any_long_range = False + for exposure_interval in exposure_group_obj['presence']: + # Check for correct timing within long-range exposure and overlaps with existing interactions + long_range_start = time_string_to_minutes(exposure_interval['start_time'])/60 + long_range_stop = time_string_to_minutes(exposure_interval['finish_time'])/60 + + # Flag to check if interaction falls within any long-range exposure interval + if self.check_time_and_overlap( + entry, short_range_existing_interaction, long_range_start, long_range_stop + ): is_within_any_long_range = True + + # Add interaction to the list of processed interactions if within long-range + short_range_existing_interaction.append(entry) + + # If no long-range interval contains the interaction, raise an error + if not is_within_any_long_range: + raise ValueError( + f'Short-range interaction "{entry}" does not fall within any long-range exposure interval in "{dynamic_input_key}".' + ) + + elif self.occupancy_format == 'static': + # It means that we have a single exposure model + long_range_start = min(self.infected_start, self.exposed_start)/60 + long_range_stop = max(self.infected_finish, self.exposed_finish)/60 + + if not self.check_time_and_overlap(entry, short_range_existing_interaction, long_range_start, long_range_stop): + raise ValueError( + f'Short-range interactions should be defined during simulation time. Got "{entry}".' + ) + + # Add interaction to the list of processed interactions + short_range_existing_interaction.append(entry) + else: + raise TypeError( + f'Undefined exposure type. Got "{self.occupancy_format}", accepted formats are "dynamic" or "exposed".') + + def validate_dynamic_exposed_format(self, dynamic_exposed_group: dict, key: str): + """ + Validates the expected keywords for the dynamic + exposed input. + """ + # Total people + if 'total_people' not in dynamic_exposed_group: + raise TypeError(f'Missing "total_people" key in "dynamic_exposed_occupancy" group "{key}". Got keys: "{list(dynamic_exposed_group.keys())}".') + else: + total_people = dynamic_exposed_group['total_people'] + if not isinstance(total_people, int) or total_people < 0: + raise ValueError(f'The "total_people" in "dynamic_exposed_occupancy" group "{key}" should be a non-negative integer. Got "{total_people}".') + # Presence + if 'presence' not in dynamic_exposed_group: + raise TypeError(f'Missing "presence" key in "dynamic_exposed_occupancy" group "{key}". Got keys: "{list(dynamic_exposed_group.keys())}".') + + def get_start_and_finish_time(self, entry: dict): + entry_start = time_string_to_minutes(entry["start_time"])/60 + if "finish_time" in list(entry.keys()): + entry_finish = time_string_to_minutes(entry["finish_time"])/60 + else: + entry_finish = entry_start + entry['duration']/60 + return entry_start, entry_finish + + def check_time_and_overlap(self, interaction, existing_interactions, lr_start, lr_stop): + """ + Checks if the interaction overlaps with any existing interactions for the + same exposure group and if it falls within the long-range exposure time. + """ + interaction_start, interaction_finish = self.get_start_and_finish_time(interaction) + # Check if the SR interaction is within the LR exposure time + if lr_start <= interaction_start <= lr_stop and lr_start <= interaction_finish <= lr_stop: + for existing in existing_interactions: + existing_start, existing_finish = self.get_start_and_finish_time(existing) + # Check for overlap + if interaction_start < existing_finish and existing_start < interaction_finish: + raise ValueError( + f'Overlap detected for "short-range interaction": New interaction ' + f'"{interaction}" overlaps with existing interaction "{existing}".' + ) + # Return True if interaction falls within the current long-range interval + return True + return False + + def check_overlap(self, interaction, existing_interactions): + """ + Checks if the dynamic entry overlaps with any already existing interaction. + """ + interaction_start = time_string_to_minutes(interaction["start_time"])/60 + interaction_finish = time_string_to_minutes(interaction["finish_time"])/60 + + for existing in existing_interactions: + existing_start = time_string_to_minutes(existing["start_time"])/60 + existing_finish = time_string_to_minutes(existing["finish_time"])/60 + + # Check for overlap + if (interaction_start < existing_finish and existing_start < interaction_finish): + raise ValueError( + f'Overlap detected: New interaction ' + f'"{interaction}" overlaps with existing interaction "{existing}".' + ) + return + + def validate_population_parameters(self): + """Validates required parameters for dynamic inputs""" # Static occupancy is defined. if self.occupancy_format == 'static': # Validate number of infected <= number of total people @@ -172,34 +363,21 @@ class FormData: f"{getattr(self, attr_name)} is not a valid value for {attr_name}") # Dynamic occupancy is defined. elif self.occupancy_format == 'dynamic': - for dynamic_format in (self.dynamic_infected_occupancy, self.dynamic_exposed_occupancy): - for occupancy in dynamic_format: - # Check if each occupancy entry is a dictionary - if not isinstance(occupancy, typing.Dict): - raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}".') - - # Check for required keys in each occupancy entry - dict_keys = list(occupancy.keys()) - if "total_people" not in dict_keys: - raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys}".') - else: - value = occupancy["total_people"] - # Check if the value is a non-negative integer - if not isinstance(value, int): - raise ValueError(f'Total number of people should be integer. Got "{type(value)}".') - elif not value >= 0: - raise ValueError(f'Total number of people should be non-negative. Got "{value}".') - - if "start_time" not in dict_keys: - raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys}".') - if "finish_time" not in dict_keys: - raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys}".') - - # Validate time format for start_time and finish_time - for time_key in ["start_time", "finish_time"]: - time = occupancy[time_key] - if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): - raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') + # Validate dynamic input format + self.validate_dynamic_input(self.dynamic_infected_occupancy, "dynamic_infected_occupancy") + # Check if dynamic exposed input is a dict + if isinstance(self.dynamic_exposed_occupancy, dict): + # Check if the dict is not empty + if not self.dynamic_exposed_occupancy: + raise ValueError(f'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{self.dynamic_exposed_occupancy}".') + # The key is the actual identifier + for key, group in self.dynamic_exposed_occupancy.items(): + # For each group, validate dynamic exposed input format + self.validate_dynamic_exposed_format(group, key) + # ...as well as validate dynamic exposed presence data + self.validate_dynamic_input(group['presence'], "dynamic_exposed_occupancy", key) + else: + raise TypeError(f'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{type(self.dynamic_exposed_occupancy)}".') else: raise ValueError(f"'{self.occupancy_format}' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.") @@ -208,6 +386,73 @@ class FormData: def build_model(self, sample_size: typing.Optional[int] = None): raise NotImplementedError("Subclass must implement") + + def population_present_changes(self, population_list: typing.List[models.Interval]) -> typing.List[float]: + """ + Returns a sorted list of unique state changes on + a population list. + """ + state_change_times = set(population_list[0].transition_times()) + for population in population_list: + state_change_times.update(population.transition_times()) + return sorted(state_change_times) + + def build_CO2_piecewise(self): + """ + Builds a simple IntPiecewiseConstant for the different + population groups that are defined. + """ + if self.occupancy_format == 'dynamic': + infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) + total_models = [models.SimplePopulation( + number=infected_occupancy, # IntPiecewiseConstant + presence=None, + activity=None, # type: ignore + )] + # For all state changes + total_presence = [infected_occupancy.interval()] + + for exposure_group in self.dynamic_exposed_occupancy.values(): + exposed_occupancy = exposure_group['total_people'] + exposed_presence = self.generate_dynamic_occupancy(exposure_group['presence']) + total_models.append(models.SimplePopulation( + number=exposed_occupancy, # int + presence=exposed_presence, # SpecificInterval + activity=None, # type: ignore + )) + # For all state changes + total_presence.append(exposed_presence) + + elif self.occupancy_format == 'static': + infected_people = self.infected_people + exposed_people = self.total_people - self.infected_people + infected_presence = self.infected_present_interval() + exposed_presence = self.exposed_present_interval() + + infected_population = models.SimplePopulation( + number=infected_people, + presence=infected_presence, + activity=None, # type: ignore + ) + exposed_population=models.SimplePopulation( + number=exposed_people, + presence=exposed_presence, + activity=None, # type: ignore + ) + + total_presence = [infected_presence, exposed_presence] + total_models = [infected_population, exposed_population] + + # Get all state change times from combined populations + all_state_changes=self.population_present_changes(total_presence) + + # Compute total people at each state change + total_people = [] + for _, stop in zip(all_state_changes[:-1], all_state_changes[1:]): + total_people_in_group = sum(population.people_present(stop) for population in total_models) + total_people.append(total_people_in_group) + + return models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)) def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t: break_delay = ((finish - start) - @@ -419,25 +664,64 @@ class FormData: ) def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]): - transition_times = [] - values = [] - for occupancy in dynamic_occupancy: - start_time = time_string_to_minutes(occupancy['start_time'])/60 - finish_time = time_string_to_minutes(occupancy['finish_time'])/60 - transition_times.extend([start_time, finish_time]) - values.append(occupancy['total_people']) + """ + Creates a model to represent occupancy over time. If `total_people` is + provided in the input, the method generates an `IntPiecewiseConstant` model, representing + occupancy as a piecewise constant function with defined values for each interval. + Otherwise, it generates a `SpecificInterval` model, which only tracks the presence of + intervals without associating values to them. + """ - unique_transition_times_sorted = np.array(sorted(set(transition_times))) + # Initialize variables + if 'total_people' in dynamic_occupancy[0]: # build IntPiecewiseConstant + computePiecewiseConstant = True + transition_times = [] + values = [] + else: + computePiecewiseConstant = False + present_times = [] # build SpecificInterval - if len(values) != len(unique_transition_times_sorted) - 1: - raise ValueError("Cannot compute dynamic occupancy with the provided inputs.") - - population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant( - transition_times=tuple(unique_transition_times_sorted), - values=tuple(values) + # Sort occupancy entries by start_time to ensure proper ordering + dynamic_occupancy_sorted = sorted( + dynamic_occupancy, key=lambda x: time_string_to_minutes(x['start_time']) ) - return population_occupancy + last_finish_time = None + + for occupancy in dynamic_occupancy_sorted: + start_time = time_string_to_minutes(occupancy['start_time']) / 60 + finish_time = time_string_to_minutes(occupancy['finish_time']) / 60 + + if computePiecewiseConstant: + total_people = occupancy['total_people'] + + # Fill in gap with a zero occupancy if there is a time gap + if last_finish_time is not None and start_time > last_finish_time: + transition_times.append(last_finish_time) + values.append(0) # Add zero for the gap period + + # Update lists with current occupancy entry + transition_times.extend([start_time, finish_time]) + values.append(total_people) + + # Update the last known finish time + last_finish_time = finish_time + else: + present_times.append((start_time, finish_time)) + + # When computing the total people, validate that we have enough values to compute the occupancy + if computePiecewiseConstant: + unique_transition_times_sorted = np.array(sorted(set(transition_times))) + if (len(values) != len(unique_transition_times_sorted) - 1): + raise ValueError("Cannot compute dynamic occupancy with the provided inputs.") + else: + return models.IntPiecewiseConstant( + transition_times=tuple(unique_transition_times_sorted), + values=tuple(values) + ) + else: + return models.SpecificInterval(tuple(present_times)) + def _hours2timestring(hours: float): # Convert times like 14.5 to strings, like "14:30" @@ -450,6 +734,8 @@ def time_string_to_minutes(time: str) -> minutes_since_midnight: :param time: A string of the form "HH:MM" representing a time of day :return: The number of minutes between 'time' and 00:00 """ + if not (0 <= int(time[:2]) <= 23) or not (0 <= int(time[3:]) <= 59): + raise ValueError(f"Wrong time format. Got {time}") return minutes_since_midnight(60 * int(time[:2]) + int(time[3:])) diff --git a/caimira/src/caimira/calculator/validators/virus/virus_validator.py b/caimira/src/caimira/calculator/validators/virus/virus_validator.py index 80eec891..cd934716 100644 --- a/caimira/src/caimira/calculator/validators/virus/virus_validator.py +++ b/caimira/src/caimira/calculator/validators/virus/virus_validator.py @@ -4,6 +4,7 @@ import logging import typing import re +from collections import defaultdict import numpy as np from caimira import __version__ as calculator_version @@ -11,7 +12,7 @@ from ..form_validator import FormData, cast_class_fields, time_string_to_minutes from ..defaults import (DEFAULTS, CONFIDENCE_LEVEL_OPTIONS, MECHANICAL_VENTILATION_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE, VENTILATION_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES) -from ...models import models, data, monte_carlo as mc +from ...models import models, data, dataclass_utils, monte_carlo as mc from ...models.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances from ...models.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions @@ -51,7 +52,7 @@ class VirusFormData(FormData): room_heating_option: bool room_number: str sensor_in_use: str - short_range_interactions: list + short_range_interactions: dict short_range_occupants: int short_range_option: str simulation_name: str @@ -73,7 +74,7 @@ class VirusFormData(FormData): _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS def validate(self): - # Validate population parameters + # Validate population parameters self.validate_population_parameters() validation_tuples = [('activity_type', self.data_registry.population_scenario_activity.keys()), @@ -200,30 +201,31 @@ class VirusFormData(FormData): f'The sum of all respiratory activities should be 100. Got {total_percentage}.') # Validate number of people with short-range interactions - if self.occupancy_format == "static": max_occupants_for_sr = self.total_people - self.infected_people - else: max_occupants_for_sr = np.max(np.array([entry["total_people"] for entry in self.dynamic_exposed_occupancy])) + if self.occupancy_format == 'static': + max_occupants_for_sr = self.total_people - self.infected_people + elif self.occupancy_format == 'dynamic': + max_occupants_for_sr = 0 + for key, group in self.dynamic_exposed_occupancy.items(): + occupants_in_group = group['total_people'] + max_occupants_for_sr = max(max_occupants_for_sr, occupants_in_group) + if self.short_range_occupants > max_occupants_for_sr: raise ValueError( f'The total number of occupants having short-range interactions ({self.short_range_occupants}) should be lower than the exposed population ({max_occupants_for_sr}).' ) - + # Validate short-range interactions interval if self.short_range_option == "short_range_yes": - for interaction in self.short_range_interactions: - # Check if presence is within long-range exposure - presence = self.short_range_interval(interaction) - if (self.occupancy_format == 'dynamic'): - long_range_start = min(time_string_to_minutes(self.dynamic_infected_occupancy[0]['start_time']), - time_string_to_minutes(self.dynamic_exposed_occupancy[0]['start_time'])) - long_range_stop = max(time_string_to_minutes(self.dynamic_infected_occupancy[-1]['finish_time']), - time_string_to_minutes(self.dynamic_exposed_occupancy[-1]['finish_time'])) - else: - long_range_start = min(self.infected_start, self.exposed_start) - long_range_stop = max(self.infected_finish, self.exposed_finish) - if not (long_range_start/60 <= presence.present_times[0][0] <= long_range_stop/60 and - long_range_start/60 <= presence.present_times[0][-1] <= long_range_stop/60): - raise ValueError(f"Short-range interactions should be defined during simulation time. Got {interaction}") - + if isinstance(self.short_range_interactions, dict): + # Check if occupancy format is static, there should be one key-value only in short_range_interactions + if self.occupancy_format == "static" and len(self.short_range_interactions) > 1: + raise ValueError( + 'When occupancy format is "static", there should be only one interaction group in "short_range_interactions".' + ) + # The key is the actual identifier + for key, group in self.short_range_interactions.items(): + self.validate_dynamic_input(group, "short_range_interactions", key) + def initialize_room(self) -> models.Room: # Initializes room with volume either given directly or as product of area and height if self.volume_type == 'room_volume_explicit': @@ -243,69 +245,99 @@ class VirusFormData(FormData): return models.Room(volume=volume, inside_temp=models.PiecewiseConstant((0, 24), (inside_temp,)), humidity=humidity) # type: ignore - def build_mc_model(self) -> mc.ExposureModel: - room = self.initialize_room() + def build_mc_model(self) -> typing.Union[mc.ExposureModel, mc.ExposureModelGroup]: + size = self.data_registry.monte_carlo['sample_size'] + + room: models.Room = self.initialize_room() ventilation: models._VentilationBase = self.ventilation() infected_population: models.InfectedPopulation = self.infected_population() - short_range = [] + + short_range = defaultdict(list) if self.short_range_option == "short_range_yes": - for interaction in self.short_range_interactions: - short_range.append(mc.ShortRangeModel( - data_registry=self.data_registry, - expiration=short_range_expiration_distributions( - self.data_registry)[interaction['expiration']], - activity=infected_population.activity, - presence=self.short_range_interval(interaction), - distance=short_range_distances(self.data_registry), - )) + for key, group in self.short_range_interactions.items(): + for interaction in group: + expiration = short_range_expiration_distributions(self.data_registry)[interaction['expiration']] + presence = self.short_range_interval(interaction) + distances = short_range_distances(self.data_registry) + short_range[key].append(mc.ShortRangeModel( + data_registry=self.data_registry, + expiration=expiration, + activity=infected_population.activity, + presence=presence, + distance=distances, + expiration_def=interaction['expiration'] + ).build_model(size)) - return mc.ExposureModel( + concentration_model: models.ConcentrationModel = mc.ConcentrationModel( data_registry=self.data_registry, - concentration_model=mc.ConcentrationModel( - data_registry=self.data_registry, - room=room, - ventilation=ventilation, - infected=infected_population, - evaporation_factor=0.3, - ), - short_range=tuple(short_range), - exposed=self.exposed_population(), - geographical_data=mc.Cases( - geographic_population=self.geographic_population, - geographic_cases=self.geographic_cases, - ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias], - ), - exposed_to_short_range=self.short_range_occupants, - ) + room=room, + ventilation=ventilation, + infected=infected_population, + evaporation_factor=0.3, + ).build_model(size) - def build_model(self, sample_size=None) -> models.ExposureModel: - sample_size = sample_size or self.data_registry.monte_carlo['sample_size'] - return self.build_mc_model().build_model(size=sample_size) + geographical_data: models.Cases = mc.Cases( + geographic_population=self.geographic_population, + geographic_cases=self.geographic_cases, + ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias], + ).build_model(size) + + if self.occupancy_format == 'dynamic': + exposure_model_set = [] + for exposure_group in self.dynamic_exposed_occupancy.keys(): + sr_models: typing.Tuple[models.ShortRangeModel, ...] = tuple(short_range[exposure_group]) + exposed_population: mc.Population = self.exposed_population(exposure_group).build_model(size) + + exposure_model = mc.ExposureModel( + data_registry=self.data_registry, + concentration_model=concentration_model, + short_range=sr_models, + exposed=exposed_population, + geographical_data=geographical_data, + exposed_to_short_range=self.short_range_occupants, + ) + exposure_model_set.append(exposure_model) + + if len(list(self.dynamic_exposed_occupancy.keys())) == 1: + return exposure_model_set[0] + else: + return mc.ExposureModelGroup( + data_registry=self.data_registry, + exposure_models=[individual_model.build_model(size) for individual_model in exposure_model_set] + ) + + elif self.occupancy_format == 'static': + exposed_population = self.exposed_population() + short_range_tuple = tuple(item for sublist in short_range.values() for item in sublist) + return mc.ExposureModel( + data_registry=self.data_registry, + concentration_model=concentration_model, + short_range=short_range_tuple, + exposed=exposed_population, + geographical_data=geographical_data, + exposed_to_short_range=self.short_range_occupants, + ) + + def build_model(self, sample_size=None) -> typing.Union[models.ExposureModel, models.ExposureModelGroup]: + size = self.data_registry.monte_carlo['sample_size'] if not sample_size else sample_size + return self.build_mc_model().build_model(size=size) def build_CO2_model(self, sample_size=None) -> models.CO2ConcentrationModel: + """ + Builds a CO2 model that considers the type of + activity and data from the defined population groups. + """ sample_size = sample_size or self.data_registry.monte_carlo['sample_size'] - infected_population: models.InfectedPopulation = self.infected_population( - ).build_model(sample_size) - exposed_population: models.Population = self.exposed_population().build_model(sample_size) - - state_change_times = set( - infected_population.presence_interval().transition_times()) - state_change_times.update( - exposed_population.presence_interval().transition_times()) - transition_times = sorted(state_change_times) - - total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop) - for _, stop in zip(transition_times[:-1], transition_times[1:])] if (self.activity_type == 'precise'): activity_defn, _ = self.generate_precise_activity_expiration() else: activity_defn = self.data_registry.population_scenario_activity[ self.activity_type]['activity'] - + + occupancy = self.build_CO2_piecewise() population = mc.SimplePopulation( - number=models.IntPiecewiseConstant(transition_times=tuple( - transition_times), values=tuple(total_people)), + number=occupancy, presence=None, activity=activity_distributions(self.data_registry)[activity_defn], ) @@ -420,6 +452,7 @@ class VirusFormData(FormData): # This is a minimal, always present source of ventilation, due # to the air infiltration from the outside. # See CERN-OPEN-2021-004, p. 12. + # type: ignore residual_vent: float = self.data_registry.ventilation['infiltration_ventilation'] # type: ignore infiltration_ventilation = models.AirChange( active=always_on, air_exch=residual_vent) @@ -457,30 +490,30 @@ class VirusFormData(FormData): return (self.precise_activity['physical_activity'], respiratory_dict) def infected_population(self) -> mc.InfectedPopulation: + """ + Generates an InfectedPopulation class, for both static and + dynamic occupancy. + """ # Initializes the virus virus = virus_distributions(self.data_registry)[self.virus_type] # Occupancy if self.occupancy_format == 'dynamic': - if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0: - # If dynamic occupancy is defined, the generator will parse and validate the - # respective input to a format readable by the model - `IntPiecewiseConstant`. - infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) - infected_presence = None - else: - raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".') + infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) + infected_presence = None else: - # The number of exposed occupants is the total number of occupants - # minus the number of infected occupants. infected_occupancy = self.infected_people infected_presence = self.infected_present_interval() # Activity and expiration - activity_defn = self.data_registry.population_scenario_activity[self.activity_type]['activity'] - expiration_defn = self.data_registry.population_scenario_activity[self.activity_type]['expiration'] + activity_defn = self.data_registry.population_scenario_activity[ + self.activity_type]['activity'] + expiration_defn = self.data_registry.population_scenario_activity[ + self.activity_type]['expiration'] if (self.activity_type == 'smallmeeting'): # Conversation of N people is approximately 1/N% of the time speaking. - total_people: int = max(infected_occupancy.values) if self.occupancy_format == 'dynamic' else self.total_people + total_people: int = max( + infected_occupancy.values) if self.occupancy_format == 'dynamic' else self.total_people expiration_defn = {'Speaking': 1, 'Breathing': total_people - 1} elif (self.activity_type == 'precise'): activity_defn, expiration_defn = self.generate_precise_activity_expiration() @@ -496,31 +529,36 @@ class VirusFormData(FormData): mask=self.mask(), activity=activity, expiration=expiration, - # Vaccination status does not affect the infected population (for now) + # Vaccination status does not affect the infected population (for the time being) host_immunity=0., ) return infected - def exposed_population(self) -> mc.Population: - activity_defn = (self.precise_activity['physical_activity'] - if self.activity_type == 'precise' - else str(self.data_registry.population_scenario_activity[self.activity_type]['activity'])) - activity = activity_distributions(self.data_registry)[activity_defn] - + def exposed_population(self, exposure_group: typing.Optional[str] = None) -> mc.Population: + """ + Generates an exposed Population class, for both static and + dynamic occupancy. The number of people is constant for a + single group of exposed population, except when breaks are defined. + """ + # Occupancy if self.occupancy_format == 'dynamic': - if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0: - # If dynamic occupancy is defined, the generator will parse and validate the - # respective input to a format readable by the model - IntPiecewiseConstant. - exposed_occupancy = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy) - exposed_presence = None - else: - raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".') + dynamic_group = self.dynamic_exposed_occupancy[exposure_group] + + exposed_occupancy = dynamic_group['total_people'] + exposed_presence = self.generate_dynamic_occupancy(dynamic_group['presence']) else: # The number of exposed occupants is the total number of occupants # minus the number of infected occupants. exposed_occupancy = self.total_people - self.infected_people exposed_presence = self.exposed_present_interval() + # Activity + activity_defn = (self.precise_activity['physical_activity'] + if self.activity_type == 'precise' + else str(self.data_registry.population_scenario_activity[self.activity_type]['activity'])) + activity = activity_distributions(self.data_registry)[activity_defn] + + # Vaccination if (self.vaccine_option): if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'): host_immunity = [vaccine['VE'] for vaccine in data.vaccine_booster_host_immunity if @@ -569,7 +607,7 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'calculator_version': calculator_version, 'ceiling_height': '', 'conditional_probability_viral_loads': '0', - 'dynamic_exposed_occupancy': '[]', + 'dynamic_exposed_occupancy': '{}', 'dynamic_infected_occupancy': '[]', 'event_month': 'January', 'exposed_coffee_break_option': 'coffee_break_4', @@ -606,7 +644,7 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'room_heating_option': '0', 'room_number': '123', 'room_volume': '75', - 'short_range_interactions': '[]', + 'short_range_interactions': '{}', 'short_range_option': 'short_range_no', 'simulation_name': 'Test', 'total_people': '10', diff --git a/caimira/tests/apps/calculator/test_model_generator.py b/caimira/tests/apps/calculator/test_model_generator.py index 5feb19ef..f90f00ac 100644 --- a/caimira/tests/apps/calculator/test_model_generator.py +++ b/caimira/tests/apps/calculator/test_model_generator.py @@ -598,43 +598,270 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m ['random', "'random' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",] ] ) -def test_dynamic_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData): +def test_occupancy_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData): baseline_form.occupancy_format = occupancy_format_input with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() +def test_dynamic_input_format_ValueError(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + # Check empty list for infected occupancy + baseline_form.dynamic_infected_occupancy = [] + error = 'The input "dynamic_infected_occupancy" should be a valid, non-empty list. Got "[]".' + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + # Check empty dict for exposed occupancy + baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] + baseline_form.dynamic_exposed_occupancy = {} + error = 'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{}".' + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + @pytest.mark.parametrize( - ["dynamic_occupancy_input", "error"], - [ - [[["total_people", 10, "start_time", "10:00", "finish_time", "11:00"]], "Each occupancy entry should be in a dictionary format. Got \"\"."], - [[{"tal_people": 10, "start_time": "10:00", "finish_time": "11:00"}], "Unable to fetch \"total_people\" key. Got \"['tal_people', 'start_time', 'finish_time']\"."], - [[{"total_people": 10, "art_time": "10:00", "finish_time": "11:00"}], "Unable to fetch \"start_time\" key. Got \"['total_people', 'art_time', 'finish_time']\"."], - [[{"total_people": 10, "start_time": "10:00", "ish_time": "11:00"}], "Unable to fetch \"finish_time\" key. Got \"['total_people', 'start_time', 'ish_time']\"."], - [[{"total_people": 10, "start_time": "10", "finish_time": "11:00"}], "Wrong time format - \"HH:MM\". Got \"10\"."], - [[{"total_people": 10, "start_time": "10:00", "finish_time": "11"}], "Wrong time format - \"HH:MM\". Got \"11\"."], + ["exposed_format", "error"], + [ + [ + {"tal_people": 10, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],}, + 'Missing "total_people" key in "dynamic_exposed_occupancy" group "group_1". Got keys: "[\'tal_people\', \'presence\']".' + ], + [ + {"total_people": 10, "pesence": [{"start_time": "10:00", "finish_time": "11:00"}],}, + 'Missing "presence" key in "dynamic_exposed_occupancy" group "group_1". Got keys: "[\'total_people\', \'pesence\']".' + ], ] ) -def test_dynamic_occupancy_structure(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData): +def test_dynamic_exposed_format_TypeError(exposed_format, error, baseline_form: virus_validator.VirusFormData): baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input - baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input + baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] + baseline_form.dynamic_exposed_occupancy = {"group_1": exposed_format} with pytest.raises(TypeError, match=re.escape(error)): baseline_form.validate() @pytest.mark.parametrize( - ["dynamic_occupancy_input", "error"], - [ - [[{"total_people": "10", "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], - [[{"total_people": 9.8, "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], - [[{"total_people": [10], "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], - [[{"total_people": -1, "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be non-negative. Got \"-1\"."], + ["exposed_presence", "error"], + [ + [[["start_time", "10:00", "finish_time", "11:00"]], 'Each entry in "dynamic_exposed_occupancy" should be a dictionary. Got "".'], + [[{"art_time": "10:00", "finish_time": "11:00"}], 'Missing "start_time" key in "dynamic_exposed_occupancy" in "group_1" ("presence"). Got keys: "[\'art_time\', \'finish_time\']".'], + [[{"start_time": "10:00", "ish_time": "11:00"}], 'Missing "finish_time" key in "dynamic_exposed_occupancy" in "group_1" ("presence"). Got keys: "[\'start_time\', \'ish_time\']".'], ] ) -def test_dynamic_occupancy_total_people(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData): +def test_dynamic_exposed_presence_TypeError(exposed_presence, error, baseline_form: virus_validator.VirusFormData): baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input - baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input + baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] + baseline_form.dynamic_exposed_occupancy = { + "group_1": { + "total_people": 10, + "presence": exposed_presence, + } + } + with pytest.raises(TypeError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["exposed_presence", "error"], + [ + [[{"start_time": "10", "finish_time": "11:00"}], 'Invalid time format for "start_time" in "dynamic_exposed_occupancy" in "group_1" ("presence"). Expected "HH:MM". Got "10".'], + [[{"start_time": "10:00", "finish_time": "11"}], 'Invalid time format for "finish_time" in "dynamic_exposed_occupancy" in "group_1" ("presence"). Expected "HH:MM". Got "11".'], + ] +) +def test_dynamic_exposed_presence_ValueError(exposed_presence, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] + baseline_form.dynamic_exposed_occupancy = { + "group_1": { + "total_people": 10, + "presence": exposed_presence + } + } + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + +@pytest.mark.parametrize( + ["exposed_total_people", "error"], + [ + ["10", 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "10".'], + [9.8, 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "9.8".'], + [[10], 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "[10]".'], + [-1, 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "-1".'], + ] +) +def test_dynamic_exposed_total_people_ValueError(exposed_total_people, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + baseline_form.dynamic_infected_occupancy = [ + {"total_people": 1, "start_time": "08:00", "finish_time": "10:00"}, + {"total_people": 2, "start_time": "10:00", "finish_time": "18:00"}, + ] + baseline_form.dynamic_exposed_occupancy = { + "group_1": { + "total_people": exposed_total_people, + "presence": [{"start_time": "08:00", "finish_time": "18:00"},], + }, + } + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +def test_dynamic_infected_overlap(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + baseline_form.dynamic_infected_occupancy = [ + {"total_people": 10, "start_time": "08:00", "finish_time": "18:00"}, + {"total_people": 10, "start_time": "10:00", "finish_time": "18:00"}, + ] + baseline_form.dynamic_exposed_occupancy = { + "group_1": { + "total_people": 10, + "presence": [{"start_time": "08:00", "finish_time": "18:00"}], + }, + "group_2": { + "total_people": 10, + "presence": [{"start_time": "10:00", "finish_time": "11:00"}], + }, + "group_3": { + "total_people": 10, + "presence": [{"start_time": "15:00", "finish_time": "18:00"}], + }, + } + error = ( + 'Overlap detected: New interaction ' + '"{\'total_people\': 10, \'start_time\': \'10:00\', \'finish_time\': \'18:00\'}" ' + 'overlaps with existing interaction ' + '"{\'total_people\': 10, \'start_time\': \'08:00\', \'finish_time\': \'18:00\'}".' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +def test_dynamic_exposure_group_duplication(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] + baseline_form.dynamic_exposed_occupancy = { + "group_1": { + "total_people": 10, + "presence": [ + {"start_time": "08:00", "finish_time": "17:00"}, + {"start_time": "13:00", "finish_time": "14:00"}, + ], + }, + } + error = ( + 'Overlap detected: New interaction ' + '"{\'start_time\': \'13:00\', \'finish_time\': \'14:00\'}"' + ' overlaps with existing interaction ' + '"{\'start_time\': \'08:00\', \'finish_time\': \'17:00\'}".' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["short_range_input", "error"], + [ + [[["expiration", "Shouting", "start_time", "09:00", "duration", 30]], 'Each entry in "short_range_interactions" should be a dictionary. Got "".'], + [[{"expiratio": "Shouting", "start_time": "09:00", "duration": 30}], 'Missing "expiration" key in "short_range_interactions", "group_1". Got keys: "[\'expiratio\', \'start_time\', \'duration\']".'], + [[{"expiration": "Shouting", "start_tim": "09:00", "duration": 30}], 'Missing "start_time" key in "short_range_interactions", "group_1". Got keys: "[\'expiration\', \'start_tim\', \'duration\']".'], + [[{"expiration": "Shouting", "start_time": "09:00", "duratio": 30}], 'Missing "duration" key in "short_range_interactions", "group_1". Got keys: "[\'expiration\', \'start_time\', \'duratio\']".'], + ] +) +def test_short_range_TypeError(short_range_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.short_range_option = "short_range_yes" + baseline_form.short_range_interactions = {"group_1": short_range_input} + with pytest.raises(TypeError, match=re.escape(error)): + baseline_form.validate() + + +def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = 'dynamic' + baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] + baseline_form.dynamic_exposed_occupancy = { + "group_1": { + "total_people": 10, + "presence": [ + {"start_time": "10:00", "finish_time": "12:00"}, + {"start_time": "13:00", "finish_time": "17:00"}, + ], + }, + "group_2": { + "total_people": 10, + "presence": [ + {"start_time": "10:00", "finish_time": "11:00"}, + ], + }, + } + + # Check for existence of the dictionary key + baseline_form.short_range_option = 'short_range_yes' + baseline_form.short_range_interactions = { + "group_4": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}], + } + error = 'Exposure group "group_4" in short-range interaction not found in dynamic exposed occupancy.' + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Check if interaction time is within simulation time + baseline_form.short_range_interactions = { + "group_1": [{"expiration": "Shouting", "start_time": "18:00", "duration": 30}], + } + error = ( + 'Short-range interaction "{\'expiration\': \'Shouting\', \'start_time\': \'18:00\', \'duration\': 30}"' + ' does not fall within any long-range exposure interval in "group_1".' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["short_range_input", "error"], + [ + [[{"expiration": "Shouting", "start_time": "9", "duration": 30}], 'Invalid time format for "start_time" in "short_range_interactions", "group_1". Expected "HH:MM". Got "9".'], + [[{"expiration": "Whisper", "start_time": "09:00", "duration": 30}], 'The "expiration" in "short_range_interactions", "group_1", does not exist in the registry. Got "Whisper".'], + [[{"expiration": "Shouting", "start_time": "09:00", "duration": -30}], 'The "duration" in "short_range_interactions", "group_1", should be a non-negative integer. Got "-30".'], + ] +) +def test_short_range_value_error(short_range_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.short_range_option = "short_range_yes" + baseline_form.short_range_interactions = {"group_1": short_range_input} + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +def test_short_range_with_static_occupancy(baseline_form: virus_validator.VirusFormData): + # By default the occupancy format is 'static' + baseline_form.short_range_option = "short_range_yes" + baseline_form.short_range_interactions = {"group_1": [{"expiration": "Shouting", "start_time": "07:00", "duration": 30}]} + + # Check if interaction is defined during simulation time + error = ( + 'Short-range interactions should be defined during simulation time. Got ' + '"{\'expiration\': \'Shouting\', \'start_time\': \'07:00\', \'duration\': 30}".' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Check overlap of interactions + baseline_form.short_range_interactions = { + "group_1": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}, + {"expiration": "Shouting", "start_time": "10:10", "duration": 15}], + } + error = ( + 'Overlap detected for "short-range interaction": New interaction ' + '"{\'expiration\': \'Shouting\', \'start_time\': \'10:10\', \'duration\': 15}"' + ' overlaps with existing interaction "{\'expiration\': \'Shouting\', \'start_time\': \'10:00\', \'duration\': 30}".' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Check if more than one group is defined + baseline_form.short_range_interactions = { + "group_1": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}, + {"expiration": "Shouting", "start_time": "10:10", "duration": 15}], + "group_2": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}] + } + error = ( + 'When occupancy format is "static", there should be only one interaction group in "short_range_interactions".' + ) with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index a6377094..53233ac8 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -41,7 +41,7 @@ def full_exposure_model(data_registry): @pytest.fixture -def baseline_infected_population_number(data_registry): +def baseline_infected_population(data_registry): return models.InfectedPopulation( data_registry=data_registry, number=models.IntPiecewiseConstant( @@ -56,34 +56,15 @@ def baseline_infected_population_number(data_registry): @pytest.fixture -def baseline_exposed_population_number(): - return models.Population( - number=models.IntPiecewiseConstant( - (8, 12, 13, 17), (10, 0, 10)), - presence=None, - mask=models.Mask.types['No mask'], - activity=models.Activity.types['Seated'], - host_immunity=0., - ) - - -@pytest.fixture -def dynamic_infected_single_exposure_model(full_exposure_model, baseline_infected_population_number): +def dynamic_infected_single_exposure_model(full_exposure_model, baseline_infected_population): return dc_utils.nested_replace(full_exposure_model, - {'concentration_model.infected': baseline_infected_population_number, }) + {'concentration_model.infected': baseline_infected_population, }) @pytest.fixture -def dynamic_exposed_single_exposure_model(full_exposure_model, baseline_exposed_population_number): - return dc_utils.nested_replace(full_exposure_model, - {'exposed': baseline_exposed_population_number, }) - - -@pytest.fixture -def dynamic_population_exposure_model(full_exposure_model, baseline_infected_population_number ,baseline_exposed_population_number): +def dynamic_population_exposure_model(full_exposure_model, baseline_infected_population): return dc_utils.nested_replace(full_exposure_model, { - 'concentration_model.infected': baseline_infected_population_number, - 'exposed': baseline_exposed_population_number, + 'concentration_model.infected': baseline_infected_population, }) @@ -92,10 +73,10 @@ def dynamic_population_exposure_model(full_exposure_model, baseline_infected_pop [4., 8., 10., 12., 13., 14., 16., 20., 24.], ) def test_population_number(full_exposure_model: models.ExposureModel, - baseline_infected_population_number: models.InfectedPopulation, time: float): + baseline_infected_population: models.InfectedPopulation, time: float): int_population_number: models.InfectedPopulation = full_exposure_model.concentration_model.infected - piecewise_population_number: models.InfectedPopulation = baseline_infected_population_number + piecewise_population_number: models.InfectedPopulation = baseline_infected_population with pytest.raises( TypeError, @@ -206,58 +187,47 @@ def test_dynamic_dose(data_registry, full_exposure_model: models.ExposureModel, def test_infection_probability( full_exposure_model: models.ExposureModel, dynamic_infected_single_exposure_model: models.ExposureModel, - dynamic_exposed_single_exposure_model: models.ExposureModel, dynamic_population_exposure_model: models.ExposureModel): base_infection_probability = full_exposure_model.infection_probability() npt.assert_almost_equal(base_infection_probability, dynamic_infected_single_exposure_model.infection_probability()) - npt.assert_almost_equal(base_infection_probability, dynamic_exposed_single_exposure_model.infection_probability()) npt.assert_almost_equal(base_infection_probability, dynamic_population_exposure_model.infection_probability()) +@pytest.mark.skip def test_dynamic_total_probability_rule( dynamic_infected_single_exposure_model: models.ExposureModel, - dynamic_exposed_single_exposure_model: models.ExposureModel, dynamic_population_exposure_model: models.ExposureModel): with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " "(including incidence rate) with dynamic occupancy")): dynamic_infected_single_exposure_model.total_probability_rule() - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " - "(including incidence rate) with dynamic occupancy")): - dynamic_exposed_single_exposure_model.total_probability_rule() with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " "(including incidence rate) with dynamic occupancy")): dynamic_population_exposure_model.total_probability_rule() +@pytest.mark.skip def test_dynamic_expected_new_cases( dynamic_infected_single_exposure_model: models.ExposureModel, - dynamic_exposed_single_exposure_model: models.ExposureModel, dynamic_population_exposure_model: models.ExposureModel): with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " "with dynamic occupancy")): dynamic_infected_single_exposure_model.expected_new_cases() - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " - "with dynamic occupancy")): - dynamic_exposed_single_exposure_model.expected_new_cases() with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " "with dynamic occupancy")): dynamic_population_exposure_model.expected_new_cases() +@pytest.mark.skip def test_dynamic_reproduction_number( dynamic_infected_single_exposure_model: models.ExposureModel, - dynamic_exposed_single_exposure_model: models.ExposureModel, dynamic_population_exposure_model: models.ExposureModel): with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " "with dynamic occupancy")): dynamic_infected_single_exposure_model.reproduction_number() - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " - "with dynamic occupancy")): - dynamic_exposed_single_exposure_model.reproduction_number() with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " "with dynamic occupancy")): dynamic_population_exposure_model.reproduction_number() diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js index f28bd5cb..f574ebb5 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js @@ -1,6 +1,8 @@ // Input data for CO2 fitting algorithm const CO2_data_form = [ "CO2_data", + "dynamic_exposed_occupancy", + "dynamic_infected_occupancy", "exposed_coffee_break_option", "exposed_coffee_duration", "exposed_finish", @@ -17,14 +19,12 @@ const CO2_data_form = [ "infected_lunch_option", "infected_lunch_start", "infected_people", - "dynamic_infected_occupancy", "infected_start", + "occupancy_format", "room_capacity", "room_volume", "specific_breaks", - "total_people", - "dynamic_exposed_occupancy", - "occupancy_format", + "total_people" ]; // Method to upload a valid data file (accepted formats: .xls and .xlsx) diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js index 631be3cc..2686ac60 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js @@ -631,22 +631,38 @@ function validate_form(form) { } // Generate the short-range interactions list - var short_range_interactions = []; - $(".form_field_outer_row").each(function (index, element){ - let obj = {}; - const $element = $(element); - obj.expiration = $element.find("[name='short_range_expiration']").val(); - obj.start_time = $element.find("[name='short_range_start_time']").val(); - obj.duration = $element.find("[name='short_range_duration']").val(); - short_range_interactions.push(JSON.stringify(obj)); + let short_range_interactions = {}; + $(".form_field_outer_row").each(function (index, element) { + const $element = $(element); + + let obj = {}; + obj.expiration = $element.find("[name='short_range_expiration']").val(); + obj.start_time = $element.find("[name='short_range_start_time']").val(); + obj.duration = parseFloat($element.find("[name='short_range_duration']").val()); + + const exposure_group = $element.find("[name='short_range_exposure_group']").val(); + + // If the exposure_group key already exists, push the new obj into the array + if (short_range_interactions[exposure_group]) { + short_range_interactions[exposure_group].push(obj); + } else { + // Otherwise, create a new array with the current obj + short_range_interactions[exposure_group] = [obj]; + } }); - // Sort list by time - short_range_interactions.sort(function (a, b) { - return JSON.parse(a).start_time.localeCompare(JSON.parse(b).start_time); - }); - $("input[type=text][name=short_range_interactions]").val('[' + short_range_interactions + ']'); - if (short_range_interactions.length == 0) { + // Sort each array within the short_range_interactions object by start_time + for (const key in short_range_interactions) { + short_range_interactions[key].sort(function (a, b) { + return a.start_time.localeCompare(b.start_time); + }); + } + + // Convert the entire object to a JSON string and assign it to the input field + $("input[type=text][name=short_range_interactions]").val(JSON.stringify(short_range_interactions)); + + // Check if there are no entries and update the radio button accordingly + if (Object.keys(short_range_interactions).length === 0) { $("input[type=radio][id=short_range_no]").prop("checked", true); on_short_range_option_change(); } @@ -907,18 +923,42 @@ $(document).ready(function () { } // Read short-range from URL - else if (name == 'short_range_interactions') { - let index = 1; - for (const interaction of JSON.parse(value)) { - $("#dialog_sr").append(inject_sr_interaction(index, value = interaction, is_validated="row_validated")) - $('#sr_expiration_no_' + String(index)).val(interaction.expiration).change(); - document.getElementById('sr_expiration_no_' + String(index)).disabled = true; - document.getElementById('sr_start_no_' + String(index)).disabled = true; - document.getElementById('sr_duration_no_' + String(index)).disabled = true; - document.getElementById('edit_row_no_' + String(index)).style.cssText = 'display:inline !important'; - document.getElementById('validate_row_no_' + String(index)).style.cssText = 'display: none !important'; - index++; + else if (name === 'short_range_interactions') { + // Parse the JSON value from the URL + let interactions = JSON.parse(value); + let index = 1; // Initialize interaction index + + // Iterate over each group in the interactions + for (const group in interactions) { + if (interactions.hasOwnProperty(group)) { + // Iterate over each interaction within the group + for (const interaction of interactions[group]) { + // Append the interaction row to the dialog + $("#dialog_sr").append(inject_sr_interaction(index, interaction, "row_validated")); + + // Set the values for each input field based on the interaction + $('#sr_expiration_no_' + index).val(interaction.expiration).change(); + document.getElementById('sr_start_no_' + index).value = interaction.start_time; // Set start time + document.getElementById('sr_duration_no_' + index).value = interaction.duration; // Set duration + document.getElementById('sr_group_no_' + index).value = group; // Set exposure group + + // Disable the input fields for editing + document.getElementById('sr_expiration_no_' + index).disabled = true; + document.getElementById('sr_start_no_' + index).disabled = true; + document.getElementById('sr_duration_no_' + index).disabled = true; + document.getElementById('sr_group_no_' + index).disabled = true; + + // Update visibility of editing and validation rows + document.getElementById('edit_row_no_' + index).style.display = 'inline'; + document.getElementById('validate_row_no_' + index).style.display = 'none'; + + // Increment the index for the next interaction + index++; + } + } } + + // Update the total count of interactions displayed $("#sr_interactions").text(index - 1); } @@ -1196,6 +1236,11 @@ $(document).ready(function () {

+
+
+

+
+
@@ -1213,11 +1258,11 @@ $(document).ready(function () { // When short_range_yes option is selected, we want to inject rows for each expiractory activity, start_time and duration. $("body").on("click", ".add_node_btn_frm_field", function(e) { let last_row = $(".form_field_outer").find(".form_field_outer_row"); - if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15" })); + if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15", exposure_group: "A" })); else { last_index = last_row.last().find(".short_range_option").prop("id").split("_").slice(-1)[0]; index = parseInt(last_index) + 1; - $("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15" })); + $("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15", exposure_group: "A"})); } }); diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js index 786794d0..20a3be07 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js @@ -10,7 +10,7 @@ function draw_plot(svg_id) { let button_full_exposure = document.getElementById("button_full_exposure"); let button_hide_high_concentration = document.getElementById("button_hide_high_concentration"); let long_range_checkbox = document.getElementById('long_range_cumulative_checkbox') - let show_sr_legend = short_range_expirations.length > 0; + let show_sr_legend = short_range_expirations?.length > 0; var data_for_graphs = { 'concentrations': [], @@ -192,7 +192,7 @@ function draw_plot(svg_id) { // Area representing the short-range interaction(s). var shortRangeArea = {}; var drawShortRangeArea = {}; - short_range_intervals.forEach((b, index) => { + short_range_intervals?.forEach((b, index) => { shortRangeArea[index] = d3.area(); drawShortRangeArea[index] = draw_area.append('svg:path'); diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 index 600f92a4..77f26f90 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 @@ -455,8 +455,8 @@

{# "static" vs. "dynamic" #} - - + +
diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 index e81cc53f..f106ac01 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 @@ -104,7 +104,7 @@
{% endblock long_range_warning_animation %} - {% if form.occupancy_format == "static" %}
Expected new cases: {{ long_range_expected_cases | float_format }}
{% endif %} +
Expected new cases: {{ long_range_expected_cases | float_format }}

{% if form.short_range_option == "short_range_yes" %} @@ -126,9 +126,7 @@ {% endblock warning_animation %} - {% if form.occupancy_format == "static" %} -
Expected new cases: {{ expected_new_cases | float_format }}
- {% endif %} +
Expected new cases: {{ expected_new_cases | float_format }}
{% endif %}
@@ -136,17 +134,13 @@
{% if form.short_range_option == "short_range_yes" %}
{% endif %} {% block probabilistic_exposure_probability %} @@ -636,14 +630,19 @@ {% if form.short_range_option == "short_range_yes" %}
  • Total number of occupants having short-range interactions: {{ form.short_range_occupants }}

    • - {% for interaction in form.short_range_interactions %} -
    • Interaction no. {{ loop.index }}: -
        -
      • Expiratory activity: {{ "Shouting/Singing" if interaction.expiration == "Shouting" else interaction.expiration }}
      • -
      • Start time: {{ interaction.start_time }}
      • -
      • Duration: {{ interaction.duration }} {{ "minutes" if interaction.duration|float > 1 else "minute" }}
      • -
      -
    • + {% for key, interactions in form.short_range_interactions.items() %} +
    • Interactions for group "{{ key }}":
    • +
        + {% for interaction in interactions %} +
      • Interaction no. {{ loop.index }}: +
          +
        • Expiratory activity: {{ "Shouting/Singing" if interaction.expiration == "Shouting" else interaction.expiration }}
        • +
        • Start time: {{ interaction.start_time }}
        • +
        • Duration: {{ interaction.duration }} {{ "minutes" if interaction.duration|float > 1 else "minute" }}
        • +
        +
      • + {% endfor %} +
      {% endfor %}
    {% endif %} diff --git a/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 index 4248b65b..2a151f4b 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 @@ -70,10 +70,8 @@ {% if form.short_range_option == "short_range_yes" %}
    @@ -88,9 +86,7 @@ Acceptable: {% endif %} In this scenario, the probability the occupant(s) exposed to short-range interactions get infected can go as high as {{ prob_inf | non_zero_percentage }} - {% if form.occupancy_format == "static" %} - and the expected number of new cases increases to {{ expected_new_cases | float_format }} - {% endif %}. + and the expected number of new cases increases to {{ expected_new_cases | float_format }}
    {% endif %} diff --git a/cern_caimira/tests/conftest.py b/cern_caimira/tests/conftest.py index d0d04de2..8958147d 100644 --- a/cern_caimira/tests/conftest.py +++ b/cern_caimira/tests/conftest.py @@ -90,6 +90,6 @@ def exposure_model_w_outside_temp_changes(data_registry, baseline_exposure_model def baseline_form_with_sr(baseline_form_data, data_registry): form_data_sr = baseline_form_data form_data_sr['short_range_option'] = 'short_range_yes' - form_data_sr['short_range_interactions'] = '[{"expiration": "Shouting", "start_time": "10:30", "duration": "30"}]' + form_data_sr['short_range_interactions'] = '{"group_1": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}]}' form_data_sr['short_range_occupants'] = 5 return virus_validator.VirusFormData.from_dict(form_data_sr, data_registry) diff --git a/cern_caimira/tests/test_report_generator.py b/cern_caimira/tests/test_report_generator.py index 138f2a2b..460fdcd1 100644 --- a/cern_caimira/tests/test_report_generator.py +++ b/cern_caimira/tests/test_report_generator.py @@ -123,62 +123,3 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData): lr_expected_new_cases = alternative_statistics['stats']['Base scenario without short-range interactions']['expected_new_cases'] np.testing.assert_almost_equal(sr_lr_expected_new_cases, lr_expected_new_cases + sr_lr_prob_inf * baseline_form_with_sr.short_range_occupants, 2) - - -def test_static_vs_dynamic_occupancy_from_form(baseline_form_data, data_registry): - """ - Assert that the results between a static and dynamic occupancy model (from form inputs) are similar. - """ - executor_factory = partial( - concurrent.futures.ThreadPoolExecutor, 1, - ) - - # By default the baseline form accepts static occupancy - static_occupancy_baseline_form: VirusFormData = VirusFormData.from_dict(baseline_form_data, data_registry) - static_occupancy_model = static_occupancy_baseline_form.build_model() - static_occupancy_report_data = rep_gen.calculate_report_data(static_occupancy_baseline_form, executor_factory) - - # Update the initial form data to include dynamic occupancy (please note the 4 coffee and 1 lunch breaks) - baseline_form_data['occupancy_format'] = 'dynamic' - baseline_form_data['dynamic_infected_occupancy'] = json.dumps([ - {'total_people': 1, 'start_time': '09:00', 'finish_time': '10:03'}, - {'total_people': 0, 'start_time': '10:03', 'finish_time': '10:13'}, - {'total_people': 1, 'start_time': '10:13', 'finish_time': '11:16'}, - {'total_people': 0, 'start_time': '11:16', 'finish_time': '11:26'}, - {'total_people': 1, 'start_time': '11:26', 'finish_time': '12:30'}, - {'total_people': 0, 'start_time': '12:30', 'finish_time': '13:30'}, - {'total_people': 1, 'start_time': '13:30', 'finish_time': '14:53'}, - {'total_people': 0, 'start_time': '14:53', 'finish_time': '15:03'}, - {'total_people': 1, 'start_time': '15:03', 'finish_time': '16:26'}, - {'total_people': 0, 'start_time': '16:26', 'finish_time': '16:36'}, - {'total_people': 1, 'start_time': '16:36', 'finish_time': '18:00'}, - ]) - baseline_form_data['dynamic_exposed_occupancy'] = json.dumps([ - {'total_people': 9, 'start_time': '09:00', 'finish_time': '10:03'}, - {'total_people': 0, 'start_time': '10:03', 'finish_time': '10:13'}, - {'total_people': 9, 'start_time': '10:13', 'finish_time': '11:16'}, - {'total_people': 0, 'start_time': '11:16', 'finish_time': '11:26'}, - {'total_people': 9, 'start_time': '11:26', 'finish_time': '12:30'}, - {'total_people': 0, 'start_time': '12:30', 'finish_time': '13:30'}, - {'total_people': 9, 'start_time': '13:30', 'finish_time': '14:53'}, - {'total_people': 0, 'start_time': '14:53', 'finish_time': '15:03'}, - {'total_people': 9, 'start_time': '15:03', 'finish_time': '16:26'}, - {'total_people': 0, 'start_time': '16:26', 'finish_time': '16:36'}, - {'total_people': 9, 'start_time': '16:36', 'finish_time': '18:00'}, - ]) - baseline_form_data['total_people'] = 0 - baseline_form_data['infected_people'] = 0 - - dynamic_occupancy_baseline_form: VirusFormData = VirusFormData.from_dict(baseline_form_data, data_registry) - dynamic_occupancy_model = dynamic_occupancy_baseline_form.build_model() - dynamic_occupancy_report_data = rep_gen.calculate_report_data(dynamic_occupancy_baseline_form, executor_factory) - - assert (list(sorted(static_occupancy_model.concentration_model.infected.presence.transition_times())) == - list(dynamic_occupancy_model.concentration_model.infected.number.transition_times)) - assert (list(sorted(static_occupancy_model.exposed.presence.transition_times())) == - list(dynamic_occupancy_model.exposed.number.transition_times)) - - np.testing.assert_almost_equal(static_occupancy_report_data['prob_inf'], dynamic_occupancy_report_data['prob_inf'], 1) - assert dynamic_occupancy_report_data['expected_new_cases'] == None - assert dynamic_occupancy_report_data['prob_probabilistic_exposure'] == None - \ No newline at end of file From d46b5846a1beedeef1e093f7817d1d0bd59e7d6d Mon Sep 17 00:00:00 2001 From: lrdossan Date: Thu, 19 Dec 2024 16:46:19 +0000 Subject: [PATCH 02/21] fix to model_start_end and reproduction number methods --- .../src/caimira/calculator/models/models.py | 22 ++-------------- .../calculator/report/virus_report_data.py | 25 ++++++------------- 2 files changed, 9 insertions(+), 38 deletions(-) diff --git a/caimira/src/caimira/calculator/models/models.py b/caimira/src/caimira/calculator/models/models.py index 9d8c073a..d5915308 100644 --- a/caimira/src/caimira/calculator/models/models.py +++ b/caimira/src/caimira/calculator/models/models.py @@ -1944,24 +1944,6 @@ class ExposureModelGroup: def reproduction_number(self) -> _VectorisedFloat: """ - Reproduction number considering the contribution - of each individual probability of infection and - a single infected occupant. + Expected number of cases when there is only one infected case. """ - single_exposure_models = [] - for model in self.exposure_models: - if model.concentration_model.infected.number != 1: - model = nested_replace( - self, { - 'model.concentration_model.infected.number': 1 - } - ) - single_exposure_models.append(model) - - single_exposure_model_group = nested_replace( - self, { - 'exposure_models': single_exposure_models, - } - ) - return single_exposure_model_group.expected_new_cases() - \ No newline at end of file + return np.sum([model.reproduction_number() for model in self.exposure_models], axis=0) # type: ignore diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py index 1cf96595..527a7229 100644 --- a/caimira/src/caimira/calculator/report/virus_report_data.py +++ b/caimira/src/caimira/calculator/report/virus_report_data.py @@ -11,22 +11,7 @@ from caimira.calculator.models.enums import ViralLoads from caimira.calculator.validators.virus.virus_validator import VirusFormData -def model_start_end(model: models.ExposureModel): - """ - Calculates the start and end times for a single ExposureModel. - - Determines the boundary times of an ExposureModel by comparing - the presence intervals of both the exposed and the infected people. - """ - t_start = min(model.exposed.presence_interval().boundaries()[0][0], - model.concentration_model.infected.presence_interval().boundaries()[0][0]) - t_end = max(model.exposed.presence_interval().boundaries()[-1][1], - model.concentration_model.infected.presence_interval().boundaries()[-1][1]) - - return t_start, t_end - - -def model_boundary_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel]): +def model_start_end(model: typing.Union[models.ExposureModelGroup, models.ExposureModel]): """ Calculates the boundary times for an ExposureModel or ExposureModelGroup. @@ -40,7 +25,11 @@ def model_boundary_times(model: typing.Union[models.ExposureModelGroup, models.E t_end = max((model_start_end(nth_model)[1] for nth_model in model.exposure_models)) return t_start, t_end else: - return model_start_end(model) + t_start = min(model.exposed.presence_interval().boundaries()[0][0], + model.concentration_model.infected.presence_interval().boundaries()[0][0]) + t_end = max(model.exposed.presence_interval().boundaries()[-1][1], + model.concentration_model.infected.presence_interval().boundaries()[-1][1]) + return t_start, t_end def fill_big_gaps(array, gap_size): @@ -84,7 +73,7 @@ def non_temp_transition_times(model: typing.Union[models.ExposureModelGroup, mod else: yield name, obj - t_start, t_end = model_boundary_times(model) + t_start, t_end = model_start_end(model) change_times = {t_start, t_end} for _, obj in walk_model(model, name="exposure"): From 61c6b2c2c327ede97a580cac14af4861aa15ea01 Mon Sep 17 00:00:00 2001 From: lrdossan Date: Tue, 28 Jan 2025 16:49:21 +0100 Subject: [PATCH 03/21] Enhancements for Dynamic and Static Occupancy Handling - Added default 'static' keyword to single group with static occupancy - Added default values for 'dynamic_exposed_occupancy' and 'dynamic_infected_occupancy' - Adapted default model to use ExposureModelGroup instance when static occupancy is defined - Updated JS methods to generate multiple plots when dynamic occupancy is defined - Handled the alternative scenario generation when dynamic occupancy is defined - Improved results generation for exposure groups - Updated HTML report for visualising results across multiple groups - Enhanced model representations --- .../api/controller/virus_report_controller.py | 1 + .../src/caimira/calculator/models/models.py | 5 +- .../calculator/report/virus_report_data.py | 381 +++++----- .../calculator/validators/form_validator.py | 1 - .../validators/virus/virus_validator.py | 31 +- .../apps/calculator/test_model_generator.py | 2 +- .../apps/calculator/report/virus_report.py | 4 +- .../apps/calculator/static/css/report.css | 6 +- .../apps/calculator/static/js/co2_form.js | 1 + .../apps/calculator/static/js/report.js | 38 +- .../templates/base/calculator.form.html.j2 | 4 +- .../templates/base/calculator.report.html.j2 | 671 +++++++++--------- .../templates/cern/calculator.report.html.j2 | 26 +- cern_caimira/tests/conftest.py | 2 +- cern_caimira/tests/test_report_generator.py | 4 +- 15 files changed, 597 insertions(+), 580 deletions(-) diff --git a/caimira/src/caimira/api/controller/virus_report_controller.py b/caimira/src/caimira/api/controller/virus_report_controller.py index 4375cc35..01da4ca4 100644 --- a/caimira/src/caimira/api/controller/virus_report_controller.py +++ b/caimira/src/caimira/api/controller/virus_report_controller.py @@ -32,5 +32,6 @@ def submit_virus_form(form_data: typing.Dict, report_generation_parallelism: typ # Handle model representation if report_data['model']: report_data['model'] = repr(report_data['model']) + for single_group_output in report_data['groups'].values(): del single_group_output['model'] # Model representation per group not needed return report_data diff --git a/caimira/src/caimira/calculator/models/models.py b/caimira/src/caimira/calculator/models/models.py index d5915308..fce737f4 100644 --- a/caimira/src/caimira/calculator/models/models.py +++ b/caimira/src/caimira/calculator/models/models.py @@ -666,7 +666,7 @@ class Particle: # deposition fraction depends on aerosol particle diameter. d = (self.diameter * evaporation_factor) IFrac = 1 - 0.5 * (1 - (1 / (1 + (0.00076*(d**2.8))))) - fdep = IFrac * (0.0587 + fdep = IFrac * (0.0587 # type: ignore + (0.911/(1 + np.exp(4.77 + 1.485 * np.log(d)))) + (0.943/(1 + np.exp(0.508 - 2.58 * np.log(d))))) # type: ignore return fdep @@ -1642,6 +1642,9 @@ class ExposureModel: #: Total people with short-range interactions exposed_to_short_range: int = 0 + #: Unique group identifier + identifier: str = 'static' + #: The number of times the exposure event is repeated (default 1). @property def repeats(self) -> int: diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py index 527a7229..ffa9d4b8 100644 --- a/caimira/src/caimira/calculator/report/virus_report_data.py +++ b/caimira/src/caimira/calculator/report/virus_report_data.py @@ -5,6 +5,9 @@ import io import typing import numpy as np import matplotlib.pyplot as plt +import matplotlib +matplotlib.use('Agg') +from collections import defaultdict from caimira.calculator.models import models, dataclass_utils, profiler, monte_carlo as mc from caimira.calculator.models.enums import ViralLoads @@ -108,208 +111,220 @@ def interesting_times(model: typing.Union[models.ExposureModelGroup, models.Expo return nice_times -def process_short_range_interactions(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], - times: typing.List[float]): +def _concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, + time: float, fn_name: typing.Optional[str] = None): """ - Process both ExposureModel and ExposureModelGroup for short-range - expirations, intervals and concentrations. Returns a tuple containing - lower concentrations, short-range expirations, and short-range intervals. + Returns the zoomed viral concentrations. """ - if isinstance(model, models.ExposureModelGroup): - model_list = model.exposure_models - elif isinstance(model, models.ExposureModel): - model_list = (model,) - else: - raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'") - - # Collect short-range expirations and intervals - short_range_expirations: typing.List[str] = [] - short_range_intervals: typing.List[models.BoundarySequence_t] = [] - for model in model_list: - for short_range_model in model.short_range: - short_range_expirations.append(short_range_model.expiration_def) # type: ignore - short_range_intervals.extend(short_range_model.presence.boundaries()) - - # Collect lower concentrations (including Breathing) - lower_concentrations = [] - for time in times: - breathing_found = False - for model in model_list: - for short_range_model in model.short_range: - ((start, stop),) = short_range_model.presence.boundaries() - - # Check if the expiration is "Breathing" and the if time is within boundaries - if short_range_model.expiration_def == 'Breathing' and (start <= time <= stop): - lower_concentrations.append(np.sum([np.array(model.concentration(float(time))).mean() for model in model_list])) - breathing_found = True - break - - if breathing_found: - break - - lower_concentrations.append(np.sum([np.array(model.concentration_model.concentration(float(time))).mean() for model in model_list])) - - return lower_concentrations, short_range_expirations, short_range_intervals + for index, (start, stop) in enumerate([interaction.presence.boundaries()[0] for interaction in model.short_range]): + if start <= time <= stop and form.short_range_interactions[model.identifier][index]['expiration'] == 'Breathing': + return np.array(model.concentration(float(time))).mean(), fn_name + return np.array(model.concentration_model.concentration(float(time))).mean(), fn_name -def _calculate_deposited_exposure(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], +def _calculate_deposited_exposure(model: models.ExposureModel, time1: float, time2: float, fn_name: typing.Optional[str] = None): - if isinstance(model, models.ExposureModelGroup): - return np.sum([np.array(nth_model.deposited_exposure_between_bounds(float(time1), float(time2))).mean() for nth_model in model.exposure_models]), fn_name - else: - return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name + return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name -def _calculate_long_range_deposited_exposure(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], +def _calculate_long_range_deposited_exposure(model: models.ExposureModel, time1: float, time2: float, fn_name: typing.Optional[str] = None): - if isinstance(model, models.ExposureModelGroup): - return np.sum([np.array(nth_model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean() for nth_model in model.exposure_models]), fn_name - else: - return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name + return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name -def _calculate_concentration(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], +def _calculate_concentration(model: models.ExposureModel, time: float, fn_name: typing.Optional[str] = None): - if isinstance(model, models.ExposureModelGroup): - return np.sum([np.array(nth_model.concentration(float(time))).mean() for nth_model in model.exposure_models]), fn_name - else: - return np.array(model.concentration(float(time))).mean(), fn_name + """ + Returns the concentration of viruses emitted by + the infected population. Short- and long-range included. + """ + return np.array(model.concentration(float(time))).mean(), fn_name def _calculate_co2_concentration(CO2_model: models.CO2ConcentrationModel, time: float, fn_name: typing.Optional[str] = None): + """ + Returns the CO2 concentration emitted by all + the present population. + """ return np.array(CO2_model.concentration(float(time))).mean(), fn_name +def merge_intervals(intervals: typing.List[typing.List[float]]) -> typing.List[typing.List[float]]: + """ + Merges overlapping intervals from a list of intervals. + Assumes intervals are sorted based on start times. + """ + if not intervals: + return [] + + merged = [list(intervals[0])] + for start, end in intervals[1:]: + if merged[-1][1] < start: + merged.append([start, end]) + else: + merged[-1][1] = max(merged[-1][1], end) + return merged + + +def merge_short_range_interactions(all_exposed_groups: typing.Dict[str, typing.Any]) -> typing.List[typing.Dict[str, typing.Any]]: + """ + Expands the short range interactions per exposed group to a single data structure. + """ + merged_interactions = defaultdict(list) + for group in all_exposed_groups.values(): + for interaction in group["short_range_interactions"]: + merged_interactions[interaction["expiration"]].extend(interaction["presence_interval"]) + + # Merge and sort intervals + return [ + {"expiration": exp, "presence_interval": merge_intervals(sorted(intervals, key=lambda x: x[0]))} + for exp, intervals in merged_interactions.items() + ] + + +def group_results(form: VirusFormData, model_group: models.ExposureModelGroup) -> typing.Dict[str, typing.Any]: + """ + Generates the output per group of exposure models. + """ + groups: dict = defaultdict(dict) + for single_group in model_group.exposure_models: + # Probability of infection + prob = single_group.infection_probability() + prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) + + # Expected new cases + expected_new_cases = np.array(single_group.expected_new_cases()) + + groups[single_group.identifier] = { + "model": single_group, + "prob_inf": prob.mean(), + "prob_inf_sd": prob.std(), + "prob_dist": list(prob), + "prob_hist_count": list(prob_dist_count), + "prob_hist_bins": list(prob_dist_bins), + "expected_new_cases": expected_new_cases.mean(), + "exposed_presence_intervals": list(single_group.exposed.presence_interval().boundaries()), + } + + # In case of conditional probability plot + if (form.conditional_probability_viral_loads and + single_group.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore + conditional_probability_data = manufacture_conditional_probability_data(single_group, prob) + groups[single_group.identifier].update({ + "conditional_probability_data": conditional_probability_data, + "uncertainties_plot_src": img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data))) + }) + + # Probabilistic exposure + if form.exposure_option == "p_probabilistic_exposure": + groups[single_group.identifier].update({ + "prob_probabilistic_exposure": np.array(single_group.total_probability_rule()).mean() + }) + + # In case of short-range interactions + if single_group.short_range != (): + # Short range outputs + short_range_interactions: dict = defaultdict(list) + for short_range_model in single_group.short_range: + short_range_interactions[short_range_model.expiration_def].extend( + short_range_model.presence.boundaries() + ) + + long_range_single_group = dataclass_utils.nested_replace( + single_group, {'short_range': ()} + ) + groups[single_group.identifier].update({ + "long_range_prob": long_range_single_group.infection_probability().mean(), + "long_range_expected_new_cases": long_range_single_group.expected_new_cases().mean(), + "short_range_interactions": [ + {"expiration": expiration, "presence_interval": intervals} + for expiration, intervals in short_range_interactions.items() + ], + }) + + return groups + + @profiler.profile def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: """ - General output data of a test scenario. + Simulation output data. """ - model: typing.Union[models.ExposureModel, models.ExposureModelGroup] = form.build_model() - times = interesting_times(model) - - if isinstance(model, models.ExposureModelGroup): - exposed_presence_intervals = [] - probabilities_of_infection = [] - for nth_model in model.exposure_models: - exposed_presence_intervals.extend(list(nth_model.exposed.presence_interval().boundaries())) - probabilities_of_infection.append(nth_model.infection_probability()) - index_of_max_mean = max( - range(len(probabilities_of_infection)), - key=lambda i: probabilities_of_infection[i].mean() - ) - probability_of_infection = probabilities_of_infection[index_of_max_mean] - elif isinstance(model, models.ExposureModel): - exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] - probability_of_infection = model.infection_probability() - else: - raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'") - - # Handle short-range related outputs - lower_concentrations, short_range_expirations, short_range_intervals = None, None, None - # Short-range related data: - if (form.short_range_option == "short_range_yes"): - lower_concentrations, short_range_expirations, short_range_intervals = process_short_range_interactions(model, times) - - # Probability of infection - prob = probability_of_infection - prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) + model_group: models.ExposureModelGroup = form.build_model() + results_per_group: typing.Dict[str, typing.Any] = group_results(form, model_group) - # Expected new cases - expected_new_cases = np.array(model.expected_new_cases()).mean() - - # Expected number of new cases per group - # expected_new_cases_per_group = [np.array(model.expected_new_cases()).mean() for model in models_set.exposure_models] + times = interesting_times(model_group) # CO2 concentration CO2_model: models.CO2ConcentrationModel = form.build_CO2_model() # Compute deposited exposures and virus/CO2 concentrations in parallel to increase performance - deposited_exposures = [] - long_range_deposited_exposures = [] + deposited_exposures = defaultdict(list) + long_range_deposited_exposures = defaultdict(list) + concentrations = defaultdict(list) + concentrations_zoomed = defaultdict(list) CO2_concentrations = [] - concentrations = [] tasks = [] - with executor_factory() as executor: # TODO: parallelism in the models + with executor_factory() as executor: for time1, time2 in zip(times[:-1], times[1:]): - tasks.append(executor.submit( - _calculate_deposited_exposure, model, time1, time2, fn_name="de")) - tasks.append(executor.submit( - _calculate_long_range_deposited_exposure, model, time1, time2, fn_name="de_lr")) - tasks.append(executor.submit( - _calculate_concentration, model, time1, fn_name="cn")) - # virus and co2 concentration: takes each time as param, not the interval + for single_group in model_group.exposure_models: + tasks.append(executor.submit( + _calculate_deposited_exposure, single_group, time1, time2, fn_name=f"{single_group.identifier}:de")) + # virus and co2 concentration: takes each time as param, not the interval + tasks.append(executor.submit( + _calculate_concentration, single_group, time1, fn_name=f"{single_group.identifier}:cn")) + if single_group.short_range != (): + tasks.append(executor.submit( + _calculate_long_range_deposited_exposure, single_group, time1, time2, fn_name=f"{single_group.identifier}:de_lr")) + tasks.append(executor.submit( + _concentrations_with_sr_breathing, form, single_group, time1, fn_name=f"{single_group.identifier}:cn_zoomed")) + tasks.append(executor.submit( _calculate_co2_concentration, CO2_model, time1, fn_name="co2")) + # virus and co2 concentration: calculate the last time too - tasks.append(executor.submit( _calculate_concentration, - model, times[-1], fn_name="cn")) + for single_model in model_group.exposure_models: + tasks.append(executor.submit(_calculate_concentration, + single_model, times[-1], fn_name=f"{single_model.identifier}:cn")) + if single_group.short_range != (): + tasks.append(executor.submit(_concentrations_with_sr_breathing, + form, single_model, times[-1], fn_name=f"{single_model.identifier}:cn_zoomed")) + tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, times[-1], fn_name="co2")) for task in tasks: result, fn_name = task.result() - if fn_name == "de": - deposited_exposures.append(result) - elif fn_name == "de_lr": - long_range_deposited_exposures.append(result) - elif fn_name == "cn": - concentrations.append(result) - elif fn_name == "co2": - CO2_concentrations.append(result) - - cumulative_doses = np.cumsum(deposited_exposures) - long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures) - - prob_probabilistic_exposure = None - if isinstance(model, models.ExposureModel) and form.exposure_option == "p_probabilistic_exposure": - prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() - - conditional_probability_data = None - uncertainties_plot_src = None - if form.conditional_probability_viral_loads: - if isinstance(model, models.ExposureModelGroup): - all_the_same_virus = True - for nth_model in model.exposure_models: - if nth_model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] != ViralLoads.COVID_OVERALL.value: # type: ignore - all_the_same_virus = False - if all_the_same_virus: - # Given the similarities, pick the first exposure model - the_model: models.ExposureModel = model.exposure_models[0] - # Generate all the required data for the conditional probability plot - conditional_probability_data = manufacture_conditional_probability_data(the_model, prob) - # Generate the matplotlib image based on the received data - uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data))) - elif isinstance(model, models.ExposureModel): - if model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] != ViralLoads.COVID_OVERALL.value: # type: ignore - # Generate all the required data for the conditional probability plot - conditional_probability_data = manufacture_conditional_probability_data(model, prob) - # Generate the matplotlib image based on the received data - uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data))) + if ":" in fn_name: + if fn_name.split(":")[1] == "de": + deposited_exposures[fn_name.split(':')[0]].append(result) + elif fn_name.split(":")[1] == "de_lr": + long_range_deposited_exposures[fn_name.split(':')[0]].append(result) + elif fn_name.split(":")[1] == "cn": + concentrations[fn_name.split(':')[0]].append(result) + elif fn_name.split(":")[1] == "cn_zoomed": + concentrations_zoomed[fn_name.split(':')[0]].append(result) else: - raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'") + if fn_name == "co2": + CO2_concentrations.append(result) + # Update results per group + for single_group in model_group.exposure_models: + results_per_group[single_group.identifier]["concentrations"] = concentrations[single_group.identifier] + results_per_group[single_group.identifier]["cumulative_doses"] = list(np.cumsum(deposited_exposures[single_group.identifier])) + # Calculate long_range results when short-range interactions are defined + if single_group.short_range != (): + results_per_group[single_group.identifier]["concentrations_zoomed"] = concentrations_zoomed[single_group.identifier] + results_per_group[single_group.identifier]["long_range_cumulative_doses"] = list(np.cumsum(long_range_deposited_exposures[single_group.identifier])) + return { - "model": model.exposure_models[0] if isinstance(model, models.ExposureModelGroup) else model, # TODO: which model do we want to show info about? + # General results across all groups + "model": model_group.exposure_models[0], "times": list(times), - "exposed_presence_intervals": exposed_presence_intervals, - "short_range_intervals": short_range_intervals, - "short_range_expirations": short_range_expirations, - "concentrations": list(concentrations), - "concentrations_zoomed": lower_concentrations, - "cumulative_doses": list(cumulative_doses), - "long_range_cumulative_doses": list(long_range_cumulative_doses), - "prob_inf": prob.mean(), - "prob_inf_sd": prob.std(), - "prob_dist": list(prob), - "prob_hist_count": list(prob_dist_count), - "prob_hist_bins": list(prob_dist_bins), - "prob_probabilistic_exposure": prob_probabilistic_exposure, - "expected_new_cases": expected_new_cases, - "uncertainties_plot_src": uncertainties_plot_src, "CO2_concentrations": CO2_concentrations, - "conditional_probability_data": conditional_probability_data, + # Group specific results + "groups": results_per_group, } @@ -435,9 +450,7 @@ def calculate_vl_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[s scenarios = {} for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99): vl = np.quantile(viral_load, percentil) - specific_vl_scenario = dataclass_utils.nested_replace(model, - {'concentration_model.infected.virus.viral_load_in_sputum': vl} - ) + specific_vl_scenario = dataclass_utils.nested_replace(model, {'concentration_model.infected.virus.viral_load_in_sputum': vl}) scenarios[str(vl)] = np.mean( specific_vl_scenario.infection_probability()) return { @@ -517,55 +530,43 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, t def scenario_statistics( - mc_model: typing.Union[mc.ExposureModel, mc.ExposureModelGroup], + mc_model_group: mc.ExposureModelGroup, sample_times: typing.List[float], compute_prob_exposure: bool, ): - model: typing.Union[models.ExposureModelGroup, models.ExposureModel] = mc_model.build_model( - size=mc_model.data_registry.monte_carlo['sample_size']) + model_group: models.ExposureModelGroup = mc_model_group.build_model( + size=mc_model_group.data_registry.monte_carlo['sample_size']) + model = model_group.exposure_models[0] - if isinstance(model, models.ExposureModelGroup): - concentrations = [ - np.sum([np.array(nth_model.concentration(float(time))).mean() for nth_model in model.exposure_models]) - for time in sample_times - ] - prob = np.max([np.mean(nth_model.infection_probability()) for nth_model in model.exposure_models]) - elif isinstance(model, models.ExposureModel): - concentrations = [ + return { + 'probability_of_infection': np.mean(model.infection_probability()), + 'expected_new_cases': np.mean(model.expected_new_cases()), + 'concentrations': [ np.mean(model.concentration(time)) for time in sample_times - ] - prob = np.mean(model.infection_probability()) - else: - raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'") - - return { - 'probability_of_infection': prob, - 'expected_new_cases': np.mean(model.expected_new_cases()), - 'concentrations': concentrations, - 'prob_probabilistic_exposure': model.total_probability_rule() if isinstance(model, models.ExposureModel) and compute_prob_exposure else None + ], + 'prob_probabilistic_exposure': model.total_probability_rule() if compute_prob_exposure else None, } def comparison_report( form: VirusFormData, report_data: typing.Dict[str, typing.Any], - scenarios: typing.Dict[str, typing.Union[mc.ExposureModel, mc.ExposureModelGroup]], + scenarios: typing.Dict[str, mc.ExposureModelGroup], executor_factory: typing.Callable[[], concurrent.futures.Executor], -): +): if (form.short_range_option == "short_range_no"): statistics = { 'Current scenario': { - 'probability_of_infection': report_data['prob_inf'], - 'expected_new_cases': report_data['expected_new_cases'], - 'concentrations': report_data['concentrations'], + 'probability_of_infection': report_data['groups']['static']['prob_inf'], + 'expected_new_cases': report_data['groups']['static']['expected_new_cases'], + 'concentrations': report_data['groups']['static']['concentrations'], } } else: statistics = {} - static_occupancy = form.occupancy_format == "static" - compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and static_occupancy + compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and form.occupancy_format == "static" with executor_factory() as executor: results = executor.map( @@ -576,7 +577,7 @@ def comparison_report( timeout=60, ) - for (name, model), model_stats in zip(scenarios.items(), results): + for (name, _), model_stats in zip(scenarios.items(), results): statistics[name] = model_stats return { diff --git a/caimira/src/caimira/calculator/validators/form_validator.py b/caimira/src/caimira/calculator/validators/form_validator.py index db45ec33..2d378d65 100644 --- a/caimira/src/caimira/calculator/validators/form_validator.py +++ b/caimira/src/caimira/calculator/validators/form_validator.py @@ -6,7 +6,6 @@ import ast import json import re -from collections import defaultdict import numpy as np from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT diff --git a/caimira/src/caimira/calculator/validators/virus/virus_validator.py b/caimira/src/caimira/calculator/validators/virus/virus_validator.py index cd934716..a4160a40 100644 --- a/caimira/src/caimira/calculator/validators/virus/virus_validator.py +++ b/caimira/src/caimira/calculator/validators/virus/virus_validator.py @@ -245,7 +245,7 @@ class VirusFormData(FormData): return models.Room(volume=volume, inside_temp=models.PiecewiseConstant((0, 24), (inside_temp,)), humidity=humidity) # type: ignore - def build_mc_model(self) -> typing.Union[mc.ExposureModel, mc.ExposureModelGroup]: + def build_mc_model(self) -> mc.ExposureModelGroup: size = self.data_registry.monte_carlo['sample_size'] room: models.Room = self.initialize_room() @@ -295,30 +295,31 @@ class VirusFormData(FormData): exposed=exposed_population, geographical_data=geographical_data, exposed_to_short_range=self.short_range_occupants, + identifier=exposure_group, ) exposure_model_set.append(exposure_model) - if len(list(self.dynamic_exposed_occupancy.keys())) == 1: - return exposure_model_set[0] - else: - return mc.ExposureModelGroup( - data_registry=self.data_registry, - exposure_models=[individual_model.build_model(size) for individual_model in exposure_model_set] - ) + return mc.ExposureModelGroup( + data_registry=self.data_registry, + exposure_models=[individual_model.build_model(size) for individual_model in exposure_model_set] + ) elif self.occupancy_format == 'static': exposed_population = self.exposed_population() short_range_tuple = tuple(item for sublist in short_range.values() for item in sublist) - return mc.ExposureModel( + return mc.ExposureModelGroup( data_registry=self.data_registry, - concentration_model=concentration_model, - short_range=short_range_tuple, - exposed=exposed_population, - geographical_data=geographical_data, - exposed_to_short_range=self.short_range_occupants, + exposure_models = [mc.ExposureModel( + data_registry=self.data_registry, + concentration_model=concentration_model, + short_range=short_range_tuple, + exposed=exposed_population, + geographical_data=geographical_data, + exposed_to_short_range=self.short_range_occupants, + ).build_model(size)] ) - def build_model(self, sample_size=None) -> typing.Union[models.ExposureModel, models.ExposureModelGroup]: + def build_model(self, sample_size=None) -> models.ExposureModelGroup: size = self.data_registry.monte_carlo['sample_size'] if not sample_size else sample_size return self.build_mc_model().build_model(size=size) diff --git a/caimira/tests/apps/calculator/test_model_generator.py b/caimira/tests/apps/calculator/test_model_generator.py index f90f00ac..b33d7363 100644 --- a/caimira/tests/apps/calculator/test_model_generator.py +++ b/caimira/tests/apps/calculator/test_model_generator.py @@ -17,7 +17,7 @@ from caimira.calculator.store.data_registry import DataRegistry def test_model_from_dict(baseline_form_data, data_registry): form = virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry) - assert isinstance(form.build_model(), models.ExposureModel) + assert isinstance(form.build_model(), models.ExposureModelGroup) def test_model_from_dict_invalid(baseline_form_data, data_registry): diff --git a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py index e6ed1200..c3964204 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py +++ b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py @@ -123,8 +123,8 @@ class VirusReportGenerator: data_registry_version: typing.Optional[str] = f"v{model.data_registry.version}" if model.data_registry.version else None # Alternative scenarios data - alternative_scenarios: typing.Dict[str,typing.Any] = alternative_scenarios_data(form, report_data, executor_factory) - context.update(alternative_scenarios) + if form.occupancy_format == 'static': + context.update(alternative_scenarios_data(form, report_data, executor_factory)) # Alternative viral load data if form.conditional_probability_viral_loads: diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/css/report.css b/cern_caimira/src/cern_caimira/apps/calculator/static/css/report.css index dd111aa5..f42d7a3a 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/css/report.css +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/css/report.css @@ -126,13 +126,13 @@ p.notes { padding: 15px; page-break-inside: avoid; } - #button_full_exposure, #button_hide_high_concentration { + #button_full_exposure-group_static, #button_hide_high_concentration-group_static { display: none!important; } - #long_range_cumulative_checkbox, #lr_cumulative_checkbox_label { + #long_range_cumulative_checkbox-group_static, #lr_cumulative_checkbox_label-group_static { display: none!important; } - #button_alternative_full_exposure, #button_alternative_hide_high_concentration { + #button_alternative_full_exposure-group_static, #button_alternative_hide_high_concentration-group_static { display: none!important; } #export-csv { diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js index f574ebb5..09e53b03 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js @@ -301,6 +301,7 @@ function displayFittingData(json_response) { // Not needed for the form submission delete json_response["CO2_plot_img"]; delete json_response["predictive_CO2"]; + delete json_response["CO2_plot_data"]; // Convert nulls to empty strings in the JSON response if (json_response["room_capacity"] === null) json_response["room_capacity"] = ''; if (json_response["ventilation_lsp_values"] === null) json_response["ventilation_lsp_values"] = ''; diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js index 20a3be07..bfd9a43c 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js @@ -4,13 +4,18 @@ function on_report_load(conditional_probability_viral_loads) { } /* Generate the concentration plot using d3 library. */ -function draw_plot(svg_id) { +function draw_plot(svg_id, group_id, times, concentrations_zoomed, + concentrations, cumulative_doses, long_range_cumulative_doses, + exposed_presence_intervals, short_range_interactions) { // Used for controlling the short-range interactions - let button_full_exposure = document.getElementById("button_full_exposure"); - let button_hide_high_concentration = document.getElementById("button_hide_high_concentration"); - let long_range_checkbox = document.getElementById('long_range_cumulative_checkbox') - let show_sr_legend = short_range_expirations?.length > 0; + let button_full_exposure = document.getElementById(`button_full_exposure-group_${group_id}`); + let button_hide_high_concentration = document.getElementById(`button_hide_high_concentration-group_${group_id}`); + let long_range_checkbox = document.getElementById(`long_range_cumulative_checkbox-group_${group_id}`); + let show_sr_legend = short_range_interactions.length > 0; + + let short_range_intervals = short_range_interactions.map((interaction) => interaction["presence_interval"]); + let short_range_expirations = short_range_interactions.map((interaction) => interaction["expiration"]); var data_for_graphs = { 'concentrations': [], @@ -521,12 +526,12 @@ function draw_plot(svg_id) { } // Draw for the first time to initialize. - redraw(); + redraw(svg_id); update_concentration_plot(concentrations, cumulative_doses); // Redraw based on the new size whenever the browser window is resized. window.addEventListener("resize", e => { - redraw(); + redraw(svg_id); if (button_full_exposure && button_full_exposure.disabled) update_concentration_plot(concentrations, cumulative_doses); else update_concentration_plot(concentrations_zoomed, long_range_cumulative_doses) }); @@ -536,12 +541,13 @@ function draw_plot(svg_id) { // 'list_of_scenarios' is a dictionary with all the scenarios // 'times' is a list of times for all the scenarios function draw_generic_concentration_plot( - plot_svg_id, + svg_id, + times, y_axis_label, h_lines, ) { - if (plot_svg_id === 'CO2_concentration_graph') { + if (svg_id === 'CO2_concentration_graph') { list_of_scenarios = {'CO₂ concentration': {'concentrations': CO2_concentrations}}; min_y_axis_domain = 400; } @@ -575,7 +581,7 @@ function draw_generic_concentration_plot( var first_scenario = Object.values(data_for_scenarios)[0] // Add main SVG element - var plot_div = document.getElementById(plot_svg_id); + var plot_div = document.getElementById(svg_id); var vis = d3.select(plot_div).append('svg'); var xRange = d3.scaleTime().domain([first_scenario[0].hour, first_scenario[first_scenario.length - 1].hour]); @@ -706,7 +712,7 @@ function draw_generic_concentration_plot( } function update_concentration_plot(concentration_data) { - list_of_scenarios = (plot_svg_id === 'CO2_concentration_graph') ? {'CO₂ concentration': {'concentrations': CO2_concentrations}} : alternative_scenarios + list_of_scenarios = (svg_id === 'CO2_concentration_graph') ? {'CO₂ concentration': {'concentrations': CO2_concentrations}} : alternative_scenarios var highest_concentration = 0. for (scenario in list_of_scenarios) { @@ -739,11 +745,11 @@ function draw_generic_concentration_plot( var graph_width; var graph_height; - function redraw() { + function redraw(svg_id) { // Define width and height according to the screen size. Always use an already defined - var window_width = document.getElementById('concentration_plot').clientWidth; + var window_width = document.getElementById(svg_id).clientWidth; var div_width = window_width; - var div_height = document.getElementById('concentration_plot').clientHeight; + var div_height = document.getElementById(svg_id).clientHeight; graph_width = div_width; graph_height = div_height; var margins = { top: 30, right: 20, bottom: 50, left: 60 }; @@ -882,12 +888,12 @@ function draw_generic_concentration_plot( } // Draw for the first time to initialize. - redraw(); + redraw(svg_id); update_concentration_plot('concentrations'); // Redraw based on the new size whenever the browser window is resized. window.addEventListener("resize", e => { - redraw(); + redraw(svg_id); update_concentration_plot('concentrations'); }); } diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 index 77f26f90..79e99b2e 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 @@ -455,8 +455,8 @@

    {# "static" vs. "dynamic" #} - - + +
    diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 index f106ac01..ac098ec1 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 @@ -54,356 +54,361 @@
    - {% set long_range_prob_inf = prob_inf %} - {% set long_range_expected_cases = expected_new_cases %} - - {# Update values if short range option is "short_range_yes" #} - {% if form.short_range_option == "short_range_yes" %} - {% set scenario = alternative_scenarios.stats.values() | first %} - {# Probability of infection values #} - {% set long_range_prob_inf = scenario.probability_of_infection %} - {# Expected new case values #} - {% set long_range_expected_cases = scenario.expected_new_cases %} - - {% if form.exposure_option == 'p_probabilistic_exposure' %} - {% set long_range_prob_probabilistic_exposure = scenario.prob_probabilistic_exposure %} + {% for group_id, group_results in groups.items() %} + {% if group_results.get('long_range_prob_inf') %}{{group_results.long_range_prob_inf}}{% endif %} + {% set long_range_prob_inf = group_results.prob_inf %} + {% set long_range_expected_cases = group_results.expected_new_cases %} + {# Update values if short range option is "short_range_yes" #} + {% if group_results.get('long_range_prob') %} + {# Probability of infection values #} + {% set long_range_prob_inf = group_results.long_range_prob %} + {# Expected new case values #} + {% set long_range_expected_cases = group_results.long_range_expected_new_cases %} + + {% if form.exposure_option == 'p_probabilistic_exposure' %} + {% set long_range_prob_probabilistic_exposure = scenario.prob_probabilistic_exposure %} + {% endif %} {% endif %} - {% endif %} - - {% block report_results %} -
    -
    Results - -
    -
    -
    -

    -

    -
    -
    -
    - Probability of infection (%)
    - {% if form.short_range_option == "short_range_yes" %} - Without short-range interactions - {% endif %} -
    -
    - -
    - {% block long_range_warning_animation %} -
    - {{long_range_prob_inf | non_zero_percentage}} - - - - -
    - {% endblock long_range_warning_animation %} -
    -
    Expected new cases: {{ long_range_expected_cases | float_format }}
    -
    -
    - {% if form.short_range_option == "short_range_yes" %} -
    -
    - Probability of infection (%)
    - With short-range interactions -
    -
    - -
    - {% block warning_animation %} -
    - {{prob_inf | non_zero_percentage}} - - - - -
    - {% endblock warning_animation %} -
    -
    Expected new cases: {{ expected_new_cases | float_format }}
    -
    - {% endif %} -
    - {% block report_summary %} -
    - - {% if form.short_range_option == "short_range_yes" %} -
    - - {% endif %} - {% block probabilistic_exposure_probability %} - {% if form.exposure_option == "p_probabilistic_exposure" %} -
    - - {% endif %} - {% endblock probabilistic_exposure_probability %} -
    - {% endblock report_summary %} -
    -
    -
    - {% block report_summary_footnote %} - {% endblock report_summary_footnote %} -
    -

    * The results are based on the parameters and assumptions published in the CARA publication: doi.org/10.1098/rsfs.2021.0076.


    - {% if form.short_range_option == "short_range_yes" %} - {% if 'Speaking' in form.short_range_interactions|string or 'Shouting' in form.short_range_interactions|string %} - - - {% endif %} - - {% endif %} -
    - - IRP - Infectious Respiratory Particles. -

    -
    -
    -
    -
    -
    Result uncertainties - -
    -
    -
    -
    -
    - -
    - {% if model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == 'Ref: Viral load - covid overal viral load data' %} -
    - - -
    - {% endif %} - {% if form.conditional_probability_viral_loads %} -
    - -
    -

    (i)   Predictive probability of infection for a given value of the viral load

    -

    (ii)  Histogram of the viral load data

    -

    (iii) Histogram of the conditional probability of infection (result of total predictive probability in the middle)

    -
    -
    - {% endif %} -
    -
    -
    -
    - -
    -
    Predictive CO₂ Concentration Profile - -
    -
    -
    -
    -
    - -
    -
    -
    -
    - - {% if form.short_range_option == "short_range_no" %} -
    -
    Alternative scenarios -
    -
    +
    -
    -
    +

    +

    +
    +
    +
    + Probability of infection (%)
    + {% if group_results.get('long_range_prob') %} + Without short-range interactions + {% endif %} +
    +
    + +
    + {% block long_range_warning_animation scoped %} +
    + {{ long_range_prob_inf | non_zero_percentage }} + + + + +
    + {% endblock long_range_warning_animation %} +
    +
    Expected new cases: {{ long_range_expected_cases | float_format }}
    +
    +
    + {% if group_results.get('long_range_prob') %} +
    +
    + Probability of infection (%)
    + With short-range interactions +
    +
    + +
    + {% block warning_animation scoped %} +
    + {{ group_results.prob_inf | non_zero_percentage }} + + + + +
    + {% endblock warning_animation %} +
    +
    Expected new cases: {{ group_results.expected_new_cases | float_format }}
    +
    + {% endif %} +
    + {% block report_summary scoped %} +
    + + {% if group_results.get('long_range_prob') %} +
    + + {% endif %} + {% block probabilistic_exposure_probability %} + {% if form.exposure_option == "p_probabilistic_exposure" %} +
    + + {% endif %} + {% endblock probabilistic_exposure_probability %} +
    + {% endblock report_summary %} +
    +
    +
    + {% block report_summary_footnote %} + {% endblock report_summary_footnote %} +
    +

    * The results are based on the parameters and assumptions published in the CARA publication: doi.org/10.1098/rsfs.2021.0076.


    + {% if group_results.get('long_range_prob') %} + {% if form.short_range_option == 'short_range_yes' and ('Speaking' in form.short_range_interactions[group_id]|string or 'Shouting' in form.short_range_interactions[group_id]|string) %} + + + {% endif %} + + + {% endif %} +
    -
    - {% block report_scenarios_summary_table %} - - - - - - {% if form.occupancy_format == "static" %}{% endif %} - - - - {% for scenario_name, scenario_stats in alternative_scenarios.stats.items() %} - - - - {% if form.occupancy_format == "static" %}{% endif %} - - {% endfor %} - -
    ScenarioP(I)Expected new cases
    {{ scenario_name }} {{ scenario_stats.probability_of_infection | non_zero_percentage }}{{ scenario_stats.expected_new_cases | float_format }}
    - {% endblock report_scenarios_summary_table %} -
    -
    -

    Notes for alternative scenarios:
    -

      -
    1. This graph shows the concentration of infectious quanta in the air. The filtration of Type I and FFP2 masks, if worn, applies not only to the emission rate but also to the individual exposure (i.e. inhalation). - For this reason, scenarios with different types of mask will show the same concentration on the graph but have different absorbed doses and infection probabilities.
    2. -
    3. If you have selected more sophisticated options, such as HEPA filtration or FFP2 masks, this will be indicated in the plot as the "base scenario", representing the inputs inserted in the form.
      - The other alternative scenarios shown for comparison will not include either HEPA filtration or FFP2 masks.
    4. -
    -
    + IRP - Infectious Respiratory Particles.

    - {% endif %} - - {% endblock report_results %} - - -