Adding full algorithm tests on exposure (simple, with virus distributions, and with short-range)

This commit is contained in:
Nicolas Mounet 2022-04-05 18:30:14 +02:00
parent f6bd576134
commit 99e57da469

View file

@ -13,8 +13,8 @@ from cara.utils import method_cache
from cara.models import _VectorisedFloat,Interval,SpecificInterval
from cara.monte_carlo.sampleable import LogNormal
from cara.monte_carlo.data import (expiration_distributions,
expiration_BLO_factors,short_range_expiration_distributions,
short_range_distances)
expiration_BLO_factors,short_range_expiration_distributions,
short_range_distances,virus_distributions,activity_distributions)
# TODO: seed better the random number generators
np.random.seed(2000)
@ -103,14 +103,15 @@ class SimpleConcentrationModel:
return 4*np.pi/3. * (diameter/2.)**3
@method_cache
def Np(self,diameter: float) -> float:
def Np(self,diameter: float,
BLO_factors: typing.Tuple[float, float, float]) -> float:
"""
number of emitted particles per unit volume (BLO model)
in cm^-3.ln(micron)^-1
"""
result = 0.
for cn,mu,sigma,famp in zip(self.cn,self.mu,self.sigma,
self.BLO_factors):
BLO_factors):
result += ( (cn * famp)/sigma *
np.exp(-(np.log(diameter)-mu)**2/(2*sigma**2)))
return result/(diameter*sqrt2pi)
@ -119,7 +120,7 @@ class SimpleConcentrationModel:
"""
emission rate per unit diameter, in RNA copies / h / micron
"""
return (self.viral_load * self.breathing_rate * self.Np(diameter)
return (self.Np(diameter, self.BLO_factors)
* self.aerosol_volume(diameter) * 1e-6)
@method_cache
@ -134,7 +135,8 @@ class SimpleConcentrationModel:
return (self.vR(diameter)/(a_dsquare + removal_rate)
* np.exp(-a_dsquare*deltat))
return quad(integrand,self.diameter_min,self.diameter_max)[0]
return (quad(integrand,self.diameter_min,self.diameter_max)[0]
* self.viral_load * self.breathing_rate)
def concentration(self,t: float) -> _VectorisedFloat:
"""
@ -148,14 +150,16 @@ class SimpleConcentrationModel:
# transition_times[i] < t <= transition_times[i]+1
i: int = np.searchsorted(trans_times,t) - 1 # type: ignore
ti = trans_times[i]
Pim1 = self.infected_presence.triggered((trans_times[i-1]+ti)/2.)
Pim1 = (False if i==0 else
self.infected_presence.triggered((trans_times[i-1]+ti)/2.))
Pi = self.infected_presence.triggered((ti+trans_times[i+1])/2.)
result = (0 if not Pim1 else self.f(lambda_rate,t-ti))
result -= (0 if not Pi else self.f(lambda_rate,t-ti))
for k,tk in enumerate(trans_times[:i]):
Pkm1 = self.infected_presence.triggered((trans_times[k-1]+tk)/2.)
Pkm1 = (False if k==0 else
self.infected_presence.triggered((trans_times[k-1]+tk)/2.))
Pk = self.infected_presence.triggered((tk+trans_times[k+1])/2.)
s = np.sum([lambda_rate*(trans_times[l]-trans_times[l-1])
for l in range(k+1,i+1)])
@ -271,7 +275,163 @@ class SimpleShortRangeModel:
- background_concentration) / S
else:
return 0.
@dataclass(frozen=True)
class SimpleExposureModel(SimpleConcentrationModel):
"""
Simple model for the background (long-range) exposure, without
all the flexibility of cara.models.ExposureModel.
For independent, end-to-end testing purposes.
This assumes no mask wearing, identical inhalation and exhalation
breathing rate, indentical presence for the infected and the exposed
people, the same ventilation rate at all times, and all short-range
interaction intervals are within presence intervals of the infected.
"""
#: fraction of infected viruses
finf: _VectorisedFloat = 0.5
#: host immunity factor (0. for not immune)
HI: _VectorisedFloat = 0.
#: infectious dose (ID50)
ID50: _VectorisedFloat = 50.
#: transmissibility factor w.r.t. original strain
# (<1 means more transmissible)
transmissibility: _VectorisedFloat = 1.
#: list of short-range interaction models
sr_models: typing.Tuple[SimpleShortRangeModel, ...] = ()
def fdep(self, diameter: float, evaporation: float) -> float:
"""
fraction deposited
"""
d = diameter * evaporation
IFrac = 1 - 0.5 * (1 - (1 / (1 + (0.00076*(d**2.8)))))
fdep = IFrac * (0.0587
+ (0.911/(1 + np.exp(4.77 + 1.485 * np.log(d))))
+ (0.943/(1 + np.exp(0.508 - 2.58 * np.log(d)))))
return fdep
@method_cache
def F(self, removal_rate: _VectorisedFloat, deltat: float) -> _VectorisedFloat:
"""
A general function to compute the main integral over diameters
"""
def integrand(diameter):
# function to return the integrand
a = self.deposition_removal_coefficient()
a_dsquare = a*diameter**2
return -(self.vR(diameter)*self.fdep(diameter,self.evaporation)/(
(a_dsquare + removal_rate)**2)
* np.exp(-a_dsquare*deltat))
return (quad(integrand,self.diameter_min,self.diameter_max)[0]
* self.viral_load * self.breathing_rate)
@method_cache
def f_with_fdep(self, removal_rate: _VectorisedFloat, deltat: float) -> _VectorisedFloat:
"""
Same as f but with fdep included in the integral.
"""
def integrand(diameter):
# function to return the integrand
a = self.deposition_removal_coefficient()
a_dsquare = a*diameter**2
return (self.vR(diameter)*self.fdep(diameter,self.evaporation)
/(a_dsquare + removal_rate) * np.exp(-a_dsquare*deltat))
return (quad(integrand,self.diameter_min,self.diameter_max)[0]
* self.viral_load * self.breathing_rate)
@method_cache
def integrated_background_concentration(self,t1: float,t2: float) -> _VectorisedFloat:
"""
background (long-range) concentration integrated from t1 to t2
assuming both t1 and t2 are within a single presence interval.
This includes the deposition fraction (fdep).
"""
trans_times = sorted(self.infected_presence.transition_times())
if t2==trans_times[0]:
return 0.
lambda_rate = self.removal_rate()
# transition_times[i] < t2 <= transition_times[i]+1
i: int = np.searchsorted(trans_times,t2) - 1 # type: ignore
ti = trans_times[i]
if np.searchsorted(trans_times,t1,side='right')-1 != i:
raise ValueError("t1={}, t2={}, i1={}, i2={} "
"(i1 and i2 should be equal)".format(
t1,t2,i,np.searchsorted(trans_times,t1,side='right')-1))
Pim1 = (False if i==0 else
self.infected_presence.triggered((trans_times[i-1]+ti)/2.))
Pi = self.infected_presence.triggered((ti+trans_times[i+1])/2.)
def primitive(time):
result = (0 if not Pim1 else self.F(lambda_rate,time-ti))
result -= (0 if not Pi else self.F(lambda_rate,time-ti))
for k,tk in enumerate(trans_times[:i]):
Pkm1 = (False if k==0 else
self.infected_presence.triggered((trans_times[k-1]+tk)/2.))
Pk = self.infected_presence.triggered((tk+trans_times[k+1])/2.)
s = np.sum([lambda_rate*(trans_times[l]-trans_times[l-1])
for l in range(k+1,i+1)])
result += ( (0 if not Pkm1 else self.F(lambda_rate,time-tk))
-(0 if not Pk else self.F(lambda_rate,time-tk))
) * np.exp(-s)
return result
return ( ( (0 if not self.infected_presence.triggered((t1+t2)/2.)
else self.f_with_fdep(lambda_rate,0)*(t2-t1))
+ (primitive(t2) * np.exp(-lambda_rate*(t2-ti)) -
primitive(t1) * np.exp(-lambda_rate*(t1-ti)) ) )
* self.num_infected/self.room_volume)
@method_cache
def integrated_shortrange_concentration(self) -> _VectorisedFloat:
"""
short-range concentration integrated over interaction times and
diameters. This includes the deposition fraction (fdep).
"""
result = 0.
for sr_model in self.sr_models:
t1,t2 = sr_model.interaction_interval.boundaries()[0]
# function to return the integrand
integrand = lambda d: (self.aerosol_volume(d)
* self.Np(d,sr_model.BLO_factors)*self.fdep(d,1.0))
res = (quad(integrand,
sr_model.diameter_min,sr_model.diameter_max)[0]
* (t2-t1) )
result += (res-self.integrated_background_concentration(t1,t2)
)/sr_model.dilution_factor()
return self.viral_load * 1e-6 * result
def dose(self) -> _VectorisedFloat:
"""
total deposited dose (integrated over time and over particle
diameters), including short and long range.
"""
result = 0.
for t1,t2 in self.infected_presence.boundaries():
result += self.integrated_background_concentration(t1,t2)
result += self.integrated_shortrange_concentration()
return result * self.finf * self.breathing_rate * (1. - self.HI)
def probability_infection(self):
"""
total probability of infection
"""
return (1. - np.exp(-self.dose() * ln2 * (1-self.HI)
/(self.ID50 * self.transmissibility) )) * 100.
presence = models.SpecificInterval(present_times=((8.5, 12), (13, 17.5)))
interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)),
@ -280,7 +440,7 @@ interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)),
@pytest.fixture
def conc_model() -> mc.ConcentrationModel:
def c_model() -> mc.ConcentrationModel:
return mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(period=120, duration=120), air_exch=1.),
@ -297,18 +457,6 @@ def conc_model() -> mc.ConcentrationModel:
).build_model(SAMPLE_SIZE)
@pytest.fixture
def simple_conc_model() -> SimpleConcentrationModel:
return SimpleConcentrationModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
)
@pytest.fixture
def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]:
return (
@ -327,6 +475,18 @@ def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]:
)
@pytest.fixture
def simple_c_model() -> SimpleConcentrationModel:
return SimpleConcentrationModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
)
@pytest.fixture
def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]:
return (
@ -348,36 +508,149 @@ def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]:
@pytest.mark.parametrize(
"time", np.linspace(8.5,17.5,12),
)
def test_background_concentration(time,conc_model,simple_conc_model):
def test_background_concentration(time,c_model,simple_c_model):
npt.assert_allclose(
conc_model.concentration(time).mean(),
simple_conc_model.concentration(time), rtol=TOLERANCE
c_model.concentration(time).mean(),
simple_c_model.concentration(time), rtol=TOLERANCE
)
@pytest.mark.parametrize(
"time", [10, 10.7, 11., 12.5, 14.75, 14.9, 17]
)
def test_shortrange_concentration(time,conc_model,simple_conc_model,
def test_shortrange_concentration(time,c_model,simple_c_model,
sr_models,simple_sr_models):
result_sr_model = np.sum([np.array(
sr_mod.short_range_concentration(conc_model,time)).mean()
sr_mod.short_range_concentration(c_model,time)).mean()
for sr_mod in sr_models])
result_simple_sr_model = np.sum([np.array(
sr_mod.concentration(simple_conc_model,time)).mean()
sr_mod.concentration(simple_c_model,time)).mean()
for sr_mod in simple_sr_models])
npt.assert_allclose(
result_sr_model,result_simple_sr_model,rtol=TOLERANCE
)
#times = np.linspace(8.5,17.5,120)
#plt.plot(times,[conc_model.concentration(t).mean() for t in times])
#plt.plot(times,[simple_conc_model.concentration(t) for t in times])
def test_background_exposure(c_model):
simple_expo_model = SimpleExposureModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = models.Virus.types['SARS_CoV_2_DELTA'].viable_to_RNA_ratio,
HI = 0.,
ID50 = models.Virus.types['SARS_CoV_2_DELTA'].infectious_dose,
transmissibility = models.Virus.types['SARS_CoV_2_DELTA'].transmissibility_factor,
sr_models = (),
)
expo_model = mc.ExposureModel(
concentration_model=c_model,
short_range=(),
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
npt.assert_allclose(
expo_model.deposited_exposure().mean(),
simple_expo_model.dose().mean(), rtol=TOLERANCE
)
npt.assert_allclose(
expo_model.infection_probability().mean(),
simple_expo_model.probability_infection().mean(), rtol=TOLERANCE
)
def test_background_exposure_with_distributions():
simple_expo_model = SimpleExposureModel(
infected_presence = presence,
viral_load = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).viral_load_in_sputum,
breathing_rate = activity_distributions['Seated'].build_model(
SAMPLE_SIZE).exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).viable_to_RNA_ratio,
HI = 0.,
ID50 = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).infectious_dose,
transmissibility = virus_distributions['SARS_CoV_2_DELTA'
].transmissibility_factor,
sr_models = (),
)
expo_model = mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(
period=120, duration=120), air_exch=1.),
infected=mc.InfectedPopulation(
number=1,
presence=presence,
virus=virus_distributions['SARS_CoV_2_DELTA'],
mask=models.Mask.types['No mask'],
activity=activity_distributions['Seated'],
expiration=expiration_distributions['Breathing'],
host_immunity=0.,
),
evaporation_factor=0.3,
),
short_range=(),
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=activity_distributions['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
npt.assert_allclose(
expo_model.deposited_exposure().mean(),
simple_expo_model.dose().mean(), rtol=TOLERANCE
)
npt.assert_allclose(
expo_model.infection_probability().mean(),
simple_expo_model.probability_infection().mean(), rtol=TOLERANCE
)
def test_exposure_with_shortrange(c_model,sr_models,simple_sr_models):
simple_expo_sr_model = SimpleExposureModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = models.Virus.types['SARS_CoV_2_DELTA'].viable_to_RNA_ratio,
HI = 0.,
ID50 = models.Virus.types['SARS_CoV_2_DELTA'].infectious_dose,
transmissibility = models.Virus.types['SARS_CoV_2_DELTA'].transmissibility_factor,
sr_models = simple_sr_models,
)
expo_sr_model = mc.ExposureModel(
concentration_model=c_model,
short_range=sr_models,
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
npt.assert_allclose(
expo_sr_model.deposited_exposure().mean(),
simple_expo_sr_model.dose().mean(), rtol=TOLERANCE
)
npt.assert_allclose(
expo_sr_model.infection_probability().mean(),
simple_expo_sr_model.probability_infection().mean(), rtol=TOLERANCE
)
#plt.plot(times,[np.sum([np.array(sr_mod.short_range_concentration(conc_model,t)).mean()
# for sr_mod in sr_models])
# + conc_model.concentration(t).mean() for t in times])
#plt.plot(times,[np.sum([np.array(sr_mod.concentration(simple_conc_model,t)).mean()
# for sr_mod in simple_sr_models])
# + simple_conc_model.concentration(t)
# for t in times])