Merge branch 'feature/mc_module' into 'master'

Introduce a cara.monte_carlo subpackage for managing monte carlo simulations

See merge request cara/cara!189
This commit is contained in:
Nicolas Mounet 2021-05-29 17:12:01 +00:00
commit 0b26a0da2b
5 changed files with 244 additions and 0 deletions

View file

@ -0,0 +1 @@
from .models import *

View file

@ -0,0 +1,4 @@
from typing import Any
# For now we disable all type-checking in the monte-carlo submodule.
def __getattr__(name) -> Any: ...

123
cara/monte_carlo/models.py Normal file
View file

@ -0,0 +1,123 @@
import copy
import dataclasses
import sys
import typing
import cara.models
from .sampleable import SampleableDistribution, _VectorisedFloatOrSampleable
_ModelType = typing.TypeVar('_ModelType')
class MCModelBase(typing.Generic[_ModelType]):
"""
A model base class for monte carlo types.
This base class is essentially a declarative description of a cara.models
model with a :meth:`.build_model` method to generate an appropriate
``cara.models` model instance on demand.
"""
_base_cls: typing.Type[_ModelType]
@classmethod
def _to_vectorized_form(cls, item, size):
if isinstance(item, SampleableDistribution):
return item.generate_samples(size)
elif isinstance(item, MCModelBase):
# Recurse into other MCModelBase instances by calling their
# build_model method.
return item.build_model(size)
elif isinstance(item, tuple):
return tuple(cls._to_vectorized_form(sub, size) for sub in item)
else:
return item
def build_model(self, size: int) -> _ModelType:
"""
Turn this MCModelBase subclass into a cara.models Model instance
from which you can then run the model.
"""
kwargs = {}
for field in dataclasses.fields(self._base_cls):
attr = getattr(self, field.name)
kwargs[field.name] = self._to_vectorized_form(attr, size)
return self._base_cls(**kwargs) # type: ignore
def _build_mc_model(model: _ModelType) -> typing.Type[MCModelBase[_ModelType]]:
"""
Generate a new MCModelBase subclass for the given cara.models model.
"""
fields = []
for field in dataclasses.fields(model):
# Note: deepcopy not needed here as we aren't mutating entities beyond
# the top level.
new_field = copy.copy(field)
if field.type is cara.models._VectorisedFloat: # noqa
new_field.type = _VectorisedFloatOrSampleable # type: ignore
field_type: typing.Any = new_field.type
if getattr(field_type, '__origin__', None) in [typing.Union, typing.Tuple]:
# It is challenging to generalise this code, so we provide specific transformations,
# and raise for unforseen cases.
if new_field.type == typing.Tuple[cara.models._VentilationBase, ...]:
VB = getattr(sys.modules[__name__], "_VentilationBase")
field_type = typing.Tuple[typing.Union[cara.models._VentilationBase, VB], ...]
elif new_field.type == typing.Tuple[cara.models._ExpirationBase, ...]:
EB = getattr(sys.modules[__name__], "_ExpirationBase")
field_type = typing.Tuple[typing.Union[cara.models._ExpirationBase, EB], ...]
else:
# Check that we don't need to do anything with this type.
for item in new_field.type.__args__:
if getattr(item, '__module__', None) == 'cara.models':
raise ValueError(
f"unsupported type annotation transformation required for {new_field.type}")
elif field_type.__module__ == 'cara.models':
mc_model = getattr(sys.modules[__name__], new_field.type.__name__)
field_type = typing.Union[new_field.type, mc_model]
fields.append((new_field.name, field_type, new_field))
bases = []
# Update the inheritance/based to use the new MC classes, rather than the cara.models ones.
for model_base in model.__bases__: # type: ignore
if model_base is object:
bases.append(MCModelBase)
else:
mc_model = getattr(sys.modules[__name__], model_base.__name__)
bases.append(mc_model)
cls = dataclasses.make_dataclass(
model.__name__, # type: ignore
fields, # type: ignore
bases=bases, # type: ignore
namespace={'_base_cls': model},
# This thing can be mutable - the calculations live on
# the wrapped class, not on the MCModelBase.
frozen=False,
)
# Update the module of the generated class to be this one. Without this the
# module will be "types".
cls.__module__ = __name__
return cls
_MODEL_CLASSES = [
cls for cls in vars(cara.models).values()
if dataclasses.is_dataclass(cls)
]
# Inject the runtime generated MC types into this module.
for _model in _MODEL_CLASSES:
setattr(sys.modules[__name__], _model.__name__, _build_mc_model(_model))
# Make sure that each of the models is imported if you do a ``import *``.
__all__ = [_model.__name__ for _model in _MODEL_CLASSES] + ["MCModelBase"]

