Also explicitly support ISO-8859-1 encoded multipart headers

Solves an issue with clients encoding filenames in multipart
 headers in ISO-8859-1, causing an HTTP 500 response code.

 This change makes ISO-8859-1 encoded headers work, sends
 a 400 Bad Request instead of 500 Internal Server Error if the
 request multipart headers cannot be decoded as either UTF-8
 or ISO-8859-1, defines UTF-8 content type for multipart text
 fields in rebuilt body and also adds support for RFC 5987 for the
 multipart file upload "filename" header component.
This commit is contained in:
Gina Häußge 2016-09-07 17:03:59 +02:00
parent 209a88bd6a
commit 13728c231c
2 changed files with 167 additions and 6 deletions

View file

@ -104,18 +104,22 @@ class UploadStorageFallbackHandler(tornado.web.RequestHandler):
true
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file.path"
Content-Type: text/plain; charset=utf-8
/tmp/tmpzupkro
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file.name"
Content-Type: text/plain; charset=utf-8
test.gcode
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file.content_type"
Content-Type: text/plain; charset=utf-8
application/octet-stream
------WebKitFormBoundarypYiSUx63abAmhT5C
Content-Disposition: form-data; name="file.size"
Content-Type: text/plain; charset=utf-8
349182
------WebKitFormBoundarypYiSUx63abAmhT5C--
@ -272,9 +276,19 @@ class UploadStorageFallbackHandler(tornado.web.RequestHandler):
header = header[header_check:]
# convert to dict
header = tornado.httputil.HTTPHeaders.parse(header.decode("utf-8"))
try:
header = tornado.httputil.HTTPHeaders.parse(header.decode("utf-8"))
except UnicodeDecodeError:
try:
header = tornado.httputil.HTTPHeaders.parse(header.decode("iso-8859-1"))
except:
# looks like we couldn't decode something here neither as UTF-8 nor ISO-8859-1
self._logger.warn("Could not decode multipart headers in request, should be either UTF-8 or ISO-8859-1")
self.send_error(400)
return
disp_header = header.get("Content-Disposition", "")
disposition, disp_params = tornado.httputil._parse_header(disp_header)
disposition, disp_params = _parse_header(disp_header, strip_quotes=False)
if disposition != "form-data":
self._logger.warn("Got a multipart header without form-data content disposition, ignoring that one")
@ -283,7 +297,22 @@ class UploadStorageFallbackHandler(tornado.web.RequestHandler):
self._logger.warn("Got a multipart header without name, ignoring that one")
return
self._current_part = self._on_part_start(disp_params["name"], header.get("Content-Type", None), filename=disp_params["filename"] if "filename" in disp_params else None)
filename = disp_params.get("filename*", None) # RFC 5987 header present?
if filename is not None:
try:
filename = _extended_header_value(filename)
except:
# parse error, this is not RFC 5987 compliant after all
self._logger.warn("extended filename* value {!r} is not RFC 5987 compliant")
self.send_error(400)
return
else:
# no filename* header, just strip quotes from filename header then and be done
filename = _strip_value_quotes(disp_params.get("filename", None))
self._current_part = self._on_part_start(_strip_value_quotes(disp_params["name"]),
header.get("Content-Type", None),
filename=filename)
def _on_part_start(self, name, content_type, filename=None):
"""
@ -357,7 +386,7 @@ class UploadStorageFallbackHandler(tornado.web.RequestHandler):
"""
self._new_body = b""
for name, part in self._parts.iteritems():
for name, part in self._parts.items():
if "filename" in part:
# add form fields for filename, path, size and content_type for all files contained in the request
if not "path" in part:
@ -371,11 +400,12 @@ class UploadStorageFallbackHandler(tornado.web.RequestHandler):
if "content_type" in part:
parameters["content_type"] = part["content_type"]
fields = dict((self._suffixes[key], value) for (key, value) in parameters.iteritems())
for n, p in fields.iteritems():
fields = dict((self._suffixes[key], value) for (key, value) in parameters.items())
for n, p in fields.items():
key = name + "." + n
self._new_body += b"--%s\r\n" % self._multipart_boundary
self._new_body += b"Content-Disposition: form-data; name=\"%s\"\r\n" % key
self._new_body += b"Content-Type: text/plain; charset=utf-8\r\n"
self._new_body += b"\r\n"
self._new_body += b"%s\r\n" % p
elif "data" in part:
@ -430,6 +460,47 @@ class UploadStorageFallbackHandler(tornado.web.RequestHandler):
options = _handle_method
def _parse_header(line, strip_quotes=True):
    """
    Splits a header value into its leading value and a dict of its parameters.

    The line is tokenized by tornado's ``_parseparam``. The first token is
    returned as is, every further token containing a ``=`` becomes an entry
    in the parameter dict. Tokens without a ``=`` are dropped.

    Arguments:
        line: the header value to parse, e.g. ``form-data; filename=test.gco``
        strip_quotes: whether to run the parameter values through
            ``_strip_value_quotes`` (default) or to leave them untouched

    Returns:
        a 2-tuple consisting of the leading value and a dict mapping the
        lower cased, whitespace stripped parameter names to their whitespace
        stripped values
    """
    tokens = tornado.httputil._parseparam(';' + line)
    key = next(tokens)

    params = {}
    for token in tokens:
        name, separator, value = token.partition('=')
        if not separator:
            # not a name=value pair, nothing to record for this token
            continue

        value = value.strip()
        params[name.strip().lower()] = _strip_value_quotes(value) if strip_quotes else value

    return key, params
