Working streaming file upload for now

This commit is contained in:
Gina Häußge 2014-08-03 18:53:10 +02:00
parent 1ef9c617b3
commit d770c20b5a
3 changed files with 244 additions and 149 deletions

View file

@ -34,8 +34,8 @@ user_permission = Permission(RoleNeed("user"))
# only import the octoprint stuff down here, as it might depend on things defined above to be initialized already
from octoprint.server.util import LargeResponseHandler, ReverseProxied, restricted_access, PrinterStateConnection, admin_validator, \
UrlForwardHandler, user_validator, GcodeWatchdogHandler, UploadCleanupWatchdogHandler, LargeUploadHandler, \
access_validation_factory, StreamedWsgiContainer, StreamingFallbackHandler
UrlForwardHandler, user_validator, GcodeWatchdogHandler, UploadCleanupWatchdogHandler, \
access_validation_factory, StreamedWsgiContainer, StreamingFallbackHandler, PrintableFilesUploadHandler
from octoprint.printer import Printer, getConnectionOptions
from octoprint.settings import settings
import octoprint.gcodefiles as gcodefiles
@ -186,8 +186,8 @@ class Server():
(r"/downloads/files/local/([^/]*\.(gco|gcode))", LargeResponseHandler, dict(path=settings().getBaseFolder("uploads"), as_attachment=True)),
(r"/downloads/logs/([^/]*)", LargeResponseHandler, dict(path=settings().getBaseFolder("logs"), as_attachment=True, access_validation=access_validation_factory(app, loginManager, admin_validator))),
(r"/downloads/camera/current", UrlForwardHandler, dict(url=settings().get(["webcam", "snapshot"]), as_attachment=True, access_validation=access_validation_factory(app, loginManager, user_validator))),
(r"/uploads/files/([^/]*)", LargeUploadHandler, dict(access_validation=access_validation_factory(app, loginManager, user_validator))),
(r".*", StreamingFallbackHandler, dict(fallback=StreamedWsgiContainer(app.wsgi_app)))
(r"/api/files/([^/]*)", PrintableFilesUploadHandler, dict(path=settings().getBaseFolder("uploads"), access_validation=access_validation_factory(app, loginManager, user_validator))),
(r".*", FallbackHandler, dict(fallback=WSGIContainer(app.wsgi_app)))
])
self._server = HTTPServer(self._tornado_app, max_body_size=1*1024*1024*1024)
self._server.listen(self._port, address=self._host)

View file

