Merge branch 'feature/expert_app_initial_changes' into 'master'

Initial changes so that the expert app possess most of the the calculator features

See merge request cara/cara!348
This commit is contained in:
Andre Henriques 2022-05-31 10:23:36 +02:00
commit ec4c5732b2
3 changed files with 483 additions and 133 deletions

View file

@ -4,13 +4,14 @@ import uuid
import ipympl.backend_nbagg
import ipywidgets as widgets
import numpy as np
import matplotlib
import matplotlib.figure
import matplotlib.lines as mlines
import matplotlib.patches as patches
from matplotlib import pyplot as plt
import numpy as np
from cara import models
from cara import state
from cara import data
from cara import data, models, state
def collapsible(widgets_to_collapse: typing.List, title: str, start_collapsed=False):
@ -60,7 +61,7 @@ class WidgetGroup:
return widgets.VBox(
[
widgets.HBox(
[labels_w, widgets_w],
[labels_w, widgets_w], layout=widgets.Layout(justify_content='space-between')
),
],
)
@ -111,8 +112,10 @@ class ExposureModelResult(View):
self.figure = matplotlib.figure.Figure(figsize=(9, 6))
ipympl_canvas(self.figure)
self.html_output = widgets.HTML()
self.ax = self.figure.add_subplot(1, 1, 1)
self.line = None
self.ax, self.ax2 = self.initialize_axes()
self.concentration_line = None
self.concentration_area = None
self.cumulative_line = None
@property
def widget(self):
@ -121,34 +124,75 @@ class ExposureModelResult(View):
self.figure.canvas,
])
def update(self, model: models.ExposureModel):
self.update_plot(model.concentration_model)
self.update_textual_result(model)
def update_plot(self, model: models.ConcentrationModel):
resolution = 600
ts = np.linspace(sorted(model.infected.presence.transition_times())[0],
sorted(model.infected.presence.transition_times())[-1], resolution)
concentration = [model.concentration(t) for t in ts]
if self.line is None:
[self.line] = self.ax.plot(ts, concentration)
ax = self.ax
# ax.text(0.5, 0.9, 'Without masks & window open', transform=ax.transAxes, ha='center')
def initialize_axes(self) -> matplotlib.figure.Axes:
ax = self.figure.add_subplot(1, 1, 1)
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.set_xlabel('Time (hours)')
ax.set_ylabel('Concentration ($virions/m^{3}$)')
ax.set_title('Concentration of virions')
ax.set_ylabel('Mean concentration ($virions/m^{3}$)')
ax.set_title('Concentration of virions \nand Cumulative dose')
ax2 = ax.twinx()
ax2.spines['left'].set_visible(False)
ax2.spines['top'].set_visible(False)
ax2.set_ylabel('Mean cumulative dose (infectious virus)')
ax2.spines['right'].set_linestyle((0,(1,4)))
return ax, ax2
def update(self, model: models.ExposureModel):
self.update_plot(model)
self.update_textual_result(model)
def update_plot(self, model: models.ExposureModel):
resolution = 600
ts = np.linspace(sorted(model.concentration_model.infected.presence.transition_times())[0],
sorted(model.concentration_model.infected.presence.transition_times())[-1], resolution)
concentration = [model.concentration(t) for t in ts]
cumulative_doses = np.cumsum([
np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean()
for time1, time2 in zip(ts[:-1], ts[1:])
])
if self.concentration_line is None:
[self.concentration_line] = self.ax.plot(ts, concentration, color='#3530fe')
else:
self.ax.ignore_existing_data_limits = True
self.line.set_data(ts, concentration)
# Update the top limit based on the concentration if it exceeds 5
# (rare but possible).
top = max([3, max(concentration)])
self.ax.set_ylim(bottom=0., top=top)
self.ax.ignore_existing_data_limits = False
self.concentration_line.set_data(ts, concentration)
if self.concentration_area is None:
self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff",
where = ((model.exposed.presence.boundaries()[0][0] < ts) & (ts < model.exposed.presence.boundaries()[0][1]) |
(model.exposed.presence.boundaries()[1][0] < ts) & (ts < model.exposed.presence.boundaries()[1][1])))
else:
self.concentration_area.remove()
self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff",
where = ((model.exposed.presence.boundaries()[0][0] < ts) & (ts < model.exposed.presence.boundaries()[0][1]) |
(model.exposed.presence.boundaries()[1][0] < ts) & (ts < model.exposed.presence.boundaries()[1][1])))
if self.cumulative_line is None:
[self.cumulative_line] = self.ax2.plot(ts[:-1], cumulative_doses, color='#0000c8', linestyle='dotted')
else:
self.ax2.ignore_existing_data_limits = False
self.cumulative_line.set_data(ts[:-1], cumulative_doses)
concentration_top = max(concentration)
self.ax.set_ylim(bottom=0., top=concentration_top)
cumulative_top = max(cumulative_doses)
self.ax2.set_ylim(bottom=0., top=cumulative_top)
self.ax.set_xlim(left = min(min(model.concentration_model.infected.presence.boundaries()[0]), min(model.exposed.presence.boundaries()[0])),
right = max(max(model.concentration_model.infected.presence.boundaries()[1]), max(model.exposed.presence.boundaries()[1])))
figure_legends = [mlines.Line2D([], [], color='#3530fe', markersize=15, label='Mean concentration'),
mlines.Line2D([], [], color='#0000c8', markersize=15, ls="dotted", label='Cumulative dose'),
patches.Patch(edgecolor="#96cbff", facecolor='#96cbff', label='Presence of exposed person(s)')]
self.ax.legend(handles=figure_legends)
self.figure.canvas.draw()
def update_textual_result(self, model: models.ExposureModel):
@ -172,7 +216,8 @@ class ExposureComparissonResult(View):
def __init__(self):
self.figure = matplotlib.figure.Figure(figsize=(9, 6))
ipympl_canvas(self.figure)
self.ax = self.initialize_axes()
self.html_output = widgets.HTML()
self.ax, self.ax2 = self.initialize_axes()
@property
def widget(self):
@ -184,28 +229,49 @@ class ExposureComparissonResult(View):
ax = self.figure.add_subplot(1, 1, 1)
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.set_xlabel('Time (hours)')
ax.set_ylabel('Concentration ($virions/m^{3}$)')
ax.set_title('Concentration of virions')
return ax
ax.set_ylabel('Mean concentration ($virions/m^{3}$)')
ax.set_title('Concentration of virions \nand Cumulative dose')
ax2 = ax.twinx()
ax2.spines['left'].set_visible(False)
ax2.spines['top'].set_visible(False)
ax2.spines['right'].set_linestyle((0,(1,4)))
ax2.set_ylabel('Mean cumulative dose (infectious virus)')
return ax, ax2
def scenarios_updated(self, scenarios: typing.Sequence[ScenarioType], _):
updated_labels, updated_models = zip(*scenarios)
conc_models = tuple(
model.concentration_model.dcs_instance() for model in updated_models
exp_models = tuple(
model.dcs_instance() for model in updated_models
)
self.update_plot(conc_models, updated_labels)
self.update_plot(exp_models, updated_labels)
def update_plot(self, conc_models: typing.Tuple[models.ConcentrationModel, ...], labels: typing.Tuple[str, ...]):
def update_plot(self, exp_models: typing.Tuple[models.ExposureModel, ...], labels: typing.Tuple[str, ...]):
self.ax.lines.clear()
start, finish = models_start_end(conc_models)
self.ax2.lines.clear()
start, finish = models_start_end(exp_models)
colors=['blue', 'red', 'orange', 'yellow', 'pink', 'purple', 'green', 'brown', 'black' ]
ts = np.linspace(start, finish, num=250)
concentrations = [[conc_model.concentration(t) for t in ts] for conc_model in conc_models]
for label, concentration in zip(labels, concentrations):
self.ax.plot(ts, concentration, label=label)
concentrations = [[conc_model.concentration_model.concentration(t) for t in ts] for conc_model in exp_models]
for label, concentration, color in zip(labels, concentrations, colors):
self.ax.plot(ts, concentration, label=label, color=color)
top = max(3., max([max(conc) for conc in concentrations]))
self.ax.set_ylim(bottom=0., top=top)
cumulative_doses = [np.cumsum([
np.array(conc_model.deposited_exposure_between_bounds(float(time1), float(time2))).mean()
for time1, time2 in zip(ts[:-1], ts[1:])
]) for conc_model in exp_models]
for label, cumulative_dose, color in zip(labels, cumulative_doses, colors):
self.ax2.plot(ts[:-1], cumulative_dose, label=label, color=color, linestyle="dotted")
concentration_top = max([max(concentration) for concentration in concentrations])
self.ax.set_ylim(bottom=0., top=concentration_top)
cumulative_top = max([max(cumulative_dose) for cumulative_dose in cumulative_doses])
self.ax2.set_ylim(bottom=0., top=cumulative_top)
self.ax.legend()
self.figure.canvas.draw()
@ -224,60 +290,126 @@ class ModelWidgets(View):
def _build_widget(self, node):
self.widget.children += (self._build_room(node.concentration_model.room),)
self.widget.children += (self._build_ventilation(node.concentration_model.ventilation),)
self.widget.children += (self._build_infected(node.concentration_model.infected),)
self.widget.children += (self._build_infected(node.concentration_model.infected, node.concentration_model.ventilation),)
self.widget.children += (self._build_exposed(node),)
self.widget.children += (self._build_infectivity(node.concentration_model.infected),)
def _build_exposed(self, node):
return collapsible([widgets.HBox([
return collapsible([widgets.VBox([
self._build_exposed_number(node.exposed),
self._build_mask(node.exposed.mask),
self._build_activity(node.exposed.activity),
self._build_exposed_presence(node.exposed.presence)
])], title="Exposed")
def _build_infected(self, node):
return collapsible([widgets.HBox([
def _build_infected(self, node, ventilation_node):
return collapsible([widgets.VBox([
self._build_infected_number(node),
self._build_mask(node.mask),
self._build_activity(node.activity),
self._build_expiration(node.expiration),
self._build_viral_load(node.virus),
self._build_infected_presence(node.presence, ventilation_node.active),
self._build_infectivity(node)
])], title="Infected")
def _build_room(self, node):
room_volume = widgets.IntSlider(value=node.volume, min=10, max=150)
inside_temp = widgets.IntSlider(value=node.inside_temp.values[0]-273.15, min=15., max=25.)
def _build_room_volume(self, node):
room_volume = widgets.IntText(value=node.volume, min=10, max=500, step=5)
def on_volume_change(change):
node.volume = change['new']
# TODO: Link the state back to the widget, not just the other way around.
room_volume.observe(on_volume_change, names=['value'])
return widgets.HBox([widgets.Label('Room volume (m³)'), room_volume], layout=widgets.Layout(justify_content='space-between'))
def _build_room_area(self, node):
room_surface = widgets.IntText(value=25, min=1, max=200, step=10)
room_ceiling_height = widgets.IntText(value=3, min=1, max=20, step=1)
displayed_volume=widgets.Label('75')
def on_room_surface_change(change):
node.volume = change['new']*room_ceiling_height.value
displayed_volume.value=str(node.volume)
def on_room_ceiling_height_change(change):
node.volume = change['new']*room_surface.value
displayed_volume.value=str(node.volume)
room_surface.observe(on_room_surface_change, names=['value'])
room_ceiling_height.observe(on_room_ceiling_height_change, names=['value'])
return widgets.VBox([widgets.HBox([widgets.Label('Room surface area (m²) '), room_surface],
layout=widgets.Layout(justify_content='space-between', width='100%')),
widgets.HBox([widgets.Label('Room ceiling height (m)'), room_ceiling_height],
layout=widgets.Layout(justify_content='space-between', width='100%')),
widgets.HBox([widgets.Label('Total volume :'), displayed_volume, widgets.Label('')])])
def _build_room(self,node):
room_number = widgets.Text(value='', placeholder='653/R-004', disabled=False) #not linked to volume yet
room_widgets={
'Volume': self._build_room_volume(node),
'Room area and height': self._build_room_area(node)
}
for name, widget in room_widgets.items():
widget.layout.visible = False
widget.layout.display = 'none'
room_w = widgets.RadioButtons(
options= list(zip(['Volume', 'Room area and height'], room_widgets.keys())),
button_style='info',
layout=widgets.Layout(height='auto', width='auto'),
)
def toggle_room(value):
for name, widget in room_widgets.items():
widget.layout.visible = False
widget.layout.display = 'none'
widget = room_widgets[value]
widget.layout.visible = True
widget.layout.display = 'flex'
room_w.observe(lambda event: toggle_room(event['new']), 'value')
toggle_room(room_w.value)
humidity = widgets.FloatSlider(value = node.humidity*100, min=20, max=80, step=5)
inside_temp = widgets.IntSlider(value=node.inside_temp.values[0]-273.15, min=15., max=25.)
def on_humidity_change(change):
node.humidity = change['new']/100
def on_insidetemp_change(change):
node.inside_temp.values = (change['new']+273.15,)
# TODO: Link the state back to the widget, not just the other way around.
room_volume.observe(on_volume_change, names=['value'])
humidity.observe(on_humidity_change, names=['value'])
inside_temp.observe(on_insidetemp_change, names=['value'])
def on_state_change():
room_volume.value = node.volume
inside_temp.value = node.inside_temp
node.dcs_observe(on_state_change)
widget = collapsible(
[widget_group(
[[widgets.Label('Room volume (m³)'), room_volume],
[widgets.Label('Inside temperature (℃)'), inside_temp],
]
)],
title='Specification of workplace',
[ widgets.VBox([
widgets.HBox([
widgets.Label('Room number'), room_number],
layout=widgets.Layout(width='100%', justify_content='space-between')),
room_w, widgets.VBox(list(room_widgets.values())),
widgets.HBox([widgets.Label('Inside temperature (℃)'), inside_temp],
layout=widgets.Layout(width='100%', justify_content='space-between')),
widgets.HBox([widgets.Label('Indoor relative humidity (%)'), humidity],
layout=widgets.Layout(width='100%', justify_content='space-between')),
])]
, title="Specification of workspace"
)
return widget
def _build_outsidetemp(self, node) -> WidgetGroup:
outside_temp = widgets.IntSlider(value=10, min=-10, max=30)
def outsidetemp_change(change):
def on_outsidetemp_change(change):
node.values = (change['new'] + 273.15, )
outside_temp.observe(outsidetemp_change, names=['value'])
outside_temp.observe(on_outsidetemp_change, names=['value'])
auto_width = widgets.Layout(width='auto')
return WidgetGroup(
(
@ -288,9 +420,58 @@ class ModelWidgets(View):
),
)
def _build_hinged_window(self, node):
hinged_window = widgets.FloatSlider(value=node.window_width, min=0.1, max=2, step=0.1)
def on_hinged_window_change(change):
node.window_width = change['new']
# TODO: Link the state back to the widget, not just the other way around.
hinged_window.observe(on_hinged_window_change, names=['value'])
return widgets.HBox([widgets.Label('Window width (meters) '), hinged_window], layout=widgets.Layout(justify_content='space-between', width='100%'))
def _build_sliding_window(self, node):
return widgets.HBox([])
def _build_window(self, node) -> WidgetGroup:
window_widgets = {
'Natural': self._build_sliding_window(node._states['Natural']),
'Hinged window': self._build_hinged_window(node._states['Hinged window']),
}
for name, widget in window_widgets.items():
widget.layout.visible = False
widget.layout.display = 'none'
window_w = widgets.RadioButtons(
options= list(zip(['Sliding window', 'Hinged window'], window_widgets.keys())),
button_style='info',
layout=widgets.Layout(height='auto', width='auto'),
)
def toggle_window(value):
for name, widget in window_widgets.items():
widget.layout.visible = False
widget.layout.display = 'none'
node.dcs_select(value)
widget = window_widgets[value]
widget.layout.visible = True
widget.layout.display = 'flex'
window_w.observe(lambda event: toggle_window(event['new']), 'value')
toggle_window(window_w.value)
number_of_windows= widgets.IntText(value= 1, min= 0, max= 5, step=1)
period = widgets.IntSlider(value=node.active.period, min=0, max=240)
interval = widgets.IntSlider(value=node.active.duration, min=0, max=240)
opening_length = widgets.FloatSlider(value=node.opening_length, min=0, max=3, step=0.1)
window_height = widgets.FloatSlider(value=node.window_height, min=0, max=3, step=0.1)
def on_value_change(change):
node.number_of_windows = change['new']
def on_period_change(change):
node.active.period = change['new']
@ -298,16 +479,25 @@ class ModelWidgets(View):
def on_interval_change(change):
node.active.duration = change['new']
def on_opening_length_change(change):
node.opening_length = change['new']
def on_window_height_change(change):
node.window_height = change['new']
# TODO: Link the state back to the widget, not just the other way around.
number_of_windows.observe(on_value_change, names=['value'])
period.observe(on_period_change, names=['value'])
interval.observe(on_interval_change, names=['value'])
opening_length.observe(on_opening_length_change, names=['value'])
window_height.observe(on_window_height_change, names=['value'])
outsidetemp_widgets = {
'Fixed': self._build_outsidetemp(node.outside_temp),
'Daily variation': self._build_month(node),
'Meteorological data': self._build_month(node),
}
outsidetemp_w = widgets.ToggleButtons(
outsidetemp_w = widgets.Dropdown(
options=outsidetemp_widgets.keys(),
)
@ -321,9 +511,21 @@ class ModelWidgets(View):
outsidetemp_w.observe(lambda event: toggle_outsidetemp(event['new']), 'value')
toggle_outsidetemp(outsidetemp_w.value)
auto_width = widgets.Layout(width='auto')
auto_width = widgets.Layout(width='auto', justify_content='space-between')
result = WidgetGroup(
(
(
widgets.Label('Number of windows ', layout=auto_width),
number_of_windows,
),
(
widgets.Label('Opening distance (meters)', layout=auto_width),
opening_length,
),
(
widgets.Label('Window height (meters)', layout=auto_width),
window_height,
),
(
widgets.Label('Interval between openings (minutes)', layout=auto_width),
period,
@ -340,24 +542,58 @@ class ModelWidgets(View):
)
for sub_group in outsidetemp_widgets.values():
result.add_pairs(sub_group.pairs())
return result
return widgets.VBox([window_w, widgets.HBox(list(window_widgets.values())), result.build()])
def _build_mechanical(self, node):
q_air_mech = widgets.IntSlider(value=node.q_air_mech, min=0, max=1000, step=5)
def _build_q_air_mech(self, node):
q_air_mech = widgets.FloatSlider(value=node.q_air_mech, min=0, max=5000, step=25)
def q_air_mech_change(change):
def on_q_air_mech_change(change):
node.q_air_mech = change['new']
# TODO: Link the state back to the widget, not just the other way around.
q_air_mech.observe(q_air_mech_change, names=['value'])
q_air_mech.observe(on_q_air_mech_change, names=['value'])
auto_width = widgets.Layout(width='auto')
return widgets.VBox([widget_group([
[
widgets.Label('Flow rate (m³/h)', layout=auto_width),
q_air_mech,
],
])])
return widgets.HBox([q_air_mech, widgets.Label('m³/h')])
def _build_ach(self, node):
air_exch = widgets.IntSlider(value=node.air_exch, min=0, max=20, step=1)
def on_air_exch_change(change):
node.air_exch = change['new']
# TODO: Link the state back to the widget, not just the other way around.
air_exch.observe(on_air_exch_change, names=['value'])
return widgets.HBox([air_exch, widgets.Label('h⁻¹')])
def _build_mechanical(self, node):
mechanical_widgets = {
'HVACMechanical': self._build_q_air_mech(node._states['HVACMechanical']),
'AirChange': self._build_ach(node._states['AirChange']),
}
for name, widget in mechanical_widgets.items():
widget.layout.visible = False
mechanival_w = widgets.RadioButtons(
options=list(zip(['Air supply flow rate (m³/h)', 'Air changes per hour (h⁻¹)'], mechanical_widgets.keys())),
button_style='info',
)
def toggle_mechanical(value):
for name, widget in mechanical_widgets.items():
widget.layout.visible = False
widget.layout.display = 'none'
node.dcs_select(value)
widget = mechanical_widgets[value]
widget.layout.visible = True
widget.layout.display = 'flex'
mechanival_w.observe(lambda event: toggle_mechanical(event['new']), 'value')
toggle_mechanical(mechanival_w.value)
return widgets.VBox([mechanival_w, widgets.HBox(list(mechanical_widgets.values()))])
def _build_month(self, node) -> WidgetGroup:
@ -378,47 +614,106 @@ class ModelWidgets(View):
for name, activity_ in models.Activity.types.items():
if activity == activity_:
break
activity = widgets.Select(options=list(models.Activity.types.keys()), value=name)
activity = widgets.Dropdown(options=list(models.Activity.types.keys()), value=name)
def on_activity_change(change):
act = models.Activity.types[change['new']]
node.dcs_update_from(act)
activity.observe(on_activity_change, names=['value'])
return widget_group(
[[widgets.Label("Activity"), activity]]
)
return widgets.HBox([widgets.Label("Activity"), activity], layout=widgets.Layout(justify_content='space-between'))
def _build_mask(self, node):
mask = node.dcs_instance()
for name, mask_ in models.Mask.types.items():
if mask == mask_:
break
mask_choice = widgets.Select(options=list(models.Mask.types.keys()), value=name)
mask_choice = widgets.Dropdown(options=list(models.Mask.types.keys()), value=name)
def on_mask_change(change):
node.dcs_select(change['new'])
mask_choice.observe(on_mask_change, names=['value'])
return widget_group(
[[widgets.Label("Mask"), mask_choice]]
)
return widgets.HBox([widgets.Label("Mask"), mask_choice], layout=widgets.Layout(justify_content='space-between'))
def _build_exposed_number(self, node):
number = widgets.IntSlider(value=node.number, min=1, max=200, step=1)
def on_exposed_number_change(change):
node.number = change['new']
# TODO: Link the state back to the widget, not just the other way around.
number.observe(on_exposed_number_change, names=['value'])
return widgets.HBox([widgets.Label('Number of exposed people in the room '), number], layout=widgets.Layout(justify_content='space-between'))
def _build_exposed_presence(self, node):
presence_start = widgets.FloatRangeSlider(value = node.present_times[0], min = 8., max=13., step=0.1)
presence_finish = widgets.FloatRangeSlider(value = node.present_times[1], min = 13., max=18., step=0.1)
def on_presence_start_change(change):
node.present_times = (change['new'], presence_finish.value)
def on_presence_finish_change(change):
node.present_times = (presence_start.value, change['new'])
presence_start.observe(on_presence_start_change, names=['value'])
presence_finish.observe(on_presence_finish_change, names=['value'])
return widgets.HBox([widgets.Label('Exposed presence'), presence_start, presence_finish], layout = widgets.Layout(justify_content='space-between'))
def _build_infected_number(self, node):
number = widgets.IntSlider(value=node.number, min=1, max=200, step=1)
def on_infected_number_change(change):
node.number = change['new']
# TODO: Link the state back to the widget, not just the other way around.
number.observe(on_infected_number_change, names=['value'])
return widgets.HBox([widgets.Label('Number of infected people in the room '), number], layout=widgets.Layout(justify_content='space-between'))
def _build_expiration(self, node):
expiration = node.dcs_instance()
for name, expiration_ in models.Expiration.types.items():
if expiration == expiration_:
break
expiration_choice = widgets.Select(options=list(models.Expiration.types.keys()), value=name)
expiration_choice = widgets.Dropdown(options=list(models.Expiration.types.keys()), value=name)
def on_expiration_change(change):
expiration = models.Expiration.types[change['new']]
node.dcs_update_from(expiration)
expiration_choice.observe(on_expiration_change, names=['value'])
return widget_group(
[[widgets.Label("Expiration"), expiration_choice]]
)
return widgets.HBox([widgets.Label("Expiration"), expiration_choice], layout=widgets.Layout(justify_content='space-between'))
def _build_viral_load(self, node):
viral_load_in_sputum = widgets.Text(continuous_update=False, value=("{:.2e}".format(node.viral_load_in_sputum)))
def on_viral_load_change(change):
viral_load_in_sputum.value = "{:.2e}".format(float(change['new']))
node.viral_load_in_sputum = float(viral_load_in_sputum.value)
viral_load_in_sputum.observe(on_viral_load_change, names=['value'])
return widgets.HBox([widgets.Label("Viral load (copies/ml)"), viral_load_in_sputum], layout=widgets.Layout(justify_content='space-between'))
def _build_infected_presence(self, node, ventilation_node):
presence_start = widgets.FloatRangeSlider(value = node.present_times[0], min = 8., max=13., step=0.1)
presence_finish = widgets.FloatRangeSlider(value = node.present_times[1], min = 13., max=18., step=0.1)
def on_presence_start_change(change):
node.present_times = (change['new'], presence_finish.value)
ventilation_node.start = change['new'][0]
def on_presence_finish_change(change):
node.present_times = (presence_start.value, change['new'])
presence_start.observe(on_presence_start_change, names=['value'])
presence_finish.observe(on_presence_finish_change, names=['value'])
return widgets.HBox([widgets.Label('Infected presence'), presence_start, presence_finish], layout = widgets.Layout(justify_content='space-between'))
def _build_ventilation(
self,
@ -428,14 +723,18 @@ class ModelWidgets(View):
],
) -> widgets.Widget:
ventilation_widgets = {
'Natural': self._build_window(node._states['Natural']).build(),
'Mechanical': self._build_mechanical(node._states['Mechanical']),
'Natural': self._build_window(node),
'HVACMechanical': self._build_mechanical(node),
'HEPAFilter': self._build_HEPA(node),
}
keys=[("Natural", "Natural"), ("Mechanical", "HVACMechanical"), ("No ventilation", "No ventilation"), ("HEPA Filter", "HEPAFilter")]
for name, widget in ventilation_widgets.items():
widget.layout.visible = False
ventilation_w = widgets.ToggleButtons(
options=ventilation_widgets.keys(),
ventilation_w = widgets.Dropdown(
options=keys,
)
def toggle_ventilation(value):
@ -453,37 +752,74 @@ class ModelWidgets(View):
toggle_ventilation(ventilation_w.value)
w = collapsible(
[widget_group([[widgets.Label('Ventilation type'), ventilation_w]])]
([widgets.HBox([widgets.Label('Ventilation type'), ventilation_w], layout=widgets.Layout(justify_content='space-between'))])
+ list(ventilation_widgets.values()),
title='Ventilation scheme',
)
return w
def _build_HEPA(
self,
node,
) -> widgets.Widget:
HEPA_w = widgets.FloatSlider(value=node.q_air_mech, min=10, max=500, step=5)
def on_value_change(change):
node.q_air_mech=change['new']
HEPA_w.observe(on_value_change,names= ['value'])
return widgets.HBox([widgets.Label('HEPA Filtration (m³/h) '),HEPA_w], layout=widgets.Layout(justify_content='space-between'))
def _build_infectivity(self, node):
return collapsible([widgets.HBox([
return collapsible([widgets.VBox([
self._build_virus(node.virus),
])], title="Virus variant")
])], title="Virus data")
def _build_virus(self, node):
virus = node.dcs_instance()
for name, virus_ in models.Virus.types.items():
if virus == virus_:
if node.dcs_instance() == virus_:
break
virus_choice = widgets.Select(options=list(models.Virus.types.keys()), value=name)
virus_choice = widgets.Dropdown(options=list(models.Virus.types.keys()), value=name)
transmissibility_factor = widgets.FloatSlider(value=node.transmissibility_factor, min=0, max=1, step=0.1)
infectious_dose = widgets.FloatText(value=node.infectious_dose, placeholder='50', disabled=False)
def on_virus_change(change):
node.dcs_select(change['new'])
virus = models.Virus.types[change['new']]
node.dcs_update_from(virus)
transmissibility_factor.value = virus.transmissibility_factor
infectious_dose.value = virus.infectious_dose
def on_transmissibility_change(change):
virus = models.SARSCoV2(viral_load_in_sputum=node.dcs_instance().viral_load_in_sputum, infectious_dose=infectious_dose.value,
viable_to_RNA_ratio=0.5, transmissibility_factor=change['new'])
node.dcs_update_from(virus)
if (transmissibility_factor.value != models.Virus.types[virus_choice.value].transmissibility_factor):
virus_choice.options = list(models.Virus.types.keys()) + ["Custom"]
virus_choice.value = "Custom"
def on_infectious_dose_change(change):
virus = models.SARSCoV2(viral_load_in_sputum=node.dcs_instance().viral_load_in_sputum, infectious_dose=change['new'],
viable_to_RNA_ratio=0.5, transmissibility_factor=transmissibility_factor.value)
node.dcs_update_from(virus)
if (infectious_dose.value != models.Virus.types[virus_choice.value].infectious_dose):
virus_choice.options.append("Custom")
virus_choice.value = "Custom"
virus_choice.observe(on_virus_change, names=['value'])
transmissibility_factor.observe(on_transmissibility_change, names=['value'])
infectious_dose.observe(on_infectious_dose_change, names=['value'])
return widget_group(
[[widgets.Label("Virus"), virus_choice]]
)
space_between=widgets.Layout(justify_content='space-between')
return widgets.VBox([
widgets.HBox([widgets.Label("Virus"), virus_choice], layout=space_between),
widgets.HBox([widgets.Label("Tansmissibility factor "), transmissibility_factor], layout=space_between),
widgets.HBox([widgets.Label("Infectious dose "), infectious_dose], layout=space_between)])
def present(self):
return self.widget
baseline_model = models.ExposureModel(
concentration_model=models.ConcentrationModel(
room=models.Room(volume=75, inside_temp=models.PiecewiseConstant((0., 24.), (293.15,))),
@ -525,23 +861,40 @@ class CARAStateBuilder(state.StateBuilder):
choices=models.Mask.types,
)
def build_type_Virus(self, _: dataclasses.Field):
return state.DataclassStatePredefined(
models.Virus,
choices=models.Virus.types,
)
def build_type__VentilationBase(self, _: dataclasses.Field):
s: state.DataclassStateNamed = state.DataclassStateNamed(
states={
'Natural': self.build_generic(models.WindowOpening),
'Mechanical': self.build_generic(models.HVACMechanical),
'No ventilation': self.build_generic(models.AirChange),
'HVACMechanical': self.build_generic(models.HVACMechanical),
'AirChange': self.build_generic(models.AirChange),
'Hinged window': self.build_generic(models.WindowOpening),
'HEPAFilter': self.build_generic(models.HEPAFilter),
},
state_builder=self,
)
# Initialise the HVAC state
s._states['Mechanical'].dcs_update_from(
models.HVACMechanical(models.PeriodicInterval(period=24*60, duration=24*60), 500.)
#Initialise the "Hinged window" state
s._states['Hinged window'].dcs_update_from(
models.HingedWindow(active=models.PeriodicInterval(period=120, duration=15),
outside_temp=models.PiecewiseConstant((0,24.), (283.15,)),
window_height=1.6, opening_length=0.6,
window_width=10.
),
)
# Initialise the "HVAC" state
s._states['HVACMechanical'].dcs_update_from(
models.HVACMechanical(active=models.PeriodicInterval(period=24*60, duration=24*60), q_air_mech=500.)
)
s._states['AirChange'].dcs_update_from(
models.AirChange(models.PeriodicInterval(period=24*60, duration=24*60), 10.)
)
# Initialize the "No ventilation" state
s._states['No ventilation'].dcs_update_from(
models.AirChange(active=models.PeriodicInterval(period=60, duration=60), air_exch=0.) #will need to add the residual air change of 0.25
)
s._states['HEPAFilter'].dcs_update_from(
models.HEPAFilter(active=models.PeriodicInterval(period=60, duration=60), q_air_mech=500.)
)
return s
@ -730,11 +1083,11 @@ class MultiModelView(View):
return widgets.VBox(children=(buttons, rename_text_field))
def models_start_end(models: typing.Sequence[models.ConcentrationModel]) -> typing.Tuple[float, float]:
def models_start_end(models: typing.Sequence[models.ExposureModel]) -> typing.Tuple[float, float]:
"""
Returns the earliest start and latest end time of a collection of ConcentrationModel objects
"""
infected_start = min(model.infected.presence.boundaries()[0][0] for model in models)
infected_finish = min(model.infected.presence.boundaries()[-1][1] for model in models)
infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models)
infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models)
return infected_start, infected_finish

View file

@ -5,10 +5,7 @@
"metadata": {},
"source": [
"<div style=\"text-align: center;\" align=\"center\" >\n",
"<img src=\"../files/static/images/cara_logo_text_cern.png\" align=\"center\" style=\"text-align: center; height: 12em;\" /></div>\n",
"<p style=\"color: blue; font-weight: bold; font-size: x-large; text-align: center;\" align=\"center\">\n",
"Airborne Transmission of SARS-CoV-2\n",
"</p>\n",
"<img src=\"files/header_image.png\" style=\"height:8em\"></img></div>\n",
"<p style=\"text-align: center;\">\n",
"Please see the <a href=\"https://cara.web.cern.ch/\">CARA homepage</a> for details on the methodology, assumptions and limitations of CARA.</p>"
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB