From e9ca09da876d227e6017fb15fb39d3f215e0e55f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Mon, 4 Aug 2014 14:45:46 +0200 Subject: [PATCH] Introduced UploadStorageFallbackHandler which rewrites multipart file upload forms to store the contained files in temporary files (like nginx-upload), made the file upload API in OctoPrint utilize this --- src/octoprint/server/__init__.py | 6 +- src/octoprint/server/api/files.py | 113 ++++++- src/octoprint/server/util.py | 475 ++++++++++++++++++++++++++---- 3 files changed, 540 insertions(+), 54 deletions(-) diff --git a/src/octoprint/server/__init__.py b/src/octoprint/server/__init__.py index 07e6a04e..8f29dd53 100644 --- a/src/octoprint/server/__init__.py +++ b/src/octoprint/server/__init__.py @@ -35,7 +35,8 @@ user_permission = Permission(RoleNeed("user")) # only import the octoprint stuff down here, as it might depend on things defined above to be initialized already from octoprint.server.util import LargeResponseHandler, ReverseProxied, restricted_access, PrinterStateConnection, admin_validator, \ UrlForwardHandler, user_validator, GcodeWatchdogHandler, UploadCleanupWatchdogHandler, \ - access_validation_factory, StreamedWsgiContainer, StreamingFallbackHandler, PrintableFilesUploadHandler + access_validation_factory, WsgiInputContainer, StreamingFallbackHandler, PrintableFilesUploadHandler, \ + UploadStorageFallbackHandler from octoprint.printer import Printer, getConnectionOptions from octoprint.settings import settings import octoprint.gcodefiles as gcodefiles @@ -186,8 +187,7 @@ class Server(): (r"/downloads/files/local/([^/]*\.(gco|gcode))", LargeResponseHandler, dict(path=settings().getBaseFolder("uploads"), as_attachment=True)), (r"/downloads/logs/([^/]*)", LargeResponseHandler, dict(path=settings().getBaseFolder("logs"), as_attachment=True, access_validation=access_validation_factory(app, loginManager, admin_validator))), (r"/downloads/camera/current", UrlForwardHandler, dict(url=settings().get(["webcam", "snapshot"]), as_attachment=True, access_validation=access_validation_factory(app, loginManager, user_validator))), - (r"/api/files/([^/]*)", PrintableFilesUploadHandler, dict(path=settings().getBaseFolder("uploads"), access_validation=access_validation_factory(app, loginManager, user_validator))), - (r".*", FallbackHandler, dict(fallback=WSGIContainer(app.wsgi_app))) + (r".*", UploadStorageFallbackHandler, dict(fallback=WsgiInputContainer(app.wsgi_app), file_prefix="octoprint-file-upload-", file_suffix=".tmp")) ]) self._server = HTTPServer(self._tornado_app, max_body_size=1*1024*1024*1024) self._server.listen(self._port, address=self._host) diff --git a/src/octoprint/server/api/files.py b/src/octoprint/server/api/files.py index 555e730a..eb71c14c 100644 --- a/src/octoprint/server/api/files.py +++ b/src/octoprint/server/api/files.py @@ -86,7 +86,115 @@ def _verifyFileExists(origin, filename): @api.route("/files/", methods=["POST"]) @restricted_access def uploadGcodeFile(target): - return redirect(url_for("index", _external=True) + "uploads/files/" + target, code=307) + if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]: + return make_response("Unknown target: %s" % target, 404) + + if "file.name" in request.values and "file.path" in request.values: + import shutil + upload = Object() + upload.filename = request.values["file.name"] + upload.save = lambda new_path: shutil.move(request.values["file.path"], new_path) + elif "file" in request.files: + upload = request.files["file"] + else: + return make_response("No file included", 400) + + if target == FileDestinations.SDCARD and not settings().getBoolean(["feature", "sdSupport"]): + return make_response("SD card support is disabled", 404) + + sd = target == FileDestinations.SDCARD + selectAfterUpload = "select" in request.values.keys() and request.values["select"] in valid_boolean_trues + printAfterSelect = "print" in request.values.keys() and request.values["print"] in valid_boolean_trues + + if sd: + # validate that all preconditions for SD upload are met before attempting it + if not (printer.isOperational() and not (printer.isPrinting() or printer.isPaused())): + return make_response("Can not upload to SD card, printer is either not operational or already busy", 409) + if not printer.isSdReady(): + return make_response("Can not upload to SD card, not yet initialized", 409) + + # determine current job + currentFilename = None + currentOrigin = None + currentJob = printer.getCurrentJob() + if currentJob is not None and "file" in currentJob.keys(): + currentJobFile = currentJob["file"] + if "name" in currentJobFile.keys() and "origin" in currentJobFile.keys(): + currentFilename = currentJobFile["name"] + currentOrigin = currentJobFile["origin"] + + # determine future filename of file to be uploaded, abort if it can't be uploaded + futureFilename = gcodeManager.getFutureFilename(upload) + if futureFilename is None or (not settings().getBoolean(["cura", "enabled"]) and not gcodefiles.isGcodeFileName(futureFilename)): + return make_response("Can not upload file %s, wrong format?" % upload.filename, 415) + + # prohibit overwriting currently selected file while it's being printed + if futureFilename == currentFilename and target == currentOrigin and printer.isPrinting() or printer.isPaused(): + return make_response("Trying to overwrite file that is currently being printed: %s" % currentFilename, 409) + + def fileProcessingFinished(filename, absFilename, destination): + """ + Callback for when the file processing (upload, optional slicing, addition to analysis queue) has + finished. + + Depending on the file's destination triggers either streaming to SD card or directly calls selectAndOrPrint. + """ + if destination == FileDestinations.SDCARD: + return filename, printer.addSdFile(filename, absFilename, selectAndOrPrint) + else: + selectAndOrPrint(filename, absFilename, destination) + return filename + + def selectAndOrPrint(filename, absFilename, destination): + """ + Callback for when the file is ready to be selected and optionally printed. For SD file uploads this is only + the case after they have finished streaming to the printer, which is why this callback is also used + for the corresponding call to addSdFile. + + Selects the just uploaded file if either selectAfterUpload or printAfterSelect are True, or if the + exact file is already selected, such reloading it. + """ + if selectAfterUpload or printAfterSelect or (currentFilename == filename and currentOrigin == destination): + printer.selectFile(absFilename, destination == FileDestinations.SDCARD, printAfterSelect) + + filename, done = gcodeManager.addFile(upload, target, fileProcessingFinished) + if filename is None: + return make_response("Could not upload the file %s" % upload.filename, 500) + + sdFilename = None + if isinstance(filename, tuple): + filename, sdFilename = filename + + eventManager.fire(Events.UPLOAD, {"file": filename, "target": target}) + + files = {} + location = url_for(".readGcodeFile", target=FileDestinations.LOCAL, filename=filename, _external=True) + files.update({ + FileDestinations.LOCAL: { + "name": filename, + "origin": FileDestinations.LOCAL, + "refs": { + "resource": location, + "download": url_for("index", _external=True) + "downloads/files/" + FileDestinations.LOCAL + "/" + filename + } + } + }) + + if sd and sdFilename: + location = url_for(".readGcodeFile", target=FileDestinations.SDCARD, filename=sdFilename, _external=True) + files.update({ + FileDestinations.SDCARD: { + "name": sdFilename, + "origin": FileDestinations.SDCARD, + "refs": { + "resource": location + } + } + }) + + r = make_response(jsonify(files=files, done=done), 201) + r.headers["Location"] = location + return r @api.route("/files//", methods=["GET"]) @@ -172,3 +280,6 @@ def deleteGcodeFile(filename, target): return NO_CONTENT + +class Object(object): + pass diff --git a/src/octoprint/server/util.py b/src/octoprint/server/util.py index ca9ee241..f4e6f4ab 100644 --- a/src/octoprint/server/util.py +++ b/src/octoprint/server/util.py @@ -237,44 +237,354 @@ class PrinterStateConnection(SockJSConnection): @stream_request_body class UploadStorageFallbackHandler(RequestHandler): + """ + A `RequestHandler` similar to `tornado.web.FallbackHandler` which fetches any files contained in the request bodies + of content type `multipart`, stores them in temporary files and supplies the `fallback` with the file's `name`, + `content_type`, `path` and `size` instead via a rewritten body. - def initialize(self, delegate, folder): - self.delegate = delegate - self.folder = folder + Basically similar to what the nginx upload module does. - # Parts information will be stored here - self.parts = dict() + Basic request body example: - # files will be stored here - self.files = dict() + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="file"; filename="test.gcode" + Content-Type: application/octet-stream - # values will be stored here - self.values = dict() + ... + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="apikey" + + my_funny_apikey + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="select" + + true + ------WebKitFormBoundarypYiSUx63abAmhT5C-- + + That would get turned into: + + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="apikey" + + my_funny_apikey + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="select" + + true + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="file.path" + + /tmp/tmpzupkro + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="file.name" + + test.gcode + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="file.content_type" + + application/octet-stream + ------WebKitFormBoundarypYiSUx63abAmhT5C + Content-Disposition: form-data; name="file.size" + + 349182 + ------WebKitFormBoundarypYiSUx63abAmhT5C-- + + The underlying application can then access the contained files via their respective paths and just move them + where necessary. + """ + + # the request methods that may contain a request body + BODY_METHODS = ("POST", "PATCH", "PUT") + + def initialize(self, fallback, file_prefix="tmp", file_suffix="", path=None, suffixes=None): + if not suffixes: + suffixes = dict() + + self._fallback = fallback + self._file_prefix = file_prefix + self._file_suffix = file_suffix + self._path = path + + self._suffixes = dict((key, key) for key in ("name", "path", "content_type", "size")) + for suffix_type, suffix in suffixes.iteritems(): + if suffix_type in self._suffixes and suffix is not None: + self._suffixes[suffix_type] = suffix + + # Parts, files and values will be stored here + self._parts = dict() + self._files = [] # Part currently being processed self._current_part = None + + # content type of request body + self._content_type = None + + # bytes left to read according to content_length of request body + self._bytes_left = 0 + # buffer needed for identifying form data parts self._buffer = b"" - # we only support multipart/form-data content, so check for that - self._bytes_left = self.request.headers.get("Content-Length", 0) - content_type = self.request.headers.get("Content-Type", "") - if not self._bytes_left or not content_type.startswith("multipart"): - raise HTTPError(405) + # buffer for new body + self._new_body = b"" - # extract the multipart boundary - fields = content_type.split(";") - for field in fields: - k, sep, v = field.strip().partition("=") - if k == "boundary" and v: - if v.startswith(b'"') and v.endswith(b'"'): - self.boundary = tornado.escape.utf8(v[1:-1]) + # logger + self._logger = logging.getLogger(__name__) + + def prepare(self): + """ + Prepares the processing of the request. If it's a request that may contain a request body (as defined in + `UploadStorageFallbackHandler.BODY_METHODS) prepares the multipart parsing if content type fits. If it's a + body-less request, just calls the `fallback` with an empty body and finishes the request. + """ + if self.request.method in UploadStorageFallbackHandler.BODY_METHODS: + self._bytes_left = self.request.headers.get("Content-Length", 0) + self._content_type = self.request.headers.get("Content-Type", None) + + # request might contain a body + if self.is_multipart(): + if not self._bytes_left: + # we don't support requests without a content-length + raise HTTPError(400, reason="No Content-Length supplied") + + # extract the multipart boundary + fields = self._content_type.split(";") + for field in fields: + k, sep, v = field.strip().partition("=") + if k == "boundary" and v: + if v.startswith(b'"') and v.endswith(b'"'): + self._multipart_boundary = tornado.escape.utf8(v[1:-1]) + else: + self._multipart_boundary = tornado.escape.utf8(v) + break else: - self.boundary = tornado.escape.utf8(v) - break + self._multipart_boundary = None else: - raise HTTPError(400) + self._fallback(self.request, b"") + self._finished = True + def data_received(self, chunk): + """ + Called by Tornado on receiving a chunk of the request body. If request is a multipart request, takes care of + processing the multipart data structure via `self._process_multipart_data`. If not, just adds the chunk to + internal in-memory buffer. + + :param chunk: chunk of data received from Tornado + """ + + data = self._buffer + chunk + if self.is_multipart(): + self._process_multipart_data(data) + else: + self._buffer = data + + def is_multipart(self): + """Checks whether this request is a `multipart` request""" + return self._content_type is not None and self._content_type.startswith("multipart") + + def _process_multipart_data(self, data): + """ + Processes the given data, parsing it for multipart definitions and calling the appropriate methods. + + :param data: the data to process as a string + """ + + # check for boundary + delimiter = b"--%s" % self._multipart_boundary + delimiter_loc = data.find(delimiter) + delimiter_len = len(delimiter) + end_of_header = None + if delimiter_loc != -1: + # found the delimiter in the currently available data + data, self._buffer = data[0:delimiter_loc], data[delimiter_loc:] + end_of_header = self._buffer.find("\r\n\r\n") + else: + # make sure any boundary (with single or double ==) contained at the end of chunk does not get + # truncated by this processing round => save it to the buffer for next round + endlen = len(self._multipart_boundary) + 4 + data, self._buffer = data[0:-endlen], data[-endlen:] + + # stream data to part handler + if data: + if self._current_part: + self._on_data(self._current_part, data) + + if end_of_header >= 0: + self._on_header(self._buffer[delimiter_len+2:end_of_header]) + self._buffer = self._buffer[end_of_header + 4:] + + if delimiter_loc != -1 and self._buffer[delimiter_len:delimiter_len+2] == "--": + # we saw the last boundary and are at the end of our request + if self._current_part: + self._on_close(self._current_part) + self._current_part = None + self._buffer = b"" + self._on_finish() + + def _on_header(self, header): + """ + Called for a new multipart header, takes care of parsing the header and calling `self._on_part` with the + relevant data, setting the current part in the process. + + :param header: header to parse + """ + + # close any open parts + if self._current_part: + self._on_close(self._current_part) + self._current_part = None + + header_check = header.find(self._multipart_boundary) + if header_check != -1: + self._logger.warn("Header still contained multipart boundary, stripping it...") + header = header[header_check:] + + # convert to dict + header = HTTPHeaders.parse(header.decode("utf-8")) + disp_header = header.get("Content-Disposition", "") + disposition, disp_params = tornado.httputil._parse_header(disp_header) + + if disposition != "form-data": + self._logger.warn("Got a multipart header without form-data content disposition, ignoring that one") + return + if not disp_params.get("name"): + self._logger.warn("Got a multipart header without name, ignoring that one") + return + + self._current_part = self._on_part(disp_params["name"], header.get("Content-Type", None), filename=disp_params["filename"] if "filename" in disp_params else None) + + def _on_part(self, name, content_type, filename=None): + """ + Called for new parts in the multipart stream. If `filename` is given creates new `file` part (which leads + to storage of the data as temporary file on disk), if not creates a new `data` part (which stores + incoming data in memory). + + Structure of `file` parts: + + * `name`: name of the part + * `filename`: filename associated with the part + * `path`: path to the temporary file storing the file's data + * `content_type`: content type of the part + * `file`: file handle for the temporary file (mode "wb", not deleted on close!) + + Structure of `data` parts: + + * `name`: name of the part + * `content_type`: content type of the part + * `data`: bytes of the part (initialized to "") + + :param name: name of the part + :param content_type: content type of the part + :param filename: filename associated with the part. + :return: dict describing the new part + """ + if filename is not None: + # this is a file + import tempfile + handle = tempfile.NamedTemporaryFile(mode="wb", prefix=self._file_prefix, suffix=self._file_suffix, dir=self._path, delete=False) + return dict(name=tornado.escape.utf8(name), + filename=tornado.escape.utf8(filename), + path=tornado.escape.utf8(handle.name), + content_type=tornado.escape.utf8(content_type), + file=handle) + + else: + return dict(name=tornado.escape.utf8(name), content_type=content_type, data=b"") + + def _on_data(self, part, data): + """ + Called when new bytes are received for the given `part`, takes care of writing them to their storage. + + :param part: part for which data was received + :param data: data chunk which was received + """ + if "file" in part: + part["file"].write(data) + else: + part["data"] += data + + def _on_close(self, part): + """ + Called when a part gets closed, takes care of storing the finished part in the internal parts storage and for + `file` parts closing the temporary file and storing the part in the internal files storage. + + :param part: part which was closed + """ + name = part["name"] + self._parts[name] = part + if "file" in part: + self._files.append(part["path"]) + part["file"].close() + del part["file"] + + def _on_finish(self): + """ + Called when the request body has been read completely. Takes care of creating the replacement body out of the + logged parts, turning `file` parts into new + :return: + """ + + self._new_body = b"" + for name, part in self._parts.iteritems(): + if "filename" in part: + # add form fields for filename, path, size and content_type for all files contained in the request + fields = dict((self._suffixes[key], value) for (key, value) in dict(name=part["filename"], path=part["path"], size=str(os.stat(part["path"]).st_size), content_type=part["content_type"]).iteritems()) + for n, p in fields.iteritems(): + key = name + "." + n + self._new_body += b"--%s\r\n" % self._multipart_boundary + self._new_body += b"Content-Disposition: form-data; name=\"%s\"\r\n" % key + self._new_body += b"\r\n" + self._new_body += p + b"\r\n" + elif "data" in part: + self._new_body += b"--%s\r\n" % self._multipart_boundary + value = part["data"] + self._new_body += b"Content-Disposition: form-data; name=\"%s\"\r\n" % name + if "content_type" in part and part["content_type"] is not None: + self._new_body += b"Content-Type: %s\r\n" % part["content_type"] + self._new_body += b"\r\n" + self._new_body += value + self._new_body += b"--%s--\r\n" % self._multipart_boundary + + def _handle_method(self, *args, **kwargs): + """ + Handler for any request method, takes care of defining the new request body if necessary and forwarding + the current request and changed body to the `fallback`. + """ + + # determine which body to supply + body = b"" + if self.is_multipart(): + # make sure we really processed all data in the buffer + while len(self._buffer): + self._process_multipart_data(self._buffer) + + # use rewritten body + body = self._new_body + + elif self.request.method in UploadStorageFallbackHandler.BODY_METHODS: + # directly use data from buffer + body = self._buffer + + # rewrite content length + self.request.headers["Content-Length"] = len(body) + + try: + # call the configured fallback with request and body to use + self._fallback(self.request, body) + finally: + # make sure the temporary files are removed again + for f in self._files: + util.silentRemove(f) + + # make all http methods trigger _handle_method + get = _handle_method + post = _handle_method + put = _handle_method + patch = _handle_method + delete = _handle_method + head = _handle_method + options = _handle_method @stream_request_body @@ -294,39 +604,88 @@ class StreamingFallbackHandler(FallbackHandler): (r".*", StreamingFallbackHandler, dict(fallback=wsgi_app), ]) """ - def initialize(self, fallback): - self.fallback = fallback - def data_received(self, chunk): - self._tmpfile.write(chunk) - self._length_left -= len(chunk) - if self._length_left <= 0: - self.finished_body() + NO_BODY_METHODS = ("GET", "HEAD", "OPTIONS") - def finished_body(self): - self._tmpfile.seek(0) + def initialize(self, fallback, max_size=50*1024*1024*1024, file_prefix="tmp"): + self._fallback = fallback + self._max_size = max_size + self._file_prefix = file_prefix def prepare(self): - self._length_left = int(self.request.headers.get("Content-Length", 0)) - self._tmpfile = TemporaryFile() + if self.request.method not in StreamingFallbackHandler.NO_BODY_METHODS: + import tempfile + self._length_left = int(self.request.headers.get("Content-Length", 0)) + self._tempfile = tempfile.TemporaryFile(prefix=self._file_prefix) + else: + import io + self._fallback(self.request, io.BytesIO()) + self._finished = True - def __getattribute__(self, name): - if name in ("get", "post", "put", "patch"): - try: - body_stream = self._tmpfile - self.fallback(self.request, body_stream) - finally: - self._tmpfile.close() - return lambda *args, **kwargs: None - return object.__getattribute__(self, name) + def data_received(self, chunk): + if self.request.method not in StreamingFallbackHandler.NO_BODY_METHODS: + self._tempfile.write(chunk) + self._length_left -= len(chunk) + if self._length_left <= 0: + self.finished_body() + + def finished_body(self): + self._tempfile.seek(0) + + def get(self, *args, **kwargs): + return self._handle_method() + + def post(self, *args, **kwargs): + return self._handle_method() + + def put(self, *args, **kwargs): + return self._handle_method() + + def delete(self, *args, **kwargs): + return self._handle_method() + + def head(self, *args, **kwargs): + return self._handle_method() + + def options(self, *args, **kwargs): + return self._handle_method() + + def _handle_method(self): + try: + self._fallback(self.request, self._tempfile) + finally: + self._tempfile.close() -class StreamedWsgiContainer(object): +class WsgiInputContainer(object): + """ + A WSGI container for use with Tornado that allows supplying the request body to be used for `wsgi.input` in the + generated WSGI environment upon call. + + A `RequestHandler` can thus provide the WSGI application with a stream for the request body, or a modified body. + + Example usage: + + wsgi_app = octoprint.server.util.WsgiInputContainer(octoprint_app) + application = tornado.web.Application([ + (r".*", UploadStorageFallbackHandler, dict(fallback=wsgi_app), + ]) + + The implementation logic is basically the same as `tornado.wsgi.WSGIContainer` but the `__call__` and `environ` + + """ def __init__(self, wsgi_application): self.wsgi_application = wsgi_application - def __call__(self, request, body_stream): + def __call__(self, request, body=None): + """ + Wraps the call against the WSGI app, deriving the WSGI environment from the supplied Tornado `HTTPServerRequest`. + + :param request: the `tornado.httpserver.HTTPServerRequest` to derive the WSGI environment from + :param body: an optional body to use as `wsgi.input` instead of `request.body`, can be a string or a stream + """ + data = {} response = [] @@ -335,7 +694,7 @@ class StreamedWsgiContainer(object): data["headers"] = response_headers return response.append app_response = self.wsgi_application( - StreamedWsgiContainer.environ(request, body_stream), start_response) + WsgiInputContainer.environ(request, body), start_response) try: response.extend(app_response) body = b"".join(response) @@ -367,13 +726,29 @@ class StreamedWsgiContainer(object): self._log(status_code, request) @staticmethod - def environ(request, body_stream): - """Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment. + def environ(request, body=None): + """ + Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment. + + An optional `body` to be used for populating `wsgi.input` can be supplied (either a string or a stream). If not + supplied, `request.body` will be wrapped into a `io.BytesIO` stream and used instead. + + :param request: the `tornado.httpserver.HTTPServerRequest` to derive the WSGI environment from + :param body: an optional body to use as `wsgi.input` instead of `request.body`, can be a string or a stream """ from tornado.wsgi import to_wsgi_str import sys import io + # determine the request_body to supply as wsgi.input + if body is not None: + if isinstance(body, (bytes, str)): + request_body = io.BytesIO(tornado.escape.utf8(body)) + else: + request_body = body + else: + request_body = io.BytesIO(tornado.escape.utf8(request.body)) + hostport = request.host.split(":") if len(hostport) == 2: host = hostport[0] @@ -393,7 +768,7 @@ class StreamedWsgiContainer(object): "SERVER_PROTOCOL": request.version, "wsgi.version": (1, 0), "wsgi.url_scheme": request.protocol, - "wsgi.input": body_stream, + "wsgi.input": request_body, "wsgi.errors": sys.stderr, "wsgi.multithread": False, "wsgi.multiprocess": True,