extracted CO2 logic to a dedicated route

This commit is contained in:
lrdossan 2024-09-03 16:53:13 +02:00
parent 41ea92cac2
commit ebde0e1ae4
16 changed files with 692 additions and 682 deletions

View file

@ -109,7 +109,7 @@ python -m cern_caimira.apps.calculator
To run with a specific template theme created:
```
python -m cern_caimira.apps.calculator --theme=ui/apps/templates/{theme}
python -m cern_caimira.apps.calculator --theme=cern_caimira/src/cern_caimira/apps/templates/{theme}
```
To run the entire app in a different `APPLICATION_ROOT` path:
@ -168,9 +168,11 @@ Running with Visual Studio Code (VSCode):
### Running the tests
Make sure you are in the root directory of the project. Then:
```
pip install -e .[test]
pytest ./caimira
pip install -e .[test]
python -m pytest
```
### Running the profiler

View file

@ -8,22 +8,25 @@ import tornado.log
from tornado.options import define, options
import logging
from caimira.api.routes.report_routes import ReportHandler
from caimira.api.routes.report_routes import VirusReportHandler, CO2ReportHandler
define("port", default=8088, help="Port to listen on", type=int)
logging.basicConfig(format="%(message)s", level=logging.INFO)
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/report", ReportHandler),
(r"/co2_report", CO2ReportHandler),
(r"/virus_report", VirusReportHandler),
]
settings = dict(
debug=True,
)
super(Application, self).__init__(handlers, **settings)
if __name__ == "__main__":
app = Application()
app.listen(options.port)

View file

@ -0,0 +1,26 @@
from caimira.calculator.validators.co2.co2_validator import CO2FormData
from caimira.calculator.store.data_registry import DataRegistry
def generate_form_obj(form_data, data_registry):
return CO2FormData.from_dict(form_data=form_data, data_registry=data_registry)
def generate_model(form_obj, data_registry):
sample_size = data_registry.monte_carlo['sample_size']
return form_obj.build_model(sample_size=sample_size)
def generate_report(model):
return dict(model.CO2_fit_params())
def submit_CO2_form(form_data):
data_registry = DataRegistry()
form_obj = generate_form_obj(
form_data=form_data, data_registry=data_registry)
model = generate_model(form_obj=form_obj, data_registry=data_registry)
report_data = generate_report(model=model)
return report_data

View file

@ -1,30 +0,0 @@
import concurrent.futures
import functools
from caimira.calculator.validators.virus.virus_validator import VirusFormData
from caimira.calculator.store.data_registry import DataRegistry
import caimira.calculator.report.report_generator as rg
def generate_form_obj(form_data, data_registry):
return VirusFormData.from_dict(form_data, data_registry)
def generate_model(form_obj):
return form_obj.build_model(250_000)
def generate_report_results(form_obj, model):
return rg.calculate_report_data(form=form_obj, model=model, executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor, None, # TODO define report_parallelism
),)
def submit_virus_form(form_data):
data_registry = DataRegistry
form_obj = generate_form_obj(form_data, data_registry)
model = generate_model(form_obj)
report_data = generate_report_results(form_obj, model=model)
return report_data

View file

@ -0,0 +1,38 @@
import concurrent.futures
import functools
from caimira.calculator.validators.virus.virus_validator import VirusFormData
from caimira.calculator.store.data_registry import DataRegistry
import caimira.calculator.report.virus_report_data as rg
def generate_form_obj(form_data, data_registry):
return VirusFormData.from_dict(
form_data=form_data,
data_registry=data_registry,
)
def generate_model(form_obj, data_registry):
sample_size = data_registry.monte_carlo['sample_size']
return form_obj.build_model(sample_size=sample_size)
def generate_report_results(form_obj, model):
return rg.calculate_report_data(
form=form_obj,
model=model,
executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor, None, # TODO define report_parallelism
),
)
def submit_virus_form(form_data):
data_registry = DataRegistry
form_obj = generate_form_obj(form_data=form_data, data_registry=data_registry)
model = generate_model(form_obj=form_obj, data_registry=data_registry)
report_data = generate_report_results(form_obj=form_obj, model=model)
return report_data

View file

@ -1,15 +1,24 @@
import json
import traceback
import tornado.web
import sys
from caimira.api.controller.report_controller import submit_virus_form
from caimira.api.controller.virus_report_controller import submit_virus_form
from caimira.api.controller.co2_report_controller import submit_CO2_form
class ReportHandler(tornado.web.RequestHandler):
class BaseReportHandler(tornado.web.RedirectHandler):
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "x-requested-with")
self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
def write_error(self, status_code, **kwargs):
self.set_status(status_code)
self.write({"message": kwargs.get('exc_info')[1].__str__()})
class VirusReportHandler(BaseReportHandler):
def post(self):
try:
form_data = json.loads(self.request.body)
@ -24,5 +33,27 @@ class ReportHandler(tornado.web.RequestHandler):
self.write(response_data)
except Exception as e:
traceback.print_exc()
self.set_status(400)
self.write({"message": str(e)})
self.write_error(status_code=400, exc_info=sys.exc_info())
class CO2ReportHandler(tornado.web.RequestHandler):
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "x-requested-with")
self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
def post(self):
try:
form_data = json.loads(self.request.body)
report_data = submit_CO2_form(form_data)
response_data = {
"status": "success",
"message": "Results generated successfully",
"report_data": report_data,
}
self.write(response_data)
except Exception as e:
traceback.print_exc()
self.write_error(status_code=400, exc_info=sys.exc_info())

View file

@ -0,0 +1,60 @@
from caimira.calculator.validators.co2.co2_validator import CO2FormData
from caimira.calculator.models.models import CO2DataModel
def build_initial_plot(
form: CO2FormData,
) -> dict:
'''
Initial plot with the suggested ventilation state changes.
This method receives the form input and returns the CO2
plot with the respective transition times.
'''
CO2model: CO2DataModel = form.build_model()
occupancy_transition_times = list(CO2model.occupancy.transition_times)
ventilation_transition_times: list = form.find_change_points()
# The entire ventilation changes consider the initial and final occupancy state change
all_vent_transition_times: list = sorted(
[occupancy_transition_times[0]] +
[occupancy_transition_times[-1]] +
ventilation_transition_times)
ventilation_plot: str = form.generate_ventilation_plot(
ventilation_transition_times=all_vent_transition_times,
occupancy_transition_times=occupancy_transition_times
)
context = {
'CO2_plot': ventilation_plot,
'transition_times': [round(el, 2) for el in all_vent_transition_times],
}
return context
def build_fitting_results(
form: CO2FormData,
) -> dict:
'''
Final fitting results with the respective predictive CO2.
This method receives the form input and returns the fitting results
along with the CO2 plot with the predictive CO2.
'''
CO2model: CO2DataModel = form.build_model()
# Ventilation times after user manipulation from the suggested ventilation state change times.
ventilation_transition_times = list(CO2model.ventilation_transition_times)
# The result of the following method is a dict with the results of the fitting
# algorithm, namely the breathing rate and ACH values. It also returns the
# predictive CO2 result based on the fitting results.
context = dict(CO2model.CO2_fit_params())
# Add the transition times and CO2 plot to the results.
context['transition_times'] = ventilation_transition_times
context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1],
predictive_CO2=context['predictive_CO2'])
return context

