Merge branch 'fix/contentTypeForDownloads' into devel

This commit is contained in:
Gina Häußge 2015-10-07 17:22:21 +02:00
commit 4017f96ed5
4 changed files with 157 additions and 5 deletions

View file

@ -18,6 +18,11 @@ from .analysis import QueueEntry, AnalysisQueue
from .storage import LocalFileStorage
from .util import AbstractFileWrapper, StreamWrapper, DiskFileWrapper
from collections import namedtuple
ContentTypeMapping = namedtuple("ContentTypeMapping", "extensions, content_type")
ContentTypeDetector = namedtuple("ContentTypeDetector", "extensions, detector")
extensions = dict(
)
@ -25,11 +30,11 @@ def full_extension_tree():
result = dict(
# extensions for 3d model files
model=dict(
stl=["stl"]
stl=ContentTypeMapping(["stl"], "application/sla")
),
# extensions for printable machine code
machinecode=dict(
gcode=["gcode", "gco", "g"]
gcode=ContentTypeMapping(["gcode", "gco", "g"], "text/plain")
)
)
@ -68,8 +73,12 @@ def get_all_extensions(subtree=None):
for key, value in subtree.items():
if isinstance(value, dict):
result += get_all_extensions(value)
elif isinstance(value, (ContentTypeMapping, ContentTypeDetector)):
result += value.extensions
elif isinstance(value, (list, tuple)):
result += value
elif isinstance(subtree, (ContentTypeMapping, ContentTypeDetector)):
result = subtree.extensions
elif isinstance(subtree, (list, tuple)):
result = subtree
return result
@ -79,7 +88,9 @@ def get_path_for_extension(extension, subtree=None):
subtree = full_extension_tree()
for key, value in subtree.items():
if isinstance(value, (list, tuple)) and extension in value:
if isinstance(value, (ContentTypeMapping, ContentTypeDetector)) and extension in value.extensions:
return [key]
elif isinstance(value, (list, tuple)) and extension in value:
return [key]
elif isinstance(value, dict):
path = get_path_for_extension(extension, subtree=value)
@ -88,6 +99,23 @@ def get_path_for_extension(extension, subtree=None):
return None
def get_content_type_mapping_for_extension(extension, subtree=None):
if not subtree:
subtree = full_extension_tree()
for key, value in subtree.items():
content_extension_matches = isinstance(value, (ContentTypeMapping, ContentTypeDetector)) and extension in value. extensions
list_extension_matches = isinstance(value, (list, tuple)) and extension in value
if content_extension_matches or list_extension_matches:
return value
elif isinstance(value, dict):
result = get_content_type_mapping_for_extension(extension, subtree=value)
if result is not None:
return result
return None
def valid_extension(extension, type=None):
if not type:
return extension in get_all_extensions()
@ -106,6 +134,19 @@ def get_file_type(filename):
extension = extension[1:].lower()
return get_path_for_extension(extension)
def get_mime_type(filename):
_, extension = os.path.splitext(filename)
extension = extension[1:].lower()
mapping = get_content_type_mapping_for_extension(extension)
if mapping:
if isinstance(mapping, ContentTypeMapping) and mapping.content_type is not None:
return mapping.content_type
elif isinstance(mapping, ContentTypeDetector) and callable(mapping.detector):
result = mapping.detector(filename)
if result is not None:
return result
return "application/octet-stream"
class NoSuchStorage(Exception):
pass

View file