@ -86,110 +86,7 @@ def _verifyFileExists(origin, filename):
@api.route("/files/<string:target>", methods=["POST"])
@restricted_access
def uploadGcodeFile(target):
if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
return make_response("Unknown target: %s" % target, 404)
if not "file" in request.files.keys():
return make_response("No file included", 400)
if target == FileDestinations.SDCARD and not settings().getBoolean(["feature", "sdSupport"]):
return make_response("SD card support is disabled", 404)
file = request.files["file"]
sd = target == FileDestinations.SDCARD
selectAfterUpload = "select" in request.values.keys() and request.values["select"] in valid_boolean_trues
printAfterSelect = "print" in request.values.keys() and request.values["print"] in valid_boolean_trues
if sd:
# validate that all preconditions for SD upload are met before attempting it
if not (printer.isOperational() and not (printer.isPrinting() or printer.isPaused())):
return make_response("Can not upload to SD card, printer is either not operational or already busy", 409)
if not printer.isSdReady():
return make_response("Can not upload to SD card, not yet initialized", 409)
# determine current job
currentFilename = None
currentOrigin = None
currentJob = printer.getCurrentJob()
if currentJob is not None and "file" in currentJob.keys():
currentJobFile = currentJob["file"]
if "name" in currentJobFile.keys() and "origin" in currentJobFile.keys():
currentFilename = currentJobFile["name"]
currentOrigin = currentJobFile["origin"]
# determine future filename of file to be uploaded, abort if it can't be uploaded
futureFilename = gcodeManager.getFutureFilename(file)
if futureFilename is None or (not settings().getBoolean(["cura", "enabled"]) and not gcodefiles.isGcodeFileName(futureFilename)):
return make_response("Can not upload file %s, wrong format?" % file.filename, 415)
# prohibit overwriting currently selected file while it's being printed
if futureFilename == currentFilename and target == currentOrigin and printer.isPrinting() or printer.isPaused():
return make_response("Trying to overwrite file that is currently being printed: %s" % currentFilename, 409)
def fileProcessingFinished(filename, absFilename, destination):
"""
Callback for when the file processing (upload, optional slicing, addition to analysis queue) has
finished.
Depending on the file's destination triggers either streaming to SD card or directly calls selectAndOrPrint.
"""
if destination == FileDestinations.SDCARD:
return filename, printer.addSdFile(filename, absFilename, selectAndOrPrint)
else:
selectAndOrPrint(filename, absFilename, destination)
return filename
def selectAndOrPrint(filename, absFilename, destination):
"""
Callback for when the file is ready to be selected and optionally printed. For SD file uploads this is only
the case after they have finished streaming to the printer, which is why this callback is also used
for the corresponding call to addSdFile.
Selects the just uploaded file if either selectAfterUpload or printAfterSelect are True, or if the
exact file is already selected, such reloading it.
"""
if selectAfterUpload or printAfterSelect or (currentFilename == filename and currentOrigin == destination):
printer.selectFile(absFilename, destination == FileDestinations.SDCARD, printAfterSelect)
filename, done = gcodeManager.addFile(file, target, fileProcessingFinished)
if filename is None:
return make_response("Could not upload the file %s" % file.filename, 500)
sdFilename = None
if isinstance(filename, tuple):
filename, sdFilename = filename
eventManager.fire(Events.UPLOAD, {"file": filename, "target": target})
files = {}
location = url_for(".readGcodeFile", target=FileDestinations.LOCAL, filename=filename, _external=True)
files.update({
FileDestinations.LOCAL: {
"name": filename,
"origin": FileDestinations.LOCAL,
"refs": {
"resource": location,
"download": url_for("index", _external=True) + "downloads/files/" + FileDestinations.LOCAL + "/" + filename
}
}
})
if sd and sdFilename:
location = url_for(".readGcodeFile", target=FileDestinations.SDCARD, filename=sdFilename, _external=True)
files.update({
FileDestinations.SDCARD: {
"name": sdFilename,
"origin": FileDestinations.SDCARD,
"refs": {
"resource": location
}
}
})
r = make_response(jsonify(files=files, done=done), 201)
r.headers["Location"] = location
#return redirect(url_for("index", _external=True) + "uploads/files/" + target, code=307)
return r
return redirect(url_for("index", _external=True) + "uploads/files/" + target, code=307)
@api.route("/files/<string:target>/<path:filename>", methods=["GET"])

View file

