Merge branch 'feature/CO2_fit' into 'master'

Fit ventilation and exhalation rates on CO2 sensor data

See merge request caimira/caimira!444
This commit is contained in:
Luis Aleixo 2023-11-22 15:43:00 +01:00
commit d70029d0ed
19 changed files with 1551 additions and 577 deletions

View file

@ -26,7 +26,7 @@ from tornado.httpclient import AsyncHTTPClient, HTTPRequest
import tornado.log
from . import markdown_tools
from . import model_generator
from . import model_generator, co2_model_generator
from .report_generator import ReportGenerator, calculate_report_data
from .user import AuthenticatedUser, AnonymousUser
@ -37,7 +37,7 @@ from .user import AuthenticatedUser, AnonymousUser
# calculator version. If the calculator needs to make breaking changes (e.g. change
# form attributes) then it can also increase its MAJOR version without needing to
# increase the overall CAiMIRA version (found at ``caimira.__version__``).
__version__ = "4.13.0"
__version__ = "4.14.0"
LOG = logging.getLogger(__name__)
@ -106,7 +106,7 @@ class ConcentrationModel(BaseRequestHandler):
start = datetime.datetime.now()
try:
form = model_generator.FormData.from_dict(requested_model_config)
form = model_generator.VirusFormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
@ -157,7 +157,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
pprint(requested_model_config)
try:
form = model_generator.FormData.from_dict(requested_model_config)
form = model_generator.VirusFormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
@ -178,7 +178,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
class StaticModel(BaseRequestHandler):
async def get(self) -> None:
form = model_generator.FormData.from_dict(model_generator.baseline_raw_form_data())
form = model_generator.VirusFormData.from_dict(model_generator.baseline_raw_form_data())
base_url = self.request.protocol + "://" + self.request.host
report_generator: ReportGenerator = self.settings['report_generator']
executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size'])
@ -340,7 +340,53 @@ class GenericExtraPage(BaseRequestHandler):
active_page=self.active_page,
text_blocks=template_environment.globals["common_text"]
))
class CO2ModelResponse(BaseRequestHandler):
def check_xsrf_cookie(self):
"""
This request handler implements a stateless API that returns report data in JSON format.
Thus, XSRF cookies are disabled by overriding base class implementation of this method with a pass statement.
"""
pass
async def post(self, endpoint: str) -> None:
requested_model_config = tornado.escape.json_decode(self.request.body)
try:
form = co2_model_generator.CO2FormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
print(traceback.format_exc())
response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'}
self.set_status(400)
self.finish(json.dumps(response_json))
return
if endpoint.rstrip('/') == 'plot':
transition_times = co2_model_generator.CO2FormData.find_change_points_with_pelt(form.CO2_data)
self.finish({'CO2_plot': co2_model_generator.CO2FormData.generate_ventilation_plot(form.CO2_data, transition_times),
'transition_times': [round(el, 2) for el in transition_times]})
else:
executor = loky.get_reusable_executor(
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
)
report_task = executor.submit(
co2_model_generator.CO2FormData.build_model, form,
)
report = await asyncio.wrap_future(report_task)
result = dict(report.CO2_fit_params())
ventilation_transition_times = report.ventilation_transition_times
result['fitting_ventilation_type'] = form.fitting_ventilation_type
result['transition_times'] = ventilation_transition_times
result['CO2_plot'] = co2_model_generator.CO2FormData.generate_ventilation_plot(CO2_data=form.CO2_data,
transition_times=ventilation_transition_times[:-1],
predictive_CO2=result['predictive_CO2'])
self.finish(result)
def get_url(app_root: str, relative_path: str = '/'):
return app_root.rstrip('/') + relative_path.rstrip('/')
@ -363,6 +409,7 @@ def make_app(
base_urls: typing.List = [
(get_root_url(r'/?'), LandingPage),
(get_root_calculator_url(r'/?'), CalculatorForm),
(get_root_calculator_url(r'/co2-fit/(.*)'), CO2ModelResponse),
(get_root_calculator_url(r'/report'), ConcentrationModel),
(get_root_url(r'/static/(.*)'), StaticFileHandler, {'path': static_dir}),
(get_root_calculator_url(r'/static/(.*)'), StaticFileHandler, {'path': calculator_static_dir}),

View file

@ -0,0 +1,186 @@
import dataclasses
import logging
import typing
import numpy as np
import ruptures as rpt
import matplotlib.pyplot as plt
import re
from caimira import models
from .form_data import FormData, cast_class_fields
from .defaults import DEFAULT_MC_SAMPLE_SIZE, NO_DEFAULT
from .report_generator import img2base64, _figure2bytes
minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
LOG = logging.getLogger(__name__)
@dataclasses.dataclass
class CO2FormData(FormData):
CO2_data: dict
fitting_ventilation_states: list
fitting_ventilation_type: str
#: The default values for undefined fields. Note that the defaults here
#: and the defaults in the html form must not be contradictory.
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = {
'CO2_data': '{}',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
'exposed_finish': '17:30',
'exposed_lunch_finish': '13:30',
'exposed_lunch_option': True,
'exposed_lunch_start': '12:30',
'exposed_start': '08:30',
'fitting_ventilation_states': '[]',
'fitting_ventilation_type': 'fitting_natural_ventilation',
'infected_coffee_break_option': 'coffee_break_0',
'infected_coffee_duration': 5,
'infected_dont_have_breaks_with_exposed': False,
'infected_finish': '17:30',
'infected_lunch_finish': '13:30',
'infected_lunch_option': True,
'infected_lunch_start': '12:30',
'infected_people': 1,
'infected_start': '08:30',
'room_volume': NO_DEFAULT,
'specific_breaks': '{}',
'total_people': NO_DEFAULT,
}
def __init__(self, **kwargs):
# Set default values defined in CO2FormData
for key, value in self._DEFAULTS.items():
setattr(self, key, kwargs.get(key, value))
def validate(self):
# Validate population parameters
self.validate_population_parameters()
# Validate specific inputs - breaks (exposed and infected)
if self.specific_breaks != {}:
if type(self.specific_breaks) is not dict:
raise TypeError('The specific breaks should be in a dictionary.')
dict_keys = list(self.specific_breaks.keys())
if "exposed_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".')
if "infected_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".')
for population_breaks in ['exposed_breaks', 'infected_breaks']:
if self.specific_breaks[population_breaks] != []:
if type(self.specific_breaks[population_breaks]) is not list:
raise TypeError(f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.')
for input_break in self.specific_breaks[population_breaks]:
# Input validations.
if type(input_break) is not dict:
raise TypeError(f'Each break should be a dictionary. Got {type(input_break)}.')
dict_keys = list(input_break.keys())
if "start_time" not in input_break:
raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".')
if "finish_time" not in input_break:
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
for time in input_break.values():
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
@classmethod
def find_change_points_with_pelt(self, CO2_data: dict):
"""
Perform change point detection using Pelt algorithm from ruptures library with pen=15.
Returns a list of tuples containing (index, X-axis value) for the detected significant changes.
"""
times: list = CO2_data['times']
CO2_values: list = CO2_data['CO2']
if len(times) != len(CO2_values):
raise ValueError("times and CO2 values must have the same length.")
# Convert the input list to a numpy array for use with the ruptures library
CO2_np = np.array(CO2_values)
# Define the model for change point detection (Radial Basis Function kernel)
model = "rbf"
# Fit the Pelt algorithm to the data with the specified model
algo = rpt.Pelt(model=model).fit(CO2_np)
# Predict change points using the Pelt algorithm with a penalty value of 15
result = algo.predict(pen=15)
# Find local minima and maxima
segments = np.split(np.arange(len(CO2_values)), result)
merged_segments = [np.hstack((segments[i], segments[i + 1])) for i in range(len(segments) - 1)]
result_set = set()
for segment in merged_segments[:-2]:
result_set.add(times[CO2_values.index(min(CO2_np[segment]))])
result_set.add(times[CO2_values.index(max(CO2_np[segment]))])
return list(result_set)
@classmethod
def generate_ventilation_plot(self, CO2_data: dict,
transition_times: typing.Optional[list] = None,
predictive_CO2: typing.Optional[list] = None):
times_values = CO2_data['times']
CO2_values = CO2_data['CO2']
fig = plt.figure(figsize=(7, 4), dpi=110)
plt.plot(times_values, CO2_values, label='Input CO₂')
if (transition_times):
for time in transition_times:
plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--')
if (predictive_CO2):
plt.plot(times_values, predictive_CO2, label='Predictive CO₂')
plt.xlabel('Time of day')
plt.ylabel('Concentration (ppm)')
plt.legend()
return img2base64(_figure2bytes(fig))
def population_present_changes(self, infected_presence: models.Interval, exposed_presence: models.Interval) -> typing.List[float]:
state_change_times = set(infected_presence.transition_times())
state_change_times.update(exposed_presence.transition_times())
return sorted(state_change_times)
def ventilation_transition_times(self) -> typing.Tuple[float, ...]:
# Check what type of ventilation is considered for the fitting
if self.fitting_ventilation_type == 'fitting_natural_ventilation':
vent_states = self.fitting_ventilation_states
vent_states.append(self.CO2_data['times'][-1])
return tuple(vent_states)
else:
return tuple((self.CO2_data['times'][0], self.CO2_data['times'][-1]))
def build_model(self, size=DEFAULT_MC_SAMPLE_SIZE) -> models.CO2DataModel: # type: ignore
# Build a simple infected and exposed population for the case when presence
# intervals and number of people are dynamic. Activity type is not needed.
infected_presence = self.infected_present_interval()
infected_population = models.SimplePopulation(
number=self.infected_people,
presence=infected_presence,
activity=None, # type: ignore
)
exposed_presence = self.exposed_present_interval()
exposed_population=models.SimplePopulation(
number=self.total_people - self.infected_people,
presence=exposed_presence,
activity=None, # type: ignore
)
all_state_changes=self.population_present_changes(infected_presence, exposed_presence)
total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop)
for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])]
return models.CO2DataModel(
room_volume=self.room_volume,
number=models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)),
presence=None,
ventilation_transition_times=self.ventilation_transition_times(),
times=self.CO2_data['times'],
CO2_concentrations=self.CO2_data['CO2'],
)
cast_class_fields(CO2FormData)

View file

