diff --git a/cara/apps.py b/cara/apps.py index 2fad786c..3caa3fb1 100644 --- a/cara/apps.py +++ b/cara/apps.py @@ -34,8 +34,8 @@ class ConcentrationFigure: self.line = None def update(self, model: models.Model): - resolution = 600 - ts = np.linspace(0, 10, resolution) + resolution = 500 + ts = np.linspace(0, 9.09, resolution) concentration = [model.concentration(t) for t in ts] if self.line is None: [self.line] = self.ax.plot(ts, concentration) @@ -56,6 +56,11 @@ class ConcentrationFigure: self.line.set_data(ts, concentration) self.ax.relim() self.ax.autoscale_view() + # self.ax.set_yscale('log') + # if max(concentration) > 1: + self.ax.set_ylim(bottom=1e-4, top=5) + # else: + # self.ax.set_ylim(bottom=0, top=1) self.figure.canvas.draw() diff --git a/cara/models.py b/cara/models.py index ce0c64d9..643d33ea 100644 --- a/cara/models.py +++ b/cara/models.py @@ -33,31 +33,54 @@ class Interval: @dataclass(frozen=True) class PeriodicInterval(Interval): - #: How often does the interval occur. + #: How often does the interval occur (minutes). period: int - #: How long does the interval occur for. + #: How long does the interval occur for (minutes). #: A value greater than :data:`period` signifies the event is permanently #: occurring, a value of 0 signifies that the event never happens. duration: int + def states(self): + result = [] + for i in np.arange(0, 24, self.period / 60): + result.append((i, i+self.duration/60)) + return tuple(result) + def triggered(self, time: float) -> bool: + for start, end in self.states(): + if start < time <= end: + return True + return False + + if self.duration >= self.period: + return True + if self.duration == 0: + return False + period = self.period / 60. duration = self.duration / 60. - return (time % period) >= (period - duration) + t = time % period + return 0 <= t < duration def boundaries(self) -> typing.Set[float]: state_changes = set() + for start, end in self.states(): + state_changes.add(start) + state_changes.add(end) + return state_changes + + if self.duration >= self.period: + return set() + if self.duration == 0: + return set() + state_changes = set() period_h = self.period / 60 duration_h = self.duration / 60 - # The current implementation starts at the end of the first period, not at 0. - start = period_h # Take as many steps as we need to get to a full day. - for i in np.arange(start, 24, period_h): + for i in np.arange(0, 24, period_h): state_changes.add(i) - if duration_h < period_h: - state_changes.add(i - duration_h) - + state_changes.add(i + duration_h) return state_changes @@ -260,20 +283,20 @@ class InfectedPerson: def person_present(self, time): for start, end in self.present_times: - if start <= time <= end: + if start < time <= end: return True return False - def start_end_of_presence(self, time) -> typing.Tuple[float, float]: - """ - Find the most recent start (and associated) end-time (even if the - given time is after the end-point) given a time. - - """ - for start, end in self.present_times[::-1]: - if time > start: - return start, end - return start, end + # def start_end_of_presence(self, time) -> typing.Tuple[float, float]: + # """ + # Find the most recent start (and associated) end-time (even if the + # given time is after the end-point) given a time. + # + # """ + # for start, end in self.present_times[::-1]: + # if time > start: + # return start, end + # return start, end @functools.lru_cache() def emission_rate(self, time) -> float: @@ -339,26 +362,24 @@ class Model: """ for change_time in self.collect_time_state_changes()[::-1]: - if time >= change_time: + if change_time < time: return change_time return 0 @functools.lru_cache() def concentration(self, time: float) -> float: - t = time + if time == 0: + return 0.0 IVRR = self.infectious_virus_removal_rate(time) V = self.room.volume Ni = self.infected_occupants ER = self.infected.emission_rate(time) - if t == 0: - return 0.0 - t_last_state_change = self.last_state_change(time) - if t != t_last_state_change: - concentration_at_last_state_change = self.concentration(t_last_state_change) - else: - concentration_at_last_state_change = self.concentration(t - 0.0001) - fac = np.exp(IVRR * (t_last_state_change - t)) + t_last_state_change = self.last_state_change(time) + concentration_at_last_state_change = self.concentration(t_last_state_change) + + delta_time = time - t_last_state_change + fac = np.exp(-IVRR * delta_time) concentration_limit = (ER * Ni) / (IVRR * V) return concentration_limit * (1 - fac) + concentration_at_last_state_change * fac diff --git a/cara/tests/test_known_quantities.py b/cara/tests/test_known_quantities.py index b6e4be51..82393d4c 100644 --- a/cara/tests/test_known_quantities.py +++ b/cara/tests/test_known_quantities.py @@ -1,3 +1,4 @@ +import numpy as np import numpy.testing as npt import pytest @@ -17,7 +18,7 @@ def test_no_mask_emission_rate(baseline_model): rate = 167.74011998223307 npt.assert_allclose( [baseline_model.infected.emission_rate(t) for t in [0, 1, 4, 4.5, 5, 8, 9]], - [rate, rate, rate, 0, rate, rate, 0], + [0, rate, rate, 0, 0, rate, 0], rtol=1e-5 ) @@ -72,11 +73,51 @@ def test_concentrations(baseline_model): concentrations = [baseline_model.concentration(t) for t in ts] npt.assert_allclose( concentrations, - [0.000000e+00, 2.891970e-01, 1.267267e-04, 2.891969e-01, 5.548897e-08], + [0.000000e+00, 2.891970e-01, 1.266287e-04, 2.891969e-01, 5.544607e-08], rtol=1e-5 ) +def test_smooth_concentrations(baseline_model): + # We don't care about the actual concentrations in this test, but rather + # that the curve itself is smooth. + dx = 0.1 + dy_limit = dx * 2 # Anything more than this is a bit steep. + ts = np.arange(0, 10, dx) + concentrations = [baseline_model.concentration(t) for t in ts] + assert np.abs(np.diff(concentrations)).max() < dy_limit + + +def build_model(interval_duration): + model = models.Model( + room=models.Room(volume=75), + ventilation=models.HEPAFilter( + active=models.PeriodicInterval(period=120, duration=interval_duration), + q_air_mech=500., + ), + infected=models.InfectedPerson( + virus=models.Virus.types['SARS_CoV_2'], + present_times=((0, 4), (5, 8)), + mask=models.Mask.types['No mask'], + activity=models.Activity.types['Light exercise'], + expiration=models.Expiration.types['Unmodulated Vocalization'], + ), + infected_occupants=1, + exposed_occupants=10, + exposed_activity=models.Activity.types['Light exercise'], + ) + return model + + +def test_concentrations_startup(baseline_model): + # The concentrations should be the same until the beginning of the + # first time that the ventilation is disabled. + m1 = build_model(interval_duration=120) + m2 = build_model(interval_duration=65) + + assert m1.concentration(1.) == m2.concentration(1.) + + def test_r0(baseline_model): p = baseline_model.infection_probability() npt.assert_allclose(p, 93.196908) @@ -87,7 +128,7 @@ def test_periodic_window(baseline_periodic_window, baseline_room): ts = [0, 14/60, 15/60, 16/60, 119/60, 120/60, 121/60, 239/60, 240/60] aes = [baseline_periodic_window.air_exchange(baseline_room, t) for t in ts] rate = 6.86347 - answers = [0, 0, 0, 0, rate, 0, 0, rate, 0] + answers = [0, rate, rate, 0, 0, 0, rate, 0, 0] npt.assert_allclose(aes, answers, rtol=1e-5) @@ -96,5 +137,5 @@ def test_periodic_hepa(baseline_periodic_hepa, baseline_room): ts = [0, 14 / 60, 15 / 60, 16 / 60, 119 / 60, 120 / 60, 121 / 60, 239 / 60, 240 / 60] rate = 514.74 / 75 aes = [baseline_periodic_hepa.air_exchange(baseline_room, t) for t in ts] - answers = [0, 0, 0, 0, rate, 0, 0, rate, 0] + answers = [0, rate, rate, 0, 0, 0, rate, 0, 0] npt.assert_allclose(aes, answers, rtol=1e-5)