From ebde0e1ae45ebbb8b6dc6d5c2a455288250a112a Mon Sep 17 00:00:00 2001 From: lrdossan Date: Tue, 3 Sep 2024 16:53:13 +0200 Subject: [PATCH] extracted CO2 logic to a dedicated route --- README.md | 8 +- caimira/src/caimira/api/app.py | 7 +- .../api/controller/co2_report_controller.py | 26 + .../api/controller/report_controller.py | 30 -- .../api/controller/virus_report_controller.py | 38 ++ .../src/caimira/api/routes/report_routes.py | 39 +- .../calculator/report/co2_report_data.py | 60 +++ .../calculator/report/co2_report_generator.py | 1 - .../calculator/report/report_generator.py | 309 ----------- .../calculator/report/virus_report_data.py | 487 ++++++++++++++++++ .../validators/co2/co2_validator.py | 2 +- caimira/tests/test_conditional_probability.py | 4 +- .../cern_caimira/apps/calculator/__init__.py | 26 +- .../apps/calculator/report/co2_report.py | 63 +-- .../apps/calculator/report/virus_report.py | 261 +--------- cern_caimira/tests/test_report_generator.py | 13 +- 16 files changed, 692 insertions(+), 682 deletions(-) create mode 100644 caimira/src/caimira/api/controller/co2_report_controller.py delete mode 100644 caimira/src/caimira/api/controller/report_controller.py create mode 100644 caimira/src/caimira/api/controller/virus_report_controller.py create mode 100644 caimira/src/caimira/calculator/report/co2_report_data.py delete mode 100644 caimira/src/caimira/calculator/report/co2_report_generator.py delete mode 100644 caimira/src/caimira/calculator/report/report_generator.py create mode 100644 caimira/src/caimira/calculator/report/virus_report_data.py diff --git a/README.md b/README.md index d71dc5a1..572f7221 100644 --- a/README.md +++ b/README.md @@ -109,7 +109,7 @@ python -m cern_caimira.apps.calculator To run with a specific template theme created: ``` -python -m cern_caimira.apps.calculator --theme=ui/apps/templates/{theme} +python -m cern_caimira.apps.calculator --theme=cern_caimira/src/cern_caimira/apps/templates/{theme} ``` To run the entire app in a different `APPLICATION_ROOT` path: @@ -168,9 +168,11 @@ Running with Visual Studio Code (VSCode): ### Running the tests +Make sure you are in the root directory of the project. Then: + ``` -pip install -e .[test] -pytest ./caimira +pip install -e .[test] +python -m pytest ``` ### Running the profiler diff --git a/caimira/src/caimira/api/app.py b/caimira/src/caimira/api/app.py index db26c9d4..84a58532 100644 --- a/caimira/src/caimira/api/app.py +++ b/caimira/src/caimira/api/app.py @@ -8,22 +8,25 @@ import tornado.log from tornado.options import define, options import logging -from caimira.api.routes.report_routes import ReportHandler +from caimira.api.routes.report_routes import VirusReportHandler, CO2ReportHandler define("port", default=8088, help="Port to listen on", type=int) logging.basicConfig(format="%(message)s", level=logging.INFO) + class Application(tornado.web.Application): def __init__(self): handlers = [ - (r"/report", ReportHandler), + (r"/co2_report", CO2ReportHandler), + (r"/virus_report", VirusReportHandler), ] settings = dict( debug=True, ) super(Application, self).__init__(handlers, **settings) + if __name__ == "__main__": app = Application() app.listen(options.port) diff --git a/caimira/src/caimira/api/controller/co2_report_controller.py b/caimira/src/caimira/api/controller/co2_report_controller.py new file mode 100644 index 00000000..02433318 --- /dev/null +++ b/caimira/src/caimira/api/controller/co2_report_controller.py @@ -0,0 +1,26 @@ +from caimira.calculator.validators.co2.co2_validator import CO2FormData +from caimira.calculator.store.data_registry import DataRegistry + + +def generate_form_obj(form_data, data_registry): + return CO2FormData.from_dict(form_data=form_data, data_registry=data_registry) + + +def generate_model(form_obj, data_registry): + sample_size = data_registry.monte_carlo['sample_size'] + return form_obj.build_model(sample_size=sample_size) + + +def generate_report(model): + return dict(model.CO2_fit_params()) + + +def submit_CO2_form(form_data): + data_registry = DataRegistry() + + form_obj = generate_form_obj( + form_data=form_data, data_registry=data_registry) + model = generate_model(form_obj=form_obj, data_registry=data_registry) + report_data = generate_report(model=model) + + return report_data diff --git a/caimira/src/caimira/api/controller/report_controller.py b/caimira/src/caimira/api/controller/report_controller.py deleted file mode 100644 index be151555..00000000 --- a/caimira/src/caimira/api/controller/report_controller.py +++ /dev/null @@ -1,30 +0,0 @@ -import concurrent.futures -import functools - -from caimira.calculator.validators.virus.virus_validator import VirusFormData -from caimira.calculator.store.data_registry import DataRegistry -import caimira.calculator.report.report_generator as rg - - -def generate_form_obj(form_data, data_registry): - return VirusFormData.from_dict(form_data, data_registry) - - -def generate_model(form_obj): - return form_obj.build_model(250_000) - - -def generate_report_results(form_obj, model): - return rg.calculate_report_data(form=form_obj, model=model, executor_factory=functools.partial( - concurrent.futures.ThreadPoolExecutor, None, # TODO define report_parallelism - ),) - - -def submit_virus_form(form_data): - data_registry = DataRegistry - - form_obj = generate_form_obj(form_data, data_registry) - model = generate_model(form_obj) - report_data = generate_report_results(form_obj, model=model) - - return report_data diff --git a/caimira/src/caimira/api/controller/virus_report_controller.py b/caimira/src/caimira/api/controller/virus_report_controller.py new file mode 100644 index 00000000..41bf4d72 --- /dev/null +++ b/caimira/src/caimira/api/controller/virus_report_controller.py @@ -0,0 +1,38 @@ +import concurrent.futures +import functools + +from caimira.calculator.validators.virus.virus_validator import VirusFormData +from caimira.calculator.store.data_registry import DataRegistry +import caimira.calculator.report.virus_report_data as rg + + +def generate_form_obj(form_data, data_registry): + return VirusFormData.from_dict( + form_data=form_data, + data_registry=data_registry, + ) + + +def generate_model(form_obj, data_registry): + sample_size = data_registry.monte_carlo['sample_size'] + return form_obj.build_model(sample_size=sample_size) + + +def generate_report_results(form_obj, model): + return rg.calculate_report_data( + form=form_obj, + model=model, + executor_factory=functools.partial( + concurrent.futures.ThreadPoolExecutor, None, # TODO define report_parallelism + ), + ) + + +def submit_virus_form(form_data): + data_registry = DataRegistry + + form_obj = generate_form_obj(form_data=form_data, data_registry=data_registry) + model = generate_model(form_obj=form_obj, data_registry=data_registry) + report_data = generate_report_results(form_obj=form_obj, model=model) + + return report_data diff --git a/caimira/src/caimira/api/routes/report_routes.py b/caimira/src/caimira/api/routes/report_routes.py index 7b6a51cb..f448df86 100644 --- a/caimira/src/caimira/api/routes/report_routes.py +++ b/caimira/src/caimira/api/routes/report_routes.py @@ -1,15 +1,24 @@ import json import traceback import tornado.web +import sys -from caimira.api.controller.report_controller import submit_virus_form +from caimira.api.controller.virus_report_controller import submit_virus_form +from caimira.api.controller.co2_report_controller import submit_CO2_form -class ReportHandler(tornado.web.RequestHandler): + +class BaseReportHandler(tornado.web.RedirectHandler): def set_default_headers(self): self.set_header("Access-Control-Allow-Origin", "*") self.set_header("Access-Control-Allow-Headers", "x-requested-with") self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS") + def write_error(self, status_code, **kwargs): + self.set_status(status_code) + self.write({"message": kwargs.get('exc_info')[1].__str__()}) + + +class VirusReportHandler(BaseReportHandler): def post(self): try: form_data = json.loads(self.request.body) @@ -24,5 +33,27 @@ class ReportHandler(tornado.web.RequestHandler): self.write(response_data) except Exception as e: traceback.print_exc() - self.set_status(400) - self.write({"message": str(e)}) + self.write_error(status_code=400, exc_info=sys.exc_info()) + + +class CO2ReportHandler(tornado.web.RequestHandler): + def set_default_headers(self): + self.set_header("Access-Control-Allow-Origin", "*") + self.set_header("Access-Control-Allow-Headers", "x-requested-with") + self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS") + + def post(self): + try: + form_data = json.loads(self.request.body) + report_data = submit_CO2_form(form_data) + + response_data = { + "status": "success", + "message": "Results generated successfully", + "report_data": report_data, + } + + self.write(response_data) + except Exception as e: + traceback.print_exc() + self.write_error(status_code=400, exc_info=sys.exc_info()) diff --git a/caimira/src/caimira/calculator/report/co2_report_data.py b/caimira/src/caimira/calculator/report/co2_report_data.py new file mode 100644 index 00000000..793c064f --- /dev/null +++ b/caimira/src/caimira/calculator/report/co2_report_data.py @@ -0,0 +1,60 @@ +from caimira.calculator.validators.co2.co2_validator import CO2FormData +from caimira.calculator.models.models import CO2DataModel + + +def build_initial_plot( + form: CO2FormData, +) -> dict: + ''' + Initial plot with the suggested ventilation state changes. + This method receives the form input and returns the CO2 + plot with the respective transition times. + ''' + CO2model: CO2DataModel = form.build_model() + + occupancy_transition_times = list(CO2model.occupancy.transition_times) + + ventilation_transition_times: list = form.find_change_points() + # The entire ventilation changes consider the initial and final occupancy state change + all_vent_transition_times: list = sorted( + [occupancy_transition_times[0]] + + [occupancy_transition_times[-1]] + + ventilation_transition_times) + + ventilation_plot: str = form.generate_ventilation_plot( + ventilation_transition_times=all_vent_transition_times, + occupancy_transition_times=occupancy_transition_times + ) + + context = { + 'CO2_plot': ventilation_plot, + 'transition_times': [round(el, 2) for el in all_vent_transition_times], + } + + return context + + +def build_fitting_results( + form: CO2FormData, +) -> dict: + ''' + Final fitting results with the respective predictive CO2. + This method receives the form input and returns the fitting results + along with the CO2 plot with the predictive CO2. + ''' + CO2model: CO2DataModel = form.build_model() + + # Ventilation times after user manipulation from the suggested ventilation state change times. + ventilation_transition_times = list(CO2model.ventilation_transition_times) + + # The result of the following method is a dict with the results of the fitting + # algorithm, namely the breathing rate and ACH values. It also returns the + # predictive CO2 result based on the fitting results. + context = dict(CO2model.CO2_fit_params()) + + # Add the transition times and CO2 plot to the results. + context['transition_times'] = ventilation_transition_times + context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1], + predictive_CO2=context['predictive_CO2']) + + return context diff --git a/caimira/src/caimira/calculator/report/co2_report_generator.py b/caimira/src/caimira/calculator/report/co2_report_generator.py deleted file mode 100644 index 024f6b13..00000000 --- a/caimira/src/caimira/calculator/report/co2_report_generator.py +++ /dev/null @@ -1 +0,0 @@ -# Move here the backend logic. \ No newline at end of file diff --git a/caimira/src/caimira/calculator/report/report_generator.py b/caimira/src/caimira/calculator/report/report_generator.py deleted file mode 100644 index df4ef695..00000000 --- a/caimira/src/caimira/calculator/report/report_generator.py +++ /dev/null @@ -1,309 +0,0 @@ -import concurrent.futures -import base64 -import dataclasses -import io -import typing -import numpy as np -import matplotlib.pyplot as plt - -from caimira.calculator.models import models -# from caimira.apps.calculator import markdown_tools -# from caimira.profiler import profile -from caimira.calculator.validators.virus.virus_validator import VirusFormData -from caimira.calculator.models import dataclass_utils -from caimira.calculator.models.enums import ViralLoads - -def model_start_end(model: models.ExposureModel): - t_start = min(model.exposed.presence_interval().boundaries()[0][0], - model.concentration_model.infected.presence_interval().boundaries()[0][0]) - t_end = max(model.exposed.presence_interval().boundaries()[-1][1], - model.concentration_model.infected.presence_interval().boundaries()[-1][1]) - return t_start, t_end - - -def fill_big_gaps(array, gap_size): - """ - Insert values into the given sorted list if there is a gap of more than ``gap_size``. - All values in the given array are preserved, even if they are within the ``gap_size`` of one another. - - >>> fill_big_gaps([1, 2, 4], gap_size=0.75) - [1, 1.75, 2, 2.75, 3.5, 4] - - """ - result = [] - if len(array) == 0: - raise ValueError("Input array must be len > 0") - - last_value = array[0] - for value in array: - while value - last_value > gap_size + 1e-15: - last_value = last_value + gap_size - result.append(last_value) - result.append(value) - last_value = value - return result - - -def non_temp_transition_times(model: models.ExposureModel): - """ - Return the non-temperature (and PiecewiseConstant) based transition times. - - """ - def walk_model(model, name=""): - # Extend walk_dataclass to handle lists of dataclasses - # (e.g. in MultipleVentilation). - for name, obj in dataclass_utils.walk_dataclass(model, name=name): - if name.endswith('.ventilations') and isinstance(obj, (list, tuple)): - for i, item in enumerate(obj): - fq_name_i = f'{name}[{i}]' - yield fq_name_i, item - if dataclasses.is_dataclass(item): - yield from dataclass_utils.walk_dataclass(item, name=fq_name_i) - else: - yield name, obj - - t_start, t_end = model_start_end(model) - - change_times = {t_start, t_end} - for name, obj in walk_model(model, name="exposure"): - if isinstance(obj, models.Interval): - change_times |= obj.transition_times() - - # Only choose times that are in the range of the model (removes things - # such as PeriodicIntervals, which extend beyond the model itself). - return sorted(time for time in change_times if (t_start <= time <= t_end)) - -def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]: - """ - Pick approximately ``approx_n_pts`` time points which are interesting for the - given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times - the number of hours of the simulation. - - Initially the times are seeded by important state change times (excluding - outside temperature), and the times are then subsequently expanded to ensure - that the step size is at most ``(t_end - t_start) / approx_n_pts``. - - """ - times = non_temp_transition_times(model) - sim_duration = max(times) - min(times) - if not approx_n_pts: approx_n_pts = sim_duration * 15 - - # Expand the times list to ensure that we have a maximum gap size between - # the key times. - nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts) - return nice_times - - - -def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]: - lower_concentrations = [] - for time in times: - for index, (start, stop) in enumerate(short_range_intervals): - # For visualization issues, add short-range breathing activity to the initial long-range concentrations - if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing': - lower_concentrations.append(np.array(model.concentration(float(time))).mean()) - break - lower_concentrations.append(np.array(model.concentration_model.concentration(float(time))).mean()) - return lower_concentrations - - -def _calculate_deposited_exposure(model, time1, time2, fn_name=None): - return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(),fn_name - - -def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None): - return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name - - -def _calculate_co2_concentration(CO2_model, time, fn_name=None): - return np.array(CO2_model.concentration(float(time))).mean(), fn_name - - -# @profile -def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: - times = interesting_times(model) - short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range] - short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else [] - - concentrations = [ - np.array(model.concentration(float(time))).mean() - for time in times - ] - lower_concentrations = concentrations_with_sr_breathing(form, model, times, short_range_intervals) - - CO2_model: models.CO2ConcentrationModel = form.build_CO2_model() - - # compute deposited exposures and CO2 concentrations in parallel to increase performance - deposited_exposures = [] - long_range_deposited_exposures = [] - CO2_concentrations = [] - - tasks = [] - with executor_factory() as executor: - for time1, time2 in zip(times[:-1], times[1:]): - tasks.append(executor.submit(_calculate_deposited_exposure, model, time1, time2, fn_name="de")) - tasks.append(executor.submit(_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr")) - # co2 concentration: takes each time as param, not the interval - tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, time1, fn_name="co2")) - # co2 concentration: calculate the last time too - tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, times[-1], fn_name="co2")) - - for task in tasks: - result, fn_name = task.result() - if fn_name == "de": - deposited_exposures.append(result) - elif fn_name == "lr": - long_range_deposited_exposures.append(result) - elif fn_name == "co2": - CO2_concentrations.append(result) - - cumulative_doses = np.cumsum(deposited_exposures) - long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures) - - prob = np.array(model.infection_probability()) - prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) - prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean() - expected_new_cases = np.array(model.expected_new_cases()).mean() - exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()] - - conditional_probability_data = None - uncertainties_plot_src = None - if (form.conditional_probability_viral_loads and - model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore - # Generate all the required data for the conditional probability plot - conditional_probability_data = manufacture_conditional_probability_data(model, prob) - # Generate the matplotlib image based on the received data - uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data))) - - return { - "model_repr": repr(model), - "times": list(times), - "exposed_presence_intervals": exposed_presence_intervals, - "short_range_intervals": short_range_intervals, - "short_range_expirations": short_range_expirations, - "concentrations": concentrations, - "concentrations_zoomed": lower_concentrations, - "cumulative_doses": list(cumulative_doses), - "long_range_cumulative_doses": list(long_range_cumulative_doses), - "prob_inf": prob.mean(), - "prob_inf_sd": prob.std(), - "prob_dist": list(prob), - "prob_hist_count": list(prob_dist_count), - "prob_hist_bins": list(prob_dist_bins), - "prob_probabilistic_exposure": prob_probabilistic_exposure, - "expected_new_cases": expected_new_cases, - "CO2_concentrations": CO2_concentrations, - "conditional_probability_data": conditional_probability_data, - "uncertainties_plot_src": uncertainties_plot_src, - } - - -def conditional_prob_inf_given_vl_dist( - infection_probability: models._VectorisedFloat, - viral_loads: np.ndarray, - specific_vl: float, - step: models._VectorisedFloat - ): - - pi_means = [] - lower_percentiles = [] - upper_percentiles = [] - - for vl_log in viral_loads: - # Probability of infection corresponding to a certain viral load value in the distribution - specific_prob = infection_probability[np.where((vl_log-step/2-specific_vl)*(vl_log+step/2-specific_vl)<0)[0]] #type: ignore - - pi_means.append(specific_prob.mean()) - lower_percentiles.append(np.quantile(specific_prob, 0.05)) - upper_percentiles.append(np.quantile(specific_prob, 0.95)) - - return pi_means, lower_percentiles, upper_percentiles - - -def manufacture_conditional_probability_data( - exposure_model: models.ExposureModel, - infection_probability: models._VectorisedFloat -): - min_vl = 2 - max_vl = 10 - step = (max_vl - min_vl)/100 - viral_loads = np.arange(min_vl, max_vl, step) - specific_vl = np.log10(exposure_model.concentration_model.virus.viral_load_in_sputum) - pi_means, lower_percentiles, upper_percentiles = conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, - specific_vl, step) - log10_vl_in_sputum = np.log10(exposure_model.concentration_model.infected.virus.viral_load_in_sputum) - - return { - 'viral_loads': list(viral_loads), - 'pi_means': list(pi_means), - 'lower_percentiles': list(lower_percentiles), - 'upper_percentiles': list(upper_percentiles), - 'log10_vl_in_sputum': list(log10_vl_in_sputum), - } - - -def uncertainties_plot(infection_probability: models._VectorisedFloat, - conditional_probability_data: dict): - - viral_loads: list = conditional_probability_data['viral_loads'] - pi_means: list = conditional_probability_data['pi_means'] - lower_percentiles: list = conditional_probability_data['lower_percentiles'] - upper_percentiles: list = conditional_probability_data['upper_percentiles'] - log10_vl_in_sputum: list = conditional_probability_data['log10_vl_in_sputum'] - - fig, ((axs00, axs01, axs02), (axs10, axs11, axs12)) = plt.subplots(nrows=2, ncols=3, # type: ignore - gridspec_kw={'width_ratios': [5, 0.5] + [1], - 'height_ratios': [3, 1], 'wspace': 0}, - sharey='row', - sharex='col') - - axs01.axis('off') - axs11.axis('off') - axs12.axis('off') - - axs01.set_visible(False) - - axs00.plot(viral_loads, np.array(pi_means), label='Predictive total probability') - axs00.fill_between(viral_loads, np.array(lower_percentiles), np.array(upper_percentiles), alpha=0.1, label='5ᵗʰ and 95ᵗʰ percentile') - - axs02.hist(infection_probability, bins=30, orientation='horizontal') - axs02.set_xticks([]) - axs02.set_xticklabels([]) - axs02.set_facecolor("lightgrey") - - highest_bar = axs02.get_xlim()[1] - axs02.set_xlim(0, highest_bar) - - axs02.text(highest_bar * 0.5, 50, - "$P(I)=$\n" + rf"$\bf{np.round(np.mean(infection_probability), 1)}$%", ha='center', va='center') - axs10.hist(log10_vl_in_sputum, - bins=150, range=(2, 10), color='grey') - axs10.set_facecolor("lightgrey") - axs10.set_yticks([]) - axs10.set_yticklabels([]) - axs10.set_xticks([i for i in range(2, 13, 2)]) - axs10.set_xticklabels(['$10^{' + str(i) + '}$' for i in range(2, 13, 2)]) - axs10.set_xlim(2, 10) - axs10.set_xlabel('Viral load\n(RNA copies)', fontsize=12) - axs00.set_ylabel('Conditional Probability\nof Infection', fontsize=12) - - axs00.text(9.5, -0.01, '$(i)$') - axs10.text(9.5, axs10.get_ylim()[1] * 0.8, '$(ii)$') - axs02.set_title('$(iii)$', fontsize=10) - - axs00.legend() - return fig - - -def _figure2bytes(figure): - # Draw the image - img_data = io.BytesIO() - figure.savefig(img_data, format='png', bbox_inches="tight", transparent=True, dpi=110) - return img_data - - -def img2base64(img_data) -> str: - img_data.seek(0) - pic_hash = base64.b64encode(img_data.read()).decode('ascii') - # A src suitable for a tag such as f'. - return f'data:image/png;base64,{pic_hash}' diff --git a/caimira/src/caimira/calculator/report/virus_report_data.py b/caimira/src/caimira/calculator/report/virus_report_data.py new file mode 100644 index 00000000..8b6e3eca --- /dev/null +++ b/caimira/src/caimira/calculator/report/virus_report_data.py @@ -0,0 +1,487 @@ +import concurrent.futures +import base64 +import dataclasses +import io +import typing +import numpy as np +import matplotlib.pyplot as plt +import urllib +import zlib + +from caimira.calculator.models import models, dataclass_utils, profiler, monte_carlo as mc +from caimira.calculator.models.enums import ViralLoads +from caimira.calculator.validators.virus.virus_validator import VirusFormData + + +def model_start_end(model: models.ExposureModel): + t_start = min(model.exposed.presence_interval().boundaries()[0][0], + model.concentration_model.infected.presence_interval().boundaries()[0][0]) + t_end = max(model.exposed.presence_interval().boundaries()[-1][1], + model.concentration_model.infected.presence_interval().boundaries()[-1][1]) + return t_start, t_end + + +def fill_big_gaps(array, gap_size): + """ + Insert values into the given sorted list if there is a gap of more than ``gap_size``. + All values in the given array are preserved, even if they are within the ``gap_size`` of one another. + + >>> fill_big_gaps([1, 2, 4], gap_size=0.75) + [1, 1.75, 2, 2.75, 3.5, 4] + + """ + result = [] + if len(array) == 0: + raise ValueError("Input array must be len > 0") + + last_value = array[0] + for value in array: + while value - last_value > gap_size + 1e-15: + last_value = last_value + gap_size + result.append(last_value) + result.append(value) + last_value = value + return result + + +def non_temp_transition_times(model: models.ExposureModel): + """ + Return the non-temperature (and PiecewiseConstant) based transition times. + + """ + def walk_model(model, name=""): + # Extend walk_dataclass to handle lists of dataclasses + # (e.g. in MultipleVentilation). + for name, obj in dataclass_utils.walk_dataclass(model, name=name): + if name.endswith('.ventilations') and isinstance(obj, (list, tuple)): + for i, item in enumerate(obj): + fq_name_i = f'{name}[{i}]' + yield fq_name_i, item + if dataclasses.is_dataclass(item): + yield from dataclass_utils.walk_dataclass(item, name=fq_name_i) + else: + yield name, obj + + t_start, t_end = model_start_end(model) + + change_times = {t_start, t_end} + for name, obj in walk_model(model, name="exposure"): + if isinstance(obj, models.Interval): + change_times |= obj.transition_times() + + # Only choose times that are in the range of the model (removes things + # such as PeriodicIntervals, which extend beyond the model itself). + return sorted(time for time in change_times if (t_start <= time <= t_end)) + + +def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]: + """ + Pick approximately ``approx_n_pts`` time points which are interesting for the + given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times + the number of hours of the simulation. + + Initially the times are seeded by important state change times (excluding + outside temperature), and the times are then subsequently expanded to ensure + that the step size is at most ``(t_end - t_start) / approx_n_pts``. + + """ + times = non_temp_transition_times(model) + sim_duration = max(times) - min(times) + if not approx_n_pts: + approx_n_pts = sim_duration * 15 + + # Expand the times list to ensure that we have a maximum gap size between + # the key times. + nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts) + return nice_times + + +def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]: + lower_concentrations = [] + for time in times: + for index, (start, stop) in enumerate(short_range_intervals): + # For visualization issues, add short-range breathing activity to the initial long-range concentrations + if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing': + lower_concentrations.append( + np.array(model.concentration(float(time))).mean()) + break + lower_concentrations.append( + np.array(model.concentration_model.concentration(float(time))).mean()) + return lower_concentrations + + +def _calculate_deposited_exposure(model, time1, time2, fn_name=None): + return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name + + +def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None): + return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name + + +def _calculate_co2_concentration(CO2_model, time, fn_name=None): + return np.array(CO2_model.concentration(float(time))).mean(), fn_name + + +@profiler.profile +def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: + times = interesting_times(model) + short_range_intervals = [interaction.presence.boundaries()[0] + for interaction in model.short_range] + short_range_expirations = [interaction['expiration'] + for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else [] + + concentrations = [ + np.array(model.concentration(float(time))).mean() + for time in times + ] + lower_concentrations = concentrations_with_sr_breathing( + form, model, times, short_range_intervals) + + CO2_model: models.CO2ConcentrationModel = form.build_CO2_model() + + # compute deposited exposures and CO2 concentrations in parallel to increase performance + deposited_exposures = [] + long_range_deposited_exposures = [] + CO2_concentrations = [] + + tasks = [] + with executor_factory() as executor: + for time1, time2 in zip(times[:-1], times[1:]): + tasks.append(executor.submit( + _calculate_deposited_exposure, model, time1, time2, fn_name="de")) + tasks.append(executor.submit( + _calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr")) + # co2 concentration: takes each time as param, not the interval + tasks.append(executor.submit( + _calculate_co2_concentration, CO2_model, time1, fn_name="co2")) + # co2 concentration: calculate the last time too + tasks.append(executor.submit(_calculate_co2_concentration, + CO2_model, times[-1], fn_name="co2")) + + for task in tasks: + result, fn_name = task.result() + if fn_name == "de": + deposited_exposures.append(result) + elif fn_name == "lr": + long_range_deposited_exposures.append(result) + elif fn_name == "co2": + CO2_concentrations.append(result) + + cumulative_doses = np.cumsum(deposited_exposures) + long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures) + + prob = np.array(model.infection_probability()) + prob_dist_count, prob_dist_bins = np.histogram( + prob/100, bins=100, density=True) + prob_probabilistic_exposure = np.array( + model.total_probability_rule()).mean() + expected_new_cases = np.array(model.expected_new_cases()).mean() + exposed_presence_intervals = [ + list(interval) for interval in model.exposed.presence_interval().boundaries()] + + conditional_probability_data = None + uncertainties_plot_src = None + if (form.conditional_probability_viral_loads and + model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore + # Generate all the required data for the conditional probability plot + conditional_probability_data = manufacture_conditional_probability_data( + model, prob) + # Generate the matplotlib image based on the received data + uncertainties_plot_src = img2base64(_figure2bytes( + uncertainties_plot(prob, conditional_probability_data))) + + return { + "model_repr": repr(model), + "times": list(times), + "exposed_presence_intervals": exposed_presence_intervals, + "short_range_intervals": short_range_intervals, + "short_range_expirations": short_range_expirations, + "concentrations": concentrations, + "concentrations_zoomed": lower_concentrations, + "cumulative_doses": list(cumulative_doses), + "long_range_cumulative_doses": list(long_range_cumulative_doses), + "prob_inf": prob.mean(), + "prob_inf_sd": prob.std(), + "prob_dist": list(prob), + "prob_hist_count": list(prob_dist_count), + "prob_hist_bins": list(prob_dist_bins), + "prob_probabilistic_exposure": prob_probabilistic_exposure, + "expected_new_cases": expected_new_cases, + "CO2_concentrations": CO2_concentrations, + "conditional_probability_data": conditional_probability_data, + "uncertainties_plot_src": uncertainties_plot_src, + } + + +def conditional_prob_inf_given_vl_dist( + infection_probability: models._VectorisedFloat, + viral_loads: np.ndarray, + specific_vl: float, + step: models._VectorisedFloat +): + + pi_means = [] + lower_percentiles = [] + upper_percentiles = [] + + for vl_log in viral_loads: + # Probability of infection corresponding to a certain viral load value in the distribution + specific_prob = infection_probability[np.where( + (vl_log-step/2-specific_vl)*(vl_log+step/2-specific_vl) < 0)[0]] # type: ignore + + pi_means.append(specific_prob.mean()) + lower_percentiles.append(np.quantile(specific_prob, 0.05)) + upper_percentiles.append(np.quantile(specific_prob, 0.95)) + + return pi_means, lower_percentiles, upper_percentiles + + +def manufacture_conditional_probability_data( + exposure_model: models.ExposureModel, + infection_probability: models._VectorisedFloat +): + min_vl = 2 + max_vl = 10 + step = (max_vl - min_vl)/100 + viral_loads = np.arange(min_vl, max_vl, step) + specific_vl = np.log10( + exposure_model.concentration_model.virus.viral_load_in_sputum) + pi_means, lower_percentiles, upper_percentiles = conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, + specific_vl, step) + log10_vl_in_sputum = np.log10( + exposure_model.concentration_model.infected.virus.viral_load_in_sputum) + + return { + 'viral_loads': list(viral_loads), + 'pi_means': list(pi_means), + 'lower_percentiles': list(lower_percentiles), + 'upper_percentiles': list(upper_percentiles), + 'log10_vl_in_sputum': list(log10_vl_in_sputum), + } + + +def uncertainties_plot(infection_probability: models._VectorisedFloat, + conditional_probability_data: dict): + + viral_loads: list = conditional_probability_data['viral_loads'] + pi_means: list = conditional_probability_data['pi_means'] + lower_percentiles: list = conditional_probability_data['lower_percentiles'] + upper_percentiles: list = conditional_probability_data['upper_percentiles'] + log10_vl_in_sputum: list = conditional_probability_data['log10_vl_in_sputum'] + + fig, ((axs00, axs01, axs02), (axs10, axs11, axs12)) = plt.subplots(nrows=2, ncols=3, # type: ignore + gridspec_kw={'width_ratios': [5, 0.5] + [1], + 'height_ratios': [3, 1], 'wspace': 0}, + sharey='row', + sharex='col') + + axs01.axis('off') + axs11.axis('off') + axs12.axis('off') + + axs01.set_visible(False) + + axs00.plot(viral_loads, np.array(pi_means), + label='Predictive total probability') + axs00.fill_between(viral_loads, np.array(lower_percentiles), np.array( + upper_percentiles), alpha=0.1, label='5ᵗʰ and 95ᵗʰ percentile') + + axs02.hist(infection_probability, bins=30, orientation='horizontal') + axs02.set_xticks([]) + axs02.set_xticklabels([]) + axs02.set_facecolor("lightgrey") + + highest_bar = axs02.get_xlim()[1] + axs02.set_xlim(0, highest_bar) + + axs02.text(highest_bar * 0.5, 50, + "$P(I)=$\n" + rf"$\bf{np.round(np.mean(infection_probability), 1)}$%", ha='center', va='center') + axs10.hist(log10_vl_in_sputum, + bins=150, range=(2, 10), color='grey') + axs10.set_facecolor("lightgrey") + axs10.set_yticks([]) + axs10.set_yticklabels([]) + axs10.set_xticks([i for i in range(2, 13, 2)]) + axs10.set_xticklabels(['$10^{' + str(i) + '}$' for i in range(2, 13, 2)]) + axs10.set_xlim(2, 10) + axs10.set_xlabel('Viral load\n(RNA copies)', fontsize=12) + axs00.set_ylabel('Conditional Probability\nof Infection', fontsize=12) + + axs00.text(9.5, -0.01, '$(i)$') + axs10.text(9.5, axs10.get_ylim()[1] * 0.8, '$(ii)$') + axs02.set_title('$(iii)$', fontsize=10) + + axs00.legend() + return fig + + +def _figure2bytes(figure): + # Draw the image + img_data = io.BytesIO() + figure.savefig(img_data, format='png', bbox_inches="tight", + transparent=True, dpi=110) + return img_data + + +def img2base64(img_data) -> str: + img_data.seek(0) + pic_hash = base64.b64encode(img_data.read()).decode('ascii') + # A src suitable for a tag such as f'. + return f'data:image/png;base64,{pic_hash}' + + +def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData): + form_dict = VirusFormData.to_dict(form, strip_defaults=True) + + # Generate the calculator URL arguments that would be needed to re-create this + # form. + args = urllib.parse.urlencode(form_dict) + + # Then zlib compress + base64 encode the string. To be inverted by the + # /_c/ endpoint. + compressed_args = base64.b64encode(zlib.compress(args.encode())).decode() + qr_url = f"{base_url}{get_root_url()}/_c/{compressed_args}" + url = f"{base_url}{get_root_calculator_url()}?{args}" + + return { + 'link': url, + 'shortened': qr_url, + } + + +def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]: + viral_load = model.concentration_model.infected.virus.viral_load_in_sputum + scenarios = {} + for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99): + vl = np.quantile(viral_load, percentil) + specific_vl_scenario = dataclass_utils.nested_replace(model, + {'concentration_model.infected.virus.viral_load_in_sputum': vl} + ) + scenarios[str(vl)] = np.mean( + specific_vl_scenario.infection_probability()) + return scenarios + + +def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]: + scenarios = {} + if (form.short_range_option == "short_range_no"): + # Two special option cases - HEPA and/or FFP2 masks. + FFP2_being_worn = bool(form.mask_wearing_option == + 'mask_on' and form.mask_type == 'FFP2') + if FFP2_being_worn and form.hepa_option: + FFP2andHEPAalternative = dataclass_utils.replace( + form, mask_type='Type I') + if not (form.hepa_option and form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I'): + scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model() + if not FFP2_being_worn and form.hepa_option: + noHEPAalternative = dataclass_utils.replace(form, mask_type='FFP2') + noHEPAalternative = dataclass_utils.replace( + noHEPAalternative, mask_wearing_option='mask_on') + noHEPAalternative = dataclass_utils.replace( + noHEPAalternative, hepa_option=False) + if not (not form.hepa_option and FFP2_being_worn): + scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model() + + # The remaining scenarios are based on Type I masks (possibly not worn) + # and no HEPA filtration. + form = dataclass_utils.replace(form, mask_type='Type I') + if form.hepa_option: + form = dataclass_utils.replace(form, hepa_option=False) + + with_mask = dataclass_utils.replace( + form, mask_wearing_option='mask_on') + without_mask = dataclass_utils.replace( + form, mask_wearing_option='mask_off') + + if form.ventilation_type == 'mechanical_ventilation': + # scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model() + if not (form.mask_wearing_option == 'mask_off'): + scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model( + ) + + elif form.ventilation_type == 'natural_ventilation': + # scenarios['Windows open with Type I masks'] = with_mask.build_mc_model() + if not (form.mask_wearing_option == 'mask_off'): + scenarios['Windows open without masks'] = without_mask.build_mc_model() + + # No matter the ventilation scheme, we include scenarios which don't have any ventilation. + with_mask_no_vent = dataclass_utils.replace( + with_mask, ventilation_type='no_ventilation') + without_mask_or_vent = dataclass_utils.replace( + without_mask, ventilation_type='no_ventilation') + + if not (form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I' and form.ventilation_type == 'no_ventilation'): + scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model() + if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'): + scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model() + + else: + no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], + total_people=form.total_people - form.short_range_occupants) + scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model() + + return scenarios + + +def scenario_statistics( + mc_model: mc.ExposureModel, + sample_times: typing.List[float], + compute_prob_exposure: bool +): + model = mc_model.build_model( + size=mc_model.data_registry.monte_carlo['sample_size']) + if (compute_prob_exposure): + # It means we have data to calculate the total_probability_rule + prob_probabilistic_exposure = model.total_probability_rule() + else: + prob_probabilistic_exposure = 0. + + return { + 'probability_of_infection': np.mean(model.infection_probability()), + 'expected_new_cases': np.mean(model.expected_new_cases()), + 'concentrations': [ + np.mean(model.concentration(time)) + for time in sample_times + ], + 'prob_probabilistic_exposure': prob_probabilistic_exposure, + } + + +def comparison_report( + form: VirusFormData, + report_data: typing.Dict[str, typing.Any], + scenarios: typing.Dict[str, mc.ExposureModel], + sample_times: typing.List[float], + executor_factory: typing.Callable[[], concurrent.futures.Executor], +): + if (form.short_range_option == "short_range_no"): + statistics = { + 'Current scenario': { + 'probability_of_infection': report_data['prob_inf'], + 'expected_new_cases': report_data['expected_new_cases'], + 'concentrations': report_data['concentrations'], + } + } + else: + statistics = {} + + if (form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure"): + compute_prob_exposure = True + else: + compute_prob_exposure = False + + with executor_factory() as executor: + results = executor.map( + scenario_statistics, + scenarios.values(), + [sample_times] * len(scenarios), + [compute_prob_exposure] * len(scenarios), + timeout=60, + ) + + for (name, model), model_stats in zip(scenarios.items(), results): + statistics[name] = model_stats + + return { + 'stats': statistics, + } diff --git a/caimira/src/caimira/calculator/validators/co2/co2_validator.py b/caimira/src/caimira/calculator/validators/co2/co2_validator.py index aa43edfe..ba865a7c 100644 --- a/caimira/src/caimira/calculator/validators/co2/co2_validator.py +++ b/caimira/src/caimira/calculator/validators/co2/co2_validator.py @@ -11,7 +11,7 @@ from ..form_validator import FormData, cast_class_fields from ..defaults import NO_DEFAULT from ...store.data_registry import DataRegistry from ...models import models -from ...report.report_generator import img2base64, _figure2bytes +from ...report.virus_report_data import img2base64, _figure2bytes minutes_since_midnight = typing.NewType('minutes_since_midnight', int) diff --git a/caimira/tests/test_conditional_probability.py b/caimira/tests/test_conditional_probability.py index 9daaa26c..45001418 100644 --- a/caimira/tests/test_conditional_probability.py +++ b/caimira/tests/test_conditional_probability.py @@ -5,7 +5,7 @@ from retry import retry import caimira.calculator.models.monte_carlo as mc from caimira.calculator.models import models from caimira.calculator.models.dataclass_utils import nested_replace -from caimira.calculator.report import report_generator +from caimira.calculator.report import virus_report_data from caimira.calculator.models.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions @@ -72,7 +72,7 @@ def test_conditional_prob_inf_given_vl_dist(data_registry, baseline_exposure_mod specific_vl = np.log10(mc_model.concentration_model.infected.virus.viral_load_in_sputum) step = 8/100 actual_pi_means, actual_lower_percentiles, actual_upper_percentiles = ( - report_generator.conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, specific_vl, step) + virus_report_data.conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, specific_vl, step) ) assert np.allclose(actual_pi_means, expected_pi_means, atol=0.002) diff --git a/cern_caimira/src/cern_caimira/apps/calculator/__init__.py b/cern_caimira/src/cern_caimira/apps/calculator/__init__.py index 4d906ff4..167bacd8 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/__init__.py +++ b/cern_caimira/src/cern_caimira/apps/calculator/__init__.py @@ -29,13 +29,12 @@ from caimira.calculator.models.profiler import CaimiraProfiler, Profilers from caimira.calculator.store.data_registry import DataRegistry from caimira.calculator.store.data_service import DataService -from caimira.api.controller.report_controller import generate_form_obj, generate_model, generate_report_results -from caimira.calculator.report.report_generator import calculate_report_data +from caimira.api.controller import virus_report_controller, co2_report_controller +from caimira.calculator.report.virus_report_data import calculate_report_data from caimira.calculator.validators.virus import virus_validator -from caimira.calculator.validators.co2 import co2_validator from . import markdown_tools -from ..calculator.report.virus_report import ReportGenerator +from .report.virus_report import VirusReportGenerator from ..calculator.report.co2_report import CO2ReportGenerator from .user import AuthenticatedUser, AnonymousUser @@ -181,7 +180,7 @@ class ConcentrationModel(BaseRequestHandler): LOG.debug(pformat(requested_model_config)) try: - form = generate_form_obj(requested_model_config, data_registry) + form = virus_report_controller.generate_form_obj(requested_model_config, data_registry) except Exception as err: LOG.exception(err) response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'} @@ -190,7 +189,7 @@ class ConcentrationModel(BaseRequestHandler): return base_url = self.request.protocol + "://" + self.request.host - report_generator: ReportGenerator = self.settings['report_generator'] + report_generator: VirusReportGenerator = self.settings['report_generator'] executor = loky.get_reusable_executor( max_workers=self.settings['handler_worker_pool_size'], timeout=300, @@ -235,7 +234,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler): LOG.debug(pformat(requested_model_config)) try: - form = generate_form_obj(requested_model_config, data_registry) + form = virus_report_controller.generate_form_obj(requested_model_config, data_registry) except Exception as err: LOG.exception(err) response_json = {'code': 400, 'error': f'Your request was invalid {html.escape(str(err))}'} @@ -247,7 +246,7 @@ class ConcentrationModelJsonResponse(BaseRequestHandler): max_workers=self.settings['handler_worker_pool_size'], timeout=300, ) - model = generate_model(form) + model = virus_report_controller.generate_model(form, data_registry) report_data_task = executor.submit(calculate_report_data, form, model, executor_factory=functools.partial( concurrent.futures.ThreadPoolExecutor, @@ -266,10 +265,10 @@ class StaticModel(BaseRequestHandler): if data_service: data_service.update_registry(data_registry) - form = generate_form_obj(virus_validator.baseline_raw_form_data(), data_registry) + form = virus_report_controller.generate_form_obj(virus_validator.baseline_raw_form_data(), data_registry) base_url = self.request.protocol + "://" + self.request.host - report_generator: ReportGenerator = self.settings['report_generator'] + report_generator: VirusReportGenerator = self.settings['report_generator'] executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size']) report_task = executor.submit( @@ -409,10 +408,7 @@ class CO2ModelResponse(BaseRequestHandler): requested_model_config = tornado.escape.json_decode(self.request.body) try: - form: co2_validator.CO2FormData = co2_validator.CO2FormData.from_dict( - requested_model_config, - data_registry - ) + form = co2_report_controller.generate_form_obj(requested_model_config, data_registry) except Exception as err: if self.settings.get("debug", False): import traceback @@ -544,7 +540,7 @@ def make_app( data_service=data_service, template_environment=template_environment, default_handler_class=Missing404Handler, - report_generator=ReportGenerator(loader, get_root_url, get_root_calculator_url), + report_generator=VirusReportGenerator(loader, get_root_url, get_root_calculator_url), xsrf_cookies=True, # COOKIE_SECRET being undefined will result in no login information being # presented to the user. diff --git a/cern_caimira/src/cern_caimira/apps/calculator/report/co2_report.py b/cern_caimira/src/cern_caimira/apps/calculator/report/co2_report.py index ece70b4b..befa988d 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/report/co2_report.py +++ b/cern_caimira/src/cern_caimira/apps/calculator/report/co2_report.py @@ -1,67 +1,14 @@ import dataclasses +import caimira.calculator.report.co2_report_data as co2_rep_data from caimira.calculator.validators.co2.co2_validator import CO2FormData -from caimira.calculator.models.models import CO2DataModel - @dataclasses.dataclass class CO2ReportGenerator: - def build_initial_plot( - self, - form: CO2FormData, - ) -> dict: - ''' - Initial plot with the suggested ventilation state changes. - This method receives the form input and returns the CO2 - plot with the respective transition times. - ''' - CO2model: CO2DataModel = form.build_model() - - occupancy_transition_times = list(CO2model.occupancy.transition_times) - - ventilation_transition_times: list = form.find_change_points() - # The entire ventilation changes consider the initial and final occupancy state change - all_vent_transition_times: list = sorted( - [occupancy_transition_times[0]] + - [occupancy_transition_times[-1]] + - ventilation_transition_times) - - ventilation_plot: str = form.generate_ventilation_plot( - ventilation_transition_times=all_vent_transition_times, - occupancy_transition_times=occupancy_transition_times - ) - - context = { - 'CO2_plot': ventilation_plot, - 'transition_times': [round(el, 2) for el in all_vent_transition_times], - } - - return context + def build_initial_plot(self, form: CO2FormData): + return co2_rep_data.build_initial_plot(form=form) - def build_fitting_results( - self, - form: CO2FormData, - ) -> dict: - ''' - Final fitting results with the respective predictive CO2. - This method receives the form input and returns the fitting results - along with the CO2 plot with the predictive CO2. - ''' - CO2model: CO2DataModel = form.build_model() - - # Ventilation times after user manipulation from the suggested ventilation state change times. - ventilation_transition_times = list(CO2model.ventilation_transition_times) - - # The result of the following method is a dict with the results of the fitting - # algorithm, namely the breathing rate and ACH values. It also returns the - # predictive CO2 result based on the fitting results. - context = dict(CO2model.CO2_fit_params()) - - # Add the transition times and CO2 plot to the results. - context['transition_times'] = ventilation_transition_times - context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1], - predictive_CO2=context['predictive_CO2']) - - return context + def build_fitting_results(self, form: CO2FormData): + return co2_rep_data.build_fitting_results(form=form) \ No newline at end of file diff --git a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py index 4d4d4c8e..7b292ef9 100644 --- a/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py +++ b/cern_caimira/src/cern_caimira/apps/calculator/report/virus_report.py @@ -6,254 +6,12 @@ import json import typing import jinja2 import numpy as np -import urllib -import base64 -import zlib from .. import markdown_tools -from caimira.calculator.models import dataclass_utils, models, monte_carlo as mc +from caimira.calculator.models import models from caimira.calculator.validators.virus.virus_validator import VirusFormData -from caimira.calculator.report.report_generator import calculate_report_data - - -def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData): - form_dict = VirusFormData.to_dict(form, strip_defaults=True) - - # Generate the calculator URL arguments that would be needed to re-create this - # form. - args = urllib.parse.urlencode(form_dict) - - # Then zlib compress + base64 encode the string. To be inverted by the - # /_c/ endpoint. - compressed_args = base64.b64encode(zlib.compress(args.encode())).decode() - qr_url = f"{base_url}{get_root_url()}/_c/{compressed_args}" - url = f"{base_url}{get_root_calculator_url()}?{args}" - - return { - 'link': url, - 'shortened': qr_url, - } - - -def model_start_end(model: models.ExposureModel): - t_start = min(model.exposed.presence_interval().boundaries()[0][0], - model.concentration_model.infected.presence_interval().boundaries()[0][0]) - t_end = max(model.exposed.presence_interval().boundaries()[-1][1], - model.concentration_model.infected.presence_interval().boundaries()[-1][1]) - return t_start, t_end - - -def fill_big_gaps(array, gap_size): - """ - Insert values into the given sorted list if there is a gap of more than ``gap_size``. - All values in the given array are preserved, even if they are within the ``gap_size`` of one another. - - >>> fill_big_gaps([1, 2, 4], gap_size=0.75) - [1, 1.75, 2, 2.75, 3.5, 4] - - """ - result = [] - if len(array) == 0: - raise ValueError("Input array must be len > 0") - - last_value = array[0] - for value in array: - while value - last_value > gap_size + 1e-15: - last_value = last_value + gap_size - result.append(last_value) - result.append(value) - last_value = value - return result - - -def non_temp_transition_times(model: models.ExposureModel): - """ - Return the non-temperature (and PiecewiseConstant) based transition times. - - """ - def walk_model(model, name=""): - # Extend walk_dataclass to handle lists of dataclasses - # (e.g. in MultipleVentilation). - for name, obj in dataclass_utils.walk_dataclass(model, name=name): - if name.endswith('.ventilations') and isinstance(obj, (list, tuple)): - for i, item in enumerate(obj): - fq_name_i = f'{name}[{i}]' - yield fq_name_i, item - if dataclasses.is_dataclass(item): - yield from dataclass_utils.walk_dataclass(item, name=fq_name_i) - else: - yield name, obj - - t_start, t_end = model_start_end(model) - - change_times = {t_start, t_end} - for name, obj in walk_model(model, name="exposure"): - if isinstance(obj, models.Interval): - change_times |= obj.transition_times() - - # Only choose times that are in the range of the model (removes things - # such as PeriodicIntervals, which extend beyond the model itself). - return sorted(time for time in change_times if (t_start <= time <= t_end)) - - -def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]: - """ - Pick approximately ``approx_n_pts`` time points which are interesting for the - given model. If not provided by argument, ``approx_n_pts`` is set to be 15 times - the number of hours of the simulation. - - Initially the times are seeded by important state change times (excluding - outside temperature), and the times are then subsequently expanded to ensure - that the step size is at most ``(t_end - t_start) / approx_n_pts``. - - """ - times = non_temp_transition_times(model) - sim_duration = max(times) - min(times) - if not approx_n_pts: - approx_n_pts = sim_duration * 15 - - # Expand the times list to ensure that we have a maximum gap size between - # the key times. - nice_times = fill_big_gaps(times, gap_size=(sim_duration) / approx_n_pts) - return nice_times - - -def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]: - viral_load = model.concentration_model.infected.virus.viral_load_in_sputum - scenarios = {} - for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99): - vl = np.quantile(viral_load, percentil) - specific_vl_scenario = dataclass_utils.nested_replace(model, - {'concentration_model.infected.virus.viral_load_in_sputum': vl} - ) - scenarios[str(vl)] = np.mean( - specific_vl_scenario.infection_probability()) - return scenarios - -def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]: - scenarios = {} - if (form.short_range_option == "short_range_no"): - # Two special option cases - HEPA and/or FFP2 masks. - FFP2_being_worn = bool(form.mask_wearing_option == - 'mask_on' and form.mask_type == 'FFP2') - if FFP2_being_worn and form.hepa_option: - FFP2andHEPAalternative = dataclass_utils.replace( - form, mask_type='Type I') - if not (form.hepa_option and form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I'): - scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model() - if not FFP2_being_worn and form.hepa_option: - noHEPAalternative = dataclass_utils.replace(form, mask_type='FFP2') - noHEPAalternative = dataclass_utils.replace( - noHEPAalternative, mask_wearing_option='mask_on') - noHEPAalternative = dataclass_utils.replace( - noHEPAalternative, hepa_option=False) - if not (not form.hepa_option and FFP2_being_worn): - scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model() - - # The remaining scenarios are based on Type I masks (possibly not worn) - # and no HEPA filtration. - form = dataclass_utils.replace(form, mask_type='Type I') - if form.hepa_option: - form = dataclass_utils.replace(form, hepa_option=False) - - with_mask = dataclass_utils.replace( - form, mask_wearing_option='mask_on') - without_mask = dataclass_utils.replace( - form, mask_wearing_option='mask_off') - - if form.ventilation_type == 'mechanical_ventilation': - # scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model() - if not (form.mask_wearing_option == 'mask_off'): - scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model( - ) - - elif form.ventilation_type == 'natural_ventilation': - # scenarios['Windows open with Type I masks'] = with_mask.build_mc_model() - if not (form.mask_wearing_option == 'mask_off'): - scenarios['Windows open without masks'] = without_mask.build_mc_model() - - # No matter the ventilation scheme, we include scenarios which don't have any ventilation. - with_mask_no_vent = dataclass_utils.replace( - with_mask, ventilation_type='no_ventilation') - without_mask_or_vent = dataclass_utils.replace( - without_mask, ventilation_type='no_ventilation') - - if not (form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I' and form.ventilation_type == 'no_ventilation'): - scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model() - if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'): - scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model() - - else: - no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], - total_people=form.total_people - form.short_range_occupants) - scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model() - - return scenarios - - -def scenario_statistics( - mc_model: mc.ExposureModel, - sample_times: typing.List[float], - compute_prob_exposure: bool -): - model = mc_model.build_model( - size=mc_model.data_registry.monte_carlo['sample_size']) - if (compute_prob_exposure): - # It means we have data to calculate the total_probability_rule - prob_probabilistic_exposure = model.total_probability_rule() - else: - prob_probabilistic_exposure = 0. - - return { - 'probability_of_infection': np.mean(model.infection_probability()), - 'expected_new_cases': np.mean(model.expected_new_cases()), - 'concentrations': [ - np.mean(model.concentration(time)) - for time in sample_times - ], - 'prob_probabilistic_exposure': prob_probabilistic_exposure, - } - - -def comparison_report( - form: VirusFormData, - report_data: typing.Dict[str, typing.Any], - scenarios: typing.Dict[str, mc.ExposureModel], - sample_times: typing.List[float], - executor_factory: typing.Callable[[], concurrent.futures.Executor], -): - if (form.short_range_option == "short_range_no"): - statistics = { - 'Current scenario': { - 'probability_of_infection': report_data['prob_inf'], - 'expected_new_cases': report_data['expected_new_cases'], - 'concentrations': report_data['concentrations'], - } - } - else: - statistics = {} - - if (form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure"): - compute_prob_exposure = True - else: - compute_prob_exposure = False - - with executor_factory() as executor: - results = executor.map( - scenario_statistics, - scenarios.values(), - [sample_times] * len(scenarios), - [compute_prob_exposure] * len(scenarios), - timeout=60, - ) - - for (name, model), model_stats in zip(scenarios.items(), results): - statistics[name] = model_stats - - return { - 'stats': statistics, - } +from caimira.calculator.report.virus_report_data import calculate_report_data, interesting_times, manufacture_alternative_scenarios, manufacture_viral_load_scenarios_percentiles, comparison_report, generate_permalink def minutes_to_time(minutes: int) -> str: @@ -305,7 +63,7 @@ def non_zero_percentage(percentage: int) -> str: @dataclasses.dataclass -class ReportGenerator: +class VirusReportGenerator: jinja_loader: jinja2.BaseLoader get_root_url: typing.Any get_root_calculator_url: typing.Any @@ -317,7 +75,8 @@ class ReportGenerator: executor_factory: typing.Callable[[], concurrent.futures.Executor], ) -> str: model = form.build_model() - context = self.prepare_context(base_url, model, form, executor_factory=executor_factory) + context = self.prepare_context( + base_url, model, form, executor_factory=executor_factory) return self.render(context) def prepare_context( @@ -339,15 +98,18 @@ class ReportGenerator: } scenario_sample_times = interesting_times(model) - report_data = calculate_report_data(form, model, executor_factory=executor_factory) + report_data = calculate_report_data( + form, model, executor_factory=executor_factory) context.update(report_data) alternative_scenarios = manufacture_alternative_scenarios(form) - context['alternative_viral_load'] = manufacture_viral_load_scenarios_percentiles(model) if form.conditional_probability_viral_loads else None + context['alternative_viral_load'] = manufacture_viral_load_scenarios_percentiles( + model) if form.conditional_probability_viral_loads else None context['alternative_scenarios'] = comparison_report( form, report_data, alternative_scenarios, scenario_sample_times, executor_factory=executor_factory, ) - context['permalink'] = generate_permalink(base_url, self.get_root_url, self.get_root_calculator_url, form) + context['permalink'] = generate_permalink( + base_url, self.get_root_url, self.get_root_calculator_url, form) context['get_url'] = self.get_root_url context['get_calculator_url'] = self.get_root_calculator_url @@ -374,4 +136,3 @@ class ReportGenerator: def render(self, context: dict) -> str: template = self._template_environment().get_template("calculator.report.html.j2") return template.render(**context, text_blocks=template.globals["common_text"]) - \ No newline at end of file diff --git a/cern_caimira/tests/test_report_generator.py b/cern_caimira/tests/test_report_generator.py index e6803def..372cad77 100644 --- a/cern_caimira/tests/test_report_generator.py +++ b/cern_caimira/tests/test_report_generator.py @@ -7,10 +7,9 @@ import numpy as np import pytest from cern_caimira.apps.calculator import make_app -from cern_caimira.apps.calculator import ReportGenerator -from cern_caimira.apps.calculator.report.virus_report import readable_minutes, manufacture_alternative_scenarios, comparison_report -import caimira.calculator.report.report_generator as rep_gen -from caimira.api.controller.report_controller import generate_model, generate_report_results +from cern_caimira.apps.calculator import VirusReportGenerator +from cern_caimira.apps.calculator.report.virus_report import readable_minutes +import caimira.calculator.report.virus_report_data as rep_gen from caimira.calculator.validators.virus.virus_validator import VirusFormData @@ -24,7 +23,7 @@ def test_generate_report(baseline_form) -> None: start = time.perf_counter() - generator: ReportGenerator = make_app().settings['report_generator'] + generator: VirusReportGenerator = make_app().settings['report_generator'] report = generator.build_report("", baseline_form, partial( concurrent.futures.ThreadPoolExecutor, 1, @@ -119,8 +118,8 @@ def test_expected_new_cases(baseline_form_with_sr: VirusFormData): # Long-range contributions alone scenario_sample_times = rep_gen.interesting_times(model) - alternative_scenarios = manufacture_alternative_scenarios(baseline_form_with_sr) - alternative_statistics = comparison_report( + alternative_scenarios = rep_gen.manufacture_alternative_scenarios(baseline_form_with_sr) + alternative_statistics = rep_gen.comparison_report( baseline_form_with_sr, report_data, alternative_scenarios, scenario_sample_times, executor_factory=executor_factory, )