Re-introduce parallelism through loky.

This commit is contained in:
Phil Elson 2021-07-16 12:34:27 +02:00
parent b21a387053
commit 77e9215812
7 changed files with 86 additions and 22 deletions

View file

@ -5,6 +5,7 @@ import asyncio
import concurrent.futures
import datetime
import base64
import functools
import html
import json
import os
@ -15,7 +16,9 @@ import uuid
import zlib
import jinja2
import loky
from tornado.web import Application, RequestHandler, StaticFileHandler
import tornado.log
from . import markdown_tools
from . import model_generator
@ -111,10 +114,18 @@ class ConcentrationModel(BaseRequestHandler):
base_url = self.request.protocol + "://" + self.request.host
report_generator: ReportGenerator = self.settings['report_generator']
report = report_generator.build_report(base_url, form)
if self.settings.get("debug", False):
dt = (datetime.datetime.now() - start)
print(f'Report response time {dt.seconds}.{dt.microseconds}s')
executor = loky.get_reusable_executor(
max_workers=self.settings['handler_worker_pool_size'],
timeout=300,
)
report_task = executor.submit(
report_generator.build_report, base_url, form,
executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor,
self.settings['report_generation_parallelism'],
),
)
report: str = await asyncio.wrap_future(report_task)
self.finish(report)
@ -123,11 +134,18 @@ class StaticModel(BaseRequestHandler):
form = model_generator.FormData.from_dict(model_generator.baseline_raw_form_data())
base_url = self.request.protocol + "://" + self.request.host
report_generator: ReportGenerator = self.settings['report_generator']
report = report_generator.build_report(base_url, form)
executor = loky.get_reusable_executor(max_workers=self.settings['handler_worker_pool_size'])
report_task = executor.submit(
report_generator.build_report, base_url, form,
executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor,
self.settings['report_generation_parallelism'],
),
)
report: str = await asyncio.wrap_future(report_task)
self.finish(report)
class LandingPage(BaseRequestHandler):
def get(self):
template = self.settings["template_environment"].get_template(
@ -222,6 +240,9 @@ def make_app(
template_environment.get_template('common_text.md.j2')
)
if debug:
tornado.log.enable_pretty_logging()
return Application(
urls,
debug=debug,
@ -233,4 +254,19 @@ def make_app(
# COOKIE_SECRET being undefined will result in no login information being
# presented to the user.
cookie_secret=os.environ.get('COOKIE_SECRET', '<undefined>'),
# Process parallelism controls. There is a balance between serving a single report
# request quickly and serving multiple requests concurrently.
# The defaults are: handle one report at a time, and allow parallelism
# of that report generation. A value of ``None`` will result in the number of
# processes being determined based on the number of CPUs. For some deployments,
# such as on OpenShift, this number does *not* reflect the real number of CPUs that
# can be used, and it is recommended to specify these values explicitly (through
# the environment variables).
handler_worker_pool_size=(
int(os.environ.get("HANDLER_WORKER_POOL_SIZE", 1)) or None
),
report_generation_parallelism=(
int(os.environ.get('REPORT_PARALLELISM', 0)) or None
),
)

View file

@ -4,15 +4,16 @@ import dataclasses
from datetime import datetime, timedelta
import io
import typing
import urllib
import zlib
import qrcode
import urllib
import loky
import jinja2
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
import numpy as np
import qrcode
from cara import models
from ... import monte_carlo as mc
@ -251,15 +252,20 @@ def scenario_statistics(mc_model: mc.ExposureModel, sample_times: np.ndarray):
}
def comparison_report(scenarios: typing.Dict[str, mc.ExposureModel], sample_times: np.ndarray):
def comparison_report(
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: np.ndarray,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
):
statistics = {}
with concurrent.futures.ProcessPoolExecutor() as executor:
with executor_factory() as executor:
results = executor.map(
scenario_statistics,
scenarios.values(),
[sample_times] * len(scenarios),
timeout=60,
)
for (name, model), model_stats in zip(scenarios.items(), results):
statistics[name] = model_stats
return {
@ -273,12 +279,23 @@ class ReportGenerator:
jinja_loader: jinja2.BaseLoader
calculator_prefix: str
def build_report(self, base_url: str, form: FormData) -> str:
def build_report(
self,
base_url: str,
form: FormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
context = self.prepare_context(base_url, model, form)
context = self.prepare_context(base_url, model, form, executor_factory=executor_factory)
return self.render(context)
def prepare_context(self, base_url: str, model: models.ExposureModel, form: FormData) -> dict:
def prepare_context(
self,
base_url: str,
model: models.ExposureModel,
form: FormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> dict:
now = datetime.utcnow().astimezone()
time = now.strftime("%Y-%m-%d %H:%M:%S UTC")
@ -293,7 +310,9 @@ class ReportGenerator:
context.update(calculate_report_data(model))
alternative_scenarios = manufacture_alternative_scenarios(form)
context['alternative_scenarios'] = comparison_report(alternative_scenarios, scenario_sample_times)
context['alternative_scenarios'] = comparison_report(
alternative_scenarios, scenario_sample_times, executor_factory=executor_factory,
)
context['qr_code'] = generate_qr_code(base_url, self.calculator_prefix, form)
context['calculator_prefix'] = self.calculator_prefix
context['scale_warning'] = {

View file

@ -1,8 +1,10 @@
import concurrent.futures
from functools import partial
import time
import pytest
from cara.apps.calculator import report_generator
from cara.apps.calculator.report_generator import ReportGenerator, readable_minutes
from cara.apps.calculator import make_app
@ -15,8 +17,10 @@ def test_generate_report(baseline_form):
start = time.perf_counter()
generator: report_generator.ReportGenerator = make_app().settings['report_generator']
report = generator.build_report("", baseline_form)
generator: ReportGenerator = make_app().settings['report_generator']
report = generator.build_report("", baseline_form, partial(
concurrent.futures.ThreadPoolExecutor, 1,
))
end = time.perf_counter()
assert report != ""
assert end - start < time_limit
@ -33,4 +37,4 @@ def test_generate_report(baseline_form):
],
)
def test_readable_minutes(test_input, expected):
assert report_generator.readable_minutes(test_input) == expected
assert readable_minutes(test_input) == expected

View file

@ -63,9 +63,7 @@ class TestBasicApp(tornado.testing.AsyncHTTPTestCase):
# but the end time is after the other request (because it takes longer
# to process a report than a simple page).
assert response.start_time < other_response.start_time
# Known fail after reverting in https://gitlab.cern.ch/cara/cara/-/merge_requests/219.
with pytest.raises(AssertionError):
assert end_time(response) > end_time(other_response)
assert end_time(response) > end_time(other_response)
self.assertEqual(response.code, 200)
assert 'CERN HSE' not in response.body.decode()

View file

@ -35,6 +35,7 @@ jupyter-server==1.4.1
jupyterlab-pygments==0.1.2
jupyterlab-widgets==1.0.0
kiwisolver==1.3.1
loky==2.9.0
MarkupSafe==1.1.1
matplotlib==3.3.4
memoization==0.3.2
@ -53,6 +54,7 @@ pickleshare==0.7.5
Pillow==8.1.0
prometheus-client==0.9.0
prompt-toolkit==3.0.16
psutil==5.8.0
ptyprocess==0.7.0
pycparser==2.20
Pygments==2.8.0

View file

@ -5,7 +5,7 @@ addopts = --mypy
[mypy]
no_warn_no_return = True
[mypy-matplotlib.*]
[mypy-loky.*]
ignore_missing_imports = True
[mypy-ipympl.*]
@ -14,6 +14,9 @@ ignore_missing_imports = True
[mypy-ipywidgets.*]
ignore_missing_imports = True
[mypy-matplotlib.*]
ignore_missing_imports = True
[mypy-mistune.*]
ignore_missing_imports = True

View file

@ -23,10 +23,12 @@ REQUIREMENTS: dict = {
'ipympl',
'ipywidgets',
'Jinja2',
'loky',
'matplotlib',
'memoization',
'mistune',
'numpy',
'psutil',
'qrcode[pil]',
'scipy',
'sklearn',