Trying to get large file uploads to work

This commit is contained in:
Gina Häußge 2014-08-02 10:24:41 +02:00
parent 8aab0ee318
commit 1ef9c617b3
4 changed files with 328 additions and 34 deletions

View file

@ -1,6 +1,6 @@
flask==0.9
werkzeug==0.8.3
tornado==3.0.2
tornado>=4.0
sockjs-tornado>=1.0.0
PyYAML==3.10
Flask-Login==0.2.2

View file

@ -34,7 +34,8 @@ user_permission = Permission(RoleNeed("user"))
# only import the octoprint stuff down here, as it might depend on things defined above to be initialized already
from octoprint.server.util import LargeResponseHandler, ReverseProxied, restricted_access, PrinterStateConnection, admin_validator, \
UrlForwardHandler, user_validator, GcodeWatchdogHandler, UploadCleanupWatchdogHandler
UrlForwardHandler, user_validator, GcodeWatchdogHandler, UploadCleanupWatchdogHandler, LargeUploadHandler, \
access_validation_factory, StreamedWsgiContainer, StreamingFallbackHandler
from octoprint.printer import Printer, getConnectionOptions
from octoprint.settings import settings
import octoprint.gcodefiles as gcodefiles
@ -106,8 +107,8 @@ class Server():
self._debug = debug
self._allowRoot = allowRoot
self._logConf = logConf
self._server = None
def run(self):
if not self._allowRoot:
self._checkForRoot()
@ -180,35 +181,15 @@ class Server():
self._router = SockJSRouter(self._createSocketConnection, "/sockjs")
def access_validation_factory(validator):
"""
Creates an access validation wrapper using the supplied validator.
:param validator: the access validator to use inside the validation wrapper
:return: an access validation wrapper taking a request as parameter and performing the request validation
"""
def f(request):
"""
Creates a custom wsgi and Flask request context in order to be able to process user information
stored in the current session.
:param request: The Tornado request for which to create the environment and context
"""
wsgi_environ = tornado.wsgi.WSGIContainer.environ(request)
with app.request_context(wsgi_environ):
app.session_interface.open_session(app, flask.request)
loginManager.reload_user()
validator(flask.request)
return f
self._tornado_app = Application(self._router.urls + [
(r"/downloads/timelapse/([^/]*\.mpg)", LargeResponseHandler, {"path": settings().getBaseFolder("timelapse"), "as_attachment": True}),
(r"/downloads/files/local/([^/]*\.(gco|gcode))", LargeResponseHandler, {"path": settings().getBaseFolder("uploads"), "as_attachment": True}),
(r"/downloads/logs/([^/]*)", LargeResponseHandler, {"path": settings().getBaseFolder("logs"), "as_attachment": True, "access_validation": access_validation_factory(admin_validator)}),
(r"/downloads/camera/current", UrlForwardHandler, {"url": settings().get(["webcam", "snapshot"]), "as_attachment": True, "access_validation": access_validation_factory(user_validator)}),
(r".*", FallbackHandler, {"fallback": WSGIContainer(app.wsgi_app)})
(r"/downloads/timelapse/([^/]*\.mpg)", LargeResponseHandler, dict(path=settings().getBaseFolder("timelapse"), as_attachment=True)),
(r"/downloads/files/local/([^/]*\.(gco|gcode))", LargeResponseHandler, dict(path=settings().getBaseFolder("uploads"), as_attachment=True)),
(r"/downloads/logs/([^/]*)", LargeResponseHandler, dict(path=settings().getBaseFolder("logs"), as_attachment=True, access_validation=access_validation_factory(app, loginManager, admin_validator))),
(r"/downloads/camera/current", UrlForwardHandler, dict(url=settings().get(["webcam", "snapshot"]), as_attachment=True, access_validation=access_validation_factory(app, loginManager, user_validator))),
(r"/uploads/files/([^/]*)", LargeUploadHandler, dict(access_validation=access_validation_factory(app, loginManager, user_validator))),
(r".*", StreamingFallbackHandler, dict(fallback=StreamedWsgiContainer(app.wsgi_app)))
])
self._server = HTTPServer(self._tornado_app)
self._server = HTTPServer(self._tornado_app, max_body_size=1*1024*1024*1024)
self._server.listen(self._port, address=self._host)
eventManager.fire(events.Events.STARTUP)

