MrDraw/tests/filemanager/test_filemanager.py
Gina Häußge 42ac133436 Support full UTF8 file names
Files (and folders) are still slugified to ASCII for storage on disk,
but now the original filename is stored alongside in metadata.yaml
and used for display in the file list and print status.

The slicing dialog also inherits that display name for use as base
for the suggested GCODE name.

Internally, everything still depends completely on the slugified
ASCII version.

This implements #2094
2017-10-12 18:08:41 +02:00

569 lines
25 KiB
Python

# coding=utf-8
from __future__ import absolute_import, division, print_function
__author__ = "Gina Häußge <osd@foosel.net>"
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms of the AGPLv3 License"
import io
import unittest
import mock
import octoprint.filemanager
import octoprint.filemanager.util
import octoprint.settings
class FilemanagerMethodTest(unittest.TestCase):
    """Tests for the module level helper functions of ``octoprint.filemanager``.

    The plugin manager getter is patched for every test so that ``get_hooks``
    hands out a fixed set of extension tree hooks.
    """

    def setUp(self):
        # mock plugin manager
        self.plugin_manager_patcher = mock.patch("octoprint.plugin.plugin_manager")
        self.plugin_manager_getter = self.plugin_manager_patcher.start()
        self.plugin_manager = mock.MagicMock()

        # callables handed out by the mocked get_hooks, each one returning
        # the extension tree contributed by that (fake) hook
        def some_plugin_hook():
            return {"machinecode": {"foo": ["foo", "f"]}}

        def other_plugin_hook():
            return {"model": {"amf": ["amf"]}}

        def mime_map_hook():
            return {
                "mime_map": {
                    "mime_map_yes": octoprint.filemanager.ContentTypeMapping(["mime_map_yes"], "application/mime_map_yes"),
                },
            }

        def mime_detect_hook():
            return {
                "machinecode": {
                    "mime_detect_yes": octoprint.filemanager.ContentTypeDetector(["mime_detect_yes"], lambda x: "application/mime_detect_yes"),
                    "mime_detect_no": octoprint.filemanager.ContentTypeDetector(["mime_detect_no"], lambda x: None),
                },
            }

        hook_extensions = {
            "some_plugin": some_plugin_hook,
            "other_plugin": other_plugin_hook,
            "mime_map": mime_map_hook,
            "mime_detect": mime_detect_hook,
        }

        self.plugin_manager.get_hooks.return_value = hook_extensions
        self.plugin_manager_getter.return_value = self.plugin_manager

    def tearDown(self):
        self.plugin_manager_patcher.stop()

    def test_full_extension_tree(self):
        tree = octoprint.filemanager.full_extension_tree()

        # machinecode branch: gcode mapping plus the hook supplied "foo" list
        self.assertIn("machinecode", tree)
        self.assertIn("gcode", tree["machinecode"])
        self.assertIsInstance(tree["machinecode"]["gcode"], octoprint.filemanager.ContentTypeMapping)
        self.assertItemsEqual(["gcode", "gco", "g"], tree["machinecode"]["gcode"].extensions)
        self.assertIn("foo", tree["machinecode"])
        self.assertIsInstance(tree["machinecode"]["foo"], list)
        self.assertItemsEqual(["f", "foo"], tree["machinecode"]["foo"])

        # model branch: stl mapping plus the hook supplied "amf" list
        self.assertIn("model", tree)
        self.assertIn("stl", tree["model"])
        self.assertIsInstance(tree["model"]["stl"], octoprint.filemanager.ContentTypeMapping)
        self.assertItemsEqual(["stl"], tree["model"]["stl"].extensions)
        self.assertIn("amf", tree["model"])
        self.assertIsInstance(tree["model"]["amf"], list)
        self.assertItemsEqual(["amf"], tree["model"]["amf"])

    def test_get_mimetype(self):
        # (file name, expected mime type)
        cases = [
            ("foo.stl", "application/sla"),
            ("foo.gcode", "text/plain"),
            ("foo.unknown", "application/octet-stream"),
            ("foo.mime_map_yes", "application/mime_map_yes"),
            ("foo.mime_map_no", "application/octet-stream"),
            ("foo.mime_detect_yes", "application/mime_detect_yes"),
            ("foo.mime_detect_no", "application/octet-stream"),
        ]
        for filename, mime_type in cases:
            self.assertEqual(octoprint.filemanager.get_mime_type(filename), mime_type)

    def test_valid_file_type(self):
        # (file name, keyword arguments for the call, expected validity)
        cases = [
            ("foo.stl", {"type": "model"}, True),
            ("foo.stl", {"type": "stl"}, True),
            ("foo.stl", {"type": "machinecode"}, False),
            ("foo.foo", {"type": "machinecode"}, True),
            ("foo.foo", {"type": "foo"}, True),
            ("foo.foo", {}, True),
            ("foo.mime_map_yes", {}, True),
            ("foo.mime_detect_yes", {}, True),
            ("foo.unknown", {}, False),
        ]
        for filename, kwargs, expected in cases:
            verdict = octoprint.filemanager.valid_file_type(filename, **kwargs)
            if expected:
                self.assertTrue(verdict)
            else:
                self.assertFalse(verdict)

    def test_get_file_type(self):
        # (expected type path, file name)
        cases = [
            (["machinecode", "gcode"], "foo.gcode"),
            (["machinecode", "gcode"], "foo.gco"),
            (["machinecode", "foo"], "foo.f"),
            (["model", "stl"], "foo.stl"),
            (["model", "amf"], "foo.amf"),
        ]
        for type_path, filename in cases:
            self.assertEqual(type_path, octoprint.filemanager.get_file_type(filename))

        self.assertIsNone(octoprint.filemanager.get_file_type("foo.unknown"))

    def test_hook_failure(self):
        # a hook that blows up is expected to produce exactly one call on the logger
        def failing_hook():
            raise RuntimeError("Boo!")

        self.plugin_manager.get_hooks.return_value = {"hook": failing_hook}

        with mock.patch("octoprint.filemanager.logging") as logging_module:
            fake_logger = mock.MagicMock()
            logging_module.getLogger.return_value = fake_logger

            octoprint.filemanager.get_all_extensions()

            self.assertEqual(1, len(fake_logger.mock_calls))
