From 53c0ad08898ca70698d7c5110395b7b35942eb37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Fri, 16 Sep 2016 18:35:30 +0200 Subject: [PATCH] Fixed unit tests for octoprint.timelapse --- src/octoprint/timelapse.py | 2 +- tests/timelapse/test_timelapse_helpers.py | 112 ++++++++++++---------- 2 files changed, 62 insertions(+), 52 deletions(-) diff --git a/src/octoprint/timelapse.py b/src/octoprint/timelapse.py index 42777224..1133fcdb 100644 --- a/src/octoprint/timelapse.py +++ b/src/octoprint/timelapse.py @@ -192,7 +192,7 @@ def delete_old_unrendered_timelapses(): if prefix in prefixes_to_clean: continue - if os.path.getmtime(entry.path) < cutoff: + if entry.stat().st_mtime < cutoff: prefixes_to_clean.append(prefix) except: if logging.getLogger(__name__).isEnabledFor(logging.DEBUG): diff --git a/tests/timelapse/test_timelapse_helpers.py b/tests/timelapse/test_timelapse_helpers.py index 5f3c108a..26f05c30 100644 --- a/tests/timelapse/test_timelapse_helpers.py +++ b/tests/timelapse/test_timelapse_helpers.py @@ -10,8 +10,9 @@ import mock import os import time -from collections import namedtuple +from collections import namedtuple, OrderedDict _stat = namedtuple("StatResult", "st_size, st_ctime, st_mtime") +_entry = namedtuple("DirEntry", "name, path, is_file, is_dir, stat") import octoprint.settings import octoprint.timelapse @@ -32,21 +33,21 @@ class TimelapseTest(unittest.TestCase): self.settings_patcher.stop() @mock.patch("os.remove") - @mock.patch("os.listdir") - def test_delete_unrendered_timelapse(self, mock_listdir, mock_remove): + @mock.patch("octoprint.timelapse.scandir") + def test_delete_unrendered_timelapse(self, mock_scandir, mock_remove): ## prepare mocked_path = "/path/to/timelapse/tmp" - mocked_files = ["a-0.jpg", - "a-1.jpg", - "a-2.jpg", - "b-0.jpg", - "b-1.jpg", - "tmp_00000.jpg", - "tmp_00001.jpg"] + mocked_files = self._generate_scandir(mocked_path, ["a-0.jpg", + "a-1.jpg", + "a-2.jpg", + "b-0.jpg", + "b-1.jpg", + "tmp_00000.jpg", + "tmp_00001.jpg"]) self.settings.getBaseFolder.return_value = mocked_path - mock_listdir.return_value = mocked_files + mock_scandir.return_value = mocked_files.values() ## test octoprint.timelapse.delete_unrendered_timelapse("b") @@ -59,38 +60,31 @@ class TimelapseTest(unittest.TestCase): @mock.patch("time.time") @mock.patch("os.remove") - @mock.patch("os.path.getmtime") - @mock.patch("os.listdir") - def test_delete_old_unrendered_timelapses(self, mock_listdir, mock_mtime, mock_remove, mock_time): + @mock.patch("octoprint.timelapse.scandir") + def test_delete_old_unrendered_timelapses(self, mock_scandir, mock_remove, mock_time): ## prepare mocked_path = "/path/to/timelapse/tmp" - mocked_files = ["old-0.jpg", - "old-1.jpg", - "old-2.jpg", - "prefix-0.jpg", - "prefix-1.jpg", - "tmp_00000.jpg", - "tmp_00001.jpg"] + + files = ["old-0.jpg", + "old-1.jpg", + "old-2.jpg", + "prefix-0.jpg", + "prefix-1.jpg", + "tmp_00000.jpg", + "tmp_00001.jpg"] + files = dict((f, None) for f in files) + files["old-0.jpg"] = _stat(st_size=10, st_ctime=0, st_mtime=0) + now = self.now days = 1 - def mtime(p): - if p.startswith(os.path.join(mocked_path, "old-0")): - # old-0 is definitely older than cutoff - return 0 - else: - # all other files were just created - return now - self.settings.getBaseFolder.return_value = mocked_path self.settings.getInt.return_value = days mock_time.return_value = now - mock_listdir.return_value = mocked_files - - mock_mtime.side_effect = mtime + mock_scandir.return_value = self._generate_scandir(mocked_path, files).values() ## test octoprint.timelapse.delete_old_unrendered_timelapses() @@ -104,26 +98,21 @@ class TimelapseTest(unittest.TestCase): expected_deletion_calls = map(mock.call, expected_deletions) self.assertListEqual(mock_remove.mock_calls, expected_deletion_calls) - @mock.patch("os.stat") - @mock.patch("os.listdir") - def test_get_finished_timelapses(self, mock_listdir, mock_stat): + @mock.patch("octoprint.timelapse.scandir") + def test_get_finished_timelapses(self, mock_listdir): ## prepare + mocked_path = "/path/to/timelapse" + files = dict() files["one.mpg"] = _stat(st_size=1024, st_ctime=self.now, st_mtime=self.now) files["nope.jpg"] = _stat(st_size=100, st_ctime=self.now, st_mtime=self.now) files["two.mpg"] = _stat(st_size=2048, st_ctime=self.now, st_mtime=self.now) - mocked_path = "/path/to/timelapse" self.settings.getBaseFolder.return_value = mocked_path - mock_listdir.return_value = sorted(files.keys()) - - def stat(p): - name = p[len(mocked_path) + 1:] - return files[name] - mock_stat.side_effect = stat + mock_listdir.return_value = self._generate_scandir(mocked_path, files).values() ## test result = octoprint.timelapse.get_finished_timelapses() @@ -135,9 +124,8 @@ class TimelapseTest(unittest.TestCase): self.assertEqual(result[1]["name"], "two.mpg") self.assertEqual(result[1]["bytes"], 2048) - @mock.patch("os.stat") - @mock.patch("os.listdir") - def test_unrendered_timelapses(self, mock_listdir, mock_stat): + @mock.patch("octoprint.timelapse.scandir") + def test_unrendered_timelapses(self, mock_scandir): ## prepare files = dict() files["one-0.jpg"] = _stat(st_size=1, st_ctime=self.now - 1, st_mtime=self.now - 1) @@ -150,12 +138,7 @@ class TimelapseTest(unittest.TestCase): mocked_path = "/path/to/timelapse/tmp" self.settings.getBaseFolder.return_value = mocked_path - mock_listdir.return_value = sorted(files.keys()) - - def stat(p): - name = p[len(mocked_path) + 1:] - return files[name] - mock_stat.side_effect = stat + mock_scandir.return_value = self._generate_scandir(mocked_path, files).values() ## test result = octoprint.timelapse.get_unrendered_timelapses() @@ -170,3 +153,30 @@ class TimelapseTest(unittest.TestCase): self.assertEqual(result[1]["name"], "two") self.assertEqual(result[1]["count"], 2) self.assertEqual(result[1]["bytes"], 9) + + def _generate_scandir(self, path, files): + result = OrderedDict() + + def add_to_result(name, stat=None): + if stat is None: + stat = _stat(st_size=10, st_ctime=self.now, st_mtime=self.now) + + result[name] = _entry(name=name, + path=os.path.join(path, name), + is_file=True, + is_dir=False, + stat=lambda: stat) + + if isinstance(files, dict): + for f in sorted(files.keys()): + stat = files[f] + add_to_result(f, stat) + + elif isinstance(files, (list, tuple)): + for f in files: + add_to_result(f) + + else: + raise ValueError("files must be either dict or list/tuple") + + return result