From 23e58d2fc18c77c4e34a54debac51b41a60ae4b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Tue, 11 Jul 2017 14:04:38 +0200 Subject: [PATCH] Make sure resent lines always win before regular ones Introduced a custom send queue type that actually contains two queues, one for resends, one for regular lines. A flag indicates whether lines should be returned from both or only resends. That way we ensure that as soon as we have an active resend request we ignore what was already in the queue and only send the lines we need to resend. Also: PrependQueue => PrependableQueue --- src/octoprint/util/__init__.py | 18 ++--- src/octoprint/util/comm.py | 119 ++++++++++++++++++++++++++++----- 2 files changed, 112 insertions(+), 25 deletions(-) diff --git a/src/octoprint/util/__init__.py b/src/octoprint/util/__init__.py index d43733ce..c62fe8ed 100644 --- a/src/octoprint/util/__init__.py +++ b/src/octoprint/util/__init__.py @@ -1046,7 +1046,7 @@ class InvariantContainer(object): return self._data.__iter__() -class PrependQueue(queue.Queue): +class PrependableQueue(queue.Queue): def __init__(self, maxsize=0): queue.Queue.__init__(self, maxsize=maxsize) @@ -1082,21 +1082,21 @@ class PrependQueue(queue.Queue): self.queue.appendleft(item) -class TypedQueue(PrependQueue): +class TypedQueue(PrependableQueue): def __init__(self, maxsize=0): - PrependQueue.__init__(self, maxsize=maxsize) + PrependableQueue.__init__(self, maxsize=maxsize) self._lookup = set() def put(self, item, item_type=None, *args, **kwargs): - PrependQueue.put(self, (item, item_type), *args, **kwargs) + PrependableQueue.put(self, (item, item_type), *args, **kwargs) def get(self, *args, **kwargs): - item, _ = PrependQueue.get(self, *args, **kwargs) + item, _ = PrependableQueue.get(self, *args, **kwargs) return item def prepend(self, item, item_type=None, *args, **kwargs): - PrependQueue.prepend(self, (item, item_type), *args, **kwargs) + PrependableQueue.prepend(self, (item, item_type), *args, **kwargs) def _put(self, item): _, item_type = item @@ -1106,10 +1106,10 @@ class TypedQueue(PrependQueue): else: self._lookup.add(item_type) - queue.Queue._put(self, item) + PrependableQueue._put(self, item) def _get(self): - item = PrependQueue._get(self) + item = PrependableQueue._get(self) _, item_type = item if item_type is not None: @@ -1125,7 +1125,7 @@ class TypedQueue(PrependQueue): else: self._lookup.add(item_type) - PrependQueue._prepend(self, item) + PrependableQueue._prepend(self, item) class TypeAlreadyInQueue(Exception): def __init__(self, t, *args, **kwargs): diff --git a/src/octoprint/util/comm.py b/src/octoprint/util/comm.py index 3517b970..aa71a9ac 100644 --- a/src/octoprint/util/comm.py +++ b/src/octoprint/util/comm.py @@ -32,7 +32,7 @@ from octoprint.events import eventManager, Events from octoprint.filemanager import valid_file_type from octoprint.filemanager.destinations import FileDestinations from octoprint.util import get_exception_string, sanitize_ascii, filter_non_ascii, CountedEvent, RepeatedTimer, \ - to_unicode, bom_aware_open, TypedQueue, TypeAlreadyInQueue, chunks + to_unicode, bom_aware_open, TypedQueue, PrependableQueue, TypeAlreadyInQueue, chunks try: import _winreg @@ -422,7 +422,7 @@ class MachineCom(object): self._checksum_requiring_commands = settings().get(["serial", "checksumRequiringCommands"]) self._clear_to_send = CountedEvent(max=10, name="comm.clear_to_send") - self._send_queue = TypedQueue() + self._send_queue = SendQueue() self._temperature_timer = None self._sd_status_timer = None @@ -2068,6 +2068,7 @@ class MachineCom(object): self._to_logfile_with_terminal("Got a resend request from the printer: requested line = {}, current line = {}".format(lineToResend, self._currentLine)) self._log_resends_rate_count += 1 + self._send_queue.resend_active = True return True finally: if self._supportResendsWithoutOk: @@ -2096,7 +2097,7 @@ class MachineCom(object): cmd = self._lastLines[-self._resendDelta] lineNumber = self._currentLine - self._resendDelta - result = self._enqueue_for_sending(cmd, linenumber=lineNumber) + result = self._enqueue_for_sending(cmd, linenumber=lineNumber, resend=True) self._resendDelta -= 1 if self._resendDelta <= 0: @@ -2104,6 +2105,8 @@ class MachineCom(object): self._lastResendNumber = None self._currentResendCount = 0 + self._send_queue.resend_active = False + return result def _sendCommand(self, cmd, cmd_type=None, on_sent=None): @@ -2166,7 +2169,7 @@ class MachineCom(object): ##~~ send loop handling - def _enqueue_for_sending(self, command, linenumber=None, command_type=None, on_sent=None): + def _enqueue_for_sending(self, command, linenumber=None, command_type=None, on_sent=None, resend=False): """ Enqueues a command and optional linenumber to use for it in the send queue. @@ -2180,7 +2183,11 @@ class MachineCom(object): """ try: - self._send_queue.put((command, linenumber, command_type, on_sent, False), item_type=command_type) + target = "send" + if resend: + target = "resend" + + self._send_queue.put((command, linenumber, command_type, on_sent, False), item_type=command_type, target=target) return True except TypeAlreadyInQueue as e: self._logger.debug("Type already in send queue: " + e.type) @@ -2188,7 +2195,7 @@ class MachineCom(object): def _send_loop(self): """ - The send loop is reponsible of sending commands in ``self._send_queue`` over the line, if it is cleared for + The send loop is responsible of sending commands in ``self._send_queue`` over the line, if it is cleared for sending (through received ``ok`` responses from the printer's firmware. """ @@ -2241,16 +2248,14 @@ class MachineCom(object): if len(results) > 1: with self._sendingLock: - # last command gets on_sent attached - last = results[-1] - self._send_queue.prepend((last[0], None, None, on_sent, True)) - on_sent = None - - # middle gets prepended reversed (so order gets restored) - if len(results) > 2: - to_prepend = reversed(results[1:-1]) - for m in to_prepend: - self._send_queue.prepend((m[0], None, None, None, True)) + # prepend reversed (so order gets restored) + to_prepend = reversed(results[1:-1]) + for m in to_prepend: + try: + self._send_queue.prepend((m[0], None, m[1], on_sent, True), item_type=m[1]) + on_sent = None + except TypeAlreadyInQueue: + pass # we only actually send the first entry here command, _, gcode, subcode = results[0] @@ -2926,6 +2931,88 @@ class StreamingGcodeFileInformation(PrintingGcodeFileInformation): duration=duration) self._logger.info("Finished in {duration:.3f} s. Approx. transfer rate of {rate:.3f} lines/s or {time_per_line:.3f} ms per line".format(**stats)) + +class SendQueue(PrependableQueue): + + def __init__(self, maxsize=0): + PrependableQueue.__init__(self, maxsize=maxsize) + + self._resend_queue = PrependableQueue() + self._send_queue = PrependableQueue() + self._lookup = set() + + self._resend_active = False + + @property + def resend_active(self): + return self._resend_active + + @resend_active.setter + def resend_active(self, resend_active): + with self.mutex: + self._resend_active = resend_active + + def prepend(self, item, item_type=None, target=None, block=True, timeout=None): + PrependableQueue.prepend(self, (item, item_type, target), block=block, timeout=timeout) + + def put(self, item, item_type=None, target=None, block=True, timeout=None): + PrependableQueue.put(self, (item, item_type, target), block=block, timeout=timeout) + + def get(self, block=True, timeout=None): + item, _, _ = PrependableQueue.get(self, block=block, timeout=timeout) + return item + + def _put(self, item): + _, item_type, target = item + if item_type is not None: + if item_type in self._lookup: + raise TypeAlreadyInQueue(item_type, "Type {} is already in queue".format(item_type)) + else: + self._lookup.add(item_type) + + if target == "resend": + self._resend_queue.put(item) + else: + self._send_queue.put(item) + + pass + + def _prepend(self, item): + _, item_type, target = item + if item_type is not None: + if item_type in self._lookup: + raise TypeAlreadyInQueue(item_type, "Type {} is already in queue".format(item_type)) + else: + self._lookup.add(item_type) + + if target == "resend": + self._resend_queue.prepend(item) + else: + self._send_queue.prepend(item) + + def _get(self): + if self.resend_active: + item = self._resend_queue.get(block=False) + else: + try: + item = self._resend_queue.get(block=False) + except queue.Empty: + item = self._send_queue.get(block=False) + + _, item_type, _ = item + if item_type is not None: + if item_type in self._lookup: + self._lookup.remove(item_type) + + return item + + def _qsize(self): + if self.resend_active: + return self._resend_queue.qsize() + else: + return self._resend_queue.qsize() + self._send_queue.qsize() + + def get_new_timeout(type, intervals): now = time.time() return now + intervals.get(type, 0.0)