Use wrapt.ObjectProxy for SessionUser wrapper class

Our old manual wrapped didn't properly work in some aspects, this
version now should hopefully take care of most common proxy object
pitfalls.
This commit is contained in:
Gina Häußge 2017-06-27 15:03:37 +02:00
parent 2e11a7b860
commit 12d40122d1
5 changed files with 53 additions and 34 deletions

View file

@ -48,7 +48,8 @@ INSTALL_REQUIRES = [
"future>=0.15,<0.16",
"scandir>=1.3,<1.4",
"websocket-client>=0.40,<0.41",
"python-dateutil>=2.6,<2.7"
"python-dateutil>=2.6,<2.7",
"wrapt>=1.10.10,<1.11"
]
if sys.platform == "darwin":

View file

@ -207,7 +207,7 @@ def login():
if octoprint.server.userManager.checkPassword(username, password):
if octoprint.server.userManager.enabled:
user = octoprint.server.userManager.login_user(user)
session["usersession.id"] = user.get_session()
session["usersession.id"] = user.session
g.user = user
login_user(user, remember=remember)
identity_changed.send(current_app._get_current_object(), identity=Identity(user.get_id()))

View file

@ -486,8 +486,8 @@ def passive_login():
if user is not None and not user.is_anonymous():
flask.ext.principal.identity_changed.send(flask.current_app._get_current_object(), identity=flask.ext.principal.Identity(user.get_id()))
if hasattr(user, "get_session"):
flask.session["usersession.id"] = user.get_session()
if hasattr(user, "session"):
flask.session["usersession.id"] = user.session
flask.g.user = user
return flask.jsonify(user.asDict())
elif settings().getBoolean(["accessControl", "autologinLocal"]) \
@ -505,7 +505,7 @@ def passive_login():
user = octoprint.server.userManager.findUser(autologinAs)
if user is not None:
user = octoprint.server.userManager.login_user(user)
flask.session["usersession.id"] = user.get_session()
flask.session["usersession.id"] = user.session
flask.g.user = user
flask.ext.login.login_user(user)
flask.ext.principal.identity_changed.send(flask.current_app._get_current_object(), identity=flask.ext.principal.Identity(user.get_id()))

View file

@ -13,12 +13,14 @@ import os
import yaml
import uuid
import wrapt
import logging
from builtins import range, bytes
from octoprint.settings import settings
from octoprint.util import atomic_write, to_str
from octoprint.util import atomic_write, to_str, deprecated
class UserManager(object):
valid_roles = ["user", "admin"]
@ -58,13 +60,13 @@ class UserManager(object):
if not isinstance(user, SessionUser):
user = SessionUser(user)
self._session_users_by_session[user.get_session()] = user
self._session_users_by_session[user.session] = user
userid = user.get_id()
if not userid in self._sessionids_by_userid:
self._sessionids_by_userid[userid] = set()
self._sessionids_by_userid[userid].add(user.get_session())
self._sessionids_by_userid[userid].add(user.session)
self._logger.debug("Logged in user: %r" % user)
@ -81,7 +83,7 @@ class UserManager(object):
return
userid = user.get_id()
sessionid = user.get_session()
sessionid = user.session
if userid in self._sessionids_by_userid:
try:
@ -99,7 +101,7 @@ class UserManager(object):
for session, user in self._session_users_by_session.items():
if not isinstance(user, SessionUser):
continue
if user._created + (24 * 60 * 60) < time.time():
if user.created + (24 * 60 * 60) < time.time():
self.logout_user(user)
@staticmethod
@ -505,37 +507,34 @@ class User(UserMixin):
def __repr__(self):
return "User(id=%s,name=%s,active=%r,user=%r,admin=%r)" % (self.get_id(), self.get_name(), self.is_active(), self.is_user(), self.is_admin())
class SessionUser(User):
class SessionUser(wrapt.ObjectProxy):
def __init__(self, user):
self._user = user
wrapt.ObjectProxy.__init__(self, user)
import string
import random
import time
chars = string.ascii_uppercase + string.ascii_lowercase + string.digits
self._session = "".join(random.choice(chars) for _ in range(10))
self._created = time.time()
self._self_session = "".join(random.choice(chars) for _ in range(10))
self._self_created = time.time()
def __getattribute__(self, item):
if item in ("get_session", "update_user", "_user", "_session", "_created"):
return object.__getattribute__(self, item)
else:
return getattr(object.__getattribute__(self, "_user"), item)
@property
def session(self):
return self._self_session
def __setattr__(self, item, value):
if item in ("_user", "_session", "_created"):
return object.__setattr__(self, item, value)
else:
return setattr(self._user, item, value)
@property
def created(self):
return self._self_created
@deprecated("SessionUser.get_session() has been deprecated, use SessionUser.session instead", since="1.3.5")
def get_session(self):
return self._session
return self.session
def update_user(self, user):
self._user = user
self.__wrapped__ = user
def __repr__(self):
return "SessionUser(id=%s,name=%s,active=%r,user=%r,admin=%r,session=%s,created=%s)" % (self.get_id(), self.get_name(), self.is_active(), self.is_user(), self.is_admin(), self._session, self._created)
return "SessionUser({!r},session={},created={})".format(self.__wrapped__, self.session, self.created)
##~~ DummyUser object to use when accessControl is disabled

View file

@ -21,17 +21,36 @@ class SessionUserTestCase(unittest.TestCase):
session1 = octoprint.users.SessionUser(self.user)
session2 = octoprint.users.SessionUser(self.user)
self.assertNotEqual(session1.get_session(), session2.get_session())
self.assertEqual(session1._user, session2._user)
self.assertEqual(session1._username, session2._username)
# session should be different, wrapped object should be identical
self.assertNotEqual(session1.session, session2.session)
self.assertEqual(session1.__wrapped__, session2.__wrapped__)
self.assertEqual(session1.get_name(), session2.get_name())
def test_settings_change_propagates(self):
user = octoprint.users.SessionUser(self.user)
self.user.set_setting("otherkey", "othervalue")
session1 = octoprint.users.SessionUser(self.user)
session2 = octoprint.users.SessionUser(self.user)
self.assertDictEqual(dict(key="value", otherkey="othervalue"), user.get_all_settings())
# change should propagate from User to SessionUser
self.user.set_setting("otherkey", "othervalue")
self.assertDictEqual(dict(key="value", otherkey="othervalue"), session1.get_all_settings())
# change should propagate from SessionUser to SessionUser
session2.set_setting("otherkey", "yetanothervalue")
self.assertDictEqual(dict(key="value", otherkey="yetanothervalue"), session1.get_all_settings())
def test_repr(self):
user = octoprint.users.SessionUser(self.user)
expected = "SessionUser(id=username,name=username,active=True,user=True,admin=False,session={},created={})".format(user._session, user._created)
expected = "SessionUser({!r},session={},created={})".format(self.user, user.session, user.created)
self.assertEqual(expected, repr(user))
def test_isinstance(self):
session = octoprint.users.SessionUser(self.user)
# needs to be detected as User instance
self.assertTrue(isinstance(session, octoprint.users.User))
# also needs to be detected as SessionUser instance
self.assertTrue(isinstance(session, octoprint.users.SessionUser))
# but wrapped user should NOT be detected as SessionUser instance of course
self.assertFalse(isinstance(self.user, octoprint.users.SessionUser))