View file

@ -4,7 +4,7 @@ from octoprint.events import Events
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
from flask import request, jsonify, make_response, url_for
from flask import request, jsonify, make_response, url_for, redirect
import octoprint.gcodefiles as gcodefiles
import octoprint.util as util
@ -188,6 +188,7 @@ def uploadGcodeFile(target):
r = make_response(jsonify(files=files, done=done), 201)
r.headers["Location"] = location
#return redirect(url_for("index", _external=True) + "uploads/files/" + target, code=307)
return r

View file

@ -1,11 +1,16 @@
# coding=utf-8
from io import FileIO
from tempfile import NamedTemporaryFile, TemporaryFile
from tornado.httputil import HTTPHeaders
from tornado.wsgi import WSGIContainer
from octoprint.filemanager.destinations import FileDestinations
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
from flask.ext.principal import identity_changed, Identity
from tornado.web import StaticFileHandler, HTTPError, RequestHandler, asynchronous
from tornado.web import StaticFileHandler, HTTPError, RequestHandler, asynchronous, stream_request_body, FallbackHandler
import tornado.escape, tornado.httputil
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from flask import url_for, make_response, request, current_app
from flask.ext.login import login_required, login_user, current_user
@ -232,6 +237,313 @@ class PrinterStateConnection(SockJSConnection):
self.send({type: payload})
@stream_request_body
class StreamingFallbackHandler(FallbackHandler):
    """Fallback handler that spools the request body to disk before dispatching.

    Incoming body chunks are written to a temporary file instead of being
    held in memory. Once Tornado invokes the HTTP verb method, the ``fallback``
    callable is called as ``fallback(request, body_stream)`` with the
    temporary file rewound to its start as ``body_stream``.

    The fallback is expected to write and finish the response on ``request``
    itself; this handler produces no output of its own.
    """

    def initialize(self, fallback):
        """Store the fallback callable and set up the spool file.

        :param fallback: callable taking ``(request, body_stream)``
        """
        self.fallback = fallback
        self._length_left = int(self.request.headers.get("Content-Length", 0))
        self._tmpfile = TemporaryFile()
        # guards against invoking the fallback more than once per request
        self._dispatched = False

    def prepare(self):
        """Do nothing: the body has not arrived yet, so dispatching is deferred."""
        pass

    def data_received(self, chunk):
        """Append a chunk of the request body to the spool file."""
        self._tmpfile.write(chunk)
        self._length_left -= len(chunk)

    def finished_body(self):
        """Hand the request and the spooled body to the fallback exactly once."""
        if self._dispatched:
            return
        self._dispatched = True

        self._tmpfile.seek(0)
        try:
            body_stream = self._tmpfile
            self.fallback(self.request, body_stream)
        finally:
            self._tmpfile.close()

        # The fallback has already finished the response on the request, so
        # mark the handler as finished to keep RequestHandler from finishing
        # (and writing headers) a second time.
        self._finished = True

    def on_connection_close(self):
        """Release the spool file if the client goes away before dispatch."""
        self._tmpfile.close()

    def _dispatch(self, *args, **kwargs):
        """Verb entry point; any URL pattern arguments are ignored."""
        self.finished_body()

    # Route every HTTP verb to the fallback. Dispatching from the verb method
    # (instead of from data_received) also covers requests without a body and
    # bodies sent without a Content-Length header.
    get = post = put = patch = delete = head = options = _dispatch
