From 214b48451b0149f2dcc75295d436c622acd42bbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Mon, 10 Aug 2015 14:27:57 +0200 Subject: [PATCH] Turned SessionUser into a delegate That should fix user settings across multiple SessionUser wrappers. User settings were previously only copied upon initialization of the SessionUser, with no update later on, leading to them becoming out of sync with the underlying User instance. This commit changes SessionUser to delegate method and attribute access for anything but its own members to delegate to the wrapped User instance instead. --- src/octoprint/users.py | 17 +++++- tests/users/__init__.py | 112 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 127 insertions(+), 2 deletions(-) create mode 100644 tests/users/__init__.py diff --git a/src/octoprint/users.py b/src/octoprint/users.py index fc5be2b7..53843af6 100644 --- a/src/octoprint/users.py +++ b/src/octoprint/users.py @@ -149,7 +149,7 @@ class UserManager(object): if session is not None: for session in self._session_users_by_session: user = self._session_users_by_session[session] - if username is None or username == user.get_name(): + if username is None or username == user.get_id(): return user break @@ -443,7 +443,7 @@ class User(UserMixin): path = [key] else: path = key - self._set_setting(path, value) + return self._set_setting(path, value) def _get_setting(self, path): s = self._settings @@ -474,6 +474,7 @@ class User(UserMixin): class SessionUser(User): def __init__(self, user): + self._user = user User.__init__(self, user._username, user._passwordHash, user._active, user._roles, user._apikey, user._settings) import string @@ -483,6 +484,18 @@ class SessionUser(User): self._session = "".join(random.choice(chars) for _ in xrange(10)) self._created = time.time() + def __getattribute__(self, item): + if item in ("get_session", "_user", "_session", "_created"): + return object.__getattribute__(self, item) + else: + return getattr(object.__getattribute__(self, "_user"), item) + + def __setattr__(self, item, value): + if item in ("_user", "_session", "_created"): + return object.__setattr__(self, item, value) + else: + return setattr(self._user, item, value) + def get_session(self): return self._session diff --git a/tests/users/__init__.py b/tests/users/__init__.py new file mode 100644 index 00000000..f9f102db --- /dev/null +++ b/tests/users/__init__.py @@ -0,0 +1,112 @@ +# coding=utf-8 +""" +Unit tests for octoprint.users +""" +from __future__ import absolute_import + +__author__ = "Gina Häußge " +__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html' +__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License" + +import unittest +import ddt + +import octoprint.users +from octoprint.util import dict_merge + +default_settings = dict(key="value", sub=dict(subkey="subvalue")) + +@ddt.ddt +class UserTestCase(unittest.TestCase): + def setUp(self): + self.user = octoprint.users.User("user", "passwordHash", True, ("user",), apikey="apikey_user", settings=default_settings) + self.admin = octoprint.users.User("admin", "passwordHash", True, ("user","admin"), apikey="apikey_admin", settings=default_settings) + self.inactive = octoprint.users.User("inactive", "passwordHash", False, ("user",), apikey="apikey_inactive", settings=default_settings) + + @ddt.data( + ("user", dict(name="user", + active=True, + admin=False, + user=True, + apikey="apikey_user", + settings=default_settings)), + ("admin", dict(name="admin", + active=True, + admin=True, + user=True, + apikey="apikey_admin", + settings=default_settings)), + ("inactive", dict(name="inactive", + active=False, + admin=False, + user=True, + apikey="apikey_inactive", + settings=default_settings)) + ) + @ddt.unpack + def test_as_dict(self, uservar, expected): + user = getattr(self, uservar) + self.assertDictEqual(expected, user.asDict()) + + @ddt.data( + ("key", "value"), + (["sub", "subkey"], "subvalue"), + ("doesntexist", None) + ) + @ddt.unpack + def test_get_setting(self, key, expected): + value = self.user.get_setting(key) + self.assertEquals(expected, value) + + @ddt.data( + ("otherkey", "othervalue", dict(otherkey="othervalue"), True), + (["sub", "othersubkey"], "othersubvalue", dict(sub=dict(othersubkey="othersubvalue")), True), + ("booleankey", True, dict(booleankey=True), True), + (["newsub", "newsubkey"], "newsubvalue", dict(newsub=dict(newsubkey="newsubvalue")), True), + (["sub", "subkey", "wontwork"], "wontwork", dict(), False) + ) + @ddt.unpack + def test_set_setting_string(self, key, value, update, expected_returnvalue): + returnvalue = self.user.set_setting(key, value) + + expected = dict_merge(default_settings, update) + + self.assertDictEqual(expected, self.user.get_all_settings()) + self.assertEqual(expected_returnvalue, returnvalue) + + def test_check_password(self): + self.assertTrue(self.user.check_password("passwordHash")) + self.assertFalse(self.user.check_password("notThePasswordHash")) + + @ddt.data( + ("user", "User(id=user,name=user,active=True,user=True,admin=False)"), + ("admin", "User(id=admin,name=admin,active=True,user=True,admin=True)"), + ("inactive", "User(id=inactive,name=inactive,active=False,user=True,admin=False)") + ) + @ddt.unpack + def test_repr(self, uservar, output): + self.assertEqual(output, repr(getattr(self, uservar))) + +class SessionUserTestCase(unittest.TestCase): + + def setUp(self): + self.user = octoprint.users.User("username", "passwordHash", True, ("user",), apikey="apikey", settings=dict(key="value")) + + def test_two_sessions(self): + session1 = octoprint.users.SessionUser(self.user) + session2 = octoprint.users.SessionUser(self.user) + + self.assertNotEqual(session1.get_session(), session2.get_session()) + self.assertEqual(session1._user, session2._user) + self.assertEqual(session1._username, session2._username) + + def test_settings_change_propagates(self): + user = octoprint.users.SessionUser(self.user) + self.user.set_setting("otherkey", "othervalue") + + self.assertDictEqual(dict(key="value", otherkey="othervalue"), user.get_all_settings()) + + def test_repr(self): + user = octoprint.users.SessionUser(self.user) + expected = "SessionUser(id=username,name=username,active=True,user=True,admin=False,session={},created={})".format(user._session, user._created) + self.assertEquals(expected, repr(user))