@ -22,6 +22,7 @@ DEFAULTS = {
'ceiling_height': 0.,
'conditional_probability_plot': False,
'conditional_probability_viral_loads': False,
'CO2_fitting_result': '{}',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
'exposed_finish': '17:30',
@ -103,8 +104,8 @@ VACCINE_BOOSTER_TYPE = ['AZD1222_(AstraZeneca)', 'Ad26.COV2.S_(Janssen)', 'BNT16
VACCINE_TYPE = ['Ad26.COV2.S_(Janssen)', 'Any_mRNA_-_heterologous', 'AZD1222_(AstraZeneca)', 'AZD1222_(AstraZeneca)_and_any_mRNA_-_heterologous', 'AZD1222_(AstraZeneca)_and_BNT162b2_(Pfizer)',
'BBIBP-CorV_(Beijing_CNBG)', 'BNT162b2_(Pfizer)', 'BNT162b2_(Pfizer)_and_mRNA-1273_(Moderna)', 'CoronaVac_(Sinovac)', 'CoronaVac_(Sinovac)_and_AZD1222_(AstraZeneca)', 'Covishield',
'mRNA-1273_(Moderna)', 'Sputnik_V_(Gamaleya)', 'CoronaVac_(Sinovac)_and_BNT162b2_(Pfizer)']
VENTILATION_TYPES = {'natural_ventilation',
'mechanical_ventilation', 'no_ventilation'}
VENTILATION_TYPES = {'natural_ventilation', 'mechanical_ventilation',
'no_ventilation', 'from_fitting'}
VIRUS_TYPES: typing.List[str] = list(config.virus_distributions)
VOLUME_TYPES = {'room_volume_explicit', 'room_volume_from_dimensions'}
WINDOWS_OPENING_REGIMES = {'windows_open_permanently',

View file

@ -0,0 +1,440 @@
import dataclasses
import datetime
import html
import logging
import typing
import ast
import json
import numpy as np
from caimira import models
from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT, DEFAULT_MC_SAMPLE_SIZE
LOG = logging.getLogger(__name__)
minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
@dataclasses.dataclass
class FormData:
specific_breaks: dict
exposed_coffee_break_option: str
exposed_coffee_duration: int
exposed_finish: minutes_since_midnight
exposed_lunch_finish: minutes_since_midnight
exposed_lunch_option: bool
exposed_lunch_start: minutes_since_midnight
exposed_start: minutes_since_midnight
infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
infected_dont_have_breaks_with_exposed: bool
infected_finish: minutes_since_midnight
infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed
infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_people: int
infected_start: minutes_since_midnight
room_volume: float
total_people: int
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
@classmethod
def from_dict(cls, form_data: typing.Dict):
# Take a copy of the form data so that we can mutate it.
form_data = form_data.copy()
form_data.pop('_xsrf', None)
# Don't let arbitrary unescaped HTML through the net.
for key, value in form_data.items():
if isinstance(value, str):
form_data[key] = html.escape(value)
for key, default_value in cls._DEFAULTS.items():
if form_data.get(key, '') == '':
if default_value is NO_DEFAULT:
raise ValueError(f"{key} must be specified")
form_data[key] = default_value
for key, value in form_data.items():
if key in _CAST_RULES_FORM_ARG_TO_NATIVE:
form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
if key not in cls._DEFAULTS:
raise ValueError(f'Invalid argument "{html.escape(key)}" given')
instance = cls(**form_data)
instance.validate()
return instance
@classmethod
def to_dict(cls, form: "FormData", strip_defaults: bool = False) -> dict:
form_dict = {
field.name: getattr(form, field.name)
for field in dataclasses.fields(form)
}
for attr, value in form_dict.items():
if attr in _CAST_RULES_NATIVE_TO_FORM_ARG:
form_dict[attr] = _CAST_RULES_NATIVE_TO_FORM_ARG[attr](value)
if strip_defaults:
del form_dict['calculator_version']
for attr, value in list(form_dict.items()):
default = cls._DEFAULTS.get(attr, NO_DEFAULT)
if default is not NO_DEFAULT and value in [default, 'not-applicable']:
form_dict.pop(attr)
return form_dict
def validate_population_parameters(self):
# Validate number of infected <= number of total people
if self.infected_people >= self.total_people:
raise ValueError('Number of infected people cannot be greater or equal to the number of total people.')
# Validate time intervals selected by user
time_intervals = [
['exposed_start', 'exposed_finish'],
['infected_start', 'infected_finish'],
]
if self.exposed_lunch_option:
time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish'])
if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
time_intervals.append(['infected_lunch_start', 'infected_lunch_finish'])
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
end = getattr(self, end_name)
if start > end:
raise ValueError(
f"{start_name} must be less than {end_name}. Got {start} and {end}.")
def validate_lunch(start, finish):
lunch_start = getattr(self, f'{population}_lunch_start')
lunch_finish = getattr(self, f'{population}_lunch_finish')
return (start <= lunch_start <= finish and
start <= lunch_finish <= finish)
def get_lunch_mins(population):
lunch_mins = 0
if getattr(self, f'{population}_lunch_option'):
lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
return lunch_mins
def get_coffee_mins(population):
coffee_mins = 0
if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
return coffee_mins
def get_activity_mins(population):
return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
for population in populations:
# Validate lunch time within the activity times.
if (getattr(self, f'{population}_lunch_option') and
not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish'))
):
raise ValueError(
f"{population} lunch break must be within presence times."
)
# Length of breaks < length of activity
if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population):
raise ValueError(
f"Length of breaks >= Length of {population} presence."
)
for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
('infected_coffee_break_option', COFFEE_OPTIONS_INT)]:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
def validate(self):
raise NotImplementedError("Subclass must implement")
def build_model(self, sample_size=DEFAULT_MC_SAMPLE_SIZE):
raise NotImplementedError("Subclass must implement")
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
break_times = []
end = start
for n in range(n_breaks):
begin = end + break_delay
end = begin + duration
break_times.append((begin, end))
return tuple(break_times)
def exposed_lunch_break_times(self) -> models.BoundarySequence_t:
result = []
if self.exposed_lunch_option:
result.append((self.exposed_lunch_start, self.exposed_lunch_finish))
return tuple(result)
def infected_lunch_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
result = []
if self.infected_lunch_option:
result.append((self.infected_lunch_start, self.infected_lunch_finish))
return tuple(result)
else:
return self.exposed_lunch_break_times()
def exposed_number_of_coffee_breaks(self) -> int:
return COFFEE_OPTIONS_INT[self.exposed_coffee_break_option]
def infected_number_of_coffee_breaks(self) -> int:
return COFFEE_OPTIONS_INT[self.infected_coffee_break_option]
def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t:
time_before_lunch = lunch_start - activity_start
time_after_lunch = activity_finish - lunch_finish
before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch)
n_morning_breaks = round(coffee_breaks * before_lunch_frac)
breaks = (
self._compute_breaks_in_interval(
activity_start, lunch_start, n_morning_breaks, coffee_duration
)
+ self._compute_breaks_in_interval(
lunch_finish, activity_finish, coffee_breaks - n_morning_breaks, coffee_duration
)
)
return breaks
def exposed_coffee_break_times(self) -> models.BoundarySequence_t:
exposed_coffee_breaks = self.exposed_number_of_coffee_breaks()
if exposed_coffee_breaks == 0:
return ()
if self.exposed_lunch_option:
breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
return breaks
def infected_coffee_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
infected_coffee_breaks = self.infected_number_of_coffee_breaks()
if infected_coffee_breaks == 0:
return ()
if self.infected_lunch_option:
breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
return breaks
else:
return self.exposed_coffee_break_times()
def generate_specific_break_times(self, population_breaks) -> models.BoundarySequence_t:
break_times = []
for n in population_breaks:
# Parse break times.
begin = time_string_to_minutes(n["start_time"])
end = time_string_to_minutes(n["finish_time"])
for time in [begin, end]:
# For a specific break, the infected and exposed presence is the same.
if not getattr(self, 'infected_start') < time < getattr(self, 'infected_finish'):
raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
break_times.append((begin, end))
return tuple(break_times)
def present_interval(
self,
start: int,
finish: int,
breaks: typing.Optional[models.BoundarySequence_t] = None,
) -> models.Interval:
"""
Calculate the presence interval given the start and end times (in minutes), and
a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes).
"""
if not breaks:
# If there are no breaks, the interval is the start and end.
return models.SpecificInterval(((start/60, finish/60),))
# Order the breaks by their start-time, and ensure that they are monotonic
# and that the start of one break happens after the end of another.
break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0]))
for break_start, break_end in break_boundaries:
if break_start >= break_end:
raise ValueError("Break ends before it begins.")
prev_break_end = break_boundaries[0][1]
for break_start, break_end in break_boundaries[1:]:
if prev_break_end >= break_start:
raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
prev_break_end = break_end
present_intervals = []
current_time = start
LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
# As we step through the breaks. For each break there are 6 important cases
# we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
# 1. The interval is entirely before the break. S < E <= Bs < Be
# 2. The interval straddles the start of the break. S < Bs < E <= Be
# 3. The break is entirely inside the interval. S < Bs < Be <= E
# 4. The interval is entirely inside the break. Bs <= S < E <= Be
# 5. The interval straddles the end of the break. Bs <= S < Be <= E
# 6. The interval is entirely after the break. Bs < Be <= S < E
for current_break in break_boundaries:
if current_time >= finish:
break
LOG.debug(f"handling break {_hours2timestring(current_break[0]/60)}-{_hours2timestring(current_break[1]/60)} "
f" (current time: {_hours2timestring(current_time/60)})")
break_s, break_e = current_break
case1 = finish <= break_s
case2 = current_time < break_s < finish < break_e
case3 = current_time < break_s < break_e <= finish
case4 = break_s <= current_time < finish <= break_e
case5 = break_s <= current_time < break_e < finish
case6 = break_e <= current_time
if case1:
LOG.debug(f"case 1: interval entirely before break")
present_intervals.append((current_time / 60, finish / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = finish
elif case2:
LOG.debug(f"case 2: interval straddles start of break")
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = break_e
elif case3:
LOG.debug(f"case 3: break entirely inside interval")
# We add the bit before the break, but not the bit afterwards,
# as it may hit another break.
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = break_e
elif case4:
LOG.debug(f"case 4: interval entirely inside break")
current_time = finish
elif case5:
LOG.debug(f"case 5: interval straddles end of break")
current_time = break_e
elif case6:
LOG.debug(f"case 6: interval entirely after the break")
if current_time < finish:
LOG.debug("trailing interval")
present_intervals.append((current_time / 60, finish / 60))
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks'])
else:
breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
return self.present_interval(
self.infected_start, self.infected_finish,
breaks=breaks,
)
def population_present_interval(self) -> models.Interval:
state_change_times = set(self.infected_present_interval().transition_times())
state_change_times.update(self.exposed_present_interval().transition_times())
all_state_changes = sorted(state_change_times)
return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:])))
def exposed_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks'])
else:
breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
return self.present_interval(
self.exposed_start, self.exposed_finish,
breaks=breaks,
)
def _hours2timestring(hours: float):
# Convert times like 14.5 to strings, like "14:30"
return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}"
def time_string_to_minutes(time: str) -> minutes_since_midnight:
"""
Converts time from string-format to an integer number of minutes after 00:00
:param time: A string of the form "HH:MM" representing a time of day
:return: The number of minutes between 'time' and 00:00
"""
return minutes_since_midnight(60 * int(time[:2]) + int(time[3:]))
def time_minutes_to_string(time: int) -> str:
"""
Converts time from an integer number of minutes after 00:00 to string-format
:param time: The number of minutes between 'time' and 00:00
:return: A string of the form "HH:MM" representing a time of day
"""
return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60)
def string_to_list(s: str) -> list:
return list(ast.literal_eval(s.replace("&quot;", "\"")))
def list_to_string(l: list) -> str:
return json.dumps(l)
def string_to_dict(s: str) -> dict:
return dict(ast.literal_eval(s.replace("&quot;", "\"")))
def dict_to_string(d: dict) -> str:
return json.dumps(d)
def _safe_int_cast(value) -> int:
if isinstance(value, int):
return value
elif isinstance(value, float) and int(value) == value:
return int(value)
elif isinstance(value, str) and value.isdecimal():
return int(value)
else:
raise TypeError(f"Unable to safely cast {value} ({type(value)} type) to int")
#: Mapping of field name to a callable which can convert values from form
#: input (URL encoded arguments / string) into the correct type.
_CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {}
#: Mapping of field name to callable which can convert native type to values
#: that can be encoded to URL arguments.
_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {}
def cast_class_fields(cls):
for _field in dataclasses.fields(cls):
if _field.type is minutes_since_midnight:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = time_string_to_minutes
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = time_minutes_to_string
elif _field.type is int:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = _safe_int_cast
elif _field.type is float:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float
elif _field.type is bool:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1'
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int
elif _field.type is list:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_list
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = list_to_string
elif _field.type is dict:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string
cast_class_fields(FormData)

View file

