diff --git a/cara/apps/calculator/report_generator.py b/cara/apps/calculator/report_generator.py index 3a8d3197..b4db8d28 100644 --- a/cara/apps/calculator/report_generator.py +++ b/cara/apps/calculator/report_generator.py @@ -10,6 +10,7 @@ import zlib import loky import jinja2 import matplotlib +from numpy.lib.function_base import quantile matplotlib.use('agg') import matplotlib.pyplot as plt import numpy as np @@ -131,8 +132,8 @@ def plot(times, concentrations, model: models.ExposureModel): ) #See CERN-OPEN-2021-004, p. 15, eq. 16. - Cumulative Dose - qds = [np.mean(dataclass_utils.nested_replace(model, {'exposed.presence': model.exposed.presence.generate_truncated_interval(t)}).cumulated_exposure()) for t in times] - + qds = [np.mean(model.cumulated_exposure_vs_time(t)) for t in times] + ax1 = ax.twinx() ax1.plot(datetimes, qds, label='Mean cumulative dose', color='#1f77b4', linestyle='dotted') ax1.spines["right"].set_linestyle("--") @@ -248,7 +249,7 @@ def comparison_plot(scenarios: typing.Dict[str, dict], sample_times: np.ndarray) concentrations = statistics['concentrations'] #See CERN-OPEN-2021-004, p. 15, eq. 16. - Cumulative Dose - qds = [np.mean(dataclass_utils.nested_replace(model, {'exposed.presence': model.exposed.presence.generate_truncated_interval(t)}).cumulated_exposure()) for t in sample_times] + qds = [np.mean(model.cumulated_exposure_vs_time(t)) for t in sample_times] # Plot concentrations and cumulative dose if name in dash_styled_scenarios: diff --git a/cara/models.py b/cara/models.py index 27bd965a..924d91cc 100644 --- a/cara/models.py +++ b/cara/models.py @@ -100,20 +100,6 @@ class Interval: return True return False - def generate_truncated_interval(self, time_stop: float) -> "Interval": - truncated_boundaries = [] - for start, end in self.boundaries(): - if start < time_stop <= end: - end = time_stop - truncated_boundaries.append((start, end)) - break - elif time_stop <= start: - break - else: - truncated_boundaries.append((start, end)) - - return SpecificInterval(present_times = tuple(truncated_boundaries)) - @dataclass(frozen=True) class SpecificInterval(Interval): #: A sequence of times (start, stop), in hours, that the infected person @@ -881,17 +867,32 @@ class ExposureModel: #: The fraction of viruses actually deposited in the respiratory tract fraction_deposited: _VectorisedFloat = 0.6 - def quanta_exposure(self) -> _VectorisedFloat: - """The number of virus quanta per meter^3.""" + def quanta_exposure_vs_time(self, time: float) -> _VectorisedFloat: + """The number of virus quanta per meter^3 integrated until time.""" exposure = 0.0 for start, stop in self.exposed.presence.boundaries(): - exposure += self.concentration_model.integrated_concentration(start, stop) - + if start > time: + break + elif time <= stop: + stop = time + exposure += self.concentration_model.integrated_concentration(start, stop) + break + else: + exposure += self.concentration_model.integrated_concentration(start, stop) + return exposure * self.repeats - def cumulated_exposure(self) -> _VectorisedFloat: - exposure = self.quanta_exposure() + def quanta_exposure(self) -> _VectorisedFloat: + """The number of virus quanta per meter^3 for the full simulation time.""" + if self.exposed.presence.transition_times(): + return self.quanta_exposure_vs_time(max(self.exposed.presence.transition_times())) + else: + return 0 + + def cumulated_exposure_vs_time(self, time: float) -> _VectorisedFloat: + + exposure = self.quanta_exposure_vs_time(time) return ( self.exposed.activity.inhalation_rate * @@ -899,6 +900,12 @@ class ExposureModel: exposure * self.fraction_deposited ) + def cumulated_exposure(self) -> _VectorisedFloat: + if self.exposed.presence.transition_times(): + return self.cumulated_exposure_vs_time(max(self.exposed.presence.transition_times())) + else: + return 0 + def infection_probability(self) -> _VectorisedFloat: inf_aero = self.cumulated_exposure() diff --git a/cara/tests/models/test_exposure_model.py b/cara/tests/models/test_exposure_model.py index 298a9c93..e3e35e87 100644 --- a/cara/tests/models/test_exposure_model.py +++ b/cara/tests/models/test_exposure_model.py @@ -4,26 +4,31 @@ import typing import numpy as np import numpy.testing import pytest +from dataclasses import dataclass from cara import models from cara.models import ExposureModel +@dataclass(frozen=True) class KnownConcentrations(models.ConcentrationModel): """ A ConcentrationModel which is based on pre-known quanta concentrations and which therefore doesn't need other components. Useful for testing. """ - def __init__(self, concentration_function: typing.Callable) -> None: - self._func = concentration_function + #def __init__(self, concentration_function: typing.Callable) -> None: + # self._func = concentration_function + + + concentration_function: typing.Callable def infectious_virus_removal_rate(self, time: float) -> models._VectorisedFloat: # very large decay constant -> same as constant concentration return 1.e50 def _concentration_limit(self, time: float) -> models._VectorisedFloat: - return self._func(time) + return self.concentration_function(time) def state_change_times(self): return [0, 24] @@ -32,7 +37,7 @@ class KnownConcentrations(models.ConcentrationModel): return 24 def concentration(self, time: float) -> models._VectorisedFloat: # noqa - return self._func(time) + return self.concentration_function(time) halftime = models.PeriodicInterval(120, 60) @@ -57,19 +62,19 @@ populations = [ @pytest.mark.parametrize( "population, cm, f_dep, expected_exposure, expected_cumulated_exposure, expected_probability",[ - [populations[1], KnownConcentrations(lambda t: 1.2), 1., + [populations[1], KnownConcentrations(None, None, None, lambda t: 1.2), 1., np.array([14.4, 14.4]), np.array([3.44736/0.6, 3.20112/0.6]), np.array([99.6803184113, 99.5181053773])], - [populations[2], KnownConcentrations(lambda t: 1.2), 1., + [populations[2], KnownConcentrations(None, None, None, lambda t: 1.2), 1., np.array([14.4, 14.4]), np.array([2.2032/0.6, 2.4624/0.6]), np.array([97.4574432074, 98.3493482895])], - [populations[0], KnownConcentrations(lambda t: np.array([1.2, 2.4])), 1., + [populations[0], KnownConcentrations(None, None, None,lambda t: np.array([1.2, 2.4])), 1., np.array([14.4, 28.8]), np.array([2.4624/0.6, 4.9248/0.6]), np.array([98.3493482895, 99.9727534893])], - [populations[1], KnownConcentrations(lambda t: np.array([1.2, 2.4])), 1., + [populations[1], KnownConcentrations(None, None, None,lambda t: np.array([1.2, 2.4])), 1., np.array([14.4, 28.8]), np.array([3.44736/0.6, 6.40224/0.6]), np.array([99.6803184113, 99.9976777757])], - [populations[0], KnownConcentrations(lambda t: 2.4), np.array([0.5, 1.]), + [populations[0], KnownConcentrations(None, None, None,lambda t: 2.4), np.array([0.5, 1.]), 28.8, np.array([4.104, 8.208]), np.array([98.3493482895, 99.9727534893])], ]) def test_exposure_model_ndarray(population, cm, f_dep, @@ -95,7 +100,7 @@ def test_exposure_model_ndarray(population, cm, f_dep, @pytest.mark.parametrize("population", populations) def test_exposure_model_ndarray_and_float_mix(population): - cm = KnownConcentrations(lambda t: 0 if np.floor(t) % 2 else np.array([1.2, 1.2])) + cm = KnownConcentrations(None, None, None, lambda t: 0 if np.floor(t) % 2 else np.array([1.2, 1.2])) model = ExposureModel(cm, population) expected_exposure = np.array([14.4, 14.4]) @@ -109,8 +114,8 @@ def test_exposure_model_ndarray_and_float_mix(population): @pytest.mark.parametrize("population", populations) def test_exposure_model_compare_scalar_vector(population): - cm_scalar = KnownConcentrations(lambda t: 1.2) - cm_array = KnownConcentrations(lambda t: np.array([1.2, 1.2])) + cm_scalar = KnownConcentrations(None, None, None,lambda t: 1.2) + cm_array = KnownConcentrations(None, None, None, lambda t: np.array([1.2, 1.2])) model_scalar = ExposureModel(cm_scalar, population) model_array = ExposureModel(cm_array, population) expected_exposure = 14.4 diff --git a/cara/tests/models/test_interval.py b/cara/tests/models/test_interval.py deleted file mode 100644 index 5aa73603..00000000 --- a/cara/tests/models/test_interval.py +++ /dev/null @@ -1,22 +0,0 @@ -import numpy as np -import pytest - -from cara import models - - -@pytest.mark.parametrize( - "stop_time, expected_boundaries", - [ - [1.05, ((0, 1),)], - [1.8, ((0, 1), (1.1, 1.8))], - [2., ((0, 1), (1.1, 1.999))], - [3., ((0, 1), (1.1, 1.999), (2, 3))], - [-1, ()], - [4, ((0, 1), (1.1, 1.999), (2, 3))], - ], -) -def test_interval_truncation(stop_time, expected_boundaries): - interesting_times = models.SpecificInterval( - ([0, 1], [1.1, 1.999], [2, 3]), ) - assert interesting_times.generate_truncated_interval( - stop_time).boundaries() == expected_boundaries