Added dynamic groups in backend and propagated required changes to requests

- Correction of typos

- Changes do model to accommodate group of exposure models

- Handled virus validator to accommodate group of exposure models

- Modifications to accomodate generation of results for exposure model group

- Added data registry attribute to exposuremodelgroups class

- Fixed test for dynamic models

- Defined ExposureModelGroup with required methods

- Adapted virus report data to generate results for groups of exposed population

- Build same distributions for different models

- Fixed bug with defaults

- Handled definition of a single ExposureModel root obj when only one group is defined

- Added short-range expirations per group

- Added full validation on short_range interactions with dynamic exposure model

- Added full set of tests

- Updated docstrings

- Added support for number of people in exposed population, which should be identical within each group

- Added type checks

- Added UI adjustments for expected new cases

- Adapted exposed population data format to accommodate constant total_people
This commit is contained in:
lrdossan 2024-10-03 16:44:28 +02:00
parent c50f9b4ad7
commit cd3df90c4c
16 changed files with 1118 additions and 475 deletions

View file

@ -799,7 +799,7 @@ Activity.types = {
@dataclass(frozen=True)
class SimplePopulation:
"""
Represents a group of people all with exactly the same behaviour and
Represents a group of people all with exactly the same behavior and
situation.
"""
@ -844,7 +844,7 @@ class SimplePopulation:
@dataclass(frozen=True)
class Population(SimplePopulation):
"""
Represents a group of people all with exactly the same behaviour and
Represents a group of people all with exactly the same behavior and
situation, considering the usage of mask and a certain host immunity.
"""
@ -1324,6 +1324,9 @@ class ShortRangeModel:
#: Interpersonal distances
distance: _VectorisedFloat
#: Expiration definition
expiration_def: typing.Optional[str] = None
def dilution_factor(self) -> _VectorisedFloat:
'''
The dilution factor for the respective expiratory activity type.
@ -1653,6 +1656,9 @@ class ExposureModel:
In other words, the air exchange rate from the
ventilation, and the virus decay constant, must
not be given as arrays.
It also checks that the number of exposed is
static during the simulation time.
"""
c_model = self.concentration_model
# Check if the diameter is vectorised.
@ -1663,6 +1669,11 @@ class ExposureModel:
c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))):
raise ValueError("If the diameter is an array, none of the ventilation parameters "
"or virus decay constant can be arrays at the same time.")
# Check if exposed population is static
if not isinstance(self.exposed.number, int) or not isinstance(self.exposed.presence, Interval):
raise TypeError("The exposed number must be an int and presence an Interval. "
f"Got {type(self.exposed.number)} and {type(self.exposed.presence)}.")
@method_cache
def population_state_change_times(self) -> typing.List[float]:
@ -1809,11 +1820,9 @@ class ExposureModel:
The number of virus per m^3 deposited on the respiratory tract.
"""
population_change_times = self.population_state_change_times()
deposited_exposure = []
for start, stop in zip(population_change_times[:-1], population_change_times[1:]):
deposited_exposure.append(self.deposited_exposure_between_bounds(start, stop))
return deposited_exposure
def deposited_exposure(self) -> _VectorisedFloat:
@ -1838,8 +1847,7 @@ class ExposureModel:
return (1 - np.prod([1 - prob for prob in self._infection_probability_list()], axis = 0)) * 100
def total_probability_rule(self) -> _VectorisedFloat:
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or
isinstance(self.exposed.number, IntPiecewiseConstant)):
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant)):
raise NotImplementedError("Cannot compute total probability "
"(including incidence rate) with dynamic occupancy")
@ -1847,9 +1855,9 @@ class ExposureModel:
sum_probability = 0.0
# Create an equivalent exposure model but changing the number of infected cases.
total_people = self.concentration_model.infected.number + self.exposed.number
total_people = self.concentration_model.infected.number + self.exposed.number # type: ignore
max_num_infected = (total_people if total_people < 10 else 10)
# The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability.
# The influence of a higher number of simultaneous infected people (> 4 - 5) yields an almost negligible contribution to the total probability.
# To be on the safe side, a hard coded limit with a safety margin of 2x was set.
# Therefore we decided a hard limit of 10 infected people.
for num_infected in range(1, max_num_infected + 1):
@ -1872,43 +1880,88 @@ class ExposureModel:
1) Long-range exposure: take the infection_probability and multiply by the occupants exposed to long-range.
2) Short- and long-range exposure: take the infection_probability of long-range multiplied by the occupants exposed to long-range only,
plus the infection_probability of short- and long-range multiplied by the occupants exposed to short-range only.
Currently disabled when dynamic occupancy is defined for the exposed population.
"""
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or
isinstance(self.exposed.number, IntPiecewiseConstant)):
raise NotImplementedError("Cannot compute expected new cases "
"with dynamic occupancy")
number = self.exposed.number
if self.short_range != ():
new_cases_long_range = nested_replace(self, {'short_range': [],}).infection_probability() * (self.exposed.number - self.exposed_to_short_range)
new_cases_long_range = nested_replace(self, {'short_range': [],}).infection_probability() * (number - self.exposed_to_short_range) # type: ignore
return (new_cases_long_range + (self.infection_probability() * self.exposed_to_short_range)) / 100
return self.infection_probability() * self.exposed.number / 100
return self.infection_probability() * number / 100
def reproduction_number(self) -> _VectorisedFloat:
"""
The reproduction number can be thought of as the expected number of
cases directly generated by one infected case in a population.
Currently disabled when dynamic occupancy is defined for both the infected and exposed population.
"""
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or
isinstance(self.exposed.number, IntPiecewiseConstant)):
raise NotImplementedError("Cannot compute reproduction number "
"with dynamic occupancy")
if self.concentration_model.infected.number == 1:
infected_population: InfectedPopulation = self.concentration_model.infected
if isinstance(infected_population.number, int) and infected_population.number == 1:
return self.expected_new_cases()
# Create an equivalent exposure model but with precisely
# one infected case.
# one infected case, respecting the presence interval.
single_exposure_model = nested_replace(
self, {
'concentration_model.infected.number': 1}
'concentration_model.infected.number': 1,
'concentration_model.infected.presence': infected_population.presence_interval(),
}
)
return single_exposure_model.expected_new_cases()
@dataclass(frozen=True)
class ExposureModelGroup:
"""
Represents a group of exposure models. This is to handle the case
when different groups of people come and go in the room at different
times. These groups are then handled fully independently, with
exposure dose and probability of infection defined for each of them.
"""
data_registry: DataRegistry
#: The set of exposure models for each exposed population
exposure_models: typing.Tuple[ExposureModel, ...]
@method_cache
def _deposited_exposure_list(self) -> typing.List[_VectorisedFloat]:
"""
List of doses absorbed by each member of the groups.
"""
return [model.deposited_exposure() for model in self.exposure_models]
@method_cache
def _infection_probability_list(self):
"""
List of the probability of infection for each group.
"""
return [model.infection_probability() for model in self.exposure_models] # type: ignore
def expected_new_cases(self) -> _VectorisedFloat:
"""
Final expected number of new cases considering the
contribution of each individual probability of infection.
"""
return np.sum([model.expected_new_cases() for model in self.exposure_models], axis=0) # type: ignore
def reproduction_number(self) -> _VectorisedFloat:
"""
Reproduction number considering the contribution
of each individual probability of infection and
a single infected occupant.
"""
single_exposure_models = []
for model in self.exposure_models:
if model.concentration_model.infected.number != 1:
model = nested_replace(
self, {
'model.concentration_model.infected.number': 1
}
)
single_exposure_models.append(model)
single_exposure_model_group = nested_replace(
self, {
'exposure_models': single_exposure_models,
}
)
return single_exposure_model_group.expected_new_cases()

View file

@ -12,13 +12,37 @@ from caimira.calculator.validators.virus.virus_validator import VirusFormData
def model_start_end(model: models.ExposureModel):
"""
Calculates the start and end times for a single ExposureModel.
Determines the boundary times of an ExposureModel by comparing
the presence intervals of both the exposed and the infected people.
"""
t_start = min(model.exposed.presence_interval().boundaries()[0][0],
model.concentration_model.infected.presence_interval().boundaries()[0][0])
t_end = max(model.exposed.presence_interval().boundaries()[-1][1],
model.concentration_model.infected.presence_interval().boundaries()[-1][1])
return t_start, t_end
def model_boundary_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel]):
"""
Calculates the boundary times for an ExposureModel or ExposureModelGroup.
For a single ExposureModel, determines its boundary times by comparing
the presence intervals of both the exposed and the infected people.
For an ExposureModelGroup, finds the earliest start time and the latest end time
across all models in the group.
"""
if isinstance(model, models.ExposureModelGroup):
t_start = min((model_start_end(nth_model)[0] for nth_model in model.exposure_models))
t_end = max((model_start_end(nth_model)[1] for nth_model in model.exposure_models))
return t_start, t_end
else:
return model_start_end(model)
def fill_big_gaps(array, gap_size):
"""
Insert values into the given sorted list if there is a gap of more than ``gap_size``.
@ -42,7 +66,7 @@ def fill_big_gaps(array, gap_size):
return result
def non_temp_transition_times(model: models.ExposureModel):
def non_temp_transition_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel]):
"""
Return the non-temperature (and PiecewiseConstant) based transition times.
@ -60,10 +84,10 @@ def non_temp_transition_times(model: models.ExposureModel):
else:
yield name, obj
t_start, t_end = model_start_end(model)
t_start, t_end = model_boundary_times(model)
change_times = {t_start, t_end}
for name, obj in walk_model(model, name="exposure"):
for _, obj in walk_model(model, name="exposure"):
if isinstance(obj, models.Interval):
change_times |= obj.transition_times()
@ -72,7 +96,8 @@ def non_temp_transition_times(model: models.ExposureModel):
return sorted(time for time in change_times if (t_start <= time <= t_end))
def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
def interesting_times(model: typing.Union[models.ExposureModelGroup, models.ExposureModel],
approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
"""
Pick approximately ``approx_n_pts`` time points which are interesting for the
given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times
@ -94,113 +119,195 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional
return nice_times
def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
def process_short_range_interactions(model: typing.Union[models.ExposureModelGroup, models.ExposureModel],
times: typing.List[float]):
"""
Process both ExposureModel and ExposureModelGroup for short-range
expirations, intervals and concentrations. Returns a tuple containing
lower concentrations, short-range expirations, and short-range intervals.
"""
if isinstance(model, models.ExposureModelGroup):
model_list = model.exposure_models
elif isinstance(model, models.ExposureModel):
model_list = (model,)
else:
raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'")
# Collect short-range expirations and intervals
short_range_expirations: typing.List[str] = []
short_range_intervals: typing.List[models.BoundarySequence_t] = []
for model in model_list:
for short_range_model in model.short_range:
short_range_expirations.append(short_range_model.expiration_def) # type: ignore
short_range_intervals.extend(short_range_model.presence.boundaries())
# Collect lower concentrations (including Breathing)
lower_concentrations = []
for time in times:
for index, (start, stop) in enumerate(short_range_intervals):
# For visualization issues, add short-range breathing activity to the initial long-range concentrations
if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing':
lower_concentrations.append(
np.array(model.concentration(float(time))).mean())
breathing_found = False
for model in model_list:
for short_range_model in model.short_range:
((start, stop),) = short_range_model.presence.boundaries()
# Check if the expiration is "Breathing" and the if time is within boundaries
if short_range_model.expiration_def == 'Breathing' and (start <= time <= stop):
lower_concentrations.append(np.sum([np.array(model.concentration(float(time))).mean() for model in model_list]))
breathing_found = True
break
if breathing_found:
break
lower_concentrations.append(
np.array(model.concentration_model.concentration(float(time))).mean())
return lower_concentrations
lower_concentrations.append(np.sum([np.array(model.concentration_model.concentration(float(time))).mean() for model in model_list]))
return lower_concentrations, short_range_expirations, short_range_intervals
def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_deposited_exposure(model: typing.Union[models.ExposureModelGroup, models.ExposureModel],
time1: float, time2: float, fn_name: typing.Optional[str] = None):
if isinstance(model, models.ExposureModelGroup):
return np.sum([np.array(nth_model.deposited_exposure_between_bounds(float(time1), float(time2))).mean() for nth_model in model.exposure_models]), fn_name
else:
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_long_range_deposited_exposure(model: typing.Union[models.ExposureModelGroup, models.ExposureModel],
time1: float, time2: float, fn_name: typing.Optional[str] = None):
if isinstance(model, models.ExposureModelGroup):
return np.sum([np.array(nth_model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean() for nth_model in model.exposure_models]), fn_name
else:
return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_co2_concentration(CO2_model, time, fn_name=None):
def _calculate_concentration(model: typing.Union[models.ExposureModelGroup, models.ExposureModel],
time: float, fn_name: typing.Optional[str] = None):
if isinstance(model, models.ExposureModelGroup):
return np.sum([np.array(nth_model.concentration(float(time))).mean() for nth_model in model.exposure_models]), fn_name
else:
return np.array(model.concentration(float(time))).mean(), fn_name
def _calculate_co2_concentration(CO2_model: models.CO2ConcentrationModel, time: float, fn_name: typing.Optional[str] = None):
return np.array(CO2_model.concentration(float(time))).mean(), fn_name
@profiler.profile
def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
model: models.ExposureModel = form.build_model()
"""
General output data of a test scenario.
"""
model: typing.Union[models.ExposureModel, models.ExposureModelGroup] = form.build_model()
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0]
for interaction in model.short_range]
short_range_expirations = [interaction['expiration']
for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
if isinstance(model, models.ExposureModelGroup):
exposed_presence_intervals = []
probabilities_of_infection = []
for nth_model in model.exposure_models:
exposed_presence_intervals.extend(list(nth_model.exposed.presence_interval().boundaries()))
probabilities_of_infection.append(nth_model.infection_probability())
index_of_max_mean = max(
range(len(probabilities_of_infection)),
key=lambda i: probabilities_of_infection[i].mean()
)
probability_of_infection = probabilities_of_infection[index_of_max_mean]
elif isinstance(model, models.ExposureModel):
exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()]
probability_of_infection = model.infection_probability()
else:
raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'")
# Handle short-range related outputs
lower_concentrations, short_range_expirations, short_range_intervals = None, None, None
# Short-range related data:
if (form.short_range_option == "short_range_yes"):
lower_concentrations, short_range_expirations, short_range_intervals = process_short_range_interactions(model, times)
# Probability of infection
prob = probability_of_infection
prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
concentrations = [
np.array(model.concentration(float(time))).mean()
for time in times
]
lower_concentrations = concentrations_with_sr_breathing(
form, model, times, short_range_intervals)
# Expected new cases
expected_new_cases = np.array(model.expected_new_cases()).mean()
# Expected number of new cases per group
# expected_new_cases_per_group = [np.array(model.expected_new_cases()).mean() for model in models_set.exposure_models]
# CO2 concentration
CO2_model: models.CO2ConcentrationModel = form.build_CO2_model()
# compute deposited exposures and CO2 concentrations in parallel to increase performance
# Compute deposited exposures and virus/CO2 concentrations in parallel to increase performance
deposited_exposures = []
long_range_deposited_exposures = []
CO2_concentrations = []
concentrations = []
tasks = []
with executor_factory() as executor:
with executor_factory() as executor: # TODO: parallelism in the models
for time1, time2 in zip(times[:-1], times[1:]):
tasks.append(executor.submit(
_calculate_deposited_exposure, model, time1, time2, fn_name="de"))
tasks.append(executor.submit(
_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
# co2 concentration: takes each time as param, not the interval
_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="de_lr"))
tasks.append(executor.submit(
_calculate_concentration, model, time1, fn_name="cn"))
# virus and co2 concentration: takes each time as param, not the interval
tasks.append(executor.submit(
_calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
# co2 concentration: calculate the last time too
# virus and co2 concentration: calculate the last time too
tasks.append(executor.submit( _calculate_concentration,
model, times[-1], fn_name="cn"))
tasks.append(executor.submit(_calculate_co2_concentration,
CO2_model, times[-1], fn_name="co2"))
CO2_model, times[-1], fn_name="co2"))
for task in tasks:
result, fn_name = task.result()
if fn_name == "de":
deposited_exposures.append(result)
elif fn_name == "lr":
elif fn_name == "de_lr":
long_range_deposited_exposures.append(result)
elif fn_name == "cn":
concentrations.append(result)
elif fn_name == "co2":
CO2_concentrations.append(result)
cumulative_doses = np.cumsum(deposited_exposures)
long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)
prob = np.array(model.infection_probability())
prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
# Probabilistic exposure and expected new cases (only for static occupancy)
prob_probabilistic_exposure = None
expected_new_cases = None
if form.occupancy_format == "static":
if form.exposure_option == "p_probabilistic_exposure":
prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
expected_new_cases = np.array(model.expected_new_cases()).mean()
exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()]
if isinstance(model, models.ExposureModel) and form.exposure_option == "p_probabilistic_exposure":
prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
conditional_probability_data = None
uncertainties_plot_src = None
if (form.conditional_probability_viral_loads and
model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore
# Generate all the required data for the conditional probability plot
conditional_probability_data = manufacture_conditional_probability_data(
model, prob)
# Generate the matplotlib image based on the received data
uncertainties_plot_src = img2base64(_figure2bytes(
uncertainties_plot(prob, conditional_probability_data)))
if form.conditional_probability_viral_loads:
if isinstance(model, models.ExposureModelGroup):
all_the_same_virus = True
for nth_model in model.exposure_models:
if nth_model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] != ViralLoads.COVID_OVERALL.value: # type: ignore
all_the_same_virus = False
if all_the_same_virus:
# Given the similarities, pick the first exposure model
the_model: models.ExposureModel = model.exposure_models[0]
# Generate all the required data for the conditional probability plot
conditional_probability_data = manufacture_conditional_probability_data(the_model, prob)
# Generate the matplotlib image based on the received data
uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data)))
elif isinstance(model, models.ExposureModel):
if model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] != ViralLoads.COVID_OVERALL.value: # type: ignore
# Generate all the required data for the conditional probability plot
conditional_probability_data = manufacture_conditional_probability_data(model, prob)
# Generate the matplotlib image based on the received data
uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data)))
else:
raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'")
return {
"model": model,
"model": model.exposure_models[0] if isinstance(model, models.ExposureModelGroup) else model, # TODO: which model do we want to show info about?
"times": list(times),
"exposed_presence_intervals": exposed_presence_intervals,
"short_range_intervals": short_range_intervals,
"short_range_expirations": short_range_expirations,
"concentrations": concentrations,
"concentrations": list(concentrations),
"concentrations_zoomed": lower_concentrations,
"cumulative_doses": list(cumulative_doses),
"long_range_cumulative_doses": list(long_range_cumulative_doses),
@ -349,7 +456,7 @@ def calculate_vl_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[s
}
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, typing.Union[mc.ExposureModel, mc.ExposureModelGroup]]:
scenarios = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
@ -401,45 +508,60 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, m
scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'):
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
else:
# When dynamic occupancy is defined, the replace of total people is useless - the expected number of new cases is not calculated.
# Adjust the number of exposed people with long-range exposure based on short-range interactions
if form.occupancy_format == 'static':
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], total_people=form.total_people - form.short_range_occupants)
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, total_people=form.total_people - form.short_range_occupants)
elif form.occupancy_format == 'dynamic':
for occ in form.dynamic_exposed_occupancy: # Update the number of exposed people with long-range exposure
if occ['total_people'] > form.short_range_occupants: occ['total_people'] = max(0, occ['total_people'] - form.short_range_occupants)
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], dynamic_exposed_occupancy=form.dynamic_exposed_occupancy)
for group_name, occupancy_list in form.dynamic_exposed_occupancy.items():
# Check if the group exists in short-range interactions
if group_name in form.short_range_interactions:
short_range_count = form.short_range_occupants
total_people = occupancy_list['total_people']
if total_people > short_range_count > 0:
# Update the total_people with the adjusted value
occupancy_list['total_people'] = max(0, total_people - short_range_count)
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions={}, dynamic_exposed_occupancy=form.dynamic_exposed_occupancy)
scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
return scenarios
def scenario_statistics(
mc_model: mc.ExposureModel,
mc_model: typing.Union[mc.ExposureModel, mc.ExposureModelGroup],
sample_times: typing.List[float],
static_occupancy: bool,
compute_prob_exposure: bool,
):
model = mc_model.build_model(
model: typing.Union[models.ExposureModelGroup, models.ExposureModel] = mc_model.build_model(
size=mc_model.data_registry.monte_carlo['sample_size'])
return {
'probability_of_infection': np.mean(model.infection_probability()),
'expected_new_cases': np.mean(model.expected_new_cases()) if static_occupancy else None,
'concentrations': [
if isinstance(model, models.ExposureModelGroup):
concentrations = [
np.sum([np.array(nth_model.concentration(float(time))).mean() for nth_model in model.exposure_models])
for time in sample_times
]
prob = np.max([np.mean(nth_model.infection_probability()) for nth_model in model.exposure_models])
elif isinstance(model, models.ExposureModel):
concentrations = [
np.mean(model.concentration(time))
for time in sample_times
],
'prob_probabilistic_exposure': model.total_probability_rule() if compute_prob_exposure else None,
]
prob = np.mean(model.infection_probability())
else:
raise TypeError(f"Model should be either an instance of ExposureModel or ExposureModelGroup. Got '{type(model)}.'")
return {
'probability_of_infection': prob,
'expected_new_cases': np.mean(model.expected_new_cases()),
'concentrations': concentrations,
'prob_probabilistic_exposure': model.total_probability_rule() if isinstance(model, models.ExposureModel) and compute_prob_exposure else None
}
def comparison_report(
form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModel],
scenarios: typing.Dict[str, typing.Union[mc.ExposureModel, mc.ExposureModelGroup]],
executor_factory: typing.Callable[[], concurrent.futures.Executor],
):
if (form.short_range_option == "short_range_no"):
@ -461,7 +583,6 @@ def comparison_report(
scenario_statistics,
scenarios.values(),
[report_data['times']] * len(scenarios),
[static_occupancy] * len(scenarios),
[compute_prob_exposure] * len(scenarios),
timeout=60,
)

View file

@ -28,8 +28,8 @@ class CO2FormData(FormData):
# and the defaults in any html form must not be contradictory.
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = {
'CO2_data': '{}',
'dynamic_exposed_occupancy': '{}',
'dynamic_infected_occupancy': '[]',
'dynamic_exposed_occupancy': '[]',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
'exposed_finish': '17:30',
@ -74,7 +74,7 @@ class CO2FormData(FormData):
raise TypeError(f'The room capacity should be a valid integer (> 0). Got {self.room_capacity}.')
# Validate specific inputs - breaks (exposed and infected)
if self.specific_breaks != {}:
if self.specific_breaks != {} and self.occupancy_format == 'static':
if type(self.specific_breaks) is not dict:
raise TypeError('The specific breaks should be in a dictionary.')
@ -188,11 +188,6 @@ class CO2FormData(FormData):
return img2base64(_figure2bytes(fig)), vent_plot_data
def population_present_changes(self, infected_presence: models.Interval, exposed_presence: models.Interval) -> typing.List[float]:
state_change_times = set(infected_presence.transition_times())
state_change_times.update(exposed_presence.transition_times())
return sorted(state_change_times)
def ventilation_transition_times(self) -> typing.Tuple[float]:
'''
Check if the last time from the input data is
@ -207,45 +202,16 @@ class CO2FormData(FormData):
return tuple(vent_states)
def build_model(self, sample_size = None) -> models.CO2DataModel:
# Build a simple infected and exposed population for the case when presence
# intervals and number of people are dynamic. Activity type is not needed.
if self.occupancy_format == 'dynamic':
if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0:
infected_people = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy)
infected_presence = None
else:
raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".')
if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0:
exposed_people = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy)
exposed_presence = None
else:
raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".')
else:
infected_people = self.infected_people
exposed_people = self.total_people - self.infected_people
infected_presence = self.infected_present_interval()
exposed_presence = self.exposed_present_interval()
infected_population = models.SimplePopulation(
number=infected_people,
presence=infected_presence,
activity=None, # type: ignore
)
exposed_population=models.SimplePopulation(
number=exposed_people,
presence=exposed_presence,
activity=None, # type: ignore
)
all_state_changes=self.population_present_changes(infected_population.presence_interval(),
exposed_population.presence_interval())
total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop)
for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])]
"""
Builds a CO2 data model that considers data
from the defined population groups.
"""
occupancy = self.build_CO2_piecewise()
return models.CO2DataModel(
data_registry=self.data_registry,
room=models.Room(volume=self.room_volume, capacity=self.room_capacity),
occupancy=models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)),
occupancy=occupancy,
ventilation_transition_times=self.ventilation_transition_times(),
times=self.CO2_data['times'],
CO2_concentrations=self.CO2_data['CO2'],

