test exposure model fix

This commit is contained in:
Luis Aleixo 2021-07-26 09:55:56 +02:00
parent 7bab716532
commit 5983fca92e
4 changed files with 48 additions and 57 deletions

View file

@ -10,6 +10,7 @@ import zlib
import loky
import jinja2
import matplotlib
from numpy.lib.function_base import quantile
matplotlib.use('agg')
import matplotlib.pyplot as plt
import numpy as np
@ -131,8 +132,8 @@ def plot(times, concentrations, model: models.ExposureModel):
)
#See CERN-OPEN-2021-004, p. 15, eq. 16. - Cumulative Dose
qds = [np.mean(dataclass_utils.nested_replace(model, {'exposed.presence': model.exposed.presence.generate_truncated_interval(t)}).cumulated_exposure()) for t in times]
qds = [np.mean(model.cumulated_exposure_vs_time(t)) for t in times]
ax1 = ax.twinx()
ax1.plot(datetimes, qds, label='Mean cumulative dose', color='#1f77b4', linestyle='dotted')
ax1.spines["right"].set_linestyle("--")
@ -248,7 +249,7 @@ def comparison_plot(scenarios: typing.Dict[str, dict], sample_times: np.ndarray)
concentrations = statistics['concentrations']
#See CERN-OPEN-2021-004, p. 15, eq. 16. - Cumulative Dose
qds = [np.mean(dataclass_utils.nested_replace(model, {'exposed.presence': model.exposed.presence.generate_truncated_interval(t)}).cumulated_exposure()) for t in sample_times]
qds = [np.mean(model.cumulated_exposure_vs_time(t)) for t in sample_times]
# Plot concentrations and cumulative dose
if name in dash_styled_scenarios:

View file

@ -100,20 +100,6 @@ class Interval:
return True
return False
def generate_truncated_interval(self, time_stop: float) -> "Interval":
truncated_boundaries = []
for start, end in self.boundaries():
if start < time_stop <= end:
end = time_stop
truncated_boundaries.append((start, end))
break
elif time_stop <= start:
break
else:
truncated_boundaries.append((start, end))
return SpecificInterval(present_times = tuple(truncated_boundaries))
@dataclass(frozen=True)
class SpecificInterval(Interval):
#: A sequence of times (start, stop), in hours, that the infected person
@ -881,17 +867,32 @@ class ExposureModel:
#: The fraction of viruses actually deposited in the respiratory tract
fraction_deposited: _VectorisedFloat = 0.6
def quanta_exposure(self) -> _VectorisedFloat:
"""The number of virus quanta per meter^3."""
def quanta_exposure_vs_time(self, time: float) -> _VectorisedFloat:
"""The number of virus quanta per meter^3 integrated until time."""
exposure = 0.0
for start, stop in self.exposed.presence.boundaries():
exposure += self.concentration_model.integrated_concentration(start, stop)
if start > time:
break
elif time <= stop:
stop = time
exposure += self.concentration_model.integrated_concentration(start, stop)
break
else:
exposure += self.concentration_model.integrated_concentration(start, stop)
return exposure * self.repeats
def cumulated_exposure(self) -> _VectorisedFloat:
exposure = self.quanta_exposure()
def quanta_exposure(self) -> _VectorisedFloat:
"""The number of virus quanta per meter^3 for the full simulation time."""
if self.exposed.presence.transition_times():
return self.quanta_exposure_vs_time(max(self.exposed.presence.transition_times()))
else:
return 0
def cumulated_exposure_vs_time(self, time: float) -> _VectorisedFloat:
exposure = self.quanta_exposure_vs_time(time)
return (
self.exposed.activity.inhalation_rate *
@ -899,6 +900,12 @@ class ExposureModel:
exposure * self.fraction_deposited
)
def cumulated_exposure(self) -> _VectorisedFloat:
if self.exposed.presence.transition_times():
return self.cumulated_exposure_vs_time(max(self.exposed.presence.transition_times()))
else:
return 0
def infection_probability(self) -> _VectorisedFloat:
inf_aero = self.cumulated_exposure()

View file