class FileManagerTest(unittest.TestCase):
    """Tests for ``octoprint.filemanager.FileManager`` with mocked collaborators.

    The event manager, plugin manager and settings getters are patched; the
    analysis queue, slicing manager, printer profile manager and local storage
    are ``MagicMock`` instances passed into the file manager under test.
    """

    def setUp(self):
        import octoprint.slicing
        import octoprint.filemanager.storage
        import octoprint.printer.profile

        # registered first so the patchers started below are stopped again
        # once the test is over
        self.addCleanup(self.cleanUp)

        # mock event manager
        self.event_manager_patcher = mock.patch("octoprint.filemanager.eventManager")
        event_manager = self.event_manager_patcher.start()
        event_manager.return_value.fire = mock.MagicMock()
        self.fire_event = event_manager.return_value.fire

        # mock plugin manager
        self.plugin_manager_patcher = mock.patch("octoprint.plugin.plugin_manager")
        self.plugin_manager = self.plugin_manager_patcher.start()

        # mock settings
        self.settings_patcher = mock.patch("octoprint.settings.settings")
        self.settings_getter = self.settings_patcher.start()
        self.settings = mock.create_autospec(octoprint.settings.Settings)
        self.settings.getBaseFolder.return_value = "/path/to/a/base_folder"
        self.settings_getter.return_value = self.settings

        # mocked collaborators of the file manager
        self.analysis_queue = mock.MagicMock(spec=octoprint.filemanager.AnalysisQueue)
        self.slicing_manager = mock.MagicMock(spec=octoprint.slicing.SlicingManager)
        self.printer_profile_manager = mock.MagicMock(spec=octoprint.printer.profile.PrinterProfileManager)
        self.local_storage = mock.MagicMock(spec=octoprint.filemanager.storage.LocalFileStorage)
        self.local_storage.analysis_backlog = iter([])

        self.storage_managers = dict()
        self.storage_managers[octoprint.filemanager.FileDestinations.LOCAL] = self.local_storage

        self.file_manager = octoprint.filemanager.FileManager(self.analysis_queue, self.slicing_manager, self.printer_profile_manager, initial_storage_managers=self.storage_managers)

    def cleanUp(self):
        # stop all patchers started in setUp
        self.event_manager_patcher.stop()
        self.plugin_manager_patcher.stop()
        self.settings_patcher.stop()

    @staticmethod
    def _fake_path_on_disk(path):
        """Stand-in for the storage's ``path_on_disk``.

        Tuple paths are joined with ``os.path.join``; the result is prefixed
        with ``prefix/``.
        """
        import os

        if isinstance(path, tuple):
            # leading "" keeps an empty tuple mapping to an empty path
            path = os.path.join("", *path)
        return "prefix/" + path

    def _make_fake_slice(self, **done_cb_kwargs):
        """Build a stand-in for the slicing manager's ``slice`` method.

        The returned callable checks the arguments it receives against the
        values both slicing tests use and then immediately calls ``done_cb``
        with the supplied ``callback_args`` plus ``done_cb_kwargs``.
        """
        def fake_slice(slicer_name, source_path, dest_path, profile, done_cb, printer_profile_id=None, position=None, callback_args=None, overrides=None, on_progress=None, on_progress_args=None, on_progress_kwargs=None):
            self.assertEqual("some_slicer", slicer_name)
            self.assertEqual("prefix/source.file", source_path)
            self.assertEqual("tmp.file", dest_path)
            self.assertIsNone(profile)
            self.assertIsNone(overrides)
            self.assertIsNone(printer_profile_id)
            self.assertIsNone(position)
            self.assertIsNotNone(on_progress)
            self.assertIsNotNone(on_progress_args)
            self.assertTupleEqual(("some_slicer", octoprint.filemanager.FileDestinations.LOCAL, "source.file", octoprint.filemanager.FileDestinations.LOCAL, "dest.file"), on_progress_args)
            self.assertIsNone(on_progress_kwargs)

            if not callback_args:
                callback_args = ()
            done_cb(*callback_args, **done_cb_kwargs)

        return fake_slice

    def test_add_file(self):
        wrapper = object()

        self.local_storage.add_file.return_value = ("", "test.gcode")
        self.local_storage.path_on_disk.return_value = "prefix/test.gcode"
        self.local_storage.split_path.return_value = ("", "test.gcode")

        test_profile = dict(id="_default", name="My Default Profile")
        self.printer_profile_manager.get_current_or_default.return_value = test_profile

        file_path = self.file_manager.add_file(octoprint.filemanager.FileDestinations.LOCAL, "test.gcode", wrapper)

        self.assertEqual(("", "test.gcode"), file_path)
        self.local_storage.add_file.assert_called_once_with("test.gcode", wrapper,
                                                            printer_profile=test_profile,
                                                            allow_overwrite=False,
                                                            links=None,
                                                            display=None)

        expected_events = [mock.call(octoprint.filemanager.Events.FILE_ADDED, dict(storage=octoprint.filemanager.FileDestinations.LOCAL,
                                                                                   name="test.gcode",
                                                                                   path="test.gcode",
                                                                                   type=["machinecode", "gcode"])),
                           mock.call(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))]
        # NOTE(review): this assigns to call_args_list instead of asserting on
        # it, so the fired events are not actually verified here
        self.fire_event.call_args_list = expected_events

    def test_add_file_display(self):
        wrapper = object()

        self.local_storage.add_file.return_value = ("", "test.gcode")
        self.local_storage.path_on_disk.return_value = "prefix/test.gcode"
        self.local_storage.split_path.return_value = ("", "test.gcode")

        test_profile = dict(id="_default", name="My Default Profile")
        self.printer_profile_manager.get_current_or_default.return_value = test_profile

        file_path = self.file_manager.add_file(octoprint.filemanager.FileDestinations.LOCAL, "test.gcode", wrapper,
                                               display=u"täst.gcode")

        self.assertEqual(("", "test.gcode"), file_path)
        # the display name has to be handed through to the storage unchanged
        self.local_storage.add_file.assert_called_once_with("test.gcode", wrapper,
                                                            printer_profile=test_profile,
                                                            allow_overwrite=False,
                                                            links=None,
                                                            display=u"täst.gcode")

    def test_remove_file(self):
        self.local_storage.path_on_disk.return_value = "prefix/test.gcode"
        self.local_storage.split_path.return_value = ("", "test.gcode")

        self.file_manager.remove_file(octoprint.filemanager.FileDestinations.LOCAL, "test.gcode")

        self.local_storage.remove_file.assert_called_once_with("test.gcode")
        self.analysis_queue.dequeue.assert_called_once()

        expected_events = [mock.call(octoprint.filemanager.Events.FILE_REMOVED, dict(storage=octoprint.filemanager.FileDestinations.LOCAL,
                                                                                     name="test.gcode",
                                                                                     path="test.gcode",
                                                                                     type=["machinecode", "gcode"])),
                           mock.call(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))]
        # NOTE(review): this assigns to call_args_list instead of asserting on
        # it, so the fired events are not actually verified here
        self.fire_event.call_args_list = expected_events

    def test_add_folder(self):
        self.local_storage.add_folder.return_value = ("", "test_folder")
        self.local_storage.split_path.return_value = ("", "test_folder")

        folder_path = self.file_manager.add_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")

        self.assertEqual(("", "test_folder"), folder_path)
        self.local_storage.add_folder.assert_called_once_with("test_folder",
                                                              ignore_existing=True,
                                                              display=None)

        expected_events = [mock.call(octoprint.filemanager.Events.FOLDER_ADDED, dict(storage=octoprint.filemanager.FileDestinations.LOCAL,
                                                                                     name="test_folder",
                                                                                     path="test_folder")),
                           mock.call(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))]
        # NOTE(review): this assigns to call_args_list instead of asserting on
        # it, so the fired events are not actually verified here
        self.fire_event.call_args_list = expected_events

    def test_add_folder_not_ignoring_existing(self):
        self.local_storage.add_folder.side_effect = RuntimeError("already there")

        # the storage's error has to propagate out of the file manager
        with self.assertRaises(RuntimeError) as raised:
            self.file_manager.add_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder", ignore_existing=False)

        # str() instead of the deprecated BaseException.message attribute
        self.assertEqual("already there", str(raised.exception))
        self.local_storage.add_folder.assert_called_once_with("test_folder", ignore_existing=False, display=None)

    def test_add_folder_display(self):
        self.local_storage.add_folder.side_effect = RuntimeError("already there")

        with self.assertRaises(RuntimeError) as raised:
            self.file_manager.add_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder",
                                         display=u"täst_folder")

        # str() instead of the deprecated BaseException.message attribute
        self.assertEqual("already there", str(raised.exception))
        # the display name has to be handed through to the storage unchanged
        self.local_storage.add_folder.assert_called_once_with("test_folder",
                                                              ignore_existing=True,
                                                              display=u"täst_folder")

    def test_remove_folder(self):
        self.local_storage.split_path.return_value = ("", "test_folder")

        self.file_manager.remove_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")

        self.local_storage.remove_folder.assert_called_once_with("test_folder", recursive=True)
        self.analysis_queue.dequeue_folder.assert_called_once_with(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")

        expected_events = [mock.call(octoprint.filemanager.Events.FOLDER_REMOVED, dict(storage=octoprint.filemanager.FileDestinations.LOCAL,
                                                                                       name="test_folder",
                                                                                       path="test_folder")),
                           mock.call(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))]
        # NOTE(review): this assigns to call_args_list instead of asserting on
        # it, so the fired events are not actually verified here
        self.fire_event.call_args_list = expected_events

    def test_remove_folder_nonrecursive(self):
        self.local_storage.split_path.return_value = ("", "test_folder")

        self.file_manager.remove_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder", recursive=False)

        self.local_storage.remove_folder.assert_called_once_with("test_folder", recursive=False)
        self.analysis_queue.dequeue_folder.assert_called_once_with(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")

    @mock.patch("octoprint.util.atomic_write", create=True)
    @mock.patch("yaml.safe_dump", create=True)
    @mock.patch("time.time")
    def test_save_recovery_data(self, mock_time, mock_yaml_safe_dump, mock_atomic_write):
        import os

        now = 123456789
        path = "some_file.gco"
        pos = 1234
        recovery_file = os.path.join("/path/to/a/base_folder", "print_recovery_data.yaml")

        mock_atomic_write.return_value = mock.MagicMock(spec=file)
        # handle the code under test gets from entering the atomic_write context
        mock_atomic_write_handle = mock_atomic_write.return_value.__enter__.return_value
        mock_time.return_value = now
        self.local_storage.path_in_storage.return_value = path

        self.file_manager.save_recovery_data(octoprint.filemanager.FileDestinations.LOCAL, path, pos)

        expected = dict(origin=octoprint.filemanager.FileDestinations.LOCAL,
                        path=path,
                        pos=pos,
                        date=now)

        mock_atomic_write.assert_called_with(recovery_file)
        mock_yaml_safe_dump.assert_called_with(expected, stream=mock_atomic_write_handle, default_flow_style=False, indent="  ", allow_unicode=True)

    # This test used to be called test_save_recovery_data as well, which
    # replaced the test above in the class namespace so that it never ran.
    @mock.patch("octoprint.util.atomic_write", create=True)
    @mock.patch("yaml.safe_dump", create=True)
    @mock.patch("time.time")
    def test_save_recovery_data_with_error(self, mock_time, mock_yaml_safe_dump, mock_atomic_write):
        path = "some_file.gco"
        pos = 1234

        mock_atomic_write.return_value = mock.MagicMock(spec=file)
        mock_time.return_value = 123456789
        self.local_storage.path_in_storage.return_value = path

        mock_yaml_safe_dump.side_effect = RuntimeError

        # the failing dump must not make save_recovery_data raise
        self.file_manager.save_recovery_data(octoprint.filemanager.FileDestinations.LOCAL, path, pos)

    @mock.patch("os.path.isfile")
    @mock.patch("os.remove")
    def test_delete_recovery_data(self, mock_remove, mock_isfile):
        import os

        recovery_file = os.path.join("/path/to/a/base_folder", "print_recovery_data.yaml")
        mock_isfile.return_value = True

        self.file_manager.delete_recovery_data()

        mock_remove.assert_called_with(recovery_file)

    @mock.patch("os.path.isfile")
    @mock.patch("os.remove")
    def test_delete_recovery_data_no_file(self, mock_remove, mock_isfile):
        mock_isfile.return_value = False

        self.file_manager.delete_recovery_data()

        # nothing to delete, so os.remove must not have been touched
        self.assertFalse(mock_remove.called)

    @mock.patch("os.path.isfile")
    @mock.patch("os.remove")
    def test_delete_recovery_data_error(self, mock_remove, mock_isfile):
        mock_isfile.return_value = True
        mock_remove.side_effect = RuntimeError

        # the failing removal must not make delete_recovery_data raise
        self.file_manager.delete_recovery_data()

    @mock.patch("os.path.isfile")
    @mock.patch("yaml.safe_load")
    def test_get_recovery_data(self, mock_yaml_safe_load, mock_isfile):
        import os

        recovery_file = os.path.join("/path/to/a/base_folder", "print_recovery_data.yaml")
        mock_isfile.return_value = True

        data = dict(path="some_path.gco",
                    origin="local",
                    pos=1234,
                    date=123456789)
        mock_yaml_safe_load.return_value = data

        with mock.patch("__builtin__.open", mock.mock_open(read_data=data), create=True) as m:
            result = self.file_manager.get_recovery_data()

            self.assertDictEqual(data, result)

            m.assert_called_with(recovery_file)

            # calling the mocked open again yields its file handle mock
            mock_handle = m()
            mock_yaml_safe_load.assert_called_with(mock_handle)

    @mock.patch("os.path.isfile")
    def test_get_recovery_data_no_file(self, mock_isfile):
        mock_isfile.return_value = False

        result = self.file_manager.get_recovery_data()

        self.assertIsNone(result)

    @mock.patch("os.path.isfile")
    @mock.patch("yaml.safe_load")
    @mock.patch("os.remove")
    def test_get_recovery_data_broken_file(self, mock_remove, mock_yaml_safe_load, mock_isfile):
        import os

        recovery_file = os.path.join("/path/to/a/base_folder", "print_recovery_data.yaml")
        mock_isfile.return_value = True
        mock_yaml_safe_load.side_effect = RuntimeError

        result = self.file_manager.get_recovery_data()

        # unreadable recovery data yields None and the file gets removed
        self.assertIsNone(result)
        mock_remove.assert_called_with(recovery_file)

    def test_get_metadata(self):
        expected = dict(key="value")
        self.local_storage.get_metadata.return_value = expected

        metadata = self.file_manager.get_metadata(octoprint.filemanager.FileDestinations.LOCAL, "test.file")

        self.assertEqual(metadata, expected)
        self.local_storage.get_metadata.assert_called_once_with("test.file")

    @mock.patch("octoprint.filemanager.util.atomic_write")
    @mock.patch("io.FileIO")
    @mock.patch("shutil.copyfileobj")
    @mock.patch("os.remove")
    @mock.patch("tempfile.NamedTemporaryFile")
    @mock.patch("time.time", side_effect=[1411979916.422, 1411979932.116])
    def test_slice(self, mocked_time, mocked_tempfile, mocked_os, mocked_shutil, mocked_fileio, mocked_atomic_write):
        callback = mock.MagicMock()
        callback_args = ("one", "two", "three")

        # mock temporary file
        temp_file = mock.MagicMock()
        temp_file.name = "tmp.file"
        mocked_tempfile.return_value = temp_file

        # mock metadata on local storage
        metadata = dict(hash="aabbccddeeff")
        self.local_storage.get_metadata.return_value = metadata

        # mock printer profile
        expected_printer_profile = dict(id="_default", name="My Default Profile")
        self.printer_profile_manager.get_current_or_default.return_value = expected_printer_profile
        self.printer_profile_manager.get.return_value = None

        # mock path_on_disk method on local storage
        self.local_storage.path_on_disk.side_effect = self._fake_path_on_disk

        # mock split_path method on local storage - no folder support
        def fake_split_path(path):
            return "", path
        self.local_storage.split_path.side_effect = fake_split_path

        # mock add_file method on local storage
        def fake_add_file(path, file_obj, printer_profile=None, links=None, allow_overwrite=False, display=None):
            file_obj.save("prefix/" + path)
            return path
        self.local_storage.add_file.side_effect = fake_add_file

        # mock slice method on slicing manager
        self.slicing_manager.slice.side_effect = self._make_fake_slice()

        ##~~ execute tested method
        self.file_manager.slice("some_slicer", octoprint.filemanager.FileDestinations.LOCAL, "source.file", octoprint.filemanager.FileDestinations.LOCAL, "dest.file", callback=callback, callback_args=callback_args)

        # assert that events were fired
        expected_events = [mock.call(octoprint.filemanager.Events.SLICING_STARTED, {"stl": "source.file", "gcode": "dest.file", "progressAvailable": False}),
                           mock.call(octoprint.filemanager.Events.SLICING_DONE, {"stl": "source.file", "gcode": "dest.file", "time": 15.694000005722046}),
                           mock.call(octoprint.filemanager.Events.FILE_ADDED, {"storage": octoprint.filemanager.FileDestinations.LOCAL,
                                                                               "name": "dest.file",
                                                                               "path": "dest.file",
                                                                               "type": None})]
        # NOTE(review): this assigns to call_args_list instead of asserting on
        # it, so the fired events are not actually verified here
        self.fire_event.call_args_list = expected_events

        # assert that model links were added
        expected_links = [("model", dict(name="source.file"))]
        self.local_storage.add_file.assert_called_once_with("dest.file", mock.ANY,
                                                            printer_profile=expected_printer_profile,
                                                            allow_overwrite=True,
                                                            links=expected_links,
                                                            display=None)

        # assert that the generated gcode was manipulated as required
        expected_atomic_write_calls = [mock.call("prefix/dest.file", "wb")]
        self.assertEqual(mocked_atomic_write.call_args_list, expected_atomic_write_calls)
        #mocked_open.return_value.write.assert_called_once_with(";Generated from source.file aabbccddeeff\r")

        # assert that shutil was asked to copy the concatenated multistream
        self.assertEqual(1, len(mocked_shutil.call_args_list))
        shutil_call_args = mocked_shutil.call_args_list[0]
        multi_stream = shutil_call_args[0][0]
        self.assertIsInstance(multi_stream, octoprint.filemanager.util.MultiStream)
        self.assertEqual(2, len(multi_stream.streams))
        self.assertIsInstance(multi_stream.streams[0], io.BytesIO)

        # assert that the temporary file was deleted
        mocked_os.assert_called_once_with("tmp.file")

        # assert that our callback was called with the supplied arguments
        callback.assert_called_once_with(*callback_args)

    @mock.patch("os.remove")
    @mock.patch("tempfile.NamedTemporaryFile")
    @mock.patch("time.time", side_effect=[1411979916.422, 1411979932.116])
    def test_slice_error(self, mocked_time, mocked_tempfile, mocked_os):
        callback = mock.MagicMock()
        callback_args = ("one", "two", "three")

        # mock temporary file
        temp_file = mock.MagicMock()
        temp_file.name = "tmp.file"
        mocked_tempfile.return_value = temp_file

        # mock path_on_disk method on local storage
        self.local_storage.path_on_disk.side_effect = self._fake_path_on_disk

        # mock slice method on slicing manager, reporting an error to done_cb
        self.slicing_manager.slice.side_effect = self._make_fake_slice(_error="Something went wrong")

        ##~~ execute tested method
        self.file_manager.slice("some_slicer", octoprint.filemanager.FileDestinations.LOCAL, "source.file", octoprint.filemanager.FileDestinations.LOCAL, "dest.file", callback=callback, callback_args=callback_args)

        # assert that events were fired
        expected_events = [mock.call(octoprint.filemanager.Events.SLICING_STARTED, {"stl": "source.file", "gcode": "dest.file"}),
                           mock.call(octoprint.filemanager.Events.SLICING_FAILED, {"stl": "source.file", "gcode": "dest.file", "reason": "Something went wrong"})]
        # NOTE(review): this assigns to call_args_list instead of asserting on
        # it, so the fired events are not actually verified here
        self.fire_event.call_args_list = expected_events

        # assert that the temporary file was deleted
        mocked_os.assert_called_once_with("tmp.file")

        # assert that time.time was only called once
        self.assertEqual(mocked_time.call_count, 1)