From a56ad5f77a523070eca85492b93895a0cf722f58 Mon Sep 17 00:00:00 2001 From: lrdossan Date: Fri, 21 Mar 2025 17:35:49 +0100 Subject: [PATCH] updates following reviewing process: - replaced 'dynamic_exposed_occupancy', 'dynamic_infected_occupancy' and 'dynamic_occupancy' by 'occupancy' that encompasses both - developed respective functionality, including CAiMIRA UI and REST API - simplified logic with entry overlaps and respective validation - simplified CO2 logic with dynamic groups - simplified logic with alternative scenarios - changed default identifier of unique group - handled naming conventions - updated existing tests and added new tests for the most recent structure - generate docscrings update --- .../api/controller/virus_report_controller.py | 2 +- .../src/caimira/calculator/models/models.py | 8 +- .../calculator/models/monte_carlo/models.py | 1 - .../calculator/report/virus_report_data.py | 53 +- .../validators/co2/co2_validator.py | 6 +- .../caimira/calculator/validators/defaults.py | 4 +- .../calculator/validators/form_validator.py | 689 +++++++++--------- .../validators/virus/virus_validator.py | 113 +-- .../apps/calculator/test_model_generator.py | 310 ++++---- .../tests/models/test_dynamic_population.py | 2 +- .../apps/calculator/report/virus_report.py | 4 +- .../apps/calculator/static/js/co2_form.js | 4 +- .../apps/calculator/static/js/form.js | 6 +- .../templates/base/calculator.form.html.j2 | 4 +- .../templates/base/calculator.report.html.j2 | 10 +- .../templates/cern/calculator.report.html.j2 | 10 +- cern_caimira/tests/conftest.py | 2 +- cern_caimira/tests/test_report_generator.py | 28 +- 18 files changed, 644 insertions(+), 612 deletions(-) diff --git a/caimira/src/caimira/api/controller/virus_report_controller.py b/caimira/src/caimira/api/controller/virus_report_controller.py index 19cbcb05..c6e95d96 100644 --- a/caimira/src/caimira/api/controller/virus_report_controller.py +++ b/caimira/src/caimira/api/controller/virus_report_controller.py @@ -29,7 +29,7 @@ def submit_virus_form(form_data: typing.Dict, report_generation_parallelism: typ form_obj: VirusFormData = generate_form_obj(form_data=form_data, data_registry=data_registry) report_data: typing.Dict = generate_report(form_obj=form_obj, report_generation_parallelism=report_generation_parallelism) - + # Handle model representation if report_data['model']: report_data['model'] = repr(report_data['model']) diff --git a/caimira/src/caimira/calculator/models/models.py b/caimira/src/caimira/calculator/models/models.py index 6f40aff0..add4e848 100644 --- a/caimira/src/caimira/calculator/models/models.py +++ b/caimira/src/caimira/calculator/models/models.py @@ -1643,7 +1643,7 @@ class ExposureModel: exposed_to_short_range: int = 0 #: Unique group identifier - identifier: str = 'static' + identifier: str = 'group_1' #: The number of times the exposure event is repeated (default 1). @property @@ -1930,8 +1930,10 @@ class ExposureModelGroup: """ first_concentration_model = self.exposure_models[0].concentration_model for model in self.exposure_models[1:]: - if model.concentration_model != first_concentration_model: - raise ValueError("All ExposureModels must have the same ConcentrationModel.") + # Check that the number of infected people and their presence is the same + if (model.concentration_model.infected.number != first_concentration_model.infected.number or + model.concentration_model.infected.presence != first_concentration_model.infected.presence): + raise ValueError("All ExposureModels must have the same infected number and presence in the ConcentrationModel.") @method_cache def _deposited_exposure_list(self) -> typing.List[_VectorisedFloat]: diff --git a/caimira/src/caimira/calculator/models/monte_carlo/models.py b/caimira/src/caimira/calculator/models/monte_carlo/models.py index 1c7c4a6e..2caddbaa 100644 --- a/caimira/src/caimira/calculator/models/monte_carlo/models.py +++ b/caimira/src/caimira/calculator/models/monte_carlo/models.py @@ -74,7 +74,6 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model elif new_field.type == typing.Tuple[models.SpecificInterval, ...]: SI = getattr(sys.modules[__name__], "SpecificInterval") field_type = typing.Tuple[typing.Union[models.SpecificInterval, SI], ...] - elif new_field.type == typing.Union[int, models.IntPiecewiseConstant]: IPC = getattr(sys.modules[__name__], "IntPiecewiseConstant") field_type = typing.Union[int, models.IntPiecewiseConstant, IPC] diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py index ffa9d4b8..5647f657 100644 --- a/caimira/src/caimira/calculator/report/virus_report_data.py +++ b/caimira/src/caimira/calculator/report/virus_report_data.py @@ -246,13 +246,13 @@ def group_results(form: VirusFormData, model_group: models.ExposureModelGroup) - @profiler.profile -def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: +def calculate_report_data(form: VirusFormData, + executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: """ - Simulation output data. + Generates the simulation output data. """ model_group: models.ExposureModelGroup = form.build_model() results_per_group: typing.Dict[str, typing.Any] = group_results(form, model_group) - times = interesting_times(model_group) # CO2 concentration @@ -458,8 +458,13 @@ def calculate_vl_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[s } -def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, typing.Union[mc.ExposureModel, mc.ExposureModelGroup]]: - scenarios = {} +def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]: + """ + Generates the data structure containing all the alternative scenarios. + It is only compatible with single group occupancy models, therefore + it returns an ExposureModel object and not an ExposureModelGroup. + """ + scenarios: typing.Dict[str, models.ExposureModelGroup] = {} if (form.short_range_option == "short_range_no"): # Two special option cases - HEPA and/or FFP2 masks. FFP2_being_worn = bool(form.mask_wearing_option == @@ -512,31 +517,33 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, t scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model() else: # Adjust the number of exposed people with long-range exposure based on short-range interactions - if form.occupancy_format == 'static': + if not form.occupancy: no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, total_people=form.total_people - form.short_range_occupants) - elif form.occupancy_format == 'dynamic': - for group_name, occupancy_list in form.dynamic_exposed_occupancy.items(): + else: + for group_id, group in form.occupancy.items(): # Check if the group exists in short-range interactions - if group_name in form.short_range_interactions: + if group_id in form.short_range_interactions: short_range_count = form.short_range_occupants - total_people = occupancy_list['total_people'] + total_people = group['total_people'] if total_people > short_range_count > 0: # Update the total_people with the adjusted value - occupancy_list['total_people'] = max(0, total_people - short_range_count) - no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, dynamic_exposed_occupancy=form.dynamic_exposed_occupancy) + group['total_people'] = max(0, total_people - short_range_count) + no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, occupancy=form.occupancy) scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model() + for sceario_name, scenario in scenarios.items(): + scenarios[sceario_name] = scenario.exposure_models[0] # type: ignore return scenarios def scenario_statistics( - mc_model_group: mc.ExposureModelGroup, + mc_model: mc.ExposureModel, sample_times: typing.List[float], compute_prob_exposure: bool, ): - model_group: models.ExposureModelGroup = mc_model_group.build_model( - size=mc_model_group.data_registry.monte_carlo['sample_size']) - model = model_group.exposure_models[0] + model = mc_model.build_model( + size=mc_model.data_registry.monte_carlo['sample_size'] + ) return { 'probability_of_infection': np.mean(model.infection_probability()), @@ -552,21 +559,21 @@ def scenario_statistics( def comparison_report( form: VirusFormData, report_data: typing.Dict[str, typing.Any], - scenarios: typing.Dict[str, mc.ExposureModelGroup], + scenarios: typing.Dict[str, mc.ExposureModel], executor_factory: typing.Callable[[], concurrent.futures.Executor], ): if (form.short_range_option == "short_range_no"): statistics = { 'Current scenario': { - 'probability_of_infection': report_data['groups']['static']['prob_inf'], - 'expected_new_cases': report_data['groups']['static']['expected_new_cases'], - 'concentrations': report_data['groups']['static']['concentrations'], + 'probability_of_infection': report_data['groups']['group_1']['prob_inf'], + 'expected_new_cases': report_data['groups']['group_1']['expected_new_cases'], + 'concentrations': report_data['groups']['group_1']['concentrations'], } } else: statistics = {} - compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and form.occupancy_format == "static" + compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and not form.occupancy with executor_factory() as executor: results = executor.map( @@ -585,7 +592,9 @@ def comparison_report( } -def alternative_scenarios_data(form: VirusFormData, report_data: typing.Dict[str, typing.Any], executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: +def alternative_scenarios_data(form: VirusFormData, + report_data: typing.Dict[str, typing.Any], + executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: alternative_scenarios: typing.Dict[str, typing.Any] = manufacture_alternative_scenarios(form=form) return { 'alternative_scenarios': comparison_report(form=form, report_data=report_data, scenarios=alternative_scenarios, executor_factory=executor_factory) diff --git a/caimira/src/caimira/calculator/validators/co2/co2_validator.py b/caimira/src/caimira/calculator/validators/co2/co2_validator.py index e7fe4ef9..c85738f9 100644 --- a/caimira/src/caimira/calculator/validators/co2/co2_validator.py +++ b/caimira/src/caimira/calculator/validators/co2/co2_validator.py @@ -28,8 +28,6 @@ class CO2FormData(FormData): # and the defaults in any html form must not be contradictory. _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = { 'CO2_data': '{}', - 'dynamic_exposed_occupancy': '{}', - 'dynamic_infected_occupancy': '[]', 'exposed_coffee_break_option': 'coffee_break_0', 'exposed_coffee_duration': 5, 'exposed_finish': '17:30', @@ -47,7 +45,7 @@ class CO2FormData(FormData): 'infected_lunch_start': '12:30', 'infected_people': 1, 'infected_start': '08:30', - 'occupancy_format': 'static', + 'occupancy': '{}', 'room_capacity': None, 'room_volume': NO_DEFAULT, 'specific_breaks': '{}', @@ -74,7 +72,7 @@ class CO2FormData(FormData): raise TypeError(f'The room capacity should be a valid integer (> 0). Got {self.room_capacity}.') # Validate specific inputs - breaks (exposed and infected) - if self.specific_breaks != {} and self.occupancy_format == 'static': + if self.specific_breaks != {} and not self.occupancy: if type(self.specific_breaks) is not dict: raise TypeError('The specific breaks should be in a dictionary.') diff --git a/caimira/src/caimira/calculator/validators/defaults.py b/caimira/src/caimira/calculator/validators/defaults.py index 69b663ed..5aa3f2ce 100644 --- a/caimira/src/caimira/calculator/validators/defaults.py +++ b/caimira/src/caimira/calculator/validators/defaults.py @@ -16,8 +16,6 @@ DEFAULTS = { 'calculator_version': NO_DEFAULT, 'ceiling_height': 0., 'conditional_probability_viral_loads': False, - 'dynamic_exposed_occupancy': '{}', - 'dynamic_infected_occupancy': '[]', 'event_month': 'January', 'exposed_coffee_break_option': 'coffee_break_0', 'exposed_coffee_duration': 5, @@ -49,7 +47,7 @@ DEFAULTS = { 'mask_type': 'Type I', 'mask_wearing_option': 'mask_off', 'mechanical_ventilation_type': 'not-applicable', - 'occupancy_format': 'static', + 'occupancy': '{}', 'opening_distance': 0., 'precise_activity': '{}', 'room_heating_option': False, diff --git a/caimira/src/caimira/calculator/validators/form_validator.py b/caimira/src/caimira/calculator/validators/form_validator.py index 2d378d65..cfd0727c 100644 --- a/caimira/src/caimira/calculator/validators/form_validator.py +++ b/caimira/src/caimira/calculator/validators/form_validator.py @@ -36,15 +36,11 @@ class FormData: infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed infected_start: minutes_since_midnight infected_people: int - occupancy_format: str + occupancy: dict room_volume: float specific_breaks: dict total_people: int - # Dynamic occupancy inputs - dynamic_exposed_occupancy: dict - dynamic_infected_occupancy: list - data_registry: DataRegistry _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS @@ -96,147 +92,169 @@ class FormData: if default is not NO_DEFAULT and value in [default, 'not-applicable']: form_dict.pop(attr) return form_dict - - def validate_dynamic_input(self, dynamic_input_value, input_name, dynamic_input_key = None): - # Check if the input is a valid non-empty list - specific_group_msg = f' in "{dynamic_input_key}" ("presence")' if dynamic_input_key else '' - if not isinstance(dynamic_input_value, list) : - raise TypeError(f'The input "{input_name}"{specific_group_msg} should be a valid, non-empty list. Got "{type(dynamic_input_value)}".') - # Check if input is populated - if len(dynamic_input_value) == 0: - raise ValueError(f'The input "{input_name}" should be a valid, non-empty list. Got "{dynamic_input_value}".') + def validate_group_presence_input(self, group_id: str, group_presence: typing.Dict): + """ + When occupancy is defined, this method validates the + presence times within an occupancy group. + """ + # Checks if the presence input is a valid list + if not isinstance(group_presence, list): + raise TypeError(f'The presence parameter in occupancy group "{group_id}" should be a valid list. Got {type(group_presence)}.') + # Checks if the presence input is populated + if len(group_presence) == 0: + raise TypeError(f'The presence parameter in occupancy group "{group_id}" should be a valid, non-empty list. Got {group_presence}.') - # To store already processed interactions for overlap checking - existing_dynamic_infected_interval = [] - existing_dynamic_exposed_interval = [] - short_range_existing_interaction = [] + # Already processed presence intervals for overlap checking + existing_occupancy_presence_interval = [] - for entry in dynamic_input_value: - # Check if each entry is a dictionary - if not isinstance(entry, typing.Dict): - raise TypeError(f'Each entry in "{input_name}" should be a dictionary. Got "{type(entry)}".') + for presence_interval in group_presence: + # Checks if each presence entry is a valid dict + if not isinstance(presence_interval, typing.Dict): + raise TypeError(f'Each presence interval should be a valid dictionary. Got {type(presence_interval)} in occupancy group "{group_id}".') - # Check for required keys in each entry - dict_keys = entry.keys() + # Parameters in each presence entry + presence_params = presence_interval.keys() - # Check for the "start_time" and "finish_time" keys for "dynamic_exposed_occupancy" and "dynamic_infected_occupancy" - if input_name in ["dynamic_exposed_occupancy", "dynamic_infected_occupancy"]: - # Check for time format in "start_time" and "finish_time" - for time_key in ["start_time", "finish_time"]: - if time_key not in dict_keys: - raise TypeError(f'Missing "{time_key}" key in "{input_name}"{specific_group_msg}. Got keys: "{list(dict_keys)}".') - time_value = entry[time_key] - if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time_value): - raise ValueError(f'Invalid time format for "{time_key}" in "{input_name}"{specific_group_msg}. Expected "HH:MM". Got "{time_value}".') - if entry["finish_time"] <= entry["start_time"]: - raise ValueError(f'Occupancy presence in "{input_name}"{specific_group_msg} entry has a finish time after the start time. Got start time: "{entry["start_time"]}" and finish time: "{entry["finish_time"]}".') - - # Check for the "dynamic_infected_occupancy" uniqueness of intervals, - # as well as the "total_people" keyword and its constraints - if input_name == "dynamic_infected_occupancy": - self.check_overlap(entry, existing_dynamic_infected_interval) - existing_dynamic_infected_interval.append(entry) - if "total_people" not in dict_keys: - raise TypeError(f'Missing "total_people" key in "dynamic_infected_occupancy". Got keys: "{list(dict_keys)}".') - else: - value = entry["total_people"] - if not isinstance(value, int) or value < 0: - raise ValueError(f'The "total_people" in "{input_name}" should be a non-negative integer. Got "{value}".') + # Checks for the "start_time" and "finish_time" params + for time_param in ["start_time", "finish_time"]: + if time_param not in presence_params: + raise TypeError(f'Missing {time_param} key in presence parameter of occupancy group "{group_id}".' + f' Got keys: {", ".join(presence_params)}.') + + time_value = presence_interval[time_param] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time_value): + raise ValueError(f'Invalid time format found in presence parameter of occupancy group "{group_id}". ' + f'Expected HH:MM, got {time_value}.') - # Check for the "dynamic_exposed_occupancy" uniqueness of intervals - if input_name == "dynamic_exposed_occupancy": - self.check_overlap(entry, existing_dynamic_exposed_interval) - existing_dynamic_exposed_interval.append(entry) + if presence_interval["finish_time"] <= presence_interval["start_time"]: + raise ValueError(f'Inconsistent times found in "presence" parameter of occupancy group "{group_id}".' + f'The "{presence_interval}" entry has a start time ("{presence_interval["start_time"]}") ' + f'after the finish time ("{presence_interval["finish_time"]}").') - # Check for remaining short-range inputs - if input_name == "short_range_interactions": - # Check for time format in "start_time" and "finish_time" - if "start_time" not in dict_keys: - raise TypeError(f'Missing "start_time" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".') - start_time = entry["start_time"] + # Checks for the occupancy group uniqueness of intervals + self.check_overlap(presence_interval, existing_occupancy_presence_interval) + existing_occupancy_presence_interval.append(presence_interval) + + def validate_short_range_interaction_input(self, group_id: str, sr_interactions: typing.List): + """ + Validates the short-range interactions within an occupancy group. + """ + # Within a group, checks if the short-range input is a valid list + if not isinstance(sr_interactions, list): + raise TypeError(f'The short-range interactions in occupancy group "{group_id}" should be defined in a valid list. Got {type(sr_interactions)}.') + # Within a group, checks if the list is populated + if len(sr_interactions) == 0: + raise TypeError(f'The short-range interactions in occupancy group "{group_id}" should be a non-empty list. Got {type(sr_interactions)}.') + + # Already processed interactions for overlap checking + existing_sr_interaction_interval: typing.List = [] + + for interaction in sr_interactions: + # Checks if each interaction is a valid dict + if not isinstance(interaction, typing.Dict): + raise TypeError(f'Each short-range interaction should be a dictionary. Got {type(interaction)} in occupancy group "{group_id}".') + + # Parameters in each short-range interaction + interaction_params = interaction.keys() + + # Checks for the expiration key and its constraints + if "expiration" not in interaction_params: + raise TypeError(f'Missing expiration key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.') + else: + expiration = interaction["expiration"] + if expiration not in list(self.data_registry.expiration_particle['expiration_BLO_factors'].keys()): # type: ignore + raise ValueError(f'Invalid expiration value in short-range interaction for occupancy group "{group_id}". Got "{expiration}".') + + # Checks for start_time key and its format + if "start_time" not in interaction_params: + raise TypeError(f'Missing start_time key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.') + else: + start_time = interaction["start_time"] if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(start_time): - raise ValueError(f'Invalid time format for "start_time" in "short_range_interactions", "{dynamic_input_key}". Expected "HH:MM". Got "{start_time}".') - - # Check for the "expiration" key and its constraints - if "expiration" not in dict_keys: - raise TypeError(f'Missing "expiration" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".') - else: - value = entry["expiration"] - if value not in list(self.data_registry.expiration_particle['expiration_BLO_factors'].keys()): - raise ValueError(f'The "expiration" in "short_range_interactions", "{dynamic_input_key}", does not exist in the registry. Got "{value}".') - - if "duration" not in dict_keys: - raise TypeError(f'Missing "duration" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".') - duration = entry["duration"] + raise ValueError(f'Invalid time format for start_time in short-range interaction for occupancy group "{group_id}". Expected HH:MM, got {start_time}.') + + # Checks for "duration" and its format + if "duration" not in interaction_params: + raise TypeError(f'Missing duration key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.') + else: + duration = interaction["duration"] if duration < 0: - raise ValueError(f'The "duration" in "short_range_interactions", "{dynamic_input_key}", should be a non-negative integer. Got "{duration}".') + raise ValueError(f'The duration value in short-range interaction for occupancy group "{group_id}" should be a non-negative integer. Got {duration}.') + + # Legacy usage - occupancy input is not defined (default empty dict) + if not self.occupancy: + # It means that we have a single exposure model + lr_start = min(self.infected_start, self.exposed_start)/60 + lr_stop = max(self.infected_finish, self.exposed_finish)/60 - # Occupancy format dependent inputs - if self.occupancy_format == 'dynamic': - # Find corresponding exposure group - exposure_group_obj = next( - (occupancy_group for occupancy_key, occupancy_group in self.dynamic_exposed_occupancy.items() - if occupancy_key == dynamic_input_key), - None + if not self.check_interaction_is_within_long_range(interaction, existing_sr_interaction_interval, lr_start, lr_stop): + raise ValueError( + f'Short-range interactions must occur during simulation time. Got {interaction} in occupancy group "{group_id}".' + ) + + # Add interaction to the list of already processed interactions + existing_sr_interaction_interval.append(interaction) + else: + # Find corresponding exposure group + occupancy_group_obj = next( + (occupancy_value for occupancy_key, occupancy_value in self.occupancy.items() + if occupancy_key == group_id), + None + ) + + if occupancy_group_obj is None: + raise ValueError( + f'Occupancy group "{group_id}" referenced in short-range interactions was not found in the occupancy input.' ) - if exposure_group_obj is None: - raise ValueError( - f'Exposure group "{dynamic_input_key}" in short-range interaction not found in dynamic exposed occupancy.' - ) + is_within_any_lr = False + for presence in occupancy_group_obj['presence']: + # Check for correct timing within long-range exposure and overlaps with existing interactions + lr_start = time_string_to_minutes(presence['start_time'])/60 + lr_stop = time_string_to_minutes(presence['finish_time'])/60 + + # Flag to check if interaction falls within any long-range exposure interval + if self.check_interaction_is_within_long_range(interaction, existing_sr_interaction_interval, lr_start, lr_stop): + is_within_any_lr = True + + # Add interaction to the list of processed interactions if within long-range + existing_sr_interaction_interval.append(interaction) - is_within_any_long_range = False - for exposure_interval in exposure_group_obj['presence']: - # Check for correct timing within long-range exposure and overlaps with existing interactions - long_range_start = time_string_to_minutes(exposure_interval['start_time'])/60 - long_range_stop = time_string_to_minutes(exposure_interval['finish_time'])/60 - - # Flag to check if interaction falls within any long-range exposure interval - if self.check_time_and_overlap( - entry, short_range_existing_interaction, long_range_start, long_range_stop - ): is_within_any_long_range = True - - # Add interaction to the list of processed interactions if within long-range - short_range_existing_interaction.append(entry) - - # If no long-range interval contains the interaction, raise an error - if not is_within_any_long_range: - raise ValueError( - f'Short-range interaction "{entry}" does not fall within any long-range exposure interval in "{dynamic_input_key}".' - ) - - elif self.occupancy_format == 'static': - # It means that we have a single exposure model - long_range_start = min(self.infected_start, self.exposed_start)/60 - long_range_stop = max(self.infected_finish, self.exposed_finish)/60 - - if not self.check_time_and_overlap(entry, short_range_existing_interaction, long_range_start, long_range_stop): - raise ValueError( - f'Short-range interactions should be defined during simulation time. Got "{entry}".' - ) - - # Add interaction to the list of processed interactions - short_range_existing_interaction.append(entry) - else: - raise TypeError( - f'Undefined exposure type. Got "{self.occupancy_format}", accepted formats are "dynamic" or "exposed".') + # If the interaction does not fall within any presence interval of the occupancy group, raise an error + if not is_within_any_lr: + raise ValueError( + f'Short-range interaction {interaction} does not fall within any presence interval in occupancy group "{group_id}".' + ) - def validate_dynamic_exposed_format(self, dynamic_exposed_group: dict, key: str): + def validate_dynamic_exposed_format(self, group_id: str, group: typing.Dict): """ - Validates the expected keywords for the dynamic - exposed input. - """ - # Total people - if 'total_people' not in dynamic_exposed_group: - raise TypeError(f'Missing "total_people" key in "dynamic_exposed_occupancy" group "{key}". Got keys: "{list(dynamic_exposed_group.keys())}".') + Validates the expected keywords for the occupancy input. + """ + # Parameters in each presence entry + group_params = group.keys() + + # Total people input + if 'total_people' not in group_params: + raise TypeError(f'Missing total_people key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.') else: - total_people = dynamic_exposed_group['total_people'] + total_people = group['total_people'] if not isinstance(total_people, int) or total_people < 0: - raise ValueError(f'The "total_people" in "dynamic_exposed_occupancy" group "{key}" should be a non-negative integer. Got "{total_people}".') - # Presence - if 'presence' not in dynamic_exposed_group: - raise TypeError(f'Missing "presence" key in "dynamic_exposed_occupancy" group "{key}". Got keys: "{list(dynamic_exposed_group.keys())}".') + raise ValueError(f'The total_people input in occupancy group "{group_id}" should be a non-negative integer. Got {total_people}.') + + # Infected people input + if 'infected' not in group_params: + raise TypeError(f'Missing infected key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.') + else: + infected = group['infected'] + if not isinstance(infected, int) or infected < 0: + raise ValueError(f'The infected input in occupancy group "{group_id}" should be a non-negative integer. Got {infected}.') + elif infected > total_people: # Validate number of infected <= number of total people + raise ValueError(f'The number of infected people ({infected}) cannot be greater than the total people ({total_people}).') + + # Presence input + if 'presence' not in group_params: + raise TypeError(f'Missing presence key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.') def get_start_and_finish_time(self, entry: dict): entry_start = time_string_to_minutes(entry["start_time"])/60 @@ -245,140 +263,128 @@ class FormData: else: entry_finish = entry_start + entry['duration']/60 return entry_start, entry_finish - - def check_time_and_overlap(self, interaction, existing_interactions, lr_start, lr_stop): + + def check_interaction_is_within_long_range(self, interaction, existing_interactions, + lr_start, lr_stop): """ - Checks if the interaction overlaps with any existing interactions for the - same exposure group and if it falls within the long-range exposure time. + Check if the short-range interaction falls within the long-range exposure time. + Check if the short-range interaction given as input overlaps with any already + existing interactions for the same occupancy group. """ interaction_start, interaction_finish = self.get_start_and_finish_time(interaction) # Check if the SR interaction is within the LR exposure time if lr_start <= interaction_start <= lr_stop and lr_start <= interaction_finish <= lr_stop: - for existing in existing_interactions: - existing_start, existing_finish = self.get_start_and_finish_time(existing) - # Check for overlap - if interaction_start < existing_finish and existing_start < interaction_finish: - raise ValueError( - f'Overlap detected for "short-range interaction": New interaction ' - f'"{interaction}" overlaps with existing interaction "{existing}".' - ) - # Return True if interaction falls within the current long-range interval + # Check the overlap with already existing interactions + self.check_overlap(interaction, existing_interactions) return True return False - def check_overlap(self, interaction, existing_interactions): + def check_overlap(self, entry, existing_entries): """ - Checks if the dynamic entry overlaps with any already existing interaction. + Check if an entry overlaps with an already existing entry + by comparing the start and finish times of all entries. """ - interaction_start = time_string_to_minutes(interaction["start_time"])/60 - interaction_finish = time_string_to_minutes(interaction["finish_time"])/60 - - for existing in existing_interactions: - existing_start = time_string_to_minutes(existing["start_time"])/60 - existing_finish = time_string_to_minutes(existing["finish_time"])/60 - + entry_start, entry_finish = self.get_start_and_finish_time(entry) + for existing_entry in existing_entries: + existing_entry_start, existing_entry_finish = self.get_start_and_finish_time(existing_entry) # Check for overlap - if (interaction_start < existing_finish and existing_start < interaction_finish): + if (entry_start < existing_entry_finish and existing_entry_start < entry_finish): raise ValueError( - f'Overlap detected: New interaction ' - f'"{interaction}" overlaps with existing interaction "{existing}".' + f'Overlap detected: The entry {entry} overlaps with ' + f'an already existing entry ({existing_entry}).' ) + # In case no exception is raised, simply returns return - def validate_population_parameters(self): - """Validates required parameters for dynamic inputs""" - # Static occupancy is defined. - if self.occupancy_format == 'static': - # Validate number of infected <= number of total people - if self.infected_people >= self.total_people: - raise ValueError( - 'Number of infected people cannot be greater or equal to the number of total people.') - - # Validate time intervals selected by user - time_intervals = [ - ['exposed_start', 'exposed_finish'], - ['infected_start', 'infected_finish'], - ] - if self.exposed_lunch_option: - time_intervals.append( - ['exposed_lunch_start', 'exposed_lunch_finish']) - if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: - time_intervals.append( - ['infected_lunch_start', 'infected_lunch_finish']) - - for start_name, end_name in time_intervals: - start = getattr(self, start_name) - end = getattr(self, end_name) - if start > end: + """ + Validate required parameters for dynamic inputs. + """ + if isinstance(self.occupancy, typing.Dict): + # Legacy usage - occupancy input is not defined (default empty dict) + if not self.occupancy: + # Validate number of infected <= number of total people + if self.infected_people >= self.total_people: raise ValueError( - f"{start_name} must be less than {end_name}. Got {start} and {end}.") + 'Number of infected people cannot be greater or equal to the number of total people.') - def validate_lunch(start, finish): - lunch_start = getattr(self, f'{population}_lunch_start') - lunch_finish = getattr(self, f'{population}_lunch_finish') - return (start <= lunch_start <= finish and - start <= lunch_finish <= finish) + # Validate time intervals selected by user + time_intervals = [ + ['exposed_start', 'exposed_finish'], + ['infected_start', 'infected_finish'], + ] + if self.exposed_lunch_option: + time_intervals.append( + ['exposed_lunch_start', 'exposed_lunch_finish']) + if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: + time_intervals.append( + ['infected_lunch_start', 'infected_lunch_finish']) - def get_lunch_mins(population): - lunch_mins = 0 - if getattr(self, f'{population}_lunch_option'): - lunch_mins = getattr( - self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') - return lunch_mins - - def get_coffee_mins(population): - coffee_mins = 0 - if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': - coffee_mins = COFFEE_OPTIONS_INT[getattr( - self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') - return coffee_mins - - def get_activity_mins(population): - return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') - - populations = [ - 'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] - for population in populations: - # Validate lunch time within the activity times. - if (getattr(self, f'{population}_lunch_option') and - not validate_lunch(getattr(self, f'{population}_start'), getattr( - self, f'{population}_finish')) - ): - raise ValueError( - f"{population} lunch break must be within presence times." - ) - - # Length of breaks < length of activity - if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): - raise ValueError( - f"Length of breaks >= Length of {population} presence." - ) - - for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), - ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: - if getattr(self, attr_name) not in valid_set: + for start_name, end_name in time_intervals: + start = getattr(self, start_name) + end = getattr(self, end_name) + if start > end: raise ValueError( - f"{getattr(self, attr_name)} is not a valid value for {attr_name}") - # Dynamic occupancy is defined. - elif self.occupancy_format == 'dynamic': - # Validate dynamic input format - self.validate_dynamic_input(self.dynamic_infected_occupancy, "dynamic_infected_occupancy") - # Check if dynamic exposed input is a dict - if isinstance(self.dynamic_exposed_occupancy, dict): - # Check if the dict is not empty - if not self.dynamic_exposed_occupancy: - raise ValueError(f'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{self.dynamic_exposed_occupancy}".') - # The key is the actual identifier - for key, group in self.dynamic_exposed_occupancy.items(): - # For each group, validate dynamic exposed input format - self.validate_dynamic_exposed_format(group, key) - # ...as well as validate dynamic exposed presence data - self.validate_dynamic_input(group['presence'], "dynamic_exposed_occupancy", key) + f"{start_name} must be less than {end_name}. Got {start} and {end}.") + + def validate_lunch(start, finish): + lunch_start = getattr(self, f'{population}_lunch_start') + lunch_finish = getattr(self, f'{population}_lunch_finish') + return (start <= lunch_start <= finish and + start <= lunch_finish <= finish) + + def get_lunch_mins(population): + lunch_mins = 0 + if getattr(self, f'{population}_lunch_option'): + lunch_mins = getattr( + self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') + return lunch_mins + + def get_coffee_mins(population): + coffee_mins = 0 + if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': + coffee_mins = COFFEE_OPTIONS_INT[getattr( + self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') + return coffee_mins + + def get_activity_mins(population): + return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') + + populations = [ + 'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] + for population in populations: + # Validate lunch time within the activity times. + if (getattr(self, f'{population}_lunch_option') and + not validate_lunch(getattr(self, f'{population}_start'), getattr( + self, f'{population}_finish')) + ): + raise ValueError( + f"{population} lunch break must be within presence times." + ) + + # Length of breaks < length of activity + if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): + raise ValueError( + f"Length of breaks >= Length of {population} presence." + ) + + for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), + ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: + if getattr(self, attr_name) not in valid_set: + raise ValueError( + f"{getattr(self, attr_name)} is not a valid value for {attr_name}") + # Occupancy input is defined else: - raise TypeError(f'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{type(self.dynamic_exposed_occupancy)}".') + # Checks if occupancy input is a valid dict + if self.occupancy and isinstance(self.occupancy, typing.Dict): + # The key is the actual identifier + for group_id, group in self.occupancy.items(): + # For each group, validate input format + self.validate_dynamic_exposed_format(group_id, group) + # ...as well as the respective presence input + self.validate_group_presence_input(group_id, group['presence']) else: - raise ValueError(f"'{self.occupancy_format}' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.") + raise TypeError(f'The occupancy input should be a valid dictionary. Got {self.occupancy}.') def validate(self): raise NotImplementedError("Subclass must implement") @@ -386,72 +392,73 @@ class FormData: def build_model(self, sample_size: typing.Optional[int] = None): raise NotImplementedError("Subclass must implement") - def population_present_changes(self, population_list: typing.List[models.Interval]) -> typing.List[float]: + def population_present_changes(self, transition_times_list: typing.Tuple[float, ...]) -> typing.List[float]: """ Returns a sorted list of unique state changes on a population list. """ - state_change_times = set(population_list[0].transition_times()) - for population in population_list: - state_change_times.update(population.transition_times()) - return sorted(state_change_times) + return sorted(set(transition_times_list)) + + def convert_interval_to_piecewise(self, interval: models.SpecificInterval, value: int): + """ + Converts an Interval and a single value to an IntPiecewiseConstant. + """ + transition_times = [] + values = [] + + for start, end in interval.present_times: + transition_times.extend([start, end]) + values.extend([value, 0]) + + # Drop the last value (0) to match number of intervals + if values: + values.pop() + + return models.IntPiecewiseConstant( + transition_times=tuple(transition_times), + values=tuple(values), + ) def build_CO2_piecewise(self): """ Builds a simple IntPiecewiseConstant for the different population groups that are defined. """ - if self.occupancy_format == 'dynamic': - infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) - total_models = [models.SimplePopulation( - number=infected_occupancy, # IntPiecewiseConstant - presence=None, - activity=None, # type: ignore - )] + # Legacy usage - occupancy input is not defined (default empty dict) + if not self.occupancy: + infected_occupancy = self.convert_interval_to_piecewise( + interval=self.infected_present_interval(), + value=self.infected_people, + ) + exposed_occupancy = self.convert_interval_to_piecewise( + interval=self.exposed_present_interval(), + value=self.total_people - self.infected_people, + ) + total_models = [infected_occupancy, exposed_occupancy] + else: + infected_occupancy = self.generate_infected_occupancy(self.occupancy) + total_models = [infected_occupancy] # For all state changes - total_presence = [infected_occupancy.interval()] - - for exposure_group in self.dynamic_exposed_occupancy.values(): - exposed_occupancy = exposure_group['total_people'] - exposed_presence = self.generate_dynamic_occupancy(exposure_group['presence']) - total_models.append(models.SimplePopulation( - number=exposed_occupancy, # int - presence=exposed_presence, # SpecificInterval - activity=None, # type: ignore - )) - # For all state changes - total_presence.append(exposed_presence) - - elif self.occupancy_format == 'static': - infected_people = self.infected_people - exposed_people = self.total_people - self.infected_people - infected_presence = self.infected_present_interval() - exposed_presence = self.exposed_present_interval() - - infected_population = models.SimplePopulation( - number=infected_people, - presence=infected_presence, - activity=None, # type: ignore - ) - exposed_population=models.SimplePopulation( - number=exposed_people, - presence=exposed_presence, - activity=None, # type: ignore - ) - - total_presence = [infected_presence, exposed_presence] - total_models = [infected_population, exposed_population] + for group in self.occupancy.values(): + model_piecewise = self.convert_interval_to_piecewise( + interval=self.generate_exposed_presence(group['presence']), + value=group['total_people'] - group['infected'] + ) + total_models.append(model_piecewise) # Get all state change times from combined populations - all_state_changes=self.population_present_changes(total_presence) + all_state_changes = self.population_present_changes([t for model in total_models for t in model.transition_times]) # Compute total people at each state change total_people = [] for _, stop in zip(all_state_changes[:-1], all_state_changes[1:]): - total_people_in_group = sum(population.people_present(stop) for population in total_models) + total_people_in_group = sum(model.value(stop) for model in total_models) total_people.append(total_people_in_group) - return models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)) + return models.IntPiecewiseConstant( + transition_times=tuple(all_state_changes), + values=tuple(total_people) + ) def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t: break_delay = ((finish - start) - @@ -661,66 +668,66 @@ class FormData: self.exposed_start, self.exposed_finish, breaks=breaks, ) + + def generate_exposed_presence(self, presence: typing.List) -> models.SpecificInterval: + """ + Creates a model to represent exposed occupancy over time. + """ + exposed_intervals = [] + + # Sort occupancy entries by start_time to ensure proper ordering + presence_sorted = sorted( + presence, key=lambda x: time_string_to_minutes(x['start_time']) + ) + + for period in presence_sorted: + start_time = time_string_to_minutes(period['start_time']) / 60 + finish_time = time_string_to_minutes(period['finish_time']) / 60 + exposed_intervals.append((start_time, finish_time)) + + return models.SpecificInterval(tuple(exposed_intervals)) - def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]): + def generate_infected_occupancy(self, occupancy: typing.Dict) -> models.IntPiecewiseConstant: """ - Creates a model to represent occupancy over time. If `total_people` is - provided in the input, the method generates an `IntPiecewiseConstant` model, representing - occupancy as a piecewise constant function with defined values for each interval. - Otherwise, it generates a `SpecificInterval` model, which only tracks the presence of - intervals without associating values to them. + Creates a model to represent infected occupancy over time. """ + transition_times = set() + infected_intervals = [] - # Initialize variables - if 'total_people' in dynamic_occupancy[0]: # build IntPiecewiseConstant - computePiecewiseConstant = True - transition_times = [] - values = [] - else: - computePiecewiseConstant = False - present_times = [] # build SpecificInterval + # Extract presence data + for group in occupancy.values(): + infected = group["infected"] + for period in group["presence"]: + start_time = time_string_to_minutes(period['start_time']) / 60 + finish_time = time_string_to_minutes(period['finish_time']) / 60 + transition_times.add(start_time) # unique time points + transition_times.add(finish_time) # unique time points + infected_intervals.append((start_time, finish_time, infected)) + + # Sort transition times + sorted_transition_times = list(sorted(transition_times)) - # Sort occupancy entries by start_time to ensure proper ordering - dynamic_occupancy_sorted = sorted( - dynamic_occupancy, key=lambda x: time_string_to_minutes(x['start_time']) + # Values for each time segment + raw_values = [ + sum(people for start, end, people in infected_intervals if start <= t1 < end) + for t1 in sorted_transition_times[:-1] + ] + + # Merge consecutive intervals with the same infected count + opt_times = [sorted_transition_times[0]] + opt_values = [raw_values[0]] + for i in range(1, len(raw_values)): + if raw_values[i] != opt_values[-1]: + opt_times.append(sorted_transition_times[i]) + opt_values.append(raw_values[i]) + # Ensure the last time is included + opt_times.append(sorted_transition_times[-1]) + + return models.IntPiecewiseConstant( + transition_times=tuple(opt_times), + values=tuple(opt_values) ) - last_finish_time = None - - for occupancy in dynamic_occupancy_sorted: - start_time = time_string_to_minutes(occupancy['start_time']) / 60 - finish_time = time_string_to_minutes(occupancy['finish_time']) / 60 - - if computePiecewiseConstant: - total_people = occupancy['total_people'] - - # Fill in gap with a zero occupancy if there is a time gap - if last_finish_time is not None and start_time > last_finish_time: - transition_times.append(last_finish_time) - values.append(0) # Add zero for the gap period - - # Update lists with current occupancy entry - transition_times.extend([start_time, finish_time]) - values.append(total_people) - - # Update the last known finish time - last_finish_time = finish_time - else: - present_times.append((start_time, finish_time)) - - # When computing the total people, validate that we have enough values to compute the occupancy - if computePiecewiseConstant: - unique_transition_times_sorted = np.array(sorted(set(transition_times))) - if (len(values) != len(unique_transition_times_sorted) - 1): - raise ValueError("Cannot compute dynamic occupancy with the provided inputs.") - else: - return models.IntPiecewiseConstant( - transition_times=tuple(unique_transition_times_sorted), - values=tuple(values) - ) - else: - return models.SpecificInterval(tuple(present_times)) - def _hours2timestring(hours: float): # Convert times like 14.5 to strings, like "14:30" diff --git a/caimira/src/caimira/calculator/validators/virus/virus_validator.py b/caimira/src/caimira/calculator/validators/virus/virus_validator.py index a4160a40..4718b116 100644 --- a/caimira/src/caimira/calculator/validators/virus/virus_validator.py +++ b/caimira/src/caimira/calculator/validators/virus/virus_validator.py @@ -201,13 +201,14 @@ class VirusFormData(FormData): f'The sum of all respiratory activities should be 100. Got {total_percentage}.') # Validate number of people with short-range interactions - if self.occupancy_format == 'static': + if not self.occupancy: + # Legacy usage - occupancy input is not defined (default empty dict) max_occupants_for_sr = self.total_people - self.infected_people - elif self.occupancy_format == 'dynamic': + else: max_occupants_for_sr = 0 - for key, group in self.dynamic_exposed_occupancy.items(): - occupants_in_group = group['total_people'] - max_occupants_for_sr = max(max_occupants_for_sr, occupants_in_group) + for group_id, group in self.occupancy.items(): + exposed_occupants_in_group = group['total_people'] - group['infected'] + max_occupants_for_sr = max(max_occupants_for_sr, exposed_occupants_in_group) if self.short_range_occupants > max_occupants_for_sr: raise ValueError( @@ -217,14 +218,19 @@ class VirusFormData(FormData): # Validate short-range interactions interval if self.short_range_option == "short_range_yes": if isinstance(self.short_range_interactions, dict): - # Check if occupancy format is static, there should be one key-value only in short_range_interactions - if self.occupancy_format == "static" and len(self.short_range_interactions) > 1: - raise ValueError( - 'When occupancy format is "static", there should be only one interaction group in "short_range_interactions".' - ) - # The key is the actual identifier - for key, group in self.short_range_interactions.items(): - self.validate_dynamic_input(group, "short_range_interactions", key) + # Checks if short_range_interactions input is not empty + if len(self.short_range_interactions) == 0: + raise ValueError(f'When short_range_option input is set to "{self.short_range_option}", the short_range_interactions input should not be empty. Got {self.short_range_interactions}.') + # Checks that the number of groups in the short_range_interactions input is less or equal than those defined in the occupancy + elif not self.occupancy: + # Legacy usage - occupancy input is not defined (default empty dict) + if len(self.short_range_interactions) > 1: + raise ValueError(f'Incompatible number of occupancy groups in the short_range_interactions input. Got {len(self.short_range_interactions)} groups when the maximum is 1.') + else: + if len(self.short_range_interactions) > len(self.occupancy): + raise ValueError(f'Incompatible number of occupancy groups in the short_range_interactions input. Got {len(self.short_range_interactions)} groups when the maximum is {len(self.occupancy)} (from the occupancy input).') + for group_id, interactions in self.short_range_interactions.items(): + self.validate_short_range_interaction_input(group_id, interactions) def initialize_room(self) -> models.Room: # Initializes room with volume either given directly or as product of area and height @@ -246,8 +252,6 @@ class VirusFormData(FormData): return models.Room(volume=volume, inside_temp=models.PiecewiseConstant((0, 24), (inside_temp,)), humidity=humidity) # type: ignore def build_mc_model(self) -> mc.ExposureModelGroup: - size = self.data_registry.monte_carlo['sample_size'] - room: models.Room = self.initialize_room() ventilation: models._VentilationBase = self.ventilation() infected_population: models.InfectedPopulation = self.infected_population() @@ -266,7 +270,7 @@ class VirusFormData(FormData): presence=presence, distance=distances, expiration_def=interaction['expiration'] - ).build_model(size)) + )) concentration_model: models.ConcentrationModel = mc.ConcentrationModel( data_registry=self.data_registry, @@ -274,19 +278,34 @@ class VirusFormData(FormData): ventilation=ventilation, infected=infected_population, evaporation_factor=0.3, - ).build_model(size) + ) geographical_data: models.Cases = mc.Cases( geographic_population=self.geographic_population, geographic_cases=self.geographic_cases, ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias], - ).build_model(size) + ) - if self.occupancy_format == 'dynamic': + if not self.occupancy: + # Legacy usage - occupancy input is not defined (default empty dict) + exposed_population = self.exposed_population() + short_range_tuple = tuple(item for sublist in short_range.values() for item in sublist) + return mc.ExposureModelGroup( + data_registry=self.data_registry, + exposure_models = (mc.ExposureModel( + data_registry=self.data_registry, + concentration_model=concentration_model, + short_range=short_range_tuple, + exposed=exposed_population, + geographical_data=geographical_data, + exposed_to_short_range=self.short_range_occupants, + ),) + ) + else: exposure_model_set = [] - for exposure_group in self.dynamic_exposed_occupancy.keys(): + for exposure_group in self.occupancy.keys(): sr_models: typing.Tuple[models.ShortRangeModel, ...] = tuple(short_range[exposure_group]) - exposed_population: mc.Population = self.exposed_population(exposure_group).build_model(size) + exposed_population = self.exposed_population(exposure_group) exposure_model = mc.ExposureModel( data_registry=self.data_registry, @@ -301,27 +320,12 @@ class VirusFormData(FormData): return mc.ExposureModelGroup( data_registry=self.data_registry, - exposure_models=[individual_model.build_model(size) for individual_model in exposure_model_set] - ) - - elif self.occupancy_format == 'static': - exposed_population = self.exposed_population() - short_range_tuple = tuple(item for sublist in short_range.values() for item in sublist) - return mc.ExposureModelGroup( - data_registry=self.data_registry, - exposure_models = [mc.ExposureModel( - data_registry=self.data_registry, - concentration_model=concentration_model, - short_range=short_range_tuple, - exposed=exposed_population, - geographical_data=geographical_data, - exposed_to_short_range=self.short_range_occupants, - ).build_model(size)] + exposure_models=tuple(exposure_model_set) ) def build_model(self, sample_size=None) -> models.ExposureModelGroup: - size = self.data_registry.monte_carlo['sample_size'] if not sample_size else sample_size - return self.build_mc_model().build_model(size=size) + sample_size = sample_size or self.data_registry.monte_carlo['sample_size'] + return self.build_mc_model().build_model(sample_size) def build_CO2_model(self, sample_size=None) -> models.CO2ConcentrationModel: """ @@ -481,7 +485,7 @@ class VirusFormData(FormData): def generate_precise_activity_expiration(self) -> typing.Tuple[typing.Any, ...]: # It means the precise activity is not defined by a specific input. - if self.precise_activity == {}: + if not self.precise_activity: return () respiratory_dict = {} for respiratory_activity in self.precise_activity['respiratory_activity']: @@ -499,12 +503,13 @@ class VirusFormData(FormData): virus = virus_distributions(self.data_registry)[self.virus_type] # Occupancy - if self.occupancy_format == 'dynamic': - infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) - infected_presence = None - else: - infected_occupancy = self.infected_people + if not self.occupancy: + # Legacy usage - occupancy input is not defined (default empty dict) + infected_occupancy: typing.Union[int, models.IntPiecewiseConstant] = self.infected_people infected_presence = self.infected_present_interval() + else: + infected_occupancy = self.generate_infected_occupancy(self.occupancy) + infected_presence = None # Activity and expiration activity_defn = self.data_registry.population_scenario_activity[ @@ -513,8 +518,7 @@ class VirusFormData(FormData): self.activity_type]['expiration'] if (self.activity_type == 'smallmeeting'): # Conversation of N people is approximately 1/N% of the time speaking. - total_people: int = max( - infected_occupancy.values) if self.occupancy_format == 'dynamic' else self.total_people + total_people: int = self.total_people if not self.occupancy else max(infected_occupancy.values) # type: ignore expiration_defn = {'Speaking': 1, 'Breathing': total_people - 1} elif (self.activity_type == 'precise'): activity_defn, expiration_defn = self.generate_precise_activity_expiration() @@ -542,16 +546,15 @@ class VirusFormData(FormData): single group of exposed population, except when breaks are defined. """ # Occupancy - if self.occupancy_format == 'dynamic': - dynamic_group = self.dynamic_exposed_occupancy[exposure_group] - - exposed_occupancy = dynamic_group['total_people'] - exposed_presence = self.generate_dynamic_occupancy(dynamic_group['presence']) - else: + if not exposure_group and not self.occupancy: # The number of exposed occupants is the total number of occupants # minus the number of infected occupants. exposed_occupancy = self.total_people - self.infected_people exposed_presence = self.exposed_present_interval() + elif exposure_group: + dynamic_group = self.occupancy[exposure_group] + exposed_occupancy = dynamic_group['total_people'] - dynamic_group['infected'] + exposed_presence = self.generate_exposed_presence(dynamic_group['presence']) # Activity activity_defn = (self.precise_activity['physical_activity'] @@ -608,8 +611,6 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'calculator_version': calculator_version, 'ceiling_height': '', 'conditional_probability_viral_loads': '0', - 'dynamic_exposed_occupancy': '{}', - 'dynamic_infected_occupancy': '[]', 'event_month': 'January', 'exposed_coffee_break_option': 'coffee_break_4', 'exposed_coffee_duration': '10', @@ -640,7 +641,7 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'mask_type': 'Type I', 'mask_wearing_option': 'mask_off', 'mechanical_ventilation_type': '', - 'occupancy_format': 'static', + 'occupancy': '{}', 'opening_distance': '0.2', 'room_heating_option': '0', 'room_number': '123', diff --git a/caimira/tests/apps/calculator/test_model_generator.py b/caimira/tests/apps/calculator/test_model_generator.py index b33d7363..40ad284a 100644 --- a/caimira/tests/apps/calculator/test_model_generator.py +++ b/caimira/tests/apps/calculator/test_model_generator.py @@ -590,71 +590,52 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m assert offset == expected_offset -@pytest.mark.parametrize( - ["occupancy_format_input", "error"], - [ - ['dynamc', "'dynamc' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",], - ['stact', "'stact' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",], - ['random', "'random' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",] - ] -) -def test_occupancy_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = occupancy_format_input - with pytest.raises(ValueError, match=re.escape(error)): - baseline_form.validate() - - -def test_dynamic_input_format_ValueError(baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - # Check empty list for infected occupancy - baseline_form.dynamic_infected_occupancy = [] - error = 'The input "dynamic_infected_occupancy" should be a valid, non-empty list. Got "[]".' - with pytest.raises(ValueError, match=re.escape(error)): - baseline_form.validate() - # Check empty dict for exposed occupancy - baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] - baseline_form.dynamic_exposed_occupancy = {} - error = 'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{}".' - with pytest.raises(ValueError, match=re.escape(error)): - baseline_form.validate() - - -@pytest.mark.parametrize( - ["exposed_format", "error"], - [ - [ - {"tal_people": 10, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],}, - 'Missing "total_people" key in "dynamic_exposed_occupancy" group "group_1". Got keys: "[\'tal_people\', \'presence\']".' - ], - [ - {"total_people": 10, "pesence": [{"start_time": "10:00", "finish_time": "11:00"}],}, - 'Missing "presence" key in "dynamic_exposed_occupancy" group "group_1". Got keys: "[\'total_people\', \'pesence\']".' - ], - ] -) -def test_dynamic_exposed_format_TypeError(exposed_format, error, baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] - baseline_form.dynamic_exposed_occupancy = {"group_1": exposed_format} +def test_occupancy_TypeError(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = [] # type: ignore + error = 'The occupancy input should be a valid dictionary. Got [].' with pytest.raises(TypeError, match=re.escape(error)): baseline_form.validate() @pytest.mark.parametrize( - ["exposed_presence", "error"], + ["occupancy", "error"], [ - [[["start_time", "10:00", "finish_time", "11:00"]], 'Each entry in "dynamic_exposed_occupancy" should be a dictionary. Got "".'], - [[{"art_time": "10:00", "finish_time": "11:00"}], 'Missing "start_time" key in "dynamic_exposed_occupancy" in "group_1" ("presence"). Got keys: "[\'art_time\', \'finish_time\']".'], - [[{"start_time": "10:00", "ish_time": "11:00"}], 'Missing "finish_time" key in "dynamic_exposed_occupancy" in "group_1" ("presence"). Got keys: "[\'start_time\', \'ish_time\']".'], + [ + {"tal_people": 10, "infected": 5, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],}, + 'Missing total_people key in occupancy group "group_A". Got keys: tal_people, infected, presence.' + ], + [ + {"total_people": 10, "infeted": 5, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],}, + 'Missing infected key in occupancy group "group_A". Got keys: total_people, infeted, presence.' + ], + [ + {"total_people": 10, "infected": 5, "pesence": [{"start_time": "10:00", "finish_time": "11:00"}],}, + 'Missing presence key in occupancy group "group_A". Got keys: total_people, infected, pesence.' + ], ] ) -def test_dynamic_exposed_presence_TypeError(exposed_presence, error, baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] - baseline_form.dynamic_exposed_occupancy = { - "group_1": { +def test_occupancy_general_params_TypeError(occupancy, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = {"group_A": occupancy} + with pytest.raises(TypeError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["occupancy_presence", "error"], + [ + [{"start_time": "10:00", "finish_time": "11:00"}, 'The presence parameter in occupancy group "group_A" should be a valid list. Got .'], + [[], 'The presence parameter in occupancy group "group_A" should be a valid, non-empty list. Got [].'], + [[["start_time", "10:00", "finish_time", "11:00"]], 'Each presence interval should be a valid dictionary. Got in occupancy group "group_A".'], + [[{"art_time": "10:00", "finish_time": "11:00"}], 'Missing start_time key in presence parameter of occupancy group "group_A". Got keys: art_time, finish_time.'], + [[{"start_time": "10:00", "ish_time": "11:00"}], 'Missing finish_time key in presence parameter of occupancy group "group_A". Got keys: start_time, ish_time.'], + ] +) +def test_occupancy_presence_TypeError(occupancy_presence, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { "total_people": 10, - "presence": exposed_presence, + "infected": 5, + "presence": occupancy_presence, } } with pytest.raises(TypeError, match=re.escape(error)): @@ -662,42 +643,60 @@ def test_dynamic_exposed_presence_TypeError(exposed_presence, error, baseline_fo @pytest.mark.parametrize( - ["exposed_presence", "error"], + ["occupancy_presence", "error"], [ - [[{"start_time": "10", "finish_time": "11:00"}], 'Invalid time format for "start_time" in "dynamic_exposed_occupancy" in "group_1" ("presence"). Expected "HH:MM". Got "10".'], - [[{"start_time": "10:00", "finish_time": "11"}], 'Invalid time format for "finish_time" in "dynamic_exposed_occupancy" in "group_1" ("presence"). Expected "HH:MM". Got "11".'], + [[{"start_time": "10", "finish_time": "11:00"}], 'Invalid time format found in presence parameter of occupancy group "group_A". Expected HH:MM, got 10.'], + [[{"start_time": "10:00", "finish_time": "11"}], 'Invalid time format found in presence parameter of occupancy group "group_A". Expected HH:MM, got 11.'], ] ) -def test_dynamic_exposed_presence_ValueError(exposed_presence, error, baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] - baseline_form.dynamic_exposed_occupancy = { - "group_1": { +def test_occupancy_presence_ValueError(occupancy_presence, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { "total_people": 10, - "presence": exposed_presence + "infected": 5, + "presence": occupancy_presence } } with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() + @pytest.mark.parametrize( - ["exposed_total_people", "error"], + ["total_people", "error"], [ - ["10", 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "10".'], - [9.8, 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "9.8".'], - [[10], 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "[10]".'], - [-1, 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "-1".'], + ["10", 'The total_people input in occupancy group "group_A" should be a non-negative integer. Got 10.'], + [9.8, 'The total_people input in occupancy group "group_A" should be a non-negative integer. Got 9.8.'], + [[10], 'The total_people input in occupancy group "group_A" should be a non-negative integer. Got [10].'], + [-1, 'The total_people input in occupancy group "group_A" should be a non-negative integer. Got -1.'], ] ) -def test_dynamic_exposed_total_people_ValueError(exposed_total_people, error, baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = [ - {"total_people": 1, "start_time": "08:00", "finish_time": "10:00"}, - {"total_people": 2, "start_time": "10:00", "finish_time": "18:00"}, +def test_occupancy_total_people_ValueError(total_people, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { + "total_people": total_people, + "infected": 10, + "presence": [{"start_time": "08:00", "finish_time": "18:00"},], + }, + } + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + +@pytest.mark.parametrize( + ["infected", "error"], + [ + ["10", 'The infected input in occupancy group "group_A" should be a non-negative integer. Got 10.'], + [9.8, 'The infected input in occupancy group "group_A" should be a non-negative integer. Got 9.8.'], + [[10], 'The infected input in occupancy group "group_A" should be a non-negative integer. Got [10].'], + [-1, 'The infected input in occupancy group "group_A" should be a non-negative integer. Got -1.'], + [30, 'The number of infected people (30) cannot be greater than the total people (20).'] ] - baseline_form.dynamic_exposed_occupancy = { - "group_1": { - "total_people": exposed_total_people, +) +def test_occupancy_infected_ValueError(infected, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { + "total_people": 20, + "infected": infected, "presence": [{"start_time": "08:00", "finish_time": "18:00"},], }, } @@ -705,42 +704,11 @@ def test_dynamic_exposed_total_people_ValueError(exposed_total_people, error, ba baseline_form.validate() -def test_dynamic_infected_overlap(baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = [ - {"total_people": 10, "start_time": "08:00", "finish_time": "18:00"}, - {"total_people": 10, "start_time": "10:00", "finish_time": "18:00"}, - ] - baseline_form.dynamic_exposed_occupancy = { - "group_1": { - "total_people": 10, - "presence": [{"start_time": "08:00", "finish_time": "18:00"}], - }, - "group_2": { - "total_people": 10, - "presence": [{"start_time": "10:00", "finish_time": "11:00"}], - }, - "group_3": { - "total_people": 10, - "presence": [{"start_time": "15:00", "finish_time": "18:00"}], - }, - } - error = ( - 'Overlap detected: New interaction ' - '"{\'total_people\': 10, \'start_time\': \'10:00\', \'finish_time\': \'18:00\'}" ' - 'overlaps with existing interaction ' - '"{\'total_people\': 10, \'start_time\': \'08:00\', \'finish_time\': \'18:00\'}".' - ) - with pytest.raises(ValueError, match=re.escape(error)): - baseline_form.validate() - - -def test_dynamic_exposure_group_duplication(baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = "dynamic" - baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] - baseline_form.dynamic_exposed_occupancy = { - "group_1": { +def test_occupancy_presence_overlap(baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy = { + "group_A": { "total_people": 10, + "infected": 5, "presence": [ {"start_time": "08:00", "finish_time": "17:00"}, {"start_time": "13:00", "finish_time": "14:00"}, @@ -748,10 +716,10 @@ def test_dynamic_exposure_group_duplication(baseline_form: virus_validator.Virus }, } error = ( - 'Overlap detected: New interaction ' - '"{\'start_time\': \'13:00\', \'finish_time\': \'14:00\'}"' - ' overlaps with existing interaction ' - '"{\'start_time\': \'08:00\', \'finish_time\': \'17:00\'}".' + 'Overlap detected: The entry ' + '{\'start_time\': \'13:00\', \'finish_time\': \'14:00\'}' + ' overlaps with an already existing entry ' + '({\'start_time\': \'08:00\', \'finish_time\': \'17:00\'}).' ) with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() @@ -760,32 +728,32 @@ def test_dynamic_exposure_group_duplication(baseline_form: virus_validator.Virus @pytest.mark.parametrize( ["short_range_input", "error"], [ - [[["expiration", "Shouting", "start_time", "09:00", "duration", 30]], 'Each entry in "short_range_interactions" should be a dictionary. Got "".'], - [[{"expiratio": "Shouting", "start_time": "09:00", "duration": 30}], 'Missing "expiration" key in "short_range_interactions", "group_1". Got keys: "[\'expiratio\', \'start_time\', \'duration\']".'], - [[{"expiration": "Shouting", "start_tim": "09:00", "duration": 30}], 'Missing "start_time" key in "short_range_interactions", "group_1". Got keys: "[\'expiration\', \'start_tim\', \'duration\']".'], - [[{"expiration": "Shouting", "start_time": "09:00", "duratio": 30}], 'Missing "duration" key in "short_range_interactions", "group_1". Got keys: "[\'expiration\', \'start_time\', \'duratio\']".'], + [[["expiration", "Shouting", "start_time", "09:00", "duration", 30]], 'Each short-range interaction should be a dictionary. Got in occupancy group "group_A".'], + [[{"expiratio": "Shouting", "start_time": "09:00", "duration": 30}], 'Missing expiration key in short-range interaction for occupancy group "group_A". Got keys: expiratio, start_time, duration.'], + [[{"expiration": "Shouting", "start_tim": "09:00", "duration": 30}], 'Missing start_time key in short-range interaction for occupancy group "group_A". Got keys: expiration, start_tim, duration.'], + [[{"expiration": "Shouting", "start_time": "09:00", "duratio": 30}], 'Missing duration key in short-range interaction for occupancy group "group_A". Got keys: expiration, start_time, duratio.'], ] ) def test_short_range_TypeError(short_range_input, error, baseline_form: virus_validator.VirusFormData): baseline_form.short_range_option = "short_range_yes" - baseline_form.short_range_interactions = {"group_1": short_range_input} + baseline_form.short_range_interactions = {"group_A": short_range_input} with pytest.raises(TypeError, match=re.escape(error)): baseline_form.validate() def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData): - baseline_form.occupancy_format = 'dynamic' - baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}] - baseline_form.dynamic_exposed_occupancy = { - "group_1": { - "total_people": 10, + baseline_form.occupancy = { + "group_A": { + "total_people": 20, + "infected": 10, "presence": [ {"start_time": "10:00", "finish_time": "12:00"}, {"start_time": "13:00", "finish_time": "17:00"}, ], }, - "group_2": { - "total_people": 10, + "group_B": { + "total_people": 20, + 'infected': 10, "presence": [ {"start_time": "10:00", "finish_time": "11:00"}, ], @@ -795,19 +763,19 @@ def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData # Check for existence of the dictionary key baseline_form.short_range_option = 'short_range_yes' baseline_form.short_range_interactions = { - "group_4": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}], + "group_C": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}], } - error = 'Exposure group "group_4" in short-range interaction not found in dynamic exposed occupancy.' + error = 'Occupancy group "group_C" referenced in short-range interactions was not found in the occupancy input.' with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() # Check if interaction time is within simulation time baseline_form.short_range_interactions = { - "group_1": [{"expiration": "Shouting", "start_time": "18:00", "duration": 30}], + "group_A": [{"expiration": "Shouting", "start_time": "18:00", "duration": 30}], } error = ( - 'Short-range interaction "{\'expiration\': \'Shouting\', \'start_time\': \'18:00\', \'duration\': 30}"' - ' does not fall within any long-range exposure interval in "group_1".' + 'Short-range interaction {\'expiration\': \'Shouting\', \'start_time\': \'18:00\', \'duration\': 30}' + ' does not fall within any presence interval in occupancy group "group_A".' ) with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() @@ -816,52 +784,82 @@ def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData @pytest.mark.parametrize( ["short_range_input", "error"], [ - [[{"expiration": "Shouting", "start_time": "9", "duration": 30}], 'Invalid time format for "start_time" in "short_range_interactions", "group_1". Expected "HH:MM". Got "9".'], - [[{"expiration": "Whisper", "start_time": "09:00", "duration": 30}], 'The "expiration" in "short_range_interactions", "group_1", does not exist in the registry. Got "Whisper".'], - [[{"expiration": "Shouting", "start_time": "09:00", "duration": -30}], 'The "duration" in "short_range_interactions", "group_1", should be a non-negative integer. Got "-30".'], + [[{"expiration": "Shouting", "start_time": "9", "duration": 30}], 'Invalid time format for start_time in short-range interaction for occupancy group "group_A". Expected HH:MM, got 9.'], + [[{"expiration": "Whisper", "start_time": "09:00", "duration": 30}], 'Invalid expiration value in short-range interaction for occupancy group "group_A". Got "Whisper".'], + [[{"expiration": "Shouting", "start_time": "09:00", "duration": -30}], 'The duration value in short-range interaction for occupancy group "group_A" should be a non-negative integer. Got -30.'], ] ) def test_short_range_value_error(short_range_input, error, baseline_form: virus_validator.VirusFormData): baseline_form.short_range_option = "short_range_yes" - baseline_form.short_range_interactions = {"group_1": short_range_input} + baseline_form.short_range_interactions = {"group_A": short_range_input} with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() -def test_short_range_with_static_occupancy(baseline_form: virus_validator.VirusFormData): - # By default the occupancy format is 'static' +def test_short_range_with_occupancy_format(baseline_form: virus_validator.VirusFormData): baseline_form.short_range_option = "short_range_yes" - baseline_form.short_range_interactions = {"group_1": [{"expiration": "Shouting", "start_time": "07:00", "duration": 30}]} + baseline_form.short_range_interactions = {"group_A": [{"expiration": "Shouting", "start_time": "07:00", "duration": 30}]} - # Check if interaction is defined during simulation time + # Checks if interaction is defined during simulation time error = ( - 'Short-range interactions should be defined during simulation time. Got ' - '"{\'expiration\': \'Shouting\', \'start_time\': \'07:00\', \'duration\': 30}".' + 'Short-range interactions must occur during simulation time. Got' + ' {\'expiration\': \'Shouting\', \'start_time\': \'07:00\', \'duration\': 30}' + ' in occupancy group "group_A".' ) with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() - # Check overlap of interactions + # Checks overlap of short-range interactions baseline_form.short_range_interactions = { - "group_1": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}, + "group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}, {"expiration": "Shouting", "start_time": "10:10", "duration": 15}], } error = ( - 'Overlap detected for "short-range interaction": New interaction ' - '"{\'expiration\': \'Shouting\', \'start_time\': \'10:10\', \'duration\': 15}"' - ' overlaps with existing interaction "{\'expiration\': \'Shouting\', \'start_time\': \'10:00\', \'duration\': 30}".' + 'Overlap detected: The entry ' + '{\'expiration\': \'Shouting\', \'start_time\': \'10:10\', \'duration\': 15}' + ' overlaps with an already existing entry ' + '({\'expiration\': \'Shouting\', \'start_time\': \'10:00\', \'duration\': 30}).' ) with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() - # Check if more than one group is defined - baseline_form.short_range_interactions = { - "group_1": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}, - {"expiration": "Shouting", "start_time": "10:10", "duration": 15}], - "group_2": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}] - } + # Checks if short_range_option relates with the short_range-interactions input + baseline_form.short_range_option = "short_range_yes" + baseline_form.short_range_interactions = {} error = ( - 'When occupancy format is "static", there should be only one interaction group in "short_range_interactions".' + 'When short_range_option input is set to "short_range_yes", the short_range_interactions ' + 'input should not be empty. Got {}.' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Checks if more than one group is defined (legacy) + baseline_form.short_range_interactions = { + "group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}], + "group_B": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}] + } + error = ( + 'Incompatible number of occupancy groups in the short_range_interactions input. ' + 'Got 2 groups when the maximum is 1.' + ) + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + # Checks if more than one group is defined + baseline_form.occupancy = { + "group_A": {"total_people": 20, "infected": 10, "presence": [ + {"start_time": "10:00", "finish_time": "12:00"}, + {"start_time": "13:00", "finish_time": "17:00"}, + ], + } + } + baseline_form.short_range_interactions = { + "group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}], + "group_B": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}] + } + error = ( + 'Incompatible number of occupancy groups in the short_range_interactions input. ' + 'Got 2 groups when the maximum is 1 (from the occupancy input).' ) with pytest.raises(ValueError, match=re.escape(error)): baseline_form.validate() diff --git a/caimira/tests/models/test_dynamic_population.py b/caimira/tests/models/test_dynamic_population.py index bec88361..75400d2a 100644 --- a/caimira/tests/models/test_dynamic_population.py +++ b/caimira/tests/models/test_dynamic_population.py @@ -213,7 +213,7 @@ def test_exposure_model_group_structure(data_registry, full_exposure_model: mode """ another_full_exposure_model = dc_utils.nested_replace(full_exposure_model, {'concentration_model.infected.number': 2, }) - with pytest.raises(ValueError, match=re.escape("All ExposureModels must have the same ConcentrationModel.")): + with pytest.raises(ValueError, match=re.escape("All ExposureModels must have the same infected number and presence in the ConcentrationModel.")): models.ExposureModelGroup(data_registry, exposure_models=(full_exposure_model, another_full_exposure_model, )) diff --git a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py index c3964204..6b7e5ce2 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py +++ b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py @@ -122,8 +122,8 @@ class VirusReportGenerator: model: models.ExposureModel = report_data['model'] data_registry_version: typing.Optional[str] = f"v{model.data_registry.version}" if model.data_registry.version else None - # Alternative scenarios data - if form.occupancy_format == 'static': + # Alternative scenarios data (only generated in the legacy version - when occupancy input is empty) + if not form.occupancy: context.update(alternative_scenarios_data(form, report_data, executor_factory)) # Alternative viral load data diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js index 09e53b03..30bf4af1 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js @@ -1,8 +1,6 @@ // Input data for CO2 fitting algorithm const CO2_data_form = [ "CO2_data", - "dynamic_exposed_occupancy", - "dynamic_infected_occupancy", "exposed_coffee_break_option", "exposed_coffee_duration", "exposed_finish", @@ -20,7 +18,7 @@ const CO2_data_form = [ "infected_lunch_start", "infected_people", "infected_start", - "occupancy_format", + "occupancy", "room_capacity", "room_volume", "specific_breaks", diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js index 2686ac60..27ab227d 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/form.js @@ -1238,7 +1238,7 @@ $(document).ready(function () {
-

+

@@ -1258,11 +1258,11 @@ $(document).ready(function () { // When short_range_yes option is selected, we want to inject rows for each expiractory activity, start_time and duration. $("body").on("click", ".add_node_btn_frm_field", function(e) { let last_row = $(".form_field_outer").find(".form_field_outer_row"); - if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15", exposure_group: "A" })); + if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15", exposure_group: "group_1" })); else { last_index = last_row.last().find(".short_range_option").prop("id").split("_").slice(-1)[0]; index = parseInt(last_index) + 1; - $("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15", exposure_group: "A"})); + $("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15", exposure_group: "group_1"})); } }); diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 index 79e99b2e..fad5a331 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 @@ -454,9 +454,7 @@ ?

- {# "static" vs. "dynamic" #} - - +
diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 index ac098ec1..b779fc9d 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.report.html.j2 @@ -72,7 +72,7 @@ {% block report_results scoped %}
-
Results{% if form.occupancy_format == 'dynamic' %} - {{ group_id }}{% endif %} +
Results{% if form.occupancy == {} %} - {{ group_id }}{% endif %}
-
Result uncertainties{% if form.occupancy_format == 'dynamic' %} - {{ group_id }}{% endif %} +
Result uncertainties{% if form.occupancy == {} %} - {{ group_id }}{% endif %}
- {% if form.short_range_option == "short_range_no" and form.occupancy_format == "static" %} + {% if form.short_range_option == "short_range_no" and form.occupancy == {} %}
Alternative scenarios
- {% if form.short_range_option == "short_range_no" and form.occupancy_format == "static" %} + {% if form.short_range_option == "short_range_no" and form.occupancy == {} %}
  • Alternative Scenarios diff --git a/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 index 47f9971d..64dfc2b0 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/cern/calculator.report.html.j2 @@ -11,7 +11,7 @@ {% set long_range_prob_inf = group_results.prob_inf %} {% endif %} -{% if ((long_range_prob_inf > red_prob_lim) or (form.occupancy_format == "static" and group_results.expected_new_cases >= 1)) %} +{% if ((long_range_prob_inf > red_prob_lim) or (form.occupancy == {} and group_results.expected_new_cases >= 1)) %} {% set long_range_scale_warning = 'red' %} {% set long_range_warning_color= 'bg-danger' %} {% elif (orange_prob_lim <= long_range_prob_inf <= red_prob_lim) %} @@ -22,7 +22,7 @@ {% set long_range_warning_color = 'bg-success' %} {% endif %} -{% if ((group_results.prob_inf > red_prob_lim) or (form.occupancy_format == "static" and group_results.expected_new_cases >= 1)) %} {% set scale_warning = 'red' %} +{% if ((group_results.prob_inf > red_prob_lim) or (form.occupancy == {} and group_results.expected_new_cases >= 1)) %} {% set scale_warning = 'red' %} {% elif (orange_prob_lim <= group_results.prob_inf <= red_prob_lim) %} {% set scale_warning = 'orange' %} {% else %} {% set scale_warning = 'green' %} {% endif %} @@ -139,12 +139,12 @@ Scenario P(i) - {% if form.occupancy_format == "static" %}Expected new cases{% endif %} + {% if form.occupancy == {} %}Expected new cases{% endif %} {% for scenario_name, scenario_stats in alternative_scenarios.stats.items() %} - {%if (( scenario_stats.probability_of_infection > red_prob_lim) or (form.occupancy_format == "static" and scenario_stats.expected_new_cases >= 1)) %} + {%if (( scenario_stats.probability_of_infection > red_prob_lim) or (form.occupancy == {} and scenario_stats.expected_new_cases >= 1)) %} {% elif (orange_prob_lim <= scenario_stats.probability_of_infection <= red_prob_lim) %} @@ -153,7 +153,7 @@ {% endif%} {{ scenario_name }} {{ scenario_stats.probability_of_infection | non_zero_percentage }} - {% if form.occupancy_format == "static" %}{{ scenario_stats.expected_new_cases | float_format }}{% endif %} + {% if form.occupancy == {} %}{{ scenario_stats.expected_new_cases | float_format }}{% endif %} {% endfor %} diff --git a/cern_caimira/tests/conftest.py b/cern_caimira/tests/conftest.py index cbc0a4ce..8958147d 100644 --- a/cern_caimira/tests/conftest.py +++ b/cern_caimira/tests/conftest.py @@ -90,6 +90,6 @@ def exposure_model_w_outside_temp_changes(data_registry, baseline_exposure_model def baseline_form_with_sr(baseline_form_data, data_registry): form_data_sr = baseline_form_data form_data_sr['short_range_option'] = 'short_range_yes' - form_data_sr['short_range_interactions'] = '{"static": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}]}' + form_data_sr['short_range_interactions'] = '{"group_1": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}]}' form_data_sr['short_range_occupants'] = 5 return virus_validator.VirusFormData.from_dict(form_data_sr, data_registry) diff --git a/cern_caimira/tests/test_report_generator.py b/cern_caimira/tests/test_report_generator.py index 56d77e11..a61d1a9c 100644 --- a/cern_caimira/tests/test_report_generator.py +++ b/cern_caimira/tests/test_report_generator.py @@ -112,8 +112,8 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData): # Short- and Long-range contributions report_data = rep_gen.calculate_report_data(baseline_form_with_sr, executor_factory) - sr_lr_expected_new_cases = report_data['groups']['static']['expected_new_cases'] - sr_lr_prob_inf = report_data['groups']['static']['prob_inf']/100 + sr_lr_expected_new_cases = report_data['groups']['group_1']['expected_new_cases'] + sr_lr_prob_inf = report_data['groups']['group_1']['prob_inf']/100 # Long-range contributions alone alternative_scenarios = rep_gen.manufacture_alternative_scenarios(baseline_form_with_sr) @@ -123,3 +123,27 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData): lr_expected_new_cases = alternative_statistics['stats']['Base scenario without short-range interactions']['expected_new_cases'] np.testing.assert_almost_equal(sr_lr_expected_new_cases, lr_expected_new_cases + sr_lr_prob_inf * baseline_form_with_sr.short_range_occupants, 2) + + +def test_alternative_scenarios(baseline_form): + """ + Tests if the alternative scenarios are only generated when + the occupancy input is empty ({}) - legacy usage. + """ + generator: VirusReportGenerator = make_app().settings['report_generator'] + report_data = generator.prepare_context("", baseline_form, partial( + concurrent.futures.ThreadPoolExecutor, 1, + )) + assert "alternative_scenarios" in report_data.keys() + + baseline_form.occupancy = { + "group_A": { + "total_people": 10, + "infected": 5, + "presence": [{"start_time": "10:00", "finish_time": "11:00"}] + } + } + report_data = generator.prepare_context("", baseline_form, partial( + concurrent.futures.ThreadPoolExecutor, 1, + )) + assert "alternative_scenarios" not in report_data.keys()