182 lines
6.1 KiB
Python
182 lines
6.1 KiB
Python
# coding=utf-8
|
|
from __future__ import absolute_import
|
|
|
|
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
|
|
__copyright__ = "Copyright (C) 2016 The OctoPrint Project - Released under terms of the AGPLv3 License"
|
|
|
|
import unittest
|
|
import mock
|
|
|
|
import os
|
|
import time
|
|
|
|
from collections import namedtuple, OrderedDict
|
|
_stat = namedtuple("StatResult", "st_size, st_ctime, st_mtime")
|
|
_entry = namedtuple("DirEntry", "name, path, is_file, is_dir, stat")
|
|
|
|
import octoprint.settings
|
|
import octoprint.timelapse
|
|
|
|
class TimelapseTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
# mock settings
|
|
self.settings_patcher = mock.patch("octoprint.timelapse.settings")
|
|
self.settings_getter = self.settings_patcher.start()
|
|
|
|
self.settings = mock.create_autospec(octoprint.settings.Settings)
|
|
self.settings_getter.return_value = self.settings
|
|
|
|
self.now = time.time()
|
|
|
|
def cleanUp(self):
|
|
self.settings_patcher.stop()
|
|
|
|
@mock.patch("os.remove")
|
|
@mock.patch("octoprint.timelapse.scandir")
|
|
def test_delete_unrendered_timelapse(self, mock_scandir, mock_remove):
|
|
## prepare
|
|
|
|
mocked_path = "/path/to/timelapse/tmp"
|
|
mocked_files = self._generate_scandir(mocked_path, ["a-0.jpg",
|
|
"a-1.jpg",
|
|
"a-2.jpg",
|
|
"b-0.jpg",
|
|
"b-1.jpg",
|
|
"tmp_00000.jpg",
|
|
"tmp_00001.jpg"])
|
|
|
|
self.settings.getBaseFolder.return_value = mocked_path
|
|
mock_scandir.return_value = mocked_files.values()
|
|
|
|
## test
|
|
octoprint.timelapse.delete_unrendered_timelapse("b")
|
|
|
|
## verify
|
|
expected_deletions = map(lambda x: os.path.join(mocked_path, x), ["b-0.jpg",
|
|
"b-1.jpg"])
|
|
expected_deletion_calls = map(mock.call, expected_deletions)
|
|
self.assertListEqual(mock_remove.mock_calls, expected_deletion_calls)
|
|
|
|
@mock.patch("time.time")
|
|
@mock.patch("os.remove")
|
|
@mock.patch("octoprint.timelapse.scandir")
|
|
def test_delete_old_unrendered_timelapses(self, mock_scandir, mock_remove, mock_time):
|
|
## prepare
|
|
|
|
mocked_path = "/path/to/timelapse/tmp"
|
|
|
|
files = ["old-0.jpg",
|
|
"old-1.jpg",
|
|
"old-2.jpg",
|
|
"prefix-0.jpg",
|
|
"prefix-1.jpg",
|
|
"tmp_00000.jpg",
|
|
"tmp_00001.jpg"]
|
|
files = dict((f, None) for f in files)
|
|
files["old-0.jpg"] = _stat(st_size=10, st_ctime=0, st_mtime=0)
|
|
|
|
now = self.now
|
|
days = 1
|
|
|
|
self.settings.getBaseFolder.return_value = mocked_path
|
|
self.settings.getInt.return_value = days
|
|
|
|
mock_time.return_value = now
|
|
|
|
mock_scandir.return_value = self._generate_scandir(mocked_path, files).values()
|
|
|
|
## test
|
|
octoprint.timelapse.delete_old_unrendered_timelapses()
|
|
|
|
## verify
|
|
expected_deletions = map(lambda x: os.path.join(mocked_path, x), ["tmp_00000.jpg",
|
|
"tmp_00001.jpg",
|
|
"old-0.jpg",
|
|
"old-1.jpg",
|
|
"old-2.jpg"])
|
|
expected_deletion_calls = map(mock.call, expected_deletions)
|
|
self.assertListEqual(mock_remove.mock_calls, expected_deletion_calls)
|
|
|
|
@mock.patch("octoprint.timelapse.scandir")
|
|
def test_get_finished_timelapses(self, mock_listdir):
|
|
|
|
## prepare
|
|
|
|
mocked_path = "/path/to/timelapse"
|
|
|
|
files = dict()
|
|
files["one.mpg"] = _stat(st_size=1024, st_ctime=self.now, st_mtime=self.now)
|
|
files["nope.jpg"] = _stat(st_size=100, st_ctime=self.now, st_mtime=self.now)
|
|
files["two.mpg"] = _stat(st_size=2048, st_ctime=self.now, st_mtime=self.now)
|
|
|
|
self.settings.getBaseFolder.return_value = mocked_path
|
|
|
|
mock_listdir.return_value = self._generate_scandir(mocked_path, files).values()
|
|
|
|
## test
|
|
result = octoprint.timelapse.get_finished_timelapses()
|
|
|
|
## verify
|
|
self.assertEqual(len(result), 2)
|
|
self.assertEqual(result[0]["name"], "one.mpg")
|
|
self.assertEqual(result[0]["bytes"], 1024)
|
|
self.assertEqual(result[1]["name"], "two.mpg")
|
|
self.assertEqual(result[1]["bytes"], 2048)
|
|
|
|
@mock.patch("octoprint.timelapse.scandir")
|
|
def test_unrendered_timelapses(self, mock_scandir):
|
|
## prepare
|
|
files = dict()
|
|
files["one-0.jpg"] = _stat(st_size=1, st_ctime=self.now - 1, st_mtime=self.now - 1)
|
|
files["one-1.jpg"] = _stat(st_size=2, st_ctime=self.now, st_mtime=self.now)
|
|
files["one-2.jpg"] = _stat(st_size=3, st_ctime=self.now, st_mtime=self.now)
|
|
files["nope.mpg"] = _stat(st_size=2048, st_ctime=self.now, st_mtime=self.now)
|
|
files["two-0.jpg"] = _stat(st_size=4, st_ctime=self.now, st_mtime=self.now)
|
|
files["two-1.jpg"] = _stat(st_size=5, st_ctime=self.now, st_mtime=self.now)
|
|
|
|
mocked_path = "/path/to/timelapse/tmp"
|
|
self.settings.getBaseFolder.return_value = mocked_path
|
|
|
|
mock_scandir.return_value = self._generate_scandir(mocked_path, files).values()
|
|
|
|
## test
|
|
result = octoprint.timelapse.get_unrendered_timelapses()
|
|
|
|
## verify
|
|
self.assertEqual(len(result), 2)
|
|
|
|
self.assertEqual(result[0]["name"], "one")
|
|
self.assertEqual(result[0]["count"], 3)
|
|
self.assertEqual(result[0]["bytes"], 6)
|
|
|
|
self.assertEqual(result[1]["name"], "two")
|
|
self.assertEqual(result[1]["count"], 2)
|
|
self.assertEqual(result[1]["bytes"], 9)
|
|
|
|
def _generate_scandir(self, path, files):
|
|
result = OrderedDict()
|
|
|
|
def add_to_result(name, stat=None):
|
|
if stat is None:
|
|
stat = _stat(st_size=10, st_ctime=self.now, st_mtime=self.now)
|
|
|
|
result[name] = _entry(name=name,
|
|
path=os.path.join(path, name),
|
|
is_file=True,
|
|
is_dir=False,
|
|
stat=lambda: stat)
|
|
|
|
if isinstance(files, dict):
|
|
for f in sorted(files.keys()):
|
|
stat = files[f]
|
|
add_to_result(f, stat)
|
|
|
|
elif isinstance(files, (list, tuple)):
|
|
for f in files:
|
|
add_to_result(f)
|
|
|
|
else:
|
|
raise ValueError("files must be either dict or list/tuple")
|
|
|
|
return result
|