diff --git a/caimira/src/caimira/calculator/validators/co2/co2_validator.py b/caimira/src/caimira/calculator/validators/co2/co2_validator.py index 7a41f6c6..659bd27c 100644 --- a/caimira/src/caimira/calculator/validators/co2/co2_validator.py +++ b/caimira/src/caimira/calculator/validators/co2/co2_validator.py @@ -44,11 +44,14 @@ class CO2FormData(FormData): 'infected_lunch_option': True, 'infected_lunch_start': '12:30', 'infected_people': 1, + 'dynamic_infected_occupancy': '[]', 'infected_start': '08:30', 'room_capacity': None, 'room_volume': NO_DEFAULT, 'specific_breaks': '{}', 'total_people': NO_DEFAULT, + 'dynamic_exposed_occupancy': '[]', + 'occupancy_format': 'static', } def __init__(self, **kwargs): @@ -196,20 +199,34 @@ class CO2FormData(FormData): size = size or self.data_registry.monte_carlo['sample_size'] # Build a simple infected and exposed population for the case when presence # intervals and number of people are dynamic. Activity type is not needed. - infected_presence = self.infected_present_interval() + if self.occupancy_format == 'dynamic': + if isinstance(self.dynamic_infected_occupancy, typing.List) and len(self.dynamic_infected_occupancy) > 0: + infected_people, infected_presence = self.generate_dynamic_occupancy(self.dynamic_infected_occupancy) + else: + raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_infected_occupancy}".') + if isinstance(self.dynamic_exposed_occupancy, typing.List) and len(self.dynamic_exposed_occupancy) > 0: + exposed_people, exposed_presence = self.generate_dynamic_occupancy(self.dynamic_exposed_occupancy) + else: + raise TypeError(f'If dynamic occupancy is selected, a populated list of occupancy intervals is expected. Got "{self.dynamic_exposed_occupancy}".') + else: + infected_people = self.infected_people + exposed_people = self.total_people - self.infected_people + infected_presence = self.infected_present_interval() + exposed_presence = self.exposed_present_interval() + infected_population = models.SimplePopulation( - number=self.infected_people, + number=infected_people, presence=infected_presence, activity=None, # type: ignore ) - exposed_presence = self.exposed_present_interval() exposed_population=models.SimplePopulation( - number=self.total_people - self.infected_people, + number=exposed_people, presence=exposed_presence, activity=None, # type: ignore ) - - all_state_changes = self.population_present_changes(infected_presence, exposed_presence) + + all_state_changes=self.population_present_changes(infected_population.presence_interval(), + exposed_population.presence_interval()) total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop) for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])] diff --git a/caimira/src/caimira/calculator/validators/form_validator.py b/caimira/src/caimira/calculator/validators/form_validator.py index 0e5a6450..281647b5 100644 --- a/caimira/src/caimira/calculator/validators/form_validator.py +++ b/caimira/src/caimira/calculator/validators/form_validator.py @@ -4,6 +4,7 @@ import logging import typing import ast import json +import re import numpy as np @@ -378,6 +379,56 @@ class FormData: self.exposed_start, self.exposed_finish, breaks=breaks, ) + + def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]): + ##### Data format validation ##### + for occupancy in dynamic_occupancy: + # Check if each occupancy entry is a dictionary + if not isinstance(occupancy, typing.Dict): + raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}."') + + # Check for required keys in each occupancy entry + dict_keys = list(occupancy.keys()) + if "total_people" not in dict_keys: + raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys[0]}".') + else: + value = occupancy["total_people"] + # Check if the value is a non-negative integer + if not isinstance(value, int): + raise ValueError(f"Total number of people should be integer. Got {value}.") + elif not value >= 0: + raise ValueError(f"Total number of people should be non-negative. Got {value}.") + + if "start_time" not in dict_keys: + raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[1]}".') + if "finish_time" not in dict_keys: + raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[2]}".') + + # Validate time format for start_time and finish_time + for time_key in ["start_time", "finish_time"]: + time = occupancy[time_key] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): + raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') + + transition_times = [] + values = [] + for occupancy in dynamic_occupancy: + start_time = time_string_to_minutes(occupancy['start_time'])/60 + finish_time = time_string_to_minutes(occupancy['finish_time'])/60 + transition_times.extend([start_time, finish_time]) + values.append(occupancy['total_people']) + + unique_transition_times_sorted = np.array(sorted(set(transition_times))) + + if len(values) != len(unique_transition_times_sorted) - 1: + raise ValueError("Cannot compute dynamic occupancy with the provided inputs.") + + population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant( + transition_times=tuple(unique_transition_times_sorted), + values=tuple(values) + ) + population_presence: typing.Union[None, models.Interval] = None + return population_occupancy, population_presence def _hours2timestring(hours: float): diff --git a/caimira/src/caimira/calculator/validators/virus/virus_validator.py b/caimira/src/caimira/calculator/validators/virus/virus_validator.py index c8230974..60c4c52a 100644 --- a/caimira/src/caimira/calculator/validators/virus/virus_validator.py +++ b/caimira/src/caimira/calculator/validators/virus/virus_validator.py @@ -443,56 +443,6 @@ class VirusFormData(FormData): ] = respiratory_activity['percentage'] return (self.precise_activity['physical_activity'], respiratory_dict) - - def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]): - ##### Data format validation ##### - for occupancy in dynamic_occupancy: - # Check if each occupancy entry is a dictionary - if not isinstance(occupancy, typing.Dict): - raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}."') - - # Check for required keys in each occupancy entry - dict_keys = list(occupancy.keys()) - if "total_people" not in dict_keys: - raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys[0]}".') - else: - value = occupancy["total_people"] - # Check if the value is a non-negative integer - if not isinstance(value, int): - raise ValueError(f"Total number of people should be integer. Got {value}.") - elif not value >= 0: - raise ValueError(f"Total number of people should be non-negative. Got {value}.") - - if "start_time" not in dict_keys: - raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[1]}".') - if "finish_time" not in dict_keys: - raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[2]}".') - - # Validate time format for start_time and finish_time - for time_key in ["start_time", "finish_time"]: - time = occupancy[time_key] - if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): - raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') - - transition_times = [] - values = [] - for occupancy in dynamic_occupancy: - start_time = time_string_to_minutes(occupancy['start_time'])/60 - finish_time = time_string_to_minutes(occupancy['finish_time'])/60 - transition_times.extend([start_time, finish_time]) - values.append(occupancy['total_people']) - - unique_transition_times_sorted = np.array(sorted(set(transition_times))) - - if len(values) != len(unique_transition_times_sorted) - 1: - raise ValueError("Cannot compute dynamic occupancy with the provided inputs.") - - population_occupancy: models.IntPiecewiseConstant = models.IntPiecewiseConstant( - transition_times=tuple(unique_transition_times_sorted), - values=tuple(values) - ) - population_presence: typing.Union[None, models.Interval] = None - return population_occupancy, population_presence def infected_population(self) -> mc.InfectedPopulation: # Initializes the virus diff --git a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js index 16df7060..9f491418 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js +++ b/cern_caimira/src/cern_caimira/apps/calculator/static/js/co2_form.js @@ -17,11 +17,14 @@ const CO2_data_form = [ "infected_lunch_option", "infected_lunch_start", "infected_people", + "dynamic_infected_occupancy", "infected_start", "room_capacity", "room_volume", "specific_breaks", "total_people", + "dynamic_exposed_occupancy", + "occupancy_format", ]; // Method to upload a valid data file (accepted formats: .xls and .xlsx) diff --git a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 index db159eb0..ed1e24ba 100644 --- a/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 +++ b/cern_caimira/src/cern_caimira/apps/templates/base/calculator.form.html.j2 @@ -448,8 +448,8 @@
{# "static" vs. "dynamic" #} - - + +