View file

@ -16,7 +16,7 @@ DEFAULTS = {
'calculator_version': NO_DEFAULT,
'ceiling_height': 0.,
'conditional_probability_viral_loads': False,
'dynamic_exposed_occupancy': '[]',
'dynamic_exposed_occupancy': '{}',
'dynamic_infected_occupancy': '[]',
'event_month': 'January',
'exposed_coffee_break_option': 'coffee_break_0',
@ -56,7 +56,7 @@ DEFAULTS = {
'room_number': NO_DEFAULT,
'room_volume': 0.,
'sensor_in_use': '',
'short_range_interactions': '[]',
'short_range_interactions': '{}',
'short_range_occupants': 0,
'short_range_option': 'short_range_no',
'simulation_name': NO_DEFAULT,
@ -82,7 +82,8 @@ DEFAULTS = {
# ------------------ Validation ----------------------
COFFEE_OPTIONS_INT = {'coffee_break_0': 0, 'coffee_break_1': 1,
'coffee_break_2': 2, 'coffee_break_4': 4}
'coffee_break_2': 2, 'coffee_break_3': 3,
'coffee_break_4': 4}
CONFIDENCE_LEVEL_OPTIONS = {'confidence_low': 10,
'confidence_medium': 5, 'confidence_high': 2}
MECHANICAL_VENTILATION_TYPES = {

View file

@ -6,6 +6,7 @@ import ast
import json
import re
from collections import defaultdict
import numpy as np
from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT
@ -42,7 +43,7 @@ class FormData:
total_people: int
# Dynamic occupancy inputs
dynamic_exposed_occupancy: list
dynamic_exposed_occupancy: dict
dynamic_infected_occupancy: list
data_registry: DataRegistry
@ -96,8 +97,198 @@ class FormData:
if default is not NO_DEFAULT and value in [default, 'not-applicable']:
form_dict.pop(attr)
return form_dict
def validate_dynamic_input(self, dynamic_input_value, input_name, dynamic_input_key = None):
# Check if the input is a valid non-empty list
specific_group_msg = f' in "{dynamic_input_key}" ("presence")' if dynamic_input_key else ''
if not isinstance(dynamic_input_value, list) :
raise TypeError(f'The input "{input_name}"{specific_group_msg} should be a valid, non-empty list. Got "{type(dynamic_input_value)}".')
# Check if input is populated
if len(dynamic_input_value) == 0:
raise ValueError(f'The input "{input_name}" should be a valid, non-empty list. Got "{dynamic_input_value}".')
# To store already processed interactions for overlap checking
existing_dynamic_infected_interval = []
existing_dynamic_exposed_interval = []
short_range_existing_interaction = []
for entry in dynamic_input_value:
# Check if each entry is a dictionary
if not isinstance(entry, typing.Dict):
raise TypeError(f'Each entry in "{input_name}" should be a dictionary. Got "{type(entry)}".')
# Check for required keys in each entry
dict_keys = entry.keys()
# Check for the "start_time" and "finish_time" keys for "dynamic_exposed_occupancy" and "dynamic_infected_occupancy"
if input_name in ["dynamic_exposed_occupancy", "dynamic_infected_occupancy"]:
# Check for time format in "start_time" and "finish_time"
for time_key in ["start_time", "finish_time"]:
if time_key not in dict_keys:
raise TypeError(f'Missing "{time_key}" key in "{input_name}"{specific_group_msg}. Got keys: "{list(dict_keys)}".')
time_value = entry[time_key]
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time_value):
raise ValueError(f'Invalid time format for "{time_key}" in "{input_name}"{specific_group_msg}. Expected "HH:MM". Got "{time_value}".')
if entry["finish_time"] <= entry["start_time"]:
raise ValueError(f'Occupancy presence in "{input_name}"{specific_group_msg} entry has a finish time after the start time. Got start time: "{entry["start_time"]}" and finish time: "{entry["finish_time"]}".')
# Check for the "dynamic_infected_occupancy" uniqueness of intervals,
# as well as the "total_people" keyword and its constraints
if input_name == "dynamic_infected_occupancy":
self.check_overlap(entry, existing_dynamic_infected_interval)
existing_dynamic_infected_interval.append(entry)
if "total_people" not in dict_keys:
raise TypeError(f'Missing "total_people" key in "dynamic_infected_occupancy". Got keys: "{list(dict_keys)}".')
else:
value = entry["total_people"]
if not isinstance(value, int) or value < 0:
raise ValueError(f'The "total_people" in "{input_name}" should be a non-negative integer. Got "{value}".')
# Check for the "dynamic_exposed_occupancy" uniqueness of intervals
if input_name == "dynamic_exposed_occupancy":
self.check_overlap(entry, existing_dynamic_exposed_interval)
existing_dynamic_exposed_interval.append(entry)
# Check for remaining short-range inputs
if input_name == "short_range_interactions":
# Check for time format in "start_time" and "finish_time"
if "start_time" not in dict_keys:
raise TypeError(f'Missing "start_time" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".')
start_time = entry["start_time"]
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(start_time):
raise ValueError(f'Invalid time format for "start_time" in "short_range_interactions", "{dynamic_input_key}". Expected "HH:MM". Got "{start_time}".')
# Check for the "expiration" key and its constraints
if "expiration" not in dict_keys:
raise TypeError(f'Missing "expiration" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".')
else:
value = entry["expiration"]
if value not in list(self.data_registry.expiration_particle['expiration_BLO_factors'].keys()):
raise ValueError(f'The "expiration" in "short_range_interactions", "{dynamic_input_key}", does not exist in the registry. Got "{value}".')
if "duration" not in dict_keys:
raise TypeError(f'Missing "duration" key in "short_range_interactions", "{dynamic_input_key}". Got keys: "{list(dict_keys)}".')
duration = entry["duration"]
if duration < 0:
raise ValueError(f'The "duration" in "short_range_interactions", "{dynamic_input_key}", should be a non-negative integer. Got "{duration}".')
# Occupancy format dependent inputs
if self.occupancy_format == 'dynamic':
# Find corresponding exposure group
exposure_group_obj = next(
(occupancy_group for occupancy_key, occupancy_group in self.dynamic_exposed_occupancy.items()
if occupancy_key == dynamic_input_key),
None
)
if exposure_group_obj is None:
raise ValueError(
f'Exposure group "{dynamic_input_key}" in short-range interaction not found in dynamic exposed occupancy.'
)
is_within_any_long_range = False
for exposure_interval in exposure_group_obj['presence']:
# Check for correct timing within long-range exposure and overlaps with existing interactions
long_range_start = time_string_to_minutes(exposure_interval['start_time'])/60
long_range_stop = time_string_to_minutes(exposure_interval['finish_time'])/60
# Flag to check if interaction falls within any long-range exposure interval
if self.check_time_and_overlap(
entry, short_range_existing_interaction, long_range_start, long_range_stop
): is_within_any_long_range = True
# Add interaction to the list of processed interactions if within long-range
short_range_existing_interaction.append(entry)
# If no long-range interval contains the interaction, raise an error
if not is_within_any_long_range:
raise ValueError(
f'Short-range interaction "{entry}" does not fall within any long-range exposure interval in "{dynamic_input_key}".'
)
elif self.occupancy_format == 'static':
# It means that we have a single exposure model
long_range_start = min(self.infected_start, self.exposed_start)/60
long_range_stop = max(self.infected_finish, self.exposed_finish)/60
if not self.check_time_and_overlap(entry, short_range_existing_interaction, long_range_start, long_range_stop):
raise ValueError(
f'Short-range interactions should be defined during simulation time. Got "{entry}".'
)
# Add interaction to the list of processed interactions
short_range_existing_interaction.append(entry)
else:
raise TypeError(
f'Undefined exposure type. Got "{self.occupancy_format}", accepted formats are "dynamic" or "exposed".')
def validate_dynamic_exposed_format(self, dynamic_exposed_group: dict, key: str):
"""
Validates the expected keywords for the dynamic
exposed input.
"""
# Total people
if 'total_people' not in dynamic_exposed_group:
raise TypeError(f'Missing "total_people" key in "dynamic_exposed_occupancy" group "{key}". Got keys: "{list(dynamic_exposed_group.keys())}".')
else:
total_people = dynamic_exposed_group['total_people']
if not isinstance(total_people, int) or total_people < 0:
raise ValueError(f'The "total_people" in "dynamic_exposed_occupancy" group "{key}" should be a non-negative integer. Got "{total_people}".')
# Presence
if 'presence' not in dynamic_exposed_group:
raise TypeError(f'Missing "presence" key in "dynamic_exposed_occupancy" group "{key}". Got keys: "{list(dynamic_exposed_group.keys())}".')
def get_start_and_finish_time(self, entry: dict):
entry_start = time_string_to_minutes(entry["start_time"])/60
if "finish_time" in list(entry.keys()):
entry_finish = time_string_to_minutes(entry["finish_time"])/60
else:
entry_finish = entry_start + entry['duration']/60
return entry_start, entry_finish
def check_time_and_overlap(self, interaction, existing_interactions, lr_start, lr_stop):
"""
Checks if the interaction overlaps with any existing interactions for the
same exposure group and if it falls within the long-range exposure time.
"""
interaction_start, interaction_finish = self.get_start_and_finish_time(interaction)
# Check if the SR interaction is within the LR exposure time
if lr_start <= interaction_start <= lr_stop and lr_start <= interaction_finish <= lr_stop:
for existing in existing_interactions:
existing_start, existing_finish = self.get_start_and_finish_time(existing)
# Check for overlap
if interaction_start < existing_finish and existing_start < interaction_finish:
raise ValueError(
f'Overlap detected for "short-range interaction": New interaction '
f'"{interaction}" overlaps with existing interaction "{existing}".'
)
# Return True if interaction falls within the current long-range interval
return True
return False
def check_overlap(self, interaction, existing_interactions):
"""
Checks if the dynamic entry overlaps with any already existing interaction.
"""
interaction_start = time_string_to_minutes(interaction["start_time"])/60
interaction_finish = time_string_to_minutes(interaction["finish_time"])/60
for existing in existing_interactions:
existing_start = time_string_to_minutes(existing["start_time"])/60
existing_finish = time_string_to_minutes(existing["finish_time"])/60
# Check for overlap
if (interaction_start < existing_finish and existing_start < interaction_finish):
raise ValueError(
f'Overlap detected: New interaction '
f'"{interaction}" overlaps with existing interaction "{existing}".'
)
return
def validate_population_parameters(self):
"""Validates required parameters for dynamic inputs"""
# Static occupancy is defined.
if self.occupancy_format == 'static':
# Validate number of infected <= number of total people
@ -172,34 +363,21 @@ class FormData:
f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
# Dynamic occupancy is defined.
elif self.occupancy_format == 'dynamic':
for dynamic_format in (self.dynamic_infected_occupancy, self.dynamic_exposed_occupancy):
for occupancy in dynamic_format:
# Check if each occupancy entry is a dictionary
if not isinstance(occupancy, typing.Dict):
raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}".')
# Check for required keys in each occupancy entry
dict_keys = list(occupancy.keys())
if "total_people" not in dict_keys:
raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys}".')
else:
value = occupancy["total_people"]
# Check if the value is a non-negative integer
if not isinstance(value, int):
raise ValueError(f'Total number of people should be integer. Got "{type(value)}".')
elif not value >= 0:
raise ValueError(f'Total number of people should be non-negative. Got "{value}".')
if "start_time" not in dict_keys:
raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys}".')
if "finish_time" not in dict_keys:
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys}".')
# Validate time format for start_time and finish_time
for time_key in ["start_time", "finish_time"]:
time = occupancy[time_key]
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
# Validate dynamic input format
self.validate_dynamic_input(self.dynamic_infected_occupancy, "dynamic_infected_occupancy")
# Check if dynamic exposed input is a dict
if isinstance(self.dynamic_exposed_occupancy, dict):
# Check if the dict is not empty
if not self.dynamic_exposed_occupancy:
raise ValueError(f'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{self.dynamic_exposed_occupancy}".')
# The key is the actual identifier
for key, group in self.dynamic_exposed_occupancy.items():
# For each group, validate dynamic exposed input format
self.validate_dynamic_exposed_format(group, key)
# ...as well as validate dynamic exposed presence data
self.validate_dynamic_input(group['presence'], "dynamic_exposed_occupancy", key)
else:
raise TypeError(f'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{type(self.dynamic_exposed_occupancy)}".')
else:
raise ValueError(f"'{self.occupancy_format}' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.")
@ -208,6 +386,73 @@ class FormData:
def build_model(self, sample_size: typing.Optional[int] = None):
raise NotImplementedError("Subclass must implement")
def population_present_changes(self, population_list: typing.List[models.Interval]) -> typing.List[float]:
"""
Returns a sorted list of unique state changes on
a population list.
"""
state_change_times = set(population_list[0].transition_times())
for population in population_list:
state_change_times.update(population.transition_times())
return sorted(state_change_times)
def build_CO2_piecewise(self):
"""
Builds a simple IntPiecewiseConstant for the different
population groups that are defined.
"""
if self.occupancy_format == 'dynamic':
infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy)
total_models = [models.SimplePopulation(
number=infected_occupancy, # IntPiecewiseConstant
presence=None,
activity=None, # type: ignore
)]
# For all state changes
total_presence = [infected_occupancy.interval()]
for exposure_group in self.dynamic_exposed_occupancy.values():
exposed_occupancy = exposure_group['total_people']
exposed_presence = self.generate_dynamic_occupancy(exposure_group['presence'])
total_models.append(models.SimplePopulation(
number=exposed_occupancy, # int
presence=exposed_presence, # SpecificInterval
activity=None, # type: ignore
))
# For all state changes
total_presence.append(exposed_presence)
elif self.occupancy_format == 'static':
infected_people = self.infected_people
exposed_people = self.total_people - self.infected_people
infected_presence = self.infected_present_interval()
exposed_presence = self.exposed_present_interval()
infected_population = models.SimplePopulation(
number=infected_people,
presence=infected_presence,
activity=None, # type: ignore
)
exposed_population=models.SimplePopulation(
number=exposed_people,
presence=exposed_presence,
activity=None, # type: ignore
)
total_presence = [infected_presence, exposed_presence]
total_models = [infected_population, exposed_population]
# Get all state change times from combined populations
all_state_changes=self.population_present_changes(total_presence)
# Compute total people at each state change
total_people = []
for _, stop in zip(all_state_changes[:-1], all_state_changes[1:]):
total_people_in_group = sum(population.people_present(stop) for population in total_models)
total_people.append(total_people_in_group)
return models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people))
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) -
@ -419,25 +664,64 @@ class FormData:
)
def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]):
transition_times = []
values = []
for occupancy in dynamic_occupancy:
start_time = time_string_to_minutes(occupancy['start_time'])/60
finish_time = time_string_to_minutes(occupancy['finish_time'])/60
transition_times.extend([start_time, finish_time])
values.append(occupancy['total_people'])
"""
Creates a model to represent occupancy over time. If `total_people` is
provided in the input, the method generates an `IntPiecewiseConstant` model, representing
occupancy as a piecewise constant function with defined values for each interval.
Otherwise, it generates a `SpecificInterval` model, which only tracks the presence of
intervals without associating values to them.
"""
unique_transition_times_sorted = np.array(sorted(set(transition_times)))
# Initialize variables
if 'total_people' in dynamic_occupancy[0]: # build IntPiecewiseConstant
computePiecewiseConstant = True
transition_times = []
values = []
else:
computePiecewiseConstant = False
present_times = [] # build SpecificInterval
if len(values) != len(unique_transition_times_sorted) - 1:
raise ValueError("Cannot compute dynamic occupancy with the provided inputs.")
population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant(
transition_times=tuple(unique_transition_times_sorted),
values=tuple(values)
# Sort occupancy entries by start_time to ensure proper ordering
dynamic_occupancy_sorted = sorted(
dynamic_occupancy, key=lambda x: time_string_to_minutes(x['start_time'])
)
return population_occupancy
last_finish_time = None
for occupancy in dynamic_occupancy_sorted:
start_time = time_string_to_minutes(occupancy['start_time']) / 60
finish_time = time_string_to_minutes(occupancy['finish_time']) / 60
if computePiecewiseConstant:
total_people = occupancy['total_people']
# Fill in gap with a zero occupancy if there is a time gap
if last_finish_time is not None and start_time > last_finish_time:
transition_times.append(last_finish_time)
values.append(0) # Add zero for the gap period
# Update lists with current occupancy entry
transition_times.extend([start_time, finish_time])
values.append(total_people)
# Update the last known finish time
last_finish_time = finish_time
else:
present_times.append((start_time, finish_time))
# When computing the total people, validate that we have enough values to compute the occupancy
if computePiecewiseConstant:
unique_transition_times_sorted = np.array(sorted(set(transition_times)))
if (len(values) != len(unique_transition_times_sorted) - 1):
raise ValueError("Cannot compute dynamic occupancy with the provided inputs.")
else:
return models.IntPiecewiseConstant(
transition_times=tuple(unique_transition_times_sorted),
values=tuple(values)
)
else:
return models.SpecificInterval(tuple(present_times))
def _hours2timestring(hours: float):
# Convert times like 14.5 to strings, like "14:30"
@ -450,6 +734,8 @@ def time_string_to_minutes(time: str) -> minutes_since_midnight:
:param time: A string of the form "HH:MM" representing a time of day
:return: The number of minutes between 'time' and 00:00
"""
if not (0 <= int(time[:2]) <= 23) or not (0 <= int(time[3:]) <= 59):
raise ValueError(f"Wrong time format. Got {time}")
return minutes_since_midnight(60 * int(time[:2]) + int(time[3:]))

View file

@ -4,6 +4,7 @@ import logging
import typing
import re
from collections import defaultdict
import numpy as np
from caimira import __version__ as calculator_version
@ -11,7 +12,7 @@ from ..form_validator import FormData, cast_class_fields, time_string_to_minutes
from ..defaults import (DEFAULTS, CONFIDENCE_LEVEL_OPTIONS,
MECHANICAL_VENTILATION_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE,
VENTILATION_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES)
from ...models import models, data, monte_carlo as mc
from ...models import models, data, dataclass_utils, monte_carlo as mc
from ...models.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
from ...models.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions
@ -51,7 +52,7 @@ class VirusFormData(FormData):
room_heating_option: bool
room_number: str
sensor_in_use: str
short_range_interactions: list
short_range_interactions: dict
short_range_occupants: int
short_range_option: str
simulation_name: str
@ -73,7 +74,7 @@ class VirusFormData(FormData):
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
def validate(self):
# Validate population parameters
# Validate population parameters
self.validate_population_parameters()
validation_tuples = [('activity_type', self.data_registry.population_scenario_activity.keys()),
@ -200,30 +201,31 @@ class VirusFormData(FormData):
f'The sum of all respiratory activities should be 100. Got {total_percentage}.')
# Validate number of people with short-range interactions
if self.occupancy_format == "static": max_occupants_for_sr = self.total_people - self.infected_people
else: max_occupants_for_sr = np.max(np.array([entry["total_people"] for entry in self.dynamic_exposed_occupancy]))
if self.occupancy_format == 'static':
max_occupants_for_sr = self.total_people - self.infected_people
elif self.occupancy_format == 'dynamic':
max_occupants_for_sr = 0
for key, group in self.dynamic_exposed_occupancy.items():
occupants_in_group = group['total_people']
max_occupants_for_sr = max(max_occupants_for_sr, occupants_in_group)
if self.short_range_occupants > max_occupants_for_sr:
raise ValueError(
f'The total number of occupants having short-range interactions ({self.short_range_occupants}) should be lower than the exposed population ({max_occupants_for_sr}).'
)
# Validate short-range interactions interval
if self.short_range_option == "short_range_yes":
for interaction in self.short_range_interactions:
# Check if presence is within long-range exposure
presence = self.short_range_interval(interaction)
if (self.occupancy_format == 'dynamic'):
long_range_start = min(time_string_to_minutes(self.dynamic_infected_occupancy[0]['start_time']),
time_string_to_minutes(self.dynamic_exposed_occupancy[0]['start_time']))
long_range_stop = max(time_string_to_minutes(self.dynamic_infected_occupancy[-1]['finish_time']),
time_string_to_minutes(self.dynamic_exposed_occupancy[-1]['finish_time']))
else:
long_range_start = min(self.infected_start, self.exposed_start)
long_range_stop = max(self.infected_finish, self.exposed_finish)
if not (long_range_start/60 <= presence.present_times[0][0] <= long_range_stop/60 and
long_range_start/60 <= presence.present_times[0][-1] <= long_range_stop/60):
raise ValueError(f"Short-range interactions should be defined during simulation time. Got {interaction}")
if isinstance(self.short_range_interactions, dict):
# Check if occupancy format is static, there should be one key-value only in short_range_interactions
if self.occupancy_format == "static" and len(self.short_range_interactions) > 1:
raise ValueError(
'When occupancy format is "static", there should be only one interaction group in "short_range_interactions".'
)
# The key is the actual identifier
for key, group in self.short_range_interactions.items():
self.validate_dynamic_input(group, "short_range_interactions", key)
def initialize_room(self) -> models.Room:
# Initializes room with volume either given directly or as product of area and height
if self.volume_type == 'room_volume_explicit':
@ -243,69 +245,99 @@ class VirusFormData(FormData):
return models.Room(volume=volume, inside_temp=models.PiecewiseConstant((0, 24), (inside_temp,)), humidity=humidity) # type: ignore
def build_mc_model(self) -> mc.ExposureModel:
room = self.initialize_room()
def build_mc_model(self) -> typing.Union[mc.ExposureModel, mc.ExposureModelGroup]:
size = self.data_registry.monte_carlo['sample_size']
room: models.Room = self.initialize_room()
ventilation: models._VentilationBase = self.ventilation()
infected_population: models.InfectedPopulation = self.infected_population()
short_range = []
short_range = defaultdict(list)
if self.short_range_option == "short_range_yes":
for interaction in self.short_range_interactions:
short_range.append(mc.ShortRangeModel(
data_registry=self.data_registry,
expiration=short_range_expiration_distributions(
self.data_registry)[interaction['expiration']],
activity=infected_population.activity,
presence=self.short_range_interval(interaction),
distance=short_range_distances(self.data_registry),
))
for key, group in self.short_range_interactions.items():
for interaction in group:
expiration = short_range_expiration_distributions(self.data_registry)[interaction['expiration']]
presence = self.short_range_interval(interaction)
distances = short_range_distances(self.data_registry)
short_range[key].append(mc.ShortRangeModel(
data_registry=self.data_registry,
expiration=expiration,
activity=infected_population.activity,
presence=presence,
distance=distances,
expiration_def=interaction['expiration']
).build_model(size))
return mc.ExposureModel(
concentration_model: models.ConcentrationModel = mc.ConcentrationModel(
data_registry=self.data_registry,
concentration_model=mc.ConcentrationModel(
data_registry=self.data_registry,
room=room,
ventilation=ventilation,
infected=infected_population,
evaporation_factor=0.3,
),
short_range=tuple(short_range),
exposed=self.exposed_population(),
geographical_data=mc.Cases(
geographic_population=self.geographic_population,
geographic_cases=self.geographic_cases,
ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias],
),
exposed_to_short_range=self.short_range_occupants,
)
room=room,
ventilation=ventilation,
infected=infected_population,
evaporation_factor=0.3,
).build_model(size)
def build_model(self, sample_size=None) -> models.ExposureModel:
sample_size = sample_size or self.data_registry.monte_carlo['sample_size']
return self.build_mc_model().build_model(size=sample_size)
geographical_data: models.Cases = mc.Cases(
geographic_population=self.geographic_population,
geographic_cases=self.geographic_cases,
ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias],
).build_model(size)
if self.occupancy_format == 'dynamic':
exposure_model_set = []
for exposure_group in self.dynamic_exposed_occupancy.keys():
sr_models: typing.Tuple[models.ShortRangeModel, ...] = tuple(short_range[exposure_group])
exposed_population: mc.Population = self.exposed_population(exposure_group).build_model(size)
exposure_model = mc.ExposureModel(
data_registry=self.data_registry,
concentration_model=concentration_model,
short_range=sr_models,
exposed=exposed_population,
geographical_data=geographical_data,
exposed_to_short_range=self.short_range_occupants,
)
exposure_model_set.append(exposure_model)
if len(list(self.dynamic_exposed_occupancy.keys())) == 1:
return exposure_model_set[0]
else:
return mc.ExposureModelGroup(
data_registry=self.data_registry,
exposure_models=[individual_model.build_model(size) for individual_model in exposure_model_set]
)
elif self.occupancy_format == 'static':
exposed_population = self.exposed_population()
short_range_tuple = tuple(item for sublist in short_range.values() for item in sublist)
return mc.ExposureModel(
data_registry=self.data_registry,
concentration_model=concentration_model,
short_range=short_range_tuple,
exposed=exposed_population,
geographical_data=geographical_data,
exposed_to_short_range=self.short_range_occupants,
)
def build_model(self, sample_size=None) -> typing.Union[models.ExposureModel, models.ExposureModelGroup]:
size = self.data_registry.monte_carlo['sample_size'] if not sample_size else sample_size
return self.build_mc_model().build_model(size=size)
def build_CO2_model(self, sample_size=None) -> models.CO2ConcentrationModel:
"""
Builds a CO2 model that considers the type of
activity and data from the defined population groups.
"""
sample_size = sample_size or self.data_registry.monte_carlo['sample_size']
infected_population: models.InfectedPopulation = self.infected_population(
).build_model(sample_size)
exposed_population: models.Population = self.exposed_population().build_model(sample_size)
state_change_times = set(
infected_population.presence_interval().transition_times())
state_change_times.update(
exposed_population.presence_interval().transition_times())
transition_times = sorted(state_change_times)
total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop)
for _, stop in zip(transition_times[:-1], transition_times[1:])]
if (self.activity_type == 'precise'):
activity_defn, _ = self.generate_precise_activity_expiration()
else:
activity_defn = self.data_registry.population_scenario_activity[
self.activity_type]['activity']
occupancy = self.build_CO2_piecewise()
population = mc.SimplePopulation(
number=models.IntPiecewiseConstant(transition_times=tuple(
transition_times), values=tuple(total_people)),
number=occupancy,
presence=None,
activity=activity_distributions(self.data_registry)[activity_defn],
)
@ -420,6 +452,7 @@ class VirusFormData(FormData):
# This is a minimal, always present source of ventilation, due
# to the air infiltration from the outside.
# See CERN-OPEN-2021-004, p. 12.
# type: ignore
residual_vent: float = self.data_registry.ventilation['infiltration_ventilation'] # type: ignore
infiltration_ventilation = models.AirChange(
active=always_on, air_exch=residual_vent)
@ -457,30 +490,30 @@ class VirusFormData(FormData):
return (self.precise_activity['physical_activity'], respiratory_dict)
def infected_population(self) -> mc.InfectedPopulation:
"""
Generates an InfectedPopulation class, for both static and
dynamic occupancy.
"""
# Initializes the virus
virus = virus_distributions(self.data_registry)[self.virus_type]
# Occupancy
if self.occupancy_format == 'dynamic':
if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0:
# If dynamic occupancy is defined, the generator will parse and validate the
# respective input to a format readable by the model - `IntPiecewiseConstant`.
infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy)
infected_presence = None
else:
raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".')
infected_occupancy = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy)
infected_presence = None
else:
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
infected_occupancy = self.infected_people
infected_presence = self.infected_present_interval()
# Activity and expiration
activity_defn = self.data_registry.population_scenario_activity[self.activity_type]['activity']
expiration_defn = self.data_registry.population_scenario_activity[self.activity_type]['expiration']
activity_defn = self.data_registry.population_scenario_activity[
self.activity_type]['activity']
expiration_defn = self.data_registry.population_scenario_activity[
self.activity_type]['expiration']
if (self.activity_type == 'smallmeeting'):
# Conversation of N people is approximately 1/N% of the time speaking.
total_people: int = max(infected_occupancy.values) if self.occupancy_format == 'dynamic' else self.total_people
total_people: int = max(
infected_occupancy.values) if self.occupancy_format == 'dynamic' else self.total_people
expiration_defn = {'Speaking': 1, 'Breathing': total_people - 1}
elif (self.activity_type == 'precise'):
activity_defn, expiration_defn = self.generate_precise_activity_expiration()
@ -496,31 +529,36 @@ class VirusFormData(FormData):
mask=self.mask(),
activity=activity,
expiration=expiration,
# Vaccination status does not affect the infected population (for now)
# Vaccination status does not affect the infected population (for the time being)
host_immunity=0.,
)
return infected
def exposed_population(self) -> mc.Population:
activity_defn = (self.precise_activity['physical_activity']
if self.activity_type == 'precise'
else str(self.data_registry.population_scenario_activity[self.activity_type]['activity']))
activity = activity_distributions(self.data_registry)[activity_defn]
def exposed_population(self, exposure_group: typing.Optional[str] = None) -> mc.Population:
"""
Generates an exposed Population class, for both static and
dynamic occupancy. The number of people is constant for a
single group of exposed population, except when breaks are defined.
"""
# Occupancy
if self.occupancy_format == 'dynamic':
if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0:
# If dynamic occupancy is defined, the generator will parse and validate the
# respective input to a format readable by the model - IntPiecewiseConstant.
exposed_occupancy = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy)
exposed_presence = None
else:
raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".')
dynamic_group = self.dynamic_exposed_occupancy[exposure_group]
exposed_occupancy = dynamic_group['total_people']
exposed_presence = self.generate_dynamic_occupancy(dynamic_group['presence'])
else:
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
exposed_occupancy = self.total_people - self.infected_people
exposed_presence = self.exposed_present_interval()
# Activity
activity_defn = (self.precise_activity['physical_activity']
if self.activity_type == 'precise'
else str(self.data_registry.population_scenario_activity[self.activity_type]['activity']))
activity = activity_distributions(self.data_registry)[activity_defn]
# Vaccination
if (self.vaccine_option):
if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'):
host_immunity = [vaccine['VE'] for vaccine in data.vaccine_booster_host_immunity if
@ -569,7 +607,7 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'calculator_version': calculator_version,
'ceiling_height': '',
'conditional_probability_viral_loads': '0',
'dynamic_exposed_occupancy': '[]',
'dynamic_exposed_occupancy': '{}',
'dynamic_infected_occupancy': '[]',
'event_month': 'January',
'exposed_coffee_break_option': 'coffee_break_4',
@ -606,7 +644,7 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'room_heating_option': '0',
'room_number': '123',
'room_volume': '75',
'short_range_interactions': '[]',
'short_range_interactions': '{}',
'short_range_option': 'short_range_no',
'simulation_name': 'Test',
'total_people': '10',

View file

@ -598,43 +598,270 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m
['random', "'random' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",]
]
)
def test_dynamic_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData):
def test_occupancy_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = occupancy_format_input
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
def test_dynamic_input_format_ValueError(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
# Check empty list for infected occupancy
baseline_form.dynamic_infected_occupancy = []
error = 'The input "dynamic_infected_occupancy" should be a valid, non-empty list. Got "[]".'
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Check empty dict for exposed occupancy
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {}
error = 'The input "dynamic_exposed_occupancy" should be a valid, non-empty dict. Got "{}".'
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["dynamic_occupancy_input", "error"],
[
[[["total_people", 10, "start_time", "10:00", "finish_time", "11:00"]], "Each occupancy entry should be in a dictionary format. Got \"<class 'list'>\"."],
[[{"tal_people": 10, "start_time": "10:00", "finish_time": "11:00"}], "Unable to fetch \"total_people\" key. Got \"['tal_people', 'start_time', 'finish_time']\"."],
[[{"total_people": 10, "art_time": "10:00", "finish_time": "11:00"}], "Unable to fetch \"start_time\" key. Got \"['total_people', 'art_time', 'finish_time']\"."],
[[{"total_people": 10, "start_time": "10:00", "ish_time": "11:00"}], "Unable to fetch \"finish_time\" key. Got \"['total_people', 'start_time', 'ish_time']\"."],
[[{"total_people": 10, "start_time": "10", "finish_time": "11:00"}], "Wrong time format - \"HH:MM\". Got \"10\"."],
[[{"total_people": 10, "start_time": "10:00", "finish_time": "11"}], "Wrong time format - \"HH:MM\". Got \"11\"."],
["exposed_format", "error"],
[
[
{"tal_people": 10, "presence": [{"start_time": "10:00", "finish_time": "11:00"}],},
'Missing "total_people" key in "dynamic_exposed_occupancy" group "group_1". Got keys: "[\'tal_people\', \'presence\']".'
],
[
{"total_people": 10, "pesence": [{"start_time": "10:00", "finish_time": "11:00"}],},
'Missing "presence" key in "dynamic_exposed_occupancy" group "group_1". Got keys: "[\'total_people\', \'pesence\']".'
],
]
)
def test_dynamic_occupancy_structure(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData):
def test_dynamic_exposed_format_TypeError(exposed_format, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input
baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {"group_1": exposed_format}
with pytest.raises(TypeError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["dynamic_occupancy_input", "error"],
[
[[{"total_people": "10", "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"<class 'str'>\"."],
[[{"total_people": 9.8, "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"<class 'float'>\"."],
[[{"total_people": [10], "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be integer. Got \"<class 'list'>\"."],
[[{"total_people": -1, "start_time": "10:00", "finish_time": "11:00"}], "Total number of people should be non-negative. Got \"-1\"."],
["exposed_presence", "error"],
[
[[["start_time", "10:00", "finish_time", "11:00"]], 'Each entry in "dynamic_exposed_occupancy" should be a dictionary. Got "<class \'list\'>".'],
[[{"art_time": "10:00", "finish_time": "11:00"}], 'Missing "start_time" key in "dynamic_exposed_occupancy" in "group_1" ("presence"). Got keys: "[\'art_time\', \'finish_time\']".'],
[[{"start_time": "10:00", "ish_time": "11:00"}], 'Missing "finish_time" key in "dynamic_exposed_occupancy" in "group_1" ("presence"). Got keys: "[\'start_time\', \'ish_time\']".'],
]
)
def test_dynamic_occupancy_total_people(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData):
def test_dynamic_exposed_presence_TypeError(exposed_presence, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input
baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": 10,
"presence": exposed_presence,
}
}
with pytest.raises(TypeError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["exposed_presence", "error"],
[
[[{"start_time": "10", "finish_time": "11:00"}], 'Invalid time format for "start_time" in "dynamic_exposed_occupancy" in "group_1" ("presence"). Expected "HH:MM". Got "10".'],
[[{"start_time": "10:00", "finish_time": "11"}], 'Invalid time format for "finish_time" in "dynamic_exposed_occupancy" in "group_1" ("presence"). Expected "HH:MM". Got "11".'],
]
)
def test_dynamic_exposed_presence_ValueError(exposed_presence, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": 10,
"presence": exposed_presence
}
}
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["exposed_total_people", "error"],
[
["10", 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "10".'],
[9.8, 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "9.8".'],
[[10], 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "[10]".'],
[-1, 'The "total_people" in "dynamic_exposed_occupancy" group "group_1" should be a non-negative integer. Got "-1".'],
]
)
def test_dynamic_exposed_total_people_ValueError(exposed_total_people, error, baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [
{"total_people": 1, "start_time": "08:00", "finish_time": "10:00"},
{"total_people": 2, "start_time": "10:00", "finish_time": "18:00"},
]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": exposed_total_people,
"presence": [{"start_time": "08:00", "finish_time": "18:00"},],
},
}
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
def test_dynamic_infected_overlap(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [
{"total_people": 10, "start_time": "08:00", "finish_time": "18:00"},
{"total_people": 10, "start_time": "10:00", "finish_time": "18:00"},
]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": 10,
"presence": [{"start_time": "08:00", "finish_time": "18:00"}],
},
"group_2": {
"total_people": 10,
"presence": [{"start_time": "10:00", "finish_time": "11:00"}],
},
"group_3": {
"total_people": 10,
"presence": [{"start_time": "15:00", "finish_time": "18:00"}],
},
}
error = (
'Overlap detected: New interaction '
'"{\'total_people\': 10, \'start_time\': \'10:00\', \'finish_time\': \'18:00\'}" '
'overlaps with existing interaction '
'"{\'total_people\': 10, \'start_time\': \'08:00\', \'finish_time\': \'18:00\'}".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
def test_dynamic_exposure_group_duplication(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = "dynamic"
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": 10,
"presence": [
{"start_time": "08:00", "finish_time": "17:00"},
{"start_time": "13:00", "finish_time": "14:00"},
],
},
}
error = (
'Overlap detected: New interaction '
'"{\'start_time\': \'13:00\', \'finish_time\': \'14:00\'}"'
' overlaps with existing interaction '
'"{\'start_time\': \'08:00\', \'finish_time\': \'17:00\'}".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["short_range_input", "error"],
[
[[["expiration", "Shouting", "start_time", "09:00", "duration", 30]], 'Each entry in "short_range_interactions" should be a dictionary. Got "<class \'list\'>".'],
[[{"expiratio": "Shouting", "start_time": "09:00", "duration": 30}], 'Missing "expiration" key in "short_range_interactions", "group_1". Got keys: "[\'expiratio\', \'start_time\', \'duration\']".'],
[[{"expiration": "Shouting", "start_tim": "09:00", "duration": 30}], 'Missing "start_time" key in "short_range_interactions", "group_1". Got keys: "[\'expiration\', \'start_tim\', \'duration\']".'],
[[{"expiration": "Shouting", "start_time": "09:00", "duratio": 30}], 'Missing "duration" key in "short_range_interactions", "group_1". Got keys: "[\'expiration\', \'start_time\', \'duratio\']".'],
]
)
def test_short_range_TypeError(short_range_input, error, baseline_form: virus_validator.VirusFormData):
baseline_form.short_range_option = "short_range_yes"
baseline_form.short_range_interactions = {"group_1": short_range_input}
with pytest.raises(TypeError, match=re.escape(error)):
baseline_form.validate()
def test_short_range_exposure_group(baseline_form: virus_validator.VirusFormData):
baseline_form.occupancy_format = 'dynamic'
baseline_form.dynamic_infected_occupancy = [{"total_people": 10, "start_time": "10:00", "finish_time": "11:00"}]
baseline_form.dynamic_exposed_occupancy = {
"group_1": {
"total_people": 10,
"presence": [
{"start_time": "10:00", "finish_time": "12:00"},
{"start_time": "13:00", "finish_time": "17:00"},
],
},
"group_2": {
"total_people": 10,
"presence": [
{"start_time": "10:00", "finish_time": "11:00"},
],
},
}
# Check for existence of the dictionary key
baseline_form.short_range_option = 'short_range_yes'
baseline_form.short_range_interactions = {
"group_4": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}],
}
error = 'Exposure group "group_4" in short-range interaction not found in dynamic exposed occupancy.'
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Check if interaction time is within simulation time
baseline_form.short_range_interactions = {
"group_1": [{"expiration": "Shouting", "start_time": "18:00", "duration": 30}],
}
error = (
'Short-range interaction "{\'expiration\': \'Shouting\', \'start_time\': \'18:00\', \'duration\': 30}"'
' does not fall within any long-range exposure interval in "group_1".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
@pytest.mark.parametrize(
["short_range_input", "error"],
[
[[{"expiration": "Shouting", "start_time": "9", "duration": 30}], 'Invalid time format for "start_time" in "short_range_interactions", "group_1". Expected "HH:MM". Got "9".'],
[[{"expiration": "Whisper", "start_time": "09:00", "duration": 30}], 'The "expiration" in "short_range_interactions", "group_1", does not exist in the registry. Got "Whisper".'],
[[{"expiration": "Shouting", "start_time": "09:00", "duration": -30}], 'The "duration" in "short_range_interactions", "group_1", should be a non-negative integer. Got "-30".'],
]
)
def test_short_range_value_error(short_range_input, error, baseline_form: virus_validator.VirusFormData):
baseline_form.short_range_option = "short_range_yes"
baseline_form.short_range_interactions = {"group_1": short_range_input}
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
def test_short_range_with_static_occupancy(baseline_form: virus_validator.VirusFormData):
# By default the occupancy format is 'static'
baseline_form.short_range_option = "short_range_yes"
baseline_form.short_range_interactions = {"group_1": [{"expiration": "Shouting", "start_time": "07:00", "duration": 30}]}
# Check if interaction is defined during simulation time
error = (
'Short-range interactions should be defined during simulation time. Got '
'"{\'expiration\': \'Shouting\', \'start_time\': \'07:00\', \'duration\': 30}".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Check overlap of interactions
baseline_form.short_range_interactions = {
"group_1": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30},
{"expiration": "Shouting", "start_time": "10:10", "duration": 15}],
}
error = (
'Overlap detected for "short-range interaction": New interaction '
'"{\'expiration\': \'Shouting\', \'start_time\': \'10:10\', \'duration\': 15}"'
' overlaps with existing interaction "{\'expiration\': \'Shouting\', \'start_time\': \'10:00\', \'duration\': 30}".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()
# Check if more than one group is defined
baseline_form.short_range_interactions = {
"group_1": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30},
{"expiration": "Shouting", "start_time": "10:10", "duration": 15}],
"group_2": [{"expiration": "Shouting", "start_time": "10:00", "duration": 30}]
}
error = (
'When occupancy format is "static", there should be only one interaction group in "short_range_interactions".'
)
with pytest.raises(ValueError, match=re.escape(error)):
baseline_form.validate()

View file

@ -41,7 +41,7 @@ def full_exposure_model(data_registry):
@pytest.fixture
def baseline_infected_population_number(data_registry):
def baseline_infected_population(data_registry):
return models.InfectedPopulation(
data_registry=data_registry,
number=models.IntPiecewiseConstant(
@ -56,34 +56,15 @@ def baseline_infected_population_number(data_registry):
@pytest.fixture
def baseline_exposed_population_number():
return models.Population(
number=models.IntPiecewiseConstant(
(8, 12, 13, 17), (10, 0, 10)),
presence=None,
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
host_immunity=0.,
)
@pytest.fixture
def dynamic_infected_single_exposure_model(full_exposure_model, baseline_infected_population_number):
def dynamic_infected_single_exposure_model(full_exposure_model, baseline_infected_population):
return dc_utils.nested_replace(full_exposure_model,
{'concentration_model.infected': baseline_infected_population_number, })
{'concentration_model.infected': baseline_infected_population, })
@pytest.fixture
def dynamic_exposed_single_exposure_model(full_exposure_model, baseline_exposed_population_number):
return dc_utils.nested_replace(full_exposure_model,
{'exposed': baseline_exposed_population_number, })
@pytest.fixture
def dynamic_population_exposure_model(full_exposure_model, baseline_infected_population_number ,baseline_exposed_population_number):
def dynamic_population_exposure_model(full_exposure_model, baseline_infected_population):
return dc_utils.nested_replace(full_exposure_model, {
'concentration_model.infected': baseline_infected_population_number,
'exposed': baseline_exposed_population_number,
'concentration_model.infected': baseline_infected_population,
})
@ -92,10 +73,10 @@ def dynamic_population_exposure_model(full_exposure_model, baseline_infected_pop
[4., 8., 10., 12., 13., 14., 16., 20., 24.],
)
def test_population_number(full_exposure_model: models.ExposureModel,
baseline_infected_population_number: models.InfectedPopulation, time: float):
baseline_infected_population: models.InfectedPopulation, time: float):
int_population_number: models.InfectedPopulation = full_exposure_model.concentration_model.infected
piecewise_population_number: models.InfectedPopulation = baseline_infected_population_number
piecewise_population_number: models.InfectedPopulation = baseline_infected_population
with pytest.raises(
TypeError,
@ -206,58 +187,47 @@ def test_dynamic_dose(data_registry, full_exposure_model: models.ExposureModel,
def test_infection_probability(
full_exposure_model: models.ExposureModel,
dynamic_infected_single_exposure_model: models.ExposureModel,
dynamic_exposed_single_exposure_model: models.ExposureModel,
dynamic_population_exposure_model: models.ExposureModel):
base_infection_probability = full_exposure_model.infection_probability()
npt.assert_almost_equal(base_infection_probability, dynamic_infected_single_exposure_model.infection_probability())
npt.assert_almost_equal(base_infection_probability, dynamic_exposed_single_exposure_model.infection_probability())
npt.assert_almost_equal(base_infection_probability, dynamic_population_exposure_model.infection_probability())
@pytest.mark.skip
def test_dynamic_total_probability_rule(
dynamic_infected_single_exposure_model: models.ExposureModel,
dynamic_exposed_single_exposure_model: models.ExposureModel,
dynamic_population_exposure_model: models.ExposureModel):
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability "
"(including incidence rate) with dynamic occupancy")):
dynamic_infected_single_exposure_model.total_probability_rule()
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability "
"(including incidence rate) with dynamic occupancy")):
dynamic_exposed_single_exposure_model.total_probability_rule()
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability "
"(including incidence rate) with dynamic occupancy")):
dynamic_population_exposure_model.total_probability_rule()
@pytest.mark.skip
def test_dynamic_expected_new_cases(
dynamic_infected_single_exposure_model: models.ExposureModel,
dynamic_exposed_single_exposure_model: models.ExposureModel,
dynamic_population_exposure_model: models.ExposureModel):
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
"with dynamic occupancy")):
dynamic_infected_single_exposure_model.expected_new_cases()
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
"with dynamic occupancy")):
dynamic_exposed_single_exposure_model.expected_new_cases()
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
"with dynamic occupancy")):
dynamic_population_exposure_model.expected_new_cases()
@pytest.mark.skip
def test_dynamic_reproduction_number(
dynamic_infected_single_exposure_model: models.ExposureModel,
dynamic_exposed_single_exposure_model: models.ExposureModel,
dynamic_population_exposure_model: models.ExposureModel):
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
"with dynamic occupancy")):
dynamic_infected_single_exposure_model.reproduction_number()
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
"with dynamic occupancy")):
dynamic_exposed_single_exposure_model.reproduction_number()
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
"with dynamic occupancy")):
dynamic_population_exposure_model.reproduction_number()

View file

@ -1,6 +1,8 @@
// Input data for CO2 fitting algorithm
const CO2_data_form = [
"CO2_data",
"dynamic_exposed_occupancy",
"dynamic_infected_occupancy",
"exposed_coffee_break_option",
"exposed_coffee_duration",
"exposed_finish",
@ -17,14 +19,12 @@ const CO2_data_form = [
"infected_lunch_option",
"infected_lunch_start",
"infected_people",
"dynamic_infected_occupancy",
"infected_start",
"occupancy_format",
"room_capacity",
"room_volume",
"specific_breaks",
"total_people",
"dynamic_exposed_occupancy",
"occupancy_format",
"total_people"
];
// Method to upload a valid data file (accepted formats: .xls and .xlsx)

View file

@ -631,22 +631,38 @@ function validate_form(form) {
}
// Generate the short-range interactions list
var short_range_interactions = [];
$(".form_field_outer_row").each(function (index, element){
let obj = {};
const $element = $(element);
obj.expiration = $element.find("[name='short_range_expiration']").val();
obj.start_time = $element.find("[name='short_range_start_time']").val();
obj.duration = $element.find("[name='short_range_duration']").val();
short_range_interactions.push(JSON.stringify(obj));
let short_range_interactions = {};
$(".form_field_outer_row").each(function (index, element) {
const $element = $(element);
let obj = {};
obj.expiration = $element.find("[name='short_range_expiration']").val();
obj.start_time = $element.find("[name='short_range_start_time']").val();
obj.duration = parseFloat($element.find("[name='short_range_duration']").val());
const exposure_group = $element.find("[name='short_range_exposure_group']").val();
// If the exposure_group key already exists, push the new obj into the array
if (short_range_interactions[exposure_group]) {
short_range_interactions[exposure_group].push(obj);
} else {
// Otherwise, create a new array with the current obj
short_range_interactions[exposure_group] = [obj];
}
});
// Sort list by time
short_range_interactions.sort(function (a, b) {
return JSON.parse(a).start_time.localeCompare(JSON.parse(b).start_time);
});
$("input[type=text][name=short_range_interactions]").val('[' + short_range_interactions + ']');
if (short_range_interactions.length == 0) {
// Sort each array within the short_range_interactions object by start_time
for (const key in short_range_interactions) {
short_range_interactions[key].sort(function (a, b) {
return a.start_time.localeCompare(b.start_time);
});
}
// Convert the entire object to a JSON string and assign it to the input field
$("input[type=text][name=short_range_interactions]").val(JSON.stringify(short_range_interactions));
// Check if there are no entries and update the radio button accordingly
if (Object.keys(short_range_interactions).length === 0) {
$("input[type=radio][id=short_range_no]").prop("checked", true);
on_short_range_option_change();
}
@ -907,18 +923,42 @@ $(document).ready(function () {
}
// Read short-range from URL
else if (name == 'short_range_interactions') {
let index = 1;
for (const interaction of JSON.parse(value)) {
$("#dialog_sr").append(inject_sr_interaction(index, value = interaction, is_validated="row_validated"))
$('#sr_expiration_no_' + String(index)).val(interaction.expiration).change();
document.getElementById('sr_expiration_no_' + String(index)).disabled = true;
document.getElementById('sr_start_no_' + String(index)).disabled = true;
document.getElementById('sr_duration_no_' + String(index)).disabled = true;
document.getElementById('edit_row_no_' + String(index)).style.cssText = 'display:inline !important';
document.getElementById('validate_row_no_' + String(index)).style.cssText = 'display: none !important';
index++;
else if (name === 'short_range_interactions') {
// Parse the JSON value from the URL
let interactions = JSON.parse(value);
let index = 1; // Initialize interaction index
// Iterate over each group in the interactions
for (const group in interactions) {
if (interactions.hasOwnProperty(group)) {
// Iterate over each interaction within the group
for (const interaction of interactions[group]) {
// Append the interaction row to the dialog
$("#dialog_sr").append(inject_sr_interaction(index, interaction, "row_validated"));
// Set the values for each input field based on the interaction
$('#sr_expiration_no_' + index).val(interaction.expiration).change();
document.getElementById('sr_start_no_' + index).value = interaction.start_time; // Set start time
document.getElementById('sr_duration_no_' + index).value = interaction.duration; // Set duration
document.getElementById('sr_group_no_' + index).value = group; // Set exposure group
// Disable the input fields for editing
document.getElementById('sr_expiration_no_' + index).disabled = true;
document.getElementById('sr_start_no_' + index).disabled = true;
document.getElementById('sr_duration_no_' + index).disabled = true;
document.getElementById('sr_group_no_' + index).disabled = true;
// Update visibility of editing and validation rows
document.getElementById('edit_row_no_' + index).style.display = 'inline';
document.getElementById('validate_row_no_' + index).style.display = 'none';
// Increment the index for the next interaction
index++;
}
}
}
// Update the total count of interactions displayed
$("#sr_interactions").text(index - 1);
}
@ -1196,6 +1236,11 @@ $(document).ready(function () {
<div class="col-sm-6"><input type="number" id="sr_duration_no_${index}" value="${value.duration}" class="form-control form-control-sm short_range_option" name="short_range_duration" min=1 placeholder="Minutes" onchange="validate_sr_time(this)" form="not-submitted"><br></div>
</div>
<div class='form-group row d-none'>
<div class="col-sm-6"><label class="col-form-label col-form-label-sm"> Exposure group:</label></div>
<div class="col-sm-6"><input type="text" id="sr_group_no_${index}" value="${value.exposure_group}" class="form-control form-control-sm short_range_option" name="short_range_exposure_group" placeholder="A" onchange="validate_sr_time(this)" form="not-submitted"><br></div>
</div>
<div class="form-group" style="max-width: 8rem">
<button type="button" id="edit_row_no_${index}" class="edit_node_btn_frm_field btn btn-success btn-sm d-none">Edit</button>
<button type="button" id="validate_row_no_${index}" class="validate_node_btn_frm_field btn btn-success btn-sm">Save</button>
@ -1213,11 +1258,11 @@ $(document).ready(function () {
// When short_range_yes option is selected, we want to inject rows for each expiractory activity, start_time and duration.
$("body").on("click", ".add_node_btn_frm_field", function(e) {
let last_row = $(".form_field_outer").find(".form_field_outer_row");
if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15" }));
if (last_row.length == 0) $("#dialog_sr").append(inject_sr_interaction(1, value = { activity: "", start_time: "", duration: "15", exposure_group: "A" }));
else {
last_index = last_row.last().find(".short_range_option").prop("id").split("_").slice(-1)[0];
index = parseInt(last_index) + 1;
$("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15" }));
$("#dialog_sr").append(inject_sr_interaction(index, value = { activity: "", start_time: "", duration: "15", exposure_group: "A"}));
}
});

View file

@ -10,7 +10,7 @@ function draw_plot(svg_id) {
let button_full_exposure = document.getElementById("button_full_exposure");
let button_hide_high_concentration = document.getElementById("button_hide_high_concentration");
let long_range_checkbox = document.getElementById('long_range_cumulative_checkbox')
let show_sr_legend = short_range_expirations.length > 0;
let show_sr_legend = short_range_expirations?.length > 0;
var data_for_graphs = {
'concentrations': [],
@ -192,7 +192,7 @@ function draw_plot(svg_id) {
// Area representing the short-range interaction(s).
var shortRangeArea = {};
var drawShortRangeArea = {};
short_range_intervals.forEach((b, index) => {
short_range_intervals?.forEach((b, index) => {
shortRangeArea[index] = d3.area();
drawShortRangeArea[index] = draw_area.append('svg:path');

View file

@ -455,8 +455,8 @@
</div><br>
<input type="text" class="form-control d-none" name="occupancy_format" value="static" required> {# "static" vs. "dynamic" #}
<input type="text" class="form-control d-none" name="dynamic_exposed_occupancy" value="[]">
<input type="text" class="form-control d-none" name="dynamic_infected_occupancy" value="[]">
<input type="text" class="form-control d-none" name="dynamic_exposed_occupancy">
<input type="text" class="form-control d-none" name="dynamic_infected_occupancy">
<div class="form-group row">
<div class="col-sm-4"><label class="col-form-label">Total number of occupants:</label></div>

View file

@ -104,7 +104,7 @@
</div>
{% endblock long_range_warning_animation %}
</div>
{% if form.occupancy_format == "static" %}<h6><b>Expected new cases:</b> {{ long_range_expected_cases | float_format }}</h6>{% endif %}
<h6><b>Expected new cases:</b> {{ long_range_expected_cases | float_format }}</h6>
</div>
<br>
{% if form.short_range_option == "short_range_yes" %}
@ -126,9 +126,7 @@
</div>
{% endblock warning_animation %}
</div>
{% if form.occupancy_format == "static" %}
<h6><b>Expected new cases:</b> {{ expected_new_cases | float_format }}</h6>
{% endif %}
<h6><b>Expected new cases:</b> {{ expected_new_cases | float_format }}</h6>
</div>
{% endif %}
<div class="d-flex">
@ -136,17 +134,13 @@
<div class="flex-row align-self-center">
<div class="align-self-center alert alert-dark mb-0" role="alert">
Taking into account the uncertainties tied to the model variables, in this scenario and assuming all occupants are exposed equally (i.e. without short-range interactions), the <b>probability of one exposed occupant getting infected is {{ long_range_prob_inf | non_zero_percentage }}</b>
{% if form.occupancy_format == "static" %}
and the <b>expected number of new cases is {{ long_range_expected_cases | float_format }}</b>
{% endif %}*.
and the <b>expected number of new cases is {{ long_range_expected_cases | float_format }}</b>*.
</div>
{% if form.short_range_option == "short_range_yes" %}
<br>
<div class="align-self-center alert alert-dark mb-0" role="alert">
In this scenario, the <b>probability the occupant(s) exposed to short-range interactions get infected can go as high as {{ prob_inf | non_zero_percentage }}</b>
{% if form.occupancy_format == "static" %}
and the <b>expected number of new cases increases to {{ expected_new_cases | float_format }}</b>
{% endif %}.
and the <b>expected number of new cases increases to {{ expected_new_cases | float_format }}</b>.
</div>
{% endif %}
{% block probabilistic_exposure_probability %}
@ -636,14 +630,19 @@
{% if form.short_range_option == "short_range_yes" %}
<li><p class="data_text">Total number of occupants having short-range interactions: {{ form.short_range_occupants }}</p></li>
<ul>
{% for interaction in form.short_range_interactions %}
<li>Interaction no. {{ loop.index }}:
<ul>
<li>Expiratory activity: {{ "Shouting/Singing" if interaction.expiration == "Shouting" else interaction.expiration }} </li>
<li>Start time: {{ interaction.start_time }} </li>
<li>Duration: {{ interaction.duration }} {{ "minutes" if interaction.duration|float > 1 else "minute" }}</li>
</ul>
</li>
{% for key, interactions in form.short_range_interactions.items() %}
<li>Interactions for group "{{ key }}":</li>
<ul>
{% for interaction in interactions %}
<li>Interaction no. {{ loop.index }}:
<ul>
<li>Expiratory activity: {{ "Shouting/Singing" if interaction.expiration == "Shouting" else interaction.expiration }} </li>
<li>Start time: {{ interaction.start_time }} </li>
<li>Duration: {{ interaction.duration }} {{ "minutes" if interaction.duration|float > 1 else "minute" }}</li>
</ul>
</li>
{% endfor %}
</ul>
{% endfor %}
</ul>
{% endif %}

View file

@ -70,10 +70,8 @@
<div class="alert alert-success mb-0" role="alert">
<strong>Acceptable:</strong>
{% endif %}
Taking into account the uncertainties tied to the model variables, in this scenario and assuming all occupants are exposed equally (i.e. without short-range interactions), the <b>probability of one exposed occupant getting infected is {{long_range_prob_inf | non_zero_percentage}}</b>
{% if form.occupancy_format == "static" %}
and the <b>expected number of new cases is {{ long_range_expected_cases | float_format }}</b>
{% endif %}*.
Taking into account the uncertainties tied to the model variables, in this scenario and assuming all occupants are exposed equally (i.e. without short-range interactions), the <b>probability of one exposed occupant getting infected is {{long_range_prob_inf | non_zero_percentage}}</b>
and the <b>expected number of new cases is {{ long_range_expected_cases | float_format }}</b>*.
</div>
{% if form.short_range_option == "short_range_yes" %}
<br>
@ -88,9 +86,7 @@
<strong>Acceptable:</strong>
{% endif %}
In this scenario, the <b>probability the occupant(s) exposed to short-range interactions get infected can go as high as {{ prob_inf | non_zero_percentage }}</b>
{% if form.occupancy_format == "static" %}
and the <b>expected number of new cases increases to {{ expected_new_cases | float_format }}</b>
{% endif %}.
and the <b>expected number of new cases increases to {{ expected_new_cases | float_format }}</b>
</div>
{% endif %}

View file

@ -90,6 +90,6 @@ def exposure_model_w_outside_temp_changes(data_registry, baseline_exposure_model
def baseline_form_with_sr(baseline_form_data, data_registry):
form_data_sr = baseline_form_data
form_data_sr['short_range_option'] = 'short_range_yes'
form_data_sr['short_range_interactions'] = '[{"expiration": "Shouting", "start_time": "10:30", "duration": "30"}]'
form_data_sr['short_range_interactions'] = '{"group_1": [{"expiration": "Shouting", "start_time": "10:30", "duration": 30}]}'
form_data_sr['short_range_occupants'] = 5
return virus_validator.VirusFormData.from_dict(form_data_sr, data_registry)

View file

@ -123,62 +123,3 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData):
lr_expected_new_cases = alternative_statistics['stats']['Base scenario without short-range interactions']['expected_new_cases']
np.testing.assert_almost_equal(sr_lr_expected_new_cases, lr_expected_new_cases + sr_lr_prob_inf * baseline_form_with_sr.short_range_occupants, 2)
def test_static_vs_dynamic_occupancy_from_form(baseline_form_data, data_registry):
"""
Assert that the results between a static and dynamic occupancy model (from form inputs) are similar.
"""
executor_factory = partial(
concurrent.futures.ThreadPoolExecutor, 1,
)
# By default the baseline form accepts static occupancy
static_occupancy_baseline_form: VirusFormData = VirusFormData.from_dict(baseline_form_data, data_registry)
static_occupancy_model = static_occupancy_baseline_form.build_model()
static_occupancy_report_data = rep_gen.calculate_report_data(static_occupancy_baseline_form, executor_factory)
# Update the initial form data to include dynamic occupancy (please note the 4 coffee and 1 lunch breaks)
baseline_form_data['occupancy_format'] = 'dynamic'
baseline_form_data['dynamic_infected_occupancy'] = json.dumps([
{'total_people': 1, 'start_time': '09:00', 'finish_time': '10:03'},
{'total_people': 0, 'start_time': '10:03', 'finish_time': '10:13'},
{'total_people': 1, 'start_time': '10:13', 'finish_time': '11:16'},
{'total_people': 0, 'start_time': '11:16', 'finish_time': '11:26'},
{'total_people': 1, 'start_time': '11:26', 'finish_time': '12:30'},
{'total_people': 0, 'start_time': '12:30', 'finish_time': '13:30'},
{'total_people': 1, 'start_time': '13:30', 'finish_time': '14:53'},
{'total_people': 0, 'start_time': '14:53', 'finish_time': '15:03'},
{'total_people': 1, 'start_time': '15:03', 'finish_time': '16:26'},
{'total_people': 0, 'start_time': '16:26', 'finish_time': '16:36'},
{'total_people': 1, 'start_time': '16:36', 'finish_time': '18:00'},
])
baseline_form_data['dynamic_exposed_occupancy'] = json.dumps([
{'total_people': 9, 'start_time': '09:00', 'finish_time': '10:03'},
{'total_people': 0, 'start_time': '10:03', 'finish_time': '10:13'},
{'total_people': 9, 'start_time': '10:13', 'finish_time': '11:16'},
{'total_people': 0, 'start_time': '11:16', 'finish_time': '11:26'},
{'total_people': 9, 'start_time': '11:26', 'finish_time': '12:30'},
{'total_people': 0, 'start_time': '12:30', 'finish_time': '13:30'},
{'total_people': 9, 'start_time': '13:30', 'finish_time': '14:53'},
{'total_people': 0, 'start_time': '14:53', 'finish_time': '15:03'},
{'total_people': 9, 'start_time': '15:03', 'finish_time': '16:26'},
{'total_people': 0, 'start_time': '16:26', 'finish_time': '16:36'},
{'total_people': 9, 'start_time': '16:36', 'finish_time': '18:00'},
])
baseline_form_data['total_people'] = 0
baseline_form_data['infected_people'] = 0
dynamic_occupancy_baseline_form: VirusFormData = VirusFormData.from_dict(baseline_form_data, data_registry)
dynamic_occupancy_model = dynamic_occupancy_baseline_form.build_model()
dynamic_occupancy_report_data = rep_gen.calculate_report_data(dynamic_occupancy_baseline_form, executor_factory)
assert (list(sorted(static_occupancy_model.concentration_model.infected.presence.transition_times())) ==
list(dynamic_occupancy_model.concentration_model.infected.number.transition_times))
assert (list(sorted(static_occupancy_model.exposed.presence.transition_times())) ==
list(dynamic_occupancy_model.exposed.number.transition_times))
np.testing.assert_almost_equal(static_occupancy_report_data['prob_inf'], dynamic_occupancy_report_data['prob_inf'], 1)
assert dynamic_occupancy_report_data['expected_new_cases'] == None
assert dynamic_occupancy_report_data['prob_probabilistic_exposure'] == None