add superclass FormData for CO2FormData and VirusFormData

This commit is contained in:
Luis Aleixo 2023-11-17 11:22:41 +01:00
parent ac2e8b799d
commit 3da67c8c95
8 changed files with 541 additions and 542 deletions

View file

@ -106,7 +106,7 @@ class ConcentrationModel(BaseRequestHandler):
start = datetime.datetime.now()
try:
form = model_generator.FormData.from_dict(requested_model_config)
form = model_generator.VirusFormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
@ -157,7 +157,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
pprint(requested_model_config)
try:
form = model_generator.FormData.from_dict(requested_model_config)
form = model_generator.VirusFormData.from_dict(requested_model_config)
except Exception as err:
if self.settings.get("debug", False):
import traceback
@ -178,7 +178,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
class StaticModel(BaseRequestHandler):
async def get(self) -> None:
form = model_generator.FormData.from_dict(model_generator.baseline_raw_form_data())
form = model_generator.VirusFormData.from_dict(model_generator.baseline_raw_form_data())
base_url = self.request.protocol + "://" + self.request.host
report_generator: ReportGenerator = self.settings['report_generator']
executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size'])

View file

@ -1,13 +1,13 @@
import dataclasses
import html
import logging
import typing
import numpy as np
import ruptures as rpt
import matplotlib.pyplot as plt
import re
from caimira import models
from . import model_generator
from .form_data import FormData, cast_class_fields
from .defaults import DEFAULT_MC_SAMPLE_SIZE, NO_DEFAULT
from .report_generator import img2base64, _figure2bytes
@ -17,29 +17,10 @@ LOG = logging.getLogger(__name__)
@dataclasses.dataclass
class CO2FormData(model_generator.FormData):
class CO2FormData(FormData):
CO2_data: dict
exposed_coffee_break_option: str
exposed_coffee_duration: int
exposed_finish: model_generator.minutes_since_midnight
exposed_lunch_finish: model_generator.minutes_since_midnight
exposed_lunch_option: bool
exposed_lunch_start: model_generator.minutes_since_midnight
exposed_start: model_generator.minutes_since_midnight
fitting_ventilation_states: list
fitting_ventilation_type: str
infected_coffee_break_option: str
infected_coffee_duration: int
infected_dont_have_breaks_with_exposed: bool
infected_finish: model_generator.minutes_since_midnight
infected_lunch_finish: model_generator.minutes_since_midnight
infected_lunch_option: bool
infected_lunch_start: model_generator.minutes_since_midnight
infected_people: int
infected_start: model_generator.minutes_since_midnight
room_volume: float
specific_breaks: dict
total_people: int
#: The default values for undefined fields. Note that the defaults here
#: and the defaults in the html form must not be contradictory.
@ -73,33 +54,37 @@ class CO2FormData(model_generator.FormData):
for key, value in self._DEFAULTS.items():
setattr(self, key, kwargs.get(key, value))
@classmethod
def from_dict(self, form_data: typing.Dict) -> "CO2FormData":
# Take a copy of the form data so that we can mutate it.
form_data = form_data.copy()
form_data.pop('_xsrf', None)
def validate(self):
# Validate population parameters
self.validate_population_parameters()
# Don't let arbitrary unescaped HTML through the net.
for key, value in form_data.items():
if isinstance(value, str):
form_data[key] = html.escape(value)
# Validate specific inputs - breaks (exposed and infected)
if self.specific_breaks != {}:
if type(self.specific_breaks) is not dict:
raise TypeError('The specific breaks should be in a dictionary.')
dict_keys = list(self.specific_breaks.keys())
if "exposed_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".')
if "infected_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".')
for key, default_value in self._DEFAULTS.items():
if form_data.get(key, '') == '':
if default_value is NO_DEFAULT:
raise ValueError(f"{key} must be specified")
form_data[key] = default_value
for key, value in form_data.items():
if key in model_generator._CAST_RULES_FORM_ARG_TO_NATIVE:
form_data[key] = model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
if key not in self._DEFAULTS:
raise ValueError(f'Invalid argument "{html.escape(key)}" given')
instance = self(**form_data)
instance.validate_population_parameters()
return instance
for population_breaks in ['exposed_breaks', 'infected_breaks']:
if self.specific_breaks[population_breaks] != []:
if type(self.specific_breaks[population_breaks]) is not list:
raise TypeError(f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.')
for input_break in self.specific_breaks[population_breaks]:
# Input validations.
if type(input_break) is not dict:
raise TypeError(f'Each break should be a dictionary. Got {type(input_break)}.')
dict_keys = list(input_break.keys())
if "start_time" not in input_break:
raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".')
if "finish_time" not in input_break:
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
for time in input_break.values():
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
@classmethod
def find_change_points_with_pelt(self, CO2_data: dict):
@ -196,23 +181,6 @@ class CO2FormData(model_generator.FormData):
ventilation_transition_times=self.ventilation_transition_times(),
times=self.CO2_data['times'],
CO2_concentrations=self.CO2_data['CO2'],
)
for _field in dataclasses.fields(CO2FormData):
if _field.type is minutes_since_midnight:
model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = model_generator.time_string_to_minutes
model_generator._CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = model_generator.time_minutes_to_string
elif _field.type is int:
model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = model_generator._safe_int_cast
elif _field.type is float:
model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float
elif _field.type is bool:
model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1'
model_generator._CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int
elif _field.type is list:
model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = model_generator.string_to_list
model_generator._CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = model_generator.list_to_string
elif _field.type is dict:
model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = model_generator.string_to_dict
model_generator._CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = model_generator.dict_to_string
)
cast_class_fields(CO2FormData)

View file

