diff --git a/src/octoprint/events.py b/src/octoprint/events.py index 2a3f3136..de1345d8 100644 --- a/src/octoprint/events.py +++ b/src/octoprint/events.py @@ -7,6 +7,8 @@ __license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agp import datetime import logging import subprocess +import Queue +import threading from octoprint.settings import settings @@ -28,6 +30,27 @@ class EventManager(object): self._registeredListeners = {} self._logger = logging.getLogger(__name__) + self._queue = Queue.PriorityQueue() + self._worker = threading.Thread(target=self._work) + self._worker.daemon = True + self._worker.start() + + def _work(self): + while True: + (event, payload) = self._queue.get(True) + + eventListeners = self._registeredListeners.get(event, None) + if eventListeners is None: + return + self._logger.debug("Firing event: %s (Payload: %r)" % (event, payload)) + + for listener in eventListeners: + self._logger.debug("Sending action to %r" % listener) + try: + listener(event, payload) + except: + self._logger.exception("Got an exception while sending event %s (Payload: %r) to %s" % (event, payload, listener)) + def fire(self, event, payload=None): """ Fire an event to anyone subscribed to it @@ -41,17 +64,8 @@ class EventManager(object): if not event in self._registeredListeners.keys(): return - self._logger.debug("Firing event: %s (Payload: %r)" % (event, payload)) + self._queue.put((event, payload), 0) - eventListeners = self._registeredListeners[event] - for listener in eventListeners: - self._logger.debug("Sending action to %r" % listener) - try: - listener(event, payload) - except: - self._logger.exception("Got an exception while sending event %s (Payload: %r) to %s" % (event, payload, listener)) - - def subscribe(self, event, callback): """ Subscribe a listener to an event -- pass in the event name (as a string) and the callback object