Merge branch 'master' into feature/report_reformulation

This commit is contained in:
Luis Aleixo 2021-08-23 11:52:46 +02:00
commit 7bd0851397
26 changed files with 612 additions and 277 deletions

View file

@ -42,7 +42,7 @@ test_dev:
- python ./config-generate.py ${CARA_INSTANCE} --output-directory ./${CARA_INSTANCE}/expected
- python ./config-normalise.py ./${CARA_INSTANCE}/actual ./${CARA_INSTANCE}/actual-normed
- python ./config-normalise.py ./${CARA_INSTANCE}/expected ./${CARA_INSTANCE}/expected-normed
- diff -u ./test-cara/actual-normed/ ./${CARA_INSTANCE}/expected-normed/
- diff -u ./${CARA_INSTANCE}/actual-normed/ ./${CARA_INSTANCE}/expected-normed/
artifacts:
paths:
@ -59,6 +59,15 @@ check_openshift_config_test-cara:
OC_TOKEN: "${OPENSHIFT_CONFIG_CHECKER_TOKEN_TEST_CARA}"
check_openshift_config_prod:
extends: .test_openshift_config
variables:
CARA_INSTANCE: 'cara'
BRANCH: 'master'
OC_SERVER: openshift.cern.ch
OC_TOKEN: "${OPENSHIFT_CONFIG_CHECKER_TOKEN_PROD}"
# A development installation of CARA tested with pytest.
test_dev-39:
variables:

View file

