2014-10-06 15:34:07 +00:00
|
|
|
# coding=utf-8
|
|
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
|
|
|
|
__author__ = "Gina Häußge <osd@foosel.net>"
|
|
|
|
|
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
|
|
|
|
|
__copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms of the AGPLv3 License"
|
|
|
|
|
|
|
|
|
|
|
2015-04-17 12:45:58 +00:00
|
|
|
import io
|
2014-10-06 15:34:07 +00:00
|
|
|
import unittest
|
|
|
|
|
import mock
|
|
|
|
|
|
|
|
|
|
import octoprint.filemanager
|
2015-04-17 12:45:58 +00:00
|
|
|
import octoprint.filemanager.util
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
class FileManagerTest(unittest.TestCase):
|
|
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
|
import octoprint.slicing
|
|
|
|
|
import octoprint.filemanager.storage
|
2014-12-02 10:30:00 +00:00
|
|
|
import octoprint.printer.profile
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
self.addCleanup(self.cleanUp)
|
|
|
|
|
|
|
|
|
|
# mock event manager
|
|
|
|
|
self.event_manager_patcher = mock.patch("octoprint.filemanager.eventManager")
|
|
|
|
|
event_manager = self.event_manager_patcher.start()
|
|
|
|
|
event_manager.return_value.fire = mock.MagicMock()
|
|
|
|
|
self.fire_event = event_manager.return_value.fire
|
|
|
|
|
|
2015-03-02 11:58:24 +00:00
|
|
|
# mock plugin manager
|
|
|
|
|
self.plugin_manager_patcher = mock.patch("octoprint.plugin.plugin_manager")
|
|
|
|
|
self.plugin_manager = self.plugin_manager_patcher.start()
|
|
|
|
|
|
2014-10-06 15:34:07 +00:00
|
|
|
self.analysis_queue = mock.MagicMock(spec=octoprint.filemanager.AnalysisQueue)
|
|
|
|
|
|
|
|
|
|
self.slicing_manager = mock.MagicMock(spec=octoprint.slicing.SlicingManager)
|
|
|
|
|
|
2014-12-02 10:30:00 +00:00
|
|
|
self.printer_profile_manager = mock.MagicMock(spec=octoprint.printer.profile.PrinterProfileManager)
|
|
|
|
|
|
2014-10-06 15:34:07 +00:00
|
|
|
self.local_storage = mock.MagicMock(spec=octoprint.filemanager.storage.LocalFileStorage)
|
|
|
|
|
self.local_storage.analysis_backlog = iter([])
|
|
|
|
|
|
|
|
|
|
self.storage_managers = dict()
|
|
|
|
|
self.storage_managers[octoprint.filemanager.FileDestinations.LOCAL] = self.local_storage
|
|
|
|
|
|
2014-12-02 10:30:00 +00:00
|
|
|
self.file_manager = octoprint.filemanager.FileManager(self.analysis_queue, self.slicing_manager, self.printer_profile_manager, initial_storage_managers=self.storage_managers)
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
def cleanUp(self):
|
|
|
|
|
self.event_manager_patcher.stop()
|
2015-03-02 11:58:24 +00:00
|
|
|
self.plugin_manager_patcher.stop()
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
def test_add_file(self):
|
|
|
|
|
wrapper = object()
|
|
|
|
|
|
|
|
|
|
self.local_storage.add_file.return_value = ("", "test.file")
|
2015-03-02 11:58:24 +00:00
|
|
|
self.local_storage.path_on_disk.return_value = "prefix/test.file"
|
2014-10-06 15:34:07 +00:00
|
|
|
|
2014-12-02 10:30:00 +00:00
|
|
|
test_profile = dict(id="_default", name="My Default Profile")
|
|
|
|
|
self.printer_profile_manager.get_current_or_default.return_value = test_profile
|
|
|
|
|
|
2014-10-06 15:34:07 +00:00
|
|
|
file_path = self.file_manager.add_file(octoprint.filemanager.FileDestinations.LOCAL, "test.file", wrapper)
|
|
|
|
|
|
|
|
|
|
self.assertEquals(("", "test.file"), file_path)
|
2014-12-02 10:30:00 +00:00
|
|
|
self.local_storage.add_file.assert_called_once_with("test.file", wrapper, printer_profile=test_profile, allow_overwrite=False, links=None)
|
2014-10-06 15:34:07 +00:00
|
|
|
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
|
|
|
|
|
|
|
|
|
|
def test_remove_file(self):
|
|
|
|
|
self.file_manager.remove_file(octoprint.filemanager.FileDestinations.LOCAL, "test.file")
|
|
|
|
|
|
|
|
|
|
self.local_storage.remove_file.assert_called_once_with("test.file")
|
|
|
|
|
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
|
|
|
|
|
|
|
|
|
|
def test_add_folder(self):
|
|
|
|
|
self.local_storage.add_folder.return_value = ("", "test_folder")
|
|
|
|
|
|
|
|
|
|
folder_path = self.file_manager.add_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")
|
|
|
|
|
|
|
|
|
|
self.assertEquals(("", "test_folder"), folder_path)
|
|
|
|
|
self.local_storage.add_folder.assert_called_once_with("test_folder", ignore_existing=True)
|
|
|
|
|
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
|
|
|
|
|
|
|
|
|
|
def test_add_folder_not_ignoring_existing(self):
|
|
|
|
|
self.local_storage.add_folder.side_effect = RuntimeError("already there")
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
self.file_manager.add_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder", ignore_existing=False)
|
|
|
|
|
self.fail("Expected an exception to occur!")
|
|
|
|
|
except RuntimeError as e:
|
|
|
|
|
self.assertEquals("already there", e.message)
|
|
|
|
|
self.local_storage.add_folder.assert_called_once_with("test_folder", ignore_existing=False)
|
|
|
|
|
|
|
|
|
|
def test_remove_folder(self):
|
|
|
|
|
self.file_manager.remove_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")
|
|
|
|
|
|
|
|
|
|
self.local_storage.remove_folder.assert_called_once_with("test_folder", recursive=True)
|
|
|
|
|
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
|
|
|
|
|
|
|
|
|
|
def test_remove_folder_nonrecursive(self):
|
|
|
|
|
self.file_manager.remove_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder", recursive=False)
|
|
|
|
|
self.local_storage.remove_folder.assert_called_once_with("test_folder", recursive=False)
|
|
|
|
|
|
|
|
|
|
def test_get_metadata(self):
|
|
|
|
|
expected = dict(key="value")
|
|
|
|
|
self.local_storage.get_metadata.return_value = expected
|
|
|
|
|
|
|
|
|
|
metadata = self.file_manager.get_metadata(octoprint.filemanager.FileDestinations.LOCAL, "test.file")
|
|
|
|
|
|
|
|
|
|
self.assertEquals(metadata, expected)
|
|
|
|
|
self.local_storage.get_metadata.assert_called_once_with("test.file")
|
|
|
|
|
|
|
|
|
|
@mock.patch("__builtin__.open", new_callable=mock.mock_open)
|
2015-04-17 12:45:58 +00:00
|
|
|
@mock.patch("io.FileIO")
|
2014-10-06 15:34:07 +00:00
|
|
|
@mock.patch("shutil.copyfileobj")
|
|
|
|
|
@mock.patch("os.remove")
|
|
|
|
|
@mock.patch("tempfile.NamedTemporaryFile")
|
|
|
|
|
@mock.patch("time.time", side_effect=[1411979916.422, 1411979932.116])
|
2015-04-17 12:45:58 +00:00
|
|
|
def test_slice(self, mocked_time, mocked_tempfile, mocked_os, mocked_shutil, mocked_fileio, mocked_open):
|
2014-10-06 15:34:07 +00:00
|
|
|
callback = mock.MagicMock()
|
|
|
|
|
callback_args = ("one", "two", "three")
|
|
|
|
|
|
|
|
|
|
# mock temporary file
|
|
|
|
|
temp_file = mock.MagicMock()
|
|
|
|
|
temp_file.name = "tmp.file"
|
|
|
|
|
mocked_tempfile.return_value = temp_file
|
|
|
|
|
|
|
|
|
|
# mock metadata on local storage
|
|
|
|
|
metadata = dict(hash="aabbccddeeff")
|
|
|
|
|
self.local_storage.get_metadata.return_value = metadata
|
|
|
|
|
|
2014-12-02 10:30:00 +00:00
|
|
|
# mock printer profile
|
|
|
|
|
expected_printer_profile = dict(id="_default", name="My Default Profile")
|
|
|
|
|
self.printer_profile_manager.get_current_or_default.return_value = expected_printer_profile
|
|
|
|
|
self.printer_profile_manager.get.return_value = None
|
|
|
|
|
|
2014-10-06 15:34:07 +00:00
|
|
|
# mock get_absolute_path method on local storage
|
2015-03-02 11:58:24 +00:00
|
|
|
def path_on_disk(path):
|
2014-10-06 15:34:07 +00:00
|
|
|
if isinstance(path, tuple):
|
|
|
|
|
import os
|
|
|
|
|
joined_path = ""
|
|
|
|
|
for part in path:
|
|
|
|
|
joined_path = os.path.join(joined_path, part)
|
|
|
|
|
path = joined_path
|
|
|
|
|
return "prefix/" + path
|
2015-03-02 11:58:24 +00:00
|
|
|
self.local_storage.path_on_disk.side_effect = path_on_disk
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
# mock split_path method on local storage
|
|
|
|
|
def split_path(path):
|
|
|
|
|
return "", path
|
|
|
|
|
self.local_storage.split_path.side_effect = split_path
|
|
|
|
|
|
|
|
|
|
# mock add_file method on local storage
|
2014-12-02 10:30:00 +00:00
|
|
|
def add_file(path, file_obj, printer_profile=None, links=None, allow_overwrite=False):
|
2014-10-06 15:34:07 +00:00
|
|
|
file_obj.save("prefix/" + path)
|
|
|
|
|
return "", path
|
|
|
|
|
self.local_storage.add_file.side_effect = add_file
|
|
|
|
|
|
|
|
|
|
# mock slice method on slicing manager
|
2015-03-02 11:58:24 +00:00
|
|
|
def slice(slicer_name, source_path, dest_path, profile, done_cb, printer_profile_id=None, position=None, callback_args=None, overrides=None, on_progress=None, on_progress_args=None, on_progress_kwargs=None):
|
2014-10-06 15:34:07 +00:00
|
|
|
self.assertEquals("some_slicer", slicer_name)
|
|
|
|
|
self.assertEquals("prefix/source.file", source_path)
|
|
|
|
|
self.assertEquals("tmp.file", dest_path)
|
|
|
|
|
self.assertIsNone(profile)
|
|
|
|
|
self.assertIsNone(overrides)
|
2015-03-02 11:58:24 +00:00
|
|
|
self.assertIsNone(printer_profile_id)
|
|
|
|
|
self.assertIsNone(position)
|
2014-10-21 12:14:00 +00:00
|
|
|
self.assertIsNotNone(on_progress)
|
|
|
|
|
self.assertIsNotNone(on_progress_args)
|
|
|
|
|
self.assertTupleEqual(("some_slicer", octoprint.filemanager.FileDestinations.LOCAL, "source.file", octoprint.filemanager.FileDestinations.LOCAL, "dest.file"), on_progress_args)
|
|
|
|
|
self.assertIsNone(on_progress_kwargs)
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
if not callback_args:
|
|
|
|
|
callback_args = ()
|
|
|
|
|
done_cb(*callback_args)
|
|
|
|
|
self.slicing_manager.slice.side_effect = slice
|
|
|
|
|
|
|
|
|
|
##~~ execute tested method
|
|
|
|
|
self.file_manager.slice("some_slicer", octoprint.filemanager.FileDestinations.LOCAL, "source.file", octoprint.filemanager.FileDestinations.LOCAL, "dest.file", callback=callback, callback_args=callback_args)
|
|
|
|
|
|
|
|
|
|
# assert that events where fired
|
2014-10-29 11:17:26 +00:00
|
|
|
expected_events = [mock.call(octoprint.filemanager.Events.SLICING_STARTED, {"stl": "source.file", "gcode": "dest.file", "progressAvailable": False}),
|
2014-10-06 15:34:07 +00:00
|
|
|
mock.call(octoprint.filemanager.Events.SLICING_DONE, {"stl": "source.file", "gcode": "dest.file", "time": 15.694000005722046})]
|
|
|
|
|
self.fire_event.call_args_list = expected_events
|
|
|
|
|
|
|
|
|
|
# assert that model links were added
|
|
|
|
|
expected_links = [("model", dict(name="source.file"))]
|
2014-12-02 10:30:00 +00:00
|
|
|
self.local_storage.add_file.assert_called_once_with("dest.file", mock.ANY, printer_profile=expected_printer_profile, allow_overwrite=True, links=expected_links)
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
# assert that the generated gcode was manipulated as required
|
2015-04-17 12:45:58 +00:00
|
|
|
expected_open_calls = [mock.call("prefix/dest.file", "wb")]
|
2014-10-06 15:34:07 +00:00
|
|
|
self.assertEquals(mocked_open.call_args_list, expected_open_calls)
|
2015-04-17 12:45:58 +00:00
|
|
|
#mocked_open.return_value.write.assert_called_once_with(";Generated from source.file aabbccddeeff\r")
|
|
|
|
|
|
|
|
|
|
# assert that shutil was asked to copy the concatenated multistream
|
|
|
|
|
self.assertEquals(1, len(mocked_shutil.call_args_list))
|
|
|
|
|
shutil_call_args = mocked_shutil.call_args_list[0]
|
|
|
|
|
self.assertTrue(isinstance(shutil_call_args[0][0], octoprint.filemanager.util.MultiStream))
|
|
|
|
|
multi_stream = shutil_call_args[0][0]
|
|
|
|
|
self.assertEquals(2, len(multi_stream.streams))
|
|
|
|
|
self.assertTrue(isinstance(multi_stream.streams[0], io.BytesIO))
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
# assert that the temporary file was deleted
|
|
|
|
|
mocked_os.assert_called_once_with("tmp.file")
|
|
|
|
|
|
|
|
|
|
# assert that our callback was called with the supplied arguments
|
|
|
|
|
callback.assert_called_once_with(*callback_args)
|
|
|
|
|
|
|
|
|
|
@mock.patch("os.remove")
|
|
|
|
|
@mock.patch("tempfile.NamedTemporaryFile")
|
|
|
|
|
@mock.patch("time.time", side_effect=[1411979916.422, 1411979932.116])
|
|
|
|
|
def test_slice_error(self, mocked_time, mocked_tempfile, mocked_os):
|
|
|
|
|
callback = mock.MagicMock()
|
|
|
|
|
callback_args = ("one", "two", "three")
|
|
|
|
|
|
|
|
|
|
# mock temporary file
|
|
|
|
|
temp_file = mock.MagicMock()
|
|
|
|
|
temp_file.name = "tmp.file"
|
|
|
|
|
mocked_tempfile.return_value = temp_file
|
|
|
|
|
|
2015-03-02 11:58:24 +00:00
|
|
|
# mock path_on_disk method on local storage
|
|
|
|
|
def path_on_disk(path):
|
2014-10-06 15:34:07 +00:00
|
|
|
if isinstance(path, tuple):
|
|
|
|
|
import os
|
|
|
|
|
joined_path = ""
|
|
|
|
|
for part in path:
|
|
|
|
|
joined_path = os.path.join(joined_path, part)
|
|
|
|
|
path = joined_path
|
|
|
|
|
return "prefix/" + path
|
2015-03-02 11:58:24 +00:00
|
|
|
self.local_storage.path_on_disk.side_effect = path_on_disk
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
# mock slice method on slicing manager
|
2015-03-02 11:58:24 +00:00
|
|
|
def slice(slicer_name, source_path, dest_path, profile, done_cb, printer_profile_id=None, position=None, callback_args=None, overrides=None, on_progress=None, on_progress_args=None, on_progress_kwargs=None):
|
2014-10-06 15:34:07 +00:00
|
|
|
self.assertEquals("some_slicer", slicer_name)
|
|
|
|
|
self.assertEquals("prefix/source.file", source_path)
|
|
|
|
|
self.assertEquals("tmp.file", dest_path)
|
|
|
|
|
self.assertIsNone(profile)
|
|
|
|
|
self.assertIsNone(overrides)
|
2015-03-02 11:58:24 +00:00
|
|
|
self.assertIsNone(printer_profile_id)
|
|
|
|
|
self.assertIsNone(position)
|
2014-10-21 12:14:00 +00:00
|
|
|
self.assertIsNotNone(on_progress)
|
|
|
|
|
self.assertIsNotNone(on_progress_args)
|
|
|
|
|
self.assertTupleEqual(("some_slicer", octoprint.filemanager.FileDestinations.LOCAL, "source.file", octoprint.filemanager.FileDestinations.LOCAL, "dest.file"), on_progress_args)
|
|
|
|
|
self.assertIsNone(on_progress_kwargs)
|
2014-10-06 15:34:07 +00:00
|
|
|
|
|
|
|
|
if not callback_args:
|
|
|
|
|
callback_args = ()
|
|
|
|
|
done_cb(*callback_args, _error="Something went wrong")
|
|
|
|
|
self.slicing_manager.slice.side_effect = slice
|
|
|
|
|
|
|
|
|
|
##~~ execute tested method
|
|
|
|
|
self.file_manager.slice("some_slicer", octoprint.filemanager.FileDestinations.LOCAL, "source.file", octoprint.filemanager.FileDestinations.LOCAL, "dest.file", callback=callback, callback_args=callback_args)
|
|
|
|
|
|
|
|
|
|
# assert that events where fired
|
|
|
|
|
expected_events = [mock.call(octoprint.filemanager.Events.SLICING_STARTED, {"stl": "source.file", "gcode": "dest.file"}),
|
|
|
|
|
mock.call(octoprint.filemanager.Events.SLICING_FAILED, {"stl": "source.file", "gcode": "dest.file", "reason": "Something went wrong"})]
|
|
|
|
|
self.fire_event.call_args_list = expected_events
|
|
|
|
|
|
|
|
|
|
# assert that the temporary file was deleted
|
|
|
|
|
mocked_os.assert_called_once_with("tmp.file")
|
|
|
|
|
|
|
|
|
|
# assert that time.time was only called once
|
|
|
|
|
mocked_time.assert_called_once()
|