diff --git a/caimira/apps/calculator/co2_model_generator.py b/caimira/apps/calculator/co2_model_generator.py
index 4e238ce4..b4305bdb 100644
--- a/caimira/apps/calculator/co2_model_generator.py
+++ b/caimira/apps/calculator/co2_model_generator.py
@@ -2,8 +2,9 @@ import dataclasses
import logging
import typing
import numpy as np
-import ruptures as rpt
import matplotlib.pyplot as plt
+from scipy.signal import find_peaks
+import pandas as pd
import re
from caimira import models
@@ -95,13 +96,13 @@ class CO2FormData(FormData):
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
for time in input_break.values():
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
- raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
+ raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
- def find_change_points_with_pelt(self) -> list:
+ def find_change_points_with_scipy(self) -> list:
"""
- Perform change point detection using Pelt algorithm from ruptures library with pen=15.
+ Perform change point detection using scipy library (find_peaks method) with rolling average of data.
Incorporate existing state change candidates and adjust the result accordingly.
- Returns a list of tuples containing (index, X-axis value) for the detected significant changes.
+ Returns a list of the detected ventilation state changes.
"""
times: list = self.CO2_data['times']
CO2_values: list = self.CO2_data['CO2']
@@ -109,44 +110,32 @@ class CO2FormData(FormData):
if len(times) != len(CO2_values):
raise ValueError("times and CO2 values must have the same length.")
- # Convert the input list to a numpy array for use with the ruptures library
- CO2_np = np.array(CO2_values)
+ # Calculate minimum interval for peak detection
+ diff = times[1] - times[0]
+ interval_in_minutes = 30
+ distance_points = interval_in_minutes // (diff * 60) # minutes
- # Define the model for change point detection (Radial Basis Function kernel)
- model = "rbf"
+ # Applying a rolling average to smooth the initial data
+ window_size = int(0.01*len(times)) # 1% of the initial points window for smoothing
+ smoothed_co2 = pd.Series(CO2_values).rolling(window=window_size, center=True).mean()
- # Fit the Pelt algorithm to the data with the specified model
- algo = rpt.Pelt(model=model).fit(CO2_np)
+ # Find peaks (maxima) in the smoothed data
+ distance = distance_points
+ peaks, _ = find_peaks(smoothed_co2.values, prominence=100, distance=distance)
+
+ # Find valleys (minima) by inverting the smoothed data and applying a smooth factor
+ valleys, _ = find_peaks(-smoothed_co2.values, prominence=20, width=int(0.05*len(times)), distance=distance)
- # Predict change points using the Pelt algorithm with a penalty value of 15
- result = algo.predict(pen=15)
+ # Extract peak and valley timestamps
+ timestamps = np.array(times)
+ peak_timestamps = timestamps[peaks]
+ valley_timestamps = timestamps[valleys]
- # Find local minima and maxima
- segments = np.split(np.arange(len(CO2_values)), result)
- merged_segments = [np.hstack((segments[i], segments[i + 1])) for i in range(len(segments) - 1)]
- result_set = set()
- for segment in merged_segments[:-2]:
- result_set.add(times[CO2_values.index(min(CO2_np[segment]))])
- result_set.add(times[CO2_values.index(max(CO2_np[segment]))])
-
- # Calculate presence intervals and respective merge
- infected_presence = self.infected_present_interval()
- exposed_presence = self.exposed_present_interval()
- all_state_change_times = self.population_present_changes(infected_presence, exposed_presence)
- # Check proximity to existing state changes and update result set if necessary.
- # If suggested point is close enough to a simulation time, replace the result with the
- # simulation time. Otherwise, add the exact suggested point.
- for change_point in all_state_change_times:
- closest_point = min(result_set, key=lambda x: abs(x - change_point))
- if abs(closest_point - change_point) <= 1: # Threshold for close points
- result_set.remove(closest_point)
- result_set.add(change_point)
- else:
- result_set.add(change_point)
- return sorted(list(result_set))
+ return sorted(np.concatenate((peak_timestamps, valley_timestamps)))
def generate_ventilation_plot(self,
- transition_times: typing.Optional[list] = None,
+ ventilation_transition_times: typing.Optional[list] = None,
+ occupancy_transition_times: typing.Optional[list] = None,
predictive_CO2: typing.Optional[list] = None) -> str:
# Plot data (x-axis: times; y-axis: CO2 concentrations)
@@ -156,9 +145,18 @@ class CO2FormData(FormData):
fig = plt.figure(figsize=(7, 4), dpi=110)
plt.plot(times_values, CO2_values, label='Input CO₂')
- if (transition_times):
- for time in transition_times:
- plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--')
+ # Add occupancy state changes:
+ if (occupancy_transition_times):
+ for i, time in enumerate(occupancy_transition_times):
+ plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--', label='Occupancy change (from input)' if i == 0 else None)
+ # Add ventilation state changes:
+ if (ventilation_transition_times):
+ for i, time in enumerate(ventilation_transition_times):
+ if i == 0:
+ label = 'Ventilation change (detected)' if occupancy_transition_times else 'Ventilation state changes'
+ else: label = None
+ plt.axvline(x = time, color = 'red', linewidth=0.5, linestyle='--', label=label)
+
if (predictive_CO2):
plt.plot(times_values, predictive_CO2, label='Predictive CO₂')
plt.xlabel('Time of day')
@@ -173,7 +171,9 @@ class CO2FormData(FormData):
def ventilation_transition_times(self) -> typing.List[float]:
vent_states = self.fitting_ventilation_states
- vent_states.append(self.CO2_data['times'][-1]) # The last time value is always needed for the last ACH interval.
+ last_time_from_input = self.CO2_data['times'][-1]
+ if (vent_states != [] and last_time_from_input != vent_states[-1]): # The last time value is always needed for the last ACH interval.
+ vent_states.append(last_time_from_input)
return vent_states
def build_model(self, size=None) -> models.CO2DataModel: # type: ignore
diff --git a/caimira/apps/calculator/co2_report_generator.py b/caimira/apps/calculator/co2_report_generator.py
index d93293a5..11281bba 100644
--- a/caimira/apps/calculator/co2_report_generator.py
+++ b/caimira/apps/calculator/co2_report_generator.py
@@ -13,13 +13,29 @@ class CO2ReportGenerator:
self,
form: CO2FormData,
) -> dict:
-
- transition_times: list = form.find_change_points_with_pelt()
- ventilation_plot: str = form.generate_ventilation_plot(transition_times)
+ '''
+ Initial plot with the suggested ventilation state changes.
+ This method receives the form input and returns the CO2
+ plot with the respective transition times.
+ '''
+ CO2model: CO2DataModel = form.build_model()
+
+ occupancy_transition_times: list = list(CO2model.number.transition_times)
+ ventilation_transition_times: list = form.find_change_points_with_scipy()
+ # The entire ventilation changes consider the initial and final occupancy state change
+ all_vent_transition_times: list = sorted(
+ [occupancy_transition_times[0]] +
+ [occupancy_transition_times[-1]] +
+ ventilation_transition_times)
+
+ ventilation_plot: str = form.generate_ventilation_plot(
+ ventilation_transition_times=all_vent_transition_times,
+ occupancy_transition_times=occupancy_transition_times
+ )
context = {
'CO2_plot': ventilation_plot,
- 'transition_times': [round(el, 2) for el in transition_times],
+ 'transition_times': [round(el, 2) for el in all_vent_transition_times],
}
return context
@@ -28,7 +44,11 @@ class CO2ReportGenerator:
self,
form: CO2FormData,
) -> dict:
-
+ '''
+ Final fitting results with the respective predictive CO2.
+ This method receives the form input and returns the fitting results
+ along with the CO2 plot with the predictive CO2.
+ '''
CO2model: CO2DataModel = form.build_model()
# Ventilation times after user manipulation from the suggested ventilation state change times.
@@ -41,7 +61,7 @@ class CO2ReportGenerator:
# Add the transition times and CO2 plot to the results.
context['transition_times'] = ventilation_transition_times
- context['CO2_plot'] = form.generate_ventilation_plot(transition_times=ventilation_transition_times[:-1],
+ context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1],
predictive_CO2=context['predictive_CO2'])
return context
diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py
index dc83ccef..87155e96 100644
--- a/caimira/apps/calculator/report_generator.py
+++ b/caimira/apps/calculator/report_generator.py
@@ -368,9 +368,9 @@ def readable_minutes(minutes: int) -> str:
def hour_format(hour: float) -> str:
# Convert float hour to HH:MM format
- hours = int(hour)
- minutes = int(hour % 1 * 60)
- return f"{hours}:{minutes if minutes != 0 else '00'}"
+ hours = f"{int(hour):02}"
+ minutes = f"{int(hour % 1 * 60):02}"
+ return f"{hours}:{minutes}"
def percentage(absolute: float) -> float:
diff --git a/caimira/apps/calculator/static/js/co2_form.js b/caimira/apps/calculator/static/js/co2_form.js
index 5f6ff406..91113142 100644
--- a/caimira/apps/calculator/static/js/co2_form.js
+++ b/caimira/apps/calculator/static/js/co2_form.js
@@ -106,6 +106,39 @@ function uploadFile(endpoint) {
}
}
+ // Validate Excel times input encompass simulation time
+ const firstTimeInExcel = parseFloat((data[1][timesColumnIndex] * 60).toFixed(2))
+ const lastTimeInExcel = parseFloat((data[data.length - 1][timesColumnIndex] * 60).toFixed(2))
+ // Validate start time
+ const infected_start = $(`[name=infected_start]`).first().val();
+ const exposed_start = $(`[name=exposed_start]`).first().val();
+
+ let [hours_infected, minutes_infected] = infected_start.split(":").map(Number);
+ let elapsed_time_infected = hours_infected * 60 + minutes_infected;
+
+ let [hours_exposed, minutes_exposed] = exposed_start.split(":").map(Number);
+ let elapsed_time_exposed = hours_exposed * 60 + minutes_exposed;
+
+ const min_presence_time = parseFloat((Math.min(elapsed_time_infected, elapsed_time_exposed)).toFixed(2));
+
+ // Validate finish time
+ const infected_finish = $(`[name=infected_finish]`).first().val();
+ const exposed_finish = $(`[name=exposed_finish]`).first().val();
+
+ [hours_infected, minutes_infected] = infected_finish.split(":").map(Number);
+ elapsed_time_infected = hours_infected * 60 + minutes_infected;
+
+ [hours_exposed, minutes_exposed] = exposed_finish.split(":").map(Number);
+ elapsed_time_exposed = hours_exposed * 60 + minutes_exposed;
+
+ const max_presence_time = parseFloat((Math.max(elapsed_time_infected, elapsed_time_exposed)).toFixed(2));
+ if (firstTimeInExcel > min_presence_time || lastTimeInExcel < max_presence_time) {
+ $("#upload-error")
+ .text(`The Excel times should encompass the entire simulation time (from ${min_presence_time/60} to ${max_presence_time/60}).
+ Got Excel times from ${firstTimeInExcel/60} to ${lastTimeInExcel/60}. Either adapt the simulation presence times, or the Excel data.`).show();
+ return;
+ }
+
// Convert Excel file to JSON and further processing
try {
generateJSONStructure(endpoint, data);
diff --git a/caimira/apps/templates/base/calculator.form.html.j2 b/caimira/apps/templates/base/calculator.form.html.j2
index 40fc6d90..ce39e4d3 100644
--- a/caimira/apps/templates/base/calculator.form.html.j2
+++ b/caimira/apps/templates/base/calculator.form.html.j2
@@ -347,7 +347,7 @@
The dashed lines are suggestions for the ventilation transition times
- (generated from the input data using the Pelt algorithm).
+ (generated from the input data using the scipy find_peaks algorithm).