@ -0,0 +1,440 @@
import dataclasses
import datetime
import html
import logging
import typing
import ast
import json
import numpy as np
from caimira import models
from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT, DEFAULT_MC_SAMPLE_SIZE
LOG = logging.getLogger(__name__)
minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
@dataclasses.dataclass
class FormData:
specific_breaks: dict
exposed_coffee_break_option: str
exposed_coffee_duration: int
exposed_finish: minutes_since_midnight
exposed_lunch_finish: minutes_since_midnight
exposed_lunch_option: bool
exposed_lunch_start: minutes_since_midnight
exposed_start: minutes_since_midnight
infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
infected_dont_have_breaks_with_exposed: bool
infected_finish: minutes_since_midnight
infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed
infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_people: int
infected_start: minutes_since_midnight
room_volume: float
total_people: int
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
@classmethod
def from_dict(cls, form_data: typing.Dict):
# Take a copy of the form data so that we can mutate it.
form_data = form_data.copy()
form_data.pop('_xsrf', None)
# Don't let arbitrary unescaped HTML through the net.
for key, value in form_data.items():
if isinstance(value, str):
form_data[key] = html.escape(value)
for key, default_value in cls._DEFAULTS.items():
if form_data.get(key, '') == '':
if default_value is NO_DEFAULT:
raise ValueError(f"{key} must be specified")
form_data[key] = default_value
for key, value in form_data.items():
if key in _CAST_RULES_FORM_ARG_TO_NATIVE:
form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
if key not in cls._DEFAULTS:
raise ValueError(f'Invalid argument "{html.escape(key)}" given')
instance = cls(**form_data)
instance.validate()
return instance
@classmethod
def to_dict(cls, form: "FormData", strip_defaults: bool = False) -> dict:
form_dict = {
field.name: getattr(form, field.name)
for field in dataclasses.fields(form)
}
for attr, value in form_dict.items():
if attr in _CAST_RULES_NATIVE_TO_FORM_ARG:
form_dict[attr] = _CAST_RULES_NATIVE_TO_FORM_ARG[attr](value)
if strip_defaults:
del form_dict['calculator_version']
for attr, value in list(form_dict.items()):
default = cls._DEFAULTS.get(attr, NO_DEFAULT)
if default is not NO_DEFAULT and value in [default, 'not-applicable']:
form_dict.pop(attr)
return form_dict
def validate_population_parameters(self):
# Validate number of infected <= number of total people
if self.infected_people >= self.total_people:
raise ValueError('Number of infected people cannot be greater or equal to the number of total people.')
# Validate time intervals selected by user
time_intervals = [
['exposed_start', 'exposed_finish'],
['infected_start', 'infected_finish'],
]
if self.exposed_lunch_option:
time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish'])
if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
time_intervals.append(['infected_lunch_start', 'infected_lunch_finish'])
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
end = getattr(self, end_name)
if start > end:
raise ValueError(
f"{start_name} must be less than {end_name}. Got {start} and {end}.")
def validate_lunch(start, finish):
lunch_start = getattr(self, f'{population}_lunch_start')
lunch_finish = getattr(self, f'{population}_lunch_finish')
return (start <= lunch_start <= finish and
start <= lunch_finish <= finish)
def get_lunch_mins(population):
lunch_mins = 0
if getattr(self, f'{population}_lunch_option'):
lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
return lunch_mins
def get_coffee_mins(population):
coffee_mins = 0
if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
return coffee_mins
def get_activity_mins(population):
return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
for population in populations:
# Validate lunch time within the activity times.
if (getattr(self, f'{population}_lunch_option') and
not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish'))
):
raise ValueError(
f"{population} lunch break must be within presence times."
)
# Length of breaks < length of activity
if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population):
raise ValueError(
f"Length of breaks >= Length of {population} presence."
)
for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
('infected_coffee_break_option', COFFEE_OPTIONS_INT)]:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
def validate(self):
raise NotImplementedError("Subclass must implement")
def build_model(self, sample_size=DEFAULT_MC_SAMPLE_SIZE):
raise NotImplementedError("Subclass must implement")
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
break_times = []
end = start
for n in range(n_breaks):
begin = end + break_delay
end = begin + duration
break_times.append((begin, end))
return tuple(break_times)
def exposed_lunch_break_times(self) -> models.BoundarySequence_t:
result = []
if self.exposed_lunch_option:
result.append((self.exposed_lunch_start, self.exposed_lunch_finish))
return tuple(result)
def infected_lunch_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
result = []
if self.infected_lunch_option:
result.append((self.infected_lunch_start, self.infected_lunch_finish))
return tuple(result)
else:
return self.exposed_lunch_break_times()
def exposed_number_of_coffee_breaks(self) -> int:
return COFFEE_OPTIONS_INT[self.exposed_coffee_break_option]
def infected_number_of_coffee_breaks(self) -> int:
return COFFEE_OPTIONS_INT[self.infected_coffee_break_option]
def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t:
time_before_lunch = lunch_start - activity_start
time_after_lunch = activity_finish - lunch_finish
before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch)
n_morning_breaks = round(coffee_breaks * before_lunch_frac)
breaks = (
self._compute_breaks_in_interval(
activity_start, lunch_start, n_morning_breaks, coffee_duration
)
+ self._compute_breaks_in_interval(
lunch_finish, activity_finish, coffee_breaks - n_morning_breaks, coffee_duration
)
)
return breaks
def exposed_coffee_break_times(self) -> models.BoundarySequence_t:
exposed_coffee_breaks = self.exposed_number_of_coffee_breaks()
if exposed_coffee_breaks == 0:
return ()
if self.exposed_lunch_option:
breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
return breaks
def infected_coffee_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
infected_coffee_breaks = self.infected_number_of_coffee_breaks()
if infected_coffee_breaks == 0:
return ()
if self.infected_lunch_option:
breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
return breaks
else:
return self.exposed_coffee_break_times()
def generate_specific_break_times(self, population_breaks) -> models.BoundarySequence_t:
break_times = []
for n in population_breaks:
# Parse break times.
begin = time_string_to_minutes(n["start_time"])
end = time_string_to_minutes(n["finish_time"])
for time in [begin, end]:
# For a specific break, the infected and exposed presence is the same.
if not getattr(self, 'infected_start') < time < getattr(self, 'infected_finish'):
raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
break_times.append((begin, end))
return tuple(break_times)
def present_interval(
self,
start: int,
finish: int,
breaks: typing.Optional[models.BoundarySequence_t] = None,
) -> models.Interval:
"""
Calculate the presence interval given the start and end times (in minutes), and
a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes).
"""
if not breaks:
# If there are no breaks, the interval is the start and end.
return models.SpecificInterval(((start/60, finish/60),))
# Order the breaks by their start-time, and ensure that they are monotonic
# and that the start of one break happens after the end of another.
break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0]))
for break_start, break_end in break_boundaries:
if break_start >= break_end:
raise ValueError("Break ends before it begins.")
prev_break_end = break_boundaries[0][1]
for break_start, break_end in break_boundaries[1:]:
if prev_break_end >= break_start:
raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
prev_break_end = break_end
present_intervals = []
current_time = start
LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
# As we step through the breaks. For each break there are 6 important cases
# we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
# 1. The interval is entirely before the break. S < E <= Bs < Be
# 2. The interval straddles the start of the break. S < Bs < E <= Be
# 3. The break is entirely inside the interval. S < Bs < Be <= E
# 4. The interval is entirely inside the break. Bs <= S < E <= Be
# 5. The interval straddles the end of the break. Bs <= S < Be <= E
# 6. The interval is entirely after the break. Bs < Be <= S < E
for current_break in break_boundaries:
if current_time >= finish:
break
LOG.debug(f"handling break {_hours2timestring(current_break[0]/60)}-{_hours2timestring(current_break[1]/60)} "
f" (current time: {_hours2timestring(current_time/60)})")
break_s, break_e = current_break
case1 = finish <= break_s
case2 = current_time < break_s < finish < break_e
case3 = current_time < break_s < break_e <= finish
case4 = break_s <= current_time < finish <= break_e
case5 = break_s <= current_time < break_e < finish
case6 = break_e <= current_time
if case1:
LOG.debug(f"case 1: interval entirely before break")
present_intervals.append((current_time / 60, finish / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = finish
elif case2:
LOG.debug(f"case 2: interval straddles start of break")
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = break_e
elif case3:
LOG.debug(f"case 3: break entirely inside interval")
# We add the bit before the break, but not the bit afterwards,
# as it may hit another break.
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = break_e
elif case4:
LOG.debug(f"case 4: interval entirely inside break")
current_time = finish
elif case5:
LOG.debug(f"case 5: interval straddles end of break")
current_time = break_e
elif case6:
LOG.debug(f"case 6: interval entirely after the break")
if current_time < finish:
LOG.debug("trailing interval")
present_intervals.append((current_time / 60, finish / 60))
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks'])
else:
breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
return self.present_interval(
self.infected_start, self.infected_finish,
breaks=breaks,
)
def population_present_interval(self) -> models.Interval:
state_change_times = set(self.infected_present_interval().transition_times())
state_change_times.update(self.exposed_present_interval().transition_times())
all_state_changes = sorted(state_change_times)
return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:])))
def exposed_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks'])
else:
breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
return self.present_interval(
self.exposed_start, self.exposed_finish,
breaks=breaks,
)
def _hours2timestring(hours: float):
# Convert times like 14.5 to strings, like "14:30"
return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}"
def time_string_to_minutes(time: str) -> minutes_since_midnight:
"""
Converts time from string-format to an integer number of minutes after 00:00
:param time: A string of the form "HH:MM" representing a time of day
:return: The number of minutes between 'time' and 00:00
"""
return minutes_since_midnight(60 * int(time[:2]) + int(time[3:]))
def time_minutes_to_string(time: int) -> str:
"""
Converts time from an integer number of minutes after 00:00 to string-format
:param time: The number of minutes between 'time' and 00:00
:return: A string of the form "HH:MM" representing a time of day
"""
return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60)
def string_to_list(s: str) -> list:
return list(ast.literal_eval(s.replace("&quot;", "\"")))
def list_to_string(l: list) -> str:
return json.dumps(l)
def string_to_dict(s: str) -> dict:
return dict(ast.literal_eval(s.replace("&quot;", "\"")))
def dict_to_string(d: dict) -> str:
return json.dumps(d)
def _safe_int_cast(value) -> int:
if isinstance(value, int):
return value
elif isinstance(value, float) and int(value) == value:
return int(value)
elif isinstance(value, str) and value.isdecimal():
return int(value)
else:
raise TypeError(f"Unable to safely cast {value} ({type(value)} type) to int")
#: Mapping of field name to a callable which can convert values from form
#: input (URL encoded arguments / string) into the correct type.
_CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {}
#: Mapping of field name to callable which can convert native type to values
#: that can be encoded to URL arguments.
_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {}
def cast_class_fields(cls):
for _field in dataclasses.fields(cls):
if _field.type is minutes_since_midnight:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = time_string_to_minutes
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = time_minutes_to_string
elif _field.type is int:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = _safe_int_cast
elif _field.type is float:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float
elif _field.type is bool:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1'
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int
elif _field.type is list:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_list
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = list_to_string
elif _field.type is dict:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string
cast_class_fields(FormData)

