Merge branch 'fix/serialRxBufferHandling' into maintenance

This commit is contained in:
Gina Häußge 2016-07-13 17:24:13 +02:00
commit 1e7ea28195
2 changed files with 55 additions and 23 deletions

View file

@ -30,7 +30,9 @@ class VirtualPrinter(object):
self._read_timeout = read_timeout
self._write_timeout = write_timeout
self.incoming = CharCountingQueue(settings().getInt(["devel", "virtualPrinter", "rxBuffer"]), name="RxBuffer")
self._rx_buffer_size = settings().getInt(["devel", "virtualPrinter", "rxBuffer"])
self.incoming = CharCountingQueue(self._rx_buffer_size, name="RxBuffer")
self.outgoing = Queue.Queue()
self.buffered = Queue.Queue(maxsize=settings().getInt(["devel", "virtualPrinter", "commandBuffer"]))
@ -95,13 +97,13 @@ class VirtualPrinter(object):
self._triggerResendWithTimeoutAt105 = True
self._triggeredResendWithTimeoutAt105 = False
waitThread = threading.Thread(target=self._sendWaitAfterTimeout)
waitThread = threading.Thread(target=self._sendWaitAfterTimeout, name="octoprint.plugins.virtual_printer.wait_thread")
waitThread.start()
readThread = threading.Thread(target=self._processIncoming)
readThread = threading.Thread(target=self._processIncoming, name="octoprint.plugins.virtual_printer.read_thread")
readThread.start()
bufferThread = threading.Thread(target=self._processBuffer)
bufferThread = threading.Thread(target=self._processBuffer, name="octoprint.plugins.virtual_printer.buffer_thread")
bufferThread.start()
def __str__(self):
@ -135,17 +137,26 @@ class VirtualPrinter(object):
def _processIncoming(self):
next_wait_timeout = time.time() + self._waitInterval
buf = ""
while self.incoming is not None and not self._killed:
self._simulateTemps()
try:
data = self.incoming.get(timeout=0.01)
self.incoming.task_done()
except Queue.Empty:
if self._sendWait and time.time() > next_wait_timeout:
self.outgoing.put("wait")
next_wait_timeout = time.time() + self._waitInterval
continue
buf += data
if "\n" in buf:
data = buf[:buf.find("\n") + 1]
buf = buf[buf.find("\n") + 1:]
else:
continue
next_wait_timeout = time.time() + self._waitInterval
if data is None:
@ -340,6 +351,8 @@ class VirtualPrinter(object):
if len(data.strip()) > 0 and not self._okBeforeCommandOutput:
self._sendOk()
self._logger.info("Closing down read loop")
def _calculate_checksum(self, line):
checksum = 0
for c in line:
@ -756,6 +769,9 @@ class VirtualPrinter(object):
continue
self._performMove(line)
self.buffered.task_done()
self._logger.info("Closing down buffer loop")
def write(self, data):
if self._debug_drop_connection:
@ -764,14 +780,14 @@ class VirtualPrinter(object):
with self._incoming_lock:
if self.incoming is None or self.outgoing is None:
return
return 0
if "M112" in data and self._supportM112:
self._kill()
return
return len(data)
try:
self.incoming.put(data, timeout=self._write_timeout)
return self.incoming.put(data, timeout=self._write_timeout, partial=True)
except Queue.Full:
self._logger.info("Incoming queue is full, raising SerialTimeoutException")
raise SerialTimeoutException()
@ -782,6 +798,7 @@ class VirtualPrinter(object):
try:
line = self.outgoing.get(timeout=self._read_timeout)
self.outgoing.task_done()
return line
except Queue.Empty:
return ""
@ -813,22 +830,25 @@ class CharCountingQueue(Queue.Queue):
self._size = 0
self._name = name
def put(self, item, block=True, timeout=None):
def put(self, item, block=True, timeout=None, partial=False):
self.not_full.acquire()
try:
item_size = self._len(item)
if not self._will_it_fit(item) and partial:
space_left = self.maxsize - self._qsize()
item = item[:space_left]
if not block:
if self._qsize() + item_size >= self.maxsize:
if not self._will_it_fit(item):
raise Queue.Full
elif timeout is None:
while self._qsize() + item_size >= self.maxsize:
while not self._will_it_fit(item):
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = time.time() + timeout
while self._qsize() + item_size >= self.maxsize:
while not self._will_it_fit(item):
remaining = endtime - time.time()
if remaining <= 0.0:
raise Queue.Full
@ -837,6 +857,8 @@ class CharCountingQueue(Queue.Queue):
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
return self._len(item)
finally:
self.not_full.release()
@ -856,3 +878,6 @@ class CharCountingQueue(Queue.Queue):
item = self.queue.popleft()
self._size -= self._len(item)
return item
def _will_it_fit(self, item):
return self.maxsize - self._qsize() >= self._len(item)

View file

@ -1948,24 +1948,31 @@ class MachineCom(object):
return
self._log("Send: " + str(cmd))
try:
self._serial.write(cmd + '\n')
except serial.SerialTimeoutException:
self._log("Serial timeout while writing to serial port, trying again.")
cmd += "\n"
written = 0
while written < len(cmd):
to_send = cmd[written:]
try:
self._serial.write(cmd + '\n')
written += self._serial.write(to_send)
except serial.SerialTimeoutException:
self._log("Serial timeout while writing to serial port, trying again.")
try:
written += self._serial.write(to_send)
except:
if not self._connection_closing:
self._logger.exception("Unexpected error while writing to serial port")
self._log("Unexpected error while writing to serial port: %s" % (get_exception_string()))
self._errorValue = get_exception_string()
self.close(is_error=True)
break
except:
if not self._connection_closing:
self._logger.exception("Unexpected error while writing to serial port")
self._log("Unexpected error while writing to serial port: %s" % (get_exception_string()))
self._errorValue = get_exception_string()
self.close(is_error=True)
except:
if not self._connection_closing:
self._logger.exception("Unexpected error while writing to serial port")
self._log("Unexpected error while writing to serial port: %s" % (get_exception_string()))
self._errorValue = get_exception_string()
self.close(is_error=True)
break
##~~ command handlers