Merge branch 'feature/store-token-validation' into 'master'

data service: add JWT expiration validation

See merge request caimira/caimira!470
This commit is contained in:
Luis Aleixo 2023-11-22 13:38:02 +01:00
commit 8fddfc3a78
4 changed files with 42 additions and 4 deletions

View file

@ -438,7 +438,7 @@ class Configuration:
"activity": "Seated",
"expiration": {"Breathing": 1, "Speaking": 9},
},
"precise": {"activity": '', "expiration": {}},
"precise": {"activity": "", "expiration": {}},
}
def update(self, data):

View file

@ -1,7 +1,9 @@
import logging
import os
import typing
from datetime import datetime, timedelta, timezone
import jwt
import requests
from .configuration import config
@ -24,12 +26,26 @@ class DataService:
self._host = host
def _is_valid(self, access_token):
# decode access_token
# check validity
try:
decoded = jwt.decode(
access_token, algorithms=["HS256"], options={"verify_signature": False}
)
expiration_timestamp = decoded["exp"]
expiration = datetime.utcfromtimestamp(expiration_timestamp).replace(
tzinfo=timezone.utc
)
now = datetime.now(timezone.utc)
return now < expiration - timedelta(
seconds=5
) # 5 seconds time delta to avoid timing issues
except jwt.ExpiredSignatureError:
logger.warning("JWT token expired.")
except jwt.InvalidTokenError:
logger.warning("JWT token invalid.")
return False
def _login(self):
if self._is_valid(self._access_token):
if self._access_token and self._is_valid(self._access_token):
return self._access_token
# invalid access_token, fetch it again

View file

@ -1,6 +1,9 @@
import time
import unittest
from unittest.mock import Mock, patch
import jwt
from caimira.store.data_service import DataService
@ -10,6 +13,24 @@ class DataServiceTests(unittest.TestCase):
self.credentials = {"email": "test@example.com", "password": "password123"}
self.data_service = DataService(self.credentials)
def test_jwt_expiration(self):
is_valid = self.data_service._is_valid(None)
self.assertFalse(is_valid)
now = time.time()
encoded = jwt.encode({"exp": now - 10}, "very secret", algorithm="HS256")
is_valid = self.data_service._is_valid(encoded)
self.assertFalse(is_valid)
encoded = jwt.encode({"exp": now}, "very secret", algorithm="HS256")
is_valid = self.data_service._is_valid(encoded)
self.assertFalse(is_valid)
encoded = jwt.encode({"exp": now + 10}, "very secret", algorithm="HS256")
is_valid = self.data_service._is_valid(encoded)
self.assertTrue(is_valid)
@patch("requests.post")
def test_login_successful(self, mock_post):
# Mock successful login response

View file

@ -30,6 +30,7 @@ REQUIREMENTS: dict = {
'numpy',
'pandas',
'psutil',
'pyjwt',
'python-dateutil',
'retry',
'scipy',