diff --git a/CHANGELOG.md b/CHANGELOG.md index 59daacef..4d693bf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,19 @@ +# 4.18.0 (July 24, 2025) + +## Feature Added +- Dynamic occupancy: groups of exposed + infected may now be defined. +Model and calculator updated, and tests added. Documentation still to +be updated. User interface mostly unchanged (the new feature is not +visible there). + +## Bug Fixes +- Fix in profiler to adapt to a non back-compatible change in pyinstrument +package. +- Type ignored in the expert app. + +## Other +- Update of mypy, pytest-mypy and pyinstrument dependencies (version). + # 4.17.8 (March 13, 2025) ## Bug Fixes diff --git a/caimira/pyproject.toml b/caimira/pyproject.toml index aff4eb9b..891eb53d 100644 --- a/caimira/pyproject.toml +++ b/caimira/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "caimira" -version = "4.17.8" +version = "4.18.0" description = "CAiMIRA - CERN Airborne Model for Indoor Risk Assessment" readme = "README.md" license = { text = "Apache-2.0" } @@ -23,7 +23,7 @@ dependencies = [ "mistune", "numpy", "pandas", - "pyinstrument", + "pyinstrument >= 5.0.3", "python-dateutil", "requests", "retry", @@ -39,8 +39,8 @@ dependencies = [ dev = [] test = [ "pytest", - "pytest-mypy >= 0.10.3", - "mypy >= 1.0.0", + "pytest-mypy >= 1.0.1", + "mypy >= 1.17.0", "pytest-tornasync", "types-dataclasses", "types-python-dateutil", diff --git a/caimira/src/caimira/api/controller/virus_report_controller.py b/caimira/src/caimira/api/controller/virus_report_controller.py index 4375cc35..c6e95d96 100644 --- a/caimira/src/caimira/api/controller/virus_report_controller.py +++ b/caimira/src/caimira/api/controller/virus_report_controller.py @@ -31,6 +31,9 @@ def submit_virus_form(form_data: typing.Dict, report_generation_parallelism: typ report_data: typing.Dict = generate_report(form_obj=form_obj, report_generation_parallelism=report_generation_parallelism) # Handle model representation - if report_data['model']: report_data['model'] = repr(report_data['model']) + if report_data['model']: + report_data['model'] = repr(report_data['model']) + for single_group_output in report_data['groups'].values(): + del single_group_output['model'] # Model representation per group not needed return report_data diff --git a/caimira/src/caimira/calculator/models/models.py b/caimira/src/caimira/calculator/models/models.py index 8a250907..e19b1ee0 100644 --- a/caimira/src/caimira/calculator/models/models.py +++ b/caimira/src/caimira/calculator/models/models.py @@ -666,7 +666,7 @@ class Particle: # deposition fraction depends on aerosol particle diameter. d = (self.diameter * evaporation_factor) IFrac = 1 - 0.5 * (1 - (1 / (1 + (0.00076*(d**2.8))))) - fdep = IFrac * (0.0587 + fdep = IFrac * (0.0587 # type: ignore + (0.911/(1 + np.exp(4.77 + 1.485 * np.log(d)))) + (0.943/(1 + np.exp(0.508 - 2.58 * np.log(d))))) # type: ignore return fdep @@ -711,6 +711,9 @@ class Expiration(_ExpirationBase): # to c_n,i in Eq. (4) of https://doi.org/10.1101/2021.10.14.21264988) cn: float = 1. + #: Expiration name + name: typing.Optional[str] = None + @property def particle(self) -> Particle: """ @@ -799,7 +802,7 @@ Activity.types = { @dataclass(frozen=True) class SimplePopulation: """ - Represents a group of people all with exactly the same behaviour and + Represents a group of people all with exactly the same behavior and situation. """ @@ -844,7 +847,7 @@ class SimplePopulation: @dataclass(frozen=True) class Population(SimplePopulation): """ - Represents a group of people all with exactly the same behaviour and + Represents a group of people all with exactly the same behavior and situation, considering the usage of mask and a certain host immunity. """ @@ -1313,7 +1316,7 @@ class ShortRangeModel: data_registry: DataRegistry #: Expiration type - expiration: _ExpirationBase + expiration: Expiration #: Activity type activity: Activity @@ -1639,6 +1642,9 @@ class ExposureModel: #: Total people with short-range interactions exposed_to_short_range: int = 0 + #: Unique group identifier + identifier: str = 'group_1' + #: The number of times the exposure event is repeated (default 1). @property def repeats(self) -> int: @@ -1653,6 +1659,9 @@ class ExposureModel: In other words, the air exchange rate from the ventilation, and the virus decay constant, must not be given as arrays. + + It also checks that the number of exposed is + static during the simulation time. """ c_model = self.concentration_model # Check if the diameter is vectorised. @@ -1663,6 +1672,11 @@ class ExposureModel: c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))): raise ValueError("If the diameter is an array, none of the ventilation parameters " "or virus decay constant can be arrays at the same time.") + + # Check if exposed population is static + if not isinstance(self.exposed.number, int) or not isinstance(self.exposed.presence, Interval): + raise TypeError("The exposed number must be an int and presence an Interval. " + f"Got {type(self.exposed.number)} and {type(self.exposed.presence)}.") @method_cache def population_state_change_times(self) -> typing.List[float]: @@ -1809,11 +1823,9 @@ class ExposureModel: The number of virus per m^3 deposited on the respiratory tract. """ population_change_times = self.population_state_change_times() - deposited_exposure = [] for start, stop in zip(population_change_times[:-1], population_change_times[1:]): deposited_exposure.append(self.deposited_exposure_between_bounds(start, stop)) - return deposited_exposure def deposited_exposure(self) -> _VectorisedFloat: @@ -1838,8 +1850,7 @@ class ExposureModel: return (1 - np.prod([1 - prob for prob in self._infection_probability_list()], axis = 0)) * 100 def total_probability_rule(self) -> _VectorisedFloat: - if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or - isinstance(self.exposed.number, IntPiecewiseConstant)): + if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant)): raise NotImplementedError("Cannot compute total probability " "(including incidence rate) with dynamic occupancy") @@ -1847,9 +1858,9 @@ class ExposureModel: sum_probability = 0.0 # Create an equivalent exposure model but changing the number of infected cases. - total_people = self.concentration_model.infected.number + self.exposed.number + total_people = self.concentration_model.infected.number + self.exposed.number # type: ignore max_num_infected = (total_people if total_people < 10 else 10) - # The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability. + # The influence of a higher number of simultaneous infected people (> 4 - 5) yields an almost negligible contribution to the total probability. # To be on the safe side, a hard coded limit with a safety margin of 2x was set. # Therefore we decided a hard limit of 10 infected people. for num_infected in range(1, max_num_infected + 1): @@ -1872,43 +1883,81 @@ class ExposureModel: 1) Long-range exposure: take the infection_probability and multiply by the occupants exposed to long-range. 2) Short- and long-range exposure: take the infection_probability of long-range multiplied by the occupants exposed to long-range only, plus the infection_probability of short- and long-range multiplied by the occupants exposed to short-range only. - - Currently disabled when dynamic occupancy is defined for the exposed population. """ - - if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or - isinstance(self.exposed.number, IntPiecewiseConstant)): - raise NotImplementedError("Cannot compute expected new cases " - "with dynamic occupancy") - + number = self.exposed.number if self.short_range != (): - new_cases_long_range = nested_replace(self, {'short_range': [],}).infection_probability() * (self.exposed.number - self.exposed_to_short_range) + new_cases_long_range = nested_replace(self, {'short_range': [],}).infection_probability() * (number - self.exposed_to_short_range) # type: ignore return (new_cases_long_range + (self.infection_probability() * self.exposed_to_short_range)) / 100 - return self.infection_probability() * self.exposed.number / 100 + return self.infection_probability() * number / 100 def reproduction_number(self) -> _VectorisedFloat: """ The reproduction number can be thought of as the expected number of cases directly generated by one infected case in a population. - - Currently disabled when dynamic occupancy is defined for both the infected and exposed population. """ - - if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or - isinstance(self.exposed.number, IntPiecewiseConstant)): - raise NotImplementedError("Cannot compute reproduction number " - "with dynamic occupancy") - - if self.concentration_model.infected.number == 1: + infected_population: InfectedPopulation = self.concentration_model.infected + if isinstance(infected_population.number, int) and infected_population.number == 1: return self.expected_new_cases() # Create an equivalent exposure model but with precisely - # one infected case. + # one infected case, respecting the presence interval. single_exposure_model = nested_replace( self, { - 'concentration_model.infected.number': 1} + 'concentration_model.infected.number': 1, + 'concentration_model.infected.presence': infected_population.presence_interval(), + } ) - return single_exposure_model.expected_new_cases() - \ No newline at end of file + + +@dataclass(frozen=True) +class ExposureModelGroup: + """ + Represents a group of exposure models. This is to handle the case + when different groups of people come and go in the room at different + times. These groups are then handled fully independently, with + exposure dose and probability of infection defined for each of them. + """ + data_registry: DataRegistry + + #: The set of exposure models for each exposed population + exposure_models: typing.Tuple[ExposureModel, ...] + + def __post_init__(self): + """ + Validate that all ExposureModels have the same ConcentrationModel. + """ + first_concentration_model = self.exposure_models[0].concentration_model + for model in self.exposure_models[1:]: + # Check that the number of infected people and their presence is the same + if (model.concentration_model.infected.number != first_concentration_model.infected.number or + model.concentration_model.infected.presence != first_concentration_model.infected.presence): + raise ValueError("All ExposureModels must have the same infected number and presence in the ConcentrationModel.") + + @method_cache + def _deposited_exposure_list(self) -> typing.List[_VectorisedFloat]: + """ + List of doses absorbed by each member of the groups. + """ + return [model.deposited_exposure() for model in self.exposure_models] + + @method_cache + def _infection_probability_list(self): + """ + List of the probability of infection for each group. + """ + return [model.infection_probability() for model in self.exposure_models] # type: ignore + + def expected_new_cases(self) -> _VectorisedFloat: + """ + Final expected number of new cases considering the + contribution of each individual probability of infection. + """ + return np.sum([model.expected_new_cases() for model in self.exposure_models], axis=0) # type: ignore + + def reproduction_number(self) -> _VectorisedFloat: + """ + Expected number of cases when there is only one infected case. + """ + return np.sum([model.reproduction_number() for model in self.exposure_models], axis=0) # type: ignore diff --git a/caimira/src/caimira/calculator/models/monte_carlo/data.py b/caimira/src/caimira/calculator/models/monte_carlo/data.py index 88e7898e..ca62dfba 100644 --- a/caimira/src/caimira/calculator/models/monte_carlo/data.py +++ b/caimira/src/caimira/calculator/models/monte_carlo/data.py @@ -365,6 +365,7 @@ def expiration_distribution( BLO_factors, d_min=0.1, d_max=30., + exp_type=None, ): """ Returns an Expiration with an aerosol diameter distribution, defined @@ -382,6 +383,7 @@ def expiration_distribution( kernel_bandwidth=0.1, ), cn=BLOmodel(data_registry, BLO_factors).integrate(d_min, d_max), + name=exp_type, ) @@ -432,7 +434,8 @@ def short_range_expiration_distributions(data_registry): data_registry=data_registry, BLO_factors=BLO_factors, d_min=param_evaluation(data_registry.expiration_particle['particle_size_range']['short_range'], 'minimum_diameter'), - d_max=param_evaluation(data_registry.expiration_particle['particle_size_range']['short_range'], 'maximum_diameter') + d_max=param_evaluation(data_registry.expiration_particle['particle_size_range']['short_range'], 'maximum_diameter'), + exp_type=exp_type, ) for exp_type, BLO_factors in expiration_BLO_factors(data_registry).items() } diff --git a/caimira/src/caimira/calculator/models/monte_carlo/models.py b/caimira/src/caimira/calculator/models/monte_carlo/models.py index 1c7c4a6e..2caddbaa 100644 --- a/caimira/src/caimira/calculator/models/monte_carlo/models.py +++ b/caimira/src/caimira/calculator/models/monte_carlo/models.py @@ -74,7 +74,6 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model elif new_field.type == typing.Tuple[models.SpecificInterval, ...]: SI = getattr(sys.modules[__name__], "SpecificInterval") field_type = typing.Tuple[typing.Union[models.SpecificInterval, SI], ...] - elif new_field.type == typing.Union[int, models.IntPiecewiseConstant]: IPC = getattr(sys.modules[__name__], "IntPiecewiseConstant") field_type = typing.Union[int, models.IntPiecewiseConstant, IPC] diff --git a/caimira/src/caimira/calculator/models/profiler.py b/caimira/src/caimira/calculator/models/profiler.py index 8f3c26a0..88f22beb 100644 --- a/caimira/src/caimira/calculator/models/profiler.py +++ b/caimira/src/caimira/calculator/models/profiler.py @@ -31,7 +31,7 @@ class Profilers(Enum): class PyInstrumentWrapper: - profiler = PyInstrumentProfiler(async_mode=True) + profiler = PyInstrumentProfiler(async_mode='enabled') @property def is_running(self): diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py index 2071fb2a..f1fe1fb0 100644 --- a/caimira/src/caimira/calculator/report/virus_report_data.py +++ b/caimira/src/caimira/calculator/report/virus_report_data.py @@ -5,18 +5,32 @@ import io import typing import numpy as np import matplotlib.pyplot as plt +from collections import defaultdict from caimira.calculator.models import models, dataclass_utils, profiler, monte_carlo as mc from caimira.calculator.models.enums import ViralLoads from caimira.calculator.validators.virus.virus_validator import VirusFormData -def model_start_end(model: models.ExposureModel): - t_start = min(model.exposed.presence_interval().boundaries()[0][0], +def model_start_end(model: typing.Union[models.ExposureModelGroup, models.ExposureModel]): + """ + Calculates the boundary times for an ExposureModel or ExposureModelGroup. + + For a single ExposureModel, determines its boundary times by comparing + the presence intervals of both the exposed and the infected people. + For an ExposureModelGroup, finds the earliest start time and the latest end time + across all models in the group. + """ + if isinstance(model, models.ExposureModelGroup): + t_start = min((model_start_end(nth_model)[0] for nth_model in model.exposure_models)) + t_end = max((model_start_end(nth_model)[1] for nth_model in model.exposure_models)) + return t_start, t_end + else: + t_start = min(model.exposed.presence_interval().boundaries()[0][0], model.concentration_model.infected.presence_interval().boundaries()[0][0]) - t_end = max(model.exposed.presence_interval().boundaries()[-1][1], + t_end = max(model.exposed.presence_interval().boundaries()[-1][1], model.concentration_model.infected.presence_interval().boundaries()[-1][1]) - return t_start, t_end + return t_start, t_end def fill_big_gaps(array, gap_size): @@ -42,7 +56,7 @@ def fill_big_gaps(array, gap_size): return result -def non_temp_transition_times(model: models.ExposureModel): +def non_temp_transition_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel]): """ Return the non-temperature (and PiecewiseConstant) based transition times. @@ -63,7 +77,7 @@ def non_temp_transition_times(model: models.ExposureModel): t_start, t_end = model_start_end(model) change_times = {t_start, t_end} - for name, obj in walk_model(model, name="exposure"): + for _, obj in walk_model(model, name="exposure"): if isinstance(obj, models.Interval): change_times |= obj.transition_times() @@ -72,7 +86,8 @@ def non_temp_transition_times(model: models.ExposureModel): return sorted(time for time in change_times if (t_start <= time <= t_end)) -def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]: +def interesting_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel], + approx_n_pts: typing.Optional[int] = None) -> typing.List[float]: """ Pick approximately ``approx_n_pts`` time points which are interesting for the given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times @@ -94,126 +109,220 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional return nice_times -def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]: - lower_concentrations = [] - for time in times: - for index, (start, stop) in enumerate(short_range_intervals): - # For visualization issues, add short-range breathing activity to the initial long-range concentrations - if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing': - lower_concentrations.append( - np.array(model.concentration(float(time))).mean()) - break - lower_concentrations.append( - np.array(model.concentration_model.concentration(float(time))).mean()) - return lower_concentrations +def _concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, + time: float, fn_name: typing.Optional[str] = None): + """ + Returns the zoomed viral concentrations. + """ + for index, (start, stop) in enumerate([interaction.presence.boundaries()[0] for interaction in model.short_range]): + if start <= time <= stop and form.short_range_interactions[model.identifier][index]['expiration'] == 'Breathing': + return np.array(model.concentration(float(time))).mean(), fn_name + return np.array(model.concentration_model.concentration(float(time))).mean(), fn_name -def _calculate_deposited_exposure(model, time1, time2, fn_name=None): +def _calculate_deposited_exposure(model: models.ExposureModel, + time1: float, time2: float, fn_name: typing.Optional[str] = None): return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name -def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None): +def _calculate_long_range_deposited_exposure(model: models.ExposureModel, + time1: float, time2: float, fn_name: typing.Optional[str] = None): return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name -def _calculate_co2_concentration(CO2_model, time, fn_name=None): +def _calculate_concentration(model: models.ExposureModel, + time: float, fn_name: typing.Optional[str] = None): + """ + Returns the concentration of viruses emitted by + the infected population. Short- and long-range included. + """ + return np.array(model.concentration(float(time))).mean(), fn_name + + +def _calculate_co2_concentration(CO2_model: models.CO2ConcentrationModel, time: float, fn_name: typing.Optional[str] = None): + """ + Returns the CO2 concentration emitted by all + the present population. + """ return np.array(CO2_model.concentration(float(time))).mean(), fn_name -@profiler.profile -def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: - model: models.ExposureModel = form.build_model() +def merge_intervals(intervals: typing.List[typing.List[float]]) -> typing.List[typing.List[float]]: + """ + Merges overlapping intervals from a list of intervals. + Assumes intervals are sorted based on start times. + """ + if not intervals: + return [] + + merged = [list(intervals[0])] + for start, end in intervals[1:]: + if merged[-1][1] < start: + merged.append([start, end]) + else: + merged[-1][1] = max(merged[-1][1], end) + return merged + + +def merge_short_range_interactions(all_exposed_groups: typing.Dict[str, typing.Any]) -> typing.List[typing.Dict[str, typing.Any]]: + """ + Expands the short range interactions per exposed group to a single data structure. + """ + merged_interactions = defaultdict(list) + for group in all_exposed_groups.values(): + for interaction in group["short_range_interactions"]: + merged_interactions[interaction["expiration"]].extend(interaction["presence_interval"]) - times = interesting_times(model) - short_range_intervals = [interaction.presence.boundaries()[0] - for interaction in model.short_range] - short_range_expirations = [interaction['expiration'] - for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else [] - - concentrations = [ - np.array(model.concentration(float(time))).mean() - for time in times + # Merge and sort intervals + return [ + {"expiration": exp, "presence_interval": merge_intervals(sorted(intervals, key=lambda x: x[0]))} + for exp, intervals in merged_interactions.items() ] - lower_concentrations = concentrations_with_sr_breathing( - form, model, times, short_range_intervals) + +def group_results(form: VirusFormData, model_group: models.ExposureModelGroup) -> typing.Dict[str, typing.Any]: + """ + Generates the output per group of exposure models. + """ + groups: dict = defaultdict(dict) + for single_group in model_group.exposure_models: + # Probability of infection + prob = single_group.infection_probability() + prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) + + # Expected new cases + expected_new_cases = np.array(single_group.expected_new_cases()) + + groups[single_group.identifier] = { + "model": single_group, + "prob_inf": prob.mean(), + "prob_inf_sd": prob.std(), + "prob_dist": list(prob), + "prob_hist_count": list(prob_dist_count), + "prob_hist_bins": list(prob_dist_bins), + "expected_new_cases": expected_new_cases.mean(), + "exposed_presence_intervals": list(single_group.exposed.presence_interval().boundaries()), + } + + # In case of conditional probability plot + if (form.conditional_probability_viral_loads and + single_group.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore + conditional_probability_data = manufacture_conditional_probability_data(single_group, prob) + groups[single_group.identifier].update({ + "conditional_probability_data": conditional_probability_data, + "uncertainties_plot_src": img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data))) + }) + + # Probabilistic exposure + if form.exposure_option == "p_probabilistic_exposure": + groups[single_group.identifier].update({ + "prob_probabilistic_exposure": np.array(single_group.total_probability_rule()).mean() + }) + + # In case of short-range interactions + if single_group.short_range != (): + # Short range outputs + short_range_interactions: dict = defaultdict(list) + for short_range_model in single_group.short_range: + short_range_interactions[short_range_model.expiration.name].extend( + short_range_model.presence.boundaries() + ) + + long_range_single_group = dataclass_utils.nested_replace( + single_group, {'short_range': ()} + ) + groups[single_group.identifier].update({ + "long_range_prob": long_range_single_group.infection_probability().mean(), + "long_range_expected_new_cases": long_range_single_group.expected_new_cases().mean(), + "short_range_interactions": [ + {"expiration": expiration, "presence_interval": intervals} + for expiration, intervals in short_range_interactions.items() + ], + }) + + return groups + + +@profiler.profile +def calculate_report_data(form: VirusFormData, + executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: + """ + Generates the simulation output data. + """ + model_group: models.ExposureModelGroup = form.build_model() + results_per_group: typing.Dict[str, typing.Any] = group_results(form, model_group) + times = interesting_times(model_group) + + # CO2 concentration CO2_model: models.CO2ConcentrationModel = form.build_CO2_model() - # compute deposited exposures and CO2 concentrations in parallel to increase performance - deposited_exposures = [] - long_range_deposited_exposures = [] + # Compute deposited exposures and virus/CO2 concentrations in parallel to increase performance + deposited_exposures = defaultdict(list) + long_range_deposited_exposures = defaultdict(list) + concentrations = defaultdict(list) + concentrations_zoomed = defaultdict(list) CO2_concentrations = [] tasks = [] with executor_factory() as executor: for time1, time2 in zip(times[:-1], times[1:]): - tasks.append(executor.submit( - _calculate_deposited_exposure, model, time1, time2, fn_name="de")) - tasks.append(executor.submit( - _calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr")) - # co2 concentration: takes each time as param, not the interval + for single_group in model_group.exposure_models: + tasks.append(executor.submit( + _calculate_deposited_exposure, single_group, time1, time2, fn_name=f"{single_group.identifier}:de")) + # virus and co2 concentration: takes each time as param, not the interval + tasks.append(executor.submit( + _calculate_concentration, single_group, time1, fn_name=f"{single_group.identifier}:cn")) + if single_group.short_range != (): + tasks.append(executor.submit( + _calculate_long_range_deposited_exposure, single_group, time1, time2, fn_name=f"{single_group.identifier}:de_lr")) + tasks.append(executor.submit( + _concentrations_with_sr_breathing, form, single_group, time1, fn_name=f"{single_group.identifier}:cn_zoomed")) + tasks.append(executor.submit( _calculate_co2_concentration, CO2_model, time1, fn_name="co2")) - # co2 concentration: calculate the last time too + + # virus and co2 concentration: calculate the last time too + for single_model in model_group.exposure_models: + tasks.append(executor.submit(_calculate_concentration, + single_model, times[-1], fn_name=f"{single_model.identifier}:cn")) + if single_group.short_range != (): + tasks.append(executor.submit(_concentrations_with_sr_breathing, + form, single_model, times[-1], fn_name=f"{single_model.identifier}:cn_zoomed")) + tasks.append(executor.submit(_calculate_co2_concentration, - CO2_model, times[-1], fn_name="co2")) - + CO2_model, times[-1], fn_name="co2")) + for task in tasks: result, fn_name = task.result() - if fn_name == "de": - deposited_exposures.append(result) - elif fn_name == "lr": - long_range_deposited_exposures.append(result) - elif fn_name == "co2": - CO2_concentrations.append(result) + if ":" in fn_name: + if fn_name.split(":")[1] == "de": + deposited_exposures[fn_name.split(':')[0]].append(result) + elif fn_name.split(":")[1] == "de_lr": + long_range_deposited_exposures[fn_name.split(':')[0]].append(result) + elif fn_name.split(":")[1] == "cn": + concentrations[fn_name.split(':')[0]].append(result) + elif fn_name.split(":")[1] == "cn_zoomed": + concentrations_zoomed[fn_name.split(':')[0]].append(result) + else: + if fn_name == "co2": + CO2_concentrations.append(result) - cumulative_doses = np.cumsum(deposited_exposures) - long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures) - - prob = np.array(model.infection_probability()) - prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) + # Update results per group + for single_group in model_group.exposure_models: + results_per_group[single_group.identifier]["concentrations"] = concentrations[single_group.identifier] + results_per_group[single_group.identifier]["cumulative_doses"] = list(np.cumsum(deposited_exposures[single_group.identifier])) + # Calculate long_range results when short-range interactions are defined + if single_group.short_range != (): + results_per_group[single_group.identifier]["concentrations_zoomed"] = concentrations_zoomed[single_group.identifier] + results_per_group[single_group.identifier]["long_range_cumulative_doses"] = list(np.cumsum(long_range_deposited_exposures[single_group.identifier])) - # Probabilistic exposure and expected new cases (only for static occupancy) - prob_probabilistic_exposure = None - expected_new_cases = None - if form.occupancy_format == "static": - if form.exposure_option == "p_probabilistic_exposure": - prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() - expected_new_cases = np.array(model.expected_new_cases()).mean() - - exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] - - conditional_probability_data = None - uncertainties_plot_src = None - if (form.conditional_probability_viral_loads and - model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore - # Generate all the required data for the conditional probability plot - conditional_probability_data = manufacture_conditional_probability_data( - model, prob) - # Generate the matplotlib image based on the received data - uncertainties_plot_src = img2base64(_figure2bytes( - uncertainties_plot(prob, conditional_probability_data))) - return { - "model": model, + # General results across all groups + "model": model_group.exposure_models[0], "times": list(times), - "exposed_presence_intervals": exposed_presence_intervals, - "short_range_intervals": short_range_intervals, - "short_range_expirations": short_range_expirations, - "concentrations": concentrations, - "concentrations_zoomed": lower_concentrations, - "cumulative_doses": list(cumulative_doses), - "long_range_cumulative_doses": list(long_range_cumulative_doses), - "prob_inf": prob.mean(), - "prob_inf_sd": prob.std(), - "prob_dist": list(prob), - "prob_hist_count": list(prob_dist_count), - "prob_hist_bins": list(prob_dist_bins), - "prob_probabilistic_exposure": prob_probabilistic_exposure, - "expected_new_cases": expected_new_cases, - "uncertainties_plot_src": uncertainties_plot_src, "CO2_concentrations": CO2_concentrations, - "conditional_probability_data": conditional_probability_data, + # Group specific results + "groups": results_per_group, } @@ -339,9 +448,7 @@ def calculate_vl_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[s scenarios = {} for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99): vl = np.quantile(viral_load, percentil) - specific_vl_scenario = dataclass_utils.nested_replace(model, - {'concentration_model.infected.virus.viral_load_in_sputum': vl} - ) + specific_vl_scenario = dataclass_utils.nested_replace(model, {'concentration_model.infected.virus.viral_load_in_sputum': vl}) scenarios[str(vl)] = np.mean( specific_vl_scenario.infection_probability()) return { @@ -350,7 +457,12 @@ def calculate_vl_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[s def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]: - scenarios = {} + """ + Generates the data structure containing all the alternative scenarios. + It is only compatible with single group occupancy models, therefore + it returns an ExposureModel object and not an ExposureModelGroup. + """ + scenarios: typing.Dict[str, models.ExposureModelGroup] = {} if (form.short_range_option == "short_range_no"): # Two special option cases - HEPA and/or FFP2 masks. FFP2_being_worn = bool(form.mask_wearing_option == @@ -401,33 +513,39 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, m scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model() if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'): scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model() - else: - # When dynamic occupancy is defined, the replace of total people is useless - the expected number of new cases is not calculated. - if form.occupancy_format == 'static': - no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], total_people=form.total_people - form.short_range_occupants) - elif form.occupancy_format == 'dynamic': - for occ in form.dynamic_exposed_occupancy: # Update the number of exposed people with long-range exposure - if occ['total_people'] > form.short_range_occupants: occ['total_people'] = max(0, occ['total_people'] - form.short_range_occupants) - no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], dynamic_exposed_occupancy=form.dynamic_exposed_occupancy) - + # Adjust the number of exposed people with long-range exposure based on short-range interactions + if not form.occupancy: + no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, total_people=form.total_people - form.short_range_occupants) + else: + for group_id, group in form.occupancy.items(): + # Check if the group exists in short-range interactions + if group_id in form.short_range_interactions: + short_range_count = form.short_range_occupants + total_people = group['total_people'] + if total_people > short_range_count > 0: + # Update the total_people with the adjusted value + group['total_people'] = max(0, total_people - short_range_count) + no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, occupancy=form.occupancy) scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model() + for scenario_name, scenario in scenarios.items(): + scenarios[scenario_name] = scenario.exposure_models[0] # type: ignore return scenarios def scenario_statistics( mc_model: mc.ExposureModel, sample_times: typing.List[float], - static_occupancy: bool, compute_prob_exposure: bool, ): model = mc_model.build_model( - size=mc_model.data_registry.monte_carlo['sample_size']) - + size=mc_model.data_registry.monte_carlo['sample_size'] + ) + return { 'probability_of_infection': np.mean(model.infection_probability()), - 'expected_new_cases': np.mean(model.expected_new_cases()) if static_occupancy else None, + 'expected_new_cases': np.mean(model.expected_new_cases()), 'concentrations': [ np.mean(model.concentration(time)) for time in sample_times @@ -441,32 +559,30 @@ def comparison_report( report_data: typing.Dict[str, typing.Any], scenarios: typing.Dict[str, mc.ExposureModel], executor_factory: typing.Callable[[], concurrent.futures.Executor], -): +): if (form.short_range_option == "short_range_no"): statistics = { 'Current scenario': { - 'probability_of_infection': report_data['prob_inf'], - 'expected_new_cases': report_data['expected_new_cases'], - 'concentrations': report_data['concentrations'], + 'probability_of_infection': report_data['groups']['group_1']['prob_inf'], + 'expected_new_cases': report_data['groups']['group_1']['expected_new_cases'], + 'concentrations': report_data['groups']['group_1']['concentrations'], } } else: statistics = {} - static_occupancy = form.occupancy_format == "static" - compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and static_occupancy + compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and not form.occupancy with executor_factory() as executor: results = executor.map( scenario_statistics, scenarios.values(), [report_data['times']] * len(scenarios), - [static_occupancy] * len(scenarios), [compute_prob_exposure] * len(scenarios), timeout=60, ) - for (name, model), model_stats in zip(scenarios.items(), results): + for (name, _), model_stats in zip(scenarios.items(), results): statistics[name] = model_stats return { @@ -474,7 +590,9 @@ def comparison_report( } -def alternative_scenarios_data(form: VirusFormData, report_data: typing.Dict[str, typing.Any], executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: +def alternative_scenarios_data(form: VirusFormData, + report_data: typing.Dict[str, typing.Any], + executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: alternative_scenarios: typing.Dict[str, typing.Any] = manufacture_alternative_scenarios(form=form) return { 'alternative_scenarios': comparison_report(form=form, report_data=report_data, scenarios=alternative_scenarios, executor_factory=executor_factory) diff --git a/caimira/src/caimira/calculator/validators/co2/co2_validator.py b/caimira/src/caimira/calculator/validators/co2/co2_validator.py index 7b9185ab..c85738f9 100644 --- a/caimira/src/caimira/calculator/validators/co2/co2_validator.py +++ b/caimira/src/caimira/calculator/validators/co2/co2_validator.py @@ -28,8 +28,6 @@ class CO2FormData(FormData): # and the defaults in any html form must not be contradictory. _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = { 'CO2_data': '{}', - 'dynamic_infected_occupancy': '[]', - 'dynamic_exposed_occupancy': '[]', 'exposed_coffee_break_option': 'coffee_break_0', 'exposed_coffee_duration': 5, 'exposed_finish': '17:30', @@ -47,7 +45,7 @@ class CO2FormData(FormData): 'infected_lunch_start': '12:30', 'infected_people': 1, 'infected_start': '08:30', - 'occupancy_format': 'static', + 'occupancy': '{}', 'room_capacity': None, 'room_volume': NO_DEFAULT, 'specific_breaks': '{}', @@ -74,7 +72,7 @@ class CO2FormData(FormData): raise TypeError(f'The room capacity should be a valid integer (> 0). Got {self.room_capacity}.') # Validate specific inputs - breaks (exposed and infected) - if self.specific_breaks != {}: + if self.specific_breaks != {} and not self.occupancy: if type(self.specific_breaks) is not dict: raise TypeError('The specific breaks should be in a dictionary.') @@ -188,11 +186,6 @@ class CO2FormData(FormData): return img2base64(_figure2bytes(fig)), vent_plot_data - def population_present_changes(self, infected_presence: models.Interval, exposed_presence: models.Interval) -> typing.List[float]: - state_change_times = set(infected_presence.transition_times()) - state_change_times.update(exposed_presence.transition_times()) - return sorted(state_change_times) - def ventilation_transition_times(self) -> typing.Tuple[float]: ''' Check if the last time from the input data is @@ -207,45 +200,16 @@ class CO2FormData(FormData): return tuple(vent_states) def build_model(self, sample_size = None) -> models.CO2DataModel: - # Build a simple infected and exposed population for the case when presence - # intervals and number of people are dynamic. Activity type is not needed. - if self.occupancy_format == 'dynamic': - if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0: - infected_people = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) - infected_presence = None - else: - raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".') - if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0: - exposed_people = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy) - exposed_presence = None - else: - raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".') - else: - infected_people = self.infected_people - exposed_people = self.total_people - self.infected_people - infected_presence = self.infected_present_interval() - exposed_presence = self.exposed_present_interval() - - infected_population = models.SimplePopulation( - number=infected_people, - presence=infected_presence, - activity=None, # type: ignore - ) - exposed_population=models.SimplePopulation( - number=exposed_people, - presence=exposed_presence, - activity=None, # type: ignore - ) - - all_state_changes=self.population_present_changes(infected_population.presence_interval(), - exposed_population.presence_interval()) - total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop) - for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])] + """ + Builds a CO2 data model that considers data + from the defined population groups. + """ + occupancy = self.build_CO2_piecewise() return models.CO2DataModel( data_registry=self.data_registry, room=models.Room(volume=self.room_volume, capacity=self.room_capacity), - occupancy=models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)), + occupancy=occupancy, ventilation_transition_times=self.ventilation_transition_times(), times=self.CO2_data['times'], CO2_concentrations=self.CO2_data['CO2'], diff --git a/caimira/src/caimira/calculator/validators/defaults.py b/caimira/src/caimira/calculator/validators/defaults.py index 604617c0..5aa3f2ce 100644 --- a/caimira/src/caimira/calculator/validators/defaults.py +++ b/caimira/src/caimira/calculator/validators/defaults.py @@ -16,8 +16,6 @@ DEFAULTS = { 'calculator_version': NO_DEFAULT, 'ceiling_height': 0., 'conditional_probability_viral_loads': False, - 'dynamic_exposed_occupancy': '[]', - 'dynamic_infected_occupancy': '[]', 'event_month': 'January', 'exposed_coffee_break_option': 'coffee_break_0', 'exposed_coffee_duration': 5, @@ -49,14 +47,14 @@ DEFAULTS = { 'mask_type': 'Type I', 'mask_wearing_option': 'mask_off', 'mechanical_ventilation_type': 'not-applicable', - 'occupancy_format': 'static', + 'occupancy': '{}', 'opening_distance': 0., 'precise_activity': '{}', 'room_heating_option': False, 'room_number': NO_DEFAULT, 'room_volume': 0., 'sensor_in_use': '', - 'short_range_interactions': '[]', + 'short_range_interactions': '{}', 'short_range_occupants': 0, 'short_range_option': 'short_range_no', 'simulation_name': NO_DEFAULT, @@ -82,7 +80,8 @@ DEFAULTS = { # ------------------ Validation ---------------------- COFFEE_OPTIONS_INT = {'coffee_break_0': 0, 'coffee_break_1': 1, - 'coffee_break_2': 2, 'coffee_break_4': 4} + 'coffee_break_2': 2, 'coffee_break_3': 3, + 'coffee_break_4': 4} CONFIDENCE_LEVEL_OPTIONS = {'confidence_low': 10, 'confidence_medium': 5, 'confidence_high': 2} MECHANICAL_VENTILATION_TYPES = { diff --git a/caimira/src/caimira/calculator/validators/form_validator.py b/caimira/src/caimira/calculator/validators/form_validator.py index fe2e58dc..3fc2f67c 100644 --- a/caimira/src/caimira/calculator/validators/form_validator.py +++ b/caimira/src/caimira/calculator/validators/form_validator.py @@ -36,15 +36,11 @@ class FormData: infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed infected_start: minutes_since_midnight infected_people: int - occupancy_format: str + occupancy: dict room_volume: float specific_breaks: dict total_people: int - # Dynamic occupancy inputs - dynamic_exposed_occupancy: list - dynamic_infected_occupancy: list - data_registry: DataRegistry _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS @@ -97,117 +93,372 @@ class FormData: form_dict.pop(attr) return form_dict - def validate_population_parameters(self): - # Static occupancy is defined. - if self.occupancy_format == 'static': - # Validate number of infected <= number of total people - if self.infected_people >= self.total_people: - raise ValueError( - 'Number of infected people cannot be greater or equal to the number of total people.') + def validate_group_presence_input(self, group_id: str, group_presence: typing.List): + """ + When occupancy is defined, this method validates the + presence times within an occupancy group. + """ + # Checks if the presence input is a valid list + if not isinstance(group_presence, list): + raise TypeError(f'The "presence" parameter in occupancy group "{group_id}" should be a valid list. Got {type(group_presence)}.') + # Checks if the presence input is populated + if len(group_presence) == 0: + raise TypeError(f'The "presence" parameter in occupancy group "{group_id}" should be a valid, non-empty list. Got {group_presence}.') + + # Already processed presence intervals for overlap checking + existing_occupancy_presence_interval: typing.List = [] + + for presence_interval in group_presence: + # Checks if each presence entry is a valid dict + if not isinstance(presence_interval, typing.Dict): + raise TypeError(f'Each presence interval should be a valid dictionary. Got {type(presence_interval)} in occupancy group "{group_id}".') - # Validate time intervals selected by user - time_intervals = [ - ['exposed_start', 'exposed_finish'], - ['infected_start', 'infected_finish'], - ] - if self.exposed_lunch_option: - time_intervals.append( - ['exposed_lunch_start', 'exposed_lunch_finish']) - if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: - time_intervals.append( - ['infected_lunch_start', 'infected_lunch_finish']) + # Parameters in each presence entry + presence_params = presence_interval.keys() - for start_name, end_name in time_intervals: - start = getattr(self, start_name) - end = getattr(self, end_name) - if start > end: + # Checks for the "start_time" and "finish_time" params + for time_param in ["start_time", "finish_time"]: + if time_param not in presence_params: + raise TypeError(f'Missing "{time_param}" key in "presence" parameter of occupancy group "{group_id}".' + f' Got keys: {", ".join(presence_params)}.') + + time_value = presence_interval[time_param] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time_value): + raise ValueError(f'Invalid time format found in "presence" parameter of occupancy group "{group_id}". ' + f'Expected HH:MM, got {time_value}.') + + if presence_interval["finish_time"] <= presence_interval["start_time"]: + raise ValueError(f'Inconsistent times found in "presence" parameter of occupancy group "{group_id}".' + f'The "{presence_interval}" entry has a start time ("{presence_interval["start_time"]}") ' + f'after the finish time ("{presence_interval["finish_time"]}").') + + # Checks for the occupancy group uniqueness of intervals + self.check_overlap(presence_interval, existing_occupancy_presence_interval) + existing_occupancy_presence_interval.append(presence_interval) + + def validate_short_range_interaction_input(self, group_id: str, sr_interactions: typing.List): + """ + Validates the short-range interactions within an occupancy group. + """ + # Within a group, checks if the short-range input is a valid list + if not isinstance(sr_interactions, list): + raise TypeError(f'The short-range interactions in occupancy group "{group_id}" should be defined in a valid list. Got {type(sr_interactions)}.') + # Within a group, checks if the list is populated + if len(sr_interactions) == 0: + raise TypeError(f'The short-range interactions in occupancy group "{group_id}" should be a non-empty list. Got {type(sr_interactions)}.') + + # Already processed interactions for overlap checking + existing_sr_interaction_interval: typing.List = [] + + for interaction in sr_interactions: + # Checks if each interaction is a valid dict + if not isinstance(interaction, typing.Dict): + raise TypeError(f'Each short-range interaction should be a dictionary. Got {type(interaction)} in occupancy group "{group_id}".') + + # Parameters in each short-range interaction + interaction_params = interaction.keys() + + # Checks for the expiration key and its constraints + if "expiration" not in interaction_params: + raise TypeError(f'Missing "expiration" key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.') + else: + expiration = interaction["expiration"] + if expiration not in list(self.data_registry.expiration_particle['expiration_BLO_factors'].keys()): # type: ignore + raise ValueError(f'Invalid expiration value in short-range interaction for occupancy group "{group_id}". Got "{expiration}".') + + # Checks for start_time key and its format + if "start_time" not in interaction_params: + raise TypeError(f'Missing "start_time" key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.') + else: + start_time = interaction["start_time"] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(start_time): + raise ValueError(f'Invalid time format for start_time in short-range interaction for occupancy group "{group_id}". Expected HH:MM, got {start_time}.') + + # Checks for "duration" and its format + if "duration" not in interaction_params: + raise TypeError(f'Missing "duration" key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.') + else: + duration = interaction["duration"] + if duration < 0: + raise ValueError(f'The duration value in short-range interaction for occupancy group "{group_id}" should be a non-negative integer. Got {duration}.') + + # Legacy usage - occupancy input is not defined (default empty dict) + if not self.occupancy: + # It means that we have a single exposure model + lr_start = min(self.infected_start, self.exposed_start)/60 + lr_stop = max(self.infected_finish, self.exposed_finish)/60 + + if not self.check_interaction_is_within_long_range(interaction, existing_sr_interaction_interval, lr_start, lr_stop): raise ValueError( - f"{start_name} must be less than {end_name}. Got {start} and {end}.") + f'Short-range interactions must occur during simulation time. Got {interaction} in occupancy group "{group_id}".' + ) + + # Add interaction to the list of already processed interactions + existing_sr_interaction_interval.append(interaction) + else: + # Find corresponding exposure group + occupancy_group_obj = next( + (occupancy_value for occupancy_key, occupancy_value in self.occupancy.items() + if occupancy_key == group_id), + None + ) - def validate_lunch(start, finish): - lunch_start = getattr(self, f'{population}_lunch_start') - lunch_finish = getattr(self, f'{population}_lunch_finish') - return (start <= lunch_start <= finish and - start <= lunch_finish <= finish) - - def get_lunch_mins(population): - lunch_mins = 0 - if getattr(self, f'{population}_lunch_option'): - lunch_mins = getattr( - self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') - return lunch_mins - - def get_coffee_mins(population): - coffee_mins = 0 - if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': - coffee_mins = COFFEE_OPTIONS_INT[getattr( - self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') - return coffee_mins - - def get_activity_mins(population): - return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') - - populations = [ - 'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] - for population in populations: - # Validate lunch time within the activity times. - if (getattr(self, f'{population}_lunch_option') and - not validate_lunch(getattr(self, f'{population}_start'), getattr( - self, f'{population}_finish')) - ): + if occupancy_group_obj is None: raise ValueError( - f"{population} lunch break must be within presence times." + f'Occupancy group "{group_id}" referenced in short-range interactions was not found in the occupancy input.' ) - # Length of breaks < length of activity - if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): + is_within_any_lr = False + for presence in occupancy_group_obj['presence']: + # Check for correct timing within long-range exposure and overlaps with existing interactions + lr_start = time_string_to_minutes(presence['start_time'])/60 + lr_stop = time_string_to_minutes(presence['finish_time'])/60 + + # Flag to check if interaction falls within any long-range exposure interval + if self.check_interaction_is_within_long_range(interaction, existing_sr_interaction_interval, lr_start, lr_stop): + is_within_any_lr = True + + # Add interaction to the list of processed interactions if within long-range + existing_sr_interaction_interval.append(interaction) + + # If the interaction does not fall within any presence interval of the occupancy group, raise an error + if not is_within_any_lr: raise ValueError( - f"Length of breaks >= Length of {population} presence." + f'Short-range interaction {interaction} does not fall within any presence interval in occupancy group "{group_id}".' ) + + def validate_dynamic_exposed_format(self, group_id: str, group: typing.Dict): + """ + Validates the expected keywords for the occupancy input. + """ + # Parameters in each presence entry + group_params = group.keys() - for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), - ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: - if getattr(self, attr_name) not in valid_set: - raise ValueError( - f"{getattr(self, attr_name)} is not a valid value for {attr_name}") - # Dynamic occupancy is defined. - elif self.occupancy_format == 'dynamic': - for dynamic_format in (self.dynamic_infected_occupancy, self.dynamic_exposed_occupancy): - for occupancy in dynamic_format: - # Check if each occupancy entry is a dictionary - if not isinstance(occupancy, typing.Dict): - raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}".') - - # Check for required keys in each occupancy entry - dict_keys = list(occupancy.keys()) - if "total_people" not in dict_keys: - raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys}".') - else: - value = occupancy["total_people"] - # Check if the value is a non-negative integer - if not isinstance(value, int): - raise ValueError(f'Total number of people should be integer. Got "{type(value)}".') - elif not value >= 0: - raise ValueError(f'Total number of people should be non-negative. Got "{value}".') - - if "start_time" not in dict_keys: - raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys}".') - if "finish_time" not in dict_keys: - raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys}".') - - # Validate time format for start_time and finish_time - for time_key in ["start_time", "finish_time"]: - time = occupancy[time_key] - if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): - raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') + # Total people input + if 'total_people' not in group_params: + raise TypeError(f'Missing "total_people" key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.') else: - raise ValueError(f"'{self.occupancy_format}' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.") + total_people = group['total_people'] + if not isinstance(total_people, int) or total_people < 0: + raise ValueError(f'The "total_people" input in occupancy group "{group_id}" should be a non-negative integer. Got {total_people}.') + + # Infected people input + if 'infected' not in group_params: + raise TypeError(f'Missing "infected" key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.') + else: + infected = group['infected'] + if not isinstance(infected, int) or infected < 0: + raise ValueError(f'The infected input in occupancy group "{group_id}" should be a non-negative integer. Got {infected}.') + elif infected > total_people: # Validate number of infected <= number of total people + raise ValueError(f'The number of infected people ({infected}) cannot be greater than the total people ({total_people}).') + + # Presence input + if 'presence' not in group_params: + raise TypeError(f'Missing "presence" key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.') + + def get_start_and_finish_time(self, entry: typing.Dict) -> typing.Tuple: + entry_start = time_string_to_minutes(entry["start_time"])/60 + if "finish_time" in entry: + entry_finish = time_string_to_minutes(entry["finish_time"])/60 + else: + entry_finish = entry_start + entry['duration']/60 + return entry_start, entry_finish + + def check_interaction_is_within_long_range(self, interaction: typing.Dict, existing_interactions: typing.List, + lr_start: float, lr_stop: float) -> bool: + """ + Check if the short-range interaction falls within the long-range exposure time. + Check if the short-range interaction given as input overlaps with any already + existing interactions for the same occupancy group. + """ + interaction_start, interaction_finish = self.get_start_and_finish_time(interaction) + # Check if the SR interaction is within the LR exposure time + if lr_start <= interaction_start <= lr_stop and lr_start <= interaction_finish <= lr_stop: + # Check the overlap with already existing interactions + self.check_overlap(interaction, existing_interactions) + return True + return False + + def check_overlap(self, entry: typing.Dict, existing_entries: typing.List): + """ + Check if an entry overlaps with an already existing entry + by comparing the start and finish times of all entries. + """ + entry_start, entry_finish = self.get_start_and_finish_time(entry) + for existing_entry in existing_entries: + existing_entry_start, existing_entry_finish = self.get_start_and_finish_time(existing_entry) + # Check for overlap + if (entry_start < existing_entry_finish and existing_entry_start < entry_finish): + raise ValueError( + f'Overlap detected: The entry {entry} overlaps with ' + f'an already existing entry ({existing_entry}).' + ) + # In case no exception is raised, simply returns + return + + def validate_population_parameters(self): + """ + Validate required parameters for dynamic inputs. + """ + if isinstance(self.occupancy, typing.Dict): + # Legacy usage - occupancy input is not defined (default empty dict) + if not self.occupancy: + # Validate number of infected <= number of total people + if self.infected_people >= self.total_people: + raise ValueError( + 'Number of infected people cannot be greater or equal to the number of total people.') + + # Validate time intervals selected by user + time_intervals = [ + ['exposed_start', 'exposed_finish'], + ['infected_start', 'infected_finish'], + ] + if self.exposed_lunch_option: + time_intervals.append( + ['exposed_lunch_start', 'exposed_lunch_finish']) + if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: + time_intervals.append( + ['infected_lunch_start', 'infected_lunch_finish']) + + for start_name, end_name in time_intervals: + start = getattr(self, start_name) + end = getattr(self, end_name) + if start > end: + raise ValueError( + f"{start_name} must be less than {end_name}. Got {start} and {end}.") + + def validate_lunch(start, finish): + lunch_start = getattr(self, f'{population}_lunch_start') + lunch_finish = getattr(self, f'{population}_lunch_finish') + return (start <= lunch_start <= finish and + start <= lunch_finish <= finish) + + def get_lunch_mins(population): + lunch_mins = 0 + if getattr(self, f'{population}_lunch_option'): + lunch_mins = getattr( + self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') + return lunch_mins + + def get_coffee_mins(population): + coffee_mins = 0 + if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': + coffee_mins = COFFEE_OPTIONS_INT[getattr( + self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') + return coffee_mins + + def get_activity_mins(population): + return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') + + populations = [ + 'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] + for population in populations: + # Validate lunch time within the activity times. + if (getattr(self, f'{population}_lunch_option') and + not validate_lunch(getattr(self, f'{population}_start'), getattr( + self, f'{population}_finish')) + ): + raise ValueError( + f"{population} lunch break must be within presence times." + ) + + # Length of breaks < length of activity + if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): + raise ValueError( + f"Length of breaks >= Length of {population} presence." + ) + + for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), + ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: + if getattr(self, attr_name) not in valid_set: + raise ValueError( + f"{getattr(self, attr_name)} is not a valid value for {attr_name}") + # Occupancy input is defined + else: + # Checks if occupancy input is a valid dict + if self.occupancy and isinstance(self.occupancy, typing.Dict): + # The key is the actual identifier + for group_id, group in self.occupancy.items(): + # For each group, validate input format + self.validate_dynamic_exposed_format(group_id, group) + # ...as well as the respective presence input + self.validate_group_presence_input(group_id, group['presence']) + else: + raise TypeError(f'The "occupancy" input should be a valid dictionary. Got {self.occupancy}.') def validate(self): raise NotImplementedError("Subclass must implement") def build_model(self, sample_size: typing.Optional[int] = None): raise NotImplementedError("Subclass must implement") + + def population_present_changes(self, transition_times_list: typing.Tuple[float, ...]) -> typing.List[float]: + """ + Returns a sorted list of unique state changes on + a population list. + """ + return sorted(set(transition_times_list)) + + def convert_interval_to_piecewise(self, interval: models.SpecificInterval, value: int): + """ + Converts an Interval and a single value to an IntPiecewiseConstant. + """ + transition_times = [] + values = [] + + for start, end in interval.present_times: + transition_times.extend([start, end]) + values.extend([value, 0]) + + # Drop the last value (0) to match number of intervals + if values: + values.pop() + + return models.IntPiecewiseConstant( + transition_times=tuple(transition_times), + values=tuple(values), + ) + + def build_CO2_piecewise(self): + """ + Builds a simple IntPiecewiseConstant for the different + population groups that are defined. + """ + # Legacy usage - occupancy input is not defined (default empty dict) + if not self.occupancy: + infected_occupancy = self.convert_interval_to_piecewise( + interval=self.infected_present_interval(), + value=self.infected_people, + ) + exposed_occupancy = self.convert_interval_to_piecewise( + interval=self.exposed_present_interval(), + value=self.total_people - self.infected_people, + ) + total_models = [infected_occupancy, exposed_occupancy] + else: + infected_occupancy = self.generate_infected_occupancy(self.occupancy) + total_models = [infected_occupancy] + # For all state changes + for group in self.occupancy.values(): + model_piecewise = self.convert_interval_to_piecewise( + interval=self.generate_exposed_presence(group['presence']), + value=group['total_people'] - group['infected'] + ) + total_models.append(model_piecewise) + + # Get all state change times from combined populations + all_state_changes = self.population_present_changes([t for model in total_models for t in model.transition_times]) + + # Compute total people at each state change + total_people = [] + for _, stop in zip(all_state_changes[:-1], all_state_changes[1:]): + total_people_in_group = sum(model.value(stop) for model in total_models) + total_people.append(total_people_in_group) + + return models.IntPiecewiseConstant( + transition_times=tuple(all_state_changes), + values=tuple(total_people) + ) def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t: break_delay = ((finish - start) - @@ -417,26 +668,65 @@ class FormData: self.exposed_start, self.exposed_finish, breaks=breaks, ) + + def generate_exposed_presence(self, presence: typing.List) -> models.SpecificInterval: + """ + Creates a model to represent exposed occupancy over time. + """ + exposed_intervals = [] + + # Sort occupancy entries by start_time to ensure proper ordering + presence_sorted = sorted( + presence, key=lambda x: time_string_to_minutes(x['start_time']) + ) + + for period in presence_sorted: + start_time = time_string_to_minutes(period['start_time']) / 60 + finish_time = time_string_to_minutes(period['finish_time']) / 60 + exposed_intervals.append((start_time, finish_time)) + + return models.SpecificInterval(tuple(exposed_intervals)) - def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]): - transition_times = [] - values = [] - for occupancy in dynamic_occupancy: - start_time = time_string_to_minutes(occupancy['start_time'])/60 - finish_time = time_string_to_minutes(occupancy['finish_time'])/60 - transition_times.extend([start_time, finish_time]) - values.append(occupancy['total_people']) + def generate_infected_occupancy(self, occupancy: typing.Dict) -> models.IntPiecewiseConstant: + """ + Creates a model to represent infected occupancy over time. + """ + transition_times = set() + infected_intervals = [] - unique_transition_times_sorted = np.array(sorted(set(transition_times))) - - if len(values) != len(unique_transition_times_sorted) - 1: - raise ValueError("Cannot compute dynamic occupancy with the provided inputs.") + # Extract presence data + for group in occupancy.values(): + infected = group["infected"] + for period in group["presence"]: + start_time = time_string_to_minutes(period['start_time']) / 60 + finish_time = time_string_to_minutes(period['finish_time']) / 60 + transition_times.add(start_time) # unique time points + transition_times.add(finish_time) # unique time points + infected_intervals.append((start_time, finish_time, infected)) - population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant( - transition_times=tuple(unique_transition_times_sorted), - values=tuple(values) + # Sort transition times + sorted_transition_times = list(sorted(transition_times)) + + # Values for each time segment + raw_values = [ + sum(people for start, end, people in infected_intervals if start <= t1 < end) + for t1 in sorted_transition_times[:-1] + ] + + # Merge consecutive intervals with the same infected count + opt_times = [sorted_transition_times[0]] + opt_values = [raw_values[0]] + for i in range(1, len(raw_values)): + if raw_values[i] != opt_values[-1]: + opt_times.append(sorted_transition_times[i]) + opt_values.append(raw_values[i]) + # Ensure the last time is included + opt_times.append(sorted_transition_times[-1]) + + return models.IntPiecewiseConstant( + transition_times=tuple(opt_times), + values=tuple(opt_values) ) - return population_occupancy def _hours2timestring(hours: float): @@ -450,6 +740,8 @@ def time_string_to_minutes(time: str) -> minutes_since_midnight: :param time: A string of the form "HH:MM" representing a time of day :return: The number of minutes between 'time' and 00:00 """ + if not (0 <= int(time[:2]) <= 23) or not (0 <= int(time[3:]) <= 59): + raise ValueError(f"Wrong time format. Got {time}") return minutes_since_midnight(60 * int(time[:2]) + int(time[3:])) diff --git a/caimira/src/caimira/calculator/validators/virus/virus_validator.py b/caimira/src/caimira/calculator/validators/virus/virus_validator.py index 80eec891..247a3a6b 100644 --- a/caimira/src/caimira/calculator/validators/virus/virus_validator.py +++ b/caimira/src/caimira/calculator/validators/virus/virus_validator.py @@ -4,6 +4,7 @@ import logging import typing import re +from collections import defaultdict import numpy as np from caimira import __version__ as calculator_version @@ -11,7 +12,7 @@ from ..form_validator import FormData, cast_class_fields, time_string_to_minutes from ..defaults import (DEFAULTS, CONFIDENCE_LEVEL_OPTIONS, MECHANICAL_VENTILATION_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE, VENTILATION_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES) -from ...models import models, data, monte_carlo as mc +from ...models import models, data, dataclass_utils, monte_carlo as mc from ...models.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances from ...models.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions @@ -51,7 +52,7 @@ class VirusFormData(FormData): room_heating_option: bool room_number: str sensor_in_use: str - short_range_interactions: list + short_range_interactions: dict short_range_occupants: int short_range_option: str simulation_name: str @@ -73,7 +74,7 @@ class VirusFormData(FormData): _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS def validate(self): - # Validate population parameters + # Validate population parameters self.validate_population_parameters() validation_tuples = [('activity_type', self.data_registry.population_scenario_activity.keys()), @@ -200,30 +201,37 @@ class VirusFormData(FormData): f'The sum of all respiratory activities should be 100. Got {total_percentage}.') # Validate number of people with short-range interactions - if self.occupancy_format == "static": max_occupants_for_sr = self.total_people - self.infected_people - else: max_occupants_for_sr = np.max(np.array([entry["total_people"] for entry in self.dynamic_exposed_occupancy])) + if not self.occupancy: + # Legacy usage - occupancy input is not defined (default empty dict) + max_occupants_for_sr = self.total_people - self.infected_people + else: + max_occupants_for_sr = 0 + for group_id, group in self.occupancy.items(): + exposed_occupants_in_group = group['total_people'] - group['infected'] + max_occupants_for_sr = max(max_occupants_for_sr, exposed_occupants_in_group) + if self.short_range_occupants > max_occupants_for_sr: raise ValueError( f'The total number of occupants having short-range interactions ({self.short_range_occupants}) should be lower than the exposed population ({max_occupants_for_sr}).' ) - + # Validate short-range interactions interval if self.short_range_option == "short_range_yes": - for interaction in self.short_range_interactions: - # Check if presence is within long-range exposure - presence = self.short_range_interval(interaction) - if (self.occupancy_format == 'dynamic'): - long_range_start = min(time_string_to_minutes(self.dynamic_infected_occupancy[0]['start_time']), - time_string_to_minutes(self.dynamic_exposed_occupancy[0]['start_time'])) - long_range_stop = max(time_string_to_minutes(self.dynamic_infected_occupancy[-1]['finish_time']), - time_string_to_minutes(self.dynamic_exposed_occupancy[-1]['finish_time'])) + if isinstance(self.short_range_interactions, dict): + # Checks if short_range_interactions input is not empty + if len(self.short_range_interactions) == 0: + raise ValueError(f'When short_range_option input is set to "{self.short_range_option}", the short_range_interactions input should not be empty. Got {self.short_range_interactions}.') + # Checks that the number of groups in the short_range_interactions input is less or equal than those defined in the occupancy + elif not self.occupancy: + # Legacy usage - occupancy input is not defined (default empty dict) + if len(self.short_range_interactions) > 1: + raise ValueError(f'Incompatible number of occupancy groups in the short_range_interactions input. Got {len(self.short_range_interactions)} groups when the maximum is 1.') else: - long_range_start = min(self.infected_start, self.exposed_start) - long_range_stop = max(self.infected_finish, self.exposed_finish) - if not (long_range_start/60 <= presence.present_times[0][0] <= long_range_stop/60 and - long_range_start/60 <= presence.present_times[0][-1] <= long_range_stop/60): - raise ValueError(f"Short-range interactions should be defined during simulation time. Got {interaction}") - + if len(self.short_range_interactions) > len(self.occupancy): + raise ValueError(f'Incompatible number of occupancy groups in the short_range_interactions input. Got {len(self.short_range_interactions)} groups when the maximum is {len(self.occupancy)} (from the occupancy input).') + for group_id, interactions in self.short_range_interactions.items(): + self.validate_short_range_interaction_input(group_id, interactions) + def initialize_room(self) -> models.Room: # Initializes room with volume either given directly or as product of area and height if self.volume_type == 'room_volume_explicit': @@ -243,69 +251,98 @@ class VirusFormData(FormData): return models.Room(volume=volume, inside_temp=models.PiecewiseConstant((0, 24), (inside_temp,)), humidity=humidity) # type: ignore - def build_mc_model(self) -> mc.ExposureModel: - room = self.initialize_room() + def build_mc_model(self) -> mc.ExposureModelGroup: + room: models.Room = self.initialize_room() ventilation: models._VentilationBase = self.ventilation() infected_population: models.InfectedPopulation = self.infected_population() - short_range = [] - if self.short_range_option == "short_range_yes": - for interaction in self.short_range_interactions: - short_range.append(mc.ShortRangeModel( - data_registry=self.data_registry, - expiration=short_range_expiration_distributions( - self.data_registry)[interaction['expiration']], - activity=infected_population.activity, - presence=self.short_range_interval(interaction), - distance=short_range_distances(self.data_registry), - )) - return mc.ExposureModel( + short_range = defaultdict(list) + if self.short_range_option == "short_range_yes": + sr_expiration_distributions = short_range_expiration_distributions(self.data_registry) + for key, group in self.short_range_interactions.items(): + for interaction in group: + expiration = sr_expiration_distributions[interaction['expiration']] + presence = self.short_range_interval(interaction) + distances = short_range_distances(self.data_registry) + short_range[key].append(mc.ShortRangeModel( + data_registry=self.data_registry, + expiration=expiration, + activity=infected_population.activity, + presence=presence, + distance=distances, + )) + + concentration_model: models.ConcentrationModel = mc.ConcentrationModel( data_registry=self.data_registry, - concentration_model=mc.ConcentrationModel( - data_registry=self.data_registry, - room=room, - ventilation=ventilation, - infected=infected_population, - evaporation_factor=0.3, - ), - short_range=tuple(short_range), - exposed=self.exposed_population(), - geographical_data=mc.Cases( - geographic_population=self.geographic_population, - geographic_cases=self.geographic_cases, - ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias], - ), - exposed_to_short_range=self.short_range_occupants, + room=room, + ventilation=ventilation, + infected=infected_population, + evaporation_factor=0.3, ) - def build_model(self, sample_size=None) -> models.ExposureModel: + geographical_data: models.Cases = mc.Cases( + geographic_population=self.geographic_population, + geographic_cases=self.geographic_cases, + ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias], + ) + + if not self.occupancy: + # Legacy usage - occupancy input is not defined (default empty dict) + exposed_population = self.exposed_population() + short_range_tuple = tuple(item for sublist in short_range.values() for item in sublist) + return mc.ExposureModelGroup( + data_registry=self.data_registry, + exposure_models = (mc.ExposureModel( + data_registry=self.data_registry, + concentration_model=concentration_model, + short_range=short_range_tuple, + exposed=exposed_population, + geographical_data=geographical_data, + exposed_to_short_range=self.short_range_occupants, + ),) + ) + else: + exposure_model_set = [] + for exposure_group in self.occupancy.keys(): + sr_models: typing.Tuple[models.ShortRangeModel, ...] = tuple(short_range[exposure_group]) + exposed_population = self.exposed_population(exposure_group) + + exposure_model = mc.ExposureModel( + data_registry=self.data_registry, + concentration_model=concentration_model, + short_range=sr_models, + exposed=exposed_population, + geographical_data=geographical_data, + exposed_to_short_range=self.short_range_occupants, + identifier=exposure_group, + ) + exposure_model_set.append(exposure_model) + + return mc.ExposureModelGroup( + data_registry=self.data_registry, + exposure_models=tuple(exposure_model_set) + ) + + def build_model(self, sample_size=None) -> models.ExposureModelGroup: sample_size = sample_size or self.data_registry.monte_carlo['sample_size'] - return self.build_mc_model().build_model(size=sample_size) + return self.build_mc_model().build_model(sample_size) def build_CO2_model(self, sample_size=None) -> models.CO2ConcentrationModel: + """ + Builds a CO2 model that considers the type of + activity and data from the defined population groups. + """ sample_size = sample_size or self.data_registry.monte_carlo['sample_size'] - infected_population: models.InfectedPopulation = self.infected_population( - ).build_model(sample_size) - exposed_population: models.Population = self.exposed_population().build_model(sample_size) - - state_change_times = set( - infected_population.presence_interval().transition_times()) - state_change_times.update( - exposed_population.presence_interval().transition_times()) - transition_times = sorted(state_change_times) - - total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop) - for _, stop in zip(transition_times[:-1], transition_times[1:])] if (self.activity_type == 'precise'): activity_defn, _ = self.generate_precise_activity_expiration() else: activity_defn = self.data_registry.population_scenario_activity[ self.activity_type]['activity'] - + + occupancy = self.build_CO2_piecewise() population = mc.SimplePopulation( - number=models.IntPiecewiseConstant(transition_times=tuple( - transition_times), values=tuple(total_people)), + number=occupancy, presence=None, activity=activity_distributions(self.data_registry)[activity_defn], ) @@ -420,6 +457,7 @@ class VirusFormData(FormData): # This is a minimal, always present source of ventilation, due # to the air infiltration from the outside. # See CERN-OPEN-2021-004, p. 12. + # type: ignore residual_vent: float = self.data_registry.ventilation['infiltration_ventilation'] # type: ignore infiltration_ventilation = models.AirChange( active=always_on, air_exch=residual_vent) @@ -447,7 +485,7 @@ class VirusFormData(FormData): def generate_precise_activity_expiration(self) -> typing.Tuple[typing.Any, ...]: # It means the precise activity is not defined by a specific input. - if self.precise_activity == {}: + if not self.precise_activity: return () respiratory_dict = {} for respiratory_activity in self.precise_activity['respiratory_activity']: @@ -457,30 +495,30 @@ class VirusFormData(FormData): return (self.precise_activity['physical_activity'], respiratory_dict) def infected_population(self) -> mc.InfectedPopulation: + """ + Generates an InfectedPopulation class, for both static and + dynamic occupancy. + """ # Initializes the virus virus = virus_distributions(self.data_registry)[self.virus_type] # Occupancy - if self.occupancy_format == 'dynamic': - if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0: - # If dynamic occupancy is defined, the generator will parse and validate the - # respective input to a format readable by the model - `IntPiecewiseConstant`. - infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) - infected_presence = None - else: - raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".') - else: - # The number of exposed occupants is the total number of occupants - # minus the number of infected occupants. - infected_occupancy = self.infected_people + if not self.occupancy: + # Legacy usage - occupancy input is not defined (default empty dict) + infected_occupancy: typing.Union[int, models.IntPiecewiseConstant] = self.infected_people infected_presence = self.infected_present_interval() + else: + infected_occupancy = self.generate_infected_occupancy(self.occupancy) + infected_presence = None # Activity and expiration - activity_defn = self.data_registry.population_scenario_activity[self.activity_type]['activity'] - expiration_defn = self.data_registry.population_scenario_activity[self.activity_type]['expiration'] + activity_defn = self.data_registry.population_scenario_activity[ + self.activity_type]['activity'] + expiration_defn = self.data_registry.population_scenario_activity[ + self.activity_type]['expiration'] if (self.activity_type == 'smallmeeting'): # Conversation of N people is approximately 1/N% of the time speaking. - total_people: int = max(infected_occupancy.values) if self.occupancy_format == 'dynamic' else self.total_people + total_people: int = self.total_people if not self.occupancy else max(infected_occupancy.values) # type: ignore expiration_defn = {'Speaking': 1, 'Breathing': total_people - 1} elif (self.activity_type == 'precise'): activity_defn, expiration_defn = self.generate_precise_activity_expiration() @@ -496,31 +534,35 @@ class VirusFormData(FormData): mask=self.mask(), activity=activity, expiration=expiration, - # Vaccination status does not affect the infected population (for now) + # Vaccination status does not affect the infected population (for the time being) host_immunity=0., ) return infected - def exposed_population(self) -> mc.Population: + def exposed_population(self, exposure_group: typing.Optional[str] = None) -> mc.Population: + """ + Generates an exposed Population class, for both static and + dynamic occupancy. The number of people is constant for a + single group of exposed population, except when breaks are defined. + """ + # Occupancy + if not exposure_group and not self.occupancy: + # The number of exposed occupants is the total number of occupants + # minus the number of infected occupants. + exposed_occupancy = self.total_people - self.infected_people + exposed_presence = self.exposed_present_interval() + elif exposure_group: + dynamic_group = self.occupancy[exposure_group] + exposed_occupancy = dynamic_group['total_people'] - dynamic_group['infected'] + exposed_presence = self.generate_exposed_presence(dynamic_group['presence']) + + # Activity activity_defn = (self.precise_activity['physical_activity'] if self.activity_type == 'precise' else str(self.data_registry.population_scenario_activity[self.activity_type]['activity'])) activity = activity_distributions(self.data_registry)[activity_defn] - if self.occupancy_format == 'dynamic': - if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0: - # If dynamic occupancy is defined, the generator will parse and validate the - # respective input to a format readable by the model - IntPiecewiseConstant. - exposed_occupancy = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy) - exposed_presence = None - else: - raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".') - else: - # The number of exposed occupants is the total number of occupants - # minus the number of infected occupants. - exposed_occupancy = self.total_people - self.infected_people - exposed_presence = self.exposed_present_interval() - + # Vaccination if (self.vaccine_option): if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'): host_immunity = [vaccine['VE'] for vaccine in data.vaccine_booster_host_immunity if @@ -569,8 +611,6 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'calculator_version': calculator_version, 'ceiling_height': '', 'conditional_probability_viral_loads': '0', - 'dynamic_exposed_occupancy': '[]', - 'dynamic_infected_occupancy': '[]', 'event_month': 'January', 'exposed_coffee_break_option': 'coffee_break_4', 'exposed_coffee_duration': '10', @@ -601,12 +641,12 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'mask_type': 'Type I', 'mask_wearing_option': 'mask_off', 'mechanical_ventilation_type': '', - 'occupancy_format': 'static', + 'occupancy': '{}', 'opening_distance': '0.2', 'room_heating_option': '0', 'room_number': '123', 'room_volume': '75', - 'short_range_interactions': '[]', + 'short_range_interactions': '{}', 'short_range_option': 'short_range_no', 'simulation_name': 'Test', 'total_people': '10', diff --git a/caimira/tests/apps/calculator/test_model_generator.py b/caimira/tests/apps/calculator/test_model_generator.py index 5feb19ef..ede4b678 100644 --- a/caimira/tests/apps/calculator/test_model_generator.py +++ b/caimira/tests/apps/calculator/test_model_generator.py @@ -17,7 +17,7 @@ from caimira.calculator.store.data_registry import DataRegistry def test_model_from_dict(baseline_form_data, data_registry): form = virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry) - assert isinstance(form.build_model(), models.ExposureModel) + assert isinstance(form.build_model(), models.ExposureModelGroup) def test_model_from_dict_invalid(baseline_form_data, data_registry): @@ -590,51 +590,344 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m assert offset == expected_offset -@pytest.mark.parametrize( - ["occupancy_format_input", "error"], - [ - ['dynamc', "'dynamc' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",], - ['stact', "'stact' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",], - ['random', "'random' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",] - ] -) -def test_dynamic_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = occupancy_format_input - with pytest.raises(ValueError, match=re.escape(error)): - baseline_form.validate() - - -@pytest.mark.parametrize( - ["dynamic_occupancy_input", "error"], - [ - [[["total_people", 10, "start_time", "10:00", "finish_time", "11:00"]], "Each occupancy entry should be in a dictionary format. Got \"\"."], - [[{"tal_people": 10, "start_time": "10:00", "finish_time": "11:00"}], "Unable to fetch \"total_people\" key. Got \"['tal_people', 'start_time', 'finish_time']\"."], - [[{"total_people": 10, "art_time": "10:00", "finish_time": "11:00"}], "Unable to fetch \"start_time\" key. Got \"['total_people', 'art_time', 'finish_time']\"."], - [[{"total_people": 10, "start_time": "10:00", "ish_time": "11:00"}], "Unable to fetch \"finish_time\" key. Got \"['total_people', 'start_time', 'ish_time']\"."], - [[{"total_people": 10, "start_time": "10", "finish_time": "11:00"}], "Wrong time format - \"HH:MM\". Got \"10\"."], - [[{"total_people": 10, "start_time": "10:00", "finish_time": "11"}], "Wrong time format - \"HH:MM\". Got \"11\"."], - ] -) -def test_dynamic_occupancy_structure(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input - baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input +def test_occupancy_TypeError(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = [] # type: ignore + error = 'The "occupancy" input should be a valid dictionary. Got [].' with pytest.raises(TypeError, match=re.escape(error)): baseline_form.validate() @pytest.mark.parametrize( - ["dynamic_occupancy_input", "error"], - [ - [[{"total_people": "10", "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], - [[{"total_people": 9.8, "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], - [[{"total_people": [10], "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"\"."], - [[{"total_people": -1, "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be non-negative. Got \"-1\"."], + ["occupancy", "error"], + [ + [ + {"tal_people": 10, "infected": 5, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],}, + 'Missing "total_people" key in occupancy group "group_A". Got keys: tal_people, infected, presence.' + ], + [ + {"total_people": 10, "infeted": 5, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],}, + 'Missing "infected" key in occupancy group "group_A". Got keys: total_people, infeted, presence.' + ], + [ + {"total_people": 10, "infected": 5, "pesence": [{"start_time": "10:00", "finish_time": "11:00"}],}, + 'Missing "presence" key in occupancy group "group_A". Got keys: total_people, infected, pesence.' + ], ] ) -def test_dynamic_occupancy_total_people(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input - baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input +def test_occupancy_general_params_TypeError(occupancy, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = {"group_A": occupancy} + with pytest.raises(TypeError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["occupancy_presence", "error"], + [ + [{"start_time": "10:00", "finish_time": "11:00"}, 'The "presence" parameter in occupancy group "group_A" should be a valid list. Got .'], + [[], 'The "presence" parameter in occupancy group "group_A" should be a valid, non-empty list. Got [].'], + [[["start_time", "10:00", "finish_time", "11:00"]], 'Each presence interval should be a valid dictionary. Got in occupancy group "group_A".'], + [[{"art_time": "10:00", "finish_time": "11:00"}], 'Missing "start_time" key in "presence" parameter of occupancy group "group_A". Got keys: art_time, finish_time.'], + [[{"start_time": "10:00", "ish_time": "11:00"}], 'Missing "finish_time" key in "presence" parameter of occupancy group "group_A". Got keys: start_time, ish_time.'], + ] +) +def test_occupancy_presence_TypeError(occupancy_presence, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { + "total_people": 10, + "infected": 5, + "presence": occupancy_presence, + } + } + with pytest.raises(TypeError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["occupancy_presence", "error"], + [ + [[{"start_time": "10", "finish_time": "11:00"}], 'Invalid time format found in "presence" parameter of occupancy group "group_A". Expected HH:MM, got 10.'], + [[{"start_time": "10:00", "finish_time": "11"}], 'Invalid time format found in "presence" parameter of occupancy group "group_A". Expected HH:MM, got 11.'], + ] +) +def test_occupancy_presence_ValueError(occupancy_presence, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { + "total_people": 10, + "infected": 5, + "presence": occupancy_presence + } + } with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() + + +@pytest.mark.parametrize( + ["total_people", "error"], + [ + ["10", 'The "total_people" input in occupancy group "group_A" should be a non-negative integer. Got 10.'], + [9.8, 'The "total_people" input in occupancy group "group_A" should be a non-negative integer. Got 9.8.'], + [[10], 'The "total_people" input in occupancy group "group_A" should be a non-negative integer. Got [10].'], + [-1, 'The "total_people" input in occupancy group "group_A" should be a non-negative integer. Got -1.'], + ] +) +def test_occupancy_total_people_ValueError(total_people, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { + "total_people": total_people, + "infected": 10, + "presence": [{"start_time": "08:00", "finish_time": "18:00"},], + }, + } + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["infected", "error"], + [ + ["10", 'The infected input in occupancy group "group_A" should be a non-negative integer. Got 10.'], + [9.8, 'The infected input in occupancy group "group_A" should be a non-negative integer. Got 9.8.'], + [[10], 'The infected input in occupancy group "group_A" should be a non-negative integer. Got [10].'], + [-1, 'The infected input in occupancy group "group_A" should be a non-negative integer. Got -1.'], + [30, 'The number of infected people (30) cannot be greater than the total people (20).'] + ] +) +def test_occupancy_infected_ValueError(infected, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { + "total_people": 20, + "infected": infected, + "presence": [{"start_time": "08:00", "finish_time": "18:00"},], + }, + } + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +def test_occupancy_presence_overlap(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { + "total_people": 10, + "infected": 5, + "presence": [ + {"start_time": "08:00", "finish_time": "17:00"}, + {"start_time": "13:00", "finish_time": "14:00"}, + ], + }, + } + error = ( + 'Overlap detected: The entry ' + '{\'start_time\': \'13:00\', \'finish_time\': \'14:00\'}' + ' overlaps with an already existing entry ' + '({\'start_time\': \'08:00\', \'finish_time\': \'17:00\'}).' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["short_range_input", "error"], + [ + [[["expiration", "Shouting", "start_time", "09:00", "duration", 30]], 'Each short-range interaction should be a dictionary. Got in occupancy group "group_A".'], + [[{"expiratio": "Shouting", "start_time": "09:00", "duration": 30}], 'Missing "expiration" key in short-range interaction for occupancy group "group_A". Got keys: expiratio, start_time, duration.'], + [[{"expiration": "Shouting", "start_tim": "09:00", "duration": 30}], 'Missing "start_time" key in short-range interaction for occupancy group "group_A". Got keys: expiration, start_tim, duration.'], + [[{"expiration": "Shouting", "start_time": "09:00", "duratio": 30}], 'Missing "duration" key in short-range interaction for occupancy group "group_A". Got keys: expiration, start_time, duratio.'], + ] +) +def test_short_range_TypeError(short_range_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.short_range_option = "short_range_yes" + baseline_form.short_range_interactions = {"group_A": short_range_input} + with pytest.raises(TypeError, match=re.escape(error)): + baseline_form.validate() + + +def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { + "total_people": 20, + "infected": 10, + "presence": [ + {"start_time": "10:00", "finish_time": "12:00"}, + {"start_time": "13:00", "finish_time": "17:00"}, + ], + }, + "group_B": { + "total_people": 20, + 'infected': 10, + "presence": [ + {"start_time": "10:00", "finish_time": "11:00"}, + ], + }, + } + + # Check for existence of the dictionary key + baseline_form.short_range_option = 'short_range_yes' + baseline_form.short_range_interactions = { + "group_C": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}], + } + error = 'Occupancy group "group_C" referenced in short-range interactions was not found in the occupancy input.' + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Check if interaction time is within simulation time + baseline_form.short_range_interactions = { + "group_A": [{"expiration": "Shouting", "start_time": "18:00", "duration": 30}], + } + error = ( + 'Short-range interaction {\'expiration\': \'Shouting\', \'start_time\': \'18:00\', \'duration\': 30}' + ' does not fall within any presence interval in occupancy group "group_A".' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["short_range_input", "error"], + [ + [[{"expiration": "Shouting", "start_time": "9", "duration": 30}], 'Invalid time format for start_time in short-range interaction for occupancy group "group_A". Expected HH:MM, got 9.'], + [[{"expiration": "Whisper", "start_time": "09:00", "duration": 30}], 'Invalid expiration value in short-range interaction for occupancy group "group_A". Got "Whisper".'], + [[{"expiration": "Shouting", "start_time": "09:00", "duration": -30}], 'The duration value in short-range interaction for occupancy group "group_A" should be a non-negative integer. Got -30.'], + ] +) +def test_short_range_value_error(short_range_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.short_range_option = "short_range_yes" + baseline_form.short_range_interactions = {"group_A": short_range_input} + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +def test_short_range_with_occupancy_format(baseline_form: virus_validator.VirusFormData): + baseline_form.short_range_option = "short_range_yes" + baseline_form.short_range_interactions = {"group_A": [{"expiration": "Shouting", "start_time": "07:00", "duration": 30}]} + + # Checks if interaction is defined during simulation time + error = ( + 'Short-range interactions must occur during simulation time. Got' + ' {\'expiration\': \'Shouting\', \'start_time\': \'07:00\', \'duration\': 30}' + ' in occupancy group "group_A".' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Checks overlap of short-range interactions + baseline_form.short_range_interactions = { + "group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}, + {"expiration": "Shouting", "start_time": "10:10", "duration": 15}], + } + error = ( + 'Overlap detected: The entry ' + '{\'expiration\': \'Shouting\', \'start_time\': \'10:10\', \'duration\': 15}' + ' overlaps with an already existing entry ' + '({\'expiration\': \'Shouting\', \'start_time\': \'10:00\', \'duration\': 30}).' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Checks if short_range_option relates with the short_range-interactions input + baseline_form.short_range_option = "short_range_yes" + baseline_form.short_range_interactions = {} + error = ( + 'When short_range_option input is set to "short_range_yes", the short_range_interactions ' + 'input should not be empty. Got {}.' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Checks if more than one group is defined (legacy) + baseline_form.short_range_interactions = { + "group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}], + "group_B": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}] + } + error = ( + 'Incompatible number of occupancy groups in the short_range_interactions input. ' + 'Got 2 groups when the maximum is 1.' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Checks if more than one group is defined + baseline_form.occupancy = { + "group_A": {"total_people": 20, "infected": 10, "presence": [ + {"start_time": "10:00", "finish_time": "12:00"}, + {"start_time": "13:00", "finish_time": "17:00"}, + ], + } + } + baseline_form.short_range_interactions = { + "group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}], + "group_B": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}] + } + error = ( + 'Incompatible number of occupancy groups in the short_range_interactions input. ' + 'Got 2 groups when the maximum is 1 (from the occupancy input).' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +def test_population_generation_from_occupancy(baseline_form: virus_validator.VirusFormData): + # Checks the correct translation of the occupancy data into the right exposure and infected models + baseline_form.occupancy = { + "group_A": { + "total_people": 5, + "infected": 2, + "presence": [ + {"start_time": "09:00", "finish_time": "12:00"}, + {"start_time": "13:00", "finish_time": "17:00"}, + ], + }, + "group_B": { + "total_people": 3, + "infected": 1, + "presence": [ + {"start_time": "09:00", "finish_time": "10:00"}, + {"start_time": "11:00", "finish_time": "12:00"}, + ], + }, + } + + exposure_model_group: models.ExposureModelGroup = baseline_form.build_model() + + # Assert that from this occupancy input, two ExposureModels are created + assert len(exposure_model_group.exposure_models) == 2 + assert all(isinstance(model, models.ExposureModel) for model in exposure_model_group.exposure_models) + + first_group = exposure_model_group.exposure_models[0] + second_group = exposure_model_group.exposure_models[1] + + # Assert the exposed population generation (number and presence) from the occupancy input + # Type checks + assert isinstance(first_group.exposed, models.Population) + assert isinstance(first_group.exposed.number, int) + assert isinstance(first_group.exposed.presence, models.Interval) + + assert isinstance(second_group.exposed, models.Population) + assert isinstance(second_group.exposed.number, int) + assert isinstance(second_group.exposed.presence, models.Interval) + + # Value checks + assert first_group.exposed.number == 3 + assert tuple(first_group.exposed.presence.transition_times()) == (9, 12, 13, 17) + assert first_group.exposed.presence.boundaries() == ((9, 12), (13, 17)) + + assert second_group.exposed.number == 2 + assert tuple(second_group.exposed.presence.transition_times()) == (9, 10, 11, 12) + assert second_group.exposed.presence.boundaries() == ((9, 10), (11, 12)) + + # Assert that the infected population is the same for all the models + # Type checks + assert isinstance(first_group.concentration_model.infected, models.InfectedPopulation) + assert isinstance(second_group.concentration_model.infected, models.InfectedPopulation) + # Value checks + assert first_group.concentration_model.infected.number == second_group.concentration_model.infected.number + assert first_group.concentration_model.infected.presence == second_group.concentration_model.infected.presence + + # Assert the infected population generation (number and presence) from the occupancy input + for infected_obj in [first_group.concentration_model.infected, second_group.concentration_model.infected]: + # Type checks + assert isinstance(infected_obj.number, models.IntPiecewiseConstant) + assert infected_obj.presence is None + # Value checks + assert infected_obj.number.interval().boundaries() == ((9, 10), (10, 11), (11, 12), (13, 17)) + assert infected_obj.number.transition_times == (9, 10, 11, 12, 13, 17) + assert infected_obj.number.values == (3, 2, 3, 0, 2) diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index a6377094..75400d2a 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -41,7 +41,7 @@ def full_exposure_model(data_registry): @pytest.fixture -def baseline_infected_population_number(data_registry): +def baseline_infected_population(data_registry): return models.InfectedPopulation( data_registry=data_registry, number=models.IntPiecewiseConstant( @@ -56,34 +56,15 @@ def baseline_infected_population_number(data_registry): @pytest.fixture -def baseline_exposed_population_number(): - return models.Population( - number=models.IntPiecewiseConstant( - (8, 12, 13, 17), (10, 0, 10)), - presence=None, - mask=models.Mask.types['No mask'], - activity=models.Activity.types['Seated'], - host_immunity=0., - ) - - -@pytest.fixture -def dynamic_infected_single_exposure_model(full_exposure_model, baseline_infected_population_number): +def dynamic_infected_single_exposure_model(full_exposure_model, baseline_infected_population): return dc_utils.nested_replace(full_exposure_model, - {'concentration_model.infected': baseline_infected_population_number, }) + {'concentration_model.infected': baseline_infected_population, }) @pytest.fixture -def dynamic_exposed_single_exposure_model(full_exposure_model, baseline_exposed_population_number): - return dc_utils.nested_replace(full_exposure_model, - {'exposed': baseline_exposed_population_number, }) - - -@pytest.fixture -def dynamic_population_exposure_model(full_exposure_model, baseline_infected_population_number ,baseline_exposed_population_number): +def dynamic_population_exposure_model(full_exposure_model, baseline_infected_population): return dc_utils.nested_replace(full_exposure_model, { - 'concentration_model.infected': baseline_infected_population_number, - 'exposed': baseline_exposed_population_number, + 'concentration_model.infected': baseline_infected_population, }) @@ -92,10 +73,10 @@ def dynamic_population_exposure_model(full_exposure_model, baseline_infected_pop [4., 8., 10., 12., 13., 14., 16., 20., 24.], ) def test_population_number(full_exposure_model: models.ExposureModel, - baseline_infected_population_number: models.InfectedPopulation, time: float): + baseline_infected_population: models.InfectedPopulation, time: float): int_population_number: models.InfectedPopulation = full_exposure_model.concentration_model.infected - piecewise_population_number: models.InfectedPopulation = baseline_infected_population_number + piecewise_population_number: models.InfectedPopulation = baseline_infected_population with pytest.raises( TypeError, @@ -206,58 +187,52 @@ def test_dynamic_dose(data_registry, full_exposure_model: models.ExposureModel, def test_infection_probability( full_exposure_model: models.ExposureModel, dynamic_infected_single_exposure_model: models.ExposureModel, - dynamic_exposed_single_exposure_model: models.ExposureModel, dynamic_population_exposure_model: models.ExposureModel): base_infection_probability = full_exposure_model.infection_probability() npt.assert_almost_equal(base_infection_probability, dynamic_infected_single_exposure_model.infection_probability()) - npt.assert_almost_equal(base_infection_probability, dynamic_exposed_single_exposure_model.infection_probability()) npt.assert_almost_equal(base_infection_probability, dynamic_population_exposure_model.infection_probability()) def test_dynamic_total_probability_rule( dynamic_infected_single_exposure_model: models.ExposureModel, - dynamic_exposed_single_exposure_model: models.ExposureModel, dynamic_population_exposure_model: models.ExposureModel): with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " "(including incidence rate) with dynamic occupancy")): dynamic_infected_single_exposure_model.total_probability_rule() - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " - "(including incidence rate) with dynamic occupancy")): - dynamic_exposed_single_exposure_model.total_probability_rule() with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability " "(including incidence rate) with dynamic occupancy")): dynamic_population_exposure_model.total_probability_rule() -def test_dynamic_expected_new_cases( - dynamic_infected_single_exposure_model: models.ExposureModel, - dynamic_exposed_single_exposure_model: models.ExposureModel, - dynamic_population_exposure_model: models.ExposureModel): - - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " - "with dynamic occupancy")): - dynamic_infected_single_exposure_model.expected_new_cases() - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " - "with dynamic occupancy")): - dynamic_exposed_single_exposure_model.expected_new_cases() - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases " - "with dynamic occupancy")): - dynamic_population_exposure_model.expected_new_cases() +def test_exposure_model_group_structure(data_registry, full_exposure_model: models.ExposureModel): + """ + ExposureModels must have the same ConcentrationModel. + In this test the number of infected occupants is different. + """ + another_full_exposure_model = dc_utils.nested_replace(full_exposure_model, + {'concentration_model.infected.number': 2, }) + with pytest.raises(ValueError, match=re.escape("All ExposureModels must have the same infected number and presence in the ConcentrationModel.")): + models.ExposureModelGroup(data_registry, exposure_models=(full_exposure_model, another_full_exposure_model, )) -def test_dynamic_reproduction_number( - dynamic_infected_single_exposure_model: models.ExposureModel, - dynamic_exposed_single_exposure_model: models.ExposureModel, - dynamic_population_exposure_model: models.ExposureModel): +def test_exposure_model_group_expected_new_cases(data_registry, full_exposure_model: models.ExposureModel): + """ + ExposureModelGroup expected number of new cases must + be the sum of expected new cases of each ExposureModel. + + In this case, the number of exposed people is changing + between the two ExposureModel groups. + """ + another_full_exposure_model = dc_utils.nested_replace( + full_exposure_model, {'exposed.number': 5, } + ) + exposure_model_group = models.ExposureModelGroup( + data_registry=data_registry, + exposure_models=(full_exposure_model, another_full_exposure_model, ), + ) - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " - "with dynamic occupancy")): - dynamic_infected_single_exposure_model.reproduction_number() - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " - "with dynamic occupancy")): - dynamic_exposed_single_exposure_model.reproduction_number() - with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number " - "with dynamic occupancy")): - dynamic_population_exposure_model.reproduction_number() + assert exposure_model_group.expected_new_cases() == ( + full_exposure_model.expected_new_cases() + another_full_exposure_model.expected_new_cases() + ) diff --git a/cern_caimira/pyproject.toml b/cern_caimira/pyproject.toml index 84c1c0a9..029c6a80 100644 --- a/cern_caimira/pyproject.toml +++ b/cern_caimira/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "cern-caimira" -version = "4.17.8" +version = "4.18.0" description = "CAiMIRA - CERN Airborne Model for Indoor Risk Assessment" license = { text = "Apache-2.0" } authors = [ @@ -27,7 +27,7 @@ dependencies = [ "mistune", "numpy", "pandas", - "pyinstrument", + "pyinstrument >= 5.0.3", "retry", "ruptures", "scipy", @@ -40,8 +40,8 @@ dependencies = [ dev = [] test = [ "pytest", - "pytest-mypy >= 0.10.3", - "mypy >= 1.0.0", + "pytest-mypy >= 1.0.1", + "mypy >= 1.17.0", "pytest-tornasync", "types-dataclasses", "types-requests" diff --git a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py index e6ed1200..6b7e5ce2 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py +++ b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py @@ -122,9 +122,9 @@ class VirusReportGenerator: model: models.ExposureModel = report_data['model'] data_registry_version: typing.Optional[str] = f"v{model.data_registry.version}" if model.data_registry.version else None - # Alternative scenarios data - alternative_scenarios: typing.Dict[str,typing.Any] = alternative_scenarios_data(form, report_data, executor_factory) - context.update(alternative_scenarios) + # Alternative scenarios data (only generated in the legacy version - when occupancy input is empty) + if not form.occupancy: + context.update(alternative_scenarios_data(form, report_data, executor_factory)) # Alternative viral load data if form.conditional_probability_viral_loads: diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/css/report.css b/cern_caimira/src/cern_caimira/apps/calculator/static/css/report.css index dd111aa5..f42d7a3a 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/css/report.css +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/css/report.css @@ -126,13 +126,13 @@ p.notes { padding: 15px; page-break-inside: avoid; } - #button_full_exposure, #button_hide_high_concentration { + #button_full_exposure-group_static, #button_hide_high_concentration-group_static { display: none!important; } - #long_range_cumulative_checkbox, #lr_cumulative_checkbox_label { + #long_range_cumulative_checkbox-group_static, #lr_cumulative_checkbox_label-group_static { display: none!important; } - #button_alternative_full_exposure, #button_alternative_hide_high_concentration { + #button_alternative_full_exposure-group_static, #button_alternative_hide_high_concentration-group_static { display: none!important; } #export-csv { diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js index f28bd5cb..30bf4af1 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js @@ -17,14 +17,12 @@ const CO2_data_form = [ "infected_lunch_option", "infected_lunch_start", "infected_people", - "dynamic_infected_occupancy", "infected_start", + "occupancy", "room_capacity", "room_volume", "specific_breaks", - "total_people", - "dynamic_exposed_occupancy", - "occupancy_format", + "total_people" ]; // Method to upload a valid data file (accepted formats: .xls and .xlsx) @@ -301,6 +299,7 @@ function displayFittingData(json_response) { // Not needed for the form submission delete json_response["CO2_plot_img"]; delete json_response["predictive_CO2"]; + delete json_response["CO2_plot_data"]; // Convert nulls to empty strings in the JSON response if (json_response["room_capacity"] === null) json_response["room_capacity"] = ''; if (json_response["ventilation_lsp_values"] === null) json_response["ventilation_lsp_values"] = ''; diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js index 631be3cc..27ab227d 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js @@ -631,22 +631,38 @@ function validate_form(form) { } // Generate the short-range interactions list - var short_range_interactions = []; - $(".form_field_outer_row").each(function (index, element){ - let obj = {}; - const $element = $(element); - obj.expiration = $element.find("[name='short_range_expiration']").val(); - obj.start_time = $element.find("[name='short_range_start_time']").val(); - obj.duration = $element.find("[name='short_range_duration']").val(); - short_range_interactions.push(JSON.stringify(obj)); + let short_range_interactions = {}; + $(".form_field_outer_row").each(function (index, element) { + const $element = $(element); + + let obj = {}; + obj.expiration = $element.find("[name='short_range_expiration']").val(); + obj.start_time = $element.find("[name='short_range_start_time']").val(); + obj.duration = parseFloat($element.find("[name='short_range_duration']").val()); + + const exposure_group = $element.find("[name='short_range_exposure_group']").val(); + + // If the exposure_group key already exists, push the new obj into the array + if (short_range_interactions[exposure_group]) { + short_range_interactions[exposure_group].push(obj); + } else { + // Otherwise, create a new array with the current obj + short_range_interactions[exposure_group] = [obj]; + } }); - // Sort list by time - short_range_interactions.sort(function (a, b) { - return JSON.parse(a).start_time.localeCompare(JSON.parse(b).start_time); - }); - $("input[type=text][name=short_range_interactions]").val('[' + short_range_interactions + ']'); - if (short_range_interactions.length == 0) { + // Sort each array within the short_range_interactions object by start_time + for (const key in short_range_interactions) { + short_range_interactions[key].sort(function (a, b) { + return a.start_time.localeCompare(b.start_time); + }); + } + + // Convert the entire object to a JSON string and assign it to the input field + $("input[type=text][name=short_range_interactions]").val(JSON.stringify(short_range_interactions)); + + // Check if there are no entries and update the radio button accordingly + if (Object.keys(short_range_interactions).length === 0) { $("input[type=radio][id=short_range_no]").prop("checked", true); on_short_range_option_change(); } @@ -907,18 +923,42 @@ $(document).ready(function () { } // Read short-range from URL - else if (name == 'short_range_interactions') { - let index = 1; - for (const interaction of JSON.parse(value)) { - $("#dialog_sr").append(inject_sr_interaction(index, value = interaction, is_validated="row_validated")) - $('#sr_expiration_no_' + String(index)).val(interaction.expiration).change(); - document.getElementById('sr_expiration_no_' + String(index)).disabled = true; - document.getElementById('sr_start_no_' + String(index)).disabled = true; - document.getElementById('sr_duration_no_' + String(index)).disabled = true; - document.getElementById('edit_row_no_' + String(index)).style.cssText = 'display:inline !important'; - document.getElementById('validate_row_no_' + String(index)).style.cssText = 'display: none !important'; - index++; + else if (name === 'short_range_interactions') { + // Parse the JSON value from the URL + let interactions = JSON.parse(value); + let index = 1; // Initialize interaction index + + // Iterate over each group in the interactions + for (const group in interactions) { + if (interactions.hasOwnProperty(group)) { + // Iterate over each interaction within the group + for (const interaction of interactions[group]) { + // Append the interaction row to the dialog + $("#dialog_sr").append(inject_sr_interaction(index, interaction, "row_validated")); + + // Set the values for each input field based on the interaction + $('#sr_expiration_no_' + index).val(interaction.expiration).change(); + document.getElementById('sr_start_no_' + index).value = interaction.start_time; // Set start time + document.getElementById('sr_duration_no_' + index).value = interaction.duration; // Set duration + document.getElementById('sr_group_no_' + index).value = group; // Set exposure group + + // Disable the input fields for editing + document.getElementById('sr_expiration_no_' + index).disabled = true; + document.getElementById('sr_start_no_' + index).disabled = true; + document.getElementById('sr_duration_no_' + index).disabled = true; + document.getElementById('sr_group_no_' + index).disabled = true; + + // Update visibility of editing and validation rows + document.getElementById('edit_row_no_' + index).style.display = 'inline'; + document.getElementById('validate_row_no_' + index).style.display = 'none'; + + // Increment the index for the next interaction + index++; + } + } } + + // Update the total count of interactions displayed $("#sr_interactions").text(index - 1); } @@ -1196,6 +1236,11 @@ $(document).ready(function () {

