replaced Pelt algorithm by scipy find peaks method

This commit is contained in:
lrdossan 2024-08-02 11:21:41 +02:00
parent 9f8cf3b807
commit 0d035aad05
5 changed files with 104 additions and 51 deletions

View file

@ -2,8 +2,9 @@ import dataclasses
import logging
import typing
import numpy as np
import ruptures as rpt
import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import pandas as pd
import re
from caimira import models
@ -95,13 +96,13 @@ class CO2FormData(FormData):
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
for time in input_break.values():
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
def find_change_points_with_pelt(self) -> list:
def find_change_points_with_scipy(self) -> list:
"""
Perform change point detection using Pelt algorithm from ruptures library with pen=15.
Perform change point detection using scipy library (find_peaks method) with rolling average of data.
Incorporate existing state change candidates and adjust the result accordingly.
Returns a list of tuples containing (index, X-axis value) for the detected significant changes.
Returns a list of the detected ventilation state changes.
"""
times: list = self.CO2_data['times']
CO2_values: list = self.CO2_data['CO2']
@ -109,44 +110,32 @@ class CO2FormData(FormData):
if len(times) != len(CO2_values):
raise ValueError("times and CO2 values must have the same length.")
# Convert the input list to a numpy array for use with the ruptures library
CO2_np = np.array(CO2_values)
# Calculate minimum interval for peak detection
diff = times[1] - times[0]
interval_in_minutes = 30
distance_points = interval_in_minutes // (diff * 60) # minutes
# Define the model for change point detection (Radial Basis Function kernel)
model = "rbf"
# Applying a rolling average to smooth the initial data
window_size = int(0.01*len(times)) # 1% of the initial points window for smoothing
smoothed_co2 = pd.Series(CO2_values).rolling(window=window_size, center=True).mean()
# Fit the Pelt algorithm to the data with the specified model
algo = rpt.Pelt(model=model).fit(CO2_np)
# Find peaks (maxima) in the smoothed data
distance = distance_points
peaks, _ = find_peaks(smoothed_co2.values, prominence=100, distance=distance)
# Find valleys (minima) by inverting the smoothed data and applying a smooth factor
valleys, _ = find_peaks(-smoothed_co2.values, prominence=20, width=int(0.05*len(times)), distance=distance)
# Predict change points using the Pelt algorithm with a penalty value of 15
result = algo.predict(pen=15)
# Extract peak and valley timestamps
timestamps = np.array(times)
peak_timestamps = timestamps[peaks]
valley_timestamps = timestamps[valleys]
# Find local minima and maxima
segments = np.split(np.arange(len(CO2_values)), result)
merged_segments = [np.hstack((segments[i], segments[i + 1])) for i in range(len(segments) - 1)]
result_set = set()
for segment in merged_segments[:-2]:
result_set.add(times[CO2_values.index(min(CO2_np[segment]))])
result_set.add(times[CO2_values.index(max(CO2_np[segment]))])
# Calculate presence intervals and respective merge
infected_presence = self.infected_present_interval()
exposed_presence = self.exposed_present_interval()
all_state_change_times = self.population_present_changes(infected_presence, exposed_presence)
# Check proximity to existing state changes and update result set if necessary.
# If suggested point is close enough to a simulation time, replace the result with the
# simulation time. Otherwise, add the exact suggested point.
for change_point in all_state_change_times:
closest_point = min(result_set, key=lambda x: abs(x - change_point))
if abs(closest_point - change_point) <= 1: # Threshold for close points
result_set.remove(closest_point)
result_set.add(change_point)
else:
result_set.add(change_point)
return sorted(list(result_set))
return sorted(np.concatenate((peak_timestamps, valley_timestamps)))
def generate_ventilation_plot(self,
transition_times: typing.Optional[list] = None,
ventilation_transition_times: typing.Optional[list] = None,
occupancy_transition_times: typing.Optional[list] = None,
predictive_CO2: typing.Optional[list] = None) -> str:
# Plot data (x-axis: times; y-axis: CO2 concentrations)
@ -156,9 +145,18 @@ class CO2FormData(FormData):
fig = plt.figure(figsize=(7, 4), dpi=110)
plt.plot(times_values, CO2_values, label='Input CO₂')
if (transition_times):
for time in transition_times:
plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--')
# Add occupancy state changes:
if (occupancy_transition_times):
for i, time in enumerate(occupancy_transition_times):
plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--', label='Occupancy change (from input)' if i == 0 else None)
# Add ventilation state changes:
if (ventilation_transition_times):
for i, time in enumerate(ventilation_transition_times):
if i == 0:
label = 'Ventilation change (detected)' if occupancy_transition_times else 'Ventilation state changes'
else: label = None
plt.axvline(x = time, color = 'red', linewidth=0.5, linestyle='--', label=label)
if (predictive_CO2):
plt.plot(times_values, predictive_CO2, label='Predictive CO₂')
plt.xlabel('Time of day')
@ -173,7 +171,9 @@ class CO2FormData(FormData):
def ventilation_transition_times(self) -> typing.List[float]:
vent_states = self.fitting_ventilation_states
vent_states.append(self.CO2_data['times'][-1]) # The last time value is always needed for the last ACH interval.
last_time_from_input = self.CO2_data['times'][-1]
if (vent_states != [] and last_time_from_input != vent_states[-1]): # The last time value is always needed for the last ACH interval.
vent_states.append(last_time_from_input)
return vent_states
def build_model(self, size=None) -> models.CO2DataModel: # type: ignore

