Merge branch 'feature/backend_separation' into 'master'

CAiMIRA API development

See merge request caimira/caimira!502
This commit is contained in:
Luis Aleixo 2024-09-13 17:26:48 +02:00
commit 048c791fb1
188 changed files with 1640 additions and 1328 deletions

View file

@ -24,6 +24,7 @@ jobs:
env:
PROJECT_ROOT: ./
PROJECT_NAME: caimira
CAIMIRA_TESTS_CALCULATOR_TIMEOUT: 30
steps:
- name: Checkout
uses: actions/checkout@v2

3
.gitignore vendored
View file

@ -3,6 +3,7 @@ __pycache__
*.egg-info
*.DS_Store
*.pyc
dist
# Editor stuff
*.swp
@ -18,4 +19,4 @@ app-config/openshift/caimira-test
app-config/openshift/caimira-prod
# documentation build folder
caimira/docs/_build
caimira/docs/_build

View file

@ -1,7 +1,6 @@
stages:
- test
- docker-build
- oc-tag
- deploy
# Use the acc-py-devtools templates found at
@ -11,39 +10,61 @@ include:
file: acc_py_devtools/templates/gitlab-ci/python.yml
variables:
project_name: caimira
PY_VERSION: "3.11"
# ###################################################################################################
# Test code
# Test code - CAiMIRA (model) and CERN CAiMIRA (CERN's UI)
# A full installation of CAiMIRA, tested with pytest.
test_install:
extends: .acc_py_full_test
.test-base:
image: registry.cern.ch/docker.io/library/python:${PY_VERSION}
stage: test
except:
- live/caimira-test # do not run tests on live/caimira-test branch
.test-run:
extends:
- .test-base
script:
- cd ./${PROJECT_ROOT}
- pip install -e .[test]
- python -m pytest
# A development installation of CAiMIRA tested with pytest.
test_dev:
extends: .acc_py_dev_test
test-caimira-py311:
variables:
PROJECT_ROOT: "caimira"
extends:
- .test-run
test-cern-caimira-py311:
before_script:
- cd ./caimira
- pip install -e .[test]
- cd ../
variables:
PROJECT_ROOT: "cern_caimira"
extends:
- .test-run
# A development installation of CAiMIRA tested with pytest.
test_dev-39:
test-caimira-py39:
variables:
PY_VERSION: "3.9"
extends: .acc_py_dev_test
PROJECT_ROOT: "caimira"
extends:
- test-caimira-py311
test-cern-caimira-py39:
variables:
PY_VERSION: "3.9"
PROJECT_ROOT: "cern_caimira"
extends:
- test-cern-caimira-py311
# ###################################################################################################
# Test OpenShift config
.test_openshift_config:
stage: test
rules:
- if: '$OC_TOKEN && $CI_MERGE_REQUEST_SOURCE_BRANCH_NAME == $BRANCH'
allow_failure: true # The branch must represent what is deployed. FIXME: change to true because of a diff between ConfigMaps
- if: '$OC_TOKEN && $CI_MERGE_REQUEST_EVENT_TYPE != "detached"'
allow_failure: true # Anything other than the branch may fail without blocking the pipeline.
allow_failure: true
image: registry.cern.ch/docker.io/mambaorg/micromamba
before_script:
- micromamba create --yes -p $HOME/env python=3.9 ruamel.yaml wget -c conda-forge
@ -63,7 +84,9 @@ test_dev-39:
paths:
- ./app-config/openshift/${CAIMIRA_INSTANCE}/actual
- ./app-config/openshift/${CAIMIRA_INSTANCE}/expected
only:
- master
- live/caimira-test # do not run tests on live/caimira-test branch
check_openshift_config_test:
extends: .test_openshift_config
@ -73,35 +96,29 @@ check_openshift_config_test:
OC_SERVER: https://api.paas.okd.cern.ch
OC_TOKEN: "${OPENSHIFT_CAIMIRA_TEST_CONFIG_CHECKER_TOKEN}"
check_openshift_config_prod:
extends: .test_openshift_config
variables:
CAIMIRA_INSTANCE: 'caimira-prod'
BRANCH: 'master'
OC_SERVER: https://api.paas.okd.cern.ch
OC_TOKEN: "${OPENSHIFT_CAIMIRA_PROD_CONFIG_CHECKER_TOKEN}"
# TODO: for prod, it should ignore the different tag in the `image` field
# check_openshift_config_prod:
# extends: .test_openshift_config
# variables:
# CAIMIRA_INSTANCE: 'caimira-prod'
# BRANCH: 'master'
# OC_SERVER: https://api.paas.okd.cern.ch
# OC_TOKEN: "${OPENSHIFT_CAIMIRA_PROD_CONFIG_CHECKER_TOKEN}"
# ###################################################################################################
# Build docker images
.image_builder:
# Build and push images to the openshift instance, which automatically triggers an application re-deployment.
# base
.docker-build:
stage: docker-build
rules:
- if: '$CI_COMMIT_BRANCH == "live/caimira-test"'
variables:
IMAGE_TAG: caimira-test-latest
- if: '$CI_COMMIT_BRANCH == "master"'
variables:
IMAGE_TAG: caimira-prod-latest
image:
# Based on guidance at https://gitlab.cern.ch/gitlabci-examples/build_docker_image.
# The kaniko debug image is recommended because it has a shell, and a shell is required for an image to be used with GitLab CI/CD.
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
script:
- echo "Building image for ${CI_COMMIT_REF_NAME} branch with tag ${IMAGE_TAG}"
# Prepare Kaniko configuration file
- echo "{\"auths\":{\"$CI_REGISTRY\":{\"username\":\"$CI_REGISTRY_USER\",\"password\":\"$CI_REGISTRY_PASSWORD\"}}}" > /kaniko/.docker/config.json
- echo "Building ${CI_REGISTRY_IMAGE}/${IMAGE_NAME}:latest Docker image..."
@ -110,85 +127,95 @@ check_openshift_config_prod:
# Print the full registry path of the pushed image
- echo "Image pushed successfully to ${CI_REGISTRY_IMAGE}/${IMAGE_NAME}:${IMAGE_TAG}"
auth-service-image_builder:
extends:
- .image_builder
.docker-build-auth-service:
variables:
IMAGE_NAME: auth-service
DOCKERFILE_DIRECTORY: app-config/auth-service
DOCKER_CONTEXT_DIRECTORY: app-config/auth-service
extends: .docker-build
calculator-app-image_builder:
extends:
- .image_builder
.docker-build-calculator-app:
variables:
IMAGE_NAME: calculator-app
DOCKERFILE_DIRECTORY: app-config/calculator-app
DOCKER_CONTEXT_DIRECTORY: ""
extends: .docker-build
oci_calculator:
extends: .image_builder
# on push to live/caimira-test
.docker-build-test:
variables:
IMAGE_NAME: calculator
DOCKERFILE_DIRECTORY: app-config/caimira-public-docker-image
DOCKER_CONTEXT_DIRECTORY: ""
IMAGE_TAG: caimira-test-latest
docker-build-auth-service-test:
extends:
- .docker-build-test
- .docker-build-auth-service
only:
- live/caimira-test
docker-build-calculator-app-test:
extends:
- .docker-build-test
- .docker-build-calculator-app
only:
- live/caimira-test
# on release
.docker-build-release:
before_script:
# Extract version number without 'v' prefix as IMAGE_TAG
- IMAGE_TAG=$(echo "$CI_COMMIT_REF_NAME" | sed 's/^v//')
- echo "Version is $IMAGE_TAG"
docker-build-auth-service-release:
extends:
- .docker-build-release
- .docker-build-auth-service
only:
- tag
docker-build-calculator-app-release:
extends:
- .docker-build-release
- .docker-build-calculator-app
only:
- tag
# ###################################################################################################
# Link build Docker images OpenShift <-> GitLab registry
.link_docker_images_with_gitlab_registry:
stage: oc-tag
image: gitlab-registry.cern.ch/paas-tools/openshift-client:latest
rules:
- if: '$CI_COMMIT_BRANCH == "live/caimira-test"'
variables:
OC_PROJECT: "caimira-test"
OC_TOKEN: ${OPENSHIFT_CAIMIRA_TEST_DEPLOY_TOKEN}
IMAGE_TAG: caimira-test-latest
- if: '$CI_COMMIT_BRANCH == "master"'
variables:
OC_PROJECT: "caimira-prod"
OC_TOKEN: ${OPENSHIFT_CAIMIRA_PROD_DEPLOY_TOKEN}
IMAGE_TAG: caimira-prod-latest
# Deploy to OpenShift
.deploy:
stage: deploy
image: gitlab-registry.cern.ch/paas-tools/openshift-client
variables:
IMAGE_TAG: caimira-test-latest
OPENSHIFT_SERVER: https://api.paas.okd.cern.ch
OPENSHIFT_PROJECT: caimira-test
script:
- oc tag --source=docker ${CI_REGISTRY_IMAGE}/${IMAGE_NAME}:${IMAGE_TAG} ${IMAGE_NAME}:latest --token ${OC_TOKEN} --server=https://api.paas.okd.cern.ch -n ${OC_PROJECT}
- echo "Deploying ${CI_REGISTRY_IMAGE}/${IMAGE_NAME}:${IMAGE_TAG} to OpenShift"
- oc login $OPENSHIFT_SERVER --token=$OPENSHIFT_CAIMIRA_TEST_DEPLOY_TOKEN
- oc project $OPENSHIFT_PROJECT
- oc set image dc/$OPENSHIFT_DEPLOYMENT $OPENSHIFT_CONTAINER_NAME=${CI_REGISTRY_IMAGE}/${IMAGE_NAME}:${IMAGE_TAG}
- oc rollout status dc/$OPENSHIFT_DEPLOYMENT
only:
- live/caimira-test
link_auth-service_with_gitlab_registry:
extends:
- .link_docker_images_with_gitlab_registry
deploy-auth-service-test:
extends: .deploy
variables:
IMAGE_NAME: auth-service
OPENSHIFT_DEPLOYMENT: auth-service
OPENSHIFT_CONTAINER_NAME: auth-service
link_calculator-app_with_gitlab_registry:
extends:
- .link_docker_images_with_gitlab_registry
deploy-calculator-app-test:
extends: .deploy
variables:
IMAGE_NAME: calculator-app
OPENSHIFT_DEPLOYMENT: calculator-app
OPENSHIFT_CONTAINER_NAME: calculator-app
link_calculator_with_gitlab_registry:
extends:
- .link_docker_images_with_gitlab_registry
deploy-calculator-open-app-test:
extends: .deploy
variables:
IMAGE_NAME: calculator
# ###################################################################################################
# Trigger build of CAiMIRA router on OpenShift
trigger_caimira-router_build_on_openshift:
stage: deploy
rules:
- if: '$CI_COMMIT_BRANCH == "live/caimira-test"'
variables:
OC_PROJECT: "caimira-test"
BUILD_WEBHOOK_SECRET: ${OPENSHIFT_CAIMIRA_TEST_BUILD_WEBHOOK_SECRET}
- if: '$CI_COMMIT_BRANCH == "master"'
variables:
OC_PROJECT: "caimira-prod"
BUILD_WEBHOOK_SECRET: ${OPENSHIFT_CAIMIRA_PROD_BUILD_WEBHOOK_SECRET}
script:
- curl -X POST -k https://api.paas.okd.cern.ch/apis/build.openshift.io/v1/namespaces/${OC_PROJECT}/buildconfigs/caimira-router/webhooks/${BUILD_WEBHOOK_SECRET}/generic
IMAGE_NAME: calculator-app
OPENSHIFT_DEPLOYMENT: calculator-open-app
OPENSHIFT_CONTAINER_NAME: calculator-open-app

108
README.md
View file

