Merge branch 'feature/refined_mask_model' into feature/refined_expiration_model

This commit is contained in:
Nicolas Mounet 2021-05-31 09:30:43 +02:00
commit 43cdb36a47
12 changed files with 320 additions and 148 deletions

View file

@ -269,10 +269,10 @@ class FormData:
else:
return ventilation
def mask(self) -> models._MaskBase:
def mask(self) -> models.Mask:
# Initializes the mask type if mask wearing is "continuous", otherwise instantiates the mask attribute as
# the "No mask"-mask
mask = models._MaskBase.types[self.mask_type if self.mask_wearing_option == "mask_on" else 'No mask']
mask = models.Mask.types[self.mask_type if self.mask_wearing_option == "mask_on" else 'No mask']
return mask
def infected_population(self) -> models.InfectedPopulation:

View file

@ -390,10 +390,10 @@ class ModelWidgets(View):
def _build_mask(self, node):
mask = node.dcs_instance()
for name, mask_ in models._MaskBase.types.items():
for name, mask_ in models.Mask.types.items():
if mask == mask_:
break
mask_choice = widgets.Select(options=list(models._MaskBase.types.keys()), value=name)
mask_choice = widgets.Select(options=list(models.Mask.types.keys()), value=name)
def on_mask_change(change):
node.dcs_select(change['new'])
@ -496,7 +496,7 @@ baseline_model = models.ExposureModel(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((8, 12), (13, 17))),
mask=models._MaskBase.types['No mask'],
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
expiration=models.Expiration.types['Talking'],
),
@ -505,7 +505,7 @@ baseline_model = models.ExposureModel(
number=10,
presence=models.SpecificInterval(((8, 12), (13, 17))),
activity=models.Activity.types['Seated'],
mask=models._MaskBase.types['No mask'],
mask=models.Mask.types['No mask'],
),
)
@ -515,10 +515,10 @@ class CARAStateBuilder(state.StateBuilder):
# For example, build_type__VentilationBase is called when dealing with ConcentrationModel
# types as it has a ventilation: _VentilationBase field.
def build_type__MaskBase(self, _: dataclasses.Field):
def build_type_Mask(self, _: dataclasses.Field):
return state.DataclassStatePredefined(
models._MaskBase,
choices=models._MaskBase.types,
models.Mask,
choices=models.Mask.types,
)
def build_type_Virus(self, _: dataclasses.Field):

View file

