From 99e57da4695619f5a7a24ea7e2362491ab6c7a32 Mon Sep 17 00:00:00 2001 From: Nicolas Mounet Date: Tue, 5 Apr 2022 18:30:14 +0200 Subject: [PATCH] Adding full algorithm tests on exposure (simple, with virus distributions, and with short-range) --- cara/tests/test_full_algorithm.py | 349 ++++++++++++++++++++++++++---- 1 file changed, 311 insertions(+), 38 deletions(-) diff --git a/cara/tests/test_full_algorithm.py b/cara/tests/test_full_algorithm.py index 56264d24..51e4d32c 100644 --- a/cara/tests/test_full_algorithm.py +++ b/cara/tests/test_full_algorithm.py @@ -13,8 +13,8 @@ from cara.utils import method_cache from cara.models import _VectorisedFloat,Interval,SpecificInterval from cara.monte_carlo.sampleable import LogNormal from cara.monte_carlo.data import (expiration_distributions, - expiration_BLO_factors,short_range_expiration_distributions, - short_range_distances) + expiration_BLO_factors,short_range_expiration_distributions, + short_range_distances,virus_distributions,activity_distributions) # TODO: seed better the random number generators np.random.seed(2000) @@ -103,14 +103,15 @@ class SimpleConcentrationModel: return 4*np.pi/3. * (diameter/2.)**3 @method_cache - def Np(self,diameter: float) -> float: + def Np(self,diameter: float, + BLO_factors: typing.Tuple[float, float, float]) -> float: """ number of emitted particles per unit volume (BLO model) in cm^-3.ln(micron)^-1 """ result = 0. for cn,mu,sigma,famp in zip(self.cn,self.mu,self.sigma, - self.BLO_factors): + BLO_factors): result += ( (cn * famp)/sigma * np.exp(-(np.log(diameter)-mu)**2/(2*sigma**2))) return result/(diameter*sqrt2pi) @@ -119,7 +120,7 @@ class SimpleConcentrationModel: """ emission rate per unit diameter, in RNA copies / h / micron """ - return (self.viral_load * self.breathing_rate * self.Np(diameter) + return (self.Np(diameter, self.BLO_factors) * self.aerosol_volume(diameter) * 1e-6) @method_cache @@ -134,7 +135,8 @@ class SimpleConcentrationModel: return (self.vR(diameter)/(a_dsquare + removal_rate) * np.exp(-a_dsquare*deltat)) - return quad(integrand,self.diameter_min,self.diameter_max)[0] + return (quad(integrand,self.diameter_min,self.diameter_max)[0] + * self.viral_load * self.breathing_rate) def concentration(self,t: float) -> _VectorisedFloat: """ @@ -148,14 +150,16 @@ class SimpleConcentrationModel: # transition_times[i] < t <= transition_times[i]+1 i: int = np.searchsorted(trans_times,t) - 1 # type: ignore ti = trans_times[i] - Pim1 = self.infected_presence.triggered((trans_times[i-1]+ti)/2.) + Pim1 = (False if i==0 else + self.infected_presence.triggered((trans_times[i-1]+ti)/2.)) Pi = self.infected_presence.triggered((ti+trans_times[i+1])/2.) result = (0 if not Pim1 else self.f(lambda_rate,t-ti)) result -= (0 if not Pi else self.f(lambda_rate,t-ti)) for k,tk in enumerate(trans_times[:i]): - Pkm1 = self.infected_presence.triggered((trans_times[k-1]+tk)/2.) + Pkm1 = (False if k==0 else + self.infected_presence.triggered((trans_times[k-1]+tk)/2.)) Pk = self.infected_presence.triggered((tk+trans_times[k+1])/2.) s = np.sum([lambda_rate*(trans_times[l]-trans_times[l-1]) for l in range(k+1,i+1)]) @@ -271,7 +275,163 @@ class SimpleShortRangeModel: - background_concentration) / S else: return 0. - + + +@dataclass(frozen=True) +class SimpleExposureModel(SimpleConcentrationModel): + """ + Simple model for the background (long-range) exposure, without + all the flexibility of cara.models.ExposureModel. + For independent, end-to-end testing purposes. + This assumes no mask wearing, identical inhalation and exhalation + breathing rate, indentical presence for the infected and the exposed + people, the same ventilation rate at all times, and all short-range + interaction intervals are within presence intervals of the infected. + """ + + #: fraction of infected viruses + finf: _VectorisedFloat = 0.5 + + #: host immunity factor (0. for not immune) + HI: _VectorisedFloat = 0. + + #: infectious dose (ID50) + ID50: _VectorisedFloat = 50. + + #: transmissibility factor w.r.t. original strain + # (<1 means more transmissible) + transmissibility: _VectorisedFloat = 1. + + #: list of short-range interaction models + sr_models: typing.Tuple[SimpleShortRangeModel, ...] = () + + def fdep(self, diameter: float, evaporation: float) -> float: + """ + fraction deposited + """ + d = diameter * evaporation + IFrac = 1 - 0.5 * (1 - (1 / (1 + (0.00076*(d**2.8))))) + fdep = IFrac * (0.0587 + + (0.911/(1 + np.exp(4.77 + 1.485 * np.log(d)))) + + (0.943/(1 + np.exp(0.508 - 2.58 * np.log(d))))) + return fdep + + @method_cache + def F(self, removal_rate: _VectorisedFloat, deltat: float) -> _VectorisedFloat: + """ + A general function to compute the main integral over diameters + """ + def integrand(diameter): + # function to return the integrand + a = self.deposition_removal_coefficient() + a_dsquare = a*diameter**2 + return -(self.vR(diameter)*self.fdep(diameter,self.evaporation)/( + (a_dsquare + removal_rate)**2) + * np.exp(-a_dsquare*deltat)) + + return (quad(integrand,self.diameter_min,self.diameter_max)[0] + * self.viral_load * self.breathing_rate) + + @method_cache + def f_with_fdep(self, removal_rate: _VectorisedFloat, deltat: float) -> _VectorisedFloat: + """ + Same as f but with fdep included in the integral. + """ + def integrand(diameter): + # function to return the integrand + a = self.deposition_removal_coefficient() + a_dsquare = a*diameter**2 + return (self.vR(diameter)*self.fdep(diameter,self.evaporation) + /(a_dsquare + removal_rate) * np.exp(-a_dsquare*deltat)) + + return (quad(integrand,self.diameter_min,self.diameter_max)[0] + * self.viral_load * self.breathing_rate) + + @method_cache + def integrated_background_concentration(self,t1: float,t2: float) -> _VectorisedFloat: + """ + background (long-range) concentration integrated from t1 to t2 + assuming both t1 and t2 are within a single presence interval. + This includes the deposition fraction (fdep). + """ + trans_times = sorted(self.infected_presence.transition_times()) + if t2==trans_times[0]: + return 0. + + lambda_rate = self.removal_rate() + # transition_times[i] < t2 <= transition_times[i]+1 + i: int = np.searchsorted(trans_times,t2) - 1 # type: ignore + ti = trans_times[i] + if np.searchsorted(trans_times,t1,side='right')-1 != i: + raise ValueError("t1={}, t2={}, i1={}, i2={} " + "(i1 and i2 should be equal)".format( + t1,t2,i,np.searchsorted(trans_times,t1,side='right')-1)) + + Pim1 = (False if i==0 else + self.infected_presence.triggered((trans_times[i-1]+ti)/2.)) + Pi = self.infected_presence.triggered((ti+trans_times[i+1])/2.) + + def primitive(time): + result = (0 if not Pim1 else self.F(lambda_rate,time-ti)) + result -= (0 if not Pi else self.F(lambda_rate,time-ti)) + + for k,tk in enumerate(trans_times[:i]): + Pkm1 = (False if k==0 else + self.infected_presence.triggered((trans_times[k-1]+tk)/2.)) + Pk = self.infected_presence.triggered((tk+trans_times[k+1])/2.) + s = np.sum([lambda_rate*(trans_times[l]-trans_times[l-1]) + for l in range(k+1,i+1)]) + result += ( (0 if not Pkm1 else self.F(lambda_rate,time-tk)) + -(0 if not Pk else self.F(lambda_rate,time-tk)) + ) * np.exp(-s) + return result + + return ( ( (0 if not self.infected_presence.triggered((t1+t2)/2.) + else self.f_with_fdep(lambda_rate,0)*(t2-t1)) + + (primitive(t2) * np.exp(-lambda_rate*(t2-ti)) - + primitive(t1) * np.exp(-lambda_rate*(t1-ti)) ) ) + * self.num_infected/self.room_volume) + + @method_cache + def integrated_shortrange_concentration(self) -> _VectorisedFloat: + """ + short-range concentration integrated over interaction times and + diameters. This includes the deposition fraction (fdep). + """ + result = 0. + for sr_model in self.sr_models: + t1,t2 = sr_model.interaction_interval.boundaries()[0] + # function to return the integrand + integrand = lambda d: (self.aerosol_volume(d) + * self.Np(d,sr_model.BLO_factors)*self.fdep(d,1.0)) + res = (quad(integrand, + sr_model.diameter_min,sr_model.diameter_max)[0] + * (t2-t1) ) + result += (res-self.integrated_background_concentration(t1,t2) + )/sr_model.dilution_factor() + + return self.viral_load * 1e-6 * result + + def dose(self) -> _VectorisedFloat: + """ + total deposited dose (integrated over time and over particle + diameters), including short and long range. + """ + result = 0. + for t1,t2 in self.infected_presence.boundaries(): + result += self.integrated_background_concentration(t1,t2) + + result += self.integrated_shortrange_concentration() + + return result * self.finf * self.breathing_rate * (1. - self.HI) + + def probability_infection(self): + """ + total probability of infection + """ + return (1. - np.exp(-self.dose() * ln2 * (1-self.HI) + /(self.ID50 * self.transmissibility) )) * 100. + presence = models.SpecificInterval(present_times=((8.5, 12), (13, 17.5))) interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)), @@ -280,7 +440,7 @@ interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)), @pytest.fixture -def conc_model() -> mc.ConcentrationModel: +def c_model() -> mc.ConcentrationModel: return mc.ConcentrationModel( room=models.Room(volume=50, humidity=0.3), ventilation=models.AirChange(active=models.PeriodicInterval(period=120, duration=120), air_exch=1.), @@ -297,18 +457,6 @@ def conc_model() -> mc.ConcentrationModel: ).build_model(SAMPLE_SIZE) -@pytest.fixture -def simple_conc_model() -> SimpleConcentrationModel: - return SimpleConcentrationModel( - infected_presence = presence, - viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum, - breathing_rate = models.Activity.types['Seated'].exhalation_rate, - room_volume = 50., - lambda_ventilation= 1., - BLO_factors = expiration_BLO_factors['Breathing'], - ) - - @pytest.fixture def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]: return ( @@ -327,6 +475,18 @@ def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]: ) +@pytest.fixture +def simple_c_model() -> SimpleConcentrationModel: + return SimpleConcentrationModel( + infected_presence = presence, + viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum, + breathing_rate = models.Activity.types['Seated'].exhalation_rate, + room_volume = 50., + lambda_ventilation= 1., + BLO_factors = expiration_BLO_factors['Breathing'], + ) + + @pytest.fixture def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]: return ( @@ -348,36 +508,149 @@ def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]: @pytest.mark.parametrize( "time", np.linspace(8.5,17.5,12), ) -def test_background_concentration(time,conc_model,simple_conc_model): +def test_background_concentration(time,c_model,simple_c_model): npt.assert_allclose( - conc_model.concentration(time).mean(), - simple_conc_model.concentration(time), rtol=TOLERANCE + c_model.concentration(time).mean(), + simple_c_model.concentration(time), rtol=TOLERANCE ) + @pytest.mark.parametrize( "time", [10, 10.7, 11., 12.5, 14.75, 14.9, 17] ) -def test_shortrange_concentration(time,conc_model,simple_conc_model, +def test_shortrange_concentration(time,c_model,simple_c_model, sr_models,simple_sr_models): result_sr_model = np.sum([np.array( - sr_mod.short_range_concentration(conc_model,time)).mean() + sr_mod.short_range_concentration(c_model,time)).mean() for sr_mod in sr_models]) result_simple_sr_model = np.sum([np.array( - sr_mod.concentration(simple_conc_model,time)).mean() + sr_mod.concentration(simple_c_model,time)).mean() for sr_mod in simple_sr_models]) npt.assert_allclose( result_sr_model,result_simple_sr_model,rtol=TOLERANCE ) -#times = np.linspace(8.5,17.5,120) -#plt.plot(times,[conc_model.concentration(t).mean() for t in times]) -#plt.plot(times,[simple_conc_model.concentration(t) for t in times]) +def test_background_exposure(c_model): + simple_expo_model = SimpleExposureModel( + infected_presence = presence, + viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum, + breathing_rate = models.Activity.types['Seated'].exhalation_rate, + room_volume = 50., + lambda_ventilation= 1., + BLO_factors = expiration_BLO_factors['Breathing'], + finf = models.Virus.types['SARS_CoV_2_DELTA'].viable_to_RNA_ratio, + HI = 0., + ID50 = models.Virus.types['SARS_CoV_2_DELTA'].infectious_dose, + transmissibility = models.Virus.types['SARS_CoV_2_DELTA'].transmissibility_factor, + sr_models = (), + ) + expo_model = mc.ExposureModel( + concentration_model=c_model, + short_range=(), + exposed=mc.Population( + number=1, + presence=presence, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + host_immunity=0., + ), + ).build_model(SAMPLE_SIZE) + npt.assert_allclose( + expo_model.deposited_exposure().mean(), + simple_expo_model.dose().mean(), rtol=TOLERANCE + ) + npt.assert_allclose( + expo_model.infection_probability().mean(), + simple_expo_model.probability_infection().mean(), rtol=TOLERANCE + ) + + +def test_background_exposure_with_distributions(): + simple_expo_model = SimpleExposureModel( + infected_presence = presence, + viral_load = virus_distributions['SARS_CoV_2_DELTA' + ].build_model(SAMPLE_SIZE).viral_load_in_sputum, + breathing_rate = activity_distributions['Seated'].build_model( + SAMPLE_SIZE).exhalation_rate, + room_volume = 50., + lambda_ventilation= 1., + BLO_factors = expiration_BLO_factors['Breathing'], + finf = virus_distributions['SARS_CoV_2_DELTA' + ].build_model(SAMPLE_SIZE).viable_to_RNA_ratio, + HI = 0., + ID50 = virus_distributions['SARS_CoV_2_DELTA' + ].build_model(SAMPLE_SIZE).infectious_dose, + transmissibility = virus_distributions['SARS_CoV_2_DELTA' + ].transmissibility_factor, + sr_models = (), + ) + expo_model = mc.ExposureModel( + concentration_model=mc.ConcentrationModel( + room=models.Room(volume=50, humidity=0.3), + ventilation=models.AirChange(active=models.PeriodicInterval( + period=120, duration=120), air_exch=1.), + infected=mc.InfectedPopulation( + number=1, + presence=presence, + virus=virus_distributions['SARS_CoV_2_DELTA'], + mask=models.Mask.types['No mask'], + activity=activity_distributions['Seated'], + expiration=expiration_distributions['Breathing'], + host_immunity=0., + ), + evaporation_factor=0.3, + ), + short_range=(), + exposed=mc.Population( + number=1, + presence=presence, + mask=models.Mask.types['No mask'], + activity=activity_distributions['Seated'], + host_immunity=0., + ), + ).build_model(SAMPLE_SIZE) + npt.assert_allclose( + expo_model.deposited_exposure().mean(), + simple_expo_model.dose().mean(), rtol=TOLERANCE + ) + npt.assert_allclose( + expo_model.infection_probability().mean(), + simple_expo_model.probability_infection().mean(), rtol=TOLERANCE + ) + + +def test_exposure_with_shortrange(c_model,sr_models,simple_sr_models): + simple_expo_sr_model = SimpleExposureModel( + infected_presence = presence, + viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum, + breathing_rate = models.Activity.types['Seated'].exhalation_rate, + room_volume = 50., + lambda_ventilation= 1., + BLO_factors = expiration_BLO_factors['Breathing'], + finf = models.Virus.types['SARS_CoV_2_DELTA'].viable_to_RNA_ratio, + HI = 0., + ID50 = models.Virus.types['SARS_CoV_2_DELTA'].infectious_dose, + transmissibility = models.Virus.types['SARS_CoV_2_DELTA'].transmissibility_factor, + sr_models = simple_sr_models, + ) + expo_sr_model = mc.ExposureModel( + concentration_model=c_model, + short_range=sr_models, + exposed=mc.Population( + number=1, + presence=presence, + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + host_immunity=0., + ), + ).build_model(SAMPLE_SIZE) + npt.assert_allclose( + expo_sr_model.deposited_exposure().mean(), + simple_expo_sr_model.dose().mean(), rtol=TOLERANCE + ) + npt.assert_allclose( + expo_sr_model.infection_probability().mean(), + simple_expo_sr_model.probability_infection().mean(), rtol=TOLERANCE + ) -#plt.plot(times,[np.sum([np.array(sr_mod.short_range_concentration(conc_model,t)).mean() -# for sr_mod in sr_models]) -# + conc_model.concentration(t).mean() for t in times]) -#plt.plot(times,[np.sum([np.array(sr_mod.concentration(simple_conc_model,t)).mean() -# for sr_mod in simple_sr_models]) -# + simple_conc_model.concentration(t) -# for t in times])