@ -332,10 +332,15 @@ class Server(object):
upload_suffixes = dict(name=s.get(["server", "uploads", "nameSuffix"]), path=s.get(["server", "uploads", "pathSuffix"]))
def mime_type_guesser(path):
from octoprint.filemanager import get_mime_type
return get_mime_type(path)
download_handler_kwargs = dict(
as_attachment=True,
allow_client_caching=False
)
additional_mime_types=dict(mime_type_guesser=mime_type_guesser)
admin_validator = dict(access_validation=util.tornado.access_validation_factory(app, loginManager, util.flask.user_validator))
no_hidden_files_validator = dict(path_validation=util.tornado.path_validation_factory(lambda path: not os.path.basename(path).startswith("."), status_code=404))
@ -351,7 +356,7 @@ class Server(object):
server_routes = self._router.urls + [
# various downloads
(r"/downloads/timelapse/([^/]*\.mpg)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("timelapse")), download_handler_kwargs, no_hidden_files_validator)),
(r"/downloads/files/local/(.*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("uploads")), download_handler_kwargs, no_hidden_files_validator)),
(r"/downloads/files/local/(.*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("uploads")), download_handler_kwargs, no_hidden_files_validator, additional_mime_types)),
(r"/downloads/logs/([^/]*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("logs")), download_handler_kwargs, admin_validator)),
# camera snapshot
(r"/downloads/camera/current", util.tornado.UrlProxyHandler, dict(url=s.get(["webcam", "snapshot"]), as_attachment=True, access_validation=util.tornado.access_validation_factory(app, loginManager, util.flask.user_validator))),

View file

@ -785,13 +785,15 @@ class LargeResponseHandler(tornado.web.StaticFileHandler):
"""
def initialize(self, path, default_filename=None, as_attachment=False, allow_client_caching=True,
access_validation=None, path_validation=None, etag_generator=None):
access_validation=None, path_validation=None, etag_generator=None,
mime_type_guesser=None):
tornado.web.StaticFileHandler.initialize(self, os.path.abspath(path), default_filename)
self._as_attachment = as_attachment
self._allow_client_caching = allow_client_caching
self._access_validation = access_validation
self._path_validation = path_validation
self._etag_generator = etag_generator
self._mime_type_guesser = mime_type_guesser
def get(self, path, include_body=True):
if self._access_validation is not None:
@ -815,6 +817,14 @@ class LargeResponseHandler(tornado.web.StaticFileHandler):
else:
return self.get_content_version(self.absolute_path)
def get_content_type(self):
if self._mime_type_guesser is not None:
type = self._mime_type_guesser(self.absolute_path)
if type is not None:
return type
return tornado.web.StaticFileHandler.get_content_type(self)
@classmethod
def get_content_version(cls, abspath):
import os

View file

@ -9,3 +9,99 @@ __author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms of the AGPLv3 License"
import unittest
import mock
import octoprint.filemanager
class FilemanagerUtilTest(unittest.TestCase):
def setUp(self):
# mock plugin manager
self.plugin_manager_patcher = mock.patch("octoprint.plugin.plugin_manager")
self.plugin_manager_getter = self.plugin_manager_patcher.start()
self.plugin_manager = mock.MagicMock()
hook_extensions = dict(
some_plugin=lambda: dict(dict(machinecode=dict(foo=["foo", "f"]))),
other_plugin=lambda: dict(dict(model=dict(amf=["amf"]))),
mime_map=lambda: dict(
mime_map=dict(
mime_map_yes=octoprint.filemanager.ContentTypeMapping(["mime_map_yes"], "application/mime_map_yes")
)
),
mime_detect=lambda: dict(
dict(
machinecode=dict(
mime_detect_yes=octoprint.filemanager.ContentTypeDetector(["mime_detect_yes"], lambda x: "application/mime_detect_yes"),
mime_detect_no=octoprint.filemanager.ContentTypeDetector(["mime_detect_no"], lambda x: None)
)
)
)
)
self.plugin_manager.get_hooks.return_value = hook_extensions
self.plugin_manager_getter.return_value = self.plugin_manager
def tearDown(self):
self.plugin_manager_patcher.stop()
def test_full_extension_tree(self):
full = octoprint.filemanager.full_extension_tree()
self.assertTrue("machinecode" in full)
self.assertTrue("gcode" in full["machinecode"])
self.assertTrue(isinstance(full["machinecode"]["gcode"], octoprint.filemanager.ContentTypeMapping))
self.assertItemsEqual(["gcode", "gco", "g"], full["machinecode"]["gcode"].extensions)
self.assertTrue("foo" in full["machinecode"])
self.assertTrue(isinstance(full["machinecode"]["foo"], list))
self.assertItemsEqual(["f", "foo"], full["machinecode"]["foo"])
self.assertTrue("model" in full)
self.assertTrue("stl" in full["model"])
self.assertTrue(isinstance(full["model"]["stl"], octoprint.filemanager.ContentTypeMapping))
self.assertItemsEqual(["stl"], full["model"]["stl"].extensions)
self.assertTrue("amf" in full["model"])
self.assertTrue(isinstance(full["model"]["amf"], list))
self.assertItemsEqual(["amf"], full["model"]["amf"])
def test_get_mimetype(self):
self.assertEquals(octoprint.filemanager.get_mime_type("foo.stl"), "application/sla")
self.assertEquals(octoprint.filemanager.get_mime_type("foo.gcode"), "text/plain")
self.assertEquals(octoprint.filemanager.get_mime_type("foo.unknown"), "application/octet-stream")
self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_map_yes"), "application/mime_map_yes")
self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_map_no"), "application/octet-stream")
self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_detect_yes"), "application/mime_detect_yes")
self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_detect_no"), "application/octet-stream")
def test_valid_file_type(self):
self.assertTrue(octoprint.filemanager.valid_file_type("foo.stl", type="model"))
self.assertTrue(octoprint.filemanager.valid_file_type("foo.stl", type="stl"))
self.assertFalse(octoprint.filemanager.valid_file_type("foo.stl", type="machinecode"))
self.assertTrue(octoprint.filemanager.valid_file_type("foo.foo", type="machinecode"))
self.assertTrue(octoprint.filemanager.valid_file_type("foo.foo", type="foo"))
self.assertTrue(octoprint.filemanager.valid_file_type("foo.foo"))
self.assertTrue(octoprint.filemanager.valid_file_type("foo.mime_map_yes"))
self.assertTrue(octoprint.filemanager.valid_file_type("foo.mime_detect_yes"))
self.assertFalse(octoprint.filemanager.valid_file_type("foo.unknown"))
def test_get_file_type(self):
self.assertEquals(["machinecode", "gcode"], octoprint.filemanager.get_file_type("foo.gcode"))
self.assertEquals(["machinecode", "gcode"], octoprint.filemanager.get_file_type("foo.gco"))
self.assertEquals(["machinecode", "foo"], octoprint.filemanager.get_file_type("foo.f"))
self.assertEquals(["model", "stl"], octoprint.filemanager.get_file_type("foo.stl"))
self.assertEquals(["model", "amf"], octoprint.filemanager.get_file_type("foo.amf"))
self.assertIsNone(octoprint.filemanager.get_file_type("foo.unknown"))
def test_hook_failure(self):
def hook():
raise RuntimeError("Boo!")
self.plugin_manager.get_hooks.return_value = dict(hook=hook)
with mock.patch("octoprint.filemanager.logging") as patched_logging:
logger = mock.MagicMock()
patched_logging.getLogger.return_value = logger
octoprint.filemanager.get_all_extensions()
self.assertEquals(1, len(logger.mock_calls))