From 009c9eea01cb9fa4de8e7164849dc365a08579d7 Mon Sep 17 00:00:00 2001 From: Nicolas Mounet Date: Thu, 31 Mar 2022 18:17:15 +0200 Subject: [PATCH] Adding full algorithm test (partial - up to concentration only); cosmetic changes; improvement of dilution tests --- cara/apps/calculator/model_generator.py | 4 +- cara/models.py | 28 +- cara/tests/models/test_short_range_model.py | 39 +- cara/tests/test_full_algorithm.py | 383 ++++++++++++++++++++ cara/tests/test_monte_carlo_full_models.py | 4 +- 5 files changed, 423 insertions(+), 35 deletions(-) create mode 100644 cara/tests/test_full_algorithm.py diff --git a/cara/apps/calculator/model_generator.py b/cara/apps/calculator/model_generator.py index 5131c56d..ab90fab8 100644 --- a/cara/apps/calculator/model_generator.py +++ b/cara/apps/calculator/model_generator.py @@ -255,7 +255,7 @@ class FormData: expiration=short_range_expiration_distributions[interaction['expiration']], activity=infected_population.activity, presence=self.short_range_interval(interaction), - distances=short_range_distances, + distance=short_range_distances, )) # Initializes and returns a model with the attributes defined above @@ -651,7 +651,7 @@ class FormData: def short_range_interval(self, interaction) -> models.SpecificInterval: start_time = time_string_to_minutes(interaction['start_time']) duration = float(interaction['duration']) - return models.SpecificInterval(present_times=((start_time/60, (start_time + duration)/60),),) + return models.SpecificInterval(present_times=((start_time/60, (start_time + duration)/60),)) def exposed_present_interval(self) -> models.Interval: return self.present_interval( diff --git a/cara/models.py b/cara/models.py index 68617e7a..eb7324d5 100644 --- a/cara/models.py +++ b/cara/models.py @@ -1084,7 +1084,7 @@ class ShortRangeModel: presence: SpecificInterval #: Interpersonal distances - distances: _VectorisedFloat + distance: _VectorisedFloat def dilution_factor(self) -> _VectorisedFloat: ''' @@ -1093,17 +1093,17 @@ class ShortRangeModel: # Average mouth diameter D = 0.02 # Convert Breathing rate from m3/h to m3/s - BR = self.activity.exhalation_rate/3600 + BR = np.array(self.activity.exhalation_rate/3600.) # Area of the mouth assuming a perfect circle Am = np.pi*(D**2)/4 # Initial velocity from the division of the Breathing rate with the area - u0 = BR/Am - + u0 = np.array(BR/Am) + tstar = 2.0 Cr1 = 0.18 Cr2 = 0.2 Cx1 = 2.4 - + # The expired flow rate during the expiration period, m^3/s Q0 = u0 * np.pi/4*D**2 # Parameters in the jet-like stage @@ -1111,17 +1111,19 @@ class ShortRangeModel: # Time of virtual origin t01 = (x01/Cx1)**2 * (Q0*u0)**(-0.5) # The transition point, m - xstar = Cx1*(Q0*u0)**0.25*(tstar + t01)**0.5 - x01 + xstar = np.array(Cx1*(Q0*u0)**0.25*(tstar + t01)**0.5 - x01) # Dilution factor at the transition point xstar - Sxstar = 2*Cr1*(xstar+x01)/D + Sxstar = np.array(2*Cr1*(xstar+x01)/D) + + distances = np.array(self.distance) - distances = np.array(self.distances) - intermediate_range1 = distances < xstar - intermediate_range2 = distances >= xstar - factors = np.empty(distances.shape, dtype=np.float64) - factors[intermediate_range1] = 2*Cr1*(distances[intermediate_range1] + x01)/D - factors[intermediate_range2] = Sxstar[intermediate_range2]*(1 + Cr2*(distances[intermediate_range2] - xstar[intermediate_range2])/Cr1/(xstar[intermediate_range2] + x01))**3 + factors[distances < xstar] = 2*Cr1*(distances[distances < xstar] + + x01)/D + factors[distances >= xstar] = Sxstar[distances >= xstar]*(1 + + Cr2*(distances[distances >= xstar] - + xstar[distances >= xstar])/Cr1/(xstar[distances >= xstar] + + x01))**3 return factors def _normed_concentration(self, concentration_model: ConcentrationModel, time: float) -> _VectorisedFloat: diff --git a/cara/tests/models/test_short_range_model.py b/cara/tests/models/test_short_range_model.py index 94aaba0b..a07a656b 100644 --- a/cara/tests/models/test_short_range_model.py +++ b/cara/tests/models/test_short_range_model.py @@ -8,6 +8,8 @@ import cara.monte_carlo as mc_models from cara.apps.calculator.model_generator import build_expiration from cara.monte_carlo.data import short_range_expiration_distributions, short_range_distances, activity_distributions +# TODO: seed better the random number generators +np.random.seed(2000) @pytest.fixture def concentration_model() -> mc_models.ConcentrationModel: @@ -33,9 +35,9 @@ def concentration_model() -> mc_models.ConcentrationModel: @pytest.fixture def short_range_model(): return mc_models.ShortRangeModel(expiration=short_range_expiration_distributions['Breathing'], - activity=activity_distributions['Seated'], - presence=models.SpecificInterval(present_times=((10.5, 11.0),),), - distances=short_range_distances) + activity=activity_distributions['Seated'], + presence=models.SpecificInterval(present_times=((10.5, 11.0),)), + distance=short_range_distances) def test_short_range_model_ndarray(concentration_model, short_range_model): @@ -49,38 +51,39 @@ def test_short_range_model_ndarray(concentration_model, short_range_model): @pytest.mark.parametrize( "activity, expected_dilution", [ - ["Seated", 1016.558562890949], - ["Standing", 258.848724384952], - ["Light activity", 101.71457048643958], - ["Moderate activity", 77.93123076916304], - ["Heavy exercise", 145.70265022304739], + ["Seated", 176.04075727780327], + ["Standing", 157.12965288170005], + ["Light activity", 69.06672998536413], + ["Moderate activity", 47.165817446310115], + ["Heavy exercise", 23.759992220217875], ] ) def test_dilution_factor(activity, expected_dilution): - model = mc_models.ShortRangeModel(expiration="Breathing", - activity=activity_distributions[activity], - presence=models.SpecificInterval(present_times=((10.5, 11.0),),), - distances=short_range_distances).build_model(10) + model = models.ShortRangeModel(expiration="Breathing", + activity=models.Activity.types[activity], + presence=models.SpecificInterval(present_times=((10.5, 11.0),)), + distance=0.854) assert isinstance(model.dilution_factor(), np.ndarray) np.testing.assert_almost_equal( - model.dilution_factor().mean(), expected_dilution, decimal=10 + model.dilution_factor(), expected_dilution, decimal=10 ) @pytest.mark.parametrize( "time, expected_short_range_concentration", [ [8.5, 0.], - [10.5, 15.124713216706837], - [10.6, 15.193703513876928], - [11.0, 15.265132134078277], + [10.5, 15.24806213], + [10.6, 15.24806213], + [11.0, 15.24806213], [12.0, 0.], ] ) def test_short_range_concentration(time, expected_short_range_concentration, concentration_model, short_range_model): concentration_model = concentration_model.build_model(250_000) model = short_range_model.build_model(250_000) - np.testing.assert_almost_equal( - np.array(model.short_range_concentration(concentration_model, time)).mean(), expected_short_range_concentration + np.testing.assert_allclose( + np.array(model.short_range_concentration(concentration_model, time)).mean(), + expected_short_range_concentration, rtol=0.01 ) @pytest.mark.parametrize( diff --git a/cara/tests/test_full_algorithm.py b/cara/tests/test_full_algorithm.py new file mode 100644 index 00000000..56264d24 --- /dev/null +++ b/cara/tests/test_full_algorithm.py @@ -0,0 +1,383 @@ +from dataclasses import dataclass +import typing + +import numpy as np +from scipy.integrate import quad +from scipy.special import erf +import numpy.testing as npt +import pytest + +import cara.monte_carlo as mc +from cara import models,data +from cara.utils import method_cache +from cara.models import _VectorisedFloat,Interval,SpecificInterval +from cara.monte_carlo.sampleable import LogNormal +from cara.monte_carlo.data import (expiration_distributions, + expiration_BLO_factors,short_range_expiration_distributions, + short_range_distances) + +# TODO: seed better the random number generators +np.random.seed(2000) +SAMPLE_SIZE = 500000 +TOLERANCE = 0.05 + +sqrt2pi = np.sqrt(2.*np.pi) +sqrt2 = np.sqrt(2.) +ln2 = np.log(2) + +@dataclass(frozen=True) +class SimpleConcentrationModel: + """ + Simple model for the background (long-range) concentration, without + all the flexibility of cara.models.ConcentrationModel. + For independent, end-to-end testing purposes. + This assumes no mask wearing, and the same ventilation rate at all + times. + """ + + #: infected people presence interval + infected_presence: Interval + + #: viral load (RNA copies / mL) + viral_load: _VectorisedFloat + + #: breathing rate (m^3/h) + breathing_rate: _VectorisedFloat + + #: room volume (m^3) + room_volume: _VectorisedFloat + + #: ventilation rate (air changes per hour) - including HEPA + lambda_ventilation: _VectorisedFloat + + #: BLO factors + BLO_factors: typing.Tuple[float, float, float] + + #: number of infected people + num_infected: int = 1 + + #: relative humidity RH + humidity: float = 0.3 + + #: minimum particle diameter considered (microns) + diameter_min: float = 0.1 + + #: maximum particle diameter considered (microns) + diameter_max: float = 30. + + #: evaporation factor + evaporation: float = 0.3 + + #: cn (cm^-3) for resp. the B, L and O modes. Corresponds to the + # total concentration of aerosols for each mode. + cn: typing.Tuple[float, float, float] = (0.06, 0.2, 0.0010008) + + # mean of the underlying normal distributions (represents the log of a + # diameter in microns), for resp. the B, L and O modes. + mu: typing.Tuple[float, float, float] = (0.989541, 1.38629, 4.97673) + + # std deviation of the underlying normal distribution, for resp. + # the B, L and O modes. + sigma: typing.Tuple[float, float, float] = (0.262364, 0.506818, 0.585005) + + def removal_rate(self) -> _VectorisedFloat: + """ + removal rate lambda in h^-1, excluding the deposition rate. + """ + return (self.lambda_ventilation + + ln2/(6.43 if self.humidity<=0.4 else 1.1) ) + + @method_cache + def deposition_removal_coefficient(self) -> float: + """ + coefficient in front of gravitational deposition rate, in h^-1.microns^-2 + Note: 0.4512 = 1.88e-4 * 3600 / 1.5 + """ + return 0.4512*(self.evaporation/2.5)**2 + + @method_cache + def aerosol_volume(self,diameter: float) -> float: + """ + particle volume in microns^3 + """ + return 4*np.pi/3. * (diameter/2.)**3 + + @method_cache + def Np(self,diameter: float) -> float: + """ + number of emitted particles per unit volume (BLO model) + in cm^-3.ln(micron)^-1 + """ + result = 0. + for cn,mu,sigma,famp in zip(self.cn,self.mu,self.sigma, + self.BLO_factors): + result += ( (cn * famp)/sigma * + np.exp(-(np.log(diameter)-mu)**2/(2*sigma**2))) + return result/(diameter*sqrt2pi) + + def vR(self,diameter: float) -> float: + """ + emission rate per unit diameter, in RNA copies / h / micron + """ + return (self.viral_load * self.breathing_rate * self.Np(diameter) + * self.aerosol_volume(diameter) * 1e-6) + + @method_cache + def f(self, removal_rate: _VectorisedFloat, deltat: float) -> _VectorisedFloat: + """ + A general function to compute the main integral over diameters + """ + def integrand(diameter): + # function to return the integrand + a = self.deposition_removal_coefficient() + a_dsquare = a*diameter**2 + return (self.vR(diameter)/(a_dsquare + removal_rate) + * np.exp(-a_dsquare*deltat)) + + return quad(integrand,self.diameter_min,self.diameter_max)[0] + + def concentration(self,t: float) -> _VectorisedFloat: + """ + concentration at a given time t + """ + trans_times = sorted(self.infected_presence.transition_times()) + if t==trans_times[0]: + return 0. + + lambda_rate = self.removal_rate() + # transition_times[i] < t <= transition_times[i]+1 + i: int = np.searchsorted(trans_times,t) - 1 # type: ignore + ti = trans_times[i] + Pim1 = self.infected_presence.triggered((trans_times[i-1]+ti)/2.) + Pi = self.infected_presence.triggered((ti+trans_times[i+1])/2.) + + result = (0 if not Pim1 else self.f(lambda_rate,t-ti)) + result -= (0 if not Pi else self.f(lambda_rate,t-ti)) + + for k,tk in enumerate(trans_times[:i]): + Pkm1 = self.infected_presence.triggered((trans_times[k-1]+tk)/2.) + Pk = self.infected_presence.triggered((tk+trans_times[k+1])/2.) + s = np.sum([lambda_rate*(trans_times[l]-trans_times[l-1]) + for l in range(k+1,i+1)]) + result += ( (0 if not Pkm1 else self.f(lambda_rate,t-tk)) + -(0 if not Pk else self.f(lambda_rate,t-tk)) + ) * np.exp(-s) + + return ( ( (0 if not self.infected_presence.triggered(t) + else self.f(lambda_rate,0)) + + result * np.exp(-lambda_rate*(t-ti)) ) + * self.num_infected/self.room_volume) + + +@dataclass(frozen=True) +class SimpleShortRangeModel: + """ + Simple model for the short-range concentration, without + all the flexibility of cara.models.ShortRangeModel. + For independent, end-to-end testing purposes. + This assumes no mask wearing. + """ + + #: time intervals in which a short-range interaction occurs + interaction_interval: SpecificInterval + + #: tuple with interpersonal distanced from infected person (m) + distance : _VectorisedFloat = 0.854 + + #: breathing rate (m^3/h) + breathing_rate: _VectorisedFloat = 0.51 + + #: tuple with BLO factors + BLO_factors: typing.Tuple[float, float, float] = (1,0,0) + + #: minimum diameter for integration (short-range only) (microns) + diameter_min: float = 0.1 + + #: maximum diameter for integration (short-range only) (microns) + diameter_max: float = 100. + + #: mouth opening diameter (m) + D: float = 0.02 + + #: duration of the expiration (s) + tstar: float = 2. + + #: Streamwise and radial penetration coefficients + Cr1: float = 0.18 + Cx1: float = 2.4 + Cr2: float = 0.2 + Cx2: float = 2.2 + + @method_cache + def dilution_factor(self) -> _VectorisedFloat: + """ + computes dilution factor at a certain distance x + based on Wei JIA matlab script. + """ + x = np.array(self.distance) + dilution = np.empty(x.shape, dtype=np.float64) + # expired flow rate during the expiration period, m^3/s + Q0 = np.array(self.breathing_rate/3600) + # the expired flow velocity at the noozle (mouth opening), m/s + u0 = np.array(Q0/(np.pi/4. * self.D**2)) + # parameters in the jet-like stage + # position of virtual origin + x01 = self.D/2/self.Cr1 + # time of virtual origin + t01 = (x01/self.Cx1)**2 * (Q0*u0)**(-0.5) + # transition point (in m) + xstar = np.array(self.Cx1*(Q0*u0)**0.25*(self.tstar + t01)**0.5 + - x01) + # dilution factor at the transition point xstar + Sxstar = np.array(2.*self.Cr1*(xstar+x01)/self.D) + + # calculate dilution factor at the short-range distance x + dilution[x <= xstar] = 2.*self.Cr1*(x[x <= xstar] + x01)/self.D + dilution[x > xstar] = Sxstar[x > xstar]*(1. + self.Cr2*(x[x > xstar] + - xstar[x > xstar]) + /self.Cr1/(xstar[x > xstar] + x01))**3 + + return dilution + + @method_cache + def jet_concentration(self,conc_model: SimpleConcentrationModel) -> _VectorisedFloat: + """ + virion concentration at the origin of the jet (close to + the mouth of the infected person), in m^-3 + we perform the integral of Np(d)*V(d) over diameter analytically + """ + vl = conc_model.viral_load + dmin = self.diameter_min + dmax = self.diameter_max + result = 0. + for cn,mu,sigma,famp in zip(conc_model.cn,conc_model.mu,conc_model.sigma, + self.BLO_factors): + d0 = np.exp(mu) + ymin = (np.log(dmin)-mu)/(sqrt2*sigma)-3.*sigma/sqrt2 + ymax = (np.log(dmax)-mu)/(sqrt2*sigma)-3.*sigma/sqrt2 + result += ( (cn * famp * d0**3)/2. * np.exp(9*sigma**2/2.) * + (erf(ymax) - erf(ymin)) ) + return vl * 1e-6 * result * np.pi/6. + + def concentration(self, conc_model: SimpleConcentrationModel, time: float) -> _VectorisedFloat: + """ + compute the short-range part of the concentration, and add it + to the background concentration + """ + if self.interaction_interval.triggered(time): + background_concentration = conc_model.concentration(time) + S = self.dilution_factor() + return (self.jet_concentration(conc_model) + - background_concentration) / S + else: + return 0. + + +presence = models.SpecificInterval(present_times=((8.5, 12), (13, 17.5))) +interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)), + models.SpecificInterval(present_times=((14.5, 15.0),)) + ) + + +@pytest.fixture +def conc_model() -> mc.ConcentrationModel: + return mc.ConcentrationModel( + room=models.Room(volume=50, humidity=0.3), + ventilation=models.AirChange(active=models.PeriodicInterval(period=120, duration=120), air_exch=1.), + infected=mc.InfectedPopulation( + number=1, + presence=presence, + virus=models.Virus.types['SARS_CoV_2_DELTA'], + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Seated'], + expiration=expiration_distributions['Breathing'], + host_immunity=0., + ), + evaporation_factor=0.3, + ).build_model(SAMPLE_SIZE) + + +@pytest.fixture +def simple_conc_model() -> SimpleConcentrationModel: + return SimpleConcentrationModel( + infected_presence = presence, + viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum, + breathing_rate = models.Activity.types['Seated'].exhalation_rate, + room_volume = 50., + lambda_ventilation= 1., + BLO_factors = expiration_BLO_factors['Breathing'], + ) + + +@pytest.fixture +def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]: + return ( + mc.ShortRangeModel( + expiration = short_range_expiration_distributions['Breathing'], + activity = models.Activity.types['Seated'], + presence = interaction_intervals[0], + distance = 0.854, + ).build_model(SAMPLE_SIZE), + mc.ShortRangeModel( + expiration = short_range_expiration_distributions['Speaking'], + activity = models.Activity.types['Seated'], + presence = interaction_intervals[1], + distance = 0.854, + ).build_model(SAMPLE_SIZE), + ) + + +@pytest.fixture +def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]: + return ( + SimpleShortRangeModel( + interaction_interval = interaction_intervals[0], + distance = 0.854, + breathing_rate = models.Activity.types['Seated'].exhalation_rate, + BLO_factors = expiration_BLO_factors['Breathing'], + ), + SimpleShortRangeModel( + interaction_interval = interaction_intervals[1], + distance = 0.854, + breathing_rate = models.Activity.types['Seated'].exhalation_rate, + BLO_factors = expiration_BLO_factors['Speaking'], + ) + ) + + +@pytest.mark.parametrize( + "time", np.linspace(8.5,17.5,12), +) +def test_background_concentration(time,conc_model,simple_conc_model): + npt.assert_allclose( + conc_model.concentration(time).mean(), + simple_conc_model.concentration(time), rtol=TOLERANCE + ) + +@pytest.mark.parametrize( + "time", [10, 10.7, 11., 12.5, 14.75, 14.9, 17] +) +def test_shortrange_concentration(time,conc_model,simple_conc_model, + sr_models,simple_sr_models): + result_sr_model = np.sum([np.array( + sr_mod.short_range_concentration(conc_model,time)).mean() + for sr_mod in sr_models]) + result_simple_sr_model = np.sum([np.array( + sr_mod.concentration(simple_conc_model,time)).mean() + for sr_mod in simple_sr_models]) + npt.assert_allclose( + result_sr_model,result_simple_sr_model,rtol=TOLERANCE + ) + + +#times = np.linspace(8.5,17.5,120) +#plt.plot(times,[conc_model.concentration(t).mean() for t in times]) +#plt.plot(times,[simple_conc_model.concentration(t) for t in times]) + +#plt.plot(times,[np.sum([np.array(sr_mod.short_range_concentration(conc_model,t)).mean() +# for sr_mod in sr_models]) +# + conc_model.concentration(t).mean() for t in times]) +#plt.plot(times,[np.sum([np.array(sr_mod.concentration(simple_conc_model,t)).mean() +# for sr_mod in simple_sr_models]) +# + simple_conc_model.concentration(t) +# for t in times]) diff --git a/cara/tests/test_monte_carlo_full_models.py b/cara/tests/test_monte_carlo_full_models.py index e9d8ac7c..ecdb3374 100644 --- a/cara/tests/test_monte_carlo_full_models.py +++ b/cara/tests/test_monte_carlo_full_models.py @@ -9,8 +9,8 @@ from cara.apps.calculator.model_generator import build_expiration # TODO: seed better the random number generators np.random.seed(2000) -SAMPLE_SIZE = 250000 -TOLERANCE = 0.05 +SAMPLE_SIZE = 250_000 +TOLERANCE = 0.06 # Load the weather data (temperature in kelvin) for Toronto. toronto_coordinates = (43.667, 79.400)