@ -2,7 +2,7 @@
CARA is a risk assessment tool developed to model the concentration of viruses in enclosed spaces, in order to inform space-management decisions.
CARA models the concentration profile of potential infectious viruses in enclosed spaces with clear and intuitive graphs.
CARA models the concentration profile of potential virions in enclosed spaces with clear and intuitive graphs.
The user can set a number of parameters, including room volume, exposure time, activity type, mask-wearing and ventilation.
The report generated indicates how to avoid exceeding critical concentrations and chains of airborne transmission in spaces such as individual offices, meeting rooms and labs.
@ -26,7 +26,7 @@ Each event modelled is unique, and the results generated therein are only as acc
## Authors
CARA was developed by following members of CERN - European Council for Nuclear Research (visit https://home.cern/):
Andre Henriques<sup>1</sup>, Marco Andreini<sup>1</sup>, Gabriella Azzopardi<sup>2</sup>, James Devine<sup>3</sup>, Philip Elson<sup>4</sup>, Nicolas Mounet<sup>2</sup>, Markus Kongstein Rognlien<sup>2,6</sup>, Nicola Tarocco<sup>5</sup>
Andre Henriques<sup>1</sup>, Luis Aleixo<sup>1</sup>, Marco Andreini<sup>1</sup>, Gabriella Azzopardi<sup>2</sup>, James Devine<sup>3</sup>, Philip Elson<sup>4</sup>, Nicolas Mounet<sup>2</sup>, Markus Kongstein Rognlien<sup>2,6</sup>, Nicola Tarocco<sup>5</sup>
<sup>1</sup>HSE Unit, Occupational Health & Safety Group, CERN<br>
<sup>2</sup>Beams Department, Accelerators and Beam Physics Group, CERN<br>
@ -148,7 +148,7 @@ export CLIENT_SECRET
Run docker-compose:
```
cd app-config
docker-compose up
CURRENT_UID=$(id -u):$(id -g) docker-compose up
```
Then visit http://localhost:8080/.

View file

@ -23,9 +23,14 @@ FROM debian
COPY --from=conda /opt/app /opt/app
ENV PATH=/opt/app/bin/:$PATH
# Make a convenient location to the installed CARA package (i.e. a directory called cara in the CWD).
# It is important that this directory is also writable by a non-root user.
RUN mkdir -p /scratch \
&& chmod a+wx /scratch
# Set the HOME directory to something that anybody can write to (to support non root users, such as on openshift).
ENV HOME=/scratch
WORKDIR /scratch
RUN CARA_INIT_FILE=$(/opt/app/bin/python -c "import cara; print(cara.__file__)") \
&& ln -s $(dirname $(dirname ${CARA_INIT_FILE})) /opt/site-packages \
&& ln -s /opt/site-packages/cara ./cara
&& ln -s $(dirname ${CARA_INIT_FILE}) /scratch/cara
CMD [ \
"cara-app.sh" \
]

View file

@ -4,6 +4,7 @@ services:
image: cara-webservice
environment:
- APP_NAME=cara-voila
user: ${CURRENT_UID:?"Please run as follows 'CURRENT_UID=$(id -u):$(id -g) docker-compose up'"}
cara-webservice:
image: cara-webservice
@ -12,6 +13,7 @@ services:
- APP_NAME=cara-webservice
- CARA_CALCULATOR_PREFIX=/calculator-cern
- CARA_THEME=cara/apps/calculator/themes/cern
user: ${CURRENT_UID}
cara-calculator-open:
image: cara-webservice
@ -19,6 +21,7 @@ services:
- COOKIE_SECRET
- APP_NAME=cara-webservice
- CARA_CALCULATOR_PREFIX=/calculator-open
user: ${CURRENT_UID}
auth-service:
image: auth-service
@ -28,6 +31,7 @@ services:
- OIDC_REALM
- CLIENT_ID
- CLIENT_SECRET
user: ${CURRENT_UID}
cara-router:
image: cara-nginx-app
@ -35,10 +39,11 @@ services:
- "8080:8080"
depends_on:
cara-webservice:
condition: service_started
condition: service_started
cara-calculator-open:
condition: service_started
condition: service_started
cara-app:
condition: service_started
condition: service_started
auth-service:
condition: service_started
condition: service_started
user: ${CURRENT_UID}

View file

@ -41,6 +41,9 @@
namespace: openshift
type: Source
triggers:
- type: ImageChange
imageChange: {}
- type: ConfigChange
- generic:
secretReference:
name: gitlab-cara-webhook-secret

View file

@ -73,6 +73,7 @@
kind: DeploymentConfig
metadata:
name: cara-app
labels: {app: cara-app}
spec:
replicas: 1
template:
@ -81,16 +82,18 @@
app: cara-app
spec:
containers:
- name: cara-app
- name: cara-webservice
env:
- name: APP_NAME
value: cara-voila
image: '${PROJECT_NAME}/cara-app'
image: '${PROJECT_NAME}/cara-webservice'
ports:
- containerPort: 8080
protocol: TCP
imagePullPolicy: Always
resources: {}
resources:
limits: { cpu: '1', memory: 1Gi }
requests: { cpu: 1m, memory: 512Mi }
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
dnsPolicy: ClusterFirst
@ -117,10 +120,10 @@
imageChangeParams:
automatic: true
containerNames:
- cara-app
- cara-webservice
from:
kind: ImageStreamTag
name: 'cara-app:latest'
name: 'cara-webservice:latest'
namespace: ${PROJECT_NAME}
-
apiVersion: v1
@ -165,7 +168,6 @@
selector:
app: cara-router
triggers:
- type: ConfigChange
- type: ImageChange
imageChangeParams:
automatic: true
@ -181,6 +183,9 @@
kind: DeploymentConfig
metadata:
name: cara-webservice
labels:
image: cara-webservice
app: cara-webservice
spec:
replicas: 1
template:
@ -196,6 +201,8 @@
secretKeyRef:
key: COOKIE_SECRET
name: auth-service-secrets
- name: REPORT_PARALLELISM
value: '3'
- name: APP_NAME
value: cara-webservice
- name: CARA_CALCULATOR_PREFIX
@ -260,6 +267,9 @@
kind: DeploymentConfig
metadata:
name: cara-calculator-open
labels:
image: cara-webservice
app: cara-calculator-open
spec:
replicas: 1
template:

View file

@ -550,7 +550,6 @@ class FormData:
if current_time < finish:
LOG.debug("trailing interval")
present_intervals.append((current_time / 60, finish / 60))
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:

View file

@ -34,8 +34,10 @@ def calculate_report_data(model: models.ExposureModel):
t_start, t_end = model_start_end(model)
times = np.linspace(t_start, t_end, resolution)
concentrations = [np.array(model.concentration_model.concentration(time)).mean()
for time in times]
concentrations = [
np.array(model.concentration_model.concentration(float(time))).mean()
for time in times
]
highest_const = max(concentrations)
prob = np.array(model.infection_probability()).mean()
er = np.array(model.concentration_model.infected.emission_rate_when_present()).mean()
@ -44,13 +46,13 @@ def calculate_report_data(model: models.ExposureModel):
return {
"times": list(times),
"exposed_presence_intervals": [list(interval) for interval in model.exposed.presence.boundaries()],
"concentrations": concentrations,
"highest_const": highest_const,
"prob_inf": prob,
"emission_rate": er,
"exposed_occupants": exposed_occupants,
"expected_new_cases": expected_new_cases,
"scenario_plot_src": img2base64(_figure2bytes(plot(times, concentrations, model))),
}
@ -115,8 +117,8 @@ def plot(times, concentrations, model: models.ExposureModel):
ax.spines['top'].set_visible(False)
ax.set_xlabel('Time of day')
ax.set_ylabel('Mean concentration ($q/m^3$)')
ax.set_title('Mean concentration of infectious quanta')
ax.set_ylabel('Mean concentration ($virions/m^{3}$)')
ax.set_title('Mean concentration of virions')
ax.xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%H:%M"))
# Plot presence of exposed person
@ -133,7 +135,7 @@ def plot(times, concentrations, model: models.ExposureModel):
ax.set_ylim(0)
return fig
def minutes_to_time(minutes: int) -> str:
minute_string = str(minutes % 60)
@ -236,8 +238,8 @@ def comparison_plot(scenarios: typing.Dict[str, dict], sample_times: np.ndarray)
ax.xaxis.set_major_formatter(matplotlib.dates.DateFormatter("%H:%M"))
ax.set_xlabel('Time of day')
ax.set_ylabel('Mean concentration ($q/m^3$)')
ax.set_title('Mean concentration of infectious quanta')
ax.set_ylabel('Mean concentration ($virions/m^{3}$)')
ax.set_title('Mean concentration of virions')
return fig

View file

@ -0,0 +1,184 @@
/* Generate the concentration plot using d3 library. */
function draw_concentration_plot(svg_id, times, concentrations, exposed_presence_intervals) {
var visBoundingBox = d3.select(svg_id)
.node()
.getBoundingClientRect();
var time_format = d3.timeFormat('%H:%M');
var data = []
times.map((time, index) => data.push({ 'time': time, 'hour': new Date().setHours(Math.trunc(time), (time - Math.trunc(time)) * 60), 'concentration': concentrations[index] }))
var vis = d3.select(svg_id),
width = visBoundingBox.width - 300,
height = visBoundingBox.height,
margins = { top: 30, right: 20, bottom: 50, left: 50 },
// H:M time format for x axis.
xRange = d3.scaleTime().range([margins.left, width - margins.right]).domain([data[0].hour, data[data.length - 1].hour]),
xTimeRange = d3.scaleLinear().range([margins.left, width - margins.right]).domain([data[0].time, data[data.length - 1].time]),
bisecHour = d3.bisector((d) => { return d.hour; }).left,
yRange = d3.scaleLinear().range([height - margins.bottom, margins.top]).domain([0., Math.max(...concentrations)]),
xAxis = d3.axisBottom(xRange).tickFormat(d => time_format(d)),
yAxis = d3.axisLeft(yRange);
// Plot tittle.
vis.append('svg:foreignObject')
.attr("background-color", "transparent")
.attr('width', width)
.attr('height', margins.top)
.style('text-align', 'center')
.html('<b>Mean concentration of virions</b>');
// Line representing the mean concentration.
var lineFunc = d3.line()
.defined(d => !isNaN(d.concentration))
.x(d => xTimeRange(d.time))
.y(d => yRange(d.concentration))
.curve(d3.curveBasis);
vis.append('svg:path')
.attr('d', lineFunc(data))
.attr('stroke', '#1f77b4')
.attr('stroke-width', 2)
.attr('fill', 'none');
// X axis declaration.
vis.append('svg:g')
.attr('class', 'x axis')
.attr('transform', 'translate(0,' + (height - margins.bottom) + ')')
.call(xAxis);
// X axis label.
vis.append('text')
.attr('class', 'x label')
.attr('fill', 'black')
.attr('text-anchor', 'middle')
.attr('x', (width + margins.right) / 2)
.attr('y', height * 0.97)
.text('Time of day')
// Y axis declaration.
vis.append('svg:g')
.attr('class', 'y axis')
.attr('transform', 'translate(' + margins.left + ',0)')
.call(yAxis);
// Y axis label.
vis.append('svg:text')
.attr('class', 'y label')
.attr('fill', 'black')
.attr('transform', 'rotate(-90, 0,' + height + ')')
.attr('text-anchor', 'middle')
.attr('x', (height + margins.bottom) / 2)
.attr('y', (height + margins.left) * 0.92)
.text('Mean concentration (virions/m³)');
// Area representing the presence of exposed person(s).
exposed_presence_intervals.forEach(b => {
var curveFunc = d3.area()
.x(d => xTimeRange(d.time))
.y0(height - margins.bottom)
.y1(d => yRange(d.concentration));
vis.append('svg:path')
.attr('d', curveFunc(data.filter(d => {
return d.time >= b[0] && d.time <= b[1]
})))
.attr('fill', '#1f77b4')
.attr('fill-opacity', '0.1');
})
// Legend for the plot elements - line and area.
var size = 20
vis.append('rect')
.attr('x', width + size)
.attr('y', margins.top + size)
.attr('width', 20)
.attr('height', 3)
.style('fill', '#1f77b4');
vis.append('rect')
.attr('x', width + size)
.attr('y', 3 * size)
.attr('width', 20)
.attr('height', 20)
.attr('fill', '#1f77b4')
.attr('fill-opacity', '0.1');
vis.append('text')
.attr('x', width + 3 * size)
.attr('y', margins.top + size)
.text('Mean concentration')
.style('font-size', '15px')
.attr('alignment-baseline', 'central');
vis.append('text')
.attr('x', width + 3 * size)
.attr('y', margins.top + 2 * size)
.text('Presence of exposed person(s)')
.style('font-size', '15px')
.attr('alignment-baseline', 'central');
// Legend bounding box.
vis.append('rect')
.attr('width', 275)
.attr('height', 50)
.attr('x', width * 1.005)
.attr('y', margins.top + 5)
.attr('stroke', 'lightgrey')
.attr('stroke-width', '2')
.attr('rx', '5px')
.attr('ry', '5px')
.attr('stroke-linejoin', 'round')
.attr('fill', 'none');
// Tooltip.
var focus = vis.append('svg:g')
.style('display', 'none');
focus.append('circle')
.attr('r', 3);
focus.append('rect')
.attr('fill', 'white')
.attr('stroke', '#000')
.attr('width', 80)
.attr('height', 50)
.attr('x', 10)
.attr('y', -22)
.attr('rx', 4)
.attr('ry', 4);
focus.append('text')
.attr('id', 'tooltip-time')
.attr('x', 18)
.attr('y', -2);
focus.append('text')
.attr('id', 'tooltip-concentration')
.attr('x', 18)
.attr('y', 18);
vis.append('rect')
.attr('fill', 'none')
.attr('pointer-events', 'all')
.attr('width', width - margins.right)
.attr('height', height)
.on('mouseover', () => { focus.style('display', null); })
.on('mouseout', () => { focus.style('display', 'none'); })
.on('mousemove', mousemove);
function mousemove() {
var x0 = xRange.invert(d3.pointer(event, this)[0]),
i = bisecHour(data, x0, 1),
d0 = data[i - 1],
d1 = data[i],
d = x0 - d0.hour > d1.hour - x0 ? d1 : d0;
focus.attr('transform', 'translate(' + xRange(d.hour) + ',' + yRange(d.concentration) + ')');
focus.select('#tooltip-time').text('x = ' + time_format(d.hour));
focus.select('#tooltip-concentration').text('y = ' + d.concentration.toFixed(2));
}
}

View file

@ -8,7 +8,9 @@
<link rel="stylesheet" type="text/css" href="{{ calculator_prefix }}/static/css/report.css">
<link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
<script src="{{ calculator_prefix }}/static/js/pdf.js"></script>
<script src="https://d3js.org/d3.v7.min.js"></script>
<script src="{{ calculator_prefix }}/static/js/pdf.js"></script>
<script src="{{ calculator_prefix }}/static/js/report.js" type="application/javascript"></script>
</head>
@ -82,9 +84,15 @@
{% block report_summary_footnote %}
{% endblock report_summary_footnote %}
</div>
<img id="scenario_concentration_plot" src="{{ scenario_plot_src }}" />
<p id="section1">[*] The results are based on the parameters and assumptions published in the CERN Open Report <a href="https://cds.cern.ch/record/2756083"> CERN-OPEN-2021-004</a></p>
</p>
<svg id="result_plot" width="900" height="400"></svg>
<script type="application/javascript">
var times = {{times}}
var concentrations = {{concentrations}}
var exposed_presence_intervals = {{exposed_presence_intervals}}
draw_concentration_plot("#result_plot", times, concentrations, exposed_presence_intervals);
</script>
</p>
</div>
</div>
</div>

View file

@ -380,7 +380,7 @@ v{{ calculator_version }} <span style="float:right; font-weight:bold">Please sen
CARA is a risk assessment tool developed to model the concentration of viruses in enclosed spaces, in order to inform space-management decisions.
</p>
<p>
CARA models the concentration profile of potential infectious viruses in enclosed spaces with clear and intuitive graphs.
CARA models the concentration profile of virions in enclosed spaces with clear and intuitive graphs.
The user can set a number of parameters, including room volume, exposure time, activity type, mask-wearing and ventilation.
The report generated indicates how to avoid exceeding critical concentrations and chains of airborne transmission in spaces such as individual offices, meeting rooms and labs.
</p>

View file

@ -15,7 +15,7 @@ If you are using the expert version of the tool, you should look at the expert
CARA is a risk assessment tool developed to model the concentration of viruses in enclosed spaces, in order to inform space-management decisions.
</p>
<p>
CARA models the concentration profile of potential infectious viruses in enclosed spaces with clear and intuitive graphs.
CARA models the concentration profile of potential virions in enclosed spaces with clear and intuitive graphs.
The user can set a number of parameters, including room volume, exposure time, activity type, mask-wearing and ventilation.
The report generated indicates how to avoid exceeding critical concentrations and chains of airborne transmission in spaces such as individual offices, meeting rooms and labs.
</p>
@ -183,15 +183,15 @@ It is estimated based on the emission rate of virus into the simulated volume, a
This probability is valid for the simulation duration - i.e. the start and end time.
If you are using the natural ventilation option, the simulation is only valid for the selected month, because the following or preceding month will have a different average temperature profile.
The <code>expected number of new cases</code> for the simulation is calculated based on the probability of infection, multiplied by the number of exposed occupants.</p>
<p>The graph shows the variation in the concentration of infectious viruses within the simulated volume.
<p>The graph shows the variation in the concentration of virions within the simulated volume.
It is determined by:</p>
<ul>
<li>The presence of the infected person, who emits airborne viruses in the volume.</li>
<li>The emission rate is related to the type of activity of the infected person (sitting, light exercise), their level of vocalisation (breathing, talking or shouting).</li>
<li>The accumulation of infectious quanta in the volume, which is driven, among other factors, by ventilation (if applicable).<ul>
<li>The accumulation of virions in the volume, which is driven, among other factors, by ventilation (if applicable).<ul>
<li>In a mechanical ventilation scenario, the removal rate is constant, based on fresh airflow supply in and out of the simulated space.</li>
<li>Under natural ventilation conditions, the effectiveness of ventilation relies upon the hourly temperature difference between the inside and outside air temperature.</li>
<li>A HEPA filter removes infectious virus from the air at a constant rate and is modelled in the same way as mechanical ventilation, however air passed through a HEPA filter is recycled (i.e. it is not fresh air).</li>
<li>A HEPA filter removes virions from the air at a constant rate and is modelled in the same way as mechanical ventilation, however air passed through a HEPA filter is recycled (i.e. it is not fresh air).</li>
</ul>
</li>
</ul>

View file

@ -140,8 +140,8 @@ class ExposureModelResult(View):
ax.spines['top'].set_visible(False)
ax.set_xlabel('Time (hours)')
ax.set_ylabel('Concentration ($q/m^3$)')
ax.set_title('Concentration of infectious quanta')
ax.set_ylabel('Concentration ($virions/m^{3}$)')
ax.set_title('Concentration of virions')
else:
self.ax.ignore_existing_data_limits = True
self.line.set_data(ts, concentration)
@ -154,7 +154,7 @@ class ExposureModelResult(View):
def update_textual_result(self, model: models.ExposureModel):
lines = []
P = model.infection_probability()
lines.append(f'Emission rate (quanta/hr): {np.round(model.concentration_model.infected.emission_rate_when_present(),0)}')
lines.append(f'Emission rate (virus/hr): {np.round(model.concentration_model.infected.emission_rate_when_present(),0)}')
lines.append(f'Probability of infection: {np.round(P, 0)}%')
lines.append(f'Number of exposed: {model.exposed.number}')
@ -185,8 +185,8 @@ class ExposureComparissonResult(View):
ax.spines['right'].set_visible(False)
ax.spines['top'].set_visible(False)
ax.set_xlabel('Time (hours)')
ax.set_ylabel('Concentration ($q/m^3$)')
ax.set_title('Concentration of infectious quanta')
ax.set_ylabel('Concentration ($virions/m^{3}$)')
ax.set_title('Concentration of virions')
return ax
def scenarios_updated(self, scenarios: typing.Sequence[ScenarioType], _):
@ -488,14 +488,14 @@ baseline_model = models.ExposureModel(
room=models.Room(volume=75),
ventilation=models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=15),
inside_temp=models.PiecewiseConstant((0,24),(293.15,)),
outside_temp=models.PiecewiseConstant((0,24),(283.15,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293.15,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283.15,)),
window_height=1.6, opening_length=0.6,
),
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((8, 12), (13, 17))),
presence=models.SpecificInterval(((8., 12.), (13., 17.))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Seated'],
expiration=models.Expiration.types['Talking'],
@ -503,7 +503,7 @@ baseline_model = models.ExposureModel(
),
exposed=models.Population(
number=10,
presence=models.SpecificInterval(((8, 12), (13, 17))),
presence=models.SpecificInterval(((8., 12.), (13., 17.))),
activity=models.Activity.types['Seated'],
mask=models.Mask.types['No mask'],
),

View file

@ -24,7 +24,7 @@ The model used is based on scientific publications relating to airborne transmis
The tool helps assess the potential dose of infectious airborne viruses in indoor gatherings, with people seated, standing, moving around, while breathing, speaking or shouting/singing. The model is based on the Wells-Riley model of aerosol disease transmission, which assumes a fixed value for the average infectious dose. The dose-response models for respiratory diseases is more accurate, although since this parameter for SARS-CoV-2 is not known so far, the Wells-Riley method is recommended in the health science community (see <a href="#references_block">References</a>).
The methodology of the model is divided into three parts:
<ol>
<li>Estimating the emission rate of infectious viruses.</li>
<li>Estimating the emission rate of virions.</li>
<li>Modeling the concentration evolution of viruses within a given volume and consequent inhalation dose during the exposure time.</li>
<li>Estimating the probability of a COVID-19 infection, the expected number of new cases arising from the transmission event and the basic reproduction rate (R0).</li>
</ol>

View file

@ -43,7 +43,7 @@
<h2>Authors</h2>
<div class="text-component-text cern_full_html" >
<p>
<h4>Andre Henriques<sup>1</sup>, Marco Andreini<sup>1</sup>, Gabriella Azzopardi<sup>2</sup>, James Devine<sup>3</sup>, Philip Elson<sup>4</sup>, Nicolas Mounet<sup>2</sup>, Markus Kongstein Rognlien<sup>2,6</sup>, Nicola Tarocco<sup>5</sup></h4><br>
<h4>Andre Henriques<sup>1</sup>, Luis Aleixo<sup>1</sup>, Marco Andreini<sup>1</sup>, Gabriella Azzopardi<sup>2</sup>, James Devine<sup>3</sup>, Philip Elson<sup>4</sup>, Nicolas Mounet<sup>2</sup>, Markus Kongstein Rognlien<sup>2,6</sup>, Nicola Tarocco<sup>5</sup></h4><br>
<sup>1</sup>HSE Unit, Occupational Health & Safety Group, CERN<br>
<sup>2</sup>Beams Department, Accelerators and Beam Physics Group, CERN<br>

View file

@ -30,15 +30,20 @@ Geneva_hourly_temperatures_celsius_per_hour = {
}
# Geneva hourly temperatures as piecewise constant function (in Kelvin)
# Geneva hourly temperatures as piecewise constant function (in Kelvin).
GenevaTemperatures_hourly = {
month: models.PiecewiseConstant(tuple(np.arange(25.)),
tuple(273.15+np.array(temperatures)))
for month,temperatures in Geneva_hourly_temperatures_celsius_per_hour.items()
}
# same temperatures on a finer temperature mesh
GenevaTemperatures = {
month: GenevaTemperatures_hourly[month].refine(refine_factor=4)
for month,temperatures in Geneva_hourly_temperatures_celsius_per_hour.items()
month: models.PiecewiseConstant(
# NOTE: It is important that the time type is float, not np.float, in
# order to allow hashability (for caching).
tuple(float(time) for time in range(25)),
tuple(273.15 + np.array(temperatures)),
)
for month, temperatures in Geneva_hourly_temperatures_celsius_per_hour.items()
}
# Same temperatures on a finer temperature mesh (every 6 minutes).
GenevaTemperatures = {
month: GenevaTemperatures_hourly[month].refine(refine_factor=10)
for month, temperatures in Geneva_hourly_temperatures_celsius_per_hour.items()
}

View file

@ -46,6 +46,8 @@ else:
# by providing a no-op cache decorator when type-checking.
cached = lambda *cached_args, **cached_kwargs: lambda function: function # noqa
from .utils import method_cache
from .dataclass_utils import nested_replace
@ -62,7 +64,7 @@ class Room:
volume: _VectorisedFloat
#: The humidity in the room (from 0 to 1 - e.g. 0.5 is 50% humidity)
humidity: _VectorisedFloat=0.5
humidity: _VectorisedFloat = 0.5
Time_t = typing.TypeVar('Time_t', float, int)
@ -127,7 +129,9 @@ class PeriodicInterval(Interval):
return tuple()
result = []
for i in np.arange(0, 24, self.period / 60):
result.append((i, i+self.duration/60))
# NOTE: It is important that the time type is float, not np.float, in
# order to allow hashability (for caching).
result.append((float(i), float(i+self.duration/60)))
return tuple(result)
@ -183,7 +187,9 @@ class PiecewiseConstant:
np.concatenate([self.values, self.values[-1:]], axis=0),
axis=0)
return PiecewiseConstant(
tuple(refined_times),
# NOTE: It is important that the time type is float, not np.float, in
# order to allow hashability (for caching).
tuple(float(time) for time in refined_times),
tuple(interpolator(refined_times)[:-1]),
)
@ -420,8 +426,8 @@ class Virus:
#: RNA copies / mL
viral_load_in_sputum: _VectorisedFloat
#: RNA-copies per quantum
quantum_infectious_dose: _VectorisedFloat
#: Dose to initiate infection, in RNA copies
infectious_dose: _VectorisedFloat
#: Pre-populated examples of Viruses.
types: typing.ClassVar[typing.Dict[str, "Virus"]]
@ -458,20 +464,20 @@ Virus.types = {
# It is somewhere between 1000 or 10 SARS-CoV viruses,
# as per https://www.dhs.gov/publication/st-master-question-list-covid-19
# 50 comes from Buonanno et al.
quantum_infectious_dose=50.,
infectious_dose=50.,
),
'SARS_CoV_2_B117': SARSCoV2(
# also called VOC-202012/01
viral_load_in_sputum=1e9,
quantum_infectious_dose=30.,
infectious_dose=30.,
),
'SARS_CoV_2_P1': SARSCoV2(
viral_load_in_sputum=1e9,
quantum_infectious_dose=1/0.045,
infectious_dose=1/0.045,
),
'SARS_CoV_2_B16172': SARSCoV2(
viral_load_in_sputum=1e9,
quantum_infectious_dose=30/1.6,
infectious_dose=30/1.6,
),
}
@ -670,6 +676,7 @@ class InfectedPopulation(Population):
#: The type of expiration that is being emitted whilst doing the activity.
expiration: _ExpirationBase
@method_cache
def emission_rate_when_present(self) -> _VectorisedFloat:
"""
The emission rate if the infected population is present.
@ -677,7 +684,7 @@ class InfectedPopulation(Population):
Note that the rate is not currently time-dependent.
"""
# Emission Rate (infectious quantum / h)
# Emission Rate (virions / h)
# Note on units: exhalation rate is in m^3/h, aerosols in mL/cm^3
# and viral load in virus/mL -> 1e6 conversion factor
aerosols = self.expiration.aerosols(self.mask)
@ -685,15 +692,14 @@ class InfectedPopulation(Population):
ER = (self.virus.viral_load_in_sputum *
self.activity.exhalation_rate *
10 ** 6 *
aerosols /
self.virus.quantum_infectious_dose)
aerosols)
# For superspreading event, where ejection_factor is infinite we fix the ER
# based on Miller et al. (2020).
if isinstance(aerosols, np.ndarray):
ER[np.isinf(aerosols)] = 970
ER[np.isinf(aerosols)] = 970 * self.virus.infectious_dose
elif np.isinf(aerosols):
ER = 970
ER = 970 * self.virus.infectious_dose
return ER
@ -741,9 +747,12 @@ class ConcentrationModel:
# Deposition rate (h^-1)
k = (vg * 3600) / h
return k + self.virus.decay_constant(self.room.humidity
) + self.ventilation.air_exchange(self.room, time)
return (
k + self.virus.decay_constant(self.room.humidity)
+ self.ventilation.air_exchange(self.room, time)
)
@method_cache
def _concentration_limit(self, time: float) -> _VectorisedFloat:
"""
Provides a constant that represents the theoretical asymptotic
@ -755,29 +764,36 @@ class ConcentrationModel:
return (self.infected.emission_rate(time)) / (IVRR * V)
def state_change_times(self):
@method_cache
def state_change_times(self) -> typing.List[float]:
"""
All time dependent entities on this model must provide information about
the times at which their state changes.
"""
state_change_times = set()
state_change_times = {0.}
state_change_times.update(self.infected.presence.transition_times())
state_change_times.update(self.ventilation.transition_times())
return sorted(state_change_times)
def last_state_change(self, time: float):
def last_state_change(self, time: float) -> float:
"""
Find the most recent state change.
Find the most recent/previous state change.
Find the nearest time less than the given one. If there is a state
change exactly at ``time`` the previous state change is returned
(except at ``time == 0``).
"""
for change_time in self.state_change_times()[::-1]:
if change_time < time:
return change_time
return 0
times = self.state_change_times()
t_index: int = np.searchsorted(times, time) # type: ignore
# Search sorted gives us the index to insert the given time. Instead we
# want to get the index of the most recent time, so reduce the index by
# one unless we are already at 0.
t_index = max([t_index - 1, 0])
return times[t_index]
def _next_state_change(self, time: float):
def _next_state_change(self, time: float) -> float:
"""
Find the nearest future state change.
@ -790,21 +806,16 @@ class ConcentrationModel:
f"state change time ({change_time})"
)
def _is_interval_between_state_changes(self, start: float, stop: float) -> bool:
"""
Check that the times start and stop are in-between two state
changes of the concentration model (to ensure sure that all
model parameters stay constant between start and stop).
"""
return (self.last_state_change(stop) <= start)
@cached()
def _concentration_at_state_change(self, time: float) -> _VectorisedFloat:
@method_cache
def _concentration_cached(self, time: float) -> _VectorisedFloat:
# A cached version of the concentration method. Use this method if you
# expect that there may be multiple concentration calculations for the
# same time (e.g. at state change times).
return self.concentration(time)
def concentration(self, time: float) -> _VectorisedFloat:
"""
Virus quanta concentration, as a function of time.
Virus exposure concentration, as a function of time.
The formulas used here assume that all parameters (ventilation,
emission rate) are constant between two state changes - only
the value of these parameters at the next state change, are used.
@ -812,7 +823,6 @@ class ConcentrationModel:
Note that time is not vectorised. You can only pass a single float
to this method.
"""
if time == 0:
return 0.0
next_state_change_time = self._next_state_change(time)
@ -820,12 +830,13 @@ class ConcentrationModel:
concentration_limit = self._concentration_limit(next_state_change_time)
t_last_state_change = self.last_state_change(time)
concentration_at_last_state_change = self._concentration_at_state_change(t_last_state_change)
concentration_at_last_state_change = self._concentration_cached(t_last_state_change)
delta_time = time - t_last_state_change
fac = np.exp(-IVRR * delta_time)
return concentration_limit * (1 - fac) + concentration_at_last_state_change * fac
@method_cache
def integrated_concentration(self, start: float, stop: float) -> _VectorisedFloat:
"""
Get the integrated concentration dose between the times start and stop.
@ -840,7 +851,7 @@ class ConcentrationModel:
start = max([interval_start, req_start])
stop = min([interval_stop, req_stop])
conc_start = self.concentration(start)
conc_start = self._concentration_cached(start)
next_conc_state = self._next_state_change(stop)
conc_limit = self._concentration_limit(next_conc_state)
@ -867,8 +878,8 @@ class ExposureModel:
#: The fraction of viruses actually deposited in the respiratory tract
fraction_deposited: _VectorisedFloat = 0.6
def quanta_exposure(self) -> _VectorisedFloat:
"""The number of virus quanta per meter^3."""
def exposure(self) -> _VectorisedFloat:
"""The number of virus per meter^3."""
exposure = 0.0
for start, stop in self.exposed.presence.boundaries():
@ -877,7 +888,7 @@ class ExposureModel:
return exposure * self.repeats
def infection_probability(self) -> _VectorisedFloat:
exposure = self.quanta_exposure()
exposure = self.exposure()
inf_aero = (
self.exposed.activity.inhalation_rate *
@ -886,7 +897,7 @@ class ExposureModel:
)
# Probability of infection.
return (1 - np.exp(-inf_aero)) * 100
return (1 - np.exp(-(inf_aero/self.concentration_model.virus.infectious_dose))) * 100
def expected_new_cases(self) -> _VectorisedFloat:
prob = self.infection_probability()

View file

@ -43,18 +43,18 @@ symptomatic_vl_frequencies = LogCustomKernel(
virus_distributions = {
'SARS_CoV_2': mc.SARSCoV2(
viral_load_in_sputum=symptomatic_vl_frequencies,
quantum_infectious_dose=100,
infectious_dose=100,
),
'SARS_CoV_2_B117': mc.SARSCoV2(
viral_load_in_sputum=symptomatic_vl_frequencies,
quantum_infectious_dose=60,
infectious_dose=60,
),
'SARS_CoV_2_P1': mc.SARSCoV2(
viral_load_in_sputum=symptomatic_vl_frequencies,
quantum_infectious_dose=100/2.25,
infectious_dose=100/2.25,
),
'SARS_CoV_2_B16172': mc.SARSCoV2(
viral_load_in_sputum=symptomatic_vl_frequencies,
quantum_infectious_dose=60/1.6,
infectious_dose=60/1.6,
),
}

View file

@ -8,13 +8,13 @@ def baseline_model():
model = models.ConcentrationModel(
room=models.Room(volume=75),
ventilation=models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.), )),
air_exch=30.,
),
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((0, 4), (5, 8))),
presence=models.SpecificInterval(((0., 4.), (5., 8.))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Light activity'],
expiration=models.Expiration.types['Superspreading event'],

View file

@ -13,7 +13,6 @@ from cara import models
{'humidity': np.array([0.5, 0.4])},
{'air_change': np.array([100, 120])},
{'viral_load_in_sputum': np.array([5e8, 1e9])},
{'quantum_infectious_dose': np.array([50, 20])},
]
)
def test_concentration_model_vectorisation(override_params):
@ -21,8 +20,7 @@ def test_concentration_model_vectorisation(override_params):
'volume': 75,
'humidity': 0.5,
'air_change': 100,
'viral_load_in_sputum': 1e9,
'quantum_infectious_dose': 50,
'viral_load_in_sputum': 1e9
}
defaults.update(override_params)
@ -43,7 +41,7 @@ def test_concentration_model_vectorisation(override_params):
),
virus=models.SARSCoV2(
viral_load_in_sputum=defaults['viral_load_in_sputum'],
quantum_infectious_dose=defaults['quantum_infectious_dose'],
infectious_dose=50.,
),
expiration=models.Expiration((1., 0., 0.)),
)
@ -55,7 +53,7 @@ def test_concentration_model_vectorisation(override_params):
@pytest.fixture
def simple_conc_model():
interesting_times = models.SpecificInterval(([0, 1], [1.1, 1.999], [2, 3]), )
interesting_times = models.SpecificInterval(([0.5, 1.], [1.1, 2], [2., 3.]), )
return models.ConcentrationModel(
models.Room(75),
models.AirChange(interesting_times, 100),
@ -70,14 +68,38 @@ def simple_conc_model():
)
@pytest.mark.parametrize(
"time, expected_last_state_change", [
[-15., 0.], # Out of range goes to the first state.
[0., 0.],
[0.5, 0.0],
[0.51, 0.5],
[1., 0.5],
[1.05, 1.],
[1.1, 1.],
[1.11, 1.1],
[2., 1.1],
[2.1, 2],
[3., 2],
[15., 3.], # Out of range goes to the last state.
]
)
def test_last_state_change_time(
simple_conc_model: models.ConcentrationModel,
time,
expected_last_state_change,
):
assert simple_conc_model.last_state_change(float(time)) == expected_last_state_change
@pytest.mark.parametrize(
"time, expected_next_state_change", [
[0, 0],
[0.0, 0.0],
[0.5, 0.5],
[1, 1],
[1.05, 1.1],
[1.1, 1.1],
[1.11, 1.999],
[1.9991, 2],
[1.11, 2],
[2, 2],
[2.1, 3],
[3, 3],
@ -88,35 +110,17 @@ def test_next_state_change_time(
time,
expected_next_state_change,
):
assert simple_conc_model._next_state_change(time) == expected_next_state_change
assert simple_conc_model._next_state_change(float(time)) == expected_next_state_change
def test_next_state_change_time_out_of_range(simple_conc_model: models.ConcentrationModel):
with pytest.raises(
ValueError,
match=re.escape("The requested time (3.1) is greater than last available state change time (3)")
match=re.escape("The requested time (3.1) is greater than last available state change time (3.0)")
):
simple_conc_model._next_state_change(3.1)
@pytest.mark.parametrize(
"start, stop, is_valid", [
[0, 1.05, False],
[0.99, 1.1, False],
[0.5, 1.01, False],
[0, 1, True],
[1.01, 1.1, True],
[0.01, 1, True],
[1.11, 1.99, True],
]
)
def test_valid_interval(
start, stop, is_valid,
simple_conc_model: models.ConcentrationModel
):
assert simple_conc_model._is_interval_between_state_changes(start, stop) == is_valid
def test_integrated_concentration(simple_conc_model):
c1 = simple_conc_model.integrated_concentration(0, 2)
c2 = simple_conc_model.integrated_concentration(0, 1)

View file

@ -3,35 +3,37 @@ import typing
import numpy as np
import numpy.testing
import pytest
from dataclasses import dataclass
from cara import models
from cara.models import ExposureModel
from cara.dataclass_utils import replace
@dataclass(frozen=True)
class KnownConcentrations(models.ConcentrationModel):
"""
A ConcentrationModel which is based on pre-known quanta concentrations and
A ConcentrationModel which is based on pre-known exposure concentrations and
which therefore doesn't need other components. Useful for testing.
"""
def __init__(self, concentration_function: typing.Callable) -> None:
self._func = concentration_function
concentration_function: typing.Callable
def infectious_virus_removal_rate(self, time: float) -> models._VectorisedFloat:
# very large decay constant -> same as constant concentration
return 1.e50
def _concentration_limit(self, time: float) -> models._VectorisedFloat:
return self._func(time)
return self.concentration_function(time)
def state_change_times(self):
return [0, 24]
return [0., 24.]
def _next_state_change(self, time: float):
return 24
return 24.
def concentration(self, time: float) -> models._VectorisedFloat: # noqa
return self._func(time)
return self.concentration_function(time)
halftime = models.PeriodicInterval(120, 60)
@ -49,33 +51,47 @@ populations = [
# A population with some array component for inhalation_rate.
models.Population(
10, halftime, models.Mask.types['Type I'],
models.Activity(np.array([0.51,0.57]), 0.57),
models.Activity(np.array([0.51, 0.57]), 0.57),
),
]
def known_concentrations(func):
dummy_room = models.Room(50, 0.5)
dummy_ventilation = models._VentilationBase()
dummy_infected_population = models.InfectedPopulation(
number=1,
presence=halftime,
mask=models.Mask.types['Type I'],
activity=models.Activity.types['Standing'],
virus=models.Virus.types['SARS_CoV_2_B117'],
expiration=models.Expiration.types['Talking']
)
return KnownConcentrations(dummy_room, dummy_ventilation, dummy_infected_population, func)
@pytest.mark.parametrize(
"population, cm, f_dep, expected_exposure, expected_probability",[
[populations[1], KnownConcentrations(lambda t: 1.2), 1.,
np.array([14.4, 14.4]), np.array([99.6803184113, 99.5181053773])],
"population, cm, f_dep, expected_exposure, expected_probability", [
[populations[1], known_concentrations(lambda t: 36.), 1.,
np.array([432, 432]), np.array([99.6803184113, 99.5181053773])],
[populations[2], KnownConcentrations(lambda t: 1.2), 1.,
np.array([14.4, 14.4]), np.array([97.4574432074, 98.3493482895])],
[populations[2], known_concentrations(lambda t: 36.), 1.,
np.array([432, 432]), np.array([97.4574432074, 98.3493482895])],
[populations[0], KnownConcentrations(lambda t: np.array([1.2, 2.4])), 1.,
np.array([14.4, 28.8]), np.array([98.3493482895, 99.9727534893])],
[populations[0], known_concentrations(lambda t: np.array([36., 72.])), 1.,
np.array([432, 864]), np.array([98.3493482895, 99.9727534893])],
[populations[1], KnownConcentrations(lambda t: np.array([1.2, 2.4])), 1.,
np.array([14.4, 28.8]), np.array([99.6803184113, 99.9976777757])],
[populations[1], known_concentrations(lambda t: np.array([36., 72.])), 1.,
np.array([432, 864]), np.array([99.6803184113, 99.9976777757])],
[populations[0], KnownConcentrations(lambda t: 2.4), np.array([0.5, 1.]),
28.8, np.array([98.3493482895, 99.9727534893])],
[populations[0], known_concentrations(lambda t: 72.), np.array([0.5, 1.]),
864, np.array([98.3493482895, 99.9727534893])],
])
def test_exposure_model_ndarray(population, cm, f_dep,
expected_exposure, expected_probability):
model = ExposureModel(cm, population, fraction_deposited = f_dep)
model = ExposureModel(cm, population, fraction_deposited=f_dep)
np.testing.assert_almost_equal(
model.quanta_exposure(), expected_exposure
model.exposure(), expected_exposure
)
np.testing.assert_almost_equal(
model.infection_probability(), expected_probability, decimal=10
@ -89,12 +105,13 @@ def test_exposure_model_ndarray(population, cm, f_dep,
@pytest.mark.parametrize("population", populations)
def test_exposure_model_ndarray_and_float_mix(population):
cm = KnownConcentrations(lambda t: 0 if np.floor(t) % 2 else np.array([1.2, 1.2]))
cm = known_concentrations(
lambda t: 0. if np.floor(t) % 2 else np.array([1.2, 1.2]))
model = ExposureModel(cm, population)
expected_exposure = np.array([14.4, 14.4])
np.testing.assert_almost_equal(
model.quanta_exposure(), expected_exposure
model.exposure(), expected_exposure
)
assert isinstance(model.infection_probability(), np.ndarray)
@ -103,23 +120,25 @@ def test_exposure_model_ndarray_and_float_mix(population):
@pytest.mark.parametrize("population", populations)
def test_exposure_model_compare_scalar_vector(population):
cm_scalar = KnownConcentrations(lambda t: 1.2)
cm_array = KnownConcentrations(lambda t: np.array([1.2, 1.2]))
cm_scalar = known_concentrations(lambda t: 1.2)
cm_array = known_concentrations(lambda t: np.array([1.2, 1.2]))
model_scalar = ExposureModel(cm_scalar, population)
model_array = ExposureModel(cm_array, population)
expected_exposure = 14.4
np.testing.assert_almost_equal(
model_scalar.quanta_exposure(), expected_exposure
model_scalar.exposure(), expected_exposure
)
np.testing.assert_almost_equal(
model_array.quanta_exposure(), np.array([expected_exposure]*2)
model_array.exposure(), np.array([expected_exposure]*2)
)
@pytest.fixture
def conc_model():
interesting_times = models.SpecificInterval(([0, 1], [1.01, 1.02], [12, 24]))
always = models.SpecificInterval(((0, 24),))
interesting_times = models.SpecificInterval(
([0., 1.], [1.01, 1.02], [12., 24.]),
)
always = models.SpecificInterval(((0., 24.), ))
return models.ConcentrationModel(
models.Room(25),
models.AirChange(always, 5),
@ -133,23 +152,52 @@ def conc_model():
)
)
# expected quanta were computed with a trapezoidal integration, using
# Expected exposure were computed with a trapezoidal integration, using
# a mesh of 10'000 pts per exposed presence interval.
@pytest.mark.parametrize("exposed_time_interval, expected_quanta", [
[(0, 1), 5.3334352],
[(1, 1.01), 0.061759078],
[(1.01, 1.02), 0.060016487],
[(12, 12.01), 0.0019012647],
[(12, 24), 75.513005],
[(0, 24), 81.956988],
@pytest.mark.parametrize(
["exposed_time_interval", "expected_exposure"],
[
[(0., 1.), 266.67176],
[(1., 1.01), 3.0879539],
[(1.01, 1.02), 3.00082435],
[(12., 12.01), 0.095063235],
[(12., 24.), 3775.65025],
[(0., 24.), 4097.8494],
]
)
def test_exposure_model_integral_accuracy(exposed_time_interval,
expected_quanta, conc_model):
expected_exposure, conc_model):
presence_interval = models.SpecificInterval((exposed_time_interval,))
population = models.Population(
10, presence_interval, models.Mask.types['Type I'],
models.Activity.types['Standing'],
)
model = ExposureModel(conc_model, population, fraction_deposited=1.)
np.testing.assert_allclose(model.quanta_exposure(), expected_quanta)
np.testing.assert_allclose(model.exposure(), expected_exposure)
def test_infectious_dose_vectorisation():
infected_population = models.InfectedPopulation(
number=1,
presence=halftime,
mask=models.Mask.types['Type I'],
activity=models.Activity.types['Standing'],
virus=models.SARSCoV2(
viral_load_in_sputum=1e9,
infectious_dose=np.array([50, 20, 30]),
),
expiration=models.Expiration.types['Talking']
)
cm = known_concentrations(lambda t: 1.2)
cm = replace(cm, infected=infected_population)
presence_interval = models.SpecificInterval(((0., 1.),))
population = models.Population(
10, presence_interval, models.Mask.types['Type I'],
models.Activity.types['Standing'],
)
model = ExposureModel(cm, population, fraction_deposited=1.0)
inf_probability = model.infection_probability()
assert isinstance(inf_probability, np.ndarray)
assert inf_probability.shape == (3, )

View file

@ -7,14 +7,12 @@ import cara.models
@pytest.mark.parametrize(
"override_params", [
{'viral_load_in_sputum': np.array([5e8, 1e9])},
{'quantum_infectious_dose': np.array([50, 20])},
{'exhalation_rate': np.array([0.75, 0.81])},
]
)
def test_infected_population_vectorisation(override_params):
defaults = {
'viral_load_in_sputum': 1e9,
'quantum_infectious_dose': 50,
'exhalation_rate': 0.75,
}
defaults.update(override_params)
@ -33,7 +31,7 @@ def test_infected_population_vectorisation(override_params):
),
virus=cara.models.Virus(
viral_load_in_sputum=defaults['viral_load_in_sputum'],
quantum_infectious_dose=defaults['quantum_infectious_dose'],
infectious_dose=50.,
),
expiration=cara.models.Expiration((1., 0., 0.)),
)

View file

@ -7,9 +7,9 @@ import cara.data as data
def test_no_mask_superspeading_emission_rate(baseline_model):
expected_rate = 970.
expected_rate = 48500.
npt.assert_allclose(
[baseline_model.infected.emission_rate(t) for t in [0, 1, 4, 4.5, 5, 8, 9]],
[baseline_model.infected.emission_rate(float(t)) for t in [0, 1, 4, 4.5, 5, 8, 9]],
[0, expected_rate, expected_rate, 0, 0, expected_rate, 0],
rtol=1e-12
)
@ -19,8 +19,8 @@ def test_no_mask_superspeading_emission_rate(baseline_model):
def baseline_periodic_window():
return models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=15),
inside_temp=models.PiecewiseConstant((0,24),(293,)),
outside_temp=models.PiecewiseConstant((0,24),(283,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
)
@ -41,10 +41,10 @@ def baseline_periodic_hepa():
def test_concentrations(baseline_model):
# expected concentrations were computed analytically
ts = [0, 4, 5, 7, 10]
concentrations = [baseline_model.concentration(t) for t in ts]
concentrations = [baseline_model.concentration(float(t)) for t in ts]
npt.assert_allclose(
concentrations,
[0.000000e+00, 0.41611256, 1.3205628e-14, 0.41611256, 4.1909001e-28],
[0.000000e+00, 20.805628, 6.602814e-13, 20.805628, 2.09545e-26],
rtol=1e-6
)
@ -55,7 +55,7 @@ def test_smooth_concentrations(baseline_model):
dx = 0.002
dy_limit = 0.2 # Anything more than this (in relative) is a bit steep.
ts = np.arange(0, 10, dx)
concentrations = [baseline_model.concentration(t) for t in ts]
concentrations = [baseline_model.concentration(float(t)) for t in ts]
assert np.abs(np.diff(concentrations)).max()/np.mean(concentrations) < dy_limit
@ -69,7 +69,7 @@ def build_model(interval_duration):
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((0, 4), (5, 8))),
presence=models.SpecificInterval(((0., 4.), (5., 8.))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Light activity'],
expiration=models.Expiration.types['Superspreading event'],
@ -78,7 +78,7 @@ def build_model(interval_duration):
return model
def test_concentrations_startup(baseline_model):
def test_concentrations_startup():
# The concentrations should be the same until the beginning of the
# first time that the ventilation is disabled.
m1 = build_model(interval_duration=120)
@ -183,28 +183,38 @@ def test_multiple_ventilation_HEPA_HVAC_AirChange(volume, expected_value):
],
)
def test_windowopening(time, expected_value):
tempOutside = models.PiecewiseConstant((0,10,24),(273.15,283.15))
tempInside = models.PiecewiseConstant((0,24),(293.15,))
w = models.SlidingWindow(active=models.SpecificInterval([(0,24)]),
inside_temp=tempInside,outside_temp=tempOutside,
window_height=1.,opening_length=0.6)
npt.assert_allclose(w.air_exchange(models.Room(volume=68),time),
expected_value,rtol=1e-5)
tempOutside = models.PiecewiseConstant((0., 10., 24.),(273.15, 283.15))
tempInside = models.PiecewiseConstant((0., 24.), (293.15,))
w = models.SlidingWindow(
active=models.SpecificInterval([(0., 24.)]),
inside_temp=tempInside,outside_temp=tempOutside,
window_height=1., opening_length=0.6,
)
npt.assert_allclose(
w.air_exchange(models.Room(volume=68), time), expected_value, rtol=1e-5
)
def build_hourly_dependent_model(month, intervals_open=((7.5, 8.5),),
intervals_presence_infected=((0, 4), (5, 7.5)),
artificial_refinement=False,
temperatures=data.GenevaTemperatures_hourly):
def build_hourly_dependent_model(
month,
intervals_open=((7.5, 8.5),),
intervals_presence_infected=((0., 4.), (5., 7.5)),
artificial_refinement=False,
temperatures=data.GenevaTemperatures_hourly
):
if artificial_refinement:
# 5-fold increase of number of times, WITHOUT interpolation
# (hence transparent for the results)
refine_factor = 2
times_refined = tuple(np.linspace(0.,24,
refine_factor*len(temperatures[month].values)+1))
temperatures_refined = tuple(np.hstack([[v]*refine_factor
for v in temperatures[month].values]))
outside_temp = models.PiecewiseConstant(times_refined,temperatures_refined)
times_refined = tuple(
float(t) for t in np.linspace(
0., 24, refine_factor * len(temperatures[month].values) + 1
)
)
temperatures_refined = tuple(np.hstack(
[[v] * refine_factor for v in temperatures[month].values]
))
outside_temp = models.PiecewiseConstant(times_refined, temperatures_refined)
else:
outside_temp = temperatures[month]
@ -212,7 +222,7 @@ def build_hourly_dependent_model(month, intervals_open=((7.5, 8.5),),
room=models.Room(volume=75),
ventilation=models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0,24),(293,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293, )),
outside_temp=outside_temp,
window_height=1.6, opening_length=0.6,
),
@ -233,14 +243,14 @@ def build_constant_temp_model(outside_temp, intervals_open=((7.5, 8.5),)):
room=models.Room(volume=75),
ventilation=models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0,24),(293,)),
outside_temp=models.PiecewiseConstant((0,24),(outside_temp,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (outside_temp,)),
window_height=1.6, opening_length=0.6,
),
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((0, 4), (5, 7.5))),
presence=models.SpecificInterval(((0., 4.), (5., 7.5))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Light activity'],
expiration=models.Expiration.types['Superspreading event'],
@ -253,21 +263,22 @@ def build_hourly_dependent_model_multipleventilation(month, intervals_open=((7.5
vent = models.MultipleVentilation((
models.SlidingWindow(
active=models.SpecificInterval(intervals_open),
inside_temp=models.PiecewiseConstant((0,24),(293,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=data.GenevaTemperatures[month],
window_height=1.6, opening_length=0.6,
),
models.HEPAFilter(
active=models.SpecificInterval(((0,24),)),
q_air_mech=500.,
)))
active=models.SpecificInterval(((0., 24.),)),
q_air_mech=500.,
),
))
model = models.ConcentrationModel(
room=models.Room(volume=75),
ventilation=vent,
infected=models.InfectedPopulation(
number=1,
virus=models.Virus.types['SARS_CoV_2'],
presence=models.SpecificInterval(((0, 4), (5, 7.5))),
presence=models.SpecificInterval(((0., 4.), (5., 7.5))),
mask=models.Mask.types['No mask'],
activity=models.Activity.types['Light activity'],
expiration=models.Expiration.types['Superspreading event'],
@ -288,7 +299,7 @@ def test_concentrations_hourly_dep_temp_vs_constant(month, temperatures, time):
# The concentrations should be the same up to 8 AM (time when the
# temperature changes DURING the window opening).
m1 = build_hourly_dependent_model(month)
m2 = build_constant_temp_model(temperatures[7]+273.15)
m2 = build_constant_temp_model(temperatures[7] + 273.15)
npt.assert_allclose(m1.concentration(time), m2.concentration(time), rtol=1e-5)
@pytest.mark.parametrize(
@ -302,8 +313,11 @@ def test_concentrations_hourly_dep_temp_vs_constant(month, temperatures, time):
def test_concentrations_hourly_dep_temp_startup(month, temperatures, time):
# The concentrations should be the zero up to the first presence time
# of an infecter person.
m = build_hourly_dependent_model(month,((0.,0.5),(1,1.5),(4,4.5),(7.5,8)),
((8,12.),))
m = build_hourly_dependent_model(
month,
((0., 0.5), (1., 1.5), (4., 4.5), (7.5, 8), ),
((8., 12.), ),
)
assert m.concentration(time) == 0.
@ -323,19 +337,22 @@ def test_concentrations_hourly_dep_multipleventilation():
def test_concentrations_hourly_dep_adding_artificial_transitions(month_temp_item, time):
month, temperatures = month_temp_item
# Adding a second opening inside the first one should not change anything
m1 = build_hourly_dependent_model(month,intervals_open=((7.5, 8.5),))
m2 = build_hourly_dependent_model(month,intervals_open=((7.5, 8.5),(8.,8.1)))
m1 = build_hourly_dependent_model(month, intervals_open=((7.5, 8.5), ))
m2 = build_hourly_dependent_model(month, intervals_open=((7.5, 8.5), (8., 8.1), ))
npt.assert_allclose(m1.concentration(time), m2.concentration(time), rtol=1e-5)
@pytest.mark.parametrize(
"time",
list(np.random.random_sample(10)*24.)+list(np.arange(0,24.5,0.5)),
(
[float(t) for t in np.random.random_sample(10) * 24.] # type: ignore
+ [float(t) for t in np.arange(0, 24.5, 0.5)]
),
)
def test_concentrations_refine_times(time):
month = 'Jan'
m1 = build_hourly_dependent_model(month,intervals_open=((0, 24),))
m2 = build_hourly_dependent_model(month,intervals_open=((0, 24),),
m1 = build_hourly_dependent_model(month, intervals_open=((0., 24.),))
m2 = build_hourly_dependent_model(month, intervals_open=((0., 24.),),
artificial_refinement=True)
npt.assert_allclose(m1.concentration(time), m2.concentration(time), rtol=1e-8)
@ -350,48 +367,48 @@ def build_exposure_model(concentration_model):
activity=infected.activity,
mask=infected.mask,
),
fraction_deposited = 1.,
fraction_deposited=1.,
)
# expected quanta were computed with a trapezoidal integration, using
# expected exposure were computed with a trapezoidal integration, using
# a mesh of 100'000 pts per exposed presence interval.
@pytest.mark.parametrize(
"month, expected_quanta",
"month, expected_exposure",
[
['Jan', 9.930854],
['Jun', 37.962708],
['Jan', 496.5427],
['Jun', 1898.1354],
],
)
def test_quanta_hourly_dep(month,expected_quanta):
def test_exposure_hourly_dep(month,expected_exposure):
m = build_exposure_model(
build_hourly_dependent_model(
month,
intervals_open=((0,24),),
intervals_presence_infected=((8, 12), (13, 17))
intervals_open=((0., 24.), ),
intervals_presence_infected=((8., 12.), (13., 17.))
)
)
quanta = m.quanta_exposure()
npt.assert_allclose(quanta, expected_quanta)
exposure = m.exposure()
npt.assert_allclose(exposure, expected_exposure)
# expected quanta were computed with a trapezoidal integration, using
# expected exposure were computed with a trapezoidal integration, using
# a mesh of 100'000 pts per exposed presence interval and 25 pts per hour
# for the temperature discretization.
@pytest.mark.parametrize(
"month, expected_quanta",
"month, expected_exposure",
[
['Jan', 9.993842],
['Jun', 40.151985],
['Jan', 499.6921],
['Jun', 2007.59925],
],
)
def test_quanta_hourly_dep_refined(month,expected_quanta):
def test_exposure_hourly_dep_refined(month,expected_exposure):
m = build_exposure_model(
build_hourly_dependent_model(
month,
intervals_open=((0, 24),),
intervals_presence_infected=((8, 12), (13, 17)),
intervals_open=((0., 24.),),
intervals_presence_infected=((8., 12.), (13., 17.)),
temperatures=data.GenevaTemperatures,
)
)
quanta = m.quanta_exposure()
npt.assert_allclose(quanta, expected_quanta, rtol=0.02)
exposure = m.exposure()
npt.assert_allclose(exposure, expected_exposure, rtol=0.02)

View file

@ -43,14 +43,14 @@ def baseline_mc_model() -> cara.monte_carlo.ConcentrationModel:
room=cara.monte_carlo.Room(volume=cara.monte_carlo.sampleable.Normal(75, 20)),
ventilation=cara.monte_carlo.SlidingWindow(
active=cara.models.PeriodicInterval(period=120, duration=120),
inside_temp=cara.models.PiecewiseConstant((0, 24), (293,)),
outside_temp=cara.models.PiecewiseConstant((0, 24), (283,)),
inside_temp=cara.models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=cara.models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
),
infected=cara.models.InfectedPopulation(
number=1,
virus=cara.models.Virus.types['SARS_CoV_2'],
presence=cara.models.SpecificInterval(((0, 4), (5, 8))),
presence=cara.models.SpecificInterval(((0., 4.), (5., 8.))),
mask=cara.models.Mask.types['No mask'],
activity=cara.models.Activity.types['Light activity'],
expiration=cara.models.Expiration.types['Breathing'],
@ -75,8 +75,8 @@ def baseline_mc_exposure_model(baseline_mc_model) -> cara.monte_carlo.ExposureMo
def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.ConcentrationModel):
model = baseline_mc_model.build_model(7)
assert isinstance(model, cara.models.ConcentrationModel)
assert isinstance(model.concentration(time=0), float)
conc = model.concentration(time=1)
assert isinstance(model.concentration(time=0.), float)
conc = model.concentration(time=1.)
assert isinstance(conc, np.ndarray)
assert conc.shape == (7, )
@ -84,6 +84,6 @@ def test_build_concentration_model(baseline_mc_model: cara.monte_carlo.Concentra
def test_build_exposure_model(baseline_mc_exposure_model: cara.monte_carlo.ExposureModel):
model = baseline_mc_exposure_model.build_model(7)
assert isinstance(model, cara.models.ExposureModel)
prob = model.quanta_exposure()
prob = model.exposure()
assert isinstance(prob, np.ndarray)
assert prob.shape == (7, )

View file

@ -26,12 +26,12 @@ def shared_office_mc():
(
models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=10),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
outside_temp=models.PiecewiseConstant((0, 24), (283,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
),
models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.), )),
air_exch=0.25,
),
),
@ -39,7 +39,7 @@ def shared_office_mc():
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 2), (2.1, 4), (5, 7), (7.1, 9))),
presence=mc.SpecificInterval(((0., 2.), (2.1, 4.), (5., 7.), (7.1, 9.))),
mask=models.Mask(η_inhale=0.3),
activity=activity_distributions['Seated'],
expiration=models.MultipleExpiration(
@ -70,12 +70,12 @@ def classroom_mc():
(
models.SlidingWindow(
active=models.PeriodicInterval(period=120, duration=10),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
outside_temp=models.PiecewiseConstant((0, 24), (283,)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
),
models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=0.25,
),
),
@ -83,7 +83,7 @@ def classroom_mc():
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 2), (2.5, 4), (5, 7), (7.5, 9))),
presence=mc.SpecificInterval(((0., 2.), (2.5, 4.), (5., 7.), (7.5, 9.))),
mask=models.Mask.types['No mask'],
activity=activity_distributions['Light activity'],
expiration=models.Expiration.types['Talking'],
@ -108,13 +108,13 @@ def ski_cabin_mc():
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=10, humidity=0.5),
ventilation=models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=0,
),
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 1/3),)),
presence=mc.SpecificInterval(((0., 1/3),)),
mask=models.Mask(η_inhale=0.3),
activity=activity_distributions['Moderate activity'],
expiration=models.Expiration.types['Talking'],
@ -141,13 +141,13 @@ def gym_mc():
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=300, humidity=0.5),
ventilation=models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=6,
),
infected=mc.InfectedPopulation(
number=2,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 1),)),
presence=mc.SpecificInterval(((0., 1.),)),
mask=models.Mask.types["No mask"],
activity=activity_distributions['Heavy exercise'],
expiration=models.Expiration.types['Breathing'],
@ -173,13 +173,13 @@ def waiting_room_mc():
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=100, humidity=0.5),
ventilation=models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=0.25,
),
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((0, 2),)),
presence=mc.SpecificInterval(((0., 2.),)),
mask=models.Mask.types["No mask"],
activity=activity_distributions['Seated'],
expiration=models.MultipleExpiration(
@ -215,7 +215,7 @@ def skagit_chorale_mc():
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2'],
presence=mc.SpecificInterval(((0, 2.5),)),
presence=mc.SpecificInterval(((0., 2.5),)),
mask=models.Mask.types["No mask"],
activity=activity_distributions['Light activity'],
expiration=models.Expiration((5., 5., 5.)),
@ -233,54 +233,54 @@ def skagit_chorale_mc():
@pytest.mark.parametrize(
"mc_model, expected_pi, expected_new_cases, expected_dose, expected_qR",
"mc_model, expected_pi, expected_new_cases, expected_dose, expected_ER",
[
["shared_office_mc", 10.7, 0.32, 0.954, 10.9],
["classroom_mc", 36.1, 6.85, 13.0, 474.4],
["ski_cabin_mc", 16.3, 0.49, 0.599, 123.4],
["gym_mc", 2.25, 0.63, 0.01307, 16.4],
["waiting_room_mc", 9.72, 1.36, 0.571, 58.9],
["skagit_chorale_mc",29.9, 17.9, 1.90, 1414],
["shared_office_mc", 10.7, 0.32, 57.24, 654],
["classroom_mc", 36.1, 6.85, 780.0, 28464],
["ski_cabin_mc", 16.3, 0.49, 35.94, 7404],
["gym_mc", 2.25, 0.63, 0.7842, 984],
["waiting_room_mc", 9.72, 1.36, 34.26, 3534],
["skagit_chorale_mc",29.9, 17.9, 190.0, 141400],
]
)
def test_report_models(mc_model, expected_pi, expected_new_cases,
expected_dose, expected_qR, request):
expected_dose, expected_ER, request):
mc_model = request.getfixturevalue(mc_model)
exposure_model = mc_model.build_model(size=SAMPLE_SIZE)
npt.assert_allclose(exposure_model.infection_probability().mean(),
expected_pi, rtol=TOLERANCE)
npt.assert_allclose(exposure_model.expected_new_cases().mean(),
expected_new_cases, rtol=TOLERANCE)
npt.assert_allclose(exposure_model.quanta_exposure().mean(),
npt.assert_allclose(exposure_model.exposure().mean(),
expected_dose, rtol=TOLERANCE)
npt.assert_allclose(
exposure_model.concentration_model.infected.emission_rate_when_present().mean(),
expected_qR, rtol=TOLERANCE)
expected_ER, rtol=TOLERANCE)
@pytest.mark.parametrize(
"mask_type, month, expected_pi, expected_dose, expected_qR",
"mask_type, month, expected_pi, expected_dose, expected_ER",
[
["No mask", "Jul", 30.0, 6.764, 64.9],
["Type I", "Jul", 10.2, 1.223, 11.7],
["FFP2", "Jul", 4.0, 1.223, 11.7],
["Type I", "Feb", 4.25, 0.357, 11.7],
["No mask", "Jul", 30.0, 405.84, 3894],
["Type I", "Jul", 10.2, 73.38, 702],
["FFP2", "Jul", 4.0, 73.38, 702],
["Type I", "Feb", 4.25, 21.42, 702],
],
)
def test_small_shared_office_Geneva(mask_type, month, expected_pi,
expected_dose, expected_qR):
expected_dose, expected_ER):
concentration_mc = mc.ConcentrationModel(
room=models.Room(volume=33, humidity=0.5),
ventilation=models.MultipleVentilation(
(
models.SlidingWindow(
active=models.SpecificInterval(((0,24),)),
inside_temp=models.PiecewiseConstant((0, 24), (293,)),
active=models.SpecificInterval(((0., 24.),)),
inside_temp=models.PiecewiseConstant((0., 24.), (293,)),
outside_temp=data.GenevaTemperatures[month],
window_height=1.5, opening_length=0.2,
),
models.AirChange(
active=models.SpecificInterval(((0,24),)),
active=models.SpecificInterval(((0., 24.),)),
air_exch=0.25,
),
),
@ -288,7 +288,7 @@ def test_small_shared_office_Geneva(mask_type, month, expected_pi,
infected=mc.InfectedPopulation(
number=1,
virus=virus_distributions['SARS_CoV_2_B117'],
presence=mc.SpecificInterval(((9, 10+2/3), (10+5/6, 12.5), (13.5, 15+2/3), (15+5/6, 18))),
presence=mc.SpecificInterval(((9., 10+2/3), (10+5/6, 12.5), (13.5, 15+2/3), (15+5/6, 18.))),
mask=models.Mask.types[mask_type],
activity=activity_distributions['Seated'],
expiration=models.MultipleExpiration(
@ -309,8 +309,8 @@ def test_small_shared_office_Geneva(mask_type, month, expected_pi,
exposure_model = exposure_mc.build_model(size=SAMPLE_SIZE)
npt.assert_allclose(exposure_model.infection_probability().mean(),
expected_pi, rtol=TOLERANCE)
npt.assert_allclose(exposure_model.quanta_exposure().mean(),
npt.assert_allclose(exposure_model.exposure().mean(),
expected_dose, rtol=TOLERANCE)
npt.assert_allclose(
exposure_model.concentration_model.infected.emission_rate_when_present().mean(),
expected_qR, rtol=TOLERANCE)
expected_ER, rtol=TOLERANCE)

27
cara/utils.py Normal file
View file

@ -0,0 +1,27 @@
import functools
def method_cache(fn):
"""
A decorator for instance based caching.
Unlike lru_cache / memoization, this allows us to not have to have the
instance itself be hashable - only the arguments must be so.
The cache is stored as a dictionary in a private attribute on the instance
with the name ``_cache_{func_name}``.
"""
cache_name = f'_cache_{fn.__name__}'
@functools.wraps(fn)
def cached_method(self, *args, **kwargs):
cache = getattr(self, cache_name, None)
if cache is None:
cache = {}
object.__setattr__(self, cache_name, cache)
cache_key = hash(args + tuple(kwargs.items()))
if cache_key not in cache:
cache[cache_key] = fn(self, *args, **kwargs)
return cache[cache_key]
return cached_method