Introduced UploadStorageFallbackHandler which rewrites multipart file upload forms to store the contained files in temporary files (like nginx-upload), made the file upload API in OctoPrint utilize this

This commit is contained in:
Gina Häußge 2014-08-04 14:45:46 +02:00
parent d770c20b5a
commit e9ca09da87
3 changed files with 540 additions and 54 deletions

View file

@ -35,7 +35,8 @@ user_permission = Permission(RoleNeed("user"))
# only import the octoprint stuff down here, as it might depend on things defined above to be initialized already
from octoprint.server.util import LargeResponseHandler, ReverseProxied, restricted_access, PrinterStateConnection, admin_validator, \
UrlForwardHandler, user_validator, GcodeWatchdogHandler, UploadCleanupWatchdogHandler, \
access_validation_factory, StreamedWsgiContainer, StreamingFallbackHandler, PrintableFilesUploadHandler
access_validation_factory, WsgiInputContainer, StreamingFallbackHandler, PrintableFilesUploadHandler, \
UploadStorageFallbackHandler
from octoprint.printer import Printer, getConnectionOptions
from octoprint.settings import settings
import octoprint.gcodefiles as gcodefiles
@ -186,8 +187,7 @@ class Server():
(r"/downloads/files/local/([^/]*\.(gco|gcode))", LargeResponseHandler, dict(path=settings().getBaseFolder("uploads"), as_attachment=True)),
(r"/downloads/logs/([^/]*)", LargeResponseHandler, dict(path=settings().getBaseFolder("logs"), as_attachment=True, access_validation=access_validation_factory(app, loginManager, admin_validator))),
(r"/downloads/camera/current", UrlForwardHandler, dict(url=settings().get(["webcam", "snapshot"]), as_attachment=True, access_validation=access_validation_factory(app, loginManager, user_validator))),
(r"/api/files/([^/]*)", PrintableFilesUploadHandler, dict(path=settings().getBaseFolder("uploads"), access_validation=access_validation_factory(app, loginManager, user_validator))),
(r".*", FallbackHandler, dict(fallback=WSGIContainer(app.wsgi_app)))
(r".*", UploadStorageFallbackHandler, dict(fallback=WsgiInputContainer(app.wsgi_app), file_prefix="octoprint-file-upload-", file_suffix=".tmp"))
])
self._server = HTTPServer(self._tornado_app, max_body_size=1*1024*1024*1024)
self._server.listen(self._port, address=self._host)

View file

