Added test stub for quick performance tests of the comm layer

Running comm.py with parameters port, baudrate, local file, remote file will
upload local file to remote file on the printer's sd using the provided connection
parameters.

Example:

    python -m octoprint.util.comm /dev/ttyUSB0 115200 /path/to/some.gcode test.gco

No plugin support! Only regular serial connection supported.
This commit is contained in:
Gina Häußge 2016-03-11 10:38:52 +01:00
parent 47b7552f91
commit 337329eda0

View file

@ -2170,8 +2170,6 @@ class PrintingGcodeFileInformation(PrintingFileInformation):
self._handle = None
self._first_line = None
self._offsets_callback = offsets_callback
self._current_tool_callback = current_tool_callback
@ -2179,6 +2177,7 @@ class PrintingGcodeFileInformation(PrintingFileInformation):
raise IOError("File %s does not exist" % self._filename)
self._size = os.stat(self._filename).st_size
self._pos = 0
self._read_lines = 0
def seek(self, offset):
if self._handle is None:
@ -2186,12 +2185,14 @@ class PrintingGcodeFileInformation(PrintingFileInformation):
self._handle.seek(offset)
self._pos = self._handle.tell()
self._read_lines = 0
def start(self):
"""
Opens the file for reading and determines the file size.
"""
PrintingFileInformation.start(self)
self._read_lines = 0
self._handle = bom_aware_open(self._filename, encoding="utf-8", errors="replace")
def close(self):
@ -2221,19 +2222,30 @@ class PrintingGcodeFileInformation(PrintingFileInformation):
while processed is None:
if self._handle is None:
# file got closed just now
self._pos = self._size
self._report_stats()
return None
line = to_unicode(self._handle.readline())
if not line:
self.close()
processed = process_gcode_line(line, offsets=offsets, current_tool=current_tool)
self._pos = self._handle.tell()
self._pos = self._handle.tell()
self._read_lines += 1
return processed
except Exception as e:
self.close()
self._logger.exception("Exception while processing line")
raise e
def _report_stats(self):
duration = time.time() - self._start_time
stats = dict(lines=self._read_lines,
rate=float(self._read_lines) / duration,
time_per_line=duration * 1000.0 / float(self._read_lines),
duration=duration)
self._logger.info("Finished in {duration:.3f} s. Approx. transfer rate of {rate:.3f} lines/s or {time_per_line:.3f} ms per line".format(**stats))
class StreamingGcodeFileInformation(PrintingGcodeFileInformation):
def __init__(self, path, localFilename, remoteFilename):
PrintingGcodeFileInformation.__init__(self, path)
@ -2584,3 +2596,92 @@ def gcode_command_for_cmd(cmd):
# this should never happen
return None
# --- Test code for speed testing the comm layer via command line follows
def upload_cli():
"""
Usage: python -m octoprint.util.comm <port> <baudrate> <local path> <remote path>
Uploads <local path> to <remote path> on SD card of printer on port <port>, using baudrate <baudrate>.
"""
import sys
from octoprint.util import Object
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# fetch port, baudrate, filename and target from commandline
if len(sys.argv) < 5:
print("Usage: comm.py <port> <baudrate> <local path> <target path>")
sys.exit(-1)
port = sys.argv[1]
baudrate = sys.argv[2]
path = sys.argv[3]
target = sys.argv[4]
# init settings & plugin manager
settings(init=True)
octoprint.plugin.plugin_manager(init=True)
# create dummy callback
class MyMachineComCallback(MachineComPrintCallback):
progress_interval = 1
def __init__(self, path, target):
self.finished = threading.Event()
self.finished.clear()
self.comm = None
self.error = False
self.started = False
self._path = path
self._target = target
def on_comm_file_transfer_started(self, filename, filesize):
# transfer started, report
logger.info("Started file transfer of {}, size {}B".format(filename, filesize))
self.started = True
def on_comm_file_transfer_done(self, filename):
# transfer done, report, print stats and finish
logger.info("Finished file transfer of {}".format(filename))
self.finished.set()
def on_comm_state_change(self, state):
if state in (MachineCom.STATE_ERROR, MachineCom.STATE_CLOSED_WITH_ERROR):
# report and exit on errors
logger.error("Error/closed with error, exiting.")
self.error = True
self.finished.set()
elif state in (MachineCom.STATE_OPERATIONAL,) and not self.started:
# start transfer once we are operational
self.comm.startFileTransfer(self._path, os.path.basename(self._path), self._target)
callback = MyMachineComCallback(path, target)
# mock printer profile manager
profile = dict(heatedBed=False,
extruder=dict(count=1))
printer_profile_manager = Object()
printer_profile_manager.get_current_or_default = lambda: profile
# initialize serial
comm = MachineCom(port=port, baudrate=baudrate, callbackObject=callback, printerProfileManager=printer_profile_manager)
callback.comm = comm
# wait for file transfer to finish
callback.finished.wait()
# close connection
comm.close()
logger.info("Done, exiting...")
if __name__ == "__main__":
upload_cli()