MrDraw/tests/util/test_file_helpers.py
Gina Häußge abfcc6e5aa Fix deletion of unrendered timelapses
If the filename contained a [] pair, the file would not match the glob
pattern used for selecting for deletion.

Backporting the glob.escape function from Python 3.4 and using it here
should fix that. We only use our own ported version if glob.escape
doesn't exist - that should reduce redundant code once we become Python
3 compatible.
2017-10-10 12:41:47 +02:00

402 lines
13 KiB
Python

# coding=utf-8
from __future__ import absolute_import, division, print_function
__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html'
__copyright__ = "Copyright (C) 2015 The OctoPrint Project - Released under terms of the AGPLv3 License"
import unittest
import mock
import os
import ddt
import sys
import octoprint.util
class BomAwareOpenTest(unittest.TestCase):
"""
Tests for :func:`octoprint.util.bom_aware_open`.
"""
def setUp(self):
self.filename_utf8_with_bom = os.path.join(os.path.abspath(os.path.dirname(__file__)), "_files", "utf8_with_bom.txt")
self.filename_utf8_without_bom = os.path.join(os.path.abspath(os.path.dirname(__file__)), "_files", "utf8_without_bom.txt")
def test_bom_aware_open_with_bom(self):
"""Tests that the contents of a UTF8 file with BOM are loaded correctly (without the BOM)."""
# test
with octoprint.util.bom_aware_open(self.filename_utf8_with_bom, encoding="utf-8") as f:
contents = f.readlines()
# assert
self.assertEqual(len(contents), 3)
self.assertTrue(contents[0].startswith("#"))
def test_bom_aware_open_without_bom(self):
"""Tests that the contents of a UTF8 file without BOM are loaded correctly."""
# test
with octoprint.util.bom_aware_open(self.filename_utf8_without_bom, encoding="utf-8") as f:
contents = f.readlines()
# assert
self.assertEqual(len(contents), 3)
self.assertTrue(contents[0].startswith("#"))
def test_bom_aware_open_ascii(self):
"""Tests that the contents of a UTF8 file loaded as ASCII are replaced correctly if "replace" is specified on errors."""
# test
with octoprint.util.bom_aware_open(self.filename_utf8_with_bom, errors="replace") as f:
contents = f.readlines()
# assert
self.assertEqual(len(contents), 3)
self.assertTrue(contents[0].startswith(u"\ufffd" * 3 + "#"))
self.assertTrue(contents[2].endswith(u"\ufffd\ufffd" * 6))
def test_bom_aware_open_encoding_error(self):
"""Tests that an encoding error is thrown if not suppressed when opening a UTF8 file as ASCII."""
try:
with octoprint.util.bom_aware_open(self.filename_utf8_without_bom) as f:
f.readlines()
self.fail("Expected an exception")
except UnicodeDecodeError:
pass
def test_bom_aware_open_parameters(self):
"""Tests that the parameters are propagated properly."""
with mock.patch("codecs.open") as mock_open:
with octoprint.util.bom_aware_open(self.filename_utf8_without_bom, mode="rb", encoding="utf-8", errors="ignore") as f:
f.readlines()
mock_open.assert_called_once_with(self.filename_utf8_without_bom, encoding="utf-8", mode="rb", errors="ignore")
class TestAtomicWrite(unittest.TestCase):
"""
Tests for :func:`octoprint.util.atomic_write`.
"""
def setUp(self):
pass
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
@mock.patch("os.chmod")
@mock.patch("os.path.exists")
def test_atomic_write(self, mock_exists, mock_chmod, mock_tempfile, mock_move):
"""Tests the regular basic "good" case."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
mock_exists.return_value = False
# test
with octoprint.util.atomic_write("somefile.yaml") as f:
f.write("test")
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.write.assert_called_once_with("test")
mock_file.close.assert_called_once_with()
mock_chmod.assert_called_once_with("tempfile.tmp", 0o644)
mock_move.assert_called_once_with("tempfile.tmp", "somefile.yaml")
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
@mock.patch("os.chmod")
@mock.patch("os.path.exists")
def test_atomic_write_error_on_write(self, mock_exists, mock_chmod, mock_tempfile, mock_move):
"""Tests the error case where something in the wrapped code fails."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_file.write.side_effect = RuntimeError()
mock_tempfile.return_value = mock_file
mock_exists.return_value = False
# test
try:
with octoprint.util.atomic_write("somefile.yaml") as f:
f.write("test")
self.fail("Expected an exception")
except RuntimeError:
pass
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.close.assert_called_once_with()
self.assertFalse(mock_move.called)
self.assertFalse(mock_chmod.called)
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
@mock.patch("os.chmod")
@mock.patch("os.path.exists")
def test_atomic_write_error_on_move(self, mock_exists, mock_chmod, mock_tempfile, mock_move):
"""Tests the error case where the final move fails."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
mock_move.side_effect = RuntimeError()
mock_exists.return_value = False
# test
try:
with octoprint.util.atomic_write("somefile.yaml") as f:
f.write("test")
self.fail("Expected an exception")
except RuntimeError:
pass
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.close.assert_called_once_with()
self.assertTrue(mock_move.called)
self.assertTrue(mock_chmod.called)
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
@mock.patch("os.chmod")
@mock.patch("os.path.exists")
def test_atomic_write_parameters(self, mock_exists, mock_chmod, mock_tempfile, mock_move):
"""Tests that the open parameters are propagated properly."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
mock_exists.return_value = False
# test
with octoprint.util.atomic_write("somefile.yaml", mode="w", prefix="foo", suffix="bar") as f:
f.write("test")
# assert
mock_tempfile.assert_called_once_with(mode="w", prefix="foo", suffix="bar", delete=False)
mock_file.close.assert_called_once_with()
mock_chmod.assert_called_once_with("tempfile.tmp", 0o644)
mock_move.assert_called_once_with("tempfile.tmp", "somefile.yaml")
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
@mock.patch("os.chmod")
@mock.patch("os.path.exists")
def test_atomic_custom_permissions(self, mock_exists, mock_chmod, mock_tempfile, mock_move):
"""Tests that custom permissions may be set."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
mock_exists.return_value = False
# test
with octoprint.util.atomic_write("somefile.yaml", permissions=0o755) as f:
f.write("test")
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.close.assert_called_once_with()
mock_chmod.assert_called_once_with("tempfile.tmp", 0o755)
mock_move.assert_called_once_with("tempfile.tmp", "somefile.yaml")
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
@mock.patch("os.chmod")
@mock.patch("os.path.exists")
@mock.patch("os.stat")
def test_atomic_permissions_combined(self, mock_stat, mock_exists, mock_chmod, mock_tempfile, mock_move):
"""Tests that the permissions of an existing file are combined with the requested permissions."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
mock_exists.return_value = True
mock_stat_result = mock.MagicMock()
mock_stat_result.st_mode = 0o666
mock_stat.return_value = mock_stat_result
# test
with octoprint.util.atomic_write("somefile.yaml", permissions=0o755) as f:
f.write("test")
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.close.assert_called_once_with()
mock_chmod.assert_called_once_with("tempfile.tmp", 0o777) # 0o755 | 0o666
mock_move.assert_called_once_with("tempfile.tmp", "somefile.yaml")
@mock.patch("shutil.move")
@mock.patch("tempfile.NamedTemporaryFile")
@mock.patch("os.chmod")
@mock.patch("os.path.exists")
@mock.patch("os.stat")
def test_atomic_permissions_limited(self, mock_stat, mock_exists, mock_chmod, mock_tempfile, mock_move):
"""Tests that max_permissions limit the combined file permissions."""
# setup
mock_file = mock.MagicMock()
mock_file.name = "tempfile.tmp"
mock_tempfile.return_value = mock_file
mock_exists.return_value = True
mock_stat_result = mock.MagicMock()
mock_stat_result.st_mode = 0o755
mock_stat.return_value = mock_stat_result
# test
with octoprint.util.atomic_write("somefile.yaml", permissions=0o600, max_permissions=0o666) as f:
f.write("test")
# assert
mock_tempfile.assert_called_once_with(mode="w+b", prefix="tmp", suffix="", delete=False)
mock_file.close.assert_called_once_with()
mock_chmod.assert_called_once_with("tempfile.tmp", 0o644) # (0o600 | 0o755) & 0o666 = 0o755 & 0o666
mock_move.assert_called_once_with("tempfile.tmp", "somefile.yaml")
class TempDirTest(unittest.TestCase):
@mock.patch("shutil.rmtree")
@mock.patch("tempfile.mkdtemp")
def test_tempdir(self, mock_mkdtemp, mock_rmtree):
"""Tests regular "good" case."""
# setup
path = "/path/to/tmpdir"
mock_mkdtemp.return_value = path
# test
with octoprint.util.tempdir() as td:
self.assertEqual(td, path)
# assert
mock_mkdtemp.assert_called_once_with()
mock_rmtree.assert_called_once_with(path, ignore_errors=False, onerror=None)
@mock.patch("shutil.rmtree")
@mock.patch("tempfile.mkdtemp")
def test_tempdir_parameters_mkdtemp(self, mock_mkdtemp, mock_rmtree):
"""Tests that parameters for mkdtemp are properly propagated."""
# setup
path = "/path/to/tmpdir"
mock_mkdtemp.return_value = path
# test
with octoprint.util.tempdir(prefix="prefix", suffix="suffix", dir="dir") as td:
self.assertEqual(td, path)
# assert
mock_mkdtemp.assert_called_once_with(prefix="prefix", suffix="suffix", dir="dir")
mock_rmtree.assert_called_once_with(path, ignore_errors=False, onerror=None)
@mock.patch("shutil.rmtree")
@mock.patch("tempfile.mkdtemp")
def test_tempdir_parameters_rmtree(self, mock_mkdtemp, mock_rmtree):
"""Tests that parameters for rmtree are properly propagated."""
# setup
path = "/path/to/tmpdir"
mock_mkdtemp.return_value = path
onerror = mock.MagicMock()
# test
with octoprint.util.tempdir(ignore_errors=True, onerror=onerror) as td:
self.assertEqual(td, path)
# assert
mock_mkdtemp.assert_called_once_with()
mock_rmtree.assert_called_once_with(path, ignore_errors=True, onerror=onerror)
@ddt.ddt
class IsHiddenPathTest(unittest.TestCase):
def setUp(self):
import tempfile
self.basepath = tempfile.mkdtemp()
self.path_always_visible = os.path.join(self.basepath, "always_visible.txt")
self.path_hidden_on_windows = os.path.join(self.basepath, "hidden_on_windows.txt")
self.path_always_hidden = os.path.join(self.basepath, ".always_hidden.txt")
for attr in ("path_always_visible", "path_hidden_on_windows", "path_always_hidden"):
path = getattr(self, attr)
with open(path, "w+b") as f:
f.write(attr)
import sys
if sys.platform == "win32":
# we use ctypes and the windows API to set the hidden attribute on the file
# only hidden on windows
import ctypes
ctypes.windll.kernel32.SetFileAttributesW(unicode(self.path_hidden_on_windows), 2)
def tearDown(self):
import shutil
shutil.rmtree(self.basepath)
@ddt.data(
(None, False),
("path_always_visible", False),
("path_always_hidden", True),
("path_hidden_on_windows", sys.platform == "win32")
)
@ddt.unpack
def test_is_hidden_path(self, path_id, expected):
path = getattr(self, path_id) if path_id is not None else None
self.assertEqual(octoprint.util.is_hidden_path(path), expected)
try:
from glob import escape
except ImportError:
# no glob.escape - tests for our ported implementation
@ddt.ddt
class GlobEscapeTest(unittest.TestCase):
"""
Ported from Python 3.4
See https://github.com/python/cpython/commit/fd32fffa5ada8b8be8a65bd51b001d989f99a3d3
"""
@ddt.data(
("abc", "abc"),
("[", "[[]"),
("?", "[?]"),
("*", "[*]"),
("[[_/*?*/_]]", "[[][[]_/[*][?][*]/_]]"),
("/[[_/*?*/_]]/", "/[[][[]_/[*][?][*]/_]]/")
)
@ddt.unpack
def test_glob_escape(self, text, expected):
actual = octoprint.util.glob_escape(text)
self.assertEqual(actual, expected)
@ddt.data(
("?:?", "?:[?]"),
("*:*", "*:[*]"),
(r"\\?\c:\?", r"\\?\c:\[?]"),
(r"\\*\*\*", r"\\*\*\[*]"),
("//?/c:/?", "//?/c:/[?]"),
("//*/*/*", "//*/*/[*]")
)
@ddt.unpack
@unittest.skipUnless(sys.platform == "win32", "Win32 specific test")
def test_glob_escape_windows(self, text, expected):
actual = octoprint.util.glob_escape(text)
self.assertEqual(actual, expected)