View file

@ -1,10 +1,7 @@
import dataclasses
import datetime
import html
import logging
import typing
import ast
import json
import re
import numpy as np
@ -14,9 +11,10 @@ from caimira import data
import caimira.data.weather
import caimira.monte_carlo as mc
from .. import calculator
from .form_data import FormData, cast_class_fields, time_string_to_minutes
from caimira.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
from caimira.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions
from .defaults import (NO_DEFAULT, DEFAULT_MC_SAMPLE_SIZE, DEFAULTS, ACTIVITIES, ACTIVITY_TYPES, COFFEE_OPTIONS_INT, CONFIDENCE_LEVEL_OPTIONS,
from .defaults import (DEFAULT_MC_SAMPLE_SIZE, DEFAULTS, ACTIVITIES, ACTIVITY_TYPES, CONFIDENCE_LEVEL_OPTIONS,
MECHANICAL_VENTILATION_TYPES, MASK_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE,
VENTILATION_TYPES, VIRUS_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES)
from caimira.store.configuration import config
@ -27,37 +25,20 @@ minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
@dataclasses.dataclass
class FormData:
class VirusFormData(FormData):
activity_type: str
air_changes: float
air_supply: float
arve_sensors_option: bool
specific_breaks: dict
precise_activity: dict
ceiling_height: float
conditional_probability_plot: bool
conditional_probability_viral_loads: bool
CO2_fitting_result: dict
exposed_coffee_break_option: str
exposed_coffee_duration: int
exposed_finish: minutes_since_midnight
exposed_lunch_finish: minutes_since_midnight
exposed_lunch_option: bool
exposed_lunch_start: minutes_since_midnight
exposed_start: minutes_since_midnight
floor_area: float
hepa_amount: float
hepa_option: bool
humidity: str
infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
infected_dont_have_breaks_with_exposed: bool
infected_finish: minutes_since_midnight
infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed
infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_people: int
infected_start: minutes_since_midnight
inside_temp: float
location_name: str
location_latitude: float
@ -74,9 +55,7 @@ class FormData:
event_month: str
room_heating_option: bool
room_number: str
room_volume: float
simulation_name: str
total_people: int
vaccine_option: bool
vaccine_booster_option: bool
vaccine_type: str
@ -96,119 +75,7 @@ class FormData:
short_range_interactions: list
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
@classmethod
def from_dict(cls, form_data: typing.Dict) -> "FormData":
# Take a copy of the form data so that we can mutate it.
form_data = form_data.copy()
form_data.pop('_xsrf', None)
# Don't let arbitrary unescaped HTML through the net.
for key, value in form_data.items():
if isinstance(value, str):
form_data[key] = html.escape(value)
for key, default_value in cls._DEFAULTS.items():
if form_data.get(key, '') == '':
if default_value is NO_DEFAULT:
raise ValueError(f"{key} must be specified")
form_data[key] = default_value
for key, value in form_data.items():
if key in _CAST_RULES_FORM_ARG_TO_NATIVE:
form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
if key not in cls._DEFAULTS:
raise ValueError(f'Invalid argument "{html.escape(key)}" given')
instance = cls(**form_data)
instance.validate()
return instance
@classmethod
def to_dict(cls, form: "FormData", strip_defaults: bool = False) -> dict:
form_dict = {
field.name: getattr(form, field.name)
for field in dataclasses.fields(form)
}
for attr, value in form_dict.items():
if attr in _CAST_RULES_NATIVE_TO_FORM_ARG:
form_dict[attr] = _CAST_RULES_NATIVE_TO_FORM_ARG[attr](value)
if strip_defaults:
del form_dict['calculator_version']
for attr, value in list(form_dict.items()):
default = cls._DEFAULTS.get(attr, NO_DEFAULT)
if default is not NO_DEFAULT and value in [default, 'not-applicable']:
form_dict.pop(attr)
return form_dict
def validate_population_parameters(self):
# Validate number of infected <= number of total people
if self.infected_people >= self.total_people:
raise ValueError('Number of infected people cannot be greater or equal to the number of total people.')
# Validate time intervals selected by user
time_intervals = [
['exposed_start', 'exposed_finish'],
['infected_start', 'infected_finish'],
]
if self.exposed_lunch_option:
time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish'])
if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
time_intervals.append(['infected_lunch_start', 'infected_lunch_finish'])
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
end = getattr(self, end_name)
if start > end:
raise ValueError(
f"{start_name} must be less than {end_name}. Got {start} and {end}.")
def validate_lunch(start, finish):
lunch_start = getattr(self, f'{population}_lunch_start')
lunch_finish = getattr(self, f'{population}_lunch_finish')
return (start <= lunch_start <= finish and
start <= lunch_finish <= finish)
def get_lunch_mins(population):
lunch_mins = 0
if getattr(self, f'{population}_lunch_option'):
lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
return lunch_mins
def get_coffee_mins(population):
coffee_mins = 0
if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
return coffee_mins
def get_activity_mins(population):
return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
for population in populations:
# Validate lunch time within the activity times.
if (getattr(self, f'{population}_lunch_option') and
not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish'))
):
raise ValueError(
f"{population} lunch break must be within presence times."
)
# Length of breaks < length of activity
if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population):
raise ValueError(
f"Length of breaks >= Length of {population} presence."
)
for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
('infected_coffee_break_option', COFFEE_OPTIONS_INT)]:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
def validate(self):
# Validate population parameters
self.validate_population_parameters()
@ -591,212 +458,11 @@ class FormData:
)
return exposed
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
break_times = []
end = start
for n in range(n_breaks):
begin = end + break_delay
end = begin + duration
break_times.append((begin, end))
return tuple(break_times)
def exposed_lunch_break_times(self) -> models.BoundarySequence_t:
result = []
if self.exposed_lunch_option:
result.append((self.exposed_lunch_start, self.exposed_lunch_finish))
return tuple(result)
def infected_lunch_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
result = []
if self.infected_lunch_option:
result.append((self.infected_lunch_start, self.infected_lunch_finish))
return tuple(result)
else:
return self.exposed_lunch_break_times()
def exposed_number_of_coffee_breaks(self) -> int:
return COFFEE_OPTIONS_INT[self.exposed_coffee_break_option]
def infected_number_of_coffee_breaks(self) -> int:
return COFFEE_OPTIONS_INT[self.infected_coffee_break_option]
def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t:
time_before_lunch = lunch_start - activity_start
time_after_lunch = activity_finish - lunch_finish
before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch)
n_morning_breaks = round(coffee_breaks * before_lunch_frac)
breaks = (
self._compute_breaks_in_interval(
activity_start, lunch_start, n_morning_breaks, coffee_duration
)
+ self._compute_breaks_in_interval(
lunch_finish, activity_finish, coffee_breaks - n_morning_breaks, coffee_duration
)
)
return breaks
def exposed_coffee_break_times(self) -> models.BoundarySequence_t:
exposed_coffee_breaks = self.exposed_number_of_coffee_breaks()
if exposed_coffee_breaks == 0:
return ()
if self.exposed_lunch_option:
breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
return breaks
def infected_coffee_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
infected_coffee_breaks = self.infected_number_of_coffee_breaks()
if infected_coffee_breaks == 0:
return ()
if self.infected_lunch_option:
breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
return breaks
else:
return self.exposed_coffee_break_times()
def generate_specific_break_times(self, population_breaks) -> models.BoundarySequence_t:
break_times = []
for n in population_breaks:
# Parse break times.
begin = time_string_to_minutes(n["start_time"])
end = time_string_to_minutes(n["finish_time"])
for time in [begin, end]:
# For a specific break, the infected and exposed presence is the same.
if not getattr(self, 'infected_start') < time < getattr(self, 'infected_finish'):
raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
break_times.append((begin, end))
return tuple(break_times)
def present_interval(
self,
start: int,
finish: int,
breaks: typing.Optional[models.BoundarySequence_t] = None,
) -> models.Interval:
"""
Calculate the presence interval given the start and end times (in minutes), and
a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes).
"""
if not breaks:
# If there are no breaks, the interval is the start and end.
return models.SpecificInterval(((start/60, finish/60),))
# Order the breaks by their start-time, and ensure that they are monotonic
# and that the start of one break happens after the end of another.
break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0]))
for break_start, break_end in break_boundaries:
if break_start >= break_end:
raise ValueError("Break ends before it begins.")
prev_break_end = break_boundaries[0][1]
for break_start, break_end in break_boundaries[1:]:
if prev_break_end >= break_start:
raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
prev_break_end = break_end
present_intervals = []
current_time = start
LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
# As we step through the breaks. For each break there are 6 important cases
# we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
# 1. The interval is entirely before the break. S < E <= Bs < Be
# 2. The interval straddles the start of the break. S < Bs < E <= Be
# 3. The break is entirely inside the interval. S < Bs < Be <= E
# 4. The interval is entirely inside the break. Bs <= S < E <= Be
# 5. The interval straddles the end of the break. Bs <= S < Be <= E
# 6. The interval is entirely after the break. Bs < Be <= S < E
for current_break in break_boundaries:
if current_time >= finish:
break
LOG.debug(f"handling break {_hours2timestring(current_break[0]/60)}-{_hours2timestring(current_break[1]/60)} "
f" (current time: {_hours2timestring(current_time/60)})")
break_s, break_e = current_break
case1 = finish <= break_s
case2 = current_time < break_s < finish < break_e
case3 = current_time < break_s < break_e <= finish
case4 = break_s <= current_time < finish <= break_e
case5 = break_s <= current_time < break_e < finish
case6 = break_e <= current_time
if case1:
LOG.debug(f"case 1: interval entirely before break")
present_intervals.append((current_time / 60, finish / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = finish
elif case2:
LOG.debug(f"case 2: interval straddles start of break")
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = break_e
elif case3:
LOG.debug(f"case 3: break entirely inside interval")
# We add the bit before the break, but not the bit afterwards,
# as it may hit another break.
present_intervals.append((current_time / 60, break_s / 60))
LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} "
f"- {_hours2timestring(present_intervals[-1][1])}")
current_time = break_e
elif case4:
LOG.debug(f"case 4: interval entirely inside break")
current_time = finish
elif case5:
LOG.debug(f"case 5: interval straddles end of break")
current_time = break_e
elif case6:
LOG.debug(f"case 6: interval entirely after the break")
if current_time < finish:
LOG.debug("trailing interval")
present_intervals.append((current_time / 60, finish / 60))
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks'])
else:
breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
return self.present_interval(
self.infected_start, self.infected_finish,
breaks=breaks,
)
def population_present_interval(self) -> models.Interval:
state_change_times = set(self.infected_present_interval().transition_times())
state_change_times.update(self.exposed_present_interval().transition_times())
all_state_changes = sorted(state_change_times)
return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:])))
def short_range_interval(self, interaction) -> models.SpecificInterval:
start_time = time_string_to_minutes(interaction['start_time'])
duration = float(interaction['duration'])
return models.SpecificInterval(present_times=((start_time/60, (start_time + duration)/60),))
def exposed_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks'])
else:
breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
return self.present_interval(
self.exposed_start, self.exposed_finish,
breaks=breaks,
)
def build_expiration(expiration_definition) -> mc._ExpirationBase:
if isinstance(expiration_definition, str):
@ -875,80 +541,4 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'short_range_interactions': '[]',
}
def _hours2timestring(hours: float):
# Convert times like 14.5 to strings, like "14:30"
return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}"
def time_string_to_minutes(time: str) -> minutes_since_midnight:
"""
Converts time from string-format to an integer number of minutes after 00:00
:param time: A string of the form "HH:MM" representing a time of day
:return: The number of minutes between 'time' and 00:00
"""
return minutes_since_midnight(60 * int(time[:2]) + int(time[3:]))
def time_minutes_to_string(time: int) -> str:
"""
Converts time from an integer number of minutes after 00:00 to string-format
:param time: The number of minutes between 'time' and 00:00
:return: A string of the form "HH:MM" representing a time of day
"""
return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60)
def string_to_list(s: str) -> list:
return list(ast.literal_eval(s.replace("&quot;", "\"")))
def list_to_string(l: list) -> str:
return json.dumps(l)
def string_to_dict(s: str) -> dict:
return dict(ast.literal_eval(s.replace("&quot;", "\"")))
def dict_to_string(d: dict) -> str:
return json.dumps(d)
def _safe_int_cast(value) -> int:
if isinstance(value, int):
return value
elif isinstance(value, float) and int(value) == value:
return int(value)
elif isinstance(value, str) and value.isdecimal():
return int(value)
else:
raise TypeError(f"Unable to safely cast {value} ({type(value)} type) to int")
#: Mapping of field name to a callable which can convert values from form
#: input (URL encoded arguments / string) into the correct type.
_CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {}
#: Mapping of field name to callable which can convert native type to values
#: that can be encoded to URL arguments.
_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {}
for _field in dataclasses.fields(FormData):
if _field.type is minutes_since_midnight:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = time_string_to_minutes
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = time_minutes_to_string
elif _field.type is int:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = _safe_int_cast
elif _field.type is float:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float
elif _field.type is bool:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1'
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int
elif _field.type is list:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_list
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = list_to_string
elif _field.type is dict:
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string
cast_class_fields(VirusFormData)