@ -2,7 +2,7 @@
CAiMIRA is a risk assessment tool developed to model the concentration of viruses in enclosed spaces, in order to inform space-management decisions.
CAiMIRA models the concentration profile of potential virions in enclosed spaces , both as background (room) concentration and during close-proximity interations, with clear and intuitive graphs.
CAiMIRA models the concentration profile of potential virions in enclosed spaces , both as background (room) concentration and during close-proximity interactions, with clear and intuitive graphs.
The user can set a number of parameters, including room volume, exposure time, activity type, mask-wearing and ventilation.
The report generated indicates how to avoid exceeding critical concentrations and chains of airborne transmission in spaces such as individual offices, meeting rooms and labs.
@ -90,54 +90,77 @@ In order to run CAiMIRA locally with docker, run the following:
This will start a local version of CAiMIRA, which can be visited at http://localhost:8080/.
## Folder structure
The project contains two different Python packages:
- `caimira`: Contains all the backend logic and the calculator model. It is the package published in PyPI.
- `cern_caimira`: Imports and uses the backend package (`caimira`) and includes CERN-specific UI implementation.
The folder layout follows best practices as described [here](https://ianhopkinson.org.uk/2022/02/understanding-setup-py-setup-cfg-and-pyproject-toml-in-python/).
## Development guide
CAiMIRA is also mirrored to Github if you wish to collaborate on development and can be found at: https://github.com/CERN/caimira
### Installing CAiMIRA in editable mode
In order to install the CAiMIRA's backend logic, create your own virtualenv and, from the root directory of the project, run:
```
pip install -e . # At the root of the repository
cd caimira
pip install -e .
```
In order to install the CERN-specific UI version, that links to the previously installed backend, activate your virtualenv and, from the root directory of the project, run:
```
cd cern_caimira
pip install -e .
```
### Running the Calculator app in development mode
This example describes how to run the calculator with the CERN-specific UI. In the root directory of the project:
```
python -m caimira.apps.calculator
python -m cern_caimira.apps.calculator
```
To run with a specific template theme created:
```
python -m caimira.apps.calculator --theme=caimira/apps/templates/{theme}
python -m cern_caimira.apps.calculator --theme=cern_caimira/apps/templates/{theme}
```
To run the entire app in a different `APPLICATION_ROOT` path:
```
python -m caimira.apps.calculator --app_root=/myroot
python -m cern_caimira.apps.calculator --app_root=/myroot
```
To run the calculator on a different URL path:
```
python -m caimira.apps.calculator --prefix=/mycalc
python -m cern_caimira.apps.calculator --prefix=/mycalc
```
Each of these commands will start a local version of CAiMIRA, which can be visited at http://localhost:8080/.
### How to compile and read the documentation
In order to generate the documentation, CAiMIRA must be installed first with the `doc` dependencies:
In order to generate the documentation, CAiMIRA must be installed first with the `doc` optional dependencies:
```
cd caimira
pip install -e .[doc]
```
To generate the HTML documentation page, the command `make html` should be executed in the `caimira/docs` directory.
To generate the HTML documentation page, the command `make html` should be executed in the `caimira/src/caimira/calculator/docs` directory.
If any of the `.rst` files under the `caimira/docs` folder is changed, this command should be executed again.
Then, right click on `caimira/docs/_build/html/index.html` and select `Open with` your preferred web browser.
Then, right click on `caimira/src/caimira/calculator/docs/_build/html/index.html` and select `Open with` your preferred web browser.
### Running the CAiMIRA Expert-App or CO2-App apps in development mode
@ -152,7 +175,7 @@ These applications only work within Jupyter notebooks. Attempting to run them ou
##### Prerequisites
Make sure you have the needed dependencies intalled:
Make sure you have the needed dependencies installed:
```
pip install notebook jupyterlab
@ -168,14 +191,27 @@ Running with Visual Studio Code (VSCode):
### Running the tests
The project contains test files that separately test the functionality of the `caimira` backend and `cern_caimira` UI.
To test the `caimira` package, from the root repository of the project:
```
cd caimira
pip install -e .[test]
pytest ./caimira
python -m pytest
```
To test the `cern_caimira` package, from the root repository of the project:
```
cd cern_caimira
pip install -e .[test]
python -m pytest
```
### Running the profiler
The profiler is enabled when the environment variable `CAIMIRA_PROFILER_ENABLED` is set to 1.
CAiMIRA includes a profiler designed to identify performance bottlenecks. The profiler is enabled when the environment variable `CAIMIRA_PROFILER_ENABLED` is set to 1.
When visiting http://localhost:8080/profiler, you can start a new session and choose between [PyInstrument](https://github.com/joerick/pyinstrument) or [cProfile](https://docs.python.org/3/library/profile.html#module-cProfile). The app includes two different profilers, mainly because they can give different information.
@ -183,6 +219,33 @@ Keep the profiler page open. Then, in another window, navigate to any page in CA
The sessions are stored in a local file in the `/tmp` folder. To share it across multiple web nodes, a shared storage should be added to all web nodes. The folder can be customized via the environment variable `CAIMIRA_PROFILER_CACHE_DIR`.
### CAiMIRA REST API Usage
From the root directory of the project:
1. Run the backend API:
```
python -m caimira.api.app
```
2. The Tornado server will run on port `8088`.
To test the API functionality, you can send a `POST` request to `http://localhost:8088/report` with the required inputs in the request body. For an example of the required inputs, see [this link](https://gitlab.cern.ch/caimira/caimira/-/blob/master/caimira/apps/calculator/model_generator.py?ref_type=heads#L492).
The response format will be:
```json
{
"status": "success",
"message": "Results generated successfully",
"report_data": {
...
},
...
}
```
### Building the whole environment for local development
**Simulate the docker build that takes place on openshift with:**
@ -249,14 +312,20 @@ Then, switch to the project that you want to update:
$ oc project caimira-test
```
Create a new service account in OpenShift to use GitLab container registry:
Create a new service account in OpenShift to access GitLab container registry:
```console
$ oc create serviceaccount gitlabci-deployer
serviceaccount "gitlabci-deployer" created
```
$ oc policy add-role-to-user registry-editor -z gitlabci-deployer
Grant `edit` permission to the service account to run `oc set image` from CI an update the tag to deploy:
```
$ oc policy add-role-to-user edit -z gitlabci-deployer
```
Get the service account token for GitLab:
```
# We will refer to the output of this command as `test-token`
$ oc serviceaccounts get-token gitlabci-deployer
<...test-token...>
@ -264,17 +333,6 @@ $ oc serviceaccounts get-token gitlabci-deployer
Add the token to GitLab to allow GitLab to access OpenShift and define/change image stream tags. Go to `Settings` -> `CI / CD` -> `Variables` -> click on `Expand` button and create the variable `OPENSHIFT_CAIMIRA_TEST_DEPLOY_TOKEN`: insert the token `<...test-token...>`.
Then, create the webhook secret to be able to trigger automatic builds from GitLab.
Create and store the secret. Copy the secret above and add it to the GitLab project under `CI /CD` -> `Variables` with the name `OPENSHIFT_CAIMIRA_TEST_WEBHOOK_SECRET`.
```console
$ WEBHOOKSECRET=$(openssl rand -hex 50)
$ oc create secret generic \
--from-literal="WebHookSecretKey=$WEBHOOKSECRET" \
gitlab-caimira-webhook-secret
```
For CI usage, we also suggest creating a service account:
```console

View file

@ -1,20 +0,0 @@
FROM registry.cern.ch/docker.io/library/python:3.9
# Copy just the requirements.txt initially, allowing Docker effectively to cache the build (good for dev).
COPY ./requirements.txt /tmp/requirements.txt
RUN python -m venv /opt/caimira/app
RUN sed '/\.\[/d' -i /tmp/requirements.txt && /opt/caimira/app/bin/pip install -r /tmp/requirements.txt
RUN apt-get update && apt-get install -y nginx
# Now that we have done the installation of the dependencies, copy the caimira source.
COPY ./ /opt/caimira/src
COPY ./app-config/caimira-public-docker-image/run_caimira.sh /opt/caimira/start.sh
# To ensure that we have installed the full requirements, re-run the pip install.
# In the best case this will be a no-op.
RUN cd /opt/caimira/src/ && /opt/caimira/app/bin/pip install -r /opt/caimira/src/requirements.txt
COPY ./app-config/caimira-public-docker-image/nginx.conf /opt/caimira/nginx.conf
EXPOSE 8080
ENTRYPOINT ["/bin/sh", "-c", "/opt/caimira/start.sh"]

View file

@ -1,51 +0,0 @@
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;
include /usr/share/nginx/modules/*.conf;
events {
worker_connections 1024;
}
http {
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
types_hash_max_size 2048;
include /etc/nginx/mime.types;
default_type application/octet-stream;
large_client_header_buffers 4 16k;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 86400;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
server {
listen 8080 default_server;
listen [::]:8080 default_server;
server_name _;
root /opt/caimira/src;
# Load configuration files for the default server block.
include /opt/app-root/etc/nginx.default.d/*.conf;
large_client_header_buffers 4 16k;
location / {
proxy_pass http://localhost:8081;
}
}
}

View file

@ -1,10 +0,0 @@
echo 'CAiMIRA is running on http://localhost:8080'
echo 'Please see https://gitlab.cern.ch/caimira/caimira for terms of use.'
# Run a proxy for the apps (listening on 8080).
nginx -c /opt/caimira/nginx.conf
cd /opt/caimira/src/caimira
# Run the calculator in the foreground.
/opt/caimira/app/bin/python -m caimira.apps.calculator --port 8081 --no-debug

View file

@ -1,22 +1,33 @@
FROM registry.cern.ch/docker.io/condaforge/mambaforge as conda
FROM registry.cern.ch/docker.io/condaforge/mambaforge AS conda
ARG PYTHON_VERSION=3.12
RUN mamba create --yes -p /opt/app python=${PYTHON_VERSION}
RUN mamba create --yes -p /opt/app python=3.9
COPY . /opt/app-source
RUN cd /opt/app-source && conda run -p /opt/app python -m pip install -r ./requirements.txt .[app]
WORKDIR /opt/app-source
# install Python deps
RUN cd cern_caimira \
&& conda run -p /opt/app python -m pip install -r requirements.txt
RUN cd caimira \
&& conda run -p /opt/app python -m pip install .
RUN cd cern_caimira \
&& conda run -p /opt/app python -m pip install .
COPY app-config/calculator-app/app.sh /opt/app/bin/calculator-app.sh
RUN cd /opt/app \
&& find -name '*.a' -delete \
&& rm -rf /opt/app/conda-meta \
&& rm -rf /opt/app/include \
&& find -name '__pycache__' -type d -exec rm -rf '{}' '+' \
&& rm -rf /opt/app/lib/python*/site-packages/pip /opt/app/lib/python*/idlelib /opt/app/lib/python*/ensurepip \
/opt/app/bin/x86_64-conda-linux-gnu-ld \
/opt/app/bin/sqlite3 \
/opt/app/bin/openssl \
/opt/app/share/terminfo \
&& find /opt/app/lib/ -name 'tests' -type d -exec rm -rf '{}' '+' \
&& find /opt/app/lib -name '*.pyx' -delete \
;
&& find -name '*.a' -delete \
&& rm -rf /opt/app/conda-meta \
&& rm -rf /opt/app/include \
&& find -name '__pycache__' -type d -exec rm -rf '{}' '+' \
&& rm -rf /opt/app/lib/python*/site-packages/pip /opt/app/lib/python*/idlelib /opt/app/lib/python*/ensurepip \
/opt/app/bin/x86_64-conda-linux-gnu-ld \
/opt/app/bin/sqlite3 \
/opt/app/bin/openssl \
/opt/app/share/terminfo \
&& find /opt/app/lib/ -name 'tests' -type d -exec rm -rf '{}' '+' \
&& find /opt/app/lib -name '*.pyx' -delete \
;
FROM registry.cern.ch/docker.io/library/debian
@ -25,12 +36,10 @@ ENV PATH=/opt/app/bin/:$PATH
# Make a convenient location to the installed CAiMIRA package (i.e. a directory called caimira in the CWD).
# It is important that this directory is also writable by a non-root user.
RUN mkdir -p /scratch \
&& chmod a+wx /scratch
&& chmod a+wx /scratch
# Set the HOME directory to something that anybody can write to (to support non root users, such as on openshift).
ENV HOME=/scratch
WORKDIR /scratch
RUN CAIMIRA_INIT_FILE=$(/opt/app/bin/python -c "import caimira; print(caimira.__file__)") \
&& ln -s $(dirname ${CAIMIRA_INIT_FILE}) /scratch/caimira
CMD [ \
"calculator-app.sh" \
]
RUN CERN_CAIMIRA_INIT_FILE=$(python -c "import cern_caimira; print(cern_caimira.__file__)") \
&& ln -s $(dirname ${CERN_CAIMIRA_INIT_FILE}) /scratch/cern_caimira
CMD [ "calculator-app.sh" ]

View file

@ -16,7 +16,7 @@ if [[ "$APP_NAME" == "calculator-app" ]]; then
if [ ! -z "$CAIMIRA_THEME" ]; then
args+=("--theme=${CAIMIRA_THEME}")
fi
export "ARVE_API_KEY"="$ARVE_API_KEY"
export "ARVE_CLIENT_ID"="$ARVE_CLIENT_ID"
export "ARVE_CLIENT_SECRET"="$ARVE_CLIENT_SECRET"
@ -26,8 +26,8 @@ if [[ "$APP_NAME" == "calculator-app" ]]; then
export "DATA_SERVICE_ENABLED"="${DATA_SERVICE_ENABLED:=0}"
export "CAIMIRA_PROFILER_ENABLED"="${CAIMIRA_PROFILER_ENABLED:=0}"
echo "Starting the caimira webservice with: python -m caimira.apps.calculator ${args[@]}"
python -m caimira.apps.calculator "${args[@]}"
echo "Starting the caimira webservice with: python -m cern_caimira.apps.calculator ${args[@]}"
python -m cern_caimira.apps.calculator "${args[@]}"
else
echo "No APP_NAME specified"

View file

@ -1,6 +1,5 @@
version: "3.8"
services:
calculator-app:
image: calculator-app
environment:
@ -8,7 +7,7 @@ services:
- APP_NAME=calculator-app
- APPLICATION_ROOT=/
- CAIMIRA_CALCULATOR_PREFIX=/calculator-cern
- CAIMIRA_THEME=caimira/apps/templates/cern
- CAIMIRA_THEME=ui/apps/templates/cern
- DATA_SERVICE_ENABLED=0
- CAIMIRA_PROFILER_ENABLED=0
user: ${CURRENT_UID}

View file

@ -1,103 +1,82 @@
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;
tcp_nopush on;
tcp_nodelay on;
types_hash_max_size 2048;
include /usr/share/nginx/modules/*.conf;
server {
listen 8080 default_server;
listen [::]:8080 default_server;
server_name _;
root /opt/app-root/src;
events {
worker_connections 1024;
}
# Load configuration files for the default server block.
include /opt/app-root/etc/nginx.default.d/*.conf;
http {
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
large_client_header_buffers 4 16k;
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
types_hash_max_size 2048;
error_page 404 /404.html;
location = /40x.html {
}
include /etc/nginx/mime.types;
default_type application/octet-stream;
error_page 500 502 503 504 /50x.html;
location = /50x.html {
}
server {
listen 8080 default_server;
listen [::]:8080 default_server;
server_name _;
root /opt/app-root/src;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 86400;
# Load configuration files for the default server block.
include /opt/app-root/etc/nginx.default.d/*.conf;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
large_client_header_buffers 4 16k;
error_page 404 /404.html;
location = /40x.html {
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
}
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 86400;
location /auth {
proxy_pass_request_body off;
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Content-Length "";
proxy_set_header If-None-Match "";
proxy_pass http://auth-service:8080;
}
location /auth {
proxy_pass_request_body off;
location @error401 {
# Store the request_uri (complete with args) to be redirected to
# when we hit /auth/complete.
add_header Set-Cookie "POST_AUTH_REDIRECT=$request_uri;";
return 302 /auth/login;
}
proxy_set_header Host $http_host;
proxy_set_header Content-Length "";
proxy_set_header If-None-Match "";
proxy_pass http://auth-service:8080;
}
location @proxy_404_error_handler {
# Pass the request on to the webservice. Most likely the URI won't
# exist so we get a 404 from that service instead (good as the 404
# pages are consistent).
proxy_pass http://calculator-app:8080/$request_uri;
}
location @error401 {
# Store the request_uri (complete with args) to be redirected to
# when we hit /auth/complete.
add_header Set-Cookie "POST_AUTH_REDIRECT=$request_uri;";
return 302 /auth/login;
}
# Redirect URLs to the new scheme.
absolute_redirect off;
location @proxy_404_error_handler {
# Pass the request on to the webservice. Most likely the URI won't
# exist so we get a 404 from that service instead (good as the 404
# pages are consistent).
proxy_pass http://calculator-app:8080/$request_uri;
}
location / {
# By default we have no authentication.
proxy_pass http://calculator-app:8080;
}
# Redirect URLs to the new scheme.
absolute_redirect off;
location /calculator {
return 302 /calculator-cern$is_args$args;
}
location / {
# By default we have no authentication.
proxy_pass http://calculator-app:8080;
}
location /calculator-cern {
# CERN calculator is authenticated.
auth_request /auth/probe;
error_page 401 = @error401;
location /calculator {
return 302 /calculator-cern$is_args$args;
}
# calculator-app is the name of the tornado server (for the calculator)
# in each of docker-compose, caimira-test.web.cern.ch and caimira.web.cern.ch.
proxy_pass http://calculator-app:8080/calculator-cern;
}
location /calculator-cern {
# CERN calculator is authenticated.
auth_request /auth/probe;
error_page 401 = @error401;
# calculator-app is the name of the tornado server (for the calculator)
# in each of docker-compose, caimira-test.web.cern.ch and caimira.web.cern.ch.
proxy_pass http://calculator-app:8080/calculator-cern;
}
location /calculator-open {
# Public open calculator
proxy_pass http://calculator-open-app:8080/calculator-open;
}
location /calculator-open {
# Public open calculator
proxy_pass http://calculator-open-app:8080/calculator-open;
}
}

View file

@ -303,3 +303,4 @@
- name: PROJECT_NAME
description: The name of this project, e.g. caimira-test
required: true

View file

@ -14,4 +14,4 @@ sphinx:
python:
install:
- requirements: caimira/docs/requirements.txt
- requirements: caimira/docs/requirements.txt

View file

@ -1,4 +0,0 @@
from .expert import ExpertApplication
from .expert_co2 import CO2Application
__all__ = ['ExpertApplication', 'CO2Application']

View file

@ -1,68 +0,0 @@
import dataclasses
import typing
from caimira.models import CO2DataModel, Interval, IntPiecewiseConstant
from .co2_model_generator import CO2FormData
@dataclasses.dataclass
class CO2ReportGenerator:
def build_initial_plot(
self,
form: CO2FormData,
) -> dict:
'''
Initial plot with the suggested ventilation state changes.
This method receives the form input and returns the CO2
plot with the respective transition times.
'''
CO2model: CO2DataModel = form.build_model()
occupancy_transition_times = list(CO2model.occupancy.transition_times)
ventilation_transition_times: list = form.find_change_points()
# The entire ventilation changes consider the initial and final occupancy state change
all_vent_transition_times: list = sorted(
[occupancy_transition_times[0]] +
[occupancy_transition_times[-1]] +
ventilation_transition_times)
ventilation_plot: str = form.generate_ventilation_plot(
ventilation_transition_times=all_vent_transition_times,
occupancy_transition_times=occupancy_transition_times
)
context = {
'CO2_plot': ventilation_plot,
'transition_times': [round(el, 2) for el in all_vent_transition_times],
}
return context
def build_fitting_results(
self,
form: CO2FormData,
) -> dict:
'''
Final fitting results with the respective predictive CO2.
This method receives the form input and returns the fitting results
along with the CO2 plot with the predictive CO2.
'''
CO2model: CO2DataModel = form.build_model()
# Ventilation times after user manipulation from the suggested ventilation state change times.
ventilation_transition_times = list(CO2model.ventilation_transition_times)
# The result of the following method is a dict with the results of the fitting
# algorithm, namely the breathing rate and ACH values. It also returns the
# predictive CO2 result based on the fitting results.
context: typing.Dict = dict(CO2model.CO2_fit_params())
# Add the transition times and CO2 plot to the results.
context['transition_times'] = ventilation_transition_times
context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1],
predictive_CO2=context['predictive_CO2'])
return context

70
caimira/pyproject.toml Normal file
View file

@ -0,0 +1,70 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "caimira"
version = "4.17.0a1"
description = "CAiMIRA - CERN Airborne Model for Indoor Risk Assessment"
license = { text = "Apache-2.0" }
authors = [
{ name = "Andre Henriques", email = "andre.henriques@cern.ch" }
]
classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
"License :: OSI Approved :: Apache Software License",
]
requires-python = ">=3.9"
dependencies = [
"matplotlib",
"memoization",
"mistune",
"numpy",
"pandas",
"pyinstrument",
"python-dateutil",
"requests",
"retry",
"ruptures",
"scipy",
"scikit-learn",
"tabulate",
"timezonefinder",
"tornado",
]
[project.optional-dependencies]
dev = []
test = [
"pytest",
"pytest-mypy >= 0.10.3",
"mypy >= 1.0.0",
"pytest-tornasync",
"numpy-stubs @ git+https://github.com/numpy/numpy-stubs.git",
"types-dataclasses",
"types-python-dateutil",
"types-requests",
"types-retry",
]
doc = [
"sphinx",
"sphinx_rtd_theme"
]
[project.urls]
Homepage = "https://github.com/cern/caimira"
[tool.setuptools.package-data]
caimira = ["**/*"]
[tool.pytest.ini_options]
addopts = "--mypy"
[tool.mypy]
no_warn_no_return = true
exclude = "caimira/profiler.py"
[[tool.mypy.overrides]]
module = ["pandas", "ruptures", "scipy.*", "setuptools", "sklearn.*", "tabulate"]
ignore_missing_imports = true

2
caimira/setup.cfg Normal file
View file

@ -0,0 +1,2 @@
[tool:pytest]
addopts = --mypy

View file

@ -0,0 +1,3 @@
import importlib.metadata
__version__ = importlib.metadata.version(__package__ or __name__)

View file

@ -0,0 +1,34 @@
# """
# Entry point for the CAiMIRA application
# """
import tornado.ioloop
import tornado.web
import tornado.log
from tornado.options import define, options
import logging
from caimira.api.routes.report_routes import VirusReportHandler, CO2ReportHandler
define("port", default=8088, help="Port to listen on", type=int)
logging.basicConfig(format="%(message)s", level=logging.INFO)
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/co2_report", CO2ReportHandler),
(r"/virus_report", VirusReportHandler),
]
settings = dict(
debug=True,
)
super().__init__(handlers, **settings)
if __name__ == "__main__":
app = Application()
app.listen(options.port)
logging.info(f"Tornado server is running on port {options.port}")
tornado.ioloop.IOLoop.current().start()

View file

@ -0,0 +1,26 @@
from caimira.calculator.validators.co2.co2_validator import CO2FormData
from caimira.calculator.store.data_registry import DataRegistry
def generate_form_obj(form_data, data_registry):
return CO2FormData.from_dict(form_data=form_data, data_registry=data_registry)
def generate_model(form_obj, data_registry):
sample_size = data_registry.monte_carlo['sample_size']
return form_obj.build_model(sample_size=sample_size)
def generate_report(model):
return dict(model.CO2_fit_params())
def submit_CO2_form(form_data):
data_registry = DataRegistry()
form_obj = generate_form_obj(
form_data=form_data, data_registry=data_registry)
model = generate_model(form_obj=form_obj, data_registry=data_registry)
report_data = generate_report(model=model)
return report_data

View file

@ -0,0 +1,37 @@
import concurrent.futures
import functools
from caimira.calculator.validators.virus.virus_validator import VirusFormData
from caimira.calculator.store.data_registry import DataRegistry
import caimira.calculator.report.virus_report_data as rg
def generate_form_obj(form_data, data_registry):
return VirusFormData.from_dict(
form_data=form_data,
data_registry=data_registry,
)
def generate_model(form_obj, data_registry):
sample_size = data_registry.monte_carlo['sample_size']
return form_obj.build_model(sample_size=sample_size)
def generate_report_results(form_obj):
return rg.calculate_report_data(
form=form_obj,
executor_factory=functools.partial(
concurrent.futures.ThreadPoolExecutor, None, # TODO define report_parallelism
),
)
def submit_virus_form(form_data):
data_registry = DataRegistry
form_obj = generate_form_obj(form_data=form_data, data_registry=data_registry)
model = generate_model(form_obj=form_obj, data_registry=data_registry)
report_data = generate_report_results(form_obj=form_obj, model=model)
return report_data

View file

@ -0,0 +1,59 @@
import json
import traceback
import tornado.web
import sys
from caimira.api.controller.virus_report_controller import submit_virus_form
from caimira.api.controller.co2_report_controller import submit_CO2_form
class BaseReportHandler(tornado.web.RedirectHandler):
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "x-requested-with")
self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
def write_error(self, status_code, **kwargs):
self.set_status(status_code)
self.write({"message": kwargs.get('exc_info')[1].__str__()})
class VirusReportHandler(BaseReportHandler):
def post(self):
try:
form_data = json.loads(self.request.body)
report_data = submit_virus_form(form_data)
response_data = {
"status": "success",
"message": "Results generated successfully",
"report_data": report_data,
}
self.write(response_data)
except Exception as e:
traceback.print_exc()
self.write_error(status_code=400, exc_info=sys.exc_info())
class CO2ReportHandler(tornado.web.RequestHandler):
def set_default_headers(self):
self.set_header("Access-Control-Allow-Origin", "*")
self.set_header("Access-Control-Allow-Headers", "x-requested-with")
self.set_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
def post(self):
try:
form_data = json.loads(self.request.body)
report_data = submit_CO2_form(form_data)
response_data = {
"status": "success",
"message": "Results generated successfully",
"report_data": report_data,
}
self.write(response_data)
except Exception as e:
traceback.print_exc()
self.write_error(status_code=400, exc_info=sys.exc_info())

View file

Before

Width:  |  Height:  |  Size: 805 KiB

After

Width:  |  Height:  |  Size: 805 KiB

View file

@ -3,4 +3,4 @@ sphinx-rtd-theme==1.2.2
pillow==5.4.1
mock==1.0.1
commonmark==0.9.1
recommonmark==0.5.0
recommonmark==0.5.0

View file

@ -4,5 +4,3 @@
Documentation for the CAiMIRA package
"""
__version__ = "1.0.0"

View file

@ -1,6 +1,6 @@
import numpy as np
from caimira import models
from caimira.data.weather import wx_data, nearest_wx_station
from caimira.calculator.models import models
from .weather import wx_data, nearest_wx_station
MONTH_NAMES = [
'January', 'February', 'March', 'April', 'May', 'June', 'July',

View file

@ -40,7 +40,7 @@ from scipy.interpolate import interp1d
import scipy.stats as sct
from scipy.optimize import minimize
from caimira.store.data_registry import DataRegistry
from caimira.calculator.store.data_registry import DataRegistry
if not typing.TYPE_CHECKING:
from memoization import cached

View file

@ -7,11 +7,11 @@ import numpy as np
from scipy import special as sp
from scipy.stats import weibull_min
from caimira.enums import ViralLoads
from ..enums import ViralLoads
import caimira.monte_carlo.models as mc
from caimira.monte_carlo.sampleable import LogCustom, LogNormal, Normal, LogCustomKernel, CustomKernel, Uniform, Custom
from caimira.store.data_registry import DataRegistry
import caimira.calculator.models.monte_carlo.models as mc
from caimira.calculator.models.monte_carlo.sampleable import LogCustom, LogNormal, Normal, LogCustomKernel, CustomKernel, Uniform, Custom
from caimira.calculator.store.data_registry import DataRegistry
def evaluate_vl(root: typing.Dict, value: str, data_registry: DataRegistry):

View file

@ -3,7 +3,7 @@ import dataclasses
import sys
import typing
import caimira.models
from caimira.calculator.models import models
from .sampleable import SampleableDistribution, _VectorisedFloatOrSampleable
@ -57,7 +57,7 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model
# Note: deepcopy not needed here as we aren't mutating entities beyond
# the top level.
new_field = copy.copy(field)
if field.type is caimira.models._VectorisedFloat: # noqa
if field.type is models._VectorisedFloat: # noqa
new_field.type = _VectorisedFloatOrSampleable # type: ignore
field_type: typing.Any = new_field.type
@ -65,30 +65,30 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model
if getattr(field_type, '__origin__', None) in [typing.Union, typing.Tuple]:
# It is challenging to generalise this code, so we provide specific transformations,
# and raise for unforseen cases.
if new_field.type == typing.Tuple[caimira.models._VentilationBase, ...]:
if new_field.type == typing.Tuple[models._VentilationBase, ...]:
VB = getattr(sys.modules[__name__], "_VentilationBase")
field_type = typing.Tuple[typing.Union[caimira.models._VentilationBase, VB], ...]
elif new_field.type == typing.Tuple[caimira.models._ExpirationBase, ...]:
field_type = typing.Tuple[typing.Union[models._VentilationBase, VB], ...]
elif new_field.type == typing.Tuple[models._ExpirationBase, ...]:
EB = getattr(sys.modules[__name__], "_ExpirationBase")
field_type = typing.Tuple[typing.Union[caimira.models._ExpirationBase, EB], ...]
elif new_field.type == typing.Tuple[caimira.models.SpecificInterval, ...]:
field_type = typing.Tuple[typing.Union[models._ExpirationBase, EB], ...]
elif new_field.type == typing.Tuple[models.SpecificInterval, ...]:
SI = getattr(sys.modules[__name__], "SpecificInterval")
field_type = typing.Tuple[typing.Union[caimira.models.SpecificInterval, SI], ...]
field_type = typing.Tuple[typing.Union[models.SpecificInterval, SI], ...]
elif new_field.type == typing.Union[int, caimira.models.IntPiecewiseConstant]:
elif new_field.type == typing.Union[int, models.IntPiecewiseConstant]:
IPC = getattr(sys.modules[__name__], "IntPiecewiseConstant")
field_type = typing.Union[int, caimira.models.IntPiecewiseConstant, IPC]
elif new_field.type == typing.Union[caimira.models.Interval, None]:
field_type = typing.Union[int, models.IntPiecewiseConstant, IPC]
elif new_field.type == typing.Union[models.Interval, None]:
I = getattr(sys.modules[__name__], "Interval")
field_type = typing.Union[None, caimira.models.Interval, I]
field_type = typing.Union[None, models.Interval, I]
else:
# Check that we don't need to do anything with this type.
for item in new_field.type.__args__:
if getattr(item, '__module__', None) == 'caimira.models':
if getattr(item, '__module__', None) == 'source.models.models':
raise ValueError(
f"unsupported type annotation transformation required for {new_field.type}")
elif field_type.__module__ == 'caimira.models':
elif field_type.__module__ == 'source.models.models':
mc_model = getattr(sys.modules[__name__], new_field.type.__name__)
field_type = typing.Union[new_field.type, mc_model]
@ -119,7 +119,7 @@ def _build_mc_model(model: dataclass_instance) -> typing.Type[MCModelBase[_Model
_MODEL_CLASSES = [
cls for cls in vars(caimira.models).values()
cls for cls in vars(models).values()
if dataclasses.is_dataclass(cls)
]

View file

@ -1,9 +1,9 @@
import typing
import numpy as np
from sklearn.neighbors import KernelDensity # type: ignore
from sklearn.neighbors import KernelDensity
import caimira.models
from caimira.calculator.models import models
# Declare a float array type of a given size.
# There is no better way to declare this currently, unfortunately.
@ -158,5 +158,5 @@ class LogCustomKernel(SampleableDistribution):
_VectorisedFloatOrSampleable = typing.Union[
SampleableDistribution, caimira.models._VectorisedFloat,
SampleableDistribution, models._VectorisedFloat,
]

View file

@ -0,0 +1,60 @@
from caimira.calculator.validators.co2.co2_validator import CO2FormData
from caimira.calculator.models.models import CO2DataModel
def build_initial_plot(
form: CO2FormData,
) -> dict:
'''
Initial plot with the suggested ventilation state changes.
This method receives the form input and returns the CO2
plot with the respective transition times.
'''
CO2model: CO2DataModel = form.build_model()
occupancy_transition_times = list(CO2model.occupancy.transition_times)
ventilation_transition_times: list = form.find_change_points()
# The entire ventilation changes consider the initial and final occupancy state change
all_vent_transition_times: list = sorted(
[occupancy_transition_times[0]] +
[occupancy_transition_times[-1]] +
ventilation_transition_times)
ventilation_plot: str = form.generate_ventilation_plot(
ventilation_transition_times=all_vent_transition_times,
occupancy_transition_times=occupancy_transition_times
)
context = {
'CO2_plot': ventilation_plot,
'transition_times': [round(el, 2) for el in all_vent_transition_times],
}
return context
def build_fitting_results(
form: CO2FormData,
) -> dict:
'''
Final fitting results with the respective predictive CO2.
This method receives the form input and returns the fitting results
along with the CO2 plot with the predictive CO2.
'''
CO2model: CO2DataModel = form.build_model()
# Ventilation times after user manipulation from the suggested ventilation state change times.
ventilation_transition_times = list(CO2model.ventilation_transition_times)
# The result of the following method is a dict with the results of the fitting
# algorithm, namely the breathing rate and ACH values. It also returns the
# predictive CO2 result based on the fitting results.
context = dict(CO2model.CO2_fit_params())
# Add the transition times and CO2 plot to the results.
context['transition_times'] = ventilation_transition_times
context['CO2_plot'] = form.generate_ventilation_plot(ventilation_transition_times=ventilation_transition_times[:-1],
predictive_CO2=context['predictive_CO2'])
return context

View file

@ -1,25 +1,14 @@
import concurrent.futures
import base64
import dataclasses
from datetime import datetime
import io
import json
import typing
import urllib
import zlib
import jinja2
import numpy as np
import matplotlib.pyplot as plt
from caimira import models
from caimira.apps.calculator import markdown_tools
from caimira.profiler import profile
from caimira.store.data_registry import DataRegistry
from ... import monte_carlo as mc
from .model_generator import VirusFormData
from ... import dataclass_utils
from caimira.enums import ViralLoads
from caimira.calculator.models import models, dataclass_utils, profiler, monte_carlo as mc
from caimira.calculator.models.enums import ViralLoads
from caimira.calculator.validators.virus.virus_validator import VirusFormData
def model_start_end(model: models.ExposureModel):
@ -96,7 +85,8 @@ def interesting_times(model: models.ExposureModel, approx_n_pts: typing.Optional
"""
times = non_temp_transition_times(model)
sim_duration = max(times) - min(times)
if not approx_n_pts: approx_n_pts = sim_duration * 15
if not approx_n_pts:
approx_n_pts = sim_duration * 15
# Expand the times list to ensure that we have a maximum gap size between
# the key times.
@ -110,31 +100,42 @@ def concentrations_with_sr_breathing(form: VirusFormData, model: models.Exposure
for index, (start, stop) in enumerate(short_range_intervals):
# For visualization issues, add short-range breathing activity to the initial long-range concentrations
if start <= time <= stop and form.short_range_interactions[index]['expiration'] == 'Breathing':
lower_concentrations.append(np.array(model.concentration(float(time))).mean())
lower_concentrations.append(
np.array(model.concentration(float(time))).mean())
break
lower_concentrations.append(np.array(model.concentration_model.concentration(float(time))).mean())
lower_concentrations.append(
np.array(model.concentration_model.concentration(float(time))).mean())
return lower_concentrations
def _calculate_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(),fn_name
return np.array(model.deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_long_range_deposited_exposure(model, time1, time2, fn_name=None):
return np.array(model.long_range_deposited_exposure_between_bounds(float(time1), float(time2))).mean(), fn_name
def _calculate_co2_concentration(CO2_model, time, fn_name=None):
return np.array(CO2_model.concentration(float(time))).mean(), fn_name
@profile
def calculate_report_data(form: VirusFormData, model: models.ExposureModel, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
@profiler.profile
def calculate_report_data(form: VirusFormData, executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
model: models.ExposureModel = form.build_model()
times = interesting_times(model)
short_range_intervals = [interaction.presence.boundaries()[0] for interaction in model.short_range]
short_range_expirations = [interaction['expiration'] for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
short_range_intervals = [interaction.presence.boundaries()[0]
for interaction in model.short_range]
short_range_expirations = [interaction['expiration']
for interaction in form.short_range_interactions] if form.short_range_option == "short_range_yes" else []
concentrations = [
np.array(model.concentration(float(time))).mean()
for time in times
]
lower_concentrations = concentrations_with_sr_breathing(form, model, times, short_range_intervals)
lower_concentrations = concentrations_with_sr_breathing(
form, model, times, short_range_intervals)
CO2_model: models.CO2ConcentrationModel = form.build_CO2_model()
@ -146,12 +147,16 @@ def calculate_report_data(form: VirusFormData, model: models.ExposureModel, exec
tasks = []
with executor_factory() as executor:
for time1, time2 in zip(times[:-1], times[1:]):
tasks.append(executor.submit(_calculate_deposited_exposure, model, time1, time2, fn_name="de"))
tasks.append(executor.submit(_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
tasks.append(executor.submit(
_calculate_deposited_exposure, model, time1, time2, fn_name="de"))
tasks.append(executor.submit(
_calculate_long_range_deposited_exposure, model, time1, time2, fn_name="lr"))
# co2 concentration: takes each time as param, not the interval
tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
tasks.append(executor.submit(
_calculate_co2_concentration, CO2_model, time1, fn_name="co2"))
# co2 concentration: calculate the last time too
tasks.append(executor.submit(_calculate_co2_concentration, CO2_model, times[-1], fn_name="co2"))
tasks.append(executor.submit(_calculate_co2_concentration,
CO2_model, times[-1], fn_name="co2"))
for task in tasks:
result, fn_name = task.result()
@ -166,22 +171,27 @@ def calculate_report_data(form: VirusFormData, model: models.ExposureModel, exec
long_range_cumulative_doses = np.cumsum(long_range_deposited_exposures)
prob = np.array(model.infection_probability())
prob_dist_count, prob_dist_bins = np.histogram(prob/100, bins=100, density=True)
prob_probabilistic_exposure = np.array(model.total_probability_rule()).mean()
prob_dist_count, prob_dist_bins = np.histogram(
prob/100, bins=100, density=True)
prob_probabilistic_exposure = np.array(
model.total_probability_rule()).mean()
expected_new_cases = np.array(model.expected_new_cases()).mean()
exposed_presence_intervals = [list(interval) for interval in model.exposed.presence_interval().boundaries()]
exposed_presence_intervals = [
list(interval) for interval in model.exposed.presence_interval().boundaries()]
conditional_probability_data = None
uncertainties_plot_src = None
if (form.conditional_probability_viral_loads and
model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore
# Generate all the required data for the conditional probability plot
conditional_probability_data = manufacture_conditional_probability_data(model, prob)
# Generate the matplotlib image based on the received data
uncertainties_plot_src = img2base64(_figure2bytes(uncertainties_plot(prob, conditional_probability_data)))
if (form.conditional_probability_viral_loads and
model.data_registry.virological_data['virus_distributions'][form.virus_type]['viral_load_in_sputum'] == ViralLoads.COVID_OVERALL.value): # type: ignore
# Generate all the required data for the conditional probability plot
conditional_probability_data = manufacture_conditional_probability_data(
model, prob)
# Generate the matplotlib image based on the received data
uncertainties_plot_src = img2base64(_figure2bytes(
uncertainties_plot(prob, conditional_probability_data)))
return {
"model_repr": repr(model),
"model": model,
"times": list(times),
"exposed_presence_intervals": exposed_presence_intervals,
"short_range_intervals": short_range_intervals,
@ -203,31 +213,12 @@ def calculate_report_data(form: VirusFormData, model: models.ExposureModel, exec
}
def generate_permalink(base_url, get_root_url, get_root_calculator_url, form: VirusFormData):
form_dict = VirusFormData.to_dict(form, strip_defaults=True)
# Generate the calculator URL arguments that would be needed to re-create this
# form.
args = urllib.parse.urlencode(form_dict)
# Then zlib compress + base64 encode the string. To be inverted by the
# /_c/ endpoint.
compressed_args = base64.b64encode(zlib.compress(args.encode())).decode()
qr_url = f"{base_url}{get_root_url()}/_c/{compressed_args}"
url = f"{base_url}{get_root_calculator_url()}?{args}"
return {
'link': url,
'shortened': qr_url,
}
def conditional_prob_inf_given_vl_dist(
infection_probability: models._VectorisedFloat,
viral_loads: np.ndarray,
specific_vl: float,
step: models._VectorisedFloat
):
infection_probability: models._VectorisedFloat,
viral_loads: np.ndarray,
specific_vl: float,
step: models._VectorisedFloat
):
pi_means = []
lower_percentiles = []
@ -235,8 +226,9 @@ def conditional_prob_inf_given_vl_dist(
for vl_log in viral_loads:
# Probability of infection corresponding to a certain viral load value in the distribution
specific_prob = infection_probability[np.where((vl_log-step/2-specific_vl)*(vl_log+step/2-specific_vl)<0)[0]] #type: ignore
specific_prob = infection_probability[np.where(
(vl_log-step/2-specific_vl)*(vl_log+step/2-specific_vl) < 0)[0]] # type: ignore
pi_means.append(specific_prob.mean())
lower_percentiles.append(np.quantile(specific_prob, 0.05))
upper_percentiles.append(np.quantile(specific_prob, 0.95))
@ -252,14 +244,16 @@ def manufacture_conditional_probability_data(
max_vl = 10
step = (max_vl - min_vl)/100
viral_loads = np.arange(min_vl, max_vl, step)
specific_vl = np.log10(exposure_model.concentration_model.virus.viral_load_in_sputum)
specific_vl = np.log10(
exposure_model.concentration_model.virus.viral_load_in_sputum)
pi_means, lower_percentiles, upper_percentiles = conditional_prob_inf_given_vl_dist(infection_probability, viral_loads,
specific_vl, step)
log10_vl_in_sputum = np.log10(exposure_model.concentration_model.infected.virus.viral_load_in_sputum)
log10_vl_in_sputum = np.log10(
exposure_model.concentration_model.infected.virus.viral_load_in_sputum)
return {
'viral_loads': list(viral_loads),
'pi_means': list(pi_means),
'viral_loads': list(viral_loads),
'pi_means': list(pi_means),
'lower_percentiles': list(lower_percentiles),
'upper_percentiles': list(upper_percentiles),
'log10_vl_in_sputum': list(log10_vl_in_sputum),
@ -274,21 +268,23 @@ def uncertainties_plot(infection_probability: models._VectorisedFloat,
lower_percentiles: list = conditional_probability_data['lower_percentiles']
upper_percentiles: list = conditional_probability_data['upper_percentiles']
log10_vl_in_sputum: list = conditional_probability_data['log10_vl_in_sputum']
fig, ((axs00, axs01, axs02), (axs10, axs11, axs12)) = plt.subplots(nrows=2, ncols=3, # type: ignore
gridspec_kw={'width_ratios': [5, 0.5] + [1],
'height_ratios': [3, 1], 'wspace': 0},
sharey='row',
sharex='col')
fig, ((axs00, axs01, axs02), (axs10, axs11, axs12)) = plt.subplots(nrows=2, ncols=3, # type: ignore
gridspec_kw={'width_ratios': [5, 0.5] + [1],
'height_ratios': [3, 1], 'wspace': 0},
sharey='row',
sharex='col')
axs01.axis('off')
axs11.axis('off')
axs12.axis('off')
axs01.set_visible(False)
axs00.plot(viral_loads, np.array(pi_means), label='Predictive total probability')
axs00.fill_between(viral_loads, np.array(lower_percentiles), np.array(upper_percentiles), alpha=0.1, label='5ᵗʰ and 95ᵗʰ percentile')
axs00.plot(viral_loads, np.array(pi_means),
label='Predictive total probability')
axs00.fill_between(viral_loads, np.array(lower_percentiles), np.array(
upper_percentiles), alpha=0.1, label='5ᵗʰ and 95ᵗʰ percentile')
axs02.hist(infection_probability, bins=30, orientation='horizontal')
axs02.set_xticks([])
@ -299,9 +295,9 @@ def uncertainties_plot(infection_probability: models._VectorisedFloat,
axs02.set_xlim(0, highest_bar)
axs02.text(highest_bar * 0.5, 50,
"$P(I)=$\n" + rf"$\bf{np.round(np.mean(infection_probability), 1)}$%", ha='center', va='center')
"$P(I)=$\n" + rf"$\bf{np.round(np.mean(infection_probability), 1)}$%", ha='center', va='center')
axs10.hist(log10_vl_in_sputum,
bins=150, range=(2, 10), color='grey')
bins=150, range=(2, 10), color='grey')
axs10.set_facecolor("lightgrey")
axs10.set_yticks([])
axs10.set_yticklabels([])
@ -319,17 +315,11 @@ def uncertainties_plot(infection_probability: models._VectorisedFloat,
return fig
def _img2bytes(figure):
# Draw the image
img_data = io.BytesIO()
figure.save(img_data, format='png', bbox_inches="tight")
return img_data
def _figure2bytes(figure):
# Draw the image
img_data = io.BytesIO()
figure.savefig(img_data, format='png', bbox_inches="tight", transparent=True, dpi=110)
figure.savefig(img_data, format='png', bbox_inches="tight",
transparent=True, dpi=110)
return img_data
@ -340,79 +330,38 @@ def img2base64(img_data) -> str:
return f'data:image/png;base64,{pic_hash}'
def minutes_to_time(minutes: int) -> str:
minute_string = str(minutes % 60)
minute_string = "0" * (2 - len(minute_string)) + minute_string
hour_string = str(minutes // 60)
hour_string = "0" * (2 - len(hour_string)) + hour_string
return f"{hour_string}:{minute_string}"
def readable_minutes(minutes: int) -> str:
time = float(minutes)
unit = " minute"
if time % 60 == 0:
time = minutes/60
unit = " hour"
if time != 1:
unit += "s"
if time.is_integer():
time_str = "{:0.0f}".format(time)
else:
time_str = "{0:.2f}".format(time)
return time_str + unit
def hour_format(hour: float) -> str:
# Convert float hour to HH:MM format
hours = f"{int(hour):02}"
minutes = f"{int(hour % 1 * 60):02}"
return f"{hours}:{minutes}"
def percentage(absolute: float) -> float:
return absolute * 100
def non_zero_percentage(percentage: int) -> str:
if percentage < 0.01:
return "<0.01%"
elif percentage < 1:
return "{:0.2f}%".format(percentage)
elif percentage > 99.9 or np.isnan(percentage):
return ">99.9%"
else:
return "{:0.1f}%".format(percentage)
def manufacture_viral_load_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]:
def calculate_vl_scenarios_percentiles(model: mc.ExposureModel) -> typing.Dict[str, mc.ExposureModel]:
viral_load = model.concentration_model.infected.virus.viral_load_in_sputum
scenarios = {}
for percentil in (0.01, 0.05, 0.25, 0.5, 0.75, 0.95, 0.99):
vl = np.quantile(viral_load, percentil)
specific_vl_scenario = dataclass_utils.nested_replace(model,
{'concentration_model.infected.virus.viral_load_in_sputum': vl}
)
scenarios[str(vl)] = np.mean(specific_vl_scenario.infection_probability())
return scenarios
{'concentration_model.infected.virus.viral_load_in_sputum': vl}
)
scenarios[str(vl)] = np.mean(
specific_vl_scenario.infection_probability())
return {
'alternative_viral_load': scenarios,
}
def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, mc.ExposureModel]:
scenarios = {}
if (form.short_range_option == "short_range_no"):
# Two special option cases - HEPA and/or FFP2 masks.
FFP2_being_worn = bool(form.mask_wearing_option == 'mask_on' and form.mask_type == 'FFP2')
FFP2_being_worn = bool(form.mask_wearing_option ==
'mask_on' and form.mask_type == 'FFP2')
if FFP2_being_worn and form.hepa_option:
FFP2andHEPAalternative = dataclass_utils.replace(form, mask_type='Type I')
FFP2andHEPAalternative = dataclass_utils.replace(
form, mask_type='Type I')
if not (form.hepa_option and form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I'):
scenarios['Base scenario with HEPA filter and Type I masks'] = FFP2andHEPAalternative.build_mc_model()
if not FFP2_being_worn and form.hepa_option:
noHEPAalternative = dataclass_utils.replace(form, mask_type = 'FFP2')
noHEPAalternative = dataclass_utils.replace(noHEPAalternative, mask_wearing_option = 'mask_on')
noHEPAalternative = dataclass_utils.replace(noHEPAalternative, hepa_option=False)
noHEPAalternative = dataclass_utils.replace(form, mask_type='FFP2')
noHEPAalternative = dataclass_utils.replace(
noHEPAalternative, mask_wearing_option='mask_on')
noHEPAalternative = dataclass_utils.replace(
noHEPAalternative, hepa_option=False)
if not (not form.hepa_option and FFP2_being_worn):
scenarios['Base scenario without HEPA filter, with FFP2 masks'] = noHEPAalternative.build_mc_model()
@ -422,22 +371,27 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, m
if form.hepa_option:
form = dataclass_utils.replace(form, hepa_option=False)
with_mask = dataclass_utils.replace(form, mask_wearing_option='mask_on')
without_mask = dataclass_utils.replace(form, mask_wearing_option='mask_off')
with_mask = dataclass_utils.replace(
form, mask_wearing_option='mask_on')
without_mask = dataclass_utils.replace(
form, mask_wearing_option='mask_off')
if form.ventilation_type == 'mechanical_ventilation':
#scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
# scenarios['Mechanical ventilation with Type I masks'] = with_mask.build_mc_model()
if not (form.mask_wearing_option == 'mask_off'):
scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model()
scenarios['Mechanical ventilation without masks'] = without_mask.build_mc_model(
)
elif form.ventilation_type == 'natural_ventilation':
#scenarios['Windows open with Type I masks'] = with_mask.build_mc_model()
# scenarios['Windows open with Type I masks'] = with_mask.build_mc_model()
if not (form.mask_wearing_option == 'mask_off'):
scenarios['Windows open without masks'] = without_mask.build_mc_model()
# No matter the ventilation scheme, we include scenarios which don't have any ventilation.
with_mask_no_vent = dataclass_utils.replace(with_mask, ventilation_type='no_ventilation')
without_mask_or_vent = dataclass_utils.replace(without_mask, ventilation_type='no_ventilation')
with_mask_no_vent = dataclass_utils.replace(
with_mask, ventilation_type='no_ventilation')
without_mask_or_vent = dataclass_utils.replace(
without_mask, ventilation_type='no_ventilation')
if not (form.mask_wearing_option == 'mask_on' and form.mask_type == 'Type I' and form.ventilation_type == 'no_ventilation'):
scenarios['No ventilation with Type I masks'] = with_mask_no_vent.build_mc_model()
@ -445,7 +399,8 @@ def manufacture_alternative_scenarios(form: VirusFormData) -> typing.Dict[str, m
scenarios['Neither ventilation nor masks'] = without_mask_or_vent.build_mc_model()
else:
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[], total_people=form.total_people - form.short_range_occupants)
no_short_range_alternative = dataclass_utils.replace(form, short_range_interactions=[],
total_people=form.total_people - form.short_range_occupants)
scenarios['Base scenario without short-range interactions'] = no_short_range_alternative.build_mc_model()
return scenarios
@ -456,7 +411,8 @@ def scenario_statistics(
sample_times: typing.List[float],
compute_prob_exposure: bool
):
model = mc_model.build_model(size=mc_model.data_registry.monte_carlo['sample_size'])
model = mc_model.build_model(
size=mc_model.data_registry.monte_carlo['sample_size'])
if (compute_prob_exposure):
# It means we have data to calculate the total_probability_rule
prob_probabilistic_exposure = model.total_probability_rule()
@ -478,12 +434,11 @@ def comparison_report(
form: VirusFormData,
report_data: typing.Dict[str, typing.Any],
scenarios: typing.Dict[str, mc.ExposureModel],
sample_times: typing.List[float],
executor_factory: typing.Callable[[], concurrent.futures.Executor],
):
if (form.short_range_option == "short_range_no"):
statistics = {
'Current scenario' : {
'Current scenario': {
'probability_of_infection': report_data['prob_inf'],
'expected_new_cases': report_data['expected_new_cases'],
'concentrations': report_data['concentrations'],
@ -501,7 +456,7 @@ def comparison_report(
results = executor.map(
scenario_statistics,
scenarios.values(),
[sample_times] * len(scenarios),
[report_data['times']] * len(scenarios),
[compute_prob_exposure] * len(scenarios),
timeout=60,
)
@ -514,73 +469,8 @@ def comparison_report(
}
@dataclasses.dataclass
class ReportGenerator:
jinja_loader: jinja2.BaseLoader
get_root_url: typing.Any
get_root_calculator_url: typing.Any
def build_report(
self,
base_url: str,
form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> str:
model = form.build_model()
context = self.prepare_context(base_url, model, form, executor_factory=executor_factory)
return self.render(context)
def prepare_context(
self,
base_url: str,
model: models.ExposureModel,
form: VirusFormData,
executor_factory: typing.Callable[[], concurrent.futures.Executor],
) -> dict:
now = datetime.utcnow().astimezone()
time = now.strftime("%Y-%m-%d %H:%M:%S UTC")
data_registry_version = f"v{model.data_registry.version}" if model.data_registry.version else None
context = {
'model': model,
'form': form,
'creation_date': time,
'data_registry_version': data_registry_version,
}
scenario_sample_times = interesting_times(model)
report_data = calculate_report_data(form, model, executor_factory=executor_factory)
context.update(report_data)
alternative_scenarios = manufacture_alternative_scenarios(form)
context['alternative_viral_load'] = manufacture_viral_load_scenarios_percentiles(model) if form.conditional_probability_viral_loads else None
context['alternative_scenarios'] = comparison_report(
form, report_data, alternative_scenarios, scenario_sample_times, executor_factory=executor_factory,
)
context['permalink'] = generate_permalink(base_url, self.get_root_url, self.get_root_calculator_url, form)
context['get_url'] = self.get_root_url
context['get_calculator_url'] = self.get_root_calculator_url
return context
def _template_environment(self) -> jinja2.Environment:
env = jinja2.Environment(
loader=self.jinja_loader,
undefined=jinja2.StrictUndefined,
)
env.globals["common_text"] = markdown_tools.extract_rendered_markdown_blocks(
env.get_template('common_text.md.j2')
)
env.filters['non_zero_percentage'] = non_zero_percentage
env.filters['readable_minutes'] = readable_minutes
env.filters['minutes_to_time'] = minutes_to_time
env.filters['hour_format'] = hour_format
env.filters['float_format'] = "{0:.2f}".format
env.filters['int_format'] = "{:0.0f}".format
env.filters['percentage'] = percentage
env.filters['JSONify'] = json.dumps
return env
def render(self, context: dict) -> str:
template = self._template_environment().get_template("calculator.report.html.j2")
return template.render(**context, text_blocks=template.globals["common_text"])
def alternative_scenarios_data(form: VirusFormData, report_data: typing.Dict[str, typing.Any], executor_factory: typing.Callable[[], concurrent.futures.Executor]) -> typing.Dict[str, typing.Any]:
alternative_scenarios: typing.Dict[str, typing.Any] = manufacture_alternative_scenarios(form=form)
return {
'alternative_scenarios': comparison_report(form=form, report_data=report_data, scenarios=alternative_scenarios, executor_factory=executor_factory)
}

View file

@ -1,4 +1,4 @@
from caimira.enums import ViralLoads
from ..models.enums import ViralLoads
class DataRegistry:

View file

@ -2,7 +2,7 @@ import logging
import typing
import requests
from caimira.store.data_registry import DataRegistry
from ..store.data_registry import DataRegistry
logger = logging.getLogger("DATA")

View file

@ -7,11 +7,11 @@ from scipy.signal import find_peaks
import pandas as pd
import re
from caimira import models
from caimira.store.data_registry import DataRegistry
from .form_data import FormData, cast_class_fields
from .defaults import NO_DEFAULT
from .report_generator import img2base64, _figure2bytes
from ..form_validator import FormData, cast_class_fields
from ..defaults import NO_DEFAULT
from ...store.data_registry import DataRegistry
from ...models import models
from ...report.virus_report_data import img2base64, _figure2bytes
minutes_since_midnight = typing.NewType('minutes_since_midnight', int)

View file

@ -7,9 +7,9 @@ import json
import numpy as np
from caimira import models
from caimira.store.data_registry import DataRegistry
from .defaults import DEFAULTS, NO_DEFAULT, COFFEE_OPTIONS_INT
from ..models import models
from ..store.data_registry import DataRegistry
LOG = logging.getLogger(__name__)
@ -26,13 +26,16 @@ class FormData:
exposed_lunch_option: bool
exposed_lunch_start: minutes_since_midnight
exposed_start: minutes_since_midnight
infected_coffee_break_option: str #Used if infected_dont_have_breaks_with_exposed
infected_coffee_duration: int #Used if infected_dont_have_breaks_with_exposed
# Used if infected_dont_have_breaks_with_exposed
infected_coffee_break_option: str
infected_coffee_duration: int # Used if infected_dont_have_breaks_with_exposed
infected_dont_have_breaks_with_exposed: bool
infected_finish: minutes_since_midnight
infected_lunch_finish: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
infected_lunch_option: bool #Used if infected_dont_have_breaks_with_exposed
infected_lunch_start: minutes_since_midnight #Used if infected_dont_have_breaks_with_exposed
# Used if infected_dont_have_breaks_with_exposed
infected_lunch_finish: minutes_since_midnight
infected_lunch_option: bool # Used if infected_dont_have_breaks_with_exposed
# Used if infected_dont_have_breaks_with_exposed
infected_lunch_start: minutes_since_midnight
infected_people: int
infected_start: minutes_since_midnight
room_volume: float
@ -47,7 +50,6 @@ class FormData:
# Take a copy of the form data so that we can mutate it.
form_data = form_data.copy()
form_data.pop('_xsrf', None)
# Don't let arbitrary unescaped HTML through the net.
for key, value in form_data.items():
if isinstance(value, str):
@ -64,7 +66,8 @@ class FormData:
form_data[key] = _CAST_RULES_FORM_ARG_TO_NATIVE[key](value)
if key not in cls._DEFAULTS:
raise ValueError(f'Invalid argument "{html.escape(key)}" given')
raise ValueError(
f'Invalid argument "{html.escape(key)}" given')
instance = cls(**form_data, data_registry=data_registry)
instance.validate()
@ -93,7 +96,8 @@ class FormData:
def validate_population_parameters(self):
# Validate number of infected <= number of total people
if self.infected_people >= self.total_people:
raise ValueError('Number of infected people cannot be greater or equal to the number of total people.')
raise ValueError(
'Number of infected people cannot be greater or equal to the number of total people.')
# Validate time intervals selected by user
time_intervals = [
@ -101,9 +105,11 @@ class FormData:
['infected_start', 'infected_finish'],
]
if self.exposed_lunch_option:
time_intervals.append(['exposed_lunch_start', 'exposed_lunch_finish'])
time_intervals.append(
['exposed_lunch_start', 'exposed_lunch_finish'])
if self.infected_dont_have_breaks_with_exposed and self.infected_lunch_option:
time_intervals.append(['infected_lunch_start', 'infected_lunch_finish'])
time_intervals.append(
['infected_lunch_start', 'infected_lunch_finish'])
for start_name, end_name in time_intervals:
start = getattr(self, start_name)
@ -116,29 +122,33 @@ class FormData:
lunch_start = getattr(self, f'{population}_lunch_start')
lunch_finish = getattr(self, f'{population}_lunch_finish')
return (start <= lunch_start <= finish and
start <= lunch_finish <= finish)
start <= lunch_finish <= finish)
def get_lunch_mins(population):
lunch_mins = 0
if getattr(self, f'{population}_lunch_option'):
lunch_mins = getattr(self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
lunch_mins = getattr(
self, f'{population}_lunch_finish') - getattr(self, f'{population}_lunch_start')
return lunch_mins
def get_coffee_mins(population):
coffee_mins = 0
if getattr(self, f'{population}_coffee_break_option') != 'coffee_break_0':
coffee_mins = COFFEE_OPTIONS_INT[getattr(self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
coffee_mins = COFFEE_OPTIONS_INT[getattr(
self, f'{population}_coffee_break_option')] * getattr(self, f'{population}_coffee_duration')
return coffee_mins
def get_activity_mins(population):
return getattr(self, f'{population}_finish') - getattr(self, f'{population}_start')
populations = ['exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
populations = [
'exposed', 'infected'] if self.infected_dont_have_breaks_with_exposed else ['exposed']
for population in populations:
# Validate lunch time within the activity times.
if (getattr(self, f'{population}_lunch_option') and
not validate_lunch(getattr(self, f'{population}_start'), getattr(self, f'{population}_finish'))
):
not validate_lunch(getattr(self, f'{population}_start'), getattr(
self, f'{population}_finish'))
):
raise ValueError(
f"{population} lunch break must be within presence times."
)
@ -152,7 +162,8 @@ class FormData:
for attr_name, valid_set in [('exposed_coffee_break_option', COFFEE_OPTIONS_INT),
('infected_coffee_break_option', COFFEE_OPTIONS_INT)]:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
raise ValueError(
f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
def validate(self):
raise NotImplementedError("Subclass must implement")
@ -161,7 +172,8 @@ class FormData:
raise NotImplementedError("Subclass must implement")
def _compute_breaks_in_interval(self, start, finish, n_breaks, duration) -> models.BoundarySequence_t:
break_delay = ((finish - start) - (n_breaks * duration)) // (n_breaks+1)
break_delay = ((finish - start) -
(n_breaks * duration)) // (n_breaks+1)
break_times = []
end = start
for n in range(n_breaks):
@ -173,14 +185,16 @@ class FormData:
def exposed_lunch_break_times(self) -> models.BoundarySequence_t:
result = []
if self.exposed_lunch_option:
result.append((self.exposed_lunch_start, self.exposed_lunch_finish))
result.append((self.exposed_lunch_start,
self.exposed_lunch_finish))
return tuple(result)
def infected_lunch_break_times(self) -> models.BoundarySequence_t:
if self.infected_dont_have_breaks_with_exposed:
result = []
if self.infected_lunch_option:
result.append((self.infected_lunch_start, self.infected_lunch_finish))
result.append((self.infected_lunch_start,
self.infected_lunch_finish))
return tuple(result)
else:
return self.exposed_lunch_break_times()
@ -194,7 +208,8 @@ class FormData:
def _coffee_break_times(self, activity_start, activity_finish, coffee_breaks, coffee_duration, lunch_start, lunch_finish) -> models.BoundarySequence_t:
time_before_lunch = lunch_start - activity_start
time_after_lunch = activity_finish - lunch_finish
before_lunch_frac = time_before_lunch / (time_before_lunch + time_after_lunch)
before_lunch_frac = time_before_lunch / \
(time_before_lunch + time_after_lunch)
n_morning_breaks = round(coffee_breaks * before_lunch_frac)
breaks = (
self._compute_breaks_in_interval(
@ -211,9 +226,11 @@ class FormData:
if exposed_coffee_breaks == 0:
return ()
if self.exposed_lunch_option:
breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
breaks = self._coffee_break_times(self.exposed_start, self.exposed_finish, exposed_coffee_breaks,
self.exposed_coffee_duration, self.exposed_lunch_start, self.exposed_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
breaks = self._compute_breaks_in_interval(
self.exposed_start, self.exposed_finish, exposed_coffee_breaks, self.exposed_coffee_duration)
return breaks
def infected_coffee_break_times(self) -> models.BoundarySequence_t:
@ -222,9 +239,11 @@ class FormData:
if infected_coffee_breaks == 0:
return ()
if self.infected_lunch_option:
breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
breaks = self._coffee_break_times(self.infected_start, self.infected_finish, infected_coffee_breaks,
self.infected_coffee_duration, self.infected_lunch_start, self.infected_lunch_finish)
else:
breaks = self._compute_breaks_in_interval(self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
breaks = self._compute_breaks_in_interval(
self.infected_start, self.infected_finish, infected_coffee_breaks, self.infected_coffee_duration)
return breaks
else:
return self.exposed_coffee_break_times()
@ -232,13 +251,14 @@ class FormData:
def generate_specific_break_times(self, breaks_dict: dict, target: str) -> models.BoundarySequence_t:
break_times = []
for n in breaks_dict[f'{target}_breaks']:
# Parse break times.
# Parse break times.
begin = time_string_to_minutes(n["start_time"])
end = time_string_to_minutes(n["finish_time"])
for time in [begin, end]:
# For a specific break, the infected and exposed presence is the same.
if not getattr(self, f'{target}_start') < time < getattr(self, f'{target}_finish'):
raise ValueError(f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
raise ValueError(
f'All breaks should be within the simulation time. Got {time_minutes_to_string(time)}.')
break_times.append((begin, end))
return tuple(break_times)
@ -260,7 +280,8 @@ class FormData:
# Order the breaks by their start-time, and ensure that they are monotonic
# and that the start of one break happens after the end of another.
break_boundaries: models.BoundarySequence_t = tuple(sorted(breaks, key=lambda break_pair: break_pair[0]))
break_boundaries: models.BoundarySequence_t = tuple(
sorted(breaks, key=lambda break_pair: break_pair[0]))
for break_start, break_end in break_boundaries:
if break_start >= break_end:
@ -269,13 +290,15 @@ class FormData:
prev_break_end = break_boundaries[0][1]
for break_start, break_end in break_boundaries[1:]:
if prev_break_end >= break_start:
raise ValueError(f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
raise ValueError(
f"A break starts before another ends ({break_start}, {break_end}, {prev_break_end}).")
prev_break_end = break_end
present_intervals = []
current_time = start
LOG.debug(f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
LOG.debug(
f"starting time march at {_hours2timestring(current_time/60)} to {_hours2timestring(finish/60)}")
# As we step through the breaks. For each break there are 6 important cases
# we must cover. Let S=start; E=end; Bs=Break start; Be=Break end:
@ -336,8 +359,9 @@ class FormData:
return models.SpecificInterval(tuple(present_intervals))
def infected_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(breaks_dict=self.specific_breaks, target='exposed')
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(
breaks_dict=self.specific_breaks, target='exposed')
else:
breaks = self.infected_lunch_break_times() + self.infected_coffee_break_times()
return self.present_interval(
@ -346,14 +370,17 @@ class FormData:
)
def population_present_interval(self) -> models.Interval:
state_change_times = set(self.infected_present_interval().transition_times())
state_change_times.update(self.exposed_present_interval().transition_times())
state_change_times = set(
self.infected_present_interval().transition_times())
state_change_times.update(
self.exposed_present_interval().transition_times())
all_state_changes = sorted(state_change_times)
return models.SpecificInterval(tuple(zip(all_state_changes[:-1], all_state_changes[1:])))
def exposed_present_interval(self) -> models.Interval:
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(breaks_dict=self.specific_breaks, target='exposed')
if self.specific_breaks != {}: # It means the breaks are specific and not predefined
breaks = self.generate_specific_break_times(
breaks_dict=self.specific_breaks, target='exposed')
else:
breaks = self.exposed_lunch_break_times() + self.exposed_coffee_break_times()
return self.present_interval(
@ -382,7 +409,7 @@ def time_minutes_to_string(time: int) -> str:
:param time: The number of minutes between 'time' and 00:00
:return: A string of the form "HH:MM" representing a time of day
"""
return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time%60)
return "{0:0=2d}".format(int(time/60)) + ":" + "{0:0=2d}".format(time % 60)
def string_to_list(s: str) -> list:
@ -426,6 +453,7 @@ _CAST_RULES_FORM_ARG_TO_NATIVE: typing.Dict[str, typing.Callable] = {}
#: that can be encoded to URL arguments.
_CAST_RULES_NATIVE_TO_FORM_ARG: typing.Dict[str, typing.Callable] = {}
def cast_class_fields(cls):
for _field in dataclasses.fields(cls):
if _field.type is minutes_since_midnight:
@ -447,4 +475,5 @@ def cast_class_fields(cls):
_CAST_RULES_FORM_ARG_TO_NATIVE[_field.name] = string_to_dict
_CAST_RULES_NATIVE_TO_FORM_ARG[_field.name] = dict_to_string
cast_class_fields(FormData)

View file

@ -6,17 +6,14 @@ import re
import numpy as np
from caimira import models
from caimira import data
import caimira.data.weather
import caimira.monte_carlo as mc
from .. import calculator
from .form_data import FormData, cast_class_fields, time_string_to_minutes
from caimira.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
from caimira.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions
from .defaults import (DEFAULTS, CONFIDENCE_LEVEL_OPTIONS,
MECHANICAL_VENTILATION_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE,
VENTILATION_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES)
from caimira import __version__ as calculator_version
from ..form_validator import FormData, cast_class_fields, time_string_to_minutes
from ..defaults import (DEFAULTS, CONFIDENCE_LEVEL_OPTIONS,
MECHANICAL_VENTILATION_TYPES, MASK_WEARING_OPTIONS, MONTH_NAMES, VACCINE_BOOSTER_TYPE, VACCINE_TYPE,
VENTILATION_TYPES, VOLUME_TYPES, WINDOWS_OPENING_REGIMES, WINDOWS_TYPES)
from ...models import models, data, monte_carlo as mc
from ...models.monte_carlo.data import activity_distributions, virus_distributions, mask_distributions, short_range_distances
from ...models.monte_carlo.data import expiration_distribution, expiration_BLO_factors, expiration_distributions, short_range_expiration_distributions
LOG = logging.getLogger("MODEL")
@ -76,15 +73,17 @@ class VirusFormData(FormData):
_DEFAULTS: typing.ClassVar[typing.Dict[str, typing.Any]] = DEFAULTS
def validate(self):
# Validate population parameters
self.validate_population_parameters()
validation_tuples = [('activity_type', self.data_registry.population_scenario_activity.keys()),
('mechanical_ventilation_type', MECHANICAL_VENTILATION_TYPES),
('mask_type', list(mask_distributions(self.data_registry).keys())),
('mechanical_ventilation_type',
MECHANICAL_VENTILATION_TYPES),
('mask_type', list(mask_distributions(
self.data_registry).keys())),
('mask_wearing_option', MASK_WEARING_OPTIONS),
('ventilation_type', VENTILATION_TYPES),
('virus_type', list(virus_distributions(self.data_registry).keys())),
('virus_type', list(virus_distributions(
self.data_registry).keys())),
('volume_type', VOLUME_TYPES),
('window_opening_regime', WINDOWS_OPENING_REGIMES),
('window_type', WINDOWS_TYPES),
@ -95,11 +94,13 @@ class VirusFormData(FormData):
for attr_name, valid_set in validation_tuples:
if getattr(self, attr_name) not in valid_set:
raise ValueError(f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
raise ValueError(
f"{getattr(self, attr_name)} is not a valid value for {attr_name}")
# Validate number of infected people == 1 when activity is Conference/Training.
if self.activity_type == 'training' and self.infected_people > 1:
raise ValueError('Conference/Training activities are limited to 1 infected.')
raise ValueError(
'Conference/Training activities are limited to 1 infected.')
# Validate ventilation parameters
if self.ventilation_type == 'natural_ventilation':
@ -114,7 +115,7 @@ class VirusFormData(FormData):
"ventilation_type is 'natural_ventilation'"
)
if (self.window_opening_regime == 'windows_open_periodically' and
self.windows_duration > self.windows_frequency):
self.windows_duration > self.windows_frequency):
raise ValueError(
'Duration cannot be bigger than frequency.'
)
@ -127,62 +128,79 @@ class VirusFormData(FormData):
# Validate specific inputs - breaks (exposed and infected)
if self.specific_breaks != {}:
if type(self.specific_breaks) is not dict:
raise TypeError('The specific breaks should be in a dictionary.')
raise TypeError(
'The specific breaks should be in a dictionary.')
dict_keys = list(self.specific_breaks.keys())
if "exposed_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".')
raise TypeError(
f'Unable to fetch "exposed_breaks" key. Got "{dict_keys[0]}".')
if "infected_breaks" not in dict_keys:
raise TypeError(f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".')
raise TypeError(
f'Unable to fetch "infected_breaks" key. Got "{dict_keys[1]}".')
for population_breaks in ['exposed_breaks', 'infected_breaks']:
if self.specific_breaks[population_breaks] != []:
if type(self.specific_breaks[population_breaks]) is not list:
raise TypeError(f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.')
raise TypeError(
f'All breaks should be in a list. Got {type(self.specific_breaks[population_breaks])}.')
for input_break in self.specific_breaks[population_breaks]:
# Input validations.
if type(input_break) is not dict:
raise TypeError(f'Each break should be a dictionary. Got {type(input_break)}.')
raise TypeError(
f'Each break should be a dictionary. Got {type(input_break)}.')
dict_keys = list(input_break.keys())
if "start_time" not in input_break:
raise TypeError(f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".')
raise TypeError(
f'Unable to fetch "start_time" key. Got "{dict_keys[0]}".')
if "finish_time" not in input_break:
raise TypeError(f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
raise TypeError(
f'Unable to fetch "finish_time" key. Got "{dict_keys[1]}".')
for time in input_break.values():
if not re.compile("^(2[0-3]|[01]?[0-9]):([0-5]?[0-9])$").match(time):
raise TypeError(f'Wrong time format - "HH:MM". Got "{time}".')
raise TypeError(
f'Wrong time format - "HH:MM". Got "{time}".')
# Validate specific inputs - precise activity
if self.precise_activity != {}:
if type(self.precise_activity) is not dict:
raise TypeError('The precise activities should be in a dictionary.')
raise TypeError(
'The precise activities should be in a dictionary.')
dict_keys = list(self.precise_activity.keys())
if "physical_activity" not in dict_keys:
raise TypeError(f'Unable to fetch "physical_activity" key. Got "{dict_keys[0]}".')
raise TypeError(
f'Unable to fetch "physical_activity" key. Got "{dict_keys[0]}".')
if "respiratory_activity" not in dict_keys:
raise TypeError(f'Unable to fetch "respiratory_activity" key. Got "{dict_keys[1]}".')
raise TypeError(
f'Unable to fetch "respiratory_activity" key. Got "{dict_keys[1]}".')
if type(self.precise_activity['physical_activity']) is not str:
raise TypeError('The physical activities should be a single string.')
raise TypeError(
'The physical activities should be a single string.')
if type(self.precise_activity['respiratory_activity']) is not list:
raise TypeError('The respiratory activities should be in a list.')
raise TypeError(
'The respiratory activities should be in a list.')
total_percentage = 0
for respiratory_activity in self.precise_activity['respiratory_activity']:
if type(respiratory_activity) is not dict:
raise TypeError('Each respiratory activity should be defined in a dictionary.')
raise TypeError(
'Each respiratory activity should be defined in a dictionary.')
dict_keys = list(respiratory_activity.keys())
if "type" not in dict_keys:
raise TypeError(f'Unable to fetch "type" key. Got "{dict_keys[0]}".')
raise TypeError(
f'Unable to fetch "type" key. Got "{dict_keys[0]}".')
if "percentage" not in dict_keys:
raise TypeError(f'Unable to fetch "percentage" key. Got "{dict_keys[1]}".')
raise TypeError(
f'Unable to fetch "percentage" key. Got "{dict_keys[1]}".')
total_percentage += respiratory_activity['percentage']
if total_percentage != 100:
raise ValueError(f'The sum of all respiratory activities should be 100. Got {total_percentage}.')
raise ValueError(
f'The sum of all respiratory activities should be 100. Got {total_percentage}.')
# Validate number of people with short-range interactions
max_occupants_for_sr = self.total_people - self.infected_people
if self.short_range_occupants > max_occupants_for_sr:
@ -218,7 +236,8 @@ class VirusFormData(FormData):
for interaction in self.short_range_interactions:
short_range.append(mc.ShortRangeModel(
data_registry=self.data_registry,
expiration=short_range_expiration_distributions(self.data_registry)[interaction['expiration']],
expiration=short_range_expiration_distributions(
self.data_registry)[interaction['expiration']],
activity=infected_population.activity,
presence=self.short_range_interval(interaction),
distance=short_range_distances(self.data_registry),
@ -233,7 +252,7 @@ class VirusFormData(FormData):
infected=infected_population,
evaporation_factor=0.3,
),
short_range = tuple(short_range),
short_range=tuple(short_range),
exposed=self.exposed_population(),
geographical_data=mc.Cases(
geographic_population=self.geographic_population,
@ -249,11 +268,14 @@ class VirusFormData(FormData):
def build_CO2_model(self, sample_size=None) -> models.CO2ConcentrationModel:
sample_size = sample_size or self.data_registry.monte_carlo['sample_size']
infected_population: models.InfectedPopulation = self.infected_population().build_model(sample_size)
infected_population: models.InfectedPopulation = self.infected_population(
).build_model(sample_size)
exposed_population: models.Population = self.exposed_population().build_model(sample_size)
state_change_times = set(infected_population.presence_interval().transition_times())
state_change_times.update(exposed_population.presence_interval().transition_times())
state_change_times = set(
infected_population.presence_interval().transition_times())
state_change_times.update(
exposed_population.presence_interval().transition_times())
transition_times = sorted(state_change_times)
total_people = [infected_population.people_present(stop) + exposed_population.people_present(stop)
@ -262,10 +284,12 @@ class VirusFormData(FormData):
if (self.activity_type == 'precise'):
activity_defn, _ = self.generate_precise_activity_expiration()
else:
activity_defn = self.data_registry.population_scenario_activity[self.activity_type]['activity']
activity_defn = self.data_registry.population_scenario_activity[
self.activity_type]['activity']
population = mc.SimplePopulation(
number=models.IntPiecewiseConstant(transition_times=tuple(transition_times), values=tuple(total_people)),
number=models.IntPiecewiseConstant(transition_times=tuple(
transition_times), values=tuple(total_people)),
presence=None,
activity=activity_distributions(self.data_registry)[activity_defn],
)
@ -285,7 +309,7 @@ class VirusFormData(FormData):
"""
month = MONTH_NAMES.index(self.event_month) + 1
timezone = caimira.data.weather.timezone_at(
timezone = data.weather.timezone_at(
latitude=self.location_latitude, longitude=self.location_longitude,
)
# We choose the first of the month for the current year.
@ -306,7 +330,8 @@ class VirusFormData(FormData):
month = MONTH_NAMES.index(self.event_month) + 1
wx_station = self.nearest_weather_station()
temp_profile = caimira.data.weather.mean_hourly_temperatures(wx_station = wx_station[0], month = MONTH_NAMES.index(self.event_month) + 1)
temp_profile = data.weather.mean_hourly_temperatures(
wx_station=wx_station[0], month=MONTH_NAMES.index(self.event_month) + 1)
_, utc_offset = self.tz_name_and_utc_offset()
@ -314,13 +339,14 @@ class VirusFormData(FormData):
# result the first data value may no longer be a midnight, and the hours
# no longer ordered modulo 24).
source_times = np.arange(24) + utc_offset
times, temp_profile = caimira.data.weather.refine_hourly_data(
times, temp_profile = data.weather.refine_hourly_data(
source_times,
temp_profile,
npts=24*10, # 10 steps per hour => 6 min steps
)
outside_temp = models.PiecewiseConstant(
tuple(float(t) for t in times), tuple(float(t) for t in temp_profile),
tuple(float(t) for t in times), tuple(float(t)
for t in temp_profile),
)
return outside_temp
@ -333,7 +359,7 @@ class VirusFormData(FormData):
transition_times = self.CO2_fitting_result['transition_times']
for index, (start, stop) in enumerate(zip(transition_times[:-1], transition_times[1:])):
ventilations.append(models.AirChange(active=models.SpecificInterval(present_times=((start, stop), )),
air_exch=self.CO2_fitting_result['ventilation_values'][index]))
air_exch=self.CO2_fitting_result['ventilation_values'][index]))
return models.MultipleVentilation(tuple(ventilations))
# Initializes a ventilation instance as a window if 'natural_ventilation' is selected, or as a HEPA-filter otherwise
@ -369,7 +395,8 @@ class VirusFormData(FormData):
ventilation = models.AirChange(active=always_on, air_exch=0.)
else:
if self.mechanical_ventilation_type == 'mech_type_air_changes':
ventilation = models.AirChange(active=always_on, air_exch=self.air_changes)
ventilation = models.AirChange(
active=always_on, air_exch=self.air_changes)
else:
ventilation = models.HVACMechanical(
active=always_on, q_air_mech=self.air_supply)
@ -378,16 +405,18 @@ class VirusFormData(FormData):
# to the air infiltration from the outside.
# See CERN-OPEN-2021-004, p. 12.
residual_vent: float = self.data_registry.ventilation['infiltration_ventilation'] # type: ignore
infiltration_ventilation = models.AirChange(active=always_on, air_exch=residual_vent)
infiltration_ventilation = models.AirChange(
active=always_on, air_exch=residual_vent)
if self.hepa_option:
hepa = models.HEPAFilter(active=always_on, q_air_mech=self.hepa_amount)
hepa = models.HEPAFilter(
active=always_on, q_air_mech=self.hepa_amount)
return models.MultipleVentilation((ventilation, hepa, infiltration_ventilation))
else:
return models.MultipleVentilation((ventilation, infiltration_ventilation))
def nearest_weather_station(self) -> caimira.data.weather.WxStationRecordType:
def nearest_weather_station(self) -> data.weather.WxStationRecordType:
"""Return the nearest weather station (which has valid data) for this form"""
return caimira.data.weather.nearest_wx_station(
return data.weather.nearest_wx_station(
longitude=self.location_longitude, latitude=self.location_latitude
)
@ -401,11 +430,13 @@ class VirusFormData(FormData):
return mask
def generate_precise_activity_expiration(self) -> typing.Tuple[typing.Any, ...]:
if self.precise_activity == {}: # It means the precise activity is not defined by a specific input.
# It means the precise activity is not defined by a specific input.
if self.precise_activity == {}:
return ()
respiratory_dict = {}
for respiratory_activity in self.precise_activity['respiratory_activity']:
respiratory_dict[respiratory_activity['type']] = respiratory_activity['percentage']
respiratory_dict[respiratory_activity['type']
] = respiratory_activity['percentage']
return (self.precise_activity['physical_activity'], respiratory_dict)
@ -413,11 +444,14 @@ class VirusFormData(FormData):
# Initializes the virus
virus = virus_distributions(self.data_registry)[self.virus_type]
activity_defn = self.data_registry.population_scenario_activity[self.activity_type]['activity']
expiration_defn = self.data_registry.population_scenario_activity[self.activity_type]['expiration']
activity_defn = self.data_registry.population_scenario_activity[
self.activity_type]['activity']
expiration_defn = self.data_registry.population_scenario_activity[
self.activity_type]['expiration']
if (self.activity_type == 'smallmeeting'):
# Conversation of N people is approximately 1/N% of the time speaking.
expiration_defn = {'Speaking': 1, 'Breathing': self.total_people - 1}
expiration_defn = {'Speaking': 1,
'Breathing': self.total_people - 1}
elif (self.activity_type == 'precise'):
activity_defn, expiration_defn = self.generate_precise_activity_expiration()
@ -434,7 +468,8 @@ class VirusFormData(FormData):
mask=self.mask(),
activity=activity,
expiration=expiration,
host_immunity=0., # Vaccination status does not affect the infected population (for now)
# Vaccination status does not affect the infected population (for now)
host_immunity=0.,
)
return infected
@ -452,8 +487,8 @@ class VirusFormData(FormData):
if (self.vaccine_option):
if (self.vaccine_booster_option and self.vaccine_booster_type != 'Other'):
host_immunity = [vaccine['VE'] for vaccine in data.vaccine_booster_host_immunity if
vaccine['primary series vaccine'] == self.vaccine_type and
vaccine['booster vaccine'] == self.vaccine_booster_type][0]
vaccine['primary series vaccine'] == self.vaccine_type and
vaccine['booster vaccine'] == self.vaccine_booster_type][0]
else:
host_immunity = data.vaccine_primary_host_immunity[self.vaccine_type]
else:
@ -480,9 +515,10 @@ def build_expiration(data_registry, expiration_definition) -> mc._ExpirationBase
elif isinstance(expiration_definition, dict):
total_weight = sum(expiration_definition.values())
BLO_factors = np.sum([
np.array(expiration_BLO_factors(data_registry)[exp_type]) * weight/total_weight
np.array(expiration_BLO_factors(data_registry)
[exp_type]) * weight/total_weight
for exp_type, weight in expiration_definition.items()
], axis=0)
], axis=0)
return expiration_distribution(data_registry=data_registry, BLO_factors=tuple(BLO_factors))
@ -524,7 +560,7 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'mask_type': 'Type I',
'mask_wearing_option': 'mask_off',
'mechanical_ventilation_type': '',
'calculator_version': calculator.__version__,
'calculator_version': calculator_version,
'opening_distance': '0.2',
'event_month': 'January',
'room_heating_option': '0',
@ -550,4 +586,5 @@ def baseline_raw_form_data() -> typing.Dict[str, typing.Union[str, float]]:
'short_range_interactions': '[]',
}
cast_class_fields(VirusFormData)

View file

@ -1,22 +1,13 @@
import pytest
from caimira.apps.calculator import model_generator
from caimira.calculator.validators.virus import virus_validator
@pytest.fixture
def baseline_form_data():
return model_generator.baseline_raw_form_data()
return virus_validator.baseline_raw_form_data()
@pytest.fixture
def baseline_form(baseline_form_data, data_registry):
return model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
@pytest.fixture
def baseline_form_with_sr(baseline_form_data, data_registry):
form_data_sr = baseline_form_data
form_data_sr['short_range_option'] = 'short_range_yes'
form_data_sr['short_range_interactions'] = '[{"expiration": "Shouting", "start_time": "10:30", "duration": "30"}]'
form_data_sr['short_range_occupants'] = 5
return model_generator.VirusFormData.from_dict(form_data_sr, data_registry)
return virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)

View file

@ -6,24 +6,24 @@ import numpy.testing as npt
import pytest
from retry import retry
from caimira.apps.calculator import model_generator
from caimira.apps.calculator.form_data import (_hours2timestring, minutes_since_midnight,
from caimira.calculator.validators.virus import virus_validator
from caimira.calculator.validators.form_validator import (_hours2timestring, minutes_since_midnight,
_CAST_RULES_FORM_ARG_TO_NATIVE, _CAST_RULES_NATIVE_TO_FORM_ARG)
from caimira import models
from caimira.monte_carlo.data import expiration_distributions
from caimira.apps.calculator.defaults import NO_DEFAULT
from caimira.store.data_registry import DataRegistry
from caimira.calculator.models import models
from caimira.calculator.models.monte_carlo.data import expiration_distributions
from caimira.calculator.validators.defaults import NO_DEFAULT
from caimira.calculator.store.data_registry import DataRegistry
def test_model_from_dict(baseline_form_data, data_registry):
form = model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
form = virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
assert isinstance(form.build_model(), models.ExposureModel)
def test_model_from_dict_invalid(baseline_form_data, data_registry):
baseline_form_data['invalid_item'] = 'foobar'
with pytest.raises(ValueError, match='Invalid argument "invalid_item" given'):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
@retry(tries=10)
@ -39,14 +39,14 @@ def test_blend_expiration(data_registry, mask_type):
SAMPLE_SIZE = 250000
TOLERANCE = 0.02
blend = {'Breathing': 2, 'Speaking': 1}
r = model_generator.build_expiration(data_registry, blend).build_model(SAMPLE_SIZE)
r = virus_validator.build_expiration(data_registry, blend).build_model(SAMPLE_SIZE)
mask = models.Mask.types[mask_type]
expected = (expiration_distributions(data_registry)['Breathing'].build_model(SAMPLE_SIZE).aerosols(mask).mean()*2/3. +
expiration_distributions(data_registry)['Speaking'].build_model(SAMPLE_SIZE).aerosols(mask).mean()/3.)
npt.assert_allclose(r.aerosols(mask).mean(), expected, rtol=TOLERANCE)
def test_ventilation_slidingwindow(data_registry: DataRegistry, baseline_form: model_generator.VirusFormData):
def test_ventilation_slidingwindow(data_registry: DataRegistry, baseline_form: virus_validator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -77,7 +77,7 @@ def test_ventilation_slidingwindow(data_registry: DataRegistry, baseline_form: m
assert ventilation == baseline_vent
def test_ventilation_hingedwindow(baseline_form: model_generator.VirusFormData):
def test_ventilation_hingedwindow(baseline_form: virus_validator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -107,7 +107,7 @@ def test_ventilation_hingedwindow(baseline_form: model_generator.VirusFormData):
assert ventilation == baseline_vent
def test_ventilation_mechanical(baseline_form: model_generator.VirusFormData):
def test_ventilation_mechanical(baseline_form: virus_validator.VirusFormData):
room = models.Room(volume=75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
mech = models.HVACMechanical(
active=models.PeriodicInterval(period=120, duration=120),
@ -122,7 +122,7 @@ def test_ventilation_mechanical(baseline_form: model_generator.VirusFormData):
np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts]))
def test_ventilation_airchanges(baseline_form: model_generator.VirusFormData):
def test_ventilation_airchanges(baseline_form: virus_validator.VirusFormData):
room = models.Room(75, inside_temp=models.PiecewiseConstant((0, 24), (293,)))
airchange = models.AirChange(
active=models.PeriodicInterval(period=120, duration=120),
@ -137,7 +137,7 @@ def test_ventilation_airchanges(baseline_form: model_generator.VirusFormData):
np.array([baseline_form.ventilation().air_exchange(room, t) for t in ts]))
def test_ventilation_window_hepa(data_registry: DataRegistry, baseline_form: model_generator.VirusFormData):
def test_ventilation_window_hepa(data_registry: DataRegistry, baseline_form: virus_validator.VirusFormData):
baseline_form.ventilation_type = 'natural_ventilation'
baseline_form.windows_duration = 10
baseline_form.windows_frequency = 120
@ -181,7 +181,7 @@ def test_ventilation_window_hepa(data_registry: DataRegistry, baseline_form: mod
]
)
def test_infected_less_than_total_people(activity, total_people, infected_people, error,
baseline_form: model_generator.VirusFormData,
baseline_form: virus_validator.VirusFormData,
data_registry: DataRegistry):
baseline_form.activity_type = activity
baseline_form.total_people = total_people
@ -195,7 +195,7 @@ def present_times(interval: models.Interval) -> models.BoundarySequence_t:
return interval.present_times
def test_infected_present_intervals(baseline_form: model_generator.VirusFormData):
def test_infected_present_intervals(baseline_form: virus_validator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.exposed_coffee_duration = 15
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -209,7 +209,7 @@ def test_infected_present_intervals(baseline_form: model_generator.VirusFormData
assert present_times(baseline_form.infected_present_interval()) == correct
def test_exposed_present_intervals(baseline_form: model_generator.VirusFormData):
def test_exposed_present_intervals(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_duration = 15
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -220,7 +220,7 @@ def test_exposed_present_intervals(baseline_form: model_generator.VirusFormData)
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_present_intervals_common_breaks(baseline_form: model_generator.VirusFormData):
def test_present_intervals_common_breaks(baseline_form: virus_validator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15
baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -236,7 +236,7 @@ def test_present_intervals_common_breaks(baseline_form: model_generator.VirusFor
assert present_times(baseline_form.infected_present_interval()) == correct_infected
def test_present_intervals_split_breaks(baseline_form: model_generator.VirusFormData):
def test_present_intervals_split_breaks(baseline_form: virus_validator.VirusFormData):
baseline_form.infected_dont_have_breaks_with_exposed = True
baseline_form.infected_coffee_duration = baseline_form.exposed_coffee_duration = 15
baseline_form.infected_coffee_break_option = baseline_form.exposed_coffee_break_option = 'coffee_break_2'
@ -252,7 +252,7 @@ def test_present_intervals_split_breaks(baseline_form: model_generator.VirusForm
assert present_times(baseline_form.infected_present_interval()) == correct_infected
def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_generator.VirusFormData):
def test_exposed_present_intervals_starting_with_lunch(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_start = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60)
baseline_form.exposed_finish = minutes_since_midnight(18 * 60)
@ -261,7 +261,7 @@ def test_exposed_present_intervals_starting_with_lunch(baseline_form: model_gene
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_generator.VirusFormData):
def test_exposed_present_intervals_ending_with_lunch(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_start = minutes_since_midnight(11 * 60)
baseline_form.exposed_finish = baseline_form.exposed_lunch_start = minutes_since_midnight(13 * 60)
@ -270,7 +270,7 @@ def test_exposed_present_intervals_ending_with_lunch(baseline_form: model_genera
assert present_times(baseline_form.exposed_present_interval()) == correct
def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_exposed_present_lunch_end_before_beginning(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.exposed_coffee_break_option = 'coffee_break_0'
baseline_form.exposed_lunch_start = minutes_since_midnight(14 * 60)
baseline_form.exposed_lunch_finish = minutes_since_midnight(13 * 60)
@ -287,7 +287,7 @@ def test_exposed_present_lunch_end_before_beginning(baseline_form: model_generat
[9, 20], # lunch_finish after the presence finishing
],
)
def test_exposed_presence_lunch_break(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry, exposed_lunch_start, exposed_lunch_finish):
def test_exposed_presence_lunch_break(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry, exposed_lunch_start, exposed_lunch_finish):
baseline_form.exposed_lunch_start = minutes_since_midnight(exposed_lunch_start * 60)
baseline_form.exposed_lunch_finish = minutes_since_midnight(exposed_lunch_finish * 60)
with pytest.raises(ValueError, match='exposed lunch break must be within presence times.'):
@ -303,14 +303,14 @@ def test_exposed_presence_lunch_break(baseline_form: model_generator.VirusFormDa
[9, 20], # lunch_finish after the presence finishing
],
)
def test_infected_presence_lunch_break(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry, infected_lunch_start, infected_lunch_finish):
def test_infected_presence_lunch_break(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry, infected_lunch_start, infected_lunch_finish):
baseline_form.infected_lunch_start = minutes_since_midnight(infected_lunch_start * 60)
baseline_form.infected_lunch_finish = minutes_since_midnight(infected_lunch_finish * 60)
with pytest.raises(ValueError, match='infected lunch break must be within presence times.'):
baseline_form.validate()
def test_exposed_breaks_length(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_exposed_breaks_length(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_start = minutes_since_midnight(10 * 60)
@ -320,7 +320,7 @@ def test_exposed_breaks_length(baseline_form: model_generator.VirusFormData, dat
baseline_form.validate()
def test_infected_breaks_length(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_infected_breaks_length(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.infected_start = minutes_since_midnight(9 * 60)
baseline_form.infected_finish = minutes_since_midnight(12 * 60)
baseline_form.infected_lunch_start = minutes_since_midnight(10 * 60)
@ -332,7 +332,7 @@ def test_infected_breaks_length(baseline_form: model_generator.VirusFormData, da
@pytest.fixture
def coffee_break_between_1045_and_1115(baseline_form: model_generator.VirusFormData):
def coffee_break_between_1045_and_1115(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_1'
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_start = minutes_since_midnight(10 * 60)
@ -390,7 +390,7 @@ def assert_boundaries(interval, boundaries_in_time_string_form):
@pytest.fixture
def breaks_every_25_mins_for_20_mins(baseline_form: model_generator.VirusFormData):
def breaks_every_25_mins_for_20_mins(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_coffee_duration = 20
baseline_form.exposed_start = time2mins("10:00")
@ -435,7 +435,7 @@ def test_present_only_during_second_break(breaks_every_25_mins_for_20_mins):
assert_boundaries(interval, [])
def test_valid_no_lunch(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_valid_no_lunch(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
# Check that it is valid to have a 0 length lunch if no lunch is selected.
baseline_form.exposed_lunch_option = False
baseline_form.exposed_lunch_start = minutes_since_midnight(0)
@ -443,7 +443,7 @@ def test_valid_no_lunch(baseline_form: model_generator.VirusFormData, data_regis
assert baseline_form.validate() is None
def test_no_breaks(baseline_form: model_generator.VirusFormData):
def test_no_breaks(baseline_form: virus_validator.VirusFormData):
# Check that the times are correct in the absence of breaks.
baseline_form.infected_dont_have_breaks_with_exposed = False
baseline_form.exposed_lunch_option = False
@ -458,7 +458,7 @@ def test_no_breaks(baseline_form: model_generator.VirusFormData):
assert present_times(baseline_form.infected_present_interval()) == infected_correct
def test_coffee_lunch_breaks(baseline_form: model_generator.VirusFormData):
def test_coffee_lunch_breaks(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -470,7 +470,7 @@ def test_coffee_lunch_breaks(baseline_form: model_generator.VirusFormData):
np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14)
def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.VirusFormData):
def test_coffee_lunch_breaks_unbalance(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_duration = 30
baseline_form.exposed_coffee_break_option = 'coffee_break_2'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -481,7 +481,7 @@ def test_coffee_lunch_breaks_unbalance(baseline_form: model_generator.VirusFormD
np.testing.assert_allclose(present_times(baseline_form.exposed_present_interval()), correct, rtol=1e-14)
def test_coffee_breaks(baseline_form: model_generator.VirusFormData):
def test_coffee_breaks(baseline_form: virus_validator.VirusFormData):
baseline_form.exposed_coffee_duration = 10
baseline_form.exposed_coffee_break_option = 'coffee_break_4'
baseline_form.exposed_start = minutes_since_midnight(9 * 60)
@ -494,24 +494,24 @@ def test_coffee_breaks(baseline_form: model_generator.VirusFormData):
def test_key_validation(baseline_form_data, data_registry):
baseline_form_data['activity_type'] = 'invalid key'
with pytest.raises(ValueError):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_key_validation_natural_ventilation_window_type_na(baseline_form_data, data_registry):
baseline_form_data['ventilation_type'] = 'natural_ventilation'
baseline_form_data['window_type'] = 'not-applicable'
with pytest.raises(ValueError, match='window_type cannot be \'not-applicable\''):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_key_validation_natural_ventilation_window_opening_regime_na(baseline_form_data, data_registry):
baseline_form_data['ventilation_type'] = 'natural_ventilation'
baseline_form_data['window_opening_regime'] = 'not-applicable'
with pytest.raises(ValueError, match='window_opening_regime cannot be \'not-applicable\''):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_natural_ventilation_window_opening_periodically(baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_natural_ventilation_window_opening_periodically(baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.window_opening_regime = 'windows_open_periodically'
baseline_form.windows_duration = 20
baseline_form.windows_frequency = 10
@ -523,20 +523,20 @@ def test_key_validation_mech_ventilation_type_na(baseline_form_data, data_regist
baseline_form_data['ventilation_type'] = 'mechanical_ventilation'
baseline_form_data['mechanical_ventilation_type'] = 'not-applicable'
with pytest.raises(ValueError, match='mechanical_ventilation_type cannot be \'not-applicable\''):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_key_validation_event_month(baseline_form_data, data_registry):
baseline_form_data['event_month'] = 'invalid month'
with pytest.raises(ValueError, match='invalid month is not a valid value for event_month'):
model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
def test_default_types():
# Validate that VirusFormData._DEFAULTS are complete and of the correct type.
# Validate that we have the right types and matching attributes to the DEFAULTS.
fields = {field.name: field for field in dataclasses.fields(model_generator.VirusFormData)}
for field, value in model_generator.VirusFormData._DEFAULTS.items():
fields = {field.name: field for field in dataclasses.fields(virus_validator.VirusFormData)}
for field, value in virus_validator.VirusFormData._DEFAULTS.items():
if field not in fields:
raise ValueError(f"Unmatched default {field}")
@ -557,7 +557,7 @@ def test_default_types():
for field in fields.values():
if field.name == "data_registry":
continue # Skip the assertion for the "data_registry" field
assert field.name in model_generator.VirusFormData._DEFAULTS, f"No default set for field name {field.name}"
assert field.name in virus_validator.VirusFormData._DEFAULTS, f"No default set for field name {field.name}"
def test_form_to_dict(baseline_form):
@ -566,7 +566,7 @@ def test_form_to_dict(baseline_form):
assert 1 < len(stripped) < len(full)
assert 'exposed_coffee_break_option' in stripped
# If we set the value to the default one, it should no longer turn up in the dictionary.
baseline_form.exposed_coffee_break_option = model_generator.VirusFormData._DEFAULTS['exposed_coffee_break_option']
baseline_form.exposed_coffee_break_option = virus_validator.VirusFormData._DEFAULTS['exposed_coffee_break_option']
assert 'exposed_coffee_break_option' not in baseline_form.to_dict(baseline_form, strip_defaults=True)
@ -584,7 +584,7 @@ def test_form_timezone(baseline_form_data, data_registry, longitude, latitude, m
baseline_form_data['location_latitude'] = latitude
baseline_form_data['location_longitude'] = longitude
baseline_form_data['event_month'] = month
form = model_generator.VirusFormData.from_dict(baseline_form_data, data_registry)
form = virus_validator.VirusFormData.from_dict(baseline_form_data, data_registry)
name, offset = form.tz_name_and_utc_offset()
assert name == expected_tz_name
assert offset == expected_offset

View file

@ -1,32 +0,0 @@
import json
import tornado.testing
import caimira.apps.calculator
from caimira.apps.calculator import model_generator
_TIMEOUT = 40.
class TestCalculatorJsonResponse(tornado.testing.AsyncHTTPTestCase):
def setUp(self):
super().setUp()
self.http_client.defaults['request_timeout'] = _TIMEOUT
def get_app(self):
return caimira.apps.calculator.make_app()
@tornado.testing.gen_test(timeout=_TIMEOUT)
def test_json_response(self):
response = yield self.http_client.fetch(
request=self.get_url("/calculator/report-json"),
method="POST",
headers={'content-type': 'application/json'},
body=json.dumps(model_generator.baseline_raw_form_data())
)
self.assertEqual(response.code, 200)
data = json.loads(response.body)
self.assertIsInstance(data['prob_inf'], float)
self.assertIsInstance(data['expected_new_cases'], float)

View file

@ -2,8 +2,8 @@ from typing import Type
import numpy as np
import pytest
from caimira.apps.calculator import model_generator
from caimira.store.data_registry import DataRegistry
from caimira.calculator.validators.virus import virus_validator
from caimira.calculator.store.data_registry import DataRegistry
@pytest.mark.parametrize(
@ -14,7 +14,7 @@ from caimira.store.data_registry import DataRegistry
[{"exposed_breaks": [], "ifected_breaks": []}, 'Unable to fetch "infected_breaks" key. Got "ifected_breaks".'],
]
)
def test_specific_break_structure(break_input, error, baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_specific_break_structure(break_input, error, baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.specific_breaks = break_input
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -31,7 +31,7 @@ def test_specific_break_structure(break_input, error, baseline_form: model_gener
[[{"start_time": "10:00", "finish_time": "11"}], 'Wrong time format - "HH:MM". Got "11".'],
]
)
def test_specific_population_break_data_structure(population_break_input, error, baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_specific_population_break_data_structure(population_break_input, error, baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.specific_breaks = {'exposed_breaks': population_break_input, 'infected_breaks': population_break_input}
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -46,7 +46,7 @@ def test_specific_population_break_data_structure(population_break_input, error,
[{'exposed_breaks': [], 'infected_breaks': [{"start_time": "08:00", "finish_time": "11:00"}, {"start_time": "14:00", "finish_time": "15:00"}, ]}, "All breaks should be within the simulation time. Got 08:00."],
]
)
def test_specific_break_time(break_input, error, baseline_form: model_generator.VirusFormData):
def test_specific_break_time(break_input, error, baseline_form: virus_validator.VirusFormData):
with pytest.raises(ValueError, match=error):
baseline_form.generate_specific_break_times(breaks_dict=break_input, target='exposed')
baseline_form.generate_specific_break_times(breaks_dict=break_input, target='infected')
@ -65,7 +65,7 @@ def test_specific_break_time(break_input, error, baseline_form: model_generator.
[{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentag": 50}, {"type": "Speaking", "percentage": 50}]}, 'Unable to fetch "percentage" key. Got "percentag".'],
]
)
def test_precise_activity_structure(precise_activity_input, error, baseline_form: model_generator.VirusFormData, data_registry: DataRegistry):
def test_precise_activity_structure(precise_activity_input, error, baseline_form: virus_validator.VirusFormData, data_registry: DataRegistry):
baseline_form.precise_activity = precise_activity_input
with pytest.raises(TypeError, match=error):
baseline_form.validate()
@ -80,7 +80,7 @@ def test_precise_activity_structure(precise_activity_input, error, baseline_form
[{"physical_activity": "Light activity", "respiratory_activity": [{"type": "Breathing", "percentage": 50}]}, 'The sum of all respiratory activities should be 100. Got 50.'],
]
)
def test_sum_precise_activity(precise_activity_input, error, baseline_form: model_generator.VirusFormData):
def test_sum_precise_activity(precise_activity_input, error, baseline_form: virus_validator.VirusFormData):
baseline_form.precise_activity = precise_activity_input
with pytest.raises(ValueError, match=error):
baseline_form.validate()

View file

@ -1,11 +1,9 @@
from caimira import models
import caimira.data
import caimira.dataclass_utils
import pytest
from caimira.store.data_registry import DataRegistry
from caimira.calculator.models import models
import caimira.calculator.models.data
import caimira.calculator.models.dataclass_utils
from caimira.calculator.store.data_registry import DataRegistry
@pytest.fixture
def data_registry():
@ -61,12 +59,12 @@ def baseline_exposure_model(data_registry, baseline_concentration_model, baselin
@pytest.fixture
def exposure_model_w_outside_temp_changes(data_registry, baseline_exposure_model: models.ExposureModel):
exp_model = caimira.dataclass_utils.nested_replace(
exp_model = caimira.calculator.models.dataclass_utils.nested_replace(
baseline_exposure_model, {
'concentration_model.ventilation': models.SlidingWindow(
data_registry=data_registry,
active=models.PeriodicInterval(2.2 * 60, 1.8 * 60),
outside_temp=caimira.data.GenevaTemperatures['Jan'],
outside_temp=caimira.calculator.models.data.GenevaTemperatures['Jan'],
window_height=1.6,
opening_length=0.6,
)

View file

@ -5,7 +5,7 @@ import numpy as np
import numpy.testing
import pytest
import caimira.data.weather as wx
import caimira.calculator.models.data.weather as wx
def test_nearest_wx_station():

View file

@ -3,8 +3,8 @@ import numpy as np
import typing
import pytest
from caimira import models
from caimira.apps.calculator.co2_model_generator import CO2FormData
from caimira.calculator.models import models
from caimira.calculator.validators.co2.co2_validator import CO2FormData
@pytest.fixture

View file

@ -5,8 +5,8 @@ import numpy.testing as npt
import pytest
from dataclasses import dataclass
from caimira import models
from caimira.store.data_registry import DataRegistry
from caimira.calculator.models import models
from caimira.calculator.store.data_registry import DataRegistry
@dataclass(frozen=True)
class KnownConcentrationModelBase(models._ConcentrationModelBase):

View file

@ -4,8 +4,8 @@ import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
import caimira.dataclass_utils as dc_utils
from caimira.calculator.models import models
from caimira.calculator.models import dataclass_utils as dc_utils
@pytest.fixture
def full_exposure_model(data_registry):

View file

@ -5,11 +5,11 @@ import numpy.testing
import pytest
from dataclasses import dataclass
from caimira import models
from caimira.models import ExposureModel
from caimira.dataclass_utils import replace
from caimira.monte_carlo.data import expiration_distributions
from caimira.store.data_registry import DataRegistry
from caimira.calculator.models import models
from caimira.calculator.models.models import ExposureModel
from caimira.calculator.models.dataclass_utils import replace
from caimira.calculator.models.monte_carlo.data import expiration_distributions
from caimira.calculator.store.data_registry import DataRegistry
@dataclass(frozen=True)
class KnownNormedconcentration(models.ConcentrationModel):

View file

@ -2,7 +2,7 @@ import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
from caimira.calculator.models import models
@pytest.mark.parametrize(

View file

@ -2,7 +2,7 @@ import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
from caimira.calculator.models import models
@pytest.mark.parametrize(

View file

@ -1,8 +1,8 @@
import numpy as np
import pytest
from caimira import models
from caimira import data
from caimira.calculator.models import models
from caimira.calculator.models import data
def test_piecewiseconstantfunction_wrongarguments():

View file

@ -3,10 +3,10 @@ import typing
import numpy as np
import pytest
from caimira import models
import caimira.monte_carlo as mc_models
from caimira.apps.calculator.model_generator import build_expiration
from caimira.monte_carlo.data import short_range_expiration_distributions,\
from caimira.calculator.models import models
import caimira.calculator.models.monte_carlo as mc_models
from caimira.calculator.validators.virus.virus_validator import build_expiration
from caimira.calculator.models.monte_carlo.data import short_range_expiration_distributions,\
expiration_distributions, short_range_distances, activity_distributions
SAMPLE_SIZE = 250_000

View file

@ -1,25 +0,0 @@
import numpy as np
import numpy.testing as npt
import pytest
from caimira import models
@pytest.mark.parametrize(
"inside_temp, humidity, expected_halflife, expected_decay_constant",
[
[293.15, 0.5, 0.5947447349860315, 1.1654532436949188],
[272.15, 0.7, 1.6070844193207476, 0.4313072619127947],
[300.15, 1., 0.17367078830147223, 3.9911558376571805],
[300.15, 0., 6.43, 0.10779893943389507],
[np.array([272.15, 300.15]), np.array([0.7, 0.]),
np.array([1.60708442, 6.43]), np.array([0.43130726, 0.10779894])],
[np.array([293.15, 300.15]), np.array([0.5, 1.]),
np.array([0.59474473, 0.17367079]), np.array([1.16545324, 3.99115584])]
],
)
def test_decay_constant(inside_temp, humidity, expected_halflife, expected_decay_constant):
npt.assert_almost_equal(models.Virus.types['SARS_CoV_2'].halflife(humidity, inside_temp),
expected_halflife)
npt.assert_almost_equal(models.Virus.types['SARS_CoV_2'].decay_constant(humidity, inside_temp),
expected_decay_constant)

View file

@ -1,10 +0,0 @@
"""
High-level tests for the package.
"""
import caimira
def test_version():
assert caimira.__version__ is not None

View file

@ -2,11 +2,11 @@ import numpy as np
import pytest
from retry import retry
import caimira.monte_carlo as mc
from caimira import models
from caimira.dataclass_utils import nested_replace
from caimira.apps.calculator import report_generator
from caimira.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions
import caimira.calculator.models.monte_carlo as mc
from caimira.calculator.models import models
from caimira.calculator.models.dataclass_utils import nested_replace
from caimira.calculator.report import virus_report_data
from caimira.calculator.models.monte_carlo.data import activity_distributions, virus_distributions, expiration_distributions
@pytest.fixture
@ -72,7 +72,7 @@ def test_conditional_prob_inf_given_vl_dist(data_registry, baseline_exposure_mod
specific_vl = np.log10(mc_model.concentration_model.infected.virus.viral_load_in_sputum)
step = 8/100
actual_pi_means, actual_lower_percentiles, actual_upper_percentiles = (
report_generator.conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, specific_vl, step)
virus_report_data.conditional_prob_inf_given_vl_dist(infection_probability, viral_loads, specific_vl, step)
)
assert np.allclose(actual_pi_means, expected_pi_means, atol=0.002)

View file

@ -1,7 +1,7 @@
import unittest
from unittest.mock import Mock, patch
from caimira.store.data_service import DataService
from caimira.calculator.store.data_service import DataService
class DataServiceTests(unittest.TestCase):

View file

@ -1,6 +1,6 @@
import dataclasses
from caimira.dataclass_utils import nested_replace, walk_dataclass
from caimira.calculator.models.dataclass_utils import nested_replace, walk_dataclass
@dataclasses.dataclass(frozen=True)

View file

@ -5,8 +5,8 @@ import numpy.testing as npt
import pytest
from retry import retry
from caimira import models
from caimira.monte_carlo.data import expiration_distribution
from caimira.calculator.models import models
from caimira.calculator.models.monte_carlo.data import expiration_distribution
def test_multiple_wrong_weight_size():

View file

@ -8,11 +8,11 @@ import numpy.testing as npt
import pytest
from retry import retry
import caimira.monte_carlo as mc
from caimira import models
from caimira.utils import method_cache
from caimira.models import _VectorisedFloat,Interval,SpecificInterval
from caimira.monte_carlo.data import (expiration_distributions,
import caimira.calculator.models.monte_carlo as mc
from caimira.calculator.models import models
from caimira.calculator.models.utils import method_cache
from caimira.calculator.models.models import _VectorisedFloat,Interval,SpecificInterval
from caimira.calculator.models.monte_carlo.data import (expiration_distributions,
expiration_BLO_factors,short_range_expiration_distributions,
short_range_distances,virus_distributions,activity_distributions)

View file

@ -1,7 +1,7 @@
import numpy as np
import pytest
import caimira.models
import caimira.calculator.models.models
@pytest.mark.parametrize(
@ -17,26 +17,26 @@ def test_infected_population_vectorisation(override_params, data_registry):
}
defaults.update(override_params)
office_hours = caimira.models.SpecificInterval(present_times=[(8,17)])
infected = caimira.models.InfectedPopulation(
office_hours = caimira.calculator.models.models.SpecificInterval(present_times=[(8,17)])
infected = caimira.calculator.models.models.InfectedPopulation(
data_registry=data_registry,
number=1,
presence=office_hours,
mask=caimira.models.Mask(
mask=caimira.calculator.models.models.Mask(
factor_exhale=0.95,
η_inhale=0.3,
),
activity=caimira.models.Activity(
activity=caimira.calculator.models.models.Activity(
0.51,
defaults['exhalation_rate'],
),
virus=caimira.models.SARSCoV2(
virus=caimira.calculator.models.models.SARSCoV2(
viral_load_in_sputum=defaults['viral_load_in_sputum'],
infectious_dose=50.,
viable_to_RNA_ratio = 0.5,
transmissibility_factor=1.0,
),
expiration=caimira.models._ExpirationBase.types['Breathing'],
expiration=caimira.calculator.models.models._ExpirationBase.types['Breathing'],
host_immunity=0.,
)
emission_rate = infected.emission_rate(10)

View file

@ -2,8 +2,8 @@ import numpy as np
import numpy.testing as npt
import pytest
import caimira.models as models
import caimira.data as data
import caimira.calculator.models.models as models
import caimira.calculator.models.data as data
def test_no_mask_superspeading_emission_rate(baseline_concentration_model):

Some files were not shown because too many files have changed in this diff Show more