From 859ea0d0e486082ed04cad398b21770756ea11e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Wed, 7 Oct 2015 17:19:18 +0200 Subject: [PATCH] Added mime type guessing for downloads to Tornado --- src/octoprint/filemanager/__init__.py | 47 ++++++++++++- src/octoprint/server/__init__.py | 7 +- src/octoprint/server/util/tornado.py | 12 +++- tests/filemanager/__init__.py | 96 +++++++++++++++++++++++++++ 4 files changed, 157 insertions(+), 5 deletions(-) diff --git a/src/octoprint/filemanager/__init__.py b/src/octoprint/filemanager/__init__.py index fad1fc41..34f31680 100644 --- a/src/octoprint/filemanager/__init__.py +++ b/src/octoprint/filemanager/__init__.py @@ -18,6 +18,11 @@ from .analysis import QueueEntry, AnalysisQueue from .storage import LocalFileStorage from .util import AbstractFileWrapper, StreamWrapper, DiskFileWrapper +from collections import namedtuple + +ContentTypeMapping = namedtuple("ContentTypeMapping", "extensions, content_type") +ContentTypeDetector = namedtuple("ContentTypeDetector", "extensions, detector") + extensions = dict( ) @@ -25,11 +30,11 @@ def full_extension_tree(): result = dict( # extensions for 3d model files model=dict( - stl=["stl"] + stl=ContentTypeMapping(["stl"], "application/sla") ), # extensions for printable machine code machinecode=dict( - gcode=["gcode", "gco", "g"] + gcode=ContentTypeMapping(["gcode", "gco", "g"], "text/plain") ) ) @@ -68,8 +73,12 @@ def get_all_extensions(subtree=None): for key, value in subtree.items(): if isinstance(value, dict): result += get_all_extensions(value) + elif isinstance(value, (ContentTypeMapping, ContentTypeDetector)): + result += value.extensions elif isinstance(value, (list, tuple)): result += value + elif isinstance(subtree, (ContentTypeMapping, ContentTypeDetector)): + result = subtree.extensions elif isinstance(subtree, (list, tuple)): result = subtree return result @@ -79,7 +88,9 @@ def get_path_for_extension(extension, subtree=None): subtree = full_extension_tree() for key, value in subtree.items(): - if isinstance(value, (list, tuple)) and extension in value: + if isinstance(value, (ContentTypeMapping, ContentTypeDetector)) and extension in value.extensions: + return [key] + elif isinstance(value, (list, tuple)) and extension in value: return [key] elif isinstance(value, dict): path = get_path_for_extension(extension, subtree=value) @@ -88,6 +99,23 @@ def get_path_for_extension(extension, subtree=None): return None +def get_content_type_mapping_for_extension(extension, subtree=None): + if not subtree: + subtree = full_extension_tree() + + for key, value in subtree.items(): + content_extension_matches = isinstance(value, (ContentTypeMapping, ContentTypeDetector)) and extension in value. extensions + list_extension_matches = isinstance(value, (list, tuple)) and extension in value + + if content_extension_matches or list_extension_matches: + return value + elif isinstance(value, dict): + result = get_content_type_mapping_for_extension(extension, subtree=value) + if result is not None: + return result + + return None + def valid_extension(extension, type=None): if not type: return extension in get_all_extensions() @@ -106,6 +134,19 @@ def get_file_type(filename): extension = extension[1:].lower() return get_path_for_extension(extension) +def get_mime_type(filename): + _, extension = os.path.splitext(filename) + extension = extension[1:].lower() + mapping = get_content_type_mapping_for_extension(extension) + if mapping: + if isinstance(mapping, ContentTypeMapping) and mapping.content_type is not None: + return mapping.content_type + elif isinstance(mapping, ContentTypeDetector) and callable(mapping.detector): + result = mapping.detector(filename) + if result is not None: + return result + return "application/octet-stream" + class NoSuchStorage(Exception): pass diff --git a/src/octoprint/server/__init__.py b/src/octoprint/server/__init__.py index a6a6aa24..f76f9cb8 100644 --- a/src/octoprint/server/__init__.py +++ b/src/octoprint/server/__init__.py @@ -330,10 +330,15 @@ class Server(): upload_suffixes = dict(name=s.get(["server", "uploads", "nameSuffix"]), path=s.get(["server", "uploads", "pathSuffix"])) + def mime_type_guesser(path): + from octoprint.filemanager import get_mime_type + return get_mime_type(path) + download_handler_kwargs = dict( as_attachment=True, allow_client_caching=False ) + additional_mime_types=dict(mime_type_guesser=mime_type_guesser) admin_validator = dict(access_validation=util.tornado.access_validation_factory(app, loginManager, util.flask.user_validator)) no_hidden_files_validator = dict(path_validation=util.tornado.path_validation_factory(lambda path: not os.path.basename(path).startswith("."), status_code=404)) @@ -349,7 +354,7 @@ class Server(): server_routes = self._router.urls + [ # various downloads (r"/downloads/timelapse/([^/]*\.mpg)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("timelapse")), download_handler_kwargs, no_hidden_files_validator)), - (r"/downloads/files/local/(.*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("uploads")), download_handler_kwargs, no_hidden_files_validator)), + (r"/downloads/files/local/(.*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("uploads")), download_handler_kwargs, no_hidden_files_validator, additional_mime_types)), (r"/downloads/logs/([^/]*)", util.tornado.LargeResponseHandler, joined_dict(dict(path=s.getBaseFolder("logs")), download_handler_kwargs, admin_validator)), # camera snapshot (r"/downloads/camera/current", util.tornado.UrlProxyHandler, dict(url=s.get(["webcam", "snapshot"]), as_attachment=True, access_validation=util.tornado.access_validation_factory(app, loginManager, util.flask.user_validator))), diff --git a/src/octoprint/server/util/tornado.py b/src/octoprint/server/util/tornado.py index d69f3036..30753f34 100644 --- a/src/octoprint/server/util/tornado.py +++ b/src/octoprint/server/util/tornado.py @@ -785,13 +785,15 @@ class LargeResponseHandler(tornado.web.StaticFileHandler): """ def initialize(self, path, default_filename=None, as_attachment=False, allow_client_caching=True, - access_validation=None, path_validation=None, etag_generator=None): + access_validation=None, path_validation=None, etag_generator=None, + mime_type_guesser=None): tornado.web.StaticFileHandler.initialize(self, os.path.abspath(path), default_filename) self._as_attachment = as_attachment self._allow_client_caching = allow_client_caching self._access_validation = access_validation self._path_validation = path_validation self._etag_generator = etag_generator + self._mime_type_guesser = mime_type_guesser def get(self, path, include_body=True): if self._access_validation is not None: @@ -815,6 +817,14 @@ class LargeResponseHandler(tornado.web.StaticFileHandler): else: return self.get_content_version(self.absolute_path) + def get_content_type(self): + if self._mime_type_guesser is not None: + type = self._mime_type_guesser(self.absolute_path) + if type is not None: + return type + + return tornado.web.StaticFileHandler.get_content_type(self) + @classmethod def get_content_version(cls, abspath): import os diff --git a/tests/filemanager/__init__.py b/tests/filemanager/__init__.py index 65f33057..107fc063 100644 --- a/tests/filemanager/__init__.py +++ b/tests/filemanager/__init__.py @@ -9,3 +9,99 @@ __author__ = "Gina Häußge " __license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html' __copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms of the AGPLv3 License" +import unittest +import mock + +import octoprint.filemanager + +class FilemanagerUtilTest(unittest.TestCase): + + def setUp(self): + # mock plugin manager + self.plugin_manager_patcher = mock.patch("octoprint.plugin.plugin_manager") + self.plugin_manager_getter = self.plugin_manager_patcher.start() + + self.plugin_manager = mock.MagicMock() + + hook_extensions = dict( + some_plugin=lambda: dict(dict(machinecode=dict(foo=["foo", "f"]))), + other_plugin=lambda: dict(dict(model=dict(amf=["amf"]))), + mime_map=lambda: dict( + mime_map=dict( + mime_map_yes=octoprint.filemanager.ContentTypeMapping(["mime_map_yes"], "application/mime_map_yes") + ) + ), + mime_detect=lambda: dict( + dict( + machinecode=dict( + mime_detect_yes=octoprint.filemanager.ContentTypeDetector(["mime_detect_yes"], lambda x: "application/mime_detect_yes"), + mime_detect_no=octoprint.filemanager.ContentTypeDetector(["mime_detect_no"], lambda x: None) + ) + ) + ) + ) + self.plugin_manager.get_hooks.return_value = hook_extensions + + self.plugin_manager_getter.return_value = self.plugin_manager + + def tearDown(self): + self.plugin_manager_patcher.stop() + + def test_full_extension_tree(self): + full = octoprint.filemanager.full_extension_tree() + self.assertTrue("machinecode" in full) + self.assertTrue("gcode" in full["machinecode"]) + self.assertTrue(isinstance(full["machinecode"]["gcode"], octoprint.filemanager.ContentTypeMapping)) + self.assertItemsEqual(["gcode", "gco", "g"], full["machinecode"]["gcode"].extensions) + self.assertTrue("foo" in full["machinecode"]) + self.assertTrue(isinstance(full["machinecode"]["foo"], list)) + self.assertItemsEqual(["f", "foo"], full["machinecode"]["foo"]) + + self.assertTrue("model" in full) + self.assertTrue("stl" in full["model"]) + self.assertTrue(isinstance(full["model"]["stl"], octoprint.filemanager.ContentTypeMapping)) + self.assertItemsEqual(["stl"], full["model"]["stl"].extensions) + self.assertTrue("amf" in full["model"]) + self.assertTrue(isinstance(full["model"]["amf"], list)) + self.assertItemsEqual(["amf"], full["model"]["amf"]) + + def test_get_mimetype(self): + self.assertEquals(octoprint.filemanager.get_mime_type("foo.stl"), "application/sla") + self.assertEquals(octoprint.filemanager.get_mime_type("foo.gcode"), "text/plain") + self.assertEquals(octoprint.filemanager.get_mime_type("foo.unknown"), "application/octet-stream") + self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_map_yes"), "application/mime_map_yes") + self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_map_no"), "application/octet-stream") + self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_detect_yes"), "application/mime_detect_yes") + self.assertEquals(octoprint.filemanager.get_mime_type("foo.mime_detect_no"), "application/octet-stream") + + def test_valid_file_type(self): + self.assertTrue(octoprint.filemanager.valid_file_type("foo.stl", type="model")) + self.assertTrue(octoprint.filemanager.valid_file_type("foo.stl", type="stl")) + self.assertFalse(octoprint.filemanager.valid_file_type("foo.stl", type="machinecode")) + self.assertTrue(octoprint.filemanager.valid_file_type("foo.foo", type="machinecode")) + self.assertTrue(octoprint.filemanager.valid_file_type("foo.foo", type="foo")) + self.assertTrue(octoprint.filemanager.valid_file_type("foo.foo")) + self.assertTrue(octoprint.filemanager.valid_file_type("foo.mime_map_yes")) + self.assertTrue(octoprint.filemanager.valid_file_type("foo.mime_detect_yes")) + self.assertFalse(octoprint.filemanager.valid_file_type("foo.unknown")) + + def test_get_file_type(self): + self.assertEquals(["machinecode", "gcode"], octoprint.filemanager.get_file_type("foo.gcode")) + self.assertEquals(["machinecode", "gcode"], octoprint.filemanager.get_file_type("foo.gco")) + self.assertEquals(["machinecode", "foo"], octoprint.filemanager.get_file_type("foo.f")) + self.assertEquals(["model", "stl"], octoprint.filemanager.get_file_type("foo.stl")) + self.assertEquals(["model", "amf"], octoprint.filemanager.get_file_type("foo.amf")) + self.assertIsNone(octoprint.filemanager.get_file_type("foo.unknown")) + + def test_hook_failure(self): + def hook(): + raise RuntimeError("Boo!") + self.plugin_manager.get_hooks.return_value = dict(hook=hook) + + with mock.patch("octoprint.filemanager.logging") as patched_logging: + logger = mock.MagicMock() + patched_logging.getLogger.return_value = logger + + octoprint.filemanager.get_all_extensions() + + self.assertEquals(1, len(logger.mock_calls))