back-end updates for co2 logic

This commit is contained in:
Luis Aleixo 2023-07-19 17:05:11 +02:00
parent 7e29c0c5cf
commit 7da8cc438e
6 changed files with 106 additions and 83 deletions

View file

@ -351,8 +351,20 @@ class CO2Data(BaseRequestHandler):
Thus, XSRF cookies are disabled by overriding base class implementation of this method with a pass statement.
"""
pass
def generate_ventilation_plot(self, CO2Data, transition_times = None, ventilation_values = None):
fig = plt.figure(figsize=(7, 4), dpi=110)
plt.plot(CO2Data['times'], CO2Data['CO2'])
if (transition_times and ventilation_values):
for index, time in enumerate(transition_times[:-1]):
plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--')
y_location = (CO2Data['CO2'][min(range(len(CO2Data['times'])), key=lambda i: abs(CO2Data['times'][i]-time))])
plt.text(x = time + 0.04, y = y_location, s=round(ventilation_values[index], 2))
plt.xlabel('Time of day')
plt.ylabel('Concentration (ppm)')
return img2base64(_figure2bytes(fig))
async def post(self) -> None:
async def post(self, endpoint: str) -> None:
requested_model_config = tornado.escape.json_decode(self.request.body)
try:
form = co2_model_generator.CO2FormData.from_dict(requested_model_config)
@ -365,31 +377,24 @@ class CO2Data(BaseRequestHandler):
self.finish(json.dumps(response_json))
return
executor = loky.get_reusable_executor(
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
)
report_task = executor.submit(
co2_model_generator.CO2FormData.build_model, form,
)
report = await asyncio.wrap_future(report_task)
def generate_ventilation_plot(transition_times: tuple, ventilation_values: tuple):
fig = plt.figure(figsize=(7, 4), dpi=110)
plt.plot(form.CO2_data['times'], form.CO2_data['CO2'])
for index, time in enumerate(transition_times[:-1]):
plt.axvline(x = time, color = 'grey', linewidth=0.5, linestyle='--')
y_location = (form.CO2_data['CO2'][min(range(len(form.CO2_data['times'])), key=lambda i: abs(form.CO2_data['times'][i]-time))])
plt.text(x = time + 0.04, y = y_location, s=round(ventilation_values[index], 2))
plt.xlabel('Time of day')
plt.ylabel('Concentration (ppm)')
return fig
if endpoint == 'plot':
self.finish({'CO2_plot': self.generate_ventilation_plot(form.CO2_data)})
else:
executor = loky.get_reusable_executor(
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
)
report_task = executor.submit(
co2_model_generator.CO2FormData.build_model, form,
)
report = await asyncio.wrap_future(report_task)
result = dict(report.CO2_fit_params())
result['fitting_ventilation_type'] = 'fitting_natural_ventilation'
result['transition_times'] = report.ventilation_transition_times
result['CO2_plot'] = self.generate_ventilation_plot(form.CO2_data, report.ventilation_transition_times, result['ventilation_values'])
self.finish(result)
result = dict(report.CO2_fit_params())
result['transition_times'] = report.ventilation_transition_times
result['CO2_plot'] = img2base64(_figure2bytes(generate_ventilation_plot(report.ventilation_transition_times, result['ventilation_values'])))
self.finish(result)
def get_url(app_root: str, relative_path: str = '/'):
return app_root.rstrip('/') + relative_path.rstrip('/')
@ -412,7 +417,7 @@ def make_app(
base_urls: typing.List = [
(get_root_url(r'/?'), LandingPage),
(get_root_calculator_url(r'/?'), CalculatorForm),
(get_root_calculator_url(r'/co2-fit/'), CO2Data),
(get_root_calculator_url(r'/co2-fit/(.*)'), CO2Data),
(get_root_calculator_url(r'/report'), ConcentrationModel),
(get_root_url(r'/static/(.*)'), StaticFileHandler, {'path': static_dir}),
(get_root_calculator_url(r'/static/(.*)'), StaticFileHandler, {'path': calculator_static_dir}),

View file

@ -4,7 +4,10 @@ import logging
import typing
from caimira import models
from caimira import data
from . import model_generator
import caimira.monte_carlo as mc
from caimira.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
@ -18,7 +21,6 @@ _NO_DEFAULT = object()
@dataclasses.dataclass
class CO2FormData:
CO2_data: dict
specific_breaks: dict
exposed_coffee_break_option: str
exposed_coffee_duration: int
exposed_finish: minutes_since_midnight
@ -26,6 +28,8 @@ class CO2FormData:
exposed_lunch_option: bool
exposed_lunch_start: minutes_since_midnight
exposed_start: minutes_since_midnight
fitting_ventilation_states: list
fitting_ventilation_type: str
infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
infected_dont_have_breaks_with_exposed: bool
@ -38,15 +42,11 @@ class CO2FormData:
room_volume: float
total_people: int
ventilation_type: str
windows_duration: float
windows_frequency: float
window_opening_regime: str
#: The default values for undefined fields. Note that the defaults here
#: and the defaults in the html form must not be contradictory.
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = {
'CO2_data': '{}',
'specific_breaks': '{}',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,
'exposed_finish': '17:30',
@ -54,6 +54,8 @@ class CO2FormData:
'exposed_lunch_option': True,
'exposed_lunch_start': '12:30',
'exposed_start': '08:30',
'fitting_ventilation_states': '[]',
'fitting_ventilation_type': 'fitting_natural_ventilation',
'infected_coffee_break_option': 'coffee_break_0',
'infected_coffee_duration': 5,
'infected_dont_have_breaks_with_exposed': False,
@ -66,9 +68,6 @@ class CO2FormData:
'room_volume': _NO_DEFAULT,
'total_people': _NO_DEFAULT,
'ventilation_type': 'no_ventilation',
'windows_duration': 10.,
'windows_frequency': 60.,
'window_opening_regime': 'windows_open_permanently',
}
@classmethod
@ -100,18 +99,47 @@ class CO2FormData:
return instance
def build_model(self) -> models.CO2Data:
population_presence=self.population_present_interval()
last_time_present = population_presence.boundaries()[-1][-1]
last_present_time_index = next((index for index, time in enumerate(self.CO2_data['times'])
if time > last_time_present), len(self.CO2_data['times']))
infected_population: models.Population = self.infected_population()
exposed_population: models.Population = self.exposed_population()
all_state_changes=self.population_present_interval()
total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop)
for _, stop in zip(all_state_changes[:-1], all_state_changes[1:])]
return models.CO2Data(
room_volume=self.room_volume,
number=self.total_people,
presence=population_presence,
ventilation_transition_times=self.ventilation_transition_times(last_time_present),
times=self.CO2_data['times'][:last_present_time_index],
CO2_concentrations=self.CO2_data['CO2'][:last_present_time_index],
number=models.IntPiecewiseConstant(transition_times=tuple(all_state_changes), values=tuple(total_people)),
presence=None,
ventilation_transition_times=self.ventilation_transition_times(),
times=self.CO2_data['times'],
CO2_concentrations=self.CO2_data['CO2'],
)
def exposed_population(self) -> models.Population:
infected_occupants = self.infected_people
# The number of exposed occupants is the total number of occupants
# minus the number of infected occupants.
exposed_occupants = self.total_people - infected_occupants
exposed = models.Population(
number=exposed_occupants,
presence=self.exposed_present_interval(),
activity=models.Activity.types['Seated'],
mask=models.Mask.types['No mask'],
host_immunity=0.,
)
return exposed
def infected_population(self) -> models.Population:
infected_occupants = self.infected_people
infected = models.Population(
number=infected_occupants,
presence=self.infected_present_interval(),
activity=models.Activity.types['Seated'],
mask=models.Mask.types['No mask'],
host_immunity=0.,
)
return infected
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
@ -287,40 +315,32 @@ class CO2FormData:
LOG.debug("trailing interval")
present_intervals.append((current_time / 60, finish / 60))
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['infected_breaks'])
else:
breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
return self.present_interval(
self.infected_start, self.infected_finish,
breaks=breaks,
)
def population_present_interval(self) -> models.Interval:
def population_present_interval(self) -> typing.List[float]:
state_change_times = set(self.infected_present_interval().transition_times())
state_change_times.update(self.exposed_present_interval().transition_times())
all_state_changes = sorted(state_change_times)
return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:])))
return sorted(state_change_times)
def exposed_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(self.specific_breaks['exposed_breaks'])
else:
breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
return self.present_interval(
self.exposed_start, self.exposed_finish,
breaks=breaks,
)
def infected_present_interval(self) -> models.Interval:
breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
return self.present_interval(
self.infected_start, self.infected_finish,
breaks=breaks,
)
def ventilation_transition_times(self, last_present_time) -> typing.Tuple[float, ...]:
if self.ventilation_type == 'from_fitting' and self.window_opening_regime == 'windows_open_periodically':
transition_times = sorted(models.PeriodicInterval(self.windows_frequency,
self.windows_duration, min(self.infected_start, self.exposed_start)/60).transition_times())
return tuple(filter(lambda x: x <= last_present_time, transition_times))
def ventilation_transition_times(self) -> typing.Tuple[float, ...]:
# Check what type of ventilation is considered for the fitting
if self.fitting_ventilation_type == 'fitting_natural_ventilation':
return tuple(self.fitting_ventilation_states)
else:
return tuple((min(self.infected_start, self.exposed_start)/60, max(self.infected_finish, self.exposed_finish)/60), ) # all day long
return tuple((self.CO2_data['times'][0], self.CO2_data['times'][-1]))
#: Mapping of field name to a callable which can convert values from form
#: input (URL encoded arguments / string) into the correct type.

View file

@ -22,7 +22,6 @@ DEFAULTS = {
'ceiling_height': 0.,
'conditional_probability_plot': False,
'conditional_probability_viral_loads': False,
'CO2_data_option': False,
'CO2_fitting_result': '{}',
'exposed_coffee_break_option': 'coffee_break_0',
'exposed_coffee_duration': 5,

View file

@ -37,7 +37,6 @@ class FormData:
ceiling_height: float
conditional_probability_plot: bool
conditional_probability_viral_loads: bool
CO2_data_option: bool
CO2_fitting_result: dict
exposed_coffee_break_option: str
exposed_coffee_duration: int
@ -438,16 +437,15 @@ class FormData:
def ventilation(self) -> models._VentilationBase:
always_on = models.PeriodicInterval(period=120, duration=120)
periodic_interval = models.PeriodicInterval(self.windows_frequency, self.windows_duration, min(self.infected_start, self.exposed_start)/60)
if self.CO2_data_option:
ventilations = []
if self.ventilation_type == 'from_fitting' and self.window_opening_regime == 'windows_open_periodically':
for index, time in enumerate(sorted(list(periodic_interval.transition_times()))[:-1]):
if index < len(self.CO2_fitting_result['ventilation_values']):
ventilations.append(models.AirChange(active=models.SpecificInterval(present_times=((time, time + self.windows_duration/60), )),
air_exch=self.CO2_fitting_result['ventilation_values'][index]))
else: break
periodic_interval = models.PeriodicInterval(self.windows_frequency, self.windows_duration,
min(self.infected_start, self.exposed_start)/60)
if self.ventilation_type == 'from_fitting':
ventilations = []
if self.CO2_fitting_result['fitting_ventilation_type'] == 'fitting_natural_ventilation':
transition_times = self.CO2_fitting_result['transition_times']
for index, (start, stop) in enumerate(zip(transition_times[:-1], transition_times[1:])):
ventilations.append(models.AirChange(active=models.SpecificInterval(present_times=((start, stop), )),
air_exch=self.CO2_fitting_result['ventilation_values'][index]))
else:
ventilations.append(models.AirChange(active=always_on, air_exch=self.CO2_fitting_result['ventilation_values'][0]))
return models.MultipleVentilation(tuple(ventilations))

View file

@ -323,7 +323,7 @@ def hour_format(hour: float) -> str:
hours = int(hour)
minutes = int(hour % 1 * 60)
return f"{hours}:{minutes}"
return f"{hours}:{minutes if minutes != 0 else '00'}"
def percentage(absolute: float) -> float:
return absolute * 100

View file

@ -1527,11 +1527,12 @@ class CO2Data:
exhalation_rate=exhalation_rate,
ventilation_values=ventilation_values
)
return np.sqrt(np.sum((np.array(self.CO2_concentrations) - np.array(the_concentrations))**2))
return np.sqrt(np.sum((np.array(self.CO2_concentrations) -
np.array(the_concentrations))**2))
# The goal is to minimize the difference between the two different curves (known concentrations vs. predicted concentrations)
res_dict = minimize(fun=fun, x0=np.ones(len(self.ventilation_transition_times)), method='powell', bounds=[
(0, None) for _ in range(len(self.ventilation_transition_times))], options={'xtol': 1e-3})
res_dict = minimize(fun=fun, x0=np.ones(len(self.ventilation_transition_times)), method='powell',
bounds=[(0, None) for _ in range(len(self.ventilation_transition_times))],
options={'xtol': 1e-3})
exhalation_rate = res_dict['x'][0]
ventilation_values = res_dict['x'][1:]