diff --git a/cara/monte_carlo/__init__.py b/cara/monte_carlo/__init__.py new file mode 100644 index 00000000..aed4fa32 --- /dev/null +++ b/cara/monte_carlo/__init__.py @@ -0,0 +1 @@ +from .models import * diff --git a/cara/monte_carlo/__init__.pyi b/cara/monte_carlo/__init__.pyi new file mode 100644 index 00000000..5a184e17 --- /dev/null +++ b/cara/monte_carlo/__init__.pyi @@ -0,0 +1,4 @@ +from typing import Any + +# For now we disable all type-checking in the monte-carlo submodule. +def __getattr__(name) -> Any: ... diff --git a/cara/monte_carlo/models.py b/cara/monte_carlo/models.py new file mode 100644 index 00000000..7348e3be --- /dev/null +++ b/cara/monte_carlo/models.py @@ -0,0 +1,123 @@ +import copy +import dataclasses +import sys +import typing + +import cara.models + +from .sampleable import SampleableDistribution, _VectorisedFloatOrSampleable + + +_ModelType = typing.TypeVar('_ModelType') + + +class MCModelBase(typing.Generic[_ModelType]): + """ + A model base class for monte carlo types. + + This base class is essentially a declarative description of a cara.models + model with a :meth:`.build_model` method to generate an appropriate + ``cara.models` model instance on demand. + + """ + _base_cls: typing.Type[_ModelType] + + @classmethod + def _to_vectorized_form(cls, item, size): + if isinstance(item, SampleableDistribution): + return item.generate_samples(size) + elif isinstance(item, MCModelBase): + # Recurse into other MCModelBase instances by calling their + # build_model method. + return item.build_model(size) + elif isinstance(item, tuple): + return tuple(cls._to_vectorized_form(sub, size) for sub in item) + else: + return item + + def build_model(self, size: int) -> _ModelType: + """ + Turn this MCModelBase subclass into a cara.models Model instance + from which you can then run the model. + + """ + kwargs = {} + for field in dataclasses.fields(self._base_cls): + attr = getattr(self, field.name) + kwargs[field.name] = self._to_vectorized_form(attr, size) + return self._base_cls(**kwargs) # type: ignore + + +def _build_mc_model(model: _ModelType) -> typing.Type[MCModelBase[_ModelType]]: + """ + Generate a new MCModelBase subclass for the given cara.models model. + + """ + fields = [] + for field in dataclasses.fields(model): + # Note: deepcopy not needed here as we aren't mutating entities beyond + # the top level. + new_field = copy.copy(field) + if field.type is cara.models._VectorisedFloat: # noqa + new_field.type = _VectorisedFloatOrSampleable # type: ignore + + field_type: typing.Any = new_field.type + + if getattr(field_type, '__origin__', None) in [typing.Union, typing.Tuple]: + # It is challenging to generalise this code, so we provide specific transformations, + # and raise for unforseen cases. + if new_field.type == typing.Tuple[cara.models._VentilationBase, ...]: + VB = getattr(sys.modules[__name__], "_VentilationBase") + field_type = typing.Tuple[typing.Union[cara.models._VentilationBase, VB], ...] + elif new_field.type == typing.Tuple[cara.models._ExpirationBase, ...]: + EB = getattr(sys.modules[__name__], "_ExpirationBase") + field_type = typing.Tuple[typing.Union[cara.models._ExpirationBase, EB], ...] + else: + # Check that we don't need to do anything with this type. + for item in new_field.type.__args__: + if getattr(item, '__module__', None) == 'cara.models': + raise ValueError( + f"unsupported type annotation transformation required for {new_field.type}") + elif field_type.__module__ == 'cara.models': + mc_model = getattr(sys.modules[__name__], new_field.type.__name__) + field_type = typing.Union[new_field.type, mc_model] + + fields.append((new_field.name, field_type, new_field)) + + bases = [] + # Update the inheritance/based to use the new MC classes, rather than the cara.models ones. + for model_base in model.__bases__: # type: ignore + if model_base is object: + bases.append(MCModelBase) + else: + mc_model = getattr(sys.modules[__name__], model_base.__name__) + bases.append(mc_model) + + cls = dataclasses.make_dataclass( + model.__name__, # type: ignore + fields, # type: ignore + bases=bases, # type: ignore + namespace={'_base_cls': model}, + # This thing can be mutable - the calculations live on + # the wrapped class, not on the MCModelBase. + frozen=False, + ) + # Update the module of the generated class to be this one. Without this the + # module will be "types". + cls.__module__ = __name__ + return cls + + +_MODEL_CLASSES = [ + cls for cls in vars(cara.models).values() + if dataclasses.is_dataclass(cls) +] + + +# Inject the runtime generated MC types into this module. +for _model in _MODEL_CLASSES: + setattr(sys.modules[__name__], _model.__name__, _build_mc_model(_model)) + + +# Make sure that each of the models is imported if you do a ``import *``. +__all__ = [_model.__name__ for _model in _MODEL_CLASSES] + ["MCModelBase"] diff --git a/cara/monte_carlo/sampleable.py b/cara/monte_carlo/sampleable.py new file mode 100644 index 00000000..4ed49d82 --- /dev/null +++ b/cara/monte_carlo/sampleable.py @@ -0,0 +1,29 @@ +import typing + +import numpy as np + +import cara.models + + +# Declare a float array type of a given size. +# There is no better way to declare this currently, unfortunately. +float_array_size_n = np.ndarray + + +class SampleableDistribution: + def generate_samples(self, size: int) -> float_array_size_n: + raise NotImplementedError() + + +class Normal(SampleableDistribution): + def __init__(self, mean: float, standard_deviation: float): + self.mean = mean + self.standard_deviation = standard_deviation + + def generate_samples(self, size: int) -> float_array_size_n: + return np.random.normal(self.mean, self.standard_deviation, size=size) + + +_VectorisedFloatOrSampleable = typing.Union[ + SampleableDistribution, cara.models._VectorisedFloat, +] diff --git a/cara/tests/test_monte_carlo.py b/cara/tests/test_monte_carlo.py new file mode 100644 index 00000000..f3d9fb08 --- /dev/null +++ b/cara/tests/test_monte_carlo.py @@ -0,0 +1,87 @@ +import dataclasses + +import numpy as np +import pytest + +import cara.models +import cara.monte_carlo.models as mc_models +import cara.monte_carlo.sampleable + + +MODEL_CLASSES = [ + cls for cls in vars(cara.models).values() + if dataclasses.is_dataclass(cls) +] + + +def test_type_annotations(): + # Check that there are appropriate type annotations for all of the model + # classes in cara.models. Note that these must be statically defined in + # cara.monte_carlo, rather than being dynamically generated, in order to + # allow the type system to be able to see their definition without needing + # runtime execution. + missing = [] + for cls in MODEL_CLASSES: + if not hasattr(cara.monte_carlo, cls.__name__): + missing.append(cls.__name__) + continue + mc_cls = getattr(cara.monte_carlo, cls.__name__) + assert issubclass(mc_cls, cara.monte_carlo.MCModelBase) + + if missing: + msg = ( + 'There are missing model implementations in cara.monte_carlo. ' + 'The following definitions are needed:\n ' + + '\n '.join([f'{model} = build_mc_model(cara.models.{model})' for model in missing]) + ) + pytest.fail(msg) + + +@pytest.fixture +def baseline_mc_model() -> cara.monte_carlo.ConcentrationModel: + mc_model = cara.monte_carlo.ConcentrationModel( + room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)), + ventilation=cara.monte_carlo.SlidingWindow( + active=cara.models.PeriodicInterval(period=120, duration=120), + inside_temp=cara.models.PiecewiseConstant((0, 24), (293,)), + outside_temp=cara.models.PiecewiseConstant((0, 24), (283,)), + window_height=1.6, opening_length=0.6, + ), + infected=cara.models.InfectedPopulation( + number=1, + virus=cara.models.Virus.types['SARS_CoV_2'], + presence=cara.models.SpecificInterval(((0, 4), (5, 8))), + mask=cara.models.Mask.types['No mask'], + activity=cara.models.Activity.types['Light activity'], + expiration=cara.models.Expiration.types['Unmodulated Vocalization'], + ), + ) + return mc_model + + +@pytest.fixture +def baseline_mc_exposure_model(baseline_mc_model) -> cara.monte_carlo.ExposureModel: + return cara.monte_carlo.ExposureModel( + baseline_mc_model, + exposed=cara.models.Population( + number=10, + presence=baseline_mc_model.infected.presence, + activity=baseline_mc_model.infected.activity, + mask=baseline_mc_model.infected.mask, + ) + ) + + +def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.ConcentrationModel): + model = baseline_mc_model.build_model(7) + assert isinstance(model, cara.models.ConcentrationModel) + assert isinstance(model.concentration(time=0), float) + assert model.concentration(time=1).shape == (7, ) + + +def test_build_exposure_model(baseline_mc_exposure_model: cara.monte_carlo.ExposureModel): + model = baseline_mc_exposure_model.build_model(7) + assert isinstance(model, cara.models.ExposureModel) + prob = model.quanta_exposure() + assert isinstance(prob, np.ndarray) + assert prob.shape == (7, )