diff --git a/caimira/apps/calculator/__init__.py b/caimira/apps/calculator/__init__.py index 5a746648..46de85e9 100644 --- a/caimira/apps/calculator/__init__.py +++ b/caimira/apps/calculator/__init__.py @@ -404,7 +404,10 @@ class CO2ModelResponse(BaseRequestHandler): requested_model_config = tornado.escape.json_decode(self.request.body) try: - form = co2_model_generator.CO2FormData.from_dict(requested_model_config, data_registry) + form: co2_model_generator.CO2FormData = co2_model_generator.CO2FormData.from_dict( + requested_model_config, + data_registry + ) except Exception as err: if self.settings.get("debug", False): import traceback @@ -415,26 +418,28 @@ class CO2ModelResponse(BaseRequestHandler): return if endpoint.rstrip('/') == 'plot': - transition_times = co2_model_generator.CO2FormData.find_change_points_with_pelt(form.CO2_data) - self.finish({'CO2_plot': co2_model_generator.CO2FormData.generate_ventilation_plot(form.CO2_data, transition_times), - 'transition_times': [round(el, 2) for el in transition_times]}) + transition_times: list = form.find_change_points_with_pelt() + self.finish({ + 'CO2_plot': form.generate_ventilation_plot(transition_times), + 'transition_times': [round(el, 2) for el in transition_times] + }) else: executor = loky.get_reusable_executor( max_workers=self.settings['handler_worker_pool_size'], timeout=300, ) - report_task = executor.submit( - co2_model_generator.CO2FormData.build_model, form, - ) + report_task = executor.submit(form.build_model) report = await asyncio.wrap_future(report_task) - - result = dict(report.CO2_fit_params()) + # Ventilation times after user manipulation from the suggested Pelt algorithm times. ventilation_transition_times = report.ventilation_transition_times - result['fitting_ventilation_type'] = form.fitting_ventilation_type + # The result of the following method is a dict with the results of the fitting + # algorithm, namely the breathing rate and ACH values. It also returns the + # predictive CO2 result based on the fitting results. + result: typing.Dict = dict(report.CO2_fit_params()) + # Add the transition times and CO2 plot to the results. result['transition_times'] = ventilation_transition_times - result['CO2_plot'] = co2_model_generator.CO2FormData.generate_ventilation_plot(CO2_data=form.CO2_data, - transition_times=ventilation_transition_times[:-1], + result['CO2_plot'] = form.generate_ventilation_plot(transition_times=ventilation_transition_times[:-1], predictive_CO2=result['predictive_CO2']) self.finish(result) diff --git a/caimira/apps/calculator/co2_model_generator.py b/caimira/apps/calculator/co2_model_generator.py index e8a054ca..440a2b1c 100644 --- a/caimira/apps/calculator/co2_model_generator.py +++ b/caimira/apps/calculator/co2_model_generator.py @@ -99,15 +99,14 @@ class CO2FormData(FormData): if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time): raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".') - @classmethod - def find_change_points_with_pelt(self, CO2_data: dict): + def find_change_points_with_pelt(self) -> list: """ Perform change point detection using Pelt algorithm from ruptures library with pen=15. + Incorporate existing state change candidates and adjust the result accordingly. Returns a list of tuples containing (index, X-axis value) for the detected significant changes. """ - - times: list = CO2_data['times'] - CO2_values: list = CO2_data['CO2'] + times: list = self.CO2_data['times'] + CO2_values: list = self.CO2_data['CO2'] if len(times) != len(CO2_values): raise ValueError("times and CO2 values must have the same length.") @@ -131,14 +130,30 @@ class CO2FormData(FormData): for segment in merged_segments[:-2]: result_set.add(times[CO2_values.index(min(CO2_np[segment]))]) result_set.add(times[CO2_values.index(max(CO2_np[segment]))]) - return list(result_set) - @classmethod - def generate_ventilation_plot(self, CO2_data: dict, + # Calculate presence intervals and respective merge + infected_presence = self.infected_present_interval() + exposed_presence = self.exposed_present_interval() + all_state_change_times = self.population_present_changes(infected_presence, exposed_presence) + # Check proximity to existing state changes and update result set if necessary. + # If suggested point is close enough to a simulation time, replace the result with the + # simulation time. Otherwise, add the exact suggested point. + for change_point in all_state_change_times: + closest_point = min(result_set, key=lambda x: abs(x - change_point)) + if abs(closest_point - change_point) <= 1: # Threshold for close points + result_set.remove(closest_point) + result_set.add(change_point) + else: + result_set.add(change_point) + return sorted(list(result_set)) + + def generate_ventilation_plot(self, transition_times: typing.Optional[list] = None, - predictive_CO2: typing.Optional[list] = None): - times_values = CO2_data['times'] - CO2_values = CO2_data['CO2'] + predictive_CO2: typing.Optional[list] = None) -> str: + + # Plot data (x-axis: times; y-axis: CO2 concentrations) + times_values: list = self.CO2_data['times'] + CO2_values: list = self.CO2_data['CO2'] fig = plt.figure(figsize=(7, 4), dpi=110) plt.plot(times_values, CO2_values, label='Input CO₂') @@ -184,7 +199,7 @@ class CO2FormData(FormData): activity=None, # type: ignore ) - all_state_changes=self.population_present_changes(infected_presence, exposed_presence) + all_state_changes = self.population_present_changes(infected_presence, exposed_presence) total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop) for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])] diff --git a/caimira/models.py b/caimira/models.py index 3fe58e22..5170cc20 100644 --- a/caimira/models.py +++ b/caimira/models.py @@ -1561,7 +1561,7 @@ class CO2DataModel: # Calculate the predictive CO2 concentration return [CO2_concentration_model.concentration(time) for time in self.times] - def CO2_fit_params(self): + def CO2_fit_params(self) -> typing.Dict: if len(self.times) != len(self.CO2_concentrations): raise ValueError('times and CO2_concentrations must have same length.') @@ -1570,6 +1570,11 @@ class CO2DataModel: 'times and CO2_concentrations must contain at last two elements') def fun(x): + ''' + The objective function to be minimized, where x is an argument + containing the initial guess for the breathing rate (exhalation_rate) + and ventilation values (ventilation_values). + ''' exhalation_rate = x[0] ventilation_values = tuple(x[1:]) CO2_concentration_model = self.CO2_concentration_model(