View file

@ -15,7 +15,7 @@ import matplotlib.pyplot as plt
from caimira import models
from caimira.apps.calculator import markdown_tools
from ... import monte_carlo as mc
from .model_generator import FormData, DEFAULT_MC_SAMPLE_SIZE
from .model_generator import VirusFormData, DEFAULT_MC_SAMPLE_SIZE
from ... import dataclass_utils
from caimira.store.configuration import config
@ -102,7 +102,7 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional
return nice_times
def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
lower_concentrations = []
for time in times:
for index, (start, stop) in enumerate(short_range_intervals):
@ -114,7 +114,7 @@ def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel
return lower_concentrations
def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
@ -175,8 +175,8 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing
}
def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: FormData):
form_dict = FormData.to_dict(form, strip_defaults=True)
def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
form_dict = VirusFormData.to_dict(form, strip_defaults=True)
# Generate the calculator URL arguments that would be needed to re-create this
# form.
@ -353,7 +353,7 @@ def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typ
return scenarios
def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.ExposureModel]:
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
scenarios = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
@ -424,7 +424,7 @@ def scenario_statistics(mc_model: mc.ExposureModel, sample_times: typing.List[fl
def comparison_report(
form: FormData,
form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: typing.List[float],
@ -472,7 +472,7 @@ class ReportGenerator:
def build_report(
self,
base_url: str,
form: FormData,
form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
@ -483,7 +483,7 @@ class ReportGenerator:
self,
base_url: str,
model: models.ExposureModel,
form: FormData,
form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> dict:
now = datetime.utcnow().astimezone()

View file

@ -10,4 +10,4 @@ def baseline_form_data():
@pytest.fixture
def baseline_form(baseline_form_data):
return model_generator.FormData.from_dict(baseline_form_data)
return model_generator.VirusFormData.from_dict(baseline_form_data)

View file

@ -7,21 +7,22 @@ import pytest
from retry import retry
from caimira.apps.calculator import model_generator
from caimira.apps.calculator.model_generator import _hours2timestring
from caimira.apps.calculator.model_generator import minutes_since_midnight
from caimira.apps.calculator.form_data import (_hours2timestring, minutes_since_midnight,
_CAST_RULES_FORM_ARG_TO_NATIVE, _CAST_RULES_NATIVE_TO_FORM_ARG)
from caimira import models
from caimira.monte_carlo.data import expiration_distributions
from caimira.apps.calculator.defaults import NO_DEFAULT
def test_model_from_dict(baseline_form_data):
form = model_generator.FormData.from_dict(baseline_form_data)
form = model_generator.VirusFormData.from_dict(baseline_form_data)
assert isinstance(form.build_model(), models.ExposureModel)
def test_model_from_dict_invalid(baseline_form_data):
baseline_form_data['invalid_item'] = 'foobar'
with pytest.raises(ValueError, match='Invalid argument "invalid_item" given'):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
@retry(tries=10)
@ -44,7 +45,7 @@ def test_blend_expiration(mask_type):
npt.assert_allclose(r.aerosols(mask).mean(), expected, rtol=TOLERANCE)
def test_ventilation_slidingwindow(baseline_form: model_generator.FormData):
def test_ventilation_slidingwindow(baseline_form: model_generator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -74,7 +75,7 @@ def test_ventilation_slidingwindow(baseline_form: model_generator.FormData):
assert ventilation == baseline_vent
def test_ventilation_hingedwindow(baseline_form: model_generator.FormData):
def test_ventilation_hingedwindow(baseline_form: model_generator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -104,7 +105,7 @@ def test_ventilation_hingedwindow(baseline_form: model_generator.FormData):
assert ventilation == baseline_vent
def test_ventilation_mechanical(baseline_form: model_generator.FormData):
def test_ventilation_mechanical(baseline_form: model_generator.VirusFormData):
room = models.Room(volume=75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
mech = models.HVACMechanical(
active=models.PeriodicInterval(period=120, duration=120),
@ -119,7 +120,7 @@ def test_ventilation_mechanical(baseline_form: model_generator.FormData):
np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts]))
def test_ventilation_airchanges(baseline_form: model_generator.FormData):
def test_ventilation_airchanges(baseline_form: model_generator.VirusFormData):
room = models.Room(75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
airchange = models.AirChange(
active=models.PeriodicInterval(period=120, duration=120),
@ -134,7 +135,7 @@ def test_ventilation_airchanges(baseline_form: model_generator.FormData):
np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts]))
def test_ventilation_window_hepa(baseline_form: model_generator.FormData):
def test_ventilation_window_hepa(baseline_form: model_generator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -177,7 +178,7 @@ def test_ventilation_window_hepa(baseline_form: model_generator.FormData):
]
)
def test_infected_less_than_total_people(activity, total_people, infected_people, error,
baseline_form: model_generator.FormData):
baseline_form: model_generator.VirusFormData):
baseline_form.activity_type = activity
baseline_form.total_people = total_people
baseline_form.infected_people = infected_people
@ -190,7 +191,7 @@ def present_times(interval: models.Interval) -> models.BoundarySequence_t:
return interval.present_times
def test_infected_present_intervals(baseline_form: model_generator.FormData):
def test_infected_present_intervals(baseline_form: model_generator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.exposed_coffee_duration = 15
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -204,7 +205,7 @@ def test_infected_present_intervals(baseline_form: model_generator.FormData):
assert present_times(baseline_form.infected_present_interval()) == correct
def test_exposed_present_intervals(baseline_form: model_generator.FormData):
def test_exposed_present_intervals(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_duration = 15
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -215,7 +216,7 @@ def test_exposed_present_intervals(baseline_form: model_generator.FormData):
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_present_intervals_common_breaks(baseline_form: model_generator.FormData):
def test_present_intervals_common_breaks(baseline_form: model_generator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15
baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -231,7 +232,7 @@ def test_present_intervals_common_breaks(baseline_form: model_generator.FormData
assert present_times(baseline_form.infected_present_interval()) == correct_infected
def test_present_intervals_split_breaks(baseline_form: model_generator.FormData):
def test_present_intervals_split_breaks(baseline_form: model_generator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = True
baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15
baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -247,7 +248,7 @@ def test_present_intervals_split_breaks(baseline_form: model_generator.FormData)
assert present_times(baseline_form.infected_present_interval()) == correct_infected
def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_generator.FormData):
def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_start = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60)
baseline_form.exposed_finish = minutes_since_midnight(18 * 60)
@ -256,7 +257,7 @@ def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_gene
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_generator.FormData):
def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_start = minutes_since_midnight(11 * 60)
baseline_form.exposed_finish = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60)
@ -265,7 +266,7 @@ def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_genera
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generator.FormData):
def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_lunch_start = minutes_since_midnight(14 * 60)
baseline_form.exposed_lunch_finish = minutes_since_midnight(13 * 60)
@ -282,7 +283,7 @@ def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generat
[9, 20], # lunch_finish after the presence finishing
],
)
def test_exposed_presence_lunch_break(baseline_form: model_generator.FormData, exposed_lunch_start, exposed_lunch_finish):
def test_exposed_presence_lunch_break(baseline_form: model_generator.VirusFormData, exposed_lunch_start, exposed_lunch_finish):
baseline_form.exposed_lunch_start = minutes_since_midnight(exposed_lunch_start * 60)
baseline_form.exposed_lunch_finish = minutes_since_midnight(exposed_lunch_finish * 60)
with pytest.raises(ValueError, match='exposed lunch break must be within presence times.'):
@ -298,14 +299,14 @@ def test_exposed_presence_lunch_break(baseline_form: model_generator.FormData, e
[9, 20], # lunch_finish after the presence finishing
],
)
def test_infected_presence_lunch_break(baseline_form: model_generator.FormData, infected_lunch_start, infected_lunch_finish):
def test_infected_presence_lunch_break(baseline_form: model_generator.VirusFormData, infected_lunch_start, infected_lunch_finish):
baseline_form.infected_lunch_start = minutes_since_midnight(infected_lunch_start * 60)
baseline_form.infected_lunch_finish = minutes_since_midnight(infected_lunch_finish * 60)
with pytest.raises(ValueError, match='infected lunch break must be within presence times.'):
baseline_form.validate()
def test_exposed_breaks_length(baseline_form: model_generator.FormData):
def test_exposed_breaks_length(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_start = minutes_since_midnight(10 * 60)
@ -315,7 +316,7 @@ def test_exposed_breaks_length(baseline_form: model_generator.FormData):
baseline_form.validate()
def test_infected_breaks_length(baseline_form: model_generator.FormData):
def test_infected_breaks_length(baseline_form: model_generator.VirusFormData):
baseline_form.infected_start = minutes_since_midnight(9 * 60)
baseline_form.infected_finish = minutes_since_midnight(12 * 60)
baseline_form.infected_lunch_start = minutes_since_midnight(10 * 60)
@ -327,7 +328,7 @@ def test_infected_breaks_length(baseline_form: model_generator.FormData):
@pytest.fixture
def coffee_break_between_1045_and_1115(baseline_form: model_generator.FormData):
def coffee_break_between_1045_and_1115(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_1'
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_start = minutes_since_midnight(10 * 60)
@ -385,7 +386,7 @@ def assert_boundaries(interval, boundaries_in_time_string_form):
@pytest.fixture
def breaks_every_25_mins_for_20_mins(baseline_form: model_generator.FormData):
def breaks_every_25_mins_for_20_mins(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_coffee_duration = 20
baseline_form.exposed_start = time2mins("10:00")
@ -430,7 +431,7 @@ def test_present_only_during_second_break(breaks_every_25_mins_for_20_mins):
assert_boundaries(interval, [])
def test_valid_no_lunch(baseline_form: model_generator.FormData):
def test_valid_no_lunch(baseline_form: model_generator.VirusFormData):
# Check that it is valid to have a 0 length lunch if no lunch is selected.
baseline_form.exposed_lunch_option = False
baseline_form.exposed_lunch_start = minutes_since_midnight(0)
@ -438,7 +439,7 @@ def test_valid_no_lunch(baseline_form: model_generator.FormData):
assert baseline_form.validate() is None
def test_no_breaks(baseline_form: model_generator.FormData):
def test_no_breaks(baseline_form: model_generator.VirusFormData):
# Check that the times are correct in the absence of breaks.
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.exposed_lunch_option = False
@ -453,7 +454,7 @@ def test_no_breaks(baseline_form: model_generator.FormData):
assert present_times(baseline_form.infected_present_interval()) == infected_correct
def test_coffee_lunch_breaks(baseline_form: model_generator.FormData):
def test_coffee_lunch_breaks(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -465,7 +466,7 @@ def test_coffee_lunch_breaks(baseline_form: model_generator.FormData):
np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14)
def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.FormData):
def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -476,7 +477,7 @@ def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.FormData):
np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14)
def test_coffee_breaks(baseline_form: model_generator.FormData):
def test_coffee_breaks(baseline_form: model_generator.VirusFormData):
baseline_form.exposed_coffee_duration = 10
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -489,24 +490,24 @@ def test_coffee_breaks(baseline_form: model_generator.FormData):
def test_key_validation(baseline_form_data):
baseline_form_data['activity_type'] = 'invalid key'
with pytest.raises(ValueError):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_key_validation_natural_ventilation_window_type_na(baseline_form_data):
baseline_form_data['ventilation_type'] = 'natural_ventilation'
baseline_form_data['window_type'] = 'not-applicable'
with pytest.raises(ValueError, match='window_type cannot be \'not-applicable\''):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_key_validation_natural_ventilation_window_opening_regime_na(baseline_form_data):
baseline_form_data['ventilation_type'] = 'natural_ventilation'
baseline_form_data['window_opening_regime'] = 'not-applicable'
with pytest.raises(ValueError, match='window_opening_regime cannot be \'not-applicable\''):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_natural_ventilation_window_opening_periodically(baseline_form: model_generator.FormData):
def test_natural_ventilation_window_opening_periodically(baseline_form: model_generator.VirusFormData):
baseline_form.window_opening_regime = 'windows_open_periodically'
baseline_form.windows_duration = 20
baseline_form.windows_frequency = 10
@ -518,20 +519,20 @@ def test_key_validation_mech_ventilation_type_na(baseline_form_data):
baseline_form_data['ventilation_type'] = 'mechanical_ventilation'
baseline_form_data['mechanical_ventilation_type'] = 'not-applicable'
with pytest.raises(ValueError, match='mechanical_ventilation_type cannot be \'not-applicable\''):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_key_validation_event_month(baseline_form_data):
baseline_form_data['event_month'] = 'invalid month'
with pytest.raises(ValueError, match='invalid month is not a valid value for event_month'):
model_generator.FormData.from_dict(baseline_form_data)
model_generator.VirusFormData.from_dict(baseline_form_data)
def test_default_types():
# Validate that FormData._DEFAULTS are complete and of the correct type.
# Validate that VirusFormData._DEFAULTS are complete and of the correct type.
# Validate that we have the right types and matching attributes to the DEFAULTS.
fields = {field.name: field for field in dataclasses.fields(model_generator.FormData)}
for field, value in model_generator.FormData._DEFAULTS.items():
fields = {field.name: field for field in dataclasses.fields(model_generator.VirusFormData)}
for field, value in model_generator.VirusFormData._DEFAULTS.items():
if field not in fields:
raise ValueError(f"Unmatched default {field}")
@ -540,17 +541,17 @@ def test_default_types():
# Handle typing.NewType definitions.
field_type = field_type.__supertype__
if value is model_generator.NO_DEFAULT:
if value is NO_DEFAULT:
continue
if field in model_generator._CAST_RULES_FORM_ARG_TO_NATIVE:
value = model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[field](value)
if field in _CAST_RULES_FORM_ARG_TO_NATIVE:
value = _CAST_RULES_FORM_ARG_TO_NATIVE[field](value)
if not isinstance(value, field_type):
raise TypeError(f'{field} has type {field_type}, got {type(value)}')
for field in fields.values():
assert field.name in model_generator.FormData._DEFAULTS, f"No default set for field name {field.name}"
assert field.name in model_generator.VirusFormData._DEFAULTS, f"No default set for field name {field.name}"
def test_form_to_dict(baseline_form):
@ -559,7 +560,7 @@ def test_form_to_dict(baseline_form):
assert 1 < len(stripped) < len(full)
assert 'exposed_coffee_break_option' in stripped
# If we set the value to the default one, it should no longer turn up in the dictionary.
baseline_form.exposed_coffee_break_option = model_generator.FormData._DEFAULTS['exposed_coffee_break_option']
baseline_form.exposed_coffee_break_option = model_generator.VirusFormData._DEFAULTS['exposed_coffee_break_option']
assert 'exposed_coffee_break_option' not in baseline_form.to_dict(baseline_form, strip_defaults=True)
@ -577,7 +578,7 @@ def test_form_timezone(baseline_form_data, longitude, latitude, month, expected_
baseline_form_data['location_latitude'] = latitude
baseline_form_data['location_longitude'] = longitude
baseline_form_data['event_month'] = month
form = model_generator.FormData.from_dict(baseline_form_data)
form = model_generator.VirusFormData.from_dict(baseline_form_data)
name, offset = form.tz_name_and_utc_offset()
assert name == expected_tz_name
assert offset == expected_offset

View file

@ -13,7 +13,7 @@ from caimira.apps.calculator import model_generator
[{"exposed_breaks": [], "ifected_breaks": []}, 'Unable to fetch "infected_breaks" key. Got "ifected_breaks".'],
]
)
def test_specific_break_structure(break_input, error, baseline_form: model_generator.FormData):
def test_specific_break_structure(break_input, error, baseline_form: model_generator.VirusFormData):
baseline_form.specific_breaks = break_input
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -30,7 +30,7 @@ def test_specific_break_structure(break_input, error, baseline_form: model_gener
[[{"start_time": "10:00", "finish_time": "11"}], 'Wrong time format - "HH:MM". Got "11".'],
]
)
def test_specific_population_break_data_structure(population_break_input, error, baseline_form: model_generator.FormData):
def test_specific_population_break_data_structure(population_break_input, error, baseline_form: model_generator.VirusFormData):
baseline_form.specific_breaks = {'exposed_breaks': population_break_input, 'infected_breaks': population_break_input}
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -45,7 +45,7 @@ def test_specific_population_break_data_structure(population_break_input, error,
[[{"start_time": "08:00", "finish_time": "11:00"}, {"start_time": "14:00", "finish_time": "15:00"}, ], "All breaks should be within the simulation time. Got 08:00."],
]
)
def test_specific_break_time(break_input, error, baseline_form: model_generator.FormData):
def test_specific_break_time(break_input, error, baseline_form: model_generator.VirusFormData):
with pytest.raises(ValueError, match=error):
baseline_form.generate_specific_break_times(break_input)
@ -63,7 +63,7 @@ def test_specific_break_time(break_input, error, baseline_form: model_generator.
[{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentag": 50}, {"type": "Speaking", "percentage": 50}]}, 'Unable to fetch "percentage" key. Got "percentag".'],
]
)
def test_precise_activity_structure(precise_activity_input, error, baseline_form: model_generator.FormData):
def test_precise_activity_structure(precise_activity_input, error, baseline_form: model_generator.VirusFormData):
baseline_form.precise_activity = precise_activity_input
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -78,7 +78,7 @@ def test_precise_activity_structure(precise_activity_input, error, baseline_form
[{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentage": 50}]}, 'The sum of all respiratory activities should be 100. Got 50.'],
]
)
def test_sum_precise_activity(precise_activity_input, error, baseline_form: model_generator.FormData):
def test_sum_precise_activity(precise_activity_input, error, baseline_form: model_generator.VirusFormData):
baseline_form.precise_activity = precise_activity_input
with pytest.raises(ValueError, match=error):
baseline_form.validate()