@ -86,7 +86,115 @@ def _verifyFileExists(origin, filename):
@api.route("/files/<string:target>", methods=["POST"])
@restricted_access
def uploadGcodeFile(target):
return redirect(url_for("index", _external=True) + "uploads/files/" + target, code=307)
if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
return make_response("Unknown target: %s" % target, 404)
if "file.name" in request.values and "file.path" in request.values:
import shutil
upload = Object()
upload.filename = request.values["file.name"]
upload.save = lambda new_path: shutil.move(request.values["file.path"], new_path)
elif "file" in request.files:
upload = request.files["file"]
else:
return make_response("No file included", 400)
if target == FileDestinations.SDCARD and not settings().getBoolean(["feature", "sdSupport"]):
return make_response("SD card support is disabled", 404)
sd = target == FileDestinations.SDCARD
selectAfterUpload = "select" in request.values.keys() and request.values["select"] in valid_boolean_trues
printAfterSelect = "print" in request.values.keys() and request.values["print"] in valid_boolean_trues
if sd:
# validate that all preconditions for SD upload are met before attempting it
if not (printer.isOperational() and not (printer.isPrinting() or printer.isPaused())):
return make_response("Can not upload to SD card, printer is either not operational or already busy", 409)
if not printer.isSdReady():
return make_response("Can not upload to SD card, not yet initialized", 409)
# determine current job
currentFilename = None
currentOrigin = None
currentJob = printer.getCurrentJob()
if currentJob is not None and "file" in currentJob.keys():
currentJobFile = currentJob["file"]
if "name" in currentJobFile.keys() and "origin" in currentJobFile.keys():
currentFilename = currentJobFile["name"]
currentOrigin = currentJobFile["origin"]
# determine future filename of file to be uploaded, abort if it can't be uploaded
futureFilename = gcodeManager.getFutureFilename(upload)
if futureFilename is None or (not settings().getBoolean(["cura", "enabled"]) and not gcodefiles.isGcodeFileName(futureFilename)):
return make_response("Can not upload file %s, wrong format?" % upload.filename, 415)
# prohibit overwriting currently selected file while it's being printed
if futureFilename == currentFilename and target == currentOrigin and printer.isPrinting() or printer.isPaused():
return make_response("Trying to overwrite file that is currently being printed: %s" % currentFilename, 409)
def fileProcessingFinished(filename, absFilename, destination):
"""
Callback for when the file processing (upload, optional slicing, addition to analysis queue) has
finished.
Depending on the file's destination triggers either streaming to SD card or directly calls selectAndOrPrint.
"""
if destination == FileDestinations.SDCARD:
return filename, printer.addSdFile(filename, absFilename, selectAndOrPrint)
else:
selectAndOrPrint(filename, absFilename, destination)
return filename
def selectAndOrPrint(filename, absFilename, destination):
"""
Callback for when the file is ready to be selected and optionally printed. For SD file uploads this is only
the case after they have finished streaming to the printer, which is why this callback is also used
for the corresponding call to addSdFile.
Selects the just uploaded file if either selectAfterUpload or printAfterSelect are True, or if the
exact file is already selected, such reloading it.
"""
if selectAfterUpload or printAfterSelect or (currentFilename == filename and currentOrigin == destination):
printer.selectFile(absFilename, destination == FileDestinations.SDCARD, printAfterSelect)
filename, done = gcodeManager.addFile(upload, target, fileProcessingFinished)
if filename is None:
return make_response("Could not upload the file %s" % upload.filename, 500)
sdFilename = None
if isinstance(filename, tuple):
filename, sdFilename = filename
eventManager.fire(Events.UPLOAD, {"file": filename, "target": target})
files = {}
location = url_for(".readGcodeFile", target=FileDestinations.LOCAL, filename=filename, _external=True)
files.update({
FileDestinations.LOCAL: {
"name": filename,
"origin": FileDestinations.LOCAL,
"refs": {
"resource": location,
"download": url_for("index", _external=True) + "downloads/files/" + FileDestinations.LOCAL + "/" + filename
}
}
})
if sd and sdFilename:
location = url_for(".readGcodeFile", target=FileDestinations.SDCARD, filename=sdFilename, _external=True)
files.update({
FileDestinations.SDCARD: {
"name": sdFilename,
"origin": FileDestinations.SDCARD,
"refs": {
"resource": location
}
}
})
r = make_response(jsonify(files=files, done=done), 201)
r.headers["Location"] = location
return r
@api.route("/files/<string:target>/<path:filename>", methods=["GET"])
@ -172,3 +280,6 @@ def deleteGcodeFile(filename, target):
return NO_CONTENT
class Object(object):
pass

View file