@ -1,8 +1,6 @@
# coding=utf-8
from io import FileIO
from tempfile import NamedTemporaryFile, TemporaryFile
from tempfile import TemporaryFile
from tornado.httputil import HTTPHeaders
from tornado.wsgi import WSGIContainer
from octoprint.filemanager.destinations import FileDestinations
__author__ = "Gina Häußge <osd@foosel.net>"
@ -28,7 +26,7 @@ import logging
from functools import wraps
from watchdog.events import PatternMatchingEventHandler
from octoprint.settings import settings
from octoprint.settings import settings, valid_boolean_trues
import octoprint.timelapse
import octoprint.server
from octoprint.users import ApiUser
@ -237,29 +235,68 @@ class PrinterStateConnection(SockJSConnection):
self.send({type: payload})
@stream_request_body
class UploadStorageFallbackHandler(RequestHandler):
def initialize(self, delegate, folder):
self.delegate = delegate
self.folder = folder
# Parts information will be stored here
self.parts = dict()
# files will be stored here
self.files = dict()
# values will be stored here
self.values = dict()
# Part currently being processed
self._current_part = None
# buffer needed for identifying form data parts
self._buffer = b""
# we only support multipart/form-data content, so check for that
self._bytes_left = self.request.headers.get("Content-Length", 0)
content_type = self.request.headers.get("Content-Type", "")
if not self._bytes_left or not content_type.startswith("multipart"):
raise HTTPError(405)
# extract the multipart boundary
fields = content_type.split(";")
for field in fields:
k, sep, v = field.strip().partition("=")
if k == "boundary" and v:
if v.startswith(b'"') and v.endswith(b'"'):
self.boundary = tornado.escape.utf8(v[1:-1])
else:
self.boundary = tornado.escape.utf8(v)
break
else:
raise HTTPError(400)
@stream_request_body
class StreamingFallbackHandler(FallbackHandler):
"""A `RequestHandler` that wraps another HTTP server callback.
The fallback is a callable object that accepts an
`~.httputil.HTTPServerRequest`, such as an `Application` or
`tornado.wsgi.WSGIContainer`. This is most useful to use both
`octoprint.server.util.StreamedWSGIContainer`. This is most useful to use both
Tornado ``RequestHandlers`` and WSGI in the same server. Typical
usage::
wsgi_app = tornado.wsgi.WSGIContainer(
wsgi_app = octoprint.server.util.StreamedWSGIContainer(
django.core.handlers.wsgi.WSGIHandler())
application = tornado.web.Application([
(r"/foo", FooHandler),
(r".*", FallbackHandler, dict(fallback=wsgi_app),
(r".*", StreamingFallbackHandler, dict(fallback=wsgi_app),
])
"""
def initialize(self, fallback):
self.fallback = fallback
self._length_left = int(self.request.headers.get("Content-Length", 0))
self._tmpfile = TemporaryFile()
def data_received(self, chunk):
self._tmpfile.write(chunk)
self._length_left -= len(chunk)
@ -268,17 +305,18 @@ class StreamingFallbackHandler(FallbackHandler):
def finished_body(self):
self._tmpfile.seek(0)
try:
body_stream = self._tmpfile
self.fallback(self.request, body_stream)
finally:
self._tmpfile.close()
def prepare(self):
pass
self._length_left = int(self.request.headers.get("Content-Length", 0))
self._tmpfile = TemporaryFile()
def __getattribute__(self, name):
if name in ("get", "post", "put", "patch"):
try:
body_stream = self._tmpfile
self.fallback(self.request, body_stream)
finally:
self._tmpfile.close()
return lambda *args, **kwargs: None
return object.__getattribute__(self, name)
@ -399,9 +437,9 @@ def access_validation_factory(app, login_manager, validator):
:param request: The Tornado request for which to create the environment and context
"""
import flask
import io
import tornado.wsgi
wsgi_environ = StreamedWsgiContainer.environ(request, io.BytesIO(""))
wsgi_environ = tornado.wsgi.WSGIContainer.environ(request)
with app.request_context(wsgi_environ):
app.session_interface.open_session(app, flask.request)
login_manager.reload_user()
@ -410,25 +448,32 @@ def access_validation_factory(app, login_manager, validator):
@stream_request_body
class LargeUploadHandler(RequestHandler):
class PrintableFilesUploadHandler(RequestHandler):
def initialize(self, files_only=False, access_validation=None):
self.files_only = files_only
def initialize(self, path, postfix=".tmp", files_only=False, access_validation=None):
self._path = path
self._postfix = postfix
self._files_only = files_only
self._access_validation = access_validation
# Parts information will be stored here
self.parts = dict()
# files will be stored here
self.files = dict()
# values will be stored here
self.values = dict()
# Part currently being processed
self._current_part = None
# bytes left to process in the stream
self._bytes_left = int(self.request.headers.get("Content-Length", 0))
# buffer needed for identifying form data parts
self._buffer = ""
self._buffer = b""
# we only support multipart/form-data content, so check for that
bytes_left = self.request.headers.get("Content-Length", 0)
content_type = self.request.headers.get("Content-Type", "")
if not self._bytes_left or not content_type.startswith("multipart"):
if not bytes_left or not content_type.startswith("multipart"):
raise HTTPError(405)
# extract the multipart boundary
@ -445,9 +490,10 @@ class LargeUploadHandler(RequestHandler):
raise HTTPError(400)
def data_received(self, chunk):
self._bytes_left -= len(chunk)
data = self._buffer + chunk
self.process_data(data)
def process_data(self, data):
# check for boundary
delimiter = b"--%s" % self.boundary
delimiter_loc = data.find(delimiter)
@ -455,7 +501,7 @@ class LargeUploadHandler(RequestHandler):
end_of_header = None
if delimiter_loc != -1:
# found the delimiter in the currently available data
data, self._buffer = data[0:delimiter], data[delimiter:]
data, self._buffer = data[0:delimiter_loc], data[delimiter_loc:]
end_of_header = self._buffer.find("\r\n\r\n")
else:
# make sure any boundary (with single or double ==) contained at the end of chunk does not get
@ -470,14 +516,15 @@ class LargeUploadHandler(RequestHandler):
if end_of_header >= 0:
self._header(self._buffer[delimiter_len+2:end_of_header])
self._buffer = self._buffer[end_of_header + 4]
self._buffer = self._buffer[end_of_header + 4:]
if delimiter_loc != -1 and self._buffer[delimiter_len:delimiter_len+2] == "--":
# we saw the last boundary and are at the end of our request
if self._current_part:
self.onclose(self._current_part)
self._current_part = None
return self.onfinish()
self._buffer = b""
self.onfinish()
def _header(self, header):
# close any open parts
@ -495,7 +542,6 @@ class LargeUploadHandler(RequestHandler):
disp_header = header.get("Content-Disposition", "")
disposition, disp_params = tornado.httputil._parse_header(disp_header)
next = False
if disposition != "form-data":
# TODO log warning
return
@ -503,20 +549,30 @@ class LargeUploadHandler(RequestHandler):
# TODO log warning
return
if self.files_only and "filename" not in disp_params:
if self._files_only and "filename" not in disp_params:
# TODO log warning
return
else:
self._current_part = self.onpart(disp_params["name"], header.get("Content-Type", None), filename=disp_params["filename"] if self._files_only else None)
self._current_part = self.onpart(disp_params["name"], header.get("Content-Type", None), filename=disp_params["filename"] if "filename" in disp_params else None)
def onpart(self, name, content_type, filename=None):
from octoprint.server import gcodeManager
if content_type is None:
# we got a key-value-pair
return dict(name=name, value=b"")
elif content_type == "application/octet-stream":
elif filename is not None:
# this is a file
tmpfile = NamedTemporaryFile()
return dict(name=name, filename=filename, file=tmpfile)
upload = Object()
upload.filename = filename
sane_filename = gcodeManager.getFutureFilename(upload)
if sane_filename is None:
return dict()
local_path = os.path.join(self._path, sane_filename + self._postfix)
handle = open(local_path, "wb")
return dict(name=name, filename=filename, sane_filename=sane_filename, content_type=content_type, local_path=local_path, file=handle)
else:
return dict()
@ -527,21 +583,161 @@ class LargeUploadHandler(RequestHandler):
part["file"].write(data)
def onclose(self, part):
for part in self.parts:
if "value" in part:
self._get_argument()
self.parts[part["name"]] = part
escaped_name = tornado.escape.utf8(part["name"])
self.parts[escaped_name] = part
if "file" in part:
part["file"].close()
del part["file"]
self.files[escaped_name] = part
elif "value" in part:
escaped_value = tornado.escape.utf8(part["value"])
self.values[escaped_name] = escaped_value
self.request.body_arguments.setdefault(escaped_name, []).append(escaped_value)
def onfinish(self):
pass
# we now do something horrible and replace the body by a version stripped of all files. Yes, I feel bad for this
import io
new_body = b""
for name, value in self.values.iteritems():
new_body += b"--%s\r\n" % self.boundary
new_body += b"Content-Disposition: form-data; name=\"%s\"\r\n\r\n" % name
new_body += value
new_body += b"--%s--\r\n" % self.boundary
self.request.body = new_body
self.request.headers["Content-Length"] = len(new_body)
def post(self, *args, **kwargs):
while len(self._buffer):
self.process_data(self._buffer)
from octoprint.server import gcodeManager, printer, eventManager
if self._access_validation is not None:
self._access_validation(self.request)
target = self.request.path.split("/")[-1]
if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
self.set_status(404, reason="Unknown target: %s" % target)
return
raise HTTPError(550, "Yay")
if not "file" in self.files:
self.set_status(400, reason="No file included")
return
if target == FileDestinations.SDCARD and not settings().getBoolean(["feature", "sdSupport"]):
self.set_status(404, reason="SD card support is disabled")
return
import octoprint.util
upload = Object()
upload.filename = self.files["file"]["filename"]
upload.sane_filename = self.files["file"]["sane_filename"]
upload.save = lambda new_name: octoprint.util.safeRename(self.files["file"]["local_path"], new_name)
sd = target == FileDestinations.SDCARD
selectAfterUpload = "select" in self.values and self.values["select"] in valid_boolean_trues
printAfterSelect = "print" in self.values and self.values["print"] in valid_boolean_trues
if sd:
# validate that all preconditions for SD upload are met before attempting it
if not (printer.isOperational() and not (printer.isPrinting() or printer.isPaused())):
self.set_status(409, reason="Can not upload to SD card, printer is either not operational or already busy")
return
if not printer.isSdReady():
self.set_status(409, "Can not upload to SD card, not yet initialized")
return
# determine current job
currentFilename = None
currentOrigin = None
currentJob = printer.getCurrentJob()
if currentJob is not None and "file" in currentJob.keys():
currentJobFile = currentJob["file"]
if "name" in currentJobFile.keys() and "origin" in currentJobFile.keys():
currentFilename = currentJobFile["name"]
currentOrigin = currentJobFile["origin"]
# determine future filename of file to be uploaded, abort if it can't be uploaded
futureFilename = upload.sane_filename
if futureFilename is None or (not settings().getBoolean(["cura", "enabled"]) and not gcodefiles.isGcodeFileName(futureFilename)):
self.set_status(415, reason="Can not upload file %s, wrong format?" % upload.filename)
return
# prohibit overwriting currently selected file while it's being printed
if futureFilename == currentFilename and target == currentOrigin and printer.isPrinting() or printer.isPaused():
self.set_status(409, reason="Trying to overwrite file that is currently being printed: %s" % currentFilename)
return
def fileProcessingFinished(filename, absFilename, destination):
"""
Callback for when the file processing (upload, optional slicing, addition to analysis queue) has
finished.
Depending on the file's destination triggers either streaming to SD card or directly calls selectAndOrPrint.
"""
if destination == FileDestinations.SDCARD:
return filename, printer.addSdFile(filename, absFilename, selectAndOrPrint)
else:
selectAndOrPrint(filename, absFilename, destination)
return filename
def selectAndOrPrint(filename, absFilename, destination):
"""
Callback for when the file is ready to be selected and optionally printed. For SD file uploads this is only
the case after they have finished streaming to the printer, which is why this callback is also used
for the corresponding call to addSdFile.
Selects the just uploaded file if either selectAfterUpload or printAfterSelect are True, or if the
exact file is already selected, such reloading it.
"""
if selectAfterUpload or printAfterSelect or (currentFilename == filename and currentOrigin == destination):
printer.selectFile(absFilename, destination == FileDestinations.SDCARD, printAfterSelect)
filename, done = gcodeManager.addFile(upload, target, fileProcessingFinished)
if filename is None:
return make_response("Could not upload the file %s" % upload.filename, 500)
sdFilename = None
if isinstance(filename, tuple):
filename, sdFilename = filename
eventManager.fire(Events.UPLOAD, {"file": filename, "target": target})
"""
files = {}
location = url_for(".readGcodeFile", target=FileDestinations.LOCAL, filename=filename, _external=True)
files.update({
FileDestinations.LOCAL: {
"name": filename,
"origin": FileDestinations.LOCAL,
"refs": {
"resource": location,
"download": url_for("index", _external=True) + "downloads/files/" + FileDestinations.LOCAL + "/" + filename
}
}
})
if sd and sdFilename:
location = url_for(".readGcodeFile", target=FileDestinations.SDCARD, filename=sdFilename, _external=True)
files.update({
FileDestinations.SDCARD: {
"name": sdFilename,
"origin": FileDestinations.SDCARD,
"refs": {
"resource": location
}
}
})
"""
import tornado.escape
self.set_status(201)
#self.set_header("Location", location)
self.finish(tornado.escape.json_encode(dict(files={}, done=done)))
#~~ customized large response handler
@ -875,3 +1071,5 @@ class GcodeWatchdogHandler(PatternMatchingEventHandler):
self._upload(event.src_path)
class Object(object):
pass