Merge branch 'feature/dose_list' into 'master'
New logic for probability of infection See merge request caimira/caimira!451
This commit is contained in:
commit
fcee962a83
2 changed files with 137 additions and 40 deletions
|
|
@ -1494,9 +1494,17 @@ class ExposureModel:
|
|||
c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))):
|
||||
raise ValueError("If the diameter is an array, none of the ventilation parameters "
|
||||
"or virus decay constant can be arrays at the same time.")
|
||||
if not isinstance(self.exposed.number, int):
|
||||
raise NotImplementedError("Cannot use dynamic occupancy for"
|
||||
" the exposed population")
|
||||
|
||||
@method_cache
|
||||
def population_state_change_times(self) -> typing.List[float]:
|
||||
"""
|
||||
All time dependent population entities on this model must provide information
|
||||
about the times at which their state changes.
|
||||
"""
|
||||
state_change_times = set(self.concentration_model.infected.presence_interval().transition_times())
|
||||
state_change_times.update(self.exposed.presence_interval().transition_times())
|
||||
|
||||
return sorted(state_change_times)
|
||||
|
||||
def long_range_fraction_deposited(self) -> _VectorisedFloat:
|
||||
"""
|
||||
|
|
@ -1629,29 +1637,40 @@ class ExposureModel:
|
|||
deposited_exposure += self.long_range_deposited_exposure_between_bounds(time1, time2)
|
||||
|
||||
return deposited_exposure
|
||||
|
||||
def _deposited_exposure_list(self):
|
||||
"""
|
||||
The number of virus per m^3 deposited on the respiratory tract.
|
||||
"""
|
||||
population_change_times = self.population_state_change_times()
|
||||
|
||||
deposited_exposure = []
|
||||
for start, stop in zip(population_change_times[:-1], population_change_times[1:]):
|
||||
deposited_exposure.append(self.deposited_exposure_between_bounds(start, stop))
|
||||
|
||||
return deposited_exposure
|
||||
|
||||
def deposited_exposure(self) -> _VectorisedFloat:
|
||||
"""
|
||||
The number of virus per m^3 deposited on the respiratory tract.
|
||||
"""
|
||||
deposited_exposure: _VectorisedFloat = 0.0
|
||||
for start, stop in self.exposed.presence_interval().boundaries():
|
||||
deposited_exposure += self.deposited_exposure_between_bounds(start, stop)
|
||||
|
||||
return deposited_exposure * self.repeats
|
||||
|
||||
@method_cache
|
||||
def infection_probability(self) -> _VectorisedFloat:
|
||||
return np.sum(self._deposited_exposure_list(), axis=0) * self.repeats
|
||||
|
||||
def _infection_probability_list(self):
|
||||
# Viral dose (vD)
|
||||
vD = self.deposited_exposure()
|
||||
vD_list = self._deposited_exposure_list()
|
||||
|
||||
# oneoverln2 multiplied by ID_50 corresponds to ID_63.
|
||||
infectious_dose = oneoverln2 * self.concentration_model.virus.infectious_dose
|
||||
|
||||
# Probability of infection.
|
||||
return (1 - np.exp(-((vD * (1 - self.exposed.host_immunity))/(infectious_dose *
|
||||
self.concentration_model.virus.transmissibility_factor)))) * 100
|
||||
|
||||
# Probability of infection.
|
||||
return [(1 - np.exp(-((vD * (1 - self.exposed.host_immunity))/(infectious_dose *
|
||||
self.concentration_model.virus.transmissibility_factor)))) for vD in vD_list]
|
||||
|
||||
@method_cache
|
||||
def infection_probability(self) -> _VectorisedFloat:
|
||||
return (1 - np.prod([1 - prob for prob in self._infection_probability_list()], axis = 0)) * 100
|
||||
|
||||
def total_probability_rule(self) -> _VectorisedFloat:
|
||||
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or
|
||||
isinstance(self.exposed.number, IntPiecewiseConstant)):
|
||||
|
|
@ -1682,15 +1701,12 @@ class ExposureModel:
|
|||
return 0
|
||||
|
||||
def expected_new_cases(self) -> _VectorisedFloat:
|
||||
# Create an equivalent exposure model without short-range interactions, if any.
|
||||
if (len(self.short_range) == 0):
|
||||
exposure_model = nested_replace(self, {'short_range': ()})
|
||||
prob = exposure_model.infection_probability()
|
||||
else:
|
||||
prob = self.infection_probability()
|
||||
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or
|
||||
isinstance(self.exposed.number, IntPiecewiseConstant)):
|
||||
raise NotImplementedError("Cannot compute expected new cases "
|
||||
"with dynamic occupancy")
|
||||
|
||||
exposed_occupants = self.exposed.number
|
||||
return prob * exposed_occupants / 100
|
||||
return self.infection_probability() * self.exposed.number / 100
|
||||
|
||||
def reproduction_number(self) -> _VectorisedFloat:
|
||||
"""
|
||||
|
|
@ -1698,13 +1714,19 @@ class ExposureModel:
|
|||
cases directly generated by one infected case in a population.
|
||||
|
||||
"""
|
||||
if (isinstance(self.concentration_model.infected.number, IntPiecewiseConstant) or
|
||||
isinstance(self.exposed.number, IntPiecewiseConstant)):
|
||||
raise NotImplementedError("Cannot compute reproduction number "
|
||||
"with dynamic occupancy")
|
||||
|
||||
if self.concentration_model.infected.number == 1:
|
||||
return self.expected_new_cases()
|
||||
|
||||
# Create an equivalent exposure model but with precisely
|
||||
# one infected case.
|
||||
single_exposure_model = nested_replace(
|
||||
self, {'concentration_model.infected.number': 1}
|
||||
self, {
|
||||
'concentration_model.infected.number': 1}
|
||||
)
|
||||
|
||||
return single_exposure_model.expected_new_cases()
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ def full_exposure_model():
|
|||
short_range=(),
|
||||
exposed=models.Population(
|
||||
number=10,
|
||||
presence=models.SpecificInterval(((8, 12), (13, 17), )),
|
||||
presence=models.SpecificInterval(((8, 12), (13, 17), )),
|
||||
mask=models.Mask.types['No mask'],
|
||||
activity=models.Activity.types['Seated'],
|
||||
host_immunity=0.
|
||||
|
|
@ -51,11 +51,37 @@ def baseline_infected_population_number():
|
|||
|
||||
|
||||
@pytest.fixture
|
||||
def dynamic_single_exposure_model(full_exposure_model, baseline_infected_population_number):
|
||||
def baseline_exposed_population_number():
|
||||
return models.Population(
|
||||
number=models.IntPiecewiseConstant(
|
||||
(8, 12, 13, 17), (10, 0, 10)),
|
||||
presence=None,
|
||||
mask=models.Mask.types['No mask'],
|
||||
activity=models.Activity.types['Seated'],
|
||||
host_immunity=0.,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dynamic_infected_single_exposure_model(full_exposure_model, baseline_infected_population_number):
|
||||
return dc_utils.nested_replace(full_exposure_model,
|
||||
{'concentration_model.infected': baseline_infected_population_number, })
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dynamic_exposed_single_exposure_model(full_exposure_model, baseline_exposed_population_number):
|
||||
return dc_utils.nested_replace(full_exposure_model,
|
||||
{'exposed': baseline_exposed_population_number, })
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dynamic_population_exposure_model(full_exposure_model, baseline_infected_population_number ,baseline_exposed_population_number):
|
||||
return dc_utils.nested_replace(full_exposure_model, {
|
||||
'concentration_model.infected': baseline_infected_population_number,
|
||||
'exposed': baseline_exposed_population_number,
|
||||
})
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"time",
|
||||
[4., 8., 10., 12., 13., 14., 16., 20., 24.],
|
||||
|
|
@ -91,16 +117,16 @@ def test_population_number(full_exposure_model: models.ExposureModel,
|
|||
[4., 8., 10., 12., 13., 14., 16., 20., 24.],
|
||||
)
|
||||
def test_concentration_model_dynamic_population(full_exposure_model: models.ExposureModel,
|
||||
dynamic_single_exposure_model: models.ExposureModel,
|
||||
dynamic_infected_single_exposure_model: models.ExposureModel,
|
||||
time: float):
|
||||
|
||||
assert full_exposure_model.concentration(time) == dynamic_single_exposure_model.concentration(time)
|
||||
assert full_exposure_model.concentration(time) == dynamic_infected_single_exposure_model.concentration(time)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("number_of_infected",[1, 2, 3, 4, 5])
|
||||
@pytest.mark.parametrize("time",[9., 12.5, 16.])
|
||||
def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureModel,
|
||||
dynamic_single_exposure_model: models.ExposureModel,
|
||||
dynamic_infected_single_exposure_model: models.ExposureModel,
|
||||
time: float,
|
||||
number_of_infected: int):
|
||||
|
||||
|
|
@ -112,14 +138,14 @@ def test_linearity_with_number_of_infected(full_exposure_model: models.ExposureM
|
|||
}
|
||||
)
|
||||
|
||||
npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_single_exposure_model.concentration(time) * number_of_infected)
|
||||
npt.assert_almost_equal(static_multiple_exposure_model.deposited_exposure(), dynamic_single_exposure_model.deposited_exposure() * number_of_infected)
|
||||
npt.assert_almost_equal(static_multiple_exposure_model.concentration(time), dynamic_infected_single_exposure_model.concentration(time) * number_of_infected)
|
||||
npt.assert_almost_equal(static_multiple_exposure_model.deposited_exposure(), dynamic_infected_single_exposure_model.deposited_exposure() * number_of_infected)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"time", (8., 9., 10., 11., 12., 13., 14.),
|
||||
)
|
||||
def test_dynamic_dose(full_exposure_model, time):
|
||||
def test_dynamic_dose(full_exposure_model: models.ExposureModel, time: float):
|
||||
|
||||
dynamic_infected: models.ExposureModel = dc_utils.nested_replace(
|
||||
full_exposure_model,
|
||||
|
|
@ -171,10 +197,59 @@ def test_dynamic_dose(full_exposure_model, time):
|
|||
npt.assert_almost_equal(dynamic_exposure, np.sum(static_exposure))
|
||||
|
||||
|
||||
def test_dynamic_total_probability_rule(dynamic_single_exposure_model: models.ExposureModel):
|
||||
with pytest.raises(
|
||||
NotImplementedError,
|
||||
match=re.escape("Cannot compute total probability "
|
||||
"(including incidence rate) with dynamic occupancy")
|
||||
):
|
||||
dynamic_single_exposure_model.total_probability_rule()
|
||||
def test_infection_probability(
|
||||
full_exposure_model: models.ExposureModel,
|
||||
dynamic_infected_single_exposure_model: models.ExposureModel,
|
||||
dynamic_exposed_single_exposure_model: models.ExposureModel,
|
||||
dynamic_population_exposure_model: models.ExposureModel):
|
||||
|
||||
base_infection_probability = full_exposure_model.infection_probability()
|
||||
npt.assert_almost_equal(base_infection_probability, dynamic_infected_single_exposure_model.infection_probability())
|
||||
npt.assert_almost_equal(base_infection_probability, dynamic_exposed_single_exposure_model.infection_probability())
|
||||
npt.assert_almost_equal(base_infection_probability, dynamic_population_exposure_model.infection_probability())
|
||||
|
||||
|
||||
def test_dynamic_total_probability_rule(
|
||||
dynamic_infected_single_exposure_model: models.ExposureModel,
|
||||
dynamic_exposed_single_exposure_model: models.ExposureModel,
|
||||
dynamic_population_exposure_model: models.ExposureModel):
|
||||
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability "
|
||||
"(including incidence rate) with dynamic occupancy")):
|
||||
dynamic_infected_single_exposure_model.total_probability_rule()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability "
|
||||
"(including incidence rate) with dynamic occupancy")):
|
||||
dynamic_exposed_single_exposure_model.total_probability_rule()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute total probability "
|
||||
"(including incidence rate) with dynamic occupancy")):
|
||||
dynamic_population_exposure_model.total_probability_rule()
|
||||
|
||||
def test_dynamic_expected_new_cases(
|
||||
dynamic_infected_single_exposure_model: models.ExposureModel,
|
||||
dynamic_exposed_single_exposure_model: models.ExposureModel,
|
||||
dynamic_population_exposure_model: models.ExposureModel):
|
||||
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_infected_single_exposure_model.expected_new_cases()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_exposed_single_exposure_model.expected_new_cases()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute expected new cases "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_population_exposure_model.expected_new_cases()
|
||||
|
||||
def test_dynamic_reproduction_number(
|
||||
dynamic_infected_single_exposure_model: models.ExposureModel,
|
||||
dynamic_exposed_single_exposure_model: models.ExposureModel,
|
||||
dynamic_population_exposure_model: models.ExposureModel):
|
||||
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_infected_single_exposure_model.reproduction_number()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_exposed_single_exposure_model.reproduction_number()
|
||||
with pytest.raises(NotImplementedError, match=re.escape("Cannot compute reproduction number "
|
||||
"with dynamic occupancy")):
|
||||
dynamic_population_exposure_model.reproduction_number()
|
||||
|
|
|
|||
Loading…
Reference in a new issue