@ -473,92 +473,48 @@ Virus.types = {
@dataclass(frozen=True)
class _MaskBase:
"""
Represents the filtration of aerosols by a mask, both inward and
outward.
The nature of the various mask models means that it is expected
for subclasses of _MaskBase to exist.
"""
class Mask:
#: Filtration efficiency of masks when inhaling.
η_inhale: _VectorisedFloat
#: Global factor applied to filtration efficiency of masks when exhaling.
factor_exhale: _VectorisedFloat = 1.
#: Pre-populated examples of Masks.
types: typing.ClassVar[typing.Dict[str, "_MaskBase"]]
types: typing.ClassVar[typing.Dict[str, "Mask"]]
def exhale_efficiency(self, diameter: float) -> _VectorisedFloat:
# Overall exhale efficiency, including the effect of the leaks.
raise NotImplementedError("Subclass must implement")
def inhale_efficiency(self) -> _VectorisedFloat:
# Overall inhale efficiency, including the effect of the leaks.
raise NotImplementedError("Subclass must implement")
@dataclass(frozen=True)
class Mask(_MaskBase):
#: Filtration efficiency.
η_exhale: _VectorisedFloat
#: Leakage through side of masks.
η_leaks: _VectorisedFloat
#: Filtration efficiency of masks when inhaling.
η_inhale: _VectorisedFloat
def exhale_efficiency(self, diameter: float) -> _VectorisedFloat:
# Overall efficiency with the effect of the leaks for aerosol emission
# Gammaitoni et al (1997). Diameter is in cm.
if diameter < 3e-4:
"""
Overall exhale efficiency, including the effect of the leaks.
See CERN-OPEN-2021-004 (doi: 10.17181/CERN.1GDQ.5Y75), and Ref.
therein (Asadi 2020).
Obtained from measurements of filtration efficiency and of
the leakage through the sides.
Diameter is in microns.
"""
if diameter < 0.5:
eta_out = 0.
else:
eta_out = self.η_exhale * (1 - self.η_leaks)
return eta_out
def inhale_efficiency(self) -> _VectorisedFloat:
# Overall inhale efficiency, including the effect of the leaks.
return self.η_inhale
@dataclass(frozen=True)
class MeasuredMask(_MaskBase):
#: Filtration efficiency of masks when inhaling.
η_inhale: _VectorisedFloat
def exhale_efficiency(self, diameter: float) -> _VectorisedFloat:
# See CERN-OPEN-2021-004 (doi: 10.17181/CERN.1GDQ.5Y75), and Ref.
# therein (Asadi 2020).
# Obtained from measurements of filtration efficiency and of
# the leakage through the sides.
# Diameter is in cm.
if diameter < 0.5e-4:
eta_out = 0.
elif diameter < 0.94614e-4:
eta_out = 0.5893 * diameter * 1e4 + 0.1546
elif diameter < 3e-4:
eta_out = 0.0509 * diameter * 1e4 + 0.664
elif diameter < 0.94614:
eta_out = 0.5893 * diameter + 0.1546
elif diameter < 3.:
eta_out = 0.0509 * diameter + 0.664
else:
eta_out = 0.8167
return eta_out
return eta_out*self.factor_exhale
def inhale_efficiency(self) -> _VectorisedFloat:
# Overall inhale efficiency, including the effect of the leaks.
"""
Overall inhale efficiency, including the effect of the leaks.
"""
return self.η_inhale
_MaskBase.types = {
'No mask': Mask(0, 0, 0),
Mask.types = {
'No mask': Mask(0, 0),
'Type I': Mask(
η_exhale=0.95,
η_leaks=0.15, # (Huang 2007)
η_inhale=0.3, # (Browen 2010)
),
'FFP2': Mask(
η_exhale=0.95, # (same outward effect as type 1 - Asadi 2020)
η_leaks=0.15, # (same outward effect as type 1 - Asadi 2020)
η_inhale=0.865, # (94% penetration efficiency + 8% max inward leakage -> EN 149)
),
'Type I measured': MeasuredMask(
η_inhale=0.5, # (CERN-OPEN-2021-004)
),
'FFP2 measured': MeasuredMask(
'FFP2': Mask(
η_inhale=0.865, # (94% penetration efficiency + 8% max inward leakage -> EN 149)
),
}
@ -570,11 +526,13 @@ class _ExpirationBase:
Represents the expiration of aerosols by a person.
Subclasses of _ExpirationBase represent different models.
"""
#: Pre-populated examples of Masks.
#: Pre-populated examples of Expirations.
types: typing.ClassVar[typing.Dict[str, "_ExpirationBase"]]
def aerosols(self, mask: _MaskBase):
# total volume of aerosols expired per volume of air (mL/cm^3).
def aerosols(self, mask: Mask):
"""
total volume of aerosols expired per volume of air (mL/cm^3).
"""
raise NotImplementedError("Subclass must implement")
@ -590,13 +548,13 @@ class Expiration(_ExpirationBase):
ejection_factor: typing.Tuple[float, ...]
particle_sizes: typing.Tuple[float, ...] = (0.8e-4, 1.8e-4, 3.5e-4, 5.5e-4) # In cm.
def aerosols(self, mask: _MaskBase):
def aerosols(self, mask: Mask):
def volume(diameter):
return (4 * np.pi * (diameter/2)**3) / 3
total = 0
for diameter, factor in zip(self.particle_sizes, self.ejection_factor):
contribution = (volume(diameter) * factor *
(1 - mask.exhale_efficiency(diameter)))
(1 - mask.exhale_efficiency(diameter*1e4)))
total += contribution
return total
@ -618,7 +576,7 @@ class MultipleExpiration(_ExpirationBase):
raise ValueError("expirations and weigths should contain the"
"same number of elements")
def aerosols(self, mask: _MaskBase):
def aerosols(self, mask: Mask):
return np.array([
weight * expiration.aerosols(mask) / sum(self.weights)
for weight,expiration in zip(self.weights,self.expirations)
@ -640,7 +598,7 @@ class ExpirationBLO(_ExpirationBase):
BLO_factors: typing.Tuple[float, float, float]
def aerosols(self, mask: _MaskBase):
# result is in mL.cm^-3
""" Result is in mL.cm^-3 """
def volume(d):
return (np.pi * d**3) / 6.
@ -713,7 +671,7 @@ class Population:
presence: Interval
#: The kind of mask being worn by the people.
mask: _MaskBase
mask: Mask
#: The physical activity being carried out by the people.
activity: Activity

View file

@ -0,0 +1 @@
from .models import *

View file

@ -0,0 +1,4 @@
from typing import Any
# For now we disable all type-checking in the monte-carlo submodule.
def __getattr__(name) -> Any: ...

123
cara/monte_carlo/models.py Normal file
View file

@ -0,0 +1,123 @@
import copy
import dataclasses
import sys
import typing
import cara.models
from .sampleable import SampleableDistribution, _VectorisedFloatOrSampleable
_ModelType = typing.TypeVar('_ModelType')
class MCModelBase(typing.Generic[_ModelType]):
"""
A model base class for monte carlo types.
This base class is essentially a declarative description of a cara.models
model with a :meth:`.build_model` method to generate an appropriate
``cara.models` model instance on demand.
"""
_base_cls: typing.Type[_ModelType]
@classmethod
def _to_vectorized_form(cls, item, size):
if isinstance(item, SampleableDistribution):
return item.generate_samples(size)
elif isinstance(item, MCModelBase):
# Recurse into other MCModelBase instances by calling their
# build_model method.
return item.build_model(size)
elif isinstance(item, tuple):
return tuple(cls._to_vectorized_form(sub, size) for sub in item)
else:
return item
def build_model(self, size: int) -> _ModelType:
"""
Turn this MCModelBase subclass into a cara.models Model instance
from which you can then run the model.
"""
kwargs = {}
for field in dataclasses.fields(self._base_cls):
attr = getattr(self, field.name)
kwargs[field.name] = self._to_vectorized_form(attr, size)
return self._base_cls(**kwargs) # type: ignore
def _build_mc_model(model: _ModelType) -> typing.Type[MCModelBase[_ModelType]]:
"""
Generate a new MCModelBase subclass for the given cara.models model.
"""
fields = []
for field in dataclasses.fields(model):
# Note: deepcopy not needed here as we aren't mutating entities beyond
# the top level.
new_field = copy.copy(field)
if field.type is cara.models._VectorisedFloat: # noqa
new_field.type = _VectorisedFloatOrSampleable # type: ignore
field_type: typing.Any = new_field.type
if getattr(field_type, '__origin__', None) in [typing.Union, typing.Tuple]:
# It is challenging to generalise this code, so we provide specific transformations,
# and raise for unforseen cases.
if new_field.type == typing.Tuple[cara.models._VentilationBase, ...]:
VB = getattr(sys.modules[__name__], "_VentilationBase")
field_type = typing.Tuple[typing.Union[cara.models._VentilationBase, VB], ...]
elif new_field.type == typing.Tuple[cara.models._ExpirationBase, ...]:
EB = getattr(sys.modules[__name__], "_ExpirationBase")
field_type = typing.Tuple[typing.Union[cara.models._ExpirationBase, EB], ...]
else:
# Check that we don't need to do anything with this type.
for item in new_field.type.__args__:
if getattr(item, '__module__', None) == 'cara.models':
raise ValueError(
f"unsupported type annotation transformation required for {new_field.type}")
elif field_type.__module__ == 'cara.models':
mc_model = getattr(sys.modules[__name__], new_field.type.__name__)
field_type = typing.Union[new_field.type, mc_model]
fields.append((new_field.name, field_type, new_field))
bases = []
# Update the inheritance/based to use the new MC classes, rather than the cara.models ones.
for model_base in model.__bases__: # type: ignore
if model_base is object:
bases.append(MCModelBase)
else:
mc_model = getattr(sys.modules[__name__], model_base.__name__)
bases.append(mc_model)
cls = dataclasses.make_dataclass(
model.__name__, # type: ignore
fields, # type: ignore
bases=bases, # type: ignore
namespace={'_base_cls': model},
# This thing can be mutable - the calculations live on
# the wrapped class, not on the MCModelBase.
frozen=False,
)
# Update the module of the generated class to be this one. Without this the
# module will be "types".
cls.__module__ = __name__
return cls
_MODEL_CLASSES = [
cls for cls in vars(cara.models).values()
if dataclasses.is_dataclass(cls)
]
# Inject the runtime generated MC types into this module.
for _model in _MODEL_CLASSES:
setattr(sys.modules[__name__], _model.__name__, _build_mc_model(_model))
# Make sure that each of the models is imported if you do a ``import *``.
__all__ = [_model.__name__ for _model in _MODEL_CLASSES] + ["MCModelBase"]

View file

@ -0,0 +1,29 @@
import typing
import numpy as np
import cara.models
# Declare a float array type of a given size.
# There is no better way to declare this currently, unfortunately.
float_array_size_n = np.ndarray
class SampleableDistribution:
def generate_samples(self, size: int) -> float_array_size_n:
raise NotImplementedError()
class Normal(SampleableDistribution):
def __init__(self, mean: float, standard_deviation: float):
self.mean = mean
self.standard_deviation = standard_deviation
def generate_samples(self, size: int) -> float_array_size_n:
return np.random.normal(self.mean, self.standard_deviation, size=size)
_VectorisedFloatOrSampleable = typing.Union[
SampleableDistribution, cara.models._VectorisedFloat,
]

View file

@ -1,6 +1,7 @@
import re
import numpy as np
import numpy.testing as npt
import pytest
from cara import models
@ -13,8 +14,7 @@ from cara import models
{'air_change': np.array([100, 120])},
{'viral_load_in_sputum': np.array([5e8, 1e9])},
{'quantum_infectious_dose': np.array([50, 20])},
{'η_exhale': np.array([0.92, 0.95])},
{'η_leaks': np.array([0.15, 0.20])},
{'factor_exhale': np.array([0.92, 0.95])},
]
)
def test_concentration_model_vectorisation(override_params):
@ -24,8 +24,7 @@ def test_concentration_model_vectorisation(override_params):
'air_change': 100,
'viral_load_in_sputum': 1e9,
'quantum_infectious_dose': 50,
'η_exhale': 0.95,
'η_leaks': 0.15,
'factor_exhale': 0.95,
}
defaults.update(override_params)
@ -37,8 +36,7 @@ def test_concentration_model_vectorisation(override_params):
number=1,
presence=always,
mask=models.Mask(
η_exhale=defaults['η_exhale'],
η_leaks=defaults['η_leaks'],
factor_exhale=defaults['factor_exhale'],
η_inhale=0.3,
),
activity=models.Activity(
@ -128,4 +126,4 @@ def test_integrated_concentration(simple_conc_model):
c2 = simple_conc_model.integrated_concentration(0, 1)
c3 = simple_conc_model.integrated_concentration(1, 2)
assert c1 != 0
assert c1 == c2 + c3
npt.assert_almost_equal(c1, c2 + c3, decimal=15)

View file

@ -43,7 +43,7 @@ populations = [
),
# A population with some array component for η_inhale.
models.Population(
10, halftime, models.Mask(0.95, 0.15, np.array([0.3, 0.35])),
10, halftime, models.Mask(np.array([0.3, 0.35])),
models.Activity.types['Standing'],
),
# A population with some array component for inhalation_rate.
@ -60,10 +60,10 @@ populations = [
np.array([14.4, 14.4]), np.array([99.6803184113, 99.5181053773])],
[populations[2], KnownConcentrations(lambda t: 1.2),
np.array([14.4, 14.4]), np.array([99.4146994564, 99.6803184113])],
np.array([14.4, 14.4]), np.array([97.4574432074, 98.3493482895])],
[populations[0], KnownConcentrations(lambda t: np.array([1.2, 2.4])),
np.array([14.4, 28.8]), np.array([99.6803184113, 99.9989780368])],
np.array([14.4, 28.8]), np.array([98.3493482895, 99.9727534893])],
[populations[1], KnownConcentrations(lambda t: np.array([1.2, 2.4])),
np.array([14.4, 28.8]), np.array([99.6803184113, 99.9976777757])],
@ -123,22 +123,22 @@ def conc_model():
models.InfectedPopulation(
number=1,
presence=interesting_times,
mask=models.Mask.types['Type I'],
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
virus=models.Virus.types['SARS_CoV_2'],
expiration=models.Expiration.types['Breathing'],
expiration=models.Expiration.types['Superspreading event'],
)
)
# expected quanta were computed with a trapezoidal integration, using
# a mesh of 10'000 pts per exposed presence interval.
@pytest.mark.parametrize("exposed_time_interval, expected_quanta", [
[(0, 1), 0.0055680845],
[(1, 1.01), 6.4960491e-05],
[(1.01, 1.02), 6.3187723e-05],
[(12, 12.01), 1.9307359e-06],
[(12, 24), 0.079347465],
[(0, 24), 0.086122050],
[(0, 1), 5.4869151],
[(1, 1.01), 0.064013521],
[(1.01, 1.02), 0.062266596],
[(12, 12.01), 0.0019025904],
[(12, 24), 78.190763],
[(0, 24), 84.866592],
]
)
def test_exposure_model_integral_accuracy(exposed_time_interval,

View file

@ -1,6 +1,3 @@
import dataclasses
import numpy as np
import numpy.testing as npt
import pytest
@ -14,45 +11,23 @@ from cara import models
[np.array([0.3, 0.5]), np.array([0.3, 0.5])],
],
)
def test_masks_inhale(η_inhale, expected_inhale_efficiency):
mask = models.Mask(η_inhale=η_inhale,η_exhale=0.95,η_leaks=0.15)
measuredmask = models.MeasuredMask(η_inhale=η_inhale)
def test_mask_inhale(η_inhale, expected_inhale_efficiency):
mask = models.Mask(η_inhale=η_inhale)
npt.assert_equal(mask.inhale_efficiency(),
expected_inhale_efficiency)
npt.assert_equal(measuredmask.inhale_efficiency(),
expected_inhale_efficiency)
@pytest.mark.parametrize(
"η_exhale, η_leaks, expected_exhale_efficiency_small, expected_exhale_efficiency_large",
"diameter, factor_exhale, expected_exhale_efficiency",
[
[0.95, 0.15, 0., 0.8075],
[np.array([0.95, 1.]), 0.15, np.zeros(2), np.array([0.8075, 0.85])],
[0.95, np.array([0.15, 0.]), np.zeros(2), np.array([0.8075, 0.95])],
[np.array([0.95, 1.]), np.array([0.15, 0.]), np.zeros(2), np.array([0.8075, 1.])],
[0.3, 1., 0.],
[0.7, 0.3, 0.56711*0.3],
[1., 1., 0.7149],
[4., 0.5, 0.8167*0.5],
[5., 0., 0.],
],
)
def test_mask_exhale(η_exhale, η_leaks, expected_exhale_efficiency_small,
expected_exhale_efficiency_large):
mask = models.Mask(η_inhale=0.3,η_exhale=η_exhale,η_leaks=η_leaks)
# we test one small and one large diameter (resp. 1 and 4 microns)
npt.assert_equal(mask.exhale_efficiency(1.e-4),
expected_exhale_efficiency_small)
npt.assert_equal(mask.exhale_efficiency(4.e-4),
expected_exhale_efficiency_large)
@pytest.mark.parametrize(
"diameter, expected_exhale_efficiency",
[
[0.3e-4, 0.],
[0.7e-4, 0.56711],
[1.e-4, 0.7149],
[4.e-4, 0.8167],
],
)
def test_measuredmask_exhale(diameter, expected_exhale_efficiency):
mask = models.MeasuredMask(η_inhale=0.3)
def test_mask_exhale(diameter, factor_exhale, expected_exhale_efficiency):
mask = models.Mask(η_inhale=0.3, factor_exhale=factor_exhale)
npt.assert_almost_equal(mask.exhale_efficiency(diameter),
expected_exhale_efficiency)

View file

@ -8,8 +8,7 @@ import cara.models
"override_params", [
{'viral_load_in_sputum': np.array([5e8, 1e9])},
{'quantum_infectious_dose': np.array([50, 20])},
{'η_exhale': np.array([0.92, 0.95])},
{'η_leaks': np.array([0.15, 0.20])},
{'factor_exhale': np.array([0.92, 0.95])},
{'exhalation_rate': np.array([0.75, 0.81])},
]
)
@ -17,8 +16,7 @@ def test_infected_population_vectorisation(override_params):
defaults = {
'viral_load_in_sputum': 1e9,
'quantum_infectious_dose': 50,
'η_exhale': 0.95,
'η_leaks': 0.15,
'factor_exhale': 0.95,
'exhalation_rate': 0.75,
}
defaults.update(override_params)
@ -28,8 +26,7 @@ def test_infected_population_vectorisation(override_params):
number=1,
presence=office_hours,
mask=cara.models.Mask(
η_exhale=defaults['η_exhale'],
η_leaks=defaults['η_leaks'],
factor_exhale=defaults['factor_exhale'],
η_inhale=0.3,
),
activity=cara.models.Activity(

View file

@ -0,0 +1,87 @@
import dataclasses
import numpy as np
import pytest
import cara.models
import cara.monte_carlo.models as mc_models
import cara.monte_carlo.sampleable
MODEL_CLASSES = [
cls for cls in vars(cara.models).values()
if dataclasses.is_dataclass(cls)
]
def test_type_annotations():
# Check that there are appropriate type annotations for all of the model
# classes in cara.models. Note that these must be statically defined in
# cara.monte_carlo, rather than being dynamically generated, in order to
# allow the type system to be able to see their definition without needing
# runtime execution.
missing = []
for cls in MODEL_CLASSES:
if not hasattr(cara.monte_carlo, cls.__name__):
missing.append(cls.__name__)
continue
mc_cls = getattr(cara.monte_carlo, cls.__name__)
assert issubclass(mc_cls, cara.monte_carlo.MCModelBase)
if missing:
msg = (
'There are missing model implementations in cara.monte_carlo. '
'The following definitions are needed:\n ' +
'\n '.join([f'{model} = build_mc_model(cara.models.{model})' for model in missing])
)
pytest.fail(msg)
@pytest.fixture
def baseline_mc_model() -> cara.monte_carlo.ConcentrationModel:
mc_model = cara.monte_carlo.ConcentrationModel(
room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)),
ventilation=cara.monte_carlo.SlidingWindow(
active=cara.models.PeriodicInterval(period=120, duration=120),
inside_temp=cara.models.PiecewiseConstant((0, 24), (293,)),
outside_temp=cara.models.PiecewiseConstant((0, 24), (283,)),
window_height=1.6, opening_length=0.6,
),
infected=cara.models.InfectedPopulation(
number=1,
virus=cara.models.Virus.types['SARS_CoV_2'],
presence=cara.models.SpecificInterval(((0, 4), (5, 8))),
mask=cara.models.Mask.types['No mask'],
activity=cara.models.Activity.types['Light activity'],
expiration=cara.models.Expiration.types['Unmodulated Vocalization'],
),
)
return mc_model
@pytest.fixture
def baseline_mc_exposure_model(baseline_mc_model) -> cara.monte_carlo.ExposureModel:
return cara.monte_carlo.ExposureModel(
baseline_mc_model,
exposed=cara.models.Population(
number=10,
presence=baseline_mc_model.infected.presence,
activity=baseline_mc_model.infected.activity,
mask=baseline_mc_model.infected.mask,
)
)
def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.ConcentrationModel):
model = baseline_mc_model.build_model(7)
assert isinstance(model, cara.models.ConcentrationModel)
assert isinstance(model.concentration(time=0), float)
assert model.concentration(time=1).shape == (7, )
def test_build_exposure_model(baseline_mc_exposure_model: cara.monte_carlo.ExposureModel):
model = baseline_mc_exposure_model.build_model(7)
assert isinstance(model, cara.models.ExposureModel)
prob = model.quanta_exposure()
assert isinstance(prob, np.ndarray)
assert prob.shape == (7, )