Merge branch 'feature/cumulative_inf_aero' into 'master'

Method to calculate the deposited_exposure between two times

See merge request cara/cara!322
This commit is contained in:
Andre Henriques 2022-02-04 10:38:22 +01:00
commit 7e61cc33e5
6 changed files with 91 additions and 102 deletions

View file

@ -109,7 +109,7 @@ def calculate_report_data(model: models.ExposureModel):
exposed_occupants = model.exposed.number
expected_new_cases = np.array(model.expected_new_cases()).mean()
cumulative_doses = np.cumsum([
np.array(model.exposure_between_bounds(float(time1), float(time2))).mean()
np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean()
for time1, time2 in zip(times[:-1], times[1:])
])

View file

@ -1104,11 +1104,6 @@ class ExposureModel:
elif time1 <= start and stop < time2:
exposure += self.concentration_model.normed_integrated_concentration(start, stop)
return exposure
def exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat:
"""The number of virions per meter^3 between any two times."""
return (self._normed_exposure_between_bounds(time1, time2) *
self.concentration_model.infected.emission_rate_when_present())
def _normed_exposure(self) -> _VectorisedFloat:
"""
@ -1122,37 +1117,15 @@ class ExposureModel:
return normed_exposure * self.repeats
def exposure(self) -> _VectorisedFloat:
def deposited_exposure_between_bounds(self, time1: float, time2: float) -> _VectorisedFloat:
"""
The number of virus per meter^3.
"""
emission_rate_per_aerosol = self.concentration_model.infected.emission_rate_per_aerosol_when_present()
aerosols = self.concentration_model.infected.aerosols()
diameter = self.concentration_model.infected.particle.diameter
if not np.isscalar(diameter) and diameter is not None:
# we compute first the mean of all diameter-dependent quantities
# to perform properly the Monte-Carlo integration over
# particle diameters (doing things in another order would
# lead to wrong results).
exposure_integrated = np.array(self._normed_exposure()*aerosols).mean()
else:
# in the case of a single diameter or no diameter defined,
# one should not take any mean at this stage.
exposure_integrated = self._normed_exposure()*aerosols
# then we multiply by the diameter-independent quantity
# emission_rate_per_aerosol
return exposure_integrated * emission_rate_per_aerosol
def _deposited_exposure(self) -> _VectorisedFloat:
"""
The number of virus per m^3 deposited on the respiratory tract.
The number of virus per m^3 deposited on the respiratory tract
between any two times.
"""
emission_rate_per_aerosol = self.concentration_model.infected.emission_rate_per_aerosol_when_present()
aerosols = self.concentration_model.infected.aerosols()
fdep = self.fraction_deposited()
f_inf = self.concentration_model.infected.fraction_of_infectious_virus()
diameter = self.concentration_model.infected.particle.diameter
@ -1161,34 +1134,40 @@ class ExposureModel:
# to perform properly the Monte-Carlo integration over
# particle diameters (doing things in another order would
# lead to wrong results).
dep_exposure_integrated = np.array(self._normed_exposure() *
dep_exposure_integrated = np.array(self._normed_exposure_between_bounds(time1, time2) *
aerosols *
fdep).mean()
else:
# in the case of a single diameter or no diameter defined,
# one should not take any mean at this stage.
dep_exposure_integrated = self._normed_exposure()*aerosols*fdep
dep_exposure_integrated = self._normed_exposure_between_bounds(time1, time2)*aerosols*fdep
# then we multiply by the diameter-independent quantity
# emission_rate_per_aerosol
return dep_exposure_integrated * emission_rate_per_aerosol
# then we multiply by the diameter-independent quantity emission_rate_per_aerosol,
# and parameters of the vD equation (i.e. f_inf, BR_k and n_in).
return (dep_exposure_integrated * emission_rate_per_aerosol *
f_inf * self.exposed.activity.inhalation_rate *
(1 - self.exposed.mask.inhale_efficiency()))
def deposited_exposure(self) -> _VectorisedFloat:
"""
The number of virus per m^3 deposited on the respiratory tract.
"""
deposited_exposure = 0.0
for start, stop in self.exposed.presence.boundaries():
deposited_exposure += self.deposited_exposure_between_bounds(start, stop)
return deposited_exposure * self.repeats
def infection_probability(self) -> _VectorisedFloat:
deposited_exposure = self._deposited_exposure()
f_inf = self.concentration_model.infected.fraction_of_infectious_virus()
inf_aero = (
self.exposed.activity.inhalation_rate *
(1 - self.exposed.mask.inhale_efficiency()) *
deposited_exposure * f_inf
)
# viral dose (vD)
vD = self.deposited_exposure()
# oneoverln2 multiplied by ID_50 corresponds to ID_63.
infectious_dose = oneoverln2 * self.concentration_model.virus.infectious_dose
# Probability of infection.
return (1 - np.exp(-((inf_aero * (1 - self.exposed.host_immunity))/(infectious_dose *
return (1 - np.exp(-((vD * (1 - self.exposed.host_immunity))/(infectious_dose *
self.concentration_model.virus.transmissibility_factor)))) * 100
def expected_new_cases(self) -> _VectorisedFloat:

View file

@ -75,25 +75,25 @@ def known_concentrations(func):
@pytest.mark.parametrize(
"population, cm, expected_exposure, expected_probability", [
[populations[1], known_concentrations(lambda t: 36.),
np.array([432, 432]), np.array([67.9503762594, 65.2366759251])],
np.array([64.02320633, 59.45012016]), np.array([67.9503762594, 65.2366759251])],
[populations[2], known_concentrations(lambda t: 36.),
np.array([432, 432]), np.array([51.6749232285, 55.6374622042])],
np.array([40.91708675, 45.73086166]), np.array([51.6749232285, 55.6374622042])],
[populations[0], known_concentrations(lambda t: np.array([36., 72.])),
np.array([432, 864]), np.array([55.6374622042, 80.3196524031])],
np.array([45.73086166, 91.46172332]), np.array([55.6374622042, 80.3196524031])],
[populations[1], known_concentrations(lambda t: np.array([36., 72.])),
np.array([432, 864]), np.array([67.9503762594, 87.9151129926])],
np.array([64.02320633, 118.90024032]), np.array([67.9503762594, 87.9151129926])],
[populations[2], known_concentrations(lambda t: np.array([36., 72.])),
np.array([432, 864]), np.array([51.6749232285, 80.3196524031])],
np.array([40.91708675, 91.46172332]), np.array([51.6749232285, 80.3196524031])],
])
def test_exposure_model_ndarray(population, cm,
expected_exposure, expected_probability):
model = ExposureModel(cm, population)
np.testing.assert_almost_equal(
model.exposure(), expected_exposure
model.deposited_exposure(), expected_exposure
)
np.testing.assert_almost_equal(
model.infection_probability(), expected_probability, decimal=10
@ -105,33 +105,43 @@ def test_exposure_model_ndarray(population, cm,
assert model.expected_new_cases().shape == (2,)
@pytest.mark.parametrize("population", populations)
def test_exposure_model_ndarray_and_float_mix(population):
@pytest.mark.parametrize("population, expected_deposited_exposure", [
[populations[0], np.array([1.52436206, 1.52436206])],
[populations[1], np.array([2.13410688, 1.98167067])],
[populations[2], np.array([1.36390289, 1.52436206])],
])
def test_exposure_model_ndarray_and_float_mix(population, expected_deposited_exposure):
cm = known_concentrations(
lambda t: 0. if np.floor(t) % 2 else np.array([1.2, 1.2]))
model = ExposureModel(cm, population)
expected_exposure = np.array([14.4, 14.4])
np.testing.assert_almost_equal(
model.exposure(), expected_exposure
model.deposited_exposure(), expected_deposited_exposure
)
assert isinstance(model.infection_probability(), np.ndarray)
assert isinstance(model.expected_new_cases(), np.ndarray)
@pytest.mark.parametrize("population", populations)
def test_exposure_model_compare_scalar_vector(population):
cm_scalar = known_concentrations(lambda t: 1.2)
@pytest.mark.parametrize("population, expected_deposited_exposure", [
[populations[0], np.array([1.52436206, 1.52436206])],
[populations[1], np.array([2.13410688, 1.98167067])],
[populations[2], np.array([1.36390289, 1.52436206])],
])
def test_exposure_model_vector(population, expected_deposited_exposure):
cm_array = known_concentrations(lambda t: np.array([1.2, 1.2]))
model_scalar = ExposureModel(cm_scalar, population)
model_array = ExposureModel(cm_array, population)
expected_exposure = 14.4
np.testing.assert_almost_equal(
model_scalar.exposure(), expected_exposure
model_array.deposited_exposure(), np.array(expected_deposited_exposure)
)
def test_exposure_model_scalar():
cm_scalar = known_concentrations(lambda t: 1.2)
model_scalar = ExposureModel(cm_scalar, populations[0])
expected_deposited_exposure = 1.52436206
np.testing.assert_almost_equal(
model_array.exposure(), np.array([expected_exposure]*2)
model_scalar.deposited_exposure(), expected_deposited_exposure
)
@ -159,28 +169,28 @@ def conc_model():
)
# Expected exposure were computed with a trapezoidal integration, using
# Expected deposited exposure were computed with a trapezoidal integration, using
# a mesh of 10'000 pts per exposed presence interval.
@pytest.mark.parametrize(
["exposed_time_interval", "expected_exposure"],
["exposed_time_interval", "expected_deposited_exposure"],
[
[(0., 1.), 266.67176],
[(1., 1.01), 3.0879539],
[(1.01, 1.02), 3.00082435],
[(12., 12.01), 0.095063235],
[(12., 24.), 3775.65025],
[(0., 24.), 4097.8494],
[(0., 1.), 45.6008710],
[(1., 1.01), 0.5280401],
[(1.01, 1.02), 0.51314096385],
[(12., 12.01), 0.016255813185],
[(12., 24.), 645.63619275],
[(0., 24.), 700.7322474],
]
)
def test_exposure_model_integral_accuracy(exposed_time_interval,
expected_exposure, conc_model):
expected_deposited_exposure, conc_model):
presence_interval = models.SpecificInterval((exposed_time_interval,))
population = models.Population(
10, presence_interval, models.Mask.types['Type I'],
models.Activity.types['Standing'], 0.,
)
model = ExposureModel(conc_model, population)
np.testing.assert_allclose(model.exposure(), expected_exposure)
np.testing.assert_allclose(model.deposited_exposure(), expected_deposited_exposure)
def test_infectious_dose_vectorisation():

View file

@ -381,16 +381,16 @@ def build_exposure_model(concentration_model):
)
# expected exposure were computed with a trapezoidal integration, using
# expected deposited exposure were computed with a trapezoidal integration, using
# a mesh of 100'000 pts per exposed presence interval.
@pytest.mark.parametrize(
"month, expected_exposure",
"month, expected_deposited_exposure",
[
['Jan', 503.254087759],
['Jun', 2294.71115639],
['Jan', 377.440565819],
['Jun', 1721.03336729],
],
)
def test_exposure_hourly_dep(month,expected_exposure):
def test_exposure_hourly_dep(month,expected_deposited_exposure):
m = build_exposure_model(
build_hourly_dependent_model(
month,
@ -398,20 +398,20 @@ def test_exposure_hourly_dep(month,expected_exposure):
intervals_presence_infected=((8., 12.), (13., 17.))
)
)
exposure = m.exposure()
npt.assert_allclose(exposure, expected_exposure)
deposited_exposure = m.deposited_exposure()
npt.assert_allclose(deposited_exposure, expected_deposited_exposure)
# expected exposure were computed with a trapezoidal integration, using
# expected deposited exposure were computed with a trapezoidal integration, using
# a mesh of 100'000 pts per exposed presence interval and 25 pts per hour
# for the temperature discretization.
@pytest.mark.parametrize(
"month, expected_exposure",
"month, expected_deposited_exposure",
[
['Jan', 511.118941481],
['Jun', 2398.90129579],
['Jan', 383.339206111],
['Jun', 1799.17597184],
],
)
def test_exposure_hourly_dep_refined(month,expected_exposure):
def test_exposure_hourly_dep_refined(month,expected_deposited_exposure):
m = build_exposure_model(
build_hourly_dependent_model(
month,
@ -420,5 +420,5 @@ def test_exposure_hourly_dep_refined(month,expected_exposure):
temperatures=data.GenevaTemperatures,
)
)
exposure = m.exposure()
npt.assert_allclose(exposure, expected_exposure, rtol=0.02)
deposited_exposure = m.deposited_exposure()
npt.assert_allclose(deposited_exposure, expected_deposited_exposure, rtol=0.02)

View file

@ -87,6 +87,6 @@ def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.Concentra
def test_build_exposure_model(baseline_mc_exposure_model: cara.monte_carlo.ExposureModel):
model = baseline_mc_exposure_model.build_model(7)
assert isinstance(model, cara.models.ExposureModel)
prob = model.exposure()
prob = model.deposited_exposure()
assert isinstance(prob, np.ndarray)
assert prob.shape == (7, )

View file

@ -306,13 +306,13 @@ def waiting_room_mc():
@pytest.mark.parametrize(
"mc_model, expected_pi, expected_new_cases, expected_dose, expected_ER",
[
["shared_office_mc", 6.03, 0.18, 27.22, 809],
["classroom_mc", 9.5, 1.85, 79.98, 5624],
["ski_cabin_mc", 16.0, 0.5, 40.25, 7966],
["skagit_chorale_mc",65.7, 40.0, 241.28, 190422],
["bus_ride_mc", 12.0, 8.0, 63.79, 5419],
["gym_mc", 0.45, 0.13, 0.4852, 1145],
["waiting_room_mc", 1.59, 0.22, 7.23, 737],
["shared_office_mc", 6.03, 0.18, 3.198, 809],
["classroom_mc", 9.5, 1.85, 9.478, 5624],
["ski_cabin_mc", 16.0, 0.5, 17.315, 7966],
["skagit_chorale_mc",65.7, 40.0, 102.213, 190422],
["bus_ride_mc", 12.0, 8.0, 7.65, 5419],
["gym_mc", 0.45, 0.13, 0.208, 1145],
["waiting_room_mc", 1.59, 0.22, 0.821, 737],
]
)
def test_report_models(mc_model, expected_pi, expected_new_cases,
@ -323,7 +323,7 @@ def test_report_models(mc_model, expected_pi, expected_new_cases,
expected_pi, rtol=TOLERANCE)
npt.assert_allclose(exposure_model.expected_new_cases().mean(),
expected_new_cases, rtol=TOLERANCE)
npt.assert_allclose(exposure_model.exposure().mean(),
npt.assert_allclose(exposure_model.deposited_exposure().mean(),
expected_dose, rtol=TOLERANCE)
npt.assert_allclose(
exposure_model.concentration_model.infected.emission_rate_when_present().mean(),
@ -333,10 +333,10 @@ def test_report_models(mc_model, expected_pi, expected_new_cases,
@pytest.mark.parametrize(
"mask_type, month, expected_pi, expected_dose, expected_ER",
[
["No mask", "Jul", 9.52, 84.54, 809],
["Type I", "Jul", 1.7, 15.64, 149],
["FFP2", "Jul", 0.51, 15.64, 149],
["Type I", "Feb", 0.57, 4.59, 162],
["No mask", "Jul", 9.52, 9.920, 809],
["Type I", "Jul", 1.7, 0.913, 149],
["FFP2", "Jul", 0.51, 0.239, 149],
["Type I", "Feb", 0.57, 0.272, 162],
],
)
def test_small_shared_office_Geneva(mask_type, month, expected_pi,
@ -381,7 +381,7 @@ def test_small_shared_office_Geneva(mask_type, month, expected_pi,
exposure_model = exposure_mc.build_model(size=SAMPLE_SIZE)
npt.assert_allclose(exposure_model.infection_probability().mean(),
expected_pi, rtol=TOLERANCE)
npt.assert_allclose(exposure_model.exposure().mean(),
npt.assert_allclose(exposure_model.deposited_exposure().mean(),
expected_dose, rtol=TOLERANCE)
npt.assert_allclose(
exposure_model.concentration_model.infected.emission_rate_when_present().mean(),