diff --git a/caimira/store/configuration.py b/caimira/store/configuration.py index 155d80b2..bf7390ab 100644 --- a/caimira/store/configuration.py +++ b/caimira/store/configuration.py @@ -438,7 +438,7 @@ class Configuration: "activity": "Seated", "expiration": {"Breathing": 1, "Speaking": 9}, }, - "precise": {"activity": '', "expiration": {}}, + "precise": {"activity": "", "expiration": {}}, } def update(self, data): diff --git a/caimira/store/data_service.py b/caimira/store/data_service.py index e203be48..04fb5353 100644 --- a/caimira/store/data_service.py +++ b/caimira/store/data_service.py @@ -1,7 +1,9 @@ import logging import os import typing +from datetime import datetime, timedelta, timezone +import jwt import requests from .configuration import config @@ -24,12 +26,26 @@ class DataService: self._host = host def _is_valid(self, access_token): - # decode access_token - # check validity + try: + decoded = jwt.decode( + access_token, algorithms=["HS256"], options={"verify_signature": False} + ) + expiration_timestamp = decoded["exp"] + expiration = datetime.utcfromtimestamp(expiration_timestamp).replace( + tzinfo=timezone.utc + ) + now = datetime.now(timezone.utc) + return now < expiration - timedelta( + seconds=5 + ) # 5 seconds time delta to avoid timing issues + except jwt.ExpiredSignatureError: + logger.warning("JWT token expired.") + except jwt.InvalidTokenError: + logger.warning("JWT token invalid.") return False def _login(self): - if self._is_valid(self._access_token): + if self._access_token and self._is_valid(self._access_token): return self._access_token # invalid access_token, fetch it again diff --git a/caimira/tests/test_data_service.py b/caimira/tests/test_data_service.py index 9e0189e7..e5740873 100644 --- a/caimira/tests/test_data_service.py +++ b/caimira/tests/test_data_service.py @@ -1,6 +1,9 @@ +import time import unittest from unittest.mock import Mock, patch +import jwt + from caimira.store.data_service import DataService @@ -10,6 +13,24 @@ class DataServiceTests(unittest.TestCase): self.credentials = {"email": "test@example.com", "password": "password123"} self.data_service = DataService(self.credentials) + def test_jwt_expiration(self): + is_valid = self.data_service._is_valid(None) + self.assertFalse(is_valid) + + now = time.time() + + encoded = jwt.encode({"exp": now - 10}, "very secret", algorithm="HS256") + is_valid = self.data_service._is_valid(encoded) + self.assertFalse(is_valid) + + encoded = jwt.encode({"exp": now}, "very secret", algorithm="HS256") + is_valid = self.data_service._is_valid(encoded) + self.assertFalse(is_valid) + + encoded = jwt.encode({"exp": now + 10}, "very secret", algorithm="HS256") + is_valid = self.data_service._is_valid(encoded) + self.assertTrue(is_valid) + @patch("requests.post") def test_login_successful(self, mock_post): # Mock successful login response diff --git a/setup.py b/setup.py index 41b69c1b..b8eb391d 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,7 @@ REQUIREMENTS: dict = { 'numpy', 'pandas', 'psutil', + 'pyjwt', 'python-dateutil', 'retry', 'scipy',