diff --git a/src/octoprint/util/__init__.py b/src/octoprint/util/__init__.py index d43733ce..c62fe8ed 100644 --- a/src/octoprint/util/__init__.py +++ b/src/octoprint/util/__init__.py @@ -1046,7 +1046,7 @@ class InvariantContainer(object): return self._data.__iter__() -class PrependQueue(queue.Queue): +class PrependableQueue(queue.Queue): def __init__(self, maxsize=0): queue.Queue.__init__(self, maxsize=maxsize) @@ -1082,21 +1082,21 @@ class PrependQueue(queue.Queue): self.queue.appendleft(item) -class TypedQueue(PrependQueue): +class TypedQueue(PrependableQueue): def __init__(self, maxsize=0): - PrependQueue.__init__(self, maxsize=maxsize) + PrependableQueue.__init__(self, maxsize=maxsize) self._lookup = set() def put(self, item, item_type=None, *args, **kwargs): - PrependQueue.put(self, (item, item_type), *args, **kwargs) + PrependableQueue.put(self, (item, item_type), *args, **kwargs) def get(self, *args, **kwargs): - item, _ = PrependQueue.get(self, *args, **kwargs) + item, _ = PrependableQueue.get(self, *args, **kwargs) return item def prepend(self, item, item_type=None, *args, **kwargs): - PrependQueue.prepend(self, (item, item_type), *args, **kwargs) + PrependableQueue.prepend(self, (item, item_type), *args, **kwargs) def _put(self, item): _, item_type = item @@ -1106,10 +1106,10 @@ class TypedQueue(PrependQueue): else: self._lookup.add(item_type) - queue.Queue._put(self, item) + PrependableQueue._put(self, item) def _get(self): - item = PrependQueue._get(self) + item = PrependableQueue._get(self) _, item_type = item if item_type is not None: @@ -1125,7 +1125,7 @@ class TypedQueue(PrependQueue): else: self._lookup.add(item_type) - PrependQueue._prepend(self, item) + PrependableQueue._prepend(self, item) class TypeAlreadyInQueue(Exception): def __init__(self, t, *args, **kwargs): diff --git a/src/octoprint/util/comm.py b/src/octoprint/util/comm.py index 3517b970..aa71a9ac 100644 --- a/src/octoprint/util/comm.py +++ b/src/octoprint/util/comm.py @@ -32,7 +32,7 @@ from octoprint.events import eventManager, Events from octoprint.filemanager import valid_file_type from octoprint.filemanager.destinations import FileDestinations from octoprint.util import get_exception_string, sanitize_ascii, filter_non_ascii, CountedEvent, RepeatedTimer, \ - to_unicode, bom_aware_open, TypedQueue, TypeAlreadyInQueue, chunks + to_unicode, bom_aware_open, TypedQueue, PrependableQueue, TypeAlreadyInQueue, chunks try: import _winreg @@ -422,7 +422,7 @@ class MachineCom(object): self._checksum_requiring_commands = settings().get(["serial", "checksumRequiringCommands"]) self._clear_to_send = CountedEvent(max=10, name="comm.clear_to_send") - self._send_queue = TypedQueue() + self._send_queue = SendQueue() self._temperature_timer = None self._sd_status_timer = None @@ -2068,6 +2068,7 @@ class MachineCom(object): self._to_logfile_with_terminal("Got a resend request from the printer: requested line = {}, current line = {}".format(lineToResend, self._currentLine)) self._log_resends_rate_count += 1 + self._send_queue.resend_active = True return True finally: if self._supportResendsWithoutOk: @@ -2096,7 +2097,7 @@ class MachineCom(object): cmd = self._lastLines[-self._resendDelta] lineNumber = self._currentLine - self._resendDelta - result = self._enqueue_for_sending(cmd, linenumber=lineNumber) + result = self._enqueue_for_sending(cmd, linenumber=lineNumber, resend=True) self._resendDelta -= 1 if self._resendDelta <= 0: @@ -2104,6 +2105,8 @@ class MachineCom(object): self._lastResendNumber = None self._currentResendCount = 0 + self._send_queue.resend_active = False + return result def _sendCommand(self, cmd, cmd_type=None, on_sent=None): @@ -2166,7 +2169,7 @@ class MachineCom(object): ##~~ send loop handling - def _enqueue_for_sending(self, command, linenumber=None, command_type=None, on_sent=None): + def _enqueue_for_sending(self, command, linenumber=None, command_type=None, on_sent=None, resend=False): """ Enqueues a command and optional linenumber to use for it in the send queue. @@ -2180,7 +2183,11 @@ class MachineCom(object): """ try: - self._send_queue.put((command, linenumber, command_type, on_sent, False), item_type=command_type) + target = "send" + if resend: + target = "resend" + + self._send_queue.put((command, linenumber, command_type, on_sent, False), item_type=command_type, target=target) return True except TypeAlreadyInQueue as e: self._logger.debug("Type already in send queue: " + e.type) @@ -2188,7 +2195,7 @@ class MachineCom(object): def _send_loop(self): """ - The send loop is reponsible of sending commands in ``self._send_queue`` over the line, if it is cleared for + The send loop is responsible of sending commands in ``self._send_queue`` over the line, if it is cleared for sending (through received ``ok`` responses from the printer's firmware. """ @@ -2241,16 +2248,14 @@ class MachineCom(object): if len(results) > 1: with self._sendingLock: - # last command gets on_sent attached - last = results[-1] - self._send_queue.prepend((last[0], None, None, on_sent, True)) - on_sent = None - - # middle gets prepended reversed (so order gets restored) - if len(results) > 2: - to_prepend = reversed(results[1:-1]) - for m in to_prepend: - self._send_queue.prepend((m[0], None, None, None, True)) + # prepend reversed (so order gets restored) + to_prepend = reversed(results[1:-1]) + for m in to_prepend: + try: + self._send_queue.prepend((m[0], None, m[1], on_sent, True), item_type=m[1]) + on_sent = None + except TypeAlreadyInQueue: + pass # we only actually send the first entry here command, _, gcode, subcode = results[0] @@ -2926,6 +2931,88 @@ class StreamingGcodeFileInformation(PrintingGcodeFileInformation): duration=duration) self._logger.info("Finished in {duration:.3f} s. Approx. transfer rate of {rate:.3f} lines/s or {time_per_line:.3f} ms per line".format(**stats)) + +class SendQueue(PrependableQueue): + + def __init__(self, maxsize=0): + PrependableQueue.__init__(self, maxsize=maxsize) + + self._resend_queue = PrependableQueue() + self._send_queue = PrependableQueue() + self._lookup = set() + + self._resend_active = False + + @property + def resend_active(self): + return self._resend_active + + @resend_active.setter + def resend_active(self, resend_active): + with self.mutex: + self._resend_active = resend_active + + def prepend(self, item, item_type=None, target=None, block=True, timeout=None): + PrependableQueue.prepend(self, (item, item_type, target), block=block, timeout=timeout) + + def put(self, item, item_type=None, target=None, block=True, timeout=None): + PrependableQueue.put(self, (item, item_type, target), block=block, timeout=timeout) + + def get(self, block=True, timeout=None): + item, _, _ = PrependableQueue.get(self, block=block, timeout=timeout) + return item + + def _put(self, item): + _, item_type, target = item + if item_type is not None: + if item_type in self._lookup: + raise TypeAlreadyInQueue(item_type, "Type {} is already in queue".format(item_type)) + else: + self._lookup.add(item_type) + + if target == "resend": + self._resend_queue.put(item) + else: + self._send_queue.put(item) + + pass + + def _prepend(self, item): + _, item_type, target = item + if item_type is not None: + if item_type in self._lookup: + raise TypeAlreadyInQueue(item_type, "Type {} is already in queue".format(item_type)) + else: + self._lookup.add(item_type) + + if target == "resend": + self._resend_queue.prepend(item) + else: + self._send_queue.prepend(item) + + def _get(self): + if self.resend_active: + item = self._resend_queue.get(block=False) + else: + try: + item = self._resend_queue.get(block=False) + except queue.Empty: + item = self._send_queue.get(block=False) + + _, item_type, _ = item + if item_type is not None: + if item_type in self._lookup: + self._lookup.remove(item_type) + + return item + + def _qsize(self): + if self.resend_active: + return self._resend_queue.qsize() + else: + return self._resend_queue.qsize() + self._send_queue.qsize() + + def get_new_timeout(type, intervals): now = time.time() return now + intervals.get(type, 0.0)