Merge branch 'devel' into dev/folderSupport

This commit is contained in:
Gina Häußge 2015-11-18 17:16:12 +01:00
commit 8b4aecde35
40 changed files with 3368 additions and 530 deletions

View file

@ -55,6 +55,7 @@ date of first contribution):
* [Thomas Hou](https://github.com/masterhou)
* [Mark Bastiaans](https://github.com/markbastiaans)
* [Marcel Hellwig](https://github.com/punkkeks)
* [Kevin Murphy](https://github.com/kevingelion)
OctoPrint started off as a fork of [Cura](https://github.com/daid/Cura) by
[Daid Braam](https://github.com/daid). Parts of its communication layer and

31
docs/modules/cli.rst Normal file
View file

@ -0,0 +1,31 @@
.. _sec-modules-cli:
octoprint.cli
-------------
.. automodule:: octoprint.cli
:members:
.. _sec-modules-cli-dev:
octoprint.cli.dev
-----------------
.. automodule:: octoprint.cli.dev
:members:
.. _sec-modules-cli-plugins:
octoprint.cli.plugins
---------------------
.. automodule:: octoprint.cli.plugins
:members:
.. _sec-modules-cli-server:
octoprint.cli.server
--------------------
.. automodule:: octoprint.cli.server
:members:

View file

@ -7,6 +7,7 @@ Internal Modules
.. toctree::
:maxdepth: 3
cli.rst
filemanager.rst
plugin.rst
printer.rst

View file

@ -10,6 +10,37 @@ Over the course of this little tutorial we'll build a full fledged, installable
at some locations throughout OctoPrint and also offers some other basic functionality to give you an idea of what
you can achieve with OctoPrint's plugin system.
First of all let use make sure that you have OctoPrint checked out and set up for development on your local
development environment::
$ cd ~/devel
$ git clone https://github.com/foosel/OctoPrint
[...]
$ cd OctoPrint
$ virtualenv venv
[...]
$ source venv/bin/activate
(venv) $ pip install -e[develop]
[...]
(venv) $ octoprint --help
Usage: octoprint [OPTIONS] COMMAND [ARGS]...
[...]
.. note::
You can also develop your plugin directly on your Raspberry Pi running OctoPi of course. In that
case please ignore the above instructions, you'll only need to activate the ``oprint``
virtual environment:
.. code-block:: none
$ source ~/oprint/bin/activate
(oprint) $ octoprint --help
Usage: octoprint [OPTIONS] COMMAND [ARGS]...
[...]
We'll start at the most basic form a plugin can take - just a couple of simple lines of Python code:
.. code-block:: python
@ -24,14 +55,15 @@ We'll start at the most basic form a plugin can take - just a couple of simple l
Saving this as ``helloworld.py`` in ``~/.octoprint/plugins`` yields you something resembling these log entries upon server startup::
(venv) $ octoprint serve
2015-01-27 11:14:35,124 - octoprint.server - INFO - Starting OctoPrint 1.2.0-dev-448-gd96e56e (devel branch)
[...]
2015-01-27 11:14:35,124 - octoprint.plugin.core - INFO - Loading plugins from /home/pi/.octoprint/plugins, /home/pi/OctoPrint/src/octoprint/plugins and installed plugin packages...
[...]
2015-01-27 11:14:36,135 - octoprint.plugin.core - INFO - 3 plugin(s) registered with the system:
| CuraEngine (bundled) = /home/pi/OctoPrint/src/octoprint/plugins/cura
| Discovery (bundled) = /home/pi/OctoPrint/src/octoprint/plugins/discovery
[...]
| Hello World (1.0.0) = /home/pi/.octoprint/plugins/helloworld.py
[...]
OctoPrint found that plugin in the folder and took a look into it. The name and the version it displays in that log
entry it got from the ``__plugin_name__`` and ``__plugin_version__`` lines. It also read the description from
@ -101,31 +133,35 @@ or alternatively manually utilizing Python's standard package manager ``pip`` di
So let's begin. We'll use the `cookiecutter <https://github.com/audreyr/cookiecutter>`_ template for OctoPrint plugins here,
so we'll first need to install that::
$ pip install cookiecutter
(venv) $ pip install cookiecutter
Then we can use the ``cookiecutter`` command to generate a new OctoPrint plugin skeleton for us::
Then we can use the ``octoprint dev plugin:new`` command [#f1]_ to generate a new OctoPrint plugin skeleton for us::
$ cookiecutter gh:OctoPrint/cookiecutter-octoprint-plugin
(venv) $ cd ~/devel
(venv) $ octoprint dev plugin:new helloworld
Cloning into 'cookiecutter-octoprint-plugin'...
remote: Counting objects: 70, done.
remote: Compressing objects: 100% (17/17), done.
emote: Total 70 (delta 0), reused 0 (delta 0), pack-reused 51
Unpacking objects: 100% (70/70), done.
Checking connectivity... done.
plugin_identifier (default is "skeleton")? helloworld
plugin_package (default is "octoprint_helloworld")?
plugin_name (default is "OctoPrint-Helloworld")?
repo_name (default is "OctoPrint-Helloworld")?
full_name (default is "You")? Your Name
email (default is "you@example.com")? you@somewhere.net
github_username (default is "you")? yourGithubName
plugin_version (default is "0.1.0")? 1.0.0
plugin_description (default is "TODO")? A quick "Hello World" example plugin for OctoPrint
plugin_license (default is "AGPLv3")?
plugin_homepage (default is "https://github.com/yourGithubName/OctoPrint-Helloworld")?
plugin_source (default is "https://github.com/yourGithubName/OctoPrint-Helloworld")?
plugin_installurl (default is "https://github.com/yourGithubName/OctoPrint-Helloworld/archive/master.zip")?
$ cd OctoPrint-HelloWorld
remote: Counting objects: 101, done.
remote: Total 101 (delta 0), reused 0 (delta 0), pack-reused 101
Receiving objects: 100% (101/101), 53.69 KiB, done.
Resolving deltas: 100% (35/35), done.
plugin_package [octoprint_helloworld]:
plugin_name [OctoPrint-Helloworld]:
repo_name [OctoPrint-Helloworld]:
full_name [You]: Your Name
email [you@example.com]: you@somewhere.net
github_username [you]: yourGithubName
plugin_version [0.1.0]: 1.0.0
plugin_description [TODO]: A quick "Hello World" example plugin for OCtoPrint
plugin_license [AGPLv3]:
plugin_homepage [https://github.com/yourGithubName/OctoPrint-Helloworld]:
plugin_source [https://github.com/yourGithubName/OctoPrint-Helloworld]:
plugin_installurl [https://github.com/yourGithubName/OctoPrint-Helloworld/archive/master.zip]:
(venv) $ cd OctoPrint-HelloWorld
.. note::
If ``octoprint dev plugin:new`` isn't recognized as a command (and also doesn't show up in the output of
``octoprint --help``, make sure you installed cookiecutter into the same python environment as OctoPrint.
This will create a project structure in the ``OctoPrint-HelloWorld`` folder we just changed to that looks like this::
@ -193,9 +229,11 @@ Now all that's left to do is to move our ``helloworld.py`` into the ``octoprint_
The plugin is now ready to be installed via ``python setup.py install``. However, since we are still
working on our plugin, it makes more sense to use ``python setup.py develop`` for now -- this way the plugin becomes
discoverable by OctoPrint, however we don't have to reinstall it after any changes we will still do::
discoverable by OctoPrint, however we don't have to reinstall it after any changes we will still do. We can have the
``octoprint dev plugin:install`` command do everything for us here, it will ensure to use the python binary belonging
to your OctoPrint installation::
$ python setup.py develop
(venv) $ octoprint dev plugin:install
running develop
running egg_info
creating OctoPrint_HelloWorld.egg-info
@ -209,9 +247,8 @@ Restart OctoPrint. Your plugin should still be properly discovered and the log l
2015-01-27 13:43:34,134 - octoprint.plugin.core - INFO - Loading plugins from /home/pi/.octoprint/plugins, /home/pi/OctoPrint/src/octoprint/plugins and installed plugin packages...
[...]
2015-01-27 13:43:34,818 - octoprint.plugin.core - INFO - 3 plugin(s) registered with the system:
| CuraEngine (bundled) = /home/pi/OctoPrint/src/octoprint/plugins/cura
| Discovery (bundled) = /home/pi/OctoPrint/src/octoprint/plugins/discovery
| Hello World (1.0.0) = /home/pi/OctoPrint-HelloWorld/octoprint_helloworld
[...]
| Hello World (1.0.0) = /home/pi/devel/OctoPrint-HelloWorld/octoprint_helloworld
[...]
2015-01-27 13:43:38,997 - octoprint.plugins.helloworld - INFO - Hello World!
@ -257,9 +294,9 @@ and ``__plugin_description__`` from ``__init__.py``:
and restart OctoPrint::
2015-01-27 13:46:33,786 - octoprint.plugin.core - INFO - 3 plugin(s) registered with the system:
| CuraEngine (bundled) = /home/pi/OctoPrint/src/octoprint/plugins/cura
| Discovery (bundled) = /home/pi/OctoPrint/src/octoprint/plugins/discovery
| OctoPrint-HelloWorld (1.0.0) = /home/pi/OctoPrint-HelloWorld/octoprint_helloworld
[...]
| OctoPrint-HelloWorld (1.0.0) = /home/pi/devel/OctoPrint-HelloWorld/octoprint_helloworld
[...]
Our "Hello World" Plugin still gets detected fine, but it's now listed under the same name it's installed under,
"OctoPrint-HelloWorld". That's a bit redundant and squashed, so we'll override that bit via ``__plugin_name__`` again:
@ -284,9 +321,9 @@ Our "Hello World" Plugin still gets detected fine, but it's now listed under the
Restart OctoPrint again::
2015-01-27 13:48:54,122 - octoprint.plugin.core - INFO - 3 plugin(s) registered with the system:
| CuraEngine (bundled) = /home/pi/OctoPrint/src/octoprint/plugins/cura
| Discovery (bundled) = /home/pi/OctoPrint/src/octoprint/plugins/discovery
[...]
| Hello World (1.0.0) = /home/pi/OctoPrint-HelloWorld/octoprint_helloworld
[...]
Much better! You can override pretty much all of the metadata defined within ``setup.py`` from within your Plugin itself --
take a look at :ref:`the available control properties <sec-plugin-concepts-controlproperties>` for all available
@ -295,7 +332,7 @@ overrides.
Following the README of the `Plugin Skeleton <https://github.com/OctoPrint/OctoPrint-PluginSkeleton>`_ you could now
already publish your plugin on Github and it would be directly installable by others using pip::
pip install https://github.com/yourGithubName/OctoPrint-HelloWorld/archive/master.zip
(venv) $ pip install https://github.com/yourGithubName/OctoPrint-HelloWorld/archive/master.zip
But let's add some more features instead.
@ -857,7 +894,7 @@ generated CSS files (and compiles them on the fly in your browser using `lessjs
which makes development so much easier. Let's try that, so you know how it works for future bigger projects.
Add another folder to our ``static`` folder called ``less`` and within that create a file ``helloworld.less``. Put
into that the same content as into our CSS file. Compile that LESS file to CSS [#f1]_, overwriting our old ``helloworld.css``
into that the same content as into our CSS file. Compile that LESS file to CSS [#f2]_, overwriting our old ``helloworld.css``
in the process. The folder structure of our plugin should now look like this::
octoprint_helloworld/
@ -1038,6 +1075,12 @@ looking for examples.
.. rubric:: Footnotes
.. [#f1] Refer to the `LESS documentation <http://lesscss.org/#using-less>`_ on how to do that. If you are developing
.. [#f1] Instead of the ``octoprint dev plugin:new`` you could also have manually called cookiecutter with the
template's repository URL shortcut: ``cookiecutter gh:OctoPrint/cookiecutter-octoprint-plugin``. The
``devel:newplugin`` command already does this for you, makes sure cookiecutter always uses a fresh
checkout without prompting you for it and also allows to pre-specify a bunch of settings (like the
plugin's identifier) directly from the command line. Take a look at ``octoprint dev plugin:new --help``
for the usage details.
.. [#f2] Refer to the `LESS documentation <http://lesscss.org/#using-less>`_ on how to do that. If you are developing
your plugin under Windows you might also want to give `WinLESS <http://winless.org/>`_ a look which will run
in the background and keep your CSS files up to date with your various project's LESS files automatically.

View file

@ -37,6 +37,143 @@ octoprint.accesscontrol.appkey
:return: A list of 3-tuples as described above
:rtype: list
.. _sec-plugins-hook-cli-commands:
octoprint.cli.commands
----------------------
.. py:function:: hook(cli_group, pass_octoprint_ctx, *args, **kwargs)
By providing a handler for this hook plugins may register commands on OctoPrint's command line interface (CLI).
Handlers are expected to return a list of callables annotated as `Click commands <http://click.pocoo.org/5/>`_ to register with the
CLI.
The custom ``MultiCommand`` instance :class:`~octoprint.cli.plugins.OctoPrintPluginCommands` is provided
as parameter. Via that object handlers may access the *global* :class:`~octoprint.settings.Settings`
and the :class:`~octoprint.plugin.core.PluginManager` instance as ``cli_group.settings`` and ``cli_group.plugin_manager``.
**Example:**
Registers two new commands, ``custom_cli_command:greet`` and ``custom_cli_command:random`` with
OctoPrint:
.. onlineinclude:: https://raw.githubusercontent.com/OctoPrint/Plugin-Examples/master/custom_cli_command.py
:linenos:
:tab-width: 4
:caption: `custom_cli_command.py <https://github.com/OctoPrint/Plugin-Examples/blob/master/custom_cli_command.py>`_
Calling ``octoprint --help`` shows the two new commands:
.. code-block:: none
$ octoprint --help
Usage: octoprint [OPTIONS] COMMAND [ARGS]...
Options:
-b, --basedir PATH Specify the basedir to use for uploads, timelapses etc.
-c, --config PATH Specify the config file to use.
-v, --verbose Increase logging verbosity
--version Show the version and exit.
--help Show this message and exit.
Commands:
daemon Starts, stops or restarts in daemon mode.
dev:plugin Helpers for plugin developers
plugin:custom_cli_command custom_cli_command commands
serve Starts the OctoPrint server.
$ octoprint plugin:custom_cli_command --help
Usage: octoprint plugin:custom_cli_command [OPTIONS] COMMAND [ARGS]...
custom_cli_command commands
Options:
--help Show this message and exit.
Commands:
greet Greet someone by name, the greeting can be...
random Greet someone by name with a random greeting.
Each also has an individual help output:
.. code-block:: none
$ octoprint plugin:custom_cli_command greet --help
Usage: octoprint plugin:custom_cli_command greet [OPTIONS] [NAME]
Greet someone by name, the greeting can be customized.
Options:
-g, --greeting TEXT The greeting to use
--help Show this message and exit.
$ octoprint plugin:custom_cli_command random --help
Usage: octoprint plugin:custom_cli_command random [OPTIONS] [NAME]
Greet someone by name with a random greeting.
Options:
--help Show this message and exit.
And of course they work too:
.. code-block:: none
$ octoprint plugin:custom_cli_command greet
Hello World!
$ octoprint plugin:custom_cli_command greet --greeting "Good morning"
Good morning World!
$ octoprint plugin:custom_cli_command random stranger
Hola stranger!
.. note::
If your hook handler is an instance method of a plugin mixin implementation, be aware that the hook will be
called without OctoPrint initializing your implementation instance. That means that **none** of the
:ref:`injected properties <sec-plugins-concepts-injectedproperties>` will be available and also the
:method:`~octoprint.plugin.Plugin.initialize` method will not be called.
Your hook handler will have access to the plugin manager as ``cli_group.plugin_manager`` and to the
*global* settings as ``cli_group.settings``. You can have your handler turn the latter into a
:class:`~octoprint.plugin.PluginSettings` instance by using :func:`octoprint.plugin.plugin_settings_from_settings_plugin`
if your plugin's implementation implements the :class:`~octoprint.plugin.SettingsPlugin` mixin and inject
that and the plugin manager instance yourself:
.. code-block:: python
import octoprint.plugin
class MyPlugin(octoprint.plugin.SettingsPlugin):
def get_cli_commands(self, cli_group, pass_octoprint_ctx, *args, **kwargs):
import logging
settings = cli_group._settings
plugin_settings = octoprint.plugin.plugin_settings_for_settings_plugin("myplugin", self)
if plugin_settings is None:
# this can happen if anything goes wrong with preparing the PluginSettings instance
return dict()
self._settings = plugin_settings
self._plugin_manager = cli_group._plugin_manager
self._logger = logging.getLogger(__name__)
### command definition starts here
# ...
No other platform components will be available - the CLI runs outside of a running, fully initialized
OctoPrint server context, so there is absolutely no way to access a printer connection, the event bus or
anything else like that. The only things available are the settings and the plugin manager.
:return: A list of `Click commands or groups <http://click.pocoo.org/5/commands/>`_ to provide on
OctoPrint's CLI.
:rtype: list
.. _sec-plugins-hook-comm-protocol-action:
octoprint.comm.protocol.action

View file

@ -3,11 +3,17 @@
# The init.d script will only run if this variable non-empty.
OCTOPRINT_USER=pi
# base directory to use
#BASEDIR=/home/pi/.octoprint
# configuration file to use
#CONFIGFILE=/home/pi/.octoprint/config.yaml
# On what port to run daemon, default is 5000
PORT=5000
# Path to the OctoPrint executable, use this to override the default setting "/usr/bin/octoprint"
#DAEMON=/path/to/octoprint/executable
# Path to the OctoPrint executable, you need to set this to match your installation!
#DAEMON=/home/pi/OctoPrint/venv/bin/octoprint
# What arguments to pass to octoprint, usually no need to touch this
DAEMON_ARGS="--port=$PORT"

View file

@ -16,17 +16,23 @@
# Author: Sami Olmari
PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin
DESC="Octoprint Daemon"
NAME="Octoprint"
DAEMON=/usr/local/bin/octoprint
PIDFILE=/var/run/$NAME.pid
DESC="OctoPrint Daemon"
NAME="OctoPrint"
PKGNAME=octoprint
PIDFILE=/var/run/$PKGNAME.pid
SCRIPTNAME=/etc/init.d/$PKGNAME
# Read configuration variable file if it is present
[ -r /etc/default/$PKGNAME ] && . /etc/default/$PKGNAME
# Exit if the octoprint is not installed
# Exit if the DAEMON is not set
if [ -z "$DAEMON" ]
then
log_warning_msg "Not starting $PKGNAME, DAEMON not set in /etc/default/$PKGNAME."
exit 0
fi
# Exit if the DAEMON is not installed
[ -x "$DAEMON" ] || exit 0
# Load the VERBOSE setting and other rcS variables
@ -48,6 +54,17 @@ then
exit 0
fi
COMMAND_ARGS=
if [ -z "$BASEDIR" ]
then
COMMAND_ARGS="--basedir $BASEDIR $COMMAND_ARGS"
fi
if [ -z "$CONFIGFILE" ]
then
COMMAND_ARGS="--config $CONFIGFILE $COMMAND_ARGS"
fi
#
# Function to verify if a pid is alive
#
@ -74,7 +91,7 @@ do_start()
if [ $RETVAL != 0 ]; then
start-stop-daemon --start --background --quiet --pidfile $PIDFILE --make-pidfile \
--exec $DAEMON --chuid $OCTOPRINT_USER --user $OCTOPRINT_USER --umask $UMASK --nicelevel=$NICELEVEL \
-- $DAEMON_ARGS
-- $COMMAND_ARGS serve $DAEMON_ARGS
RETVAL="$?"
fi
}

View file

@ -34,7 +34,8 @@ INSTALL_REQUIRES = [
"pkginfo==1.2.1",
"requests==2.7.0",
"semantic_version==2.4.2",
"psutil==3.2.1"
"psutil==3.2.1",
"Click"
]
# Additional requirements for optional install options

View file

@ -1,85 +1,214 @@
#!/usr/bin/env python2
# coding=utf-8
from __future__ import absolute_import, print_function
import sys
from octoprint.daemon import Daemon
from octoprint.server import Server
import logging
#~~ version
from ._version import get_versions
__version__ = get_versions()['version']
versions = get_versions()
__version__ = versions['version']
__branch__ = versions['branch'] if 'branch' in versions else None
__display_version__ = "{} ({} branch)".format(__version__, __branch__) if __branch__ else __version__
del versions
del get_versions
#~~ main class
#~~ sane logging defaults
class Main(Daemon):
def __init__(self, pidfile, configfile, basedir, host, port, debug, allowRoot, logConf):
Daemon.__init__(self, pidfile)
logging.basicConfig()
self._configfile = configfile
self._basedir = basedir
self._host = host
self._port = port
self._debug = debug
self._allowRoot = allowRoot
self._logConf = logConf
#~~ try to ensure a sound SSL environment
def run(self):
octoprint = Server(self._configfile, self._basedir, self._host, self._port, self._debug, self._allowRoot)
octoprint.run()
urllib3_ssl = True
"""Whether requests/urllib3 and urllib3 (if installed) should be able to establish
a sound SSL environment or not."""
version_info = sys.version_info
if version_info.major == 2 and version_info.minor <= 7 and version_info.micro < 9:
try:
# make sure our requests version of urllib3 is properly patched (if possible)
import requests.packages.urllib3.contrib.pyopenssl
requests.packages.urllib3.contrib.pyopenssl.inject_into_urllib3()
except ImportError:
urllib3_ssl = False
try:
import urllib3
# only proceed if urllib3 is even installed on its own
try:
# urllib3 is there, let's patch that too
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()
except ImportError:
urllib3_ssl = False
except ImportError:
pass
del version_info
#~~ init methods to bring up platform
def init_platform(basedir, configfile, use_logging_file=True, logging_file=None,
logging_config=None, debug=False, verbosity=0, uncaught_logger=None,
uncaught_handler=None, after_settings=None, after_logging=None):
settings = init_settings(basedir, configfile)
if callable(after_settings):
after_settings(settings)
logger = init_logging(settings,
use_logging_file=use_logging_file,
logging_file=logging_file,
default_config=logging_config,
debug=debug,
verbosity=verbosity,
uncaught_logger=uncaught_logger,
uncaught_handler=uncaught_handler)
if callable(after_logging):
after_logging(logger)
plugin_manager = init_pluginsystem(settings)
return settings, logger, plugin_manager
def init_settings(basedir, configfile):
"""Inits the settings instance based on basedir and configfile to use."""
from octoprint.settings import settings
return settings(init=True, basedir=basedir, configfile=configfile)
def init_logging(settings, use_logging_file=True, logging_file=None, default_config=None, debug=False, verbosity=0, uncaught_logger=None, uncaught_handler=None):
"""Sets up logging."""
import os
from octoprint.util import dict_merge
# default logging configuration
if default_config is None:
default_config = {
"version": 1,
"formatters": {
"simple": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "simple",
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.handlers.TimedRotatingFileHandler",
"level": "DEBUG",
"formatter": "simple",
"when": "D",
"backupCount": "1",
"filename": os.path.join(settings.getBaseFolder("logs"), "octoprint.log")
},
"serialFile": {
"class": "logging.handlers.RotatingFileHandler",
"level": "DEBUG",
"formatter": "simple",
"maxBytes": 2 * 1024 * 1024, # let's limit the serial log to 2MB in size
"filename": os.path.join(settings.getBaseFolder("logs"), "serial.log")
}
},
"loggers": {
"SERIAL": {
"level": "CRITICAL",
"handlers": ["serialFile"],
"propagate": False
},
"octoprint": {
"level": "INFO"
},
"octoprint.util": {
"level": "INFO"
},
"octoprint.plugins": {
"level": "INFO"
}
},
"root": {
"level": "WARN",
"handlers": ["console", "file"]
}
}
if debug or verbosity > 0:
default_config["loggers"]["octoprint"]["level"] = "DEBUG"
default_config["root"]["level"] = "INFO"
if verbosity > 1:
default_config["loggers"]["octoprint.plugins"]["level"] = "DEBUG"
if verbosity > 2:
default_config["root"]["level"] = "DEBUG"
if use_logging_file:
# further logging configuration from file...
if logging_file is None:
logging_file = os.path.join(settings.getBaseFolder("base"), "logging.yaml")
config_from_file = {}
if os.path.exists(logging_file) and os.path.isfile(logging_file):
import yaml
with open(logging_file, "r") as f:
config_from_file = yaml.safe_load(f)
# we merge that with the default config
config = dict_merge(default_config, config_from_file)
else:
config = default_config
# configure logging globally
import logging.config
logging.config.dictConfig(config)
# make sure we log any warnings
logging.captureWarnings(True)
import warnings
categories = (DeprecationWarning, PendingDeprecationWarning)
if verbosity > 2:
warnings.simplefilter("always")
elif debug or verbosity > 0:
for category in categories:
warnings.simplefilter("always", category=category)
# make sure we also log any uncaught exceptions
if uncaught_logger is None:
logger = logging.getLogger(__name__)
else:
logger = logging.getLogger(uncaught_logger)
if uncaught_handler is None:
def exception_logger(exc_type, exc_value, exc_tb):
logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_tb))
uncaught_handler = exception_logger
sys.excepthook = uncaught_handler
return logger
def init_pluginsystem(settings):
"""Initializes the plugin manager based on the settings."""
from octoprint.plugin import plugin_manager
return plugin_manager(init=True, settings=settings)
#~~ server main method
def main():
import argparse
from octoprint.cli import octo
octo(prog_name="octoprint", auto_envvar_prefix="OCTOPRINT")
parser = argparse.ArgumentParser(prog="run")
parser.add_argument("-v", "--version", action="store_true", dest="version",
help="Output OctoPrint's version and exit")
parser.add_argument("-d", "--debug", action="store_true", dest="debug",
help="Enable debug mode")
parser.add_argument("--host", action="store", type=str, dest="host",
help="Specify the host on which to bind the server")
parser.add_argument("--port", action="store", type=int, dest="port",
help="Specify the port on which to bind the server")
parser.add_argument("-c", "--config", action="store", dest="config",
help="Specify the config file to use. OctoPrint needs to have write access for the settings dialog to work. Defaults to ~/.octoprint/config.yaml")
parser.add_argument("-b", "--basedir", action="store", dest="basedir",
help="Specify the basedir to use for uploads, timelapses etc. OctoPrint needs to have write access. Defaults to ~/.octoprint")
parser.add_argument("--logging", action="store", dest="logConf",
help="Specify the config file to use for configuring logging. Defaults to ~/.octoprint/logging.yaml")
parser.add_argument("--daemon", action="store", type=str, choices=["start", "stop", "restart"],
help="Daemonize/control daemonized OctoPrint instance (only supported under Linux right now)")
parser.add_argument("--pid", action="store", type=str, dest="pidfile", default="/tmp/octoprint.pid",
help="Pidfile to use for daemonizing, defaults to /tmp/octoprint.pid")
parser.add_argument("--iknowwhatimdoing", action="store_true", dest="allowRoot",
help="Allow OctoPrint to run as user root")
args = parser.parse_args()
if args.version:
print "OctoPrint version %s" % __version__
sys.exit(0)
if args.daemon:
if sys.platform == "darwin" or sys.platform == "win32":
print >> sys.stderr, "Sorry, daemon mode is only supported under Linux right now"
sys.exit(2)
daemon = Main(args.pidfile, args.config, args.basedir, args.host, args.port, args.debug, args.allowRoot, args.logConf)
if "start" == args.daemon:
daemon.start()
elif "stop" == args.daemon:
daemon.stop()
elif "restart" == args.daemon:
daemon.restart()
else:
octoprint = Server(args.config, args.basedir, args.host, args.port, args.debug, args.allowRoot, args.logConf)
octoprint.run()
if __name__ == "__main__":
main()

View file

@ -0,0 +1,151 @@
# coding=utf-8
from __future__ import absolute_import
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
import click
import octoprint
#~~ click context
class OctoPrintContext(object):
"""Custom context wrapping the standard options."""
def __init__(self, configfile=None, basedir=None, verbosity=0):
self.configfile = configfile
self.basedir = basedir
self.verbosity = verbosity
pass_octoprint_ctx = click.make_pass_decorator(OctoPrintContext, ensure=True)
"""Decorator to pass in the :class:`OctoPrintContext` instance."""
#~~ Custom click option to hide from help
class HiddenOption(click.Option):
"""Custom option sub class with empty help."""
def get_help_record(self, ctx):
pass
def hidden_option(*param_decls, **attrs):
"""Attaches a hidden option to the command. All positional arguments are
passed as parameter declarations to :class:`Option`; all keyword
arguments are forwarded unchanged. This is equivalent to creating an
:class:`Option` instance manually and attaching it to the
:attr:`Command.params` list.
"""
import inspect
from click.decorators import _param_memo
def decorator(f):
if 'help' in attrs:
attrs['help'] = inspect.cleandoc(attrs['help'])
_param_memo(f, HiddenOption(param_decls, **attrs))
return f
return decorator
#~~ helper for settings context options
def set_ctx_obj_option(ctx, param, value):
"""Helper for setting eager options on the context."""
if ctx.obj is None:
ctx.obj = OctoPrintContext()
if hasattr(ctx.obj, param.name):
setattr(ctx.obj, param.name, value)
#~~ helper for setting a lot of bulk options
def bulk_options(options):
"""
Utility decorator to decorate a function with a list of click decorators.
The provided list of ``options`` will be reversed to ensure correct
processing order (inverse from what would be intuitive).
"""
def decorator(f):
options.reverse()
for option in options:
option(f)
return f
return decorator
#~~ helper for setting --basedir, --config and --verbose options
def standard_options(hidden=False):
"""
Decorator to add the standard options shared among all "octoprint" commands.
Adds the options ``--basedir``, ``--config`` and ``--verbose``. If ``hidden``
is set to ``True``, the options will be available on the command but not
listed in its help page.
"""
factory = click.option
if hidden:
factory = hidden_option
options = [
factory("--basedir", "-b", type=click.Path(), callback=set_ctx_obj_option, is_eager=True, expose_value=False,
help="Specify the basedir to use for uploads, timelapses etc."),
factory("--config", "-c", "configfile", type=click.Path(), callback=set_ctx_obj_option, is_eager=True, expose_value=False,
help="Specify the config file to use."),
factory("--verbose", "-v", "verbosity", count=True, callback=set_ctx_obj_option, is_eager=True, expose_value=False,
help="Increase logging verbosity"),
]
return bulk_options(options)
#~~ helper for settings legacy options we still have to support on "octoprint"
legacy_options = bulk_options([
hidden_option("--host", type=click.STRING),
hidden_option("--port", type=click.INT),
hidden_option("--logging", type=click.Path()),
hidden_option("--debug", "-d", is_flag=True),
hidden_option("--daemon", type=click.Choice(["start", "stop", "restart"])),
hidden_option("--pid", type=click.Path(), default="/tmp/octoprint.pid"),
hidden_option("--iknowwhatimdoing", "allow_root", is_flag=True),
])
"""Legacy options available directly on the "octoprint" command in earlier versions.
Kept available for reasons of backwards compatibility, but hidden from the
generated help pages."""
#~~ "octoprint" command, merges server_commands and plugin_commands groups
from .server import server_commands
from .plugins import plugin_commands
from .dev import dev_commands
from .client import client_commands
@click.group(name="octoprint", invoke_without_command=True, cls=click.CommandCollection,
sources=[server_commands, plugin_commands, dev_commands, client_commands])
@standard_options()
@legacy_options
@click.version_option(version=octoprint.__version__)
@click.pass_context
def octo(ctx, debug, host, port, logging, daemon, pid, allow_root):
if ctx.invoked_subcommand is None:
# We have to support calling the octoprint command without any
# sub commands to remain backwards compatible.
#
# But better print a message to inform people that they should
# use the sub commands instead.
if daemon:
click.echo("Daemon operation via \"octoprint --daemon "
"start|stop|restart\" is deprecated, please use "
"\"octoprint daemon start|stop|restart\" from now on")
from octoprint.cli.server import daemon_command
ctx.invoke(daemon_command, debug=debug, pid=pid, daemon=daemon, allow_root=allow_root)
else:
click.echo("Starting the server via \"octoprint\" is deprecated, "
"please use \"octoprint serve\" from now on.")
from octoprint.cli.server import serve_command
ctx.invoke(serve_command, debug=debug, host=host, port=port, logging=logging, allow_root=allow_root)

176
src/octoprint/cli/client.py Normal file
View file

@ -0,0 +1,176 @@
# coding=utf-8
from __future__ import absolute_import
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
import click
import json
import octoprint_client
from octoprint.cli import pass_octoprint_ctx, bulk_options, standard_options
from octoprint import init_settings
class JsonStringParamType(click.ParamType):
name = "json"
def convert(self, value, param, ctx):
try:
return json.loads(value)
except:
self.fail("%s is not a valid json string" % value, param, ctx)
@click.group()
def client_commands():
pass
@client_commands.group("client", context_settings=dict(ignore_unknown_options=True))
@click.option("--host", "-h", type=click.STRING)
@click.option("--port", "-p", type=click.INT)
@click.option("--httpuser", type=click.STRING)
@click.option("--httppass", type=click.STRING)
@click.option("--https", is_flag=True)
@click.option("--prefix", type=click.STRING)
@pass_octoprint_ctx
def client(obj, host, port, httpuser, httppass, https, prefix):
"""Basic API client."""
obj.settings = init_settings(obj.basedir, obj.configfile)
octoprint_client.init_client(obj.settings, https=https, httpuser=httpuser, httppass=httppass, host=host, port=port, prefix=prefix)
def log_response(response, status_code=True, body=True, headers=False):
if status_code:
click.echo("Status Code: {}".format(response.status_code))
if headers:
for header, value in response.headers.items():
click.echo("{}: {}".format(header, value))
click.echo()
if body:
click.echo(response.text)
@client.command("get")
@click.argument("path")
def get(path):
"""Performs a GET request against the specified server path."""
r = octoprint_client.get(path)
log_response(r)
@client.command("post_json")
@click.argument("path")
@click.argument("data", type=JsonStringParamType())
def post_json(path, data):
"""POSTs JSON data to the specified server path."""
r = octoprint_client.post_json(path, data)
log_response(r)
@client.command("patch_json")
@click.argument("path")
@click.argument("data", type=JsonStringParamType())
def patch_json(path, data):
"""PATCHes JSON data to the specified server path."""
r = octoprint_client.patch(path, data, encoding="json")
log_response(r)
@client.command("post_from_file")
@click.argument("path")
@click.argument("file_path", type=click.Path(exists=True, dir_okay=False, resolve_path=True))
@click.option("--json", is_flag=True)
@click.option("--yaml", is_flag=True)
def post_from_file(path, file_path, json_flag, yaml_flag):
"""POSTs JSON data to the specified server path."""
if json_flag or yaml_flag:
if json_flag:
with open(file_path, "rb") as fp:
data = json.load(fp)
else:
import yaml
with open(file_path, "rb") as fp:
data = yaml.safe_load(fp)
r = octoprint_client.post_json(path, data)
else:
with open(file_path, "rb") as fp:
data = fp.read()
r = octoprint_client.post(path, data)
log_response(r)
@client.command("command")
@click.argument("path")
@click.argument("command")
@click.option("--str", "-s", "str_params", multiple=True, nargs=2, type=click.Tuple([unicode, unicode]))
@click.option("--int", "-i", "int_params", multiple=True, nargs=2, type=click.Tuple([unicode, int]))
@click.option("--float", "-f", "float_params", multiple=True, nargs=2, type=click.Tuple([unicode, float]))
@click.option("--bool", "-b", "bool_params", multiple=True, nargs=2, type=click.Tuple([unicode, bool]))
def command(path, command, str_params, int_params, float_params, bool_params):
"""Sends a JSON command to the specified server path."""
data = dict()
params = str_params + int_params + float_params + bool_params
for param in params:
data[param[0]] = param[1]
r = octoprint_client.post_command(path, command, additional=data)
log_response(r, body=False)
@client.command("upload")
@click.argument("path")
@click.argument("file_path", type=click.Path(exists=True, dir_okay=False, resolve_path=True))
@click.option("--parameter", "-P", "params", multiple=True, nargs=2, type=click.Tuple([unicode, unicode]))
@click.option("--file-name", type=click.STRING)
@click.option("--content-type", type=click.STRING)
def upload(path, file_path, params, file_name, content_type):
"""Uploads the specified file to the specified server path."""
data = dict()
for param in params:
data[param[0]] = param[1]
r = octoprint_client.upload(path, file_path, additional=data, file_name=file_name, content_type=content_type)
log_response(r)
@client.command("delete")
@click.argument("path")
def delete(path):
"""Sends a DELETE request to the specified server path."""
r = octoprint_client.delete(path)
log_response(r)
@client.command("listen")
def listen():
def on_connect(ws):
click.echo(">>> Connected!")
def on_close(ws):
click.echo(">>> Connection closed!")
def on_error(ws, error):
click.echo("!!! Error: {}".format(error))
def on_heartbeat(ws):
click.echo("<3")
def on_message(ws, message_type, message_payload):
click.echo("Message: {}, Payload: {}".format(message_type, json.dumps(message_payload)))
socket = octoprint_client.connect_socket(on_connect=on_connect,
on_close=on_close,
on_error=on_error,
on_heartbeat=on_heartbeat,
on_message=on_message)
click.echo(">>> Waiting for client to exit")
try:
socket.wait()
finally:
click.echo(">>> Goodbye...")

233
src/octoprint/cli/dev.py Normal file
View file

@ -0,0 +1,233 @@
# coding=utf-8
from __future__ import absolute_import
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
import click
class OctoPrintDevelCommands(click.MultiCommand):
"""
Custom `click.MultiCommand <http://click.pocoo.org/5/api/#click.MultiCommand>`_
implementation that provides commands relevant for (plugin) development
based on availability of development dependencies.
"""
sep = ":"
groups = ("plugin",)
def __init__(self, *args, **kwargs):
click.MultiCommand.__init__(self, *args, **kwargs)
from octoprint.util.commandline import CommandlineCaller
from functools import partial
def log_util(f):
def log(*lines):
for line in lines:
f(line)
return log
self.command_caller = CommandlineCaller()
self.command_caller.on_log_call = log_util(lambda x: click.echo(">> {}".format(x)))
self.command_caller.on_log_stdout = log_util(click.echo)
self.command_caller.on_log_stderr = log_util(partial(click.echo, err=True))
def _get_prefix_methods(self, method_prefix):
for name in [x for x in dir(self) if x.startswith(method_prefix)]:
method = getattr(self, name)
yield method
def _get_commands_from_prefix_methods(self, method_prefix):
for method in self._get_prefix_methods(method_prefix):
result = method()
if result is not None and isinstance(result, click.Command):
yield result
def _get_commands(self):
result = dict()
for group in self.groups:
for command in self._get_commands_from_prefix_methods("{}_".format(group)):
result[group + self.sep + command.name] = command
return result
def list_commands(self, ctx):
result = [name for name in self._get_commands()]
result.sort()
return result
def get_command(self, ctx, cmd_name):
commands = self._get_commands()
return commands.get(cmd_name, None)
def plugin_new(self):
try:
import cookiecutter.main
except ImportError:
return None
import contextlib
@contextlib.contextmanager
def custom_cookiecutter_config(config):
"""
Allows overriding cookiecutter's user config with a custom dict
with fallback to the original data.
"""
from octoprint.util import fallback_dict
original_get_user_config = cookiecutter.main.get_user_config
original_config = original_get_user_config()
try:
cookiecutter.main.get_user_config = lambda: fallback_dict(config, original_config)
yield
finally:
cookiecutter.main.get_user_config = original_get_user_config
@contextlib.contextmanager
def custom_cookiecutter_prompt(options):
"""
Custom cookiecutter prompter for the template config.
If a setting is available in the provided options (read from the CLI)
that will be used, otherwise the user will be prompted for a value
via click.
"""
original_prompt_for_config = cookiecutter.main.prompt_for_config
def custom_prompt_for_config(context, no_input=False):
import cookiecutter.prompt
cookiecutter_dict = {}
env = cookiecutter.prompt.Environment()
for key, raw in context['cookiecutter'].items():
if key in options:
val = options[key]
else:
raw = raw if isinstance(raw, basestring) else str(raw)
val = env.from_string(raw).render(cookiecutter=cookiecutter_dict)
if not no_input:
val = click.prompt(key, default=val)
cookiecutter_dict[key] = val
return cookiecutter_dict
try:
cookiecutter.main.prompt_for_config = custom_prompt_for_config
yield
finally:
cookiecutter.main.prompt_for_config = original_prompt_for_config
@click.command("new")
@click.option("--name", "-n", help="The name of the plugin")
@click.option("--package", "-p", help="The plugin package")
@click.option("--author", "-a", help="The plugin author's name")
@click.option("--email", "-e", help="The plugin author's mail address")
@click.option("--license", "-l", help="The plugin's license")
@click.option("--description", "-d", help="The plugin's description")
@click.option("--homepage", help="The plugin's homepage URL")
@click.option("--source", "-s", help="The URL to the plugin's source")
@click.option("--installurl", "-i", help="The plugin's install URL")
@click.argument("identifier", required=False)
def command(name, package, author, email, description, license, homepage, source, installurl, identifier):
"""Creates a new plugin based on the OctoPrint Plugin cookiecutter template."""
from octoprint.util import tempdir
# deleting a git checkout folder might run into access errors due
# to write-protected sub folders, so we use a custom onerror handler
# that tries to fix such permissions
def onerror(func, path, exc_info):
"""Originally from http://stackoverflow.com/a/2656405/2028598"""
import stat
import os
if not os.access(path, os.W_OK):
os.chmod(path, stat.S_IWUSR)
func(path)
else:
raise
with tempdir(onerror=onerror) as path:
custom = dict(cookiecutters_dir=path)
with custom_cookiecutter_config(custom):
raw_options = dict(
plugin_identifier=identifier,
plugin_package=package,
plugin_name=name,
full_name=author,
email=email,
plugin_description=description,
plugin_license=license,
plugin_homepage=homepage,
plugin_source=source,
plugin_installurl=installurl
)
options = dict((k, v) for k, v in raw_options.items() if v is not None)
with custom_cookiecutter_prompt(options):
cookiecutter.main.cookiecutter("gh:OctoPrint/cookiecutter-octoprint-plugin")
return command
def plugin_install(self):
@click.command("install")
@click.option("--path", help="Path of the local plugin development folder to install")
def command(path):
"""
Installs the local plugin in development mode.
Note: This can NOT be used to install plugins from remote locations
such as the plugin repository! It is strictly for local development
of plugins, to ensure the plugin is installed (editable) into the
same python environment that OctoPrint is installed under.
"""
import os
import sys
if not path:
path = os.getcwd()
# check if this really looks like a plugin
if not os.path.isfile(os.path.join(path, "setup.py")):
click.echo("This doesn't look like an OctoPrint plugin folder")
sys.exit(1)
self.command_caller.call([sys.executable, "setup.py", "develop"], cwd=path)
return command
def plugin_uninstall(self):
from octoprint.util.pip import PipCaller
pip_command = PipCaller.autodetect_pip()
if pip_command is None:
return
@click.command("uninstall")
@click.argument("name")
def command(name):
"""Uninstalls the plugin with the given name."""
import sys
lower_name = name.lower()
if not lower_name.startswith("octoprint_") and not lower_name.startswith("octoprint-"):
click.echo("This doesn't look like an OctoPrint plugin name")
sys.exit(1)
call = [pip_command, "uninstall", "--yes", name]
self.command_caller.call(call)
return command
@click.group()
def dev_commands():
pass
@dev_commands.group(name="dev", cls=OctoPrintDevelCommands)
def dev():
"""Additional commands for development tasks."""
pass

View file

@ -0,0 +1,99 @@
# coding=utf-8
from __future__ import absolute_import
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
import click
import logging
from octoprint.cli import pass_octoprint_ctx, OctoPrintContext
#~~ "octoprint plugin:command" commands
class OctoPrintPluginCommands(click.MultiCommand):
"""
Custom `click.MultiCommand <http://click.pocoo.org/5/api/#click.MultiCommand>`_
implementation that collects commands from the plugin hook
:ref:`octoprint.cli.commands <sec-plugins-hook-cli-commands>`.
.. attribute:: settings
The global :class:`~octoprint.settings.Settings` instance.
.. attribute:: plugin_manager
The :class:`~octoprint.plugin.core.PluginManager` instance.
"""
sep = ":"
def __init__(self, *args, **kwargs):
click.MultiCommand.__init__(self, *args, **kwargs)
self.settings = None
self.plugin_manager = None
self.hooks = dict()
self._logger = logging.getLogger(__name__)
self._initialized = False
def _initialize(self, ctx):
if self._initialized:
return
if ctx.obj is None:
ctx.obj = OctoPrintContext()
# initialize settings and plugin manager based on provided
# context (basedir and configfile)
from octoprint import init_settings, init_pluginsystem
self.settings = init_settings(ctx.obj.basedir, ctx.obj.configfile)
self.plugin_manager = init_pluginsystem(self.settings)
# fetch registered hooks
self.hooks = self.plugin_manager.get_hooks("octoprint.cli.commands")
self._initialized = True
def list_commands(self, ctx):
self._initialize(ctx)
result = [name for name in self._get_commands()]
result.sort()
return result
def get_command(self, ctx, cmd_name):
self._initialize(ctx)
commands = self._get_commands()
return commands.get(cmd_name, None)
def _get_commands(self):
"""Fetch all commands from plugins providing any."""
import collections
result = collections.OrderedDict()
for name, hook in self.hooks.items():
try:
commands = hook(self, pass_octoprint_ctx)
for command in commands:
if not isinstance(command, click.Command):
self._logger.warn("Plugin {} provided invalid CLI command, ignoring it: {!r}".format(name, command))
continue
result[name + self.sep + command.name] = command
except:
self._logger.exception("Error while retrieving cli commants for plugin {}".format(name))
return result
@click.group()
@pass_octoprint_ctx
def plugin_commands(obj):
logging.basicConfig(level=logging.DEBUG if obj.verbosity > 0 else logging.WARN)
@plugin_commands.group(name="plugins", cls=OctoPrintPluginCommands)
def plugins():
"""Additional commands provided by plugins."""
pass

131
src/octoprint/cli/server.py Normal file
View file

@ -0,0 +1,131 @@
# coding=utf-8
from __future__ import absolute_import
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
import click
import logging
import sys
from octoprint.cli import pass_octoprint_ctx, bulk_options, standard_options
def run_server(basedir, configfile, host, port, debug, allow_root, logging_config, verbosity):
"""Initializes the environment and starts up the server."""
from octoprint import init_platform, __display_version__
def log_startup(_):
logging.getLogger("octoprint.server").info("Starting OctoPrint {}".format(__display_version__))
from octoprint import urllib3_ssl
if not urllib3_ssl:
logging.getLogger("octoprint.server")\
.warn("requests/urllib3 will run in an insecure SSL environment. "
"You might see corresponding warnings logged later "
"(\"InsecurePlatformWarning\"). It is recommended to either "
"update to a Python version >= 2.7.9 or alternatively "
"install PyOpenSSL plus its dependencies. For details see "
"https://urllib3.readthedocs.org/en/latest/security.html#openssl-pyopenssl")
settings, _, plugin_manager = init_platform(basedir,
configfile,
logging_file=logging_config,
debug=debug,
verbosity=verbosity,
uncaught_logger=__name__,
after_logging=log_startup)
from octoprint.server import Server
octoprint_server = Server(settings=settings, plugin_manager=plugin_manager, host=host, port=port, debug=debug, allow_root=allow_root)
octoprint_server.run()
#~~ server options
server_options = bulk_options([
click.option("--host", type=click.STRING,
help="Specify the host on which to bind the server."),
click.option("--port", type=click.INT,
help="Specify the port on which to bind the server."),
click.option("--logging", type=click.Path(),
help="Specify the config file to use for configuring logging."),
click.option("--iknowwhatimdoing", "allow_root", is_flag=True,
help="Allow OctoPrint to run as user root."),
click.option("--debug", is_flag=True, help="Enable debug mode"),
])
"""Decorator to add the options shared among the server commands: ``--host``, ``--port``,
``--logging``, ``--iknowwhatimdoing`` and ``--debug``."""
#~~ "octoprint serve" and "octoprint daemon" commands
@click.group()
@pass_octoprint_ctx
def server_commands(obj):
pass
@server_commands.command(name="serve")
@server_options
@standard_options(hidden=True)
@pass_octoprint_ctx
def serve_command(obj, host, port, logging, allow_root, debug):
"""Starts the OctoPrint server."""
run_server(obj.basedir, obj.configfile, host, port, debug,
allow_root, logging, obj.verbosity)
@server_commands.command(name="daemon")
@click.option("--pid", type=click.Path(), default="/tmp/octoprint.pid",
help="Pidfile to use for daemonizing.")
@server_options
@standard_options(hidden=True)
@click.argument("command", type=click.Choice(["start", "stop", "restart", "status"]),
metavar="start|stop|restart|status")
@pass_octoprint_ctx
def daemon_command(octoprint_ctx, pid, host, port, logging, allow_root, debug, command):
"""
Starts, stops or restarts in daemon mode.
Please note that daemon mode is only supported under Linux right now.
"""
if sys.platform == "darwin" or sys.platform == "win32":
click.echo("Sorry, daemon mode is only supported under Linux right now",
file=sys.stderr)
sys.exit(2)
if pid is None:
click.echo("No path to a pidfile set",
file=sys.stderr)
sys.exit(1)
from octoprint.daemon import Daemon
class OctoPrintDaemon(Daemon):
def __init__(self, pidfile, basedir, configfile, host, port, debug, allow_root, logging_config, verbosity):
Daemon.__init__(self, pidfile)
self._basedir = basedir
self._configfile = configfile
self._host = host
self._port = port
self._debug = debug
self._allow_root = allow_root
self._logging_config = logging_config
self._verbosity = verbosity
def run(self):
run_server(self._basedir, self._configfile, self._host, self._port, self._debug, self._allow_root, self._logging_config, self._verbosity)
octoprint_daemon = OctoPrintDaemon(pid, octoprint_ctx.basedir, octoprint_ctx.configfile,
host, port, debug, allow_root, logging, octoprint_ctx.verbosity)
if command == "start":
octoprint_daemon.start()
elif command == "stop":
octoprint_daemon.stop()
elif command == "restart":
octoprint_daemon.restart()
elif command == "status":
octoprint_daemon.status()

View file

@ -1,46 +1,65 @@
# coding=utf-8
"""
Generic linux daemon base class for python 3.x
Generic linux daemon base class
Originally from http://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/#c35
"""
from __future__ import (print_function, absolute_import)
import sys, os, time, signal
class Daemon:
"""A generic daemon class.
"""
A generic daemon class.
Usage: subclass the daemon class and override the run() method."""
Usage: subclass the daemon class and override the run() method.
def __init__(self, pidfile): self.pidfile = pidfile
def daemonize(self):
"""Deamonize class. UNIX double fork mechanism."""
If you want to log the output to someplace different that stdout and stderr,
also override the echo() and error() methods.
"""
try:
pid = os.fork()
def __init__(self, pidfile):
self.pidfile = pidfile
def _daemonize(self):
"""Daemonize class. UNIX double fork mechanism."""
self._double_fork()
self._redirect_io()
# write pidfile
pid = str(os.getpid())
self.set_pid(pid)
# register listener for SIGTERM
signal.signal(signal.SIGTERM, self._on_sigterm)
def _double_fork(self):
try:
pid = os.fork()
if pid > 0:
# exit first parent
sys.exit(0)
except OSError as err:
sys.stderr.write('fork #1 failed: {0}\n'.format(err))
sys.exit(0)
except OSError as err:
self.error("First fork failed: {}".format(str(err)))
sys.exit(1)
# decouple from parent environment
os.chdir('/')
os.setsid()
os.umask(002)
# do second fork
try:
pid = os.fork()
if pid > 0:
# decouple from parent environment
os.chdir('/')
os.setsid()
os.umask(002)
# do second fork
try:
pid = os.fork()
if pid > 0:
# exit from second parent
sys.exit(0)
except OSError as err:
sys.stderr.write('fork #2 failed: {0}\n'.format(err))
sys.exit(1)
sys.exit(0)
except OSError as err:
self.error("Second fork failed: {}".format(str(err)))
sys.exit(1)
def _redirect_io(self):
# redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
@ -51,57 +70,39 @@ class Daemon:
os.dup2(si.fileno(), sys.stdin.fileno())
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
# write pidfile
pid = str(os.getpid())
with open(self.pidfile,'w+') as f:
f.write(pid + '\n')
# register listener for SIGTERM
signal.signal(signal.SIGTERM, self.term)
def term(self, _signo, _stack_frame):
os.remove(self.pidfile)
def _on_sigterm(self, _signo, _stack_frame):
"""Signal handler for SIGTERM, deletes the pidfile."""
self.remove_pidfile()
sys.exit(0)
def start(self):
"""Start the daemon."""
# Check for a pidfile to see if the daemon already runs
try:
with open(self.pidfile,'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
pid = self.get_pid()
if pid:
message = "pidfile {0} already exist. " + \
"Daemon already running?\n"
sys.stderr.write(message.format(self.pidfile))
self.error("pidfile {} already exist. Is the daemon already running?".format(self.pidfile))
sys.exit(1)
self.echo("Starting daemon...")
# Start the daemon
self.daemonize()
self._daemonize()
self.run()
def stop(self):
def stop(self, check_running=True):
"""Stop the daemon."""
# Get the pid from the pidfile
try:
with open(self.pidfile,'r') as pf:
pid = int(pf.read().strip())
except IOError:
pid = None
pid = self.get_pid()
if not pid:
message = "pidfile {0} does not exist. " + \
"Daemon not running?\n"
sys.stderr.write(message.format(self.pidfile))
return # not an error in a restart
if not check_running:
return
self.error("pidfile {} does not exist. Is the daemon really running?".format(self.pidfile))
sys.exit(1)
# Try killing the daemon process
self.echo("Stopping daemon...")
# Try killing the daemon process
try:
while 1:
os.kill(pid, signal.SIGTERM)
@ -109,20 +110,71 @@ class Daemon:
except OSError as err:
e = str(err.args)
if e.find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
self.remove_pidfile()
else:
print (str(err.args))
self.error(e)
sys.exit(1)
def restart(self):
"""Restart the daemon."""
self.stop()
self.stop(check_running=False)
self.start()
def status(self):
"""Prints the daemon status."""
if self.is_running():
self.echo("Daemon is running")
else:
self.echo("Daemon is not running")
def is_running(self):
"""Check if a process is running under the specified pid."""
pid = self.get_pid()
if pid is None:
return False
try:
os.kill(pid, 0)
except OSError:
try:
self.remove_pidfile()
except:
self.error("Daemon found not running, but could not remove stale pidfile")
return False
else:
return True
def get_pid(self):
"""Get the pid from the pidfile."""
try:
with open(self.pidfile,'r') as pf:
pid = int(pf.read().strip())
except (IOError, ValueError):
pid = None
return pid
def set_pid(self, pid):
"""Write the pid to the pidfile."""
with open(self.pidfile,'w+') as f:
f.write(str(pid) + '\n')
def remove_pidfile(self):
"""Removes the pidfile."""
if os.path.isfile(self.pidfile):
os.remove(self.pidfile)
def run(self):
"""You should override this method when you subclass Daemon.
It will be called after the process has been daemonized by
It will be called after the process has been daemonized by
start() or restart()."""
raise NotImplementedError()
@classmethod
def echo(cls, line):
print(line)
@classmethod
def error(cls, line):
print(line, stream=sys.stderr)

View file

@ -22,7 +22,7 @@ __copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms
import os
import logging
from octoprint.settings import settings
from octoprint.settings import settings as s
from octoprint.plugin.core import (PluginInfo, PluginManager, Plugin)
from octoprint.plugin.types import *
@ -44,7 +44,7 @@ def _validate_plugin(phase, plugin_info):
setattr(plugin_info.instance, PluginInfo.attr_hooks, hooks)
def plugin_manager(init=False, plugin_folders=None, plugin_types=None, plugin_entry_points=None, plugin_disabled_list=None,
plugin_restart_needing_hooks=None, plugin_obsolete_hooks=None, plugin_validators=None):
plugin_restart_needing_hooks=None, plugin_obsolete_hooks=None, plugin_validators=None, settings=None):
"""
Factory method for initially constructing and consecutively retrieving the :class:`~octoprint.plugin.core.PluginManager`
singleton.
@ -87,9 +87,12 @@ def plugin_manager(init=False, plugin_folders=None, plugin_types=None, plugin_en
else:
if init:
if settings is None:
settings = s()
if plugin_folders is None:
plugin_folders = (
settings().getBaseFolder("plugins"),
settings.getBaseFolder("plugins"),
(os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", "plugins")), True)
)
if plugin_types is None:
@ -109,7 +112,7 @@ def plugin_manager(init=False, plugin_folders=None, plugin_types=None, plugin_en
if plugin_entry_points is None:
plugin_entry_points = "octoprint.plugin"
if plugin_disabled_list is None:
plugin_disabled_list = settings().get(["plugins", "_disabled"])
plugin_disabled_list = settings.get(["plugins", "_disabled"])
if plugin_restart_needing_hooks is None:
plugin_restart_needing_hooks = [
"octoprint.server.http"
@ -136,7 +139,7 @@ def plugin_manager(init=False, plugin_folders=None, plugin_types=None, plugin_en
return _instance
def plugin_settings(plugin_key, defaults=None, get_preprocessors=None, set_preprocessors=None):
def plugin_settings(plugin_key, defaults=None, get_preprocessors=None, set_preprocessors=None, settings=None):
"""
Factory method for creating a :class:`PluginSettings` instance.
@ -145,12 +148,45 @@ def plugin_settings(plugin_key, defaults=None, get_preprocessors=None, set_prepr
defaults (dict): The default settings for the plugin.
get_preprocessors (dict): The getter preprocessors for the plugin.
set_preprocessors (dict): The setter preprocessors for the plugin.
settings (octoprint.settings.Settings): The settings instance to use.
Returns:
PluginSettings: A fully initialized :class:`PluginSettings` instance to be used to access the plugin's
settings
"""
return PluginSettings(settings(), plugin_key, defaults=defaults, get_preprocessors=get_preprocessors, set_preprocessors=set_preprocessors)
if settings is None:
settings = s()
return PluginSettings(settings, plugin_key, defaults=defaults,
get_preprocessors=get_preprocessors,
set_preprocessors=set_preprocessors)
def plugin_settings_for_settings_plugin(plugin_key, instance, settings=None):
"""
Factory method for creating a :class:`PluginSettings` instance for a given :class:`SettingsPlugin` instance.
Will return `None` if the provided `instance` is not a :class:`SettingsPlugin` instance.
Arguments:
plugin_key (string): The plugin identifier for which to create the settings instance.
implementation (octoprint.plugin.SettingsPlugin): The :class:`SettingsPlugin` instance.
settings (octoprint.settings.Settings): The settings instance to use. Defaults to the global OctoPrint settings.
Returns:
PluginSettings or None: A fully initialized :class:`PluginSettings` instance to be used to access the plugin's
settings, or `None` if the provided `instance` was not a class:`SettingsPlugin`
"""
if not isinstance(instance, SettingsPlugin):
return None
try:
defaults = instance.get_settings_defaults()
get_preprocessors, set_preprocessors = instance.get_settings_preprocessors()
except:
logging.getLogger(__name__).exception("Error while retrieving defaults or preprocessors for plugin {}".format(plugin_key))
return None
return plugin_settings(plugin_key, defaults=defaults, get_preprocessors=get_preprocessors, set_preprocessors=set_preprocessors, settings=settings)
def call_plugin(types, method, args=None, kwargs=None, callback=None, error_callback=None, sorting_context=None):

View file

@ -113,8 +113,10 @@
{{ pluginmanager_sudopip() }}
<h4 style="position: relative">
{{ _('... from the <a href="%(url)s" target="_blank">Plugin Repository</a>', url='http://plugins.octoprint.org') }}
<a class="dropdown-toggle pull-right" data-toggle="dropdown" href="#">
<span class="icon-wrench"></span>
<div class="dropdown pull-right">
<a class="dropdown-toggle" data-toggle="dropdown" href="#">
<span class="icon-wrench"></span>
</a>
<ul class="dropdown-menu pull-right">
<li><a href="#" data-bind="click: function() { repositoryplugins.changeSorting('title'); }"><i class="icon-ok" data-bind="style: {visibility: repositoryplugins.currentSorting() == 'title' ? 'visible' : 'hidden'}"></i> {{ _('Sort by title') }} ({{ _('ascending') }})</a></li>
<li><a href="#" data-bind="click: function() { repositoryplugins.changeSorting('published'); }"><i class="icon-ok" data-bind="style: {visibility: repositoryplugins.currentSorting() == 'published' ? 'visible' : 'hidden'}"></i> {{ _('Sort by publication date') }} ({{ _('descending') }})</a></li>
@ -124,7 +126,7 @@
<li class="divider"></li>
<li><a href="#" data-bind="click: function() { refreshRepository(); }"><i class="icon-refresh"></i> {{ _('Refresh list from repository') }}</a></li>
</ul>
</a>
</div>
</h4>
<form class="form-search" data-bind="submit: performRepositorySearch">

View file

@ -16,7 +16,7 @@ import logging
import logging.handlers
import hashlib
from . import version_checks, updaters, exceptions, util
from . import version_checks, updaters, exceptions, util, cli
from octoprint.server.util.flask import restricted_access
@ -351,7 +351,7 @@ class SoftwareUpdatePlugin(octoprint.plugin.BlueprintPlugin,
@restricted_access
def check_for_update(self):
if "check" in flask.request.values:
check_targets = map(str.strip, flask.request.values["check"].split(","))
check_targets = map(lambda x: x.strip(), flask.request.values["check"].split(","))
else:
check_targets = None
@ -381,7 +381,7 @@ class SoftwareUpdatePlugin(octoprint.plugin.BlueprintPlugin,
json_data = flask.request.json
if "check" in json_data:
check_targets = map(str.strip, json_data["check"])
check_targets = map(lambda x: x.strip(), json_data["check"])
else:
check_targets = None
@ -528,7 +528,8 @@ class SoftwareUpdatePlugin(octoprint.plugin.BlueprintPlugin,
updater_thread.daemon = False
updater_thread.start()
return to_be_updated, dict((key, check["displayName"] if "displayName" in check else key) for key, check in checks.items() if key in to_be_updated)
check_data = dict((key, self._populated_check(key, check)["displayName"]) for key, check in checks.items() if key in to_be_updated)
return to_be_updated, check_data
def _update_worker(self, checks, check_targets, force):
@ -761,6 +762,7 @@ class SoftwareUpdatePlugin(octoprint.plugin.BlueprintPlugin,
else:
raise exceptions.UnknownUpdateType()
__plugin_name__ = "Software Update"
__plugin_author__ = "Gina Häußge"
__plugin_url__ = "https://github.com/foosel/OctoPrint/wiki/Plugin:-Software-Update"
@ -778,4 +780,9 @@ def __plugin_load__():
util=util
)
global __plugin_hooks__
__plugin_hooks__ = {
"octoprint.cli.commands": cli.commands
}

View file

@ -0,0 +1,167 @@
# coding=utf-8
from __future__ import absolute_import
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
def commands(cli_group, pass_octoprint_ctx, *args, **kwargs):
import click
import sys
import requests.exceptions
import octoprint_client as client
@click.command("check")
@click.option("--force", is_flag=True, help="Ignore the cache for the update check")
@click.argument("targets", nargs=-1)
def check_command(force, targets):
"""Check for updates."""
params = dict(force=force)
if targets:
params["check"] = ",".join(targets)
client.init_client(cli_group.settings)
r = client.get("plugin/softwareupdate/check", params=params)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
click.echo("Could not get update information from server, got {}".format(e))
sys.exit(1)
data = r.json()
status = data["status"]
information = data["information"]
lines = []
octoprint_line = None
for key, info in information.items():
status_text = "Up to date"
if info["updateAvailable"]:
if info["updatePossible"]:
status_text = "Update available"
else:
status_text = "Update available (manual)"
line = "{}\n\tInstalled: {}\n\tAvailable: {}\n\t=> {}".format(info["displayName"],
info["information"]["local"]["name"],
info["information"]["remote"]["name"],
status_text)
if key == "octoprint":
octoprint_line = line
else:
lines.append(line)
lines.sort()
if octoprint_line:
lines = [octoprint_line] + lines
for line in lines:
click.echo(line)
click.echo()
if status == "current":
click.echo("Everything is up to date")
else:
click.echo("There are updates available!")
@click.command("update")
@click.option("--force", is_flag=True, help="Update even if already up to date")
@click.argument("targets", nargs=-1)
def update_command(force, targets):
"""Apply updates."""
data = dict(force=force)
if targets:
data["check"] = targets
client.init_client(cli_group.settings)
flags = dict(
waiting_for_restart=False,
seen_close=False
)
def on_message(ws, msg_type, msg):
if msg_type != "plugin" or msg["plugin"] != "softwareupdate":
return
plugin_message = msg["data"]
if not "type" in plugin_message:
return
plugin_message_type = plugin_message["type"]
plugin_message_data = plugin_message["data"]
if plugin_message_type == "updating":
click.echo("Updating {} to {}...".format(plugin_message_data["name"], plugin_message_data["target"]))
elif plugin_message_type == "update_failed":
click.echo("\t... failed: {}".format(plugin_message_data["reason"]))
elif plugin_message_type == "loglines" and "loglines" in plugin_message_data:
for entry in plugin_message_data["loglines"]:
prefix = ">>> " if entry["stream"] == "call" else ""
error = entry["stream"] == "stderr"
click.echo("\t{}{}".format(prefix, entry["line"].replace("\n", "\n\t")), err=error)
elif plugin_message_type == "success" or plugin_message_type == "restart_manually":
results = plugin_message_data["results"] if "results" in plugin_message_data else dict()
if results:
click.echo("The update finished successfully.")
if plugin_message_type == "restart_manually":
click.echo("Please restart the OctoPrint server.")
else:
click.echo("No update necessary")
ws.close()
elif plugin_message_type == "restarting":
flags["waiting_for_restart"] = True
click.echo("Restarting to apply changes...")
elif plugin_message_type == "failure":
click.echo("Error")
ws.close()
def on_open(ws):
if flags["waiting_for_restart"] and flags["seen_close"]:
click.echo(" Reconnected!")
else:
click.echo("Connected to server...")
def on_close(ws):
if flags["waiting_for_restart"] and flags["seen_close"]:
click.echo(".", nl=False)
else:
flags["seen_close"] = True
click.echo("Disconnected from server...")
socket = client.connect_socket(on_message=on_message,
on_open=on_open,
on_close=on_close)
r = client.post_json("plugin/softwareupdate/update", data=data)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as e:
click.echo("Could not get update information from server, got {}".format(e))
sys.exit(1)
data = r.json()
to_be_updated = data["order"]
checks = data["checks"]
click.echo("Update in progress, updating:")
for name in to_be_updated:
click.echo("\t{}".format(name if not name in checks else checks[name]))
socket.wait()
if flags["waiting_for_restart"]:
if socket.reconnect(timeout=60):
click.echo("The update finished successfully.")
else:
click.echo("The update finished successfully but the server apparently didn't restart as expected.")
click.echo("Please restart the OctoPrint server.")
return [check_command, update_command]

View file

@ -17,10 +17,10 @@ from octoprint.settings import settings
from octoprint.plugin import plugin_manager
class VirtualPrinter(object):
command_regex = re.compile("[GM]\d+")
command_regex = re.compile("^([GMT])(\d+)")
sleep_regex = re.compile("sleep (\d+)")
sleep_after_regex = re.compile("sleep_after ([GM]\d+) (\d+)")
sleep_after_next_regex = re.compile("sleep_after_next ([GM]\d+) (\d+)")
sleep_after_regex = re.compile("sleep_after ([GMT]\d+) (\d+)")
sleep_after_next_regex = re.compile("sleep_after_next ([GMT]\d+) (\d+)")
custom_action_regex = re.compile("action_custom ([a-zA-Z0-9_]+)(\s+.*)?")
def __init__(self, seriallog_handler=None, read_timeout=5.0, write_timeout=10.0):
@ -180,130 +180,184 @@ class VirtualPrinter(object):
from octoprint._version import get_versions
self._send("OctoPrint VirtualPrinter v" + get_versions()["version"])
continue
elif data.startswith("!!DEBUG:"):
self._debugTrigger(data[len("!!DEBUG:"):].strip())
elif data.startswith("!!DEBUG:") or data.strip() == "!!DEBUG":
debug_command = ""
if data.startswith("!!DEBUG:"):
debug_command = data[len("!!DEBUG:"):].strip()
self._debugTrigger(debug_command)
continue
# if we are sending oks before command output, send it now
if len(data.strip()) > 0 and self._okBeforeCommandOutput:
self._sendOk()
#print "Send: %s" % (data.rstrip())
if 'M104' in data or 'M109' in data:
self._parseHotendCommand(data)
# actual command handling
command_match = VirtualPrinter.command_regex.match(data)
if command_match is not None:
command = command_match.group(0)
letter = command_match.group(1)
if 'M140' in data or 'M190' in data:
self._parseBedCommand(data)
try:
# if we have a method _gcode_G, _gcode_M or _gcode_T, execute that first
letter_handler = "_gcode_{}".format(letter)
if hasattr(self, letter_handler):
code = command_match.group(2)
handled = getattr(self, letter_handler)(code, data)
if handled:
continue
if 'M105' in data:
self._processTemperatureQuery()
continue
elif 'M20' in data:
if self._sdCardReady:
self._listSd()
elif 'M21' in data:
self._sdCardReady = True
self._send("SD card ok")
elif 'M22' in data:
self._sdCardReady = False
elif 'M23' in data:
if self._sdCardReady:
filename = data.split(None, 1)[1].strip()
self._selectSdFile(filename)
elif 'M24' in data:
if self._sdCardReady:
self._startSdPrint()
elif 'M25' in data:
if self._sdCardReady:
self._pauseSdPrint()
elif 'M26' in data:
if self._sdCardReady:
pos = int(re.search("S([0-9]+)", data).group(1))
self._setSdPos(pos)
elif 'M27' in data:
if self._sdCardReady:
self._reportSdStatus()
elif 'M28' in data:
if self._sdCardReady:
filename = data.split(None, 1)[1].strip()
self._writeSdFile(filename)
elif 'M29' in data:
if self._sdCardReady:
self._finishSdFile()
elif 'M30' in data:
if self._sdCardReady:
filename = data.split(None, 1)[1].strip()
self._deleteSdFile(filename)
elif "M114" in data:
# send dummy position report
output = "C: X:{} Y:{} Z:{} E:{}".format(self._lastX, self._lastY, self._lastZ, self._lastE)
if not self._okBeforeCommandOutput:
output = "ok " + output
self._send(output)
continue
elif "M117" in data:
# we'll just use this to echo a message, to allow playing around with pause triggers
self._send("echo:%s" % re.search("M117\s+(.*)", data).group(1))
elif "M400" in data:
self.buffered.join()
elif "M999" in data:
# mirror Marlin behaviour
self._send("Resend: 1")
elif data.startswith("T"):
self.currentExtruder = int(re.search("T(\d+)", data).group(1))
self._send("Active Extruder: %d" % self.currentExtruder)
elif "G20" in data:
self._unitModifier = 1.0 / 2.54
if self._lastX is not None:
self._lastX *= 2.54
if self._lastY is not None:
self._lastY *= 2.54
if self._lastZ is not None:
self._lastZ *= 2.54
if self._lastE is not None:
self._lastE *= 2.54
elif "G21" in data:
self._unitModifier = 1.0
if self._lastX is not None:
self._lastX /= 2.54
if self._lastY is not None:
self._lastY /= 2.54
if self._lastZ is not None:
self._lastZ /= 2.54
if self._lastE is not None:
self._lastE /= 2.54
elif "G90" in data:
self._relative = False
elif "G91" in data:
self._relative = True
elif "G92" in data:
self._setPosition(data)
# then look for a method _gcode_<command> and execute that if it exists
command_handler = "_gcode_{}".format(command)
if hasattr(self, command_handler):
handled = getattr(self, command_handler)(data)
if handled:
continue
elif data.startswith("G28"):
self._performMove(data)
finally:
# make sure that the debug sleepAfter and sleepAfterNext stuff works even
# if we continued above
if len(self._sleepAfter) or len(self._sleepAfterNext):
interval = None
if command in self._sleepAfter:
interval = self._sleepAfter[command]
elif command in self._sleepAfterNext:
interval = self._sleepAfterNext[command]
del self._sleepAfterNext[command]
elif data.startswith("G0") or data.startswith("G1") or data.startswith("G2") or data.startswith("G3"):
# simulate reprap buffered commands via a Queue with maxsize which internally simulates the moves
self.buffered.put(data)
if len(self._sleepAfter) or len(self._sleepAfterNext):
command_match = VirtualPrinter.command_regex.match(data)
if command_match is not None:
command = command_match.group(0)
interval = None
if command in self._sleepAfter:
interval = self._sleepAfter[command]
elif command in self._sleepAfterNext:
interval = self._sleepAfterNext[command]
del self._sleepAfterNext[command]
if interval is not None:
self._send("// sleeping for {interval} seconds".format(interval=interval))
time.sleep(interval)
if interval is not None:
self._send("// sleeping for {interval} seconds".format(interval=interval))
time.sleep(interval)
# if we are sending oks after command output, send it now
if len(data.strip()) > 0 and not self._okBeforeCommandOutput:
self._sendOk()
##~~ command implementations
def _gcode_T(self, code, data):
self.currentExtruder = int(code)
self._send("Active Extruder: %d" % self.currentExtruder)
def _gcode_M104(self, data):
self._parseHotendCommand(data)
_gcode_M109 = _gcode_M104
def _gcode_M140(self, data):
self._parseBedCommand(data)
_gcode_M190 = _gcode_M140
def _gcode_M105(self, data):
self._processTemperatureQuery()
return True
def _gcode_M20(self, data):
if self._sdCardReady:
self._listSd()
def _gcode_M21(self, data):
self._sdCardReady = True
self._send("SD card ok")
def _gcode_M22(self, data):
self._sdCardReady = False
def _gcode_M23(self, data):
if self._sdCardReady:
filename = data.split(None, 1)[1].strip()
self._selectSdFile(filename)
def _gcode_M24(self, data):
if self._sdCardReady:
self._startSdPrint()
def _gcode_M25(self, data):
if self._sdCardReady:
self._pauseSdPrint()
def _gcode_M26(self, data):
if self._sdCardReady:
pos = int(re.search("S([0-9]+)", data).group(1))
self._setSdPos(pos)
def _gcode_M27(self, data):
if self._sdCardReady:
self._reportSdStatus()
def _gcode_M28(self, data):
if self._sdCardReady:
filename = data.split(None, 1)[1].strip()
self._writeSdFile(filename)
def _gcode_M29(self, data):
if self._sdCardReady:
self._finishSdFile()
def _gcode_M30(self, data):
if self._sdCardReady:
filename = data.split(None, 1)[1].strip()
self._deleteSdFile(filename)
def _gcode_M114(self, data):
output = "C: X:{} Y:{} Z:{} E:{}".format(self._lastX, self._lastY, self._lastZ, self._lastE)
if not self._okBeforeCommandOutput:
output = "ok " + output
self._send(output)
return True
def _gcode_M117(self, data):
# we'll just use this to echo a message, to allow playing around with pause triggers
self._send("echo:%s" % re.search("M117\s+(.*)", data).group(1))
def _gcode_M400(self, data):
self.buffered.join()
def _gcode_M999(self, data):
# mirror Marlin behaviour
self._send("Resend: 1")
def _gcode_G20(self, data):
self._unitModifier = 1.0 / 2.54
if self._lastX is not None:
self._lastX *= 2.54
if self._lastY is not None:
self._lastY *= 2.54
if self._lastZ is not None:
self._lastZ *= 2.54
if self._lastE is not None:
self._lastE *= 2.54
def _gcode_G21(self, data):
self._unitModifier = 1.0
if self._lastX is not None:
self._lastX /= 2.54
if self._lastY is not None:
self._lastY /= 2.54
if self._lastZ is not None:
self._lastZ /= 2.54
if self._lastE is not None:
self._lastE /= 2.54
def _gcode_G90(self, data):
self._relative = False
def _gcode_G91(self, data):
self._relative = True
def _gcode_G92(self, data):
self._setPosition(data)
def _gcode_G28(self, data):
self._performMove(data)
def _gcode_G0(self, data):
# simulate reprap buffered commands via a Queue with maxsize which internally simulates the moves
self.buffered.put(data)
_gcode_G1 = _gcode_G0
_gcode_G2 = _gcode_G0
_gcode_G3 = _gcode_G0
##~~ further helpers
def _kill(self):
if not self._supportM112:
return
@ -331,7 +385,50 @@ class VirtualPrinter(object):
request_resend()
def _debugTrigger(self, data):
if data == "action_pause":
if data == "" or data == "help" or data == "?":
usage = """
OctoPrint Virtual Printer debug commands
help
?
| This help.
# Action Triggers
action_pause
| Sends a "// action:pause" action trigger to the host.
action_resume
| Sends a "// action:resume" action trigger to the host.
action_disconnect
| Sends a "// action:disconnect" action trigger to the
| host.
action_custom <action>[ <parameters>]
| Sends a custom "// action:<action> <parameters>"
| action trigger to the host.
# Communication Errors
dont_answer
| Will not acknowledge the next command.
trigger_resend_lineno
| Triggers a resend error with a line number mismatch
trigger_resend_checksum
| Triggers a resend error with a checksum mismatch
drop_connection
| Drops the serial connection
# Reply Timing / Sleeping
sleep <int:seconds>
| Sleep <seconds> s
sleep_after <str:command> <int:seconds>
| Sleeps <seconds> s after each execution of <command>
sleep_after_next <str:command> <int:seconds>
| Sleeps <seconds> s after execution of <command>
"""
for line in usage.split("\n"):
self._send("echo: {}".format(line.strip()))
elif data == "action_pause":
self._send("// action:pause")
elif data == "action_resume":
self._send("// action:resume")

View file

@ -49,6 +49,7 @@ admin_permission = Permission(RoleNeed("admin"))
user_permission = Permission(RoleNeed("user"))
# only import the octoprint stuff down here, as it might depend on things defined above to be initialized already
from octoprint import __version__, __branch__, __display_version__
from octoprint.printer import get_connection_options
from octoprint.printer.profile import PrinterProfileManager
from octoprint.printer.standard import Printer
@ -67,11 +68,9 @@ from . import util
UI_API_KEY = ''.join('%02X' % ord(z) for z in uuid.uuid4().bytes)
versions = octoprint._version.get_versions()
VERSION = versions['version']
BRANCH = versions['branch'] if 'branch' in versions else None
DISPLAY_VERSION = "%s (%s branch)" % (VERSION, BRANCH) if BRANCH else VERSION
del versions
VERSION = __version__
BRANCH = __branch__
DISPLAY_VERSION = __display_version__
LOCALES = []
LANGUAGES = set()
@ -108,15 +107,14 @@ def load_user(id):
#~~ startup code
class Server(object):
def __init__(self, configfile=None, basedir=None, host="0.0.0.0", port=5000, debug=False, allowRoot=False, logConf=None):
self._configfile = configfile
self._basedir = basedir
class Server():
def __init__(self, settings=None, plugin_manager=None, host="0.0.0.0", port=5000, debug=False, allow_root=False):
self._settings = settings
self._plugin_manager = plugin_manager
self._host = host
self._port = port
self._debug = debug
self._allowRoot = allowRoot
self._logConf = logConf
self._allow_root = allow_root
self._server = None
self._logger = None
@ -126,9 +124,14 @@ class Server(object):
self._template_searchpaths = []
def run(self):
if not self._allowRoot:
if not self._allow_root:
self._check_for_root()
if self._settings is None:
self._settings = settings()
if self._plugin_manager is None:
self._plugin_manager = octoprint.plugin.plugin_manager()
global app
global babel
@ -148,16 +151,14 @@ class Server(object):
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler
import sys
debug = self._debug
# first initialize the settings singleton and make sure it uses given configfile and basedir if available
s = settings(init=True, basedir=self._basedir, configfile=self._configfile)
self._logger = logging.getLogger(__name__)
pluginManager = self._plugin_manager
# then monkey patch a bunch of stuff
# monkey patch a bunch of stuff
util.tornado.fix_ioloop_scheduling()
util.flask.enable_additional_translations(additional_folders=[s.getBaseFolder("translations")])
util.flask.enable_additional_translations(additional_folders=[self._settings.getBaseFolder("translations")])
# setup app
self._setup_app()
@ -165,31 +166,28 @@ class Server(object):
# setup i18n
self._setup_i18n(app)
# then initialize logging
self._setup_logging(self._debug, self._logConf)
self._logger = logging.getLogger(__name__)
def exception_logger(exc_type, exc_value, exc_tb):
self._logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_tb))
sys.excepthook = exception_logger
self._logger.info("Starting OctoPrint %s" % DISPLAY_VERSION)
if self._settings.getBoolean(["serial", "log"]):
# enable debug logging to serial.log
logging.getLogger("SERIAL").setLevel(logging.DEBUG)
logging.getLogger("SERIAL").debug("Enabling serial logging")
# then initialize the plugin manager
pluginManager = octoprint.plugin.plugin_manager(init=True)
# load plugins
pluginManager.reload_plugins(startup=True, initialize_implementations=False)
printerProfileManager = PrinterProfileManager()
eventManager = events.eventManager()
analysisQueue = octoprint.filemanager.analysis.AnalysisQueue()
slicingManager = octoprint.slicing.SlicingManager(s.getBaseFolder("slicingProfiles"), printerProfileManager)
slicingManager = octoprint.slicing.SlicingManager(self._settings.getBaseFolder("slicingProfiles"), printerProfileManager)
storage_managers = dict()
storage_managers[octoprint.filemanager.FileDestinations.LOCAL] = octoprint.filemanager.storage.LocalFileStorage(s.getBaseFolder("uploads"))
storage_managers[octoprint.filemanager.FileDestinations.LOCAL] = octoprint.filemanager.storage.LocalFileStorage(self._settings.getBaseFolder("uploads"))
fileManager = octoprint.filemanager.FileManager(analysisQueue, slicingManager, printerProfileManager, initial_storage_managers=storage_managers)
printer = Printer(fileManager, analysisQueue, printerProfileManager)
appSessionManager = util.flask.AppSessionManager()
pluginLifecycleManager = LifecycleManager(pluginManager)
# setup access control
if s.getBoolean(["accessControl", "enabled"]):
userManagerName = s.get(["accessControl", "userManager"])
if self._settings.getBoolean(["accessControl", "enabled"]):
userManagerName = self._settings.get(["accessControl", "userManager"])
try:
clazz = octoprint.util.get_class(userManagerName)
userManager = clazz()
@ -209,19 +207,15 @@ class Server(object):
printer=printer,
app_session_manager=appSessionManager,
plugin_lifecycle_manager=pluginLifecycleManager,
data_folder=os.path.join(settings().getBaseFolder("data"), name),
data_folder=os.path.join(self._settings.getBaseFolder("data"), name),
user_manager=userManager
)
def settings_plugin_inject_factory(name, implementation):
if not isinstance(implementation, octoprint.plugin.SettingsPlugin):
return None
default_settings = implementation.get_settings_defaults()
get_preprocessors, set_preprocessors = implementation.get_settings_preprocessors()
plugin_settings = octoprint.plugin.plugin_settings(name,
defaults=default_settings,
get_preprocessors=get_preprocessors,
set_preprocessors=set_preprocessors)
plugin_settings = octoprint.plugin.plugin_settings_for_settings_plugin(name, implementation)
if plugin_settings is None:
return
return dict(settings=plugin_settings)
def settings_plugin_config_migration_and_cleanup(name, implementation):
@ -290,22 +284,22 @@ class Server(object):
app.wsgi_app = util.ReverseProxied(
app.wsgi_app,
s.get(["server", "reverseProxy", "prefixHeader"]),
s.get(["server", "reverseProxy", "schemeHeader"]),
s.get(["server", "reverseProxy", "hostHeader"]),
s.get(["server", "reverseProxy", "prefixFallback"]),
s.get(["server", "reverseProxy", "schemeFallback"]),
s.get(["server", "reverseProxy", "hostFallback"])
self._settings.get(["server", "reverseProxy", "prefixHeader"]),
self._settings.get(["server", "reverseProxy", "schemeHeader"]),
self._settings.get(["server", "reverseProxy", "hostHeader"]),
self._settings.get(["server", "reverseProxy", "prefixFallback"]),
self._settings.get(["server", "reverseProxy", "schemeFallback"]),
self._settings.get(["server", "reverseProxy", "hostFallback"])
)
secret_key = s.get(["server", "secretKey"])
secret_key = self._settings.get(["server", "secretKey"])
if not secret_key:
import string
from random import choice
chars = string.ascii_lowercase + string.ascii_uppercase + string.digits
secret_key = "".join(choice(chars) for _ in xrange(32))
s.set(["server", "secretKey"], secret_key)
s.save()
self._settings.set(["server", "secretKey"], secret_key)
self._settings.save()
app.secret_key = secret_key
loginManager = LoginManager()
loginManager.session_protection = "strong"
@ -316,9 +310,9 @@ class Server(object):
loginManager.init_app(app)
if self._host is None:
self._host = s.get(["server", "host"])
self._host = self._settings.get(["server", "host"])
if self._port is None:
self._port = s.getInt(["server", "port"])
self._port = self._settings.getInt(["server", "port"])
app.debug = self._debug
@ -332,7 +326,7 @@ class Server(object):
self._router = SockJSRouter(self._create_socket_connection, "/sockjs")
upload_suffixes = dict(name=s.get(["server", "uploads", "nameSuffix"]), path=s.get(["server", "uploads", "pathSuffix"]))
upload_suffixes = dict(name=self._settings.get(["server", "uploads", "nameSuffix"]), path=self._settings.get(["server", "uploads", "pathSuffix"]))
def mime_type_guesser(path):
from octoprint.filemanager import get_mime_type
@ -357,13 +351,22 @@ class Server(object):
server_routes = self._router.urls + [
# various downloads
(r"/downloads/timelapse/([^/]*\.mpg)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("timelapse")), download_handler_kwargs, no_hidden_files_validator)),
(r"/downloads/files/local/(.*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("uploads")), download_handler_kwargs, no_hidden_files_validator, additional_mime_types)),
(r"/downloads/logs/([^/]*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("logs")), download_handler_kwargs, admin_validator)),
(r"/downloads/timelapse/([^/]*\.mpg)", util.tornado.LargeResponseHandler, joined_dict(dict(path=self._settings.getBaseFolder("timelapse")),
download_handler_kwargs,
no_hidden_files_validator)),
(r"/downloads/files/local/(.*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=self._settings.getBaseFolder("uploads")),
download_handler_kwargs,
no_hidden_files_validator,
additional_mime_types)),
(r"/downloads/logs/([^/]*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=self._settings.getBaseFolder("logs")),
download_handler_kwargs,
admin_validator)),
# camera snapshot
(r"/downloads/camera/current", util.tornado.UrlProxyHandler, dict(url=s.get(["webcam", "snapshot"]), as_attachment=True, access_validation=util.tornado.access_validation_factory(app, loginManager, util.flask.user_validator))),
(r"/downloads/camera/current", util.tornado.UrlProxyHandler, dict(url=self._settings.get(["webcam", "snapshot"]),
as_attachment=True,
access_validation=util.tornado.access_validation_factory(app, loginManager, util.flask.user_validator))),
# generated webassets
(r"/static/webassets/(.*)", util.tornado.LargeResponseHandler, dict(path=os.path.join(s.getBaseFolder("generated"), "webassets")))
(r"/static/webassets/(.*)", util.tornado.LargeResponseHandler, dict(path=os.path.join(self._settings.getBaseFolder("generated"), "webassets")))
]
for name, hook in pluginManager.get_hooks("octoprint.server.http.routes").items():
try:
@ -390,7 +393,7 @@ class Server(object):
self._tornado_app = Application(server_routes)
max_body_sizes = [
("POST", r"/api/files/([^/]*)", s.getInt(["server", "uploads", "maxSize"])),
("POST", r"/api/files/([^/]*)", self._settings.getInt(["server", "uploads", "maxSize"])),
("POST", r"/api/languages", 5 * 1024 * 1024)
]
@ -416,25 +419,25 @@ class Server(object):
self._logger.debug("Adding maximum body size of {size}B for {method} requests to {route})".format(**locals()))
max_body_sizes.append((method, route, size))
self._server = util.tornado.CustomHTTPServer(self._tornado_app, max_body_sizes=max_body_sizes, default_max_body_size=s.getInt(["server", "maxSize"]))
self._server = util.tornado.CustomHTTPServer(self._tornado_app, max_body_sizes=max_body_sizes, default_max_body_size=self._settings.getInt(["server", "maxSize"]))
self._server.listen(self._port, address=self._host)
eventManager.fire(events.Events.STARTUP)
if s.getBoolean(["serial", "autoconnect"]):
(port, baudrate) = s.get(["serial", "port"]), s.getInt(["serial", "baudrate"])
if self._settings.getBoolean(["serial", "autoconnect"]):
(port, baudrate) = self._settings.get(["serial", "port"]), self._settings.getInt(["serial", "baudrate"])
printer_profile = printerProfileManager.get_default()
connectionOptions = get_connection_options()
if port in connectionOptions["ports"]:
printer.connect(port=port, baudrate=baudrate, profile=printer_profile["id"] if "id" in printer_profile else "_default")
# start up watchdogs
if s.getBoolean(["feature", "pollWatched"]):
if self._settings.getBoolean(["feature", "pollWatched"]):
# use less performant polling observer if explicitely configured
observer = PollingObserver()
else:
# use os default
observer = Observer()
observer.schedule(util.watchdog.GcodeWatchdogHandler(fileManager, printer), s.getBaseFolder("watched"))
observer.schedule(util.watchdog.GcodeWatchdogHandler(fileManager, printer), self._settings.getBaseFolder("watched"))
observer.start()
# run our startup plugins
@ -527,89 +530,12 @@ class Server(object):
except octoprint.users.UnknownUser:
pass
default_language = settings().get(["appearance", "defaultLanguage"])
default_language = self._settings.get(["appearance", "defaultLanguage"])
if default_language is not None and not default_language == "_default" and default_language in LANGUAGES:
return Locale.negotiate([default_language], LANGUAGES)
return request.accept_languages.best_match(LANGUAGES)
def _setup_logging(self, debug, logConf=None):
defaultConfig = {
"version": 1,
"formatters": {
"simple": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"formatter": "simple",
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.handlers.TimedRotatingFileHandler",
"level": "DEBUG",
"formatter": "simple",
"when": "D",
"backupCount": "1",
"filename": os.path.join(settings().getBaseFolder("logs"), "octoprint.log")
},
"serialFile": {
"class": "logging.handlers.RotatingFileHandler",
"level": "DEBUG",
"formatter": "simple",
"maxBytes": 2 * 1024 * 1024, # let's limit the serial log to 2MB in size
"filename": os.path.join(settings().getBaseFolder("logs"), "serial.log")
}
},
"loggers": {
"SERIAL": {
"level": "CRITICAL",
"handlers": ["serialFile"],
"propagate": False
},
"tornado.application": {
"level": "INFO"
},
"tornado.general": {
"level": "INFO"
},
"octoprint.server.util.flask": {
"level": "WARN"
}
},
"root": {
"level": "INFO",
"handlers": ["console", "file"]
}
}
if debug:
defaultConfig["root"]["level"] = "DEBUG"
if logConf is None:
logConf = os.path.join(settings().getBaseFolder("base"), "logging.yaml")
configFromFile = {}
if os.path.exists(logConf) and os.path.isfile(logConf):
import yaml
with open(logConf, "r") as f:
configFromFile = yaml.safe_load(f)
config = octoprint.util.dict_merge(defaultConfig, configFromFile)
logging.config.dictConfig(config)
logging.captureWarnings(True)
import warnings
warnings.simplefilter("always")
if settings().getBoolean(["serial", "log"]):
# enable debug logging to serial.log
logging.getLogger("SERIAL").setLevel(logging.DEBUG)
logging.getLogger("SERIAL").debug("Enabling serial logging")
def _setup_app(self):
@app.before_request
def before_request():
@ -820,10 +746,10 @@ class Server(object):
util.flask.fix_webassets_cache()
util.flask.fix_webassets_filtertool()
base_folder = settings().getBaseFolder("generated")
base_folder = self._settings.getBaseFolder("generated")
# clean the folder
if settings().getBoolean(["devel", "webassets", "clean_on_startup"]):
if self._settings.getBoolean(["devel", "webassets", "clean_on_startup"]):
import shutil
import errno
import sys
@ -887,16 +813,16 @@ class Server(object):
return base_folder
assets = CustomDirectoryEnvironment(app)
assets.debug = not settings().getBoolean(["devel", "webassets", "bundle"])
assets.debug = not self._settings.getBoolean(["devel", "webassets", "bundle"])
UpdaterType = type(util.flask.SettingsCheckUpdater)(util.flask.SettingsCheckUpdater.__name__, (util.flask.SettingsCheckUpdater,), dict(
updater=assets.updater
))
assets.updater = UpdaterType
enable_gcodeviewer = settings().getBoolean(["gcodeViewer", "enabled"])
preferred_stylesheet = settings().get(["devel", "stylesheet"])
minify = settings().getBoolean(["devel", "webassets", "minify"])
enable_gcodeviewer = self._settings.getBoolean(["gcodeViewer", "enabled"])
preferred_stylesheet = self._settings.get(["devel", "stylesheet"])
minify = self._settings.getBoolean(["devel", "webassets", "minify"])
dynamic_assets = util.flask.collect_plugin_assets(
enable_gcodeviewer=enable_gcodeviewer,
@ -1071,6 +997,3 @@ class LifecycleManager(object):
if callback in self._plugin_lifecycle_callbacks[event]:
self._plugin_lifecycle_callbacks[event].remove(callback)
if __name__ == "__main__":
server = Server()
server.run()

View file

@ -26,7 +26,7 @@ import octoprint.server
import octoprint.users
import octoprint.plugin
from werkzeug.contrib.cache import SimpleCache
from werkzeug.contrib.cache import BaseCache
#~~ monkey patching
@ -259,7 +259,59 @@ def passive_login():
#~~ cache decorator for cacheable views
_cache = SimpleCache()
class LessSimpleCache(BaseCache):
"""
Slightly improved version of :class:`SimpleCache`.
Setting ``default_timeout`` or ``timeout`` to ``-1`` will have no timeout be applied at all.
"""
def __init__(self, threshold=500, default_timeout=300):
BaseCache.__init__(self, default_timeout=default_timeout)
self._cache = {}
self.clear = self._cache.clear
self._threshold = threshold
def _prune(self):
if self.over_threshold():
now = time.time()
for idx, (key, (expires, _)) in enumerate(self._cache.items()):
if expires is not None and expires <= now or idx % 3 == 0:
self._cache.pop(key, None)
def get(self, key):
import pickle
now = time.time()
expires, value = self._cache.get(key, (0, None))
if expires is None or expires > now:
return pickle.loads(value)
def set(self, key, value, timeout=None):
import pickle
self._prune()
self._cache[key] = (self.calculate_timeout(timeout=timeout),
pickle.dumps(value, pickle.HIGHEST_PROTOCOL))
def add(self, key, value, timeout=None):
self.set(key, value, timeout=None)
self._cache.setdefault(key, self._cache[key])
def delete(self, key):
self._cache.pop(key, None)
def calculate_timeout(self, timeout=None):
if timeout is None:
timeout = self.default_timeout
if timeout is -1:
return None
return time.time() + timeout
def over_threshold(self):
if self._threshold is None:
return False
return len(self._cache) > self._threshold
_cache = LessSimpleCache()
def cached(timeout=5 * 60, key=lambda: "view/%s" % flask.request.path, unless=None, refreshif=None, unless_response=None):
def decorator(f):

View file

@ -50,7 +50,8 @@ def index():
render_kwargs = _get_render_kwargs(_templates, _plugin_names, _plugin_vars, now)
def get_cached_view(key, view):
return util.flask.cached(refreshif=lambda: force_refresh,
return util.flask.cached(timeout=-1,
refreshif=lambda: force_refresh,
key=lambda: "ui:{}:{}".format(key, g.locale),
unless_response=util.flask.cache_check_response_headers)(view)
@ -507,7 +508,8 @@ def robotsTxt():
@app.route("/i18n/<string:locale>/<string:domain>.js")
@util.flask.cached(refreshif=lambda: util.flask.cache_check_headers() or "_refresh" in request.values,
@util.flask.cached(timeout=-1,
refreshif=lambda: util.flask.cache_check_headers() or "_refresh" in request.values,
key=lambda: "{}:{}".format(request.path, g.locale))
def localeJs(locale, domain):
messages = dict()

View file

@ -379,7 +379,7 @@ GCODE.renderer = (function(){
};
var drawLayer = function(layerNum, fromProgress, toProgress, isNotCurrentLayer){
console.log("Drawing layer " + layerNum + " from " + fromProgress + " to " + toProgress + " (current: " + !isNotCurrentLayer + ")");
log.trace("Drawing layer " + layerNum + " from " + fromProgress + " to " + toProgress + " (current: " + !isNotCurrentLayer + ")");
var i;

View file

@ -26,6 +26,71 @@ $(function() {
});
});
//~~ some CoreUI specific stuff we put into OctoPrint.coreui
OctoPrint.coreui = (function() {
var exports = {
browserTabVisibility: undefined,
selectedTab: undefined
};
var browserVisibilityCallbacks = [];
var getHiddenProp = function() {
var prefixes = ["webkit", "moz", "ms", "o"];
// if "hidden" is natively supported just return it
if ("hidden" in document) {
return "hidden"
}
// otherwise loop over all the known prefixes until we find one
var vendorPrefix = _.find(prefixes, function(prefix) {
return (prefix + "Hidden" in document);
});
if (vendorPrefix !== undefined) {
return vendorPrefix + "Hidden";
}
// nothing found
return undefined;
};
var isHidden = function() {
var prop = getHiddenProp();
if (!prop) return false;
return document[prop];
};
var updateBrowserVisibility = function() {
var visible = !isHidden();
exports.browserTabVisible = visible;
_.each(browserVisibilityCallbacks, function(callback) {
callback(visible);
})
};
// register for browser visibility tracking
var prop = getHiddenProp();
if (!prop) return undefined;
var eventName = prop.replace(/[H|h]idden/, "") + "visibilitychange";
document.addEventListener(eventName, updateBrowserVisibility);
updateBrowserVisibility();
// exports
exports.isVisible = function() { return !isHidden() };
exports.onBrowserVisibilityChange = function(callback) {
browserVisibilityCallbacks.push(callback);
};
return exports;
})();
//~~ AJAX setup
// work around a stupid iOS6 bug where ajax requests get cached and only work once, as described at
@ -416,11 +481,17 @@ $(function() {
$('.nav-pills, .nav-tabs').tabdrop();
// Allow components to react to tab change
var onTabChange = function(current, previous) {
log.debug("Selected OctoPrint tab changed: previous = " + previous + ", current = " + current);
OctoPrint.coreui.selectedTab = current;
callViewModels(allViewModels, "onTabChange", [current, previous]);
};
var tabs = $('#tabs a[data-toggle="tab"]');
tabs.on('show', function (e) {
var current = e.target.hash;
var previous = e.relatedTarget.hash;
callViewModels(allViewModels, "onTabChange", [current, previous]);
onTabChange(current, previous);
});
tabs.on('shown', function (e) {
@ -429,6 +500,8 @@ $(function() {
callViewModels(allViewModels, "onAfterTabChange", [current, previous]);
});
onTabChange(OCTOPRINT_INITIAL_TAB);
// Fix input element click problems on dropdowns
$(".dropdown input, .dropdown label").click(function(e) {
e.stopPropagation();
@ -522,7 +595,14 @@ $(function() {
callViewModels(allViewModels, "onAllBound", [allViewModels]);
log.info("... binding done");
// startup complete
callViewModels(allViewModels, "onStartupComplete");
// make sure we can track the browser tab visibility
OctoPrint.coreui.onBrowserVisibilityChange(function(status) {
log.debug("Browser tab is now " + (status ? "visible" : "hidden"));
callViewModels(allViewModels, "onBrowserTabVisibilityChange", [status]);
});
};
if (!_.has(viewModelMap, "settingsViewModel")) {

View file

@ -357,31 +357,51 @@ $(function() {
self.onSettingsBeforeSave = self.updateRotatorWidth;
self._disableWebcam = function() {
// only disable webcam stream if tab is out of focus for more than 5s, otherwise we might cause
// more load by the constant connection creation than by the actual webcam stream
self.webcamDisableTimeout = setTimeout(function () {
$("#webcam_image").attr("src", "");
}, 5000);
};
self._enableWebcam = function() {
if (OctoPrint.coreui.selectedTab != "#control" || !OctoPrint.coreui.browserTabVisible) {
return;
}
if (self.webcamDisableTimeout != undefined) {
clearTimeout(self.webcamDisableTimeout);
}
var webcamImage = $("#webcam_image");
var currentSrc = webcamImage.attr("src");
if (currentSrc === undefined || currentSrc.trim() == "") {
var newSrc = CONFIG_WEBCAM_STREAM;
if (CONFIG_WEBCAM_STREAM.lastIndexOf("?") > -1) {
newSrc += "&";
} else {
newSrc += "?";
}
newSrc += new Date().getTime();
self.updateRotatorWidth();
webcamImage.attr("src", newSrc);
}
};
self.onTabChange = function (current, previous) {
if (current == "#control") {
if (self.webcamDisableTimeout != undefined) {
clearTimeout(self.webcamDisableTimeout);
}
var webcamImage = $("#webcam_image");
var currentSrc = webcamImage.attr("src");
if (currentSrc === undefined || currentSrc.trim() == "") {
var newSrc = CONFIG_WEBCAM_STREAM;
if (CONFIG_WEBCAM_STREAM.lastIndexOf("?") > -1) {
newSrc += "&";
} else {
newSrc += "?";
}
newSrc += new Date().getTime();
self.updateRotatorWidth();
webcamImage.attr("src", newSrc);
}
self._enableWebcam();
} else if (previous == "#control") {
// only disable webcam stream if tab is out of focus for more than 5s, otherwise we might cause
// more load by the constant connection creation than by the actual webcam stream
self.webcamDisableTimeout = setTimeout(function () {
$("#webcam_image").attr("src", "");
}, 5000);
self._disableWebcam();
}
};
self.onBrowserTabVisibilityChange = function(status) {
if (status) {
self._enableWebcam();
} else {
self._disableWebcam();
}
};

View file

@ -356,7 +356,7 @@ $(function() {
if(self.loadedFilename
&& self.loadedFilename == data.job.file.name
&& self.loadedFileDate == data.job.file.date) {
if (self.tabActive && self.currentlyPrinting && self.renderer_syncProgress() && !self.waitForApproval()) {
if (OctoPrint.coreui.browserTabVisible && self.tabActive && self.currentlyPrinting && self.renderer_syncProgress() && !self.waitForApproval()) {
var cmdIndex = GCODE.gCodeReader.getCmdIndexForPercentage(data.progress.completion);
if(cmdIndex){
GCODE.renderer.render(cmdIndex.layer, 0, cmdIndex.cmd);
@ -502,12 +502,11 @@ $(function() {
self.onBeforeBinding = function() {
self.initialize();
}
};
self.onTabChange = function(current, previous) {
self.tabActive = current == "#gcode";
}
};
}
OCTOPRINT_VIEWMODELS.push([

View file

@ -34,4 +34,12 @@
var OCTOPRINT_VIEWMODELS = [];
var ADDITIONAL_VIEWMODELS = [];
var OCTOPRINT_ADDITIONAL_BINDINGS = [];
{% if templates.tab.order %}
{% set first_tab = templates.tab.order[0] %}
{% set entry, data = templates.tab.entries[first_tab] %}
var OCTOPRINT_INITIAL_TAB = "#{{ data._div }}";
{% else %}
var OCTOPRINT_INITIAL_TAB = undefined;
{% endif %}
</script>

View file

@ -26,37 +26,37 @@
<div class="jog-panel" style="display: none;" data-bind="visible: loginState.isUser">
<!-- XY jogging control panel -->
<div class="jog-panel">
<div id="control-jog-xy" class="jog-panel">
<h1>X/Y</h1>
<div>
<button class="btn box" id="control-yinc" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('y',1) }"><i class="icon-arrow-up"></i></button>
<button id="control-yinc" class="btn box" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('y',1) }"><i class="icon-arrow-up"></i></button>
</div>
<div>
<button class="btn box pull-left" id="control-xdec" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('x',-1) }"><i class="icon-arrow-left"></i></button>
<button class="btn box pull-left" id="control-xyhome" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendHomeCommand(['x', 'y']) }"><i class="icon-home"></i></button>
<button class="btn box pull-left" id="control-xinc" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('x',1) }"><i class="icon-arrow-right"></i></button>
<button id="control-xdec" class="btn box pull-left" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('x',-1) }"><i class="icon-arrow-left"></i></button>
<button id="control-xyhome" class="btn box pull-left" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendHomeCommand(['x', 'y']) }"><i class="icon-home"></i></button>
<button id="control-xinc" class="btn box pull-left" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('x',1) }"><i class="icon-arrow-right"></i></button>
</div>
<div>
<button class="btn box" id="control-ydec" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('y',-1) }"><i class="icon-arrow-down"></i></button>
<button id="control-ydec" class="btn box" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('y',-1) }"><i class="icon-arrow-down"></i></button>
</div>
</div>
<!-- Z jogging control panel -->
<div class="jog-panel">
<div id="control-jog-z" class="jog-panel">
<h1>Z</h1>
<div>
<button class="btn box" id="control-zinc" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('z',1) }"><i class="icon-arrow-up"></i></button>
<button id="control-zinc" class="btn box" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('z',1) }"><i class="icon-arrow-up"></i></button>
</div>
<div>
<button class="btn box" id="control-zhome" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendHomeCommand(['z']) }"><i class="icon-home"></i></button>
<button id="control-zhome" class="btn box" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendHomeCommand(['z']) }"><i class="icon-home"></i></button>
</div>
<div>
<button class="btn box" id="control-zdec" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('z',-1) }"><i class="icon-arrow-down"></i></button>
<button id="control-zdec" class="btn box" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendJogCommand('z',-1) }"><i class="icon-arrow-down"></i></button>
</div>
</div>
<!-- Jog distance -->
<div class="distance">
<div class="btn-group" data-toggle="buttons-radio" id="jog_distance">
<div id="control-jog-distance" class="distance">
<div id="jog_distance" class="btn-group" data-toggle="buttons-radio">
<button type="button" id="control-distance01" class="btn distance" data-distance="0.1" data-bind="enable: loginState.isUser()">0.1</button>
<button type="button" id="control-distance1" class="btn distance" data-distance="1" data-bind="enable: loginState.isUser()">1</button>
<button type="button" id="control-distance10" class="btn distance active" data-distance="10" data-bind="enable: loginState.isUser()">10</button>
@ -65,11 +65,13 @@
</div>
<!-- Feed rate -->
<input type="number" style="width: 153px" data-bind="slider: {min: 50, max: 150, step: 1, value: feedRate, tooltip: 'hide'}">
<button class="btn btn-block" style="width: 169px" data-bind="enable: isOperational() && loginState.isUser(), click: function() { $root.sendFeedRateCommand() }">{{ _('Feed rate:') }}<span data-bind="text: feedRate() + '%'"></span></button>
<div id="control-jog-feedrate" class="jog-panel">
<input type="number" style="width: 153px" data-bind="slider: {min: 50, max: 150, step: 1, value: feedRate, tooltip: 'hide'}">
<button class="btn btn-block" style="width: 169px" data-bind="enable: isOperational() && loginState.isUser(), click: function() { $root.sendFeedRateCommand() }">{{ _('Feed rate:') }}<span data-bind="text: feedRate() + '%'"></span></button>
</div>
</div>
<!-- Extrusion control panel -->
<div class="jog-panel" style="display: none;" data-bind="visible: loginState.isUser">
<div id="control-jog-extrusion" class="jog-panel" style="display: none;" data-bind="visible: loginState.isUser">
<h1>Tool (E)</h1>
<div>
<div class="btn-group control-box">
@ -94,7 +96,7 @@
</div>
</div>
<!-- General control panel -->
<div class="jog-panel" style="display: none;" data-bind="visible: loginState.isUser">
<div id="control-jog-general" class="jog-panel" style="display: none;" data-bind="visible: loginState.isUser">
<h1>{{ _('General') }}</h1>
<div>
<button class="btn btn-block control-box" data-bind="enable: isOperational() && !isPrinting() && loginState.isUser(), click: function() { $root.sendCustomCommand({type:'command',command:'M18'}) }">{{ _('Motors off') }}</button>
@ -104,7 +106,7 @@
</div>
<!-- Container for custom controls -->
<div style="clear: both; display: none;" data-bind="visible: loginState.isUser, template: { name: $root.displayMode, foreach: controls }"></div>
<div id="control-jog-custom" style="clear: both; display: none;" data-bind="visible: loginState.isUser, template: { name: $root.displayMode, foreach: controls }"></div>
<!-- Templates for custom controls -->
<script type="text/html" id="customControls_containerTemplate_nameless">

View file

@ -1,5 +1,5 @@
{% if enableTemperatureGraph %}
<div class="row" style="padding-left: 20px">
<div class="row-fluid">
<div id="temperature-graph"></div>
</div>
{% endif %}
@ -23,7 +23,7 @@
<td style="text-align: right; vertical-align: middle" data-bind="html: formatTemperature(actual())"></td>
<td style="vertical-align: middle; overflow: visible">
<div class="input-append">
<input type="text" class="input-mini text-right tempInput" data-bind="attr: {placeholder: cleanTemperature(target()) }, value: newTarget, enable: $root.isOperational() && $root.loginState.isUser(), event: { keyup: function(d, e) {$root.handleEnter(e, 'target', $data);} }">
<input type="number" class="input-mini text-right tempInput" data-bind="attr: {placeholder: cleanTemperature(target()) }, value: newTarget, enable: $root.isOperational() && $root.loginState.isUser(), event: { keyup: function(d, e) {$root.handleEnter(e, 'target', $data);} }">
<span class="add-on">&deg;C</span>
<div class="btn-group">
<button type="submit" data-bind="click: $parent.setTarget, enable: $root.isOperational() && $root.loginState.isUser()" class="btn">{{ _('Set') }}</button>

View file

@ -22,10 +22,9 @@
</div>
<div>
<div><small><a href="#" class="muted" onclick="$(this).children().toggleClass('icon-caret-right icon-caret-down').parent().parent().parent().next().slideToggle('fast')"><i class="icon-caret-right"></i> {{ _('Advanced options') }}</a></small></div>
<div><small><a href="javascript:void(0);" class="muted" onclick="$(this).children().toggleClass('icon-caret-right icon-caret-down').parent().parent().parent().next().slideToggle('fast')"><i class="icon-caret-right"></i> {{ _('Advanced options') }}</a></small></div>
<div class="hide">
<button class="btn btn-block" type="button" data-bind="click: fakeAck, enable: isOperational() && loginState.isUser()">{{ _("Fake Acknowledgement") }}</button>
<small class="muted">{{ _("If acknowledgements (\"ok\"s) sent by the firmware get lost due to issues with the serial communication to your printer, OctoPrint's communication with it can become stuck. If that happens, this can help. Please be advised that such occurences hint at general communication issues with your printer which will probably negatively influence your printing results and which you should therefore try to resolve!") }}</small>
</div>
</div>

View file

@ -547,6 +547,36 @@ def dict_contains_keys(keys, dictionary):
return True
class fallback_dict(dict):
def __init__(self, custom, *fallbacks):
self.custom = custom
self.fallbacks = fallbacks
def __getitem__(self, item):
for dictionary in self._all():
if item in dictionary:
return dictionary[item]
raise KeyError()
def __setitem__(self, key, value):
self.custom[key] = value
def __delitem__(self, key):
for dictionary in self._all():
if key in dictionary:
del dictionary[key]
def keys(self):
result = set()
for dictionary in self._all():
result += dictionary.keys()
return result
def _all(self):
return [self.custom] + list(self.fallbacks)
class Object(object):
pass
@ -589,11 +619,25 @@ def address_for_client(host, port):
@contextlib.contextmanager
def atomic_write(filename, mode="w+b", prefix="tmp", suffix=""):
temp_config = tempfile.NamedTemporaryFile(mode=mode, prefix=prefix, suffix=suffix, delete=False)
yield temp_config
temp_config.close()
try:
yield temp_config
finally:
temp_config.close()
shutil.move(temp_config.name, filename)
@contextlib.contextmanager
def tempdir(ignore_errors=False, onerror=None, **kwargs):
import tempfile
import shutil
dirpath = tempfile.mkdtemp(**kwargs)
try:
yield dirpath
finally:
shutil.rmtree(dirpath, ignore_errors=ignore_errors, onerror=onerror)
def bom_aware_open(filename, encoding="ascii", mode="r", **kwargs):
import codecs
@ -614,7 +658,7 @@ def bom_aware_open(filename, encoding="ascii", mode="r", **kwargs):
if header.startswith(bom):
encoding += "-sig"
return codecs.open(filename, encoding=encoding, **kwargs)
return codecs.open(filename, encoding=encoding, mode=mode, **kwargs)
class RepeatedTimer(threading.Thread):

View file

@ -1788,6 +1788,12 @@ class MachineCom(object):
return None, # Don't send the M0 or M1 to the machine, as M0 and M1 are handled as an LCD menu pause.
_gcode_M1_queuing = _gcode_M0_queuing
def _gcode_M25_queuing(self, cmd, cmd_type=None):
# M25 while not printing from SD will be handled as pause. This way it can be used as another marker
# for GCODE induced pausing. Send it to the printer anyway though.
if self.isPrinting() and not self.isSdPrinting():
self.setPause(True)
def _gcode_M104_sent(self, cmd, cmd_type=None):
toolNum = self._currentTool
toolMatch = regexes_parameters["intT"].search(cmd)

66
src/octoprint/util/dev.py Normal file
View file

@ -0,0 +1,66 @@
# coding=utf-8
"""
This module provides a bunch of utility methods and helpers FOR DEVELOPMENT ONLY.
"""
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
import contextlib
@contextlib.contextmanager
def duration_log(context=None, log=None):
"""
Context manager to log the duration of the wrapped code block.
If no ``log`` function is provided will use a ``debug`` python logger for
``octoprint.util.dev``.
``context`` can be set to give some textual context in the output.
Arguments:
context (str): A custom string to give some textual context in the output.
log (callable): The log function to use to log the execution duration.
"""
if log is None:
import logging
log = logging.getLogger(__name__).debug
import time
start = time.time()
try:
yield
finally:
end = time.time()
duration = end - start
if context:
message = "Execution of {name} took {duration}s"
else:
message = "Execution of codeblock took {duration}s"
log(message.format(name=context, duration=duration))
def log_duration(log=None, with_args=False):
"""
Decorator that logs the execution duration of the annotated function.
Arguments:
log (callable): The logging function to use.
with_args (bool): Whether to include the calling arguments in the logged
output or not.
"""
def decorator(f):
def wrapped(*args, **kwargs):
if with_args:
args_str = ", ".join(map(lambda x: repr(x), args))
kwargs_str = ", ".join(map(lambda item: "{}={}".format(item[0], repr(item[1])), kwargs.items()))
sep = ", " if args_str and kwargs_str else ""
arguments = "".join([args_str, sep, kwargs_str])
else:
arguments = "..."
context = "{name}({arguments})".format(name=f.__name__, arguments=arguments)
with duration_log(context=context, log=log):
return f(*args, **kwargs)
return wrapped
return decorator

View file

@ -114,7 +114,7 @@ class PipCaller(CommandlineCaller):
self._version = None
self.refresh = False
def execute(self, *args):
def execute(self, *args, **kwargs):
if self.refresh:
self.trigger_refresh()
@ -151,20 +151,10 @@ class PipCaller(CommandlineCaller):
if self._use_sudo or self.force_sudo:
command = ["sudo"] + command
return self.call(command)
return self.call(command, **kwargs)
def _setup_pip(self):
pip_command = self.configured
if pip_command is not None and pip_command.startswith("sudo "):
pip_command = pip_command[len("sudo "):]
pip_sudo = True
else:
pip_sudo = False
if pip_command is None:
pip_command = self._autodetect_pip()
pip_command, pip_sudo = self._get_pip_command()
if pip_command is None:
return
@ -211,7 +201,22 @@ class PipCaller(CommandlineCaller):
self._virtual_env = pip_virtual_env
self._install_dir = pip_install_dir
def _autodetect_pip(self):
def _get_pip_command(self):
pip_command = self.configured
if pip_command is not None and pip_command.startswith("sudo "):
pip_command = pip_command[len("sudo "):]
pip_sudo = True
else:
pip_sudo = False
if pip_command is None:
pip_command = self.autodetect_pip()
return pip_command, pip_sudo
@classmethod
def autodetect_pip(cls):
import os
python_command = sys.executable
binary_dir = os.path.dirname(python_command)
@ -325,3 +330,22 @@ class PipCaller(CommandlineCaller):
else:
self._logger.debug("Could not detect desired output from testballoon install, got this instead: {}".format(" ".join(sarge_command), output))
return False, False, False, None
class LocalPipCaller(PipCaller):
def _get_pip_command(self):
return self.autodetect_pip(), False
def _check_pip_setup(self, pip_command):
import sys
import os
from distutils.sysconfig import get_python_lib
virtual_env = hasattr(sys, "real_prefix")
install_dir = get_python_lib()
writable = os.access(install_dir, os.W_OK)
return writable or not virtual_env, \
not writable and not virtual_env and site.ENABLE_USER_SITE, \
virtual_env, \
install_dir

View file

@ -0,0 +1,384 @@
# coding=utf-8
from __future__ import absolute_import
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
import requests
import time
apikey = None
baseurl = None
class SocketTimeout(BaseException):
pass
class SocketClient(object):
def __init__(self, url, use_ssl=False, daemon=True, **kwargs):
self._url = url
self._use_ssl = use_ssl
self._daemon = daemon
self._ws_kwargs = kwargs
self._seen_open = False
self._seen_close = False
self._waiting_for_reconnect = False
self._ws = None
self._thread = None
# hello world
def _prepare(self):
"""Prepares socket and thread for a new connection."""
# close the socket if it's currently open
if self._ws is not None:
try:
self._ws.close()
except:
# we can't handle that in any meaningful way right now
pass
# prepare a bunch of callback methods
import functools
callbacks = dict()
for callback in ("on_open", "on_message", "on_error", "on_close"):
# now, normally we could just use functools.partial for something like
# this, but websocket does a type check against a python function type
# which the use of partial makes fail, so we have to use a lambda
# wrapper here, and since we need to pick the current value of
# callback from the current scope we need a factory method for that...
def factory(cb):
return lambda *fargs, **fkwargs: functools.partial(self._on_callback, cb)(*fargs, **fkwargs)
callbacks[callback] = factory(callback)
# initialize socket instance with url and callbacks
import websocket
kwargs = dict(self._ws_kwargs)
kwargs.update(callbacks)
self._ws = websocket.WebSocketApp(self._url, **kwargs)
# initialize thread
import threading
self._thread = threading.Thread(target=self._on_thread_run)
self._thread.daemon = self._daemon
def _on_thread_run(self):
"""Has the socket run forever (aka until closed...)."""
self._ws.run_forever()
def _on_callback(self, cb, *args, **kwargs):
"""
Callback for socket events.
Will call any callback method defined on ``self`` that matches ``cb``
prefixed with an ``_ws_``, then will call any callback method provided in
the socket keyword arguments (``self._ws_kwargs``) that matches ``cb``.
Calling args and kwargs will be the ones passed to ``_on_callback``.
Arguments:
cb (str): the callback type
"""
internal = "_ws_" + cb
if hasattr(self, internal):
cb_func = getattr(self, internal)
if callable(cb_func):
cb_func(*args, **kwargs)
cb_func = self._ws_kwargs.get(cb, None)
if callable(cb_func):
cb_func(*args, **kwargs)
def _ws_on_open(self, ws):
"""
Callback for socket on_open event.
Used only to track active reconnection attempts.
"""
if not self._waiting_for_reconnect:
return
if ws != self._ws:
return
self._seen_open = True
def _ws_on_close(self, ws):
"""
Callback for socket on_close event.
Used only to track active reconnection attempts.
"""
if not self._waiting_for_reconnect:
return
if ws != self._ws:
return
self._seen_close = True
def connect(self):
"""Connects the socket."""
self._prepare()
self._thread.start()
def wait(self, timeout=None):
"""Waits for the closing of the socket or the timeout."""
start = None
def test_condition():
if timeout and start and start + timeout > time.time():
raise SocketTimeout()
start = time.time()
while self._thread.is_alive():
test_condition()
self._thread.join(timeout=1.0)
@property
def is_connected(self):
"""Whether the web socket is connected or not."""
return self._thread and self._ws and self._thread.is_alive()
def disconnect(self):
"""Disconnect the web socket."""
if self._ws:
self._ws.close()
def reconnect(self, timeout=None, disconnect=True):
"""
Tries to reconnect to the web socket.
If timeout is set will try to reconnect over the specified timeout in seconds
and return False if the connection could not be re-established.
If no timeout is provided, the method will block until the connection could
be re-established.
If disconnect is set to ``True`` will disconnect the socket explictly
first if it is currently connected.
Arguments:
timeout (number): timeout in seconds to wait for the reconnect to happen.
disconnect (bool): Whether to disconnect explicitly from the socket if
a connection is currently established (True, default) or not (False).
Returns:
bool - True if the reconnect was successful, False otherwise.
"""
self._seen_close = False
self._seen_open = False
self._waiting_for_reconnect = True
if not self.is_connected:
# not connected, so we are already closed
self._seen_close = True
elif disconnect:
# connected and disconnect is True, so we disconnect
self.disconnect()
start = None
if timeout:
timeout_condition = lambda: start is not None and time.time() > start + timeout
else:
timeout_condition = lambda: False
start = time.time()
while not timeout_condition():
if self._seen_close and self._seen_open:
# we saw a connection close and open, so a reconnect, success!
return True
else:
# try to connect
self.connect()
# sleep a bit
time.sleep(1.0)
# if we land here the timeout condition became True without us seeing
# a reconnect, that's a failure
return False
def build_base_url(https=False, httpuser=None, httppass=None, host=None, port=None, prefix=None):
protocol = "https" if https else "http"
httpauth = "{}:{}@".format(httpuser, httppass) if httpuser and httppass else ""
host = host if host else "127.0.0.1"
port = ":{}".format(port) if port else ":5000"
prefix = prefix if prefix else ""
return "{}://{}{}{}{}".format(protocol, httpauth, host, port, prefix)
def init_client(settings, https=False, httpuser=None, httppass=None, host=None, port=None, prefix=None):
"""
Initializes the API client with the provided settings.
Basically a convenience method to set ``apikey`` and ``baseurl`` from settings
and/or command line arguments.
Arguments:
settings (octoprint.settings.Settings): A :class:`~octoprint.settings.Settings` instance to use
for client configuration
https (bool): Whether to connect via HTTPS (True) or not (False, default)
httpuser (str or None): HTTP Basic Auth username to use. No Basic Auth will be
used if unset.
httppass (str or None): HTTP Basic Auth password to use. No Basic Auth will be
used if unset.
host (str or None): Host to connect to, overrides data from settings if set.
port (int or None): Port to connect to, overrides data from settings if set.
prefix (str or None): Path prefix, overrides data from settings if set.
"""
settings_host = settings.get(["server", "host"])
settings_port = settings.getInt(["server", "port"])
settings_apikey = settings.get(["api", "key"])
global apikey, baseurl
apikey = settings_apikey
baseurl = build_base_url(https=https,
httpuser=httpuser,
httppass=httppass,
host=host or settings_host if settings_host != "0.0.0.0" else "127.0.0.1",
port=port or settings_port,
prefix=prefix)
def prepare_request(method=None, path=None, params=None):
url = None
if baseurl:
while path.startswith("/"):
path = path[1:]
url = baseurl + "/" + path
return requests.Request(method=method, url=url, params=params, headers={"X-Api-Key": apikey}).prepare()
def request(method, path, data=None, files=None, encoding=None, params=None):
s = requests.Session()
request = prepare_request(method, path, params=params)
if data or files:
if encoding == "json":
request.prepare_body(None, None, json=data)
else:
request.prepare_body(data, files=files)
response = s.send(request)
return response
def get(path, params=None):
return request("GET", path, params=params)
def post(path, data, encoding=None, params=None):
return request("POST", path, data=data, encoding=encoding, params=params)
def post_json(path, data, params=None):
return post(path, data, encoding="json", params=params)
def post_command(path, command, additional=None):
data = dict(command=command)
if additional:
data.update(additional)
return post_json(path, data, params=data)
def upload(path, file_path, additional=None, file_name=None, content_type=None, params=None):
import os
if not os.path.isfile(file_path):
raise ValueError("{} cannot be uploaded since it is not a file".format(file_path))
if file_name is None:
file_name = os.path.basename(file_path)
with open(file_path, "rb") as fp:
if content_type:
files = dict(file=(file_name, fp, content_type))
else:
files = dict(file=(file_name, fp))
response = request("POST", path, data=additional, files=files, params=params)
return response
def delete(path, params=None):
return request("DELETE", path, params=params)
def patch(path, data, encoding=None, params=None):
return request("PATCH", path, data=data, encoding=encoding, params=params)
def put(path, data, encoding=None, params=None):
return request("PUT", path, data=data, encoding=encoding, params=params)
def connect_socket(**kwargs):
import uuid
import random
import json
# creates websocket URL for SockJS according to
# - http://sockjs.github.io/sockjs-protocol/sockjs-protocol-0.3.3.html#section-37
# - http://sockjs.github.io/sockjs-protocol/sockjs-protocol-0.3.3.html#section-50
url = "ws://{}/sockjs/{:0>3d}/{}/websocket".format(
baseurl[baseurl.find("//") + 2:], # host + port + prefix, but no protocol
random.randrange(0, stop=999), # server_id
uuid.uuid4() # session_id
)
use_ssl = baseurl.startswith("https:")
on_open_cb = kwargs.get("on_open", None)
on_heartbeat_cb = kwargs.get("on_heartbeat", None)
on_message_cb = kwargs.get("on_message", None)
on_close_cb = kwargs.get("on_close", None)
on_error_cb = kwargs.get("on_error", None)
daemon = kwargs.get("daemon", True)
def on_message(ws, message):
message_type = message[0]
if message_type == "h":
# "heartbeat" message
if callable(on_heartbeat_cb):
on_heartbeat_cb(ws)
return
elif message_type == "o":
# "open" message
return
elif message_type == "c":
# "close" message
return
if not callable(on_message_cb):
return
message_body = message[1:]
if not message_body:
return
data = json.loads(message_body)
if message_type == "m":
data = [data,]
for d in data:
for internal_type, internal_message in d.items():
on_message_cb(ws, internal_type, internal_message)
def on_open(ws):
if callable(on_open_cb):
on_open_cb(ws)
def on_close(ws):
if callable(on_close_cb):
on_close_cb(ws)
def on_error(ws, error):
if callable(on_error_cb):
on_error_cb(ws, error)
socket = SocketClient(url,
use_ssl=use_ssl,
daemon=daemon,
on_open=on_open,
on_message=on_message,
on_close=on_close,
on_error=on_error)
socket.connect()
return socket

484
tests/test_daemon.py Normal file
View file

@ -0,0 +1,484 @@
# coding=utf-8
from __future__ import (print_function, absolute_import)
import unittest
import mock
import octoprint.daemon
class ExpectedExit(BaseException):
pass
class DaemonTest(unittest.TestCase):
def setUp(self):
run_method = mock.MagicMock()
echo_method = mock.MagicMock()
error_method = mock.MagicMock()
class TestDaemon(octoprint.daemon.Daemon):
def run(self):
run_method()
def echo(cls, line):
echo_method(line)
def error(cls, line):
error_method(line)
self.pidfile = "/my/pid/file"
self.daemon = TestDaemon(self.pidfile)
self.run_method = run_method
self.echo_method = echo_method
self.error_method = error_method
@mock.patch("os.fork", create=True)
@mock.patch("os.chdir")
@mock.patch("os.setsid", create=True)
@mock.patch("os.umask")
@mock.patch("sys.exit")
def test_double_fork(self, mock_exit, mock_umask, mock_setsid, mock_chdir, mock_fork):
# setup
pid1 = 1234
pid2 = 2345
mock_fork.side_effect = [pid1, pid2]
# test
self.daemon._double_fork()
# assert
self.assertListEqual(mock_fork.mock_calls, [mock.call(), mock.call()])
self.assertListEqual(mock_exit.mock_calls, [mock.call(0), mock.call(0)])
mock_chdir.assert_called_once_with("/")
mock_setsid.assert_called_once_with()
mock_umask.assert_called_once_with(002)
@mock.patch("os.fork", create=True)
@mock.patch("sys.exit")
def test_double_fork_failed_first(self, mock_exit, mock_fork):
# setup
mock_fork.side_effect = OSError()
mock_exit.side_effect = ExpectedExit()
# test
try:
self.daemon._double_fork()
self.fail("Expected an exit")
except ExpectedExit:
pass
# assert
self.assertListEqual(mock_fork.mock_calls, [mock.call()])
self.assertListEqual(mock_exit.mock_calls, [mock.call(1)])
self.assertEquals(len(self.error_method.mock_calls), 1)
@mock.patch("os.fork", create=True)
@mock.patch("os.chdir")
@mock.patch("os.setsid", create=True)
@mock.patch("os.umask")
@mock.patch("sys.exit")
def test_double_fork_failed_second(self, mock_exit, mock_umask, mock_setsid, mock_chdir, mock_fork):
# setup
mock_fork.side_effect = [1234, OSError()]
mock_exit.side_effect = [None, ExpectedExit()]
# test
try:
self.daemon._double_fork()
self.fail("Expected an exit")
except ExpectedExit:
pass
# assert
self.assertEquals(mock_fork.call_count, 2)
self.assertListEqual(mock_exit.mock_calls, [mock.call(0), mock.call(1)])
self.assertEquals(self.error_method.call_count, 1)
mock_chdir.assert_called_once_with("/")
mock_setsid.assert_called_once_with()
mock_umask.assert_called_once_with(002)
@mock.patch("sys.stdin")
@mock.patch("sys.stdout")
@mock.patch("sys.stderr")
@mock.patch("os.devnull")
@mock.patch("__builtin__.open")
@mock.patch("os.dup2")
def test_redirect_io(self, mock_dup2, mock_open, mock_devnull, mock_stderr, mock_stdout, mock_stdin):
# setup
mock_stdin.fileno.return_value = "stdin"
mock_stdout.fileno.return_value = "stdout"
mock_stderr.fileno.return_value = "stderr"
new_stdin = mock.MagicMock()
new_stdout = mock.MagicMock()
new_stderr = mock.MagicMock()
new_stdin.fileno.return_value = "new_stdin"
new_stdout.fileno.return_value = "new_stdout"
new_stderr.fileno.return_value = "new_stderr"
mock_open.side_effect = [new_stdin, new_stdout, new_stderr]
# test
self.daemon._redirect_io()
# assert
mock_stdout.flush.assert_called_once_with()
mock_stderr.flush.assert_called_once_with()
self.assertListEqual(mock_open.mock_calls,
[mock.call(mock_devnull, "r"),
mock.call(mock_devnull, "a+"),
mock.call(mock_devnull, "a+")])
self.assertListEqual(mock_dup2.mock_calls,
[mock.call("new_stdin", "stdin"),
mock.call("new_stdout", "stdout"),
mock.call("new_stderr", "stderr")])
@mock.patch("os.getpid")
@mock.patch("signal.signal")
def test_daemonize(self, mock_signal, mock_getpid):
# setup
self.daemon._double_fork = mock.MagicMock()
self.daemon._redirect_io = mock.MagicMock()
self.daemon.set_pid = mock.MagicMock()
pid = 1234
mock_getpid.return_value = pid
# test
self.daemon.start()
# assert
import signal
self.daemon._double_fork.assert_called_once_with()
self.daemon._redirect_io.assert_called_once_with()
self.daemon.set_pid.assert_called_once_with(str(pid))
mock_signal.assert_called_once_with(signal.SIGTERM, self.daemon._on_sigterm)
@mock.patch("sys.exit")
def test_on_sigterm(self, mock_exit):
# setup
self.daemon.remove_pidfile = mock.MagicMock()
mock_exit.side_effect = ExpectedExit
# test
try:
self.daemon._on_sigterm("foo", "bar")
self.fail("Expected an exit")
except ExpectedExit:
pass
# assert
self.daemon.remove_pidfile.assert_called_once_with()
mock_exit.assert_called_once_with(0)
def test_start(self):
# setup
self.daemon._daemonize = mock.MagicMock()
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = None
# test
self.daemon.start()
# assert
self.daemon._daemonize.assert_called_once_with()
self.daemon.get_pid.assert_called_once_with()
self.echo_method.assert_called_once_with("Starting daemon...")
self.assertTrue(self.run_method.called)
@mock.patch("sys.exit")
def test_start_running(self, mock_exit):
# setup
pid = "1234"
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = pid
mock_exit.side_effect = ExpectedExit()
# test
try:
self.daemon.start()
self.fail("Expected an exit")
except ExpectedExit:
pass
# assert
self.daemon.get_pid.assert_called_once_with()
self.assertTrue(self.error_method.called)
mock_exit.assert_called_once_with(1)
@mock.patch("os.kill")
@mock.patch("time.sleep")
def test_stop(self, mock_sleep, mock_kill):
import signal
# setup
pid = "1234"
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = pid
self.daemon.remove_pidfile = mock.MagicMock()
mock_kill.side_effect = [None, OSError("No such process")]
# test
self.daemon.stop()
# assert
self.daemon.get_pid.assert_called_once_with()
self.assertListEqual(mock_kill.mock_calls,
[mock.call(pid, signal.SIGTERM),
mock.call(pid, signal.SIGTERM)])
mock_sleep.assert_called_once_with(0.1)
self.daemon.remove_pidfile.assert_called_once_with()
@mock.patch("sys.exit")
def test_stop_not_running(self, mock_exit):
# setup
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = None
mock_exit.side_effect = ExpectedExit()
# test
try:
self.daemon.stop()
self.fail("Expected an exit")
except ExpectedExit:
pass
# assert
self.daemon.get_pid.assert_called_once_with()
self.assertEquals(self.error_method.call_count, 1)
mock_exit.assert_called_once_with(1)
@mock.patch("sys.exit")
def test_stop_not_running_no_error(self, mock_exit):
# setup
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = None
# test
self.daemon.stop(check_running=False)
# assert
self.daemon.get_pid.assert_called_once_with()
self.assertFalse(mock_exit.called)
@mock.patch("os.kill")
@mock.patch("sys.exit")
def test_stop_unknown_error(self, mock_exit, mock_kill):
# setup
pid = "1234"
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = pid
mock_exit.side_effect = ExpectedExit()
mock_kill.side_effect = OSError("Unknown")
# test
try:
self.daemon.stop()
self.fail("Expected an exit")
except ExpectedExit:
pass
# assert
self.assertTrue(self.error_method.called)
mock_exit.assert_called_once_with(1)
def test_restart(self):
# setup
self.daemon.start = mock.MagicMock()
self.daemon.stop = mock.MagicMock()
# test
self.daemon.restart()
# assert
self.daemon.stop.assert_called_once_with(check_running=False)
self.daemon.start.assert_called_once_with()
def test_status_running(self):
# setup
self.daemon.is_running = mock.MagicMock()
self.daemon.is_running.return_value = True
# test
self.daemon.status()
# assert
self.echo_method.assert_called_once_with("Daemon is running")
def test_status_not_running(self):
# setup
self.daemon.is_running = mock.MagicMock()
self.daemon.is_running.return_value = False
# test
self.daemon.status()
# assert
self.echo_method.assert_called_once_with("Daemon is not running")
@mock.patch("os.kill")
def test_is_running_true(self, mock_kill):
# setup
pid = "1234"
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = pid
self.daemon.remove_pidfile = mock.MagicMock()
# test
result = self.daemon.is_running()
# assert
self.assertTrue(result)
mock_kill.assert_called_once_with(pid, 0)
self.assertFalse(self.daemon.remove_pidfile.called)
self.assertFalse(self.error_method.called)
def test_is_running_false_no_pid(self):
# setup
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = None
# test
result = self.daemon.is_running()
# assert
self.assertFalse(result)
@mock.patch("os.kill")
def test_is_running_false_pidfile_removed(self, mock_kill):
# setup
pid = "1234"
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = pid
mock_kill.side_effect = OSError()
self.daemon.remove_pidfile = mock.MagicMock()
# test
result = self.daemon.is_running()
# assert
self.assertFalse(result)
mock_kill.assert_called_once_with(pid, 0)
self.daemon.remove_pidfile.assert_called_once_with()
self.assertFalse(self.error_method.called)
@mock.patch("os.kill")
def test_is_running_false_pidfile_error(self, mock_kill):
# setup
pid = "1234"
self.daemon.get_pid = mock.MagicMock()
self.daemon.get_pid.return_value = pid
mock_kill.side_effect = OSError()
self.daemon.remove_pidfile = mock.MagicMock()
self.daemon.remove_pidfile.side_effect = IOError()
# test
result = self.daemon.is_running()
# assert
self.assertFalse(result)
mock_kill.assert_called_once_with(pid, 0)
self.daemon.remove_pidfile.assert_called_once_with()
self.assertTrue(self.error_method.called)
def test_get_pid(self):
# setup
pid = 1234
# test
with mock.patch("__builtin__.open", mock.mock_open(read_data="{}\n".format(pid)), create=True) as m:
result = self.daemon.get_pid()
# assert
self.assertEquals(result, pid)
m.assert_called_once_with(self.pidfile, "r")
def test_get_pid_ioerror(self):
# setup
handle = mock.MagicMock()
handle.__enter__.side_effect = IOError()
# test
with mock.patch("__builtin__.open", mock.mock_open(), create=True) as m:
result = self.daemon.get_pid()
# assert
self.assertIsNone(result)
m.assert_called_once_with(self.pidfile, "r")
def test_get_pid_valueerror(self):
# setup
pid = "not an integer"
# test
with mock.patch("__builtin__.open", mock.mock_open(read_data="{}\n".format(pid)), create=True) as m:
result = self.daemon.get_pid()
# assert
self.assertIsNone(result)
m.assert_called_once_with(self.pidfile, "r")
def test_set_pid(self):
# setup
pid = "1234"
# test
with mock.patch("__builtin__.open", mock.mock_open(), create=True) as m:
self.daemon.set_pid(pid)
# assert
m.assert_called_once_with(self.pidfile, "w+")
handle = m()
handle.write.assert_called_once_with("{}\n".format(pid))
def test_set_pid_int(self):
# setup
pid = 1234
# test
with mock.patch("__builtin__.open", mock.mock_open(), create=True) as m:
self.daemon.set_pid(pid)
# assert
m.assert_called_once_with(self.pidfile, "w+")
handle = m()
handle.write.assert_called_once_with("{}\n".format(pid))
@mock.patch("os.path.isfile")
@mock.patch("os.remove")
def test_remove_pidfile_exists(self, mock_remove, mock_isfile):
# setup
mock_isfile.return_value = True
# test
self.daemon.remove_pidfile()
# assert
mock_isfile.assert_called_once_with(self.pidfile)
mock_remove.assert_called_once_with(self.pidfile)
@mock.patch("os.path.isfile")
@mock.patch("os.remove")
def test_remove_pidfile_doesnt_exist(self, mock_remove, mock_isfile):
# setup
mock_isfile.return_value = False
# test
self.daemon.remove_pidfile()
# assert
mock_isfile.assert_called_once_with(self.pidfile)
self.assertFalse(mock_remove.called)

View file

@ -0,0 +1,3 @@
# This is a text file encoded in UTF8 with a BOM
Here are some umlauts: äöüÄÖÜß

View file

@ -0,0 +1,3 @@
# This is a text file encoded in UTF8 without a BOM
Here are some umlauts: äöüÄÖÜß

View file

@ -0,0 +1,222 @@
# coding=utf-8
from __future__ import absolute_import
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
import unittest
import mock
import os
import octoprint.util
class BomAwareOpenTest(unittest.TestCase):
"""
Tests for :func:`octoprint.util.bom_aware_open`.
"""
def setUp(self):
self.filename_utf8_with_bom = os.path.join(os.path.abspath(os.path.dirname(__file__)), "_files", "utf8_with_bom.txt")
self.filename_utf8_without_bom = os.path.join(os.path.abspath(os.path.dirname(__file__)), "_files", "utf8_without_bom.txt")
def test_bom_aware_open_with_bom(self):
"""Tests that the contents of a UTF8 file with BOM are loaded correctly (without the BOM)."""
# test
with octoprint.util.bom_aware_open(self.filename_utf8_with_bom, encoding="utf-8") as f:
contents = f.readlines()
# assert
self.assertEquals(len(contents), 3)
self.assertTrue(contents[0].startswith("#"))
def test_bom_aware_open_without_bom(self):
"""Tests that the contents of a UTF8 file without BOM are loaded correctly."""
# test
with octoprint.util.bom_aware_open(self.filename_utf8_without_bom, encoding="utf-8") as f:
contents = f.readlines()
# assert
self.assertEquals(len(contents), 3)
self.assertTrue(contents[0].startswith("#"))
def test_bom_aware_open_ascii(self):
"""Tests that the contents of a UTF8 file loaded as ASCII are replaced correctly if "replace" is specified on errors."""
# test
with octoprint.util.bom_aware_open(self.filename_utf8_with_bom, errors="replace") as f:
contents = f.readlines()
# assert
self.assertEquals(len(contents), 3)
self.assertTrue(contents[0].startswith(u"\ufffd" * 3 + "#"))
self.assertTrue(contents[2].endswith(u"\ufffd\ufffd" * 6))
def test_bom_aware_open_encoding_error(self):
"""Tests that an encoding error is thrown if not suppressed when opening a UTF8 file as ASCII."""
try:
with octoprint.util.bom_aware_open(self.filename_utf8_without_bom) as f:
f.readlines()
self.fail("Expected an exception")
except UnicodeDecodeError:
pass
def test_bom_aware_open_parameters(self):
"""Tests that the parameters are propagated properly."""
with mock.patch("codecs.open") as mock_open:
with octoprint.util.bom_aware_open(self.filename_utf8_without_bom, mode="rb", encoding="utf-8", errors="ignore") as f:
f.readlines()
mock_open.assert_called_once_with(self.filename_utf8_without_bom, encoding="utf-8", mode="rb", errors="ignore")
class TestAtomicWrite(unittest.TestCase):
"""
Tests for :func:`octoprint.util.atomic_write`.
"""
def setUp(self):
pass
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
def test_atomic_write(self, mock_tempfile, mock_move):
"""Tests the regular basic "good" case."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
# test
with octoprint.util.atomic_write("somefile.yaml") as f:
f.write("test")
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.write.assert_called_once_with("test")
mock_file.close.assert_called_once_with()
mock_move.assert_called_once_with("tempfile.tmp", "somefile.yaml")
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
def test_atomic_write_error_on_write(self, mock_tempfile, mock_move):
"""Tests the error case where something in the wrapped code fails."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_file.write.side_effect = RuntimeError()
mock_tempfile.return_value = mock_file
# test
try:
with octoprint.util.atomic_write("somefile.yaml") as f:
f.write("test")
self.fail("Expected an exception")
except RuntimeError:
pass
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.close.assert_called_once_with()
self.assertFalse(mock_move.called)
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
def test_atomic_write_error_on_move(self, mock_tempfile, mock_move):
"""Tests the error case where the final move fails."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
mock_move.side_effect = RuntimeError()
# test
try:
with octoprint.util.atomic_write("somefile.yaml") as f:
f.write("test")
self.fail("Expected an exception")
except RuntimeError:
pass
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.close.assert_called_once_with()
self.assertTrue(mock_move.called)
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
def test_atomic_write_parameters(self, mock_tempfile, mock_move):
"""Tests that the open parameters are propagated properly."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
# test
with octoprint.util.atomic_write("somefile.yaml", mode="w", prefix="foo", suffix="bar") as f:
f.write("test")
# assert
mock_tempfile.assert_called_once_with(mode="w", prefix="foo", suffix="bar", delete=False)
mock_file.close.assert_called_once_with()
mock_move.assert_called_once_with("tempfile.tmp", "somefile.yaml")
class TempDirTest(unittest.TestCase):
@mock.patch("shutil.rmtree")
@mock.patch("tempfile.mkdtemp")
def test_tempdir(self, mock_mkdtemp, mock_rmtree):
"""Tests regular "good" case."""
# setup
path = "/path/to/tmpdir"
mock_mkdtemp.return_value = path
# test
with octoprint.util.tempdir() as td:
self.assertEquals(td, path)
# assert
mock_mkdtemp.assert_called_once_with()
mock_rmtree.assert_called_once_with(path, ignore_errors=False, onerror=None)
@mock.patch("shutil.rmtree")
@mock.patch("tempfile.mkdtemp")
def test_tempdir_parameters_mkdtemp(self, mock_mkdtemp, mock_rmtree):
"""Tests that parameters for mkdtemp are properly propagated."""
# setup
path = "/path/to/tmpdir"
mock_mkdtemp.return_value = path
# test
with octoprint.util.tempdir(prefix="prefix", suffix="suffix", dir="dir") as td:
self.assertEquals(td, path)
# assert
mock_mkdtemp.assert_called_once_with(prefix="prefix", suffix="suffix", dir="dir")
mock_rmtree.assert_called_once_with(path, ignore_errors=False, onerror=None)
@mock.patch("shutil.rmtree")
@mock.patch("tempfile.mkdtemp")
def test_tempdir_parameters_rmtree(self, mock_mkdtemp, mock_rmtree):
"""Tests that parameters for rmtree are properly propagated."""
# setup
path = "/path/to/tmpdir"
mock_mkdtemp.return_value = path
onerror = mock.MagicMock()
# test
with octoprint.util.tempdir(ignore_errors=True, onerror=onerror) as td:
self.assertEquals(td, path)
# assert
mock_mkdtemp.assert_called_once_with()
mock_rmtree.assert_called_once_with(path, ignore_errors=True, onerror=onerror)