Turned SessionUser into a delegate
That should fix user settings across multiple SessionUser wrappers. User settings were previously only copied upon initialization of the SessionUser, with no update later on, leading to them becoming out of sync with the underlying User instance. This commit changes SessionUser to delegate method and attribute access for anything but its own members to delegate to the wrapped User instance instead.
This commit is contained in:
parent
54b981b1ec
commit
214b48451b
2 changed files with 127 additions and 2 deletions
|
|
@ -149,7 +149,7 @@ class UserManager(object):
|
|||
if session is not None:
|
||||
for session in self._session_users_by_session:
|
||||
user = self._session_users_by_session[session]
|
||||
if username is None or username == user.get_name():
|
||||
if username is None or username == user.get_id():
|
||||
return user
|
||||
break
|
||||
|
||||
|
|
@ -443,7 +443,7 @@ class User(UserMixin):
|
|||
path = [key]
|
||||
else:
|
||||
path = key
|
||||
self._set_setting(path, value)
|
||||
return self._set_setting(path, value)
|
||||
|
||||
def _get_setting(self, path):
|
||||
s = self._settings
|
||||
|
|
@ -474,6 +474,7 @@ class User(UserMixin):
|
|||
|
||||
class SessionUser(User):
|
||||
def __init__(self, user):
|
||||
self._user = user
|
||||
User.__init__(self, user._username, user._passwordHash, user._active, user._roles, user._apikey, user._settings)
|
||||
|
||||
import string
|
||||
|
|
@ -483,6 +484,18 @@ class SessionUser(User):
|
|||
self._session = "".join(random.choice(chars) for _ in xrange(10))
|
||||
self._created = time.time()
|
||||
|
||||
def __getattribute__(self, item):
|
||||
if item in ("get_session", "_user", "_session", "_created"):
|
||||
return object.__getattribute__(self, item)
|
||||
else:
|
||||
return getattr(object.__getattribute__(self, "_user"), item)
|
||||
|
||||
def __setattr__(self, item, value):
|
||||
if item in ("_user", "_session", "_created"):
|
||||
return object.__setattr__(self, item, value)
|
||||
else:
|
||||
return setattr(self._user, item, value)
|
||||
|
||||
def get_session(self):
|
||||
return self._session
|
||||
|
||||
|
|
|
|||
112
tests/users/__init__.py
Normal file
112
tests/users/__init__.py
Normal file
|
|
@ -0,0 +1,112 @@
|
|||
# coding=utf-8
|
||||
"""
|
||||
Unit tests for octoprint.users
|
||||
"""
|
||||
from __future__ import absolute_import
|
||||
|
||||
__author__ = "Gina Häußge <osd@foosel.net>"
|
||||
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
|
||||
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
|
||||
|
||||
import unittest
|
||||
import ddt
|
||||
|
||||
import octoprint.users
|
||||
from octoprint.util import dict_merge
|
||||
|
||||
default_settings = dict(key="value", sub=dict(subkey="subvalue"))
|
||||
|
||||
@ddt.ddt
|
||||
class UserTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.user = octoprint.users.User("user", "passwordHash", True, ("user",), apikey="apikey_user", settings=default_settings)
|
||||
self.admin = octoprint.users.User("admin", "passwordHash", True, ("user","admin"), apikey="apikey_admin", settings=default_settings)
|
||||
self.inactive = octoprint.users.User("inactive", "passwordHash", False, ("user",), apikey="apikey_inactive", settings=default_settings)
|
||||
|
||||
@ddt.data(
|
||||
("user", dict(name="user",
|
||||
active=True,
|
||||
admin=False,
|
||||
user=True,
|
||||
apikey="apikey_user",
|
||||
settings=default_settings)),
|
||||
("admin", dict(name="admin",
|
||||
active=True,
|
||||
admin=True,
|
||||
user=True,
|
||||
apikey="apikey_admin",
|
||||
settings=default_settings)),
|
||||
("inactive", dict(name="inactive",
|
||||
active=False,
|
||||
admin=False,
|
||||
user=True,
|
||||
apikey="apikey_inactive",
|
||||
settings=default_settings))
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_as_dict(self, uservar, expected):
|
||||
user = getattr(self, uservar)
|
||||
self.assertDictEqual(expected, user.asDict())
|
||||
|
||||
@ddt.data(
|
||||
("key", "value"),
|
||||
(["sub", "subkey"], "subvalue"),
|
||||
("doesntexist", None)
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_get_setting(self, key, expected):
|
||||
value = self.user.get_setting(key)
|
||||
self.assertEquals(expected, value)
|
||||
|
||||
@ddt.data(
|
||||
("otherkey", "othervalue", dict(otherkey="othervalue"), True),
|
||||
(["sub", "othersubkey"], "othersubvalue", dict(sub=dict(othersubkey="othersubvalue")), True),
|
||||
("booleankey", True, dict(booleankey=True), True),
|
||||
(["newsub", "newsubkey"], "newsubvalue", dict(newsub=dict(newsubkey="newsubvalue")), True),
|
||||
(["sub", "subkey", "wontwork"], "wontwork", dict(), False)
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_set_setting_string(self, key, value, update, expected_returnvalue):
|
||||
returnvalue = self.user.set_setting(key, value)
|
||||
|
||||
expected = dict_merge(default_settings, update)
|
||||
|
||||
self.assertDictEqual(expected, self.user.get_all_settings())
|
||||
self.assertEqual(expected_returnvalue, returnvalue)
|
||||
|
||||
def test_check_password(self):
|
||||
self.assertTrue(self.user.check_password("passwordHash"))
|
||||
self.assertFalse(self.user.check_password("notThePasswordHash"))
|
||||
|
||||
@ddt.data(
|
||||
("user", "User(id=user,name=user,active=True,user=True,admin=False)"),
|
||||
("admin", "User(id=admin,name=admin,active=True,user=True,admin=True)"),
|
||||
("inactive", "User(id=inactive,name=inactive,active=False,user=True,admin=False)")
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_repr(self, uservar, output):
|
||||
self.assertEqual(output, repr(getattr(self, uservar)))
|
||||
|
||||
class SessionUserTestCase(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.user = octoprint.users.User("username", "passwordHash", True, ("user",), apikey="apikey", settings=dict(key="value"))
|
||||
|
||||
def test_two_sessions(self):
|
||||
session1 = octoprint.users.SessionUser(self.user)
|
||||
session2 = octoprint.users.SessionUser(self.user)
|
||||
|
||||
self.assertNotEqual(session1.get_session(), session2.get_session())
|
||||
self.assertEqual(session1._user, session2._user)
|
||||
self.assertEqual(session1._username, session2._username)
|
||||
|
||||
def test_settings_change_propagates(self):
|
||||
user = octoprint.users.SessionUser(self.user)
|
||||
self.user.set_setting("otherkey", "othervalue")
|
||||
|
||||
self.assertDictEqual(dict(key="value", otherkey="othervalue"), user.get_all_settings())
|
||||
|
||||
def test_repr(self):
|
||||
user = octoprint.users.SessionUser(self.user)
|
||||
expected = "SessionUser(id=username,name=username,active=True,user=True,admin=False,session={},created={})".format(user._session, user._created)
|
||||
self.assertEquals(expected, repr(user))
|
||||
Loading…
Reference in a new issue