From c52d017a31e15a6617a19733c8d49f4305396a00 Mon Sep 17 00:00:00 2001 From: Nicola Tarocco Date: Sun, 17 Mar 2024 18:12:39 +0100 Subject: [PATCH 1/2] model: compute deposited exposures in parallel * compute deposited exposures by calculating each time window in a thread, in parallel * compute the CO2 concentration calcuation of each window in parallel * the usage of multithreading speeds up the overall calculation by at least by a factor of 3. --- caimira/apps/calculator/report_generator.py | 52 +++++++++++++++------ 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/caimira/apps/calculator/report_generator.py b/caimira/apps/calculator/report_generator.py index 3b44904b..fae89ef5 100644 --- a/caimira/apps/calculator/report_generator.py +++ b/caimira/apps/calculator/report_generator.py @@ -114,9 +114,17 @@ def concentrations_with_sr_breathing(form: VirusFormData, model: models.Exposure lower_concentrations.append(np.array(model.concentration_model.concentration(float(time))).mean()) return lower_concentrations +def _calculate_deposited_exposure(model, time1, time2, fn_name=None): + return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(),fn_name + +def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None): + return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name + +def _calculate_co2_concentration(CO2_model, time, fn_name=None): + return np.array(CO2_model.concentration(float(time))).mean(), fn_name @profile -def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]: +def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]: times = interesting_times(model) short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range] short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else [] @@ -127,20 +135,34 @@ def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> t ] lower_concentrations = concentrations_with_sr_breathing(form, model, times, short_range_intervals) - cumulative_doses = np.cumsum([ - np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean() - for time1, time2 in zip(times[:-1], times[1:]) - ]) - long_range_cumulative_doses = np.cumsum([ - np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean() - for time1, time2 in zip(times[:-1], times[1:]) - ]) - CO2_model: models.CO2ConcentrationModel = form.build_CO2_model() - CO2_concentrations = {'CO₂': {'concentrations': [ - np.array(CO2_model.concentration(float(time))).mean() - for time in times - ]}} + + # compute deposited exposures and CO2 concentrations in parallel to increase performance + deposited_exposures = [] + long_range_deposited_exposures = [] + CO2_concentrations = [] + + tasks = [] + with executor_factory() as executor: + for time1, time2 in zip(times[:-1], times[1:]): + tasks.append(executor.submit(_calculate_deposited_exposure, model, time1, time2, fn_name="de")) + tasks.append(executor.submit(_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr")) + # co2 concentration: takes each time as param, not the interval + tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, time1, fn_name="co2")) + # co2 concentration: calculate the last time too + tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, times[-1], fn_name="co2")) + + for task in tasks: + result, fn_name = task.result() + if fn_name == "de": + deposited_exposures.append(result) + elif fn_name == "lr": + long_range_deposited_exposures.append(result) + elif fn_name == "co2": + CO2_concentrations.append(result) + + cumulative_doses = np.cumsum(deposited_exposures) + long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures) prob = np.array(model.infection_probability()) prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True) @@ -513,7 +535,7 @@ class ReportGenerator: } scenario_sample_times = interesting_times(model) - report_data = calculate_report_data(form, model) + report_data = calculate_report_data(form, model, executor_factory=executor_factory) context.update(report_data) alternative_scenarios = manufacture_alternative_scenarios(form) From ddf7684798d76397acf5136b160c460c1ede7d60 Mon Sep 17 00:00:00 2001 From: Luis Aleixo Date: Wed, 20 Mar 2024 11:32:01 +0100 Subject: [PATCH 2/2] added executor factory to ConcentrationModelJsonResponse class --- caimira/apps/calculator/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/caimira/apps/calculator/__init__.py b/caimira/apps/calculator/__init__.py index 48f022e4..3dc5f501 100644 --- a/caimira/apps/calculator/__init__.py +++ b/caimira/apps/calculator/__init__.py @@ -244,7 +244,11 @@ class ConcentrationModelJsonResponse(BaseRequestHandler): timeout=300, ) model = form.build_model() - report_data_task = executor.submit(calculate_report_data, form, model) + report_data_task = executor.submit(calculate_report_data, form, model, + executor_factory=functools.partial( + concurrent.futures.ThreadPoolExecutor, + self.settings['report_generation_parallelism'], + ),) report_data: dict = await asyncio.wrap_future(report_data_task) await self.finish(report_data)