From 5e0b53b651a018fd6c7bb186fa4123de801557f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Fri, 23 Jun 2017 17:56:17 +0200 Subject: [PATCH] Better thread safety for SockJS Heartbeat messages could still be sent out of turn, causing message corruption or other weird things. This should hopefully fix this. --- src/octoprint/server/__init__.py | 3 +- src/octoprint/server/util/sockjs.py | 46 ++++++++++++++++++++++------- 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/src/octoprint/server/__init__.py b/src/octoprint/server/__init__.py index a09fce46..bf922010 100644 --- a/src/octoprint/server/__init__.py +++ b/src/octoprint/server/__init__.py @@ -359,7 +359,8 @@ class Server(object): ioloop = IOLoop() ioloop.install() - self._router = SockJSRouter(self._create_socket_connection, "/sockjs") + self._router = SockJSRouter(self._create_socket_connection, "/sockjs", + session_kls=util.sockjs.ThreadSafeSession) upload_suffixes = dict(name=self._settings.get(["server", "uploads", "nameSuffix"]), path=self._settings.get(["server", "uploads", "pathSuffix"])) diff --git a/src/octoprint/server/util/sockjs.py b/src/octoprint/server/util/sockjs.py index af2f106b..bcd027a4 100644 --- a/src/octoprint/server/util/sockjs.py +++ b/src/octoprint/server/util/sockjs.py @@ -8,6 +8,7 @@ __copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms import logging import threading import sockjs.tornado +import sockjs.tornado.session import time import octoprint.timelapse @@ -21,6 +22,34 @@ from octoprint.settings import settings import octoprint.printer +class ThreadSafeSession(sockjs.tornado.session.Session): + def __init__(self, conn, server, session_id, expiry=None): + sockjs.tornado.session.Session.__init__(self, conn, server, session_id, expiry=expiry) + + def set_handler(self, handler, start_heartbeat=True): + if getattr(handler, "__orig_send_pack", None) is None: + orig_send_pack = handler.send_pack + mutex = threading.RLock() + + def send_pack(*args, **kwargs): + with mutex: + return orig_send_pack(*args, **kwargs) + + handler.send_pack = send_pack + setattr(handler, "__orig_send_pack", orig_send_pack) + + return sockjs.tornado.session.Session.set_handler(self, handler, start_heartbeat=start_heartbeat) + + def remove_handler(self, handler): + result = sockjs.tornado.session.Session.remove_handler(self, handler) + + if getattr(handler, "__orig_send_pack", None) is not None: + handler.send_pack = getattr(handler, "__orig_send_pack") + delattr(handler, "__orig_send_pack") + + return result + + class PrinterStateConnection(sockjs.tornado.SockJSConnection, octoprint.printer.PrinterCallback): def __init__(self, printer, fileManager, analysisQueue, userManager, eventManager, pluginManager, session): sockjs.tornado.SockJSConnection.__init__(self, session) @@ -47,8 +76,6 @@ class PrinterStateConnection(sockjs.tornado.SockJSConnection, octoprint.printer. self._lastCurrent = 0 self._baseRateLimit = 0.5 - self._emit_mutex = threading.RLock() - def _getRemoteAddress(self, info): forwardedFor = info.headers.get("X-Forwarded-For") if forwardedFor is not None: @@ -205,11 +232,10 @@ class PrinterStateConnection(sockjs.tornado.SockJSConnection, octoprint.printer. self.sendEvent(event, payload) def _emit(self, type, payload): - with self._emit_mutex: - try: - self.send({type: payload}) - except Exception as e: - if self._logger.isEnabledFor(logging.DEBUG): - self._logger.exception("Could not send message to client {}".format(self._remoteAddress)) - else: - self._logger.warn("Could not send message to client {}: {}".format(self._remoteAddress, e)) + try: + self.send({type: payload}) + except Exception as e: + if self._logger.isEnabledFor(logging.DEBUG): + self._logger.exception("Could not send message to client {}".format(self._remoteAddress)) + else: + self._logger.warn("Could not send message to client {}: {}".format(self._remoteAddress, e))