New util resource for testing paths and urls from the frontend
This commit is contained in:
parent
29b3b62c0d
commit
e2894c25b5
2 changed files with 128 additions and 0 deletions
|
|
@ -226,3 +226,112 @@ def _logout(user):
|
|||
if "usersession.id" in session:
|
||||
del session["usersession.id"]
|
||||
octoprint.server.userManager.logout_user(user)
|
||||
|
||||
@api.route("/util/test", methods=["POST"])
|
||||
@restricted_access
|
||||
@admin_permission.require(403)
|
||||
def utilTestPath():
|
||||
valid_commands = dict(
|
||||
path=["path"],
|
||||
url=["url"]
|
||||
)
|
||||
|
||||
command, data, response = get_json_command_from_request(request, valid_commands)
|
||||
if response is not None:
|
||||
return response
|
||||
|
||||
if command == "path":
|
||||
import os
|
||||
from octoprint.util.paths import normalize
|
||||
|
||||
path = normalize(data["path"])
|
||||
check_type = None
|
||||
check_access = []
|
||||
|
||||
if "check_type" in data and data["check_type"] in ("file", "dir"):
|
||||
check_type = data["check_type"]
|
||||
|
||||
if "check_access" in data:
|
||||
request_check_access = data["check_access"]
|
||||
if not isinstance(request_check_access, list):
|
||||
request_check_access = list(request_check_access)
|
||||
|
||||
check_access = [check for check in request_check_access if check in ("r", "w", "x")]
|
||||
|
||||
exists = os.path.exists(path)
|
||||
|
||||
# check if path exists
|
||||
type_mapping = dict(file=os.path.isfile, dir=os.path.isdir)
|
||||
if check_type:
|
||||
typeok = type_mapping[check_type](path)
|
||||
else:
|
||||
typeok = True
|
||||
|
||||
# check if path allows requested access
|
||||
access_mapping = dict(r=os.R_OK, w=os.W_OK, x=os.X_OK)
|
||||
if check_access:
|
||||
access = os.access(path, reduce(lambda x, y: x | y, map(lambda a: access_mapping[a], check_access)))
|
||||
else:
|
||||
access = True
|
||||
|
||||
return jsonify(path=path, exists=exists, typeok=typeok, access=access, result=exists and typeok and access)
|
||||
|
||||
elif command == "url":
|
||||
import requests
|
||||
|
||||
class StatusCodeRange(object):
|
||||
def __init__(self, start=None, end=None):
|
||||
self.start = start
|
||||
self.end = end
|
||||
|
||||
def __contains__(self, item):
|
||||
if not isinstance(item, int):
|
||||
return False
|
||||
if self.start and self.end:
|
||||
return self.start <= item < self.end
|
||||
elif self.start:
|
||||
return self.start <= item
|
||||
elif self.end:
|
||||
return item < self.end
|
||||
else:
|
||||
return False
|
||||
|
||||
status_ranges = dict(
|
||||
informational=StatusCodeRange(start=100,end=200),
|
||||
success=StatusCodeRange(start=200,end=300),
|
||||
redirection=StatusCodeRange(start=300,end=400),
|
||||
client_error=StatusCodeRange(start=400,end=500),
|
||||
server_error=StatusCodeRange(start=500),
|
||||
normal=StatusCodeRange(end=400),
|
||||
error=StatusCodeRange(start=400),
|
||||
any=StatusCodeRange(start=100)
|
||||
)
|
||||
|
||||
url = data["url"]
|
||||
method = "HEAD"
|
||||
check_status = [status_ranges["normal"]]
|
||||
|
||||
if "method" in data:
|
||||
method = data["method"]
|
||||
|
||||
if "status" in data:
|
||||
request_status = data["status"]
|
||||
if not isinstance(request_status, list):
|
||||
request_status = [request_status]
|
||||
|
||||
check_status = []
|
||||
for rs in request_status:
|
||||
if rs in status_ranges:
|
||||
check_status.append(status_ranges[rs])
|
||||
else:
|
||||
code = requests.codes[rs]
|
||||
if code is not None:
|
||||
check_status.append([code])
|
||||
|
||||
try:
|
||||
response = requests.request(method=method, url=url)
|
||||
status = reduce(lambda x, y: x and response.status_code in y, check_status)
|
||||
except:
|
||||
status = False
|
||||
|
||||
return jsonify(url=url, status=response.status_code, result=status)
|
||||
|
|
|
|||
19
src/octoprint/util/paths.py
Normal file
19
src/octoprint/util/paths.py
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
# coding=utf-8
|
||||
from __future__ import absolute_import
|
||||
|
||||
__author__ = "Gina Häußge <osd@foosel.net>"
|
||||
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
|
||||
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
|
||||
|
||||
def normalize(path, expand_user=True, expand_vars=True, **kwargs):
|
||||
import os
|
||||
|
||||
if path is None:
|
||||
return None
|
||||
|
||||
if expand_user:
|
||||
path = os.path.expanduser(path)
|
||||
if expand_vars:
|
||||
path = os.path.expandvars(path)
|
||||
path = os.path.realpath(os.path.abspath(path))
|
||||
return path
|
||||
Loading…
Reference in a new issue