Backend separation

- extract, isolate and package it in a completely independent Python module, versioned and in a way that allows releases on PyPI.org
- fixed error in placeholder for secondary school (data registry defaults)
- added restriction in pytest version to install
- expected number of new cases fix
- data registry update (schema v2.1.1)
- github update
- deprecate ExpertApplication and CO2Application
- changes to reflect schema update 2.0.2
- version update
- Fixed error with f_inf (short-range)
- new folder layout
- Conditional probability data update
- General fixes
- Fitting results in L/S/person
- CO2 fitting algorithm refinement
This commit is contained in:
lrdossan 2024-06-20 10:14:38 +02:00
parent 6f77b6e725
commit 20b0467f89
181 changed files with 2094 additions and 866 deletions

View file

@ -24,6 +24,7 @@ jobs:
env:
PROJECT_ROOT: ./
PROJECT_NAME: caimira
CAIMIRA_TESTS_CALCULATOR_TIMEOUT: 30
steps:
- name: Checkout
uses: actions/checkout@v2

View file

@ -15,24 +15,55 @@ variables:
# ###################################################################################################
# Test code
# Test code - CAiMIRA (model) and CERN CAiMIRA (CERN's UI)
# A full installation of CAiMIRA, tested with pytest.
# A full installation of CAiMIRA, tested with pytest.
test_install:
extends: .acc_py_full_test
variables:
project_root: ./caimira
project_name: caimira
# A development installation of CAiMIRA tested with pytest.
test_dev:
extends: .acc_py_dev_test
variables:
project_root: ./caimira
project_name: caimira
# A development installation of CAiMIRA tested with pytest.
test_dev-39:
variables:
PY_VERSION: "3.9"
project_root: ./caimira
project_name: caimira
extends: .acc_py_dev_test
# A full installation of CERN CAiMIRA, tested with pytest.
test_install_cern_caimira:
extends: .acc_py_full_test
variables:
project_root: ./_cern_caimira
project_name: cern_caimira
# A development installation of CERN CAiMIRA tested with pytest.
test_dev_cern_caimira:
extends: .acc_py_dev_test
variables:
project_root: ./_cern_caimira
project_name: cern_caimira
# A development installation of CERN CAiMIRA tested with pytest.
test_dev-39_cern_caimira:
variables:
PY_VERSION: "3.9"
project_root: ./_cern_caimira
project_name: cern_caimira
extends: .acc_py_dev_test
# ###################################################################################################
# Test OpenShift config

409
README.md
View file

