diff --git a/src/octoprint/server/api/__init__.py b/src/octoprint/server/api/__init__.py index 735155da..82d5d0f4 100644 --- a/src/octoprint/server/api/__init__.py +++ b/src/octoprint/server/api/__init__.py @@ -226,3 +226,112 @@ def _logout(user): if "usersession.id" in session: del session["usersession.id"] octoprint.server.userManager.logout_user(user) + +@api.route("/util/test", methods=["POST"]) +@restricted_access +@admin_permission.require(403) +def utilTestPath(): + valid_commands = dict( + path=["path"], + url=["url"] + ) + + command, data, response = get_json_command_from_request(request, valid_commands) + if response is not None: + return response + + if command == "path": + import os + from octoprint.util.paths import normalize + + path = normalize(data["path"]) + check_type = None + check_access = [] + + if "check_type" in data and data["check_type"] in ("file", "dir"): + check_type = data["check_type"] + + if "check_access" in data: + request_check_access = data["check_access"] + if not isinstance(request_check_access, list): + request_check_access = list(request_check_access) + + check_access = [check for check in request_check_access if check in ("r", "w", "x")] + + exists = os.path.exists(path) + + # check if path exists + type_mapping = dict(file=os.path.isfile, dir=os.path.isdir) + if check_type: + typeok = type_mapping[check_type](path) + else: + typeok = True + + # check if path allows requested access + access_mapping = dict(r=os.R_OK, w=os.W_OK, x=os.X_OK) + if check_access: + access = os.access(path, reduce(lambda x, y: x | y, map(lambda a: access_mapping[a], check_access))) + else: + access = True + + return jsonify(path=path, exists=exists, typeok=typeok, access=access, result=exists and typeok and access) + + elif command == "url": + import requests + + class StatusCodeRange(object): + def __init__(self, start=None, end=None): + self.start = start + self.end = end + + def __contains__(self, item): + if not isinstance(item, int): + return False + if self.start and self.end: + return self.start <= item < self.end + elif self.start: + return self.start <= item + elif self.end: + return item < self.end + else: + return False + + status_ranges = dict( + informational=StatusCodeRange(start=100,end=200), + success=StatusCodeRange(start=200,end=300), + redirection=StatusCodeRange(start=300,end=400), + client_error=StatusCodeRange(start=400,end=500), + server_error=StatusCodeRange(start=500), + normal=StatusCodeRange(end=400), + error=StatusCodeRange(start=400), + any=StatusCodeRange(start=100) + ) + + url = data["url"] + method = "HEAD" + check_status = [status_ranges["normal"]] + + if "method" in data: + method = data["method"] + + if "status" in data: + request_status = data["status"] + if not isinstance(request_status, list): + request_status = [request_status] + + check_status = [] + for rs in request_status: + if rs in status_ranges: + check_status.append(status_ranges[rs]) + else: + code = requests.codes[rs] + if code is not None: + check_status.append([code]) + + try: + response = requests.request(method=method, url=url) + status = reduce(lambda x, y: x and response.status_code in y, check_status) + except: + status = False + + return jsonify(url=url, status=response.status_code, result=status) diff --git a/src/octoprint/util/paths.py b/src/octoprint/util/paths.py new file mode 100644 index 00000000..89a45ba3 --- /dev/null +++ b/src/octoprint/util/paths.py @@ -0,0 +1,19 @@ +# coding=utf-8 +from __future__ import absolute_import + +__author__ = "Gina Häußge " +__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html' +__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License" + +def normalize(path, expand_user=True, expand_vars=True, **kwargs): + import os + + if path is None: + return None + + if expand_user: + path = os.path.expanduser(path) + if expand_vars: + path = os.path.expandvars(path) + path = os.path.realpath(os.path.abspath(path)) + return path