updates following reviewing process:

- replaced 'dynamic_exposed_occupancy', 'dynamic_infected_occupancy' and 'dynamic_occupancy' by 'occupancy' that encompasses both
- developed respective functionality, including CAiMIRA UI and REST API
- simplified logic with entry overlaps and respective validation
- simplified CO2 logic with dynamic groups
- simplified logic with alternative scenarios
- changed default identifier of unique group
- handled naming conventions
- updated existing tests and added new tests for the most recent structure
- generate docscrings update
This commit is contained in:
lrdossan 2025-03-21 17:35:49 +01:00
parent 53c1f22375
commit a56ad5f77a
18 changed files with 644 additions and 612 deletions

View file

@ -1643,7 +1643,7 @@ class ExposureModel:
exposed_to_short_range: int = 0
#: Unique group identifier
identifier: str = 'static'
identifier: str = 'group_1'
#: The number of times the exposure event is repeated (default 1).
@property
@ -1930,8 +1930,10 @@ class ExposureModelGroup:
"""
first_concentration_model = self.exposure_models[0].concentration_model
for model in self.exposure_models[1:]:
if model.concentration_model != first_concentration_model:
raise ValueError("All ExposureModels must have the same ConcentrationModel.")
# Check that the number of infected people and their presence is the same
if (model.concentration_model.infected.number != first_concentration_model.infected.number or
model.concentration_model.infected.presence != first_concentration_model.infected.presence):
raise ValueError("All ExposureModels must have the same infected number and presence in the ConcentrationModel.")
@method_cache
def _deposited_exposure_list(self) -> typing.List[_VectorisedFloat]:

View file

@ -74,7 +74,6 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model
elif new_field.type == typing.Tuple[models.SpecificInterval, ...]:
SI = getattr(sys.modules[__name__], "SpecificInterval")
field_type = typing.Tuple[typing.Union[models.SpecificInterval, SI], ...]
elif new_field.type == typing.Union[int, models.IntPiecewiseConstant]:
IPC = getattr(sys.modules[__name__], "IntPiecewiseConstant")
field_type = typing.Union[int, models.IntPiecewiseConstant, IPC]

View file

@ -246,13 +246,13 @@ def group_results(form: VirusFormData, model_group: models.ExposureModelGroup) -
@profiler.profile
def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
def calculate_report_data(form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
"""
Simulation output data.
Generates the simulation output data.
"""
model_group: models.ExposureModelGroup = form.build_model()
results_per_group: typing.Dict[str, typing.Any] = group_results(form, model_group)
times = interesting_times(model_group)
# CO2 concentration
@ -458,8 +458,13 @@ def calculate_vl_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[s
}
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, typing.Union[mc.ExposureModel, mc.ExposureModelGroup]]:
scenarios = {}
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
"""
Generates the data structure containing all the alternative scenarios.
It is only compatible with single group occupancy models, therefore
it returns an ExposureModel object and not an ExposureModelGroup.
"""
scenarios: typing.Dict[str, models.ExposureModelGroup] = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
FFP2_being_worn = bool(form.mask_wearing_option ==
@ -512,31 +517,33 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, t
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
else:
# Adjust the number of exposed people with long-range exposure based on short-range interactions
if form.occupancy_format == 'static':
if not form.occupancy:
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, total_people=form.total_people - form.short_range_occupants)
elif form.occupancy_format == 'dynamic':
for group_name, occupancy_list in form.dynamic_exposed_occupancy.items():
else:
for group_id, group in form.occupancy.items():
# Check if the group exists in short-range interactions
if group_name in form.short_range_interactions:
if group_id in form.short_range_interactions:
short_range_count = form.short_range_occupants
total_people = occupancy_list['total_people']
total_people = group['total_people']
if total_people > short_range_count > 0:
# Update the total_people with the adjusted value
occupancy_list['total_people'] = max(0, total_people - short_range_count)
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, dynamic_exposed_occupancy=form.dynamic_exposed_occupancy)
group['total_people'] = max(0, total_people - short_range_count)
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, occupancy=form.occupancy)
scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
for sceario_name, scenario in scenarios.items():
scenarios[sceario_name] = scenario.exposure_models[0] # type: ignore
return scenarios
def scenario_statistics(
mc_model_group: mc.ExposureModelGroup,
mc_model: mc.ExposureModel,
sample_times: typing.List[float],
compute_prob_exposure: bool,
):
model_group: models.ExposureModelGroup = mc_model_group.build_model(
size=mc_model_group.data_registry.monte_carlo['sample_size'])
model = model_group.exposure_models[0]
model = mc_model.build_model(
size=mc_model.data_registry.monte_carlo['sample_size']
)
return {
'probability_of_infection': np.mean(model.infection_probability()),
@ -552,21 +559,21 @@ def scenario_statistics(
def comparison_report(
form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModelGroup],
scenarios: typing.Dict[str, mc.ExposureModel],
executor_factory: typing.Callable[[], concurrent.futures.Executor],
):
if (form.short_range_option == "short_range_no"):
statistics = {
'Current scenario': {
'probability_of_infection': report_data['groups']['static']['prob_inf'],
'expected_new_cases': report_data['groups']['static']['expected_new_cases'],
'concentrations': report_data['groups']['static']['concentrations'],
'probability_of_infection': report_data['groups']['group_1']['prob_inf'],
'expected_new_cases': report_data['groups']['group_1']['expected_new_cases'],
'concentrations': report_data['groups']['group_1']['concentrations'],
}
}
else:
statistics = {}
compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and form.occupancy_format == "static"
compute_prob_exposure = form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure" and not form.occupancy
with executor_factory() as executor:
results = executor.map(
@ -585,7 +592,9 @@ def comparison_report(
}
def alternative_scenarios_data(form: VirusFormData, report_data: typing.Dict[str, typing.Any], executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
def alternative_scenarios_data(form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
alternative_scenarios: typing.Dict[str, typing.Any] = manufacture_alternative_scenarios(form=form)
return {
'alternative_scenarios': comparison_report(form=form, report_data=report_data, scenarios=alternative_scenarios, executor_factory=executor_factory)

View file

@ -28,8 +28,6 @@ class CO2FormData(FormData):
# and the defaults in any html form must not be contradictory.
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = {
'CO2_data': '{}',
'dynamic_exposed_occupancy': '{}',
'dynamic_infected_occupancy': '[]',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
'exposed_finish': '17:30',
@ -47,7 +45,7 @@ class CO2FormData(FormData):
'infected_lunch_start': '12:30',
'infected_people': 1,
'infected_start': '08:30',
'occupancy_format': 'static',
'occupancy': '{}',
'room_capacity': None,
'room_volume': NO_DEFAULT,
'specific_breaks': '{}',
@ -74,7 +72,7 @@ class CO2FormData(FormData):
raise TypeError(f'The room capacity should be a valid integer (> 0). Got {self.room_capacity}.')
# Validate specific inputs - breaks (exposed and infected)
if self.specific_breaks != {} and self.occupancy_format == 'static':
if self.specific_breaks != {} and not self.occupancy:
if type(self.specific_breaks) is not dict:
raise TypeError('The specific breaks should be in a dictionary.')

View file

@ -16,8 +16,6 @@ DEFAULTS = {
'calculator_version': NO_DEFAULT,
'ceiling_height': 0.,
'conditional_probability_viral_loads': False,
'dynamic_exposed_occupancy': '{}',
'dynamic_infected_occupancy': '[]',
'event_month': 'January',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
@ -49,7 +47,7 @@ DEFAULTS = {
'mask_type': 'Type I',
'mask_wearing_option': 'mask_off',
'mechanical_ventilation_type': 'not-applicable',
'occupancy_format': 'static',
'occupancy': '{}',
'opening_distance': 0.,
'precise_activity': '{}',
'room_heating_option': False,

View file

@ -36,15 +36,11 @@ class FormData:
infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_start: minutes_since_midnight
infected_people: int
occupancy_format: str
occupancy: dict
room_volume: float
specific_breaks: dict
total_people: int
# Dynamic occupancy inputs
dynamic_exposed_occupancy: dict
dynamic_infected_occupancy: list
data_registry: DataRegistry
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
@ -97,146 +93,168 @@ class FormData:
form_dict.pop(attr)
return form_dict
def validate_dynamic_input(self, dynamic_input_value, input_name, dynamic_input_key = None):
# Check if the input is a valid non-empty list
specific_group_msg = f' in "{dynamic_input_key}" ("presence")' if dynamic_input_key else ''
def validate_group_presence_input(self, group_id: str, group_presence: typing.Dict):
"""
When occupancy is defined, this method validates the
presence times within an occupancy group.
"""
# Checks if the presence input is a valid list
if not isinstance(group_presence, list):
raise TypeError(f'The presence parameter in occupancy group "{group_id}" should be a valid list. Got {type(group_presence)}.')
# Checks if the presence input is populated
if len(group_presence) == 0:
raise TypeError(f'The presence parameter in occupancy group "{group_id}" should be a valid, non-empty list. Got {group_presence}.')
if not isinstance(dynamic_input_value, list) :
raise TypeError(f'The input "{input_name}"{specific_group_msg} should be a valid, non-empty list. Got "{type(dynamic_input_value)}".')
# Check if input is populated
if len(dynamic_input_value) == 0:
raise ValueError(f'The input "{input_name}" should be a valid, non-empty list. Got "{dynamic_input_value}".')
# Already processed presence intervals for overlap checking
existing_occupancy_presence_interval = []
# To store already processed interactions for overlap checking
existing_dynamic_infected_interval = []
existing_dynamic_exposed_interval = []
short_range_existing_interaction = []
for presence_interval in group_presence:
# Checks if each presence entry is a valid dict
if not isinstance(presence_interval, typing.Dict):
raise TypeError(f'Each presence interval should be a valid dictionary. Got {type(presence_interval)} in occupancy group "{group_id}".')
for entry in dynamic_input_value:
# Check if each entry is a dictionary
if not isinstance(entry, typing.Dict):
raise TypeError(f'Each entry in "{input_name}" should be a dictionary. Got "{type(entry)}".')
# Parameters in each presence entry
presence_params = presence_interval.keys()
# Check for required keys in each entry
dict_keys = entry.keys()
# Checks for the "start_time" and "finish_time" params
for time_param in ["start_time", "finish_time"]:
if time_param not in presence_params:
raise TypeError(f'Missing {time_param} key in presence parameter of occupancy group "{group_id}".'
f' Got keys: {", ".join(presence_params)}.')
# Check for the "start_time" and "finish_time" keys for "dynamic_exposed_occupancy" and "dynamic_infected_occupancy"
if input_name in ["dynamic_exposed_occupancy", "dynamic_infected_occupancy"]:
# Check for time format in "start_time" and "finish_time"
for time_key in ["start_time", "finish_time"]:
if time_key not in dict_keys:
raise TypeError(f'Missing "{time_key}" key in "{input_name}"{specific_group_msg}. Got keys: "{list(dict_keys)}".')
time_value = entry[time_key]
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time_value):
raise ValueError(f'Invalid time format for "{time_key}" in "{input_name}"{specific_group_msg}. Expected "HH:MM". Got "{time_value}".')
if entry["finish_time"] <= entry["start_time"]:
raise ValueError(f'Occupancy presence in "{input_name}"{specific_group_msg} entry has a finish time after the start time. Got start time: "{entry["start_time"]}" and finish time: "{entry["finish_time"]}".')
time_value = presence_interval[time_param]
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time_value):
raise ValueError(f'Invalid time format found in presence parameter of occupancy group "{group_id}". '
f'Expected HH:MM, got {time_value}.')
# Check for the "dynamic_infected_occupancy" uniqueness of intervals,
# as well as the "total_people" keyword and its constraints
if input_name == "dynamic_infected_occupancy":
self.check_overlap(entry, existing_dynamic_infected_interval)
existing_dynamic_infected_interval.append(entry)
if "total_people" not in dict_keys:
raise TypeError(f'Missing "total_people" key in "dynamic_infected_occupancy". Got keys: "{list(dict_keys)}".')
else:
value = entry["total_people"]
if not isinstance(value, int) or value < 0:
raise ValueError(f'The "total_people" in "{input_name}" should be a non-negative integer. Got "{value}".')
if presence_interval["finish_time"] <= presence_interval["start_time"]:
raise ValueError(f'Inconsistent times found in "presence" parameter of occupancy group "{group_id}".'
f'The "{presence_interval}" entry has a start time ("{presence_interval["start_time"]}") '
f'after the finish time ("{presence_interval["finish_time"]}").')
# Check for the "dynamic_exposed_occupancy" uniqueness of intervals
if input_name == "dynamic_exposed_occupancy":
self.check_overlap(entry, existing_dynamic_exposed_interval)
existing_dynamic_exposed_interval.append(entry)
# Checks for the occupancy group uniqueness of intervals
self.check_overlap(presence_interval, existing_occupancy_presence_interval)
existing_occupancy_presence_interval.append(presence_interval)
# Check for remaining short-range inputs
if input_name == "short_range_interactions":
# Check for time format in "start_time" and "finish_time"
if "start_time" not in dict_keys:
raise TypeError(f'Missing "start_time" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".')
start_time = entry["start_time"]
def validate_short_range_interaction_input(self, group_id: str, sr_interactions: typing.List):
"""
Validates the short-range interactions within an occupancy group.
"""
# Within a group, checks if the short-range input is a valid list
if not isinstance(sr_interactions, list):
raise TypeError(f'The short-range interactions in occupancy group "{group_id}" should be defined in a valid list. Got {type(sr_interactions)}.')
# Within a group, checks if the list is populated
if len(sr_interactions) == 0:
raise TypeError(f'The short-range interactions in occupancy group "{group_id}" should be a non-empty list. Got {type(sr_interactions)}.')
# Already processed interactions for overlap checking
existing_sr_interaction_interval: typing.List = []
for interaction in sr_interactions:
# Checks if each interaction is a valid dict
if not isinstance(interaction, typing.Dict):
raise TypeError(f'Each short-range interaction should be a dictionary. Got {type(interaction)} in occupancy group "{group_id}".')
# Parameters in each short-range interaction
interaction_params = interaction.keys()
# Checks for the expiration key and its constraints
if "expiration" not in interaction_params:
raise TypeError(f'Missing expiration key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.')
else:
expiration = interaction["expiration"]
if expiration not in list(self.data_registry.expiration_particle['expiration_BLO_factors'].keys()): # type: ignore
raise ValueError(f'Invalid expiration value in short-range interaction for occupancy group "{group_id}". Got "{expiration}".')
# Checks for start_time key and its format
if "start_time" not in interaction_params:
raise TypeError(f'Missing start_time key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.')
else:
start_time = interaction["start_time"]
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(start_time):
raise ValueError(f'Invalid time format for "start_time" in "short_range_interactions", "{dynamic_input_key}". Expected "HH:MM". Got "{start_time}".')
raise ValueError(f'Invalid time format for start_time in short-range interaction for occupancy group "{group_id}". Expected HH:MM, got {start_time}.')
# Check for the "expiration" key and its constraints
if "expiration" not in dict_keys:
raise TypeError(f'Missing "expiration" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".')
else:
value = entry["expiration"]
if value not in list(self.data_registry.expiration_particle['expiration_BLO_factors'].keys()):
raise ValueError(f'The "expiration" in "short_range_interactions", "{dynamic_input_key}", does not exist in the registry. Got "{value}".')
if "duration" not in dict_keys:
raise TypeError(f'Missing "duration" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".')
duration = entry["duration"]
# Checks for "duration" and its format
if "duration" not in interaction_params:
raise TypeError(f'Missing duration key in short-range interaction for occupancy group "{group_id}". Got keys: {", ".join(interaction_params)}.')
else:
duration = interaction["duration"]
if duration < 0:
raise ValueError(f'The "duration" in "short_range_interactions", "{dynamic_input_key}", should be a non-negative integer. Got "{duration}".')
raise ValueError(f'The duration value in short-range interaction for occupancy group "{group_id}" should be a non-negative integer. Got {duration}.')
# Occupancy format dependent inputs
if self.occupancy_format == 'dynamic':
# Find corresponding exposure group
exposure_group_obj = next(
(occupancy_group for occupancy_key, occupancy_group in self.dynamic_exposed_occupancy.items()
if occupancy_key == dynamic_input_key),
None
# Legacy usage - occupancy input is not defined (default empty dict)
if not self.occupancy:
# It means that we have a single exposure model
lr_start = min(self.infected_start, self.exposed_start)/60
lr_stop = max(self.infected_finish, self.exposed_finish)/60
if not self.check_interaction_is_within_long_range(interaction, existing_sr_interaction_interval, lr_start, lr_stop):
raise ValueError(
f'Short-range interactions must occur during simulation time. Got {interaction} in occupancy group "{group_id}".'
)
if exposure_group_obj is None:
raise ValueError(
f'Exposure group "{dynamic_input_key}" in short-range interaction not found in dynamic exposed occupancy.'
)
# Add interaction to the list of already processed interactions
existing_sr_interaction_interval.append(interaction)
else:
# Find corresponding exposure group
occupancy_group_obj = next(
(occupancy_value for occupancy_key, occupancy_value in self.occupancy.items()
if occupancy_key == group_id),
None
)
is_within_any_long_range = False
for exposure_interval in exposure_group_obj['presence']:
# Check for correct timing within long-range exposure and overlaps with existing interactions
long_range_start = time_string_to_minutes(exposure_interval['start_time'])/60
long_range_stop = time_string_to_minutes(exposure_interval['finish_time'])/60
if occupancy_group_obj is None:
raise ValueError(
f'Occupancy group "{group_id}" referenced in short-range interactions was not found in the occupancy input.'
)
# Flag to check if interaction falls within any long-range exposure interval
if self.check_time_and_overlap(
entry, short_range_existing_interaction, long_range_start, long_range_stop
): is_within_any_long_range = True
is_within_any_lr = False
for presence in occupancy_group_obj['presence']:
# Check for correct timing within long-range exposure and overlaps with existing interactions
lr_start = time_string_to_minutes(presence['start_time'])/60
lr_stop = time_string_to_minutes(presence['finish_time'])/60
# Add interaction to the list of processed interactions if within long-range
short_range_existing_interaction.append(entry)
# Flag to check if interaction falls within any long-range exposure interval
if self.check_interaction_is_within_long_range(interaction, existing_sr_interaction_interval, lr_start, lr_stop):
is_within_any_lr = True
# If no long-range interval contains the interaction, raise an error
if not is_within_any_long_range:
raise ValueError(
f'Short-range interaction "{entry}" does not fall within any long-range exposure interval in "{dynamic_input_key}".'
)
# Add interaction to the list of processed interactions if within long-range
existing_sr_interaction_interval.append(interaction)
elif self.occupancy_format == 'static':
# It means that we have a single exposure model
long_range_start = min(self.infected_start, self.exposed_start)/60
long_range_stop = max(self.infected_finish, self.exposed_finish)/60
# If the interaction does not fall within any presence interval of the occupancy group, raise an error
if not is_within_any_lr:
raise ValueError(
f'Short-range interaction {interaction} does not fall within any presence interval in occupancy group "{group_id}".'
)
if not self.check_time_and_overlap(entry, short_range_existing_interaction, long_range_start, long_range_stop):
raise ValueError(
f'Short-range interactions should be defined during simulation time. Got "{entry}".'
)
# Add interaction to the list of processed interactions
short_range_existing_interaction.append(entry)
else:
raise TypeError(
f'Undefined exposure type. Got "{self.occupancy_format}", accepted formats are "dynamic" or "exposed".')
def validate_dynamic_exposed_format(self, dynamic_exposed_group: dict, key: str):
def validate_dynamic_exposed_format(self, group_id: str, group: typing.Dict):
"""
Validates the expected keywords for the dynamic
exposed input.
Validates the expected keywords for the occupancy input.
"""
# Total people
if 'total_people' not in dynamic_exposed_group:
raise TypeError(f'Missing "total_people" key in "dynamic_exposed_occupancy" group "{key}". Got keys: "{list(dynamic_exposed_group.keys())}".')
# Parameters in each presence entry
group_params = group.keys()
# Total people input
if 'total_people' not in group_params:
raise TypeError(f'Missing total_people key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.')
else:
total_people = dynamic_exposed_group['total_people']
total_people = group['total_people']
if not isinstance(total_people, int) or total_people < 0:
raise ValueError(f'The "total_people" in "dynamic_exposed_occupancy" group "{key}" should be a non-negative integer. Got "{total_people}".')
# Presence
if 'presence' not in dynamic_exposed_group:
raise TypeError(f'Missing "presence" key in "dynamic_exposed_occupancy" group "{key}". Got keys: "{list(dynamic_exposed_group.keys())}".')
raise ValueError(f'The total_people input in occupancy group "{group_id}" should be a non-negative integer. Got {total_people}.')
# Infected people input
if 'infected' not in group_params:
raise TypeError(f'Missing infected key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.')
else:
infected = group['infected']
if not isinstance(infected, int) or infected < 0:
raise ValueError(f'The infected input in occupancy group "{group_id}" should be a non-negative integer. Got {infected}.')
elif infected > total_people: # Validate number of infected <= number of total people
raise ValueError(f'The number of infected people ({infected}) cannot be greater than the total people ({total_people}).')
# Presence input
if 'presence' not in group_params:
raise TypeError(f'Missing presence key in occupancy group "{group_id}". Got keys: {", ".join(group_params)}.')
def get_start_and_finish_time(self, entry: dict):
entry_start = time_string_to_minutes(entry["start_time"])/60
@ -246,139 +264,127 @@ class FormData:
entry_finish = entry_start + entry['duration']/60
return entry_start, entry_finish
def check_time_and_overlap(self, interaction, existing_interactions, lr_start, lr_stop):
def check_interaction_is_within_long_range(self, interaction, existing_interactions,
lr_start, lr_stop):
"""
Checks if the interaction overlaps with any existing interactions for the
same exposure group and if it falls within the long-range exposure time.
Check if the short-range interaction falls within the long-range exposure time.
Check if the short-range interaction given as input overlaps with any already
existing interactions for the same occupancy group.
"""
interaction_start, interaction_finish = self.get_start_and_finish_time(interaction)
# Check if the SR interaction is within the LR exposure time
if lr_start <= interaction_start <= lr_stop and lr_start <= interaction_finish <= lr_stop:
for existing in existing_interactions:
existing_start, existing_finish = self.get_start_and_finish_time(existing)
# Check for overlap
if interaction_start < existing_finish and existing_start < interaction_finish:
raise ValueError(
f'Overlap detected for "short-range interaction": New interaction '
f'"{interaction}" overlaps with existing interaction "{existing}".'
)
# Return True if interaction falls within the current long-range interval
# Check the overlap with already existing interactions
self.check_overlap(interaction, existing_interactions)
return True
return False
def check_overlap(self, interaction, existing_interactions):
def check_overlap(self, entry, existing_entries):
"""
Checks if the dynamic entry overlaps with any already existing interaction.
Check if an entry overlaps with an already existing entry
by comparing the start and finish times of all entries.
"""
interaction_start = time_string_to_minutes(interaction["start_time"])/60
interaction_finish = time_string_to_minutes(interaction["finish_time"])/60
for existing in existing_interactions:
existing_start = time_string_to_minutes(existing["start_time"])/60
existing_finish = time_string_to_minutes(existing["finish_time"])/60
entry_start, entry_finish = self.get_start_and_finish_time(entry)
for existing_entry in existing_entries:
existing_entry_start, existing_entry_finish = self.get_start_and_finish_time(existing_entry)
# Check for overlap
if (interaction_start < existing_finish and existing_start < interaction_finish):
if (entry_start < existing_entry_finish and existing_entry_start < entry_finish):
raise ValueError(
f'Overlap detected: New interaction '
f'"{interaction}" overlaps with existing interaction "{existing}".'
f'Overlap detected: The entry {entry} overlaps with '
f'an already existing entry ({existing_entry}).'
)
# In case no exception is raised, simply returns
return
def validate_population_parameters(self):
"""Validates required parameters for dynamic inputs"""
# Static occupancy is defined.
if self.occupancy_format == 'static':
# Validate number of infected <= number of total people
if self.infected_people >= self.total_people:
raise ValueError(
'Number of infected people cannot be greater or equal to the number of total people.')
# Validate time intervals selected by user
time_intervals = [
['exposed_start', 'exposed_finish'],
['infected_start', 'infected_finish'],
]
if self.exposed_lunch_option:
time_intervals.append(
['exposed_lunch_start', 'exposed_lunch_finish'])
if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
time_intervals.append(
['infected_lunch_start', 'infected_lunch_finish'])
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
end = getattr(self, end_name)
if start > end:
"""
Validate required parameters for dynamic inputs.
"""
if isinstance(self.occupancy, typing.Dict):
# Legacy usage - occupancy input is not defined (default empty dict)
if not self.occupancy:
# Validate number of infected <= number of total people
if self.infected_people >= self.total_people:
raise ValueError(
f"{start_name} must be less than {end_name}. Got {start} and {end}.")
'Number of infected people cannot be greater or equal to the number of total people.')
def validate_lunch(start, finish):
lunch_start = getattr(self, f'{population}_lunch_start')
lunch_finish = getattr(self, f'{population}_lunch_finish')
return (start <= lunch_start <= finish and
start <= lunch_finish <= finish)
# Validate time intervals selected by user
time_intervals = [
['exposed_start', 'exposed_finish'],
['infected_start', 'infected_finish'],
]
if self.exposed_lunch_option:
time_intervals.append(
['exposed_lunch_start', 'exposed_lunch_finish'])
if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
time_intervals.append(
['infected_lunch_start', 'infected_lunch_finish'])
def get_lunch_mins(population):
lunch_mins = 0
if getattr(self, f'{population}_lunch_option'):
lunch_mins = getattr(
self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
return lunch_mins
def get_coffee_mins(population):
coffee_mins = 0
if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
coffee_mins = COFFEE_OPTIONS_INT[getattr(
self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
return coffee_mins
def get_activity_mins(population):
return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
populations = [
'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
for population in populations:
# Validate lunch time within the activity times.
if (getattr(self, f'{population}_lunch_option') and
not validate_lunch(getattr(self, f'{population}_start'), getattr(
self, f'{population}_finish'))
):
raise ValueError(
f"{population} lunch break must be within presence times."
)
# Length of breaks < length of activity
if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population):
raise ValueError(
f"Length of breaks >= Length of {population} presence."
)
for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
('infected_coffee_break_option', COFFEE_OPTIONS_INT)]:
if getattr(self, attr_name) not in valid_set:
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
end = getattr(self, end_name)
if start > end:
raise ValueError(
f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
# Dynamic occupancy is defined.
elif self.occupancy_format == 'dynamic':
# Validate dynamic input format
self.validate_dynamic_input(self.dynamic_infected_occupancy, "dynamic_infected_occupancy")
# Check if dynamic exposed input is a dict
if isinstance(self.dynamic_exposed_occupancy, dict):
# Check if the dict is not empty
if not self.dynamic_exposed_occupancy:
raise ValueError(f'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{self.dynamic_exposed_occupancy}".')
# The key is the actual identifier
for key, group in self.dynamic_exposed_occupancy.items():
# For each group, validate dynamic exposed input format
self.validate_dynamic_exposed_format(group, key)
# ...as well as validate dynamic exposed presence data
self.validate_dynamic_input(group['presence'], "dynamic_exposed_occupancy", key)
f"{start_name} must be less than {end_name}. Got {start} and {end}.")
def validate_lunch(start, finish):
lunch_start = getattr(self, f'{population}_lunch_start')
lunch_finish = getattr(self, f'{population}_lunch_finish')
return (start <= lunch_start <= finish and
start <= lunch_finish <= finish)
def get_lunch_mins(population):
lunch_mins = 0
if getattr(self, f'{population}_lunch_option'):
lunch_mins = getattr(
self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
return lunch_mins
def get_coffee_mins(population):
coffee_mins = 0
if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
coffee_mins = COFFEE_OPTIONS_INT[getattr(
self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
return coffee_mins
def get_activity_mins(population):
return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
populations = [
'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
for population in populations:
# Validate lunch time within the activity times.
if (getattr(self, f'{population}_lunch_option') and
not validate_lunch(getattr(self, f'{population}_start'), getattr(
self, f'{population}_finish'))
):
raise ValueError(
f"{population} lunch break must be within presence times."
)
# Length of breaks < length of activity
if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population):
raise ValueError(
f"Length of breaks >= Length of {population} presence."
)
for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
('infected_coffee_break_option', COFFEE_OPTIONS_INT)]:
if getattr(self, attr_name) not in valid_set:
raise ValueError(
f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
# Occupancy input is defined
else:
raise TypeError(f'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{type(self.dynamic_exposed_occupancy)}".')
# Checks if occupancy input is a valid dict
if self.occupancy and isinstance(self.occupancy, typing.Dict):
# The key is the actual identifier
for group_id, group in self.occupancy.items():
# For each group, validate input format
self.validate_dynamic_exposed_format(group_id, group)
# ...as well as the respective presence input
self.validate_group_presence_input(group_id, group['presence'])
else:
raise ValueError(f"'{self.occupancy_format}' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.")
raise TypeError(f'The occupancy input should be a valid dictionary. Got {self.occupancy}.')
def validate(self):
raise NotImplementedError("Subclass must implement")
@ -386,72 +392,73 @@ class FormData:
def build_model(self, sample_size: typing.Optional[int] = None):
raise NotImplementedError("Subclass must implement")
def population_present_changes(self, population_list: typing.List[models.Interval]) -> typing.List[float]:
def population_present_changes(self, transition_times_list: typing.Tuple[float, ...]) -> typing.List[float]:
"""
Returns a sorted list of unique state changes on
a population list.
"""
state_change_times = set(population_list[0].transition_times())
for population in population_list:
state_change_times.update(population.transition_times())
return sorted(state_change_times)
return sorted(set(transition_times_list))
def convert_interval_to_piecewise(self, interval: models.SpecificInterval, value: int):
"""
Converts an Interval and a single value to an IntPiecewiseConstant.
"""
transition_times = []
values = []
for start, end in interval.present_times:
transition_times.extend([start, end])
values.extend([value, 0])
# Drop the last value (0) to match number of intervals
if values:
values.pop()
return models.IntPiecewiseConstant(
transition_times=tuple(transition_times),
values=tuple(values),
)
def build_CO2_piecewise(self):
"""
Builds a simple IntPiecewiseConstant for the different
population groups that are defined.
"""
if self.occupancy_format == 'dynamic':
infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy)
total_models = [models.SimplePopulation(
number=infected_occupancy, # IntPiecewiseConstant
presence=None,
activity=None, # type: ignore
)]
# Legacy usage - occupancy input is not defined (default empty dict)
if not self.occupancy:
infected_occupancy = self.convert_interval_to_piecewise(
interval=self.infected_present_interval(),
value=self.infected_people,
)
exposed_occupancy = self.convert_interval_to_piecewise(
interval=self.exposed_present_interval(),
value=self.total_people - self.infected_people,
)
total_models = [infected_occupancy, exposed_occupancy]
else:
infected_occupancy = self.generate_infected_occupancy(self.occupancy)
total_models = [infected_occupancy]
# For all state changes
total_presence = [infected_occupancy.interval()]
for exposure_group in self.dynamic_exposed_occupancy.values():
exposed_occupancy = exposure_group['total_people']
exposed_presence = self.generate_dynamic_occupancy(exposure_group['presence'])
total_models.append(models.SimplePopulation(
number=exposed_occupancy, # int
presence=exposed_presence, # SpecificInterval
activity=None, # type: ignore
))
# For all state changes
total_presence.append(exposed_presence)
elif self.occupancy_format == 'static':
infected_people = self.infected_people
exposed_people = self.total_people - self.infected_people
infected_presence = self.infected_present_interval()
exposed_presence = self.exposed_present_interval()
infected_population = models.SimplePopulation(
number=infected_people,
presence=infected_presence,
activity=None, # type: ignore
)
exposed_population=models.SimplePopulation(
number=exposed_people,
presence=exposed_presence,
activity=None, # type: ignore
)
total_presence = [infected_presence, exposed_presence]
total_models = [infected_population, exposed_population]
for group in self.occupancy.values():
model_piecewise = self.convert_interval_to_piecewise(
interval=self.generate_exposed_presence(group['presence']),
value=group['total_people'] - group['infected']
)
total_models.append(model_piecewise)
# Get all state change times from combined populations
all_state_changes=self.population_present_changes(total_presence)
all_state_changes = self.population_present_changes([t for model in total_models for t in model.transition_times])
# Compute total people at each state change
total_people = []
for _, stop in zip(all_state_changes[:-1], all_state_changes[1:]):
total_people_in_group = sum(population.people_present(stop) for population in total_models)
total_people_in_group = sum(model.value(stop) for model in total_models)
total_people.append(total_people_in_group)
return models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people))
return models.IntPiecewiseConstant(
transition_times=tuple(all_state_changes),
values=tuple(total_people)
)
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) -
@ -662,65 +669,65 @@ class FormData:
breaks=breaks,
)
def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]):
"""
Creates a model to represent occupancy over time. If `total_people` is
provided in the input, the method generates an `IntPiecewiseConstant` model, representing
occupancy as a piecewise constant function with defined values for each interval.
Otherwise, it generates a `SpecificInterval` model, which only tracks the presence of
intervals without associating values to them.
"""
def generate_exposed_presence(self, presence: typing.List) -> models.SpecificInterval:
"""
Creates a model to represent exposed occupancy over time.
"""
exposed_intervals = []
# Initialize variables
if 'total_people' in dynamic_occupancy[0]: # build IntPiecewiseConstant
computePiecewiseConstant = True
transition_times = []
values = []
else:
computePiecewiseConstant = False
present_times = [] # build SpecificInterval
# Sort occupancy entries by start_time to ensure proper ordering
presence_sorted = sorted(
presence, key=lambda x: time_string_to_minutes(x['start_time'])
)
# Sort occupancy entries by start_time to ensure proper ordering
dynamic_occupancy_sorted = sorted(
dynamic_occupancy, key=lambda x: time_string_to_minutes(x['start_time'])
for period in presence_sorted:
start_time = time_string_to_minutes(period['start_time']) / 60
finish_time = time_string_to_minutes(period['finish_time']) / 60
exposed_intervals.append((start_time, finish_time))
return models.SpecificInterval(tuple(exposed_intervals))
def generate_infected_occupancy(self, occupancy: typing.Dict) -> models.IntPiecewiseConstant:
"""
Creates a model to represent infected occupancy over time.
"""
transition_times = set()
infected_intervals = []
# Extract presence data
for group in occupancy.values():
infected = group["infected"]
for period in group["presence"]:
start_time = time_string_to_minutes(period['start_time']) / 60
finish_time = time_string_to_minutes(period['finish_time']) / 60
transition_times.add(start_time) # unique time points
transition_times.add(finish_time) # unique time points
infected_intervals.append((start_time, finish_time, infected))
# Sort transition times
sorted_transition_times = list(sorted(transition_times))
# Values for each time segment
raw_values = [
sum(people for start, end, people in infected_intervals if start <= t1 < end)
for t1 in sorted_transition_times[:-1]
]
# Merge consecutive intervals with the same infected count
opt_times = [sorted_transition_times[0]]
opt_values = [raw_values[0]]
for i in range(1, len(raw_values)):
if raw_values[i] != opt_values[-1]:
opt_times.append(sorted_transition_times[i])
opt_values.append(raw_values[i])
# Ensure the last time is included
opt_times.append(sorted_transition_times[-1])
return models.IntPiecewiseConstant(
transition_times=tuple(opt_times),
values=tuple(opt_values)
)
last_finish_time = None
for occupancy in dynamic_occupancy_sorted:
start_time = time_string_to_minutes(occupancy['start_time']) / 60
finish_time = time_string_to_minutes(occupancy['finish_time']) / 60
if computePiecewiseConstant:
total_people = occupancy['total_people']
# Fill in gap with a zero occupancy if there is a time gap
if last_finish_time is not None and start_time > last_finish_time:
transition_times.append(last_finish_time)
values.append(0) # Add zero for the gap period
# Update lists with current occupancy entry
transition_times.extend([start_time, finish_time])
values.append(total_people)
# Update the last known finish time
last_finish_time = finish_time
else:
present_times.append((start_time, finish_time))
# When computing the total people, validate that we have enough values to compute the occupancy
if computePiecewiseConstant:
unique_transition_times_sorted = np.array(sorted(set(transition_times)))
if (len(values) != len(unique_transition_times_sorted) - 1):
raise ValueError("Cannot compute dynamic occupancy with the provided inputs.")
else:
return models.IntPiecewiseConstant(
transition_times=tuple(unique_transition_times_sorted),
values=tuple(values)
)
else:
return models.SpecificInterval(tuple(present_times))
def _hours2timestring(hours: float):
# Convert times like 14.5 to strings, like "14:30"

View file

@ -201,13 +201,14 @@ class VirusFormData(FormData):
f'The sum of all respiratory activities should be 100. Got {total_percentage}.')
# Validate number of people with short-range interactions
if self.occupancy_format == 'static':
if not self.occupancy:
# Legacy usage - occupancy input is not defined (default empty dict)
max_occupants_for_sr = self.total_people - self.infected_people
elif self.occupancy_format == 'dynamic':
else:
max_occupants_for_sr = 0
for key, group in self.dynamic_exposed_occupancy.items():
occupants_in_group = group['total_people']
max_occupants_for_sr = max(max_occupants_for_sr, occupants_in_group)
for group_id, group in self.occupancy.items():
exposed_occupants_in_group = group['total_people'] - group['infected']
max_occupants_for_sr = max(max_occupants_for_sr, exposed_occupants_in_group)
if self.short_range_occupants > max_occupants_for_sr:
raise ValueError(
@ -217,14 +218,19 @@ class VirusFormData(FormData):
# Validate short-range interactions interval
if self.short_range_option == "short_range_yes":
if isinstance(self.short_range_interactions, dict):
# Check if occupancy format is static, there should be one key-value only in short_range_interactions
if self.occupancy_format == "static" and len(self.short_range_interactions) > 1:
raise ValueError(
'When occupancy format is "static", there should be only one interaction group in "short_range_interactions".'
)
# The key is the actual identifier
for key, group in self.short_range_interactions.items():
self.validate_dynamic_input(group, "short_range_interactions", key)
# Checks if short_range_interactions input is not empty
if len(self.short_range_interactions) == 0:
raise ValueError(f'When short_range_option input is set to "{self.short_range_option}", the short_range_interactions input should not be empty. Got {self.short_range_interactions}.')
# Checks that the number of groups in the short_range_interactions input is less or equal than those defined in the occupancy
elif not self.occupancy:
# Legacy usage - occupancy input is not defined (default empty dict)
if len(self.short_range_interactions) > 1:
raise ValueError(f'Incompatible number of occupancy groups in the short_range_interactions input. Got {len(self.short_range_interactions)} groups when the maximum is 1.')
else:
if len(self.short_range_interactions) > len(self.occupancy):
raise ValueError(f'Incompatible number of occupancy groups in the short_range_interactions input. Got {len(self.short_range_interactions)} groups when the maximum is {len(self.occupancy)} (from the occupancy input).')
for group_id, interactions in self.short_range_interactions.items():
self.validate_short_range_interaction_input(group_id, interactions)
def initialize_room(self) -> models.Room:
# Initializes room with volume either given directly or as product of area and height
@ -246,8 +252,6 @@ class VirusFormData(FormData):
return models.Room(volume=volume, inside_temp=models.PiecewiseConstant((0, 24), (inside_temp,)), humidity=humidity) # type: ignore
def build_mc_model(self) -> mc.ExposureModelGroup:
size = self.data_registry.monte_carlo['sample_size']
room: models.Room = self.initialize_room()
ventilation: models._VentilationBase = self.ventilation()
infected_population: models.InfectedPopulation = self.infected_population()
@ -266,7 +270,7 @@ class VirusFormData(FormData):
presence=presence,
distance=distances,
expiration_def=interaction['expiration']
).build_model(size))
))
concentration_model: models.ConcentrationModel = mc.ConcentrationModel(
data_registry=self.data_registry,
@ -274,19 +278,34 @@ class VirusFormData(FormData):
ventilation=ventilation,
infected=infected_population,
evaporation_factor=0.3,
).build_model(size)
)
geographical_data: models.Cases = mc.Cases(
geographic_population=self.geographic_population,
geographic_cases=self.geographic_cases,
ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias],
).build_model(size)
)
if self.occupancy_format == 'dynamic':
if not self.occupancy:
# Legacy usage - occupancy input is not defined (default empty dict)
exposed_population = self.exposed_population()
short_range_tuple = tuple(item for sublist in short_range.values() for item in sublist)
return mc.ExposureModelGroup(
data_registry=self.data_registry,
exposure_models = (mc.ExposureModel(
data_registry=self.data_registry,
concentration_model=concentration_model,
short_range=short_range_tuple,
exposed=exposed_population,
geographical_data=geographical_data,
exposed_to_short_range=self.short_range_occupants,
),)
)
else:
exposure_model_set = []
for exposure_group in self.dynamic_exposed_occupancy.keys():
for exposure_group in self.occupancy.keys():
sr_models: typing.Tuple[models.ShortRangeModel, ...] = tuple(short_range[exposure_group])
exposed_population: mc.Population = self.exposed_population(exposure_group).build_model(size)
exposed_population = self.exposed_population(exposure_group)
exposure_model = mc.ExposureModel(
data_registry=self.data_registry,
@ -301,27 +320,12 @@ class VirusFormData(FormData):
return mc.ExposureModelGroup(
data_registry=self.data_registry,
exposure_models=[individual_model.build_model(size) for individual_model in exposure_model_set]
)
elif self.occupancy_format == 'static':
exposed_population = self.exposed_population()
short_range_tuple = tuple(item for sublist in short_range.values() for item in sublist)
return mc.ExposureModelGroup(
data_registry=self.data_registry,
exposure_models = [mc.ExposureModel(
data_registry=self.data_registry,
concentration_model=concentration_model,
short_range=short_range_tuple,
exposed=exposed_population,
geographical_data=geographical_data,
exposed_to_short_range=self.short_range_occupants,
).build_model(size)]
exposure_models=tuple(exposure_model_set)
)
def build_model(self, sample_size=None) -> models.ExposureModelGroup:
size = self.data_registry.monte_carlo['sample_size'] if not sample_size else sample_size
return self.build_mc_model().build_model(size=size)
sample_size = sample_size or self.data_registry.monte_carlo['sample_size']
return self.build_mc_model().build_model(sample_size)
def build_CO2_model(self, sample_size=None) -> models.CO2ConcentrationModel:
"""
@ -481,7 +485,7 @@ class VirusFormData(FormData):
def generate_precise_activity_expiration(self) -> typing.Tuple[typing.Any, ...]:
# It means the precise activity is not defined by a specific input.
if self.precise_activity == {}:
if not self.precise_activity:
return ()
respiratory_dict = {}
for respiratory_activity in self.precise_activity['respiratory_activity']:
@ -499,12 +503,13 @@ class VirusFormData(FormData):
virus = virus_distributions(self.data_registry)[self.virus_type]
# Occupancy
if self.occupancy_format == 'dynamic':
infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy)
infected_presence = None
else:
infected_occupancy = self.infected_people
if not self.occupancy:
# Legacy usage - occupancy input is not defined (default empty dict)
infected_occupancy: typing.Union[int, models.IntPiecewiseConstant] = self.infected_people
infected_presence = self.infected_present_interval()
else:
infected_occupancy = self.generate_infected_occupancy(self.occupancy)
infected_presence = None
# Activity and expiration
activity_defn = self.data_registry.population_scenario_activity[
@ -513,8 +518,7 @@ class VirusFormData(FormData):
self.activity_type]['expiration']
if (self.activity_type == 'smallmeeting'):
# Conversation of N people is approximately 1/N% of the time speaking.
total_people: int = max(
infected_occupancy.values) if self.occupancy_format == 'dynamic' else self.total_people
total_people: int = self.total_people if not self.occupancy else max(infected_occupancy.values) # type: ignore
expiration_defn = {'Speaking': 1, 'Breathing': total_people - 1}
elif (self.activity_type == 'precise'):
activity_defn, expiration_defn = self.generate_precise_activity_expiration()
@ -542,16 +546,15 @@ class VirusFormData(FormData):
single group of exposed population, except when breaks are defined.
"""
# Occupancy
if self.occupancy_format == 'dynamic':
dynamic_group = self.dynamic_exposed_occupancy[exposure_group]
exposed_occupancy = dynamic_group['total_people']
exposed_presence = self.generate_dynamic_occupancy(dynamic_group['presence'])
else:
if not exposure_group and not self.occupancy:
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
exposed_occupancy = self.total_people - self.infected_people
exposed_presence = self.exposed_present_interval()
elif exposure_group:
dynamic_group = self.occupancy[exposure_group]
exposed_occupancy = dynamic_group['total_people'] - dynamic_group['infected']
exposed_presence = self.generate_exposed_presence(dynamic_group['presence'])
# Activity
activity_defn = (self.precise_activity['physical_activity']
@ -608,8 +611,6 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'calculator_version': calculator_version,
'ceiling_height': '',
'conditional_probability_viral_loads': '0',
'dynamic_exposed_occupancy': '{}',
'dynamic_infected_occupancy': '[]',
'event_month': 'January',
'exposed_coffee_break_option': 'coffee_break_4',
'exposed_coffee_duration': '10',
@ -640,7 +641,7 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'mask_type': 'Type I',
'mask_wearing_option': 'mask_off',
'mechanical_ventilation_type': '',
'occupancy_format': 'static',
'occupancy': '{}',
'opening_distance': '0.2',
'room_heating_option': '0',
'room_number': '123',

View file

@ -590,71 +590,52 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m
assert offset == expected_offset
@pytest.mark.parametrize(
["occupancy_format_input", "error"],
[
['dynamc', "'dynamc' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",],
['stact', "'stact' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",],
['random', "'random' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",]
]
)
def test_occupancy_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = occupancy_format_input
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
def test_dynamic_input_format_ValueError(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
# Check empty list for infected occupancy
baseline_form.dynamic_infected_occupancy = []
error = 'The input "dynamic_infected_occupancy" should be a valid, non-empty list. Got "[]".'
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Check empty dict for exposed occupancy
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {}
error = 'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{}".'
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["exposed_format", "error"],
[
[
{"tal_people": 10, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],},
'Missing "total_people" key in "dynamic_exposed_occupancy" group "group_1". Got keys: "[\'tal_people\', \'presence\']".'
],
[
{"total_people": 10, "pesence": [{"start_time": "10:00", "finish_time": "11:00"}],},
'Missing "presence" key in "dynamic_exposed_occupancy" group "group_1". Got keys: "[\'total_people\', \'pesence\']".'
],
]
)
def test_dynamic_exposed_format_TypeError(exposed_format, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {"group_1": exposed_format}
def test_occupancy_TypeError(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy = [] # type: ignore
error = 'The occupancy input should be a valid dictionary. Got [].'
with pytest.raises(TypeError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["exposed_presence", "error"],
["occupancy", "error"],
[
[[["start_time", "10:00", "finish_time", "11:00"]], 'Each entry in "dynamic_exposed_occupancy" should be a dictionary. Got "<class \'list\'>".'],
[[{"art_time": "10:00", "finish_time": "11:00"}], 'Missing "start_time" key in "dynamic_exposed_occupancy" in "group_1" ("presence"). Got keys: "[\'art_time\', \'finish_time\']".'],
[[{"start_time": "10:00", "ish_time": "11:00"}], 'Missing "finish_time" key in "dynamic_exposed_occupancy" in "group_1" ("presence"). Got keys: "[\'start_time\', \'ish_time\']".'],
[
{"tal_people": 10, "infected": 5, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],},
'Missing total_people key in occupancy group "group_A". Got keys: tal_people, infected, presence.'
],
[
{"total_people": 10, "infeted": 5, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],},
'Missing infected key in occupancy group "group_A". Got keys: total_people, infeted, presence.'
],
[
{"total_people": 10, "infected": 5, "pesence": [{"start_time": "10:00", "finish_time": "11:00"}],},
'Missing presence key in occupancy group "group_A". Got keys: total_people, infected, pesence.'
],
]
)
def test_dynamic_exposed_presence_TypeError(exposed_presence, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
def test_occupancy_general_params_TypeError(occupancy, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy = {"group_A": occupancy}
with pytest.raises(TypeError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["occupancy_presence", "error"],
[
[{"start_time": "10:00", "finish_time": "11:00"}, 'The presence parameter in occupancy group "group_A" should be a valid list. Got <class \'dict\'>.'],
[[], 'The presence parameter in occupancy group "group_A" should be a valid, non-empty list. Got [].'],
[[["start_time", "10:00", "finish_time", "11:00"]], 'Each presence interval should be a valid dictionary. Got <class \'list\'> in occupancy group "group_A".'],
[[{"art_time": "10:00", "finish_time": "11:00"}], 'Missing start_time key in presence parameter of occupancy group "group_A". Got keys: art_time, finish_time.'],
[[{"start_time": "10:00", "ish_time": "11:00"}], 'Missing finish_time key in presence parameter of occupancy group "group_A". Got keys: start_time, ish_time.'],
]
)
def test_occupancy_presence_TypeError(occupancy_presence, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy = {
"group_A": {
"total_people": 10,
"presence": exposed_presence,
"infected": 5,
"presence": occupancy_presence,
}
}
with pytest.raises(TypeError, match=re.escape(error)):
@ -662,42 +643,38 @@ def test_dynamic_exposed_presence_TypeError(exposed_presence, error, baseline_fo
@pytest.mark.parametrize(
["exposed_presence", "error"],
["occupancy_presence", "error"],
[
[[{"start_time": "10", "finish_time": "11:00"}], 'Invalid time format for "start_time" in "dynamic_exposed_occupancy" in "group_1" ("presence"). Expected "HH:MM". Got "10".'],
[[{"start_time": "10:00", "finish_time": "11"}], 'Invalid time format for "finish_time" in "dynamic_exposed_occupancy" in "group_1" ("presence"). Expected "HH:MM". Got "11".'],
[[{"start_time": "10", "finish_time": "11:00"}], 'Invalid time format found in presence parameter of occupancy group "group_A". Expected HH:MM, got 10.'],
[[{"start_time": "10:00", "finish_time": "11"}], 'Invalid time format found in presence parameter of occupancy group "group_A". Expected HH:MM, got 11.'],
]
)
def test_dynamic_exposed_presence_ValueError(exposed_presence, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
def test_occupancy_presence_ValueError(occupancy_presence, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy = {
"group_A": {
"total_people": 10,
"presence": exposed_presence
"infected": 5,
"presence": occupancy_presence
}
}
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["exposed_total_people", "error"],
["total_people", "error"],
[
["10", 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "10".'],
[9.8, 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "9.8".'],
[[10], 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "[10]".'],
[-1, 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "-1".'],
["10", 'The total_people input in occupancy group "group_A" should be a non-negative integer. Got 10.'],
[9.8, 'The total_people input in occupancy group "group_A" should be a non-negative integer. Got 9.8.'],
[[10], 'The total_people input in occupancy group "group_A" should be a non-negative integer. Got [10].'],
[-1, 'The total_people input in occupancy group "group_A" should be a non-negative integer. Got -1.'],
]
)
def test_dynamic_exposed_total_people_ValueError(exposed_total_people, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [
{"total_people": 1, "start_time": "08:00", "finish_time": "10:00"},
{"total_people": 2, "start_time": "10:00", "finish_time": "18:00"},
]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": exposed_total_people,
def test_occupancy_total_people_ValueError(total_people, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy = {
"group_A": {
"total_people": total_people,
"infected": 10,
"presence": [{"start_time": "08:00", "finish_time": "18:00"},],
},
}
@ -705,42 +682,33 @@ def test_dynamic_exposed_total_people_ValueError(exposed_total_people, error, ba
baseline_form.validate()
def test_dynamic_infected_overlap(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [
{"total_people": 10, "start_time": "08:00", "finish_time": "18:00"},
{"total_people": 10, "start_time": "10:00", "finish_time": "18:00"},
@pytest.mark.parametrize(
["infected", "error"],
[
["10", 'The infected input in occupancy group "group_A" should be a non-negative integer. Got 10.'],
[9.8, 'The infected input in occupancy group "group_A" should be a non-negative integer. Got 9.8.'],
[[10], 'The infected input in occupancy group "group_A" should be a non-negative integer. Got [10].'],
[-1, 'The infected input in occupancy group "group_A" should be a non-negative integer. Got -1.'],
[30, 'The number of infected people (30) cannot be greater than the total people (20).']
]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": 10,
"presence": [{"start_time": "08:00", "finish_time": "18:00"}],
},
"group_2": {
"total_people": 10,
"presence": [{"start_time": "10:00", "finish_time": "11:00"}],
},
"group_3": {
"total_people": 10,
"presence": [{"start_time": "15:00", "finish_time": "18:00"}],
)
def test_occupancy_infected_ValueError(infected, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy = {
"group_A": {
"total_people": 20,
"infected": infected,
"presence": [{"start_time": "08:00", "finish_time": "18:00"},],
},
}
error = (
'Overlap detected: New interaction '
'"{\'total_people\': 10, \'start_time\': \'10:00\', \'finish_time\': \'18:00\'}" '
'overlaps with existing interaction '
'"{\'total_people\': 10, \'start_time\': \'08:00\', \'finish_time\': \'18:00\'}".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
def test_dynamic_exposure_group_duplication(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
def test_occupancy_presence_overlap(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy = {
"group_A": {
"total_people": 10,
"infected": 5,
"presence": [
{"start_time": "08:00", "finish_time": "17:00"},
{"start_time": "13:00", "finish_time": "14:00"},
@ -748,10 +716,10 @@ def test_dynamic_exposure_group_duplication(baseline_form: virus_validator.Virus
},
}
error = (
'Overlap detected: New interaction '
'"{\'start_time\': \'13:00\', \'finish_time\': \'14:00\'}"'
' overlaps with existing interaction '
'"{\'start_time\': \'08:00\', \'finish_time\': \'17:00\'}".'
'Overlap detected: The entry '
'{\'start_time\': \'13:00\', \'finish_time\': \'14:00\'}'
' overlaps with an already existing entry '
'({\'start_time\': \'08:00\', \'finish_time\': \'17:00\'}).'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
@ -760,32 +728,32 @@ def test_dynamic_exposure_group_duplication(baseline_form: virus_validator.Virus
@pytest.mark.parametrize(
["short_range_input", "error"],
[
[[["expiration", "Shouting", "start_time", "09:00", "duration", 30]], 'Each entry in "short_range_interactions" should be a dictionary. Got "<class \'list\'>".'],
[[{"expiratio": "Shouting", "start_time": "09:00", "duration": 30}], 'Missing "expiration" key in "short_range_interactions", "group_1". Got keys: "[\'expiratio\', \'start_time\', \'duration\']".'],
[[{"expiration": "Shouting", "start_tim": "09:00", "duration": 30}], 'Missing "start_time" key in "short_range_interactions", "group_1". Got keys: "[\'expiration\', \'start_tim\', \'duration\']".'],
[[{"expiration": "Shouting", "start_time": "09:00", "duratio": 30}], 'Missing "duration" key in "short_range_interactions", "group_1". Got keys: "[\'expiration\', \'start_time\', \'duratio\']".'],
[[["expiration", "Shouting", "start_time", "09:00", "duration", 30]], 'Each short-range interaction should be a dictionary. Got <class \'list\'> in occupancy group "group_A".'],
[[{"expiratio": "Shouting", "start_time": "09:00", "duration": 30}], 'Missing expiration key in short-range interaction for occupancy group "group_A". Got keys: expiratio, start_time, duration.'],
[[{"expiration": "Shouting", "start_tim": "09:00", "duration": 30}], 'Missing start_time key in short-range interaction for occupancy group "group_A". Got keys: expiration, start_tim, duration.'],
[[{"expiration": "Shouting", "start_time": "09:00", "duratio": 30}], 'Missing duration key in short-range interaction for occupancy group "group_A". Got keys: expiration, start_time, duratio.'],
]
)
def test_short_range_TypeError(short_range_input, error, baseline_form: virus_validator.VirusFormData):
baseline_form.short_range_option = "short_range_yes"
baseline_form.short_range_interactions = {"group_1": short_range_input}
baseline_form.short_range_interactions = {"group_A": short_range_input}
with pytest.raises(TypeError, match=re.escape(error)):
baseline_form.validate()
def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = 'dynamic'
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": 10,
baseline_form.occupancy = {
"group_A": {
"total_people": 20,
"infected": 10,
"presence": [
{"start_time": "10:00", "finish_time": "12:00"},
{"start_time": "13:00", "finish_time": "17:00"},
],
},
"group_2": {
"total_people": 10,
"group_B": {
"total_people": 20,
'infected': 10,
"presence": [
{"start_time": "10:00", "finish_time": "11:00"},
],
@ -795,19 +763,19 @@ def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData
# Check for existence of the dictionary key
baseline_form.short_range_option = 'short_range_yes'
baseline_form.short_range_interactions = {
"group_4": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}],
"group_C": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}],
}
error = 'Exposure group "group_4" in short-range interaction not found in dynamic exposed occupancy.'
error = 'Occupancy group "group_C" referenced in short-range interactions was not found in the occupancy input.'
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Check if interaction time is within simulation time
baseline_form.short_range_interactions = {
"group_1": [{"expiration": "Shouting", "start_time": "18:00", "duration": 30}],
"group_A": [{"expiration": "Shouting", "start_time": "18:00", "duration": 30}],
}
error = (
'Short-range interaction "{\'expiration\': \'Shouting\', \'start_time\': \'18:00\', \'duration\': 30}"'
' does not fall within any long-range exposure interval in "group_1".'
'Short-range interaction {\'expiration\': \'Shouting\', \'start_time\': \'18:00\', \'duration\': 30}'
' does not fall within any presence interval in occupancy group "group_A".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
@ -816,52 +784,82 @@ def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData
@pytest.mark.parametrize(
["short_range_input", "error"],
[
[[{"expiration": "Shouting", "start_time": "9", "duration": 30}], 'Invalid time format for "start_time" in "short_range_interactions", "group_1". Expected "HH:MM". Got "9".'],
[[{"expiration": "Whisper", "start_time": "09:00", "duration": 30}], 'The "expiration" in "short_range_interactions", "group_1", does not exist in the registry. Got "Whisper".'],
[[{"expiration": "Shouting", "start_time": "09:00", "duration": -30}], 'The "duration" in "short_range_interactions", "group_1", should be a non-negative integer. Got "-30".'],
[[{"expiration": "Shouting", "start_time": "9", "duration": 30}], 'Invalid time format for start_time in short-range interaction for occupancy group "group_A". Expected HH:MM, got 9.'],
[[{"expiration": "Whisper", "start_time": "09:00", "duration": 30}], 'Invalid expiration value in short-range interaction for occupancy group "group_A". Got "Whisper".'],
[[{"expiration": "Shouting", "start_time": "09:00", "duration": -30}], 'The duration value in short-range interaction for occupancy group "group_A" should be a non-negative integer. Got -30.'],
]
)
def test_short_range_value_error(short_range_input, error, baseline_form: virus_validator.VirusFormData):
baseline_form.short_range_option = "short_range_yes"
baseline_form.short_range_interactions = {"group_1": short_range_input}
baseline_form.short_range_interactions = {"group_A": short_range_input}
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
def test_short_range_with_static_occupancy(baseline_form: virus_validator.VirusFormData):
# By default the occupancy format is 'static'
def test_short_range_with_occupancy_format(baseline_form: virus_validator.VirusFormData):
baseline_form.short_range_option = "short_range_yes"
baseline_form.short_range_interactions = {"group_1": [{"expiration": "Shouting", "start_time": "07:00", "duration": 30}]}
baseline_form.short_range_interactions = {"group_A": [{"expiration": "Shouting", "start_time": "07:00", "duration": 30}]}
# Check if interaction is defined during simulation time
# Checks if interaction is defined during simulation time
error = (
'Short-range interactions should be defined during simulation time. Got '
'"{\'expiration\': \'Shouting\', \'start_time\': \'07:00\', \'duration\': 30}".'
'Short-range interactions must occur during simulation time. Got'
' {\'expiration\': \'Shouting\', \'start_time\': \'07:00\', \'duration\': 30}'
' in occupancy group "group_A".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Check overlap of interactions
# Checks overlap of short-range interactions
baseline_form.short_range_interactions = {
"group_1": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30},
"group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30},
{"expiration": "Shouting", "start_time": "10:10", "duration": 15}],
}
error = (
'Overlap detected for "short-range interaction": New interaction '
'"{\'expiration\': \'Shouting\', \'start_time\': \'10:10\', \'duration\': 15}"'
' overlaps with existing interaction "{\'expiration\': \'Shouting\', \'start_time\': \'10:00\', \'duration\': 30}".'
'Overlap detected: The entry '
'{\'expiration\': \'Shouting\', \'start_time\': \'10:10\', \'duration\': 15}'
' overlaps with an already existing entry '
'({\'expiration\': \'Shouting\', \'start_time\': \'10:00\', \'duration\': 30}).'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Check if more than one group is defined
baseline_form.short_range_interactions = {
"group_1": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30},
{"expiration": "Shouting", "start_time": "10:10", "duration": 15}],
"group_2": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}]
}
# Checks if short_range_option relates with the short_range-interactions input
baseline_form.short_range_option = "short_range_yes"
baseline_form.short_range_interactions = {}
error = (
'When occupancy format is "static", there should be only one interaction group in "short_range_interactions".'
'When short_range_option input is set to "short_range_yes", the short_range_interactions '
'input should not be empty. Got {}.'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Checks if more than one group is defined (legacy)
baseline_form.short_range_interactions = {
"group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}],
"group_B": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}]
}
error = (
'Incompatible number of occupancy groups in the short_range_interactions input. '
'Got 2 groups when the maximum is 1.'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Checks if more than one group is defined
baseline_form.occupancy = {
"group_A": {"total_people": 20, "infected": 10, "presence": [
{"start_time": "10:00", "finish_time": "12:00"},
{"start_time": "13:00", "finish_time": "17:00"},
],
}
}
baseline_form.short_range_interactions = {
"group_A": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}],
"group_B": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}]
}
error = (
'Incompatible number of occupancy groups in the short_range_interactions input. '
'Got 2 groups when the maximum is 1 (from the occupancy input).'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()

View file

@ -213,7 +213,7 @@ def test_exposure_model_group_structure(data_registry, full_exposure_model: mode
"""
another_full_exposure_model = dc_utils.nested_replace(full_exposure_model,
{'concentration_model.infected.number': 2, })
with pytest.raises(ValueError, match=re.escape("All ExposureModels must have the same ConcentrationModel.")):
with pytest.raises(ValueError, match=re.escape("All ExposureModels must have the same infected number and presence in the ConcentrationModel.")):
models.ExposureModelGroup(data_registry, exposure_models=(full_exposure_model, another_full_exposure_model, ))