@ -237,44 +237,354 @@ class PrinterStateConnection(SockJSConnection):
@stream_request_body
class UploadStorageFallbackHandler(RequestHandler):
"""
A `RequestHandler` similar to `tornado.web.FallbackHandler` which fetches any files contained in the request bodies
of content type `multipart`, stores them in temporary files and supplies the `fallback` with the file's `name`,
`content_type`, `path` and `size` instead via a rewritten body.
def initialize(self, delegate, folder):
self.delegate = delegate
self.folder = folder
Basically similar to what the nginx upload module does.
# Parts information will be stored here
self.parts = dict()
Basic request body example:
# files will be stored here
self.files = dict()
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file"; filename="test.gcode"
Content-Type: application/octet-stream
# values will be stored here
self.values = dict()
...
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="apikey"
my_funny_apikey
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="select"
true
------WebKitFormBoundarypYiSUx63abAmhT5C--
That would get turned into:
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="apikey"
my_funny_apikey
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="select"
true
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file.path"
/tmp/tmpzupkro
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file.name"
test.gcode
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file.content_type"
application/octet-stream
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file.size"
349182
------WebKitFormBoundarypYiSUx63abAmhT5C--
The underlying application can then access the contained files via their respective paths and just move them
where necessary.
"""
# the request methods that may contain a request body
BODY_METHODS = ("POST", "PATCH", "PUT")
def initialize(self, fallback, file_prefix="tmp", file_suffix="", path=None, suffixes=None):
if not suffixes:
suffixes = dict()
self._fallback = fallback
self._file_prefix = file_prefix
self._file_suffix = file_suffix
self._path = path
self._suffixes = dict((key, key) for key in ("name", "path", "content_type", "size"))
for suffix_type, suffix in suffixes.iteritems():
if suffix_type in self._suffixes and suffix is not None:
self._suffixes[suffix_type] = suffix
# Parts, files and values will be stored here
self._parts = dict()
self._files = []
# Part currently being processed
self._current_part = None
# content type of request body
self._content_type = None
# bytes left to read according to content_length of request body
self._bytes_left = 0
# buffer needed for identifying form data parts
self._buffer = b""
# we only support multipart/form-data content, so check for that
self._bytes_left = self.request.headers.get("Content-Length", 0)
content_type = self.request.headers.get("Content-Type", "")
if not self._bytes_left or not content_type.startswith("multipart"):
raise HTTPError(405)
# buffer for new body
self._new_body = b""
# extract the multipart boundary
fields = content_type.split(";")
for field in fields:
k, sep, v = field.strip().partition("=")
if k == "boundary" and v:
if v.startswith(b'"') and v.endswith(b'"'):
self.boundary = tornado.escape.utf8(v[1:-1])
# logger
self._logger = logging.getLogger(__name__)
def prepare(self):
"""
Prepares the processing of the request. If it's a request that may contain a request body (as defined in
`UploadStorageFallbackHandler.BODY_METHODS) prepares the multipart parsing if content type fits. If it's a
body-less request, just calls the `fallback` with an empty body and finishes the request.
"""
if self.request.method in UploadStorageFallbackHandler.BODY_METHODS:
self._bytes_left = self.request.headers.get("Content-Length", 0)
self._content_type = self.request.headers.get("Content-Type", None)
# request might contain a body
if self.is_multipart():
if not self._bytes_left:
# we don't support requests without a content-length
raise HTTPError(400, reason="No Content-Length supplied")
# extract the multipart boundary
fields = self._content_type.split(";")
for field in fields:
k, sep, v = field.strip().partition("=")
if k == "boundary" and v:
if v.startswith(b'"') and v.endswith(b'"'):
self._multipart_boundary = tornado.escape.utf8(v[1:-1])
else:
self._multipart_boundary = tornado.escape.utf8(v)
break
else:
self.boundary = tornado.escape.utf8(v)
break
self._multipart_boundary = None
else:
raise HTTPError(400)
self._fallback(self.request, b"")
self._finished = True
def data_received(self, chunk):
"""
Called by Tornado on receiving a chunk of the request body. If request is a multipart request, takes care of
processing the multipart data structure via `self._process_multipart_data`. If not, just adds the chunk to
internal in-memory buffer.
:param chunk: chunk of data received from Tornado
"""
data = self._buffer + chunk
if self.is_multipart():
self._process_multipart_data(data)
else:
self._buffer = data
def is_multipart(self):
"""Checks whether this request is a `multipart` request"""
return self._content_type is not None and self._content_type.startswith("multipart")
def _process_multipart_data(self, data):
"""
Processes the given data, parsing it for multipart definitions and calling the appropriate methods.
:param data: the data to process as a string
"""
# check for boundary
delimiter = b"--%s" % self._multipart_boundary
delimiter_loc = data.find(delimiter)
delimiter_len = len(delimiter)
end_of_header = None
if delimiter_loc != -1:
# found the delimiter in the currently available data
data, self._buffer = data[0:delimiter_loc], data[delimiter_loc:]
end_of_header = self._buffer.find("\r\n\r\n")
else:
# make sure any boundary (with single or double ==) contained at the end of chunk does not get
# truncated by this processing round => save it to the buffer for next round
endlen = len(self._multipart_boundary) + 4
data, self._buffer = data[0:-endlen], data[-endlen:]
# stream data to part handler
if data:
if self._current_part:
self._on_data(self._current_part, data)
if end_of_header >= 0:
self._on_header(self._buffer[delimiter_len+2:end_of_header])
self._buffer = self._buffer[end_of_header + 4:]
if delimiter_loc != -1 and self._buffer[delimiter_len:delimiter_len+2] == "--":
# we saw the last boundary and are at the end of our request
if self._current_part:
self._on_close(self._current_part)
self._current_part = None
self._buffer = b""
self._on_finish()
def _on_header(self, header):
"""
Called for a new multipart header, takes care of parsing the header and calling `self._on_part` with the
relevant data, setting the current part in the process.
:param header: header to parse
"""
# close any open parts
if self._current_part:
self._on_close(self._current_part)
self._current_part = None
header_check = header.find(self._multipart_boundary)
if header_check != -1:
self._logger.warn("Header still contained multipart boundary, stripping it...")
header = header[header_check:]
# convert to dict
header = HTTPHeaders.parse(header.decode("utf-8"))
disp_header = header.get("Content-Disposition", "")
disposition, disp_params = tornado.httputil._parse_header(disp_header)
if disposition != "form-data":
self._logger.warn("Got a multipart header without form-data content disposition, ignoring that one")
return
if not disp_params.get("name"):
self._logger.warn("Got a multipart header without name, ignoring that one")
return
self._current_part = self._on_part(disp_params["name"], header.get("Content-Type", None), filename=disp_params["filename"] if "filename" in disp_params else None)
def _on_part(self, name, content_type, filename=None):
"""
Called for new parts in the multipart stream. If `filename` is given creates new `file` part (which leads
to storage of the data as temporary file on disk), if not creates a new `data` part (which stores
incoming data in memory).
Structure of `file` parts:
* `name`: name of the part
* `filename`: filename associated with the part
* `path`: path to the temporary file storing the file's data
* `content_type`: content type of the part
* `file`: file handle for the temporary file (mode "wb", not deleted on close!)
Structure of `data` parts:
* `name`: name of the part
* `content_type`: content type of the part
* `data`: bytes of the part (initialized to "")
:param name: name of the part
:param content_type: content type of the part
:param filename: filename associated with the part.
:return: dict describing the new part
"""
if filename is not None:
# this is a file
import tempfile
handle = tempfile.NamedTemporaryFile(mode="wb", prefix=self._file_prefix, suffix=self._file_suffix, dir=self._path, delete=False)
return dict(name=tornado.escape.utf8(name),
filename=tornado.escape.utf8(filename),
path=tornado.escape.utf8(handle.name),
content_type=tornado.escape.utf8(content_type),
file=handle)
else:
return dict(name=tornado.escape.utf8(name), content_type=content_type, data=b"")
def _on_data(self, part, data):
"""
Called when new bytes are received for the given `part`, takes care of writing them to their storage.
:param part: part for which data was received
:param data: data chunk which was received
"""
if "file" in part:
part["file"].write(data)
else:
part["data"] += data
def _on_close(self, part):
"""
Called when a part gets closed, takes care of storing the finished part in the internal parts storage and for
`file` parts closing the temporary file and storing the part in the internal files storage.
:param part: part which was closed
"""
name = part["name"]
self._parts[name] = part
if "file" in part:
self._files.append(part["path"])
part["file"].close()
del part["file"]
def _on_finish(self):
"""
Called when the request body has been read completely. Takes care of creating the replacement body out of the
logged parts, turning `file` parts into new
:return:
"""
self._new_body = b""
for name, part in self._parts.iteritems():
if "filename" in part:
# add form fields for filename, path, size and content_type for all files contained in the request
fields = dict((self._suffixes[key], value) for (key, value) in dict(name=part["filename"], path=part["path"], size=str(os.stat(part["path"]).st_size), content_type=part["content_type"]).iteritems())
for n, p in fields.iteritems():
key = name + "." + n
self._new_body += b"--%s\r\n" % self._multipart_boundary
self._new_body += b"Content-Disposition: form-data; name=\"%s\"\r\n" % key
self._new_body += b"\r\n"
self._new_body += p + b"\r\n"
elif "data" in part:
self._new_body += b"--%s\r\n" % self._multipart_boundary
value = part["data"]
self._new_body += b"Content-Disposition: form-data; name=\"%s\"\r\n" % name
if "content_type" in part and part["content_type"] is not None:
self._new_body += b"Content-Type: %s\r\n" % part["content_type"]
self._new_body += b"\r\n"
self._new_body += value
self._new_body += b"--%s--\r\n" % self._multipart_boundary
def _handle_method(self, *args, **kwargs):
"""
Handler for any request method, takes care of defining the new request body if necessary and forwarding
the current request and changed body to the `fallback`.
"""
# determine which body to supply
body = b""
if self.is_multipart():
# make sure we really processed all data in the buffer
while len(self._buffer):
self._process_multipart_data(self._buffer)
# use rewritten body
body = self._new_body
elif self.request.method in UploadStorageFallbackHandler.BODY_METHODS:
# directly use data from buffer
body = self._buffer
# rewrite content length
self.request.headers["Content-Length"] = len(body)
try:
# call the configured fallback with request and body to use
self._fallback(self.request, body)
finally:
# make sure the temporary files are removed again
for f in self._files:
util.silentRemove(f)
# make all http methods trigger _handle_method
get = _handle_method
post = _handle_method
put = _handle_method
patch = _handle_method
delete = _handle_method
head = _handle_method
options = _handle_method
@stream_request_body
@ -294,39 +604,88 @@ class StreamingFallbackHandler(FallbackHandler):
(r".*", StreamingFallbackHandler, dict(fallback=wsgi_app),
])
"""
def initialize(self, fallback):
self.fallback = fallback
def data_received(self, chunk):
self._tmpfile.write(chunk)
self._length_left -= len(chunk)
if self._length_left <= 0:
self.finished_body()
NO_BODY_METHODS = ("GET", "HEAD", "OPTIONS")
def finished_body(self):
self._tmpfile.seek(0)
def initialize(self, fallback, max_size=50*1024*1024*1024, file_prefix="tmp"):
self._fallback = fallback
self._max_size = max_size
self._file_prefix = file_prefix
def prepare(self):
self._length_left = int(self.request.headers.get("Content-Length", 0))
self._tmpfile = TemporaryFile()
if self.request.method not in StreamingFallbackHandler.NO_BODY_METHODS:
import tempfile
self._length_left = int(self.request.headers.get("Content-Length", 0))
self._tempfile = tempfile.TemporaryFile(prefix=self._file_prefix)
else:
import io
self._fallback(self.request, io.BytesIO())
self._finished = True
def __getattribute__(self, name):
if name in ("get", "post", "put", "patch"):
try:
body_stream = self._tmpfile
self.fallback(self.request, body_stream)
finally:
self._tmpfile.close()
return lambda *args, **kwargs: None
return object.__getattribute__(self, name)
def data_received(self, chunk):
if self.request.method not in StreamingFallbackHandler.NO_BODY_METHODS:
self._tempfile.write(chunk)
self._length_left -= len(chunk)
if self._length_left <= 0:
self.finished_body()
def finished_body(self):
self._tempfile.seek(0)
def get(self, *args, **kwargs):
return self._handle_method()
def post(self, *args, **kwargs):
return self._handle_method()
def put(self, *args, **kwargs):
return self._handle_method()
def delete(self, *args, **kwargs):
return self._handle_method()
def head(self, *args, **kwargs):
return self._handle_method()
def options(self, *args, **kwargs):
return self._handle_method()
def _handle_method(self):
try:
self._fallback(self.request, self._tempfile)
finally:
self._tempfile.close()
class StreamedWsgiContainer(object):
class WsgiInputContainer(object):
"""
A WSGI container for use with Tornado that allows supplying the request body to be used for `wsgi.input` in the
generated WSGI environment upon call.
A `RequestHandler` can thus provide the WSGI application with a stream for the request body, or a modified body.
Example usage:
wsgi_app = octoprint.server.util.WsgiInputContainer(octoprint_app)
application = tornado.web.Application([
(r".*", UploadStorageFallbackHandler, dict(fallback=wsgi_app),
])
The implementation logic is basically the same as `tornado.wsgi.WSGIContainer` but the `__call__` and `environ`
"""
def __init__(self, wsgi_application):
self.wsgi_application = wsgi_application
def __call__(self, request, body_stream):
def __call__(self, request, body=None):
"""
Wraps the call against the WSGI app, deriving the WSGI environment from the supplied Tornado `HTTPServerRequest`.
:param request: the `tornado.httpserver.HTTPServerRequest` to derive the WSGI environment from
:param body: an optional body to use as `wsgi.input` instead of `request.body`, can be a string or a stream
"""
data = {}
response = []
@ -335,7 +694,7 @@ class StreamedWsgiContainer(object):
data["headers"] = response_headers
return response.append
app_response = self.wsgi_application(
StreamedWsgiContainer.environ(request, body_stream), start_response)
WsgiInputContainer.environ(request, body), start_response)
try:
response.extend(app_response)
body = b"".join(response)
@ -367,13 +726,29 @@ class StreamedWsgiContainer(object):
self._log(status_code, request)
@staticmethod
def environ(request, body_stream):
"""Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
def environ(request, body=None):
"""
Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.
An optional `body` to be used for populating `wsgi.input` can be supplied (either a string or a stream). If not
supplied, `request.body` will be wrapped into a `io.BytesIO` stream and used instead.
:param request: the `tornado.httpserver.HTTPServerRequest` to derive the WSGI environment from
:param body: an optional body to use as `wsgi.input` instead of `request.body`, can be a string or a stream
"""
from tornado.wsgi import to_wsgi_str
import sys
import io
# determine the request_body to supply as wsgi.input
if body is not None:
if isinstance(body, (bytes, str)):
request_body = io.BytesIO(tornado.escape.utf8(body))
else:
request_body = body
else:
request_body = io.BytesIO(tornado.escape.utf8(request.body))
hostport = request.host.split(":")
if len(hostport) == 2:
host = hostport[0]
@ -393,7 +768,7 @@ class StreamedWsgiContainer(object):
"SERVER_PROTOCOL": request.version,
"wsgi.version": (1, 0),
"wsgi.url_scheme": request.protocol,
"wsgi.input": body_stream,
"wsgi.input": request_body,
"wsgi.errors": sys.stderr,
"wsgi.multithread": False,
"wsgi.multiprocess": True,