@ -1,10 +1,7 @@
import dataclasses
import datetime
import html
import logging
import typing
import ast
import json
import re
import numpy as np
@ -14,9 +11,10 @@ from caimira import data
import caimira.data.weather
import caimira.monte_carlo as mc
from .. import calculator
from .form_data import FormData, cast_class_fields, time_string_to_minutes
from caimira.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
from caimira.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions
from .defaults import (NO_DEFAULT, DEFAULT_MC_SAMPLE_SIZE, DEFAULTS, ACTIVITIES, ACTIVITY_TYPES, COFFEE_OPTIONS_INT, CONFIDENCE_LEVEL_OPTIONS,
from .defaults import (DEFAULT_MC_SAMPLE_SIZE, DEFAULTS, ACTIVITIES, ACTIVITY_TYPES, CONFIDENCE_LEVEL_OPTIONS,
MECHANICAL_VENTILATION_TYPES, MASK_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE,
VENTILATION_TYPES, VIRUS_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES)
from caimira.store.configuration import config
@ -27,36 +25,20 @@ minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
@dataclasses.dataclass
class FormData:
class VirusFormData(FormData):
activity_type: str
air_changes: float
air_supply: float
arve_sensors_option: bool
specific_breaks: dict
precise_activity: dict
ceiling_height: float
conditional_probability_plot: bool
conditional_probability_viral_loads: bool
exposed_coffee_break_option: str
exposed_coffee_duration: int
exposed_finish: minutes_since_midnight
exposed_lunch_finish: minutes_since_midnight
exposed_lunch_option: bool
exposed_lunch_start: minutes_since_midnight
exposed_start: minutes_since_midnight
CO2_fitting_result: dict
floor_area: float
hepa_amount: float
hepa_option: bool
humidity: str
infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
infected_dont_have_breaks_with_exposed: bool
infected_finish: minutes_since_midnight
infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed
infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_people: int
infected_start: minutes_since_midnight
inside_temp: float
location_name: str
location_latitude: float
@ -73,9 +55,7 @@ class FormData:
event_month: str
room_heating_option: bool
room_number: str
room_volume: float
simulation_name: str
total_people: int
vaccine_option: bool
vaccine_booster_option: bool
vaccine_type: str
@ -95,120 +75,12 @@ class FormData:
short_range_interactions: list
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
@classmethod
def from_dict(cls, form_data: typing.Dict) -> "FormData":
# Take a copy of the form data so that we can mutate it.
form_data = form_data.copy()
form_data.pop('_xsrf', None)
# Don't let arbitrary unescaped HTML through the net.
for key, value in form_data.items():
if isinstance(value, str):
form_data[key] = html.escape(value)
for key, default_value in cls._DEFAULTS.items():
if form_data.get(key, '') == '':
if default_value is NO_DEFAULT:
raise ValueError(f"{key} must be specified")
form_data[key] = default_value
for key, value in form_data.items():
if key in _CAST_RULES_FORM_ARG_TO_NATIVE:
form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
if key not in cls._DEFAULTS:
raise ValueError(f'Invalid argument "{html.escape(key)}" given')
instance = cls(**form_data)
instance.validate()
return instance
@classmethod
def to_dict(cls, form: "FormData", strip_defaults: bool = False) -> dict:
form_dict = {
field.name: getattr(form, field.name)
for field in dataclasses.fields(form)
}
for attr, value in form_dict.items():
if attr in _CAST_RULES_NATIVE_TO_FORM_ARG:
form_dict[attr] = _CAST_RULES_NATIVE_TO_FORM_ARG[attr](value)
if strip_defaults:
del form_dict['calculator_version']
for attr, value in list(form_dict.items()):
default = cls._DEFAULTS.get(attr, NO_DEFAULT)
if default is not NO_DEFAULT and value in [default, 'not-applicable']:
form_dict.pop(attr)
return form_dict
def validate(self):
# Validate number of infected people == 1 when activity is Conference/Training.
if self.activity_type == 'training' and self.infected_people > 1:
raise ValueError('Conference/Training activities are limited to 1 infected.')
# Validate number of infected <= number of total people
elif self.infected_people >= self.total_people:
raise ValueError('Number of infected people cannot be more or equal than number of total people.')
# Validate population parameters
self.validate_population_parameters()
# Validate time intervals selected by user
time_intervals = [
['exposed_start', 'exposed_finish'],
['infected_start', 'infected_finish'],
]
if self.exposed_lunch_option:
time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish'])
if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
time_intervals.append(['infected_lunch_start', 'infected_lunch_finish'])
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
end = getattr(self, end_name)
if start > end:
raise ValueError(
f"{start_name} must be less than {end_name}. Got {start} and {end}.")
def validate_lunch(start, finish):
lunch_start = getattr(self, f'{population}_lunch_start')
lunch_finish = getattr(self, f'{population}_lunch_finish')
return (start <= lunch_start <= finish and
start <= lunch_finish <= finish)
def get_lunch_mins(population):
lunch_mins = 0
if getattr(self, f'{population}_lunch_option'):
lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
return lunch_mins
def get_coffee_mins(population):
coffee_mins = 0
if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
return coffee_mins
def get_activity_mins(population):
return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
for population in populations:
# Validate lunch time within the activity times.
if (getattr(self, f'{population}_lunch_option') and
not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish'))
):
raise ValueError(
f"{population} lunch break must be within presence times."
)
# Length of breaks < length of activity
if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population):
raise ValueError(
f"Length of breaks >= Length of {population} presence."
)
validation_tuples = [('activity_type', ACTIVITY_TYPES),
('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
('infected_coffee_break_option', COFFEE_OPTIONS_INT),
validation_tuples = [('activity_type', ACTIVITY_TYPES),
('mechanical_ventilation_type', MECHANICAL_VENTILATION_TYPES),
('mask_type', MASK_TYPES),
('mask_wearing_option', MASK_WEARING_OPTIONS),
@ -221,10 +93,16 @@ class FormData:
('ascertainment_bias', CONFIDENCE_LEVEL_OPTIONS),
('vaccine_type', VACCINE_TYPE),
('vaccine_booster_type', VACCINE_BOOSTER_TYPE),]
for attr_name, valid_set in validation_tuples:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
# Validate number of infected people == 1 when activity is Conference/Training.
if self.activity_type == 'training' and self.infected_people > 1:
raise ValueError('Conference/Training activities are limited to 1 infected.')
# Validate ventilation parameters
if self.ventilation_type == 'natural_ventilation':
if self.window_type == 'not-applicable':
raise ValueError(
@ -327,7 +205,7 @@ class FormData:
def build_mc_model(self) -> mc.ExposureModel:
room = self.initialize_room()
ventilation: models._VentilationBase = self.ventilation()
infected_population = self.infected_population()
short_range = []
@ -340,11 +218,10 @@ class FormData:
distance=short_range_distances,
))
# Initializes and returns a model with the attributes defined above
return mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=room,
ventilation=self.ventilation(),
ventilation=ventilation,
infected=infected_population,
evaporation_factor=0.3,
),
@ -354,7 +231,7 @@ class FormData:
geographic_population=self.geographic_population,
geographic_cases=self.geographic_cases,
ascertainment_bias=CONFIDENCE_LEVEL_OPTIONS[self.ascertainment_bias],
),
),
)
def build_model(self, sample_size=DEFAULT_MC_SAMPLE_SIZE) -> models.ExposureModel:
@ -437,10 +314,23 @@ class FormData:
def ventilation(self) -> models._VentilationBase:
always_on = models.PeriodicInterval(period=120, duration=120)
periodic_interval = models.PeriodicInterval(self.windows_frequency, self.windows_duration,
min(self.infected_start, self.exposed_start)/60)
if self.ventilation_type == 'from_fitting':
ventilations = []
if self.CO2_fitting_result['fitting_ventilation_type'] == 'fitting_natural_ventilation':
transition_times = self.CO2_fitting_result['transition_times']
for index, (start, stop) in enumerate(zip(transition_times[:-1], transition_times[1:])):
ventilations.append(models.AirChange(active=models.SpecificInterval(present_times=((start, stop), )),
air_exch=self.CO2_fitting_result['ventilation_values'][index]))
else:
ventilations.append(models.AirChange(active=always_on, air_exch=self.CO2_fitting_result['ventilation_values'][0]))
return models.MultipleVentilation(tuple(ventilations))
# Initializes a ventilation instance as a window if 'natural_ventilation' is selected, or as a HEPA-filter otherwise
if self.ventilation_type == 'natural_ventilation':
if self.window_opening_regime == 'windows_open_periodically':
window_interval = models.PeriodicInterval(self.windows_frequency, self.windows_duration, min(self.infected_start, self.exposed_start)/60)
window_interval = periodic_interval
else:
window_interval = always_on
@ -568,206 +458,11 @@ class FormData:
)
return exposed
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
break_times = []
end = start
for n in range(n_breaks):
begin = end + break_delay
end = begin + duration
break_times.append((begin, end))
return tuple(break_times)
def exposed_lunch_break_times(self) -> models.BoundarySequence_t:
result = []
if self.exposed_lunch_option:
result.append((self.exposed_lunch_start, self.exposed_lunch_finish))
return tuple(result)
def infected_lunch_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
result = []
if self.infected_lunch_option:
result.append((self.infected_lunch_start, self.infected_lunch_finish))
return tuple(result)
else:
return self.exposed_lunch_break_times()
def exposed_number_of_coffee_breaks(self) -> int:
return COFFEE_OPTIONS_INT[self.exposed_coffee_break_option]
def infected_number_of_coffee_breaks(self) -> int:
return COFFEE_OPTIONS_INT[self.infected_coffee_break_option]
def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t:
time_before_lunch = lunch_start - activity_start
time_after_lunch = activity_finish - lunch_finish
before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch)
n_morning_breaks = round(coffee_breaks * before_lunch_frac)
breaks = (
self._compute_breaks_in_interval(
activity_start, lunch_start, n_morning_breaks, coffee_duration
)
+ self._compute_breaks_in_interval(
lunch_finish, activity_finish, coffee_breaks - n_morning_breaks, coffee_duration
)
)
return breaks
def exposed_coffee_break_times(self) -> models.BoundarySequence_t:
exposed_coffee_breaks = self.exposed_number_of_coffee_breaks()
if exposed_coffee_breaks == 0:
return ()
if self.exposed_lunch_option:
breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
return breaks
def infected_coffee_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
infected_coffee_breaks = self.infected_number_of_coffee_breaks()
if infected_coffee_breaks == 0:
return ()
if self.infected_lunch_option:
breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
return breaks
else:
return self.exposed_coffee_break_times()
def generate_specific_break_times(self, population_breaks) -> models.BoundarySequence_t:
break_times = []
for n in population_breaks:
# Parse break times.
begin = time_string_to_minutes(n["start_time"])
end = time_string_to_minutes(n["finish_time"])
for time in [begin, end]:
# For a specific break, the infected and exposed presence is the same.
if not getattr(self, 'infected_start') < time < getattr(self, 'infected_finish'):
raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
break_times.append((begin, end))
return tuple(break_times)
def present_interval(
self,
start: int,
finish: int,
breaks: typing.Optional[models.BoundarySequence_t] = None,
) -> models.Interval:
"""
Calculate the presence interval given the start and end times (in minutes), and
a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes).
"""
if not breaks:
# If there are no breaks, the interval is the start and end.
return models.SpecificInterval(((start/60, finish/60),))
# Order the breaks by their start-time, and ensure that they are monotonic
# and that the start of one break happens after the end of another.
break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0]))
for break_start, break_end in break_boundaries:
if break_start >= break_end:
raise ValueError("Break ends before it begins.")
prev_break_end = break_boundaries[0][1]
for break_start, break_end in break_boundaries[1:]:
if prev_break_end >= break_start:
raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
prev_break_end = break_end
present_intervals = []
current_time = start
LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
# As we step through the breaks. For each break there are 6 important cases
# we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
# 1. The interval is entirely before the break. S < E <= Bs < Be
# 2. The interval straddles the start of the break. S < Bs < E <= Be
# 3. The break is entirely inside the interval. S < Bs < Be <= E
# 4. The interval is entirely inside the break. Bs <= S < E <= Be
# 5. The interval straddles the end of the break. Bs <= S < Be <= E
# 6. The interval is entirely after the break. Bs < Be <= S < E
for current_break in break_boundaries:
if current_time >= finish:
break
LOG.debug(f"handling break {_hours2timestring(current_break[0]/60)}-{_hours2timestring(current_break[1]/60)} "
f" (current time: {_hours2timestring(current_time/60)})")
break_s, break_e = current_break
case1 = finish <= break_s
case2 = current_time < break_s < finish < break_e
case3 = current_time < break_s < break_e <= finish
case4 = break_s <= current_time < finish <= break_e
case5 = break_s <= current_time < break_e < finish
case6 = break_e <= current_time
if case1:
LOG.debug(f"case 1: interval entirely before break")
present_intervals.append((current_time / 60, finish / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = finish
elif case2:
LOG.debug(f"case 2: interval straddles start of break")
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = break_e
elif case3:
LOG.debug(f"case 3: break entirely inside interval")
# We add the bit before the break, but not the bit afterwards,
# as it may hit another break.
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = break_e
elif case4:
LOG.debug(f"case 4: interval entirely inside break")
current_time = finish
elif case5:
LOG.debug(f"case 5: interval straddles end of break")
current_time = break_e
elif case6:
LOG.debug(f"case 6: interval entirely after the break")
if current_time < finish:
LOG.debug("trailing interval")
present_intervals.append((current_time / 60, finish / 60))
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks'])
else:
breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
return self.present_interval(
self.infected_start, self.infected_finish,
breaks=breaks,
)
def short_range_interval(self, interaction) -> models.SpecificInterval:
start_time = time_string_to_minutes(interaction['start_time'])
duration = float(interaction['duration'])
return models.SpecificInterval(present_times=((start_time/60, (start_time + duration)/60),))
def exposed_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks'])
else:
breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
return self.present_interval(
self.exposed_start, self.exposed_finish,
breaks=breaks,
)
def build_expiration(expiration_definition) -> mc._ExpirationBase:
if isinstance(expiration_definition, str):
@ -846,80 +541,4 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'short_range_interactions': '[]',
}
def _hours2timestring(hours: float):
# Convert times like 14.5 to strings, like "14:30"
return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}"
def time_string_to_minutes(time: str) -> minutes_since_midnight:
"""
Converts time from string-format to an integer number of minutes after 00:00
:param time: A string of the form "HH:MM" representing a time of day
:return: The number of minutes between 'time' and 00:00
"""
return minutes_since_midnight(60 * int(time[:2]) + int(time[3:]))
def time_minutes_to_string(time: int) -> str:
"""
Converts time from an integer number of minutes after 00:00 to string-format
:param time: The number of minutes between 'time' and 00:00
:return: A string of the form "HH:MM" representing a time of day
"""
return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60)
def string_to_list(s: str) -> list:
return list(ast.literal_eval(s.replace("&quot;", "\"")))
def list_to_string(l: list) -> str:
return json.dumps(l)
def string_to_dict(s: str) -> dict:
return dict(ast.literal_eval(s.replace("&quot;", "\"")))
def dict_to_string(d: dict) -> str:
return json.dumps(d)
def _safe_int_cast(value) -> int:
if isinstance(value, int):
return value
elif isinstance(value, float) and int(value) == value:
return int(value)
elif isinstance(value, str) and value.isdecimal():
return int(value)
else:
raise TypeError(f"Unable to safely cast {value} ({type(value)} type) to int")
#: Mapping of field name to a callable which can convert values from form
#: input (URL encoded arguments / string) into the correct type.
_CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {}
#: Mapping of field name to callable which can convert native type to values
#: that can be encoded to URL arguments.
_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {}
for _field in dataclasses.fields(FormData):
if _field.type is minutes_since_midnight:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = time_string_to_minutes
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = time_minutes_to_string
elif _field.type is int:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = _safe_int_cast
elif _field.type is float:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float
elif _field.type is bool:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1'
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int
elif _field.type is list:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_list
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = list_to_string
elif _field.type is dict:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string
cast_class_fields(VirusFormData)

