Merge branch 'master' of https://gitlab.cern.ch/cara/cara into feature/refined_mask_model

This commit is contained in:
Nicolas Mounet 2021-05-30 07:44:40 +02:00
commit 4c2abf797b
9 changed files with 329 additions and 34 deletions

View file

@ -537,38 +537,16 @@ class FormData:
)
def build_expiration(expiration_definition) -> models.Expiration:
def build_expiration(expiration_definition) -> models._ExpirationBase:
if isinstance(expiration_definition, str):
return models.Expiration.types[expiration_definition]
return models._ExpirationBase.types[expiration_definition]
elif isinstance(expiration_definition, dict):
return expiration_blend({
build_expiration(exp): amount
for exp, amount in expiration_definition.items()
}
return models.MultipleExpiration(
tuple([build_expiration(exp) for exp in expiration_definition.keys()]),
tuple(expiration_definition.values())
)
def expiration_blend(expiration_weights: typing.Dict[models.Expiration, int]) -> models.Expiration:
"""
Combine together multiple types of Expiration, using a weighted mean to
compute their ejection factor and particle sizes.
"""
ejection_factor = np.zeros(4)
particle_sizes = np.zeros(4)
total_weight = 0
for expiration, weight in expiration_weights.items():
total_weight += weight
ejection_factor += np.array(expiration.ejection_factor) * weight
particle_sizes += np.array(expiration.particle_sizes) * weight
r_ejection_factor: typing.Tuple[float, float, float, float] = tuple(ejection_factor/total_weight) # type: ignore
r_particle_sizes: typing.Tuple[float, float, float, float] = tuple(particle_sizes/total_weight) # type: ignore
return models.Expiration(ejection_factor=r_ejection_factor, particle_sizes=r_particle_sizes)
def model_from_form(form: FormData) -> models.ExposureModel:
# Initializes room with volume either given directly or as product of area and height
if form.volume_type == 'room_volume_explicit':

View file

@ -564,13 +564,31 @@ _MaskBase.types = {
@dataclass(frozen=True)
class Expiration:
class _ExpirationBase:
"""
Represents the expiration of aerosols by a person.
Subclasses of _ExpirationBase represent different models.
"""
#: Pre-populated examples of Masks.
types: typing.ClassVar[typing.Dict[str, "_ExpirationBase"]]
def aerosols(self, mask: Mask):
# total volume of aerosols expired per volume of air (mL/cm^3).
raise NotImplementedError("Subclass must implement")
@dataclass(frozen=True)
class Expiration(_ExpirationBase):
"""
Simple model based on four different sizes of particles emitted,
with different ejection factors. See Fig. 4 in L. Morawska et al,
Size distribution and sites of origin of droplets expelled from the
human respiratory tract during expiratory activities,
Aerosol Science 40 (2009) pp. 256 - 269.
"""
ejection_factor: typing.Tuple[float, ...]
particle_sizes: typing.Tuple[float, ...] = (0.8e-4, 1.8e-4, 3.5e-4, 5.5e-4) # In cm.
#: Pre-populated examples of Expiration.
types: typing.ClassVar[typing.Dict[str, "Expiration"]]
def aerosols(self, mask: _MaskBase):
def volume(diameter):
return (4 * np.pi * (diameter/2)**3) / 3
@ -582,7 +600,31 @@ class Expiration:
return total
Expiration.types = {
@dataclass(frozen=True)
class MultipleExpiration(_ExpirationBase):
"""
Represents an expiration of aerosols.
Group together different modes of expiration, that represent
each the main expiration mode for a certain fraction of time (given by
the weights).
"""
expirations: typing.Tuple[_ExpirationBase, ...]
weights: typing.Tuple[float, ...]
def __post_init__(self):
if len(self.expirations) != len(self.weights):
raise ValueError("expirations and weigths should contain the"
"same number of elements")
def aerosols(self, mask: Mask):
return np.array([
weight * expiration.aerosols(mask) / sum(self.weights)
for weight,expiration in zip(self.weights,self.expirations)
]).sum(axis=0)
_ExpirationBase.types = {
'Breathing': Expiration((0.084, 0.009, 0.003, 0.002)),
'Whispering': Expiration((0.11, 0.014, 0.004, 0.002)),
'Talking': Expiration((0.236, 0.068, 0.007, 0.011)),
@ -638,7 +680,7 @@ class InfectedPopulation(Population):
virus: Virus
#: The type of expiration that is being emitted whilst doing the activity.
expiration: Expiration
expiration: _ExpirationBase
def emission_rate_when_present(self) -> _VectorisedFloat:
"""

View file

@ -0,0 +1 @@
from .models import *

View file

@ -0,0 +1,4 @@
from typing import Any
# For now we disable all type-checking in the monte-carlo submodule.
def __getattr__(name) -> Any: ...

123
cara/monte_carlo/models.py Normal file
View file

@ -0,0 +1,123 @@
import copy
import dataclasses
import sys
import typing
import cara.models
from .sampleable import SampleableDistribution, _VectorisedFloatOrSampleable
_ModelType = typing.TypeVar('_ModelType')
class MCModelBase(typing.Generic[_ModelType]):
"""
A model base class for monte carlo types.
This base class is essentially a declarative description of a cara.models
model with a :meth:`.build_model` method to generate an appropriate
``cara.models` model instance on demand.
"""
_base_cls: typing.Type[_ModelType]
@classmethod
def _to_vectorized_form(cls, item, size):
if isinstance(item, SampleableDistribution):
return item.generate_samples(size)
elif isinstance(item, MCModelBase):
# Recurse into other MCModelBase instances by calling their
# build_model method.
return item.build_model(size)
elif isinstance(item, tuple):
return tuple(cls._to_vectorized_form(sub, size) for sub in item)
else:
return item
def build_model(self, size: int) -> _ModelType:
"""
Turn this MCModelBase subclass into a cara.models Model instance
from which you can then run the model.
"""
kwargs = {}
for field in dataclasses.fields(self._base_cls):
attr = getattr(self, field.name)
kwargs[field.name] = self._to_vectorized_form(attr, size)
return self._base_cls(**kwargs) # type: ignore
def _build_mc_model(model: _ModelType) -> typing.Type[MCModelBase[_ModelType]]:
"""
Generate a new MCModelBase subclass for the given cara.models model.
"""
fields = []
for field in dataclasses.fields(model):
# Note: deepcopy not needed here as we aren't mutating entities beyond
# the top level.
new_field = copy.copy(field)
if field.type is cara.models._VectorisedFloat: # noqa
new_field.type = _VectorisedFloatOrSampleable # type: ignore
field_type: typing.Any = new_field.type
if getattr(field_type, '__origin__', None) in [typing.Union, typing.Tuple]:
# It is challenging to generalise this code, so we provide specific transformations,
# and raise for unforseen cases.
if new_field.type == typing.Tuple[cara.models._VentilationBase, ...]:
VB = getattr(sys.modules[__name__], "_VentilationBase")
field_type = typing.Tuple[typing.Union[cara.models._VentilationBase, VB], ...]
elif new_field.type == typing.Tuple[cara.models._ExpirationBase, ...]:
EB = getattr(sys.modules[__name__], "_ExpirationBase")
field_type = typing.Tuple[typing.Union[cara.models._ExpirationBase, EB], ...]
else:
# Check that we don't need to do anything with this type.
for item in new_field.type.__args__:
if getattr(item, '__module__', None) == 'cara.models':
raise ValueError(
f"unsupported type annotation transformation required for {new_field.type}")
elif field_type.__module__ == 'cara.models':
mc_model = getattr(sys.modules[__name__], new_field.type.__name__)
field_type = typing.Union[new_field.type, mc_model]
fields.append((new_field.name, field_type, new_field))
bases = []
# Update the inheritance/based to use the new MC classes, rather than the cara.models ones.
for model_base in model.__bases__: # type: ignore
if model_base is object:
bases.append(MCModelBase)
else:
mc_model = getattr(sys.modules[__name__], model_base.__name__)
bases.append(mc_model)
cls = dataclasses.make_dataclass(
model.__name__, # type: ignore
fields, # type: ignore
bases=bases, # type: ignore
namespace={'_base_cls': model},
# This thing can be mutable - the calculations live on
# the wrapped class, not on the MCModelBase.
frozen=False,
)
# Update the module of the generated class to be this one. Without this the
# module will be "types".
cls.__module__ = __name__
return cls
_MODEL_CLASSES = [
cls for cls in vars(cara.models).values()
if dataclasses.is_dataclass(cls)
]
# Inject the runtime generated MC types into this module.
for _model in _MODEL_CLASSES:
setattr(sys.modules[__name__], _model.__name__, _build_mc_model(_model))
# Make sure that each of the models is imported if you do a ``import *``.
__all__ = [_model.__name__ for _model in _MODEL_CLASSES] + ["MCModelBase"]

View file

@ -0,0 +1,29 @@
import typing
import numpy as np
import cara.models
# Declare a float array type of a given size.
# There is no better way to declare this currently, unfortunately.
float_array_size_n = np.ndarray
class SampleableDistribution:
def generate_samples(self, size: int) -> float_array_size_n:
raise NotImplementedError()
class Normal(SampleableDistribution):
def __init__(self, mean: float, standard_deviation: float):
self.mean = mean
self.standard_deviation = standard_deviation
def generate_samples(self, size: int) -> float_array_size_n:
return np.random.normal(self.mean, self.standard_deviation, size=size)
_VectorisedFloatOrSampleable = typing.Union[
SampleableDistribution, cara.models._VectorisedFloat,
]

View file

@ -8,6 +8,7 @@ from cara.apps.calculator.model_generator import minutes_since_midnight
from cara import models
from cara import data
import numpy as np
import numpy.testing as npt
def test_model_from_dict(baseline_form_data):
@ -24,10 +25,11 @@ def test_model_from_dict_invalid(baseline_form_data):
def test_blend_expiration():
blend = {'Breathing': 2, 'Talking': 1}
r = model_generator.build_expiration(blend)
mask = models.Mask.types['Type I']
expected = models.Expiration(
(0.13466666666666668, 0.02866666666666667, 0.004333333333333334, 0.005)
)
assert r == expected
npt.assert_almost_equal(r.aerosols(mask), expected.aerosols(mask))
def test_ventilation_slidingwindow(baseline_form: model_generator.FormData):

View file

@ -0,0 +1,29 @@
import re
import numpy as np
import numpy.testing as npt
import pytest
from cara import models
def test_multiple_wrong_weight_size():
weights = (1., 2., 3.)
e_base = models.Expiration((0.084, 0.009, 0.003, 0.002))
with pytest.raises(
ValueError,
match=re.escape("expirations and weigths should contain the"
"same number of elements")
):
e = models.MultipleExpiration([e_base, e_base], weights)
def test_multiple():
weights = (1., 2.)
e1 = models.Expiration((0.03, 0.02, 0.01, 0.005))
e2 = models.Expiration((0.05, 0.04, 0.03, 0.01))
e = models.MultipleExpiration([e1, e2], weights)
assert e.aerosols(models.Mask.types['No mask']) == (
e1.aerosols(models.Mask.types['No mask'])/3. +
2*e2.aerosols(models.Mask.types['No mask'])/3.
)

View file

@ -0,0 +1,87 @@
import dataclasses
import numpy as np
import pytest
import cara.models
import cara.monte_carlo.models as mc_models
import cara.monte_carlo.sampleable
MODEL_CLASSES = [
cls for cls in vars(cara.models).values()
if dataclasses.is_dataclass(cls)
]
def test_type_annotations():
# Check that there are appropriate type annotations for all of the model
# classes in cara.models. Note that these must be statically defined in
# cara.monte_carlo, rather than being dynamically generated, in order to
# allow the type system to be able to see their definition without needing
# runtime execution.
missing = []
for cls in MODEL_CLASSES:
if not hasattr(cara.monte_carlo, cls.__name__):
missing.append(cls.__name__)
continue
mc_cls = getattr(cara.monte_carlo, cls.__name__)
assert issubclass(mc_cls, cara.monte_carlo.MCModelBase)
if missing:
msg = (
'There are missing model implementations in cara.monte_carlo. '
'The following definitions are needed:\n ' +
'\n '.join([f'{model} = build_mc_model(cara.models.{model})' for model in missing])
)
pytest.fail(msg)
@pytest.fixture
def baseline_mc_model() -> cara.monte_carlo.ConcentrationModel:
mc_model = cara.monte_carlo.ConcentrationModel(
room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)),
ventilation=cara.monte_carlo.SlidingWindow(
active=cara.models.PeriodicInterval(period=120, duration=120),
inside_temp=cara.models.PiecewiseConstant((0, 24), (293,)),
outside_temp=cara.models.PiecewiseConstant((0, 24), (283,)),
window_height=1.6, opening_length=0.6,
),
infected=cara.models.InfectedPopulation(
number=1,
virus=cara.models.Virus.types['SARS_CoV_2'],
presence=cara.models.SpecificInterval(((0, 4), (5, 8))),
mask=cara.models.Mask.types['No mask'],
activity=cara.models.Activity.types['Light activity'],
expiration=cara.models.Expiration.types['Unmodulated Vocalization'],
),
)
return mc_model
@pytest.fixture
def baseline_mc_exposure_model(baseline_mc_model) -> cara.monte_carlo.ExposureModel:
return cara.monte_carlo.ExposureModel(
baseline_mc_model,
exposed=cara.models.Population(
number=10,
presence=baseline_mc_model.infected.presence,
activity=baseline_mc_model.infected.activity,
mask=baseline_mc_model.infected.mask,
)
)
def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.ConcentrationModel):
model = baseline_mc_model.build_model(7)
assert isinstance(model, cara.models.ConcentrationModel)
assert isinstance(model.concentration(time=0), float)
assert model.concentration(time=1).shape == (7, )
def test_build_exposure_model(baseline_mc_exposure_model: cara.monte_carlo.ExposureModel):
model = baseline_mc_exposure_model.build_model(7)
assert isinstance(model, cara.models.ExposureModel)
prob = model.quanta_exposure()
assert isinstance(prob, np.ndarray)
assert prob.shape == (7, )