Improve the concentration calculation scheme by more strictly defining the time interval concept to be closed start and open end
This commit is contained in:
parent
a70f899b52
commit
2a20bf834f
3 changed files with 103 additions and 36 deletions
|
|
@ -34,8 +34,8 @@ class ConcentrationFigure:
|
|||
self.line = None
|
||||
|
||||
def update(self, model: models.Model):
|
||||
resolution = 600
|
||||
ts = np.linspace(0, 10, resolution)
|
||||
resolution = 500
|
||||
ts = np.linspace(0, 9.09, resolution)
|
||||
concentration = [model.concentration(t) for t in ts]
|
||||
if self.line is None:
|
||||
[self.line] = self.ax.plot(ts, concentration)
|
||||
|
|
@ -56,6 +56,11 @@ class ConcentrationFigure:
|
|||
self.line.set_data(ts, concentration)
|
||||
self.ax.relim()
|
||||
self.ax.autoscale_view()
|
||||
# self.ax.set_yscale('log')
|
||||
# if max(concentration) > 1:
|
||||
self.ax.set_ylim(bottom=1e-4, top=5)
|
||||
# else:
|
||||
# self.ax.set_ylim(bottom=0, top=1)
|
||||
self.figure.canvas.draw()
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -33,31 +33,54 @@ class Interval:
|
|||
|
||||
@dataclass(frozen=True)
|
||||
class PeriodicInterval(Interval):
|
||||
#: How often does the interval occur.
|
||||
#: How often does the interval occur (minutes).
|
||||
period: int
|
||||
|
||||
#: How long does the interval occur for.
|
||||
#: How long does the interval occur for (minutes).
|
||||
#: A value greater than :data:`period` signifies the event is permanently
|
||||
#: occurring, a value of 0 signifies that the event never happens.
|
||||
duration: int
|
||||
|
||||
def states(self):
|
||||
result = []
|
||||
for i in np.arange(0, 24, self.period / 60):
|
||||
result.append((i, i+self.duration/60))
|
||||
return tuple(result)
|
||||
|
||||
def triggered(self, time: float) -> bool:
|
||||
for start, end in self.states():
|
||||
if start < time <= end:
|
||||
return True
|
||||
return False
|
||||
|
||||
if self.duration >= self.period:
|
||||
return True
|
||||
if self.duration == 0:
|
||||
return False
|
||||
|
||||
period = self.period / 60.
|
||||
duration = self.duration / 60.
|
||||
return (time % period) >= (period - duration)
|
||||
t = time % period
|
||||
return 0 <= t < duration
|
||||
|
||||
def boundaries(self) -> typing.Set[float]:
|
||||
state_changes = set()
|
||||
for start, end in self.states():
|
||||
state_changes.add(start)
|
||||
state_changes.add(end)
|
||||
return state_changes
|
||||
|
||||
if self.duration >= self.period:
|
||||
return set()
|
||||
if self.duration == 0:
|
||||
return set()
|
||||
state_changes = set()
|
||||
period_h = self.period / 60
|
||||
duration_h = self.duration / 60
|
||||
# The current implementation starts at the end of the first period, not at 0.
|
||||
start = period_h
|
||||
# Take as many steps as we need to get to a full day.
|
||||
for i in np.arange(start, 24, period_h):
|
||||
for i in np.arange(0, 24, period_h):
|
||||
state_changes.add(i)
|
||||
if duration_h < period_h:
|
||||
state_changes.add(i - duration_h)
|
||||
|
||||
state_changes.add(i + duration_h)
|
||||
return state_changes
|
||||
|
||||
|
||||
|
|
@ -260,20 +283,20 @@ class InfectedPerson:
|
|||
|
||||
def person_present(self, time):
|
||||
for start, end in self.present_times:
|
||||
if start <= time <= end:
|
||||
if start < time <= end:
|
||||
return True
|
||||
return False
|
||||
|
||||
def start_end_of_presence(self, time) -> typing.Tuple[float, float]:
|
||||
"""
|
||||
Find the most recent start (and associated) end-time (even if the
|
||||
given time is after the end-point) given a time.
|
||||
|
||||
"""
|
||||
for start, end in self.present_times[::-1]:
|
||||
if time > start:
|
||||
return start, end
|
||||
return start, end
|
||||
# def start_end_of_presence(self, time) -> typing.Tuple[float, float]:
|
||||
# """
|
||||
# Find the most recent start (and associated) end-time (even if the
|
||||
# given time is after the end-point) given a time.
|
||||
#
|
||||
# """
|
||||
# for start, end in self.present_times[::-1]:
|
||||
# if time > start:
|
||||
# return start, end
|
||||
# return start, end
|
||||
|
||||
@functools.lru_cache()
|
||||
def emission_rate(self, time) -> float:
|
||||
|
|
@ -339,26 +362,24 @@ class Model:
|
|||
|
||||
"""
|
||||
for change_time in self.collect_time_state_changes()[::-1]:
|
||||
if time >= change_time:
|
||||
if change_time < time:
|
||||
return change_time
|
||||
return 0
|
||||
|
||||
@functools.lru_cache()
|
||||
def concentration(self, time: float) -> float:
|
||||
t = time
|
||||
if time == 0:
|
||||
return 0.0
|
||||
IVRR = self.infectious_virus_removal_rate(time)
|
||||
V = self.room.volume
|
||||
Ni = self.infected_occupants
|
||||
ER = self.infected.emission_rate(time)
|
||||
if t == 0:
|
||||
return 0.0
|
||||
t_last_state_change = self.last_state_change(time)
|
||||
if t != t_last_state_change:
|
||||
concentration_at_last_state_change = self.concentration(t_last_state_change)
|
||||
else:
|
||||
concentration_at_last_state_change = self.concentration(t - 0.0001)
|
||||
|
||||
fac = np.exp(IVRR * (t_last_state_change - t))
|
||||
t_last_state_change = self.last_state_change(time)
|
||||
concentration_at_last_state_change = self.concentration(t_last_state_change)
|
||||
|
||||
delta_time = time - t_last_state_change
|
||||
fac = np.exp(-IVRR * delta_time)
|
||||
concentration_limit = (ER * Ni) / (IVRR * V)
|
||||
return concentration_limit * (1 - fac) + concentration_at_last_state_change * fac
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import numpy as np
|
||||
import numpy.testing as npt
|
||||
import pytest
|
||||
|
||||
|
|
@ -17,7 +18,7 @@ def test_no_mask_emission_rate(baseline_model):
|
|||
rate = 167.74011998223307
|
||||
npt.assert_allclose(
|
||||
[baseline_model.infected.emission_rate(t) for t in [0, 1, 4, 4.5, 5, 8, 9]],
|
||||
[rate, rate, rate, 0, rate, rate, 0],
|
||||
[0, rate, rate, 0, 0, rate, 0],
|
||||
rtol=1e-5
|
||||
)
|
||||
|
||||
|
|
@ -72,11 +73,51 @@ def test_concentrations(baseline_model):
|
|||
concentrations = [baseline_model.concentration(t) for t in ts]
|
||||
npt.assert_allclose(
|
||||
concentrations,
|
||||
[0.000000e+00, 2.891970e-01, 1.267267e-04, 2.891969e-01, 5.548897e-08],
|
||||
[0.000000e+00, 2.891970e-01, 1.266287e-04, 2.891969e-01, 5.544607e-08],
|
||||
rtol=1e-5
|
||||
)
|
||||
|
||||
|
||||
def test_smooth_concentrations(baseline_model):
|
||||
# We don't care about the actual concentrations in this test, but rather
|
||||
# that the curve itself is smooth.
|
||||
dx = 0.1
|
||||
dy_limit = dx * 2 # Anything more than this is a bit steep.
|
||||
ts = np.arange(0, 10, dx)
|
||||
concentrations = [baseline_model.concentration(t) for t in ts]
|
||||
assert np.abs(np.diff(concentrations)).max() < dy_limit
|
||||
|
||||
|
||||
def build_model(interval_duration):
|
||||
model = models.Model(
|
||||
room=models.Room(volume=75),
|
||||
ventilation=models.HEPAFilter(
|
||||
active=models.PeriodicInterval(period=120, duration=interval_duration),
|
||||
q_air_mech=500.,
|
||||
),
|
||||
infected=models.InfectedPerson(
|
||||
virus=models.Virus.types['SARS_CoV_2'],
|
||||
present_times=((0, 4), (5, 8)),
|
||||
mask=models.Mask.types['No mask'],
|
||||
activity=models.Activity.types['Light exercise'],
|
||||
expiration=models.Expiration.types['Unmodulated Vocalization'],
|
||||
),
|
||||
infected_occupants=1,
|
||||
exposed_occupants=10,
|
||||
exposed_activity=models.Activity.types['Light exercise'],
|
||||
)
|
||||
return model
|
||||
|
||||
|
||||
def test_concentrations_startup(baseline_model):
|
||||
# The concentrations should be the same until the beginning of the
|
||||
# first time that the ventilation is disabled.
|
||||
m1 = build_model(interval_duration=120)
|
||||
m2 = build_model(interval_duration=65)
|
||||
|
||||
assert m1.concentration(1.) == m2.concentration(1.)
|
||||
|
||||
|
||||
def test_r0(baseline_model):
|
||||
p = baseline_model.infection_probability()
|
||||
npt.assert_allclose(p, 93.196908)
|
||||
|
|
@ -87,7 +128,7 @@ def test_periodic_window(baseline_periodic_window, baseline_room):
|
|||
ts = [0, 14/60, 15/60, 16/60, 119/60, 120/60, 121/60, 239/60, 240/60]
|
||||
aes = [baseline_periodic_window.air_exchange(baseline_room, t) for t in ts]
|
||||
rate = 6.86347
|
||||
answers = [0, 0, 0, 0, rate, 0, 0, rate, 0]
|
||||
answers = [0, rate, rate, 0, 0, 0, rate, 0, 0]
|
||||
npt.assert_allclose(aes, answers, rtol=1e-5)
|
||||
|
||||
|
||||
|
|
@ -96,5 +137,5 @@ def test_periodic_hepa(baseline_periodic_hepa, baseline_room):
|
|||
ts = [0, 14 / 60, 15 / 60, 16 / 60, 119 / 60, 120 / 60, 121 / 60, 239 / 60, 240 / 60]
|
||||
rate = 514.74 / 75
|
||||
aes = [baseline_periodic_hepa.air_exchange(baseline_room, t) for t in ts]
|
||||
answers = [0, 0, 0, 0, rate, 0, 0, rate, 0]
|
||||
answers = [0, rate, rate, 0, 0, 0, rate, 0, 0]
|
||||
npt.assert_allclose(aes, answers, rtol=1e-5)
|
||||
|
|
|
|||
Loading…
Reference in a new issue