From 3da67c8c95f27071bc61b3154cf71fffdf70c6c4 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Fri, 17 Nov 2023 11:22:41 +0100 Subject: [PATCH] add superclass FormData for CO2FormData and VirusFormData --- caimira/apps/calculator/__init__.py | 6 +- .../apps/calculator/co2_model_generator.py | 102 ++-- caimira/apps/calculator/form_data.py | 440 ++++++++++++++++++ caimira/apps/calculator/model_generator.py | 418 +---------------- caimira/apps/calculator/report_generator.py | 18 +- caimira/tests/apps/calculator/conftest.py | 2 +- .../apps/calculator/test_model_generator.py | 87 ++-- .../test_specific_model_generator.py | 10 +- 8 files changed, 541 insertions(+), 542 deletions(-) create mode 100644 caimira/apps/calculator/form_data.py diff --git a/caimira/apps/calculator/__init__.py b/caimira/apps/calculator/__init__.py index 28a0839d..b5a11900 100644 --- a/caimira/apps/calculator/__init__.py +++ b/caimira/apps/calculator/__init__.py @@ -106,7 +106,7 @@ class ConcentrationModel(BaseRequestHandler): start = datetime.datetime.now() try: - form = model_generator.FormData.from_dict(requested_model_config) + form = model_generator.VirusFormData.from_dict(requested_model_config) except Exception as err: if self.settings.get("debug", False): import traceback @@ -157,7 +157,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler): pprint(requested_model_config) try: - form = model_generator.FormData.from_dict(requested_model_config) + form = model_generator.VirusFormData.from_dict(requested_model_config) except Exception as err: if self.settings.get("debug", False): import traceback @@ -178,7 +178,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler): class StaticModel(BaseRequestHandler): async def get(self) -> None: - form = model_generator.FormData.from_dict(model_generator.baseline_raw_form_data()) + form = model_generator.VirusFormData.from_dict(model_generator.baseline_raw_form_data()) base_url = self.request.protocol + "://" + self.request.host report_generator: ReportGenerator = self.settings['report_generator'] executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size']) diff --git a/caimira/apps/calculator/co2_model_generator.py b/caimira/apps/calculator/co2_model_generator.py index 0ca201c0..3d90fb99 100644 --- a/caimira/apps/calculator/co2_model_generator.py +++ b/caimira/apps/calculator/co2_model_generator.py @@ -1,13 +1,13 @@ import dataclasses -import html import logging import typing import numpy as np import ruptures as rpt import matplotlib.pyplot as plt +import re from caimira import models -from . import model_generator +from .form_data import FormData, cast_class_fields from .defaults import DEFAULT_MC_SAMPLE_SIZE, NO_DEFAULT from .report_generator import img2base64, _figure2bytes @@ -17,29 +17,10 @@ LOG = logging.getLogger(__name__) @dataclasses.dataclass -class CO2FormData(model_generator.FormData): +class CO2FormData(FormData): CO2_data: dict - exposed_coffee_break_option: str - exposed_coffee_duration: int - exposed_finish: model_generator.minutes_since_midnight - exposed_lunch_finish: model_generator.minutes_since_midnight - exposed_lunch_option: bool - exposed_lunch_start: model_generator.minutes_since_midnight - exposed_start: model_generator.minutes_since_midnight fitting_ventilation_states: list fitting_ventilation_type: str - infected_coffee_break_option: str - infected_coffee_duration: int - infected_dont_have_breaks_with_exposed: bool - infected_finish: model_generator.minutes_since_midnight - infected_lunch_finish: model_generator.minutes_since_midnight - infected_lunch_option: bool - infected_lunch_start: model_generator.minutes_since_midnight - infected_people: int - infected_start: model_generator.minutes_since_midnight - room_volume: float - specific_breaks: dict - total_people: int #: The default values for undefined fields. Note that the defaults here #: and the defaults in the html form must not be contradictory. @@ -73,33 +54,37 @@ class CO2FormData(model_generator.FormData): for key, value in self._DEFAULTS.items(): setattr(self, key, kwargs.get(key, value)) - @classmethod - def from_dict(self, form_data: typing.Dict) -> "CO2FormData": - # Take a copy of the form data so that we can mutate it. - form_data = form_data.copy() - form_data.pop('_xsrf', None) + def validate(self): + # Validate population parameters + self.validate_population_parameters() - # Don't let arbitrary unescaped HTML through the net. - for key, value in form_data.items(): - if isinstance(value, str): - form_data[key] = html.escape(value) + # Validate specific inputs - breaks (exposed and infected) + if self.specific_breaks != {}: + if type(self.specific_breaks) is not dict: + raise TypeError('The specific breaks should be in a dictionary.') + + dict_keys = list(self.specific_breaks.keys()) + if "exposed_breaks" not in dict_keys: + raise TypeError(f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".') + if "infected_breaks" not in dict_keys: + raise TypeError(f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".') - for key, default_value in self._DEFAULTS.items(): - if form_data.get(key, '') == '': - if default_value is NO_DEFAULT: - raise ValueError(f"{key} must be specified") - form_data[key] = default_value - - for key, value in form_data.items(): - if key in model_generator._CAST_RULES_FORM_ARG_TO_NATIVE: - form_data[key] = model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[key](value) - - if key not in self._DEFAULTS: - raise ValueError(f'Invalid argument "{html.escape(key)}" given') - - instance = self(**form_data) - instance.validate_population_parameters() - return instance + for population_breaks in ['exposed_breaks', 'infected_breaks']: + if self.specific_breaks[population_breaks] != []: + if type(self.specific_breaks[population_breaks]) is not list: + raise TypeError(f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.') + for input_break in self.specific_breaks[population_breaks]: + # Input validations. + if type(input_break) is not dict: + raise TypeError(f'Each break should be a dictionary. Got {type(input_break)}.') + dict_keys = list(input_break.keys()) + if "start_time" not in input_break: + raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".') + if "finish_time" not in input_break: + raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".') + for time in input_break.values(): + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): + raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') @classmethod def find_change_points_with_pelt(self, CO2_data: dict): @@ -196,23 +181,6 @@ class CO2FormData(model_generator.FormData): ventilation_transition_times=self.ventilation_transition_times(), times=self.CO2_data['times'], CO2_concentrations=self.CO2_data['CO2'], - ) - - -for _field in dataclasses.fields(CO2FormData): - if _field.type is minutes_since_midnight: - model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = model_generator.time_string_to_minutes - model_generator._CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = model_generator.time_minutes_to_string - elif _field.type is int: - model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = model_generator._safe_int_cast - elif _field.type is float: - model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float - elif _field.type is bool: - model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1' - model_generator._CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int - elif _field.type is list: - model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = model_generator.string_to_list - model_generator._CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = model_generator.list_to_string - elif _field.type is dict: - model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = model_generator.string_to_dict - model_generator._CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = model_generator.dict_to_string + ) + +cast_class_fields(CO2FormData) diff --git a/caimira/apps/calculator/form_data.py b/caimira/apps/calculator/form_data.py new file mode 100644 index 00000000..9d82af94 --- /dev/null +++ b/caimira/apps/calculator/form_data.py @@ -0,0 +1,440 @@ +import dataclasses +import datetime +import html +import logging +import typing +import ast +import json + +import numpy as np + +from caimira import models +from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT, DEFAULT_MC_SAMPLE_SIZE + +LOG = logging.getLogger(__name__) + +minutes_since_midnight = typing.NewType('minutes_since_midnight', int) + + +@dataclasses.dataclass +class FormData: + specific_breaks: dict + exposed_coffee_break_option: str + exposed_coffee_duration: int + exposed_finish: minutes_since_midnight + exposed_lunch_finish: minutes_since_midnight + exposed_lunch_option: bool + exposed_lunch_start: minutes_since_midnight + exposed_start: minutes_since_midnight + infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed + infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed + infected_dont_have_breaks_with_exposed: bool + infected_finish: minutes_since_midnight + infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed + infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed + infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed + infected_people: int + infected_start: minutes_since_midnight + room_volume: float + total_people: int + + _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS + + @classmethod + def from_dict(cls, form_data: typing.Dict): + # Take a copy of the form data so that we can mutate it. + form_data = form_data.copy() + form_data.pop('_xsrf', None) + + # Don't let arbitrary unescaped HTML through the net. + for key, value in form_data.items(): + if isinstance(value, str): + form_data[key] = html.escape(value) + + for key, default_value in cls._DEFAULTS.items(): + if form_data.get(key, '') == '': + if default_value is NO_DEFAULT: + raise ValueError(f"{key} must be specified") + form_data[key] = default_value + + for key, value in form_data.items(): + if key in _CAST_RULES_FORM_ARG_TO_NATIVE: + form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value) + + if key not in cls._DEFAULTS: + raise ValueError(f'Invalid argument "{html.escape(key)}" given') + + instance = cls(**form_data) + instance.validate() + return instance + + @classmethod + def to_dict(cls, form: "FormData", strip_defaults: bool = False) -> dict: + form_dict = { + field.name: getattr(form, field.name) + for field in dataclasses.fields(form) + } + + for attr, value in form_dict.items(): + if attr in _CAST_RULES_NATIVE_TO_FORM_ARG: + form_dict[attr] = _CAST_RULES_NATIVE_TO_FORM_ARG[attr](value) + + if strip_defaults: + del form_dict['calculator_version'] + + for attr, value in list(form_dict.items()): + default = cls._DEFAULTS.get(attr, NO_DEFAULT) + if default is not NO_DEFAULT and value in [default, 'not-applicable']: + form_dict.pop(attr) + return form_dict + + def validate_population_parameters(self): + # Validate number of infected <= number of total people + if self.infected_people >= self.total_people: + raise ValueError('Number of infected people cannot be greater or equal to the number of total people.') + + # Validate time intervals selected by user + time_intervals = [ + ['exposed_start', 'exposed_finish'], + ['infected_start', 'infected_finish'], + ] + if self.exposed_lunch_option: + time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish']) + if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: + time_intervals.append(['infected_lunch_start', 'infected_lunch_finish']) + + for start_name, end_name in time_intervals: + start = getattr(self, start_name) + end = getattr(self, end_name) + if start > end: + raise ValueError( + f"{start_name} must be less than {end_name}. Got {start} and {end}.") + + def validate_lunch(start, finish): + lunch_start = getattr(self, f'{population}_lunch_start') + lunch_finish = getattr(self, f'{population}_lunch_finish') + return (start <= lunch_start <= finish and + start <= lunch_finish <= finish) + + def get_lunch_mins(population): + lunch_mins = 0 + if getattr(self, f'{population}_lunch_option'): + lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') + return lunch_mins + + def get_coffee_mins(population): + coffee_mins = 0 + if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': + coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') + return coffee_mins + + def get_activity_mins(population): + return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') + + populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] + for population in populations: + # Validate lunch time within the activity times. + if (getattr(self, f'{population}_lunch_option') and + not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish')) + ): + raise ValueError( + f"{population} lunch break must be within presence times." + ) + + # Length of breaks < length of activity + if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): + raise ValueError( + f"Length of breaks >= Length of {population} presence." + ) + + for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), + ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: + if getattr(self, attr_name) not in valid_set: + raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}") + + def validate(self): + raise NotImplementedError("Subclass must implement") + + def build_model(self, sample_size=DEFAULT_MC_SAMPLE_SIZE): + raise NotImplementedError("Subclass must implement") + + def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t: + break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1) + break_times = [] + end = start + for n in range(n_breaks): + begin = end + break_delay + end = begin + duration + break_times.append((begin, end)) + return tuple(break_times) + + def exposed_lunch_break_times(self) -> models.BoundarySequence_t: + result = [] + if self.exposed_lunch_option: + result.append((self.exposed_lunch_start, self.exposed_lunch_finish)) + return tuple(result) + + def infected_lunch_break_times(self) -> models.BoundarySequence_t: + if self.infected_dont_have_breaks_with_exposed: + result = [] + if self.infected_lunch_option: + result.append((self.infected_lunch_start, self.infected_lunch_finish)) + return tuple(result) + else: + return self.exposed_lunch_break_times() + + def exposed_number_of_coffee_breaks(self) -> int: + return COFFEE_OPTIONS_INT[self.exposed_coffee_break_option] + + def infected_number_of_coffee_breaks(self) -> int: + return COFFEE_OPTIONS_INT[self.infected_coffee_break_option] + + def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t: + time_before_lunch = lunch_start - activity_start + time_after_lunch = activity_finish - lunch_finish + before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch) + n_morning_breaks = round(coffee_breaks * before_lunch_frac) + breaks = ( + self._compute_breaks_in_interval( + activity_start, lunch_start, n_morning_breaks, coffee_duration + ) + + self._compute_breaks_in_interval( + lunch_finish, activity_finish, coffee_breaks - n_morning_breaks, coffee_duration + ) + ) + return breaks + + def exposed_coffee_break_times(self) -> models.BoundarySequence_t: + exposed_coffee_breaks = self.exposed_number_of_coffee_breaks() + if exposed_coffee_breaks == 0: + return () + if self.exposed_lunch_option: + breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish) + else: + breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration) + return breaks + + def infected_coffee_break_times(self) -> models.BoundarySequence_t: + if self.infected_dont_have_breaks_with_exposed: + infected_coffee_breaks = self.infected_number_of_coffee_breaks() + if infected_coffee_breaks == 0: + return () + if self.infected_lunch_option: + breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish) + else: + breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration) + return breaks + else: + return self.exposed_coffee_break_times() + + def generate_specific_break_times(self, population_breaks) -> models.BoundarySequence_t: + break_times = [] + for n in population_breaks: + # Parse break times. + begin = time_string_to_minutes(n["start_time"]) + end = time_string_to_minutes(n["finish_time"]) + for time in [begin, end]: + # For a specific break, the infected and exposed presence is the same. + if not getattr(self, 'infected_start') < time < getattr(self, 'infected_finish'): + raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.') + + break_times.append((begin, end)) + return tuple(break_times) + + def present_interval( + self, + start: int, + finish: int, + breaks: typing.Optional[models.BoundarySequence_t] = None, + ) -> models.Interval: + """ + Calculate the presence interval given the start and end times (in minutes), and + a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes). + + """ + if not breaks: + # If there are no breaks, the interval is the start and end. + return models.SpecificInterval(((start/60, finish/60),)) + + # Order the breaks by their start-time, and ensure that they are monotonic + # and that the start of one break happens after the end of another. + break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0])) + + for break_start, break_end in break_boundaries: + if break_start >= break_end: + raise ValueError("Break ends before it begins.") + + prev_break_end = break_boundaries[0][1] + for break_start, break_end in break_boundaries[1:]: + if prev_break_end >= break_start: + raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).") + prev_break_end = break_end + + present_intervals = [] + + current_time = start + LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}") + + # As we step through the breaks. For each break there are 6 important cases + # we must cover. Let S=start; E=end; Bs=Break start; Be=Break end: + # 1. The interval is entirely before the break. S < E <= Bs < Be + # 2. The interval straddles the start of the break. S < Bs < E <= Be + # 3. The break is entirely inside the interval. S < Bs < Be <= E + # 4. The interval is entirely inside the break. Bs <= S < E <= Be + # 5. The interval straddles the end of the break. Bs <= S < Be <= E + # 6. The interval is entirely after the break. Bs < Be <= S < E + + for current_break in break_boundaries: + if current_time >= finish: + break + + LOG.debug(f"handling break {_hours2timestring(current_break[0]/60)}-{_hours2timestring(current_break[1]/60)} " + f" (current time: {_hours2timestring(current_time/60)})") + + break_s, break_e = current_break + case1 = finish <= break_s + case2 = current_time < break_s < finish < break_e + case3 = current_time < break_s < break_e <= finish + case4 = break_s <= current_time < finish <= break_e + case5 = break_s <= current_time < break_e < finish + case6 = break_e <= current_time + + if case1: + LOG.debug(f"case 1: interval entirely before break") + present_intervals.append((current_time / 60, finish / 60)) + LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} " + f"- {_hours2timestring(present_intervals[-1][1])}") + current_time = finish + elif case2: + LOG.debug(f"case 2: interval straddles start of break") + present_intervals.append((current_time / 60, break_s / 60)) + LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} " + f"- {_hours2timestring(present_intervals[-1][1])}") + current_time = break_e + elif case3: + LOG.debug(f"case 3: break entirely inside interval") + # We add the bit before the break, but not the bit afterwards, + # as it may hit another break. + present_intervals.append((current_time / 60, break_s / 60)) + LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} " + f"- {_hours2timestring(present_intervals[-1][1])}") + current_time = break_e + elif case4: + LOG.debug(f"case 4: interval entirely inside break") + current_time = finish + elif case5: + LOG.debug(f"case 5: interval straddles end of break") + current_time = break_e + elif case6: + LOG.debug(f"case 6: interval entirely after the break") + + if current_time < finish: + LOG.debug("trailing interval") + present_intervals.append((current_time / 60, finish / 60)) + return models.SpecificInterval(tuple(present_intervals)) + + def infected_present_interval(self) -> models.Interval: + if self.specific_breaks != {}: # It means the breaks are specific and not predefined + breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks']) + else: + breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times() + return self.present_interval( + self.infected_start, self.infected_finish, + breaks=breaks, + ) + + def population_present_interval(self) -> models.Interval: + state_change_times = set(self.infected_present_interval().transition_times()) + state_change_times.update(self.exposed_present_interval().transition_times()) + all_state_changes = sorted(state_change_times) + return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:]))) + + def exposed_present_interval(self) -> models.Interval: + if self.specific_breaks != {}: # It means the breaks are specific and not predefined + breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks']) + else: + breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times() + return self.present_interval( + self.exposed_start, self.exposed_finish, + breaks=breaks, + ) + + +def _hours2timestring(hours: float): + # Convert times like 14.5 to strings, like "14:30" + return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}" + + +def time_string_to_minutes(time: str) -> minutes_since_midnight: + """ + Converts time from string-format to an integer number of minutes after 00:00 + :param time: A string of the form "HH:MM" representing a time of day + :return: The number of minutes between 'time' and 00:00 + """ + return minutes_since_midnight(60 * int(time[:2]) + int(time[3:])) + + +def time_minutes_to_string(time: int) -> str: + """ + Converts time from an integer number of minutes after 00:00 to string-format + :param time: The number of minutes between 'time' and 00:00 + :return: A string of the form "HH:MM" representing a time of day + """ + return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60) + + +def string_to_list(s: str) -> list: + return list(ast.literal_eval(s.replace(""", "\""))) + + +def list_to_string(l: list) -> str: + return json.dumps(l) + + +def string_to_dict(s: str) -> dict: + return dict(ast.literal_eval(s.replace(""", "\""))) + + +def dict_to_string(d: dict) -> str: + return json.dumps(d) + + +def _safe_int_cast(value) -> int: + if isinstance(value, int): + return value + elif isinstance(value, float) and int(value) == value: + return int(value) + elif isinstance(value, str) and value.isdecimal(): + return int(value) + else: + raise TypeError(f"Unable to safely cast {value} ({type(value)} type) to int") + + +#: Mapping of field name to a callable which can convert values from form +#: input (URL encoded arguments / string) into the correct type. +_CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {} + +#: Mapping of field name to callable which can convert native type to values +#: that can be encoded to URL arguments. +_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {} + +def cast_class_fields(cls): + for _field in dataclasses.fields(cls): + if _field.type is minutes_since_midnight: + _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = time_string_to_minutes + _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = time_minutes_to_string + elif _field.type is int: + _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = _safe_int_cast + elif _field.type is float: + _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float + elif _field.type is bool: + _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1' + _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int + elif _field.type is list: + _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_list + _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = list_to_string + elif _field.type is dict: + _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict + _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string + +cast_class_fields(FormData) diff --git a/caimira/apps/calculator/model_generator.py b/caimira/apps/calculator/model_generator.py index 2e40b19c..19419b96 100644 --- a/caimira/apps/calculator/model_generator.py +++ b/caimira/apps/calculator/model_generator.py @@ -1,10 +1,7 @@ import dataclasses import datetime -import html import logging import typing -import ast -import json import re import numpy as np @@ -14,9 +11,10 @@ from caimira import data import caimira.data.weather import caimira.monte_carlo as mc from .. import calculator +from .form_data import FormData, cast_class_fields, time_string_to_minutes from caimira.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances from caimira.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions -from .defaults import (NO_DEFAULT, DEFAULT_MC_SAMPLE_SIZE, DEFAULTS, ACTIVITIES, ACTIVITY_TYPES, COFFEE_OPTIONS_INT, CONFIDENCE_LEVEL_OPTIONS, +from .defaults import (DEFAULT_MC_SAMPLE_SIZE, DEFAULTS, ACTIVITIES, ACTIVITY_TYPES, CONFIDENCE_LEVEL_OPTIONS, MECHANICAL_VENTILATION_TYPES, MASK_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE, VENTILATION_TYPES, VIRUS_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES) from caimira.store.configuration import config @@ -27,37 +25,20 @@ minutes_since_midnight = typing.NewType('minutes_since_midnight', int) @dataclasses.dataclass -class FormData: +class VirusFormData(FormData): activity_type: str air_changes: float air_supply: float arve_sensors_option: bool - specific_breaks: dict precise_activity: dict ceiling_height: float conditional_probability_plot: bool conditional_probability_viral_loads: bool CO2_fitting_result: dict - exposed_coffee_break_option: str - exposed_coffee_duration: int - exposed_finish: minutes_since_midnight - exposed_lunch_finish: minutes_since_midnight - exposed_lunch_option: bool - exposed_lunch_start: minutes_since_midnight - exposed_start: minutes_since_midnight floor_area: float hepa_amount: float hepa_option: bool humidity: str - infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed - infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed - infected_dont_have_breaks_with_exposed: bool - infected_finish: minutes_since_midnight - infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed - infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed - infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed - infected_people: int - infected_start: minutes_since_midnight inside_temp: float location_name: str location_latitude: float @@ -74,9 +55,7 @@ class FormData: event_month: str room_heating_option: bool room_number: str - room_volume: float simulation_name: str - total_people: int vaccine_option: bool vaccine_booster_option: bool vaccine_type: str @@ -96,119 +75,7 @@ class FormData: short_range_interactions: list _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS - - @classmethod - def from_dict(cls, form_data: typing.Dict) -> "FormData": - # Take a copy of the form data so that we can mutate it. - form_data = form_data.copy() - form_data.pop('_xsrf', None) - - # Don't let arbitrary unescaped HTML through the net. - for key, value in form_data.items(): - if isinstance(value, str): - form_data[key] = html.escape(value) - - for key, default_value in cls._DEFAULTS.items(): - if form_data.get(key, '') == '': - if default_value is NO_DEFAULT: - raise ValueError(f"{key} must be specified") - form_data[key] = default_value - - for key, value in form_data.items(): - if key in _CAST_RULES_FORM_ARG_TO_NATIVE: - form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value) - - if key not in cls._DEFAULTS: - raise ValueError(f'Invalid argument "{html.escape(key)}" given') - - instance = cls(**form_data) - instance.validate() - return instance - - @classmethod - def to_dict(cls, form: "FormData", strip_defaults: bool = False) -> dict: - form_dict = { - field.name: getattr(form, field.name) - for field in dataclasses.fields(form) - } - - for attr, value in form_dict.items(): - if attr in _CAST_RULES_NATIVE_TO_FORM_ARG: - form_dict[attr] = _CAST_RULES_NATIVE_TO_FORM_ARG[attr](value) - - if strip_defaults: - del form_dict['calculator_version'] - - for attr, value in list(form_dict.items()): - default = cls._DEFAULTS.get(attr, NO_DEFAULT) - if default is not NO_DEFAULT and value in [default, 'not-applicable']: - form_dict.pop(attr) - return form_dict - def validate_population_parameters(self): - # Validate number of infected <= number of total people - if self.infected_people >= self.total_people: - raise ValueError('Number of infected people cannot be greater or equal to the number of total people.') - - # Validate time intervals selected by user - time_intervals = [ - ['exposed_start', 'exposed_finish'], - ['infected_start', 'infected_finish'], - ] - if self.exposed_lunch_option: - time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish']) - if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: - time_intervals.append(['infected_lunch_start', 'infected_lunch_finish']) - - for start_name, end_name in time_intervals: - start = getattr(self, start_name) - end = getattr(self, end_name) - if start > end: - raise ValueError( - f"{start_name} must be less than {end_name}. Got {start} and {end}.") - - def validate_lunch(start, finish): - lunch_start = getattr(self, f'{population}_lunch_start') - lunch_finish = getattr(self, f'{population}_lunch_finish') - return (start <= lunch_start <= finish and - start <= lunch_finish <= finish) - - def get_lunch_mins(population): - lunch_mins = 0 - if getattr(self, f'{population}_lunch_option'): - lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') - return lunch_mins - - def get_coffee_mins(population): - coffee_mins = 0 - if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': - coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') - return coffee_mins - - def get_activity_mins(population): - return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') - - populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] - for population in populations: - # Validate lunch time within the activity times. - if (getattr(self, f'{population}_lunch_option') and - not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish')) - ): - raise ValueError( - f"{population} lunch break must be within presence times." - ) - - # Length of breaks < length of activity - if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): - raise ValueError( - f"Length of breaks >= Length of {population} presence." - ) - - for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), - ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: - if getattr(self, attr_name) not in valid_set: - raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}") - def validate(self): # Validate population parameters self.validate_population_parameters() @@ -591,212 +458,11 @@ class FormData: ) return exposed - def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t: - break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1) - break_times = [] - end = start - for n in range(n_breaks): - begin = end + break_delay - end = begin + duration - break_times.append((begin, end)) - return tuple(break_times) - - def exposed_lunch_break_times(self) -> models.BoundarySequence_t: - result = [] - if self.exposed_lunch_option: - result.append((self.exposed_lunch_start, self.exposed_lunch_finish)) - return tuple(result) - - def infected_lunch_break_times(self) -> models.BoundarySequence_t: - if self.infected_dont_have_breaks_with_exposed: - result = [] - if self.infected_lunch_option: - result.append((self.infected_lunch_start, self.infected_lunch_finish)) - return tuple(result) - else: - return self.exposed_lunch_break_times() - - def exposed_number_of_coffee_breaks(self) -> int: - return COFFEE_OPTIONS_INT[self.exposed_coffee_break_option] - - def infected_number_of_coffee_breaks(self) -> int: - return COFFEE_OPTIONS_INT[self.infected_coffee_break_option] - - def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t: - time_before_lunch = lunch_start - activity_start - time_after_lunch = activity_finish - lunch_finish - before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch) - n_morning_breaks = round(coffee_breaks * before_lunch_frac) - breaks = ( - self._compute_breaks_in_interval( - activity_start, lunch_start, n_morning_breaks, coffee_duration - ) - + self._compute_breaks_in_interval( - lunch_finish, activity_finish, coffee_breaks - n_morning_breaks, coffee_duration - ) - ) - return breaks - - def exposed_coffee_break_times(self) -> models.BoundarySequence_t: - exposed_coffee_breaks = self.exposed_number_of_coffee_breaks() - if exposed_coffee_breaks == 0: - return () - if self.exposed_lunch_option: - breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish) - else: - breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration) - return breaks - - def infected_coffee_break_times(self) -> models.BoundarySequence_t: - if self.infected_dont_have_breaks_with_exposed: - infected_coffee_breaks = self.infected_number_of_coffee_breaks() - if infected_coffee_breaks == 0: - return () - if self.infected_lunch_option: - breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish) - else: - breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration) - return breaks - else: - return self.exposed_coffee_break_times() - - def generate_specific_break_times(self, population_breaks) -> models.BoundarySequence_t: - break_times = [] - for n in population_breaks: - # Parse break times. - begin = time_string_to_minutes(n["start_time"]) - end = time_string_to_minutes(n["finish_time"]) - for time in [begin, end]: - # For a specific break, the infected and exposed presence is the same. - if not getattr(self, 'infected_start') < time < getattr(self, 'infected_finish'): - raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.') - - break_times.append((begin, end)) - return tuple(break_times) - - def present_interval( - self, - start: int, - finish: int, - breaks: typing.Optional[models.BoundarySequence_t] = None, - ) -> models.Interval: - """ - Calculate the presence interval given the start and end times (in minutes), and - a number of monotonic, non-overlapping, but potentially unsorted, breaks (also in minutes). - - """ - if not breaks: - # If there are no breaks, the interval is the start and end. - return models.SpecificInterval(((start/60, finish/60),)) - - # Order the breaks by their start-time, and ensure that they are monotonic - # and that the start of one break happens after the end of another. - break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0])) - - for break_start, break_end in break_boundaries: - if break_start >= break_end: - raise ValueError("Break ends before it begins.") - - prev_break_end = break_boundaries[0][1] - for break_start, break_end in break_boundaries[1:]: - if prev_break_end >= break_start: - raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).") - prev_break_end = break_end - - present_intervals = [] - - current_time = start - LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}") - - # As we step through the breaks. For each break there are 6 important cases - # we must cover. Let S=start; E=end; Bs=Break start; Be=Break end: - # 1. The interval is entirely before the break. S < E <= Bs < Be - # 2. The interval straddles the start of the break. S < Bs < E <= Be - # 3. The break is entirely inside the interval. S < Bs < Be <= E - # 4. The interval is entirely inside the break. Bs <= S < E <= Be - # 5. The interval straddles the end of the break. Bs <= S < Be <= E - # 6. The interval is entirely after the break. Bs < Be <= S < E - - for current_break in break_boundaries: - if current_time >= finish: - break - - LOG.debug(f"handling break {_hours2timestring(current_break[0]/60)}-{_hours2timestring(current_break[1]/60)} " - f" (current time: {_hours2timestring(current_time/60)})") - - break_s, break_e = current_break - case1 = finish <= break_s - case2 = current_time < break_s < finish < break_e - case3 = current_time < break_s < break_e <= finish - case4 = break_s <= current_time < finish <= break_e - case5 = break_s <= current_time < break_e < finish - case6 = break_e <= current_time - - if case1: - LOG.debug(f"case 1: interval entirely before break") - present_intervals.append((current_time / 60, finish / 60)) - LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} " - f"- {_hours2timestring(present_intervals[-1][1])}") - current_time = finish - elif case2: - LOG.debug(f"case 2: interval straddles start of break") - present_intervals.append((current_time / 60, break_s / 60)) - LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} " - f"- {_hours2timestring(present_intervals[-1][1])}") - current_time = break_e - elif case3: - LOG.debug(f"case 3: break entirely inside interval") - # We add the bit before the break, but not the bit afterwards, - # as it may hit another break. - present_intervals.append((current_time / 60, break_s / 60)) - LOG.debug(f" + added interval {_hours2timestring(present_intervals[-1][0])} " - f"- {_hours2timestring(present_intervals[-1][1])}") - current_time = break_e - elif case4: - LOG.debug(f"case 4: interval entirely inside break") - current_time = finish - elif case5: - LOG.debug(f"case 5: interval straddles end of break") - current_time = break_e - elif case6: - LOG.debug(f"case 6: interval entirely after the break") - - if current_time < finish: - LOG.debug("trailing interval") - present_intervals.append((current_time / 60, finish / 60)) - return models.SpecificInterval(tuple(present_intervals)) - - def infected_present_interval(self) -> models.Interval: - if self.specific_breaks != {}: # It means the breaks are specific and not predefined - breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks']) - else: - breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times() - return self.present_interval( - self.infected_start, self.infected_finish, - breaks=breaks, - ) - - def population_present_interval(self) -> models.Interval: - state_change_times = set(self.infected_present_interval().transition_times()) - state_change_times.update(self.exposed_present_interval().transition_times()) - all_state_changes = sorted(state_change_times) - return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:]))) - def short_range_interval(self, interaction) -> models.SpecificInterval: start_time = time_string_to_minutes(interaction['start_time']) duration = float(interaction['duration']) return models.SpecificInterval(present_times=((start_time/60, (start_time + duration)/60),)) - def exposed_present_interval(self) -> models.Interval: - if self.specific_breaks != {}: # It means the breaks are specific and not predefined - breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks']) - else: - breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times() - return self.present_interval( - self.exposed_start, self.exposed_finish, - breaks=breaks, - ) - def build_expiration(expiration_definition) -> mc._ExpirationBase: if isinstance(expiration_definition, str): @@ -875,80 +541,4 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]: 'short_range_interactions': '[]', } - -def _hours2timestring(hours: float): - # Convert times like 14.5 to strings, like "14:30" - return f"{int(np.floor(hours)):02d}:{int(np.round((hours % 1) * 60)):02d}" - - -def time_string_to_minutes(time: str) -> minutes_since_midnight: - """ - Converts time from string-format to an integer number of minutes after 00:00 - :param time: A string of the form "HH:MM" representing a time of day - :return: The number of minutes between 'time' and 00:00 - """ - return minutes_since_midnight(60 * int(time[:2]) + int(time[3:])) - - -def time_minutes_to_string(time: int) -> str: - """ - Converts time from an integer number of minutes after 00:00 to string-format - :param time: The number of minutes between 'time' and 00:00 - :return: A string of the form "HH:MM" representing a time of day - """ - return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60) - - -def string_to_list(s: str) -> list: - return list(ast.literal_eval(s.replace(""", "\""))) - - -def list_to_string(l: list) -> str: - return json.dumps(l) - - -def string_to_dict(s: str) -> dict: - return dict(ast.literal_eval(s.replace(""", "\""))) - - -def dict_to_string(d: dict) -> str: - return json.dumps(d) - - -def _safe_int_cast(value) -> int: - if isinstance(value, int): - return value - elif isinstance(value, float) and int(value) == value: - return int(value) - elif isinstance(value, str) and value.isdecimal(): - return int(value) - else: - raise TypeError(f"Unable to safely cast {value} ({type(value)} type) to int") - - -#: Mapping of field name to a callable which can convert values from form -#: input (URL encoded arguments / string) into the correct type. -_CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {} - -#: Mapping of field name to callable which can convert native type to values -#: that can be encoded to URL arguments. -_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {} - - -for _field in dataclasses.fields(FormData): - if _field.type is minutes_since_midnight: - _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = time_string_to_minutes - _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = time_minutes_to_string - elif _field.type is int: - _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = _safe_int_cast - elif _field.type is float: - _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = float - elif _field.type is bool: - _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = lambda v: v == '1' - _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = int - elif _field.type is list: - _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_list - _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = list_to_string - elif _field.type is dict: - _CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict - _CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string +cast_class_fields(VirusFormData) diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py index 15f2cc8a..e3757229 100644 --- a/caimira/apps/calculator/report_generator.py +++ b/caimira/apps/calculator/report_generator.py @@ -15,7 +15,7 @@ import matplotlib.pyplot as plt from caimira import models from caimira.apps.calculator import markdown_tools from ... import monte_carlo as mc -from .model_generator import FormData, DEFAULT_MC_SAMPLE_SIZE +from .model_generator import VirusFormData, DEFAULT_MC_SAMPLE_SIZE from ... import dataclass_utils from caimira.store.configuration import config @@ -102,7 +102,7 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional return nice_times -def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]: +def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]: lower_concentrations = [] for time in times: for index, (start, stop) in enumerate(short_range_intervals): @@ -114,7 +114,7 @@ def concentrations_with_sr_breathing(form: FormData, model: models.ExposureModel return lower_concentrations -def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]: +def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]: times = interesting_times(model) short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range] short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else [] @@ -175,8 +175,8 @@ def calculate_report_data(form: FormData, model: models.ExposureModel) -> typing } -def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: FormData): - form_dict = FormData.to_dict(form, strip_defaults=True) +def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData): + form_dict = VirusFormData.to_dict(form, strip_defaults=True) # Generate the calculator URL arguments that would be needed to re-create this # form. @@ -353,7 +353,7 @@ def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typ return scenarios -def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.ExposureModel]: +def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]: scenarios = {} if (form.short_range_option == "short_range_no"): # Two special option cases - HEPA and/or FFP2 masks. @@ -424,7 +424,7 @@ def scenario_statistics(mc_model: mc.ExposureModel, sample_times: typing.List[fl def comparison_report( - form: FormData, + form: VirusFormData, report_data: typing.Dict[str, typing.Any], scenarios: typing.Dict[str, mc.ExposureModel], sample_times: typing.List[float], @@ -472,7 +472,7 @@ class ReportGenerator: def build_report( self, base_url: str, - form: FormData, + form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor], ) -> str: model = form.build_model() @@ -483,7 +483,7 @@ class ReportGenerator: self, base_url: str, model: models.ExposureModel, - form: FormData, + form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor], ) -> dict: now = datetime.utcnow().astimezone() diff --git a/caimira/tests/apps/calculator/conftest.py b/caimira/tests/apps/calculator/conftest.py index 7e46d0cf..e4eedb11 100644 --- a/caimira/tests/apps/calculator/conftest.py +++ b/caimira/tests/apps/calculator/conftest.py @@ -10,4 +10,4 @@ def baseline_form_data(): @pytest.fixture def baseline_form(baseline_form_data): - return model_generator.FormData.from_dict(baseline_form_data) + return model_generator.VirusFormData.from_dict(baseline_form_data) diff --git a/caimira/tests/apps/calculator/test_model_generator.py b/caimira/tests/apps/calculator/test_model_generator.py index 4d1f5756..697e962f 100644 --- a/caimira/tests/apps/calculator/test_model_generator.py +++ b/caimira/tests/apps/calculator/test_model_generator.py @@ -7,21 +7,22 @@ import pytest from retry import retry from caimira.apps.calculator import model_generator -from caimira.apps.calculator.model_generator import _hours2timestring -from caimira.apps.calculator.model_generator import minutes_since_midnight +from caimira.apps.calculator.form_data import (_hours2timestring, minutes_since_midnight, + _CAST_RULES_FORM_ARG_TO_NATIVE, _CAST_RULES_NATIVE_TO_FORM_ARG) from caimira import models from caimira.monte_carlo.data import expiration_distributions +from caimira.apps.calculator.defaults import NO_DEFAULT def test_model_from_dict(baseline_form_data): - form = model_generator.FormData.from_dict(baseline_form_data) + form = model_generator.VirusFormData.from_dict(baseline_form_data) assert isinstance(form.build_model(), models.ExposureModel) def test_model_from_dict_invalid(baseline_form_data): baseline_form_data['invalid_item'] = 'foobar' with pytest.raises(ValueError, match='Invalid argument "invalid_item" given'): - model_generator.FormData.from_dict(baseline_form_data) + model_generator.VirusFormData.from_dict(baseline_form_data) @retry(tries=10) @@ -44,7 +45,7 @@ def test_blend_expiration(mask_type): npt.assert_allclose(r.aerosols(mask).mean(), expected, rtol=TOLERANCE) -def test_ventilation_slidingwindow(baseline_form: model_generator.FormData): +def test_ventilation_slidingwindow(baseline_form: model_generator.VirusFormData): baseline_form.ventilation_type = 'natural_ventilation' baseline_form.windows_duration = 10 baseline_form.windows_frequency = 120 @@ -74,7 +75,7 @@ def test_ventilation_slidingwindow(baseline_form: model_generator.FormData): assert ventilation == baseline_vent -def test_ventilation_hingedwindow(baseline_form: model_generator.FormData): +def test_ventilation_hingedwindow(baseline_form: model_generator.VirusFormData): baseline_form.ventilation_type = 'natural_ventilation' baseline_form.windows_duration = 10 baseline_form.windows_frequency = 120 @@ -104,7 +105,7 @@ def test_ventilation_hingedwindow(baseline_form: model_generator.FormData): assert ventilation == baseline_vent -def test_ventilation_mechanical(baseline_form: model_generator.FormData): +def test_ventilation_mechanical(baseline_form: model_generator.VirusFormData): room = models.Room(volume=75, inside_temp=models.PiecewiseConstant((0, 24), (293,))) mech = models.HVACMechanical( active=models.PeriodicInterval(period=120, duration=120), @@ -119,7 +120,7 @@ def test_ventilation_mechanical(baseline_form: model_generator.FormData): np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts])) -def test_ventilation_airchanges(baseline_form: model_generator.FormData): +def test_ventilation_airchanges(baseline_form: model_generator.VirusFormData): room = models.Room(75, inside_temp=models.PiecewiseConstant((0, 24), (293,))) airchange = models.AirChange( active=models.PeriodicInterval(period=120, duration=120), @@ -134,7 +135,7 @@ def test_ventilation_airchanges(baseline_form: model_generator.FormData): np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts])) -def test_ventilation_window_hepa(baseline_form: model_generator.FormData): +def test_ventilation_window_hepa(baseline_form: model_generator.VirusFormData): baseline_form.ventilation_type = 'natural_ventilation' baseline_form.windows_duration = 10 baseline_form.windows_frequency = 120 @@ -177,7 +178,7 @@ def test_ventilation_window_hepa(baseline_form: model_generator.FormData): ] ) def test_infected_less_than_total_people(activity, total_people, infected_people, error, - baseline_form: model_generator.FormData): + baseline_form: model_generator.VirusFormData): baseline_form.activity_type = activity baseline_form.total_people = total_people baseline_form.infected_people = infected_people @@ -190,7 +191,7 @@ def present_times(interval: models.Interval) -> models.BoundarySequence_t: return interval.present_times -def test_infected_present_intervals(baseline_form: model_generator.FormData): +def test_infected_present_intervals(baseline_form: model_generator.VirusFormData): baseline_form.infected_dont_have_breaks_with_exposed = False baseline_form.exposed_coffee_duration = 15 baseline_form.exposed_coffee_break_option = 'coffee_break_2' @@ -204,7 +205,7 @@ def test_infected_present_intervals(baseline_form: model_generator.FormData): assert present_times(baseline_form.infected_present_interval()) == correct -def test_exposed_present_intervals(baseline_form: model_generator.FormData): +def test_exposed_present_intervals(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_duration = 15 baseline_form.exposed_coffee_break_option = 'coffee_break_2' baseline_form.exposed_start = minutes_since_midnight(9 * 60) @@ -215,7 +216,7 @@ def test_exposed_present_intervals(baseline_form: model_generator.FormData): assert present_times(baseline_form.exposed_present_interval()) == correct -def test_present_intervals_common_breaks(baseline_form: model_generator.FormData): +def test_present_intervals_common_breaks(baseline_form: model_generator.VirusFormData): baseline_form.infected_dont_have_breaks_with_exposed = False baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15 baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2' @@ -231,7 +232,7 @@ def test_present_intervals_common_breaks(baseline_form: model_generator.FormData assert present_times(baseline_form.infected_present_interval()) == correct_infected -def test_present_intervals_split_breaks(baseline_form: model_generator.FormData): +def test_present_intervals_split_breaks(baseline_form: model_generator.VirusFormData): baseline_form.infected_dont_have_breaks_with_exposed = True baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15 baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2' @@ -247,7 +248,7 @@ def test_present_intervals_split_breaks(baseline_form: model_generator.FormData) assert present_times(baseline_form.infected_present_interval()) == correct_infected -def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_generator.FormData): +def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_break_option = 'coffee_break_0' baseline_form.exposed_start = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60) baseline_form.exposed_finish = minutes_since_midnight(18 * 60) @@ -256,7 +257,7 @@ def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_gene assert present_times(baseline_form.exposed_present_interval()) == correct -def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_generator.FormData): +def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_break_option = 'coffee_break_0' baseline_form.exposed_start = minutes_since_midnight(11 * 60) baseline_form.exposed_finish = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60) @@ -265,7 +266,7 @@ def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_genera assert present_times(baseline_form.exposed_present_interval()) == correct -def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generator.FormData): +def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_break_option = 'coffee_break_0' baseline_form.exposed_lunch_start = minutes_since_midnight(14 * 60) baseline_form.exposed_lunch_finish = minutes_since_midnight(13 * 60) @@ -282,7 +283,7 @@ def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generat [9, 20], # lunch_finish after the presence finishing ], ) -def test_exposed_presence_lunch_break(baseline_form: model_generator.FormData, exposed_lunch_start, exposed_lunch_finish): +def test_exposed_presence_lunch_break(baseline_form: model_generator.VirusFormData, exposed_lunch_start, exposed_lunch_finish): baseline_form.exposed_lunch_start = minutes_since_midnight(exposed_lunch_start * 60) baseline_form.exposed_lunch_finish = minutes_since_midnight(exposed_lunch_finish * 60) with pytest.raises(ValueError, match='exposed lunch break must be within presence times.'): @@ -298,14 +299,14 @@ def test_exposed_presence_lunch_break(baseline_form: model_generator.FormData, e [9, 20], # lunch_finish after the presence finishing ], ) -def test_infected_presence_lunch_break(baseline_form: model_generator.FormData, infected_lunch_start, infected_lunch_finish): +def test_infected_presence_lunch_break(baseline_form: model_generator.VirusFormData, infected_lunch_start, infected_lunch_finish): baseline_form.infected_lunch_start = minutes_since_midnight(infected_lunch_start * 60) baseline_form.infected_lunch_finish = minutes_since_midnight(infected_lunch_finish * 60) with pytest.raises(ValueError, match='infected lunch break must be within presence times.'): baseline_form.validate() -def test_exposed_breaks_length(baseline_form: model_generator.FormData): +def test_exposed_breaks_length(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_break_option = 'coffee_break_4' baseline_form.exposed_coffee_duration = 30 baseline_form.exposed_start = minutes_since_midnight(10 * 60) @@ -315,7 +316,7 @@ def test_exposed_breaks_length(baseline_form: model_generator.FormData): baseline_form.validate() -def test_infected_breaks_length(baseline_form: model_generator.FormData): +def test_infected_breaks_length(baseline_form: model_generator.VirusFormData): baseline_form.infected_start = minutes_since_midnight(9 * 60) baseline_form.infected_finish = minutes_since_midnight(12 * 60) baseline_form.infected_lunch_start = minutes_since_midnight(10 * 60) @@ -327,7 +328,7 @@ def test_infected_breaks_length(baseline_form: model_generator.FormData): @pytest.fixture -def coffee_break_between_1045_and_1115(baseline_form: model_generator.FormData): +def coffee_break_between_1045_and_1115(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_break_option = 'coffee_break_1' baseline_form.exposed_coffee_duration = 30 baseline_form.exposed_start = minutes_since_midnight(10 * 60) @@ -385,7 +386,7 @@ def assert_boundaries(interval, boundaries_in_time_string_form): @pytest.fixture -def breaks_every_25_mins_for_20_mins(baseline_form: model_generator.FormData): +def breaks_every_25_mins_for_20_mins(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_break_option = 'coffee_break_4' baseline_form.exposed_coffee_duration = 20 baseline_form.exposed_start = time2mins("10:00") @@ -430,7 +431,7 @@ def test_present_only_during_second_break(breaks_every_25_mins_for_20_mins): assert_boundaries(interval, []) -def test_valid_no_lunch(baseline_form: model_generator.FormData): +def test_valid_no_lunch(baseline_form: model_generator.VirusFormData): # Check that it is valid to have a 0 length lunch if no lunch is selected. baseline_form.exposed_lunch_option = False baseline_form.exposed_lunch_start = minutes_since_midnight(0) @@ -438,7 +439,7 @@ def test_valid_no_lunch(baseline_form: model_generator.FormData): assert baseline_form.validate() is None -def test_no_breaks(baseline_form: model_generator.FormData): +def test_no_breaks(baseline_form: model_generator.VirusFormData): # Check that the times are correct in the absence of breaks. baseline_form.infected_dont_have_breaks_with_exposed = False baseline_form.exposed_lunch_option = False @@ -453,7 +454,7 @@ def test_no_breaks(baseline_form: model_generator.FormData): assert present_times(baseline_form.infected_present_interval()) == infected_correct -def test_coffee_lunch_breaks(baseline_form: model_generator.FormData): +def test_coffee_lunch_breaks(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_duration = 30 baseline_form.exposed_coffee_break_option = 'coffee_break_4' baseline_form.exposed_start = minutes_since_midnight(9 * 60) @@ -465,7 +466,7 @@ def test_coffee_lunch_breaks(baseline_form: model_generator.FormData): np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14) -def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.FormData): +def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_duration = 30 baseline_form.exposed_coffee_break_option = 'coffee_break_2' baseline_form.exposed_start = minutes_since_midnight(9 * 60) @@ -476,7 +477,7 @@ def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.FormData): np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14) -def test_coffee_breaks(baseline_form: model_generator.FormData): +def test_coffee_breaks(baseline_form: model_generator.VirusFormData): baseline_form.exposed_coffee_duration = 10 baseline_form.exposed_coffee_break_option = 'coffee_break_4' baseline_form.exposed_start = minutes_since_midnight(9 * 60) @@ -489,24 +490,24 @@ def test_coffee_breaks(baseline_form: model_generator.FormData): def test_key_validation(baseline_form_data): baseline_form_data['activity_type'] = 'invalid key' with pytest.raises(ValueError): - model_generator.FormData.from_dict(baseline_form_data) + model_generator.VirusFormData.from_dict(baseline_form_data) def test_key_validation_natural_ventilation_window_type_na(baseline_form_data): baseline_form_data['ventilation_type'] = 'natural_ventilation' baseline_form_data['window_type'] = 'not-applicable' with pytest.raises(ValueError, match='window_type cannot be \'not-applicable\''): - model_generator.FormData.from_dict(baseline_form_data) + model_generator.VirusFormData.from_dict(baseline_form_data) def test_key_validation_natural_ventilation_window_opening_regime_na(baseline_form_data): baseline_form_data['ventilation_type'] = 'natural_ventilation' baseline_form_data['window_opening_regime'] = 'not-applicable' with pytest.raises(ValueError, match='window_opening_regime cannot be \'not-applicable\''): - model_generator.FormData.from_dict(baseline_form_data) + model_generator.VirusFormData.from_dict(baseline_form_data) -def test_natural_ventilation_window_opening_periodically(baseline_form: model_generator.FormData): +def test_natural_ventilation_window_opening_periodically(baseline_form: model_generator.VirusFormData): baseline_form.window_opening_regime = 'windows_open_periodically' baseline_form.windows_duration = 20 baseline_form.windows_frequency = 10 @@ -518,20 +519,20 @@ def test_key_validation_mech_ventilation_type_na(baseline_form_data): baseline_form_data['ventilation_type'] = 'mechanical_ventilation' baseline_form_data['mechanical_ventilation_type'] = 'not-applicable' with pytest.raises(ValueError, match='mechanical_ventilation_type cannot be \'not-applicable\''): - model_generator.FormData.from_dict(baseline_form_data) + model_generator.VirusFormData.from_dict(baseline_form_data) def test_key_validation_event_month(baseline_form_data): baseline_form_data['event_month'] = 'invalid month' with pytest.raises(ValueError, match='invalid month is not a valid value for event_month'): - model_generator.FormData.from_dict(baseline_form_data) + model_generator.VirusFormData.from_dict(baseline_form_data) def test_default_types(): - # Validate that FormData._DEFAULTS are complete and of the correct type. + # Validate that VirusFormData._DEFAULTS are complete and of the correct type. # Validate that we have the right types and matching attributes to the DEFAULTS. - fields = {field.name: field for field in dataclasses.fields(model_generator.FormData)} - for field, value in model_generator.FormData._DEFAULTS.items(): + fields = {field.name: field for field in dataclasses.fields(model_generator.VirusFormData)} + for field, value in model_generator.VirusFormData._DEFAULTS.items(): if field not in fields: raise ValueError(f"Unmatched default {field}") @@ -540,17 +541,17 @@ def test_default_types(): # Handle typing.NewType definitions. field_type = field_type.__supertype__ - if value is model_generator.NO_DEFAULT: + if value is NO_DEFAULT: continue - if field in model_generator._CAST_RULES_FORM_ARG_TO_NATIVE: - value = model_generator._CAST_RULES_FORM_ARG_TO_NATIVE[field](value) + if field in _CAST_RULES_FORM_ARG_TO_NATIVE: + value = _CAST_RULES_FORM_ARG_TO_NATIVE[field](value) if not isinstance(value, field_type): raise TypeError(f'{field} has type {field_type}, got {type(value)}') for field in fields.values(): - assert field.name in model_generator.FormData._DEFAULTS, f"No default set for field name {field.name}" + assert field.name in model_generator.VirusFormData._DEFAULTS, f"No default set for field name {field.name}" def test_form_to_dict(baseline_form): @@ -559,7 +560,7 @@ def test_form_to_dict(baseline_form): assert 1 < len(stripped) < len(full) assert 'exposed_coffee_break_option' in stripped # If we set the value to the default one, it should no longer turn up in the dictionary. - baseline_form.exposed_coffee_break_option = model_generator.FormData._DEFAULTS['exposed_coffee_break_option'] + baseline_form.exposed_coffee_break_option = model_generator.VirusFormData._DEFAULTS['exposed_coffee_break_option'] assert 'exposed_coffee_break_option' not in baseline_form.to_dict(baseline_form, strip_defaults=True) @@ -577,7 +578,7 @@ def test_form_timezone(baseline_form_data, longitude, latitude, month, expected_ baseline_form_data['location_latitude'] = latitude baseline_form_data['location_longitude'] = longitude baseline_form_data['event_month'] = month - form = model_generator.FormData.from_dict(baseline_form_data) + form = model_generator.VirusFormData.from_dict(baseline_form_data) name, offset = form.tz_name_and_utc_offset() assert name == expected_tz_name assert offset == expected_offset diff --git a/caimira/tests/apps/calculator/test_specific_model_generator.py b/caimira/tests/apps/calculator/test_specific_model_generator.py index e1d68e11..be2ac7dc 100644 --- a/caimira/tests/apps/calculator/test_specific_model_generator.py +++ b/caimira/tests/apps/calculator/test_specific_model_generator.py @@ -13,7 +13,7 @@ from caimira.apps.calculator import model_generator [{"exposed_breaks": [], "ifected_breaks": []}, 'Unable to fetch "infected_breaks" key. Got "ifected_breaks".'], ] ) -def test_specific_break_structure(break_input, error, baseline_form: model_generator.FormData): +def test_specific_break_structure(break_input, error, baseline_form: model_generator.VirusFormData): baseline_form.specific_breaks = break_input with pytest.raises(TypeError, match=error): baseline_form.validate() @@ -30,7 +30,7 @@ def test_specific_break_structure(break_input, error, baseline_form: model_gener [[{"start_time": "10:00", "finish_time": "11"}], 'Wrong time format - "HH:MM". Got "11".'], ] ) -def test_specific_population_break_data_structure(population_break_input, error, baseline_form: model_generator.FormData): +def test_specific_population_break_data_structure(population_break_input, error, baseline_form: model_generator.VirusFormData): baseline_form.specific_breaks = {'exposed_breaks': population_break_input, 'infected_breaks': population_break_input} with pytest.raises(TypeError, match=error): baseline_form.validate() @@ -45,7 +45,7 @@ def test_specific_population_break_data_structure(population_break_input, error, [[{"start_time": "08:00", "finish_time": "11:00"}, {"start_time": "14:00", "finish_time": "15:00"}, ], "All breaks should be within the simulation time. Got 08:00."], ] ) -def test_specific_break_time(break_input, error, baseline_form: model_generator.FormData): +def test_specific_break_time(break_input, error, baseline_form: model_generator.VirusFormData): with pytest.raises(ValueError, match=error): baseline_form.generate_specific_break_times(break_input) @@ -63,7 +63,7 @@ def test_specific_break_time(break_input, error, baseline_form: model_generator. [{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentag": 50}, {"type": "Speaking", "percentage": 50}]}, 'Unable to fetch "percentage" key. Got "percentag".'], ] ) -def test_precise_activity_structure(precise_activity_input, error, baseline_form: model_generator.FormData): +def test_precise_activity_structure(precise_activity_input, error, baseline_form: model_generator.VirusFormData): baseline_form.precise_activity = precise_activity_input with pytest.raises(TypeError, match=error): baseline_form.validate() @@ -78,7 +78,7 @@ def test_precise_activity_structure(precise_activity_input, error, baseline_form [{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentage": 50}]}, 'The sum of all respiratory activities should be 100. Got 50.'], ] ) -def test_sum_precise_activity(precise_activity_input, error, baseline_form: model_generator.FormData): +def test_sum_precise_activity(precise_activity_input, error, baseline_form: model_generator.VirusFormData): baseline_form.precise_activity = precise_activity_input with pytest.raises(ValueError, match=error): baseline_form.validate() \ No newline at end of file