View file

@ -15,7 +15,7 @@ import matplotlib.pyplot as plt
from caimira import models
from caimira.apps.calculator import markdown_tools
from ... import monte_carlo as mc
from .model_generator import FormData, DEFAULT_MC_SAMPLE_SIZE
from .model_generator import VirusFormData, DEFAULT_MC_SAMPLE_SIZE
from ... import dataclass_utils
from caimira.store.configuration import config
@ -102,7 +102,7 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional
return nice_times
def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
lower_concentrations = []
for time in times:
for index, (start, stop) in enumerate(short_range_intervals):
@ -114,7 +114,7 @@ def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel
return lower_concentrations
def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
@ -150,6 +150,7 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing
zip(('viral_loads', 'pi_means', 'lower_percentiles', 'upper_percentiles'),
manufacture_conditional_probability_data(model, prob))}
return {
"model_repr": repr(model),
"times": list(times),
@ -174,8 +175,8 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing
}
def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: FormData):
form_dict = FormData.to_dict(form, strip_defaults=True)
def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
form_dict = VirusFormData.to_dict(form, strip_defaults=True)
# Generate the calculator URL arguments that would be needed to re-create this
# form.
@ -318,6 +319,13 @@ def readable_minutes(minutes: int) -> str:
return time_str + unit
def hour_format(hour: float) -> str:
# Convert float hour to HH:MM format
hours = int(hour)
minutes = int(hour % 1 * 60)
return f"{hours}:{minutes if minutes != 0 else '00'}"
def percentage(absolute: float) -> float:
return absolute * 100
@ -345,7 +353,7 @@ def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typ
return scenarios
def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.ExposureModel]:
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
scenarios = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
@ -416,7 +424,7 @@ def scenario_statistics(mc_model: mc.ExposureModel, sample_times: typing.List[fl
def comparison_report(
form: FormData,
form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: typing.List[float],
@ -464,7 +472,7 @@ class ReportGenerator:
def build_report(
self,
base_url: str,
form: FormData,
form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
@ -475,7 +483,7 @@ class ReportGenerator:
self,
base_url: str,
model: models.ExposureModel,
form: FormData,
form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> dict:
now = datetime.utcnow().astimezone()
@ -513,6 +521,7 @@ class ReportGenerator:
env.filters['non_zero_percentage'] = non_zero_percentage
env.filters['readable_minutes'] = readable_minutes
env.filters['minutes_to_time'] = minutes_to_time
env.filters['hour_format'] = hour_format
env.filters['float_format'] = "{0:.2f}".format
env.filters['int_format'] = "{:0.0f}".format
env.filters['percentage'] = percentage

View file

@ -0,0 +1,388 @@
// Input data for CO2 fitting algorithm
const CO2_data_form = [
"CO2_data",
"exposed_coffee_break_option",
"exposed_coffee_duration",
"exposed_finish",
"exposed_lunch_finish",
"exposed_lunch_option",
"exposed_lunch_start",
"exposed_start",
"fitting_ventilation_states",
"fitting_ventilation_type",
"infected_coffee_break_option",
"infected_coffee_duration",
"infected_dont_have_breaks_with_exposed",
"infected_finish",
"infected_lunch_finish",
"infected_lunch_option",
"infected_lunch_start",
"infected_people",
"infected_start",
"room_volume",
"specific_breaks",
"total_people",
];
// Method to upload a valid excel file
function uploadFile(endpoint) {
clearFittingResultComponent();
const files = $("#file_upload")[0].files;
if (files.length === 0) {
$("#upload-error")
.text('Please choose a file.')
.show();
return;
}
const file = files[0];
const extension = file.name
.substring(file.name.lastIndexOf("."))
.toUpperCase();
if (extension !== ".XLS" && extension !== ".XLSX") {
$("#upload-error")
.text("Please select a valid excel file (.XLS or .XLSX).")
.show();
return;
}
// FileReader API to read the Excel file
const reader = new FileReader();
reader.onload = function (event) {
const fileContent = event.target.result;
const workbook = XLSX.read(fileContent, { type: "binary" });
// Assuming the first sheet is the one we want to validate
const firstSheetName = workbook.SheetNames[0];
const worksheet = workbook.Sheets[firstSheetName];
// Check if the headers match the expected format
const headerCoordinates = {
Times: "A1",
CO2: "B1",
};
for (const header in headerCoordinates) {
const cellValue = worksheet[headerCoordinates[header]]?.v;
if (
!cellValue ||
$.type(cellValue) !== "string" ||
cellValue.trim().toLowerCase() !== header.toLowerCase()
) {
$("#upload-error")
.text(`The file does not have the expected header "${header}".`)
.show();
return;
}
}
const data = XLSX.utils.sheet_to_json(worksheet, { header: 1, raw: false });
// Check if there is any data below the header row
if (data.length <= 1) {
$("#upload-error")
.text(
"The Excel file is empty. Please make sure it contains data below the header row."
)
.show();
return;
}
// Validate data in the columns
const timesColumnIndex = 0;
const CO2ColumnIndex = 1;
for (let i = 1; i < data.length; i++) {
try {
const timesCellValue = parseFloat(data[i][timesColumnIndex]);
const CO2CellValue = parseFloat(data[i][CO2ColumnIndex]);
if (isNaN(timesCellValue) || isNaN(CO2CellValue)) {
throw new Error("Invalid data in the Times or CO2 columns.");
}
} catch (error) {
$("#upload-error")
.text(
"Invalid data in the Times or CO2 columns. Please make sure they contain only float values."
)
.show();
return;
}
}
// Convert Excel file to JSON and further processing
try {
generateJSONStructure(endpoint, data);
// If all validations pass, process the file here or display a success message
$("#upload-file-extention-error").hide();
} catch (error) {
console.log(error);
}
};
reader.readAsBinaryString(file); // Read the file as a binary string
}
// Method to generate the JSON structure
function generateJSONStructure(endpoint, jsonData) {
const inputToPopulate = $("#CO2_data");
// Initialize the final structure
const finalStructure = { times: [], CO2: [] };
if (jsonData.length > 0) {
// Loop through the input dataArray and extract the values starting from the second array (index 1)
for (let i = 1; i < jsonData.length; i++) {
const arr = jsonData[i];
// Assuming arr contains two float values
finalStructure.times.push(parseFloat(arr[0]));
finalStructure.CO2.push(parseFloat(arr[1]));
}
inputToPopulate.val(JSON.stringify(finalStructure));
$("#generate_fitting_data").prop("disabled", false);
$("#fitting_ventilation_states").prop("disabled", false);
$("[name=fitting_ventilation_type]").prop("disabled", false);
plotCO2Data(endpoint);
}
}
function insertErrorFor(referenceNode, text) {
$(`<span class='error_text text-danger'>${text}</span>`).insertAfter(referenceNode)
}
function validateFormInputs(obj) {
$("#ventilation_data").find("span.error_text").remove(); // Remove all error spans
let submit = true;
const $referenceNode = $("#DIVCO2_data_dialog");
for (let i = 0; i < CO2_data_form.length; i++) {
const $requiredElement = $(`[name=${CO2_data_form[i]}]`).first();
if ($requiredElement.attr('name') !== "fitting_ventilation_states" && $requiredElement.val() === "") {
insertErrorFor(
$referenceNode,
`'${$requiredElement.attr('name')}' must be defined.<br />`
);
submit = false;
}
}
if (submit) {
$($(obj).data("target")).modal("show");
$("#upload-error").hide();
$("#upload-file-extention-error").hide();
}
return submit;
}
function validateCO2Form() {
let submit = true;
if (validateFormInputs($("#button_fit_data"))) submit = true;
const $fittingToSubmit = $('#DIVCO2_fitting_to_submit');
// Check if natural ventilation is selected
if (
$fittingToSubmit.find('input[name="fitting_ventilation_type"]:checked').val() ==
"fitting_natural_ventilation"
) {
// Validate ventilation scheme
const $ventilationStates = $fittingToSubmit.find("input[name=fitting_ventilation_states]");
const $referenceNode = $("#DIVCO2_fitting_result");
if ($ventilationStates.val() !== "") {
// validate input format
try {
const parsedValue = JSON.parse($ventilationStates.val());
if (Array.isArray(parsedValue)) {
if (parsedValue.length <= 1) {
insertErrorFor(
$referenceNode,
`'${$ventilationStates.attr('name')}' must have more than one $ventilationStates.<br />`
);
submit = false;
}
else {
const infected_finish = $(`[name=infected_finish]`).first().val();
const exposed_finish = $(`[name=exposed_finish]`).first().val();
const [hours_infected, minutes_infected] = infected_finish.split(":").map(Number);
const elapsed_time_infected = hours_infected * 60 + minutes_infected;
const [hours_exposed, minutes_exposed] = exposed_finish.split(":").map(Number);
const elapsed_time_exposed = hours_exposed * 60 + minutes_exposed;
const max_presence_time = Math.max(elapsed_time_infected, elapsed_time_exposed);
const max_transition_time = parsedValue[parsedValue.length - 1] * 60;
if (max_transition_time > max_presence_time) {
insertErrorFor(
$referenceNode,
`The last transition time (${parsedValue[parsedValue.length - 1]}) should be before the last presence time (${max_presence_time / 60}).<br />`
);
submit = false;
}
}
}
else {
insertErrorFor(
$referenceNode,
`'${$ventilationStates.attr('name')}' must be a list.</br>`
);
submit = false;
}
} catch {
insertErrorFor(
$referenceNode,
`'${$ventilationStates.attr('name')}' must be a list of numbers.</br>`
);
submit = false;
}
} else {
insertErrorFor(
$referenceNode,
`'${$ventilationStates.attr('name')}' must be defined.</br>`
);
submit = false;
}
}
return submit;
}
function displayTransitionTimesHourFormat(start, stop) {
var minutes_start = ((start % 1) * 60).toPrecision(2);
var minutes_stop = ((stop % 1) * 60).toPrecision(2);
return (
Math.floor(start) +
":" +
(minutes_start != "0.0" ? minutes_start : "00") +
" - " +
Math.floor(stop) +
":" +
(minutes_stop != "0.0" ? minutes_stop : "00")
);
}
function displayFittingData(json_response) {
$("#DIVCO2_fitting_result").show();
$("#CO2_data_plot").attr("src", json_response["CO2_plot"]);
// Not needed for the form submission
delete json_response["CO2_plot"];
delete json_response["predictive_CO2"];
$("#CO2_fitting_result").val(JSON.stringify(json_response));
$("#exhalation_rate_fit").html(
"Exhalation rate: " +
String(json_response["exhalation_rate"].toFixed(2)) +
" m³/h"
);
let ventilation_table =
"<tr><th>Time (HH:MM)</th><th>ACH value (h⁻¹)</th></tr>";
json_response["ventilation_values"].forEach((val, index) => {
let transition_times = displayTransitionTimesHourFormat(
json_response["transition_times"][index],
json_response["transition_times"][index + 1]
);
ventilation_table += `<tr><td>${transition_times}</td><td>${val.toPrecision(
2
)}</td></tr>`;
});
$("#disable_fitting_algorithm").prop("disabled", false);
$("#ventilation_rate_fit").html(ventilation_table);
$("#generate_fitting_data").html("Fit data");
$("#generate_fitting_data").hide();
$("#save_and_dismiss_dialog").show();
}
function formatCO2DataForm(CO2_data_form) {
let CO2_mapping = {};
CO2_data_form.map((el) => {
let element = $(`[name=${el}]`).first();
// Validate checkboxes
if (element.prop('type') == "checkbox") {
CO2_mapping[element.attr('name')] = String(+element.prop('checked'));
}
// Validate radio buttons
else if (element.prop('type') == "radio")
CO2_mapping[element.attr('name')] = $(
`[name=${element.attr('name')}]:checked`
).first().val();
else {
CO2_mapping[element.attr('name')] = element.val();
}
});
return CO2_mapping;
}
function plotCO2Data(url) {
if (validateFormInputs()) {
let CO2_mapping = formatCO2DataForm(CO2_data_form);
fetch(url, {
method: "POST",
body: JSON.stringify(CO2_mapping),
}).then((response) =>
response
.json()
.then((json_response) => {
$("#CO2_data_plot").attr("src", json_response["CO2_plot"])
$("#fitting_ventilation_states").val(`[${json_response["transition_times"]}]`)
})
.then($("#DIVCO2_fitting_to_submit").show())
.catch((error) => console.log(error))
);
}
}
function submitFittingAlgorithm(url) {
if (validateCO2Form()) {
// Disable all the ventilation inputs
$("#fitting_ventilation_states, [name=fitting_ventilation_type]").prop(
"disabled",
true
);
// Prepare data for submission
const CO2_mapping = formatCO2DataForm(CO2_data_form);
$("#CO2_input_data_div").show();
$("#disable_fitting_algorithm").prop("disabled", true);
$("#generate_fitting_data")
.html(
'<span id="loading_spinner" class="spinner-border spinner-border-sm mr-2" role="status" aria-hidden="true"></span>Loading...'
)
.prop("disabled", true);
$("#CO2_input_data").html(JSON.stringify(CO2_mapping, null, "\t"));
fetch(url, {
method: "POST",
body: JSON.stringify(CO2_mapping),
})
.then((response) => response.json())
.then((json_response) => {
displayFittingData(json_response);
// Hide the suggestion transition lines warning
$("#suggestion_lines_txt").hide();
});
}
}
function clearFittingResultComponent() {
const $referenceNode = $("#DIVCO2_data_dialog");
// Add the warning suggestion line
$referenceNode.find("#suggestion_lines_txt").show();
// Remove all the previously generated fitting elements
$referenceNode.find("#generate_fitting_data").prop("disabled", true);
$referenceNode.find("#CO2_fitting_result").val("");
$referenceNode.find("#CO2_data").val("{}");
$referenceNode.find("#fitting_ventilation_states").val("");
$referenceNode.find("span.error_text").remove();
$referenceNode.find("#DIVCO2_fitting_result, #CO2_input_data_div").hide();
$referenceNode.find("#DIVCO2_fitting_to_submit").hide();
$referenceNode.find("#CO2_data_plot").attr("src", "");
// Update the ventilation scheme components
$referenceNode.find("#fitting_ventilation_states, [name=fitting_ventilation_type]").prop(
"disabled",
false
);
// Update the bottom right buttons
$referenceNode.find("#generate_fitting_data").show();
$referenceNode.find("#save_and_dismiss_dialog").hide();
}
function disableFittingAlgorithm() {
clearFittingResultComponent();
$("#CO2_data_no").click();
}

View file

@ -493,6 +493,20 @@ function on_coffee_break_option_change() {
}
}
function on_CO2_fitting_ventilation_change() {
ventilation_options = $('input[type=radio][name=fitting_ventilation_type]');
ventilation_options.each(function (index) {
if (this.checked) {
getChildElement($(this)).show();
require_fields(this);
}
else {
getChildElement($(this)).hide();
require_fields(this);
}
})
}
/* -------UI------- */
function show_disclaimer() {
@ -665,6 +679,14 @@ function validate_form(form) {
on_short_range_option_change();
}
// Check if fitting is selected
if ($('input[type=radio][id=from_fitting]').prop('checked')) {
if ($('#CO2_fitting_result').val() == '')
$("input[type=radio][id=no_ventilation]").prop("checked", true);
$("span.error_text").remove();
on_ventilation_type_change();
}
if (submit) {
$("#generate_report").prop("disabled", true);
//Add spinner to button
@ -673,6 +695,8 @@ function validate_form(form) {
);
}
if ($("#CO2_fitting_result").val() == "") $("#CO2_data_no").click();
return submit;
}
@ -914,6 +938,12 @@ $(document).ready(function () {
// Validation after
}
// Read CO2 Fitting Algorithms result
else if (name == 'CO2_fitting_result' || name == 'CO2_data') {
// Validation after
}
//Ignore 0 (default) values from server side
else if (!(elemObj.classList.contains("non_zero") || elemObj.classList.contains("remove_zero")) || (value != "0.0" && value != "0")) {
elemObj.value = value;
@ -924,6 +954,10 @@ $(document).ready(function () {
// Handle default URL values if they are not explicitly defined.
// Populate CO2 Fitting Algorithm Dialog
let CO2_data = url.searchParams.has('CO2_fitting_result') ? url.searchParams.get('CO2_fitting_result') : null;
if (CO2_data) displayFittingData(JSON.parse(CO2_data));
// Populate primary vaccine dropdown
$("#vaccine_type option").remove();
let primary_vaccine = url.searchParams.has('vaccine_type') ? url.searchParams.get('vaccine_type') : null;
@ -1039,6 +1073,12 @@ $(document).ready(function () {
// Call the function now to handle forward/back button presses in the browser.
on_coffee_break_option_change();
// When the ventilation on the fitting changes we want to make its respective
// children show/hide.
$("input[type=radio][name=fitting_ventilation_type]").change(on_CO2_fitting_ventilation_change);
// Call the function now to handle forward/back button presses in the browser.
on_CO2_fitting_ventilation_change();
// Setup the maximum number of people at page load (to handle back/forward),
// and update it when total people is changed.
validateMaxInfectedPeople();

View file

@ -12,7 +12,9 @@
{% block body_scripts %}
<script src="https://cdnjs.cloudflare.com/ajax/libs/jqueryui/1.12.1/jquery-ui.min.js" integrity="sha512-uto9mlQzrs59VwILcLiRYeLKPPbS/bT71da/OEBYEwcdNUk8jYIy+D176RYoop1Da+f9mvkYrmj5MCLZWEtQuA==" crossorigin="anonymous"></script>
<script src="{{ get_calculator_url('/static/js') }}/form.js"></script>
<script src="{{ get_calculator_url('/static/js') }}/co2_form.js"></script>
<script src="https://cdn.jsdelivr.net/npm/select2@4.1.0-rc.0/dist/js/select2.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/xlsx/0.16.2/xlsx.full.min.js"></script>
{% endblock body_scripts %}
@ -185,116 +187,201 @@
<hr width="80%">
<!-- Ventilation Options -->
<b>Ventilation data:</b>
<div data-tooltip="The available means of venting / filtration of indoor spaces.">
<span class="tooltip_text">?</span>
</div>
<br>
<div id="ventilation_data">
<div class='sub_title'>Ventilation type:</div>
<div class="split">
<div>
<input type="radio" id="no_ventilation" name="ventilation_type" value="no_ventilation" checked>
<label for="no_ventilation">No ventilation</label>
<input class="ml-3" type="radio" id="mechanical_ventilation" name="ventilation_type" value="mechanical_ventilation" data-enables="#DIVmechanical_ventilation">
<label for="mechanical_ventilation">Mechanical</label>
</div>
<div class="ml-0">
<input type="radio" id="natural_ventilation" name="ventilation_type" value="natural_ventilation" data-enables="#DIVnatural_ventilation" data-toggle="modal" data-target="#warning_modal">
<label for="natural_ventilation">Natural</label>
<input class="ml-3" type="radio" id="from_fitting" name="ventilation_type" value="from_fitting" data-enables="#DIVfrom_fitting">
<label for="from_fitting">From Fitting</label>
</div>
</div>
<div class='sub_title'>Ventilation type:</div>
<div class="split">
<div>
<input type="radio" id="no_ventilation" name="ventilation_type" value="no_ventilation" checked>
<label for="no_ventilation">No ventilation</label>
<input class="ml-2" type="radio" id="mechanical_ventilation" name="ventilation_type" value="mechanical_ventilation" data-enables="#DIVmechanical_ventilation">
<label for="mechanical_ventilation">Mechanical</label>
</div>
<div>
<input type="radio" id="natural_ventilation" name="ventilation_type" value="natural_ventilation" data-enables="#DIVnatural_ventilation" data-toggle="modal" data-target="#warning_modal">
<label for="natural_ventilation">Natural</label><br>
</div>
</div>
<!-- Modal -->
<div class="modal fade" id="warning_modal" tabindex="-1" role="dialog" aria-hidden="true">
<div class="modal-dialog" role="document">
<div class="modal-content">
<div class="modal-header">
<h4 class="modal-title">Natural Ventilation</h4>
<button type="button" class="close align-self-center" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">&times;</span>
</button>
</div>
<div class="modal-body">
Single-sided ventilation is assumed in the model and is typically effective for room depths up to a depth 2.5 x the ceiling height.
If these conditions are not met, the air exchange might not be homogenous producing an artificially lower risk further away from the window.
<br>
<br>
<img src="{{ get_url('/static/images') }}/nat_vent_dimensions.png" id="nat_vent_image">
<!-- Natural Ventilation Modal -->
<div class="modal fade" id="warning_modal" tabindex="-1" role="dialog" aria-hidden="true">
<div class="modal-dialog" role="document">
<div class="modal-content">
<div class="modal-header">
<h4 class="modal-title">Natural Ventilation</h4>
<button type="button" class="close align-self-center" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">&times;</span>
</button>
</div>
<div class="modal-body">
Single-sided ventilation is assumed in the model and is typically effective for room depths up to a depth 2.5 x the ceiling height.
If these conditions are not met, the air exchange might not be homogenous producing an artificially lower risk further away from the window.
<br>
<br>
<img src="{{ get_url('/static/images') }}/nat_vent_dimensions.png" id="nat_vent_image">
</div>
</div>
</div>
</div>
</div>
<div id="DIVmechanical_ventilation" class="tabbed" style="display:none">
<div class="split">
<div style="min-width: 55%">
<input type="radio" id="mech_type_air_supply" name="mechanical_ventilation_type" value="mech_type_air_supply" class="center_radio" onclick="require_fields(this)" tabindex="-1">
<label for="mech_type_air_supply" class="col-form-label ml-2">Air supply flow rate (m³ / hour)</label>
<div id="DIVmechanical_ventilation" class="tabbed" style="display:none">
<div class="split">
<div style="min-width: 55%">
<input type="radio" id="mech_type_air_supply" name="mechanical_ventilation_type" value="mech_type_air_supply" class="center_radio" onclick="require_fields(this)" tabindex="-1">
<label for="mech_type_air_supply" class="col-form-label ml-2">Air supply flow rate (m³ / hour)</label>
</div>
<div>
<input type="number" step="any" id="air_supply" class="non_zero form-control" name="air_supply" min="0" placeholder="Flow rate" data-has-radio="#mech_type_air_supply"><br>
</div>
</div>
<div>
<input type="number" step="any" id="air_supply" class="non_zero form-control" name="air_supply" min="0" placeholder="Flow rate" data-has-radio="#mech_type_air_supply"><br>
<div class="split">
<div style="min-width: 55%">
<input type="radio" id="mech_type_air_changes" name="mechanical_ventilation_type" value="mech_type_air_changes" class="center_radio" onclick="require_fields(this)" tabindex="-1">
<label for="mech_type_air_changes" class="col-form-label ml-2">Air changes per hour (h⁻¹)</label>
</div>
<div>
<input type="number" step="any" id="air_changes" class="non_zero form-control" name="air_changes" min="0" placeholder="Air exchange" data-has-radio="#mech_type_air_changes"><br>
</div>
</div>
</div>
<div class="split">
<div style="min-width: 55%">
<input type="radio" id="mech_type_air_changes" name="mechanical_ventilation_type" value="mech_type_air_changes" class="center_radio" onclick="require_fields(this)" tabindex="-1">
<label for="mech_type_air_changes" class="col-form-label ml-2">Air changes per hour (h⁻¹)</label>
</div>
<div>
<input type="number" step="any" id="air_changes" class="non_zero form-control" name="air_changes" min="0" placeholder="Air exchange" data-has-radio="#mech_type_air_changes"><br>
</div>
</div>
</div>
</div>
<div id="DIVnatural_ventilation" style="display:none">
<div class="form-group row tabbed">
<div class="col-md-5"><label class="col-form-label">Number of windows:</label></div>
<div class="col-md-5"><input type="number" id="windows_number" class="non_zero form-control" name="windows_number" placeholder="Number" min="1"></div>
</div>
<div class="form-group row tabbed">
<div class="col-md-5"><label class="col-form-label">Height of window (m): </label></div>
<div class="col-md-5"><input type="number" step="any" id="window_height" class="non_zero form-control" name="window_height" placeholder="Height" min="0"></div>
</div>
<div class='sub_title'>Window type:</div>
<input class="ml-2" type="radio" id="window_sliding" name="window_type" value="window_sliding" onclick="require_fields(this)" checked="checked">
<label for="window_sliding">Sliding / Side-Hung</label>
<input class="ml-2" type="radio" id="window_hinged" name="window_type" value="window_hinged" onclick="require_fields(this)">
<label for="window_hinged">Top- or Bottom-Hung</label><br>
<div id="DIVnatural_ventilation" style="display:none">
<div class="form-group row tabbed">
<div class="col-md-5"><label class="col-form-label">Width of window (m): </label></div>
<div class="col-md-5"><input type="number" step="any" id="window_width" class="non_zero disabled form-control" name="window_width" placeholder="Width" min="0" data-has-radio="#window_hinged"></div>
<div class="col-md-5"><label class="col-form-label">Number of windows:</label></div>
<div class="col-md-5"><input type="number" id="windows_number" class="non_zero form-control" name="windows_number" placeholder="Number" min="1"></div>
</div>
<div class="form-group row tabbed">
<div class="col-md-5"><label class="col-form-label">Opening distance (m): </label></div>
<div class="col-md-5"><input type="number" step="any" id="opening_distance" class="non_zero form-control" name="opening_distance" placeholder="Opening distance" min="0"></div>
<div class="col-md-5"><label class="col-form-label">Height of window (m): </label></div>
<div class="col-md-5"><input type="number" step="any" id="window_height" class="non_zero form-control" name="window_height" placeholder="Height" min="0"></div>
</div>
<div class='sub_title'>Window open:
<div data-tooltip="Permanently or periodically - e.g. open the window for 10 minutes (duration) every 60 minutes (frequency).">
<span class="tooltip_text">?</span>
</div>
</div>
<input class="ml-2" type="radio" id="windows_open_permanently" name="window_opening_regime" value="windows_open_permanently" onclick="require_fields(this)" checked="checked"></span>
<label for="windows_open_permanently" class="col-form-label ml-2">Permanently</label>
<input class="ml-2" type="radio" id="windows_open_periodically" name="window_opening_regime" value="windows_open_periodically" onclick="require_fields(this)" data-enables="#DIVperiodic_opening"></span>
<label for="windows_open_periodically" class="col-form-label ml-2 mr-2">Periodically</label>
<div id="DIVperiodic_opening">
<div class="form-group row tabbed">
<div class="col-md-5"><label for="window_duration" class="col-form-label ml-2">Duration (min):</label></div>
<div class="col-md-5"><input type="number" step="any" id="windows_duration" class="non_zero form-control" name="windows_duration" value="10" min="1" data-has-radio="#windows_open_periodically"></div>
</div>
<div class='sub_title'>Window type:</div>
<input class="ml-2" type="radio" id="window_sliding" name="window_type" value="window_sliding" onclick="require_fields(this)" checked="checked">
<label for="window_sliding">Sliding / Side-Hung</label>
<input class="ml-2" type="radio" id="window_hinged" name="window_type" value="window_hinged" onclick="require_fields(this)">
<label for="window_hinged">Top- or Bottom-Hung</label><br>
<div class="form-group row tabbed">
<div class="col-md-5"><label for="window_frequency" class="col-form-label ml-2">Frequency (min):</label></div>
<div class="col-md-5"><input type="number" step="any" id="windows_frequency" class="non_zero form-control" name="windows_frequency" value="60" min="1" data-has-radio="#windows_open_periodically"></div>
<div class="col-md-5"><label class="col-form-label">Width of window (m): </label></div>
<div class="col-md-5"><input type="number" step="any" id="window_width" class="non_zero disabled form-control" name="window_width" placeholder="Width" min="0" data-has-radio="#window_hinged"></div>
</div>
<div class="form-group row tabbed" id="DIVopening_distance">
<div class="col-md-5"><label class="col-form-label">Opening distance (m): </label></div>
<div class="col-md-5"><input type="number" step="any" id="opening_distance" class="non_zero form-control" name="opening_distance" placeholder="Opening distance" min="0"></div>
</div>
<div id="window_opening_regime">
<div class='sub_title'>Window open:
<div data-tooltip="Permanently or periodically - e.g. open the window for 10 minutes (duration) every 60 minutes (frequency).">
<span class="tooltip_text">?</span>
</div>
</div>
<input class="ml-2" type="radio" id="windows_open_permanently" name="window_opening_regime" value="windows_open_permanently" onclick="require_fields(this)" checked="checked"></span>
<label for="windows_open_permanently" class="col-form-label ml-2">Permanently</label>
<input class="ml-2" type="radio" id="windows_open_periodically" name="window_opening_regime" value="windows_open_periodically" onclick="require_fields(this)" data-enables="#DIVperiodic_opening"></span>
<label for="windows_open_periodically" class="col-form-label ml-2 mr-2">Periodically</label>
<div id="DIVperiodic_opening">
<div class="form-group row tabbed">
<div class="col-md-5"><label for="window_duration" class="col-form-label ml-2">Duration (min):</label></div>
<div class="col-md-5"><input type="number" step="any" id="windows_duration" class="non_zero form-control" name="windows_duration" value="10" min="1" data-has-radio="#windows_open_periodically"></div>
</div>
<div class="form-group row tabbed">
<div class="col-md-5"><label for="window_frequency" class="col-form-label ml-2">Frequency (min):</label></div>
<div class="col-md-5"><input type="number" step="any" id="windows_frequency" class="non_zero form-control" name="windows_frequency" value="60" min="1" data-has-radio="#windows_open_periodically"></div>
</div>
</div>
</div>
</div>
<br/>
<div id="DIVfrom_fitting" class="split" style="display: none">
<div>
For the CO₂ fitting algorithm, the following input values will be considered:
<ul>
<li>Room volume (m³)</li>
<li>Total number of occupants</li>
<li>Presence transition times</li>
</ul>
</div>
<div class="align-self-center text-center">
<button id="button_fit_data" type="button" class="btn btn-primary w-50" data-target="#DIVCO2_data_dialog" data-keyboard="false" data-backdrop="static" onclick="validateFormInputs(this)">Fitting Algorithm</button>
</div>
</div>
<!-- CO2 Modal -->
<div class="modal fade" id="DIVCO2_data_dialog" tabindex="-1" role="dialog" aria-labelledby="CO2_values_title" aria-hidden="true" data-backdrop="static">
<div class="modal-dialog modal-lg" role="document" overflow: visible>
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title" id="CO2_values_title">CO₂ Fitting Algorithm</h5>
</div>
<div class="modal-body">
<div class="d-flex d-row justify-content-between">
<h5 class="mt-auto mb-auto">Upload an Excel file:</h5>
<a class="btn btn-primary btn-sm" href="https://caimira-resources.web.cern.ch/CO2_template.xlsx">Download template</a>
</div>
<div class="d-flex">
<p>You may use the provided template and fill in the values.</p>
<div data-tooltip="File must have 2 columns, with headers 'Times' and 'CO2'.">
<span class="tooltip_text">?</span>
</div>
</div>
<br>
<!-- Input element to upload an excel file -->
<input type="file" id="file_upload" accept=".xlsx, .xls"/>
<button type="button" class="btn btn-primary btn-sm" onclick="uploadFile('{{ get_calculator_url() }}/co2-fit/plot');">Upload</button>
<div id="upload-error" class="mb-0 mt-2 alert alert-danger" style="display: none" role="alert"></div>
<br>
<!-- Formatted excel data -->
<input id="CO2_data" type="text" name="CO2_data" form="not-submitted" class="form-control d-none" placeholder='{"times": [...], "concentrations": [...]}' value="{}"><br>
<div id="CO2_input_data_div" style="display: none"></div>
<input type="text" class="form-control d-none" name="CO2_fitting_result" id="CO2_fitting_result">
<div id="DIVCO2_fitting_to_submit" style="display: none">
<img id="CO2_data_plot"/><br>
<p id="suggestion_lines_txt" class="text-danger text-center">
The dashed lines are suggestions for the ventilation transition times<br>
(generated from the input data using the Pelt algorithm).</p>
<strong>Ventilation scheme:</strong>
<div>
<input class="ml-2" type="radio" id="fitting_natural_ventilation" name="fitting_ventilation_type" value='fitting_natural_ventilation' checked="checked" data-enables="#DIVfitting_natural_ventilation" form="not-submitted" >
<label for="fitting_natural_ventilation">Natural</label>
<input class="ml-2" type="radio" id="fitting_mechanical_ventilation" name="fitting_ventilation_type" value='fitting_mechanical_ventilation' form="not-submitted">
<label for="fitting_mechanical_ventilation">Mechanical</label>
</div>
<div id="DIVfitting_natural_ventilation" class="form-group mb-0" style="display: none">
<label for="fitting_ventilation_states">Please enter the ventilation state change times, separated by comma - e.g. [8.5, 10, 11.5, 17]. </label>
<div data-tooltip="Default values indicated below correspond to the dashed lines in the above plot - these are only suggestions and can be changed.">
<span class="tooltip_text">?</span>
</div>
<input type="text" class="form-control" id="fitting_ventilation_states" name="fitting_ventilation_states" placeholder="e.g. [8.5, 10, 11.5, 17]" form="not-submitted"><br>
</div>
</div>
<div id="DIVCO2_fitting_result" style="display: none">
<h5 class="mt-auto mb-auto">Fitting result</h5><br>
<p id="exhalation_rate_fit"></p>
<p>Ventilation values (ACH):</p><br>
<!-- table to display the ventilation result data -->
<table id="ventilation_rate_fit" class="w-75 mr-auto ml-auto" border="1"></table>
</div>
</div>
<div class="modal-footer">
<button type="button" id="disable_fitting_algorithm" class="btn btn-secondary dismiss_btn_frm_field" data-dismiss="modal" onclick="disableFittingAlgorithm()">Discard</button>
<button type="button" id="generate_fitting_data" class="btn btn-primary close_btn_frm_field" onclick="submitFittingAlgorithm('{{ get_calculator_url() }}/co2-fit/')" disabled>Fit data</button>
<button type="button" style="display: none" id="save_and_dismiss_dialog" class="btn btn-primary close_btn_frm_field" data-dismiss="modal">Use fit data</button>
</div>
</div>
</div>
</div>
<br/>
<div class='sub_title'>HEPA filtration:</div>
</div></br>
<div class='sub_title'>HEPA filtration:</div>
<div>
<input type="radio" id="hepa_yes" name="hepa_option" value=1 onclick="require_fields(this)" data-enables="#DIVhepa_amount">
<label for="hepa_yes" class="col-form-label ml-2">Yes</label>
@ -305,7 +392,8 @@
<div class="col-md-5"><label for="hepa_amount" class="col-form-label">Flow rate (m³ / hour):</label></div>
<div class="col-md-5"><input type="number" step="any" id="hepa_amount" class="non_zero form-control" name="hepa_amount" placeholder="Flow rate" min="0" data-has-radio="#hepa_yes"></div>
</div>
<hr width="80%">
<hr width="80%">
</div>
<b>Face masks:</b>
<div data-tooltip="Masks worn at workstations or removed when a 2m physical distance is respected and proper venting is ensured.">
@ -545,7 +633,7 @@
<!-- Lunch Options -->
<div class="form-check">
<input type="checkbox" id="infected_dont_have_breaks_with_exposed" class="tabbed form-check-input" name="infected_dont_have_breaks_with_exposed" value='1' onclick="toggle_split_breaks()">
<input type="checkbox" id="infected_dont_have_breaks_with_exposed" class="tabbed form-check-input" name="infected_dont_have_breaks_with_exposed" value="0" onclick="toggle_split_breaks()">
<label for="infected_dont_have_breaks_with_exposed" class="form-check-label col-sm-12">Input separate breaks for infected and exposed person(s)</label>
</div><br>
@ -658,11 +746,11 @@
</div>
<br style="clear:both;">
<i>Coffee breaks are spread evenly throughout the day.</i><br>
<i>Coffee breaks are spread evenly throughout the day.</i>
<br><br>
</div>
</div>
</section>
<div class="center">

View file

@ -528,6 +528,23 @@
<li><p class="data_text">HEPA amount: {{ form.hepa_amount }} m³ / hour</p></li>
</ul>
{% endif %}
<li><p class="data_text">From Fitting:
{% if form.ventilation_type == "from_fitting" %}
Yes
<table class="w-25 mt-3 ml-4" border="1">
<tr><th> Time (HH:MM)</th><th>ACH value (h⁻¹)</th></tr>
{% for ventilation in form.CO2_fitting_result['ventilation_values'] %}
{% set transition_time = form.CO2_fitting_result['transition_times'] %}
<tr>
<td>{{ transition_time[loop.index - 1] | hour_format }} - {{ transition_time[loop.index] | hour_format }}</td>
<td>{{ ventilation | float_format }} </td>
</tr>
{% endfor %}
</table>
{% else %}
No </p></li>
{% endif %}
</p></li>
</ul>
</div>
</div>
@ -578,6 +595,9 @@
{% elif form.activity_type == "gym" %}
Gym - For comparison only, all persons doing heavy physical exercise, breathing and not speaking.
{% endif %}
{% if form.ventilation_type == "from_fitting" %}
<p class="ml-5">Exhalation rate from fitting algorithm - {{form.CO2_fitting_result['exhalation_rate'] | round(2, 'floor')}} m³/h</p>
{% endif %}
</p></li>
{% if form.short_range_option == "short_range_yes" %}
<li><p class="data_text">

View file

@ -110,7 +110,10 @@
<script src="{{ get_url('/static/js') }}/ScrollMagic.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/twitter-bootstrap/4.5.3/js/bootstrap.min.js" integrity="sha512-8qmis31OQi6hIRgvkht0s6mCOittjMa9GMqtK9hes5iEQBQE/Ca6yGE5FsW36vyipGoWQswBj/QBm2JR086Rkw==" crossorigin="anonymous"></script>
<script src="{{ get_url('/static/js') }}/usage-tracking.js"></script>
<!-- SheetJS for Excel validation -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/xlsx/0.16.9/xlsx.full.min.js"></script>
{% block body_scripts %}
{% endblock body_scripts %}

View file

@ -38,6 +38,7 @@ import typing
import numpy as np
from scipy.interpolate import interp1d
import scipy.stats as sct
from scipy.optimize import minimize
if not typing.TYPE_CHECKING:
from memoization import cached
@ -440,6 +441,18 @@ class AirChange(Ventilation):
return self.air_exch
@dataclass(frozen=True)
class CustomVentilation(_VentilationBase):
# The ventilation value for a given time
ventilation_value: PiecewiseConstant
def transition_times(self, room: Room) -> typing.Set[float]:
return set(self.ventilation_value.transition_times)
def air_exchange(self, room: Room, time: float) -> _VectorisedFloat:
return self.ventilation_value.value(time)
@dataclass(frozen=True)
class Virus:
#: RNA copies / mL
@ -1076,7 +1089,6 @@ class _ConcentrationModelBase:
state_change_times = {0.}
state_change_times.update(self.population.presence_interval().transition_times())
state_change_times.update(self.ventilation.transition_times(self.room))
return sorted(state_change_times)
@method_cache
@ -1472,6 +1484,64 @@ class ShortRangeModel:
return normed_int_concentration_interpolated
@dataclass(frozen=True)
class CO2DataModel:
'''
The CO2DataModel class models CO2 data based on room volume, ventilation transition times, and people presence.
It uses optimization techniques to fit the model's parameters and estimate the exhalation rate and ventilation
values that best match the measured CO2 concentrations.
'''
room_volume: float
number: typing.Union[int, IntPiecewiseConstant]
presence: typing.Optional[Interval]
ventilation_transition_times: typing.Tuple[float, ...]
times: typing.Sequence[float]
CO2_concentrations: typing.Sequence[float]
def CO2_concentrations_from_params(self,
exhalation_rate: float,
ventilation_values: typing.Tuple[float, ...]) -> typing.List[_VectorisedFloat]:
CO2_concentrations = CO2ConcentrationModel(
room=Room(volume=self.room_volume),
ventilation=CustomVentilation(PiecewiseConstant(
self.ventilation_transition_times, ventilation_values)),
CO2_emitters=SimplePopulation(
number=self.number,
presence=self.presence,
activity=Activity(
exhalation_rate=exhalation_rate, inhalation_rate=exhalation_rate),
)
)
return [CO2_concentrations.concentration(time) for time in self.times]
def CO2_fit_params(self):
if len(self.times) != len(self.CO2_concentrations):
raise ValueError('times and CO2_concentrations must have same length.')
if len(self.times) < 2:
raise ValueError(
'times and CO2_concentrations must contain at last two elements')
def fun(x):
exhalation_rate = x[0]
ventilation_values = tuple(x[1:])
the_concentrations = self.CO2_concentrations_from_params(
exhalation_rate=exhalation_rate,
ventilation_values=ventilation_values
)
return np.sqrt(np.sum((np.array(self.CO2_concentrations) -
np.array(the_concentrations))**2))
# The goal is to minimize the difference between the two different curves (known concentrations vs. predicted concentrations)
res_dict = minimize(fun=fun, x0=np.ones(len(self.ventilation_transition_times)), method='powell',
bounds=[(0, None) for _ in range(len(self.ventilation_transition_times))],
options={'xtol': 1e-3})
exhalation_rate = res_dict['x'][0]
ventilation_values = res_dict['x'][1:]
predictive_CO2 = self.CO2_concentrations_from_params(exhalation_rate=exhalation_rate, ventilation_values=ventilation_values)
return {"exhalation_rate": exhalation_rate, "ventilation_values": list(ventilation_values), 'predictive_CO2': list(predictive_CO2)}
@dataclass(frozen=True)
class ExposureModel:
"""
@ -1511,7 +1581,7 @@ class ExposureModel:
c_model.ventilation.air_exchange(c_model.room, time)) for time in c_model.state_change_times()))):
raise ValueError("If the diameter is an array, none of the ventilation parameters "
"or virus decay constant can be arrays at the same time.")
@method_cache
def population_state_change_times(self) -> typing.List[float]:
"""

View file

@ -10,4 +10,4 @@ def baseline_form_data():
@pytest.fixture
def baseline_form(baseline_form_data):
return model_generator.FormData.from_dict(baseline_form_data)
return model_generator.VirusFormData.from_dict(baseline_form_data)

View file

@ -7,21 +7,22 @@ import pytest
from retry import retry
from caimira.apps.calculator import model_generator
from caimira.apps.calculator.model_generator import _hours2timestring
from caimira.apps.calculator.model_generator import minutes_since_midnight
from caimira.apps.calculator.form_data import (_hours2timestring, minutes_since_midnight,
_CAST_RULES_FORM_ARG_TO_NATIVE, _CAST_RULES_NATIVE_TO_FORM_ARG)
from caimira import models
from caimira.monte_carlo.data import expiration_distributions
from caimira.apps.calculator.defaults import NO_DEFAULT
def test_model_from_dict(baseline_form_data):
form = model_generator.FormData.from_dict(baseline_form_data)
form = model_generator.VirusFormData.from_dict(baseline_form_data)
assert isinstance(form.build_model(), models.ExposureModel)
def test_model_from_dict_invalid(baseline_form_data):
baseline_form_data['invalid_item'] = 'foobar'
with pytest.raises(ValueError, match='Invalid argument "invalid_item" given'):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
@retry(tries=10)
@ -44,7 +45,7 @@ def test_blend_expiration(mask_type):
npt.assert_allclose(r.aerosols(mask).mean(), expected, rtol=TOLERANCE)
def test_ventilation_slidingwindow(baseline_form: model_generator.FormData):
def test_ventilation_slidingwindow(baseline_form: model_generator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -74,7 +75,7 @@ def test_ventilation_slidingwindow(baseline_form: model_generator.FormData):
assert ventilation == baseline_vent
def test_ventilation_hingedwindow(baseline_form: model_generator.FormData):
def test_ventilation_hingedwindow(baseline_form: model_generator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -104,7 +105,7 @@ def test_ventilation_hingedwindow(baseline_form: model_generator.FormData):
assert ventilation == baseline_vent
def test_ventilation_mechanical(baseline_form: model_generator.FormData):
def test_ventilation_mechanical(baseline_form: model_generator.VirusFormData):
room = models.Room(volume=75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
mech = models.HVACMechanical(
active=models.PeriodicInterval(period=120, duration=120),
@ -119,7 +120,7 @@ def test_ventilation_mechanical(baseline_form: model_generator.FormData):
np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts]))
def test_ventilation_airchanges(baseline_form: model_generator.FormData):
def test_ventilation_airchanges(baseline_form: model_generator.VirusFormData):
room = models.Room(75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
airchange = models.AirChange(
active=models.PeriodicInterval(period=120, duration=120),
@ -134,7 +135,7 @@ def test_ventilation_airchanges(baseline_form: model_generator.FormData):
np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts]))
def test_ventilation_window_hepa(baseline_form: model_generator.FormData):
def test_ventilation_window_hepa(baseline_form: model_generator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -171,13 +172,13 @@ def test_ventilation_window_hepa(baseline_form: model_generator.FormData):
@pytest.mark.parametrize(
["activity", "total_people", "infected_people", "error"],
[
['office', 10, 11, "Number of infected people cannot be more or equal than number of total people."],
['office', 10, 10, "Number of infected people cannot be more or equal than number of total people."],
['office', 10, 11, "Number of infected people cannot be greater or equal to the number of total people"],
['office', 10, 10, "Number of infected people cannot be greater or equal to the number of total people"],
['training', 10, 2, "Conference/Training activities are limited to 1 infected."],
]
)
def test_infected_less_than_total_people(activity, total_people, infected_people, error,
baseline_form: model_generator.FormData):
baseline_form: model_generator.VirusFormData):
baseline_form.activity_type = activity
baseline_form.total_people = total_people
baseline_form.infected_people = infected_people
@ -190,7 +191,7 @@ def present_times(interval: models.Interval) -> models.BoundarySequence_t:
return interval.present_times
def test_infected_present_intervals(baseline_form: model_generator.FormData):
def test_infected_present_intervals(baseline_form: model_generator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.exposed_coffee_duration = 15
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -204,7 +205,7 @@ def test_infected_present_intervals(baseline_form: model_generator.FormData):
assert present_times(baseline_form.infected_present_interval()) == correct
def test_exposed_present_intervals(baseline_form: model_generator.FormData):
def test_exposed_present_intervals(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_duration = 15
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -215,7 +216,7 @@ def test_exposed_present_intervals(baseline_form: model_generator.FormData):
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_present_intervals_common_breaks(baseline_form: model_generator.FormData):
def test_present_intervals_common_breaks(baseline_form: model_generator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15
baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -231,7 +232,7 @@ def test_present_intervals_common_breaks(baseline_form: model_generator.FormData
assert present_times(baseline_form.infected_present_interval()) == correct_infected
def test_present_intervals_split_breaks(baseline_form: model_generator.FormData):
def test_present_intervals_split_breaks(baseline_form: model_generator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = True
baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15
baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -247,7 +248,7 @@ def test_present_intervals_split_breaks(baseline_form: model_generator.FormData)
assert present_times(baseline_form.infected_present_interval()) == correct_infected
def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_generator.FormData):
def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_start = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60)
baseline_form.exposed_finish = minutes_since_midnight(18 * 60)
@ -256,7 +257,7 @@ def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_gene
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_generator.FormData):
def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_start = minutes_since_midnight(11 * 60)
baseline_form.exposed_finish = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60)
@ -265,7 +266,7 @@ def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_genera
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generator.FormData):
def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_lunch_start = minutes_since_midnight(14 * 60)
baseline_form.exposed_lunch_finish = minutes_since_midnight(13 * 60)
@ -282,7 +283,7 @@ def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generat
[9, 20], # lunch_finish after the presence finishing
],
)
def test_exposed_presence_lunch_break(baseline_form: model_generator.FormData, exposed_lunch_start, exposed_lunch_finish):
def test_exposed_presence_lunch_break(baseline_form: model_generator.VirusFormData, exposed_lunch_start, exposed_lunch_finish):
baseline_form.exposed_lunch_start = minutes_since_midnight(exposed_lunch_start * 60)
baseline_form.exposed_lunch_finish = minutes_since_midnight(exposed_lunch_finish * 60)
with pytest.raises(ValueError, match='exposed lunch break must be within presence times.'):
@ -298,14 +299,14 @@ def test_exposed_presence_lunch_break(baseline_form: model_generator.FormData, e
[9, 20], # lunch_finish after the presence finishing
],
)
def test_infected_presence_lunch_break(baseline_form: model_generator.FormData, infected_lunch_start, infected_lunch_finish):
def test_infected_presence_lunch_break(baseline_form: model_generator.VirusFormData, infected_lunch_start, infected_lunch_finish):
baseline_form.infected_lunch_start = minutes_since_midnight(infected_lunch_start * 60)
baseline_form.infected_lunch_finish = minutes_since_midnight(infected_lunch_finish * 60)
with pytest.raises(ValueError, match='infected lunch break must be within presence times.'):
baseline_form.validate()
def test_exposed_breaks_length(baseline_form: model_generator.FormData):
def test_exposed_breaks_length(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_start = minutes_since_midnight(10 * 60)
@ -315,7 +316,7 @@ def test_exposed_breaks_length(baseline_form: model_generator.FormData):
baseline_form.validate()
def test_infected_breaks_length(baseline_form: model_generator.FormData):
def test_infected_breaks_length(baseline_form: model_generator.VirusFormData):
baseline_form.infected_start = minutes_since_midnight(9 * 60)
baseline_form.infected_finish = minutes_since_midnight(12 * 60)
baseline_form.infected_lunch_start = minutes_since_midnight(10 * 60)
@ -327,7 +328,7 @@ def test_infected_breaks_length(baseline_form: model_generator.FormData):
@pytest.fixture
def coffee_break_between_1045_and_1115(baseline_form: model_generator.FormData):
def coffee_break_between_1045_and_1115(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_1'
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_start = minutes_since_midnight(10 * 60)
@ -385,7 +386,7 @@ def assert_boundaries(interval, boundaries_in_time_string_form):
@pytest.fixture
def breaks_every_25_mins_for_20_mins(baseline_form: model_generator.FormData):
def breaks_every_25_mins_for_20_mins(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_coffee_duration = 20
baseline_form.exposed_start = time2mins("10:00")
@ -430,7 +431,7 @@ def test_present_only_during_second_break(breaks_every_25_mins_for_20_mins):
assert_boundaries(interval, [])
def test_valid_no_lunch(baseline_form: model_generator.FormData):
def test_valid_no_lunch(baseline_form: model_generator.VirusFormData):
# Check that it is valid to have a 0 length lunch if no lunch is selected.
baseline_form.exposed_lunch_option = False
baseline_form.exposed_lunch_start = minutes_since_midnight(0)
@ -438,7 +439,7 @@ def test_valid_no_lunch(baseline_form: model_generator.FormData):
assert baseline_form.validate() is None
def test_no_breaks(baseline_form: model_generator.FormData):
def test_no_breaks(baseline_form: model_generator.VirusFormData):
# Check that the times are correct in the absence of breaks.
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.exposed_lunch_option = False
@ -453,7 +454,7 @@ def test_no_breaks(baseline_form: model_generator.FormData):
assert present_times(baseline_form.infected_present_interval()) == infected_correct
def test_coffee_lunch_breaks(baseline_form: model_generator.FormData):
def test_coffee_lunch_breaks(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -465,7 +466,7 @@ def test_coffee_lunch_breaks(baseline_form: model_generator.FormData):
np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14)
def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.FormData):
def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -476,7 +477,7 @@ def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.FormData):
np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14)
def test_coffee_breaks(baseline_form: model_generator.FormData):
def test_coffee_breaks(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_duration = 10
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -489,24 +490,24 @@ def test_coffee_breaks(baseline_form: model_generator.FormData):
def test_key_validation(baseline_form_data):
baseline_form_data['activity_type'] = 'invalid key'
with pytest.raises(ValueError):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_key_validation_natural_ventilation_window_type_na(baseline_form_data):
baseline_form_data['ventilation_type'] = 'natural_ventilation'
baseline_form_data['window_type'] = 'not-applicable'
with pytest.raises(ValueError, match='window_type cannot be \'not-applicable\''):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_key_validation_natural_ventilation_window_opening_regime_na(baseline_form_data):
baseline_form_data['ventilation_type'] = 'natural_ventilation'
baseline_form_data['window_opening_regime'] = 'not-applicable'
with pytest.raises(ValueError, match='window_opening_regime cannot be \'not-applicable\''):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_natural_ventilation_window_opening_periodically(baseline_form: model_generator.FormData):
def test_natural_ventilation_window_opening_periodically(baseline_form: model_generator.VirusFormData):
baseline_form.window_opening_regime = 'windows_open_periodically'
baseline_form.windows_duration = 20
baseline_form.windows_frequency = 10
@ -518,20 +519,20 @@ def test_key_validation_mech_ventilation_type_na(baseline_form_data):
baseline_form_data['ventilation_type'] = 'mechanical_ventilation'
baseline_form_data['mechanical_ventilation_type'] = 'not-applicable'
with pytest.raises(ValueError, match='mechanical_ventilation_type cannot be \'not-applicable\''):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_key_validation_event_month(baseline_form_data):
baseline_form_data['event_month'] = 'invalid month'
with pytest.raises(ValueError, match='invalid month is not a valid value for event_month'):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_default_types():
# Validate that FormData._DEFAULTS are complete and of the correct type.
# Validate that VirusFormData._DEFAULTS are complete and of the correct type.
# Validate that we have the right types and matching attributes to the DEFAULTS.
fields = {field.name: field for field in dataclasses.fields(model_generator.FormData)}
for field, value in model_generator.FormData._DEFAULTS.items():
fields = {field.name: field for field in dataclasses.fields(model_generator.VirusFormData)}
for field, value in model_generator.VirusFormData._DEFAULTS.items():
if field not in fields:
raise ValueError(f"Unmatched default {field}")
@ -540,17 +541,17 @@ def test_default_types():
# Handle typing.NewType definitions.
field_type = field_type.__supertype__
if value is model_generator.NO_DEFAULT:
if value is NO_DEFAULT:
continue
if field in model_generator._CAST_RULES_FORM_ARG_TO_NATIVE:
value = model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[field](value)
if field in _CAST_RULES_FORM_ARG_TO_NATIVE:
value = _CAST_RULES_FORM_ARG_TO_NATIVE[field](value)
if not isinstance(value, field_type):
raise TypeError(f'{field} has type {field_type}, got {type(value)}')
for field in fields.values():
assert field.name in model_generator.FormData._DEFAULTS, f"No default set for field name {field.name}"
assert field.name in model_generator.VirusFormData._DEFAULTS, f"No default set for field name {field.name}"
def test_form_to_dict(baseline_form):
@ -559,7 +560,7 @@ def test_form_to_dict(baseline_form):
assert 1 < len(stripped) < len(full)
assert 'exposed_coffee_break_option' in stripped
# If we set the value to the default one, it should no longer turn up in the dictionary.
baseline_form.exposed_coffee_break_option = model_generator.FormData._DEFAULTS['exposed_coffee_break_option']
baseline_form.exposed_coffee_break_option = model_generator.VirusFormData._DEFAULTS['exposed_coffee_break_option']
assert 'exposed_coffee_break_option' not in baseline_form.to_dict(baseline_form, strip_defaults=True)
@ -577,7 +578,7 @@ def test_form_timezone(baseline_form_data, longitude, latitude, month, expected_
baseline_form_data['location_latitude'] = latitude
baseline_form_data['location_longitude'] = longitude
baseline_form_data['event_month'] = month
form = model_generator.FormData.from_dict(baseline_form_data)
form = model_generator.VirusFormData.from_dict(baseline_form_data)
name, offset = form.tz_name_and_utc_offset()
assert name == expected_tz_name
assert offset == expected_offset

View file

@ -13,7 +13,7 @@ from caimira.apps.calculator import model_generator
[{"exposed_breaks": [], "ifected_breaks": []}, 'Unable to fetch "infected_breaks" key. Got "ifected_breaks".'],
]
)
def test_specific_break_structure(break_input, error, baseline_form: model_generator.FormData):
def test_specific_break_structure(break_input, error, baseline_form: model_generator.VirusFormData):
baseline_form.specific_breaks = break_input
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -30,7 +30,7 @@ def test_specific_break_structure(break_input, error, baseline_form: model_gener
[[{"start_time": "10:00", "finish_time": "11"}], 'Wrong time format - "HH:MM". Got "11".'],
]
)
def test_specific_population_break_data_structure(population_break_input, error, baseline_form: model_generator.FormData):
def test_specific_population_break_data_structure(population_break_input, error, baseline_form: model_generator.VirusFormData):
baseline_form.specific_breaks = {'exposed_breaks': population_break_input, 'infected_breaks': population_break_input}
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -45,7 +45,7 @@ def test_specific_population_break_data_structure(population_break_input, error,
[[{"start_time": "08:00", "finish_time": "11:00"}, {"start_time": "14:00", "finish_time": "15:00"}, ], "All breaks should be within the simulation time. Got 08:00."],
]
)
def test_specific_break_time(break_input, error, baseline_form: model_generator.FormData):
def test_specific_break_time(break_input, error, baseline_form: model_generator.VirusFormData):
with pytest.raises(ValueError, match=error):
baseline_form.generate_specific_break_times(break_input)
@ -63,7 +63,7 @@ def test_specific_break_time(break_input, error, baseline_form: model_generator.
[{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentag": 50}, {"type": "Speaking", "percentage": 50}]}, 'Unable to fetch "percentage" key. Got "percentag".'],
]
)
def test_precise_activity_structure(precise_activity_input, error, baseline_form: model_generator.FormData):
def test_precise_activity_structure(precise_activity_input, error, baseline_form: model_generator.VirusFormData):
baseline_form.precise_activity = precise_activity_input
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -78,7 +78,7 @@ def test_precise_activity_structure(precise_activity_input, error, baseline_form
[{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentage": 50}]}, 'The sum of all respiratory activities should be 100. Got 50.'],
]
)
def test_sum_precise_activity(precise_activity_input, error, baseline_form: model_generator.FormData):
def test_sum_precise_activity(precise_activity_input, error, baseline_form: model_generator.VirusFormData):
baseline_form.precise_activity = precise_activity_input
with pytest.raises(ValueError, match=error):
baseline_form.validate()

View file

@ -0,0 +1,57 @@
import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
@pytest.mark.parametrize(
"activity_type, ventilation_active, air_exch", [
['Seated', [8, 12, 13, 17], [0.25, 2.45, 0.25]],
['Standing', [8, 10, 11, 12, 17], [1.25, 3.25, 1.45, 0.25]],
['Light activity', [8, 12, 17], [1.25, 0.25]],
['Moderate activity', [8, 13, 15, 16, 17], [2.25, 0.25, 3.45, 0.25]],
['Heavy exercise', [8, 17], [0.25]],
['Seated', [8, 17], [0.25]],
['Standing', [8, 17], [2.45]],
]
)
def test_fitting_algorithm(activity_type, ventilation_active, air_exch):
conc_model = models.CO2ConcentrationModel(
room=models.Room(
volume=75, inside_temp=models.PiecewiseConstant((0., 24.), (293,))),
ventilation=models.CustomVentilation(models.PiecewiseConstant(
tuple(ventilation_active), tuple(air_exch))),
CO2_emitters=models.SimplePopulation(
number=models.IntPiecewiseConstant(transition_times=tuple(
[8, 12, 13, 17]), values=tuple([2, 1, 2])),
presence=None,
activity=models.Activity.types[activity_type]
),
)
times = np.linspace(8, 17, 100)
CO2_concentrations = [
conc_model.concentration(float(time))
for time in times
]
# Generate CO2DataModel
data_model = models.CO2DataModel(
room_volume=75,
number=models.IntPiecewiseConstant(transition_times=tuple(
[8, 12, 13, 17]), values=tuple([2, 1, 2])),
presence=None,
ventilation_transition_times=tuple(ventilation_active),
times=times,
CO2_concentrations=CO2_concentrations
)
fit_parameters = data_model.CO2_fit_params()
exhalation_rate = fit_parameters['exhalation_rate']
npt.assert_almost_equal(
exhalation_rate, conc_model.CO2_emitters.activity.exhalation_rate, decimal=2)
ventilation_values = fit_parameters['ventilation_values']
npt.assert_allclose(ventilation_values, air_exch, rtol=1e-2)

View file

@ -85,6 +85,7 @@ pytz==2022.7.1
pyzmq==25.0.0
requests==2.28.2
retry==0.9.2
ruptures==1.1.8
scikit-learn==1.2.1
scipy==1.10.1
Send2Trash==1.8.0

View file

@ -34,3 +34,6 @@ ignore_missing_imports = True
[mypy-tabulate.*]
ignore_missing_imports = True
[mypy-ruptures.*]
ignore_missing_imports = True

View file

@ -33,6 +33,7 @@ REQUIREMENTS: dict = {
'pyjwt',
'python-dateutil',
'retry',
'ruptures',
'scipy',
'scikit-learn',
'timezonefinder',