GCODE scripts are now Jinja templates

Refactored some things in octoprint.util.comm and octoprint.settings, added migration function to get users of the devel version up to date with their gcode scripts. Migration function will be removed again one week from now.
This commit is contained in:
Gina Häußge 2015-03-06 01:42:09 +01:00
parent d665cc9538
commit 975cc5ccfc
8 changed files with 584 additions and 193 deletions

View file

@ -299,7 +299,7 @@ class Printer(PrinterInterface, comm.MachineComPrintCallback):
raise ValueError("offsets must be a dict")
validated_keys = filter(lambda x: PrinterInterface.valid_tool_regex.match(x), offsets.keys())
validated_values = filter(lambda x: isinstance(value, (int, long, float)), offsets.values())
validated_values = filter(lambda x: isinstance(x, (int, long, float)), offsets.values())
if len(validated_keys) != len(offsets):
raise ValueError("offsets contains invalid keys: {offsets}".format(offsets=offsets))
@ -309,22 +309,8 @@ class Printer(PrinterInterface, comm.MachineComPrintCallback):
if self._comm is None:
return
tool, bed = self._comm.getOffsets()
validatedOffsets = dict()
for key in offsets:
value = offsets[key]
if key == "bed":
bed = value
validatedOffsets[key] = value
elif key.startswith("tool"):
toolNum = int(key[len("tool"):])
tool[toolNum] = value
validatedOffsets[key] = value
self._comm.setTemperatureOffset(tool, bed)
self._stateMonitor.set_temp_offsets(validatedOffsets)
self._comm.setTemperatureOffset(offsets)
self._stateMonitor.set_temp_offsets(offsets)
def _convert_rate_value(self, factor, min=0, max=200):
if not isinstance(factor, (int, float, long)):

View file

@ -512,7 +512,7 @@ class Server():
debug = self._debug
# first initialize the settings singleton and make sure it uses given configfile and basedir if available
self._initSettings(self._configfile, self._basedir)
settings(init=True, basedir=self._basedir, configfile=self._configfile)
# then initialize logging
self._initLogging(self._debug, self._logConf)
@ -731,9 +731,6 @@ class Server():
if "geteuid" in dir(os) and os.geteuid() == 0:
exit("You should not run OctoPrint as root!")
def _initSettings(self, configfile, basedir):
settings(init=True, basedir=basedir, configfile=configfile)
def _initLogging(self, debug, logConf=None):
defaultConfig = {
"version": 1,
@ -785,7 +782,7 @@ class Server():
defaultConfig["root"]["level"] = "DEBUG"
if logConf is None:
logConf = os.path.join(settings().settings_dir, "logging.yaml")
logConf = os.path.join(settings().getBaseFolder("base"), "logging.yaml")
configFromFile = {}
if os.path.exists(logConf) and os.path.isfile(logConf):

View file

@ -93,9 +93,25 @@ def getSettings():
"events": s.get(["system", "events"])
},
"terminalFilters": s.get(["terminalFilters"]),
"scripts": s.get(["scripts"], merged=True)
"scripts": {
"gcode": {
"afterPrinterConnected": None,
"beforePrintStarted": None,
"afterPrintCancelled": None,
"afterPrintDone": None,
"beforePrintPaused": None,
"afterPrintResumed": None,
"snippets": dict()
}
}
}
gcode_scripts = s.listScripts("gcode")
if gcode_scripts:
data["scripts"] = dict(gcode=dict())
for name in gcode_scripts:
data["scripts"]["gcode"][name] = s.loadScript("gcode", name, source=True)
def process_plugin_result(name, plugin, result):
if result:
if not "plugins" in data:
@ -196,10 +212,12 @@ def setSettings():
if "actions" in data["system"].keys(): s.set(["system", "actions"], data["system"]["actions"])
if "events" in data["system"].keys(): s.set(["system", "events"], data["system"]["events"])
if "scripts" in data:
if "gcode" in data["scripts"]:
gcode_scripts = data["scripts"]["gcode"]
s.set(["scripts", "gcode"], octoprint.util.dict_merge(s.get(["scripts", "gcode"], merged=True), gcode_scripts))
if "scripts" in data:
if "gcode" in data["scripts"] and isinstance(data["scripts"]["gcode"], dict):
for name, script in data["scripts"]["gcode"].items():
if name == "snippets":
continue
s.saveScript("gcode", name, script.replace("\r\n", "\n").replace("\r", "\n"))
if "plugins" in data:
for name, plugin in octoprint.plugin.plugin_manager().get_implementations(octoprint.plugin.SettingsPlugin).items():

