Use floats throughout the cara codebase to represent time.

This commit is contained in:
Phil Elson 2021-08-05 15:48:24 +02:00
parent e1d8b3a2c8
commit aa269aaf6e
12 changed files with 193 additions and 135 deletions

View file

@ -550,7 +550,6 @@ class FormData:
if current_time < finish:
LOG.debug("trailing interval")
present_intervals.append((current_time / 60, finish / 60))
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:

View file

@ -34,8 +34,10 @@ def calculate_report_data(model: models.ExposureModel):
t_start, t_end = model_start_end(model)
times = np.linspace(t_start, t_end, resolution)
concentrations = [np.array(model.concentration_model.concentration(time)).mean()
for time in times]
concentrations = [
np.array(model.concentration_model.concentration(float(time))).mean()
for time in times
]
highest_const = max(concentrations)
prob = np.array(model.infection_probability()).mean()
er = np.array(model.concentration_model.infected.emission_rate_when_present()).mean()

View file

@ -488,14 +488,14 @@ baseline_model = models.ExposureModel(
room=models.Room(volume=75),
ventilation=models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=15),
inside_temp=models.PiecewiseConstant((0,24),(293.15,)),
outside_temp=models.PiecewiseConstant((0,24),(283.15,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293.15,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283.15,)),
window_height=1.6, opening_length=0.6,
),
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((8, 12), (13, 17))),
presence=models.SpecificInterval(((8., 12.), (13., 17.))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
expiration=models.Expiration.types['Talking'],
@ -503,7 +503,7 @@ baseline_model = models.ExposureModel(
),
exposed=models.Population(
number=10,
presence=models.SpecificInterval(((8, 12), (13, 17))),
presence=models.SpecificInterval(((8., 12.), (13., 17.))),
activity=models.Activity.types['Seated'],
mask=models.Mask.types['No mask'],
),

View file

@ -30,15 +30,19 @@ Geneva_hourly_temperatures_celsius_per_hour = {
}
# Geneva hourly temperatures as piecewise constant function (in Kelvin)
# Geneva hourly temperatures as piecewise constant function (in Kelvin).
GenevaTemperatures_hourly = {
month: models.PiecewiseConstant(tuple(np.arange(25.)),
tuple(273.15+np.array(temperatures)))
for month,temperatures in Geneva_hourly_temperatures_celsius_per_hour.items()
month: models.PiecewiseConstant(
# NOTE: It is important that the time type is float, not np.float, in
# order to allow hashability (for caching).
tuple(float(time) for time in range(25)),
tuple(273.15 + np.array(temperatures)),
)
for month, temperatures in Geneva_hourly_temperatures_celsius_per_hour.items()
}
# same temperatures on a finer temperature mesh
# Same temperatures on a finer temperature mesh.
GenevaTemperatures = {
month: GenevaTemperatures_hourly[month].refine(refine_factor=4)
for month,temperatures in Geneva_hourly_temperatures_celsius_per_hour.items()
for month, temperatures in Geneva_hourly_temperatures_celsius_per_hour.items()
}

View file

@ -46,6 +46,8 @@ else:
# by providing a no-op cache decorator when type-checking.
cached = lambda *cached_args, **cached_kwargs: lambda function: function # noqa
from .utils import method_cache
from .dataclass_utils import nested_replace
@ -62,7 +64,7 @@ class Room:
volume: _VectorisedFloat
#: The humidity in the room (from 0 to 1 - e.g. 0.5 is 50% humidity)
humidity: _VectorisedFloat=0.5
humidity: _VectorisedFloat = 0.5
Time_t = typing.TypeVar('Time_t', float, int)
@ -127,7 +129,9 @@ class PeriodicInterval(Interval):
return tuple()
result = []
for i in np.arange(0, 24, self.period / 60):
result.append((i, i+self.duration/60))
# NOTE: It is important that the time type is float, not np.float, in
# order to allow hashability (for caching).
result.append((float(i), float(i+self.duration/60)))
return tuple(result)
@ -183,7 +187,9 @@ class PiecewiseConstant:
np.concatenate([self.values, self.values[-1:]], axis=0),
axis=0)
return PiecewiseConstant(
tuple(refined_times),
# NOTE: It is important that the time type is float, not np.float, in
# order to allow hashability (for caching).
tuple(float(time) for time in refined_times),
tuple(interpolator(refined_times)[:-1]),
)
@ -740,8 +746,10 @@ class ConcentrationModel:
# Deposition rate (h^-1)
k = (vg * 3600) / h
return k + self.virus.decay_constant(self.room.humidity
) + self.ventilation.air_exchange(self.room, time)
return (
k + self.virus.decay_constant(self.room.humidity)
+ self.ventilation.air_exchange(self.room, time)
)
def _concentration_limit(self, time: float) -> _VectorisedFloat:
"""
@ -754,7 +762,7 @@ class ConcentrationModel:
return (self.infected.emission_rate(time)) / (IVRR * V)
def state_change_times(self):
def state_change_times(self) -> typing.List[float]:
"""
All time dependent entities on this model must provide information about
the times at which their state changes.
@ -763,10 +771,9 @@ class ConcentrationModel:
state_change_times = set()
state_change_times.update(self.infected.presence.transition_times())
state_change_times.update(self.ventilation.transition_times())
return sorted(state_change_times)
def last_state_change(self, time: float):
def last_state_change(self, time: float) -> float:
"""
Find the most recent state change.
@ -774,9 +781,9 @@ class ConcentrationModel:
for change_time in self.state_change_times()[::-1]:
if change_time < time:
return change_time
return 0
return 0.
def _next_state_change(self, time: float):
def _next_state_change(self, time: float) -> float:
"""
Find the nearest future state change.
@ -797,7 +804,7 @@ class ConcentrationModel:
"""
return (self.last_state_change(stop) <= start)
@cached()
@method_cache
def _concentration_at_state_change(self, time: float) -> _VectorisedFloat:
return self.concentration(time)
@ -811,7 +818,6 @@ class ConcentrationModel:
Note that time is not vectorised. You can only pass a single float
to this method.
"""
if time == 0:
return 0.0
next_state_change_time = self._next_state_change(time)
@ -839,7 +845,7 @@ class ConcentrationModel:
start = max([interval_start, req_start])
stop = min([interval_stop, req_stop])
conc_start = self.concentration(start)
conc_start = self._concentration_at_state_change(start)
next_conc_state = self._next_state_change(stop)
conc_limit = self._concentration_limit(next_conc_state)