View file

@ -122,8 +122,8 @@ class VirusReportGenerator:
model: models.ExposureModel = report_data['model']
data_registry_version: typing.Optional[str] = f"v{model.data_registry.version}" if model.data_registry.version else None
# Alternative scenarios data
if form.occupancy_format == 'static':
# Alternative scenarios data (only generated in the legacy version - when occupancy input is empty)
if not form.occupancy:
context.update(alternative_scenarios_data(form, report_data, executor_factory))
# Alternative viral load data

View file

@ -1,8 +1,6 @@
// Input data for CO2 fitting algorithm
const CO2_data_form = [
"CO2_data",
"dynamic_exposed_occupancy",
"dynamic_infected_occupancy",
"exposed_coffee_break_option",
"exposed_coffee_duration",
"exposed_finish",
@ -20,7 +18,7 @@ const CO2_data_form = [
"infected_lunch_start",
"infected_people",
"infected_start",
"occupancy_format",
"occupancy",
"room_capacity",
"room_volume",
"specific_breaks",

View file

@ -1238,7 +1238,7 @@ $(document).ready(function () {
<div class='form-group row d-none'>
<div class="col-sm-6"><label class="col-form-label col-form-label-sm"> Exposure group:</label></div>
<div class="col-sm-6"><input type="text" id="sr_group_no_${index}" value="${value.exposure_group}" class="form-control form-control-sm short_range_option" name="short_range_exposure_group" placeholder="A" onchange="validate_sr_time(this)" form="not-submitted"><br></div>
<div class="col-sm-6"><input type="text" id="sr_group_no_${index}" value="${value.exposure_group}" class="form-control form-control-sm short_range_option" name="short_range_exposure_group" placeholder="group_1" onchange="validate_sr_time(this)" form="not-submitted"><br></div>
</div>
<div class="form-group" style="max-width: 8rem">
@ -1258,11 +1258,11 @@ $(document).ready(function () {
// When short_range_yes option is selected, we want to inject rows for each expiractory activity, start_time and duration.
$("body").on("click", ".add_node_btn_frm_field", function(e) {
let last_row = $(".form_field_outer").find(".form_field_outer_row");
if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15", exposure_group: "A" }));
if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15", exposure_group: "group_1" }));
else {
last_index = last_row.last().find(".short_range_option").prop("id").split("_").slice(-1)[0];
index = parseInt(last_index) + 1;
$("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15", exposure_group: "A"}));
$("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15", exposure_group: "group_1"}));
}
});

View file

@ -454,9 +454,7 @@
<span class="tooltip_text">?</span>
</div><br>
<input type="text" class="form-control d-none" name="occupancy_format" value="static" required> {# "static" vs. "dynamic" #}
<input type="text" class="form-control d-none" name="dynamic_exposed_occupancy" value="{}">
<input type="text" class="form-control d-none" name="dynamic_infected_occupancy" value="[]">
<input type="text" class="form-control d-none" name="occupancy" value="{}">
<div class="form-group row">
<div class="col-sm-4"><label class="col-form-label">Total number of occupants:</label></div>

View file

@ -72,7 +72,7 @@
{% block report_results scoped %}
<div class="card bg-light mb-3" id="results-div">
<div class="card-header"><strong>Results{% if form.occupancy_format == 'dynamic' %} - {{ group_id }}{% endif %}</strong>
<div class="card-header"><strong>Results{% if form.occupancy == {} %} - {{ group_id }}{% endif %}</strong>
<button class="icon_button p-0 float-right" data-toggle="collapse" href="#collapseResults-group_{{ group_id }}" role="button" aria-expanded="true" aria-controls="collapseResults">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-chevron-expand" viewBox="0 0 16 16">
<path fill-rule="evenodd" d="M3.646 9.146a.5.5 0 0 1 .708 0L8 12.793l3.646-3.647a.5.5 0 0 1 .708.708l-4 4a.5.5 0 0 1-.708 0l-4-4a.5.5 0 0 1 0-.708zm0-2.292a.5.5 0 0 0 .708 0L8 3.207l3.646 3.647a.5.5 0 0 0 .708-.708l-4-4a.5.5 0 0 0-.708 0l-4 4a.5.5 0 0 0 0 .708z"/>
@ -147,7 +147,7 @@
{% if form.exposure_option == "p_probabilistic_exposure" %}
<br>
<div class="align-self-center alert alert-dark mb-0" role="alert">
{% if form.occupancy_format == "static" %}
{% if form.occupancy == {} %}
The above {{ "result assumes" if form.short_range_option == "short_range_no" else "results assume" }} that <b>{{ form.infected_people }}
{{ "occupant is infected" if form.infected_people == 1 else "occupants are infected" }}
</b> in the room.
@ -204,7 +204,7 @@
</div>
</div>
<div class="card bg-light mb-3" id="results-div">
<div class="card-header"><strong>Result uncertainties{% if form.occupancy_format == 'dynamic' %} - {{ group_id }}{% endif %}</strong>
<div class="card-header"><strong>Result uncertainties{% if form.occupancy == {} %} - {{ group_id }}{% endif %}</strong>
<button class="icon_button p-0 float-right" data-toggle="collapse" href="#collapseUncertainties-group_{{ group_id }}" role="button" aria-expanded="true" aria-controls="collapseUncertainties">
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-chevron-expand" viewBox="0 0 16 16">
<path fill-rule="evenodd" d="M3.646 9.146a.5.5 0 0 1 .708 0L8 12.793l3.646-3.647a.5.5 0 0 1 .708.708l-4 4a.5.5 0 0 1-.708 0l-4-4a.5.5 0 0 1 0-.708zm0-2.292a.5.5 0 0 0 .708 0L8 3.207l3.646 3.647a.5.5 0 0 0 .708-.708l-4-4a.5.5 0 0 0-.708 0l-4 4a.5.5 0 0 0 0 .708z"/>
@ -242,7 +242,7 @@
</div>
</div>
</div>
{% if form.short_range_option == "short_range_no" and form.occupancy_format == "static" %}
{% if form.short_range_option == "short_range_no" and form.occupancy == {} %}
<div class="card bg-light mb-3">
<div class="card-header"><strong>Alternative scenarios</strong>
<button class="icon_button p-0 float-right" data-toggle="collapse" href="#collapseAlternativeScenarios" role="button" aria-expanded="false" aria-controls="collapseAlternativeScenarios">
@ -353,7 +353,7 @@
<input type="text" class="form-control form-control-sm col-sm-4 ml-2" id="CO2_Concentration__rename" placeholder="CO₂ Concentration" value="CO₂ Concentration">
</div>
</div>
{% if form.short_range_option == "short_range_no" and form.occupancy_format == "static" %}
{% if form.short_range_option == "short_range_no" and form.occupancy == {} %}
</li>
<li>
Alternative Scenarios

View file

@ -11,7 +11,7 @@
{% set long_range_prob_inf = group_results.prob_inf %}
{% endif %}
{% if ((long_range_prob_inf > red_prob_lim) or (form.occupancy_format == "static" and group_results.expected_new_cases >= 1)) %}
{% if ((long_range_prob_inf > red_prob_lim) or (form.occupancy == {} and group_results.expected_new_cases >= 1)) %}
{% set long_range_scale_warning = 'red' %}
{% set long_range_warning_color= 'bg-danger' %}
{% elif (orange_prob_lim <= long_range_prob_inf <= red_prob_lim) %}
@ -22,7 +22,7 @@
{% set long_range_warning_color = 'bg-success' %}
{% endif %}
{% if ((group_results.prob_inf > red_prob_lim) or (form.occupancy_format == "static" and group_results.expected_new_cases >= 1)) %} {% set scale_warning = 'red' %}
{% if ((group_results.prob_inf > red_prob_lim) or (form.occupancy == {} and group_results.expected_new_cases >= 1)) %} {% set scale_warning = 'red' %}
{% elif (orange_prob_lim <= group_results.prob_inf <= red_prob_lim) %} {% set scale_warning = 'orange' %}
{% else %} {% set scale_warning = 'green' %}
{% endif %}
@ -139,12 +139,12 @@
<tr>
<th>Scenario</th>
<th>P(i)</th>
{% if form.occupancy_format == "static" %}<th>Expected new cases</th>{% endif %}
{% if form.occupancy == {} %}<th>Expected new cases</th>{% endif %}
</tr>
</thead>
<tbody>
{% for scenario_name, scenario_stats in alternative_scenarios.stats.items() %}
{%if (( scenario_stats.probability_of_infection > red_prob_lim) or (form.occupancy_format == "static" and scenario_stats.expected_new_cases >= 1)) %}
{%if (( scenario_stats.probability_of_infection > red_prob_lim) or (form.occupancy == {} and scenario_stats.expected_new_cases >= 1)) %}
<tr class="alert-danger">
{% elif (orange_prob_lim <= scenario_stats.probability_of_infection <= red_prob_lim) %}
<tr class="alert-warning">
@ -153,7 +153,7 @@
{% endif%}
<td> {{ scenario_name }}</td>
<td> {{ scenario_stats.probability_of_infection | non_zero_percentage }}</td>
{% if form.occupancy_format == "static" %}<td style="text-align:right">{{ scenario_stats.expected_new_cases | float_format }}</td>{% endif %}
{% if form.occupancy == {} %}<td style="text-align:right">{{ scenario_stats.expected_new_cases | float_format }}</td>{% endif %}
</tr>
{% endfor %}
</tbody>

View file

@ -90,6 +90,6 @@ def exposure_model_w_outside_temp_changes(data_registry, baseline_exposure_model
def baseline_form_with_sr(baseline_form_data, data_registry):
form_data_sr = baseline_form_data
form_data_sr['short_range_option'] = 'short_range_yes'
form_data_sr['short_range_interactions'] = '{"static": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}]}'
form_data_sr['short_range_interactions'] = '{"group_1": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}]}'
form_data_sr['short_range_occupants'] = 5
return virus_validator.VirusFormData.from_dict(form_data_sr, data_registry)

View file

@ -112,8 +112,8 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData):
# Short- and Long-range contributions
report_data = rep_gen.calculate_report_data(baseline_form_with_sr, executor_factory)
sr_lr_expected_new_cases = report_data['groups']['static']['expected_new_cases']
sr_lr_prob_inf = report_data['groups']['static']['prob_inf']/100
sr_lr_expected_new_cases = report_data['groups']['group_1']['expected_new_cases']
sr_lr_prob_inf = report_data['groups']['group_1']['prob_inf']/100
# Long-range contributions alone
alternative_scenarios = rep_gen.manufacture_alternative_scenarios(baseline_form_with_sr)
@ -123,3 +123,27 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData):
lr_expected_new_cases = alternative_statistics['stats']['Base scenario without short-range interactions']['expected_new_cases']
np.testing.assert_almost_equal(sr_lr_expected_new_cases, lr_expected_new_cases + sr_lr_prob_inf * baseline_form_with_sr.short_range_occupants, 2)
def test_alternative_scenarios(baseline_form):
"""
Tests if the alternative scenarios are only generated when
the occupancy input is empty ({}) - legacy usage.
"""
generator: VirusReportGenerator = make_app().settings['report_generator']
report_data = generator.prepare_context("", baseline_form, partial(
concurrent.futures.ThreadPoolExecutor, 1,
))
assert "alternative_scenarios" in report_data.keys()
baseline_form.occupancy = {
"group_A": {
"total_people": 10,
"infected": 5,
"presence": [{"start_time": "10:00", "finish_time": "11:00"}]
}
}
report_data = generator.prepare_context("", baseline_form, partial(
concurrent.futures.ThreadPoolExecutor, 1,
))
assert "alternative_scenarios" not in report_data.keys()