Merge branch 'feature/model_vectorisation_pt3' into 'master'

Vectorise ExposureModel

See merge request cara/cara!163
This commit is contained in:
Philip James Elson 2021-05-04 04:31:46 +00:00
commit 9e3adf30b0
4 changed files with 163 additions and 9 deletions

View file

@ -366,7 +366,7 @@ class HEPAFilter(Ventilation):
#: The rate at which the HEPA exchanges air (when switched on)
# in m^3/h
q_air_mech: float
q_air_mech: _VectorisedFloat
def air_exchange(self, room: Room, time: float) -> _VectorisedFloat:
# If the HEPA is off, no air is being exchanged.
@ -383,7 +383,7 @@ class HVACMechanical(Ventilation):
#: The rate at which the HVAC exchanges air (when switched on)
# in m^3/h
q_air_mech: float
q_air_mech: _VectorisedFloat
def air_exchange(self, room: Room, time: float) -> _VectorisedFloat:
# If the HVAC is off, no air is being exchanged.
@ -463,7 +463,7 @@ class Mask:
η_leaks: _VectorisedFloat
#: Filtration efficiency of masks when inhaling.
η_inhale: float
η_inhale: _VectorisedFloat
#: Particle sizes in cm.
particle_sizes: typing.Tuple[float, float, float, float] = (
@ -698,19 +698,19 @@ class ExposureModel:
#: The number of times the exposure event is repeated (default 1).
repeats: int = 1
def quanta_exposure(self) -> float:
def quanta_exposure(self) -> _VectorisedFloat:
"""The number of virus quanta per meter^3."""
exposure = 0.0
def integrate(fn, start, stop):
values = np.linspace(start, stop)
return np.trapz([fn(v) for v in values], values)
return np.trapz([fn(v) for v in values], values, axis=0)
for start, stop in self.exposed.presence.boundaries():
exposure += integrate(self.concentration_model.concentration, start, stop)
return exposure * self.repeats
def infection_probability(self):
def infection_probability(self) -> _VectorisedFloat:
exposure = self.quanta_exposure()
inf_aero = (
@ -722,12 +722,12 @@ class ExposureModel:
# Probability of infection.
return (1 - np.exp(-inf_aero)) * 100
def expected_new_cases(self):
def expected_new_cases(self) -> _VectorisedFloat:
prob = self.infection_probability()
exposed_occupants = self.exposed.number
return prob * exposed_occupants / 100
def reproduction_number(self):
def reproduction_number(self) -> _VectorisedFloat:
"""
The reproduction number can be thought of as the expected number of
cases directly generated by one infected case in a population.

View file

@ -0,0 +1,83 @@
import typing
import numpy as np
import numpy.testing
import pytest
from cara import models
from cara.models import ExposureModel
class KnownConcentrations(models.ConcentrationModel):
"""
A ConcentrationModel which is based on pre-known quanta concentrations and
which therefore doesn't need other components. Useful for testing.
"""
def __init__(self, concentration_function: typing.Callable) -> None:
self._func = concentration_function
def concentration(self, time: float) -> models._VectorisedFloat: # noqa
return self._func(time)
halftime = models.PeriodicInterval(120, 60)
populations = [
# A simple scalar population.
models.Population(
10, halftime, models.Mask.types['Type I'],
models.Activity.types['Standing'],
),
# A population with some array component for η_inhale.
models.Population(
10, halftime, models.Mask(0.95, 0.15, np.array([0.3, 0.35])),
models.Activity.types['Standing'],
),
]
@pytest.mark.parametrize(
"population, cm, expected_exposure",[
[populations[1], KnownConcentrations(lambda t: 1.2), np.array([14.4, 14.4])],
[populations[0], KnownConcentrations(lambda t: np.array([1.2, 2.4])), np.array([14.4, 28.8])],
[populations[1], KnownConcentrations(lambda t: np.array([1.2, 2.4])), np.array([14.4, 28.8])],
])
def test_exposure_model_ndarray(population, cm, expected_exposure):
model = ExposureModel(cm, population)
np.testing.assert_almost_equal(
model.quanta_exposure(), expected_exposure
)
assert isinstance(model.infection_probability(), np.ndarray)
assert isinstance(model.expected_new_cases(), np.ndarray)
assert model.infection_probability().shape == (2,)
assert model.expected_new_cases().shape == (2,)
@pytest.mark.parametrize("population", populations)
def test_exposure_model_ndarray_and_float_mix(population):
cm = KnownConcentrations(lambda t: 1.2 if np.floor(t) % 2 else np.array([1.2, 1.2]))
model = ExposureModel(cm, population)
expected_exposure = np.array([14.4, 14.4])
np.testing.assert_almost_equal(
model.quanta_exposure(), expected_exposure
)
assert isinstance(model.infection_probability(), np.ndarray)
assert isinstance(model.expected_new_cases(), np.ndarray)
@pytest.mark.parametrize("population", populations)
def test_exposure_model_compare_scalar_vector(population):
cm_scalar = KnownConcentrations(lambda t: 1.2)
cm_array = KnownConcentrations(lambda t: np.array([1.2, 1.2]))
model_scalar = ExposureModel(cm_scalar, population)
model_array = ExposureModel(cm_array, population)
expected_exposure = 14.4
np.testing.assert_almost_equal(
model_scalar.quanta_exposure(), expected_exposure
)
np.testing.assert_almost_equal(
model_array.quanta_exposure(), np.array([expected_exposure]*2)
)

View file

@ -0,0 +1,49 @@
import numpy as np
import pytest
import cara.models
@pytest.mark.parametrize(
"override_params", [
{'viral_load_in_sputum': np.array([5e8, 1e9])},
{'coefficient_of_infectivity': np.array([0.02, 0.05])},
{'η_exhale': np.array([0.92, 0.95])},
{'η_leaks': np.array([0.15, 0.20])},
]
)
def test_infected_population_vectorisation(override_params):
defaults = {
'virus_halflife': 1.1,
'viral_load_in_sputum': 1e9,
'coefficient_of_infectivity': 0.02,
'η_exhale': 0.95,
'η_leaks': 0.15,
}
defaults.update(override_params)
office_hours = cara.models.SpecificInterval(present_times=[(8,17)])
infected = cara.models.InfectedPopulation(
number=1,
presence=office_hours,
mask=cara.models.Mask(
η_exhale=defaults['η_exhale'],
η_leaks=defaults['η_leaks'],
η_inhale=0.3,
),
activity=cara.models.Activity(
0.51,
0.75,
),
virus=cara.models.Virus(
halflife=defaults['virus_halflife'],
viral_load_in_sputum=defaults['viral_load_in_sputum'],
coefficient_of_infectivity=defaults['coefficient_of_infectivity'],
),
expiration=cara.models.Expiration(
ejection_factor=(0.084, 0.009, 0.003, 0.002),
),
)
emission_rate = infected.emission_rate(10)
assert isinstance(emission_rate, np.ndarray)
assert emission_rate.shape == (2, )

View file

@ -68,7 +68,7 @@ def test_hinged_window(baseline_hingedwindow, window_width,
)},
]
)
def test_HingedWindow_vectorisation(override_params):
def test_hinged_window_vectorisation(override_params):
defaults = {
'window_height': 0.15,
'window_width': 0.15,
@ -92,6 +92,28 @@ def test_sliding_window(baseline_slidingwindow):
assert baseline_slidingwindow.discharge_coefficient == 0.6
def test_hvac_mechanical_vectorisation():
room = models.Room(volume=50)
interval = models.SpecificInterval(((0, 4), (5, 9)))
t = 0.5
q_air_mech = np.array([250., 500.])
v = models.HVACMechanical(interval,q_air_mech)
assert isinstance(v.air_exchange(room, t), np.ndarray)
npt.assert_array_equal(v.air_exchange(room, t),
np.array([250/room.volume, 500/room.volume]))
def test_hepa_filter_vectorisation():
room = models.Room(volume=50)
interval = models.SpecificInterval(((0, 4), (5, 9)))
t = 0.5
q_air_mech = np.array([250., 500.])
v = models.HEPAFilter(interval,q_air_mech)
assert isinstance(v.air_exchange(room, t), np.ndarray)
npt.assert_array_equal(v.air_exchange(room, t),
np.array([250/room.volume, 500/room.volume]))
def test_multiple(baseline_slidingwindow, baseline_hingedwindow):
v = models.MultipleVentilation([baseline_hingedwindow, baseline_slidingwindow])
room = models.Room(75)