View file

@ -1,6 +1,27 @@
# coding=utf-8
"""
This module represents OctoPrint's settings management. Within this module the default settings for the core
application are defined and the instance of the :class:`Settings` is held, which offers getter and setter
methods for the raw configuration values as well as various convenience methods to access the paths to base folders
of various types and the configuration file itself.
.. autodata:: default_settings
:annotation: = dict(...)
.. autodata:: valid_boolean_trues
.. autofunction:: settings
.. autoclass:: Settings
:members:
:undoc-members:
"""
from __future__ import absolute_import
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms of the AGPLv3 License"
import sys
import os
@ -9,18 +30,44 @@ import logging
import re
import uuid
APPNAME="OctoPrint"
_APPNAME = "OctoPrint"
instance = None
_instance = None
def settings(init=False, configfile=None, basedir=None):
global instance
if instance is None:
def settings(init=False, basedir=None, configfile=None):
"""
Factory method for initially constructing and consecutively retrieving the :class:`~octoprint.settings.Settings`
singleton.
Arguments:
init (boolean): A flag indicating whether this is the initial call to construct the singleton (True) or not
(False, default). If this is set to True and the plugin manager has already been initialized, a :class:`ValueError`
will be raised. The same will happen if the plugin manager has not yet been initialized and this is set to
False.
basedir (str): Path of the base directoy for all of OctoPrint's settings, log files, uploads etc. If not set
the default will be used: ``~/.octoprint`` on Linux, ``%APPDATA%/OctoPrint`` on Windows and
``~/Library/Application Support/OctoPrint`` on MacOS.
configfile (str): Path of the configuration file (``config.yaml``) to work on. If not set the default will
be used: ``<basedir>/config.yaml`` for ``basedir`` as defined above.
Returns:
Settings: The fully initialized :class:`Settings` instance.
Raises:
ValueError: ``init`` is True but settings are already initialized or vice versa.
"""
global _instance
if _instance is not None:
if init:
instance = Settings(configfile, basedir)
raise ValueError("Settings Manager already initialized")
else:
if init:
_instance = Settings(configfile=configfile, basedir=basedir)
else:
raise ValueError("Settings not initialized yet")
return instance
return _instance
default_settings = {
"serial": {
@ -100,7 +147,8 @@ default_settings = {
"watched": None,
"plugins": None,
"slicingProfiles": None,
"printerProfiles": None
"printerProfiles": None,
"scripts": None
},
"temperature": {
"profiles": [
@ -156,17 +204,9 @@ default_settings = {
"plugins": {},
"scripts": {
"gcode": {
"afterPrinterConnected": None,
"beforePrintStarted": None,
"afterPrintDone": None,
"afterPrintCancelled": "{disable_steppers}\n{disable_hotends}\n{disable_bed}\n{disable_fan}",
"afterPrintPaused": None,
"beforePrintResumed": None,
"templates": {
"disable_steppers": "M84",
"disable_hotends": "M104 T{tool:d} S0",
"disable_bed": "M140 S0",
"disable_fan": "M106 S0"
"afterPrintCancelled": "; disable motors\nM84\n\n;disable all heaters\n{% snippet 'disable_hotends' %}\nM140 S0\n\n;disable fan\nM106 S0",
"snippets": {
"disable_hotends": "{% for tool in range(printer_profile.extruder.count) %}M104 T{{ tool }} S0\n{% endfor %}"
}
}
},
@ -197,69 +237,268 @@ default_settings = {
}
}
}
"""The default settings of the core application."""
valid_boolean_trues = [True, "true", "yes", "y", "1"]
""" Values that are considered to be equivalent to the boolean ``True`` value, used for type conversion in various places."""
class Settings(object):
"""
The :class:`Settings` class allows managing all of OctoPrint's settings. It takes care of initializing the settings
directory, loading the configuration from ``config.yaml``, persisting changes to disk etc and provides access
methods for getting and setting specific values from the overall settings structure via paths.
A general word on the concept of paths, since they play an important role in OctoPrint's settings management. A
path is basically a list or tuple consisting of keys to follow down into the settings (which are basically like
a ``dict``) in order to set or retrieve a specific value (or more than one). For example, for a settings
structure like the following::
serial:
port: "/dev/ttyACM0"
baudrate: 250000
timeouts:
communication: 20.0
temperature: 5.0
sdStatus: 1.0
connection: 10.0
server:
host: "0.0.0.0"
port: 5000
the following paths could be used:
========================================== ============================================================================
Path Value
========================================== ============================================================================
``["serial", "port"]`` ::
"/dev/ttyACM0"
``["serial", "timeouts"]`` ::
communication: 20.0
temperature: 5.0
sdStatus: 1.0
connection: 10.0
``["serial", "timeouts", "temperature"]`` ::
5.0
``["server", "port"]`` ::
5000
========================================== ============================================================================
However, these would be invalid paths: ``["key"]``, ``["serial", "port", "value"]``, ``["server", "host", 3]``.
"""
def __init__(self, configfile=None, basedir=None):
self._logger = logging.getLogger(__name__)
self.settings_dir = None
self._basedir = None
self._config = None
self._dirty = False
self._mtime = None
self._init_settings_dir(basedir)
self._init_basedir(basedir)
if configfile is not None:
self._configfile = configfile
else:
self._configfile = os.path.join(self.settings_dir, "config.yaml")
self._configfile = os.path.join(self._basedir, "config.yaml")
self.load(migrate=True)
if self.get(["api", "key"]) is None:
self.set(["api", "key"], ''.join('%02X' % ord(z) for z in uuid.uuid4().bytes))
self.save(force=True)
def _init_settings_dir(self, basedir):
self._script_env = self._init_script_templating()
def _init_basedir(self, basedir):
if basedir is not None:
self.settings_dir = basedir
self._basedir = basedir
else:
self.settings_dir = _resolveSettingsDir(APPNAME)
self._basedir = _default_basedir(_APPNAME)
if not os.path.isdir(self.settings_dir):
os.makedirs(self.settings_dir)
if not os.path.isdir(self._basedir):
os.makedirs(self._basedir)
def _getDefaultFolder(self, type):
def _get_default_folder(self, type):
folder = default_settings["folder"][type]
if folder is None:
folder = os.path.join(self.settings_dir, type.replace("_", os.path.sep))
folder = os.path.join(self._basedir, type.replace("_", os.path.sep))
return folder
def _init_script_templating(self):
from jinja2 import Environment, BaseLoader, FileSystemLoader, ChoiceLoader, TemplateNotFound
from jinja2.nodes import Include, Const
from jinja2.ext import Extension
class SnippetExtension(Extension):
tags = {"snippet"}
fields = Include.fields
def parse(self, parser):
node = parser.parse_include()
if not node.template.value.startswith("/"):
node.template.value = "snippets/" + node.template.value
return node
class SettingsScriptLoader(BaseLoader):
def __init__(self, s):
self._settings = s
def get_source(self, environment, template):
parts = template.split("/")
if not len(parts):
raise TemplateNotFound(template)
script = self._settings.get(["scripts"], merged=True)
for part in parts:
if isinstance(script, dict) and part in script:
script = script[part]
else:
raise TemplateNotFound(template)
source = script
if source is None:
raise TemplateNotFound(template)
mtime = self._settings._mtime
return source, None, lambda: mtime == self._settings._last_modified
def list_templates(self):
scripts = self._settings.get(["scripts"], merged=True)
return self._get_templates(scripts)
def _get_templates(self, scripts):
templates = []
for key in scripts:
if isinstance(scripts[key], dict):
templates += map(lambda x: key + "/" + x, self._get_templates(scripts[key]))
elif isinstance(scripts[key], basestring):
templates.append(key)
return templates
class SelectLoader(BaseLoader):
def __init__(self, default, mapping, sep=":"):
self._default = default
self._mapping = mapping
self._sep = sep
def get_source(self, environment, template):
if self._sep in template:
prefix, name = template.split(self._sep, 1)
if not prefix in self._mapping:
raise TemplateNotFound(template)
return self._mapping[prefix].get_source(environment, name)
return self._default.get_source(environment, template)
def list_templates(self):
return self._default.list_templates()
class RelEnvironment(Environment):
def __init__(self, prefix_sep=":", *args, **kwargs):
Environment.__init__(self, *args, **kwargs)
self._prefix_sep = prefix_sep
def join_path(self, template, parent):
prefix, name = self._split_prefix(template)
if name.startswith("/"):
return self._join_prefix(prefix, name[1:])
else:
_, parent_name = self._split_prefix(parent)
parent_base = parent_name.split("/")[:-1]
return self._join_prefix(prefix, "/".join(parent_base) + "/" + name)
def _split_prefix(self, template):
if self._prefix_sep in template:
return template.split(self._prefix_sep, 1)
else:
return "", template
def _join_prefix(self, prefix, template):
if len(prefix):
return prefix + self._prefix_sep + template
else:
return template
file_system_loader = FileSystemLoader(self.getBaseFolder("scripts"))
settings_loader = SettingsScriptLoader(self)
choice_loader = ChoiceLoader([file_system_loader, settings_loader])
select_loader = SelectLoader(choice_loader, dict(bundled=settings_loader, file=file_system_loader))
return RelEnvironment(loader=select_loader, extensions=[SnippetExtension])
def _get_script_template(self, script_type, name, source=False):
from jinja2 import TemplateNotFound
template_name = script_type + "/" + name
try:
if source:
template_name, _, _ = self._script_env.loader.get_source(self._script_env, template_name)
return template_name
else:
return self._script_env.get_template(template_name)
except TemplateNotFound:
return None
except:
self._logger.exception("Exception while trying to resolve template {template_name}".format(**locals()))
return None
def _get_scripts(self, script_type):
return self._script_env.list_templates(filter_func=lambda x: x.startswith(script_type+"/"))
#~~ load and save
def load(self, migrate=False):
if os.path.exists(self._configfile) and os.path.isfile(self._configfile):
with open(self._configfile, "r") as f:
self._config = yaml.safe_load(f)
self._mtime = self._last_modified()
self._mtime = self._last_modified
# changed from else to handle cases where the file exists, but is empty / 0 bytes
if not self._config:
self._config = {}
if migrate:
self._migrateConfig()
self._migrate_config()
def _migrateConfig(self):
def _migrate_config(self):
dirty = False
for migrate in (self._migrate_event_config, self._migrate_reverse_proxy_config, self._migrate_printer_parameters):
for migrate in (self._migrate_event_config, self._migrate_reverse_proxy_config, self._migrate_printer_parameters, self._migrate_gcode_scripts):
dirty = migrate() or dirty
if dirty:
self.save(force=True)
def _migrate_gcode_scripts(self):
"""
Migrates an old development version of gcode scripts to the new template based format.
"""
dirty = False
if "scripts" in self._config:
if "gcode" in self._config["scripts"]:
if "templates" in self._config["scripts"]["gcode"]:
del self._config["scripts"]["gcode"]["templates"]
replacements = dict(
disable_steppers="M84",
disable_hotends="{% snippet 'disable_hotends' %}",
disable_bed="M140 S0",
disable_fan="M106 S0"
)
for name, script in self._config["scripts"]["gcode"].items():
self.saveScript("gcode", name, script.format(**replacements))
del self._config["scripts"]
dirty = True
return dirty
def _migrate_printer_parameters(self):
"""
Migrates the old "printer > parameters" data structure to the new printer profile mechanism.
"""
default_profile = self._config["printerProfiles"]["defaultProfile"] if "printerProfiles" in self._config and "defaultProfile" in self._config["printerProfiles"] else dict()
dirty = False
@ -320,6 +559,10 @@ class Settings(object):
return dirty
def _migrate_reverse_proxy_config(self):
"""
Migrates the old "server > baseUrl" and "server > scheme" configuration entries to
"server > reverseProxy > prefixFallback" and "server > reverseProxy > schemeFallback".
"""
if "server" in self._config.keys() and ("baseUrl" in self._config["server"] or "scheme" in self._config["server"]):
prefix = ""
if "baseUrl" in self._config["server"]:
@ -343,6 +586,10 @@ class Settings(object):
return False
def _migrate_event_config(self):
"""
Migrates the old event configuration format of type "events > gcodeCommandTrigger" and
"event > systemCommandTrigger" to the new events format.
"""
if "events" in self._config.keys() and ("gcodeCommandTrigger" in self._config["events"] or "systemCommandTrigger" in self._config["events"]):
self._logger.info("Migrating config (event subscriptions)...")
@ -439,7 +686,12 @@ class Settings(object):
self.load()
return True
@property
def _last_modified(self):
"""
Returns:
int: The last modification time of the configuration file.
"""
stat = os.stat(self._configfile)
return stat.st_mtime
@ -533,19 +785,47 @@ class Settings(object):
return value.lower() in valid_boolean_trues
return value is not None
def getBaseFolder(self, type):
if type not in default_settings["folder"].keys():
def getBaseFolder(self, type, create=True):
if type not in default_settings["folder"].keys() + ["base"]:
return None
if type == "base":
return self._basedir
folder = self.get(["folder", type])
if folder is None:
folder = self._getDefaultFolder(type)
folder = self._get_default_folder(type)
if not os.path.isdir(folder):
os.makedirs(folder)
if create:
os.makedirs(folder)
else:
raise IOError("No such folder: {folder}".format(folder=folder))
return folder
def listScripts(self, script_type):
return map(lambda x: x[len(script_type + "/"):], filter(lambda x: x.startswith(script_type + "/"), self._get_scripts(script_type)))
def loadScript(self, script_type, name, context=None, source=False):
if context is None:
context = dict()
template = self._get_script_template(script_type, name, source=source)
if template is None:
return None
if source:
script = template
else:
try:
script = template.render(**context)
except:
self._logger.exception("Exception while trying to render script {script_type}:{name}".format(**locals()))
return None
return script
def getFeedbackControls(self):
feedbackControls = []
for control in self.get(["controls"]):
@ -600,7 +880,7 @@ class Settings(object):
if len(path) == 0:
return
if self._mtime is not None and self._last_modified() != self._mtime:
if self._mtime is not None and self._last_modified != self._mtime:
self.load()
config = self._config
@ -620,10 +900,10 @@ class Settings(object):
return
key = path.pop(0)
if not force and key in defaults.keys() and key in config.keys() and defaults[key] == value:
if not force and key in defaults and key in config and defaults[key] == value:
del config[key]
self._dirty = True
elif force or (not key in config.keys() and defaults[key] != value) or (key in config.keys() and config[key] != value):
elif force or (not key in config and defaults[key] != value) or (key in config and config[key] != value):
if value is None:
del config[key]
else:
@ -669,7 +949,7 @@ class Settings(object):
return None
currentPath = self.getBaseFolder(type)
defaultPath = self._getDefaultFolder(type)
defaultPath = self._get_default_folder(type)
if (path is None or path == defaultPath) and "folder" in self._config.keys() and type in self._config["folder"].keys():
del self._config["folder"][type]
if not self._config["folder"]:
@ -681,7 +961,20 @@ class Settings(object):
self._config["folder"][type] = path
self._dirty = True
def _resolveSettingsDir(applicationName):
def saveScript(self, script_type, name, script):
script_folder = self.getBaseFolder("scripts")
filename = os.path.realpath(os.path.join(script_folder, script_type, name))
if not filename.startswith(script_folder):
# oops, jail break, that shouldn't happen
raise ValueError("Invalid script path to save to: {filename} (from {script_type}:{name})".format(**locals()))
path, _ = os.path.split(filename)
if not os.path.exists(path):
os.makedirs(path)
with open(filename, "w+") as f:
f.write(script)
def _default_basedir(applicationName):
# taken from http://stackoverflow.com/questions/1084697/how-do-i-store-desktop-application-data-in-a-cross-platform-way-for-python
if sys.platform == "darwin":
from AppKit import NSSearchPathForDirectoriesInDomains

View file

@ -157,7 +157,7 @@ class FilebasedUserManager(UserManager):
userfile = settings().get(["accessControl", "userfile"])
if userfile is None:
userfile = os.path.join(settings().settings_dir, "users.yaml")
userfile = os.path.join(settings().getBaseFolder("base"), "users.yaml")
self._userfile = userfile
self._users = {}
self._dirty = False

View file

@ -141,16 +141,15 @@ class MachineCom(object):
self._baudrateDetectList = baudrateList()
self._baudrateDetectRetry = 0
self._temp = {}
self._tempOffset = {}
self._bedTemp = None
self._bedTempOffset = 0
self._tempOffsets = dict()
self._commandQueue = queue.Queue()
self._currentZ = None
self._heatupWaitStartTime = None
self._heatupWaitTimeLost = 0.0
self._pauseWaitStartTime = None
self._pauseWaitTimeLost = 0.0
self._currentExtruder = 0
self._currentTool = 0
self._blocking_command = False
self._heating = False
@ -356,7 +355,10 @@ class MachineCom(object):
return self._bedTemp
def getOffsets(self):
return self._tempOffset, self._bedTempOffset
return dict(self._tempOffsets)
def getCurrentTool(self):
return self._currentTool
def getConnection(self):
return self._port, self._baudrate
@ -405,38 +407,40 @@ class MachineCom(object):
eventManager().fire(Events.PRINT_FAILED, payload)
eventManager().fire(Events.DISCONNECTED)
def setTemperatureOffset(self, tool=None, bed=None):
if tool is not None:
self._tempOffset = tool
def setTemperatureOffset(self, offsets):
self._tempOffsets.update(offsets)
if bed is not None:
self._bedTempOffset = bed
def sendCommand(self, cmd, cmd_type=None):
def sendCommand(self, cmd, cmd_type=None, processed=False):
cmd = cmd.encode('ascii', 'replace')
if not processed:
cmd = process_gcode_line(cmd)
if not cmd:
return
if self.isPrinting() and not self.isSdFileSelected():
self._commandQueue.put((cmd, cmd_type))
elif self.isOperational():
self._sendCommand((cmd, cmd_type))
def sendGcodeScript(self, scriptName, replacements=None):
gcodeScripts = settings().get(["scripts", "gcode"], merged=True)
if scriptName in gcodeScripts and gcodeScripts[scriptName]:
script = gcodeScripts[scriptName]
else:
return
if replacements is None:
replacements = dict()
replacements.update(dict(
disable_steppers=gcodeScripts["templates"]["disable_steppers"],
disable_hotends="\n".join(map(lambda x: gcodeScripts["templates"]["disable_hotends"].format(tool=x), range(self._printerProfileManager.get_current_or_default()["extruder"]["count"]))),
disable_bed=gcodeScripts["templates"]["disable_bed"],
disable_fan=gcodeScripts["templates"]["disable_fan"]
context = dict()
if replacements is not None and isinstance(replacements, dict):
context.update(replacements)
context.update(dict(
printer_profile=self._printerProfileManager.get_current_or_default()
))
script = script.format(**replacements)
scriptLines = map(str.strip, script.split("\n"))
template = settings().loadScript("gcode", scriptName, context=context)
if template is None:
return None
scriptLines = filter(
lambda x: x is not None and x.strip() != "",
map(
lambda x: process_gcode_line(x, offsets=self._tempOffsets, current_tool=self._currentTool),
template.split("\n")
)
)
for hook in self._gcodescript_hooks:
try:
@ -543,7 +547,7 @@ class MachineCom(object):
self._sdFileToSelect = filename
self.sendCommand("M23 %s" % filename)
else:
self._currentFile = PrintingGcodeFileInformation(filename, self.getOffsets)
self._currentFile = PrintingGcodeFileInformation(filename, offsets_callback=self.getOffsets, current_tool_callback=self.getCurrentTool)
eventManager().fire(Events.FILE_SELECTED, {
"file": self._currentFile.getFilename(),
"origin": self._currentFile.getFileLocation()
@ -1444,7 +1448,7 @@ class MachineCom(object):
def _gcode_T(self, cmd):
toolMatch = self._regex_paramTInt.search(cmd)
if toolMatch:
self._currentExtruder = int(toolMatch.group(1))
self._currentTool = int(toolMatch.group(1))
return cmd
def _gcode_G0(self, cmd):
@ -1467,7 +1471,7 @@ class MachineCom(object):
_gcode_M1 = _gcode_M0
def _gcode_M104(self, cmd):
toolNum = self._currentExtruder
toolNum = self._currentTool
toolMatch = self._regex_paramTInt.search(cmd)
if toolMatch:
toolNum = int(toolMatch.group(1))
@ -1613,22 +1617,23 @@ class PrintingFileInformation(object):
"""
def __init__(self, filename):
self._logger = logging.getLogger(__name__)
self._filename = filename
self._filepos = 0
self._filesize = None
self._startTime = None
self._pos = 0
self._size = None
self._start_time = None
def getStartTime(self):
return self._startTime
return self._start_time
def getFilename(self):
return self._filename
def getFilesize(self):
return self._filesize
return self._size
def getFilepos(self):
return self._filepos
return self._pos
def getFileLocation(self):
return FileDestinations.LOCAL
@ -1638,36 +1643,36 @@ class PrintingFileInformation(object):
The current progress of the file, calculated as relation between file position and absolute size. Returns -1
if file size is None or < 1.
"""
if self._filesize is None or not self._filesize > 0:
if self._size is None or not self._size > 0:
return -1
return float(self._filepos) / float(self._filesize)
return float(self._pos) / float(self._size)
def reset(self):
"""
Resets the current file position to 0.
"""
self._filepos = 0
self._pos = 0
def start(self):
"""
Marks the print job as started and remembers the start time.
"""
self._startTime = time.time()
self._start_time = time.time()
class PrintingSdFileInformation(PrintingFileInformation):
"""
Encapsulates information regarding an ongoing print from SD.
"""
def __init__(self, filename, filesize):
def __init__(self, filename, size):
PrintingFileInformation.__init__(self, filename)
self._filesize = filesize
self._size = size
def setFilepos(self, filepos):
def setFilepos(self, pos):
"""
Sets the current file position.
"""
self._filepos = filepos
self._pos = pos
def getFileLocation(self):
return FileDestinations.SDCARD
@ -1678,118 +1683,68 @@ class PrintingGcodeFileInformation(PrintingFileInformation):
that the file is closed in case of an error.
"""
def __init__(self, filename, offsetCallback):
def __init__(self, filename, offsets_callback=None, current_tool_callback=None):
PrintingFileInformation.__init__(self, filename)
self._filehandle = None
self._handle = None
self._filesetMenuModehandle = None
self._firstLine = None
self._currentTool = 0
self._first_line = None
self._offsetCallback = offsetCallback
self._regex_tempCommand = re.compile("M(104|109|140|190)")
self._regex_tempCommandTemperature = re.compile("S([-+]?\d*\.?\d*)")
self._regex_tempCommandTool = re.compile("T(\d+)")
self._regex_toolCommand = re.compile("^T(\d+)")
self._offsets_callback = offsets_callback
self._current_tool_callback = current_tool_callback
if not os.path.exists(self._filename) or not os.path.isfile(self._filename):
raise IOError("File %s does not exist" % self._filename)
self._filesize = os.stat(self._filename).st_size
self._size = os.stat(self._filename).st_size
self._pos = 0
def start(self):
"""
Opens the file for reading and determines the file size. Start time won't be recorded until 100 lines in
"""
PrintingFileInformation.start(self)
self._filehandle = open(self._filename, "r")
self._handle = open(self._filename, "r")
def getNext(self):
"""
Retrieves the next line for printing.
"""
if self._filehandle is None:
if self._handle is None:
raise ValueError("File %s is not open for reading" % self._filename)
offsets = self._offsets_callback() if self._offsets_callback is not None else None
current_tool = self._current_tool_callback() if self._current_tool_callback is not None else None
try:
processedLine = None
while processedLine is None:
if self._filehandle is None:
processed = None
while processed is None:
if self._handle is None:
# file got closed just now
return None
line = self._filehandle.readline()
line = self._handle.readline()
if not line:
self._filehandle.close()
self._filehandle = None
processedLine = self._processLine(line)
self._filepos = self._filehandle.tell()
self._handle.close()
self._handle = None
processed = process_gcode_line(line, offsets=offsets, current_tool=current_tool)
self._pos = self._handle.tell()
return processedLine
except Exception as (e):
if self._filehandle is not None:
self._filehandle.close()
self._filehandle = None
return processed
except Exception as e:
if self._handle is not None:
self._handle.close()
self._handle = None
self._logger.exception("Exception while processing line")
raise e
def _processLine(self, line):
if ";" in line:
line = line[0:line.find(";")]
line = line.strip()
if len(line) > 0:
toolMatch = self._regex_toolCommand.match(line)
if toolMatch is not None:
# track tool changes
self._currentTool = int(toolMatch.group(1))
else:
## apply offsets
if self._offsetCallback is not None:
tempMatch = self._regex_tempCommand.match(line)
if tempMatch is not None:
# if we have a temperature command, retrieve current offsets
tempOffset, bedTempOffset = self._offsetCallback()
if tempMatch.group(1) == "104" or tempMatch.group(1) == "109":
# extruder temperature, determine which one and retrieve corresponding offset
toolNum = self._currentTool
toolNumMatch = self._regex_tempCommandTool.search(line)
if toolNumMatch is not None:
try:
toolNum = int(toolNumMatch.group(1))
except ValueError:
pass
offset = tempOffset[toolNum] if toolNum in tempOffset.keys() and tempOffset[toolNum] is not None else 0
elif tempMatch.group(1) == "140" or tempMatch.group(1) == "190":
# bed temperature
offset = bedTempOffset
else:
# unknown, should never happen
offset = 0
if not offset == 0:
# if we have an offset != 0, we need to get the temperature to be set and apply the offset to it
tempValueMatch = self._regex_tempCommandTemperature.search(line)
if tempValueMatch is not None:
try:
temp = float(tempValueMatch.group(1))
if temp > 0:
newTemp = temp + offset
line = line.replace("S" + tempValueMatch.group(1), "S%f" % newTemp)
except ValueError:
pass
return line
else:
return None
class StreamingGcodeFileInformation(PrintingGcodeFileInformation):
def __init__(self, path, localFilename, remoteFilename):
PrintingGcodeFileInformation.__init__(self, path, None)
PrintingGcodeFileInformation.__init__(self, path)
self._localFilename = localFilename
self._remoteFilename = remoteFilename
def start(self):
PrintingGcodeFileInformation.start(self)
self._startTime = time.time()
self._start_time = time.time()
def getLocalFilename(self):
return self._localFilename
@ -1841,4 +1796,66 @@ def get_interval(type):
if type not in default_settings["serial"]["timeout"]:
return 0
else:
return settings().getFloat(["serial", "timeout", type])
return settings().getFloat(["serial", "timeout", type])
_temp_command_regex = re.compile("^M(?P<command>104|109|140|190)(\s+T(?P<tool>\d+)|\s+S(?P<temperature>[-+]?\d*\.?\d*))+")
def apply_temperature_offsets(line, offsets, current_tool=None):
if offsets is None:
return line
match = _temp_command_regex.match(line)
if match is None:
return line
groups = match.groupdict()
if not "temperature" in groups or groups["temperature"] is None:
return line
offset = 0
if current_tool is not None and (groups["command"] == "104" or groups["command"] == "109"):
# extruder temperature, determine which one and retrieve corresponding offset
tool_num = current_tool
if "tool" in groups and groups["tool"] is not None:
tool_num = int(groups["tool"])
tool_key = "tool%d" % tool_num
offset = offsets[tool_key] if tool_key in offsets and offsets[tool_key] else 0
elif groups["command"] == "140" or groups["command"] == "190":
# bed temperature
offset = offsets["bed"] if "bed" in offsets else 0
if offset == 0:
return line
temperature = float(groups["temperature"])
if temperature == 0:
return line
return line[:match.start("temperature")] + "%f" % (temperature + offset) + line[match.end("temperature"):]
def strip_comment(line):
if not ";" in line:
# shortcut
return line
escaped = False
result = []
for c in line:
if c == ";" and not escaped:
break
result += c if c != "\\" or escaped else ""
escaped = (c == "\\") and not escaped
return "".join(result)
def process_gcode_line(line, offsets=None, current_tool=None):
line = strip_comment(line).strip()
if not len(line):
return None
if offsets is not None:
line = apply_temperature_offsets(line, offsets, current_tool=current_tool)
return line

10
tests/util/__init__.py Normal file
View file

@ -0,0 +1,10 @@
# coding=utf-8
"""
Unit tests for ``octoprint.util``.
"""
from __future__ import absolute_import
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms of the AGPLv3 License"

View file

@ -0,0 +1,70 @@
# coding=utf-8
from __future__ import absolute_import
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms of the AGPLv3 License"
import unittest
from ddt import ddt, data, unpack
@ddt
class TestCommHelpers(unittest.TestCase):
@data(
("M117 Test", "M117 Test"),
("M117 Test ; foo", "M117 Test "),
("M117 Test \\; foo", "M117 Test ; foo"),
("M117 Test \\\\; foo", "M117 Test \\"),
("M117 Test \\\\\\; foo", "M117 Test \\; foo"),
("; foo", "")
)
@unpack
def test_strip_comment(self, input, expected):
from octoprint.util import comm
self.assertEquals(expected, comm.strip_comment(input))
@data(
("M117 Test", None, None, "M117 Test"),
("", None, None, None),
(" \t \r \n", None, None, None),
("M117 Test", dict(), 0, "M117 Test")
)
@unpack
def test_process_gcode_line(self, input, offsets, current_tool, expected):
from octoprint.util import comm
self.assertEquals(expected, comm.process_gcode_line(input, offsets=offsets, current_tool=current_tool))
@data(
("M104 S200", None, None, None),
("M117 Test", dict(), None, None),
("M104 T0", dict(), None, None),
("M104 S220", dict(tool0=10, tool1=20, bed=30), 0, 230.0),
("M104 T1 S220", dict(tool0=10, tool1=20, bed=30), 0, 240.0),
("M104 S220", dict(tool0=10, tool1=20, bed=30), 1, 240.0),
("M140 S100", dict(tool0=10, tool1=20, bed=30), 1, 130.0),
("M190 S100", dict(tool0=10, tool1=20, bed=30), 1, 130.0),
("M109 S220", dict(tool0=10, tool1=20, bed=30), 0, 230.0),
("M109 S220", dict(), 0, None),
("M140 S100", dict(), 0, None),
("M104 S220", dict(tool0=0), 0, None),
("M104 S220", dict(tool0=20), None, None),
("M104 S0", dict(tool0=20), 0, None)
)
@unpack
def test_apply_temperature_offsets(self, input, offsets, current_tool, expected):
from octoprint.util import comm
actual = comm.apply_temperature_offsets(input, offsets, current_tool=current_tool)
if expected is None:
self.assertEquals(input, actual)
else:
import re
match = re.search("S(\d+(\.\d+)?)", actual)
if not match:
self.fail("No temperature found")
temperature = float(match.group(1))
self.assertEquals(expected, temperature)
self.assertEquals(input[:match.start(1)], actual[:match.start(1)])
self.assertEquals(input[match.end(1):], actual[match.end(1):])