Adding full algorithm test (partial - up to concentration only); cosmetic changes; improvement of dilution tests

This commit is contained in:
Nicolas Mounet 2022-03-31 18:17:15 +02:00
parent a2faa2523a
commit 009c9eea01
5 changed files with 423 additions and 35 deletions

View file

@ -255,7 +255,7 @@ class FormData:
expiration=short_range_expiration_distributions[interaction['expiration']],
activity=infected_population.activity,
presence=self.short_range_interval(interaction),
distances=short_range_distances,
distance=short_range_distances,
))
# Initializes and returns a model with the attributes defined above
@ -651,7 +651,7 @@ class FormData:
def short_range_interval(self, interaction) -> models.SpecificInterval:
start_time = time_string_to_minutes(interaction['start_time'])
duration = float(interaction['duration'])
return models.SpecificInterval(present_times=((start_time/60, (start_time + duration)/60),),)
return models.SpecificInterval(present_times=((start_time/60, (start_time + duration)/60),))
def exposed_present_interval(self) -> models.Interval:
return self.present_interval(

View file

@ -1084,7 +1084,7 @@ class ShortRangeModel:
presence: SpecificInterval
#: Interpersonal distances
distances: _VectorisedFloat
distance: _VectorisedFloat
def dilution_factor(self) -> _VectorisedFloat:
'''
@ -1093,17 +1093,17 @@ class ShortRangeModel:
# Average mouth diameter
D = 0.02
# Convert Breathing rate from m3/h to m3/s
BR = self.activity.exhalation_rate/3600
BR = np.array(self.activity.exhalation_rate/3600.)
# Area of the mouth assuming a perfect circle
Am = np.pi*(D**2)/4
# Initial velocity from the division of the Breathing rate with the area
u0 = BR/Am
u0 = np.array(BR/Am)
tstar = 2.0
Cr1 = 0.18
Cr2 = 0.2
Cx1 = 2.4
# The expired flow rate during the expiration period, m^3/s
Q0 = u0 * np.pi/4*D**2
# Parameters in the jet-like stage
@ -1111,17 +1111,19 @@ class ShortRangeModel:
# Time of virtual origin
t01 = (x01/Cx1)**2 * (Q0*u0)**(-0.5)
# The transition point, m
xstar = Cx1*(Q0*u0)**0.25*(tstar + t01)**0.5 - x01
xstar = np.array(Cx1*(Q0*u0)**0.25*(tstar + t01)**0.5 - x01)
# Dilution factor at the transition point xstar
Sxstar = 2*Cr1*(xstar+x01)/D
Sxstar = np.array(2*Cr1*(xstar+x01)/D)
distances = np.array(self.distance)
distances = np.array(self.distances)
intermediate_range1 = distances < xstar
intermediate_range2 = distances >= xstar
factors = np.empty(distances.shape, dtype=np.float64)
factors[intermediate_range1] = 2*Cr1*(distances[intermediate_range1] + x01)/D
factors[intermediate_range2] = Sxstar[intermediate_range2]*(1 + Cr2*(distances[intermediate_range2] - xstar[intermediate_range2])/Cr1/(xstar[intermediate_range2] + x01))**3
factors[distances < xstar] = 2*Cr1*(distances[distances < xstar]
+ x01)/D
factors[distances >= xstar] = Sxstar[distances >= xstar]*(1 +
Cr2*(distances[distances >= xstar] -
xstar[distances >= xstar])/Cr1/(xstar[distances >= xstar]
+ x01))**3
return factors
def _normed_concentration(self, concentration_model: ConcentrationModel, time: float) -> _VectorisedFloat:

View file

@ -8,6 +8,8 @@ import cara.monte_carlo as mc_models
from cara.apps.calculator.model_generator import build_expiration
from cara.monte_carlo.data import short_range_expiration_distributions, short_range_distances, activity_distributions
# TODO: seed better the random number generators
np.random.seed(2000)
@pytest.fixture
def concentration_model() -> mc_models.ConcentrationModel:
@ -33,9 +35,9 @@ def concentration_model() -> mc_models.ConcentrationModel:
@pytest.fixture
def short_range_model():
return mc_models.ShortRangeModel(expiration=short_range_expiration_distributions['Breathing'],
activity=activity_distributions['Seated'],
presence=models.SpecificInterval(present_times=((10.5, 11.0),),),
distances=short_range_distances)
activity=activity_distributions['Seated'],
presence=models.SpecificInterval(present_times=((10.5, 11.0),)),
distance=short_range_distances)
def test_short_range_model_ndarray(concentration_model, short_range_model):
@ -49,38 +51,39 @@ def test_short_range_model_ndarray(concentration_model, short_range_model):
@pytest.mark.parametrize(
"activity, expected_dilution", [
["Seated", 1016.558562890949],
["Standing", 258.848724384952],
["Light activity", 101.71457048643958],
["Moderate activity", 77.93123076916304],
["Heavy exercise", 145.70265022304739],
["Seated", 176.04075727780327],
["Standing", 157.12965288170005],
["Light activity", 69.06672998536413],
["Moderate activity", 47.165817446310115],
["Heavy exercise", 23.759992220217875],
]
)
def test_dilution_factor(activity, expected_dilution):
model = mc_models.ShortRangeModel(expiration="Breathing",
activity=activity_distributions[activity],
presence=models.SpecificInterval(present_times=((10.5, 11.0),),),
distances=short_range_distances).build_model(10)
model = models.ShortRangeModel(expiration="Breathing",
activity=models.Activity.types[activity],
presence=models.SpecificInterval(present_times=((10.5, 11.0),)),
distance=0.854)
assert isinstance(model.dilution_factor(), np.ndarray)
np.testing.assert_almost_equal(
model.dilution_factor().mean(), expected_dilution, decimal=10
model.dilution_factor(), expected_dilution, decimal=10
)
@pytest.mark.parametrize(
"time, expected_short_range_concentration", [
[8.5, 0.],
[10.5, 15.124713216706837],
[10.6, 15.193703513876928],
[11.0, 15.265132134078277],
[10.5, 15.24806213],
[10.6, 15.24806213],
[11.0, 15.24806213],
[12.0, 0.],
]
)
def test_short_range_concentration(time, expected_short_range_concentration, concentration_model, short_range_model):
concentration_model = concentration_model.build_model(250_000)
model = short_range_model.build_model(250_000)
np.testing.assert_almost_equal(
np.array(model.short_range_concentration(concentration_model, time)).mean(), expected_short_range_concentration
np.testing.assert_allclose(
np.array(model.short_range_concentration(concentration_model, time)).mean(),
expected_short_range_concentration, rtol=0.01
)
@pytest.mark.parametrize(

View file

@ -0,0 +1,383 @@
from dataclasses import dataclass
import typing
import numpy as np
from scipy.integrate import quad
from scipy.special import erf
import numpy.testing as npt
import pytest
import cara.monte_carlo as mc
from cara import models,data
from cara.utils import method_cache
from cara.models import _VectorisedFloat,Interval,SpecificInterval
from cara.monte_carlo.sampleable import LogNormal
from cara.monte_carlo.data import (expiration_distributions,
expiration_BLO_factors,short_range_expiration_distributions,
short_range_distances)
# TODO: seed better the random number generators
np.random.seed(2000)
SAMPLE_SIZE = 500000
TOLERANCE = 0.05
sqrt2pi = np.sqrt(2.*np.pi)
sqrt2 = np.sqrt(2.)
ln2 = np.log(2)
@dataclass(frozen=True)
class SimpleConcentrationModel:
"""
Simple model for the background (long-range) concentration, without
all the flexibility of cara.models.ConcentrationModel.
For independent, end-to-end testing purposes.
This assumes no mask wearing, and the same ventilation rate at all
times.
"""
#: infected people presence interval
infected_presence: Interval
#: viral load (RNA copies / mL)
viral_load: _VectorisedFloat
#: breathing rate (m^3/h)
breathing_rate: _VectorisedFloat
#: room volume (m^3)
room_volume: _VectorisedFloat
#: ventilation rate (air changes per hour) - including HEPA
lambda_ventilation: _VectorisedFloat
#: BLO factors
BLO_factors: typing.Tuple[float, float, float]
#: number of infected people
num_infected: int = 1
#: relative humidity RH
humidity: float = 0.3
#: minimum particle diameter considered (microns)
diameter_min: float = 0.1
#: maximum particle diameter considered (microns)
diameter_max: float = 30.
#: evaporation factor
evaporation: float = 0.3
#: cn (cm^-3) for resp. the B, L and O modes. Corresponds to the
# total concentration of aerosols for each mode.
cn: typing.Tuple[float, float, float] = (0.06, 0.2, 0.0010008)
# mean of the underlying normal distributions (represents the log of a
# diameter in microns), for resp. the B, L and O modes.
mu: typing.Tuple[float, float, float] = (0.989541, 1.38629, 4.97673)
# std deviation of the underlying normal distribution, for resp.
# the B, L and O modes.
sigma: typing.Tuple[float, float, float] = (0.262364, 0.506818, 0.585005)
def removal_rate(self) -> _VectorisedFloat:
"""
removal rate lambda in h^-1, excluding the deposition rate.
"""
return (self.lambda_ventilation
+ ln2/(6.43 if self.humidity<=0.4 else 1.1) )
@method_cache
def deposition_removal_coefficient(self) -> float:
"""
coefficient in front of gravitational deposition rate, in h^-1.microns^-2
Note: 0.4512 = 1.88e-4 * 3600 / 1.5
"""
return 0.4512*(self.evaporation/2.5)**2
@method_cache
def aerosol_volume(self,diameter: float) -> float:
"""
particle volume in microns^3
"""
return 4*np.pi/3. * (diameter/2.)**3
@method_cache
def Np(self,diameter: float) -> float:
"""
number of emitted particles per unit volume (BLO model)
in cm^-3.ln(micron)^-1
"""
result = 0.
for cn,mu,sigma,famp in zip(self.cn,self.mu,self.sigma,
self.BLO_factors):
result += ( (cn * famp)/sigma *
np.exp(-(np.log(diameter)-mu)**2/(2*sigma**2)))
return result/(diameter*sqrt2pi)
def vR(self,diameter: float) -> float:
"""
emission rate per unit diameter, in RNA copies / h / micron
"""
return (self.viral_load * self.breathing_rate * self.Np(diameter)
* self.aerosol_volume(diameter) * 1e-6)
@method_cache
def f(self, removal_rate: _VectorisedFloat, deltat: float) -> _VectorisedFloat:
"""
A general function to compute the main integral over diameters
"""
def integrand(diameter):
# function to return the integrand
a = self.deposition_removal_coefficient()
a_dsquare = a*diameter**2
return (self.vR(diameter)/(a_dsquare + removal_rate)
* np.exp(-a_dsquare*deltat))
return quad(integrand,self.diameter_min,self.diameter_max)[0]
def concentration(self,t: float) -> _VectorisedFloat:
"""
concentration at a given time t
"""
trans_times = sorted(self.infected_presence.transition_times())
if t==trans_times[0]:
return 0.
lambda_rate = self.removal_rate()
# transition_times[i] < t <= transition_times[i]+1
i: int = np.searchsorted(trans_times,t) - 1 # type: ignore
ti = trans_times[i]
Pim1 = self.infected_presence.triggered((trans_times[i-1]+ti)/2.)
Pi = self.infected_presence.triggered((ti+trans_times[i+1])/2.)
result = (0 if not Pim1 else self.f(lambda_rate,t-ti))
result -= (0 if not Pi else self.f(lambda_rate,t-ti))
for k,tk in enumerate(trans_times[:i]):
Pkm1 = self.infected_presence.triggered((trans_times[k-1]+tk)/2.)
Pk = self.infected_presence.triggered((tk+trans_times[k+1])/2.)
s = np.sum([lambda_rate*(trans_times[l]-trans_times[l-1])
for l in range(k+1,i+1)])
result += ( (0 if not Pkm1 else self.f(lambda_rate,t-tk))
-(0 if not Pk else self.f(lambda_rate,t-tk))
) * np.exp(-s)
return ( ( (0 if not self.infected_presence.triggered(t)
else self.f(lambda_rate,0))
+ result * np.exp(-lambda_rate*(t-ti)) )
* self.num_infected/self.room_volume)
@dataclass(frozen=True)
class SimpleShortRangeModel:
"""
Simple model for the short-range concentration, without
all the flexibility of cara.models.ShortRangeModel.
For independent, end-to-end testing purposes.
This assumes no mask wearing.
"""
#: time intervals in which a short-range interaction occurs
interaction_interval: SpecificInterval
#: tuple with interpersonal distanced from infected person (m)
distance : _VectorisedFloat = 0.854
#: breathing rate (m^3/h)
breathing_rate: _VectorisedFloat = 0.51
#: tuple with BLO factors
BLO_factors: typing.Tuple[float, float, float] = (1,0,0)
#: minimum diameter for integration (short-range only) (microns)
diameter_min: float = 0.1
#: maximum diameter for integration (short-range only) (microns)
diameter_max: float = 100.
#: mouth opening diameter (m)
D: float = 0.02
#: duration of the expiration (s)
tstar: float = 2.
#: Streamwise and radial penetration coefficients
Cr1: float = 0.18
Cx1: float = 2.4
Cr2: float = 0.2
Cx2: float = 2.2
@method_cache
def dilution_factor(self) -> _VectorisedFloat:
"""
computes dilution factor at a certain distance x
based on Wei JIA matlab script.
"""
x = np.array(self.distance)
dilution = np.empty(x.shape, dtype=np.float64)
# expired flow rate during the expiration period, m^3/s
Q0 = np.array(self.breathing_rate/3600)
# the expired flow velocity at the noozle (mouth opening), m/s
u0 = np.array(Q0/(np.pi/4. * self.D**2))
# parameters in the jet-like stage
# position of virtual origin
x01 = self.D/2/self.Cr1
# time of virtual origin
t01 = (x01/self.Cx1)**2 * (Q0*u0)**(-0.5)
# transition point (in m)
xstar = np.array(self.Cx1*(Q0*u0)**0.25*(self.tstar + t01)**0.5
- x01)
# dilution factor at the transition point xstar
Sxstar = np.array(2.*self.Cr1*(xstar+x01)/self.D)
# calculate dilution factor at the short-range distance x
dilution[x <= xstar] = 2.*self.Cr1*(x[x <= xstar] + x01)/self.D
dilution[x > xstar] = Sxstar[x > xstar]*(1. + self.Cr2*(x[x > xstar]
- xstar[x > xstar])
/self.Cr1/(xstar[x > xstar] + x01))**3
return dilution
@method_cache
def jet_concentration(self,conc_model: SimpleConcentrationModel) -> _VectorisedFloat:
"""
virion concentration at the origin of the jet (close to
the mouth of the infected person), in m^-3
we perform the integral of Np(d)*V(d) over diameter analytically
"""
vl = conc_model.viral_load
dmin = self.diameter_min
dmax = self.diameter_max
result = 0.
for cn,mu,sigma,famp in zip(conc_model.cn,conc_model.mu,conc_model.sigma,
self.BLO_factors):
d0 = np.exp(mu)
ymin = (np.log(dmin)-mu)/(sqrt2*sigma)-3.*sigma/sqrt2
ymax = (np.log(dmax)-mu)/(sqrt2*sigma)-3.*sigma/sqrt2
result += ( (cn * famp * d0**3)/2. * np.exp(9*sigma**2/2.) *
(erf(ymax) - erf(ymin)) )
return vl * 1e-6 * result * np.pi/6.
def concentration(self, conc_model: SimpleConcentrationModel, time: float) -> _VectorisedFloat:
"""
compute the short-range part of the concentration, and add it
to the background concentration
"""
if self.interaction_interval.triggered(time):
background_concentration = conc_model.concentration(time)
S = self.dilution_factor()
return (self.jet_concentration(conc_model)
- background_concentration) / S
else:
return 0.
presence = models.SpecificInterval(present_times=((8.5, 12), (13, 17.5)))
interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)),
models.SpecificInterval(present_times=((14.5, 15.0),))
)
@pytest.fixture
def conc_model() -> mc.ConcentrationModel:
return mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(period=120, duration=120), air_exch=1.),
infected=mc.InfectedPopulation(
number=1,
presence=presence,
virus=models.Virus.types['SARS_CoV_2_DELTA'],
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
expiration=expiration_distributions['Breathing'],
host_immunity=0.,
),
evaporation_factor=0.3,
).build_model(SAMPLE_SIZE)
@pytest.fixture
def simple_conc_model() -> SimpleConcentrationModel:
return SimpleConcentrationModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
)
@pytest.fixture
def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]:
return (
mc.ShortRangeModel(
expiration = short_range_expiration_distributions['Breathing'],
activity = models.Activity.types['Seated'],
presence = interaction_intervals[0],
distance = 0.854,
).build_model(SAMPLE_SIZE),
mc.ShortRangeModel(
expiration = short_range_expiration_distributions['Speaking'],
activity = models.Activity.types['Seated'],
presence = interaction_intervals[1],
distance = 0.854,
).build_model(SAMPLE_SIZE),
)
@pytest.fixture
def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]:
return (
SimpleShortRangeModel(
interaction_interval = interaction_intervals[0],
distance = 0.854,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
BLO_factors = expiration_BLO_factors['Breathing'],
),
SimpleShortRangeModel(
interaction_interval = interaction_intervals[1],
distance = 0.854,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
BLO_factors = expiration_BLO_factors['Speaking'],
)
)
@pytest.mark.parametrize(
"time", np.linspace(8.5,17.5,12),
)
def test_background_concentration(time,conc_model,simple_conc_model):
npt.assert_allclose(
conc_model.concentration(time).mean(),
simple_conc_model.concentration(time), rtol=TOLERANCE
)
@pytest.mark.parametrize(
"time", [10, 10.7, 11., 12.5, 14.75, 14.9, 17]
)
def test_shortrange_concentration(time,conc_model,simple_conc_model,
sr_models,simple_sr_models):
result_sr_model = np.sum([np.array(
sr_mod.short_range_concentration(conc_model,time)).mean()
for sr_mod in sr_models])
result_simple_sr_model = np.sum([np.array(
sr_mod.concentration(simple_conc_model,time)).mean()
for sr_mod in simple_sr_models])
npt.assert_allclose(
result_sr_model,result_simple_sr_model,rtol=TOLERANCE
)
#times = np.linspace(8.5,17.5,120)
#plt.plot(times,[conc_model.concentration(t).mean() for t in times])
#plt.plot(times,[simple_conc_model.concentration(t) for t in times])
#plt.plot(times,[np.sum([np.array(sr_mod.short_range_concentration(conc_model,t)).mean()
# for sr_mod in sr_models])
# + conc_model.concentration(t).mean() for t in times])
#plt.plot(times,[np.sum([np.array(sr_mod.concentration(simple_conc_model,t)).mean()
# for sr_mod in simple_sr_models])
# + simple_conc_model.concentration(t)
# for t in times])

View file

@ -9,8 +9,8 @@ from cara.apps.calculator.model_generator import build_expiration
# TODO: seed better the random number generators
np.random.seed(2000)
SAMPLE_SIZE = 250000
TOLERANCE = 0.05
SAMPLE_SIZE = 250_000
TOLERANCE = 0.06
# Load the weather data (temperature in kelvin) for Toronto.
toronto_coordinates = (43.667, 79.400)