Merge branch 'feature/parallel-deposited-exposures' into 'master'
model: compute deposited exposures in parallel See merge request caimira/caimira!490
This commit is contained in:
commit
5042f5d82f
2 changed files with 42 additions and 16 deletions
|
|
@ -244,7 +244,11 @@ class ConcentrationModelJsonResponse(BaseRequestHandler):
|
|||
timeout=300,
|
||||
)
|
||||
model = form.build_model()
|
||||
report_data_task = executor.submit(calculate_report_data, form, model)
|
||||
report_data_task = executor.submit(calculate_report_data, form, model,
|
||||
executor_factory=functools.partial(
|
||||
concurrent.futures.ThreadPoolExecutor,
|
||||
self.settings['report_generation_parallelism'],
|
||||
),)
|
||||
report_data: dict = await asyncio.wrap_future(report_data_task)
|
||||
await self.finish(report_data)
|
||||
|
||||
|
|
|
|||
|
|
@ -114,9 +114,17 @@ def concentrations_with_sr_breathing(form: VirusFormData, model: models.Exposure
|
|||
lower_concentrations.append(np.array(model.concentration_model.concentration(float(time))).mean())
|
||||
return lower_concentrations
|
||||
|
||||
def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
|
||||
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(),fn_name
|
||||
|
||||
def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
|
||||
return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
|
||||
|
||||
def _calculate_co2_concentration(CO2_model, time, fn_name=None):
|
||||
return np.array(CO2_model.concentration(float(time))).mean(), fn_name
|
||||
|
||||
@profile
|
||||
def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> typing.Dict[str, typing.Any]:
|
||||
def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
|
||||
times = interesting_times(model)
|
||||
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
|
||||
short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
|
||||
|
|
@ -127,20 +135,34 @@ def calculate_report_data(form: VirusFormData, model: models.ExposureModel) -> t
|
|||
]
|
||||
lower_concentrations = concentrations_with_sr_breathing(form, model, times, short_range_intervals)
|
||||
|
||||
cumulative_doses = np.cumsum([
|
||||
np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean()
|
||||
for time1, time2 in zip(times[:-1], times[1:])
|
||||
])
|
||||
long_range_cumulative_doses = np.cumsum([
|
||||
np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean()
|
||||
for time1, time2 in zip(times[:-1], times[1:])
|
||||
])
|
||||
|
||||
CO2_model: models.CO2ConcentrationModel = form.build_CO2_model()
|
||||
CO2_concentrations = {'CO₂': {'concentrations': [
|
||||
np.array(CO2_model.concentration(float(time))).mean()
|
||||
for time in times
|
||||
]}}
|
||||
|
||||
# compute deposited exposures and CO2 concentrations in parallel to increase performance
|
||||
deposited_exposures = []
|
||||
long_range_deposited_exposures = []
|
||||
CO2_concentrations = []
|
||||
|
||||
tasks = []
|
||||
with executor_factory() as executor:
|
||||
for time1, time2 in zip(times[:-1], times[1:]):
|
||||
tasks.append(executor.submit(_calculate_deposited_exposure, model, time1, time2, fn_name="de"))
|
||||
tasks.append(executor.submit(_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
|
||||
# co2 concentration: takes each time as param, not the interval
|
||||
tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
|
||||
# co2 concentration: calculate the last time too
|
||||
tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, times[-1], fn_name="co2"))
|
||||
|
||||
for task in tasks:
|
||||
result, fn_name = task.result()
|
||||
if fn_name == "de":
|
||||
deposited_exposures.append(result)
|
||||
elif fn_name == "lr":
|
||||
long_range_deposited_exposures.append(result)
|
||||
elif fn_name == "co2":
|
||||
CO2_concentrations.append(result)
|
||||
|
||||
cumulative_doses = np.cumsum(deposited_exposures)
|
||||
long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)
|
||||
|
||||
prob = np.array(model.infection_probability())
|
||||
prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
|
||||
|
|
@ -513,7 +535,7 @@ class ReportGenerator:
|
|||
}
|
||||
|
||||
scenario_sample_times = interesting_times(model)
|
||||
report_data = calculate_report_data(form, model)
|
||||
report_data = calculate_report_data(form, model, executor_factory=executor_factory)
|
||||
context.update(report_data)
|
||||
|
||||
alternative_scenarios = manufacture_alternative_scenarios(form)
|
||||
|
|
|
|||
Loading…
Reference in a new issue