@ -4,26 +4,31 @@ import typing
import numpy as np
import numpy.testing
import pytest
from dataclasses import dataclass
from cara import models
from cara.models import ExposureModel
@dataclass(frozen=True)
class KnownConcentrations(models.ConcentrationModel):
"""
A ConcentrationModel which is based on pre-known quanta concentrations and
which therefore doesn't need other components. Useful for testing.
"""
def __init__(self, concentration_function: typing.Callable) -> None:
self._func = concentration_function
#def __init__(self, concentration_function: typing.Callable) -> None:
# self._func = concentration_function
concentration_function: typing.Callable
def infectious_virus_removal_rate(self, time: float) -> models._VectorisedFloat:
# very large decay constant -> same as constant concentration
return 1.e50
def _concentration_limit(self, time: float) -> models._VectorisedFloat:
return self._func(time)
return self.concentration_function(time)
def state_change_times(self):
return [0, 24]
@ -32,7 +37,7 @@ class KnownConcentrations(models.ConcentrationModel):
return 24
def concentration(self, time: float) -> models._VectorisedFloat: # noqa
return self._func(time)
return self.concentration_function(time)
halftime = models.PeriodicInterval(120, 60)
@ -57,19 +62,19 @@ populations = [
@pytest.mark.parametrize(
"population, cm, f_dep, expected_exposure, expected_cumulated_exposure, expected_probability",[
[populations[1], KnownConcentrations(lambda t: 1.2), 1.,
[populations[1], KnownConcentrations(None, None, None, lambda t: 1.2), 1.,
np.array([14.4, 14.4]), np.array([3.44736/0.6, 3.20112/0.6]), np.array([99.6803184113, 99.5181053773])],
[populations[2], KnownConcentrations(lambda t: 1.2), 1.,
[populations[2], KnownConcentrations(None, None, None, lambda t: 1.2), 1.,
np.array([14.4, 14.4]), np.array([2.2032/0.6, 2.4624/0.6]), np.array([97.4574432074, 98.3493482895])],
[populations[0], KnownConcentrations(lambda t: np.array([1.2, 2.4])), 1.,
[populations[0], KnownConcentrations(None, None, None,lambda t: np.array([1.2, 2.4])), 1.,
np.array([14.4, 28.8]), np.array([2.4624/0.6, 4.9248/0.6]), np.array([98.3493482895, 99.9727534893])],
[populations[1], KnownConcentrations(lambda t: np.array([1.2, 2.4])), 1.,
[populations[1], KnownConcentrations(None, None, None,lambda t: np.array([1.2, 2.4])), 1.,
np.array([14.4, 28.8]), np.array([3.44736/0.6, 6.40224/0.6]), np.array([99.6803184113, 99.9976777757])],
[populations[0], KnownConcentrations(lambda t: 2.4), np.array([0.5, 1.]),
[populations[0], KnownConcentrations(None, None, None,lambda t: 2.4), np.array([0.5, 1.]),
28.8, np.array([4.104, 8.208]), np.array([98.3493482895, 99.9727534893])],
])
def test_exposure_model_ndarray(population, cm, f_dep,
@ -95,7 +100,7 @@ def test_exposure_model_ndarray(population, cm, f_dep,
@pytest.mark.parametrize("population", populations)
def test_exposure_model_ndarray_and_float_mix(population):
cm = KnownConcentrations(lambda t: 0 if np.floor(t) % 2 else np.array([1.2, 1.2]))
cm = KnownConcentrations(None, None, None, lambda t: 0 if np.floor(t) % 2 else np.array([1.2, 1.2]))
model = ExposureModel(cm, population)
expected_exposure = np.array([14.4, 14.4])
@ -109,8 +114,8 @@ def test_exposure_model_ndarray_and_float_mix(population):
@pytest.mark.parametrize("population", populations)
def test_exposure_model_compare_scalar_vector(population):
cm_scalar = KnownConcentrations(lambda t: 1.2)
cm_array = KnownConcentrations(lambda t: np.array([1.2, 1.2]))
cm_scalar = KnownConcentrations(None, None, None,lambda t: 1.2)
cm_array = KnownConcentrations(None, None, None, lambda t: np.array([1.2, 1.2]))
model_scalar = ExposureModel(cm_scalar, population)
model_array = ExposureModel(cm_array, population)
expected_exposure = 14.4

View file

@ -1,22 +0,0 @@
import numpy as np
import pytest
from cara import models
@pytest.mark.parametrize(
"stop_time, expected_boundaries",
[
[1.05, ((0, 1),)],
[1.8, ((0, 1), (1.1, 1.8))],
[2., ((0, 1), (1.1, 1.999))],
[3., ((0, 1), (1.1, 1.999), (2, 3))],
[-1, ()],
[4, ((0, 1), (1.1, 1.999), (2, 3))],
],
)
def test_interval_truncation(stop_time, expected_boundaries):
interesting_times = models.SpecificInterval(
([0, 1], [1.1, 1.999], [2, 3]), )
assert interesting_times.generate_truncated_interval(
stop_time).boundaries() == expected_boundaries