diff --git a/caimira/src/caimira/calculator/validators/co2/co2_validator.py b/caimira/src/caimira/calculator/validators/co2/co2_validator.py index 941f8b75..36ed80fa 100644 --- a/caimira/src/caimira/calculator/validators/co2/co2_validator.py +++ b/caimira/src/caimira/calculator/validators/co2/co2_validator.py @@ -63,10 +63,8 @@ class CO2FormData(FormData): self.data_registry = DataRegistry() def validate(self): - # Validate population parameters when static occupancy is defined. - # Dynamic population is validated in the generate_dynamic_occupancy method. - if self.occupancy_format == 'static': - self.validate_population_parameters() + # Validate population parameters + self.validate_population_parameters() # Validate room capacity if self.room_capacity: diff --git a/caimira/src/caimira/calculator/validators/form_validator.py b/caimira/src/caimira/calculator/validators/form_validator.py index 70e89e58..074ebf20 100644 --- a/caimira/src/caimira/calculator/validators/form_validator.py +++ b/caimira/src/caimira/calculator/validators/form_validator.py @@ -98,76 +98,110 @@ class FormData: return form_dict def validate_population_parameters(self): - # Validate number of infected <= number of total people - if self.infected_people >= self.total_people: - raise ValueError( - 'Number of infected people cannot be greater or equal to the number of total people.') - - # Validate time intervals selected by user - time_intervals = [ - ['exposed_start', 'exposed_finish'], - ['infected_start', 'infected_finish'], - ] - if self.exposed_lunch_option: - time_intervals.append( - ['exposed_lunch_start', 'exposed_lunch_finish']) - if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: - time_intervals.append( - ['infected_lunch_start', 'infected_lunch_finish']) - - for start_name, end_name in time_intervals: - start = getattr(self, start_name) - end = getattr(self, end_name) - if start > end: + # Static occupancy is defined. + if self.occupancy_format == 'static': + # Validate number of infected <= number of total people + if self.infected_people >= self.total_people: raise ValueError( - f"{start_name} must be less than {end_name}. Got {start} and {end}.") + 'Number of infected people cannot be greater or equal to the number of total people.') - def validate_lunch(start, finish): - lunch_start = getattr(self, f'{population}_lunch_start') - lunch_finish = getattr(self, f'{population}_lunch_finish') - return (start <= lunch_start <= finish and - start <= lunch_finish <= finish) + # Validate time intervals selected by user + time_intervals = [ + ['exposed_start', 'exposed_finish'], + ['infected_start', 'infected_finish'], + ] + if self.exposed_lunch_option: + time_intervals.append( + ['exposed_lunch_start', 'exposed_lunch_finish']) + if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option: + time_intervals.append( + ['infected_lunch_start', 'infected_lunch_finish']) - def get_lunch_mins(population): - lunch_mins = 0 - if getattr(self, f'{population}_lunch_option'): - lunch_mins = getattr( - self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') - return lunch_mins - - def get_coffee_mins(population): - coffee_mins = 0 - if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': - coffee_mins = COFFEE_OPTIONS_INT[getattr( - self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') - return coffee_mins - - def get_activity_mins(population): - return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') - - populations = [ - 'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] - for population in populations: - # Validate lunch time within the activity times. - if (getattr(self, f'{population}_lunch_option') and - not validate_lunch(getattr(self, f'{population}_start'), getattr( - self, f'{population}_finish')) - ): - raise ValueError( - f"{population} lunch break must be within presence times." - ) - - # Length of breaks < length of activity - if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): - raise ValueError( - f"Length of breaks >= Length of {population} presence." - ) - - for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), - ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: - if getattr(self, attr_name) not in valid_set: + for start_name, end_name in time_intervals: + start = getattr(self, start_name) + end = getattr(self, end_name) + if start > end: raise ValueError( - f"{getattr(self, attr_name)} is not a valid value for {attr_name}") + f"{start_name} must be less than {end_name}. Got {start} and {end}.") + + def validate_lunch(start, finish): + lunch_start = getattr(self, f'{population}_lunch_start') + lunch_finish = getattr(self, f'{population}_lunch_finish') + return (start <= lunch_start <= finish and + start <= lunch_finish <= finish) + + def get_lunch_mins(population): + lunch_mins = 0 + if getattr(self, f'{population}_lunch_option'): + lunch_mins = getattr( + self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start') + return lunch_mins + + def get_coffee_mins(population): + coffee_mins = 0 + if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0': + coffee_mins = COFFEE_OPTIONS_INT[getattr( + self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration') + return coffee_mins + + def get_activity_mins(population): + return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start') + + populations = [ + 'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed'] + for population in populations: + # Validate lunch time within the activity times. + if (getattr(self, f'{population}_lunch_option') and + not validate_lunch(getattr(self, f'{population}_start'), getattr( + self, f'{population}_finish')) + ): + raise ValueError( + f"{population} lunch break must be within presence times." + ) + + # Length of breaks < length of activity + if (get_lunch_mins(population) + get_coffee_mins(population)) >= get_activity_mins(population): + raise ValueError( + f"Length of breaks >= Length of {population} presence." + ) + + for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT), + ('infected_coffee_break_option', COFFEE_OPTIONS_INT)]: + if getattr(self, attr_name) not in valid_set: + raise ValueError( + f"{getattr(self, attr_name)} is not a valid value for {attr_name}") + # Dynamic occupancy is defined. + elif self.occupancy_format == 'dynamic': + for dynamic_format in (self.dynamic_infected_occupancy, self.dynamic_exposed_occupancy): + for occupancy in dynamic_format: + # Check if each occupancy entry is a dictionary + if not isinstance(occupancy, typing.Dict): + raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}".') + + # Check for required keys in each occupancy entry + dict_keys = list(occupancy.keys()) + if "total_people" not in dict_keys: + raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys}".') + else: + value = occupancy["total_people"] + # Check if the value is a non-negative integer + if not isinstance(value, int): + raise ValueError(f'Total number of people should be integer. Got "{type(value)}".') + elif not value >= 0: + raise ValueError(f'Total number of people should be non-negative. Got "{value}".') + + if "start_time" not in dict_keys: + raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys}".') + if "finish_time" not in dict_keys: + raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys}".') + + # Validate time format for start_time and finish_time + for time_key in ["start_time", "finish_time"]: + time = occupancy[time_key] + if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): + raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') + else: + raise ValueError(f"'{self.occupancy_format}' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.") def validate(self): raise NotImplementedError("Subclass must implement") @@ -385,35 +419,6 @@ class FormData: ) def generate_dynamic_occupancy(self, dynamic_occupancy: typing.List[typing.Dict[str, typing.Any]]): - ##### Data format validation ##### - for occupancy in dynamic_occupancy: - # Check if each occupancy entry is a dictionary - if not isinstance(occupancy, typing.Dict): - raise TypeError(f'Each occupancy entry should be in a dictionary format. Got "{type(occupancy)}".') - - # Check for required keys in each occupancy entry - dict_keys = list(occupancy.keys()) - if "total_people" not in dict_keys: - raise TypeError(f'Unable to fetch "total_people" key. Got "{dict_keys}".') - else: - value = occupancy["total_people"] - # Check if the value is a non-negative integer - if not isinstance(value, int): - raise ValueError(f'Total number of people should be integer. Got "{type(value)}".') - elif not value >= 0: - raise ValueError(f'Total number of people should be non-negative. Got "{value}".') - - if "start_time" not in dict_keys: - raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys}".') - if "finish_time" not in dict_keys: - raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys}".') - - # Validate time format for start_time and finish_time - for time_key in ["start_time", "finish_time"]: - time = occupancy[time_key] - if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): - raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') - transition_times = [] values = [] for occupancy in dynamic_occupancy: diff --git a/caimira/src/caimira/calculator/validators/virus/virus_validator.py b/caimira/src/caimira/calculator/validators/virus/virus_validator.py index ea3642c8..ca491b7f 100644 --- a/caimira/src/caimira/calculator/validators/virus/virus_validator.py +++ b/caimira/src/caimira/calculator/validators/virus/virus_validator.py @@ -73,10 +73,8 @@ class VirusFormData(FormData): _DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS def validate(self): - # Validate population parameters when static occupancy is defined. - # Dynamic population is validated in the generate_dynamic_occupancy method. - if self.occupancy_format == 'static': - self.validate_population_parameters() + # Validate population parameters + self.validate_population_parameters() validation_tuples = [('activity_type', self.data_registry.population_scenario_activity.keys()), ('mechanical_ventilation_type', diff --git a/caimira/tests/apps/calculator/test_model_generator.py b/caimira/tests/apps/calculator/test_model_generator.py index 2aca9c3e..5feb19ef 100644 --- a/caimira/tests/apps/calculator/test_model_generator.py +++ b/caimira/tests/apps/calculator/test_model_generator.py @@ -590,6 +590,20 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m assert offset == expected_offset +@pytest.mark.parametrize( + ["occupancy_format_input", "error"], + [ + ['dynamc', "'dynamc' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",], + ['stact', "'stact' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",], + ['random', "'random' is not a valid value for 'self.occupancy_format'. Accepted values are 'static' or 'dynamic'.",] + ] +) +def test_dynamic_format_input(occupancy_format_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = occupancy_format_input + with pytest.raises(ValueError, match=re.escape(error)): + baseline_form.validate() + + @pytest.mark.parametrize( ["dynamic_occupancy_input", "error"], [ @@ -602,8 +616,11 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m ] ) def test_dynamic_occupancy_structure(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input + baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input with pytest.raises(TypeError, match=re.escape(error)): - baseline_form.generate_dynamic_occupancy(dynamic_occupancy_input) + baseline_form.validate() @pytest.mark.parametrize( @@ -616,5 +633,8 @@ def test_dynamic_occupancy_structure(dynamic_occupancy_input, error, baseline_fo ] ) def test_dynamic_occupancy_total_people(dynamic_occupancy_input, error, baseline_form: virus_validator.VirusFormData): + baseline_form.occupancy_format = "dynamic" + baseline_form.dynamic_infected_occupancy = dynamic_occupancy_input + baseline_form.dynamic_exposed_occupancy = dynamic_occupancy_input with pytest.raises(ValueError, match=re.escape(error)): - baseline_form.generate_dynamic_occupancy(dynamic_occupancy_input) + baseline_form.validate()