Add a cara.monte_carlo submodule as syntactic sugar on top of the existing cara.models vectorisation.

This allows us to define SampleableDistributions for key variables (and in the future, good default values for these), as well as giving us an exact mirror of the non-MC models which we can ultimately generate those models.
This commit is contained in:
Phil Elson 2021-04-16 07:47:31 +02:00
parent fd4ceef0f1
commit 9669e5afd0
5 changed files with 202 additions and 0 deletions

View file

@ -0,0 +1 @@
from .models import *

View file

@ -0,0 +1,4 @@
from typing import Any
# For now we disable all type-checking in the monte-carlo submodule.
def __getattr__(name) -> Any: ...

View file

@ -0,0 +1,85 @@
import copy
import dataclasses
import sys
import typing
import cara.models
from .sampleable import SampleableDistribution, _VectorisedFloatOrSampleable
_ModelType = typing.TypeVar('_ModelType')
class MCModelBase(typing.Generic[_ModelType]):
"""
A model base class for monte carlo types.
This base class is essentially a declarative description of a cara.models
model with a :meth:`.build_model` method to generate an appropriate
``cara.models` model instance on demand.
"""
_base_cls: typing.Type[_ModelType]
def build_model(self, size: int) -> _ModelType:
"""
Turn this MCModelBase subclass into a cara.models Model instance
from which you can then run the model.
"""
kwargs = {}
for field in dataclasses.fields(self._base_cls):
attr = getattr(self, field.name)
if isinstance(attr, SampleableDistribution):
attr = attr.generate_samples(size)
elif isinstance(attr, MCModelBase):
# Recurse into other MCModelBase instances by calling their
# build_model method.
attr = attr.build_model(size)
kwargs[field.name] = attr
return self._base_cls(**kwargs) # type: ignore
def _build_mc_model(model: _ModelType) -> typing.Type[MCModelBase[_ModelType]]:
"""
Generate a new MCModelBase subclass for the given cara.models model.
"""
fields = []
for field in dataclasses.fields(model):
# Note: deepcopy not needed here as we aren't mutating entities beyond
# the top level.
new_field = copy.copy(field)
if field.type is cara.models._VectorisedFloat: # noqa
new_field.type = _VectorisedFloatOrSampleable # type: ignore
# TODO: Update the type annotation to support the new model classes that exist.
fields.append((new_field.name, new_field.type, new_field))
cls = dataclasses.make_dataclass(
model.__name__, # type: ignore
fields, # type: ignore
bases=(MCModelBase, ),
namespace={'_base_cls': model},
# This thing can be mutable - the calculations live on
# the wrapped class, not on the MCModelBase.
frozen=False,
)
# Update the module of the generated class to be this one. Without this the
# module will be "types".
cls.__module__ = __name__
return cls
_MODEL_CLASSES = [
cls for cls in vars(cara.models).values()
if dataclasses.is_dataclass(cls)
]
# Inject the runtime generated MC types into this module.
for _model in _MODEL_CLASSES:
setattr(sys.modules[__name__], _model.__name__, _build_mc_model(_model))
# Make sure that each of the models is imported if you do a ``import *``.
__all__ = [_model.__name__ for _model in _MODEL_CLASSES] + ["MCModelBase"]

View file

@ -0,0 +1,29 @@
import typing
import numpy as np
import cara.models
# Declare a float array type of a given size.
# There is no better way to declare this currently, unfortunately.
float_array_size_n = np.ndarray
class SampleableDistribution:
def generate_samples(self, size: int) -> float_array_size_n:
raise NotImplementedError()
class Normal(SampleableDistribution):
def __init__(self, mean: float, scale: float):
self.mean = mean
self.scale = scale
def generate_samples(self, size: int) -> float_array_size_n:
return np.random.normal(self.mean, self.scale, size=size)
_VectorisedFloatOrSampleable = typing.Union[
SampleableDistribution, cara.models._VectorisedFloat,
]

View file

@ -0,0 +1,83 @@
import dataclasses
import pytest
import cara.models
import cara.monte_carlo.models as mc_models
import cara.monte_carlo.sampleable
MODEL_CLASSES = [
cls for cls in vars(cara.models).values()
if dataclasses.is_dataclass(cls)
]
def test_type_annotations():
# Check that there are appropriate type annotations for all of the model
# classes in cara.models. Note that these must be statically defined in
# cara.monte_carlo, rather than being dynamically generated, in order to
# allow the type system to be able to see their definition without needing
# runtime execution.
missing = []
for cls in MODEL_CLASSES:
if not hasattr(cara.monte_carlo, cls.__name__):
missing.append(cls.__name__)
continue
mc_cls = getattr(cara.monte_carlo, cls.__name__)
assert issubclass(mc_cls, cara.monte_carlo.MCModelBase)
if missing:
msg = (
'There are missing model implementations in cara.monte_carlo. '
'The following definitions are needed:\n ' +
'\n '.join([f'{model} = build_mc_model(cara.models.{model})' for model in missing])
)
pytest.fail(msg)
@pytest.fixture
def baseline_mc_model() -> cara.monte_carlo.ConcentrationModel:
mc_model = cara.monte_carlo.ConcentrationModel(
room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)),
ventilation=cara.monte_carlo.SlidingWindow(
active=cara.models.PeriodicInterval(period=120, duration=120),
inside_temp=cara.models.PiecewiseConstant((0, 24), (293,)),
outside_temp=cara.models.PiecewiseConstant((0, 24), (283,)),
window_height=1.6, opening_length=0.6,
),
infected=cara.models.InfectedPopulation(
number=1,
virus=cara.models.Virus.types['SARS_CoV_2'],
presence=cara.models.SpecificInterval(((0, 4), (5, 8))),
mask=cara.models.Mask.types['No mask'],
activity=cara.models.Activity.types['Light activity'],
expiration=cara.models.Expiration.types['Unmodulated Vocalization'],
),
)
return mc_model
@pytest.fixture
def baseline_mc_exposure_model(baseline_mc_model) -> cara.monte_carlo.ExposureModel:
return cara.monte_carlo.ExposureModel(
baseline_mc_model,
exposed=cara.models.Population(
number=10,
presence=baseline_mc_model.infected.presence,
activity=baseline_mc_model.infected.activity,
mask=baseline_mc_model.infected.mask,
)
)
def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.ConcentrationModel):
model = baseline_mc_model.build_model(7)
assert isinstance(model, cara.models.ConcentrationModel)
assert isinstance(model.concentration(time=0), float)
assert model.concentration(time=1).shape == (7, )
def test_build_exposure_model(baseline_mc_exposure_model: cara.monte_carlo.ExposureModel):
model = baseline_mc_exposure_model.build_model(7)
assert isinstance(model, cara.models.ExposureModel)