View file

@ -1 +0,0 @@
# Move here the backend logic.

View file

@ -1,309 +0,0 @@
import concurrent.futures
import base64
import dataclasses
import io
import typing
import numpy as np
import matplotlib.pyplot as plt
from caimira.calculator.models import models
# from caimira.apps.calculator import markdown_tools
# from caimira.profiler import profile
from caimira.calculator.validators.virus.virus_validator import VirusFormData
from caimira.calculator.models import dataclass_utils
from caimira.calculator.models.enums import ViralLoads
def model_start_end(model: models.ExposureModel):
t_start = min(model.exposed.presence_interval().boundaries()[0][0],
model.concentration_model.infected.presence_interval().boundaries()[0][0])
t_end = max(model.exposed.presence_interval().boundaries()[-1][1],
model.concentration_model.infected.presence_interval().boundaries()[-1][1])
return t_start, t_end
def fill_big_gaps(array, gap_size):
"""
Insert values into the given sorted list if there is a gap of more than ``gap_size``.
All values in the given array are preserved, even if they are within the ``gap_size`` of one another.
>>> fill_big_gaps([1, 2, 4], gap_size=0.75)
[1, 1.75, 2, 2.75, 3.5, 4]
"""
result = []
if len(array) == 0:
raise ValueError("Input array must be len > 0")
last_value = array[0]
for value in array:
while value - last_value > gap_size + 1e-15:
last_value = last_value + gap_size
result.append(last_value)
result.append(value)
last_value = value
return result
def non_temp_transition_times(model: models.ExposureModel):
"""
Return the non-temperature (and PiecewiseConstant) based transition times.
"""
def walk_model(model, name=""):
# Extend walk_dataclass to handle lists of dataclasses
# (e.g. in MultipleVentilation).
for name, obj in dataclass_utils.walk_dataclass(model, name=name):
if name.endswith('.ventilations') and isinstance(obj, (list, tuple)):
for i, item in enumerate(obj):
fq_name_i = f'{name}[{i}]'
yield fq_name_i, item
if dataclasses.is_dataclass(item):
yield from dataclass_utils.walk_dataclass(item, name=fq_name_i)
else:
yield name, obj
t_start, t_end = model_start_end(model)
change_times = {t_start, t_end}
for name, obj in walk_model(model, name="exposure"):
if isinstance(obj, models.Interval):
change_times |= obj.transition_times()
# Only choose times that are in the range of the model (removes things
# such as PeriodicIntervals, which extend beyond the model itself).
return sorted(time for time in change_times if (t_start <= time <= t_end))
def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
"""
Pick approximately ``approx_n_pts`` time points which are interesting for the
given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times
the number of hours of the simulation.
Initially the times are seeded by important state change times (excluding
outside temperature), and the times are then subsequently expanded to ensure
that the step size is at most ``(t_end - t_start) / approx_n_pts``.
"""
times = non_temp_transition_times(model)
sim_duration = max(times) - min(times)
if not approx_n_pts: approx_n_pts = sim_duration * 15
# Expand the times list to ensure that we have a maximum gap size between
# the key times.
nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts)
return nice_times
def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
lower_concentrations = []
for time in times:
for index, (start, stop) in enumerate(short_range_intervals):
# For visualization issues, add short-range breathing activity to the initial long-range concentrations
if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing':
lower_concentrations.append(np.array(model.concentration(float(time))).mean())
break
lower_concentrations.append(np.array(model.concentration_model.concentration(float(time))).mean())
return lower_concentrations
def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(),fn_name
def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_co2_concentration(CO2_model, time, fn_name=None):
return np.array(CO2_model.concentration(float(time))).mean(), fn_name
# @profile
def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
concentrations = [
np.array(model.concentration(float(time))).mean()
for time in times
]
lower_concentrations = concentrations_with_sr_breathing(form, model, times, short_range_intervals)
CO2_model: models.CO2ConcentrationModel = form.build_CO2_model()
# compute deposited exposures and CO2 concentrations in parallel to increase performance
deposited_exposures = []
long_range_deposited_exposures = []
CO2_concentrations = []
tasks = []
with executor_factory() as executor:
for time1, time2 in zip(times[:-1], times[1:]):
tasks.append(executor.submit(_calculate_deposited_exposure, model, time1, time2, fn_name="de"))
tasks.append(executor.submit(_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
# co2 concentration: takes each time as param, not the interval
tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
# co2 concentration: calculate the last time too
tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, times[-1], fn_name="co2"))
for task in tasks:
result, fn_name = task.result()
if fn_name == "de":
deposited_exposures.append(result)
elif fn_name == "lr":
long_range_deposited_exposures.append(result)
elif fn_name == "co2":
CO2_concentrations.append(result)
cumulative_doses = np.cumsum(deposited_exposures)
long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)
prob = np.array(model.infection_probability())
prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
expected_new_cases = np.array(model.expected_new_cases()).mean()
exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()]
conditional_probability_data = None
uncertainties_plot_src = None
if (form.conditional_probability_viral_loads and
model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore
# Generate all the required data for the conditional probability plot
conditional_probability_data = manufacture_conditional_probability_data(model, prob)
# Generate the matplotlib image based on the received data
uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data)))
return {
"model_repr": repr(model),
"times": list(times),
"exposed_presence_intervals": exposed_presence_intervals,
"short_range_intervals": short_range_intervals,
"short_range_expirations": short_range_expirations,
"concentrations": concentrations,
"concentrations_zoomed": lower_concentrations,
"cumulative_doses": list(cumulative_doses),
"long_range_cumulative_doses": list(long_range_cumulative_doses),
"prob_inf": prob.mean(),
"prob_inf_sd": prob.std(),
"prob_dist": list(prob),
"prob_hist_count": list(prob_dist_count),
"prob_hist_bins": list(prob_dist_bins),
"prob_probabilistic_exposure": prob_probabilistic_exposure,
"expected_new_cases": expected_new_cases,
"CO2_concentrations": CO2_concentrations,
"conditional_probability_data": conditional_probability_data,
"uncertainties_plot_src": uncertainties_plot_src,
}
def conditional_prob_inf_given_vl_dist(
infection_probability: models._VectorisedFloat,
viral_loads: np.ndarray,
specific_vl: float,
step: models._VectorisedFloat
):
pi_means = []
lower_percentiles = []
upper_percentiles = []
for vl_log in viral_loads:
# Probability of infection corresponding to a certain viral load value in the distribution
specific_prob = infection_probability[np.where((vl_log-step/2-specific_vl)*(vl_log+step/2-specific_vl)<0)[0]] #type: ignore
pi_means.append(specific_prob.mean())
lower_percentiles.append(np.quantile(specific_prob, 0.05))
upper_percentiles.append(np.quantile(specific_prob, 0.95))
return pi_means, lower_percentiles, upper_percentiles
def manufacture_conditional_probability_data(
exposure_model: models.ExposureModel,
infection_probability: models._VectorisedFloat
):
min_vl = 2
max_vl = 10
step = (max_vl - min_vl)/100
viral_loads = np.arange(min_vl, max_vl, step)
specific_vl = np.log10(exposure_model.concentration_model.virus.viral_load_in_sputum)
pi_means, lower_percentiles, upper_percentiles = conditional_prob_inf_given_vl_dist(infection_probability, viral_loads,
specific_vl, step)
log10_vl_in_sputum = np.log10(exposure_model.concentration_model.infected.virus.viral_load_in_sputum)
return {
'viral_loads': list(viral_loads),
'pi_means': list(pi_means),
'lower_percentiles': list(lower_percentiles),
'upper_percentiles': list(upper_percentiles),
'log10_vl_in_sputum': list(log10_vl_in_sputum),
}
def uncertainties_plot(infection_probability: models._VectorisedFloat,
conditional_probability_data: dict):
viral_loads: list = conditional_probability_data['viral_loads']
pi_means: list = conditional_probability_data['pi_means']
lower_percentiles: list = conditional_probability_data['lower_percentiles']
upper_percentiles: list = conditional_probability_data['upper_percentiles']
log10_vl_in_sputum: list = conditional_probability_data['log10_vl_in_sputum']
fig, ((axs00, axs01, axs02), (axs10, axs11, axs12)) = plt.subplots(nrows=2, ncols=3, # type: ignore
gridspec_kw={'width_ratios': [5, 0.5] + [1],
'height_ratios': [3, 1], 'wspace': 0},
sharey='row',
sharex='col')
axs01.axis('off')
axs11.axis('off')
axs12.axis('off')
axs01.set_visible(False)
axs00.plot(viral_loads, np.array(pi_means), label='Predictive total probability')
axs00.fill_between(viral_loads, np.array(lower_percentiles), np.array(upper_percentiles), alpha=0.1, label='5ᵗʰ and 95ᵗʰ percentile')
axs02.hist(infection_probability, bins=30, orientation='horizontal')
axs02.set_xticks([])
axs02.set_xticklabels([])
axs02.set_facecolor("lightgrey")
highest_bar = axs02.get_xlim()[1]
axs02.set_xlim(0, highest_bar)
axs02.text(highest_bar * 0.5, 50,
"$P(I)=$\n" + rf"$\bf{np.round(np.mean(infection_probability), 1)}$%", ha='center', va='center')
axs10.hist(log10_vl_in_sputum,
bins=150, range=(2, 10), color='grey')
axs10.set_facecolor("lightgrey")
axs10.set_yticks([])
axs10.set_yticklabels([])
axs10.set_xticks([i for i in range(2, 13, 2)])
axs10.set_xticklabels(['$10^{' + str(i) + '}$' for i in range(2, 13, 2)])
axs10.set_xlim(2, 10)
axs10.set_xlabel('Viral load\n(RNA copies)', fontsize=12)
axs00.set_ylabel('Conditional Probability\nof Infection', fontsize=12)
axs00.text(9.5, -0.01, '$(i)$')
axs10.text(9.5, axs10.get_ylim()[1] * 0.8, '$(ii)$')
axs02.set_title('$(iii)$', fontsize=10)
axs00.legend()
return fig
def _figure2bytes(figure):
# Draw the image
img_data = io.BytesIO()
figure.savefig(img_data, format='png', bbox_inches="tight", transparent=True, dpi=110)
return img_data
def img2base64(img_data) -> str:
img_data.seek(0)
pic_hash = base64.b64encode(img_data.read()).decode('ascii')
# A src suitable for a tag such as f'<img id="scenario_concentration_plot" src="{result}">.
return f'data:image/png;base64,{pic_hash}'