class StreamedWsgiContainer(object):
    """Callable adapter that runs a WSGI application for a Tornado request.

    Unlike a plain request callback it takes the request body as a separate
    file-like ``body_stream`` argument, which is exposed to the WSGI
    application as ``wsgi.input``.
    """

    def __init__(self, wsgi_application):
        """
        :param wsgi_application: the WSGI callable to run for each request
        """
        self.wsgi_application = wsgi_application

    def __call__(self, request, body_stream):
        """Run the WSGI application and write its response to ``request``.

        :param request: the Tornado request to answer
        :param body_stream: file-like object providing the request body
        :raises Exception: if the application never called ``start_response``
        """
        data = {}
        response = []

        def start_response(status, response_headers, exc_info=None):
            data["status"] = status
            data["headers"] = response_headers
            return response.append

        app_response = self.wsgi_application(
            StreamedWsgiContainer.environ(request, body_stream), start_response)
        try:
            response.extend(app_response)
            body = b"".join(response)
        finally:
            # WSGI iterables may carry a close() hook that must always run
            if hasattr(app_response, "close"):
                app_response.close()
        if not data:
            raise Exception("WSGI app did not call start_response")

        status_code = int(data["status"].split()[0])
        headers = data["headers"]
        header_set = set(k.lower() for (k, v) in headers)
        body = tornado.escape.utf8(body)
        if status_code != 304:
            if "content-length" not in header_set:
                headers.append(("Content-Length", str(len(body))))
            if "content-type" not in header_set:
                headers.append(("Content-Type", "text/html; charset=UTF-8"))
        if "server" not in header_set:
            headers.append(("Server", "TornadoServer/%s" % tornado.version))

        # assemble the raw HTTP/1.1 response: status line, headers, blank line, body
        parts = [tornado.escape.utf8("HTTP/1.1 " + data["status"] + "\r\n")]
        for key, value in headers:
            parts.append(tornado.escape.utf8(key) + b": " + tornado.escape.utf8(value) + b"\r\n")
        parts.append(b"\r\n")
        parts.append(body)
        request.write(b"".join(parts))
        request.finish()
        self._log(status_code, request)

    @staticmethod
    def environ(request, body_stream):
        """Converts a `tornado.httputil.HTTPServerRequest` to a WSGI environment.

        The request's headers are only read, never modified, so the same
        request can be converted more than once.

        :param request: the Tornado request to convert
        :param body_stream: file-like object to expose as ``wsgi.input``
        :return: the WSGI environment dict
        """
        from tornado.wsgi import to_wsgi_str
        import sys

        hostport = request.host.split(":")
        if len(hostport) == 2:
            host = hostport[0]
            port = int(hostport[1])
        else:
            host = request.host
            port = 443 if request.protocol == "https" else 80
        environ = {
            "REQUEST_METHOD": request.method,
            "SCRIPT_NAME": "",
            "PATH_INFO": to_wsgi_str(tornado.escape.url_unescape(
                request.path, encoding=None, plus=False)),
            "QUERY_STRING": request.query,
            "REMOTE_ADDR": request.remote_ip,
            "SERVER_NAME": host,
            "SERVER_PORT": str(port),
            "SERVER_PROTOCOL": request.version,
            "wsgi.version": (1, 0),
            "wsgi.url_scheme": request.protocol,
            "wsgi.input": body_stream,
            "wsgi.errors": sys.stderr,
            "wsgi.multithread": False,
            "wsgi.multiprocess": True,
            "wsgi.run_once": False,
        }

        headers = request.headers
        if "Content-Type" in headers:
            environ["CONTENT_TYPE"] = headers.get("Content-Type")
        if "Content-Length" in headers:
            environ["CONTENT_LENGTH"] = headers.get("Content-Length")
        for key, value in headers.items():
            wsgi_key = key.replace("-", "_").upper()
            # these two go into the un-prefixed CGI variables set above
            if wsgi_key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
                continue
            environ["HTTP_" + wsgi_key] = value
        return environ

    def _log(self, status_code, request):
        """Write an access log line to the ``tornado.access`` logger.

        Status codes below 400 are logged as info, below 500 as warning and
        everything else as error.
        """
        # imported locally so this method does not depend on a module-level import
        import logging

        access_log = logging.getLogger("tornado.access")
        if status_code < 400:
            log_method = access_log.info
        elif status_code < 500:
            log_method = access_log.warning
        else:
            log_method = access_log.error
        request_time = 1000.0 * request.request_time()
        summary = request.method + " " + request.uri + " (" + \
            request.remote_ip + ")"
        log_method("%d %s %.2fms", status_code, summary, request_time)
