Refactoring in octoprint.util

* Renamed a couple of camelCased methods to snake_case
  * Moved to better location what didn't make sense in the common util module
This commit is contained in:
Gina Häußge 2015-02-27 10:28:16 +01:00
parent c76b84e1be
commit 0a92b340ac
17 changed files with 106 additions and 134 deletions

View file

@ -25,6 +25,8 @@
- `action:pause`: Pauses the current job in OctoPrint
- `action:resume`: Resumes the current job in OctoPrint
- `action:disconnect`: Disconnects OctoPrint from the printer
Plugins can add supported commands by [hooking](http://docs.octoprint.org/en/devel/plugins/hooks.html) into the
``octoprint.comm.protocol.action`` hook
* Mousing over the webcam image in the control tab enables key control mode, allowing you to quickly move the axis of your
printer with your computer's keyboard ([#610](https://github.com/foosel/OctoPrint/pull/610)):
- arrow keys: X and Y axes
@ -134,6 +136,7 @@
* Color code successful or failed print results directly in file list, not just after a reload
* Changing Timelapse post roll activates save button
* Timelapse post roll is loaded properly from config
* Handling of files on the printer's SD card contained in folders now works correctly
([Commits](https://github.com/foosel/OctoPrint/compare/master...devel))

View file

@ -13,7 +13,7 @@ import tempfile
import octoprint.filemanager
from octoprint.util import safeRename
from octoprint.util import safe_rename
class StorageInterface(object):
"""
@ -1003,7 +1003,7 @@ class LocalFileStorage(StorageInterface):
with open(metadata_temporary_path, "w") as f:
import yaml
yaml.safe_dump(metadata, stream=f, default_flow_style=False, indent=" ", allow_unicode=True)
safeRename(metadata_temporary_path, metadata_path, throw_error=True)
safe_rename(metadata_temporary_path, metadata_path, throw_error=True)
except:
self._logger.exception("Error while writing .metadata.yaml to {path}".format(**locals()))
else:

View file

@ -92,7 +92,7 @@ class CuraPlugin(octoprint.plugin.SlicerPlugin,
# default values for name, display name and description
profile_name = _sanitize_name(name)
profile_display_name = name
profile_description = "Imported from {filename} on {date}".format(filename=filename, date=octoprint.util.getFormattedDateTime(datetime.datetime.now()))
profile_description = "Imported from {filename} on {date}".format(filename=filename, date=octoprint.util.get_formatted_datetime(datetime.datetime.now()))
profile_allow_overwrite = False
# overrides

View file

@ -580,7 +580,7 @@ class Server():
if settings().getBoolean(["accessControl", "enabled"]):
userManagerName = settings().get(["accessControl", "userManager"])
try:
clazz = octoprint.util.getClass(userManagerName)
clazz = octoprint.util.get_class(userManagerName)
userManager = clazz()
except AttributeError, e:
logger.exception("Could not instantiate user manager %s, will run with accessControl disabled!" % userManagerName)

View file

@ -20,7 +20,7 @@ import octoprint.plugin
from octoprint.server import admin_permission, NO_CONTENT
from octoprint.settings import settings as s, valid_boolean_trues
from octoprint.server.util import apiKeyRequestHandler, corsResponseHandler
from octoprint.server.util.flask import restricted_access
from octoprint.server.util.flask import restricted_access, get_remote_address, get_json_command_from_request
#~~ init api blueprint, including sub modules
@ -73,7 +73,7 @@ def pluginCommand(name):
if valid_commands is None:
return make_response("Method not allowed", 405)
command, data, response = util.getJsonCommandFromRequest(request, valid_commands)
command, data, response = get_json_command_from_request(request, valid_commands)
if response is not None:
return response
@ -204,7 +204,7 @@ def login():
localNetworks.add(ip)
try:
remoteAddr = util.getRemoteAddress(request)
remoteAddr = get_remote_address(request)
if netaddr.IPAddress(remoteAddr) in localNetworks:
user = octoprint.server.userManager.findUser(autologinAs)
if user is not None:

View file

@ -11,7 +11,7 @@ from octoprint.settings import settings
from octoprint.printer import getConnectionOptions
from octoprint.server import printer, printerProfileManager, NO_CONTENT
from octoprint.server.api import api
from octoprint.server.util.flask import restricted_access
from octoprint.server.util.flask import restricted_access, get_json_command_from_request
import octoprint.util as util
@ -36,7 +36,7 @@ def connectionCommand():
"disconnect": []
}
command, data, response = util.getJsonCommandFromRequest(request, valid_commands)
command, data, response = get_json_command_from_request(request, valid_commands)
if response is not None:
return response

View file

@ -11,7 +11,7 @@ import octoprint.util as util
from octoprint.filemanager.destinations import FileDestinations
from octoprint.settings import settings, valid_boolean_trues
from octoprint.server import printer, fileManager, slicingManager, eventManager, NO_CONTENT
from octoprint.server.util.flask import restricted_access
from octoprint.server.util.flask import restricted_access, get_json_command_from_request
from octoprint.server.api import api
from octoprint.events import Events
import octoprint.filemanager
@ -27,7 +27,7 @@ def readGcodeFiles():
filter = request.values["filter"]
files = _getFileList(FileDestinations.LOCAL, filter=filter)
files.extend(_getFileList(FileDestinations.SDCARD))
return jsonify(files=files, free=util.getFreeBytes(settings().getBaseFolder("uploads")))
return jsonify(files=files, free=util.get_free_bytes(settings().getBaseFolder("uploads")))
@api.route("/files/<string:origin>", methods=["GET"])
@ -38,7 +38,7 @@ def readGcodeFilesForOrigin(origin):
files = _getFileList(origin)
if origin == FileDestinations.LOCAL:
return jsonify(files=files, free=util.getFreeBytes(settings().getBaseFolder("uploads")))
return jsonify(files=files, free=util.get_free_bytes(settings().getBaseFolder("uploads")))
else:
return jsonify(files=files)
@ -276,7 +276,7 @@ def gcodeFileCommand(filename, target):
"slice": []
}
command, data, response = util.getJsonCommandFromRequest(request, valid_commands)
command, data, response = get_json_command_from_request(request, valid_commands)
if response is not None:
return response

View file

@ -8,7 +8,7 @@ __copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms
from flask import request, make_response, jsonify
from octoprint.server import printer, NO_CONTENT
from octoprint.server.util.flask import restricted_access
from octoprint.server.util.flask import restricted_access, get_json_command_from_request
from octoprint.server.api import api
import octoprint.util as util
@ -26,7 +26,7 @@ def controlJob():
"cancel": []
}
command, data, response = util.getJsonCommandFromRequest(request, valid_commands)
command, data, response = get_json_command_from_request(request, valid_commands)
if response is not None:
return response

View file

@ -15,7 +15,7 @@ from octoprint.settings import settings
from octoprint.server import NO_CONTENT, admin_permission
from octoprint.server.util.flask import redirect_to_tornado, restricted_access
from octoprint.server.api import api
from octoprint.util import getFreeBytes
from octoprint.util import get_free_bytes
@api.route("/logs", methods=["GET"])
@ -23,7 +23,7 @@ from octoprint.util import getFreeBytes
@admin_permission.require(403)
def getLogFiles():
files = _getLogFiles()
return jsonify(files=files, free=getFreeBytes(settings().getBaseFolder("logs")))
return jsonify(files=files, free=get_free_bytes(settings().getBaseFolder("logs")))
@api.route("/logs/<path:filename>", methods=["GET"])

View file

@ -12,7 +12,7 @@ import re
from octoprint.settings import settings, valid_boolean_trues
from octoprint.server import printer, NO_CONTENT
from octoprint.server.api import api
from octoprint.server.util.flask import restricted_access
from octoprint.server.util.flask import restricted_access, get_json_command_from_request
import octoprint.util as util
#~~ Printer
@ -64,7 +64,7 @@ def printerToolCommand():
"extrude": ["amount"],
"flowrate": ["factor"]
}
command, data, response = util.getJsonCommandFromRequest(request, valid_commands)
command, data, response = get_json_command_from_request(request, valid_commands)
if response is not None:
return response
@ -166,7 +166,7 @@ def printerBedCommand():
"target": ["target"],
"offset": ["offset"]
}
command, data, response = util.getJsonCommandFromRequest(request, valid_commands)
command, data, response = get_json_command_from_request(request, valid_commands)
if response is not None:
return response
@ -232,7 +232,7 @@ def printerPrintheadCommand():
"home": ["axes"],
"feedrate": ["factor"]
}
command, data, response = util.getJsonCommandFromRequest(request, valid_commands)
command, data, response = get_json_command_from_request(request, valid_commands)
if response is not None:
return response
@ -293,7 +293,7 @@ def printerSdCommand():
"refresh": [],
"release": []
}
command, data, response = util.getJsonCommandFromRequest(request, valid_commands)
command, data, response = get_json_command_from_request(request, valid_commands)
if response is not None:
return response

View file

@ -57,7 +57,7 @@ def downloadTimelapse(filename):
@api.route("/timelapse/<filename>", methods=["DELETE"])
@restricted_access
def deleteTimelapse(filename):
if util.isAllowedFile(filename, {"mpg"}):
if util.is_allowed_file(filename, {"mpg"}):
timelapse_folder = settings().getBaseFolder("timelapse")
full_path = os.path.realpath(os.path.join(timelapse_folder, filename))
if full_path.startswith(timelapse_folder) and os.path.exists(full_path):

View file

@ -1,5 +1,6 @@
# coding=utf-8
from __future__ import absolute_import
from flask import make_response
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
@ -245,3 +246,25 @@ class AppSessionManager(object):
self._logger.debug("App sessions after cleanup: %r" % self._sessions)
def get_remote_address(request):
forwardedFor = request.headers.get("X-Forwarded-For", None)
if forwardedFor is not None:
return forwardedFor.split(",")[0]
return request.remote_addr
def get_json_command_from_request(request, valid_commands):
if not "application/json" in request.headers["Content-Type"]:
return None, None, make_response("Expected content-type JSON", 400)
data = request.json
if not "command" in data.keys() or not data["command"] in valid_commands.keys():
return None, None, make_response("Expected valid command", 400)
command = data["command"]
for parameter in valid_commands[command]:
if not parameter in data:
return None, None, make_response("Mandatory parameter %s missing for command %s" % (parameter, command), 400)
return command, data, None

View file

@ -373,7 +373,7 @@ class UploadStorageFallbackHandler(tornado.web.RequestHandler):
finally:
# make sure the temporary files are removed again
for f in self._files:
octoprint.util.silentRemove(f)
octoprint.util.silent_remove(f)
# make all http methods trigger _handle_method
get = _handle_method

View file

@ -36,7 +36,7 @@ class GcodeWatchdogHandler(watchdog.events.PatternMatchingEventHandler):
self.filename = os.path.basename(self._path)
def save(self, target):
octoprint.util.safeRename(self._path, target)
octoprint.util.safe_rename(self._path, target)
file_wrapper = WatchdogFileWrapper(path)

View file

@ -33,9 +33,9 @@ def getFinishedTimelapses():
statResult = os.stat(os.path.join(basedir, osFile))
files.append({
"name": osFile,
"size": util.getFormattedSize(statResult.st_size),
"size": util.get_formatted_size(statResult.st_size),
"bytes": statResult.st_size,
"date": util.getFormattedDateTime(datetime.datetime.fromtimestamp(statResult.st_ctime))
"date": util.get_formatted_datetime(datetime.datetime.fromtimestamp(statResult.st_ctime))
})
return files

View file

@ -5,19 +5,13 @@ __license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agp
import os
import traceback
import sys
import time
import re
import tempfile
import logging
import shutil
from functools import wraps
from flask import make_response
import warnings
from octoprint.settings import settings, default_settings
logger = logging.getLogger(__name__)
@ -38,7 +32,7 @@ def warning_decorator_factory(warning_type):
deprecated = warning_decorator_factory(DeprecationWarning)
pending_deprecation = warning_decorator_factory(PendingDeprecationWarning)
def getFormattedSize(num):
def get_formatted_size(num):
"""
Taken from http://stackoverflow.com/a/1094933/2028598
"""
@ -49,11 +43,11 @@ def getFormattedSize(num):
return "%3.1f%s" % (num, "TB")
def isAllowedFile(filename, extensions):
def is_allowed_file(filename, extensions):
return "." in filename and filename.rsplit(".", 1)[1] in extensions
def getFormattedTimeDelta(d):
def get_formatted_timedelta(d):
if d is None:
return None
hours = d.days * 24 + d.seconds // 3600
@ -62,14 +56,14 @@ def getFormattedTimeDelta(d):
return "%02d:%02d:%02d" % (hours, minutes, seconds)
def getFormattedDateTime(d):
def get_formatted_datetime(d):
if d is None:
return None
return d.strftime("%Y-%m-%d %H:%M")
def getClass(name):
def get_class(name):
"""
Taken from http://stackoverflow.com/a/452981/2028598
"""
@ -81,47 +75,12 @@ def getClass(name):
return m
def isDevVersion():
gitPath = os.path.abspath(os.path.join(os.path.split(os.path.abspath(__file__))[0], "../../../.git"))
return os.path.exists(gitPath)
def getExceptionString():
def get_exception_string():
locationInfo = traceback.extract_tb(sys.exc_info()[2])[0]
return "%s: '%s' @ %s:%s:%d" % (str(sys.exc_info()[0].__name__), str(sys.exc_info()[1]), os.path.basename(locationInfo[0]), locationInfo[2], locationInfo[1])
def getGitInfo():
gitPath = os.path.abspath(os.path.join(os.path.split(os.path.abspath(__file__))[0], "../../../.git"))
if not os.path.exists(gitPath):
return (None, None)
headref = None
with open(os.path.join(gitPath, "HEAD"), "r") as f:
headref = f.readline().strip()
if headref is None:
return (None, None)
headref = headref[len("ref: "):]
branch = headref[headref.rfind("/") + 1:]
with open(os.path.join(gitPath, headref)) as f:
head = f.readline().strip()
return (branch, head)
def getNewTimeout(type):
now = time.time()
if type not in default_settings["serial"]["timeout"].keys():
# timeout immediately for unknown timeout type
return now
return now + settings().getFloat(["serial", "timeout", type])
def getFreeBytes(path):
def get_free_bytes(path):
"""
Taken from http://stackoverflow.com/a/2372171/2028598
"""
@ -135,14 +94,7 @@ def getFreeBytes(path):
return st.f_bavail * st.f_frsize
def getRemoteAddress(request):
forwardedFor = request.headers.get("X-Forwarded-For", None)
if forwardedFor is not None:
return forwardedFor.split(",")[0]
return request.remote_addr
def getDosFilename(input, existingFilenames, extension=None):
def get_dos_filename(input, existingFilenames, extension=None):
if input is None:
return None
@ -150,10 +102,10 @@ def getDosFilename(input, existingFilenames, extension=None):
extension = "gco"
filename, ext = input.rsplit(".", 1)
return findCollisionfreeName(filename, extension, existingFilenames)
return find_collision_free_name(filename, extension, existingFilenames)
def findCollisionfreeName(input, extension, existingFilenames):
def find_collision_free_name(input, extension, existingFilenames):
filename = re.sub(r"\s+", "_", input.lower().translate({ord(i):None for i in ".\"/\\[]:;=,"}))
counter = 1
@ -169,7 +121,7 @@ def findCollisionfreeName(input, extension, existingFilenames):
raise ValueError("Can't create a collision free filename")
def safeRename(old, new, throw_error=False):
def safe_rename(old, new, throw_error=False):
"""
Safely renames a file.
@ -178,7 +130,7 @@ def safeRename(old, new, throw_error=False):
anything goes wrong during those steps, the backup (if already there) will be renamed to its old name and thus
the operation hopefully result in a no-op.
On other operating systems the atomic os.rename function will be used instead.
On other operating systems :func:`shutil.move` will be used instead.
@param old the path to the old file to be renamed
@param new the path to the new file to be created/replaced
@ -190,7 +142,7 @@ def safeRename(old, new, throw_error=False):
try:
if os.path.exists(new):
silentRemove(backup)
silent_remove(backup)
os.rename(new, backup)
os.rename(old, new)
os.remove(backup)
@ -198,7 +150,7 @@ def safeRename(old, new, throw_error=False):
# if anything went wrong, try to rename the backup file to its original name
logger.error("Could not perform safe rename, trying to revert")
if os.path.exists(backup):
silentRemove(new)
silent_remove(new)
os.rename(backup, new)
if throw_error:
raise e
@ -207,7 +159,7 @@ def safeRename(old, new, throw_error=False):
shutil.move(old, new)
def silentRemove(file):
def silent_remove(file):
"""
Silently removes a file. Does not raise an error if the file doesn't exist.
@ -220,11 +172,11 @@ def silentRemove(file):
pass
def sanitizeAscii(line):
def sanitize_ascii(line):
return unicode(line, 'ascii', 'replace').encode('ascii', 'replace').rstrip()
def filterNonAscii(line):
def filter_non_ascii(line):
"""
Returns True if the line contains non-ascii characters, false otherwise
@ -238,22 +190,6 @@ def filterNonAscii(line):
return True
def getJsonCommandFromRequest(request, valid_commands):
if not "application/json" in request.headers["Content-Type"]:
return None, None, make_response("Expected content-type JSON", 400)
data = request.json
if not "command" in data.keys() or not data["command"] in valid_commands.keys():
return None, None, make_response("Expected valid command", 400)
command = data["command"]
for parameter in valid_commands[command]:
if not parameter in data:
return None, None, make_response("Mandatory parameter %s missing for command %s" % (parameter, command), 400)
return command, data, None
def dict_merge(a, b):
'''recursively merges dict's. not just simple a['key'] = b['key'], if
both a and bhave a key who's value is a dict then dict_merge is called
@ -332,7 +268,7 @@ def address_for_client(host, port):
sock.bind((address, 0))
sock.connect((host, port))
return address
except Exception as e:
pass
except:
continue

View file

@ -20,11 +20,11 @@ from collections import deque
from octoprint.util.avr_isp import stk500v2
from octoprint.util.avr_isp import ispBase
from octoprint.settings import settings
from octoprint.settings import settings, default_settings
from octoprint.events import eventManager, Events
from octoprint.filemanager import valid_file_type
from octoprint.filemanager.destinations import FileDestinations
from octoprint.util import getExceptionString, getNewTimeout, sanitizeAscii, filterNonAscii
from octoprint.util import get_exception_string, sanitize_ascii, filter_non_ascii
from octoprint.util.virtual import VirtualPrinter
try:
@ -415,7 +415,7 @@ class MachineCom(object):
self._sendNext()
except:
self._logger.exception("Error while trying to start printing")
self._errorValue = getExceptionString()
self._errorValue = get_exception_string()
self._changeState(self.STATE_ERROR)
eventManager().fire(Events.ERROR, {"error": self.getErrorString()})
@ -652,10 +652,10 @@ class MachineCom(object):
self._changeState(self.STATE_CONNECTING)
#Start monitoring the serial port.
self._timeout = getNewTimeout("communication")
self._timeout = get_new_timeout("communication")
tempRequestTimeout = getNewTimeout("temperature")
sdStatusRequestTimeout = getNewTimeout("sdStatus")
tempRequestTimeout = get_new_timeout("temperature")
sdStatusRequestTimeout = get_new_timeout("sdStatus")
startSeen = not settings().getBoolean(["feature", "waitForStartOnConnect"])
heatingUp = False
@ -668,7 +668,7 @@ class MachineCom(object):
if line is None:
break
if line.strip() is not "":
self._timeout = getNewTimeout("communication")
self._timeout = get_new_timeout("communication")
##~~ debugging output handling
if line.startswith("//"):
@ -714,7 +714,7 @@ class MachineCom(object):
size = None
if valid_file_type(filename, "gcode"):
if filterNonAscii(filename):
if filter_non_ascii(filename):
self._logger.warn("Got a file from printer's SD that has a non-ascii filename (%s), that shouldn't happen according to the protocol" % filename)
else:
if not filename.startswith("/"):
@ -886,12 +886,12 @@ class MachineCom(object):
self._log("Trying baudrate: %d" % (baudrate))
self._baudrateDetectRetry = 5
self._baudrateDetectTestOk = 0
self._timeout = getNewTimeout("communication")
self._timeout = get_new_timeout("communication")
self._serial.write('\n')
self._sendCommand("M105")
self._testingBaudrate = True
except:
self._log("Unexpected error while setting baudrate: %d %s" % (baudrate, getExceptionString()))
self._log("Unexpected error while setting baudrate: %d %s" % (baudrate, get_exception_string()))
elif 'ok' in line and 'T:' in line:
self._baudrateDetectTestOk += 1
if self._baudrateDetectTestOk < 10:
@ -935,7 +935,7 @@ class MachineCom(object):
self._sendCommand(self._commandQueue.get())
else:
self._sendCommand("M105")
tempRequestTimeout = getNewTimeout("temperature")
tempRequestTimeout = get_new_timeout("temperature")
# resend -> start resend procedure from requested line
elif line.lower().startswith("resend") or line.lower().startswith("rs"):
if settings().get(["feature", "swallowOkAfterResend"]):
@ -951,16 +951,16 @@ class MachineCom(object):
if self.isSdPrinting():
if time.time() > tempRequestTimeout and not heatingUp:
self._sendCommand("M105")
tempRequestTimeout = getNewTimeout("temperature")
tempRequestTimeout = get_new_timeout("temperature")
if time.time() > sdStatusRequestTimeout and not heatingUp:
self._sendCommand("M27")
sdStatusRequestTimeout = getNewTimeout("sdStatus")
sdStatusRequestTimeout = get_new_timeout("sdStatus")
else:
# Even when printing request the temperature every 5 seconds.
if time.time() > tempRequestTimeout and not self.isStreaming():
self._commandQueue.put("M105")
tempRequestTimeout = getNewTimeout("temperature")
tempRequestTimeout = get_new_timeout("temperature")
if "ok" in line and swallowOk:
swallowOk = False
@ -1000,7 +1000,7 @@ class MachineCom(object):
self._log("Error while connecting to %s: %s" % (p, str(e)))
pass
except:
self._log("Unexpected error while connecting to serial port: %s %s" % (p, getExceptionString()))
self._log("Unexpected error while connecting to serial port: %s %s" % (p, get_exception_string()))
programmer.close()
if self._serial is None:
self._log("Failed to autodetect serial port")
@ -1023,7 +1023,7 @@ class MachineCom(object):
self._serial.parity = serial.PARITY_NONE
self._serial.open()
except:
self._log("Unexpected error while connecting to serial port: %s %s" % (self._port, getExceptionString()))
self._log("Unexpected error while connecting to serial port: %s %s" % (self._port, get_exception_string()))
self._errorValue = "Failed to open serial port, permissions correct?"
self._changeState(self.STATE_ERROR)
eventManager().fire(Events.ERROR, {"error": self.getErrorString()})
@ -1060,14 +1060,14 @@ class MachineCom(object):
try:
ret = self._serial.readline()
except:
self._log("Unexpected error while reading serial port: %s" % (getExceptionString()))
self._errorValue = getExceptionString()
self._log("Unexpected error while reading serial port: %s" % (get_exception_string()))
self._errorValue = get_exception_string()
self.close(True)
return None
if ret == '':
#self._log("Recv: TIMEOUT")
return ''
self._log("Recv: %s" % sanitizeAscii(ret))
self._log("Recv: %s" % sanitize_ascii(ret))
return ret
def _sendNext(self):
@ -1191,12 +1191,12 @@ class MachineCom(object):
try:
self._serial.write(cmd + '\n')
except:
self._log("Unexpected error while writing serial port: %s" % (getExceptionString()))
self._errorValue = getExceptionString()
self._log("Unexpected error while writing serial port: %s" % (get_exception_string()))
self._errorValue = get_exception_string()
self.close(True)
except:
self._log("Unexpected error while writing serial port: %s" % (getExceptionString()))
self._errorValue = getExceptionString()
self._log("Unexpected error while writing serial port: %s" % (get_exception_string()))
self._errorValue = get_exception_string()
self.close(True)
def _gcode_T(self, cmd):
@ -1301,7 +1301,7 @@ class MachineCom(object):
elif s_idx != -1:
# dwell time is specified in seconds
_timeout = int(cmd[s_idx+1:])
self._timeout = getNewTimeout("communication") + _timeout
self._timeout = get_new_timeout("communication") + _timeout
return cmd
### MachineCom callback ################################################################################################
@ -1547,3 +1547,13 @@ class StreamingGcodeFileInformation(PrintingGcodeFileInformation):
def getRemoteFilename(self):
return self._remoteFilename
def get_new_timeout(type):
now = time.time()
if type not in default_settings["serial"]["timeout"].keys():
# timeout immediately for unknown timeout type
return now
return now + settings().getFloat(["serial", "timeout", type])