View file

@ -0,0 +1,487 @@
import concurrent.futures
import base64
import dataclasses
import io
import typing
import numpy as np
import matplotlib.pyplot as plt
import urllib
import zlib
from caimira.calculator.models import models, dataclass_utils, profiler, monte_carlo as mc
from caimira.calculator.models.enums import ViralLoads
from caimira.calculator.validators.virus.virus_validator import VirusFormData
def model_start_end(model: models.ExposureModel):
t_start = min(model.exposed.presence_interval().boundaries()[0][0],
model.concentration_model.infected.presence_interval().boundaries()[0][0])
t_end = max(model.exposed.presence_interval().boundaries()[-1][1],
model.concentration_model.infected.presence_interval().boundaries()[-1][1])
return t_start, t_end
def fill_big_gaps(array, gap_size):
"""
Insert values into the given sorted list if there is a gap of more than ``gap_size``.
All values in the given array are preserved, even if they are within the ``gap_size`` of one another.
>>> fill_big_gaps([1, 2, 4], gap_size=0.75)
[1, 1.75, 2, 2.75, 3.5, 4]
"""
result = []
if len(array) == 0:
raise ValueError("Input array must be len > 0")
last_value = array[0]
for value in array:
while value - last_value > gap_size + 1e-15:
last_value = last_value + gap_size
result.append(last_value)
result.append(value)
last_value = value
return result
def non_temp_transition_times(model: models.ExposureModel):
"""
Return the non-temperature (and PiecewiseConstant) based transition times.
"""
def walk_model(model, name=""):
# Extend walk_dataclass to handle lists of dataclasses
# (e.g. in MultipleVentilation).
for name, obj in dataclass_utils.walk_dataclass(model, name=name):
if name.endswith('.ventilations') and isinstance(obj, (list, tuple)):
for i, item in enumerate(obj):
fq_name_i = f'{name}[{i}]'
yield fq_name_i, item
if dataclasses.is_dataclass(item):
yield from dataclass_utils.walk_dataclass(item, name=fq_name_i)
else:
yield name, obj
t_start, t_end = model_start_end(model)
change_times = {t_start, t_end}
for name, obj in walk_model(model, name="exposure"):
if isinstance(obj, models.Interval):
change_times |= obj.transition_times()
# Only choose times that are in the range of the model (removes things
# such as PeriodicIntervals, which extend beyond the model itself).
return sorted(time for time in change_times if (t_start <= time <= t_end))
def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
"""
Pick approximately ``approx_n_pts`` time points which are interesting for the
given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times
the number of hours of the simulation.
Initially the times are seeded by important state change times (excluding
outside temperature), and the times are then subsequently expanded to ensure
that the step size is at most ``(t_end - t_start) / approx_n_pts``.
"""
times = non_temp_transition_times(model)
sim_duration = max(times) - min(times)
if not approx_n_pts:
approx_n_pts = sim_duration * 15
# Expand the times list to ensure that we have a maximum gap size between
# the key times.
nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts)
return nice_times
def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
lower_concentrations = []
for time in times:
for index, (start, stop) in enumerate(short_range_intervals):
# For visualization issues, add short-range breathing activity to the initial long-range concentrations
if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing':
lower_concentrations.append(
np.array(model.concentration(float(time))).mean())
break
lower_concentrations.append(
np.array(model.concentration_model.concentration(float(time))).mean())
return lower_concentrations
def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_co2_concentration(CO2_model, time, fn_name=None):
return np.array(CO2_model.concentration(float(time))).mean(), fn_name
@profiler.profile
def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0]
for interaction in model.short_range]
short_range_expirations = [interaction['expiration']
for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
concentrations = [
np.array(model.concentration(float(time))).mean()
for time in times
]
lower_concentrations = concentrations_with_sr_breathing(
form, model, times, short_range_intervals)
CO2_model: models.CO2ConcentrationModel = form.build_CO2_model()
# compute deposited exposures and CO2 concentrations in parallel to increase performance
deposited_exposures = []
long_range_deposited_exposures = []
CO2_concentrations = []
tasks = []
with executor_factory() as executor:
for time1, time2 in zip(times[:-1], times[1:]):
tasks.append(executor.submit(
_calculate_deposited_exposure, model, time1, time2, fn_name="de"))
tasks.append(executor.submit(
_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
# co2 concentration: takes each time as param, not the interval
tasks.append(executor.submit(
_calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
# co2 concentration: calculate the last time too
tasks.append(executor.submit(_calculate_co2_concentration,
CO2_model, times[-1], fn_name="co2"))
for task in tasks:
result, fn_name = task.result()
if fn_name == "de":
deposited_exposures.append(result)
elif fn_name == "lr":
long_range_deposited_exposures.append(result)
elif fn_name == "co2":
CO2_concentrations.append(result)
cumulative_doses = np.cumsum(deposited_exposures)
long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)
prob = np.array(model.infection_probability())
prob_dist_count, prob_dist_bins = np.histogram(
prob/100, bins=100, density=True)
prob_probabilistic_exposure = np.array(
model.total_probability_rule()).mean()
expected_new_cases = np.array(model.expected_new_cases()).mean()
exposed_presence_intervals = [
list(interval) for interval in model.exposed.presence_interval().boundaries()]
conditional_probability_data = None
uncertainties_plot_src = None
if (form.conditional_probability_viral_loads and
model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore
# Generate all the required data for the conditional probability plot
conditional_probability_data = manufacture_conditional_probability_data(
model, prob)
# Generate the matplotlib image based on the received data
uncertainties_plot_src = img2base64(_figure2bytes(
uncertainties_plot(prob, conditional_probability_data)))
return {
"model_repr": repr(model),
"times": list(times),
"exposed_presence_intervals": exposed_presence_intervals,
"short_range_intervals": short_range_intervals,
"short_range_expirations": short_range_expirations,
"concentrations": concentrations,
"concentrations_zoomed": lower_concentrations,
"cumulative_doses": list(cumulative_doses),
"long_range_cumulative_doses": list(long_range_cumulative_doses),
"prob_inf": prob.mean(),
"prob_inf_sd": prob.std(),
"prob_dist": list(prob),
"prob_hist_count": list(prob_dist_count),
"prob_hist_bins": list(prob_dist_bins),
"prob_probabilistic_exposure": prob_probabilistic_exposure,
"expected_new_cases": expected_new_cases,
"CO2_concentrations": CO2_concentrations,
"conditional_probability_data": conditional_probability_data,
"uncertainties_plot_src": uncertainties_plot_src,
}
def conditional_prob_inf_given_vl_dist(
infection_probability: models._VectorisedFloat,
viral_loads: np.ndarray,
specific_vl: float,
step: models._VectorisedFloat
):
pi_means = []
lower_percentiles = []
upper_percentiles = []
for vl_log in viral_loads:
# Probability of infection corresponding to a certain viral load value in the distribution
specific_prob = infection_probability[np.where(
(vl_log-step/2-specific_vl)*(vl_log+step/2-specific_vl) < 0)[0]] # type: ignore
pi_means.append(specific_prob.mean())
lower_percentiles.append(np.quantile(specific_prob, 0.05))
upper_percentiles.append(np.quantile(specific_prob, 0.95))
return pi_means, lower_percentiles, upper_percentiles
def manufacture_conditional_probability_data(
exposure_model: models.ExposureModel,
infection_probability: models._VectorisedFloat
):
min_vl = 2
max_vl = 10
step = (max_vl - min_vl)/100
viral_loads = np.arange(min_vl, max_vl, step)
specific_vl = np.log10(
exposure_model.concentration_model.virus.viral_load_in_sputum)
pi_means, lower_percentiles, upper_percentiles = conditional_prob_inf_given_vl_dist(infection_probability, viral_loads,
specific_vl, step)
log10_vl_in_sputum = np.log10(
exposure_model.concentration_model.infected.virus.viral_load_in_sputum)
return {
'viral_loads': list(viral_loads),
'pi_means': list(pi_means),
'lower_percentiles': list(lower_percentiles),
'upper_percentiles': list(upper_percentiles),
'log10_vl_in_sputum': list(log10_vl_in_sputum),
}
def uncertainties_plot(infection_probability: models._VectorisedFloat,
conditional_probability_data: dict):
viral_loads: list = conditional_probability_data['viral_loads']
pi_means: list = conditional_probability_data['pi_means']
lower_percentiles: list = conditional_probability_data['lower_percentiles']
upper_percentiles: list = conditional_probability_data['upper_percentiles']
log10_vl_in_sputum: list = conditional_probability_data['log10_vl_in_sputum']
fig, ((axs00, axs01, axs02), (axs10, axs11, axs12)) = plt.subplots(nrows=2, ncols=3, # type: ignore
gridspec_kw={'width_ratios': [5, 0.5] + [1],
'height_ratios': [3, 1], 'wspace': 0},
sharey='row',
sharex='col')
axs01.axis('off')
axs11.axis('off')
axs12.axis('off')
axs01.set_visible(False)
axs00.plot(viral_loads, np.array(pi_means),
label='Predictive total probability')
axs00.fill_between(viral_loads, np.array(lower_percentiles), np.array(
upper_percentiles), alpha=0.1, label='5ᵗʰ and 95ᵗʰ percentile')
axs02.hist(infection_probability, bins=30, orientation='horizontal')
axs02.set_xticks([])
axs02.set_xticklabels([])
axs02.set_facecolor("lightgrey")
highest_bar = axs02.get_xlim()[1]
axs02.set_xlim(0, highest_bar)
axs02.text(highest_bar * 0.5, 50,
"$P(I)=$\n" + rf"$\bf{np.round(np.mean(infection_probability), 1)}$%", ha='center', va='center')
axs10.hist(log10_vl_in_sputum,
bins=150, range=(2, 10), color='grey')
axs10.set_facecolor("lightgrey")
axs10.set_yticks([])
axs10.set_yticklabels([])
axs10.set_xticks([i for i in range(2, 13, 2)])
axs10.set_xticklabels(['$10^{' + str(i) + '}$' for i in range(2, 13, 2)])
axs10.set_xlim(2, 10)
axs10.set_xlabel('Viral load\n(RNA copies)', fontsize=12)
axs00.set_ylabel('Conditional Probability\nof Infection', fontsize=12)
axs00.text(9.5, -0.01, '$(i)$')
axs10.text(9.5, axs10.get_ylim()[1] * 0.8, '$(ii)$')
axs02.set_title('$(iii)$', fontsize=10)
axs00.legend()
return fig
def _figure2bytes(figure):
# Draw the image
img_data = io.BytesIO()
figure.savefig(img_data, format='png', bbox_inches="tight",
transparent=True, dpi=110)
return img_data
def img2base64(img_data) -> str:
img_data.seek(0)
pic_hash = base64.b64encode(img_data.read()).decode('ascii')
# A src suitable for a tag such as f'<img id="scenario_concentration_plot" src="{result}">.
return f'data:image/png;base64,{pic_hash}'
def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
form_dict = VirusFormData.to_dict(form, strip_defaults=True)
# Generate the calculator URL arguments that would be needed to re-create this
# form.
args = urllib.parse.urlencode(form_dict)
# Then zlib compress + base64 encode the string. To be inverted by the
# /_c/ endpoint.
compressed_args = base64.b64encode(zlib.compress(args.encode())).decode()
qr_url = f"{base_url}{get_root_url()}/_c/{compressed_args}"
url = f"{base_url}{get_root_calculator_url()}?{args}"
return {
'link': url,
'shortened': qr_url,
}
def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]:
viral_load = model.concentration_model.infected.virus.viral_load_in_sputum
scenarios = {}
for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99):
vl = np.quantile(viral_load, percentil)
specific_vl_scenario = dataclass_utils.nested_replace(model,
{'concentration_model.infected.virus.viral_load_in_sputum': vl}
)
scenarios[str(vl)] = np.mean(
specific_vl_scenario.infection_probability())
return scenarios
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
scenarios = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
FFP2_being_worn = bool(form.mask_wearing_option ==
'mask_on' and form.mask_type == 'FFP2')
if FFP2_being_worn and form.hepa_option:
FFP2andHEPAalternative = dataclass_utils.replace(
form, mask_type='Type I')
if not (form.hepa_option and form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I'):
scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model()
if not FFP2_being_worn and form.hepa_option:
noHEPAalternative = dataclass_utils.replace(form, mask_type='FFP2')
noHEPAalternative = dataclass_utils.replace(
noHEPAalternative, mask_wearing_option='mask_on')
noHEPAalternative = dataclass_utils.replace(
noHEPAalternative, hepa_option=False)
if not (not form.hepa_option and FFP2_being_worn):
scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model()
# The remaining scenarios are based on Type I masks (possibly not worn)
# and no HEPA filtration.
form = dataclass_utils.replace(form, mask_type='Type I')
if form.hepa_option:
form = dataclass_utils.replace(form, hepa_option=False)
with_mask = dataclass_utils.replace(
form, mask_wearing_option='mask_on')
without_mask = dataclass_utils.replace(
form, mask_wearing_option='mask_off')
if form.ventilation_type == 'mechanical_ventilation':
# scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
if not (form.mask_wearing_option == 'mask_off'):
scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model(
)
elif form.ventilation_type == 'natural_ventilation':
# scenarios['Windows open with Type I masks'] = with_mask.build_mc_model()
if not (form.mask_wearing_option == 'mask_off'):
scenarios['Windows open without masks'] = without_mask.build_mc_model()
# No matter the ventilation scheme, we include scenarios which don't have any ventilation.
with_mask_no_vent = dataclass_utils.replace(
with_mask, ventilation_type='no_ventilation')
without_mask_or_vent = dataclass_utils.replace(
without_mask, ventilation_type='no_ventilation')
if not (form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I' and form.ventilation_type == 'no_ventilation'):
scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'):
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
else:
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[],
total_people=form.total_people - form.short_range_occupants)
scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
return scenarios
def scenario_statistics(
mc_model: mc.ExposureModel,
sample_times: typing.List[float],
compute_prob_exposure: bool
):
model = mc_model.build_model(
size=mc_model.data_registry.monte_carlo['sample_size'])
if (compute_prob_exposure):
# It means we have data to calculate the total_probability_rule
prob_probabilistic_exposure = model.total_probability_rule()
else:
prob_probabilistic_exposure = 0.
return {
'probability_of_infection': np.mean(model.infection_probability()),
'expected_new_cases': np.mean(model.expected_new_cases()),
'concentrations': [
np.mean(model.concentration(time))
for time in sample_times
],
'prob_probabilistic_exposure': prob_probabilistic_exposure,
}
def comparison_report(
form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: typing.List[float],
executor_factory: typing.Callable[[], concurrent.futures.Executor],
):
if (form.short_range_option == "short_range_no"):
statistics = {
'Current scenario': {
'probability_of_infection': report_data['prob_inf'],
'expected_new_cases': report_data['expected_new_cases'],
'concentrations': report_data['concentrations'],
}
}
else:
statistics = {}
if (form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure"):
compute_prob_exposure = True
else:
compute_prob_exposure = False
with executor_factory() as executor:
results = executor.map(
scenario_statistics,
scenarios.values(),
[sample_times] * len(scenarios),
[compute_prob_exposure] * len(scenarios),
timeout=60,
)
for (name, model), model_stats in zip(scenarios.items(), results):
statistics[name] = model_stats
return {
'stats': statistics,
}

View file

@ -11,7 +11,7 @@ from ..form_validator import FormData, cast_class_fields
from ..defaults import NO_DEFAULT
from ...store.data_registry import DataRegistry
from ...models import models
from ...report.report_generator import img2base64, _figure2bytes
from ...report.virus_report_data import img2base64, _figure2bytes
minutes_since_midnight = typing.NewType('minutes_since_midnight', int)

View file

@ -5,7 +5,7 @@ from retry import retry
import caimira.calculator.models.monte_carlo as mc
from caimira.calculator.models import models
from caimira.calculator.models.dataclass_utils import nested_replace
from caimira.calculator.report import report_generator
from caimira.calculator.report import virus_report_data
from caimira.calculator.models.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions
@ -72,7 +72,7 @@ def test_conditional_prob_inf_given_vl_dist(data_registry, baseline_exposure_mod
specific_vl = np.log10(mc_model.concentration_model.infected.virus.viral_load_in_sputum)
step = 8/100
actual_pi_means, actual_lower_percentiles, actual_upper_percentiles = (
report_generator.conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, specific_vl, step)
virus_report_data.conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, specific_vl, step)
)
assert np.allclose(actual_pi_means, expected_pi_means, atol=0.002)

View file

@ -29,13 +29,12 @@ from caimira.calculator.models.profiler import CaimiraProfiler, Profilers
from caimira.calculator.store.data_registry import DataRegistry
from caimira.calculator.store.data_service import DataService
from caimira.api.controller.report_controller import generate_form_obj, generate_model, generate_report_results
from caimira.calculator.report.report_generator import calculate_report_data
from caimira.api.controller import virus_report_controller, co2_report_controller
from caimira.calculator.report.virus_report_data import calculate_report_data
from caimira.calculator.validators.virus import virus_validator
from caimira.calculator.validators.co2 import co2_validator
from . import markdown_tools
from ..calculator.report.virus_report import ReportGenerator
from .report.virus_report import VirusReportGenerator
from ..calculator.report.co2_report import CO2ReportGenerator
from .user import AuthenticatedUser, AnonymousUser
@ -181,7 +180,7 @@ class ConcentrationModel(BaseRequestHandler):
LOG.debug(pformat(requested_model_config))
try:
form = generate_form_obj(requested_model_config, data_registry)
form = virus_report_controller.generate_form_obj(requested_model_config, data_registry)
except Exception as err:
LOG.exception(err)
response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'}
@ -190,7 +189,7 @@ class ConcentrationModel(BaseRequestHandler):
return
base_url = self.request.protocol + "://" + self.request.host
report_generator: ReportGenerator = self.settings['report_generator']
report_generator: VirusReportGenerator = self.settings['report_generator']
executor = loky.get_reusable_executor(
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
@ -235,7 +234,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
LOG.debug(pformat(requested_model_config))
try:
form = generate_form_obj(requested_model_config, data_registry)
form = virus_report_controller.generate_form_obj(requested_model_config, data_registry)
except Exception as err:
LOG.exception(err)
response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'}
@ -247,7 +246,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
)
model = generate_model(form)
model = virus_report_controller.generate_model(form, data_registry)
report_data_task = executor.submit(calculate_report_data, form, model,
executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor,
@ -266,10 +265,10 @@ class StaticModel(BaseRequestHandler):
if data_service:
data_service.update_registry(data_registry)
form = generate_form_obj(virus_validator.baseline_raw_form_data(), data_registry)
form = virus_report_controller.generate_form_obj(virus_validator.baseline_raw_form_data(), data_registry)
base_url = self.request.protocol + "://" + self.request.host
report_generator: ReportGenerator = self.settings['report_generator']
report_generator: VirusReportGenerator = self.settings['report_generator']
executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size'])
report_task = executor.submit(
@ -409,10 +408,7 @@ class CO2ModelResponse(BaseRequestHandler):
requested_model_config = tornado.escape.json_decode(self.request.body)
try:
form: co2_validator.CO2FormData = co2_validator.CO2FormData.from_dict(
requested_model_config,
data_registry
)
form = co2_report_controller.generate_form_obj(requested_model_config, data_registry)
except Exception as err:
if self.settings.get("debug", False):
import traceback
@ -544,7 +540,7 @@ def make_app(
data_service=data_service,
template_environment=template_environment,
default_handler_class=Missing404Handler,
report_generator=ReportGenerator(loader, get_root_url, get_root_calculator_url),
report_generator=VirusReportGenerator(loader, get_root_url, get_root_calculator_url),
xsrf_cookies=True,
# COOKIE_SECRET being undefined will result in no login information being
# presented to the user.

View file

@ -1,67 +1,14 @@
import dataclasses
import caimira.calculator.report.co2_report_data as co2_rep_data
from caimira.calculator.validators.co2.co2_validator import CO2FormData
from caimira.calculator.models.models import CO2DataModel
@dataclasses.dataclass
class CO2ReportGenerator:
def build_initial_plot(
self,
form: CO2FormData,
) -> dict:
'''
Initial plot with the suggested ventilation state changes.
This method receives the form input and returns the CO2
plot with the respective transition times.
'''
CO2model: CO2DataModel = form.build_model()
occupancy_transition_times = list(CO2model.occupancy.transition_times)
ventilation_transition_times: list = form.find_change_points()
# The entire ventilation changes consider the initial and final occupancy state change
all_vent_transition_times: list = sorted(
[occupancy_transition_times[0]] +
[occupancy_transition_times[-1]] +
ventilation_transition_times)
ventilation_plot: str = form.generate_ventilation_plot(
ventilation_transition_times=all_vent_transition_times,
occupancy_transition_times=occupancy_transition_times
)
context = {
'CO2_plot': ventilation_plot,
'transition_times': [round(el, 2) for el in all_vent_transition_times],
}
return context
def build_initial_plot(self, form: CO2FormData):
return co2_rep_data.build_initial_plot(form=form)
def build_fitting_results(
self,
form: CO2FormData,
) -> dict:
'''
Final fitting results with the respective predictive CO2.
This method receives the form input and returns the fitting results
along with the CO2 plot with the predictive CO2.
'''
CO2model: CO2DataModel = form.build_model()
# Ventilation times after user manipulation from the suggested ventilation state change times.
ventilation_transition_times = list(CO2model.ventilation_transition_times)
# The result of the following method is a dict with the results of the fitting
# algorithm, namely the breathing rate and ACH values. It also returns the
# predictive CO2 result based on the fitting results.
context = dict(CO2model.CO2_fit_params())
# Add the transition times and CO2 plot to the results.
context['transition_times'] = ventilation_transition_times
context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1],
predictive_CO2=context['predictive_CO2'])
return context
def build_fitting_results(self, form: CO2FormData):
return co2_rep_data.build_fitting_results(form=form)

View file

@ -6,254 +6,12 @@ import json
import typing
import jinja2
import numpy as np
import urllib
import base64
import zlib
from .. import markdown_tools
from caimira.calculator.models import dataclass_utils, models, monte_carlo as mc
from caimira.calculator.models import models
from caimira.calculator.validators.virus.virus_validator import VirusFormData
from caimira.calculator.report.report_generator import calculate_report_data
def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
form_dict = VirusFormData.to_dict(form, strip_defaults=True)
# Generate the calculator URL arguments that would be needed to re-create this
# form.
args = urllib.parse.urlencode(form_dict)
# Then zlib compress + base64 encode the string. To be inverted by the
# /_c/ endpoint.
compressed_args = base64.b64encode(zlib.compress(args.encode())).decode()
qr_url = f"{base_url}{get_root_url()}/_c/{compressed_args}"
url = f"{base_url}{get_root_calculator_url()}?{args}"
return {
'link': url,
'shortened': qr_url,
}
def model_start_end(model: models.ExposureModel):
t_start = min(model.exposed.presence_interval().boundaries()[0][0],
model.concentration_model.infected.presence_interval().boundaries()[0][0])
t_end = max(model.exposed.presence_interval().boundaries()[-1][1],
model.concentration_model.infected.presence_interval().boundaries()[-1][1])
return t_start, t_end
def fill_big_gaps(array, gap_size):
"""
Insert values into the given sorted list if there is a gap of more than ``gap_size``.
All values in the given array are preserved, even if they are within the ``gap_size`` of one another.
>>> fill_big_gaps([1, 2, 4], gap_size=0.75)
[1, 1.75, 2, 2.75, 3.5, 4]
"""
result = []
if len(array) == 0:
raise ValueError("Input array must be len > 0")
last_value = array[0]
for value in array:
while value - last_value > gap_size + 1e-15:
last_value = last_value + gap_size
result.append(last_value)
result.append(value)
last_value = value
return result
def non_temp_transition_times(model: models.ExposureModel):
"""
Return the non-temperature (and PiecewiseConstant) based transition times.
"""
def walk_model(model, name=""):
# Extend walk_dataclass to handle lists of dataclasses
# (e.g. in MultipleVentilation).
for name, obj in dataclass_utils.walk_dataclass(model, name=name):
if name.endswith('.ventilations') and isinstance(obj, (list, tuple)):
for i, item in enumerate(obj):
fq_name_i = f'{name}[{i}]'
yield fq_name_i, item
if dataclasses.is_dataclass(item):
yield from dataclass_utils.walk_dataclass(item, name=fq_name_i)
else:
yield name, obj
t_start, t_end = model_start_end(model)
change_times = {t_start, t_end}
for name, obj in walk_model(model, name="exposure"):
if isinstance(obj, models.Interval):
change_times |= obj.transition_times()
# Only choose times that are in the range of the model (removes things
# such as PeriodicIntervals, which extend beyond the model itself).
return sorted(time for time in change_times if (t_start <= time <= t_end))
def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
"""
Pick approximately ``approx_n_pts`` time points which are interesting for the
given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times
the number of hours of the simulation.
Initially the times are seeded by important state change times (excluding
outside temperature), and the times are then subsequently expanded to ensure
that the step size is at most ``(t_end - t_start) / approx_n_pts``.
"""
times = non_temp_transition_times(model)
sim_duration = max(times) - min(times)
if not approx_n_pts:
approx_n_pts = sim_duration * 15
# Expand the times list to ensure that we have a maximum gap size between
# the key times.
nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts)
return nice_times
def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]:
viral_load = model.concentration_model.infected.virus.viral_load_in_sputum
scenarios = {}
for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99):
vl = np.quantile(viral_load, percentil)
specific_vl_scenario = dataclass_utils.nested_replace(model,
{'concentration_model.infected.virus.viral_load_in_sputum': vl}
)
scenarios[str(vl)] = np.mean(
specific_vl_scenario.infection_probability())
return scenarios
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
scenarios = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
FFP2_being_worn = bool(form.mask_wearing_option ==
'mask_on' and form.mask_type == 'FFP2')
if FFP2_being_worn and form.hepa_option:
FFP2andHEPAalternative = dataclass_utils.replace(
form, mask_type='Type I')
if not (form.hepa_option and form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I'):
scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model()
if not FFP2_being_worn and form.hepa_option:
noHEPAalternative = dataclass_utils.replace(form, mask_type='FFP2')
noHEPAalternative = dataclass_utils.replace(
noHEPAalternative, mask_wearing_option='mask_on')
noHEPAalternative = dataclass_utils.replace(
noHEPAalternative, hepa_option=False)
if not (not form.hepa_option and FFP2_being_worn):
scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model()
# The remaining scenarios are based on Type I masks (possibly not worn)
# and no HEPA filtration.
form = dataclass_utils.replace(form, mask_type='Type I')
if form.hepa_option:
form = dataclass_utils.replace(form, hepa_option=False)
with_mask = dataclass_utils.replace(
form, mask_wearing_option='mask_on')
without_mask = dataclass_utils.replace(
form, mask_wearing_option='mask_off')
if form.ventilation_type == 'mechanical_ventilation':
# scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
if not (form.mask_wearing_option == 'mask_off'):
scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model(
)
elif form.ventilation_type == 'natural_ventilation':
# scenarios['Windows open with Type I masks'] = with_mask.build_mc_model()
if not (form.mask_wearing_option == 'mask_off'):
scenarios['Windows open without masks'] = without_mask.build_mc_model()
# No matter the ventilation scheme, we include scenarios which don't have any ventilation.
with_mask_no_vent = dataclass_utils.replace(
with_mask, ventilation_type='no_ventilation')
without_mask_or_vent = dataclass_utils.replace(
without_mask, ventilation_type='no_ventilation')
if not (form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I' and form.ventilation_type == 'no_ventilation'):
scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'):
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
else:
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[],
total_people=form.total_people - form.short_range_occupants)
scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
return scenarios
def scenario_statistics(
mc_model: mc.ExposureModel,
sample_times: typing.List[float],
compute_prob_exposure: bool
):
model = mc_model.build_model(
size=mc_model.data_registry.monte_carlo['sample_size'])
if (compute_prob_exposure):
# It means we have data to calculate the total_probability_rule
prob_probabilistic_exposure = model.total_probability_rule()
else:
prob_probabilistic_exposure = 0.
return {
'probability_of_infection': np.mean(model.infection_probability()),
'expected_new_cases': np.mean(model.expected_new_cases()),
'concentrations': [
np.mean(model.concentration(time))
for time in sample_times
],
'prob_probabilistic_exposure': prob_probabilistic_exposure,
}
def comparison_report(
form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: typing.List[float],
executor_factory: typing.Callable[[], concurrent.futures.Executor],
):
if (form.short_range_option == "short_range_no"):
statistics = {
'Current scenario': {
'probability_of_infection': report_data['prob_inf'],
'expected_new_cases': report_data['expected_new_cases'],
'concentrations': report_data['concentrations'],
}
}
else:
statistics = {}
if (form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure"):
compute_prob_exposure = True
else:
compute_prob_exposure = False
with executor_factory() as executor:
results = executor.map(
scenario_statistics,
scenarios.values(),
[sample_times] * len(scenarios),
[compute_prob_exposure] * len(scenarios),
timeout=60,
)
for (name, model), model_stats in zip(scenarios.items(), results):
statistics[name] = model_stats
return {
'stats': statistics,
}
from caimira.calculator.report.virus_report_data import calculate_report_data, interesting_times, manufacture_alternative_scenarios, manufacture_viral_load_scenarios_percentiles, comparison_report, generate_permalink
def minutes_to_time(minutes: int) -> str:
@ -305,7 +63,7 @@ def non_zero_percentage(percentage: int) -> str:
@dataclasses.dataclass
class ReportGenerator:
class VirusReportGenerator:
jinja_loader: jinja2.BaseLoader
get_root_url: typing.Any
get_root_calculator_url: typing.Any
@ -317,7 +75,8 @@ class ReportGenerator:
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
context = self.prepare_context(base_url, model, form, executor_factory=executor_factory)
context = self.prepare_context(
base_url, model, form, executor_factory=executor_factory)
return self.render(context)
def prepare_context(
@ -339,15 +98,18 @@ class ReportGenerator:
}
scenario_sample_times = interesting_times(model)
report_data = calculate_report_data(form, model, executor_factory=executor_factory)
report_data = calculate_report_data(
form, model, executor_factory=executor_factory)
context.update(report_data)
alternative_scenarios = manufacture_alternative_scenarios(form)
context['alternative_viral_load'] = manufacture_viral_load_scenarios_percentiles(model) if form.conditional_probability_viral_loads else None
context['alternative_viral_load'] = manufacture_viral_load_scenarios_percentiles(
model) if form.conditional_probability_viral_loads else None
context['alternative_scenarios'] = comparison_report(
form, report_data, alternative_scenarios, scenario_sample_times, executor_factory=executor_factory,
)
context['permalink'] = generate_permalink(base_url, self.get_root_url, self.get_root_calculator_url, form)
context['permalink'] = generate_permalink(
base_url, self.get_root_url, self.get_root_calculator_url, form)
context['get_url'] = self.get_root_url
context['get_calculator_url'] = self.get_root_calculator_url
@ -374,4 +136,3 @@ class ReportGenerator:
def render(self, context: dict) -> str:
template = self._template_environment().get_template("calculator.report.html.j2")
return template.render(**context, text_blocks=template.globals["common_text"])

View file

@ -7,10 +7,9 @@ import numpy as np
import pytest
from cern_caimira.apps.calculator import make_app
from cern_caimira.apps.calculator import ReportGenerator
from cern_caimira.apps.calculator.report.virus_report import readable_minutes, manufacture_alternative_scenarios, comparison_report
import caimira.calculator.report.report_generator as rep_gen
from caimira.api.controller.report_controller import generate_model, generate_report_results
from cern_caimira.apps.calculator import VirusReportGenerator
from cern_caimira.apps.calculator.report.virus_report import readable_minutes
import caimira.calculator.report.virus_report_data as rep_gen
from caimira.calculator.validators.virus.virus_validator import VirusFormData
@ -24,7 +23,7 @@ def test_generate_report(baseline_form) -> None:
start = time.perf_counter()
generator: ReportGenerator = make_app().settings['report_generator']
generator: VirusReportGenerator = make_app().settings['report_generator']
report = generator.build_report("", baseline_form, partial(
concurrent.futures.ThreadPoolExecutor, 1,
@ -119,8 +118,8 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData):
# Long-range contributions alone
scenario_sample_times = rep_gen.interesting_times(model)
alternative_scenarios = manufacture_alternative_scenarios(baseline_form_with_sr)
alternative_statistics = comparison_report(
alternative_scenarios = rep_gen.manufacture_alternative_scenarios(baseline_form_with_sr)
alternative_statistics = rep_gen.comparison_report(
baseline_form_with_sr, report_data, alternative_scenarios, scenario_sample_times, executor_factory=executor_factory,
)