+
+
+

+
+
@@ -1213,11 +1258,11 @@ $(document).ready(function () { // When short_range_yes option is selected, we want to inject rows for each expiractory activity, start_time and duration. $("body").on("click", ".add_node_btn_frm_field", function(e) { let last_row = $(".form_field_outer").find(".form_field_outer_row"); - if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15" })); + if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15", exposure_group: "group_1" })); else { last_index = last_row.last().find(".short_range_option").prop("id").split("_").slice(-1)[0]; index = parseInt(last_index) + 1; - $("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15" })); + $("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15", exposure_group: "group_1"})); } }); diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js index 786794d0..a91bd189 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/report.js @@ -4,13 +4,18 @@ function on_report_load(conditional_probability_viral_loads) { } /* Generate the concentration plot using d3 library. */ -function draw_plot(svg_id) { +function draw_plot(svg_id, group_id, times, concentrations_zoomed, + concentrations, cumulative_doses, long_range_cumulative_doses, + exposed_presence_intervals, short_range_interactions) { // Used for controlling the short-range interactions - let button_full_exposure = document.getElementById("button_full_exposure"); - let button_hide_high_concentration = document.getElementById("button_hide_high_concentration"); - let long_range_checkbox = document.getElementById('long_range_cumulative_checkbox') - let show_sr_legend = short_range_expirations.length > 0; + let button_full_exposure = document.getElementById(`button_full_exposure-group_${group_id}`); + let button_hide_high_concentration = document.getElementById(`button_hide_high_concentration-group_${group_id}`); + let long_range_checkbox = document.getElementById(`long_range_cumulative_checkbox-group_${group_id}`); + let show_sr_legend = short_range_interactions.length > 0; + + let short_range_intervals = short_range_interactions.map((interaction) => interaction["presence_interval"]); + let short_range_expirations = short_range_interactions.map((interaction) => interaction["expiration"]); var data_for_graphs = { 'concentrations': [], @@ -192,7 +197,7 @@ function draw_plot(svg_id) { // Area representing the short-range interaction(s). var shortRangeArea = {}; var drawShortRangeArea = {}; - short_range_intervals.forEach((b, index) => { + short_range_intervals?.forEach((b, index) => { shortRangeArea[index] = d3.area(); drawShortRangeArea[index] = draw_area.append('svg:path'); @@ -285,7 +290,7 @@ function draw_plot(svg_id) { }); // Short-Range Area. - short_range_intervals.forEach((b, index) => { + short_range_intervals.flat().forEach((b, index) => { shortRangeArea[index].x(d => xTimeRange(d.time)) .y0(graph_height - 50) .y1(d => yRange(d.concentration)); @@ -521,12 +526,12 @@ function draw_plot(svg_id) { } // Draw for the first time to initialize. - redraw(); + redraw(svg_id); update_concentration_plot(concentrations, cumulative_doses); // Redraw based on the new size whenever the browser window is resized. window.addEventListener("resize", e => { - redraw(); + redraw(svg_id); if (button_full_exposure && button_full_exposure.disabled) update_concentration_plot(concentrations, cumulative_doses); else update_concentration_plot(concentrations_zoomed, long_range_cumulative_doses) }); @@ -536,12 +541,13 @@ function draw_plot(svg_id) { // 'list_of_scenarios' is a dictionary with all the scenarios // 'times' is a list of times for all the scenarios function draw_generic_concentration_plot( - plot_svg_id, + svg_id, + times, y_axis_label, h_lines, ) { - if (plot_svg_id === 'CO2_concentration_graph') { + if (svg_id === 'CO2_concentration_graph') { list_of_scenarios = {'CO₂ concentration': {'concentrations': CO2_concentrations}}; min_y_axis_domain = 400; } @@ -575,7 +581,7 @@ function draw_generic_concentration_plot( var first_scenario = Object.values(data_for_scenarios)[0] // Add main SVG element - var plot_div = document.getElementById(plot_svg_id); + var plot_div = document.getElementById(svg_id); var vis = d3.select(plot_div).append('svg'); var xRange = d3.scaleTime().domain([first_scenario[0].hour, first_scenario[first_scenario.length - 1].hour]); @@ -706,7 +712,7 @@ function draw_generic_concentration_plot( } function update_concentration_plot(concentration_data) { - list_of_scenarios = (plot_svg_id === 'CO2_concentration_graph') ? {'CO₂ concentration': {'concentrations': CO2_concentrations}} : alternative_scenarios + list_of_scenarios = (svg_id === 'CO2_concentration_graph') ? {'CO₂ concentration': {'concentrations': CO2_concentrations}} : alternative_scenarios var highest_concentration = 0. for (scenario in list_of_scenarios) { @@ -739,11 +745,11 @@ function draw_generic_concentration_plot( var graph_width; var graph_height; - function redraw() { + function redraw(svg_id) { // Define width and height according to the screen size. Always use an already defined - var window_width = document.getElementById('concentration_plot').clientWidth; + var window_width = document.getElementById(svg_id).clientWidth; var div_width = window_width; - var div_height = document.getElementById('concentration_plot').clientHeight; + var div_height = document.getElementById(svg_id).clientHeight; graph_width = div_width; graph_height = div_height; var margins = { top: 30, right: 20, bottom: 50, left: 60 }; @@ -882,12 +888,12 @@ function draw_generic_concentration_plot( } // Draw for the first time to initialize. - redraw(); + redraw(svg_id); update_concentration_plot('concentrations'); // Redraw based on the new size whenever the browser window is resized. window.addEventListener("resize", e => { - redraw(); + redraw(svg_id); update_concentration_plot('concentrations'); }); } diff --git a/cern_caimira/src/cern_caimira/apps/expert_apps/state.py b/cern_caimira/src/cern_caimira/apps/expert_apps/state.py index 98381975..dcba8366 100644 --- a/cern_caimira/src/cern_caimira/apps/expert_apps/state.py +++ b/cern_caimira/src/cern_caimira/apps/expert_apps/state.py @@ -237,7 +237,7 @@ class DataclassInstanceState(DataclassState[Datamodel_T]): # TODO: It is possible to cut observer connections by clearing like this. self._data.clear() - for field in dataclasses.fields(instance_dataclass): + for field in dataclasses.fields(instance_dataclass): # type: ignore if dataclasses.is_dataclass(field.type): self._data[field.name] = self._state_builder.visit(field) self._data[field.name].dcs_observe(self._fire_observers) diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 index 600f92a4..fad5a331 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 @@ -454,9 +454,7 @@ ?

- {# "static" vs. "dynamic" #} - - +
diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 index e81cc53f..93ad650d 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 @@ -54,362 +54,361 @@
- {% set long_range_prob_inf = prob_inf %} - {% set long_range_expected_cases = expected_new_cases %} - - {# Update values if short range option is "short_range_yes" #} - {% if form.short_range_option == "short_range_yes" %} - {% set scenario = alternative_scenarios.stats.values() | first %} - {# Probability of infection values #} - {% set long_range_prob_inf = scenario.probability_of_infection %} - {# Expected new case values #} - {% set long_range_expected_cases = scenario.expected_new_cases %} - - {% if form.exposure_option == 'p_probabilistic_exposure' %} - {% set long_range_prob_probabilistic_exposure = scenario.prob_probabilistic_exposure %} + {% for group_id, group_results in groups.items() %} + {% if group_results.get('long_range_prob_inf') %}{{group_results.long_range_prob_inf}}{% endif %} + {% set long_range_prob_inf = group_results.prob_inf %} + {% set long_range_expected_cases = group_results.expected_new_cases %} + {# Update values if short range option is "short_range_yes" #} + {% if group_results.get('long_range_prob') %} + {# Probability of infection values #} + {% set long_range_prob_inf = group_results.long_range_prob %} + {# Expected new case values #} + {% set long_range_expected_cases = group_results.long_range_expected_new_cases %} + + {% if form.exposure_option == 'p_probabilistic_exposure' %} + {% set long_range_prob_probabilistic_exposure = scenario.prob_probabilistic_exposure %} + {% endif %} {% endif %} - {% endif %} - - {% block report_results %} -
-
Results - -
-
-
-

-

-
-
-
- Probability of infection (%)
- {% if form.short_range_option == "short_range_yes" %} - Without short-range interactions - {% endif %} -
-
- -
- {% block long_range_warning_animation %} -
- {{long_range_prob_inf | non_zero_percentage}} - - - - -
- {% endblock long_range_warning_animation %} -
- {% if form.occupancy_format == "static" %}
Expected new cases: {{ long_range_expected_cases | float_format }}
{% endif %} -
-
- {% if form.short_range_option == "short_range_yes" %} -
-
- Probability of infection (%)
- With short-range interactions -
-
- -
- {% block warning_animation %} -
- {{prob_inf | non_zero_percentage}} - - - - -
- {% endblock warning_animation %} -
- {% if form.occupancy_format == "static" %} -
Expected new cases: {{ expected_new_cases | float_format }}
- {% endif %} -
- {% endif %} -
- {% block report_summary %} -
- - {% if form.short_range_option == "short_range_yes" %} -
- - {% endif %} - {% block probabilistic_exposure_probability %} - {% if form.exposure_option == "p_probabilistic_exposure" %} -
- - {% endif %} - {% endblock probabilistic_exposure_probability %} -
- {% endblock report_summary %} -
-
-
- {% block report_summary_footnote %} - {% endblock report_summary_footnote %} -
-

* The results are based on the parameters and assumptions published in the CARA publication: doi.org/10.1098/rsfs.2021.0076.


- {% if form.short_range_option == "short_range_yes" %} - {% if 'Speaking' in form.short_range_interactions|string or 'Shouting' in form.short_range_interactions|string %} - - - {% endif %} - - {% endif %} -
- - IRP - Infectious Respiratory Particles. -

-
-
-
-
-
Result uncertainties - -
-
-
-
-
- -
- {% if model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == 'Ref: Viral load - covid overal viral load data' %} -
- - -
- {% endif %} - {% if form.conditional_probability_viral_loads %} -
- -
-

(i)   Predictive probability of infection for a given value of the viral load

-

(ii)  Histogram of the viral load data

-

(iii) Histogram of the conditional probability of infection (result of total predictive probability in the middle)

-
-
- {% endif %} -
-
-
-
- -
-
Predictive CO₂ Concentration Profile - -
-
-
-
-
- -
-
-
-
- - {% if form.short_range_option == "short_range_no" %} -
-
Alternative scenarios -
-
+
-
-
+

+

+
+
+
+ Probability of infection (%)
+ {% if group_results.get('long_range_prob') %} + Without short-range interactions + {% endif %} +
+
+ +
+ {% block long_range_warning_animation scoped %} +
+ {{ long_range_prob_inf | non_zero_percentage }} + + + + +
+ {% endblock long_range_warning_animation %} +
+
Expected new cases: {{ long_range_expected_cases | float_format }}
+
+
+ {% if group_results.get('long_range_prob') %} +
+
+ Probability of infection (%)
+ With short-range interactions +
+
+ +
+ {% block warning_animation scoped %} +
+ {{ group_results.prob_inf | non_zero_percentage }} + + + + +
+ {% endblock warning_animation %} +
+
Expected new cases: {{ group_results.expected_new_cases | float_format }}
+
+ {% endif %} +
+ {% block report_summary scoped %} +
+ + {% if group_results.get('long_range_prob') %} +
+ + {% endif %} + {% block probabilistic_exposure_probability %} + {% if form.exposure_option == "p_probabilistic_exposure" %} +
+ + {% endif %} + {% endblock probabilistic_exposure_probability %} +
+ {% endblock report_summary %} +
+
+
+ {% block report_summary_footnote %} + {% endblock report_summary_footnote %} +
+

* The results are based on the parameters and assumptions published in the CARA publication: doi.org/10.1098/rsfs.2021.0076.


+ {% if group_results.get('long_range_prob') %} + {% if form.short_range_option == 'short_range_yes' and ('Speaking' in form.short_range_interactions[group_id]|string or 'Shouting' in form.short_range_interactions[group_id]|string) %} + + + {% endif %} + + + {% endif %} +
-
- {% block report_scenarios_summary_table %} - - - - - - {% if form.occupancy_format == "static" %}{% endif %} - - - - {% for scenario_name, scenario_stats in alternative_scenarios.stats.items() %} - - - - {% if form.occupancy_format == "static" %}{% endif %} - - {% endfor %} - -
ScenarioP(I)Expected new cases
{{ scenario_name }} {{ scenario_stats.probability_of_infection | non_zero_percentage }}{{ scenario_stats.expected_new_cases | float_format }}
- {% endblock report_scenarios_summary_table %} -
-
-

Notes for alternative scenarios:
-

    -
  1. This graph shows the concentration of infectious quanta in the air. The filtration of Type I and FFP2 masks, if worn, applies not only to the emission rate but also to the individual exposure (i.e. inhalation). - For this reason, scenarios with different types of mask will show the same concentration on the graph but have different absorbed doses and infection probabilities.
  2. -
  3. If you have selected more sophisticated options, such as HEPA filtration or FFP2 masks, this will be indicated in the plot as the "base scenario", representing the inputs inserted in the form.
    - The other alternative scenarios shown for comparison will not include either HEPA filtration or FFP2 masks.
  4. -
-
+ IRP - Infectious Respiratory Particles.

- {% endif %} - - {% endblock report_results %} - - -