def _strip_value_quotes(value):
if not value:
return value
if len(value) >= 2 and value[0] == value[-1] == '"':
value = value[1:-1]
value = value.replace('\\\\', '\\').replace('\\"', '"')
return value
def _extended_header_value(value):
if not value:
return value
if value.lower().startswith("iso-8859-1'") or value.lower().startswith("utf-8'"):
# RFC 5987 section 3.2
from urllib import unquote
encoding, _, value = value.split("'", 2)
return unquote(value).decode(encoding)
else:
# no encoding provided, strip potentially present quotes and call it a day
return _strip_value_quotes(value)
class WsgiInputContainer(object):
"""
A WSGI container for use with Tornado that allows supplying the request body to be used for ``wsgi.input`` in the

View file

@ -0,0 +1,90 @@
# coding=utf-8
"""
Unit tests for ``octoprint.server.util.tornado``.
"""
from __future__ import absolute_import
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2016 The OctoPrint Project - Released under terms of the AGPLv3 License"
import unittest
import mock
from ddt import ddt, data, unpack
##~~ _parse_header
@ddt
class ParseHeaderTest(unittest.TestCase):
    """
    Data driven tests for ``_parse_header``, with and without quote stripping.
    """

    @data(
        ("form-data; filename=test.gco", "form-data", {"filename": "test.gco"}),
        ("form-data; filename=\"test.gco\"", "form-data", {"filename": "test.gco"}),
        ("form-data; filename=test\\\\.gco", "form-data", {"filename": "test\\\\.gco"}),
        ("form-data; filename=\"test\\\\.gco\"", "form-data", {"filename": "test\\.gco"}),
    )
    @unpack
    def test_parse_header_strip_quotes(self, value, expected_key, expected_dict):
        """
        By default quoted parameter values get unquoted and unescaped.
        """
        from octoprint.server.util.tornado import _parse_header

        key, params = _parse_header(value)

        self.assertEqual(expected_key, key)
        self.assertDictEqual(expected_dict, params)

    @data(
        ("form-data; filename=test.gco", "form-data", {"filename": "test.gco"}),
        ("form-data; filename=\"test.gco\"", "form-data", {"filename": "\"test.gco\""}),
        ("form-data; filename=test\\\\.gco", "form-data", {"filename": "test\\\\.gco"}),
        ("form-data; filename=\"test\\\\.gco\"", "form-data", {"filename": "\"test\\\\.gco\""}),
        ("form-data; filename=iso-8859-1'en'test.gco", "form-data", {"filename": "iso-8859-1'en'test.gco"}),
    )
    @unpack
    def test_parse_header_leave_quotes(self, value, expected_key, expected_dict):
        """
        With ``strip_quotes=False`` parameter values are passed through untouched.
        """
        from octoprint.server.util.tornado import _parse_header

        key, params = _parse_header(value, strip_quotes=False)

        self.assertEqual(expected_key, key)
        self.assertDictEqual(expected_dict, params)
##~~ _strip_value_quotes
@ddt
class StripValueQuotesTest(unittest.TestCase):
    """
    Data driven tests for ``_strip_value_quotes``.
    """

    @data(
        ("", ""),
        (None, None),
        ('"test.gco"', "test.gco"),
        ('"test".gco', '"test".gco'),
        ("test\\\\.gco", "test\\\\.gco"),
        ('"test\\\\.gco"', "test\\.gco"),
    )
    @unpack
    def test_strip_value_quotes(self, value, expected):
        """
        Only fully quoted values are unquoted and unescaped, the rest stays as is.
        """
        from octoprint.server.util.tornado import _strip_value_quotes

        result = _strip_value_quotes(value)

        self.assertEqual(expected, result)
##~~ _extended_header_value
@ddt
class ExtendedHeaderValueTest(unittest.TestCase):
    """
    Data driven tests for ``_extended_header_value``.
    """

    @data(
        ("", u""),
        (None, None),
        ('"quoted-string"', u"quoted-string"),
        ("iso-8859-1'en'%A3%20rates", u"£ rates"),
        ("UTF-8''%c2%a3%20and%20%e2%82%ac%20rates", u"£ and € rates"),
    )
    @unpack
    def test_extended_header_value(self, value, expected):
        """
        Charset prefixed values are decoded, all other values are merely unquoted.
        """
        from octoprint.server.util.tornado import _extended_header_value

        result = _extended_header_value(value)

        self.assertEqual(expected, result)