diff --git a/cara/monte_carlo/__init__.py b/cara/monte_carlo/__init__.py new file mode 100644 index 00000000..aed4fa32 --- /dev/null +++ b/cara/monte_carlo/__init__.py @@ -0,0 +1 @@ +from .models import * diff --git a/cara/monte_carlo/__init__.pyi b/cara/monte_carlo/__init__.pyi new file mode 100644 index 00000000..5a184e17 --- /dev/null +++ b/cara/monte_carlo/__init__.pyi @@ -0,0 +1,4 @@ +from typing import Any + +# For now we disable all type-checking in the monte-carlo submodule. +def __getattr__(name) -> Any: ... diff --git a/cara/monte_carlo/models.py b/cara/monte_carlo/models.py new file mode 100644 index 00000000..acdc1dad --- /dev/null +++ b/cara/monte_carlo/models.py @@ -0,0 +1,85 @@ +import copy +import dataclasses +import sys +import typing + +import cara.models + +from .sampleable import SampleableDistribution, _VectorisedFloatOrSampleable + + +_ModelType = typing.TypeVar('_ModelType') + + +class MCModelBase(typing.Generic[_ModelType]): + """ + A model base class for monte carlo types. + + This base class is essentially a declarative description of a cara.models + model with a :meth:`.build_model` method to generate an appropriate + ``cara.models` model instance on demand. + + """ + _base_cls: typing.Type[_ModelType] + + def build_model(self, size: int) -> _ModelType: + """ + Turn this MCModelBase subclass into a cara.models Model instance + from which you can then run the model. + + """ + kwargs = {} + for field in dataclasses.fields(self._base_cls): + attr = getattr(self, field.name) + if isinstance(attr, SampleableDistribution): + attr = attr.generate_samples(size) + elif isinstance(attr, MCModelBase): + # Recurse into other MCModelBase instances by calling their + # build_model method. + attr = attr.build_model(size) + kwargs[field.name] = attr + return self._base_cls(**kwargs) # type: ignore + + +def _build_mc_model(model: _ModelType) -> typing.Type[MCModelBase[_ModelType]]: + """ + Generate a new MCModelBase subclass for the given cara.models model. + + """ + fields = [] + for field in dataclasses.fields(model): + # Note: deepcopy not needed here as we aren't mutating entities beyond + # the top level. + new_field = copy.copy(field) + if field.type is cara.models._VectorisedFloat: # noqa + new_field.type = _VectorisedFloatOrSampleable # type: ignore + # TODO: Update the type annotation to support the new model classes that exist. + fields.append((new_field.name, new_field.type, new_field)) + cls = dataclasses.make_dataclass( + model.__name__, # type: ignore + fields, # type: ignore + bases=(MCModelBase, ), + namespace={'_base_cls': model}, + # This thing can be mutable - the calculations live on + # the wrapped class, not on the MCModelBase. + frozen=False, + ) + # Update the module of the generated class to be this one. Without this the + # module will be "types". + cls.__module__ = __name__ + return cls + + +_MODEL_CLASSES = [ + cls for cls in vars(cara.models).values() + if dataclasses.is_dataclass(cls) +] + + +# Inject the runtime generated MC types into this module. +for _model in _MODEL_CLASSES: + setattr(sys.modules[__name__], _model.__name__, _build_mc_model(_model)) + + +# Make sure that each of the models is imported if you do a ``import *``. +__all__ = [_model.__name__ for _model in _MODEL_CLASSES] + ["MCModelBase"] diff --git a/cara/monte_carlo/sampleable.py b/cara/monte_carlo/sampleable.py new file mode 100644 index 00000000..6f53c43c --- /dev/null +++ b/cara/monte_carlo/sampleable.py @@ -0,0 +1,29 @@ +import typing + +import numpy as np + +import cara.models + + +# Declare a float array type of a given size. +# There is no better way to declare this currently, unfortunately. +float_array_size_n = np.ndarray + + +class SampleableDistribution: + def generate_samples(self, size: int) -> float_array_size_n: + raise NotImplementedError() + + +class Normal(SampleableDistribution): + def __init__(self, mean: float, scale: float): + self.mean = mean + self.scale = scale + + def generate_samples(self, size: int) -> float_array_size_n: + return np.random.normal(self.mean, self.scale, size=size) + + +_VectorisedFloatOrSampleable = typing.Union[ + SampleableDistribution, cara.models._VectorisedFloat, +] diff --git a/cara/tests/test_monte_carlo.py b/cara/tests/test_monte_carlo.py new file mode 100644 index 00000000..28f7dc28 --- /dev/null +++ b/cara/tests/test_monte_carlo.py @@ -0,0 +1,83 @@ +import dataclasses + +import pytest + +import cara.models +import cara.monte_carlo.models as mc_models +import cara.monte_carlo.sampleable + + +MODEL_CLASSES = [ + cls for cls in vars(cara.models).values() + if dataclasses.is_dataclass(cls) +] + + +def test_type_annotations(): + # Check that there are appropriate type annotations for all of the model + # classes in cara.models. Note that these must be statically defined in + # cara.monte_carlo, rather than being dynamically generated, in order to + # allow the type system to be able to see their definition without needing + # runtime execution. + missing = [] + for cls in MODEL_CLASSES: + if not hasattr(cara.monte_carlo, cls.__name__): + missing.append(cls.__name__) + continue + mc_cls = getattr(cara.monte_carlo, cls.__name__) + assert issubclass(mc_cls, cara.monte_carlo.MCModelBase) + + if missing: + msg = ( + 'There are missing model implementations in cara.monte_carlo. ' + 'The following definitions are needed:\n ' + + '\n '.join([f'{model} = build_mc_model(cara.models.{model})' for model in missing]) + ) + pytest.fail(msg) + + +@pytest.fixture +def baseline_mc_model() -> cara.monte_carlo.ConcentrationModel: + mc_model = cara.monte_carlo.ConcentrationModel( + room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)), + ventilation=cara.monte_carlo.SlidingWindow( + active=cara.models.PeriodicInterval(period=120, duration=120), + inside_temp=cara.models.PiecewiseConstant((0, 24), (293,)), + outside_temp=cara.models.PiecewiseConstant((0, 24), (283,)), + window_height=1.6, opening_length=0.6, + ), + infected=cara.models.InfectedPopulation( + number=1, + virus=cara.models.Virus.types['SARS_CoV_2'], + presence=cara.models.SpecificInterval(((0, 4), (5, 8))), + mask=cara.models.Mask.types['No mask'], + activity=cara.models.Activity.types['Light activity'], + expiration=cara.models.Expiration.types['Unmodulated Vocalization'], + ), + ) + return mc_model + + +@pytest.fixture +def baseline_mc_exposure_model(baseline_mc_model) -> cara.monte_carlo.ExposureModel: + return cara.monte_carlo.ExposureModel( + baseline_mc_model, + exposed=cara.models.Population( + number=10, + presence=baseline_mc_model.infected.presence, + activity=baseline_mc_model.infected.activity, + mask=baseline_mc_model.infected.mask, + ) + ) + + +def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.ConcentrationModel): + model = baseline_mc_model.build_model(7) + assert isinstance(model, cara.models.ConcentrationModel) + assert isinstance(model.concentration(time=0), float) + assert model.concentration(time=1).shape == (7, ) + + +def test_build_exposure_model(baseline_mc_exposure_model: cara.monte_carlo.ExposureModel): + model = baseline_mc_exposure_model.build_model(7) + assert isinstance(model, cara.models.ExposureModel)