View file

@ -0,0 +1,29 @@
import typing
import numpy as np
import cara.models
# Declare a float array type of a given size.
# There is no better way to declare this currently, unfortunately.
float_array_size_n = np.ndarray
class SampleableDistribution:
def generate_samples(self, size: int) -> float_array_size_n:
raise NotImplementedError()
class Normal(SampleableDistribution):
def __init__(self, mean: float, standard_deviation: float):
self.mean = mean
self.standard_deviation = standard_deviation
def generate_samples(self, size: int) -> float_array_size_n:
return np.random.normal(self.mean, self.standard_deviation, size=size)
_VectorisedFloatOrSampleable = typing.Union[
SampleableDistribution, cara.models._VectorisedFloat,
]

View file

@ -0,0 +1,87 @@
import dataclasses
import numpy as np
import pytest
import cara.models
import cara.monte_carlo.models as mc_models
import cara.monte_carlo.sampleable
MODEL_CLASSES = [
cls for cls in vars(cara.models).values()
if dataclasses.is_dataclass(cls)
]
def test_type_annotations():
# Check that there are appropriate type annotations for all of the model
# classes in cara.models. Note that these must be statically defined in
# cara.monte_carlo, rather than being dynamically generated, in order to
# allow the type system to be able to see their definition without needing
# runtime execution.
missing = []
for cls in MODEL_CLASSES:
if not hasattr(cara.monte_carlo, cls.__name__):
missing.append(cls.__name__)
continue
mc_cls = getattr(cara.monte_carlo, cls.__name__)
assert issubclass(mc_cls, cara.monte_carlo.MCModelBase)
if missing:
msg = (
'There are missing model implementations in cara.monte_carlo. '
'The following definitions are needed:\n ' +
'\n '.join([f'{model} = build_mc_model(cara.models.{model})' for model in missing])
)
pytest.fail(msg)
@pytest.fixture
def baseline_mc_model() -> cara.monte_carlo.ConcentrationModel:
mc_model = cara.monte_carlo.ConcentrationModel(
room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)),
ventilation=cara.monte_carlo.SlidingWindow(
active=cara.models.PeriodicInterval(period=120, duration=120),
inside_temp=cara.models.PiecewiseConstant((0, 24), (293,)),
outside_temp=cara.models.PiecewiseConstant((0, 24), (283,)),
window_height=1.6, opening_length=0.6,
),
infected=cara.models.InfectedPopulation(
number=1,
virus=cara.models.Virus.types['SARS_CoV_2'],
presence=cara.models.SpecificInterval(((0, 4), (5, 8))),
mask=cara.models.Mask.types['No mask'],
activity=cara.models.Activity.types['Light activity'],
expiration=cara.models.Expiration.types['Unmodulated Vocalization'],
),
)
return mc_model
@pytest.fixture
def baseline_mc_exposure_model(baseline_mc_model) -> cara.monte_carlo.ExposureModel:
return cara.monte_carlo.ExposureModel(
baseline_mc_model,
exposed=cara.models.Population(
number=10,
presence=baseline_mc_model.infected.presence,
activity=baseline_mc_model.infected.activity,
mask=baseline_mc_model.infected.mask,
)
)
def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.ConcentrationModel):
model = baseline_mc_model.build_model(7)
assert isinstance(model, cara.models.ConcentrationModel)
assert isinstance(model.concentration(time=0), float)
assert model.concentration(time=1).shape == (7, )
def test_build_exposure_model(baseline_mc_exposure_model: cara.monte_carlo.ExposureModel):
model = baseline_mc_exposure_model.build_model(7)
assert isinstance(model, cara.models.ExposureModel)
prob = model.quanta_exposure()
assert isinstance(prob, np.ndarray)
assert prob.shape == (7, )