Changed methods names and variables. New logic to calculate exposure between bounds

This commit is contained in:
Luis Aleixo 2021-09-07 15:30:54 +02:00
parent d3e56f69b2
commit 0b83c6491e
2 changed files with 51 additions and 69 deletions

View file

@ -1,3 +1,10 @@
from ... import dataclass_utils
from .model_generator import FormData, _DEFAULT_MC_SAMPLE_SIZE
from ... import monte_carlo as mc
from cara import models
import qrcode
import numpy as np
import matplotlib.pyplot as plt
import concurrent.futures
import base64
import dataclasses
@ -12,14 +19,6 @@ import jinja2
import matplotlib
from numpy.lib.function_base import append, quantile
matplotlib.use('agg')
import matplotlib.pyplot as plt
import numpy as np
import qrcode
from cara import models
from ... import monte_carlo as mc
from .model_generator import FormData, _DEFAULT_MC_SAMPLE_SIZE
from ... import dataclass_utils
def model_start_end(model: models.ExposureModel):
@ -97,24 +96,11 @@ def interesting_times(model: models.ExposureModel, approx_n_pts=100) -> typing.L
# Expand the times list to ensure that we have a maximum gap size between
# the key times.
nice_times = fill_big_gaps(times, gap_size=(max(times) - min(times)) / approx_n_pts)
nice_times = fill_big_gaps(times, gap_size=(
max(times) - min(times)) / approx_n_pts)
return nice_times
def get_boundaries_from_time(model: models.ExposureModel, time: float):
x = []
for start, stop in model.exposed.presence.boundaries():
if start > time:
break
elif time <= stop:
stop = time
x.append([start, stop])
break
else:
x.append([start, stop])
return x
def calculate_report_data(model: models.ExposureModel):
times = interesting_times(model)
@ -124,23 +110,10 @@ def calculate_report_data(model: models.ExposureModel):
]
highest_const = max(concentrations)
prob = np.array(model.infection_probability()).mean()
er = np.array(model.concentration_model.infected.emission_rate_when_present()).mean()
er = np.array(
model.concentration_model.infected.emission_rate_when_present()).mean()
exposed_occupants = model.exposed.number
expected_new_cases = np.array(model.expected_new_cases()).mean()
# present_indexes = [model.exposed.person_present(t) for t in times]
# filtered_times = np.array(times)[present_indexes]
b = [
np.array(model.cenas(list(get_boundaries_from_time(model, time)))).mean()
for time in times
]
print(b)
# cumulative_doses = [
# np.array(model.inhaled_exposure_between_bounds(list(get_boundaries_from_time(model, time)))).mean()
# for time in times
# ]
cumulative_doses = [
np.array(model.inhaled_exposure_between_bounds(float(time))).mean()
for time in times
@ -199,7 +172,8 @@ def _img2bytes(figure):
def _figure2bytes(figure):
# Draw the image
img_data = io.BytesIO()
figure.savefig(img_data, format='png', bbox_inches="tight", transparent=True)
figure.savefig(img_data, format='png',
bbox_inches="tight", transparent=True)
return img_data
@ -250,14 +224,18 @@ def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.Exp
scenarios = {}
# Two special option cases - HEPA and/or FFP2 masks.
FFP2_being_worn = bool(form.mask_wearing_option == 'mask_on' and form.mask_type == 'FFP2')
FFP2_being_worn = bool(form.mask_wearing_option ==
'mask_on' and form.mask_type == 'FFP2')
if FFP2_being_worn and form.hepa_option:
FFP2andHEPAalternative = dataclass_utils.replace(form, mask_type='Type I')
FFP2andHEPAalternative = dataclass_utils.replace(
form, mask_type='Type I')
scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model()
if not FFP2_being_worn and form.hepa_option:
noHEPAalternative = dataclass_utils.replace(form, mask_type = 'FFP2')
noHEPAalternative = dataclass_utils.replace(noHEPAalternative, mask_wearing_option = 'mask_on')
noHEPAalternative = dataclass_utils.replace(noHEPAalternative, hepa_option=False)
noHEPAalternative = dataclass_utils.replace(form, mask_type='FFP2')
noHEPAalternative = dataclass_utils.replace(
noHEPAalternative, mask_wearing_option='mask_on')
noHEPAalternative = dataclass_utils.replace(
noHEPAalternative, hepa_option=False)
scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model()
# The remaining scenarios are based on Type I masks (possibly not worn)
@ -267,7 +245,8 @@ def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.Exp
form = dataclass_utils.replace(form, hepa_option=False)
with_mask = dataclass_utils.replace(form, mask_wearing_option='mask_on')
without_mask = dataclass_utils.replace(form, mask_wearing_option='mask_off')
without_mask = dataclass_utils.replace(
form, mask_wearing_option='mask_off')
if form.ventilation_type == 'mechanical_ventilation':
#scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
@ -278,8 +257,10 @@ def manufacture_alternative_scenarios(form: FormData) -> typing.Dict[str, mc.Exp
scenarios['Windows open without masks'] = without_mask.build_mc_model()
# No matter the ventilation scheme, we include scenarios which don't have any ventilation.
with_mask_no_vent = dataclass_utils.replace(with_mask, ventilation_type='no_ventilation')
without_mask_or_vent = dataclass_utils.replace(without_mask, ventilation_type='no_ventilation')
with_mask_no_vent = dataclass_utils.replace(
with_mask, ventilation_type='no_ventilation')
without_mask_or_vent = dataclass_utils.replace(
without_mask, ventilation_type='no_ventilation')
scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
@ -297,27 +278,23 @@ def comparison_plot(scenarios: typing.Dict[str, dict], sample_times: typing.List
'Base scenario with HEPA and FFP2 masks',
]
datetimes = [datetime(1970, 1, 1) + timedelta(hours=time) for time in sample_times]
datetimes = [datetime(1970, 1, 1) + timedelta(hours=time)
for time in sample_times]
for name, statistics in scenarios.items():
model = statistics['model']
concentrations = statistics['concentrations']
#See CERN-OPEN-2021-004, p. 15, eq. 16. - Cumulative Dose
qds = [np.mean(model.inhaled_exposure_between_bounds(t)) for t in sample_times]
# Plot concentrations and cumulative dose
# Plot concentrations
if name in dash_styled_scenarios:
ax.plot(datetimes, concentrations, label=name, linestyle='--')
ax1.plot(datetimes, qds, label=f'Mean cumulative dose:\n{name}', linestyle='dotted')
else:
ax.plot(datetimes, concentrations, label=name, linestyle='-', alpha=0.5)
ax1.plot(datetimes, qds, label=f'Mean cumulative dose:\n{name}', linestyle='dotted', alpha=0.5)
ax.plot(datetimes, concentrations,
label=name, linestyle='-', alpha=0.5)
# Place a legend outside of the axes itself.
fig.legend(bbox_to_anchor=(1.05, 0.95), loc='upper left')
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.set_xlabel('Time of day', fontsize=14)
@ -379,7 +356,8 @@ class ReportGenerator:
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
context = self.prepare_context(base_url, model, form, executor_factory=executor_factory)
context = self.prepare_context(
base_url, model, form, executor_factory=executor_factory)
return self.render(context)
def prepare_context(
@ -405,14 +383,15 @@ class ReportGenerator:
context['alternative_scenarios'] = comparison_report(
alternative_scenarios, scenario_sample_times, executor_factory=executor_factory,
)
context['qr_code'] = generate_qr_code(base_url, self.calculator_prefix, form)
context['qr_code'] = generate_qr_code(
base_url, self.calculator_prefix, form)
context['calculator_prefix'] = self.calculator_prefix
context['scale_warning'] = {
'level': 'yellow-2',
'level': 'yellow-2',
'incidence_rate': 'lower than 25 new cases per 100 000 inhabitants',
'onsite_access': 'of about 8000',
'onsite_access': 'of about 8000',
'threshold': ''
}
}
return context
def _template_environment(self) -> jinja2.Environment:

View file

@ -878,20 +878,23 @@ class ExposureModel:
fraction_deposited: _VectorisedFloat = 0.6
def exposure_between_bounds(self, time: float) -> _VectorisedFloat:
"""The cumulative number of virions per meter^3 from model start to the given time."""
exposure = 0.0
"""The number of virions per meter^3 from model start to the given time."""
boundaries = []
for start, stop in self.exposed.presence.boundaries():
if start > time:
break
elif time <= stop:
stop = time
exposure += self.concentration_model.integrated_concentration(start, stop)
boundaries.append([start, stop])
break
else:
exposure += self.concentration_model.integrated_concentration(start, stop)
boundaries.append([start, stop])
exposure = 0.0
for start, stop in boundaries:
exposure += self.concentration_model.integrated_concentration(start, stop)
return exposure * self.repeats
return exposure
def exposure(self) -> _VectorisedFloat:
"""The number of virions per meter^3 for the full simulation time."""