diff --git a/src/octoprint/util/__init__.py b/src/octoprint/util/__init__.py index 02d96793..9de498e9 100644 --- a/src/octoprint/util/__init__.py +++ b/src/octoprint/util/__init__.py @@ -18,6 +18,7 @@ import threading from functools import wraps import warnings import contextlib +import Queue as queue logger = logging.getLogger(__name__) @@ -884,3 +885,43 @@ class InvariantContainer(object): def __iter__(self): return self._data.__iter__() + + +class TypedQueue(queue.Queue): + + def __init__(self, maxsize=0): + queue.Queue.__init__(self, maxsize=maxsize) + self._lookup = set() + + def put(self, item, item_type=None, *args, **kwargs): + queue.Queue.put(self, (item, item_type), *args, **kwargs) + + def get(self, *args, **kwargs): + item, _ = queue.Queue.get(self, *args, **kwargs) + return item + + def _put(self, item): + _, item_type = item + if item_type is not None: + if item_type in self._lookup: + raise TypeAlreadyInQueue(item_type, "Type {} is already in queue".format(item_type)) + else: + self._lookup.add(item_type) + + queue.Queue._put(self, item) + + def _get(self): + item = queue.Queue._get(self) + _, item_type = item + + if item_type is not None: + self._lookup.discard(item_type) + + return item + + +class TypeAlreadyInQueue(Exception): + def __init__(self, t, *args, **kwargs): + Exception.__init__(self, *args, **kwargs) + self.type = t +