def access_validation_factory(app, login_manager, validator):
    """
    Creates an access validation wrapper using the supplied validator.

    :param app: the Flask application providing the request context and session interface
    :param login_manager: the login manager whose ``reload_user`` is called for the request
    :param validator: the access validator to use inside the validation wrapper
    :return: an access validation wrapper taking a request as parameter and performing the request validation
    """
    def f(request):
        """
        Creates a custom wsgi and Flask request context in order to be able to process user information
        stored in the current session.

        :param request: The Tornado request for which to create the environment and context
        """
        import flask
        import io

        # Validation does not need the request body, so an empty byte stream
        # is supplied as wsgi.input (bytes literal: BytesIO rejects text on Python 3).
        wsgi_environ = StreamedWsgiContainer.environ(request, io.BytesIO(b""))
        with app.request_context(wsgi_environ):
            app.session_interface.open_session(app, flask.request)
            login_manager.reload_user()
            validator(flask.request)
    return f
@stream_request_body
class LargeUploadHandler(RequestHandler):
    """Request handler that parses a ``multipart/form-data`` body while it streams in.

    The body is split at the multipart boundary as chunks arrive. For every
    part the hooks ``onpart`` (headers seen), ``ondata`` (payload bytes),
    ``onclose`` (part complete) and finally ``onfinish`` (closing boundary
    seen) are called. Completed parts are collected in ``self.parts`` keyed
    by their form field name.
    """

    def initialize(self, files_only=False, access_validation=None):
        """
        :param files_only: if true, parts without a ``filename`` are skipped
        :param access_validation: optional callable invoked with the request in ``post``
        :raises HTTPError: 405 if the request has no body or is not multipart,
            400 if the content type carries no boundary
        """
        self.files_only = files_only
        self._access_validation = access_validation

        # Parts information will be stored here
        self.parts = dict()

        # Part currently being processed
        self._current_part = None

        # bytes left to process in the stream
        self._bytes_left = int(self.request.headers.get("Content-Length", 0))

        # buffer needed for identifying form data parts
        self._buffer = b""

        # we only support multipart/form-data content, so check for that
        content_type = self.request.headers.get("Content-Type", "")
        if not self._bytes_left or not content_type.startswith("multipart"):
            raise HTTPError(405)

        # extract the multipart boundary
        for field in content_type.split(";"):
            k, sep, v = field.strip().partition("=")
            if k != "boundary":
                continue
            if v.startswith('"') and v.endswith('"'):
                v = v[1:-1]
            if v:
                self.boundary = tornado.escape.utf8(v)
                break
        else:
            raise HTTPError(400)

    def data_received(self, chunk):
        """Consume one chunk of the body and feed complete pieces to the part hooks.

        :param chunk: the next bytes of the request body
        :return: the result of ``onfinish`` once the closing boundary was seen, else None
        """
        self._bytes_left -= len(chunk)
        self._buffer += chunk

        delimiter = b"--" + self.boundary
        delimiter_len = len(delimiter)

        # a single chunk may contain several boundaries, so loop until the
        # buffer holds nothing more that can be decided yet
        while True:
            delimiter_loc = self._buffer.find(delimiter)

            if delimiter_loc == -1:
                # Make sure a boundary (and the CRLF preceding it) that is split
                # across two chunks does not get truncated by this processing
                # round => keep the tail in the buffer for the next round.
                keep = delimiter_len + 2
                if len(self._buffer) > keep:
                    data, self._buffer = self._buffer[:-keep], self._buffer[-keep:]
                    self._feed_current_part(data)
                return

            after_delimiter = delimiter_loc + delimiter_len
            marker = self._buffer[after_delimiter:after_delimiter + 2]
            if len(marker) < 2:
                # cannot tell yet whether this is the closing boundary
                return

            if marker == b"--":
                # we saw the last boundary and are at the end of our request
                self._feed_current_part(self._strip_crlf(self._buffer[:delimiter_loc]))
                self._buffer = b""
                if self._current_part:
                    self.onclose(self._current_part)
                    self._current_part = None
                return self.onfinish()

            end_of_header = self._buffer.find(b"\r\n\r\n", after_delimiter)
            if end_of_header == -1:
                # part headers are still incomplete, wait for more data
                return

            # stream data preceding the boundary to the part handler, then
            # start the next part from its headers
            self._feed_current_part(self._strip_crlf(self._buffer[:delimiter_loc]))
            self._header(self._buffer[after_delimiter + 2:end_of_header])
            self._buffer = self._buffer[end_of_header + 4:]

    @staticmethod
    def _strip_crlf(data):
        """Drop the CRLF that belongs to the following boundary line, if present."""
        if data.endswith(b"\r\n"):
            return data[:-2]
        return data

    def _feed_current_part(self, data):
        """Pass non-empty data to ``ondata`` if a part is currently open."""
        if data and self._current_part:
            self.ondata(self._current_part, data)

    def _header(self, header):
        """Close the open part (if any) and start a new one from raw part headers.

        Parts that are not ``form-data``, have no ``name``, or (with
        ``files_only``) have no ``filename`` are skipped.

        :param header: the raw header bytes of the part, without the boundary line
        """
        # close any open parts
        if self._current_part:
            self.onclose(self._current_part)
            self._current_part = None

        header_check = header.find(self.boundary)
        if header_check != -1:
            # TODO log warning
            # NOTE(review): this keeps the boundary text at the start of the
            # header; presumably it should skip past it -- confirm intent
            header = header[header_check:]

        # convert to dict
        header = HTTPHeaders.parse(header.decode("utf-8"))
        disp_header = header.get("Content-Disposition", "")
        disposition, disp_params = tornado.httputil._parse_header(disp_header)

        if disposition != "form-data":
            # TODO log warning
            return
        if not disp_params.get("name"):
            # TODO log warning
            return
        if self.files_only and "filename" not in disp_params:
            # TODO log warning
            return

        self._current_part = self.onpart(
            disp_params["name"],
            header.get("Content-Type", None),
            filename=disp_params.get("filename"))

    def onpart(self, name, content_type, filename=None):
        """Create the bookkeeping dict for a new part.

        :param name: the form field name
        :param content_type: the part's Content-Type header or None
        :param filename: the filename from the Content-Disposition header, if any
        :return: a dict with ``value`` (plain field), a dict with ``file``
            (``application/octet-stream``), or an empty dict for any other content type
        """
        if content_type is None:
            # we got a key-value-pair
            return dict(name=name, value=b"")
        elif content_type == "application/octet-stream":
            # this is a file
            tmpfile = NamedTemporaryFile()
            return dict(name=name, filename=filename, file=tmpfile)
        else:
            # empty dict is falsy, so no data gets streamed into it
            return dict()

    def ondata(self, part, data):
        """Append payload bytes to the part's value or write them to its file."""
        if "value" in part:
            part["value"] += data
        elif "file" in part:
            part["file"].write(data)

    def onclose(self, part):
        """Register a completed part in ``self.parts`` under its field name."""
        name = part.get("name")
        if name is None:
            return
        self.parts[name] = part

    def onfinish(self):
        """Hook called once the closing boundary was processed; does nothing here."""
        pass

    def post(self, *args, **kwargs):
        """Run the access validation (if configured), then raise ``HTTPError(550)``.

        NOTE(review): the unconditional 550 "Yay" looks like a work-in-progress
        placeholder -- confirm before relying on this handler.
        """
        if self._access_validation is not None:
            self._access_validation(self.request)
        raise HTTPError(550, "Yay")
#~~ customized large response handler
@ -479,7 +791,7 @@ class ReverseProxied(object):
return self.app(environ, start_response)
def redirectToTornado(request, target):
def redirectToTornado(request, target, code=302):
requestUrl = request.url
appBaseUrl = requestUrl[:requestUrl.find(url_for("index") + "api")]
@ -487,7 +799,7 @@ def redirectToTornado(request, target):
if "?" in requestUrl:
fragment = requestUrl[requestUrl.rfind("?"):]
redirectUrl += fragment
return redirect(redirectUrl)
return redirect(redirectUrl, code=code)
class UploadCleanupWatchdogHandler(PatternMatchingEventHandler):