diff --git a/README.md b/README.md
index d71dc5a1..572f7221 100644
--- a/README.md
+++ b/README.md
@@ -109,7 +109,7 @@ python -m cern_caimira.apps.calculator
To run with a specific template theme created:
```
-python -m cern_caimira.apps.calculator --theme=ui/apps/templates/{theme}
+python -m cern_caimira.apps.calculator --theme=cern_caimira/src/cern_caimira/apps/templates/{theme}
```
To run the entire app in a different `APPLICATION_ROOT` path:
@@ -168,9 +168,11 @@ Running with Visual Studio Code (VSCode):
### Running the tests
+Make sure you are in the root directory of the project. Then:
+
```
-pip install -e .[test]
-pytest ./caimira
+pip install -e .[test]
+python -m pytest
```
### Running the profiler
diff --git a/caimira/src/caimira/api/app.py b/caimira/src/caimira/api/app.py
index db26c9d4..84a58532 100644
--- a/caimira/src/caimira/api/app.py
+++ b/caimira/src/caimira/api/app.py
@@ -8,22 +8,25 @@ import tornado.log
from tornado.options import define, options
import logging
-from caimira.api.routes.report_routes import ReportHandler
+from caimira.api.routes.report_routes import VirusReportHandler, CO2ReportHandler
define("port", default=8088, help="Port to listen on", type=int)
logging.basicConfig(format="%(message)s", level=logging.INFO)
+
class Application(tornado.web.Application):
def __init__(self):
handlers = [
- (r"/report", ReportHandler),
+ (r"/co2_report", CO2ReportHandler),
+ (r"/virus_report", VirusReportHandler),
]
settings = dict(
debug=True,
)
super(Application, self).__init__(handlers, **settings)
+
if __name__ == "__main__":
app = Application()
app.listen(options.port)
diff --git a/caimira/src/caimira/api/controller/co2_report_controller.py b/caimira/src/caimira/api/controller/co2_report_controller.py
new file mode 100644
index 00000000..02433318
--- /dev/null
+++ b/caimira/src/caimira/api/controller/co2_report_controller.py
@@ -0,0 +1,26 @@
+from caimira.calculator.validators.co2.co2_validator import CO2FormData
+from caimira.calculator.store.data_registry import DataRegistry
+
+
+def generate_form_obj(form_data, data_registry):
+ return CO2FormData.from_dict(form_data=form_data, data_registry=data_registry)
+
+
+def generate_model(form_obj, data_registry):
+ sample_size = data_registry.monte_carlo['sample_size']
+ return form_obj.build_model(sample_size=sample_size)
+
+
+def generate_report(model):
+ return dict(model.CO2_fit_params())
+
+
+def submit_CO2_form(form_data):
+ data_registry = DataRegistry()
+
+ form_obj = generate_form_obj(
+ form_data=form_data, data_registry=data_registry)
+ model = generate_model(form_obj=form_obj, data_registry=data_registry)
+ report_data = generate_report(model=model)
+
+ return report_data
diff --git a/caimira/src/caimira/api/controller/report_controller.py b/caimira/src/caimira/api/controller/report_controller.py
deleted file mode 100644
index be151555..00000000
--- a/caimira/src/caimira/api/controller/report_controller.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import concurrent.futures
-import functools
-
-from caimira.calculator.validators.virus.virus_validator import VirusFormData
-from caimira.calculator.store.data_registry import DataRegistry
-import caimira.calculator.report.report_generator as rg
-
-
-def generate_form_obj(form_data, data_registry):
- return VirusFormData.from_dict(form_data, data_registry)
-
-
-def generate_model(form_obj):
- return form_obj.build_model(250_000)
-
-
-def generate_report_results(form_obj, model):
- return rg.calculate_report_data(form=form_obj, model=model, executor_factory=functools.partial(
- concurrent.futures.ThreadPoolExecutor, None, # TODO define report_parallelism
- ),)
-
-
-def submit_virus_form(form_data):
- data_registry = DataRegistry
-
- form_obj = generate_form_obj(form_data, data_registry)
- model = generate_model(form_obj)
- report_data = generate_report_results(form_obj, model=model)
-
- return report_data
diff --git a/caimira/src/caimira/api/controller/virus_report_controller.py b/caimira/src/caimira/api/controller/virus_report_controller.py
new file mode 100644
index 00000000..41bf4d72
--- /dev/null
+++ b/caimira/src/caimira/api/controller/virus_report_controller.py
@@ -0,0 +1,38 @@
+import concurrent.futures
+import functools
+
+from caimira.calculator.validators.virus.virus_validator import VirusFormData
+from caimira.calculator.store.data_registry import DataRegistry
+import caimira.calculator.report.virus_report_data as rg
+
+
+def generate_form_obj(form_data, data_registry):
+ return VirusFormData.from_dict(
+ form_data=form_data,
+ data_registry=data_registry,
+ )
+
+
+def generate_model(form_obj, data_registry):
+ sample_size = data_registry.monte_carlo['sample_size']
+ return form_obj.build_model(sample_size=sample_size)
+
+
+def generate_report_results(form_obj, model):
+ return rg.calculate_report_data(
+ form=form_obj,
+ model=model,
+ executor_factory=functools.partial(
+ concurrent.futures.ThreadPoolExecutor, None, # TODO define report_parallelism
+ ),
+ )
+
+
+def submit_virus_form(form_data):
+ data_registry = DataRegistry
+
+ form_obj = generate_form_obj(form_data=form_data, data_registry=data_registry)
+ model = generate_model(form_obj=form_obj, data_registry=data_registry)
+ report_data = generate_report_results(form_obj=form_obj, model=model)
+
+ return report_data
diff --git a/caimira/src/caimira/api/routes/report_routes.py b/caimira/src/caimira/api/routes/report_routes.py
index 7b6a51cb..f448df86 100644
--- a/caimira/src/caimira/api/routes/report_routes.py
+++ b/caimira/src/caimira/api/routes/report_routes.py
@@ -1,15 +1,24 @@
import json
import traceback
import tornado.web
+import sys
-from caimira.api.controller.report_controller import submit_virus_form
+from caimira.api.controller.virus_report_controller import submit_virus_form
+from caimira.api.controller.co2_report_controller import submit_CO2_form
-class ReportHandler(tornado.web.RequestHandler):
+
+class BaseReportHandler(tornado.web.RedirectHandler):
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "x-requested-with")
self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
+ def write_error(self, status_code, **kwargs):
+ self.set_status(status_code)
+ self.write({"message": kwargs.get('exc_info')[1].__str__()})
+
+
+class VirusReportHandler(BaseReportHandler):
def post(self):
try:
form_data = json.loads(self.request.body)
@@ -24,5 +33,27 @@ class ReportHandler(tornado.web.RequestHandler):
self.write(response_data)
except Exception as e:
traceback.print_exc()
- self.set_status(400)
- self.write({"message": str(e)})
+ self.write_error(status_code=400, exc_info=sys.exc_info())
+
+
+class CO2ReportHandler(tornado.web.RequestHandler):
+ def set_default_headers(self):
+ self.set_header("Access-Control-Allow-Origin", "*")
+ self.set_header("Access-Control-Allow-Headers", "x-requested-with")
+ self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
+
+ def post(self):
+ try:
+ form_data = json.loads(self.request.body)
+ report_data = submit_CO2_form(form_data)
+
+ response_data = {
+ "status": "success",
+ "message": "Results generated successfully",
+ "report_data": report_data,
+ }
+
+ self.write(response_data)
+ except Exception as e:
+ traceback.print_exc()
+ self.write_error(status_code=400, exc_info=sys.exc_info())
diff --git a/caimira/src/caimira/calculator/report/co2_report_data.py b/caimira/src/caimira/calculator/report/co2_report_data.py
new file mode 100644
index 00000000..793c064f
--- /dev/null
+++ b/caimira/src/caimira/calculator/report/co2_report_data.py
@@ -0,0 +1,60 @@
+from caimira.calculator.validators.co2.co2_validator import CO2FormData
+from caimira.calculator.models.models import CO2DataModel
+
+
+def build_initial_plot(
+ form: CO2FormData,
+) -> dict:
+ '''
+ Initial plot with the suggested ventilation state changes.
+ This method receives the form input and returns the CO2
+ plot with the respective transition times.
+ '''
+ CO2model: CO2DataModel = form.build_model()
+
+ occupancy_transition_times = list(CO2model.occupancy.transition_times)
+
+ ventilation_transition_times: list = form.find_change_points()
+ # The entire ventilation changes consider the initial and final occupancy state change
+ all_vent_transition_times: list = sorted(
+ [occupancy_transition_times[0]] +
+ [occupancy_transition_times[-1]] +
+ ventilation_transition_times)
+
+ ventilation_plot: str = form.generate_ventilation_plot(
+ ventilation_transition_times=all_vent_transition_times,
+ occupancy_transition_times=occupancy_transition_times
+ )
+
+ context = {
+ 'CO2_plot': ventilation_plot,
+ 'transition_times': [round(el, 2) for el in all_vent_transition_times],
+ }
+
+ return context
+
+
+def build_fitting_results(
+ form: CO2FormData,
+) -> dict:
+ '''
+ Final fitting results with the respective predictive CO2.
+ This method receives the form input and returns the fitting results
+ along with the CO2 plot with the predictive CO2.
+ '''
+ CO2model: CO2DataModel = form.build_model()
+
+ # Ventilation times after user manipulation from the suggested ventilation state change times.
+ ventilation_transition_times = list(CO2model.ventilation_transition_times)
+
+ # The result of the following method is a dict with the results of the fitting
+ # algorithm, namely the breathing rate and ACH values. It also returns the
+ # predictive CO2 result based on the fitting results.
+ context = dict(CO2model.CO2_fit_params())
+
+ # Add the transition times and CO2 plot to the results.
+ context['transition_times'] = ventilation_transition_times
+ context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1],
+ predictive_CO2=context['predictive_CO2'])
+
+ return context
diff --git a/caimira/src/caimira/calculator/report/co2_report_generator.py b/caimira/src/caimira/calculator/report/co2_report_generator.py
deleted file mode 100644
index 024f6b13..00000000
--- a/caimira/src/caimira/calculator/report/co2_report_generator.py
+++ /dev/null
@@ -1 +0,0 @@
-# Move here the backend logic.
\ No newline at end of file
diff --git a/caimira/src/caimira/calculator/report/report_generator.py b/caimira/src/caimira/calculator/report/report_generator.py
deleted file mode 100644
index df4ef695..00000000
--- a/caimira/src/caimira/calculator/report/report_generator.py
+++ /dev/null
@@ -1,309 +0,0 @@
-import concurrent.futures
-import base64
-import dataclasses
-import io
-import typing
-import numpy as np
-import matplotlib.pyplot as plt
-
-from caimira.calculator.models import models
-# from caimira.apps.calculator import markdown_tools
-# from caimira.profiler import profile
-from caimira.calculator.validators.virus.virus_validator import VirusFormData
-from caimira.calculator.models import dataclass_utils
-from caimira.calculator.models.enums import ViralLoads
-
-def model_start_end(model: models.ExposureModel):
- t_start = min(model.exposed.presence_interval().boundaries()[0][0],
- model.concentration_model.infected.presence_interval().boundaries()[0][0])
- t_end = max(model.exposed.presence_interval().boundaries()[-1][1],
- model.concentration_model.infected.presence_interval().boundaries()[-1][1])
- return t_start, t_end
-
-
-def fill_big_gaps(array, gap_size):
- """
- Insert values into the given sorted list if there is a gap of more than ``gap_size``.
- All values in the given array are preserved, even if they are within the ``gap_size`` of one another.
-
- >>> fill_big_gaps([1, 2, 4], gap_size=0.75)
- [1, 1.75, 2, 2.75, 3.5, 4]
-
- """
- result = []
- if len(array) == 0:
- raise ValueError("Input array must be len > 0")
-
- last_value = array[0]
- for value in array:
- while value - last_value > gap_size + 1e-15:
- last_value = last_value + gap_size
- result.append(last_value)
- result.append(value)
- last_value = value
- return result
-
-
-def non_temp_transition_times(model: models.ExposureModel):
- """
- Return the non-temperature (and PiecewiseConstant) based transition times.
-
- """
- def walk_model(model, name=""):
- # Extend walk_dataclass to handle lists of dataclasses
- # (e.g. in MultipleVentilation).
- for name, obj in dataclass_utils.walk_dataclass(model, name=name):
- if name.endswith('.ventilations') and isinstance(obj, (list, tuple)):
- for i, item in enumerate(obj):
- fq_name_i = f'{name}[{i}]'
- yield fq_name_i, item
- if dataclasses.is_dataclass(item):
- yield from dataclass_utils.walk_dataclass(item, name=fq_name_i)
- else:
- yield name, obj
-
- t_start, t_end = model_start_end(model)
-
- change_times = {t_start, t_end}
- for name, obj in walk_model(model, name="exposure"):
- if isinstance(obj, models.Interval):
- change_times |= obj.transition_times()
-
- # Only choose times that are in the range of the model (removes things
- # such as PeriodicIntervals, which extend beyond the model itself).
- return sorted(time for time in change_times if (t_start <= time <= t_end))
-
-def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
- """
- Pick approximately ``approx_n_pts`` time points which are interesting for the
- given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times
- the number of hours of the simulation.
-
- Initially the times are seeded by important state change times (excluding
- outside temperature), and the times are then subsequently expanded to ensure
- that the step size is at most ``(t_end - t_start) / approx_n_pts``.
-
- """
- times = non_temp_transition_times(model)
- sim_duration = max(times) - min(times)
- if not approx_n_pts: approx_n_pts = sim_duration * 15
-
- # Expand the times list to ensure that we have a maximum gap size between
- # the key times.
- nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts)
- return nice_times
-
-
-
-def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
- lower_concentrations = []
- for time in times:
- for index, (start, stop) in enumerate(short_range_intervals):
- # For visualization issues, add short-range breathing activity to the initial long-range concentrations
- if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing':
- lower_concentrations.append(np.array(model.concentration(float(time))).mean())
- break
- lower_concentrations.append(np.array(model.concentration_model.concentration(float(time))).mean())
- return lower_concentrations
-
-
-def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
- return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(),fn_name
-
-
-def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
- return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
-
-
-def _calculate_co2_concentration(CO2_model, time, fn_name=None):
- return np.array(CO2_model.concentration(float(time))).mean(), fn_name
-
-
-# @profile
-def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
- times = interesting_times(model)
- short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
- short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
-
- concentrations = [
- np.array(model.concentration(float(time))).mean()
- for time in times
- ]
- lower_concentrations = concentrations_with_sr_breathing(form, model, times, short_range_intervals)
-
- CO2_model: models.CO2ConcentrationModel = form.build_CO2_model()
-
- # compute deposited exposures and CO2 concentrations in parallel to increase performance
- deposited_exposures = []
- long_range_deposited_exposures = []
- CO2_concentrations = []
-
- tasks = []
- with executor_factory() as executor:
- for time1, time2 in zip(times[:-1], times[1:]):
- tasks.append(executor.submit(_calculate_deposited_exposure, model, time1, time2, fn_name="de"))
- tasks.append(executor.submit(_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
- # co2 concentration: takes each time as param, not the interval
- tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
- # co2 concentration: calculate the last time too
- tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, times[-1], fn_name="co2"))
-
- for task in tasks:
- result, fn_name = task.result()
- if fn_name == "de":
- deposited_exposures.append(result)
- elif fn_name == "lr":
- long_range_deposited_exposures.append(result)
- elif fn_name == "co2":
- CO2_concentrations.append(result)
-
- cumulative_doses = np.cumsum(deposited_exposures)
- long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)
-
- prob = np.array(model.infection_probability())
- prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
- prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
- expected_new_cases = np.array(model.expected_new_cases()).mean()
- exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()]
-
- conditional_probability_data = None
- uncertainties_plot_src = None
- if (form.conditional_probability_viral_loads and
- model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore
- # Generate all the required data for the conditional probability plot
- conditional_probability_data = manufacture_conditional_probability_data(model, prob)
- # Generate the matplotlib image based on the received data
- uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data)))
-
- return {
- "model_repr": repr(model),
- "times": list(times),
- "exposed_presence_intervals": exposed_presence_intervals,
- "short_range_intervals": short_range_intervals,
- "short_range_expirations": short_range_expirations,
- "concentrations": concentrations,
- "concentrations_zoomed": lower_concentrations,
- "cumulative_doses": list(cumulative_doses),
- "long_range_cumulative_doses": list(long_range_cumulative_doses),
- "prob_inf": prob.mean(),
- "prob_inf_sd": prob.std(),
- "prob_dist": list(prob),
- "prob_hist_count": list(prob_dist_count),
- "prob_hist_bins": list(prob_dist_bins),
- "prob_probabilistic_exposure": prob_probabilistic_exposure,
- "expected_new_cases": expected_new_cases,
- "CO2_concentrations": CO2_concentrations,
- "conditional_probability_data": conditional_probability_data,
- "uncertainties_plot_src": uncertainties_plot_src,
- }
-
-
-def conditional_prob_inf_given_vl_dist(
- infection_probability: models._VectorisedFloat,
- viral_loads: np.ndarray,
- specific_vl: float,
- step: models._VectorisedFloat
- ):
-
- pi_means = []
- lower_percentiles = []
- upper_percentiles = []
-
- for vl_log in viral_loads:
- # Probability of infection corresponding to a certain viral load value in the distribution
- specific_prob = infection_probability[np.where((vl_log-step/2-specific_vl)*(vl_log+step/2-specific_vl)<0)[0]] #type: ignore
-
- pi_means.append(specific_prob.mean())
- lower_percentiles.append(np.quantile(specific_prob, 0.05))
- upper_percentiles.append(np.quantile(specific_prob, 0.95))
-
- return pi_means, lower_percentiles, upper_percentiles
-
-
-def manufacture_conditional_probability_data(
- exposure_model: models.ExposureModel,
- infection_probability: models._VectorisedFloat
-):
- min_vl = 2
- max_vl = 10
- step = (max_vl - min_vl)/100
- viral_loads = np.arange(min_vl, max_vl, step)
- specific_vl = np.log10(exposure_model.concentration_model.virus.viral_load_in_sputum)
- pi_means, lower_percentiles, upper_percentiles = conditional_prob_inf_given_vl_dist(infection_probability, viral_loads,
- specific_vl, step)
- log10_vl_in_sputum = np.log10(exposure_model.concentration_model.infected.virus.viral_load_in_sputum)
-
- return {
- 'viral_loads': list(viral_loads),
- 'pi_means': list(pi_means),
- 'lower_percentiles': list(lower_percentiles),
- 'upper_percentiles': list(upper_percentiles),
- 'log10_vl_in_sputum': list(log10_vl_in_sputum),
- }
-
-
-def uncertainties_plot(infection_probability: models._VectorisedFloat,
- conditional_probability_data: dict):
-
- viral_loads: list = conditional_probability_data['viral_loads']
- pi_means: list = conditional_probability_data['pi_means']
- lower_percentiles: list = conditional_probability_data['lower_percentiles']
- upper_percentiles: list = conditional_probability_data['upper_percentiles']
- log10_vl_in_sputum: list = conditional_probability_data['log10_vl_in_sputum']
-
- fig, ((axs00, axs01, axs02), (axs10, axs11, axs12)) = plt.subplots(nrows=2, ncols=3, # type: ignore
- gridspec_kw={'width_ratios': [5, 0.5] + [1],
- 'height_ratios': [3, 1], 'wspace': 0},
- sharey='row',
- sharex='col')
-
- axs01.axis('off')
- axs11.axis('off')
- axs12.axis('off')
-
- axs01.set_visible(False)
-
- axs00.plot(viral_loads, np.array(pi_means), label='Predictive total probability')
- axs00.fill_between(viral_loads, np.array(lower_percentiles), np.array(upper_percentiles), alpha=0.1, label='5ᵗʰ and 95ᵗʰ percentile')
-
- axs02.hist(infection_probability, bins=30, orientation='horizontal')
- axs02.set_xticks([])
- axs02.set_xticklabels([])
- axs02.set_facecolor("lightgrey")
-
- highest_bar = axs02.get_xlim()[1]
- axs02.set_xlim(0, highest_bar)
-
- axs02.text(highest_bar * 0.5, 50,
- "$P(I)=$\n" + rf"$\bf{np.round(np.mean(infection_probability), 1)}$%", ha='center', va='center')
- axs10.hist(log10_vl_in_sputum,
- bins=150, range=(2, 10), color='grey')
- axs10.set_facecolor("lightgrey")
- axs10.set_yticks([])
- axs10.set_yticklabels([])
- axs10.set_xticks([i for i in range(2, 13, 2)])
- axs10.set_xticklabels(['$10^{' + str(i) + '}$' for i in range(2, 13, 2)])
- axs10.set_xlim(2, 10)
- axs10.set_xlabel('Viral load\n(RNA copies)', fontsize=12)
- axs00.set_ylabel('Conditional Probability\nof Infection', fontsize=12)
-
- axs00.text(9.5, -0.01, '$(i)$')
- axs10.text(9.5, axs10.get_ylim()[1] * 0.8, '$(ii)$')
- axs02.set_title('$(iii)$', fontsize=10)
-
- axs00.legend()
- return fig
-
-
-def _figure2bytes(figure):
- # Draw the image
- img_data = io.BytesIO()
- figure.savefig(img_data, format='png', bbox_inches="tight", transparent=True, dpi=110)
- return img_data
-
-
-def img2base64(img_data) -> str:
- img_data.seek(0)
- pic_hash = base64.b64encode(img_data.read()).decode('ascii')
- # A src suitable for a tag such as f'
.
- return f'data:image/png;base64,{pic_hash}'
diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py
new file mode 100644
index 00000000..8b6e3eca
--- /dev/null
+++ b/caimira/src/caimira/calculator/report/virus_report_data.py
@@ -0,0 +1,487 @@
+import concurrent.futures
+import base64
+import dataclasses
+import io
+import typing
+import numpy as np
+import matplotlib.pyplot as plt
+import urllib
+import zlib
+
+from caimira.calculator.models import models, dataclass_utils, profiler, monte_carlo as mc
+from caimira.calculator.models.enums import ViralLoads
+from caimira.calculator.validators.virus.virus_validator import VirusFormData
+
+
+def model_start_end(model: models.ExposureModel):
+ t_start = min(model.exposed.presence_interval().boundaries()[0][0],
+ model.concentration_model.infected.presence_interval().boundaries()[0][0])
+ t_end = max(model.exposed.presence_interval().boundaries()[-1][1],
+ model.concentration_model.infected.presence_interval().boundaries()[-1][1])
+ return t_start, t_end
+
+
+def fill_big_gaps(array, gap_size):
+ """
+ Insert values into the given sorted list if there is a gap of more than ``gap_size``.
+ All values in the given array are preserved, even if they are within the ``gap_size`` of one another.
+
+ >>> fill_big_gaps([1, 2, 4], gap_size=0.75)
+ [1, 1.75, 2, 2.75, 3.5, 4]
+
+ """
+ result = []
+ if len(array) == 0:
+ raise ValueError("Input array must be len > 0")
+
+ last_value = array[0]
+ for value in array:
+ while value - last_value > gap_size + 1e-15:
+ last_value = last_value + gap_size
+ result.append(last_value)
+ result.append(value)
+ last_value = value
+ return result
+
+
+def non_temp_transition_times(model: models.ExposureModel):
+ """
+ Return the non-temperature (and PiecewiseConstant) based transition times.
+
+ """
+ def walk_model(model, name=""):
+ # Extend walk_dataclass to handle lists of dataclasses
+ # (e.g. in MultipleVentilation).
+ for name, obj in dataclass_utils.walk_dataclass(model, name=name):
+ if name.endswith('.ventilations') and isinstance(obj, (list, tuple)):
+ for i, item in enumerate(obj):
+ fq_name_i = f'{name}[{i}]'
+ yield fq_name_i, item
+ if dataclasses.is_dataclass(item):
+ yield from dataclass_utils.walk_dataclass(item, name=fq_name_i)
+ else:
+ yield name, obj
+
+ t_start, t_end = model_start_end(model)
+
+ change_times = {t_start, t_end}
+ for name, obj in walk_model(model, name="exposure"):
+ if isinstance(obj, models.Interval):
+ change_times |= obj.transition_times()
+
+ # Only choose times that are in the range of the model (removes things
+ # such as PeriodicIntervals, which extend beyond the model itself).
+ return sorted(time for time in change_times if (t_start <= time <= t_end))
+
+
+def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
+ """
+ Pick approximately ``approx_n_pts`` time points which are interesting for the
+ given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times
+ the number of hours of the simulation.
+
+ Initially the times are seeded by important state change times (excluding
+ outside temperature), and the times are then subsequently expanded to ensure
+ that the step size is at most ``(t_end - t_start) / approx_n_pts``.
+
+ """
+ times = non_temp_transition_times(model)
+ sim_duration = max(times) - min(times)
+ if not approx_n_pts:
+ approx_n_pts = sim_duration * 15
+
+ # Expand the times list to ensure that we have a maximum gap size between
+ # the key times.
+ nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts)
+ return nice_times
+
+
+def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
+ lower_concentrations = []
+ for time in times:
+ for index, (start, stop) in enumerate(short_range_intervals):
+ # For visualization issues, add short-range breathing activity to the initial long-range concentrations
+ if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing':
+ lower_concentrations.append(
+ np.array(model.concentration(float(time))).mean())
+ break
+ lower_concentrations.append(
+ np.array(model.concentration_model.concentration(float(time))).mean())
+ return lower_concentrations
+
+
+def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
+ return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
+
+
+def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
+ return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
+
+
+def _calculate_co2_concentration(CO2_model, time, fn_name=None):
+ return np.array(CO2_model.concentration(float(time))).mean(), fn_name
+
+
+@profiler.profile
+def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
+ times = interesting_times(model)
+ short_range_intervals = [interaction.presence.boundaries()[0]
+ for interaction in model.short_range]
+ short_range_expirations = [interaction['expiration']
+ for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
+
+ concentrations = [
+ np.array(model.concentration(float(time))).mean()
+ for time in times
+ ]
+ lower_concentrations = concentrations_with_sr_breathing(
+ form, model, times, short_range_intervals)
+
+ CO2_model: models.CO2ConcentrationModel = form.build_CO2_model()
+
+ # compute deposited exposures and CO2 concentrations in parallel to increase performance
+ deposited_exposures = []
+ long_range_deposited_exposures = []
+ CO2_concentrations = []
+
+ tasks = []
+ with executor_factory() as executor:
+ for time1, time2 in zip(times[:-1], times[1:]):
+ tasks.append(executor.submit(
+ _calculate_deposited_exposure, model, time1, time2, fn_name="de"))
+ tasks.append(executor.submit(
+ _calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
+ # co2 concentration: takes each time as param, not the interval
+ tasks.append(executor.submit(
+ _calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
+ # co2 concentration: calculate the last time too
+ tasks.append(executor.submit(_calculate_co2_concentration,
+ CO2_model, times[-1], fn_name="co2"))
+
+ for task in tasks:
+ result, fn_name = task.result()
+ if fn_name == "de":
+ deposited_exposures.append(result)
+ elif fn_name == "lr":
+ long_range_deposited_exposures.append(result)
+ elif fn_name == "co2":
+ CO2_concentrations.append(result)
+
+ cumulative_doses = np.cumsum(deposited_exposures)
+ long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)
+
+ prob = np.array(model.infection_probability())
+ prob_dist_count, prob_dist_bins = np.histogram(
+ prob/100, bins=100, density=True)
+ prob_probabilistic_exposure = np.array(
+ model.total_probability_rule()).mean()
+ expected_new_cases = np.array(model.expected_new_cases()).mean()
+ exposed_presence_intervals = [
+ list(interval) for interval in model.exposed.presence_interval().boundaries()]
+
+ conditional_probability_data = None
+ uncertainties_plot_src = None
+ if (form.conditional_probability_viral_loads and
+ model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore
+ # Generate all the required data for the conditional probability plot
+ conditional_probability_data = manufacture_conditional_probability_data(
+ model, prob)
+ # Generate the matplotlib image based on the received data
+ uncertainties_plot_src = img2base64(_figure2bytes(
+ uncertainties_plot(prob, conditional_probability_data)))
+
+ return {
+ "model_repr": repr(model),
+ "times": list(times),
+ "exposed_presence_intervals": exposed_presence_intervals,
+ "short_range_intervals": short_range_intervals,
+ "short_range_expirations": short_range_expirations,
+ "concentrations": concentrations,
+ "concentrations_zoomed": lower_concentrations,
+ "cumulative_doses": list(cumulative_doses),
+ "long_range_cumulative_doses": list(long_range_cumulative_doses),
+ "prob_inf": prob.mean(),
+ "prob_inf_sd": prob.std(),
+ "prob_dist": list(prob),
+ "prob_hist_count": list(prob_dist_count),
+ "prob_hist_bins": list(prob_dist_bins),
+ "prob_probabilistic_exposure": prob_probabilistic_exposure,
+ "expected_new_cases": expected_new_cases,
+ "CO2_concentrations": CO2_concentrations,
+ "conditional_probability_data": conditional_probability_data,
+ "uncertainties_plot_src": uncertainties_plot_src,
+ }
+
+
+def conditional_prob_inf_given_vl_dist(
+ infection_probability: models._VectorisedFloat,
+ viral_loads: np.ndarray,
+ specific_vl: float,
+ step: models._VectorisedFloat
+):
+
+ pi_means = []
+ lower_percentiles = []
+ upper_percentiles = []
+
+ for vl_log in viral_loads:
+ # Probability of infection corresponding to a certain viral load value in the distribution
+ specific_prob = infection_probability[np.where(
+ (vl_log-step/2-specific_vl)*(vl_log+step/2-specific_vl) < 0)[0]] # type: ignore
+
+ pi_means.append(specific_prob.mean())
+ lower_percentiles.append(np.quantile(specific_prob, 0.05))
+ upper_percentiles.append(np.quantile(specific_prob, 0.95))
+
+ return pi_means, lower_percentiles, upper_percentiles
+
+
+def manufacture_conditional_probability_data(
+ exposure_model: models.ExposureModel,
+ infection_probability: models._VectorisedFloat
+):
+ min_vl = 2
+ max_vl = 10
+ step = (max_vl - min_vl)/100
+ viral_loads = np.arange(min_vl, max_vl, step)
+ specific_vl = np.log10(
+ exposure_model.concentration_model.virus.viral_load_in_sputum)
+ pi_means, lower_percentiles, upper_percentiles = conditional_prob_inf_given_vl_dist(infection_probability, viral_loads,
+ specific_vl, step)
+ log10_vl_in_sputum = np.log10(
+ exposure_model.concentration_model.infected.virus.viral_load_in_sputum)
+
+ return {
+ 'viral_loads': list(viral_loads),
+ 'pi_means': list(pi_means),
+ 'lower_percentiles': list(lower_percentiles),
+ 'upper_percentiles': list(upper_percentiles),
+ 'log10_vl_in_sputum': list(log10_vl_in_sputum),
+ }
+
+
+def uncertainties_plot(infection_probability: models._VectorisedFloat,
+ conditional_probability_data: dict):
+
+ viral_loads: list = conditional_probability_data['viral_loads']
+ pi_means: list = conditional_probability_data['pi_means']
+ lower_percentiles: list = conditional_probability_data['lower_percentiles']
+ upper_percentiles: list = conditional_probability_data['upper_percentiles']
+ log10_vl_in_sputum: list = conditional_probability_data['log10_vl_in_sputum']
+
+ fig, ((axs00, axs01, axs02), (axs10, axs11, axs12)) = plt.subplots(nrows=2, ncols=3, # type: ignore
+ gridspec_kw={'width_ratios': [5, 0.5] + [1],
+ 'height_ratios': [3, 1], 'wspace': 0},
+ sharey='row',
+ sharex='col')
+
+ axs01.axis('off')
+ axs11.axis('off')
+ axs12.axis('off')
+
+ axs01.set_visible(False)
+
+ axs00.plot(viral_loads, np.array(pi_means),
+ label='Predictive total probability')
+ axs00.fill_between(viral_loads, np.array(lower_percentiles), np.array(
+ upper_percentiles), alpha=0.1, label='5ᵗʰ and 95ᵗʰ percentile')
+
+ axs02.hist(infection_probability, bins=30, orientation='horizontal')
+ axs02.set_xticks([])
+ axs02.set_xticklabels([])
+ axs02.set_facecolor("lightgrey")
+
+ highest_bar = axs02.get_xlim()[1]
+ axs02.set_xlim(0, highest_bar)
+
+ axs02.text(highest_bar * 0.5, 50,
+ "$P(I)=$\n" + rf"$\bf{np.round(np.mean(infection_probability), 1)}$%", ha='center', va='center')
+ axs10.hist(log10_vl_in_sputum,
+ bins=150, range=(2, 10), color='grey')
+ axs10.set_facecolor("lightgrey")
+ axs10.set_yticks([])
+ axs10.set_yticklabels([])
+ axs10.set_xticks([i for i in range(2, 13, 2)])
+ axs10.set_xticklabels(['$10^{' + str(i) + '}$' for i in range(2, 13, 2)])
+ axs10.set_xlim(2, 10)
+ axs10.set_xlabel('Viral load\n(RNA copies)', fontsize=12)
+ axs00.set_ylabel('Conditional Probability\nof Infection', fontsize=12)
+
+ axs00.text(9.5, -0.01, '$(i)$')
+ axs10.text(9.5, axs10.get_ylim()[1] * 0.8, '$(ii)$')
+ axs02.set_title('$(iii)$', fontsize=10)
+
+ axs00.legend()
+ return fig
+
+
+def _figure2bytes(figure):
+ # Draw the image
+ img_data = io.BytesIO()
+ figure.savefig(img_data, format='png', bbox_inches="tight",
+ transparent=True, dpi=110)
+ return img_data
+
+
+def img2base64(img_data) -> str:
+ img_data.seek(0)
+ pic_hash = base64.b64encode(img_data.read()).decode('ascii')
+ # A src suitable for a tag such as f'
.
+ return f'data:image/png;base64,{pic_hash}'
+
+
+def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
+ form_dict = VirusFormData.to_dict(form, strip_defaults=True)
+
+ # Generate the calculator URL arguments that would be needed to re-create this
+ # form.
+ args = urllib.parse.urlencode(form_dict)
+
+ # Then zlib compress + base64 encode the string. To be inverted by the
+ # /_c/ endpoint.
+ compressed_args = base64.b64encode(zlib.compress(args.encode())).decode()
+ qr_url = f"{base_url}{get_root_url()}/_c/{compressed_args}"
+ url = f"{base_url}{get_root_calculator_url()}?{args}"
+
+ return {
+ 'link': url,
+ 'shortened': qr_url,
+ }
+
+
+def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]:
+ viral_load = model.concentration_model.infected.virus.viral_load_in_sputum
+ scenarios = {}
+ for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99):
+ vl = np.quantile(viral_load, percentil)
+ specific_vl_scenario = dataclass_utils.nested_replace(model,
+ {'concentration_model.infected.virus.viral_load_in_sputum': vl}
+ )
+ scenarios[str(vl)] = np.mean(
+ specific_vl_scenario.infection_probability())
+ return scenarios
+
+
+def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
+ scenarios = {}
+ if (form.short_range_option == "short_range_no"):
+ # Two special option cases - HEPA and/or FFP2 masks.
+ FFP2_being_worn = bool(form.mask_wearing_option ==
+ 'mask_on' and form.mask_type == 'FFP2')
+ if FFP2_being_worn and form.hepa_option:
+ FFP2andHEPAalternative = dataclass_utils.replace(
+ form, mask_type='Type I')
+ if not (form.hepa_option and form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I'):
+ scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model()
+ if not FFP2_being_worn and form.hepa_option:
+ noHEPAalternative = dataclass_utils.replace(form, mask_type='FFP2')
+ noHEPAalternative = dataclass_utils.replace(
+ noHEPAalternative, mask_wearing_option='mask_on')
+ noHEPAalternative = dataclass_utils.replace(
+ noHEPAalternative, hepa_option=False)
+ if not (not form.hepa_option and FFP2_being_worn):
+ scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model()
+
+ # The remaining scenarios are based on Type I masks (possibly not worn)
+ # and no HEPA filtration.
+ form = dataclass_utils.replace(form, mask_type='Type I')
+ if form.hepa_option:
+ form = dataclass_utils.replace(form, hepa_option=False)
+
+ with_mask = dataclass_utils.replace(
+ form, mask_wearing_option='mask_on')
+ without_mask = dataclass_utils.replace(
+ form, mask_wearing_option='mask_off')
+
+ if form.ventilation_type == 'mechanical_ventilation':
+ # scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
+ if not (form.mask_wearing_option == 'mask_off'):
+ scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model(
+ )
+
+ elif form.ventilation_type == 'natural_ventilation':
+ # scenarios['Windows open with Type I masks'] = with_mask.build_mc_model()
+ if not (form.mask_wearing_option == 'mask_off'):
+ scenarios['Windows open without masks'] = without_mask.build_mc_model()
+
+ # No matter the ventilation scheme, we include scenarios which don't have any ventilation.
+ with_mask_no_vent = dataclass_utils.replace(
+ with_mask, ventilation_type='no_ventilation')
+ without_mask_or_vent = dataclass_utils.replace(
+ without_mask, ventilation_type='no_ventilation')
+
+ if not (form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I' and form.ventilation_type == 'no_ventilation'):
+ scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
+ if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'):
+ scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
+
+ else:
+ no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[],
+ total_people=form.total_people - form.short_range_occupants)
+ scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
+
+ return scenarios
+
+
+def scenario_statistics(
+ mc_model: mc.ExposureModel,
+ sample_times: typing.List[float],
+ compute_prob_exposure: bool
+):
+ model = mc_model.build_model(
+ size=mc_model.data_registry.monte_carlo['sample_size'])
+ if (compute_prob_exposure):
+ # It means we have data to calculate the total_probability_rule
+ prob_probabilistic_exposure = model.total_probability_rule()
+ else:
+ prob_probabilistic_exposure = 0.
+
+ return {
+ 'probability_of_infection': np.mean(model.infection_probability()),
+ 'expected_new_cases': np.mean(model.expected_new_cases()),
+ 'concentrations': [
+ np.mean(model.concentration(time))
+ for time in sample_times
+ ],
+ 'prob_probabilistic_exposure': prob_probabilistic_exposure,
+ }
+
+
+def comparison_report(
+ form: VirusFormData,
+ report_data: typing.Dict[str, typing.Any],
+ scenarios: typing.Dict[str, mc.ExposureModel],
+ sample_times: typing.List[float],
+ executor_factory: typing.Callable[[], concurrent.futures.Executor],
+):
+ if (form.short_range_option == "short_range_no"):
+ statistics = {
+ 'Current scenario': {
+ 'probability_of_infection': report_data['prob_inf'],
+ 'expected_new_cases': report_data['expected_new_cases'],
+ 'concentrations': report_data['concentrations'],
+ }
+ }
+ else:
+ statistics = {}
+
+ if (form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure"):
+ compute_prob_exposure = True
+ else:
+ compute_prob_exposure = False
+
+ with executor_factory() as executor:
+ results = executor.map(
+ scenario_statistics,
+ scenarios.values(),
+ [sample_times] * len(scenarios),
+ [compute_prob_exposure] * len(scenarios),
+ timeout=60,
+ )
+
+ for (name, model), model_stats in zip(scenarios.items(), results):
+ statistics[name] = model_stats
+
+ return {
+ 'stats': statistics,
+ }
diff --git a/caimira/src/caimira/calculator/validators/co2/co2_validator.py b/caimira/src/caimira/calculator/validators/co2/co2_validator.py
index aa43edfe..ba865a7c 100644
--- a/caimira/src/caimira/calculator/validators/co2/co2_validator.py
+++ b/caimira/src/caimira/calculator/validators/co2/co2_validator.py
@@ -11,7 +11,7 @@ from ..form_validator import FormData, cast_class_fields
from ..defaults import NO_DEFAULT
from ...store.data_registry import DataRegistry
from ...models import models
-from ...report.report_generator import img2base64, _figure2bytes
+from ...report.virus_report_data import img2base64, _figure2bytes
minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
diff --git a/caimira/tests/test_conditional_probability.py b/caimira/tests/test_conditional_probability.py
index 9daaa26c..45001418 100644
--- a/caimira/tests/test_conditional_probability.py
+++ b/caimira/tests/test_conditional_probability.py
@@ -5,7 +5,7 @@ from retry import retry
import caimira.calculator.models.monte_carlo as mc
from caimira.calculator.models import models
from caimira.calculator.models.dataclass_utils import nested_replace
-from caimira.calculator.report import report_generator
+from caimira.calculator.report import virus_report_data
from caimira.calculator.models.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions
@@ -72,7 +72,7 @@ def test_conditional_prob_inf_given_vl_dist(data_registry, baseline_exposure_mod
specific_vl = np.log10(mc_model.concentration_model.infected.virus.viral_load_in_sputum)
step = 8/100
actual_pi_means, actual_lower_percentiles, actual_upper_percentiles = (
- report_generator.conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, specific_vl, step)
+ virus_report_data.conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, specific_vl, step)
)
assert np.allclose(actual_pi_means, expected_pi_means, atol=0.002)
diff --git a/cern_caimira/src/cern_caimira/apps/calculator/__init__.py b/cern_caimira/src/cern_caimira/apps/calculator/__init__.py
index 4d906ff4..167bacd8 100644
--- a/cern_caimira/src/cern_caimira/apps/calculator/__init__.py
+++ b/cern_caimira/src/cern_caimira/apps/calculator/__init__.py
@@ -29,13 +29,12 @@ from caimira.calculator.models.profiler import CaimiraProfiler, Profilers
from caimira.calculator.store.data_registry import DataRegistry
from caimira.calculator.store.data_service import DataService
-from caimira.api.controller.report_controller import generate_form_obj, generate_model, generate_report_results
-from caimira.calculator.report.report_generator import calculate_report_data
+from caimira.api.controller import virus_report_controller, co2_report_controller
+from caimira.calculator.report.virus_report_data import calculate_report_data
from caimira.calculator.validators.virus import virus_validator
-from caimira.calculator.validators.co2 import co2_validator
from . import markdown_tools
-from ..calculator.report.virus_report import ReportGenerator
+from .report.virus_report import VirusReportGenerator
from ..calculator.report.co2_report import CO2ReportGenerator
from .user import AuthenticatedUser, AnonymousUser
@@ -181,7 +180,7 @@ class ConcentrationModel(BaseRequestHandler):
LOG.debug(pformat(requested_model_config))
try:
- form = generate_form_obj(requested_model_config, data_registry)
+ form = virus_report_controller.generate_form_obj(requested_model_config, data_registry)
except Exception as err:
LOG.exception(err)
response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'}
@@ -190,7 +189,7 @@ class ConcentrationModel(BaseRequestHandler):
return
base_url = self.request.protocol + "://" + self.request.host
- report_generator: ReportGenerator = self.settings['report_generator']
+ report_generator: VirusReportGenerator = self.settings['report_generator']
executor = loky.get_reusable_executor(
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
@@ -235,7 +234,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
LOG.debug(pformat(requested_model_config))
try:
- form = generate_form_obj(requested_model_config, data_registry)
+ form = virus_report_controller.generate_form_obj(requested_model_config, data_registry)
except Exception as err:
LOG.exception(err)
response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'}
@@ -247,7 +246,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
)
- model = generate_model(form)
+ model = virus_report_controller.generate_model(form, data_registry)
report_data_task = executor.submit(calculate_report_data, form, model,
executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor,
@@ -266,10 +265,10 @@ class StaticModel(BaseRequestHandler):
if data_service:
data_service.update_registry(data_registry)
- form = generate_form_obj(virus_validator.baseline_raw_form_data(), data_registry)
+ form = virus_report_controller.generate_form_obj(virus_validator.baseline_raw_form_data(), data_registry)
base_url = self.request.protocol + "://" + self.request.host
- report_generator: ReportGenerator = self.settings['report_generator']
+ report_generator: VirusReportGenerator = self.settings['report_generator']
executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size'])
report_task = executor.submit(
@@ -409,10 +408,7 @@ class CO2ModelResponse(BaseRequestHandler):
requested_model_config = tornado.escape.json_decode(self.request.body)
try:
- form: co2_validator.CO2FormData = co2_validator.CO2FormData.from_dict(
- requested_model_config,
- data_registry
- )
+ form = co2_report_controller.generate_form_obj(requested_model_config, data_registry)
except Exception as err:
if self.settings.get("debug", False):
import traceback
@@ -544,7 +540,7 @@ def make_app(
data_service=data_service,
template_environment=template_environment,
default_handler_class=Missing404Handler,
- report_generator=ReportGenerator(loader, get_root_url, get_root_calculator_url),
+ report_generator=VirusReportGenerator(loader, get_root_url, get_root_calculator_url),
xsrf_cookies=True,
# COOKIE_SECRET being undefined will result in no login information being
# presented to the user.
diff --git a/cern_caimira/src/cern_caimira/apps/calculator/report/co2_report.py b/cern_caimira/src/cern_caimira/apps/calculator/report/co2_report.py
index ece70b4b..befa988d 100644
--- a/cern_caimira/src/cern_caimira/apps/calculator/report/co2_report.py
+++ b/cern_caimira/src/cern_caimira/apps/calculator/report/co2_report.py
@@ -1,67 +1,14 @@
import dataclasses
+import caimira.calculator.report.co2_report_data as co2_rep_data
from caimira.calculator.validators.co2.co2_validator import CO2FormData
-from caimira.calculator.models.models import CO2DataModel
-
@dataclasses.dataclass
class CO2ReportGenerator:
- def build_initial_plot(
- self,
- form: CO2FormData,
- ) -> dict:
- '''
- Initial plot with the suggested ventilation state changes.
- This method receives the form input and returns the CO2
- plot with the respective transition times.
- '''
- CO2model: CO2DataModel = form.build_model()
-
- occupancy_transition_times = list(CO2model.occupancy.transition_times)
-
- ventilation_transition_times: list = form.find_change_points()
- # The entire ventilation changes consider the initial and final occupancy state change
- all_vent_transition_times: list = sorted(
- [occupancy_transition_times[0]] +
- [occupancy_transition_times[-1]] +
- ventilation_transition_times)
-
- ventilation_plot: str = form.generate_ventilation_plot(
- ventilation_transition_times=all_vent_transition_times,
- occupancy_transition_times=occupancy_transition_times
- )
-
- context = {
- 'CO2_plot': ventilation_plot,
- 'transition_times': [round(el, 2) for el in all_vent_transition_times],
- }
-
- return context
+ def build_initial_plot(self, form: CO2FormData):
+ return co2_rep_data.build_initial_plot(form=form)
- def build_fitting_results(
- self,
- form: CO2FormData,
- ) -> dict:
- '''
- Final fitting results with the respective predictive CO2.
- This method receives the form input and returns the fitting results
- along with the CO2 plot with the predictive CO2.
- '''
- CO2model: CO2DataModel = form.build_model()
-
- # Ventilation times after user manipulation from the suggested ventilation state change times.
- ventilation_transition_times = list(CO2model.ventilation_transition_times)
-
- # The result of the following method is a dict with the results of the fitting
- # algorithm, namely the breathing rate and ACH values. It also returns the
- # predictive CO2 result based on the fitting results.
- context = dict(CO2model.CO2_fit_params())
-
- # Add the transition times and CO2 plot to the results.
- context['transition_times'] = ventilation_transition_times
- context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1],
- predictive_CO2=context['predictive_CO2'])
-
- return context
+ def build_fitting_results(self, form: CO2FormData):
+ return co2_rep_data.build_fitting_results(form=form)
\ No newline at end of file
diff --git a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py
index 4d4d4c8e..7b292ef9 100644
--- a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py
+++ b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py
@@ -6,254 +6,12 @@ import json
import typing
import jinja2
import numpy as np
-import urllib
-import base64
-import zlib
from .. import markdown_tools
-from caimira.calculator.models import dataclass_utils, models, monte_carlo as mc
+from caimira.calculator.models import models
from caimira.calculator.validators.virus.virus_validator import VirusFormData
-from caimira.calculator.report.report_generator import calculate_report_data
-
-
-def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
- form_dict = VirusFormData.to_dict(form, strip_defaults=True)
-
- # Generate the calculator URL arguments that would be needed to re-create this
- # form.
- args = urllib.parse.urlencode(form_dict)
-
- # Then zlib compress + base64 encode the string. To be inverted by the
- # /_c/ endpoint.
- compressed_args = base64.b64encode(zlib.compress(args.encode())).decode()
- qr_url = f"{base_url}{get_root_url()}/_c/{compressed_args}"
- url = f"{base_url}{get_root_calculator_url()}?{args}"
-
- return {
- 'link': url,
- 'shortened': qr_url,
- }
-
-
-def model_start_end(model: models.ExposureModel):
- t_start = min(model.exposed.presence_interval().boundaries()[0][0],
- model.concentration_model.infected.presence_interval().boundaries()[0][0])
- t_end = max(model.exposed.presence_interval().boundaries()[-1][1],
- model.concentration_model.infected.presence_interval().boundaries()[-1][1])
- return t_start, t_end
-
-
-def fill_big_gaps(array, gap_size):
- """
- Insert values into the given sorted list if there is a gap of more than ``gap_size``.
- All values in the given array are preserved, even if they are within the ``gap_size`` of one another.
-
- >>> fill_big_gaps([1, 2, 4], gap_size=0.75)
- [1, 1.75, 2, 2.75, 3.5, 4]
-
- """
- result = []
- if len(array) == 0:
- raise ValueError("Input array must be len > 0")
-
- last_value = array[0]
- for value in array:
- while value - last_value > gap_size + 1e-15:
- last_value = last_value + gap_size
- result.append(last_value)
- result.append(value)
- last_value = value
- return result
-
-
-def non_temp_transition_times(model: models.ExposureModel):
- """
- Return the non-temperature (and PiecewiseConstant) based transition times.
-
- """
- def walk_model(model, name=""):
- # Extend walk_dataclass to handle lists of dataclasses
- # (e.g. in MultipleVentilation).
- for name, obj in dataclass_utils.walk_dataclass(model, name=name):
- if name.endswith('.ventilations') and isinstance(obj, (list, tuple)):
- for i, item in enumerate(obj):
- fq_name_i = f'{name}[{i}]'
- yield fq_name_i, item
- if dataclasses.is_dataclass(item):
- yield from dataclass_utils.walk_dataclass(item, name=fq_name_i)
- else:
- yield name, obj
-
- t_start, t_end = model_start_end(model)
-
- change_times = {t_start, t_end}
- for name, obj in walk_model(model, name="exposure"):
- if isinstance(obj, models.Interval):
- change_times |= obj.transition_times()
-
- # Only choose times that are in the range of the model (removes things
- # such as PeriodicIntervals, which extend beyond the model itself).
- return sorted(time for time in change_times if (t_start <= time <= t_end))
-
-
-def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
- """
- Pick approximately ``approx_n_pts`` time points which are interesting for the
- given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times
- the number of hours of the simulation.
-
- Initially the times are seeded by important state change times (excluding
- outside temperature), and the times are then subsequently expanded to ensure
- that the step size is at most ``(t_end - t_start) / approx_n_pts``.
-
- """
- times = non_temp_transition_times(model)
- sim_duration = max(times) - min(times)
- if not approx_n_pts:
- approx_n_pts = sim_duration * 15
-
- # Expand the times list to ensure that we have a maximum gap size between
- # the key times.
- nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts)
- return nice_times
-
-
-def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]:
- viral_load = model.concentration_model.infected.virus.viral_load_in_sputum
- scenarios = {}
- for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99):
- vl = np.quantile(viral_load, percentil)
- specific_vl_scenario = dataclass_utils.nested_replace(model,
- {'concentration_model.infected.virus.viral_load_in_sputum': vl}
- )
- scenarios[str(vl)] = np.mean(
- specific_vl_scenario.infection_probability())
- return scenarios
-
-def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
- scenarios = {}
- if (form.short_range_option == "short_range_no"):
- # Two special option cases - HEPA and/or FFP2 masks.
- FFP2_being_worn = bool(form.mask_wearing_option ==
- 'mask_on' and form.mask_type == 'FFP2')
- if FFP2_being_worn and form.hepa_option:
- FFP2andHEPAalternative = dataclass_utils.replace(
- form, mask_type='Type I')
- if not (form.hepa_option and form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I'):
- scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model()
- if not FFP2_being_worn and form.hepa_option:
- noHEPAalternative = dataclass_utils.replace(form, mask_type='FFP2')
- noHEPAalternative = dataclass_utils.replace(
- noHEPAalternative, mask_wearing_option='mask_on')
- noHEPAalternative = dataclass_utils.replace(
- noHEPAalternative, hepa_option=False)
- if not (not form.hepa_option and FFP2_being_worn):
- scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model()
-
- # The remaining scenarios are based on Type I masks (possibly not worn)
- # and no HEPA filtration.
- form = dataclass_utils.replace(form, mask_type='Type I')
- if form.hepa_option:
- form = dataclass_utils.replace(form, hepa_option=False)
-
- with_mask = dataclass_utils.replace(
- form, mask_wearing_option='mask_on')
- without_mask = dataclass_utils.replace(
- form, mask_wearing_option='mask_off')
-
- if form.ventilation_type == 'mechanical_ventilation':
- # scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
- if not (form.mask_wearing_option == 'mask_off'):
- scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model(
- )
-
- elif form.ventilation_type == 'natural_ventilation':
- # scenarios['Windows open with Type I masks'] = with_mask.build_mc_model()
- if not (form.mask_wearing_option == 'mask_off'):
- scenarios['Windows open without masks'] = without_mask.build_mc_model()
-
- # No matter the ventilation scheme, we include scenarios which don't have any ventilation.
- with_mask_no_vent = dataclass_utils.replace(
- with_mask, ventilation_type='no_ventilation')
- without_mask_or_vent = dataclass_utils.replace(
- without_mask, ventilation_type='no_ventilation')
-
- if not (form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I' and form.ventilation_type == 'no_ventilation'):
- scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
- if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'):
- scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
-
- else:
- no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[],
- total_people=form.total_people - form.short_range_occupants)
- scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
-
- return scenarios
-
-
-def scenario_statistics(
- mc_model: mc.ExposureModel,
- sample_times: typing.List[float],
- compute_prob_exposure: bool
-):
- model = mc_model.build_model(
- size=mc_model.data_registry.monte_carlo['sample_size'])
- if (compute_prob_exposure):
- # It means we have data to calculate the total_probability_rule
- prob_probabilistic_exposure = model.total_probability_rule()
- else:
- prob_probabilistic_exposure = 0.
-
- return {
- 'probability_of_infection': np.mean(model.infection_probability()),
- 'expected_new_cases': np.mean(model.expected_new_cases()),
- 'concentrations': [
- np.mean(model.concentration(time))
- for time in sample_times
- ],
- 'prob_probabilistic_exposure': prob_probabilistic_exposure,
- }
-
-
-def comparison_report(
- form: VirusFormData,
- report_data: typing.Dict[str, typing.Any],
- scenarios: typing.Dict[str, mc.ExposureModel],
- sample_times: typing.List[float],
- executor_factory: typing.Callable[[], concurrent.futures.Executor],
-):
- if (form.short_range_option == "short_range_no"):
- statistics = {
- 'Current scenario': {
- 'probability_of_infection': report_data['prob_inf'],
- 'expected_new_cases': report_data['expected_new_cases'],
- 'concentrations': report_data['concentrations'],
- }
- }
- else:
- statistics = {}
-
- if (form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure"):
- compute_prob_exposure = True
- else:
- compute_prob_exposure = False
-
- with executor_factory() as executor:
- results = executor.map(
- scenario_statistics,
- scenarios.values(),
- [sample_times] * len(scenarios),
- [compute_prob_exposure] * len(scenarios),
- timeout=60,
- )
-
- for (name, model), model_stats in zip(scenarios.items(), results):
- statistics[name] = model_stats
-
- return {
- 'stats': statistics,
- }
+from caimira.calculator.report.virus_report_data import calculate_report_data, interesting_times, manufacture_alternative_scenarios, manufacture_viral_load_scenarios_percentiles, comparison_report, generate_permalink
def minutes_to_time(minutes: int) -> str:
@@ -305,7 +63,7 @@ def non_zero_percentage(percentage: int) -> str:
@dataclasses.dataclass
-class ReportGenerator:
+class VirusReportGenerator:
jinja_loader: jinja2.BaseLoader
get_root_url: typing.Any
get_root_calculator_url: typing.Any
@@ -317,7 +75,8 @@ class ReportGenerator:
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
- context = self.prepare_context(base_url, model, form, executor_factory=executor_factory)
+ context = self.prepare_context(
+ base_url, model, form, executor_factory=executor_factory)
return self.render(context)
def prepare_context(
@@ -339,15 +98,18 @@ class ReportGenerator:
}
scenario_sample_times = interesting_times(model)
- report_data = calculate_report_data(form, model, executor_factory=executor_factory)
+ report_data = calculate_report_data(
+ form, model, executor_factory=executor_factory)
context.update(report_data)
alternative_scenarios = manufacture_alternative_scenarios(form)
- context['alternative_viral_load'] = manufacture_viral_load_scenarios_percentiles(model) if form.conditional_probability_viral_loads else None
+ context['alternative_viral_load'] = manufacture_viral_load_scenarios_percentiles(
+ model) if form.conditional_probability_viral_loads else None
context['alternative_scenarios'] = comparison_report(
form, report_data, alternative_scenarios, scenario_sample_times, executor_factory=executor_factory,
)
- context['permalink'] = generate_permalink(base_url, self.get_root_url, self.get_root_calculator_url, form)
+ context['permalink'] = generate_permalink(
+ base_url, self.get_root_url, self.get_root_calculator_url, form)
context['get_url'] = self.get_root_url
context['get_calculator_url'] = self.get_root_calculator_url
@@ -374,4 +136,3 @@ class ReportGenerator:
def render(self, context: dict) -> str:
template = self._template_environment().get_template("calculator.report.html.j2")
return template.render(**context, text_blocks=template.globals["common_text"])
-
\ No newline at end of file
diff --git a/cern_caimira/tests/test_report_generator.py b/cern_caimira/tests/test_report_generator.py
index e6803def..372cad77 100644
--- a/cern_caimira/tests/test_report_generator.py
+++ b/cern_caimira/tests/test_report_generator.py
@@ -7,10 +7,9 @@ import numpy as np
import pytest
from cern_caimira.apps.calculator import make_app
-from cern_caimira.apps.calculator import ReportGenerator
-from cern_caimira.apps.calculator.report.virus_report import readable_minutes, manufacture_alternative_scenarios, comparison_report
-import caimira.calculator.report.report_generator as rep_gen
-from caimira.api.controller.report_controller import generate_model, generate_report_results
+from cern_caimira.apps.calculator import VirusReportGenerator
+from cern_caimira.apps.calculator.report.virus_report import readable_minutes
+import caimira.calculator.report.virus_report_data as rep_gen
from caimira.calculator.validators.virus.virus_validator import VirusFormData
@@ -24,7 +23,7 @@ def test_generate_report(baseline_form) -> None:
start = time.perf_counter()
- generator: ReportGenerator = make_app().settings['report_generator']
+ generator: VirusReportGenerator = make_app().settings['report_generator']
report = generator.build_report("", baseline_form, partial(
concurrent.futures.ThreadPoolExecutor, 1,
@@ -119,8 +118,8 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData):
# Long-range contributions alone
scenario_sample_times = rep_gen.interesting_times(model)
- alternative_scenarios = manufacture_alternative_scenarios(baseline_form_with_sr)
- alternative_statistics = comparison_report(
+ alternative_scenarios = rep_gen.manufacture_alternative_scenarios(baseline_form_with_sr)
+ alternative_statistics = rep_gen.comparison_report(
baseline_form_with_sr, report_data, alternative_scenarios, scenario_sample_times, executor_factory=executor_factory,
)