View file

@ -8,13 +8,13 @@ def baseline_model():
model = models.ConcentrationModel(
room=models.Room(volume=75),
ventilation=models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.), )),
air_exch=30.,
),
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((0, 4), (5, 8))),
presence=models.SpecificInterval(((0., 4.), (5., 8.))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Light activity'],
expiration=models.Expiration.types['Superspreading event'],

View file

@ -53,7 +53,7 @@ def test_concentration_model_vectorisation(override_params):
@pytest.fixture
def simple_conc_model():
interesting_times = models.SpecificInterval(([0, 1], [1.1, 1.999], [2, 3]), )
interesting_times = models.SpecificInterval(([0., 1.], [1.1, 1.999], [2., 3.]), )
return models.ConcentrationModel(
models.Room(75),
models.AirChange(interesting_times, 100),
@ -86,13 +86,13 @@ def test_next_state_change_time(
time,
expected_next_state_change,
):
assert simple_conc_model._next_state_change(time) == expected_next_state_change
assert simple_conc_model._next_state_change(float(time)) == expected_next_state_change
def test_next_state_change_time_out_of_range(simple_conc_model: models.ConcentrationModel):
with pytest.raises(
ValueError,
match=re.escape("The requested time (3.1) is greater than last available state change time (3)")
match=re.escape("The requested time (3.1) is greater than last available state change time (3.0)")
):
simple_conc_model._next_state_change(3.1)

View file

@ -7,6 +7,7 @@ from dataclasses import dataclass
from cara import models
from cara.models import ExposureModel
from cara.dataclass_utils import replace
@dataclass(frozen=True)
@ -26,10 +27,10 @@ class KnownConcentrations(models.ConcentrationModel):
return self.concentration_function(time)
def state_change_times(self):
return [0, 24]
return [0., 24.]
def _next_state_change(self, time: float):
return 24
return 24.
def concentration(self, time: float) -> models._VectorisedFloat: # noqa
return self.concentration_function(time)
@ -53,37 +54,37 @@ populations = [
models.Activity(np.array([0.51, 0.57]), 0.57),
),
]
dummy_room = models.Room(50, 0.5)
dummy_ventilation = models._VentilationBase()
dummy_infected_population = models.InfectedPopulation(
number=1,
presence=halftime,
mask=models.Mask.types['Type I'],
activity=models.Activity.types['Standing'],
virus=models.Virus.types['SARS_CoV_2_B117'],
expiration=models.Expiration.types['Talking']
)
def known_concentrations(func):
dummy_room = models.Room(50, 0.5)
dummy_ventilation = models._VentilationBase()
dummy_infected_population = models.InfectedPopulation(
number=1,
presence=halftime,
mask=models.Mask.types['Type I'],
activity=models.Activity.types['Standing'],
virus=models.Virus.types['SARS_CoV_2_B117'],
expiration=models.Expiration.types['Talking']
)
return KnownConcentrations(dummy_room, dummy_ventilation, dummy_infected_population, func)
@pytest.mark.parametrize(
"population, cm, f_dep, expected_exposure, expected_probability", [
[populations[1], known_concentrations(lambda t: 36), 1.,
[populations[1], known_concentrations(lambda t: 36.), 1.,
np.array([432, 432]), np.array([99.6803184113, 99.5181053773])],
[populations[2], known_concentrations(lambda t: 36), 1.,
[populations[2], known_concentrations(lambda t: 36.), 1.,
np.array([432, 432]), np.array([97.4574432074, 98.3493482895])],
[populations[0], known_concentrations(lambda t: np.array([36, 72])), 1.,
[populations[0], known_concentrations(lambda t: np.array([36., 72.])), 1.,
np.array([432, 864]), np.array([98.3493482895, 99.9727534893])],
[populations[1], known_concentrations(lambda t: np.array([36, 72])), 1.,
[populations[1], known_concentrations(lambda t: np.array([36., 72.])), 1.,
np.array([432, 864]), np.array([99.6803184113, 99.9976777757])],
[populations[0], known_concentrations(lambda t: 72), np.array([0.5, 1.]),
[populations[0], known_concentrations(lambda t: 72.), np.array([0.5, 1.]),
864, np.array([98.3493482895, 99.9727534893])],
])
def test_exposure_model_ndarray(population, cm, f_dep,
@ -104,8 +105,8 @@ def test_exposure_model_ndarray(population, cm, f_dep,
@pytest.mark.parametrize("population", populations)
def test_exposure_model_ndarray_and_float_mix(population):
cm = known_concentrations(lambda t: 0 if np.floor(t) %
2 else np.array([1.2, 1.2]))
cm = known_concentrations(
lambda t: 0. if np.floor(t) % 2 else np.array([1.2, 1.2]))
model = ExposureModel(cm, population)
expected_exposure = np.array([14.4, 14.4])
@ -135,8 +136,9 @@ def test_exposure_model_compare_scalar_vector(population):
@pytest.fixture
def conc_model():
interesting_times = models.SpecificInterval(
([0, 1], [1.01, 1.02], [12, 24]))
always = models.SpecificInterval(((0, 24),))
([0., 1.], [1.01, 1.02], [12., 24.]),
)
always = models.SpecificInterval(((0., 24.), ))
return models.ConcentrationModel(
models.Room(25),
models.AirChange(always, 5),
@ -150,18 +152,19 @@ def conc_model():
)
)
# expected exposure were computed with a trapezoidal integration, using
# Expected exposure were computed with a trapezoidal integration, using
# a mesh of 10'000 pts per exposed presence interval.
@pytest.mark.parametrize("exposed_time_interval, expected_exposure", [
[(0, 1), 266.67176],
[(1, 1.01), 3.0879539],
[(1.01, 1.02), 3.00082435],
[(12, 12.01), 0.095063235],
[(12, 24), 3775.65025],
[(0, 24), 4097.8494],
]
@pytest.mark.parametrize(
["exposed_time_interval", "expected_exposure"],
[
[(0., 1.), 266.67176],
[(1., 1.01), 3.0879539],
[(1.01, 1.02), 3.00082435],
[(12., 12.01), 0.095063235],
[(12., 24.), 3775.65025],
[(0., 24.), 4097.8494],
]
)
def test_exposure_model_integral_accuracy(exposed_time_interval,
expected_exposure, conc_model):
@ -186,10 +189,10 @@ def test_infectious_dose_vectorisation():
),
expiration=models.Expiration.types['Talking']
)
cm = KnownConcentrations(
dummy_room, dummy_ventilation, infected_population, lambda t: 1.2)
cm = known_concentrations(lambda t: 1.2)
cm = replace(cm, infected=infected_population)
presence_interval = models.SpecificInterval(((0, 1),))
presence_interval = models.SpecificInterval(((0., 1.),))
population = models.Population(
10, presence_interval, models.Mask.types['Type I'],
models.Activity.types['Standing'],

View file

@ -9,7 +9,7 @@ import cara.data as data
def test_no_mask_superspeading_emission_rate(baseline_model):
expected_rate = 48500.
npt.assert_allclose(
[baseline_model.infected.emission_rate(t) for t in [0, 1, 4, 4.5, 5, 8, 9]],
[baseline_model.infected.emission_rate(float(t)) for t in [0, 1, 4, 4.5, 5, 8, 9]],
[0, expected_rate, expected_rate, 0, 0, expected_rate, 0],
rtol=1e-12
)
@ -19,8 +19,8 @@ def test_no_mask_superspeading_emission_rate(baseline_model):
def baseline_periodic_window():
return models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=15),
inside_temp=models.PiecewiseConstant((0,24),(293,)),
outside_temp=models.PiecewiseConstant((0,24),(283,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
)
@ -41,7 +41,7 @@ def baseline_periodic_hepa():
def test_concentrations(baseline_model):
# expected concentrations were computed analytically
ts = [0, 4, 5, 7, 10]
concentrations = [baseline_model.concentration(t) for t in ts]
concentrations = [baseline_model.concentration(float(t)) for t in ts]
npt.assert_allclose(
concentrations,
[0.000000e+00, 20.805628, 6.602814e-13, 20.805628, 2.09545e-26],
@ -55,7 +55,7 @@ def test_smooth_concentrations(baseline_model):
dx = 0.002
dy_limit = 0.2 # Anything more than this (in relative) is a bit steep.
ts = np.arange(0, 10, dx)
concentrations = [baseline_model.concentration(t) for t in ts]
concentrations = [baseline_model.concentration(float(t)) for t in ts]
assert np.abs(np.diff(concentrations)).max()/np.mean(concentrations) < dy_limit
@ -69,7 +69,7 @@ def build_model(interval_duration):
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((0, 4), (5, 8))),
presence=models.SpecificInterval(((0., 4.), (5., 8.))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Light activity'],
expiration=models.Expiration.types['Superspreading event'],
@ -78,7 +78,7 @@ def build_model(interval_duration):
return model
def test_concentrations_startup(baseline_model):
def test_concentrations_startup():
# The concentrations should be the same until the beginning of the
# first time that the ventilation is disabled.
m1 = build_model(interval_duration=120)
@ -183,28 +183,38 @@ def test_multiple_ventilation_HEPA_HVAC_AirChange(volume, expected_value):
],
)
def test_windowopening(time, expected_value):
tempOutside = models.PiecewiseConstant((0,10,24),(273.15,283.15))
tempInside = models.PiecewiseConstant((0,24),(293.15,))
w = models.SlidingWindow(active=models.SpecificInterval([(0,24)]),
inside_temp=tempInside,outside_temp=tempOutside,
window_height=1.,opening_length=0.6)
npt.assert_allclose(w.air_exchange(models.Room(volume=68),time),
expected_value,rtol=1e-5)
tempOutside = models.PiecewiseConstant((0., 10., 24.),(273.15, 283.15))
tempInside = models.PiecewiseConstant((0., 24.), (293.15,))
w = models.SlidingWindow(
active=models.SpecificInterval([(0., 24.)]),
inside_temp=tempInside,outside_temp=tempOutside,
window_height=1., opening_length=0.6,
)
npt.assert_allclose(
w.air_exchange(models.Room(volume=68), time), expected_value, rtol=1e-5
)
def build_hourly_dependent_model(month, intervals_open=((7.5, 8.5),),
intervals_presence_infected=((0, 4), (5, 7.5)),
artificial_refinement=False,
temperatures=data.GenevaTemperatures_hourly):
def build_hourly_dependent_model(
month,
intervals_open=((7.5, 8.5),),
intervals_presence_infected=((0., 4.), (5., 7.5)),
artificial_refinement=False,
temperatures=data.GenevaTemperatures_hourly
):
if artificial_refinement:
# 5-fold increase of number of times, WITHOUT interpolation
# (hence transparent for the results)
refine_factor = 2
times_refined = tuple(np.linspace(0.,24,
refine_factor*len(temperatures[month].values)+1))
temperatures_refined = tuple(np.hstack([[v]*refine_factor
for v in temperatures[month].values]))
outside_temp = models.PiecewiseConstant(times_refined,temperatures_refined)
times_refined = tuple(
float(t) for t in np.linspace(
0., 24, refine_factor * len(temperatures[month].values) + 1
)
)
temperatures_refined = tuple(np.hstack(
[[v] * refine_factor for v in temperatures[month].values]
))
outside_temp = models.PiecewiseConstant(times_refined, temperatures_refined)
else:
outside_temp = temperatures[month]
@ -212,7 +222,7 @@ def build_hourly_dependent_model(month, intervals_open=((7.5, 8.5),),
room=models.Room(volume=75),
ventilation=models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0,24),(293,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293, )),
outside_temp=outside_temp,
window_height=1.6, opening_length=0.6,
),
@ -233,14 +243,14 @@ def build_constant_temp_model(outside_temp, intervals_open=((7.5, 8.5),)):
room=models.Room(volume=75),
ventilation=models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0,24),(293,)),
outside_temp=models.PiecewiseConstant((0,24),(outside_temp,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (outside_temp,)),
window_height=1.6, opening_length=0.6,
),
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((0, 4), (5, 7.5))),
presence=models.SpecificInterval(((0., 4.), (5., 7.5))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Light activity'],
expiration=models.Expiration.types['Superspreading event'],
@ -253,21 +263,22 @@ def build_hourly_dependent_model_multipleventilation(month, intervals_open=((7.5
vent = models.MultipleVentilation((
models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0,24),(293,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=data.GenevaTemperatures[month],
window_height=1.6, opening_length=0.6,
),
models.HEPAFilter(
active=models.SpecificInterval(((0,24),)),
q_air_mech=500.,
)))
active=models.SpecificInterval(((0., 24.),)),
q_air_mech=500.,
),
))
model = models.ConcentrationModel(
room=models.Room(volume=75),
ventilation=vent,
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((0, 4), (5, 7.5))),
presence=models.SpecificInterval(((0., 4.), (5., 7.5))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Light activity'],
expiration=models.Expiration.types['Superspreading event'],
@ -288,7 +299,7 @@ def test_concentrations_hourly_dep_temp_vs_constant(month, temperatures, time):
# The concentrations should be the same up to 8 AM (time when the
# temperature changes DURING the window opening).
m1 = build_hourly_dependent_model(month)
m2 = build_constant_temp_model(temperatures[7]+273.15)
m2 = build_constant_temp_model(temperatures[7] + 273.15)
npt.assert_allclose(m1.concentration(time), m2.concentration(time), rtol=1e-5)
@pytest.mark.parametrize(
@ -302,8 +313,11 @@ def test_concentrations_hourly_dep_temp_vs_constant(month, temperatures, time):
def test_concentrations_hourly_dep_temp_startup(month, temperatures, time):
# The concentrations should be the zero up to the first presence time
# of an infecter person.
m = build_hourly_dependent_model(month,((0.,0.5),(1,1.5),(4,4.5),(7.5,8)),
((8,12.),))
m = build_hourly_dependent_model(
month,
((0., 0.5), (1., 1.5), (4., 4.5), (7.5, 8), ),
((8., 12.), ),
)
assert m.concentration(time) == 0.
@ -323,19 +337,22 @@ def test_concentrations_hourly_dep_multipleventilation():
def test_concentrations_hourly_dep_adding_artificial_transitions(month_temp_item, time):
month, temperatures = month_temp_item
# Adding a second opening inside the first one should not change anything
m1 = build_hourly_dependent_model(month,intervals_open=((7.5, 8.5),))
m2 = build_hourly_dependent_model(month,intervals_open=((7.5, 8.5),(8.,8.1)))
m1 = build_hourly_dependent_model(month, intervals_open=((7.5, 8.5), ))
m2 = build_hourly_dependent_model(month, intervals_open=((7.5, 8.5), (8., 8.1), ))
npt.assert_allclose(m1.concentration(time), m2.concentration(time), rtol=1e-5)
@pytest.mark.parametrize(
"time",
list(np.random.random_sample(10)*24.)+list(np.arange(0,24.5,0.5)),
(
[float(t) for t in np.random.random_sample(10) * 24.] # type: ignore
+ [float(t) for t in np.arange(0, 24.5, 0.5)]
),
)
def test_concentrations_refine_times(time):
month = 'Jan'
m1 = build_hourly_dependent_model(month,intervals_open=((0, 24),))
m2 = build_hourly_dependent_model(month,intervals_open=((0, 24),),
m1 = build_hourly_dependent_model(month, intervals_open=((0., 24.),))
m2 = build_hourly_dependent_model(month, intervals_open=((0., 24.),),
artificial_refinement=True)
npt.assert_allclose(m1.concentration(time), m2.concentration(time), rtol=1e-8)
@ -350,7 +367,7 @@ def build_exposure_model(concentration_model):
activity=infected.activity,
mask=infected.mask,
),
fraction_deposited = 1.,
fraction_deposited=1.,
)
@ -367,8 +384,8 @@ def test_exposure_hourly_dep(month,expected_exposure):
m = build_exposure_model(
build_hourly_dependent_model(
month,
intervals_open=((0,24),),
intervals_presence_infected=((8, 12), (13, 17))
intervals_open=((0., 24.), ),
intervals_presence_infected=((8., 12.), (13., 17.))
)
)
exposure = m.exposure()
@ -388,8 +405,8 @@ def test_exposure_hourly_dep_refined(month,expected_exposure):
m = build_exposure_model(
build_hourly_dependent_model(
month,
intervals_open=((0, 24),),
intervals_presence_infected=((8, 12), (13, 17)),
intervals_open=((0., 24.),),
intervals_presence_infected=((8., 12.), (13., 17.)),
temperatures=data.GenevaTemperatures,
)
)

View file

@ -43,14 +43,14 @@ def baseline_mc_model() -> cara.monte_carlo.ConcentrationModel:
room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)),
ventilation=cara.monte_carlo.SlidingWindow(
active=cara.models.PeriodicInterval(period=120, duration=120),
inside_temp=cara.models.PiecewiseConstant((0, 24), (293,)),
outside_temp=cara.models.PiecewiseConstant((0, 24), (283,)),
inside_temp=cara.models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=cara.models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
),
infected=cara.models.InfectedPopulation(
number=1,
virus=cara.models.Virus.types['SARS_CoV_2'],
presence=cara.models.SpecificInterval(((0, 4), (5, 8))),
presence=cara.models.SpecificInterval(((0., 4.), (5., 8.))),
mask=cara.models.Mask.types['No mask'],
activity=cara.models.Activity.types['Light activity'],
expiration=cara.models.Expiration.types['Breathing'],
@ -75,8 +75,8 @@ def baseline_mc_exposure_model(baseline_mc_model) -> cara.monte_carlo.ExposureMo
def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.ConcentrationModel):
model = baseline_mc_model.build_model(7)
assert isinstance(model, cara.models.ConcentrationModel)
assert isinstance(model.concentration(time=0), float)
conc = model.concentration(time=1)
assert isinstance(model.concentration(time=0.), float)
conc = model.concentration(time=1.)
assert isinstance(conc, np.ndarray)
assert conc.shape == (7, )