@ -103,25 +103,426 @@ pip install -e . # At the root of the repository
### Running the Calculator app in development mode
```
python -m caimira.apps.calculator
python -m ui.apps.calculator
```
To run with a specific template theme created:
```
python -m caimira.apps.calculator --theme=caimira/apps/templates/{theme}
python -m ui.apps.calculator --theme=ui/apps/templates/{theme}
```
To run the entire app in a different `APPLICATION_ROOT` path:
```
python -m caimira.apps.calculator --app_root=/myroot
python -m ui.apps.calculator --app_root=/myroot
```
To run the calculator on a different URL path:
```
python -m caimira.apps.calculator --prefix=/mycalc
python -m ui.apps.calculator --prefix=/mycalc
```
Each of these commands will start a local version of CAiMIRA, which can be visited at http://localhost:8080/.
### How to compile and read the documentation
In order to generate the documentation, CAiMIRA must be installed first with the `doc` dependencies:
```
pip install -e .[doc]
```
To generate the HTML documentation page, the command `make html` should be executed in the `caimira/docs` directory.
If any of the `.rst` files under the `caimira/docs` folder is changed, this command should be executed again.
Then, right click on `caimira/docs/_build/html/index.html` and select `Open with` your preferred web browser.
### Running the CAiMIRA Expert-App or CO2-App apps in development mode
#### Disclaimer
The `ExpertApplication` and `CO2Application` are no longer actively maintained but will remain in the codebase for legacy purposes.
Please note that the functionality of these applications might be compromised due to deprecation issues.
#### Running the Applications
These applications only work within Jupyter notebooks. Attempting to run them outside of a Jupyter environment may result in errors or degraded functionality.
##### Prerequisites
Make sure you have the needed dependencies intalled:
```
pip install notebook jupyterlab
```
Running with Visual Studio Code (VSCode):
1. Ensure you have the following extensions installed in VSCode: `Jupyter` and `Python`.
2. Open VSCode and navigate to the directory containing the notebook.
3. Open the notebook (e.g. `caimira/apps/expert/caimira.ipynb`) and run the cells by clicking the `run` button next to each cell.
### Running the tests
```
pip install -e .[test]
pytest ./caimira
```
### Running the profiler
The profiler is enabled when the environment variable `CAIMIRA_PROFILER_ENABLED` is set to 1.
When visiting http://localhost:8080/profiler, you can start a new session and choose between [PyInstrument](https://github.com/joerick/pyinstrument) or [cProfile](https://docs.python.org/3/library/profile.html#module-cProfile). The app includes two different profilers, mainly because they can give different information.
Keep the profiler page open. Then, in another window, navigate to any page in CAiMIRA, for example generate a new report. Refresh the profiler page, and click on the `Report` link to see the profiler output.
The sessions are stored in a local file in the `/tmp` folder. To share it across multiple web nodes, a shared storage should be added to all web nodes. The folder can be customized via the environment variable `CAIMIRA_PROFILER_CACHE_DIR`.
### Building the whole environment for local development
**Simulate the docker build that takes place on openshift with:**
```
s2i build file://$(pwd) --copy --keep-symlinks --context-dir ./app-config/nginx/ centos/nginx-112-centos7 caimira-nginx-app
docker build . -f ./app-config/calculator-app/Dockerfile -t calculator-app
docker build ./app-config/auth-service -t auth-service
```
Get the client secret from the CERN Application portal for the `caimira-test` app. See [CERN-SSO-integration](#cern-sso-integration) for more info.
```
read CLIENT_SECRET
```
Define some env vars (copy/paste):
```
export COOKIE_SECRET=$(openssl rand -hex 50)
export OIDC_SERVER=https://auth.cern.ch/auth
export OIDC_REALM=CERN
export CLIENT_ID=caimira-test
```
Run docker-compose:
```
cd app-config
CURRENT_UID=$(id -u):$(id -g) docker-compose up
```
Then visit http://localhost:8080/.
### Setting up the application on openshift
The https://cern.ch/caimira application is running on CERN's OpenShift platform. In order to set it up for the first time, we followed the documentation at https://paas.docs.cern.ch/. In particular we:
* Added the OpenShift application deploy key to the GitLab repository
* Created a Python 3.6 (the highest possible at the time of writing) application in OpenShift
* Configured a generic webhook on OpenShift, and call that from the CI of the GitLab repository
### Updating the caimira-test.web.cern.ch instance
We have a replica of https://caimira.web.cern.ch running on http://caimira-test.web.cern.ch. Its purpose is to simulate what will happen when
a feature is merged. To push your changes to caimira-test, simply push your branch to `live/caimira-test` and the CI pipeline will trigger the
deployment. To push to this branch, there is a good chance that you will need to force push - you should always force push with care and
understanding why you are doing it. Syntactically, it will look something like (assuming that you have "upstream" as your remote name,
but it may be origin if you haven't configured it differently):
git push --force upstream name-of-local-branch:live/caimira-test
## OpenShift templates
### First setup
First, get the [oc](https://docs.okd.io/3.11/cli_reference/get_started_cli.html) client and then login:
```console
$ oc login https://api.paas.okd.cern.ch
```
Then, switch to the project that you want to update:
```console
$ oc project caimira-test
```
Create a new service account in OpenShift to use GitLab container registry:
```console
$ oc create serviceaccount gitlabci-deployer
serviceaccount "gitlabci-deployer" created
$ oc policy add-role-to-user registry-editor -z gitlabci-deployer
# We will refer to the output of this command as `test-token`
$ oc serviceaccounts get-token gitlabci-deployer
<...test-token...>
```
Add the token to GitLab to allow GitLab to access OpenShift and define/change image stream tags. Go to `Settings` -> `CI / CD` -> `Variables` -> click on `Expand` button and create the variable `OPENSHIFT_CAIMIRA_TEST_DEPLOY_TOKEN`: insert the token `<...test-token...>`.
Then, create the webhook secret to be able to trigger automatic builds from GitLab.
Create and store the secret. Copy the secret above and add it to the GitLab project under `CI /CD` -> `Variables` with the name `OPENSHIFT_CAIMIRA_TEST_WEBHOOK_SECRET`.
```console
$ WEBHOOKSECRET=$(openssl rand -hex 50)
$ oc create secret generic \
--from-literal="WebHookSecretKey=$WEBHOOKSECRET" \
gitlab-caimira-webhook-secret
```
For CI usage, we also suggest creating a service account:
```console
oc create sa gitlab-config-checker
```
Under ``User Management`` -> ``RoleBindings`` create a new `RoleBinding` to grant `View` access to the `gitlab-config-checker` service account:
* name: `gitlab-config-checker-view-role`
* role name: `view`
* service account: `gitlab-config-checker`
To get this new user's authentication token go to ``User Management`` -> ``Service Accounts`` -> `gitlab-config-checker` and locate the token in the newly created secret associated with the user (in this case ``gitlab-config-checker-token-XXXX``). Copy the `token` value from `Data`.
Create the various configurations:
```console
$ cd app-config/openshift
$ oc process -f configmap.yaml | oc create -f -
$ oc process -f services.yaml | oc create -f -
$ oc process -f imagestreams.yaml | oc create -f -
$ oc process -f buildconfig.yaml --param GIT_BRANCH='live/caimira-test' | oc create -f -
$ oc process -f deploymentconfig.yaml --param PROJECT_NAME='caimira-test' | oc create -f -
```
Manually create the **route** to access the website, see `routes.example.yaml`.
After having created the route, make sure that you extend the HTTP request timeout annotation: the
report generation can take more time than the default 30 seconds.
```
$ oc annotate route caimira-route --overwrite haproxy.router.openshift.io/timeout=60s
```
### CERN SSO integration
The SSO integration uses OpenID credentials configured in [CERN Applications portal](https://application-portal.web.cern.ch/).
How to configure the application:
* Application Identifier: `caimira-test`
* Homepage: `https://caimira-test.web.cern.ch`
* Administrators: `caimira-dev`
* SSO Registration:
* Protocol: `OpenID (OIDC)`
* Redirect URI: `https://caimira-test.web.cern.ch/auth/authorize`
* Leave unchecked all the other checkboxes
* Define new roles:
* Name: `CERN Users`
* Role Identifier: `external-users`
* Leave unchecked checkboxes
* Minimum Level Of Assurance: `CERN (highest)`
* Assign role to groups: `cern-accounts-primary` e-group
* Name: `External accounts`
* Role Identifier: `admin`
* Leave unchecked checkboxes
* Minimum Level Of Assurance: `Any (no restrictions)`
* Assign role to groups: `caimira-app-external-access` e-group
* Name: `Allowed users`
* Role Identifier: `allowed-users`
* Check `This role is required to access my application`
* Minimum Level Of Assurance:`Any (no restrictions)`
* Assign role to groups: `cern-accounts-primary` and `caimira-app-external-access` e-groups
Copy the client id and client secret and use it below.
```console
$ COOKIE_SECRET=$(openssl rand -hex 50)
$ oc create secret generic \
--from-literal="CLIENT_ID=$CLIENT_ID" \
--from-literal="CLIENT_SECRET=$CLIENT_SECRET" \
--from-literal="COOKIE_SECRET=$COOKIE_SECRET" \
auth-service-secrets
```
### External APIs
- **Geographical location:**
There is one external API call to fetch required information related to the geographical location inserted by a user.
The documentation for this geocoding service is available at https://developers.arcgis.com/rest/geocode/api-reference/geocoding-suggest.htm .
Please note that there is no need for keys on this API call. It is **free-of-charge**.
- **Humidity and Inside Temperature:**
There is the possibility of using one external API call to fetch information related to a location specified in the UI. The data is related to the inside temperature and humidity taken from an indoor measurement device. Note that the API currently used from ARVE is only available for the `CERN theme` as the authorised sensors are installed at CERN."
- **ARVE:**
The ARVE Swiss Air Quality System provides trusted air data for commercial buildings in real-time and analyzes it with the help of AI and machine learning algorithms to create actionable insights.
Create secret:
```console
$ read ARVE_CLIENT_ID
$ read ARVE_CLIENT_SECRET
$ read ARVE_API_KEY
$ oc create secret generic \
--from-literal="ARVE_CLIENT_ID=$ARVE_CLIENT_ID" \
--from-literal="ARVE_CLIENT_SECRET=$ARVE_CLIENT_SECRET" \
--from-literal="ARVE_API_KEY=$ARVE_API_KEY" \
arve-api
```
- **CERN Data Service:**
The CERN data service collects data from various sources and expose them via a REST API endpoint.
The service is enabled when the environment variable `DATA_SERVICE_ENABLED` is set to 1.
## Update configuration
If you need to **update** existing configuration, then modify this repository and after having logged in, run:
```console
$ cd app-config/openshift
$ oc process -f configmap.yaml | oc replace -f -
$ oc process -f services.yaml | oc replace -f -
$ oc process -f imagestreams.yaml | oc replace -f -
$ oc process -f buildconfig.yaml --param GIT_BRANCH='live/caimira-test' | oc replace -f -
$ oc process -f deploymentconfig.yaml --param PROJECT_NAME='caimira-test' | oc replace -f -
```
Be aware that if you create/recreate the environment you must manually create a **route** in OpenShift,
specifying the respective annotation to be exposed outside CERN.
# CAiMIRA - CERN Airborne Model for Risk Assessment
CAiMIRA is a risk assessment tool developed to model the concentration of viruses in enclosed spaces, in order to inform space-management decisions.
CAiMIRA models the concentration profile of potential virions in enclosed spaces , both as background (room) concentration and during close-proximity interations, with clear and intuitive graphs.
The user can set a number of parameters, including room volume, exposure time, activity type, mask-wearing and ventilation.
The report generated indicates how to avoid exceeding critical concentrations and chains of airborne transmission in spaces such as individual offices, meeting rooms and labs.
The risk assessment tool simulates the airborne spread SARS-CoV-2 virus in a finite volume, assuming a homogenous mixture and a two-stage exhaled jet model, and estimates the risk of COVID-19 infection therein.
The results DO NOT include the other known modes of SARS-CoV-2 transmission, such as fomite or blood-bound.
Hence, the output from this model is only valid when the other recommended public health & safety instructions are observed, such as good hand hygiene and other barrier measures.
The model used is based on scientific publications relating to airborne transmission of infectious diseases, dose-response exposures and aerosol science, as of February 2022.
It can be used to compare the effectiveness of different airborne-related risk mitigation measures.
Note that this model applies a deterministic approach, i.e., it is assumed at least one person is infected and shedding viruses into the simulated volume.
Nonetheless, it is also important to understand that the absolute risk of infection is uncertain, as it will depend on the probability that someone infected attends the event.
The model is most useful for comparing the impact and effectiveness of different mitigation measures such as ventilation, filtration, exposure time, physical activity, amount and nature of close-range interactions and
the size of the room, considering both long- and short-range airborne transmission modes of COVID-19 in indoor settings.
This tool is designed to be informative, allowing the user to adapt different settings and model the relative impact on the estimated infection probabilities.
The objective is to facilitate targeted decision-making and investment through comparisons, rather than a singular determination of absolute risk.
While the SARS-CoV-2 virus is in circulation among the population, the notion of 'zero risk' or 'completely safe scenario' does not exist.
Each event modelled is unique, and the results generated therein are only as accurate as the inputs and assumptions.
## Authors
CAiMIRA was developed by following members of CERN - European Council for Nuclear Research (visit https://home.cern/):
Andre Henriques<sup>1</sup>, Luis Aleixo<sup>1</sup>, Marco Andreini<sup>1</sup>, Gabriella Azzopardi<sup>2</sup>, James Devine<sup>3</sup>, Philip Elson<sup>4</sup>, Nicolas Mounet<sup>2</sup>, Markus Kongstein Rognlien<sup>2,6</sup>, Nicola Tarocco<sup>5</sup>
<sup>1</sup>HSE Unit, Occupational Health & Safety Group, CERN<br>
<sup>2</sup>Beams Department, Accelerators and Beam Physics Group, CERN<br>
<sup>3</sup>Experimental Physics Department, Safety Office, CERN<br>
<sup>4</sup>Beams Department, Controls Group, CERN<br>
<sup>5</sup>Information Technology Department, Collaboration, Devices & Applications Group, CERN<br>
<sup>6</sup>Norwegian University of Science and Technology (NTNU)<br>
### Reference and Citation
**For the use of the CAiMIRA web app**
CAiMIRA CERN Airborne Model for Indoor Risk Assessment tool
[![DOI](https://zenodo.org/badge/DOI/10.5281/zenodo.6520431.svg)](https://doi.org/10.5281/zenodo.6520431)
© Copyright 2020-2021 CERN. All rights not expressly granted are reserved.
**For use of the CAiMIRA model**
Henriques A, Mounet N, Aleixo L, Elson P, Devine J, Azzopardi G, Andreini M, Rognlien M, Tarocco N, Tang J. (2022). Modelling airborne transmission of SARS-CoV-2 using CARA: risk assessment for enclosed spaces. _Interface Focus 20210076_. https://doi.org/10.1098/rsfs.2021.0076
Reference on the Short-range expiratory jet model from:
Jia W, Wei J, Cheng P, Wang Q, Li Y. (2022). Exposure and respiratory infection risk via the short-range airborne route. _Building and Environment_ *219*: 109166.
https://doi.org/10.1016/j.buildenv.2022.109166
***Open Source Acknowledgments***
For a detailed list of the open-source dependencies used in this project along with their respective licenses, please refer to [License Information](open-source-licences/README.md). This includes both the core dependencies specified in the project's requirements and their transitive dependencies.
The information also features a distribution diagram of licenses and a brief description of each of them.
## Applications
### Calculator
A risk assessment tool which simulates the airborne spread of the SARS-CoV-2 virus for space managers.
### CAiMIRA Expert App and CO₂ App
A tool to interact with various parameters of the CAiMIRA model.
## Disclaimer
CAiMIRA has not undergone review, approval or certification by competent authorities, and as a result, it cannot be considered as a fully endorsed and reliable tool, namely in the assessment of potential viral emissions from infected hosts to be modelled.
The software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and non-infringement.
In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.
## Running CAiMIRA locally
The easiest way to run a version of CAiMIRA Calculator is to use docker. A pre-built
image of CAiMIRA is made available at https://gitlab.cern.ch/caimira/caimira/container_registry.
In order to run CAiMIRA locally with docker, run the following:
$ docker run -it -p 8080:8080 gitlab-registry.cern.ch/caimira/caimira/calculator
This will start a local version of CAiMIRA, which can be visited at http://localhost:8080/.
## Development guide
CAiMIRA is also mirrored to Github if you wish to collaborate on development and can be found at: https://github.com/CERN/caimira
### Installing CAiMIRA in editable mode
```
pip install -e . # At the root of the repository
```
### Running the Calculator app in development mode
```
python -m cern_caimira.apps.calculator
```
To run with a specific template theme created:
```
python -m cern_caimira.apps.calculator --theme=cern_caimira/apps/templates/{theme}
```
To run the entire app in a different `APPLICATION_ROOT` path:
```
python -m cern_caimira.apps.calculator --app_root=/myroot
```
To run the calculator on a different URL path:
```
python -m cern_caimira.apps.calculator --prefix=/mycalc
```
Each of these commands will start a local version of CAiMIRA, which can be visited at http://localhost:8080/.

View file

@ -7,4 +7,4 @@ nginx -c /opt/caimira/nginx.conf
cd /opt/caimira/src/caimira
# Run the calculator in the foreground.
/opt/caimira/app/bin/python -m caimira.apps.calculator --port 8081 --no-debug
/opt/caimira/app/bin/python -m ui.apps.calculator --port 8081 --no-debug

View file

@ -26,8 +26,8 @@ if [[ "$APP_NAME" == "calculator-app" ]]; then
export "DATA_SERVICE_ENABLED"="${DATA_SERVICE_ENABLED:=0}"
export "CAIMIRA_PROFILER_ENABLED"="${CAIMIRA_PROFILER_ENABLED:=0}"
echo "Starting the caimira webservice with: python -m caimira.apps.calculator ${args[@]}"
python -m caimira.apps.calculator "${args[@]}"
echo "Starting the caimira webservice with: python -m ui.apps.calculator ${args[@]}"
python -m ui.apps.calculator "${args[@]}"
else
echo "No APP_NAME specified"

View file

@ -1,6 +1,5 @@
version: "3.8"
services:
calculator-app:
image: calculator-app
environment:
@ -8,7 +7,7 @@ services:
- APP_NAME=calculator-app
- APPLICATION_ROOT=/
- CAIMIRA_CALCULATOR_PREFIX=/calculator-cern
- CAIMIRA_THEME=caimira/apps/templates/cern
- CAIMIRA_THEME=ui/apps/templates/cern
- DATA_SERVICE_ENABLED=0
- CAIMIRA_PROFILER_ENABLED=0
user: ${CURRENT_UID}

View file

@ -303,3 +303,4 @@
- name: PROJECT_NAME
description: The name of this project, e.g. caimira-test
required: true

View file

@ -14,4 +14,4 @@ sphinx:
python:
install:
- requirements: caimira/docs/requirements.txt
- requirements: caimira/docs/requirements.txt

View file

@ -1,4 +0,0 @@
from .expert import ExpertApplication
from .expert_co2 import CO2Application
__all__ = ['ExpertApplication', 'CO2Application']

110
caimira/pyproject.toml Normal file
View file

@ -0,0 +1,110 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "caimira"
version = "2.0.0"
description = "CAiMIRA - CERN Airborne Model for Indoor Risk Assessment"
readme = "README.md"
license = { text = "Apache-2.0" }
authors = [
{ name = "Andre Henriques", email = "andre.henriques@cern.ch" }
]
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
"License :: OSI Approved :: Apache Software License",
]
requires-python = ">=3.9"
dependencies = [
"ipykernel",
"ipympl>=0.9.0",
"ipywidgets<8.0",
"Jinja2",
"loky",
"matplotlib",
"memoization",
"mistune",
"numpy",
"pandas",
"psutil",
"pyinstrument",
"pyjwt",
"python-dateutil",
"retry",
"ruptures",
"scipy",
"scikit-learn",
"timezonefinder",
"tornado",
"types-retry",
]
[project.optional-dependencies]
dev = []
test = [
"pytest",
"pytest-mypy >= 0.10.3",
"mypy >= 1.0.0",
"pytest-tornasync",
"numpy-stubs @ git+https://github.com/numpy/numpy-stubs.git",
"types-dataclasses",
"types-python-dateutil",
"types-requests"
]
doc = [
"sphinx",
"sphinx_rtd_theme"
]
[project.urls]
Homepage = "https://github.com/cern/caimira"
[tool.setuptools]
packages = ["caimira"]
package-dir = {"" = "src"}
[tool.pytest.ini_options]
addopts = "--mypy"
[tool.mypy]
no_warn_no_return = true
exclude = "caimira/profiler.py"
ignore_missing_imports = true # TODO what to do here?
[tool.mypy-loky]
ignore_missing_imports = true
[tool.mypy-ipympl]
ignore_missing_imports = true
[tool.mypy-ipywidgets]
ignore_missing_imports = true
[tool.mypy-matplotlib]
ignore_missing_imports = true
[tool.mypy-mistune]
ignore_missing_imports = true
[tool.mypy-qrcode]
ignore_missing_imports = true
[tool.mypy-scipy]
ignore_missing_imports = true
[tool.mypy-timezonefinder]
ignore_missing_imports = true
[tool.mypy-pandas]
ignore_missing_imports = true
[tool.mypy-pstats]
follow_imports = "skip"
[tool.mypy-tabulate]
ignore_missing_imports = true
[tool.mypy-ruptures]
ignore_missing_imports = true

2
caimira/setup.cfg Normal file
View file

@ -0,0 +1,2 @@
[tool:pytest]
addopts = --mypy

View file

@ -0,0 +1,3 @@
import importlib.metadata
__version__ = importlib.metadata.version(__package__ or __name__)

View file

View file

@ -0,0 +1,31 @@
# """
# Entry point for the CAiMIRA application
# """
import tornado.ioloop
import tornado.web
import tornado.log
from tornado.options import define, options
import logging
from caimira.api.routes.report_routes import ReportHandler
define("port", default=8088, help="Port to listen on", type=int)
logging.basicConfig(format="%(message)s", level=logging.INFO)
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/report", ReportHandler),
]
settings = dict(
debug=True,
)
super(Application, self).__init__(handlers, **settings)
if __name__ == "__main__":
app = Application()
app.listen(options.port)
logging.info(f"Tornado server is running on port {options.port}")
tornado.ioloop.IOLoop.current().start()

View file

@ -0,0 +1,30 @@
import concurrent.futures
import functools
from caimira.calculator.validators.virus.virus_validator import VirusFormData
from caimira.calculator.store.data_registry import DataRegistry
import caimira.calculator.report.report_generator as rg
def generate_form_obj(form_data, data_registry):
return VirusFormData.from_dict(form_data, data_registry)
def generate_model(form_obj):
return form_obj.build_model(250_000)
def generate_report_results(form_obj, model):
return rg.calculate_report_data(form=form_obj, model=model, executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor, None, # TODO define report_parallelism
),)
def submit_virus_form(form_data):
data_registry = DataRegistry
form_obj = generate_form_obj(form_data, data_registry)
model = generate_model(form_obj)
report_data = generate_report_results(form_obj, model=model)
return report_data

View file

@ -0,0 +1,28 @@
import json
import traceback
import tornado.web
from caimira.api.controller.report_controller import submit_virus_form
class ReportHandler(tornado.web.RequestHandler):
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "x-requested-with")
self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
def post(self):
try:
form_data = json.loads(self.request.body)
report_data = submit_virus_form(form_data)
response_data = {
"status": "success",
"message": "Results generated successfully",
"report_data": report_data,
}
self.write(response_data)
except Exception as e:
traceback.print_exc()
self.set_status(400)
self.write({"message": str(e)})

View file

Before

Width:  |  Height:  |  Size: 805 KiB

After

Width:  |  Height:  |  Size: 805 KiB

View file

@ -3,4 +3,4 @@ sphinx-rtd-theme==1.2.2
pillow==5.4.1
mock==1.0.1
commonmark==0.9.1
recommonmark==0.5.0
recommonmark==0.5.0

View file

@ -1,6 +1,6 @@
import numpy as np
from caimira import models
from caimira.data.weather import wx_data, nearest_wx_station
from caimira.calculator.models import models
from .weather import wx_data, nearest_wx_station
MONTH_NAMES = [
'January', 'February', 'March', 'April', 'May', 'June', 'July',

View file

@ -40,7 +40,7 @@ from scipy.interpolate import interp1d
import scipy.stats as sct
from scipy.optimize import minimize
from caimira.store.data_registry import DataRegistry
from caimira.calculator.store.data_registry import DataRegistry
if not typing.TYPE_CHECKING:
from memoization import cached

View file

@ -7,11 +7,11 @@ import numpy as np
from scipy import special as sp
from scipy.stats import weibull_min
from caimira.enums import ViralLoads
from ..enums import ViralLoads
import caimira.monte_carlo.models as mc
from caimira.monte_carlo.sampleable import LogCustom, LogNormal, Normal, LogCustomKernel, CustomKernel, Uniform, Custom
from caimira.store.data_registry import DataRegistry
import caimira.calculator.models.monte_carlo.models as mc
from caimira.calculator.models.monte_carlo.sampleable import LogCustom, LogNormal, Normal, LogCustomKernel, CustomKernel, Uniform, Custom
from caimira.calculator.store.data_registry import DataRegistry
def evaluate_vl(root: typing.Dict, value: str, data_registry: DataRegistry):

View file

@ -3,7 +3,7 @@ import dataclasses
import sys
import typing
import caimira.models
from caimira.calculator.models import models
from .sampleable import SampleableDistribution, _VectorisedFloatOrSampleable
@ -57,7 +57,7 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model
# Note: deepcopy not needed here as we aren't mutating entities beyond
# the top level.
new_field = copy.copy(field)
if field.type is caimira.models._VectorisedFloat: # noqa
if field.type is models._VectorisedFloat: # noqa
new_field.type = _VectorisedFloatOrSampleable # type: ignore
field_type: typing.Any = new_field.type
@ -65,30 +65,30 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model
if getattr(field_type, '__origin__', None) in [typing.Union, typing.Tuple]:
# It is challenging to generalise this code, so we provide specific transformations,
# and raise for unforseen cases.
if new_field.type == typing.Tuple[caimira.models._VentilationBase, ...]:
if new_field.type == typing.Tuple[models._VentilationBase, ...]:
VB = getattr(sys.modules[__name__], "_VentilationBase")
field_type = typing.Tuple[typing.Union[caimira.models._VentilationBase, VB], ...]
elif new_field.type == typing.Tuple[caimira.models._ExpirationBase, ...]:
field_type = typing.Tuple[typing.Union[models._VentilationBase, VB], ...]
elif new_field.type == typing.Tuple[models._ExpirationBase, ...]:
EB = getattr(sys.modules[__name__], "_ExpirationBase")
field_type = typing.Tuple[typing.Union[caimira.models._ExpirationBase, EB], ...]
elif new_field.type == typing.Tuple[caimira.models.SpecificInterval, ...]:
field_type = typing.Tuple[typing.Union[models._ExpirationBase, EB], ...]
elif new_field.type == typing.Tuple[models.SpecificInterval, ...]:
SI = getattr(sys.modules[__name__], "SpecificInterval")
field_type = typing.Tuple[typing.Union[caimira.models.SpecificInterval, SI], ...]
field_type = typing.Tuple[typing.Union[models.SpecificInterval, SI], ...]
elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseConstant]:
elif new_field.type == typing.Union[int, models.IntPiecewiseConstant]:
IPC = getattr(sys.modules[__name__], "IntPiecewiseConstant")
field_type = typing.Union[int, caimira.models.IntPiecewiseConstant, IPC]
elif new_field.type == typing.Union[caimira.models.Interval, None]:
field_type = typing.Union[int, models.IntPiecewiseConstant, IPC]
elif new_field.type == typing.Union[models.Interval, None]:
I = getattr(sys.modules[__name__], "Interval")
field_type = typing.Union[None, caimira.models.Interval, I]
field_type = typing.Union[None, models.Interval, I]
else:
# Check that we don't need to do anything with this type.
for item in new_field.type.__args__:
if getattr(item, '__module__', None) == 'caimira.models':
if getattr(item, '__module__', None) == 'source.models.models':
raise ValueError(
f"unsupported type annotation transformation required for {new_field.type}")
elif field_type.__module__ == 'caimira.models':
elif field_type.__module__ == 'source.models.models':
mc_model = getattr(sys.modules[__name__], new_field.type.__name__)
field_type = typing.Union[new_field.type, mc_model]
@ -119,7 +119,7 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model
_MODEL_CLASSES = [
cls for cls in vars(caimira.models).values()
cls for cls in vars(models).values()
if dataclasses.is_dataclass(cls)
]

View file

@ -3,7 +3,7 @@ import typing
import numpy as np
from sklearn.neighbors import KernelDensity # type: ignore
import caimira.models
from caimira.calculator.models import models
# Declare a float array type of a given size.
# There is no better way to declare this currently, unfortunately.
@ -158,5 +158,5 @@ class LogCustomKernel(SampleableDistribution):
_VectorisedFloatOrSampleable = typing.Union[
SampleableDistribution, caimira.models._VectorisedFloat,
SampleableDistribution, models._VectorisedFloat,
]

View file

@ -0,0 +1 @@
# Move here the backend logic.

View file

@ -1,26 +1,17 @@
import concurrent.futures
import base64
import dataclasses
from datetime import datetime
import io
import json
import typing
import urllib
import zlib
import jinja2
import numpy as np
import matplotlib.pyplot as plt
from caimira import models
from caimira.apps.calculator import markdown_tools
from caimira.profiler import profile
from caimira.store.data_registry import DataRegistry
from ... import monte_carlo as mc
from .model_generator import VirusFormData
from ... import dataclass_utils
from caimira.enums import ViralLoads
from caimira.calculator.models import models
# from caimira.apps.calculator import markdown_tools
# from caimira.profiler import profile
from caimira.calculator.validators.virus.virus_validator import VirusFormData
from caimira.calculator.models import dataclass_utils
from caimira.calculator.models.enums import ViralLoads
def model_start_end(model: models.ExposureModel):
t_start = min(model.exposed.presence_interval().boundaries()[0][0],
@ -82,7 +73,6 @@ def non_temp_transition_times(model: models.ExposureModel):
# such as PeriodicIntervals, which extend beyond the model itself).
return sorted(time for time in change_times if (t_start <= time <= t_end))
def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional[int] = None) -> typing.List[float]:
"""
Pick approximately ``approx_n_pts`` time points which are interesting for the
@ -104,6 +94,7 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional
return nice_times
def concentrations_with_sr_breathing(form: VirusFormData, model: models.ExposureModel, times: typing.List[float], short_range_intervals: typing.List) -> typing.List[float]:
lower_concentrations = []
for time in times:
@ -115,16 +106,20 @@ def concentrations_with_sr_breathing(form: VirusFormData, model: models.Exposure
lower_concentrations.append(np.array(model.concentration_model.concentration(float(time))).mean())
return lower_concentrations
def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(),fn_name
def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_co2_concentration(CO2_model, time, fn_name=None):
return np.array(CO2_model.concentration(float(time))).mean(), fn_name
@profile
# @profile
def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
@ -203,25 +198,6 @@ def calculate_report_data(form: VirusFormData, model: models.ExposureModel, exec
}
def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
form_dict = VirusFormData.to_dict(form, strip_defaults=True)
# Generate the calculator URL arguments that would be needed to re-create this
# form.
args = urllib.parse.urlencode(form_dict)
# Then zlib compress + base64 encode the string. To be inverted by the
# /_c/ endpoint.
compressed_args = base64.b64encode(zlib.compress(args.encode())).decode()
qr_url = f"{base_url}{get_root_url()}/_c/{compressed_args}"
url = f"{base_url}{get_root_calculator_url()}?{args}"
return {
'link': url,
'shortened': qr_url,
}
def conditional_prob_inf_given_vl_dist(
infection_probability: models._VectorisedFloat,
viral_loads: np.ndarray,
@ -319,13 +295,6 @@ def uncertainties_plot(infection_probability: models._VectorisedFloat,
return fig
def _img2bytes(figure):
# Draw the image
img_data = io.BytesIO()
figure.save(img_data, format='png', bbox_inches="tight")
return img_data
def _figure2bytes(figure):
# Draw the image
img_data = io.BytesIO()
@ -338,249 +307,3 @@ def img2base64(img_data) -> str:
pic_hash = base64.b64encode(img_data.read()).decode('ascii')
# A src suitable for a tag such as f'<img id="scenario_concentration_plot" src="{result}">.
return f'data:image/png;base64,{pic_hash}'
def minutes_to_time(minutes: int) -> str:
minute_string = str(minutes % 60)
minute_string = "0" * (2 - len(minute_string)) + minute_string
hour_string = str(minutes // 60)
hour_string = "0" * (2 - len(hour_string)) + hour_string
return f"{hour_string}:{minute_string}"
def readable_minutes(minutes: int) -> str:
time = float(minutes)
unit = " minute"
if time % 60 == 0:
time = minutes/60
unit = " hour"
if time != 1:
unit += "s"
if time.is_integer():
time_str = "{:0.0f}".format(time)
else:
time_str = "{0:.2f}".format(time)
return time_str + unit
def hour_format(hour: float) -> str:
# Convert float hour to HH:MM format
hours = f"{int(hour):02}"
minutes = f"{int(hour % 1 * 60):02}"
return f"{hours}:{minutes}"
def percentage(absolute: float) -> float:
return absolute * 100
def non_zero_percentage(percentage: int) -> str:
if percentage < 0.01:
return "<0.01%"
elif percentage < 1:
return "{:0.2f}%".format(percentage)
elif percentage > 99.9 or np.isnan(percentage):
return ">99.9%"
else:
return "{:0.1f}%".format(percentage)
def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]:
viral_load = model.concentration_model.infected.virus.viral_load_in_sputum
scenarios = {}
for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99):
vl = np.quantile(viral_load, percentil)
specific_vl_scenario = dataclass_utils.nested_replace(model,
{'concentration_model.infected.virus.viral_load_in_sputum': vl}
)
scenarios[str(vl)] = np.mean(specific_vl_scenario.infection_probability())
return scenarios
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
scenarios = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
FFP2_being_worn = bool(form.mask_wearing_option == 'mask_on' and form.mask_type == 'FFP2')
if FFP2_being_worn and form.hepa_option:
FFP2andHEPAalternative = dataclass_utils.replace(form, mask_type='Type I')
if not (form.hepa_option and form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I'):
scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model()
if not FFP2_being_worn and form.hepa_option:
noHEPAalternative = dataclass_utils.replace(form, mask_type = 'FFP2')
noHEPAalternative = dataclass_utils.replace(noHEPAalternative, mask_wearing_option = 'mask_on')
noHEPAalternative = dataclass_utils.replace(noHEPAalternative, hepa_option=False)
if not (not form.hepa_option and FFP2_being_worn):
scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model()
# The remaining scenarios are based on Type I masks (possibly not worn)
# and no HEPA filtration.
form = dataclass_utils.replace(form, mask_type='Type I')
if form.hepa_option:
form = dataclass_utils.replace(form, hepa_option=False)
with_mask = dataclass_utils.replace(form, mask_wearing_option='mask_on')
without_mask = dataclass_utils.replace(form, mask_wearing_option='mask_off')
if form.ventilation_type == 'mechanical_ventilation':
#scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
if not (form.mask_wearing_option == 'mask_off'):
scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model()
elif form.ventilation_type == 'natural_ventilation':
#scenarios['Windows open with Type I masks'] = with_mask.build_mc_model()
if not (form.mask_wearing_option == 'mask_off'):
scenarios['Windows open without masks'] = without_mask.build_mc_model()
# No matter the ventilation scheme, we include scenarios which don't have any ventilation.
with_mask_no_vent = dataclass_utils.replace(with_mask, ventilation_type='no_ventilation')
without_mask_or_vent = dataclass_utils.replace(without_mask, ventilation_type='no_ventilation')
if not (form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I' and form.ventilation_type == 'no_ventilation'):
scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
if not (form.mask_wearing_option == 'mask_off' and form.ventilation_type == 'no_ventilation'):
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
else:
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], total_people=form.total_people - form.short_range_occupants)
scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
return scenarios
def scenario_statistics(
mc_model: mc.ExposureModel,
sample_times: typing.List[float],
compute_prob_exposure: bool
):
model = mc_model.build_model(size=mc_model.data_registry.monte_carlo['sample_size'])
if (compute_prob_exposure):
# It means we have data to calculate the total_probability_rule
prob_probabilistic_exposure = model.total_probability_rule()
else:
prob_probabilistic_exposure = 0.
return {
'probability_of_infection': np.mean(model.infection_probability()),
'expected_new_cases': np.mean(model.expected_new_cases()),
'concentrations': [
np.mean(model.concentration(time))
for time in sample_times
],
'prob_probabilistic_exposure': prob_probabilistic_exposure,
}
def comparison_report(
form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: typing.List[float],
executor_factory: typing.Callable[[], concurrent.futures.Executor],
):
if (form.short_range_option == "short_range_no"):
statistics = {
'Current scenario' : {
'probability_of_infection': report_data['prob_inf'],
'expected_new_cases': report_data['expected_new_cases'],
'concentrations': report_data['concentrations'],
}
}
else:
statistics = {}
if (form.short_range_option == "short_range_yes" and form.exposure_option == "p_probabilistic_exposure"):
compute_prob_exposure = True
else:
compute_prob_exposure = False
with executor_factory() as executor:
results = executor.map(
scenario_statistics,
scenarios.values(),
[sample_times] * len(scenarios),
[compute_prob_exposure] * len(scenarios),
timeout=60,
)
for (name, model), model_stats in zip(scenarios.items(), results):
statistics[name] = model_stats
return {
'stats': statistics,
}
@dataclasses.dataclass
class ReportGenerator:
jinja_loader: jinja2.BaseLoader
get_root_url: typing.Any
get_root_calculator_url: typing.Any
def build_report(
self,
base_url: str,
form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
context = self.prepare_context(base_url, model, form, executor_factory=executor_factory)
return self.render(context)
def prepare_context(
self,
base_url: str,
model: models.ExposureModel,
form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> dict:
now = datetime.utcnow().astimezone()
time = now.strftime("%Y-%m-%d %H:%M:%S UTC")
data_registry_version = f"v{model.data_registry.version}" if model.data_registry.version else None
context = {
'model': model,
'form': form,
'creation_date': time,
'data_registry_version': data_registry_version,
}
scenario_sample_times = interesting_times(model)
report_data = calculate_report_data(form, model, executor_factory=executor_factory)
context.update(report_data)
alternative_scenarios = manufacture_alternative_scenarios(form)
context['alternative_viral_load'] = manufacture_viral_load_scenarios_percentiles(model) if form.conditional_probability_viral_loads else None
context['alternative_scenarios'] = comparison_report(
form, report_data, alternative_scenarios, scenario_sample_times, executor_factory=executor_factory,
)
context['permalink'] = generate_permalink(base_url, self.get_root_url, self.get_root_calculator_url, form)
context['get_url'] = self.get_root_url
context['get_calculator_url'] = self.get_root_calculator_url
return context
def _template_environment(self) -> jinja2.Environment:
env = jinja2.Environment(
loader=self.jinja_loader,
undefined=jinja2.StrictUndefined,
)
env.globals["common_text"] = markdown_tools.extract_rendered_markdown_blocks(
env.get_template('common_text.md.j2')
)
env.filters['non_zero_percentage'] = non_zero_percentage
env.filters['readable_minutes'] = readable_minutes
env.filters['minutes_to_time'] = minutes_to_time
env.filters['hour_format'] = hour_format
env.filters['float_format'] = "{0:.2f}".format
env.filters['int_format'] = "{:0.0f}".format
env.filters['percentage'] = percentage
env.filters['JSONify'] = json.dumps
return env
def render(self, context: dict) -> str:
template = self._template_environment().get_template("calculator.report.html.j2")
return template.render(**context, text_blocks=template.globals["common_text"])

View file

@ -1,4 +1,4 @@
from caimira.enums import ViralLoads
from ..models.enums import ViralLoads
class DataRegistry:

View file

@ -2,7 +2,7 @@ import logging
import typing
import requests
from caimira.store.data_registry import DataRegistry
from ..store.data_registry import DataRegistry
logger = logging.getLogger("DATA")

View file

@ -7,11 +7,11 @@ from scipy.signal import find_peaks
import pandas as pd
import re
from caimira import models
from caimira.store.data_registry import DataRegistry
from .form_data import FormData, cast_class_fields
from .defaults import NO_DEFAULT
from .report_generator import img2base64, _figure2bytes
from ..form_validator import FormData, cast_class_fields
from ..defaults import NO_DEFAULT
from ...store.data_registry import DataRegistry
from ...models import models
from ...report.report_generator import img2base64, _figure2bytes
minutes_since_midnight = typing.NewType('minutes_since_midnight', int)

View file

@ -7,9 +7,9 @@ import json
import numpy as np
from caimira import models
from caimira.store.data_registry import DataRegistry
from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT
from ..models import models
from ..store.data_registry import DataRegistry
LOG = logging.getLogger(__name__)
@ -26,13 +26,16 @@ class FormData:
exposed_lunch_option: bool
exposed_lunch_start: minutes_since_midnight
exposed_start: minutes_since_midnight
infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
# Used if infected_dont_have_breaks_with_exposed
infected_coffee_break_option: str
infected_coffee_duration: int # Used if infected_dont_have_breaks_with_exposed
infected_dont_have_breaks_with_exposed: bool
infected_finish: minutes_since_midnight
infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed
infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
# Used if infected_dont_have_breaks_with_exposed
infected_lunch_finish: minutes_since_midnight
infected_lunch_option: bool # Used if infected_dont_have_breaks_with_exposed
# Used if infected_dont_have_breaks_with_exposed
infected_lunch_start: minutes_since_midnight
infected_people: int
infected_start: minutes_since_midnight
room_volume: float
@ -47,7 +50,6 @@ class FormData:
# Take a copy of the form data so that we can mutate it.
form_data = form_data.copy()
form_data.pop('_xsrf', None)
# Don't let arbitrary unescaped HTML through the net.
for key, value in form_data.items():
if isinstance(value, str):
@ -64,7 +66,8 @@ class FormData:
form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
if key not in cls._DEFAULTS:
raise ValueError(f'Invalid argument "{html.escape(key)}" given')
raise ValueError(
f'Invalid argument "{html.escape(key)}" given')
instance = cls(**form_data, data_registry=data_registry)
instance.validate()
@ -93,7 +96,8 @@ class FormData:
def validate_population_parameters(self):
# Validate number of infected <= number of total people
if self.infected_people >= self.total_people:
raise ValueError('Number of infected people cannot be greater or equal to the number of total people.')
raise ValueError(
'Number of infected people cannot be greater or equal to the number of total people.')
# Validate time intervals selected by user
time_intervals = [
@ -101,9 +105,11 @@ class FormData:
['infected_start', 'infected_finish'],
]
if self.exposed_lunch_option:
time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish'])
time_intervals.append(
['exposed_lunch_start', 'exposed_lunch_finish'])
if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
time_intervals.append(['infected_lunch_start', 'infected_lunch_finish'])
time_intervals.append(
['infected_lunch_start', 'infected_lunch_finish'])
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
@ -116,29 +122,33 @@ class FormData:
lunch_start = getattr(self, f'{population}_lunch_start')
lunch_finish = getattr(self, f'{population}_lunch_finish')
return (start <= lunch_start <= finish and
start <= lunch_finish <= finish)
start <= lunch_finish <= finish)
def get_lunch_mins(population):
lunch_mins = 0
if getattr(self, f'{population}_lunch_option'):
lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
lunch_mins = getattr(
self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
return lunch_mins
def get_coffee_mins(population):
coffee_mins = 0
if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
coffee_mins = COFFEE_OPTIONS_INT[getattr(
self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
return coffee_mins
def get_activity_mins(population):
return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
populations = [
'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
for population in populations:
# Validate lunch time within the activity times.
if (getattr(self, f'{population}_lunch_option') and
not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish'))
):
not validate_lunch(getattr(self, f'{population}_start'), getattr(
self, f'{population}_finish'))
):
raise ValueError(
f"{population} lunch break must be within presence times."
)
@ -152,7 +162,8 @@ class FormData:
for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
('infected_coffee_break_option', COFFEE_OPTIONS_INT)]:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
raise ValueError(
f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
def validate(self):
raise NotImplementedError("Subclass must implement")
@ -161,7 +172,8 @@ class FormData:
raise NotImplementedError("Subclass must implement")
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
break_delay = ((finish - start) -
(n_breaks * duration)) // (n_breaks+1)
break_times = []
end = start
for n in range(n_breaks):
@ -173,14 +185,16 @@ class FormData:
def exposed_lunch_break_times(self) -> models.BoundarySequence_t:
result = []
if self.exposed_lunch_option:
result.append((self.exposed_lunch_start, self.exposed_lunch_finish))
result.append((self.exposed_lunch_start,
self.exposed_lunch_finish))
return tuple(result)
def infected_lunch_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
result = []
if self.infected_lunch_option:
result.append((self.infected_lunch_start, self.infected_lunch_finish))
result.append((self.infected_lunch_start,
self.infected_lunch_finish))
return tuple(result)
else:
return self.exposed_lunch_break_times()
@ -194,7 +208,8 @@ class FormData:
def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t:
time_before_lunch = lunch_start - activity_start
time_after_lunch = activity_finish - lunch_finish
before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch)
before_lunch_frac = time_before_lunch / \
(time_before_lunch + time_after_lunch)
n_morning_breaks = round(coffee_breaks * before_lunch_frac)
breaks = (
self._compute_breaks_in_interval(
@ -211,9 +226,11 @@ class FormData:
if exposed_coffee_breaks == 0:
return ()
if self.exposed_lunch_option:
breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks,
self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
breaks = self._compute_breaks_in_interval(
self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
return breaks
def infected_coffee_break_times(self) -> models.BoundarySequence_t:
@ -222,9 +239,11 @@ class FormData:
if infected_coffee_breaks == 0:
return ()
if self.infected_lunch_option:
breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks,
self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
breaks = self._compute_breaks_in_interval(
self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
return breaks
else:
return self.exposed_coffee_break_times()
@ -232,13 +251,14 @@ class FormData:
def generate_specific_break_times(self, breaks_dict: dict, target: str) -> models.BoundarySequence_t:
break_times = []
for n in breaks_dict[f'{target}_breaks']:
# Parse break times.
# Parse break times.
begin = time_string_to_minutes(n["start_time"])
end = time_string_to_minutes(n["finish_time"])
for time in [begin, end]:
# For a specific break, the infected and exposed presence is the same.
if not getattr(self, f'{target}_start') < time < getattr(self, f'{target}_finish'):
raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
raise ValueError(
f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
break_times.append((begin, end))
return tuple(break_times)
@ -260,7 +280,8 @@ class FormData:
# Order the breaks by their start-time, and ensure that they are monotonic
# and that the start of one break happens after the end of another.
break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0]))
break_boundaries: models.BoundarySequence_t = tuple(
sorted(breaks, key=lambda break_pair: break_pair[0]))
for break_start, break_end in break_boundaries:
if break_start >= break_end:
@ -269,13 +290,15 @@ class FormData:
prev_break_end = break_boundaries[0][1]
for break_start, break_end in break_boundaries[1:]:
if prev_break_end >= break_start:
raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
raise ValueError(
f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
prev_break_end = break_end
present_intervals = []
current_time = start
LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
LOG.debug(
f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
# As we step through the breaks. For each break there are 6 important cases
# we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
@ -336,8 +359,9 @@ class FormData:
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(breaks_dict=self.specific_breaks, target='exposed')
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(
breaks_dict=self.specific_breaks, target='exposed')
else:
breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
return self.present_interval(
@ -346,14 +370,17 @@ class FormData:
)
def population_present_interval(self) -> models.Interval:
state_change_times = set(self.infected_present_interval().transition_times())
state_change_times.update(self.exposed_present_interval().transition_times())
state_change_times = set(
self.infected_present_interval().transition_times())
state_change_times.update(
self.exposed_present_interval().transition_times())
all_state_changes = sorted(state_change_times)
return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:])))
def exposed_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(breaks_dict=self.specific_breaks, target='exposed')
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(
breaks_dict=self.specific_breaks, target='exposed')
else:
breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
return self.present_interval(
@ -382,7 +409,7 @@ def time_minutes_to_string(time: int) -> str:
:param time: The number of minutes between 'time' and 00:00
:return: A string of the form "HH:MM" representing a time of day
"""
return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60)
return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time % 60)
def string_to_list(s: str) -> list:
@ -426,6 +453,7 @@ _CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {}
#: that can be encoded to URL arguments.
_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {}
def cast_class_fields(cls):
for _field in dataclasses.fields(cls):
if _field.type is minutes_since_midnight:
@ -447,4 +475,5 @@ def cast_class_fields(cls):
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string
cast_class_fields(FormData)

View file

@ -6,17 +6,13 @@ import re
import numpy as np
from caimira import models
from caimira import data
import caimira.data.weather
import caimira.monte_carlo as mc
from .. import calculator
from .form_data import FormData, cast_class_fields, time_string_to_minutes
from caimira.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
from caimira.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions
from .defaults import (DEFAULTS, CONFIDENCE_LEVEL_OPTIONS,
MECHANICAL_VENTILATION_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE,
VENTILATION_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES)
from ..form_validator import FormData, cast_class_fields, time_string_to_minutes
from ..defaults import (DEFAULTS, CONFIDENCE_LEVEL_OPTIONS,
MECHANICAL_VENTILATION_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE,
VENTILATION_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES)
from ...models import models, data, monte_carlo as mc
from ...models.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
from ...models.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions
LOG = logging.getLogger("MODEL")
@ -76,15 +72,17 @@ class VirusFormData(FormData):
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
def validate(self):
# Validate population parameters
self.validate_population_parameters()
validation_tuples = [('activity_type', self.data_registry.population_scenario_activity.keys()),
('mechanical_ventilation_type', MECHANICAL_VENTILATION_TYPES),
('mask_type', list(mask_distributions(self.data_registry).keys())),
('mechanical_ventilation_type',
MECHANICAL_VENTILATION_TYPES),
('mask_type', list(mask_distributions(
self.data_registry).keys())),
('mask_wearing_option', MASK_WEARING_OPTIONS),
('ventilation_type', VENTILATION_TYPES),
('virus_type', list(virus_distributions(self.data_registry).keys())),
('virus_type', list(virus_distributions(
self.data_registry).keys())),
('volume_type', VOLUME_TYPES),
('window_opening_regime', WINDOWS_OPENING_REGIMES),
('window_type', WINDOWS_TYPES),
@ -95,11 +93,13 @@ class VirusFormData(FormData):
for attr_name, valid_set in validation_tuples:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
raise ValueError(
f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
# Validate number of infected people == 1 when activity is Conference/Training.
if self.activity_type == 'training' and self.infected_people > 1:
raise ValueError('Conference/Training activities are limited to 1 infected.')
raise ValueError(
'Conference/Training activities are limited to 1 infected.')
# Validate ventilation parameters
if self.ventilation_type == 'natural_ventilation':
@ -114,7 +114,7 @@ class VirusFormData(FormData):
"ventilation_type is 'natural_ventilation'"
)
if (self.window_opening_regime == 'windows_open_periodically' and
self.windows_duration > self.windows_frequency):
self.windows_duration > self.windows_frequency):
raise ValueError(
'Duration cannot be bigger than frequency.'
)
@ -127,61 +127,78 @@ class VirusFormData(FormData):
# Validate specific inputs - breaks (exposed and infected)
if self.specific_breaks != {}:
if type(self.specific_breaks) is not dict:
raise TypeError('The specific breaks should be in a dictionary.')
raise TypeError(
'The specific breaks should be in a dictionary.')
dict_keys = list(self.specific_breaks.keys())
if "exposed_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".')
raise TypeError(
f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".')
if "infected_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".')
raise TypeError(
f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".')
for population_breaks in ['exposed_breaks', 'infected_breaks']:
if self.specific_breaks[population_breaks] != []:
if type(self.specific_breaks[population_breaks]) is not list:
raise TypeError(f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.')
raise TypeError(
f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.')
for input_break in self.specific_breaks[population_breaks]:
# Input validations.
if type(input_break) is not dict:
raise TypeError(f'Each break should be a dictionary. Got {type(input_break)}.')
raise TypeError(
f'Each break should be a dictionary. Got {type(input_break)}.')
dict_keys = list(input_break.keys())
if "start_time" not in input_break:
raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".')
raise TypeError(
f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".')
if "finish_time" not in input_break:
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
raise TypeError(
f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
for time in input_break.values():
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
raise TypeError(
f'Wrong time format - "HH:MM". Got "{time}".')
# Validate specific inputs - precise activity
if self.precise_activity != {}:
if type(self.precise_activity) is not dict:
raise TypeError('The precise activities should be in a dictionary.')
raise TypeError(
'The precise activities should be in a dictionary.')
dict_keys = list(self.precise_activity.keys())
if "physical_activity" not in dict_keys:
raise TypeError(f'Unable to fetch "physical_activity" key. Got "{dict_keys[0]}".')
raise TypeError(
f'Unable to fetch "physical_activity" key. Got "{dict_keys[0]}".')
if "respiratory_activity" not in dict_keys:
raise TypeError(f'Unable to fetch "respiratory_activity" key. Got "{dict_keys[1]}".')
raise TypeError(
f'Unable to fetch "respiratory_activity" key. Got "{dict_keys[1]}".')
if type(self.precise_activity['physical_activity']) is not str:
raise TypeError('The physical activities should be a single string.')
raise TypeError(
'The physical activities should be a single string.')
if type(self.precise_activity['respiratory_activity']) is not list:
raise TypeError('The respiratory activities should be in a list.')
raise TypeError(
'The respiratory activities should be in a list.')
total_percentage = 0
for respiratory_activity in self.precise_activity['respiratory_activity']:
if type(respiratory_activity) is not dict:
raise TypeError('Each respiratory activity should be defined in a dictionary.')
raise TypeError(
'Each respiratory activity should be defined in a dictionary.')
dict_keys = list(respiratory_activity.keys())
if "type" not in dict_keys:
raise TypeError(f'Unable to fetch "type" key. Got "{dict_keys[0]}".')
raise TypeError(
f'Unable to fetch "type" key. Got "{dict_keys[0]}".')
if "percentage" not in dict_keys:
raise TypeError(f'Unable to fetch "percentage" key. Got "{dict_keys[1]}".')
raise TypeError(
f'Unable to fetch "percentage" key. Got "{dict_keys[1]}".')
total_percentage += respiratory_activity['percentage']
if total_percentage != 100:
raise ValueError(f'The sum of all respiratory activities should be 100. Got {total_percentage}.')
raise ValueError(
f'The sum of all respiratory activities should be 100. Got {total_percentage}.')
# Validate number of people with short-range interactions
max_occupants_for_sr = self.total_people - self.infected_people
@ -218,7 +235,8 @@ class VirusFormData(FormData):
for interaction in self.short_range_interactions:
short_range.append(mc.ShortRangeModel(
data_registry=self.data_registry,
expiration=short_range_expiration_distributions(self.data_registry)[interaction['expiration']],
expiration=short_range_expiration_distributions(
self.data_registry)[interaction['expiration']],
activity=infected_population.activity,
presence=self.short_range_interval(interaction),
distance=short_range_distances(self.data_registry),
@ -233,7 +251,7 @@ class VirusFormData(FormData):
infected=infected_population,
evaporation_factor=0.3,
),
short_range = tuple(short_range),
short_range=tuple(short_range),
exposed=self.exposed_population(),
geographical_data=mc.Cases(
geographic_population=self.geographic_population,
@ -249,11 +267,14 @@ class VirusFormData(FormData):
def build_CO2_model(self, sample_size=None) -> models.CO2ConcentrationModel:
sample_size = sample_size or self.data_registry.monte_carlo['sample_size']
infected_population: models.InfectedPopulation = self.infected_population().build_model(sample_size)
infected_population: models.InfectedPopulation = self.infected_population(
).build_model(sample_size)
exposed_population: models.Population = self.exposed_population().build_model(sample_size)
state_change_times = set(infected_population.presence_interval().transition_times())
state_change_times.update(exposed_population.presence_interval().transition_times())
state_change_times = set(
infected_population.presence_interval().transition_times())
state_change_times.update(
exposed_population.presence_interval().transition_times())
transition_times = sorted(state_change_times)
total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop)
@ -262,10 +283,12 @@ class VirusFormData(FormData):
if (self.activity_type == 'precise'):
activity_defn, _ = self.generate_precise_activity_expiration()
else:
activity_defn = self.data_registry.population_scenario_activity[self.activity_type]['activity']
activity_defn = self.data_registry.population_scenario_activity[
self.activity_type]['activity']
population = mc.SimplePopulation(
number=models.IntPiecewiseConstant(transition_times=tuple(transition_times), values=tuple(total_people)),
number=models.IntPiecewiseConstant(transition_times=tuple(
transition_times), values=tuple(total_people)),
presence=None,
activity=activity_distributions(self.data_registry)[activity_defn],
)
@ -285,7 +308,7 @@ class VirusFormData(FormData):
"""
month = MONTH_NAMES.index(self.event_month) + 1
timezone = caimira.data.weather.timezone_at(
timezone = data.weather.timezone_at(
latitude=self.location_latitude, longitude=self.location_longitude,
)
# We choose the first of the month for the current year.
@ -306,7 +329,8 @@ class VirusFormData(FormData):
month = MONTH_NAMES.index(self.event_month) + 1
wx_station = self.nearest_weather_station()
temp_profile = caimira.data.weather.mean_hourly_temperatures(wx_station = wx_station[0], month = MONTH_NAMES.index(self.event_month) + 1)
temp_profile = data.weather.mean_hourly_temperatures(
wx_station=wx_station[0], month=MONTH_NAMES.index(self.event_month) + 1)
_, utc_offset = self.tz_name_and_utc_offset()
@ -314,13 +338,14 @@ class VirusFormData(FormData):
# result the first data value may no longer be a midnight, and the hours
# no longer ordered modulo 24).
source_times = np.arange(24) + utc_offset
times, temp_profile = caimira.data.weather.refine_hourly_data(
times, temp_profile = data.weather.refine_hourly_data(
source_times,
temp_profile,
npts=24*10, # 10 steps per hour => 6 min steps
)
outside_temp = models.PiecewiseConstant(
tuple(float(t) for t in times), tuple(float(t) for t in temp_profile),
tuple(float(t) for t in times), tuple(float(t)
for t in temp_profile),
)
return outside_temp
@ -333,7 +358,7 @@ class VirusFormData(FormData):
transition_times = self.CO2_fitting_result['transition_times']
for index, (start, stop) in enumerate(zip(transition_times[:-1], transition_times[1:])):
ventilations.append(models.AirChange(active=models.SpecificInterval(present_times=((start, stop), )),
air_exch=self.CO2_fitting_result['ventilation_values'][index]))
air_exch=self.CO2_fitting_result['ventilation_values'][index]))
return models.MultipleVentilation(tuple(ventilations))
# Initializes a ventilation instance as a window if 'natural_ventilation' is selected, or as a HEPA-filter otherwise
@ -369,7 +394,8 @@ class VirusFormData(FormData):
ventilation = models.AirChange(active=always_on, air_exch=0.)
else:
if self.mechanical_ventilation_type == 'mech_type_air_changes':
ventilation = models.AirChange(active=always_on, air_exch=self.air_changes)
ventilation = models.AirChange(
active=always_on, air_exch=self.air_changes)
else:
ventilation = models.HVACMechanical(
active=always_on, q_air_mech=self.air_supply)
@ -378,16 +404,18 @@ class VirusFormData(FormData):
# to the air infiltration from the outside.
# See CERN-OPEN-2021-004, p. 12.
residual_vent: float = self.data_registry.ventilation['infiltration_ventilation'] # type: ignore
infiltration_ventilation = models.AirChange(active=always_on, air_exch=residual_vent)
infiltration_ventilation = models.AirChange(
active=always_on, air_exch=residual_vent)
if self.hepa_option:
hepa = models.HEPAFilter(active=always_on, q_air_mech=self.hepa_amount)
hepa = models.HEPAFilter(
active=always_on, q_air_mech=self.hepa_amount)
return models.MultipleVentilation((ventilation, hepa, infiltration_ventilation))
else:
return models.MultipleVentilation((ventilation, infiltration_ventilation))
def nearest_weather_station(self) -> caimira.data.weather.WxStationRecordType:
def nearest_weather_station(self) -> data.weather.WxStationRecordType:
"""Return the nearest weather station (which has valid data) for this form"""
return caimira.data.weather.nearest_wx_station(
return data.weather.nearest_wx_station(
longitude=self.location_longitude, latitude=self.location_latitude
)
@ -401,11 +429,13 @@ class VirusFormData(FormData):
return mask
def generate_precise_activity_expiration(self) -> typing.Tuple[typing.Any, ...]:
if self.precise_activity == {}: # It means the precise activity is not defined by a specific input.
# It means the precise activity is not defined by a specific input.
if self.precise_activity == {}:
return ()
respiratory_dict = {}
for respiratory_activity in self.precise_activity['respiratory_activity']:
respiratory_dict[respiratory_activity['type']] = respiratory_activity['percentage']
respiratory_dict[respiratory_activity['type']
] = respiratory_activity['percentage']
return (self.precise_activity['physical_activity'], respiratory_dict)
@ -413,11 +443,14 @@ class VirusFormData(FormData):
# Initializes the virus
virus = virus_distributions(self.data_registry)[self.virus_type]
activity_defn = self.data_registry.population_scenario_activity[self.activity_type]['activity']
expiration_defn = self.data_registry.population_scenario_activity[self.activity_type]['expiration']
activity_defn = self.data_registry.population_scenario_activity[
self.activity_type]['activity']
expiration_defn = self.data_registry.population_scenario_activity[
self.activity_type]['expiration']
if (self.activity_type == 'smallmeeting'):
# Conversation of N people is approximately 1/N% of the time speaking.
expiration_defn = {'Speaking': 1, 'Breathing': self.total_people - 1}
expiration_defn = {'Speaking': 1,
'Breathing': self.total_people - 1}
elif (self.activity_type == 'precise'):
activity_defn, expiration_defn = self.generate_precise_activity_expiration()
@ -434,7 +467,8 @@ class VirusFormData(FormData):
mask=self.mask(),
activity=activity,
expiration=expiration,
host_immunity=0., # Vaccination status does not affect the infected population (for now)
# Vaccination status does not affect the infected population (for now)
host_immunity=0.,
)
return infected
@ -452,8 +486,8 @@ class VirusFormData(FormData):
if (self.vaccine_option):
if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'):
host_immunity = [vaccine['VE'] for vaccine in data.vaccine_booster_host_immunity if
vaccine['primary series vaccine'] == self.vaccine_type and
vaccine['booster vaccine'] == self.vaccine_booster_type][0]
vaccine['primary series vaccine'] == self.vaccine_type and
vaccine['booster vaccine'] == self.vaccine_booster_type][0]
else:
host_immunity = data.vaccine_primary_host_immunity[self.vaccine_type]
else:
@ -480,9 +514,10 @@ def build_expiration(data_registry, expiration_definition) -> mc._ExpirationBase
elif isinstance(expiration_definition, dict):
total_weight = sum(expiration_definition.values())
BLO_factors = np.sum([
np.array(expiration_BLO_factors(data_registry)[exp_type]) * weight/total_weight
np.array(expiration_BLO_factors(data_registry)
[exp_type]) * weight/total_weight
for exp_type, weight in expiration_definition.items()
], axis=0)
], axis=0)
return expiration_distribution(data_registry=data_registry, BLO_factors=tuple(BLO_factors))
@ -524,7 +559,7 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'mask_type': 'Type I',
'mask_wearing_option': 'mask_off',
'mechanical_ventilation_type': '',
'calculator_version': calculator.__version__,
'calculator_version': '4.17.0', #TODO different version for API and calculator form?
'opening_distance': '0.2',
'event_month': 'January',
'room_heating_option': '0',
@ -550,4 +585,5 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'short_range_interactions': '[]',
}
cast_class_fields(VirusFormData)

View file

@ -1,5 +1,5 @@
import pandas as pd
from tabulate import tabulate
# import pandas as pd
# from tabulate import tabulate
'''
Script file to generate the vaccine effectiveness values.

View file

@ -1,22 +1,13 @@
import pytest
from caimira.apps.calculator import model_generator
from caimira.calculator.validators.virus import virus_validator
@pytest.fixture
def baseline_form_data():
return model_generator.baseline_raw_form_data()
return virus_validator.baseline_raw_form_data()
@pytest.fixture
def baseline_form(baseline_form_data, data_registry):
return model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
@pytest.fixture
def baseline_form_with_sr(baseline_form_data, data_registry):
form_data_sr = baseline_form_data
form_data_sr['short_range_option'] = 'short_range_yes'
form_data_sr['short_range_interactions'] = '[{"expiration": "Shouting", "start_time": "10:30", "duration": "30"}]'
form_data_sr['short_range_occupants'] = 5
return model_generator.VirusFormData.from_dict(form_data_sr, data_registry)
return virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)

View file

@ -6,24 +6,24 @@ import numpy.testing as npt
import pytest
from retry import retry
from caimira.apps.calculator import model_generator
from caimira.apps.calculator.form_data import (_hours2timestring, minutes_since_midnight,
from caimira.calculator.validators.virus import virus_validator
from caimira.calculator.validators.form_validator import (_hours2timestring, minutes_since_midnight,
_CAST_RULES_FORM_ARG_TO_NATIVE, _CAST_RULES_NATIVE_TO_FORM_ARG)
from caimira import models
from caimira.monte_carlo.data import expiration_distributions
from caimira.apps.calculator.defaults import NO_DEFAULT
from caimira.store.data_registry import DataRegistry
from caimira.calculator.models import models
from caimira.calculator.models.monte_carlo.data import expiration_distributions
from caimira.calculator.validators.defaults import NO_DEFAULT
from caimira.calculator.store.data_registry import DataRegistry
def test_model_from_dict(baseline_form_data, data_registry):
form = model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
form = virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
assert isinstance(form.build_model(), models.ExposureModel)
def test_model_from_dict_invalid(baseline_form_data, data_registry):
baseline_form_data['invalid_item'] = 'foobar'
with pytest.raises(ValueError, match='Invalid argument "invalid_item" given'):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
@retry(tries=10)
@ -39,14 +39,14 @@ def test_blend_expiration(data_registry, mask_type):
SAMPLE_SIZE = 250000
TOLERANCE = 0.02
blend = {'Breathing': 2, 'Speaking': 1}
r = model_generator.build_expiration(data_registry, blend).build_model(SAMPLE_SIZE)
r = virus_validator.build_expiration(data_registry, blend).build_model(SAMPLE_SIZE)
mask = models.Mask.types[mask_type]
expected = (expiration_distributions(data_registry)['Breathing'].build_model(SAMPLE_SIZE).aerosols(mask).mean()*2/3. +
expiration_distributions(data_registry)['Speaking'].build_model(SAMPLE_SIZE).aerosols(mask).mean()/3.)
npt.assert_allclose(r.aerosols(mask).mean(), expected, rtol=TOLERANCE)
def test_ventilation_slidingwindow(data_registry: DataRegistry, baseline_form: model_generator.VirusFormData):
def test_ventilation_slidingwindow(data_registry: DataRegistry, baseline_form: virus_validator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -77,7 +77,7 @@ def test_ventilation_slidingwindow(data_registry: DataRegistry, baseline_form: m
assert ventilation == baseline_vent
def test_ventilation_hingedwindow(baseline_form: model_generator.VirusFormData):
def test_ventilation_hingedwindow(baseline_form: virus_validator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -107,7 +107,7 @@ def test_ventilation_hingedwindow(baseline_form: model_generator.VirusFormData):
assert ventilation == baseline_vent
def test_ventilation_mechanical(baseline_form: model_generator.VirusFormData):
def test_ventilation_mechanical(baseline_form: virus_validator.VirusFormData):
room = models.Room(volume=75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
mech = models.HVACMechanical(
active=models.PeriodicInterval(period=120, duration=120),
@ -122,7 +122,7 @@ def test_ventilation_mechanical(baseline_form: model_generator.VirusFormData):
np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts]))
def test_ventilation_airchanges(baseline_form: model_generator.VirusFormData):
def test_ventilation_airchanges(baseline_form: virus_validator.VirusFormData):
room = models.Room(75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
airchange = models.AirChange(
active=models.PeriodicInterval(period=120, duration=120),
@ -137,7 +137,7 @@ def test_ventilation_airchanges(baseline_form: model_generator.VirusFormData):
np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts]))
def test_ventilation_window_hepa(data_registry: DataRegistry, baseline_form: model_generator.VirusFormData):
def test_ventilation_window_hepa(data_registry: DataRegistry, baseline_form: virus_validator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -181,7 +181,7 @@ def test_ventilation_window_hepa(data_registry: DataRegistry, baseline_form: mod
]
)
def test_infected_less_than_total_people(activity, total_people, infected_people, error,
baseline_form: model_generator.VirusFormData,
baseline_form: virus_validator.VirusFormData,
data_registry: DataRegistry):
baseline_form.activity_type = activity
baseline_form.total_people = total_people
@ -195,7 +195,7 @@ def present_times(interval: models.Interval) -> models.BoundarySequence_t:
return interval.present_times
def test_infected_present_intervals(baseline_form: model_generator.VirusFormData):
def test_infected_present_intervals(baseline_form: virus_validator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.exposed_coffee_duration = 15
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -209,7 +209,7 @@ def test_infected_present_intervals(baseline_form: model_generator.VirusFormData
assert present_times(baseline_form.infected_present_interval()) == correct
def test_exposed_present_intervals(baseline_form: model_generator.VirusFormData):
def test_exposed_present_intervals(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_duration = 15
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -220,7 +220,7 @@ def test_exposed_present_intervals(baseline_form: model_generator.VirusFormData)
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_present_intervals_common_breaks(baseline_form: model_generator.VirusFormData):
def test_present_intervals_common_breaks(baseline_form: virus_validator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15
baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -236,7 +236,7 @@ def test_present_intervals_common_breaks(baseline_form: model_generator.VirusFor
assert present_times(baseline_form.infected_present_interval()) == correct_infected
def test_present_intervals_split_breaks(baseline_form: model_generator.VirusFormData):
def test_present_intervals_split_breaks(baseline_form: virus_validator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = True
baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15
baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -252,7 +252,7 @@ def test_present_intervals_split_breaks(baseline_form: model_generator.VirusForm
assert present_times(baseline_form.infected_present_interval()) == correct_infected
def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_generator.VirusFormData):
def test_exposed_present_intervals_starting_with_lunch(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_start = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60)
baseline_form.exposed_finish = minutes_since_midnight(18 * 60)
@ -261,7 +261,7 @@ def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_gene
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_generator.VirusFormData):
def test_exposed_present_intervals_ending_with_lunch(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_start = minutes_since_midnight(11 * 60)
baseline_form.exposed_finish = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60)
@ -270,7 +270,7 @@ def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_genera
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_exposed_present_lunch_end_before_beginning(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_lunch_start = minutes_since_midnight(14 * 60)
baseline_form.exposed_lunch_finish = minutes_since_midnight(13 * 60)
@ -287,7 +287,7 @@ def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generat
[9, 20], # lunch_finish after the presence finishing
],
)
def test_exposed_presence_lunch_break(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry, exposed_lunch_start, exposed_lunch_finish):
def test_exposed_presence_lunch_break(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry, exposed_lunch_start, exposed_lunch_finish):
baseline_form.exposed_lunch_start = minutes_since_midnight(exposed_lunch_start * 60)
baseline_form.exposed_lunch_finish = minutes_since_midnight(exposed_lunch_finish * 60)
with pytest.raises(ValueError, match='exposed lunch break must be within presence times.'):
@ -303,14 +303,14 @@ def test_exposed_presence_lunch_break(baseline_form: model_generator.VirusFormDa
[9, 20], # lunch_finish after the presence finishing
],
)
def test_infected_presence_lunch_break(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry, infected_lunch_start, infected_lunch_finish):
def test_infected_presence_lunch_break(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry, infected_lunch_start, infected_lunch_finish):
baseline_form.infected_lunch_start = minutes_since_midnight(infected_lunch_start * 60)
baseline_form.infected_lunch_finish = minutes_since_midnight(infected_lunch_finish * 60)
with pytest.raises(ValueError, match='infected lunch break must be within presence times.'):
baseline_form.validate()
def test_exposed_breaks_length(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_exposed_breaks_length(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_start = minutes_since_midnight(10 * 60)
@ -320,7 +320,7 @@ def test_exposed_breaks_length(baseline_form: model_generator.VirusFormData, dat
baseline_form.validate()
def test_infected_breaks_length(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_infected_breaks_length(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.infected_start = minutes_since_midnight(9 * 60)
baseline_form.infected_finish = minutes_since_midnight(12 * 60)
baseline_form.infected_lunch_start = minutes_since_midnight(10 * 60)
@ -332,7 +332,7 @@ def test_infected_breaks_length(baseline_form: model_generator.VirusFormData, da
@pytest.fixture
def coffee_break_between_1045_and_1115(baseline_form: model_generator.VirusFormData):
def coffee_break_between_1045_and_1115(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_1'
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_start = minutes_since_midnight(10 * 60)
@ -390,7 +390,7 @@ def assert_boundaries(interval, boundaries_in_time_string_form):
@pytest.fixture
def breaks_every_25_mins_for_20_mins(baseline_form: model_generator.VirusFormData):
def breaks_every_25_mins_for_20_mins(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_coffee_duration = 20
baseline_form.exposed_start = time2mins("10:00")
@ -435,7 +435,7 @@ def test_present_only_during_second_break(breaks_every_25_mins_for_20_mins):
assert_boundaries(interval, [])
def test_valid_no_lunch(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_valid_no_lunch(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
# Check that it is valid to have a 0 length lunch if no lunch is selected.
baseline_form.exposed_lunch_option = False
baseline_form.exposed_lunch_start = minutes_since_midnight(0)
@ -443,7 +443,7 @@ def test_valid_no_lunch(baseline_form: model_generator.VirusFormData, data_regis
assert baseline_form.validate() is None
def test_no_breaks(baseline_form: model_generator.VirusFormData):
def test_no_breaks(baseline_form: virus_validator.VirusFormData):
# Check that the times are correct in the absence of breaks.
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.exposed_lunch_option = False
@ -458,7 +458,7 @@ def test_no_breaks(baseline_form: model_generator.VirusFormData):
assert present_times(baseline_form.infected_present_interval()) == infected_correct
def test_coffee_lunch_breaks(baseline_form: model_generator.VirusFormData):
def test_coffee_lunch_breaks(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -470,7 +470,7 @@ def test_coffee_lunch_breaks(baseline_form: model_generator.VirusFormData):
np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14)
def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.VirusFormData):
def test_coffee_lunch_breaks_unbalance(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -481,7 +481,7 @@ def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.VirusFormD
np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14)
def test_coffee_breaks(baseline_form: model_generator.VirusFormData):
def test_coffee_breaks(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_duration = 10
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -494,24 +494,24 @@ def test_coffee_breaks(baseline_form: model_generator.VirusFormData):
def test_key_validation(baseline_form_data, data_registry):
baseline_form_data['activity_type'] = 'invalid key'
with pytest.raises(ValueError):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_key_validation_natural_ventilation_window_type_na(baseline_form_data, data_registry):
baseline_form_data['ventilation_type'] = 'natural_ventilation'
baseline_form_data['window_type'] = 'not-applicable'
with pytest.raises(ValueError, match='window_type cannot be \'not-applicable\''):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_key_validation_natural_ventilation_window_opening_regime_na(baseline_form_data, data_registry):
baseline_form_data['ventilation_type'] = 'natural_ventilation'
baseline_form_data['window_opening_regime'] = 'not-applicable'
with pytest.raises(ValueError, match='window_opening_regime cannot be \'not-applicable\''):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_natural_ventilation_window_opening_periodically(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_natural_ventilation_window_opening_periodically(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.window_opening_regime = 'windows_open_periodically'
baseline_form.windows_duration = 20
baseline_form.windows_frequency = 10
@ -523,20 +523,20 @@ def test_key_validation_mech_ventilation_type_na(baseline_form_data, data_regist
baseline_form_data['ventilation_type'] = 'mechanical_ventilation'
baseline_form_data['mechanical_ventilation_type'] = 'not-applicable'
with pytest.raises(ValueError, match='mechanical_ventilation_type cannot be \'not-applicable\''):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_key_validation_event_month(baseline_form_data, data_registry):
baseline_form_data['event_month'] = 'invalid month'
with pytest.raises(ValueError, match='invalid month is not a valid value for event_month'):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_default_types():
# Validate that VirusFormData._DEFAULTS are complete and of the correct type.
# Validate that we have the right types and matching attributes to the DEFAULTS.
fields = {field.name: field for field in dataclasses.fields(model_generator.VirusFormData)}
for field, value in model_generator.VirusFormData._DEFAULTS.items():
fields = {field.name: field for field in dataclasses.fields(virus_validator.VirusFormData)}
for field, value in virus_validator.VirusFormData._DEFAULTS.items():
if field not in fields:
raise ValueError(f"Unmatched default {field}")
@ -557,7 +557,7 @@ def test_default_types():
for field in fields.values():
if field.name == "data_registry":
continue # Skip the assertion for the "data_registry" field
assert field.name in model_generator.VirusFormData._DEFAULTS, f"No default set for field name {field.name}"
assert field.name in virus_validator.VirusFormData._DEFAULTS, f"No default set for field name {field.name}"
def test_form_to_dict(baseline_form):
@ -566,7 +566,7 @@ def test_form_to_dict(baseline_form):
assert 1 < len(stripped) < len(full)
assert 'exposed_coffee_break_option' in stripped
# If we set the value to the default one, it should no longer turn up in the dictionary.
baseline_form.exposed_coffee_break_option = model_generator.VirusFormData._DEFAULTS['exposed_coffee_break_option']
baseline_form.exposed_coffee_break_option = virus_validator.VirusFormData._DEFAULTS['exposed_coffee_break_option']
assert 'exposed_coffee_break_option' not in baseline_form.to_dict(baseline_form, strip_defaults=True)
@ -584,7 +584,7 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m
baseline_form_data['location_latitude'] = latitude
baseline_form_data['location_longitude'] = longitude
baseline_form_data['event_month'] = month
form = model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
form = virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
name, offset = form.tz_name_and_utc_offset()
assert name == expected_tz_name
assert offset == expected_offset

View file

@ -1,32 +0,0 @@
import json
import tornado.testing
import caimira.apps.calculator
from caimira.apps.calculator import model_generator
_TIMEOUT = 40.
class TestCalculatorJsonResponse(tornado.testing.AsyncHTTPTestCase):
def setUp(self):
super().setUp()
self.http_client.defaults['request_timeout'] = _TIMEOUT
def get_app(self):
return caimira.apps.calculator.make_app()
@tornado.testing.gen_test(timeout=_TIMEOUT)
def test_json_response(self):
response = yield self.http_client.fetch(
request=self.get_url("/calculator/report-json"),
method="POST",
headers={'content-type': 'application/json'},
body=json.dumps(model_generator.baseline_raw_form_data())
)
self.assertEqual(response.code, 200)
data = json.loads(response.body)
self.assertIsInstance(data['prob_inf'], float)
self.assertIsInstance(data['expected_new_cases'], float)

View file

@ -2,8 +2,8 @@ from typing import Type
import numpy as np
import pytest
from caimira.apps.calculator import model_generator
from caimira.store.data_registry import DataRegistry
from caimira.calculator.validators.virus import virus_validator
from caimira.calculator.store.data_registry import DataRegistry
@pytest.mark.parametrize(
@ -14,7 +14,7 @@ from caimira.store.data_registry import DataRegistry
[{"exposed_breaks": [], "ifected_breaks": []}, 'Unable to fetch "infected_breaks" key. Got "ifected_breaks".'],
]
)
def test_specific_break_structure(break_input, error, baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_specific_break_structure(break_input, error, baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.specific_breaks = break_input
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -31,7 +31,7 @@ def test_specific_break_structure(break_input, error, baseline_form: model_gener
[[{"start_time": "10:00", "finish_time": "11"}], 'Wrong time format - "HH:MM". Got "11".'],
]
)
def test_specific_population_break_data_structure(population_break_input, error, baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_specific_population_break_data_structure(population_break_input, error, baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.specific_breaks = {'exposed_breaks': population_break_input, 'infected_breaks': population_break_input}
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -46,7 +46,7 @@ def test_specific_population_break_data_structure(population_break_input, error,
[{'exposed_breaks': [], 'infected_breaks': [{"start_time": "08:00", "finish_time": "11:00"}, {"start_time": "14:00", "finish_time": "15:00"}, ]}, "All breaks should be within the simulation time. Got 08:00."],
]
)
def test_specific_break_time(break_input, error, baseline_form: model_generator.VirusFormData):
def test_specific_break_time(break_input, error, baseline_form: virus_validator.VirusFormData):
with pytest.raises(ValueError, match=error):
baseline_form.generate_specific_break_times(breaks_dict=break_input, target='exposed')
baseline_form.generate_specific_break_times(breaks_dict=break_input, target='infected')
@ -65,7 +65,7 @@ def test_specific_break_time(break_input, error, baseline_form: model_generator.
[{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentag": 50}, {"type": "Speaking", "percentage": 50}]}, 'Unable to fetch "percentage" key. Got "percentag".'],
]
)
def test_precise_activity_structure(precise_activity_input, error, baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_precise_activity_structure(precise_activity_input, error, baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.precise_activity = precise_activity_input
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -80,7 +80,7 @@ def test_precise_activity_structure(precise_activity_input, error, baseline_form
[{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentage": 50}]}, 'The sum of all respiratory activities should be 100. Got 50.'],
]
)
def test_sum_precise_activity(precise_activity_input, error, baseline_form: model_generator.VirusFormData):
def test_sum_precise_activity(precise_activity_input, error, baseline_form: virus_validator.VirusFormData):
baseline_form.precise_activity = precise_activity_input
with pytest.raises(ValueError, match=error):
baseline_form.validate()

View file

@ -1,11 +1,9 @@
from caimira import models
import caimira.data
import caimira.dataclass_utils
import pytest
from caimira.store.data_registry import DataRegistry
from caimira.calculator.models import models
import caimira.calculator.models.data
import caimira.calculator.models.dataclass_utils
from caimira.calculator.store.data_registry import DataRegistry
@pytest.fixture
def data_registry():
@ -61,12 +59,12 @@ def baseline_exposure_model(data_registry, baseline_concentration_model, baselin
@pytest.fixture
def exposure_model_w_outside_temp_changes(data_registry, baseline_exposure_model: models.ExposureModel):
exp_model = caimira.dataclass_utils.nested_replace(
exp_model = caimira.calculator.models.dataclass_utils.nested_replace(
baseline_exposure_model, {
'concentration_model.ventilation': models.SlidingWindow(
data_registry=data_registry,
active=models.PeriodicInterval(2.2 * 60, 1.8 * 60),
outside_temp=caimira.data.GenevaTemperatures['Jan'],
outside_temp=caimira.calculator.models.data.GenevaTemperatures['Jan'],
window_height=1.6,
opening_length=0.6,
)

View file

@ -5,7 +5,7 @@ import numpy as np
import numpy.testing
import pytest
import caimira.data.weather as wx
import caimira.calculator.models.data.weather as wx
def test_nearest_wx_station():

View file

@ -3,8 +3,8 @@ import numpy as np
import typing
import pytest
from caimira import models
from caimira.apps.calculator.co2_model_generator import CO2FormData
from caimira.calculator.models import models
from caimira.calculator.validators.co2.co2_validator import CO2FormData
@pytest.fixture

View file

@ -5,8 +5,8 @@ import numpy.testing as npt
import pytest
from dataclasses import dataclass
from caimira import models
from caimira.store.data_registry import DataRegistry
from caimira.calculator.models import models
from caimira.calculator.store.data_registry import DataRegistry
@dataclass(frozen=True)
class KnownConcentrationModelBase(models._ConcentrationModelBase):

View file

@ -4,8 +4,8 @@ import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
import caimira.dataclass_utils as dc_utils
from caimira.calculator.models import models
from caimira.calculator.models import dataclass_utils as dc_utils
@pytest.fixture
def full_exposure_model(data_registry):

View file

@ -5,11 +5,11 @@ import numpy.testing
import pytest
from dataclasses import dataclass
from caimira import models
from caimira.models import ExposureModel
from caimira.dataclass_utils import replace
from caimira.monte_carlo.data import expiration_distributions
from caimira.store.data_registry import DataRegistry
from caimira.calculator.models import models
from caimira.calculator.models.models import ExposureModel
from caimira.calculator.models.dataclass_utils import replace
from caimira.calculator.models.monte_carlo.data import expiration_distributions
from caimira.calculator.store.data_registry import DataRegistry
@dataclass(frozen=True)
class KnownNormedconcentration(models.ConcentrationModel):

View file

@ -2,7 +2,7 @@ import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
from caimira.calculator.models import models
@pytest.mark.parametrize(

View file

@ -2,7 +2,7 @@ import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
from caimira.calculator.models import models
@pytest.mark.parametrize(

View file

@ -1,8 +1,8 @@
import numpy as np
import pytest
from caimira import models
from caimira import data
from caimira.calculator.models import models
from caimira.calculator.models import data
def test_piecewiseconstantfunction_wrongarguments():

View file

@ -3,10 +3,10 @@ import typing
import numpy as np
import pytest
from caimira import models
import caimira.monte_carlo as mc_models
from caimira.apps.calculator.model_generator import build_expiration
from caimira.monte_carlo.data import short_range_expiration_distributions,\
from caimira.calculator.models import models
import caimira.calculator.models.monte_carlo as mc_models
from caimira.calculator.validators.virus.virus_validator import build_expiration
from caimira.calculator.models.monte_carlo.data import short_range_expiration_distributions,\
expiration_distributions, short_range_distances, activity_distributions
SAMPLE_SIZE = 250_000

View file

@ -1,25 +0,0 @@
import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
@pytest.mark.parametrize(
"inside_temp, humidity, expected_halflife, expected_decay_constant",
[
[293.15, 0.5, 0.5947447349860315, 1.1654532436949188],
[272.15, 0.7, 1.6070844193207476, 0.4313072619127947],
[300.15, 1., 0.17367078830147223, 3.9911558376571805],
[300.15, 0., 6.43, 0.10779893943389507],
[np.array([272.15, 300.15]), np.array([0.7, 0.]),
np.array([1.60708442, 6.43]), np.array([0.43130726, 0.10779894])],
[np.array([293.15, 300.15]), np.array([0.5, 1.]),
np.array([0.59474473, 0.17367079]), np.array([1.16545324, 3.99115584])]
],
)
def test_decay_constant(inside_temp, humidity, expected_halflife, expected_decay_constant):
npt.assert_almost_equal(models.Virus.types['SARS_CoV_2'].halflife(humidity, inside_temp),
expected_halflife)
npt.assert_almost_equal(models.Virus.types['SARS_CoV_2'].decay_constant(humidity, inside_temp),
expected_decay_constant)

View file

@ -3,8 +3,8 @@ High-level tests for the package.
"""
import caimira
import caimira.calculator.models
def test_version():
assert caimira.__version__ is not None
assert caimira.calculator.models.__version__ is not None

View file

@ -2,11 +2,11 @@ import numpy as np
import pytest
from retry import retry
import caimira.monte_carlo as mc
from caimira import models
from caimira.dataclass_utils import nested_replace
from caimira.apps.calculator import report_generator
from caimira.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions
import caimira.calculator.models.monte_carlo as mc
from caimira.calculator.models import models
from caimira.calculator.models.dataclass_utils import nested_replace
from caimira.calculator.report import report_generator
from caimira.calculator.models.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions
@pytest.fixture

View file

@ -1,7 +1,7 @@
import unittest
from unittest.mock import Mock, patch
from caimira.store.data_service import DataService
from caimira.calculator.store.data_service import DataService
class DataServiceTests(unittest.TestCase):

View file

@ -1,6 +1,6 @@
import dataclasses
from caimira.dataclass_utils import nested_replace, walk_dataclass
from caimira.calculator.models.dataclass_utils import nested_replace, walk_dataclass
@dataclasses.dataclass(frozen=True)

View file

@ -5,8 +5,8 @@ import numpy.testing as npt
import pytest
from retry import retry
from caimira import models
from caimira.monte_carlo.data import expiration_distribution
from caimira.calculator.models import models
from caimira.calculator.models.monte_carlo.data import expiration_distribution
def test_multiple_wrong_weight_size():

View file

@ -8,11 +8,11 @@ import numpy.testing as npt
import pytest
from retry import retry
import caimira.monte_carlo as mc
from caimira import models
from caimira.utils import method_cache
from caimira.models import _VectorisedFloat,Interval,SpecificInterval
from caimira.monte_carlo.data import (expiration_distributions,
import caimira.calculator.models.monte_carlo as mc
from caimira.calculator.models import models
from caimira.calculator.models.utils import method_cache
from caimira.calculator.models.models import _VectorisedFloat,Interval,SpecificInterval
from caimira.calculator.models.monte_carlo.data import (expiration_distributions,
expiration_BLO_factors,short_range_expiration_distributions,
short_range_distances,virus_distributions,activity_distributions)

View file

@ -1,7 +1,7 @@
import numpy as np
import pytest
import caimira.models
import caimira.calculator.models.models
@pytest.mark.parametrize(
@ -17,26 +17,26 @@ def test_infected_population_vectorisation(override_params, data_registry):
}
defaults.update(override_params)
office_hours = caimira.models.SpecificInterval(present_times=[(8,17)])
infected = caimira.models.InfectedPopulation(
office_hours = caimira.calculator.models.models.SpecificInterval(present_times=[(8,17)])
infected = caimira.calculator.models.models.InfectedPopulation(
data_registry=data_registry,
number=1,
presence=office_hours,
mask=caimira.models.Mask(
mask=caimira.calculator.models.models.Mask(
factor_exhale=0.95,
η_inhale=0.3,
),
activity=caimira.models.Activity(
activity=caimira.calculator.models.models.Activity(
0.51,
defaults['exhalation_rate'],
),
virus=caimira.models.SARSCoV2(
virus=caimira.calculator.models.models.SARSCoV2(
viral_load_in_sputum=defaults['viral_load_in_sputum'],
infectious_dose=50.,
viable_to_RNA_ratio = 0.5,
transmissibility_factor=1.0,
),
expiration=caimira.models._ExpirationBase.types['Breathing'],
expiration=caimira.calculator.models.models._ExpirationBase.types['Breathing'],
host_immunity=0.,
)
emission_rate = infected.emission_rate(10)

View file

@ -2,8 +2,8 @@ import numpy as np
import numpy.testing as npt
import pytest
import caimira.models as models
import caimira.data as data
import caimira.calculator.models.models as models
import caimira.calculator.models.data as data
def test_no_mask_superspeading_emission_rate(baseline_concentration_model):

View file

@ -1,5 +1,4 @@
import caimira.models
from caimira.dataclass_utils import nested_replace
from caimira.calculator.models.dataclass_utils import nested_replace
def test_exposure_r0(baseline_exposure_model):

View file

@ -3,12 +3,12 @@ import dataclasses
import numpy as np
import pytest
import caimira.models
import caimira.monte_carlo.models as mc_models
import caimira.monte_carlo.sampleable
import caimira.calculator.models
import caimira.calculator.models.models
import caimira.calculator.models.monte_carlo.sampleable
MODEL_CLASSES = [
cls for cls in vars(caimira.models).values()
cls for cls in vars(caimira.calculator.models).values()
if dataclasses.is_dataclass(cls)
]
@ -21,11 +21,11 @@ def test_type_annotations():
# runtime execution.
missing = []
for cls in MODEL_CLASSES:
if not hasattr(caimira.monte_carlo, cls.__name__):
if not hasattr(caimira.calculator.models.monte_carlo, cls.__name__):
missing.append(cls.__name__)
continue
mc_cls = getattr(caimira.monte_carlo, cls.__name__)
assert issubclass(mc_cls, caimira.monte_carlo.MCModelBase)
mc_cls = getattr(caimira.calculator.models.monte_carlo, cls.__name__)
assert issubclass(mc_cls, caimira.calculator.models.monte_carlo.MCModelBase)
if missing:
msg = (
@ -37,25 +37,25 @@ def test_type_annotations():
@pytest.fixture
def baseline_mc_concentration_model(data_registry) -> caimira.monte_carlo.ConcentrationModel:
mc_model = caimira.monte_carlo.ConcentrationModel(
def baseline_mc_concentration_model(data_registry) -> caimira.calculator.models.monte_carlo.ConcentrationModel:
mc_model = caimira.calculator.models.monte_carlo.ConcentrationModel(
data_registry=data_registry,
room=caimira.monte_carlo.Room(volume=caimira.monte_carlo.sampleable.Normal(75, 20),
inside_temp=caimira.models.PiecewiseConstant((0., 24.), (293,))),
ventilation=caimira.monte_carlo.SlidingWindow(
room=caimira.calculator.models.monte_carlo.Room(volume=caimira.calculator.models.monte_carlo.sampleable.Normal(75, 20),
inside_temp=caimira.calculator.models.models.PiecewiseConstant((0., 24.), (293,))),
ventilation=caimira.calculator.models.monte_carlo.SlidingWindow(
data_registry=data_registry,
active=caimira.models.PeriodicInterval(period=120, duration=120),
outside_temp=caimira.models.PiecewiseConstant((0., 24.), (283,)),
active=caimira.calculator.models.models.PeriodicInterval(period=120, duration=120),
outside_temp=caimira.calculator.models.models.PiecewiseConstant((0., 24.), (283,)),
window_height=1.6, opening_length=0.6,
),
infected=caimira.models.InfectedPopulation(
infected=caimira.calculator.models.models.InfectedPopulation(
data_registry=data_registry,
number=1,
virus=caimira.models.Virus.types['SARS_CoV_2'],
presence=caimira.models.SpecificInterval(((0., 4.), (5., 8.))),
mask=caimira.models.Mask.types['No mask'],
activity=caimira.models.Activity.types['Light activity'],
expiration=caimira.models.Expiration.types['Breathing'],
virus=caimira.calculator.models.models.Virus.types['SARS_CoV_2'],
presence=caimira.calculator.models.models.SpecificInterval(((0., 4.), (5., 8.))),
mask=caimira.calculator.models.models.Mask.types['No mask'],
activity=caimira.calculator.models.models.Activity.types['Light activity'],
expiration=caimira.calculator.models.models.Expiration.types['Breathing'],
host_immunity=0.,
),
evaporation_factor=0.3,
@ -64,39 +64,39 @@ def baseline_mc_concentration_model(data_registry) -> caimira.monte_carlo.Concen
@pytest.fixture
def baseline_mc_sr_model() -> caimira.monte_carlo.ShortRangeModel:
def baseline_mc_sr_model() -> caimira.calculator.models.monte_carlo.ShortRangeModel:
return ()
@pytest.fixture
def baseline_mc_exposure_model(data_registry, baseline_mc_concentration_model, baseline_mc_sr_model) -> caimira.monte_carlo.ExposureModel:
return caimira.monte_carlo.ExposureModel(
def baseline_mc_exposure_model(data_registry, baseline_mc_concentration_model, baseline_mc_sr_model) -> caimira.calculator.models.monte_carlo.ExposureModel:
return caimira.calculator.models.monte_carlo.ExposureModel(
data_registry,
baseline_mc_concentration_model,
baseline_mc_sr_model,
exposed=caimira.models.Population(
exposed=caimira.calculator.models.models.Population(
number=10,
presence=baseline_mc_concentration_model.infected.presence,
activity=baseline_mc_concentration_model.infected.activity,
mask=baseline_mc_concentration_model.infected.mask,
host_immunity=0.,
),
geographical_data=caimira.models.Cases(),
geographical_data=caimira.calculator.models.models.Cases(),
)
def test_build_concentration_model(baseline_mc_concentration_model: caimira.monte_carlo.ConcentrationModel):
def test_build_concentration_model(baseline_mc_concentration_model: caimira.calculator.models.monte_carlo.ConcentrationModel):
model = baseline_mc_concentration_model.build_model(7)
assert isinstance(model, caimira.models.ConcentrationModel)
assert isinstance(model, caimira.calculator.models.models.ConcentrationModel)
assert isinstance(model.concentration(time=0.), float)
conc = model.concentration(time=1.)
assert isinstance(conc, np.ndarray)
assert conc.shape == (7, )
def test_build_exposure_model(baseline_mc_exposure_model: caimira.monte_carlo.ExposureModel):
def test_build_exposure_model(baseline_mc_exposure_model: caimira.calculator.models.monte_carlo.ExposureModel):
model = baseline_mc_exposure_model.build_model(7)
assert isinstance(model, caimira.models.ExposureModel)
assert isinstance(model, caimira.calculator.models.models.ExposureModel)
prob = model.deposited_exposure()
assert isinstance(prob, np.ndarray)
assert prob.shape == (7, )

View file

@ -3,10 +3,10 @@ import numpy.testing as npt
import pytest
from retry import retry
import caimira.monte_carlo as mc
from caimira import models,data
from caimira.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions, infectious_dose_distribution, viable_to_RNA_ratio_distribution
from caimira.apps.calculator.model_generator import build_expiration
import caimira.calculator.models.monte_carlo as mc
from caimira.calculator.models import models, data
from caimira.calculator.models.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions, infectious_dose_distribution, viable_to_RNA_ratio_distribution
from caimira.calculator.validators.virus.virus_validator import build_expiration
SAMPLE_SIZE = 500_000
TOLERANCE = 0.05

View file

@ -2,8 +2,7 @@ import numpy as np
import numpy.testing as npt
import pytest
from caimira.monte_carlo.data import activity_distributions, virus_distributions
from caimira.store import data_registry
from caimira.calculator.models.monte_carlo.data import activity_distributions, virus_distributions
# Mean & std deviations from https://doi.org/10.1101/2021.10.14.21264988 (Table 3)

View file

@ -3,7 +3,7 @@ import numpy.testing as npt
import pytest
from retry import retry
from caimira.monte_carlo import sampleable
from caimira.calculator.models.monte_carlo import sampleable
@retry(tries=10)

View file

@ -4,7 +4,7 @@ import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
from caimira.calculator.models import models
@pytest.fixture

Some files were not shown because too many files have changed in this diff Show more