Merge branch 'master' into feature/expert_app_header

This commit is contained in:
Germain Personne 2022-05-13 10:39:04 +02:00
commit 71b6c59e65
25 changed files with 652 additions and 302 deletions

View file

@ -38,7 +38,11 @@ Andre Henriques<sup>1</sup>, Luis Aleixo<sup>1</sup>, Marco Andreini<sup>1</sup>
### Reference and Citation
**For the use of the CARA web app**
CARA COVID Airborne Risk Assessment tool
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.6520432.svg)](https://doi.org/10.5281/zenodo.6520432)
© Copyright 2020-2021 CERN. All rights not expressly granted are reserved.
**For use of the model**

View file

@ -22,7 +22,7 @@ import tornado.log
from . import markdown_tools
from . import model_generator
from .report_generator import ReportGenerator
from .report_generator import ReportGenerator, calculate_report_data
from .user import AuthenticatedUser, AnonymousUser
@ -33,7 +33,7 @@ from .user import AuthenticatedUser, AnonymousUser
# calculator version. If the calculator needs to make breaking changes (e.g. change
# form attributes) then it can also increase its MAJOR version without needing to
# increase the overall CARA version (found at ``cara.__version__``).
__version__ = "4.1.1"
__version__ = "4.1.2"
class BaseRequestHandler(RequestHandler):
@ -129,6 +129,44 @@ class ConcentrationModel(BaseRequestHandler):
self.finish(report)
class ConcentrationModelJsonResponse(BaseRequestHandler):
def check_xsrf_cookie(self):
"""
This request handler implements a stateless API that returns report data in JSON format.
Thus, XSRF cookies are disabled by overriding base class implementation of this method with a pass statement.
"""
pass
async def post(self):
"""
Expects algorithm input in HTTP POST request body in JSON format.
Returns report data (algorithm output) in HTTP POST response body in JSON format.
"""
requested_model_config = json.loads(self.request.body)
if self.settings.get("debug", False):
from pprint import pprint
pprint(requested_model_config)
try:
form = model_generator.FormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
print(traceback.format_exc())
response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'}
self.set_status(400)
await self.finish(json.dumps(response_json))
return
executor = loky.get_reusable_executor(
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
)
report_data_task = executor.submit(calculate_report_data, form, form.build_model())
report_data: dict = await asyncio.wrap_future(report_data_task)
await self.finish(report_data)
class StaticModel(BaseRequestHandler):
async def get(self):
form = model_generator.FormData.from_dict(model_generator.baseline_raw_form_data())
@ -226,6 +264,7 @@ def make_app(
(r'/static/(.*)', StaticFileHandler, {'path': static_dir}),
(calculator_prefix + r'/?', CalculatorForm),
(calculator_prefix + r'/report', ConcentrationModel),
(calculator_prefix + r'/report-json', ConcentrationModelJsonResponse),
(calculator_prefix + r'/baseline-model/result', StaticModel),
(calculator_prefix + r'/user-guide', ReadmeHandler),
(calculator_prefix + r'/static/(.*)', StaticFileHandler, {'path': calculator_static_dir}),

View file

@ -45,6 +45,7 @@ class FormData:
floor_area: float
hepa_amount: float
hepa_option: bool
humidity: str
infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
infected_dont_have_breaks_with_exposed: bool
@ -54,6 +55,7 @@ class FormData:
infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_people: int
infected_start: minutes_since_midnight
inside_temp: float
location_name: str
location_latitude: float
location_longitude: float
@ -100,6 +102,7 @@ class FormData:
'floor_area': 0.,
'hepa_amount': 0.,
'hepa_option': False,
'humidity': '',
'infected_coffee_break_option': 'coffee_break_0',
'infected_coffee_duration': 5,
'infected_dont_have_breaks_with_exposed': False,
@ -109,6 +112,7 @@ class FormData:
'infected_lunch_start': '12:30',
'infected_people': _NO_DEFAULT,
'infected_start': '08:30',
'inside_temp': 293.,
'location_latitude': _NO_DEFAULT,
'location_longitude': _NO_DEFAULT,
'location_name': _NO_DEFAULT,
@ -240,11 +244,14 @@ class FormData:
volume = self.room_volume
else:
volume = self.floor_area * self.ceiling_height
if self.room_heating_option:
humidity = 0.3
if self.humidity == '':
if self.room_heating_option:
humidity = 0.3
else:
humidity = 0.5
else:
humidity = 0.5
room = models.Room(volume=volume, humidity=humidity)
humidity = float(self.humidity)
room = models.Room(volume=volume, inside_temp=models.PiecewiseConstant((0, 24), (self.inside_temp,)), humidity=humidity)
infected_population = self.infected_population()
@ -324,18 +331,16 @@ class FormData:
# Initializes a ventilation instance as a window if 'natural_ventilation' is selected, or as a HEPA-filter otherwise
if self.ventilation_type == 'natural_ventilation':
if self.window_opening_regime == 'windows_open_periodically':
window_interval = models.PeriodicInterval(self.windows_frequency, self.windows_duration, min(self.infected_start, self.exposed_start))
window_interval = models.PeriodicInterval(self.windows_frequency, self.windows_duration, min(self.infected_start, self.exposed_start)/60)
else:
window_interval = always_on
outside_temp = self.outside_temp()
inside_temp = models.PiecewiseConstant((0, 24), (293,))
ventilation: models.Ventilation
if self.window_type == 'window_sliding':
ventilation = models.SlidingWindow(
active=window_interval,
inside_temp=inside_temp,
outside_temp=outside_temp,
window_height=self.window_height,
opening_length=self.opening_distance,
@ -344,7 +349,6 @@ class FormData:
elif self.window_type == 'window_hinged':
ventilation = models.HingedWindow(
active=window_interval,
inside_temp=inside_temp,
outside_temp=outside_temp,
window_height=self.window_height,
window_width=self.window_width,
@ -672,7 +676,7 @@ def build_expiration(expiration_definition) -> mc._ExpirationBase:
return expiration_distribution(BLO_factors=tuple(BLO_factors))
def baseline_raw_form_data():
def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
# Note: This isn't a special "baseline". It can be updated as required.
return {
'activity_type': 'office',
@ -689,6 +693,7 @@ def baseline_raw_form_data():
'floor_area': '',
'hepa_amount': '250',
'hepa_option': '0',
'humidity': '',
'infected_coffee_break_option': 'coffee_break_4',
'infected_coffee_duration': '10',
'infected_dont_have_breaks_with_exposed': '1',
@ -698,6 +703,7 @@ def baseline_raw_form_data():
'infected_lunch_start': '12:30',
'infected_people': '1',
'infected_start': '09:00',
'inside_temp': 293.,
'location_latitude': 46.20833,
'location_longitude': 6.14275,
'location_name': 'Geneva',

View file

@ -109,7 +109,7 @@ def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel
return lower_concentrations
def calculate_report_data(form: FormData, model: models.ExposureModel):
def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []

View file

@ -783,6 +783,9 @@ $(document).ready(function () {
templateSelection: formatLocationSelection
});
// Logic for the API requests. Always set humity input as the empty string so that we can profit from the "room_heating_option default" values for humidity.
$("[name='humidity']").val("");
function formatlocation(suggestedLocation) {
// Function is called for each location from the geocoding API.

View file

@ -7,7 +7,6 @@ import ipywidgets as widgets
import matplotlib
import matplotlib.figure
import numpy as np
import mplcursors
from matplotlib import pyplot as plt
from cara import data, models, state
import matplotlib.lines as mlines
@ -166,7 +165,6 @@ class ExposureModelResult(View):
else:
self.ax.ignore_existing_data_limits = False
self.concentration_line.set_data(ts, concentration)
mplcursors.cursor(self.ax, hover=True)
if self.concentration_area is None:
self.concentration_area = self.ax.fill_between(x = ts, y1=0, y2=concentration, color="#96cbff", label="Exposed person presence",
@ -205,7 +203,7 @@ class ExposureModelResult(View):
figure_legends = [mlines.Line2D([], [], color='#3530fe', markersize=15, label='Mean concentration'),
mlines.Line2D([], [], color='#0000c8', markersize=15, ls="dotted", label='Cumulative dose'),
patches.Patch(edgecolor="#96cbff", facecolor='#96cbff', label='Presence of exposed person(s)')]
self.figure.legend(handles=figure_legends)
self.ax.legend(handles=figure_legends)
self.figure.canvas.draw()
@ -350,14 +348,13 @@ class ModelWidgets(View):
])], title="Infected")
def _build_room_volume(self, node):
room_volume = widgets.IntText(value=node.volume, min=10, max=500
, step=5)
room_volume = widgets.IntText(value=node.volume, min=10, max=500, step=5)
def on_value_change(change):
def on_volume_change(change):
node.volume = change['new']
# TODO: Link the state back to the widget, not just the other way around.
room_volume.observe(on_value_change, names=['value'])
room_volume.observe(on_volume_change, names=['value'])
return widgets.HBox([widgets.Label('Room volume (m³)'), room_volume], layout=widgets.Layout(justify_content='space-between'))
@ -367,22 +364,22 @@ class ModelWidgets(View):
room_ceiling_height = widgets.IntText(value=3, min=1, max=20, step=1)
displayed_volume=widgets.Label('1')
def room_surface_change(change):
def on_room_surface_change(change):
node.volume = change['new']*room_ceiling_height.value
displayed_volume.value=str(node.volume)
def room_ceiling_height_change(change):
def on_room_ceiling_height_change(change):
node.volume = change['new']*room_surface.value
displayed_volume.value=str(node.volume)
room_surface.observe(room_surface_change, names=['value'])
room_ceiling_height.observe(room_ceiling_height_change, names=['value'])
room_surface.observe(on_room_surface_change, names=['value'])
room_ceiling_height.observe(on_room_ceiling_height_change, names=['value'])
return widgets.VBox([widgets.HBox([widgets.Label('Room surface area (m²) '), room_surface]
, layout=widgets.Layout(justify_content='space-between', width='100%'))
, widgets.HBox([widgets.Label('Room ceiling height (m)'), room_ceiling_height]
, layout=widgets.Layout(justify_content='space-between', width='100%'))
, widgets.HBox([widgets.Label('Total volume :'), displayed_volume, widgets.Label('')])])
return widgets.VBox([widgets.HBox([widgets.Label('Room surface area (m²) '), room_surface],
layout=widgets.Layout(justify_content='space-between', width='100%')),
widgets.HBox([widgets.Label('Room ceiling height (m)'), room_ceiling_height],
layout=widgets.Layout(justify_content='space-between', width='100%')),
widgets.HBox([widgets.Label('Total volume :'), displayed_volume, widgets.Label('')])])
def _build_room(self,node):
room_number = widgets.Text(value='', placeholder='653/R-004', disabled=False) #not linked to volume yet
@ -414,20 +411,27 @@ class ModelWidgets(View):
toggle_room(room_w.value)
humidity = widgets.FloatSlider(value = node.humidity, min=0, max=1, step=0.01)
inside_temp = widgets.IntSlider(value=node.inside_temp.values[0]-273.15, min=15., max=25.)
def humidity_change(change):
def on_humidity_change(change):
node.humidity = change['new']
humidity.observe(humidity_change, names=['value'])
def on_insidetemp_change(change):
node.inside_temp.values = (change['new']+273.15,)
humidity.observe(on_humidity_change, names=['value'])
inside_temp.observe(on_insidetemp_change, names=['value'])
widget = collapsible(
[ widgets.VBox([
widgets.HBox([
widgets.Label('Room number '), room_number]
, layout=widgets.Layout(width='100%', justify_content='space-between'))
, room_w, widgets.VBox(list(room_widgets.values()))
, widgets.HBox([widgets.Label('Indoor relative humidity '),humidity]
, layout=widgets.Layout(width='100%', justify_content='space-between'))
widgets.Label('Room number'), room_number],
layout=widgets.Layout(width='100%', justify_content='space-between')),
room_w, widgets.VBox(list(room_widgets.values())),
widgets.HBox([widgets.Label('Inside temperature (℃)'), inside_temp],
layout=widgets.Layout(width='100%', justify_content='space-between')),
widgets.HBox([widgets.Label('Indoor relative humidity'), humidity],
layout=widgets.Layout(width='100%', justify_content='space-between')),
])]
, title="Specification of workspace"
)
@ -437,10 +441,10 @@ class ModelWidgets(View):
def _build_outsidetemp(self, node) -> WidgetGroup:
outside_temp = widgets.IntSlider(value=10, min=-10, max=30)
def outsidetemp_change(change):
def on_outsidetemp_change(change):
node.values = (change['new'] + 273.15, )
outside_temp.observe(outsidetemp_change, names=['value'])
outside_temp.observe(on_outsidetemp_change, names=['value'])
auto_width = widgets.Layout(width='auto')
return WidgetGroup(
(
@ -454,11 +458,11 @@ class ModelWidgets(View):
def _build_hinged_window(self, node):
hinged_window = widgets.FloatSlider(value=node.window_width, min=0.1, max=2, step=0.1)
def hinged_window_change(change):
def on_hinged_window_change(change):
node.window_width = change['new']
# TODO: Link the state back to the widget, not just the other way around.
hinged_window.observe(hinged_window_change, names=['value'])
hinged_window.observe(on_hinged_window_change, names=['value'])
return widgets.HBox([widgets.Label('Window width (meters) '), hinged_window], layout=widgets.Layout(justify_content='space-between', width='100%'))
@ -510,22 +514,18 @@ class ModelWidgets(View):
def on_interval_change(change):
node.active.duration = change['new']
def insidetemp_change(change):
node.inside_temp.values = (change['new']+273.15,)
def opening_length_change(change):
def on_opening_length_change(change):
node.opening_length = change['new']
def window_height_change(change):
def on_window_height_change(change):
node.window_height = change['new']
# TODO: Link the state back to the widget, not just the other way around.
number_of_windows.observe(on_value_change, names=['value'])
period.observe(on_period_change, names=['value'])
interval.observe(on_interval_change, names=['value'])
inside_temp.observe(insidetemp_change, names=['value'])
opening_length.observe(opening_length_change, names=['value'])
window_height.observe(window_height_change, names=['value'])
opening_length.observe(on_opening_length_change, names=['value'])
window_height.observe(on_window_height_change, names=['value'])
outsidetemp_widgets = {
'Fixed': self._build_outsidetemp(node.outside_temp),
@ -569,10 +569,6 @@ class ModelWidgets(View):
widgets.Label('Duration of opening (minutes)', layout=auto_width),
interval,
),
(
widgets.Label('Inside temperature (℃)', layout=auto_width),
inside_temp,
),
(
widgets.Label('Outside temperature scheme', layout=auto_width),
outsidetemp_w,
@ -586,22 +582,22 @@ class ModelWidgets(View):
def _build_q_air_mech(self, node):
q_air_mech = widgets.FloatSlider(value=node.q_air_mech, min=0, max=1000, step=5)
def q_air_mech_change(change):
def on_q_air_mech_change(change):
node.q_air_mech = change['new']
# TODO: Link the state back to the widget, not just the other way around.
q_air_mech.observe(q_air_mech_change, names=['value'])
q_air_mech.observe(on_q_air_mech_change, names=['value'])
return widgets.HBox([q_air_mech, widgets.Label('m³/h')])
def _build_ach(self, node):
air_exch = widgets.IntSlider(value=node.air_exch, min=0, max=50, step=5)
def air_exch_change(change):
def on_air_exch_change(change):
node.air_exch = change['new']
# TODO: Link the state back to the widget, not just the other way around.
air_exch.observe(air_exch_change, names=['value'])
air_exch.observe(on_air_exch_change, names=['value'])
return widgets.HBox([air_exch, widgets.Label('h⁻¹')])
@ -679,10 +675,10 @@ class ModelWidgets(View):
def _build_exposed_number(self, node):
number = widgets.IntSlider(value=node.number, min=1, max=200, step=1)
def exposed_number_change(change):
def on_exposed_number_change(change):
node.number = change['new']
# TODO: Link the state back to the widget, not just the other way around.
number.observe(exposed_number_change, names=['value'])
number.observe(on_exposed_number_change, names=['value'])
return widgets.HBox([widgets.Label('Number of exposed people in the room '), number], layout=widgets.Layout(justify_content='space-between'))
@ -704,10 +700,10 @@ class ModelWidgets(View):
def _build_infected_number(self, node):
number = widgets.IntSlider(value=node.number, min=1, max=200, step=1)
def infected_number_change(change):
def on_infected_number_change(change):
node.number = change['new']
# TODO: Link the state back to the widget, not just the other way around.
number.observe(infected_number_change, names=['value'])
number.observe(on_infected_number_change, names=['value'])
return widgets.HBox([widgets.Label('Number of infected people in the room '), number], layout=widgets.Layout(justify_content='space-between'))
@ -729,11 +725,11 @@ class ModelWidgets(View):
def _build_viral_load(self, node):
viral_load_in_sputum = widgets.Text(continuous_update=False, value=("{:.2e}".format(node.viral_load_in_sputum)))
def viral_load_change(change):
def on_viral_load_change(change):
viral_load_in_sputum.value = "{:.2e}".format(float(change['new']))
node.viral_load_in_sputum = float(viral_load_in_sputum.value)
viral_load_in_sputum.observe(viral_load_change, names=['value'])
viral_load_in_sputum.observe(on_viral_load_change, names=['value'])
return widgets.HBox([widgets.Label("Viral load (copies/ml)"), viral_load_in_sputum], layout=widgets.Layout(justify_content='space-between'))
@ -832,14 +828,14 @@ class ModelWidgets(View):
transmissibility_factor.value = virus.transmissibility_factor
infectious_dose.value = virus.infectious_dose
def transmissibility_change(change):
def on_transmissibility_change(change):
virus = models.SARSCoV2(viral_load_in_sputum=ModelWidgets._build_viral_load(self, node).children[1].value, infectious_dose=infectious_dose.value, viable_to_RNA_ratio=0.5, transmissibility_factor=change['new'])
node.dcs_update_from(virus)
if (transmissibility_factor.value != models.Virus.types[virus_choice.value].transmissibility_factor):
virus_choice.options = list(models.Virus.types.keys()) + ["Custom"]
virus_choice.value = "Custom"
def infectious_dose_change(change):
def on_infectious_dose_change(change):
virus = models.SARSCoV2(viral_load_in_sputum=ModelWidgets._build_viral_load(self, node).children[1].value, infectious_dose=change['new'], viable_to_RNA_ratio=0.5, transmissibility_factor=transmissibility_factor.value)
node.dcs_update_from(virus)
if (infectious_dose.value != models.Virus.types[virus_choice.value].infectious_dose):
@ -847,8 +843,8 @@ class ModelWidgets(View):
virus_choice.value = "Custom"
virus_choice.observe(on_virus_change, names=['value'])
transmissibility_factor.observe(transmissibility_change, names=['value'])
infectious_dose.observe(infectious_dose_change, names=['value'])
transmissibility_factor.observe(on_transmissibility_change, names=['value'])
infectious_dose.observe(on_infectious_dose_change, names=['value'])
space_between=widgets.Layout(justify_content='space-between')
return widgets.VBox([
@ -861,10 +857,9 @@ class ModelWidgets(View):
baseline_model = models.ExposureModel(
concentration_model=models.ConcentrationModel(
room=models.Room(volume=75, humidity=0.5),
room=models.Room(volume=75, inside_temp=models.PiecewiseConstant((0., 24.), (293.15,))),
ventilation=models.SlidingWindow(
active=models.PeriodicInterval(period= 120, duration= 15, start=8.0),
inside_temp=models.PiecewiseConstant((0., 24.), (293.15,)),
active=models.PeriodicInterval(period=120, duration=15),
outside_temp=models.PiecewiseConstant((0., 24.), (283.15,)),
window_height=1.6, opening_length=0.6,
),
@ -923,7 +918,6 @@ class CARAStateBuilder(state.StateBuilder):
#Initialise the "Hinged window" state
s._states['Hinged window'].dcs_update_from(
models.HingedWindow(active=models.PeriodicInterval(period=120, duration=15),
inside_temp=models.PiecewiseConstant((0,24.), (293.15,)),
outside_temp=models.PiecewiseConstant((0,24.), (283.15,)),
window_height=1.6, opening_length=0.6,
window_width=10.
@ -1138,4 +1132,4 @@ def models_start_end(models: typing.Sequence[models.ExposureModel]) -> typing.Tu
"""
infected_start = min(model.concentration_model.infected.presence.boundaries()[0][0] for model in models)
infected_finish = min(model.concentration_model.infected.presence.boundaries()[-1][1] for model in models)
return infected_start, infected_finish
return infected_start, infected_finish

View file

@ -137,6 +137,8 @@
<input type="text" name="location_name" value="Geneva, CHE">
<input type="text" name="location_latitude" value="46.20833">
<input type="text" name="location_longitude" value="6.14275">
<input type="text" name="inside_temp" value="293">
<input type="text" name="humidity" value="">
</div>
<hr width="80%">

View file

@ -64,6 +64,7 @@
<b>For use of the CARA web app:</b><br>
<ul>
<li>CARA COVID Airborne Risk Assessment tool</li>
<a href="https://doi.org/10.5281/zenodo.6520432"><img src="https://zenodo.org/badge/DOI/10.5281/zenodo.6520432.svg" alt="DOI"></a><br>
© Copyright 2020-2021 CERN. All rights not expressly granted are reserved.<br>
Licensed under the Apache License, Version 2.0<br>
<a href=https://gitlab.cern.ch/cara/cara/-/blob/master/LICENSE><i>LICENSE</i></a>

View file

@ -57,15 +57,6 @@ _VectorisedFloat = typing.Union[float, np.ndarray]
_VectorisedInt = typing.Union[int, np.ndarray]
@dataclass(frozen=True)
class Room:
#: The total volume of the room
volume: _VectorisedFloat
#: The humidity in the room (from 0 to 1 - e.g. 0.5 is 50% humidity)
humidity: _VectorisedFloat = 0.5
Time_t = typing.TypeVar('Time_t', float, int)
BoundaryPair_t = typing.Tuple[Time_t, Time_t]
BoundarySequence_t = typing.Union[typing.Tuple[BoundaryPair_t, ...], typing.Tuple]
@ -195,6 +186,18 @@ class PiecewiseConstant:
)
@dataclass(frozen=True)
class Room:
#: The total volume of the room
volume: _VectorisedFloat
#: The temperature inside the room (Kelvin).
inside_temp: PiecewiseConstant = PiecewiseConstant((0, 24), (293,))
#: The humidity in the room (from 0 to 1 - e.g. 0.5 is 50% humidity)
humidity: _VectorisedFloat = 0.5
@dataclass(frozen=True)
class _VentilationBase:
"""
@ -207,7 +210,7 @@ class _VentilationBase:
mechanical air exchange through a filter.
"""
def transition_times(self) -> typing.Set[float]:
def transition_times(self, room: Room) -> typing.Set[float]:
raise NotImplementedError("Subclass must implement")
def air_exchange(self, room: Room, time: float) -> _VectorisedFloat:
@ -228,7 +231,7 @@ class Ventilation(_VentilationBase):
#: The interval in which the ventilation is active.
active: Interval
def transition_times(self) -> typing.Set[float]:
def transition_times(self, room: Room) -> typing.Set[float]:
return self.active.transition_times()
@ -243,10 +246,10 @@ class MultipleVentilation(_VentilationBase):
"""
ventilations: typing.Tuple[_VentilationBase, ...]
def transition_times(self) -> typing.Set[float]:
def transition_times(self, room: Room) -> typing.Set[float]:
transitions = set()
for ventilation in self.ventilations:
transitions.update(ventilation.transition_times())
transitions.update(ventilation.transition_times(room))
return transitions
def air_exchange(self, room: Room, time: float) -> _VectorisedFloat:
@ -265,9 +268,6 @@ class WindowOpening(Ventilation):
#: The interval in which the window is open.
active: Interval
#: The temperature inside the room (Kelvin).
inside_temp: PiecewiseConstant
#: The temperature outside of the window (Kelvin).
outside_temp: PiecewiseConstant
@ -292,9 +292,9 @@ class WindowOpening(Ventilation):
"""
raise NotImplementedError("Unknown discharge coefficient")
def transition_times(self) -> typing.Set[float]:
transitions = super().transition_times()
transitions.update(self.inside_temp.transition_times)
def transition_times(self, room: Room) -> typing.Set[float]:
transitions = super().transition_times(room)
transitions.update(room.inside_temp.transition_times)
transitions.update(self.outside_temp.transition_times)
return transitions
@ -304,7 +304,7 @@ class WindowOpening(Ventilation):
return 0.
# Reminder, no dependence on time in the resulting calculation.
inside_temp: _VectorisedFloat = self.inside_temp.value(time)
inside_temp: _VectorisedFloat = room.inside_temp.value(time)
outside_temp: _VectorisedFloat = self.outside_temp.value(time)
# The inside_temperature is forced to be always at least min_deltaT degree
@ -439,28 +439,35 @@ class Virus:
#: Pre-populated examples of Viruses.
types: typing.ClassVar[typing.Dict[str, "Virus"]]
def halflife(self, humidity: _VectorisedFloat) -> _VectorisedFloat:
def halflife(self, humidity: _VectorisedFloat, inside_temp: _VectorisedFloat) -> _VectorisedFloat:
# Biological decay (inactivation of the virus in air) - virus
# dependent and function of humidity
raise NotImplementedError
def decay_constant(self, humidity: _VectorisedFloat) -> _VectorisedFloat:
# Viral inactivation per hour (h^-1) (function of humidity)
return np.log(2) / self.halflife(humidity)
def decay_constant(self, humidity: _VectorisedFloat, inside_temp: _VectorisedFloat) -> _VectorisedFloat:
# Viral inactivation per hour (h^-1) (function of humidity and inside temperature)
return np.log(2) / self.halflife(humidity, inside_temp)
@dataclass(frozen=True)
class SARSCoV2(Virus):
def halflife(self, humidity: _VectorisedFloat) -> _VectorisedFloat:
def halflife(self, humidity: _VectorisedFloat, inside_temp: _VectorisedFloat) -> _VectorisedFloat:
"""
Half-life changes with humidity level. Here is implemented a simple
piecewise constant model (for more details see A. Henriques et al,
CERN-OPEN-2021-004, DOI: 10.17181/CERN.1GDQ.5Y75)
"""
# Taken from Morris et al (https://doi.org/10.7554/eLife.65902) data at T = 22°C and RH = 40 %,
# and from Doremalen et al (https://www.nejm.org/doi/10.1056/NEJMc2004973).
return np.piecewise(humidity, [humidity <= 0.4, humidity > 0.4], [6.43, 1.1])
# Updated to use the formula from Dabish et al. with correction https://doi.org/10.1080/02786826.2020.1829536
# with a maximum at hl = 6.43 (compensate for the negative decay values in the paper).
# Note that humidity is in percentage and inside_temp in °C.
# factor np.log(2) -> decay rate to half-life; factor 60 -> minutes to hours
hl_calc = ((np.log(2)/((0.16030 + 0.04018*(((inside_temp-273.15)-20.615)/10.585)
+0.02176*(((humidity*100)-45.235)/28.665)
-0.14369
-0.02636*((inside_temp-273.15)-20.615)/10.585)))/60)
return np.where(hl_calc <= 0, 6.43, np.minimum(6.43, hl_calc))
Virus.types = {
@ -917,9 +924,9 @@ class ConcentrationModel:
h = 1.5
# Deposition rate (h^-1)
k = (vg * 3600) / h
#todo: Inside_temp needs to be exposed/added to the room;
return (
k + self.virus.decay_constant(self.room.humidity)
k + self.virus.decay_constant(self.room.humidity, self.room.inside_temp.value(time))
+ self.ventilation.air_exchange(self.room, time)
)
@ -950,7 +957,7 @@ class ConcentrationModel:
"""
state_change_times = {0.}
state_change_times.update(self.infected.presence.transition_times())
state_change_times.update(self.ventilation.transition_times())
state_change_times.update(self.ventilation.transition_times(self.room))
return sorted(state_change_times)
@method_cache
@ -1168,28 +1175,70 @@ class ShortRangeModel:
# calculations for the same time (e.g. at state change times).
return self._normed_concentration(concentration_model, time)
def normed_exposure_between_bounds(self, concentration_model: ConcentrationModel, time1: float, time2: float):
@method_cache
def extract_between_bounds(self, time1: float, time2: float) -> typing.Tuple[float,float]:
"""
Get the integrated short-range concentration of viruses in the air between the times start and stop,
normalized by the virus viral load.
Extract the bounds of the interval resulting from the
intersection of [time1, time2] and the presence interval.
If [time1, time2] has nothing common to the presence interval,
we return (0, 0).
Raise an error if time1 and time2 are not in ascending order.
"""
start_bound, stop_bound = self.presence.boundaries()[0]
jet_origin = self.expiration.jet_origin_concentration()
dilution = self.dilution_factor()
if time1>time2:
raise ValueError("time1 must be less or equal to time2")
total_normed_concentration_diluted = (
concentration_model.integrated_concentration(start_bound,
stop_bound)/dilution/
concentration_model.virus.viral_load_in_sputum
start, stop = self.presence.boundaries()[0]
if (stop < time1) or (start > time2):
return (0, 0)
elif start <= time1 and time2<= stop:
return time1, time2
elif start <= time1 and stop < time2:
return time1, stop
elif time1 < start and time2 <= stop:
return start, time2
elif time1 <= start and stop < time2:
return start, stop
def _normed_jet_exposure_between_bounds(self,
concentration_model: ConcentrationModel,
time1: float, time2: float):
"""
Get the part of the integrated short-range concentration of
viruses in the air, between the times start and stop, coming
from the jet concentration, normalized by the viral load, and
without dilution.
"""
start, stop = self.extract_between_bounds(time1, time2)
jet_origin = self.expiration.jet_origin_concentration()
return jet_origin * (stop - start)
def _normed_interpolated_longrange_exposure_between_bounds(
self, concentration_model: ConcentrationModel,
time1: float, time2: float):
"""
Get the part of the integrated short-range concentration due
to the background concentration, normalized by the viral load
and the breathing rate, and without dilution.
One needs to interpolate the integrated long-range concentration
for the particle diameters defined here.
TODO: make sure any potential extrapolation has a
negligible effect.
"""
start, stop = self.extract_between_bounds(time1, time2)
if stop<=start:
return 0.
normed_int_concentration = (
concentration_model.integrated_concentration(start, stop)
/concentration_model.virus.viral_load_in_sputum
/concentration_model.infected.activity.exhalation_rate
)
total_normed_concentration_interpolated = np.interp(
normed_int_concentration_interpolated = np.interp(
self.expiration.particle.diameter,
concentration_model.infected.particle.diameter,
total_normed_concentration_diluted
normed_int_concentration
)
return (jet_origin/dilution * (stop_bound - start_bound)
) - total_normed_concentration_interpolated
return normed_int_concentration_interpolated
@dataclass(frozen=True)
@ -1267,7 +1316,7 @@ class ExposureModel:
# we compute first the mean of all diameter-dependent quantities
# to perform properly the Monte-Carlo integration over
# particle diameters (doing things in another order would
# lead to wrong results).
# lead to wrong results for the probability of infection).
dep_exposure_integrated = np.array(self._long_range_normed_exposure_between_bounds(time1, time2) *
aerosols *
fdep).mean()
@ -1297,46 +1346,45 @@ class ExposureModel:
"""
deposited_exposure = 0.
for interaction in self.short_range:
start, stop = interaction.presence.boundaries()[0]
if stop < time1:
continue
elif start > time2:
break
elif start <= time1 and time2<= stop:
start_bound, stop_bound = time1, time2
elif start <= time1 and stop < time2:
start_bound, stop_bound = time1, stop
elif time1 < start and time2 <= stop:
start_bound, stop_bound = start, time2
elif time1 <= start and stop < time2:
start_bound, stop_bound = start, stop
short_range_exposure = interaction.normed_exposure_between_bounds(self.concentration_model, start_bound, stop_bound)
start, stop = interaction.extract_between_bounds(time1, time2)
short_range_jet_exposure = interaction._normed_jet_exposure_between_bounds(
self.concentration_model, start, stop)
short_range_lr_exposure = interaction._normed_interpolated_longrange_exposure_between_bounds(
self.concentration_model, start, stop)
dilution = interaction.dilution_factor()
fdep = interaction.expiration.particle.fraction_deposited(evaporation_factor=1.0)
diameter = interaction.expiration.particle.diameter
# Aerosols not considered given the formula for the initial concentration at mouth/nose.
# Aerosols not considered given the formula for the initial
# concentration at mouth/nose.
if diameter is not None and not np.isscalar(diameter):
# we compute first the mean of all diameter-dependent quantities
# to perform properly the Monte-Carlo integration over
# particle diameters (doing things in another order would
# lead to wrong results).
deposited_exposure += np.array(short_range_exposure *
fdep).mean()
# lead to wrong results for the probability of infection).
this_deposited_exposure = (np.array(short_range_jet_exposure
* fdep).mean()
- np.array(short_range_lr_exposure * fdep).mean()
* self.concentration_model.infected.activity.exhalation_rate)
else:
# in the case of a single diameter or no diameter defined,
# one should not take any mean at this stage.
deposited_exposure += short_range_exposure*fdep
this_deposited_exposure = (short_range_jet_exposure * fdep
- short_range_lr_exposure * fdep
* self.concentration_model.infected.activity.exhalation_rate)
# multiply by the (diameter-independent) inhalation rate
deposited_exposure *= interaction.activity.inhalation_rate
deposited_exposure += (this_deposited_exposure *
interaction.activity.inhalation_rate
/dilution)
# then we multiply by diameter-independent quantities: viral load
# and fraction of infected virions
f_inf = self.concentration_model.infected.fraction_of_infectious_virus()
deposited_exposure *= (f_inf
* self.concentration_model.virus.viral_load_in_sputum
)
* (1 - self.exposed.mask.inhale_efficiency()))
# long-range concentration
deposited_exposure += self.long_range_deposited_exposure_between_bounds(time1, time2)

View file

@ -5,7 +5,7 @@ import numpy as np
from scipy import special as sp
import cara.monte_carlo as mc
from cara.monte_carlo.sampleable import LogNormal,LogCustomKernel,CustomKernel,Uniform
from cara.monte_carlo.sampleable import LogNormal,LogCustomKernel,CustomKernel,Uniform, Custom
sqrt2pi = np.sqrt(2.*np.pi)
@ -202,5 +202,9 @@ short_range_expiration_distributions = {
}
# Fit from Fig 8 a) "stand-stand" in https://www.mdpi.com/1660-4601/17/4/1445/htm
short_range_distances = LogNormal(-0.269359136417347, 0.4728300188814934)
# Derived from Fig 8 a) "stand-stand" in https://www.mdpi.com/1660-4601/17/4/1445/htm
distances = np.array((0.5,0.6,0.7,0.8,0.9,1,1.1,1.2,1.3,1.4,1.5,1.6,1.7,1.8,1.9,2))
frequencies = np.array((0.0598036,0.0946154,0.1299152,0.1064905,0.1099066,0.0998209, 0.0845298,0.0479286,0.0406084,0.039795,0.0205997,0.0152316,0.0118155,0.0118155,0.018485,0.0205997))
short_range_distances = Custom(bounds=(0.5,2.),
function=lambda x: np.interp(x,distances,frequencies,left=0.,right=0.),
max_function=0.13)

View file

@ -59,8 +59,7 @@ def test_ventilation_slidingwindow(baseline_form: model_generator.FormData):
assert isinstance(baseline_window, models.SlidingWindow)
window = models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=10, start=minutes_since_midnight(9 * 60)),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
active=models.PeriodicInterval(period=120, duration=10, start=9),
outside_temp=baseline_window.outside_temp,
window_height=1.6, opening_length=0.6,
)
@ -91,8 +90,7 @@ def test_ventilation_hingedwindow(baseline_form: model_generator.FormData):
assert isinstance(baseline_window, models.HingedWindow)
window = models.HingedWindow(
active=models.PeriodicInterval(period=120, duration=10, start=minutes_since_midnight(9 * 60)),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
active=models.PeriodicInterval(period=120, duration=10, start=9),
outside_temp=baseline_window.outside_temp,
window_height=1.6, window_width=1., opening_length=0.6,
)
@ -106,7 +104,7 @@ def test_ventilation_hingedwindow(baseline_form: model_generator.FormData):
def test_ventilation_mechanical(baseline_form: model_generator.FormData):
room = models.Room(75)
room = models.Room(volume=75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
mech = models.HVACMechanical(
active=models.PeriodicInterval(period=120, duration=120),
q_air_mech=500.,
@ -121,7 +119,7 @@ def test_ventilation_mechanical(baseline_form: model_generator.FormData):
def test_ventilation_airchanges(baseline_form: model_generator.FormData):
room = models.Room(75)
room = models.Room(75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
airchange = models.AirChange(
active=models.PeriodicInterval(period=120, duration=120),
air_exch=3.,
@ -152,8 +150,7 @@ def test_ventilation_window_hepa(baseline_form: model_generator.FormData):
# Now build the equivalent ventilation instance directly, and compare.
window = models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=10, start=minutes_since_midnight(9 * 60)),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
active=models.PeriodicInterval(period=120, duration=10, start=9),
outside_temp=baseline_window.outside_temp,
window_height=1.6, opening_length=0.6,
)

View file

@ -0,0 +1,31 @@
import json
import tornado.testing
import cara.apps.calculator
from cara.apps.calculator import model_generator
_TIMEOUT = 40.
class TestCalculatorJsonResponse(tornado.testing.AsyncHTTPTestCase):
def setUp(self):
super().setUp()
self.http_client.defaults['request_timeout'] = _TIMEOUT
def get_app(self):
return cara.apps.calculator.make_app()
@tornado.testing.gen_test(timeout=_TIMEOUT)
def test_json_response(self):
response = yield self.http_client.fetch(
request=self.get_url("/calculator/report-json"),
method="POST",
headers={'content-type': 'application/json'},
body=json.dumps(model_generator.baseline_raw_form_data())
)
self.assertEqual(response.code, 200)
data = json.loads(response.body)
self.assertIsInstance(data['prob_inf'], float)
self.assertIsInstance(data['expected_new_cases'], float)

View file

@ -2,6 +2,7 @@ from pathlib import Path
import pytest
import tornado.testing
from retry import retry
import cara.apps.calculator
from cara.apps.calculator.report_generator import generate_permalink
@ -42,6 +43,7 @@ async def test_404(http_server_client):
assert resp.code == 404
@retry()
class TestBasicApp(tornado.testing.AsyncHTTPTestCase):
def get_app(self):
return cara.apps.calculator.make_app()
@ -70,6 +72,7 @@ class TestBasicApp(tornado.testing.AsyncHTTPTestCase):
assert 'expected number of new cases is' in response.body.decode()
@retry()
class TestCernApp(tornado.testing.AsyncHTTPTestCase):
def get_app(self):
cern_theme = Path(cara.apps.calculator.__file__).parent.parent / 'themes' / 'cern'
@ -82,6 +85,7 @@ class TestCernApp(tornado.testing.AsyncHTTPTestCase):
assert 'expected number of new cases is' in response.body.decode()
retry()
class TestOpenApp(tornado.testing.AsyncHTTPTestCase):
def get_app(self):
return cara.apps.calculator.make_app(calculator_prefix="/mycalc")

View file

@ -8,7 +8,7 @@ import pytest
@pytest.fixture
def baseline_concentration_model():
model = models.ConcentrationModel(
room=models.Room(volume=75),
room=models.Room(volume=75, inside_temp=models.PiecewiseConstant((0., 24.), (293,))),
ventilation=models.AirChange(
active=models.SpecificInterval(((0., 24.), )),
air_exch=30.,
@ -55,7 +55,6 @@ def exposure_model_w_outside_temp_changes(baseline_exposure_model: models.Exposu
baseline_exposure_model, {
'concentration_model.ventilation': models.SlidingWindow(
active=models.PeriodicInterval(2.2 * 60, 1.8 * 60),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=cara.data.GenevaTemperatures['Jan'],
window_height=1.6,
opening_length=0.6,

View file

@ -26,7 +26,7 @@ def test_concentration_model_vectorisation(override_params):
always = models.PeriodicInterval(240, 240) # TODO: This should be a thing on an interval.
c_model = models.ConcentrationModel(
models.Room(defaults['volume'], defaults['humidity']),
models.Room(defaults['volume'], models.PiecewiseConstant((0., 24.), (293,)), defaults['humidity']),
models.AirChange(always, defaults['air_change']),
models.InfectedPopulation(
number=1,
@ -59,7 +59,7 @@ def test_concentration_model_vectorisation(override_params):
def simple_conc_model():
interesting_times = models.SpecificInterval(([0.5, 1.], [1.1, 2], [2., 3.]), )
return models.ConcentrationModel(
models.Room(75),
models.Room(75, models.PiecewiseConstant((0., 24.), (293,))),
models.AirChange(interesting_times, 100),
models.InfectedPopulation(
number=1,

View file

@ -152,7 +152,7 @@ def conc_model():
)
always = models.SpecificInterval(((0., 24.), ))
return models.ConcentrationModel(
models.Room(25),
models.Room(25, models.PiecewiseConstant((0., 24.), (293,))),
models.AirChange(always, 5),
models.EmittingPopulation(
number=1,
@ -179,12 +179,12 @@ def sr_model():
@pytest.mark.parametrize(
["exposed_time_interval", "expected_deposited_exposure"],
[
[(0., 1.), 45.6008710],
[(1., 1.01), 0.5280401],
[(1.01, 1.02), 0.51314096385],
[(12., 12.01), 0.016255813185],
[(12., 24.), 645.63619275],
[(0., 24.), 700.7322474],
[(0., 1.), 42.63222033436878],
[(1., 1.01), 0.485377549596179],
[(1.01, 1.02), 0.47058239520823814],
[(12., 12.01), 0.01622776617499709],
[(12., 24.), 595.1115223695439],
[(0., 24.), 645.8401125684933],
]
)
def test_exposure_model_integral_accuracy(exposed_time_interval,

View file

@ -6,10 +6,13 @@ import pytest
from cara import models
import cara.monte_carlo as mc_models
from cara.apps.calculator.model_generator import build_expiration
from cara.monte_carlo.data import short_range_expiration_distributions, short_range_distances, activity_distributions
from cara.monte_carlo.data import short_range_expiration_distributions,\
expiration_distributions, short_range_distances, activity_distributions
# TODO: seed better the random number generators
np.random.seed(2000)
SAMPLE_SIZE = 250_000
@pytest.fixture
def concentration_model() -> mc_models.ConcentrationModel:
@ -41,11 +44,12 @@ def short_range_model():
def test_short_range_model_ndarray(concentration_model, short_range_model):
concentration_model = concentration_model.build_model(250_000)
model = short_range_model.build_model(250_000)
concentration_model = concentration_model.build_model(SAMPLE_SIZE)
model = short_range_model.build_model(SAMPLE_SIZE)
assert isinstance(model._normed_concentration(concentration_model, 10.75), np.ndarray)
assert isinstance(model.short_range_concentration(concentration_model, 10.75), np.ndarray)
assert isinstance(model.normed_exposure_between_bounds(concentration_model, 10.75, 10.85), np.ndarray)
assert isinstance(model._normed_jet_exposure_between_bounds(concentration_model, 10.75, 10.85), np.ndarray)
assert isinstance(model._normed_interpolated_longrange_exposure_between_bounds(concentration_model, 10.75, 10.85), np.ndarray)
assert isinstance(model.short_range_concentration(concentration_model, 14.0), float)
@ -59,47 +63,95 @@ def test_short_range_model_ndarray(concentration_model, short_range_model):
]
)
def test_dilution_factor(activity, expected_dilution):
model = models.ShortRangeModel(expiration="Breathing",
model = mc_models.ShortRangeModel(expiration=short_range_expiration_distributions['Breathing'],
activity=models.Activity.types[activity],
presence=models.SpecificInterval(present_times=((10.5, 11.0),)),
distance=0.854)
distance=0.854).build_model(SAMPLE_SIZE)
assert isinstance(model.dilution_factor(), np.ndarray)
np.testing.assert_almost_equal(
model.dilution_factor(), expected_dilution, decimal=10
)
def test_extract_between_bounds_raise_on_wrong_order(short_range_model):
model = short_range_model.build_model(1)
with pytest.raises(ValueError, match='time1 must be less or equal to time2'):
model.extract_between_bounds(11.,10.)
@pytest.mark.parametrize(
"time1, time2, expected_start, expected_stop", [
[10., 12., 10.5, 11.],
[10., 10.7, 10.5, 10.7],
[10., 10.45, 0., 0.],
[11.01, 11.5, 0., 0.],
[10.8, 10.9, 10.8, 10.9],
[10.8, 11.5, 10.8, 11.],
[10.5, 11., 10.5, 11.],
]
)
def test_extract_between_bounds(short_range_model, time1, time2,
expected_start, expected_stop):
model = short_range_model.build_model(1)
np.testing.assert_equal(
model.extract_between_bounds(time1, time2),
(expected_start, expected_stop),
)
@pytest.mark.parametrize(
"time, expected_short_range_concentration", [
[8.5, 0.],
[10.5, 15.24806213],
[10.6, 15.24806213],
[11.0, 15.24806213],
[10.5, 5.401601371244907],
[10.6, 5.401601371244907],
[11.0, 5.401601371244907],
[12.0, 0.],
]
)
def test_short_range_concentration(time, expected_short_range_concentration, concentration_model, short_range_model):
concentration_model = concentration_model.build_model(250_000)
model = short_range_model.build_model(250_000)
def test_short_range_concentration(time, expected_short_range_concentration,
concentration_model, short_range_model):
concentration_model = concentration_model.build_model(SAMPLE_SIZE)
model = short_range_model.build_model(SAMPLE_SIZE)
np.testing.assert_allclose(
np.array(model.short_range_concentration(concentration_model, time)).mean(),
expected_short_range_concentration, rtol=0.01
expected_short_range_concentration, rtol=0.02
)
@pytest.mark.parametrize(
"start, stop, expected_exposure", [
[8.5, 12.5, 7.875963317294013e-09],
[10.5, 11.0, 7.875963317294013e-09],
[10.4, 11.1, 7.875963317294013e-09],
[10.5, 11.1, 7.875963317294013e-09],
[10.6, 11.1, 7.66539809488759e-09],
[10.4, 10.9, 7.66539809488759e-09],
]
)
def test_normed_exposure_between_bounds(start, stop, expected_exposure, concentration_model, short_range_model):
concentration_model = concentration_model.build_model(250_000)
model = short_range_model.build_model(250_000)
np.testing.assert_almost_equal(
model.normed_exposure_between_bounds(concentration_model, start, stop).mean(), expected_exposure
def test_short_range_exposure_with_ndarray_mask():
c_model = mc_models.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(period=120, duration=120),
air_exch=10_000_000,),
infected=mc_models.InfectedPopulation(
number=1,
presence=models.SpecificInterval(present_times=((8.5, 12.5), (13.5, 17.5))),
virus=models.Virus.types['SARS_CoV_2_DELTA'],
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
expiration=expiration_distributions['Breathing'],
host_immunity=0.,
),
evaporation_factor=0.3,
)
sr_model = mc_models.ShortRangeModel(expiration=short_range_expiration_distributions['Shouting'],
activity=models.Activity.types['Heavy exercise'],
presence=models.SpecificInterval(present_times=((10.5, 11.0),)),
distance=0.854)
e_model = mc_models.ExposureModel(
concentration_model = c_model,
short_range = (sr_model,),
exposed = mc_models.Population(
number=1,
presence=models.SpecificInterval(present_times=((8.5, 12.5), (13.5, 17.5))),
mask=models.Mask(η_inhale=np.array([0., 0.3, 0.5])),
activity=models.Activity.types['Light activity'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
assert isinstance(e_model.deposited_exposure(), np.ndarray)
assert len(e_model.deposited_exposure()) == 3
np.testing.assert_allclose(e_model.deposited_exposure(),
e_model.deposited_exposure()[0]*np.array([1., 0.7, 0.5]),
rtol=1e-8)

View file

@ -0,0 +1,24 @@
import numpy as np
import numpy.testing as npt
import pytest
from cara import models
@pytest.mark.parametrize(
"inside_temp, humidity, expected_halflife, expected_decay_constant",
[
[293.15, 0.5, 0.5947447349860315, 1.1654532436949188],
[272.15, 0.7, 1.6070844193207476, 0.4313072619127947],
[300.15, 1., 0.17367078830147223, 3.9911558376571805],
[300.15, 0., 6.43, 0.10779893943389507],
[np.array([272.15, 300.15]), np.array([0.7, 0.]),
np.array([1.60708442, 6.43]), np.array([0.43130726, 0.10779894])],
[np.array([293.15, 300.15]), np.array([0.5, 1.]),
np.array([0.59474473, 0.17367079]), np.array([1.16545324, 3.99115584])]
],
)
def test_decay_constant(inside_temp, humidity, expected_halflife, expected_decay_constant):
npt.assert_almost_equal(models.Virus.types['SARS_CoV_2'].halflife(humidity, inside_temp),
expected_halflife)
npt.assert_almost_equal(models.Virus.types['SARS_CoV_2'].decay_constant(humidity, inside_temp),
expected_decay_constant)

View file

@ -19,7 +19,7 @@ from cara.monte_carlo.data import (expiration_distributions,
# TODO: seed better the random number generators
np.random.seed(2000)
SAMPLE_SIZE = 1_000_000
TOLERANCE = 0.02
TOLERANCE = 0.04
sqrt2pi = np.sqrt(2.*np.pi)
sqrt2 = np.sqrt(2.)
@ -84,8 +84,13 @@ class SimpleConcentrationModel:
"""
removal rate lambda in h^-1, excluding the deposition rate.
"""
return (self.lambda_ventilation
+ ln2/(6.43 if self.humidity<=0.4 else 1.1) )
hl_calc = ((ln2/((0.16030 + 0.04018*(((293-273.15)-20.615)/10.585)
+0.02176*(((self.humidity*100)-45.235)/28.665)
-0.14369
-0.02636*((293-273.15)-20.615)/10.585)))/60)
return (self.lambda_ventilation
+ ln2/(np.where(hl_calc <= 0, 6.43, np.minimum(6.43, hl_calc))))
@method_cache
def deposition_removal_coefficient(self) -> float:
@ -244,7 +249,6 @@ class SimpleShortRangeModel:
return dilution
@method_cache
def jet_concentration(self,conc_model: SimpleConcentrationModel) -> _VectorisedFloat:
"""
virion concentration at the origin of the jet (close to
@ -267,13 +271,13 @@ class SimpleShortRangeModel:
def concentration(self, conc_model: SimpleConcentrationModel, time: float) -> _VectorisedFloat:
"""
compute the short-range part of the concentration, and add it
to the background concentration
to the long-range concentration
"""
if self.interaction_interval.triggered(time):
background_concentration = conc_model.concentration(time)
lr_concentration = conc_model.concentration(time)
S = self.dilution_factor()
return (self.jet_concentration(conc_model)
- background_concentration) / S
- lr_concentration) / S
else:
return 0.
@ -352,8 +356,17 @@ class SimpleExposureModel(SimpleConcentrationModel):
epsabs=0.,limit=500)[0]
* self.viral_load * self.breathing_rate)
def total_concentration(self, t: float):
"""
total concentration at time t
"""
res = self.concentration(t)
for sr_mod in self.sr_models:
res += sr_mod.concentration(self,t)
return res
@method_cache
def integrated_background_concentration(self,t1: float,t2: float,
def integrated_longrange_concentration(self,t1: float,t2: float,
evaporation: float) -> _VectorisedFloat:
"""
background (long-range) concentration integrated from t1 to t2
@ -417,7 +430,7 @@ class SimpleExposureModel(SimpleConcentrationModel):
epsabs=0.,limit=500)[0]
* self.viral_load * 1e-6 * (t2-t1) )
result += sr_model.breathing_rate * (
res-self.integrated_background_concentration(t1,t2,evaporation)
res-self.integrated_longrange_concentration(t1,t2,evaporation)
)/sr_model.dilution_factor()
return result
@ -429,7 +442,7 @@ class SimpleExposureModel(SimpleConcentrationModel):
"""
result = 0.
for t1,t2 in self.infected_presence.boundaries():
result += (self.integrated_background_concentration(t1,t2,self.evaporation)
result += (self.integrated_longrange_concentration(t1,t2,self.evaporation)
* self.breathing_rate)
result += self.integrated_shortrange_concentration()
@ -453,7 +466,7 @@ interaction_intervals = (models.SpecificInterval(present_times=((10.5, 11.0),)),
@pytest.fixture
def c_model() -> mc.ConcentrationModel:
return mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
room=models.Room(volume=50, inside_temp=models.PiecewiseConstant((0., 24.), (293,)), humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(period=120, duration=120), air_exch=1.),
infected=mc.InfectedPopulation(
number=1,
@ -468,18 +481,37 @@ def c_model() -> mc.ConcentrationModel:
).build_model(SAMPLE_SIZE)
@pytest.fixture
def c_model_distr() -> mc.ConcentrationModel:
return mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(
period=120, duration=120), air_exch=1.),
infected=mc.InfectedPopulation(
number=1,
presence=presence,
virus=virus_distributions['SARS_CoV_2_DELTA'],
mask=models.Mask.types['No mask'],
activity=activity_distributions['Seated'],
expiration=expiration_distributions['Breathing'],
host_immunity=0.,
),
evaporation_factor=0.3,
).build_model(SAMPLE_SIZE)
@pytest.fixture
def sr_models() -> typing.Tuple[mc.ShortRangeModel, ...]:
return (
mc.ShortRangeModel(
expiration = short_range_expiration_distributions['Breathing'],
expiration = short_range_expiration_distributions['Speaking'],
activity = models.Activity.types['Seated'],
presence = interaction_intervals[0],
distance = 0.854,
).build_model(SAMPLE_SIZE),
mc.ShortRangeModel(
expiration = short_range_expiration_distributions['Speaking'],
activity = models.Activity.types['Seated'],
expiration = short_range_expiration_distributions['Breathing'],
activity = models.Activity.types['Heavy exercise'],
presence = interaction_intervals[1],
distance = 0.854,
).build_model(SAMPLE_SIZE),
@ -505,21 +537,118 @@ def simple_sr_models() -> typing.Tuple[SimpleShortRangeModel, ...]:
interaction_interval = interaction_intervals[0],
distance = 0.854,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
BLO_factors = expiration_BLO_factors['Breathing'],
BLO_factors = expiration_BLO_factors['Speaking'],
),
SimpleShortRangeModel(
interaction_interval = interaction_intervals[1],
distance = 0.854,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
BLO_factors = expiration_BLO_factors['Speaking'],
)
breathing_rate = models.Activity.types['Heavy exercise'].exhalation_rate,
BLO_factors = expiration_BLO_factors['Breathing'],
),
)
@pytest.fixture
def expo_sr_model(c_model,sr_models) -> mc.ExposureModel:
return mc.ExposureModel(
concentration_model=c_model,
short_range=sr_models,
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
@pytest.fixture
def simple_expo_sr_model(simple_sr_models) -> SimpleExposureModel:
return SimpleExposureModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = models.Virus.types['SARS_CoV_2_DELTA'].viable_to_RNA_ratio,
HI = 0.,
ID50 = models.Virus.types['SARS_CoV_2_DELTA'].infectious_dose,
transmissibility = models.Virus.types['SARS_CoV_2_DELTA'].transmissibility_factor,
sr_models = simple_sr_models,
)
@pytest.fixture
def expo_sr_model_distr(c_model_distr) -> mc.ExposureModel:
return mc.ExposureModel(
concentration_model=c_model_distr,
short_range=(
mc.ShortRangeModel(
expiration = short_range_expiration_distributions['Breathing'],
activity = activity_distributions['Seated'],
presence = interaction_intervals[0],
distance = short_range_distances,
).build_model(SAMPLE_SIZE),
mc.ShortRangeModel(
expiration = short_range_expiration_distributions['Speaking'],
activity = activity_distributions['Seated'],
presence = interaction_intervals[1],
distance = short_range_distances,
).build_model(SAMPLE_SIZE),
),
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
@pytest.fixture
def simple_expo_sr_model_distr(c_model_distr) -> SimpleExposureModel:
return SimpleExposureModel(
infected_presence = presence,
viral_load = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).viral_load_in_sputum,
breathing_rate = activity_distributions['Seated'].build_model(
SAMPLE_SIZE).exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).viable_to_RNA_ratio,
HI = 0.,
ID50 = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).infectious_dose,
transmissibility = virus_distributions['SARS_CoV_2_DELTA'
].transmissibility_factor,
sr_models = (
SimpleShortRangeModel(
interaction_interval = interaction_intervals[0],
distance = short_range_distances.generate_samples(SAMPLE_SIZE),
breathing_rate = activity_distributions['Seated'].build_model(
SAMPLE_SIZE).exhalation_rate,
BLO_factors = expiration_BLO_factors['Breathing'],
),
SimpleShortRangeModel(
interaction_interval = interaction_intervals[1],
distance = short_range_distances.generate_samples(SAMPLE_SIZE),
breathing_rate = activity_distributions['Seated'].build_model(
SAMPLE_SIZE).exhalation_rate,
BLO_factors = expiration_BLO_factors['Speaking'],
)
),
)
@pytest.mark.parametrize(
"time", np.linspace(8.5,17.5,12),
)
def test_background_concentration(time,c_model,simple_c_model):
def test_longrange_concentration(time,c_model,simple_c_model):
npt.assert_allclose(
c_model.concentration(time).mean(),
simple_c_model.concentration(time), rtol=TOLERANCE
@ -542,7 +671,7 @@ def test_shortrange_concentration(time,c_model,simple_c_model,
)
def test_background_exposure(c_model):
def test_longrange_exposure(c_model):
simple_expo_model = SimpleExposureModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
@ -577,7 +706,27 @@ def test_background_exposure(c_model):
)
def test_background_exposure_with_distributions():
@pytest.mark.parametrize(
"time", [11., 12.5, 17.]
)
def test_longrange_concentration_with_distributions(c_model_distr,time):
simple_expo_model = SimpleConcentrationModel(
infected_presence = presence,
viral_load = virus_distributions['SARS_CoV_2_DELTA'
].build_model(SAMPLE_SIZE).viral_load_in_sputum,
breathing_rate = activity_distributions['Seated'].build_model(
SAMPLE_SIZE).exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
)
npt.assert_allclose(
c_model_distr.concentration(time).mean(),
simple_expo_model.concentration(time).mean(), rtol=TOLERANCE
)
def test_longrange_exposure_with_distributions(c_model_distr):
simple_expo_model = SimpleExposureModel(
infected_presence = presence,
viral_load = virus_distributions['SARS_CoV_2_DELTA'
@ -597,21 +746,7 @@ def test_background_exposure_with_distributions():
sr_models = (),
)
expo_model = mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.3),
ventilation=models.AirChange(active=models.PeriodicInterval(
period=120, duration=120), air_exch=1.),
infected=mc.InfectedPopulation(
number=1,
presence=presence,
virus=virus_distributions['SARS_CoV_2_DELTA'],
mask=models.Mask.types['No mask'],
activity=activity_distributions['Seated'],
expiration=expiration_distributions['Breathing'],
host_immunity=0.,
),
evaporation_factor=0.3,
),
concentration_model=c_model_distr,
short_range=(),
exposed=mc.Population(
number=1,
@ -631,31 +766,21 @@ def test_background_exposure_with_distributions():
)
def test_exposure_with_shortrange(c_model,sr_models,simple_sr_models):
simple_expo_sr_model = SimpleExposureModel(
infected_presence = presence,
viral_load = models.Virus.types['SARS_CoV_2_DELTA'].viral_load_in_sputum,
breathing_rate = models.Activity.types['Seated'].exhalation_rate,
room_volume = 50.,
lambda_ventilation= 1.,
BLO_factors = expiration_BLO_factors['Breathing'],
finf = models.Virus.types['SARS_CoV_2_DELTA'].viable_to_RNA_ratio,
HI = 0.,
ID50 = models.Virus.types['SARS_CoV_2_DELTA'].infectious_dose,
transmissibility = models.Virus.types['SARS_CoV_2_DELTA'].transmissibility_factor,
sr_models = simple_sr_models,
)
expo_sr_model = mc.ExposureModel(
concentration_model=c_model,
short_range=sr_models,
exposed=mc.Population(
number=1,
presence=presence,
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
host_immunity=0.,
),
).build_model(SAMPLE_SIZE)
# tests on the concentration with short-range should be skipped until
# one finds a way to avoid the large variability of the concentration
# with short-range 'Speaking' or 'Shouting' interactions
@pytest.mark.skip
@pytest.mark.parametrize(
"time", [10.75, 14.75, 16.]
)
def test_concentration_with_shortrange(expo_sr_model,simple_expo_sr_model,time):
npt.assert_allclose(
expo_sr_model.concentration(time).mean(),
simple_expo_sr_model.total_concentration(time).mean(), rtol=TOLERANCE
)
def test_exposure_with_shortrange(expo_sr_model,simple_expo_sr_model):
npt.assert_allclose(
expo_sr_model.deposited_exposure().mean(),
simple_expo_sr_model.dose().mean(), rtol=TOLERANCE
@ -665,3 +790,29 @@ def test_exposure_with_shortrange(c_model,sr_models,simple_sr_models):
simple_expo_sr_model.probability_infection().mean(), rtol=TOLERANCE
)
@pytest.mark.skip
@pytest.mark.parametrize(
"time", [10.75, 14.75, 16.]
)
def test_concentration_with_shortrange_and_distributions(
expo_sr_model_distr,simple_expo_sr_model_distr,time):
npt.assert_allclose(
expo_sr_model_distr.concentration(time).mean(),
simple_expo_sr_model_distr.total_concentration(time).mean(),
rtol=TOLERANCE
)
def test_exposure_with_shortrange_and_distributions(expo_sr_model_distr,
simple_expo_sr_model_distr):
npt.assert_allclose(
expo_sr_model_distr.deposited_exposure().mean(),
simple_expo_sr_model_distr.dose().mean(), rtol=0.05
)
npt.assert_allclose(
expo_sr_model_distr.infection_probability().mean(),
simple_expo_sr_model_distr.probability_infection().mean(),
rtol=0.03
)

View file

@ -19,7 +19,6 @@ def test_no_mask_superspeading_emission_rate(baseline_concentration_model):
def baseline_periodic_window():
return models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=15),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
)
@ -27,7 +26,7 @@ def baseline_periodic_window():
@pytest.fixture
def baseline_room():
return models.Room(volume=75)
return models.Room(volume=75, inside_temp=models.PiecewiseConstant((0., 24.), (293,)))
@pytest.fixture
@ -44,7 +43,7 @@ def test_concentrations(baseline_concentration_model):
concentrations = [baseline_concentration_model.concentration(float(t)) for t in ts]
npt.assert_allclose(
concentrations,
[0.000000e+00, 20.805628, 6.602814e-13, 20.805628, 2.09545e-26],
[0.000000e+00, 2.046096e+01, 3.846725e-13, 2.046096e+01, 7.231966e-27],
rtol=1e-6
)
@ -95,7 +94,7 @@ def test_r0(baseline_exposure_model):
# expected r0 was computed with a trapezoidal integration, using
# a mesh of 100'000 pts per exposed presence interval.
r0 = baseline_exposure_model.reproduction_number()
npt.assert_allclose(r0, 776.941990)
npt.assert_allclose(r0, 771.380385)
def test_periodic_window(baseline_periodic_window, baseline_room):
@ -131,11 +130,10 @@ def test_periodic_hepa(baseline_periodic_hepa, baseline_room):
],
)
def test_multiple_ventilation_HEPA_window(baseline_periodic_hepa, time, expected_value):
room = models.Room(volume=68.)
room = models.Room(volume=68., inside_temp=models.PiecewiseConstant((0., 24.),(293.15,)))
tempOutside = models.PiecewiseConstant((0., 1., 2.5),(273.15, 283.15))
tempInside = models.PiecewiseConstant((0., 24.),(293.15,))
window = models.SlidingWindow(active=models.SpecificInterval([(1 / 60, 24.)]),
inside_temp=tempInside,outside_temp=tempOutside,
outside_temp=tempOutside,
window_height=1.,opening_length=0.6)
vent = models.MultipleVentilation([window, baseline_periodic_hepa])
npt.assert_allclose(vent.air_exchange(room,time), expected_value, rtol=1e-5)
@ -143,12 +141,12 @@ def test_multiple_ventilation_HEPA_window(baseline_periodic_hepa, time, expected
def test_multiple_ventilation_HEPA_window_transitions(baseline_periodic_hepa):
tempOutside = models.PiecewiseConstant((0., 1., 2.5),(273.15, 283.15))
tempInside = models.PiecewiseConstant((0., 24.),(293.15,))
room = models.Room(68, models.PiecewiseConstant((0., 24.),(293.15,)))
window = models.SlidingWindow(active=models.SpecificInterval([(1 / 60, 24.)]),
inside_temp=tempInside,outside_temp=tempOutside,
outside_temp=tempOutside,
window_height=1.,opening_length=0.6)
vent = models.MultipleVentilation([window, baseline_periodic_hepa])
assert set(vent.transition_times()) == set([0.0, 1/60, 0.25, 1.0, 2.0, 2.25,
assert set(vent.transition_times(room)) == set([0.0, 1/60, 0.25, 1.0, 2.0, 2.25,
2.5, 4.0, 4.25, 6.0, 6.25, 8.0, 8.25, 10.0, 10.25, 12.0, 12.25,
14.0, 14.25, 16.0, 16.25, 18.0, 18.25, 20.0, 20.25, 22.0, 22.25, 24.])
@ -188,14 +186,13 @@ def test_multiple_ventilation_HEPA_HVAC_AirChange(volume, expected_value):
)
def test_windowopening(time, expected_value):
tempOutside = models.PiecewiseConstant((0., 10., 24.),(273.15, 283.15))
tempInside = models.PiecewiseConstant((0., 24.), (293.15,))
w = models.SlidingWindow(
active=models.SpecificInterval([(0., 24.)]),
inside_temp=tempInside,outside_temp=tempOutside,
outside_temp=tempOutside,
window_height=1., opening_length=0.6,
)
npt.assert_allclose(
w.air_exchange(models.Room(volume=68), time), expected_value, rtol=1e-5
w.air_exchange(models.Room(volume=68, inside_temp=models.PiecewiseConstant((0., 24.), (293.15, ))), time), expected_value, rtol=1e-5
)
@ -223,10 +220,9 @@ def build_hourly_dependent_model(
outside_temp = temperatures[month]
model = models.ConcentrationModel(
room=models.Room(volume=75),
room=models.Room(volume=75, inside_temp=models.PiecewiseConstant((0., 24.), (293, ))),
ventilation=models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0., 24.), (293, )),
outside_temp=outside_temp,
window_height=1.6, opening_length=0.6,
),
@ -246,10 +242,9 @@ def build_hourly_dependent_model(
def build_constant_temp_model(outside_temp, intervals_open=((7.5, 8.5),)):
model = models.ConcentrationModel(
room=models.Room(volume=75),
room=models.Room(volume=75, inside_temp=models.PiecewiseConstant((0., 24.), (293,))),
ventilation=models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (outside_temp,)),
window_height=1.6, opening_length=0.6,
),
@ -271,7 +266,6 @@ def build_hourly_dependent_model_multipleventilation(month, intervals_open=((7.5
vent = models.MultipleVentilation((
models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=data.GenevaTemperatures[month],
window_height=1.6, opening_length=0.6,
),
@ -281,7 +275,7 @@ def build_hourly_dependent_model_multipleventilation(month, intervals_open=((7.5
),
))
model = models.ConcentrationModel(
room=models.Room(volume=75),
room=models.Room(volume=75, inside_temp=models.PiecewiseConstant((0., 24.), (293,))),
ventilation=vent,
infected=models.EmittingPopulation(
number=1,
@ -387,8 +381,8 @@ def build_exposure_model(concentration_model, short_range_model):
@pytest.mark.parametrize(
"month, expected_deposited_exposure",
[
['Jan', 377.440565819],
['Jun', 1721.03336729],
['Jan', 359.140499],
['Jun', 1385.917562],
],
)
def test_exposure_hourly_dep(month,expected_deposited_exposure, baseline_sr_model):
@ -408,8 +402,8 @@ def test_exposure_hourly_dep(month,expected_deposited_exposure, baseline_sr_mode
@pytest.mark.parametrize(
"month, expected_deposited_exposure",
[
['Jan', 383.339206111],
['Jun', 1799.17597184],
['Jan', 359.983716],
['Jun', 1439.267381],
],
)
def test_exposure_hourly_dep_refined(month,expected_deposited_exposure, baseline_sr_model):

View file

@ -40,10 +40,10 @@ def test_type_annotations():
@pytest.fixture
def baseline_mc_concentration_model() -> cara.monte_carlo.ConcentrationModel:
mc_model = cara.monte_carlo.ConcentrationModel(
room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)),
room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20),
inside_temp=cara.models.PiecewiseConstant((0., 24.), (293,))),
ventilation=cara.monte_carlo.SlidingWindow(
active=cara.models.PeriodicInterval(period=120, duration=120),
inside_temp=cara.models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=cara.models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
),

View file

@ -1,6 +1,7 @@
import numpy as np
import numpy.testing as npt
import pytest
from retry import retry
import cara.monte_carlo as mc
from cara import models,data
@ -9,8 +10,8 @@ from cara.apps.calculator.model_generator import build_expiration
# TODO: seed better the random number generators
np.random.seed(2000)
SAMPLE_SIZE = 600_000
TOLERANCE = 0.06
SAMPLE_SIZE = 500_000
TOLERANCE = 0.05
# Load the weather data (temperature in kelvin) for Toronto.
toronto_coordinates = (43.667, 79.400)
@ -45,12 +46,11 @@ def shared_office_mc():
Corresponds to the 1st line of Table 4 in https://doi.org/10.1101/2021.10.14.21264988
"""
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=50, humidity=0.5),
room=models.Room(volume=50, inside_temp=models.PiecewiseConstant((0., 24.), (298,)), humidity=0.5),
ventilation=models.MultipleVentilation(
ventilations=(
models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=120),
inside_temp=models.PiecewiseConstant((0., 24.), (298,)),
outside_temp=data.GenevaTemperatures['Jun'],
window_height=1.6,
opening_length=0.2,
@ -88,12 +88,11 @@ def classroom_mc():
Corresponds to the 2nd line of Table 4 in https://doi.org/10.1101/2021.10.14.21264988
"""
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=160, humidity=0.3),
room=models.Room(volume=160, inside_temp=models.PiecewiseConstant((0., 24.), (293,)), humidity=0.3),
ventilation=models.MultipleVentilation(
ventilations=(
models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=120),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=TorontoTemperatures['Dec'],
window_height=1.6,
opening_length=0.2,
@ -309,16 +308,17 @@ def waiting_room_mc():
)
@retry()
@pytest.mark.parametrize(
"mc_model, expected_pi, expected_new_cases, expected_dose, expected_ER",
[
["shared_office_mc", 6.03, 0.18, 3.198, 809],
["classroom_mc", 9.5, 1.85, 9.478, 5624],
["ski_cabin_mc", 16.0, 0.5, 17.315, 7966],
["skagit_chorale_mc",65.7, 40.0, 102.213, 190422],
["bus_ride_mc", 12.0, 8.0, 7.65, 5419],
["gym_mc", 0.45, 0.13, 0.208, 1145],
["waiting_room_mc", 1.59, 0.22, 0.821, 737],
["shared_office_mc", 5.55, 0.17, 2.699, 809],
["classroom_mc", 9.58, 1.82, 9.034, 5624],
["ski_cabin_mc", 16.0, 0.47, 17.315, 7966],
["skagit_chorale_mc",61.01, 36.53, 84.730, 190422],
["bus_ride_mc", 10.59, 7.06, 6.65, 5419],
["gym_mc", 0.43, 0.12, 0.197, 1145],
["waiting_room_mc", 1.34, 0.18, 0.670, 737],
]
)
def test_report_models(mc_model, expected_pi, expected_new_cases,
@ -339,21 +339,20 @@ def test_report_models(mc_model, expected_pi, expected_new_cases,
@pytest.mark.parametrize(
"mask_type, month, expected_pi, expected_dose, expected_ER",
[
["No mask", "Jul", 9.52, 9.920, 809],
["Type I", "Jul", 1.7, 0.913, 149],
["FFP2", "Jul", 0.51, 0.239, 149],
["Type I", "Feb", 0.57, 0.272, 162],
["No mask", "Jul", 8.46, 8.113, 809],
["Type I", "Jul", 1.44, 0.727, 149],
["FFP2", "Jul", 0.43, 0.197, 149],
["Type I", "Feb", 0.54, 0.253, 149],
],
)
def test_small_shared_office_Geneva(mask_type, month, expected_pi,
expected_dose, expected_ER):
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=33, humidity=0.5),
room=models.Room(volume=33, inside_temp=models.PiecewiseConstant((0., 24.), (293,)), humidity=0.5),
ventilation=models.MultipleVentilation(
(
models.SlidingWindow(
active=models.SpecificInterval(((0., 24.),)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=data.GenevaTemperatures[month],
window_height=1.5, opening_length=0.2,
),

View file

@ -11,7 +11,6 @@ from cara import models
def baseline_slidingwindow():
return models.SlidingWindow(
active=models.SpecificInterval(((0, 4), (5, 9))),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
outside_temp=models.PiecewiseConstant((0, 24), (283,)),
window_height=1.6, opening_length=0.6,
)
@ -21,14 +20,13 @@ def baseline_slidingwindow():
def baseline_hingedwindow():
return models.HingedWindow(
active=models.SpecificInterval(((0, 4), (5, 9))),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
outside_temp=models.PiecewiseConstant((0, 24), (283,)),
window_height=1.6, opening_length=0.6, window_width=1.,
)
def test_number_of_windows(baseline_slidingwindow):
room = models.Room(75)
room = models.Room(volume=75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
two_windows = dataclasses.replace(baseline_slidingwindow, number_of_windows=2)
one_window_exchange = baseline_slidingwindow.air_exchange(room, 1)
@ -63,9 +61,6 @@ def test_hinged_window(baseline_hingedwindow, window_width,
{'outside_temp': models.PiecewiseConstant(
(0, 2, 3), (np.array([20, 30, 28]), np.array([25, 30, 27]))
)},
{'inside_temp': models.PiecewiseConstant(
(0, 20), (np.array([20, 30, 25]), )
)},
]
)
def test_hinged_window_vectorisation(override_params):
@ -73,11 +68,10 @@ def test_hinged_window_vectorisation(override_params):
'window_height': 0.15,
'window_width': 0.15,
'opening_length': 0.15,
'inside_temp': models.PiecewiseConstant((0, 2, 3), (20, 25)),
'outside_temp': models.PiecewiseConstant((0, 2, 3), (10, 15)),
}
defaults.update(override_params)
room = models.Room(volume=75)
room = models.Room(volume=75, inside_temp=models.PiecewiseConstant((0, 2, 3), (20, 25)))
t = 0.5
window = models.HingedWindow(models.PeriodicInterval(60, 30), **defaults)
if {'window_height', 'opening_length', 'window_width'}.intersection(override_params):

View file

@ -64,6 +64,7 @@ python-dateutil==2.8.2
pyzmq==22.1.0
requests==2.26.0
requests-unixsocket==0.2.0
retry==0.9.2
scikit-learn==0.24.2
scipy==1.7.0
Send2Trash==1.7.1
@ -76,6 +77,7 @@ threadpoolctl==2.2.0
timezonefinder==5.2.0
tornado==6.1
traitlets==5.0.5
types-retry==0.9.7
urllib3==1.26.6
voila==0.2.10
wcwidth==0.2.5

View file

@ -30,10 +30,12 @@ REQUIREMENTS: dict = {
'numpy',
'psutil',
'python-dateutil',
'retry',
'scipy',
'sklearn',
'timezonefinder',
'tornado',
'types-retry',
'voila >=0.2.4',
],
'app': [],