Merge branch 'feature/parallel_scenarios' into 'master'

Distribute the calculation of alternative scenarios

See merge request cara/cara!209
This commit is contained in:
Philip James Elson 2021-07-12 08:06:55 +00:00
commit db9af9cf04
3 changed files with 77 additions and 79 deletions

View file

@ -96,6 +96,7 @@ class ConcentrationModel(BaseRequestHandler):
if self.settings.get("debug", False):
from pprint import pprint
pprint(requested_model_config)
start = datetime.datetime.now()
try:
form = model_generator.FormData.from_dict(requested_model_config)
@ -114,6 +115,9 @@ class ConcentrationModel(BaseRequestHandler):
report_generator.build_report, base_url, form,
)
report: str = await asyncio.wrap_future(report_task)
if self.settings.get("debug", False):
dt = (datetime.datetime.now() - start)
print(f'Report response time {dt.seconds}.{dt.microseconds}s')
self.finish(report)

View file

@ -22,7 +22,8 @@ minutes_since_midnight = typing.NewType('minutes_since_midnight', int)
# Used to declare when an attribute of a class must have a value provided, and
# there should be no default value used.
_NO_DEFAULT = object()
_SAMPLE_SIZE = 50000
_DEFAULT_MC_SAMPLE_SIZE = 50000
@dataclass
class FormData:
@ -218,8 +219,30 @@ class FormData:
raise ValueError("mechanical_ventilation_type cannot be 'not-applicable' if "
"ventilation_type is 'mechanical_ventilation'")
def build_model(self) -> models.ExposureModel:
return model_from_form(self)
def build_mc_model(self) -> mc.ExposureModel:
# Initializes room with volume either given directly or as product of area and height
if self.volume_type == 'room_volume_explicit':
volume = self.room_volume
else:
volume = self.floor_area * self.ceiling_height
if self.room_heating_option:
humidity = 0.3
else:
humidity = 0.5
room = models.Room(volume=volume, humidity=humidity)
# Initializes and returns a model with the attributes defined above
return mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=room,
ventilation=self.ventilation(),
infected=self.infected_population(),
),
exposed=self.exposed_population()
)
def build_model(self, sample_size=_DEFAULT_MC_SAMPLE_SIZE) -> models.ExposureModel:
return self.build_mc_model().build_model(size=sample_size)
def ventilation(self) -> models._VentilationBase:
always_on = models.PeriodicInterval(period=120, duration=120)
@ -553,29 +576,6 @@ def build_expiration(expiration_definition) -> models._ExpirationBase:
)
def model_from_form(form: FormData) -> models.ExposureModel:
# Initializes room with volume either given directly or as product of area and height
if form.volume_type == 'room_volume_explicit':
volume = form.room_volume
else:
volume = form.floor_area * form.ceiling_height
if form.room_heating_option:
humidity = 0.3
else:
humidity = 0.5
room = models.Room(volume=volume, humidity=humidity)
# Initializes and returns a model with the attributes defined above
return mc.ExposureModel(
concentration_model=mc.ConcentrationModel(
room=room,
ventilation=form.ventilation(),
infected=form.infected_population(),
),
exposed=form.exposed_population()
).build_model(size=_SAMPLE_SIZE)
def baseline_raw_form_data():
# Note: This isn't a special "baseline". It can be updated as required.
return {

View file

@ -1,3 +1,4 @@
import concurrent.futures
import base64
import dataclasses
from datetime import datetime, timedelta
@ -14,17 +15,11 @@ import matplotlib.pyplot as plt
import numpy as np
from cara import models
from .model_generator import FormData
from ... import monte_carlo as mc
from .model_generator import FormData, _DEFAULT_MC_SAMPLE_SIZE
from ... import dataclass_utils
@dataclasses.dataclass(frozen=True)
class RepeatEvents:
repeats: int
probability_of_infection: float
expected_new_cases: float
def model_start_end(model: models.ExposureModel):
t_start = min(model.exposed.presence.boundaries()[0][0],
model.concentration_model.infected.presence.boundaries()[0][0])
@ -46,18 +41,6 @@ def calculate_report_data(model: models.ExposureModel):
exposed_occupants = model.exposed.number
expected_new_cases = np.mean(model.expected_new_cases())
repeated_events = []
for n in [1, 2, 3, 4, 5]:
repeat_model = dataclass_utils.replace(model, repeats=n)
repeated_events.append(
RepeatEvents(
repeats=n,
probability_of_infection=np.mean(repeat_model.infection_probability()),
expected_new_cases=np.mean(repeat_model.expected_new_cases()),
)
)
return {
"times": times,
"concentrations": concentrations,
@ -67,7 +50,6 @@ def calculate_report_data(model: models.ExposureModel):
"exposed_occupants": exposed_occupants,
"expected_new_cases": expected_new_cases,
"scenario_plot_src": img2base64(_figure2bytes(plot(times, concentrations, model))),
"repeated_events": repeated_events,
}
@ -187,17 +169,17 @@ def non_zero_percentage(percentage: int) -> str:
return "{:0.0f}%".format(percentage)
def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, models.ExposureModel]:
def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.ExposureModel]:
scenarios = {}
# Two special option cases - HEPA and/or FFP2 masks.
FFP2_being_worn = bool(form.mask_wearing_option == 'mask_on' and form.mask_type == 'FFP2')
if FFP2_being_worn and form.hepa_option:
scenarios['Base scenario with HEPA and FFP2 masks'] = form.build_model()
scenarios['Base scenario with HEPA and FFP2 masks'] = form.build_mc_model()
elif FFP2_being_worn:
scenarios['Base scenario with FFP2 masks'] = form.build_model()
scenarios['Base scenario with FFP2 masks'] = form.build_mc_model()
elif form.hepa_option:
scenarios['Base scenario with HEPA filter'] = form.build_model()
scenarios['Base scenario with HEPA filter'] = form.build_mc_model()
# The remaining scenarios are based on Type I masks (possibly not worn)
# and no HEPA filtration.
@ -209,47 +191,40 @@ def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, models
without_mask = dataclass_utils.replace(form, mask_wearing_option='mask_off')
if form.ventilation_type == 'mechanical_ventilation':
scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_model()
scenarios['Mechanical ventilation without masks'] = without_mask.build_model()
scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model()
elif form.ventilation_type == 'natural_ventilation':
scenarios['Windows open with Type I masks'] = with_mask.build_model()
scenarios['Windows open without masks'] = without_mask.build_model()
scenarios['Windows open with Type I masks'] = with_mask.build_mc_model()
scenarios['Windows open without masks'] = without_mask.build_mc_model()
# No matter the ventilation scheme, we include scenarios which don't have any ventilation.
with_mask_no_vent = dataclass_utils.replace(with_mask, ventilation_type='no_ventilation')
without_mask_or_vent = dataclass_utils.replace(without_mask, ventilation_type='no_ventilation')
scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_model()
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_model()
scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
return scenarios
def comparison_plot(scenarios: typing.Dict[str, models.ExposureModel]):
def comparison_plot(scenarios: typing.Dict[str, dict], sample_times: np.ndarray):
fig = plt.figure()
ax = fig.add_subplot(1, 1, 1)
resolution = 350
times = None
dash_styled_scenarios = [
'Base scenario with FFP2 masks',
'Base scenario with HEPA filter',
'Base scenario with HEPA and FFP2 masks',
]
for name, model in scenarios.items():
if times is None:
t_start, t_end = model_start_end(model)
times = np.linspace(t_start, t_end, resolution)
datetimes = [datetime(1970, 1, 1) + timedelta(hours=time) for time in times]
concentrations = [np.mean(model.concentration_model.concentration(time))
for time in times]
sample_dts = [datetime(1970, 1, 1) + timedelta(hours=time) for time in sample_times]
for name, statistics in scenarios.items():
concentrations = statistics['concentrations']
if name in dash_styled_scenarios:
ax.plot(datetimes, concentrations, label=name, linestyle='--')
ax.plot(sample_dts, concentrations, label=name, linestyle='--')
else:
ax.plot(datetimes, concentrations, label=name, linestyle='-', alpha=0.5)
ax.plot(sample_dts, concentrations, label=name, linestyle='-', alpha=0.5)
# Place a legend outside of the axes itself.
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
@ -264,15 +239,31 @@ def comparison_plot(scenarios: typing.Dict[str, models.ExposureModel]):
return fig
def comparison_report(scenarios: typing.Dict[str, models.ExposureModel]):
statistics = {}
for name, model in scenarios.items():
statistics[name] = {
'probability_of_infection': np.mean(model.infection_probability()),
'expected_new_cases': np.mean(model.expected_new_cases()),
}
def scenario_statistics(mc_model: mc.ExposureModel, sample_times: np.ndarray):
model = mc_model.build_model(size=_DEFAULT_MC_SAMPLE_SIZE)
return {
'plot': img2base64(_figure2bytes(comparison_plot(scenarios))),
'probability_of_infection': np.mean(model.infection_probability()),
'expected_new_cases': np.mean(model.expected_new_cases()),
'concentrations': [
np.mean(model.concentration_model.concentration(time))
for time in sample_times
],
}
def comparison_report(scenarios: typing.Dict[str, mc.ExposureModel], sample_times: np.ndarray):
statistics = {}
with concurrent.futures.ProcessPoolExecutor() as executor:
results = executor.map(
scenario_statistics,
scenarios.values(),
[sample_times] * len(scenarios),
timeout=60,
)
for (name, model), model_stats in zip(scenarios.items(), results):
statistics[name] = model_stats
return {
'plot': img2base64(_figure2bytes(comparison_plot(statistics, sample_times))),
'stats': statistics,
}
@ -297,9 +288,12 @@ class ReportGenerator:
'creation_date': time,
}
t_start, t_end = model_start_end(model)
scenario_sample_times = np.linspace(t_start, t_end, 350)
context.update(calculate_report_data(model))
alternative_scenarios = manufacture_alternative_scenarios(form)
context['alternative_scenarios'] = comparison_report(alternative_scenarios)
context['alternative_scenarios'] = comparison_report(alternative_scenarios, scenario_sample_times)
context['qr_code'] = generate_qr_code(base_url, self.calculator_prefix, form)
context['calculator_prefix'] = self.calculator_prefix
return context