From feedcc0df194039d16fcea9cd79dac5ed32e9782 Mon Sep 17 00:00:00 2001 From: Nicolas Mounet Date: Mon, 10 May 2021 17:13:09 +0000 Subject: [PATCH] Add ConcentrationModels.integrated_concentration and provide an analytic solution to the integral. --- cara/models.py | 41 ++++++++++++-- cara/tests/models/test_concentration_model.py | 28 ++++++++++ cara/tests/models/test_exposure_model.py | 54 ++++++++++++++++++- cara/tests/test_known_quantities.py | 16 ++++-- 4 files changed, 128 insertions(+), 11 deletions(-) diff --git a/cara/models.py b/cara/models.py index 9195ef76..e5048855 100644 --- a/cara/models.py +++ b/cara/models.py @@ -695,6 +695,14 @@ class ConcentrationModel: f"state change time ({change_time})" ) + def _is_interval_between_state_changes(self, start: float, stop: float) -> bool: + """ + Check that the times start and stop are in-between two state + changes of the concentration model (to ensure sure that all + model parameters stay constant between start and stop). + """ + return (self.last_state_change(stop) <= start) + @cached() def concentration(self, time: float) -> _VectorisedFloat: """ @@ -720,6 +728,32 @@ class ConcentrationModel: fac = np.exp(-IVRR * delta_time) return concentration_limit * (1 - fac) + concentration_at_last_state_change * fac + def integrated_concentration(self, start: float, stop: float) -> _VectorisedFloat: + """ + Get the integrated concentration dose between the times start and stop. + """ + state_change_times = self.state_change_times() + req_start, req_stop = start, stop + total_concentration = 0. + for interval_start, interval_stop in zip(state_change_times[:-1], state_change_times[1:]): + if start > interval_stop or stop < interval_start: + continue + # Clip the current interval to the requested range. + start = max([interval_start, req_start]) + stop = min([interval_stop, req_stop]) + + conc_start = self.concentration(start) + + next_conc_state = self._next_state_change(stop) + conc_limit = self._concentration_limit(next_conc_state) + IVRR = self.infectious_virus_removal_rate(next_conc_state) + delta_time = stop - start + total_concentration += ( + conc_limit * delta_time + + (conc_limit - conc_start) * (np.exp(-IVRR*delta_time)-1) / IVRR + ) + return total_concentration + @dataclass(frozen=True) class ExposureModel: @@ -736,12 +770,9 @@ class ExposureModel: """The number of virus quanta per meter^3.""" exposure = 0.0 - def integrate(fn, start, stop): - values = np.linspace(start, stop) - return np.trapz([fn(v) for v in values], values, axis=0) - for start, stop in self.exposed.presence.boundaries(): - exposure += integrate(self.concentration_model.concentration, start, stop) + exposure += self.concentration_model.integrated_concentration(start, stop) + return exposure * self.repeats def infection_probability(self) -> _VectorisedFloat: diff --git a/cara/tests/models/test_concentration_model.py b/cara/tests/models/test_concentration_model.py index 5f42a6d0..52fb0546 100644 --- a/cara/tests/models/test_concentration_model.py +++ b/cara/tests/models/test_concentration_model.py @@ -81,8 +81,10 @@ def simple_conc_model(): "time, expected_next_state_change", [ [0, 0], [1, 1], + [1.05, 1.1], [1.1, 1.1], [1.11, 1.999], + [1.9991, 2], [2, 2], [2.1, 3], [3, 3], @@ -102,3 +104,29 @@ def test_next_state_change_time_out_of_range(simple_conc_model: models.Concentra match=re.escape("The requested time (3.1) is greater than last available state change time (3)") ): simple_conc_model._next_state_change(3.1) + + +@pytest.mark.parametrize( + "start, stop, is_valid", [ + [0, 1.05, False], + [0.99, 1.1, False], + [0.5, 1.01, False], + [0, 1, True], + [1.01, 1.1, True], + [0.01, 1, True], + [1.11, 1.99, True], + ] +) +def test_valid_interval( + start, stop, is_valid, + simple_conc_model: models.ConcentrationModel +): + assert simple_conc_model._is_interval_between_state_changes(start, stop) == is_valid + + +def test_integrated_concentration(simple_conc_model): + c1 = simple_conc_model.integrated_concentration(0, 2) + c2 = simple_conc_model.integrated_concentration(0, 1) + c3 = simple_conc_model.integrated_concentration(1, 2) + assert c1 != 0 + assert c1 == c2 + c3 diff --git a/cara/tests/models/test_exposure_model.py b/cara/tests/models/test_exposure_model.py index 15f51303..34ed03af 100644 --- a/cara/tests/models/test_exposure_model.py +++ b/cara/tests/models/test_exposure_model.py @@ -17,6 +17,19 @@ class KnownConcentrations(models.ConcentrationModel): def __init__(self, concentration_function: typing.Callable) -> None: self._func = concentration_function + def infectious_virus_removal_rate(self, time: float) -> models._VectorisedFloat: + # very large decay constant -> same as constant concentration + return 1.e50 + + def _concentration_limit(self, time: float) -> models._VectorisedFloat: + return self._func(time) + + def state_change_times(self): + return [0, 24] + + def _next_state_change(self, time: float): + return 24 + def concentration(self, time: float) -> models._VectorisedFloat: # noqa return self._func(time) @@ -56,7 +69,7 @@ def test_exposure_model_ndarray(population, cm, expected_exposure): @pytest.mark.parametrize("population", populations) def test_exposure_model_ndarray_and_float_mix(population): - cm = KnownConcentrations(lambda t: 1.2 if np.floor(t) % 2 else np.array([1.2, 1.2])) + cm = KnownConcentrations(lambda t: 0 if np.floor(t) % 2 else np.array([1.2, 1.2])) model = ExposureModel(cm, population) expected_exposure = np.array([14.4, 14.4]) @@ -81,3 +94,42 @@ def test_exposure_model_compare_scalar_vector(population): np.testing.assert_almost_equal( model_array.quanta_exposure(), np.array([expected_exposure]*2) ) + + +@pytest.fixture +def conc_model(): + interesting_times = models.SpecificInterval(([0, 1], [1.01, 1.02], [12, 24])) + always = models.SpecificInterval(((0, 24),)) + return models.ConcentrationModel( + models.Room(25), + models.AirChange(always, 5), + models.InfectedPopulation( + number=1, + presence=interesting_times, + mask=models.Mask.types['Type I'], + activity=models.Activity.types['Seated'], + virus=models.Virus.types['SARS_CoV_2'], + expiration=models.Expiration.types['Breathing'], + ) + ) + +# expected quanta were computed with a trapezoidal integration, using +# a mesh of 10'000 pts per exposed presence interval. +@pytest.mark.parametrize("exposed_time_interval, expected_quanta", [ + [(0, 1), 0.0055680845], + [(1, 1.01), 6.4960491e-05], + [(1.01, 1.02), 6.3187723e-05], + [(12, 12.01), 1.9307359e-06], + [(12, 24), 0.079347465], + [(0, 24), 0.086122050], + ] +) +def test_exposure_model_integral_accuracy(exposed_time_interval, + expected_quanta, conc_model): + presence_interval = models.SpecificInterval((exposed_time_interval,)) + population = models.Population( + 10, presence_interval, models.Mask.types['Type I'], + models.Activity.types['Standing'], + ) + model = ExposureModel(conc_model, population) + np.testing.assert_allclose(model.quanta_exposure(), expected_quanta) diff --git a/cara/tests/test_known_quantities.py b/cara/tests/test_known_quantities.py index cc3a6249..ea8a1ad0 100644 --- a/cara/tests/test_known_quantities.py +++ b/cara/tests/test_known_quantities.py @@ -96,8 +96,10 @@ def test_concentrations_startup(baseline_model): def test_r0(baseline_exposure_model): + # expected r0 was computed with a trapezoidal integration, using + # a mesh of 100'000 pts per exposed presence interval. p = baseline_exposure_model.infection_probability() - npt.assert_allclose(p, 88.977694) + npt.assert_allclose(p, 89.001748) def test_periodic_window(baseline_periodic_window, baseline_room): @@ -368,11 +370,13 @@ def build_exposure_model(concentration_model): ) +# expected r0 were computed with a trapezoidal integration, using +# a mesh of 100'000 pts per exposed presence interval. @pytest.mark.parametrize( "month, expected_r0", [ - ['Jan', 86.220749], - ['Jun', 99.972046], + ['Jan', 86.258533], + ['Jun', 99.972103], ], ) def test_r0_hourly_dep(month,expected_r0): @@ -386,11 +390,13 @@ def test_r0_hourly_dep(month,expected_r0): p = m.infection_probability() npt.assert_allclose(p, expected_r0) +# expected r0 were computed with a trapezoidal integration, using +# a mesh of 100'000 pts per exposed presence interval. @pytest.mark.parametrize( "month, expected_r0", [ - ['Jan', 86.385018], - ['Jun', 99.982281], + ['Jan', 86.422736], + ['Jun', 99.982323], ], ) def test_r0_hourly_dep_refined(month,expected_r0):