Merge branch 'feature/short_range_bug_fix_and_tests' into 'master'

Adding breathing rate factor in short-range deposited exposure (bug fix), and exposure tests for short-range

See merge request cara/cara!345
This commit is contained in:
Andre Henriques 2022-04-07 09:29:23 +02:00
commit c1589a04fc
4 changed files with 353 additions and 54 deletions

View file

@ -33,7 +33,7 @@ from .user import AuthenticatedUser, AnonymousUser
# calculator version. If the calculator needs to make breaking changes (e.g. change
# form attributes) then it can also increase its MAJOR version without needing to
# increase the overall CARA version (found at ``cara.__version__``).
__version__ = "4.1.0"
__version__ = "4.1.1"
class BaseRequestHandler(RequestHandler):

View file

@ -501,7 +501,7 @@ Virus.types = {
),
'SARS_CoV_2_OMICRON': SARSCoV2(
viral_load_in_sputum=1e9,
infectious_dose=20.,
infectious_dose=50.,
viable_to_RNA_ratio=0.5,
transmissibility_factor=0.2
),
@ -1175,12 +1175,21 @@ class ShortRangeModel:
"""
start_bound, stop_bound = self.presence.boundaries()[0]
jet_origin_integrated = self.expiration.jet_origin_concentration()
jet_origin = self.expiration.jet_origin_concentration()
dilution = self.dilution_factor()
total_normed_concentration = -(concentration_model.integrated_concentration(start_bound, stop_bound)/concentration_model.virus.viral_load_in_sputum/dilution)
total_normed_concentration_interpolated = np.interp(self.expiration.particle.diameter, concentration_model.infected.particle.diameter, total_normed_concentration)
return (jet_origin_integrated/dilution * (stop_bound - start_bound)) + total_normed_concentration_interpolated
total_normed_concentration_diluted = (
concentration_model.integrated_concentration(start_bound,
stop_bound)/dilution/
concentration_model.virus.viral_load_in_sputum
)
total_normed_concentration_interpolated = np.interp(
self.expiration.particle.diameter,
concentration_model.infected.particle.diameter,
total_normed_concentration_diluted
)
return (jet_origin/dilution * (stop_bound - start_bound)
) - total_normed_concentration_interpolated
@dataclass(frozen=True)
@ -1318,14 +1327,20 @@ class ExposureModel:
# in the case of a single diameter or no diameter defined,
# one should not take any mean at this stage.
deposited_exposure += short_range_exposure*fdep
# then we multiply by the diameter-independent quantity virus viral load
deposited_exposure *= self.concentration_model.virus.viral_load_in_sputum
# long-range concentration
f_inf = self.concentration_model.infected.fraction_of_infectious_virus()
deposited_exposure += self.long_range_deposited_exposure_between_bounds(time1, time2)/f_inf
return deposited_exposure * f_inf
# multiply by the (diameter-independent) inhalation rate
deposited_exposure *= interaction.activity.inhalation_rate
# then we multiply by diameter-independent quantities: viral load
# and fraction of infected virions
f_inf = self.concentration_model.infected.fraction_of_infectious_virus()
deposited_exposure *= (f_inf
* self.concentration_model.virus.viral_load_in_sputum
)
# long-range concentration
deposited_exposure += self.long_range_deposited_exposure_between_bounds(time1, time2)
return deposited_exposure
def deposited_exposure(self) -> _VectorisedFloat:
"""

View file

