Make sure resent lines always win before regular ones

Introduced a custom send queue type that actually contains two queues,
one for resends, one for regular lines. A flag indicates whether
lines should be returned from both or only resends. That way we
ensure that as soon as we have an active resend request we ignore
what was already in the queue and only send the lines we need to resend.

Also: PrependQueue => PrependableQueue
This commit is contained in:
Gina Häußge 2017-07-11 14:04:38 +02:00
parent 2e897f19ac
commit 23e58d2fc1
2 changed files with 112 additions and 25 deletions

View file

@ -1046,7 +1046,7 @@ class InvariantContainer(object):
return self._data.__iter__()
class PrependQueue(queue.Queue):
class PrependableQueue(queue.Queue):
def __init__(self, maxsize=0):
queue.Queue.__init__(self, maxsize=maxsize)
@ -1082,21 +1082,21 @@ class PrependQueue(queue.Queue):
self.queue.appendleft(item)
class TypedQueue(PrependQueue):
class TypedQueue(PrependableQueue):
def __init__(self, maxsize=0):
PrependQueue.__init__(self, maxsize=maxsize)
PrependableQueue.__init__(self, maxsize=maxsize)
self._lookup = set()
def put(self, item, item_type=None, *args, **kwargs):
PrependQueue.put(self, (item, item_type), *args, **kwargs)
PrependableQueue.put(self, (item, item_type), *args, **kwargs)
def get(self, *args, **kwargs):
item, _ = PrependQueue.get(self, *args, **kwargs)
item, _ = PrependableQueue.get(self, *args, **kwargs)
return item
def prepend(self, item, item_type=None, *args, **kwargs):
PrependQueue.prepend(self, (item, item_type), *args, **kwargs)
PrependableQueue.prepend(self, (item, item_type), *args, **kwargs)
def _put(self, item):
_, item_type = item
@ -1106,10 +1106,10 @@ class TypedQueue(PrependQueue):
else:
self._lookup.add(item_type)
queue.Queue._put(self, item)
PrependableQueue._put(self, item)
def _get(self):
item = PrependQueue._get(self)
item = PrependableQueue._get(self)
_, item_type = item
if item_type is not None:
@ -1125,7 +1125,7 @@ class TypedQueue(PrependQueue):
else:
self._lookup.add(item_type)
PrependQueue._prepend(self, item)
PrependableQueue._prepend(self, item)
class TypeAlreadyInQueue(Exception):
def __init__(self, t, *args, **kwargs):

View file