View file

@ -13,13 +13,29 @@ class CO2ReportGenerator:
self,
form: CO2FormData,
) -> dict:
transition_times: list = form.find_change_points_with_pelt()
ventilation_plot: str = form.generate_ventilation_plot(transition_times)
'''
Initial plot with the suggested ventilation state changes.
This method receives the form input and returns the CO2
plot with the respective transition times.
'''
CO2model: CO2DataModel = form.build_model()
occupancy_transition_times: list = list(CO2model.number.transition_times)
ventilation_transition_times: list = form.find_change_points_with_scipy()
# The entire ventilation changes consider the initial and final occupancy state change
all_vent_transition_times: list = sorted(
[occupancy_transition_times[0]] +
[occupancy_transition_times[-1]] +
ventilation_transition_times)
ventilation_plot: str = form.generate_ventilation_plot(
ventilation_transition_times=all_vent_transition_times,
occupancy_transition_times=occupancy_transition_times
)
context = {
'CO2_plot': ventilation_plot,
'transition_times': [round(el, 2) for el in transition_times],
'transition_times': [round(el, 2) for el in all_vent_transition_times],
}
return context
@ -28,7 +44,11 @@ class CO2ReportGenerator:
self,
form: CO2FormData,
) -> dict:
'''
Final fitting results with the respective predictive CO2.
This method receives the form input and returns the fitting results
along with the CO2 plot with the predictive CO2.
'''
CO2model: CO2DataModel = form.build_model()
# Ventilation times after user manipulation from the suggested ventilation state change times.
@ -41,7 +61,7 @@ class CO2ReportGenerator:
# Add the transition times and CO2 plot to the results.
context['transition_times'] = ventilation_transition_times
context['CO2_plot'] = form.generate_ventilation_plot(transition_times=ventilation_transition_times[:-1],
context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1],
predictive_CO2=context['predictive_CO2'])
return context

View file

@ -368,9 +368,9 @@ def readable_minutes(minutes: int) -> str:
def hour_format(hour: float) -> str:
# Convert float hour to HH:MM format
hours = int(hour)
minutes = int(hour % 1 * 60)
return f"{hours}:{minutes if minutes != 0 else '00'}"
hours = f"{int(hour):02}"
minutes = f"{int(hour % 1 * 60):02}"
return f"{hours}:{minutes}"
def percentage(absolute: float) -> float:

View file

@ -106,6 +106,39 @@ function uploadFile(endpoint) {
}
}
// Validate Excel times input encompass simulation time
const firstTimeInExcel = parseFloat((data[1][timesColumnIndex] * 60).toFixed(2))
const lastTimeInExcel = parseFloat((data[data.length - 1][timesColumnIndex] * 60).toFixed(2))
// Validate start time
const infected_start = $(`[name=infected_start]`).first().val();
const exposed_start = $(`[name=exposed_start]`).first().val();
let [hours_infected, minutes_infected] = infected_start.split(":").map(Number);
let elapsed_time_infected = hours_infected * 60 + minutes_infected;
let [hours_exposed, minutes_exposed] = exposed_start.split(":").map(Number);
let elapsed_time_exposed = hours_exposed * 60 + minutes_exposed;
const min_presence_time = parseFloat((Math.min(elapsed_time_infected, elapsed_time_exposed)).toFixed(2));
// Validate finish time
const infected_finish = $(`[name=infected_finish]`).first().val();
const exposed_finish = $(`[name=exposed_finish]`).first().val();
[hours_infected, minutes_infected] = infected_finish.split(":").map(Number);
elapsed_time_infected = hours_infected * 60 + minutes_infected;
[hours_exposed, minutes_exposed] = exposed_finish.split(":").map(Number);
elapsed_time_exposed = hours_exposed * 60 + minutes_exposed;
const max_presence_time = parseFloat((Math.max(elapsed_time_infected, elapsed_time_exposed)).toFixed(2));
if (firstTimeInExcel > min_presence_time || lastTimeInExcel < max_presence_time) {
$("#upload-error")
.text(`The Excel times should encompass the entire simulation time (from ${min_presence_time/60} to ${max_presence_time/60}).
Got Excel times from ${firstTimeInExcel/60} to ${lastTimeInExcel/60}. Either adapt the simulation presence times, or the Excel data.`).show();
return;
}
// Convert Excel file to JSON and further processing
try {
generateJSONStructure(endpoint, data);

View file

@ -347,7 +347,7 @@
<img id="CO2_data_plot"/><br>
<p id="suggestion_lines_txt" class="text-danger text-center">
The dashed lines are suggestions for the ventilation transition times<br>
(generated from the input data using the Pelt algorithm).
(generated from the input data using the <a href="https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.find_peaks.html">scipy find_peaks</a> algorithm).
</p>
<div id="DIVfitting_ventilation" class="form-group mb-0">
<label for="fitting_ventilation_states">Please enter the ventilation state change times, separated by comma - e.g. [8.5, 10, 11.5, 17]. </label>