handled expected number of new cases calculation (including tests)
This commit is contained in:
parent
13c112b656
commit
3c20ed0306
4 changed files with 59 additions and 60 deletions
|
|
@ -1867,43 +1867,56 @@ class ExposureModel:
|
|||
return 0
|
||||
|
||||
def expected_new_cases(self) -> _VectorisedFloat:
|
||||
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or
|
||||
isinstance(self.exposed.number, IntPiecewiseConstant)):
|
||||
raise NotImplementedError("Cannot compute expected new cases "
|
||||
"with dynamic occupancy")
|
||||
|
||||
"""
|
||||
The expected_new_cases may provide one or two different outputs:
|
||||
1) Long-range exposure: take the infection_probability and multiply by the occupants exposed to long-range.
|
||||
2) Short- and long-range exposure: take the infection_probability of long-range multiplied by the occupants exposed to long-range only,
|
||||
plus the infection_probability of short- and long-range multiplied by the occupants exposed to short-range only.
|
||||
|
||||
In the case dynamic occupancy is defined, the maximum number of exposed occupants during the course of the simulation will be considered.
|
||||
"""
|
||||
exposed_occ: int = max(self.exposed.number.values) if isinstance(self.exposed.number, IntPiecewiseConstant) else self.exposed.number
|
||||
|
||||
if self.short_range != ():
|
||||
new_cases_long_range = nested_replace(self, {'short_range': (),}).infection_probability() * (self.exposed.number - self.exposed_to_short_range)
|
||||
new_cases_long_range = nested_replace(self, {'short_range': [],}).infection_probability() * (exposed_occ - self.exposed_to_short_range)
|
||||
return (new_cases_long_range + (self.infection_probability() * self.exposed_to_short_range)) / 100
|
||||
|
||||
return self.infection_probability() * self.exposed.number / 100
|
||||
return self.infection_probability() * exposed_occ / 100
|
||||
|
||||
def reproduction_number(self) -> _VectorisedFloat:
|
||||
"""
|
||||
The reproduction number can be thought of as the expected number of
|
||||
cases directly generated by one infected case in a population.
|
||||
|
||||
It handles the cases when dynamic occupancy for the infected population is defined.
|
||||
"""
|
||||
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or
|
||||
isinstance(self.exposed.number, IntPiecewiseConstant)):
|
||||
raise NotImplementedError("Cannot compute reproduction number "
|
||||
"with dynamic occupancy")
|
||||
|
||||
if self.concentration_model.infected.number == 1:
|
||||
return self.expected_new_cases()
|
||||
infected_number = self.concentration_model.infected.number
|
||||
if isinstance(infected_number, IntPiecewiseConstant):
|
||||
# Handle case when infected number is dynamic
|
||||
max_occ = max(infected_number.values)
|
||||
if max_occ == 1:
|
||||
return self.expected_new_cases()
|
||||
else:
|
||||
# Adjust to treat dynamic occupancy, limiting infected to 1 when present
|
||||
inf_occ_values = [1 if occ > 0 else occ for occ in infected_number.values]
|
||||
single_exposure_model = nested_replace(
|
||||
self, {
|
||||
'concentration_model.infected.number.values': inf_occ_values
|
||||
}
|
||||
)
|
||||
return single_exposure_model.expected_new_cases()
|
||||
|
||||
elif isinstance(infected_number, int):
|
||||
# Handle case when infected number is a single integer
|
||||
if infected_number == 1:
|
||||
return self.expected_new_cases()
|
||||
|
||||
# Create an equivalent exposure model but with precisely
|
||||
# one infected case.
|
||||
single_exposure_model = nested_replace(
|
||||
self, {
|
||||
'concentration_model.infected.number': 1}
|
||||
)
|
||||
# Create an equivalent exposure model but with precisely
|
||||
# one infected case.
|
||||
single_exposure_model = nested_replace(
|
||||
self, {
|
||||
'concentration_model.infected.number': 1}
|
||||
)
|
||||
|
||||
return single_exposure_model.expected_new_cases()
|
||||
return single_exposure_model.expected_new_cases()
|
||||
|
|
|
|||
|
|
@ -177,12 +177,6 @@ def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable
|
|||
if form.exposure_option == "p_probabilistic_exposure" and form.occupancy_format == "static":
|
||||
prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
|
||||
else: prob_probabilistic_exposure = -1
|
||||
# Expected new cases
|
||||
if (form.occupancy_format == "static"):
|
||||
expected_new_cases = np.array(model.expected_new_cases()).mean()
|
||||
else:
|
||||
# With dynamic occupancy, the expected number of new cases feature is disabled.
|
||||
expected_new_cases = -1
|
||||
|
||||
exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()]
|
||||
|
||||
|
|
@ -213,7 +207,8 @@ def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable
|
|||
"prob_hist_count": list(prob_dist_count),
|
||||
"prob_hist_bins": list(prob_dist_bins),
|
||||
"prob_probabilistic_exposure": prob_probabilistic_exposure,
|
||||
"expected_new_cases": expected_new_cases,
|
||||
"expected_new_cases": np.array(model.expected_new_cases()).mean(),
|
||||
"uncertainties_plot_src": uncertainties_plot_src,
|
||||
"CO2_concentrations": CO2_concentrations,
|
||||
"conditional_probability_data": conditional_probability_data,
|
||||
"uncertainties_plot_src": uncertainties_plot_src,
|
||||
|
|
@ -406,8 +401,14 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, m
|
|||
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
|
||||
|
||||
else:
|
||||
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[],
|
||||
total_people=form.total_people - form.short_range_occupants)
|
||||
# When dynamic occupancy is defined, the replace of total people is useless - the expected number of new cases is not calculated.
|
||||
if form.occupancy_format == 'static':
|
||||
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], total_people=form.total_people - form.short_range_occupants)
|
||||
elif form.occupancy_format == 'dynamic':
|
||||
for occ in form.dynamic_exposed_occupancy: # Update the number of exposed people with long-range exposure
|
||||
if occ['total_people'] > form.short_range_occupants: occ['total_people'] = max(0, occ['total_people'] - form.short_range_occupants)
|
||||
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], dynamic_exposed_occupancy=form.dynamic_exposed_occupancy)
|
||||
|
||||
scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
|
||||
|
||||
return scenarios
|
||||
|
|
@ -417,7 +418,6 @@ def scenario_statistics(
|
|||
mc_model: mc.ExposureModel,
|
||||
sample_times: typing.List[float],
|
||||
compute_prob_exposure: bool,
|
||||
compute_expected_new_cases: bool,
|
||||
):
|
||||
model = mc_model.build_model(
|
||||
size=mc_model.data_registry.monte_carlo['sample_size'])
|
||||
|
|
@ -425,16 +425,11 @@ def scenario_statistics(
|
|||
# It means we have data to calculate the total_probability_rule
|
||||
prob_probabilistic_exposure = model.total_probability_rule()
|
||||
else:
|
||||
prob_probabilistic_exposure = -1
|
||||
|
||||
if (compute_expected_new_cases):
|
||||
expected_new_cases = np.mean(model.expected_new_cases())
|
||||
else:
|
||||
expected_new_cases = -1
|
||||
prob_probabilistic_exposure = -1
|
||||
|
||||
return {
|
||||
'probability_of_infection': np.mean(model.infection_probability()),
|
||||
'expected_new_cases': expected_new_cases,
|
||||
'expected_new_cases': np.mean(model.expected_new_cases()),
|
||||
'concentrations': [
|
||||
np.mean(model.concentration(time))
|
||||
for time in sample_times
|
||||
|
|
@ -464,8 +459,6 @@ def comparison_report(
|
|||
compute_prob_exposure = True
|
||||
else:
|
||||
compute_prob_exposure = False
|
||||
|
||||
compute_expected_new_cases = True if (form.occupancy_format == "static") else False
|
||||
|
||||
with executor_factory() as executor:
|
||||
results = executor.map(
|
||||
|
|
@ -473,7 +466,6 @@ def comparison_report(
|
|||
scenarios.values(),
|
||||
[report_data['times']] * len(scenarios),
|
||||
[compute_prob_exposure] * len(scenarios),
|
||||
[compute_expected_new_cases] * len(scenarios),
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -230,32 +230,26 @@ def test_dynamic_total_probability_rule(
|
|||
"(including incidence rate) with dynamic occupancy")):
|
||||
dynamic_population_exposure_model.total_probability_rule()
|
||||
|
||||
|
||||
def test_dynamic_expected_new_cases(
|
||||
full_exposure_model: models.ExposureModel,
|
||||
dynamic_infected_single_exposure_model: models.ExposureModel,
|
||||
dynamic_exposed_single_exposure_model: models.ExposureModel,
|
||||
dynamic_population_exposure_model: models.ExposureModel):
|
||||
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_infected_single_exposure_model.expected_new_cases()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_exposed_single_exposure_model.expected_new_cases()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_population_exposure_model.expected_new_cases()
|
||||
base_expected_new_cases = full_exposure_model.expected_new_cases()
|
||||
npt.assert_almost_equal(base_expected_new_cases, dynamic_infected_single_exposure_model.expected_new_cases())
|
||||
npt.assert_almost_equal(base_expected_new_cases, dynamic_exposed_single_exposure_model.expected_new_cases())
|
||||
npt.assert_almost_equal(base_expected_new_cases, dynamic_population_exposure_model.expected_new_cases())
|
||||
|
||||
|
||||
def test_dynamic_reproduction_number(
|
||||
full_exposure_model: models.ExposureModel,
|
||||
dynamic_infected_single_exposure_model: models.ExposureModel,
|
||||
dynamic_exposed_single_exposure_model: models.ExposureModel,
|
||||
dynamic_population_exposure_model: models.ExposureModel):
|
||||
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_infected_single_exposure_model.reproduction_number()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_exposed_single_exposure_model.reproduction_number()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_population_exposure_model.reproduction_number()
|
||||
|
||||
base_reproduction_number = full_exposure_model.reproduction_number()
|
||||
npt.assert_almost_equal(base_reproduction_number, dynamic_infected_single_exposure_model.reproduction_number())
|
||||
npt.assert_almost_equal(base_reproduction_number, dynamic_exposed_single_exposure_model.reproduction_number())
|
||||
npt.assert_almost_equal(base_reproduction_number, dynamic_population_exposure_model.reproduction_number())
|
||||
|
|
|
|||
|
|
@ -180,6 +180,6 @@ def test_static_vs_dynamic_occupancy_from_form(baseline_form_data, data_registry
|
|||
list(dynamic_occupancy_model.exposed.number.transition_times))
|
||||
|
||||
np.testing.assert_almost_equal(static_occupancy_report_data['prob_inf'], dynamic_occupancy_report_data['prob_inf'], 1)
|
||||
assert dynamic_occupancy_report_data['expected_new_cases'] == -1
|
||||
np.testing.assert_almost_equal(static_occupancy_report_data['expected_new_cases'], dynamic_occupancy_report_data['expected_new_cases'], 1)
|
||||
assert dynamic_occupancy_report_data['prob_probabilistic_exposure'] == -1
|
||||
|
||||
Loading…
Reference in a new issue