Merge branch 'develop/time_dependence_concentration_model' into 'master'

Avoid using time-dependence of parameters in concentration computation

See merge request cara/cara!172
This commit is contained in:
Philip James Elson 2021-05-06 17:59:56 +00:00
commit 13cf8bafb3
2 changed files with 96 additions and 16 deletions

View file

@ -643,7 +643,21 @@ class ConcentrationModel:
# Deposition rate (h^-1)
k = (vg * 3600) / h
return k + self.virus.decay_constant + self.ventilation.air_exchange(self.room, time)
return k + self.virus.decay_constant + self.ventilation.air_exchange(
self.room, time
)
@cached()
def _concentration_limit(self, time: float) -> _VectorisedFloat:
"""
Provides a constant that represents the theoretical asymptotic
value reached by the concentration when time goes to infinity,
if all parameters were to stay time-independent.
"""
V = self.room.volume
IVRR = self.infectious_virus_removal_rate(time)
return (self.infected.emission_rate(time)) / (IVRR * V)
@cached()
def state_change_times(self):
@ -668,22 +682,42 @@ class ConcentrationModel:
return change_time
return 0
def _next_state_change(self, time: float):
"""
Find the nearest future state change.
"""
for change_time in self.state_change_times():
if change_time >= time:
return change_time
raise ValueError(
f"The requested time ({time}) is greater than last available "
f"state change time ({change_time})"
)
@cached()
def concentration(self, time: float) -> _VectorisedFloat:
# Note that time is not vectorised. You can only pass a single float
# to this method.
"""
Virus quanta concentration, as a function of time.
The formulas used here assume that all parameters (ventilation,
emission rate) are constant between two state changes - only
the value of these parameters at the next state change, are used.
Note that time is not vectorised. You can only pass a single float
to this method.
"""
if time == 0:
return 0.0
IVRR = self.infectious_virus_removal_rate(time)
V = self.room.volume
next_state_change_time = self._next_state_change(time)
IVRR = self.infectious_virus_removal_rate(next_state_change_time)
concentration_limit = self._concentration_limit(next_state_change_time)
t_last_state_change = self.last_state_change(time)
concentration_at_last_state_change = self.concentration(t_last_state_change)
delta_time = time - t_last_state_change
fac = np.exp(-IVRR * delta_time)
concentration_limit = (self.infected.emission_rate(time)) / (IVRR * V)
return concentration_limit * (1 - fac) + concentration_at_last_state_change * fac

View file

@ -1,7 +1,9 @@
import re
import numpy as np
import pytest
import cara.models
from cara import models
@pytest.mark.parametrize(
@ -27,28 +29,28 @@ def test_concentration_model_vectorisation(override_params):
}
defaults.update(override_params)
always = cara.models.PeriodicInterval(240, 240) # TODO: This should be a thing on an interval.
c_model = cara.models.ConcentrationModel(
cara.models.Room(defaults['volume']),
cara.models.AirChange(always, defaults['air_change']),
cara.models.InfectedPopulation(
always = models.PeriodicInterval(240, 240) # TODO: This should be a thing on an interval.
c_model = models.ConcentrationModel(
models.Room(defaults['volume']),
models.AirChange(always, defaults['air_change']),
models.InfectedPopulation(
number=1,
presence=always,
mask=cara.models.Mask(
mask=models.Mask(
η_exhale=defaults['η_exhale'],
η_leaks=defaults['η_leaks'],
η_inhale=0.3,
),
activity=cara.models.Activity(
activity=models.Activity(
0.51,
0.75,
),
virus=cara.models.Virus(
virus=models.Virus(
halflife=defaults['virus_halflife'],
viral_load_in_sputum=defaults['viral_load_in_sputum'],
coefficient_of_infectivity=defaults['coefficient_of_infectivity'],
),
expiration=cara.models.Expiration(
expiration=models.Expiration(
ejection_factor=(0.084, 0.009, 0.003, 0.002),
),
)
@ -56,3 +58,47 @@ def test_concentration_model_vectorisation(override_params):
concentrations = c_model.concentration(10)
assert isinstance(concentrations, np.ndarray)
assert concentrations.shape == (2, )
@pytest.fixture
def simple_conc_model():
interesting_times = models.SpecificInterval(([0, 1], [1.1, 1.999], [2, 3]), )
return models.ConcentrationModel(
models.Room(75),
models.AirChange(interesting_times, 100),
models.InfectedPopulation(
number=1,
presence=interesting_times,
mask=models.Mask.types['Type I'],
activity=models.Activity.types['Seated'],
virus=models.Virus.types['SARS_CoV_2'],
expiration=models.Expiration.types['Breathing'],
)
)
@pytest.mark.parametrize(
"time, expected_next_state_change", [
[0, 0],
[1, 1],
[1.1, 1.1],
[1.11, 1.999],
[2, 2],
[2.1, 3],
[3, 3],
]
)
def test_next_state_change_time(
simple_conc_model: models.ConcentrationModel,
time,
expected_next_state_change,
):
assert simple_conc_model._next_state_change(time) == expected_next_state_change
def test_next_state_change_time_out_of_range(simple_conc_model: models.ConcentrationModel):
with pytest.raises(
ValueError,
match=re.escape("The requested time (3.1) is greater than last available state change time (3)")
):
simple_conc_model._next_state_change(3.1)