@ -13,13 +13,13 @@ from cara.utils import method_cache
from cara.models import _VectorisedFloat,Interval,SpecificInterval
from cara.monte_carlo.sampleable import LogNormal
from cara.monte_carlo.data import (expiration_distributions,
expiration_BLO_factors,short_range_expiration_distributions,
short_range_distances)
expiration_BLO_factors,short_range_expiration_distributions,
short_range_distances,virus_distributions,activity_distributions)
# TODO: seed better the random number generators
np.random.seed(2000)
SAMPLE_SIZE = 500000
TOLERANCE = 0.05
SAMPLE_SIZE = 1_000_000
TOLERANCE = 0.02
sqrt2pi = np.sqrt(2.*np.pi)
sqrt2 = np.sqrt(2.)
@ -103,14 +103,15 @@ class SimpleConcentrationModel:
return 4*np.pi/3. * (diameter/2.)**3
@method_cache
def Np(self,diameter: float) -> float:
def Np(self,diameter: float,
BLO_factors: typing.Tuple[float, float, float]) -> float:
"""
number of emitted particles per unit volume (BLO model)
in cm^-3.ln(micron)^-1
"""
result = 0.
for cn,mu,sigma,famp in zip(self.cn,self.mu,self.sigma,
self.BLO_factors):
BLO_factors):
result += ( (cn * famp)/sigma *
np.exp(-(np.log(diameter)-mu)**2/(2*sigma**2)))
return result/(diameter*sqrt2pi)
@ -119,7 +120,7 @@ class SimpleConcentrationModel:
"""
emission rate per unit diameter, in RNA copies / h / micron
"""
return (self.viral_load * self.breathing_rate * self.Np(diameter)
return (self.Np(diameter, self.BLO_factors)
* self.aerosol_volume(diameter) * 1e-6)
@method_cache
@ -134,7 +135,9 @@ class SimpleConcentrationModel:
return (self.vR(diameter)/(a_dsquare + removal_rate)
* np.exp(-a_dsquare*deltat))
return quad(integrand,self.diameter_min,self.diameter_max)[0]
return (quad(integrand,self.diameter_min,self.diameter_max,
epsabs=0.,limit=500)[0]
* self.viral_load * self.breathing_rate)
def concentration(self,t: float) -> _VectorisedFloat:
"""
@ -148,14 +151,16 @@ class SimpleConcentrationModel:
# transition_times[i] < t <= transition_times[i]+1
i: int = np.searchsorted(trans_times,t) - 1 # type: ignore
ti = trans_times[i]
Pim1 = self.infected_presence.triggered((trans_times[i-1]+ti)/2.)
Pim1 = (False if i==0 else
self.infected_presence.triggered((trans_times[i-1]+ti)/2.))
Pi = self.infected_presence.triggered((ti+trans_times[i+1])/2.)
result = (0 if not Pim1 else self.f(lambda_rate,t-ti))
result -= (0 if not Pi else self.f(lambda_rate,t-ti))
for k,tk in enumerate(trans_times[:i]):
Pkm1 = self.infected_presence.triggered((trans_times[k-1]+tk)/2.)
Pkm1 = (False if k==0 else
self.infected_presence.triggered((trans_times[k-1]+tk)/2.))
Pk = self.infected_presence.triggered((tk+trans_times[k+1])/2.)
s = np.sum([lambda_rate*(trans_times[l]-trans_times[l-1])
for l in range(k+1,i+1)])
@ -271,7 +276,173 @@ class SimpleShortRangeModel:
- background_concentration) / S
else:
return 0.
@dataclass(frozen=True)
class SimpleExposureModel(SimpleConcentrationModel):
"""
Simple model for the background (long-range) exposure, without
all the flexibility of cara.models.ExposureModel.
For independent, end-to-end testing purposes.
This assumes no mask wearing, identical inhalation and exhalation
breathing rate, indentical presence for the infected and the exposed
people, the same ventilation rate at all times, and all short-range
interaction intervals are within presence intervals of the infected.
"""
#: fraction of infected viruses
finf: _VectorisedFloat = 0.5
#: host immunity factor (0. for not immune)
HI: _VectorisedFloat = 0.
#: infectious dose (ID50)
ID50: _VectorisedFloat = 50.
#: transmissibility factor w.r.t. original strain
# (<1 means more transmissible)
transmissibility: _VectorisedFloat = 1.
#: list of short-range interaction models
sr_models: typing.Tuple[SimpleShortRangeModel, ...] = ()
def fdep(self, diameter: float, evaporation: float) -> float:
"""
fraction deposited
"""
d = diameter * evaporation
IFrac = 1 - 0.5 * (1 - (1 / (1 + (0.00076*(d**2.8)))))
fdep = IFrac * (0.0587
+ (0.911/(1 + np.exp(4.77 + 1.485 * np.log(d))))
+ (0.943/(1 + np.exp(0.508 - 2.58 * np.log(d)))))
return fdep
@method_cache
def F(self, removal_rate: _VectorisedFloat, deltat: float,
evaporation: float) -> _VectorisedFloat:
"""
A general function to compute the main integral over diameters
"""
def integrand(diameter):
# function to return the integrand
a = self.deposition_removal_coefficient()
a_dsquare = a*diameter**2
return -(self.vR(diameter)*self.fdep(diameter,evaporation)/(
(a_dsquare + removal_rate)**2)
* np.exp(-a_dsquare*deltat))
return (quad(integrand,self.diameter_min,self.diameter_max,
epsabs=0.,limit=500)[0]
* self.viral_load * self.breathing_rate)
@method_cache
def f_with_fdep(self, removal_rate: _VectorisedFloat, deltat: float,
evaporation: float) -> _VectorisedFloat:
"""
Same as f but with fdep included in the integral.
"""
def integrand(diameter):
# function to return the integrand
a = self.deposition_removal_coefficient()
a_dsquare = a*diameter**2
return (self.vR(diameter)*self.fdep(diameter,evaporation)
/(a_dsquare + removal_rate) * np.exp(-a_dsquare*deltat))
return (quad(integrand,self.diameter_min,self.diameter_max,
epsabs=0.,limit=500)[0]
* self.viral_load * self.breathing_rate)
@method_cache
def integrated_background_concentration(self,t1: float,t2: float,
evaporation: float) -> _VectorisedFloat:
"""
background (long-range) concentration integrated from t1 to t2
assuming both t1 and t2 are within a single presence interval.
This includes the deposition fraction (fdep).
"""
trans_times = sorted(self.infected_presence.transition_times())
if t2==trans_times[0]:
return 0.
lambda_rate = self.removal_rate()
# transition_times[i] < t2 <= transition_times[i]+1
i: int = np.searchsorted(trans_times,t2) - 1 # type: ignore
ti = trans_times[i]
if np.searchsorted(trans_times,t1,side='right')-1 != i:
raise ValueError("t1={}, t2={}, i1={}, i2={} "
"(i1 and i2 should be equal)".format(
t1,t2,i,np.searchsorted(trans_times,t1,side='right')-1))
Pim1 = (False if i==0 else
self.infected_presence.triggered((trans_times[i-1]+ti)/2.))
Pi = self.infected_presence.triggered((ti+trans_times[i+1])/2.)
def primitive(time):
result = (0 if not Pim1 else self.F(lambda_rate,time-ti,evaporation))
result -= (0 if not Pi else self.F(lambda_rate,time-ti,evaporation))
for k,tk in enumerate(trans_times[:i]):
Pkm1 = (False if k==0 else
self.infected_presence.triggered((trans_times[k-1]+tk)/2.))
Pk = self.infected_presence.triggered((tk+trans_times[k+1])/2.)
s = np.sum([lambda_rate*(trans_times[l]-trans_times[l-1])
for l in range(k+1,i+1)])
result += ( (0 if not Pkm1 else self.F(lambda_rate,time-tk,evaporation))
-(0 if not Pk else self.F(lambda_rate,time-tk,evaporation))
) * np.exp(-s)
return result
return ( ( (0 if not self.infected_presence.triggered((t1+t2)/2.)
else self.f_with_fdep(lambda_rate,0,evaporation)*(t2-t1))
+ (primitive(t2) * np.exp(-lambda_rate*(t2-ti)) -
primitive(t1) * np.exp(-lambda_rate*(t1-ti)) ) )
* self.num_infected/self.room_volume)
@method_cache
def integrated_shortrange_concentration(self) -> _VectorisedFloat:
"""
short-range concentration integrated over interaction times and
diameters. This includes the deposition fraction (fdep).
"""
result = 0.
# evaporation set to 1 (particles do not have time to evaporate)
evaporation = 1.
for sr_model in self.sr_models:
t1,t2 = sr_model.interaction_interval.boundaries()[0]
# function to return the integrand
integrand = lambda d: (self.aerosol_volume(d)
* self.Np(d,sr_model.BLO_factors)*self.fdep(d,evaporation))
res = (quad(integrand,
sr_model.diameter_min,sr_model.diameter_max,
epsabs=0.,limit=500)[0]
* self.viral_load * 1e-6 * (t2-t1) )
result += sr_model.breathing_rate * (
res-self.integrated_background_concentration(t1,t2,evaporation)
)/sr_model.dilution_factor()
return result
def dose(self) -> _VectorisedFloat:
"""
total deposited dose (integrated over time and over particle
diameters), including short and long range.
"""
result = 0.
for t1,t2 in self.infected_presence.boundaries():
result += (self.integrated_background_concentration(t1,t2,self.evaporation)
* self.breathing_rate)
result += self.integrated_shortrange_concentration()
return result * self.finf * (1. - self.HI)
def probability_infection(self):
"""
total probability of infection
"""
return (1. - np.exp(-self.dose() * ln2 * (1-self.HI)
/(self.ID50 * self.transmissibility) )) * 100.
presence = models.SpecificInterval(present_times=((8.5, 12), (13, 17.5)))
interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)),
@ -280,7 +451,7 @@ interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)),
@pytest.fixture
def conc_model() -> mc.ConcentrationModel:
def c_model() -> mc.ConcentrationModel:
return mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(period=120, duration=120), air_exch=1.),
@ -297,18 +468,6 @@ def conc_model() -> mc.ConcentrationModel:
).build_model(SAMPLE_SIZE)
@pytest.fixture
def simple_conc_model() -> SimpleConcentrationModel:
return SimpleConcentrationModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
)
@pytest.fixture
def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]:
return (
@ -327,6 +486,18 @@ def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]:
)
@pytest.fixture
def simple_c_model() -> SimpleConcentrationModel:
return SimpleConcentrationModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
)
@pytest.fixture
def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]:
return (
@ -348,36 +519,149 @@ def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]:
@pytest.mark.parametrize(
"time", np.linspace(8.5,17.5,12),
)
def test_background_concentration(time,conc_model,simple_conc_model):
def test_background_concentration(time,c_model,simple_c_model):
npt.assert_allclose(
conc_model.concentration(time).mean(),
simple_conc_model.concentration(time), rtol=TOLERANCE
c_model.concentration(time).mean(),
simple_c_model.concentration(time), rtol=TOLERANCE
)
@pytest.mark.parametrize(
"time", [10, 10.7, 11., 12.5, 14.75, 14.9, 17]
)
def test_shortrange_concentration(time,conc_model,simple_conc_model,
def test_shortrange_concentration(time,c_model,simple_c_model,
sr_models,simple_sr_models):
result_sr_model = np.sum([np.array(
sr_mod.short_range_concentration(conc_model,time)).mean()
sr_mod.short_range_concentration(c_model,time)).mean()
for sr_mod in sr_models])
result_simple_sr_model = np.sum([np.array(
sr_mod.concentration(simple_conc_model,time)).mean()
sr_mod.concentration(simple_c_model,time)).mean()
for sr_mod in simple_sr_models])
npt.assert_allclose(
result_sr_model,result_simple_sr_model,rtol=TOLERANCE
)
#times = np.linspace(8.5,17.5,120)
#plt.plot(times,[conc_model.concentration(t).mean() for t in times])
#plt.plot(times,[simple_conc_model.concentration(t) for t in times])
def test_background_exposure(c_model):
simple_expo_model = SimpleExposureModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = models.Virus.types['SARS_CoV_2_DELTA'].viable_to_RNA_ratio,
HI = 0.,
ID50 = models.Virus.types['SARS_CoV_2_DELTA'].infectious_dose,
transmissibility = models.Virus.types['SARS_CoV_2_DELTA'].transmissibility_factor,
sr_models = (),
)
expo_model = mc.ExposureModel(
concentration_model=c_model,
short_range=(),
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
npt.assert_allclose(
expo_model.deposited_exposure().mean(),
simple_expo_model.dose().mean(), rtol=TOLERANCE
)
npt.assert_allclose(
expo_model.infection_probability().mean(),
simple_expo_model.probability_infection().mean(), rtol=TOLERANCE
)
def test_background_exposure_with_distributions():
simple_expo_model = SimpleExposureModel(
infected_presence = presence,
viral_load = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).viral_load_in_sputum,
breathing_rate = activity_distributions['Seated'].build_model(
SAMPLE_SIZE).exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).viable_to_RNA_ratio,
HI = 0.,
ID50 = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).infectious_dose,
transmissibility = virus_distributions['SARS_CoV_2_DELTA'
].transmissibility_factor,
sr_models = (),
)
expo_model = mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(
period=120, duration=120), air_exch=1.),
infected=mc.InfectedPopulation(
number=1,
presence=presence,
virus=virus_distributions['SARS_CoV_2_DELTA'],
mask=models.Mask.types['No mask'],
activity=activity_distributions['Seated'],
expiration=expiration_distributions['Breathing'],
host_immunity=0.,
),
evaporation_factor=0.3,
),
short_range=(),
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=activity_distributions['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
npt.assert_allclose(
expo_model.deposited_exposure().mean(),
simple_expo_model.dose().mean(), rtol=TOLERANCE
)
npt.assert_allclose(
expo_model.infection_probability().mean(),
simple_expo_model.probability_infection().mean(), rtol=TOLERANCE
)
def test_exposure_with_shortrange(c_model,sr_models,simple_sr_models):
simple_expo_sr_model = SimpleExposureModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = models.Virus.types['SARS_CoV_2_DELTA'].viable_to_RNA_ratio,
HI = 0.,
ID50 = models.Virus.types['SARS_CoV_2_DELTA'].infectious_dose,
transmissibility = models.Virus.types['SARS_CoV_2_DELTA'].transmissibility_factor,
sr_models = simple_sr_models,
)
expo_sr_model = mc.ExposureModel(
concentration_model=c_model,
short_range=sr_models,
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
npt.assert_allclose(
expo_sr_model.deposited_exposure().mean(),
simple_expo_sr_model.dose().mean(), rtol=TOLERANCE
)
npt.assert_allclose(
expo_sr_model.infection_probability().mean(),
simple_expo_sr_model.probability_infection().mean(), rtol=TOLERANCE
)
#plt.plot(times,[np.sum([np.array(sr_mod.short_range_concentration(conc_model,t)).mean()
# for sr_mod in sr_models])
# + conc_model.concentration(t).mean() for t in times])
#plt.plot(times,[np.sum([np.array(sr_mod.concentration(simple_conc_model,t)).mean()
# for sr_mod in simple_sr_models])
# + simple_conc_model.concentration(t)
# for t in times])

View file

@ -9,7 +9,7 @@ from cara.apps.calculator.model_generator import build_expiration
# TODO: seed better the random number generators
np.random.seed(2000)
SAMPLE_SIZE = 250_000
SAMPLE_SIZE = 600_000
TOLERANCE = 0.06
# Load the weather data (temperature in kelvin) for Toronto.