From a6c4f8ba3b49f5364c133034c2b666589e65eed7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Tue, 17 Jan 2017 17:46:20 +0100 Subject: [PATCH 1/3] Fix potential race condition on print start When the sending of the first line of a file to print is still taking place while an "ok" from the firmware comes in, it's possible that two threads will try to access the file handle in parallel. That can lead to trouble within Python's codecs module. Synchronizing all access to the handle should do the trick. --- src/octoprint/util/comm.py | 101 +++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 48 deletions(-) diff --git a/src/octoprint/util/comm.py b/src/octoprint/util/comm.py index dd0ab9c6..de9e1d42 100644 --- a/src/octoprint/util/comm.py +++ b/src/octoprint/util/comm.py @@ -2580,6 +2580,7 @@ class PrintingGcodeFileInformation(PrintingFileInformation): PrintingFileInformation.__init__(self, filename) self._handle = None + self._handle_mutex = threading.RLock() self._offsets_callback = offsets_callback self._current_tool_callback = current_tool_callback @@ -2591,76 +2592,80 @@ class PrintingGcodeFileInformation(PrintingFileInformation): self._read_lines = 0 def seek(self, offset): - if self._handle is None: - return + with self._handle_mutex: + if self._handle is None: + return - self._handle.seek(offset) - self._pos = self._handle.tell() - self._read_lines = 0 + self._handle.seek(offset) + self._pos = self._handle.tell() + self._read_lines = 0 def start(self): """ Opens the file for reading and determines the file size. """ PrintingFileInformation.start(self) - self._handle = bom_aware_open(self._filename, encoding="utf-8", errors="replace") - self._pos = self._handle.tell() - if self._handle.encoding.endswith("-sig"): - # Apparently we found an utf-8 bom in the file. - # We need to add its length to our pos because it will - # be stripped transparently and we'll have no chance - # catching that. - import codecs - self._pos += len(codecs.BOM_UTF8) - self._read_lines = 0 + with self._handle_mutex: + self._handle = bom_aware_open(self._filename, encoding="utf-8", errors="replace") + self._pos = self._handle.tell() + if self._handle.encoding.endswith("-sig"): + # Apparently we found an utf-8 bom in the file. + # We need to add its length to our pos because it will + # be stripped transparently and we'll have no chance + # catching that. + import codecs + self._pos += len(codecs.BOM_UTF8) + self._read_lines = 0 def close(self): """ Closes the file if it's still open. """ PrintingFileInformation.close(self) - if self._handle is not None: - try: - self._handle.close() - except: - pass - self._handle = None + with self._handle_mutex: + if self._handle is not None: + try: + self._handle.close() + except: + pass + self._handle = None def getNext(self): """ Retrieves the next line for printing. """ - if self._handle is None: - raise ValueError("File %s is not open for reading" % self._filename) + with self._handle_mutex: + if self._handle is None: + raise ValueError("File %s is not open for reading" % self._filename) - try: - offsets = self._offsets_callback() if self._offsets_callback is not None else None - current_tool = self._current_tool_callback() if self._current_tool_callback is not None else None + try: + offsets = self._offsets_callback() if self._offsets_callback is not None else None + current_tool = self._current_tool_callback() if self._current_tool_callback is not None else None - processed = None - while processed is None: - if self._handle is None: - # file got closed just now - self._pos = self._size - self._report_stats() - return None + processed = None + while processed is None: + if self._handle is None: + # file got closed just now + self._pos = self._size + self._report_stats() + return None - # we need to manually keep track of our pos here since - # codecs' readline will make our handle's tell not - # return the actual number of bytes read, but also the - # already buffered bytes (for detecting the newlines) - line = self._handle.readline() - self._pos += len(line.encode("utf-8")) + # we need to manually keep track of our pos here since + # codecs' readline will make our handle's tell not + # return the actual number of bytes read, but also the + # already buffered bytes (for detecting the newlines) + line = self._handle.readline() + self._pos += len(line.encode("utf-8")) - if not line: - self.close() - processed = self._process(line, offsets, current_tool) - self._read_lines += 1 - return processed - except Exception as e: - self.close() - self._logger.exception("Exception while processing line") - raise e + if not line: + self.close() + processed = self._process(line, offsets, current_tool) + self._read_lines += 1 + return processed + except Exception as e: + self.close() + self._logger.exception("Exception while processing line") + raise e def _process(self, line, offsets, current_tool): return process_gcode_line(line, offsets=offsets, current_tool=current_tool) From 1e482369b8736e7039212819d390f5d329d9bee0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Wed, 18 Jan 2017 12:58:38 +0100 Subject: [PATCH 2/3] Further synchronization in comm layer Prevent concurrent sending of commands from the monitor thread for incoming ok's while processing the start, pause, resume or cancel of a print job. --- src/octoprint/util/comm.py | 162 ++++++++++++++++++------------------- 1 file changed, 81 insertions(+), 81 deletions(-) diff --git a/src/octoprint/util/comm.py b/src/octoprint/util/comm.py index de9e1d42..26655417 100644 --- a/src/octoprint/util/comm.py +++ b/src/octoprint/util/comm.py @@ -387,7 +387,7 @@ class MachineCom(object): self._currentFile = None # multithreading locks - self._sendNextLock = threading.Lock() + self._jobLock = threading.RLock() self._sendingLock = threading.RLock() # monitoring thread @@ -743,41 +743,42 @@ class MachineCom(object): self._pauseWaitTimeLost = 0.0 try: - self._currentFile.start() + with self._jobLock: + self._currentFile.start() - self._changeState(self.STATE_PRINTING) + self._changeState(self.STATE_PRINTING) - self.resetLineNumbers() + self.resetLineNumbers() - self._callback.on_comm_print_job_started() + self._callback.on_comm_print_job_started() - if self.isSdFileSelected(): - #self.sendCommand("M26 S0") # setting the sd pos apparently sometimes doesn't work, so we re-select - # the file instead + if self.isSdFileSelected(): + #self.sendCommand("M26 S0") # setting the sd pos apparently sometimes doesn't work, so we re-select + # the file instead - # make sure to ignore the "file selected" later on, otherwise we'll reset our progress data - self._ignore_select = True - self.sendCommand("M23 {filename}".format(filename=self._currentFile.getFilename())) - if pos is not None and isinstance(pos, int) and pos > 0: - self._currentFile.setFilepos(pos) - self.sendCommand("M26 S{}".format(pos)) + # make sure to ignore the "file selected" later on, otherwise we'll reset our progress data + self._ignore_select = True + self.sendCommand("M23 {filename}".format(filename=self._currentFile.getFilename())) + if pos is not None and isinstance(pos, int) and pos > 0: + self._currentFile.setFilepos(pos) + self.sendCommand("M26 S{}".format(pos)) + else: + self._currentFile.setFilepos(0) + + self.sendCommand("M24") + + self._sd_status_timer = RepeatedTimer(self._timeout_intervals.get("sdStatus", 1.0), self._poll_sd_status, run_first=True) + self._sd_status_timer.start() else: - self._currentFile.setFilepos(0) + if pos is not None and isinstance(pos, int) and pos > 0: + self._currentFile.seek(pos) - self.sendCommand("M24") + line = self._getNext() + if line is not None: + self.sendCommand(line) - self._sd_status_timer = RepeatedTimer(self._timeout_intervals.get("sdStatus", 1.0), self._poll_sd_status, run_first=True) - self._sd_status_timer.start() - else: - if pos is not None and isinstance(pos, int) and pos > 0: - self._currentFile.seek(pos) - - line = self._getNext() - if line is not None: - self.sendCommand(line) - - # now make sure we actually do something, up until now we only filled up the queue - self._sendFromQueue() + # now make sure we actually do something, up until now we only filled up the queue + self._sendFromQueue() except: self._logger.exception("Error while trying to start printing") self._errorValue = get_exception_string() @@ -786,17 +787,18 @@ class MachineCom(object): def startFileTransfer(self, filename, localFilename, remoteFilename): if not self.isOperational() or self.isBusy(): - logging.info("Printer is not operation or busy") + logging.info("Printer is not operational or busy") return - self.resetLineNumbers() + with self._jobLock: + self.resetLineNumbers() - self._currentFile = StreamingGcodeFileInformation(filename, localFilename, remoteFilename) - self._currentFile.start() + self._currentFile = StreamingGcodeFileInformation(filename, localFilename, remoteFilename) + self._currentFile.start() - self.sendCommand("M28 %s" % remoteFilename) - eventManager().fire(Events.TRANSFER_STARTED, {"local": localFilename, "remote": remoteFilename}) - self._callback.on_comm_file_transfer_started(remoteFilename, self._currentFile.getFilesize()) + self.sendCommand("M28 %s" % remoteFilename) + eventManager().fire(Events.TRANSFER_STARTED, {"local": localFilename, "remote": remoteFilename}) + self._callback.on_comm_file_transfer_started(remoteFilename, self._currentFile.getFilesize()) def selectFile(self, filename, sd): if self.isBusy(): @@ -835,18 +837,6 @@ class MachineCom(object): # we aren't even printing, nothing to cancel... return - self._changeState(self.STATE_OPERATIONAL) - - if self.isSdFileSelected(): - self.sendCommand("M25") # pause print - self.sendCommand("M27") # get current byte position in file - self.sendCommand("M26 S0") # reset position in file to byte 0 - if self._sd_status_timer is not None: - try: - self._sd_status_timer.cancel() - except: - pass - def _on_M400_sent(): # we don't call on_print_job_cancelled on our callback here # because we do this only after our M114 has been answered @@ -854,7 +844,20 @@ class MachineCom(object): self._record_cancel_position = True self.sendCommand("M114") - self.sendCommand("M400", on_sent=_on_M400_sent) + with self._jobLock: + self._changeState(self.STATE_OPERATIONAL) + + if self.isSdFileSelected(): + self.sendCommand("M25") # pause print + self.sendCommand("M27") # get current byte position in file + self.sendCommand("M26 S0") # reset position in file to byte 0 + if self._sd_status_timer is not None: + try: + self._sd_status_timer.cancel() + except: + pass + + self.sendCommand("M400", on_sent=_on_M400_sent) def _pause_preparation_done(self): self._callback.on_comm_print_job_paused() @@ -866,41 +869,42 @@ class MachineCom(object): if not self._currentFile: return - if not pause and self.isPaused(): - if self._pauseWaitStartTime: - self._pauseWaitTimeLost = self._pauseWaitTimeLost + (time.time() - self._pauseWaitStartTime) - self._pauseWaitStartTime = None + with self._jobLock: + if not pause and self.isPaused(): + if self._pauseWaitStartTime: + self._pauseWaitTimeLost = self._pauseWaitTimeLost + (time.time() - self._pauseWaitStartTime) + self._pauseWaitStartTime = None - self._changeState(self.STATE_PRINTING) - self._callback.on_comm_print_job_resumed() + self._changeState(self.STATE_PRINTING) + self._callback.on_comm_print_job_resumed() - if self.isSdFileSelected(): - self.sendCommand("M24") - self.sendCommand("M27") - else: - line = self._getNext() - if line is not None: - self.sendCommand(line) + if self.isSdFileSelected(): + self.sendCommand("M24") + self.sendCommand("M27") + else: + line = self._getNext() + if line is not None: + self.sendCommand(line) - # now make sure we actually do something, up until now we only filled up the queue - self._sendFromQueue() + # now make sure we actually do something, up until now we only filled up the queue + self._sendFromQueue() - elif pause and self.isPrinting(): - if not self._pauseWaitStartTime: - self._pauseWaitStartTime = time.time() + elif pause and self.isPrinting(): + if not self._pauseWaitStartTime: + self._pauseWaitStartTime = time.time() - self._changeState(self.STATE_PAUSED) - if self.isSdFileSelected(): - self.sendCommand("M25") # pause print + self._changeState(self.STATE_PAUSED) + if self.isSdFileSelected(): + self.sendCommand("M25") # pause print - def _on_M400_sent(): - # we don't call on_print_job_paused on our callback here - # because we do this only after our M114 has been answered - # by the firmware - self._record_pause_position = True - self.sendCommand("M114") + def _on_M400_sent(): + # we don't call on_print_job_paused on our callback here + # because we do this only after our M114 has been answered + # by the firmware + self._record_pause_position = True + self.sendCommand("M114") - self.sendCommand("M400", on_sent=_on_M400_sent) + self.sendCommand("M400", on_sent=_on_M400_sent) def getSdFiles(self): return self._sdFiles @@ -1818,7 +1822,7 @@ class MachineCom(object): return line def _sendNext(self): - with self._sendNextLock: + with self._jobLock: while self._active: # we loop until we've actually enqueued a line for sending if self._state != self.STATE_PRINTING: @@ -2589,7 +2593,6 @@ class PrintingGcodeFileInformation(PrintingFileInformation): raise IOError("File %s does not exist" % self._filename) self._size = os.stat(self._filename).st_size self._pos = 0 - self._read_lines = 0 def seek(self, offset): with self._handle_mutex: @@ -2598,7 +2601,6 @@ class PrintingGcodeFileInformation(PrintingFileInformation): self._handle.seek(offset) self._pos = self._handle.tell() - self._read_lines = 0 def start(self): """ @@ -2615,7 +2617,6 @@ class PrintingGcodeFileInformation(PrintingFileInformation): # catching that. import codecs self._pos += len(codecs.BOM_UTF8) - self._read_lines = 0 def close(self): """ @@ -2660,7 +2661,6 @@ class PrintingGcodeFileInformation(PrintingFileInformation): if not line: self.close() processed = self._process(line, offsets, current_tool) - self._read_lines += 1 return processed except Exception as e: self.close() From 6d1e28ae9ce3536cc33064f5312f684d258030a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Wed, 18 Jan 2017 13:04:01 +0100 Subject: [PATCH 3/3] We still need _read_lines after all in the streamed file info --- src/octoprint/util/comm.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/octoprint/util/comm.py b/src/octoprint/util/comm.py index 26655417..ae5a99c6 100644 --- a/src/octoprint/util/comm.py +++ b/src/octoprint/util/comm.py @@ -2593,6 +2593,7 @@ class PrintingGcodeFileInformation(PrintingFileInformation): raise IOError("File %s does not exist" % self._filename) self._size = os.stat(self._filename).st_size self._pos = 0 + self._read_lines = 0 def seek(self, offset): with self._handle_mutex: @@ -2601,6 +2602,7 @@ class PrintingGcodeFileInformation(PrintingFileInformation): self._handle.seek(offset) self._pos = self._handle.tell() + self._read_lines = 0 def start(self): """ @@ -2617,6 +2619,7 @@ class PrintingGcodeFileInformation(PrintingFileInformation): # catching that. import codecs self._pos += len(codecs.BOM_UTF8) + self._read_lines = 0 def close(self): """ @@ -2661,6 +2664,7 @@ class PrintingGcodeFileInformation(PrintingFileInformation): if not line: self.close() processed = self._process(line, offsets, current_tool) + self._read_lines += 1 return processed except Exception as e: self.close()