@ -32,7 +32,7 @@ from octoprint.events import eventManager, Events
from octoprint.filemanager import valid_file_type
from octoprint.filemanager.destinations import FileDestinations
from octoprint.util import get_exception_string, sanitize_ascii, filter_non_ascii, CountedEvent, RepeatedTimer, \
to_unicode, bom_aware_open, TypedQueue, TypeAlreadyInQueue, chunks
to_unicode, bom_aware_open, TypedQueue, PrependableQueue, TypeAlreadyInQueue, chunks
try:
import _winreg
@ -422,7 +422,7 @@ class MachineCom(object):
self._checksum_requiring_commands = settings().get(["serial", "checksumRequiringCommands"])
self._clear_to_send = CountedEvent(max=10, name="comm.clear_to_send")
self._send_queue = TypedQueue()
self._send_queue = SendQueue()
self._temperature_timer = None
self._sd_status_timer = None
@ -2068,6 +2068,7 @@ class MachineCom(object):
self._to_logfile_with_terminal("Got a resend request from the printer: requested line = {}, current line = {}".format(lineToResend, self._currentLine))
self._log_resends_rate_count += 1
self._send_queue.resend_active = True
return True
finally:
if self._supportResendsWithoutOk:
@ -2096,7 +2097,7 @@ class MachineCom(object):
cmd = self._lastLines[-self._resendDelta]
lineNumber = self._currentLine - self._resendDelta
result = self._enqueue_for_sending(cmd, linenumber=lineNumber)
result = self._enqueue_for_sending(cmd, linenumber=lineNumber, resend=True)
self._resendDelta -= 1
if self._resendDelta <= 0:
@ -2104,6 +2105,8 @@ class MachineCom(object):
self._lastResendNumber = None
self._currentResendCount = 0
self._send_queue.resend_active = False
return result
def _sendCommand(self, cmd, cmd_type=None, on_sent=None):
@ -2166,7 +2169,7 @@ class MachineCom(object):
##~~ send loop handling
def _enqueue_for_sending(self, command, linenumber=None, command_type=None, on_sent=None):
def _enqueue_for_sending(self, command, linenumber=None, command_type=None, on_sent=None, resend=False):
"""
Enqueues a command and optional linenumber to use for it in the send queue.
@ -2180,7 +2183,11 @@ class MachineCom(object):
"""
try:
self._send_queue.put((command, linenumber, command_type, on_sent, False), item_type=command_type)
target = "send"
if resend:
target = "resend"
self._send_queue.put((command, linenumber, command_type, on_sent, False), item_type=command_type, target=target)
return True
except TypeAlreadyInQueue as e:
self._logger.debug("Type already in send queue: " + e.type)
@ -2188,7 +2195,7 @@ class MachineCom(object):
def _send_loop(self):
"""
The send loop is reponsible of sending commands in ``self._send_queue`` over the line, if it is cleared for
The send loop is responsible of sending commands in ``self._send_queue`` over the line, if it is cleared for
sending (through received ``ok`` responses from the printer's firmware.
"""
@ -2241,16 +2248,14 @@ class MachineCom(object):
if len(results) > 1:
with self._sendingLock:
# last command gets on_sent attached
last = results[-1]
self._send_queue.prepend((last[0], None, None, on_sent, True))
on_sent = None
# middle gets prepended reversed (so order gets restored)
if len(results) > 2:
to_prepend = reversed(results[1:-1])
for m in to_prepend:
self._send_queue.prepend((m[0], None, None, None, True))
# prepend reversed (so order gets restored)
to_prepend = reversed(results[1:-1])
for m in to_prepend:
try:
self._send_queue.prepend((m[0], None, m[1], on_sent, True), item_type=m[1])
on_sent = None
except TypeAlreadyInQueue:
pass
# we only actually send the first entry here
command, _, gcode, subcode = results[0]
@ -2926,6 +2931,88 @@ class StreamingGcodeFileInformation(PrintingGcodeFileInformation):
duration=duration)
self._logger.info("Finished in {duration:.3f} s. Approx. transfer rate of {rate:.3f} lines/s or {time_per_line:.3f} ms per line".format(**stats))
class SendQueue(PrependableQueue):
def __init__(self, maxsize=0):
PrependableQueue.__init__(self, maxsize=maxsize)
self._resend_queue = PrependableQueue()
self._send_queue = PrependableQueue()
self._lookup = set()
self._resend_active = False
@property
def resend_active(self):
return self._resend_active
@resend_active.setter
def resend_active(self, resend_active):
with self.mutex:
self._resend_active = resend_active
def prepend(self, item, item_type=None, target=None, block=True, timeout=None):
PrependableQueue.prepend(self, (item, item_type, target), block=block, timeout=timeout)
def put(self, item, item_type=None, target=None, block=True, timeout=None):
PrependableQueue.put(self, (item, item_type, target), block=block, timeout=timeout)
def get(self, block=True, timeout=None):
item, _, _ = PrependableQueue.get(self, block=block, timeout=timeout)
return item
def _put(self, item):
_, item_type, target = item
if item_type is not None:
if item_type in self._lookup:
raise TypeAlreadyInQueue(item_type, "Type {} is already in queue".format(item_type))
else:
self._lookup.add(item_type)
if target == "resend":
self._resend_queue.put(item)
else:
self._send_queue.put(item)
pass
def _prepend(self, item):
_, item_type, target = item
if item_type is not None:
if item_type in self._lookup:
raise TypeAlreadyInQueue(item_type, "Type {} is already in queue".format(item_type))
else:
self._lookup.add(item_type)
if target == "resend":
self._resend_queue.prepend(item)
else:
self._send_queue.prepend(item)
def _get(self):
if self.resend_active:
item = self._resend_queue.get(block=False)
else:
try:
item = self._resend_queue.get(block=False)
except queue.Empty:
item = self._send_queue.get(block=False)
_, item_type, _ = item
if item_type is not None:
if item_type in self._lookup:
self._lookup.remove(item_type)
return item
def _qsize(self):
if self.resend_active:
return self._resend_queue.qsize()
else:
return self._resend_queue.qsize() + self._send_queue.qsize()
def get_new_timeout(type, intervals):
now = time.time()
return now + intervals.get(type, 0.0)