added expert apps validation for presence times

This commit is contained in:
Luis Aleixo 2023-04-04 11:35:40 +02:00
parent 480cf36624
commit fe7cbf45b7
6 changed files with 189 additions and 79 deletions

View file

@ -20,10 +20,18 @@ from ... import dataclass_utils
def model_start_end(model: models.ExposureModel):
t_start = min(model.exposed.presence.boundaries()[0][0],
model.concentration_model.infected.presence.boundaries()[0][0])
t_end = max(model.exposed.presence.boundaries()[-1][1],
model.concentration_model.infected.presence.boundaries()[-1][1])
if (isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval)
and isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval)):
t_start = min(model.exposed.presence.boundaries()[0][0],
model.concentration_model.infected.presence.boundaries()[0][0])
t_end = max(model.exposed.presence.boundaries()[-1][1],
model.concentration_model.infected.presence.boundaries()[-1][1])
elif (isinstance(model.exposed.number, models.IntPiecewiseContant)
and isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant)):
t_start = min(model.exposed.number.interval().boundaries()[0][0],
model.concentration_model.infected.number.interval().boundaries()[0][0])
t_end = max(model.exposed.number.interval().boundaries()[-1][1],
model.concentration_model.infected.number.interval().boundaries()[-1][1])
return t_start, t_end
@ -141,11 +149,15 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing
exposed_occupants = model.exposed.number
expected_new_cases = np.array(model.expected_new_cases()).mean()
uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(model))) if form.conditional_probability_plot else None
if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval):
exposed_presence_intervals = [list(interval) for interval in model.exposed.presence.boundaries()]
elif isinstance(model.exposed.number, models.IntPiecewiseContant):
exposed_presence_intervals = [list(interval) for interval in model.exposed.number.interval().boundaries()]
return {
"model_repr": repr(model),
"times": list(times),
"exposed_presence_intervals": [list(interval) for interval in model.exposed.presence.boundaries()],
"exposed_presence_intervals": exposed_presence_intervals,
"short_range_intervals": short_range_intervals,
"short_range_expirations": short_range_expirations,
"concentrations": concentrations,

View file

@ -148,8 +148,12 @@ class ExposureModelResult(View):
def update_plot(self, model: models.ExposureModel):
resolution = 600
ts = np.linspace(sorted(model.concentration_model.infected.presence.transition_times())[0],
sorted(model.concentration_model.infected.presence.transition_times())[-1], resolution)
if isinstance(model.concentration_model.infected.number, int) and isinstance(model.concentration_model.infected.presence, models.Interval):
infected_presence = model.concentration_model.infected.presence
elif isinstance(model.concentration_model.infected.number, models.IntPiecewiseContant):
infected_presence = model.concentration_model.infected.number.interval()
ts = np.linspace(sorted(infected_presence.transition_times())[0],
sorted(infected_presence.transition_times())[-1], resolution)
concentration = [model.concentration(t) for t in ts]
cumulative_doses = np.cumsum([
@ -164,16 +168,21 @@ class ExposureModelResult(View):
self.ax.ignore_existing_data_limits = False
self.concentration_line.set_data(ts, concentration)
if isinstance(model.exposed.number, int) and isinstance(model.exposed.presence, models.Interval):
exposed_presence = model.exposed.presence
elif isinstance(model.exposed.number, models.IntPiecewiseContant):
exposed_presence = model.exposed.number.interval()
if self.concentration_area is None:
self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff",
where = ((model.exposed.presence.boundaries()[0][0] < ts) & (ts < model.exposed.presence.boundaries()[0][1]) |
(model.exposed.presence.boundaries()[1][0] < ts) & (ts < model.exposed.presence.boundaries()[1][1])))
where = ((exposed_presence.boundaries()[0][0] < ts) & (ts < exposed_presence.boundaries()[0][1]) |
(exposed_presence.boundaries()[1][0] < ts) & (ts < exposed_presence.boundaries()[1][1])))
else:
self.concentration_area.remove()
self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff",
where = ((model.exposed.presence.boundaries()[0][0] < ts) & (ts < model.exposed.presence.boundaries()[0][1]) |
(model.exposed.presence.boundaries()[1][0] < ts) & (ts < model.exposed.presence.boundaries()[1][1])))
where = ((exposed_presence.boundaries()[0][0] < ts) & (ts < exposed_presence.boundaries()[0][1]) |
(exposed_presence.boundaries()[1][0] < ts) & (ts < exposed_presence.boundaries()[1][1])))
if self.cumulative_line is None:
[self.cumulative_line] = self.ax2.plot(ts[:-1], cumulative_doses, color='#0000c8', linestyle='dotted')
@ -187,8 +196,8 @@ class ExposureModelResult(View):
cumulative_top = max(cumulative_doses)
self.ax2.set_ylim(bottom=0., top=cumulative_top)
self.ax.set_xlim(left = min(min(model.concentration_model.infected.presence.boundaries()[0]), min(model.exposed.presence.boundaries()[0])),
right = max(max(model.concentration_model.infected.presence.boundaries()[1]), max(model.exposed.presence.boundaries()[1])))
self.ax.set_xlim(left = min(min(infected_presence.boundaries()[0]), min(exposed_presence.boundaries()[0])),
right = max(max(infected_presence.boundaries()[1]), max(exposed_presence.boundaries()[1])))
figure_legends = [mlines.Line2D([], [], color='#3530fe', markersize=15, label='Mean concentration'),
mlines.Line2D([], [], color='#0000c8', markersize=15, ls="dotted", label='Cumulative dose'),
@ -649,21 +658,10 @@ class ModelWidgets(View):
number.observe(on_exposed_number_change, names=['value'])
return widgets.HBox([widgets.Label('Number of exposed people in the room '), number], layout=widgets.Layout(justify_content='space-between'))
def generate_presence_widget(self, min, max, node):
options = list(pd.date_range(min, max, freq="1min").strftime('%H:%M'))
start_hour = float(node[0])
end_hour = float(node[1])
start_hour_datetime = datetime.time(hour = int(start_hour), minute=int(start_hour%1*60))
end_hour_datetime = datetime.time(hour = int(end_hour), minute=int(end_hour%1*60))
return widgets.SelectionRangeSlider(
options=options,
index=(options.index(str(start_hour_datetime)[:-3]), options.index(str(end_hour_datetime)[:-3])),
)
def _build_exposed_presence(self, node):
presence_start = self.generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0])
presence_finish = self.generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1])
presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0])
presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1])
def on_presence_start_change(change):
new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']])
@ -719,8 +717,8 @@ class ModelWidgets(View):
return widgets.HBox([widgets.Label("Viral load (copies/ml)"), viral_load_in_sputum], layout=widgets.Layout(justify_content='space-between'))
def _build_infected_presence(self, node, ventilation_node):
presence_start = self.generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0])
presence_finish = self.generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1])
presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0])
presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1])
def on_presence_start_change(change):
new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']])
@ -1119,6 +1117,18 @@ def models_start_end(models: typing.Sequence[models.ExposureModel]) -> typing.Tu
Returns the earliest start and latest end time of a collection of ConcentrationModel objects
"""
infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models)
infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models)
infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models) # type: ignore
infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models) # type: ignore
return infected_start, infected_finish
def generate_presence_widget(min, max, node):
options = list(pd.date_range(min, max, freq="1min").strftime('%H:%M'))
start_hour = float(node[0])
end_hour = float(node[1])
start_hour_datetime = datetime.time(hour = int(start_hour), minute=int(start_hour%1*60))
end_hour_datetime = datetime.time(hour = int(end_hour), minute=int(end_hour%1*60))
return widgets.SelectionRangeSlider(
options=options,
index=(options.index(str(start_hour_datetime)[:-3]), options.index(str(end_hour_datetime)[:-3])),
)

View file

@ -8,7 +8,7 @@ import matplotlib
import matplotlib.figure
import matplotlib.lines as mlines
import matplotlib.patches as patches
from .expert import collapsible, ipympl_canvas, WidgetGroup, CAIMIRAStateBuilder
from .expert import generate_presence_widget, collapsible, ipympl_canvas, WidgetGroup, CAIMIRAStateBuilder
baseline_model = models.CO2ConcentrationModel(
@ -368,20 +368,26 @@ class ModelWidgets(View):
return widgets.HBox([widgets.Label('Number of people in the room '), number], layout=widgets.Layout(justify_content='space-between'))
def _build_population_presence(self, node, ventilation_node):
presence_start = widgets.FloatRangeSlider(value = node.present_times[0], min = 8., max=13., step=0.1)
presence_finish = widgets.FloatRangeSlider(value = node.present_times[1], min = 13., max=18., step=0.1)
presence_start = generate_presence_widget(min='00:00', max='13:00', node=node.present_times[0])
presence_finish = generate_presence_widget(min='13:00', max='23:59', node=node.present_times[1])
def on_presence_start_change(change):
ventilation_node.active.start = change['new'][0] - ventilation_node.active.duration / 60
node.present_times = (change['new'], presence_finish.value)
new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']])
ventilation_node.active.start = new_value[0] - ventilation_node.active.duration / 60
node.present_times = (new_value, node.present_times[1])
def on_presence_finish_change(change):
node.present_times = (presence_start.value, change['new'])
new_value = tuple([int(time[:-3])+float(time[3:])/60 for time in change['new']])
node.present_times = (node.present_times[0], new_value)
presence_start.observe(on_presence_start_change, names=['value'])
presence_finish.observe(on_presence_finish_change, names=['value'])
return widgets.HBox([widgets.Label('Population presence'), presence_start, presence_finish], layout = widgets.Layout(justify_content='space-between'))
return widgets.VBox([
widgets.Label('Exposed presence:'),
widgets.HBox([widgets.Label('Morning:', layout=widgets.Layout(width='15%')), presence_start]),
widgets.HBox([widgets.Label('Afternoon:', layout=widgets.Layout(width='15%')), presence_finish])
])
def present(self):
return self.widget

View file

@ -186,6 +186,23 @@ class PiecewiseConstant:
)
@dataclass(frozen=True)
class IntPiecewiseContant(PiecewiseConstant):
#: values of the function between transitions
values: typing.Tuple[int, ...]
def value(self, time) -> _VectorisedFloat:
if time <= self.transition_times[0] or time > self.transition_times[-1]:
return 0
for t1, t2, value in zip(self.transition_times[:-1],
self.transition_times[1:], self.values):
if t1 < time <= t2:
break
return value
@dataclass(frozen=True)
class Room:
#: The total volume of the room
@ -779,7 +796,7 @@ class Population:
"""
#: How many in the population.
number: int
number: typing.Union[int, IntPiecewiseContant]
#: The times in which the people are in the room.
presence: Interval
@ -795,9 +812,46 @@ class Population:
# which might render inactive some RNA copies (virions).
host_immunity: float
def person_present(self, time):
return self.presence.triggered(time)
def __post_init__(self):
if isinstance(self.number, int):
if not isinstance(self.presence, Interval):
raise TypeError(f'The presence argument must be an "Interval". Got {type(self.presence)}')
else:
if self.presence is not None:
raise TypeError(f'The presence argument must be None for a IntPiecewiseConstant number.')
def _last_presence_state(self, time: float):
"""
Find the most recent/previous presence state change.
"""
times = set(self.number.transition_times) # type: ignore
t_index: int = np.searchsorted(sorted(times), time) # type: ignore
t_index = max([t_index - 1, 0])
return sorted(times)[t_index]
def person_present(self, time: float):
# Allow back-compatibility
if isinstance(self.number, int) and isinstance(self.presence, Interval):
return self.presence.triggered(time)
elif isinstance(self.number, IntPiecewiseContant):
return self.number.value(time) != 0
def people_present(self, time: typing.Optional[float] = None):
# Allow back-compatibility
if isinstance(self.number, int) and isinstance(self.presence, Interval):
return self.number
elif isinstance(self.number, IntPiecewiseContant):
# For a given scenario, time falls within a break,
# we should get the previous state occupancy profile.
if time and not self.person_present(time):
if time <= self.number.transition_times[0]:
number = self.number.values[0]
else:
number = int(self.number.value(self._last_presence_state(time)))
else:
number = int(self.number.value(time))
return number
@dataclass(frozen=True)
class _PopulationWithVirus(Population):
@ -818,7 +872,7 @@ class _PopulationWithVirus(Population):
"""
raise NotImplementedError("Subclass must implement")
def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat:
def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat:
"""
The emission rate of virions in the expired air per mL of respiratory fluid,
if the infected population is present, in (virion.cm^3)/(mL.h).
@ -828,12 +882,12 @@ class _PopulationWithVirus(Population):
raise NotImplementedError("Subclass must implement")
@method_cache
def emission_rate_when_present(self) -> _VectorisedFloat:
def emission_rate_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat:
"""
The emission rate if the infected population is present
(in virions / h).
"""
return (self.emission_rate_per_aerosol_when_present() *
return (self.emission_rate_per_aerosol_when_present(time) *
self.aerosols())
def emission_rate(self, time) -> _VectorisedFloat:
@ -851,7 +905,7 @@ class _PopulationWithVirus(Population):
# with a declaration of state change time, as is the case for things
# like Ventilation.
return self.emission_rate_when_present()
return self.emission_rate_when_present(time)
@property
def particle(self) -> Particle:
@ -875,14 +929,14 @@ class EmittingPopulation(_PopulationWithVirus):
return 1.
@method_cache
def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat:
def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat:
"""
The emission rate of virions in the expired air per mL of respiratory fluid,
if the infected population is present, in (virion.cm^3)/(mL.h).
This method includes only the diameter-independent variables within the emission rate.
It should not be a function of time.
"""
return self.known_individual_emission_rate * self.number
return self.known_individual_emission_rate * self.people_present(time)
@dataclass(frozen=True)
@ -904,7 +958,7 @@ class InfectedPopulation(_PopulationWithVirus):
return self.expiration.aerosols(self.mask)
@method_cache
def emission_rate_per_aerosol_when_present(self) -> _VectorisedFloat:
def emission_rate_per_aerosol_when_present(self, time: typing.Optional[float] = None) -> _VectorisedFloat:
"""
The emission rate of virions in the expired air per mL of respiratory fluid,
if the infected population is present, in (virion.cm^3)/(mL.h).
@ -917,8 +971,11 @@ class InfectedPopulation(_PopulationWithVirus):
ER = (self.virus.viral_load_in_sputum *
self.activity.exhalation_rate *
10 ** 6)
return ER * self.number
# if self.people_present(time) == 0:
# return ER
# else:
return ER * self.people_present(time)
@property
def particle(self) -> Particle:
@ -986,7 +1043,7 @@ class _ConcentrationModelBase:
"""
return 0.
def normalization_factor(self) -> _VectorisedFloat:
def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat:
"""
Normalization factor (in the same unit as the concentration).
This factor is applied to the normalized concentration only
@ -1006,12 +1063,12 @@ class _ConcentrationModelBase:
dependence has been solved for.
"""
if not self.population.person_present(time):
return self.min_background_concentration()/self.normalization_factor()
return self.min_background_concentration()/self.normalization_factor(time)
V = self.room.volume
RR = self.removal_rate(time)
return (1. / (RR * V) + self.min_background_concentration()/
self.normalization_factor())
self.normalization_factor(time))
@method_cache
def state_change_times(self) -> typing.List[float]:
@ -1020,7 +1077,10 @@ class _ConcentrationModelBase:
the times at which their state changes.
"""
state_change_times = {0.}
state_change_times.update(self.population.presence.transition_times())
if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval):
state_change_times.update(self.population.presence.transition_times())
elif isinstance(self.population.number, IntPiecewiseContant):
state_change_times.update(self.population.number.interval().transition_times())
state_change_times.update(self.ventilation.transition_times(self.room))
return sorted(state_change_times)
@ -1029,7 +1089,11 @@ class _ConcentrationModelBase:
"""
First presence time. Before that, the concentration is zero.
"""
return self.population.presence.boundaries()[0][0]
if isinstance(self.population.number, int) and isinstance(self.population.presence, Interval):
first_presence = self.population.presence.boundaries()[0][0]
elif isinstance(self.population.number, IntPiecewiseContant):
first_presence = self.population.number.interval().boundaries()[0][0]
return first_presence
def last_state_change(self, time: float) -> float:
"""
@ -1082,7 +1146,11 @@ class _ConcentrationModelBase:
# The model always starts at t=0, but we avoid running concentration calculations
# before the first presence as an optimisation.
if time <= self._first_presence_time():
return self.min_background_concentration()/self.normalization_factor()
try:
return self.min_background_concentration()/self.normalization_factor(time)
except ZeroDivisionError:
return 0.
next_state_change_time = self._next_state_change(time)
RR = self.removal_rate(next_state_change_time)
# If RR is 0, conc_limit does not play a role but its computation
@ -1108,7 +1176,7 @@ class _ConcentrationModelBase:
to this method.
"""
return (self._normed_concentration_cached(time) *
self.normalization_factor())
self.normalization_factor(time))
@method_cache
def normed_integrated_concentration(self, start: float, stop: float) -> _VectorisedFloat:
@ -1145,7 +1213,7 @@ class _ConcentrationModelBase:
Get the integrated concentration of viruses in the air between the times start and stop.
"""
return (self.normed_integrated_concentration(start, stop) *
self.normalization_factor())
self.normalization_factor(start))
@dataclass(frozen=True)
@ -1170,9 +1238,9 @@ class ConcentrationModel(_ConcentrationModelBase):
def virus(self) -> Virus:
return self.infected.virus
def normalization_factor(self) -> _VectorisedFloat:
def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat:
# we normalize by the emission rate
return self.infected.emission_rate_when_present()
return self.infected.emission_rate_when_present(time)
def removal_rate(self, time: float) -> _VectorisedFloat:
# Equilibrium velocity of particle motion toward the floor
@ -1220,10 +1288,10 @@ class CO2ConcentrationModel(_ConcentrationModelBase):
"""
return self.CO2_atmosphere_concentration
def normalization_factor(self) -> _VectorisedFloat:
def normalization_factor(self, time: typing.Optional[float] = None) -> _VectorisedFloat:
# normalization by the CO2 exhaled.
# CO2 concentration given in ppm, hence the 1e6 factor.
return (1e6*self.population.number
return (1e6*self.population.people_present(time)
*self.population.activity.exhalation_rate
*self.CO2_fraction_exhaled)
@ -1469,19 +1537,21 @@ class ExposureModel:
by the emission rate of the infected population
"""
exposure = 0.
for start, stop in self.exposed.presence.boundaries():
if stop < time1:
continue
elif start > time2:
break
elif start <= time1 and time2<= stop:
exposure += self.concentration_model.normed_integrated_concentration(time1, time2)
elif start <= time1 and stop < time2:
exposure += self.concentration_model.normed_integrated_concentration(time1, stop)
elif time1 < start and time2 <= stop:
exposure += self.concentration_model.normed_integrated_concentration(start, time2)
elif time1 <= start and stop < time2:
exposure += self.concentration_model.normed_integrated_concentration(start, stop)
if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval):
# TODO: Implement the case for the IntPiecewiseConstant
for start, stop in self.exposed.presence.boundaries():
if stop < time1:
continue
elif start > time2:
break
elif start <= time1 and time2<= stop:
exposure += self.concentration_model.normed_integrated_concentration(time1, time2)
elif start <= time1 and stop < time2:
exposure += self.concentration_model.normed_integrated_concentration(time1, stop)
elif time1 < start and time2 <= stop:
exposure += self.concentration_model.normed_integrated_concentration(start, time2)
elif time1 <= start and stop < time2:
exposure += self.concentration_model.normed_integrated_concentration(start, stop)
return exposure
def concentration(self, time: float) -> _VectorisedFloat:
@ -1589,9 +1659,10 @@ class ExposureModel:
The number of virus per m^3 deposited on the respiratory tract.
"""
deposited_exposure: _VectorisedFloat = 0.0
for start, stop in self.exposed.presence.boundaries():
deposited_exposure += self.deposited_exposure_between_bounds(start, stop)
if isinstance(self.exposed.number, int) and isinstance(self.exposed.presence, Interval):
# TODO: Implement the case for the IntPiecewiseConstant
for start, stop in self.exposed.presence.boundaries():
deposited_exposure += self.deposited_exposure_between_bounds(start, stop)
return deposited_exposure * self.repeats
@ -1610,7 +1681,9 @@ class ExposureModel:
if (self.geographical_data.geographic_population != 0 and self.geographical_data.geographic_cases != 0):
sum_probability = 0.0
# Create an equivalent exposure model but changing the number of infected cases.
total_people = self.concentration_model.infected.number + self.exposed.number
if isinstance(self.concentration_model.infected.number, int) and isinstance(self.exposed.number, int):
# TODO: Implement the case for the IntPiecewiseConstant
total_people = self.concentration_model.infected.number + self.exposed.number
max_num_infected = (total_people if total_people < 10 else 10)
# The influence of a higher number of simultainious infected people (> 4 - 5) yields an almost negligible contirbution to the total probability.
# To be on the safe side, a hard coded limit with a safety margin of 2x was set.

View file

@ -74,6 +74,14 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model
elif new_field.type == typing.Tuple[caimira.models.SpecificInterval, ...]:
SI = getattr(sys.modules[__name__], "SpecificInterval")
field_type = typing.Tuple[typing.Union[caimira.models.SpecificInterval, SI], ...]
elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseContant]:
IPC = getattr(sys.modules[__name__], "IntPiecewiseContant")
field_type = typing.Union[caimira.models.IntPiecewiseContant, IPC]
elif new_field.type == typing.Union[caimira.models.Interval, None]:
I = getattr(sys.modules[__name__], "Interval")
field_type = typing.Union[caimira.models.Interval, I]
else:
# Check that we don't need to do anything with this type.
for item in new_field.type.__args__:

View file

@ -4,6 +4,7 @@ import numpy as np
import numpy.testing as npt
import pytest
from dataclasses import dataclass
import typing
from caimira import models
@ -32,7 +33,7 @@ class KnownConcentrationModelBase(models._ConcentrationModelBase):
def min_background_concentration(self) -> float:
return self.known_min_background_concentration
def normalization_factor(self) -> float:
def normalization_factor(self, time: typing.Optional[float] = None) -> float:
return self.known_normalization_factor