Better thread safety for SockJS
Heartbeat messages could still be sent out of turn, causing message corruption or other weird things. This should hopefully fix this.
This commit is contained in:
parent
1ebfa4b1dd
commit
5e0b53b651
2 changed files with 38 additions and 11 deletions
|
|
@ -359,7 +359,8 @@ class Server(object):
|
|||
ioloop = IOLoop()
|
||||
ioloop.install()
|
||||
|
||||
self._router = SockJSRouter(self._create_socket_connection, "/sockjs")
|
||||
self._router = SockJSRouter(self._create_socket_connection, "/sockjs",
|
||||
session_kls=util.sockjs.ThreadSafeSession)
|
||||
|
||||
upload_suffixes = dict(name=self._settings.get(["server", "uploads", "nameSuffix"]), path=self._settings.get(["server", "uploads", "pathSuffix"]))
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ __copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms
|
|||
import logging
|
||||
import threading
|
||||
import sockjs.tornado
|
||||
import sockjs.tornado.session
|
||||
import time
|
||||
|
||||
import octoprint.timelapse
|
||||
|
|
@ -21,6 +22,34 @@ from octoprint.settings import settings
|
|||
import octoprint.printer
|
||||
|
||||
|
||||
class ThreadSafeSession(sockjs.tornado.session.Session):
|
||||
def __init__(self, conn, server, session_id, expiry=None):
|
||||
sockjs.tornado.session.Session.__init__(self, conn, server, session_id, expiry=expiry)
|
||||
|
||||
def set_handler(self, handler, start_heartbeat=True):
|
||||
if getattr(handler, "__orig_send_pack", None) is None:
|
||||
orig_send_pack = handler.send_pack
|
||||
mutex = threading.RLock()
|
||||
|
||||
def send_pack(*args, **kwargs):
|
||||
with mutex:
|
||||
return orig_send_pack(*args, **kwargs)
|
||||
|
||||
handler.send_pack = send_pack
|
||||
setattr(handler, "__orig_send_pack", orig_send_pack)
|
||||
|
||||
return sockjs.tornado.session.Session.set_handler(self, handler, start_heartbeat=start_heartbeat)
|
||||
|
||||
def remove_handler(self, handler):
|
||||
result = sockjs.tornado.session.Session.remove_handler(self, handler)
|
||||
|
||||
if getattr(handler, "__orig_send_pack", None) is not None:
|
||||
handler.send_pack = getattr(handler, "__orig_send_pack")
|
||||
delattr(handler, "__orig_send_pack")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class PrinterStateConnection(sockjs.tornado.SockJSConnection, octoprint.printer.PrinterCallback):
|
||||
def __init__(self, printer, fileManager, analysisQueue, userManager, eventManager, pluginManager, session):
|
||||
sockjs.tornado.SockJSConnection.__init__(self, session)
|
||||
|
|
@ -47,8 +76,6 @@ class PrinterStateConnection(sockjs.tornado.SockJSConnection, octoprint.printer.
|
|||
self._lastCurrent = 0
|
||||
self._baseRateLimit = 0.5
|
||||
|
||||
self._emit_mutex = threading.RLock()
|
||||
|
||||
def _getRemoteAddress(self, info):
|
||||
forwardedFor = info.headers.get("X-Forwarded-For")
|
||||
if forwardedFor is not None:
|
||||
|
|
@ -205,11 +232,10 @@ class PrinterStateConnection(sockjs.tornado.SockJSConnection, octoprint.printer.
|
|||
self.sendEvent(event, payload)
|
||||
|
||||
def _emit(self, type, payload):
|
||||
with self._emit_mutex:
|
||||
try:
|
||||
self.send({type: payload})
|
||||
except Exception as e:
|
||||
if self._logger.isEnabledFor(logging.DEBUG):
|
||||
self._logger.exception("Could not send message to client {}".format(self._remoteAddress))
|
||||
else:
|
||||
self._logger.warn("Could not send message to client {}: {}".format(self._remoteAddress, e))
|
||||
try:
|
||||
self.send({type: payload})
|
||||
except Exception as e:
|
||||
if self._logger.isEnabledFor(logging.DEBUG):
|
||||
self._logger.exception("Could not send message to client {}".format(self._remoteAddress))
|
||||
else:
|
||||
self._logger.warn("Could not send message to client {}: {}".format(self._remoteAddress, e))
|
||||
|
|
|
|||
Loading…
Reference in a new issue