View file

@ -26,12 +26,12 @@ def shared_office_mc():
(
models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=10),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
outside_temp=models.PiecewiseConstant((0, 24), (283,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
),
models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.), )),
air_exch=0.25,
),
),
@ -39,7 +39,7 @@ def shared_office_mc():
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 2), (2.1, 4), (5, 7), (7.1, 9))),
presence=mc.SpecificInterval(((0., 2.), (2.1, 4.), (5., 7.), (7.1, 9.))),
mask=models.Mask(η_inhale=0.3),
activity=activity_distributions['Seated'],
expiration=models.MultipleExpiration(
@ -70,12 +70,12 @@ def classroom_mc():
(
models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=10),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
outside_temp=models.PiecewiseConstant((0, 24), (283,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
),
models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=0.25,
),
),
@ -83,7 +83,7 @@ def classroom_mc():
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 2), (2.5, 4), (5, 7), (7.5, 9))),
presence=mc.SpecificInterval(((0., 2.), (2.5, 4.), (5., 7.), (7.5, 9.))),
mask=models.Mask.types['No mask'],
activity=activity_distributions['Light activity'],
expiration=models.Expiration.types['Talking'],
@ -108,13 +108,13 @@ def ski_cabin_mc():
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=10, humidity=0.5),
ventilation=models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=0,
),
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 1/3),)),
presence=mc.SpecificInterval(((0., 1/3),)),
mask=models.Mask(η_inhale=0.3),
activity=activity_distributions['Moderate activity'],
expiration=models.Expiration.types['Talking'],
@ -141,13 +141,13 @@ def gym_mc():
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=300, humidity=0.5),
ventilation=models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=6,
),
infected=mc.InfectedPopulation(
number=2,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 1),)),
presence=mc.SpecificInterval(((0., 1.),)),
mask=models.Mask.types["No mask"],
activity=activity_distributions['Heavy exercise'],
expiration=models.Expiration.types['Breathing'],
@ -173,13 +173,13 @@ def waiting_room_mc():
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=100, humidity=0.5),
ventilation=models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=0.25,
),
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 2),)),
presence=mc.SpecificInterval(((0., 2.),)),
mask=models.Mask.types["No mask"],
activity=activity_distributions['Seated'],
expiration=models.MultipleExpiration(
@ -215,7 +215,7 @@ def skagit_chorale_mc():
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2'],
presence=mc.SpecificInterval(((0, 2.5),)),
presence=mc.SpecificInterval(((0., 2.5),)),
mask=models.Mask.types["No mask"],
activity=activity_distributions['Light activity'],
expiration=models.Expiration((5., 5., 5.)),
@ -274,13 +274,13 @@ def test_small_shared_office_Geneva(mask_type, month, expected_pi,
ventilation=models.MultipleVentilation(
(
models.SlidingWindow(
active=models.SpecificInterval(((0,24),)),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
active=models.SpecificInterval(((0., 24.),)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=data.GenevaTemperatures[month],
window_height=1.5, opening_length=0.2,
),
models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=0.25,
),
),
@ -288,7 +288,7 @@ def test_small_shared_office_Geneva(mask_type, month, expected_pi,
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((9, 10+2/3), (10+5/6, 12.5), (13.5, 15+2/3), (15+5/6, 18))),
presence=mc.SpecificInterval(((9., 10+2/3), (10+5/6, 12.5), (13.5, 15+2/3), (15+5/6, 18.))),
mask=models.Mask.types[mask_type],
activity=activity_distributions['Seated'],
expiration=models.MultipleExpiration(

27
cara/utils.py Normal file
View file

@ -0,0 +1,27 @@
import functools
def method_cache(fn):
"""
A decorator for instance based caching.
Unlike lru_cache / memoization, this allows us to not have to have the
instance itself be hashable - only the arguments must be so.
The cache is stored as a dictionary in a private attribute on the instance
with the name ``_cache_{func_name}``.
"""
cache_name = f'_cache_{fn.__name__}'
@functools.wraps(fn)
def cached_method(self, *args, **kwargs):
cache = getattr(self, cache_name, None)
if cache is None:
cache = {}
object.__setattr__(self, cache_name, cache)
cache_key = hash(args + tuple(kwargs.items()))
if cache_key not in cache:
cache[cache_key] = fn(self, *args, **kwargs)
return cache[cache_key]
return cached_method