Refactoring of localstorage and added tests

* improved internal metadata handling
  * less code duplication
  * individual locks per metadata file
  * unit tests for copy and move operations
This commit is contained in:
Gina Häußge 2015-11-18 10:47:49 +01:00
parent 962dc490aa
commit 306fd164d7
2 changed files with 376 additions and 174 deletions

View file

@ -9,9 +9,12 @@ __copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms
import logging
import os
import pylru
import tempfile
import shutil
from octoprint.util import atomic_write
from contextlib import contextmanager
from copy import deepcopy
import octoprint.filemanager
class StorageInterface(object):
@ -331,6 +334,25 @@ class StorageInterface(object):
raise NotImplementedError()
class StorageError(BaseException):
UNKNOWN = "unknown"
INVALID_DIRECTORY = "invalid_directory"
INVALID_FILE = "invalid_file"
INVALID_SOURCE = "invalid_source"
INVALID_DESTINATION = "invalid_destination"
DOES_NOT_EXIST = "does_not_exist"
ALREADY_EXISTS = "already_exists"
NOT_EMPTY = "not_empty"
def __init__(self, message, code=None, cause=None):
BaseException.__init__(self)
self.message = message
self.cause = cause
if code is None:
code = StorageError.UNKNOWN
self.code = code
class LocalFileStorage(StorageInterface):
"""
@ -356,10 +378,11 @@ class LocalFileStorage(StorageInterface):
if not os.path.exists(self.basefolder) and create:
os.makedirs(self.basefolder)
if not os.path.exists(self.basefolder) or not os.path.isdir(self.basefolder):
raise RuntimeError("{basefolder} is not a valid directory".format(**locals()))
raise StorageError("{basefolder} is not a valid directory".format(**locals()), code=StorageError.INVALID_DIRECTORY)
import threading
self._metadata_lock = threading.Lock()
self._metadata_lock_mutex = threading.RLock()
self._metadata_locks = dict()
self._metadata_cache = pylru.lrucache(10)
@ -457,7 +480,7 @@ class LocalFileStorage(StorageInterface):
folder_path = os.path.join(path, name)
if os.path.exists(folder_path):
if not ignore_existing:
raise RuntimeError("{sanitized_foldername} does already exist in {virtual_path}".format(**locals()))
raise StorageError("{sanitized_foldername} does already exist in {virtual_path}".format(**locals()), code=StorageError.ALREADY_EXISTS)
else:
os.mkdir(folder_path)
@ -474,89 +497,69 @@ class LocalFileStorage(StorageInterface):
if ".metadata.yaml" in contents:
contents.remove(".metadata.yaml")
if contents and not recursive:
raise RuntimeError("{sanitized_foldername} in {virtual_path} is not empty".format(**locals()))
raise StorageError("{sanitized_foldername} in {virtual_path} is not empty".format(**locals()), code=StorageError.NOT_EMPTY)
import shutil
shutil.rmtree(folder_path)
def _copyMove(self, source, destination):
sourcepath, sourcename = self.sanitize(source)
destinationpath, destinationname = self.sanitize(destination)
self._delete_metadata(folder_path)
source_path = os.path.join(sourcepath, sourcename)
if not os.path.exists(source_path):
raise RuntimeError("{sourcename} in {sourcepath} does not exist".format(**locals()))
def _get_source_destination_data(self, source, destination):
"""Prepares data dicts about source and destination for copy/move."""
source_path, source_name = self.sanitize(source)
destination_path, destination_name = self.sanitize(destination)
destination_path = os.path.join(destinationpath, destinationname)
if os.path.exists(destination_path):
raise RuntimeError("{destinationname} does already exist in {destinationpath}".format(**locals()))
source_fullpath = os.path.join(source_path, source_name)
destination_fullpath = os.path.join(destination_path, destination_name)
sourceObj = dict(
path=sourcepath,
name=sourcename,
fullpath=source_path,
if not os.path.exists(source_fullpath):
raise StorageError("{} in {} does not exist".format(source_name, source_path), code=StorageError.INVALID_SOURCE)
if not os.path.isdir(destination_path):
raise StorageError("Destination path {} does not exist or is not a folder".format(destination_path), code=StorageError.INVALID_DESTINATION)
if os.path.exists(destination_fullpath):
raise StorageError("{} does already exist in {}".format(destination_name, destination_path), code=StorageError.INVALID_DESTINATION)
source_data = dict(
path=source_path,
name=source_name,
fullpath=source_fullpath,
)
destinationObj = dict(
path=destinationpath,
name=destinationname,
fullpath=destination_path,
destination_data = dict(
path=destination_path,
name=destination_name,
fullpath=destination_fullpath,
)
return sourceObj, destinationObj
def _copyMoveWithMetadata(self, source, destination):
sourceObj, destinationObj = self._copyMove(source, destination)
if not os.path.isfile(sourceObj["fullpath"]):
raise RuntimeError("%s in %s is not a file" % (sourceObj["name"], sourceObj["path"]))
sourcemetadata = self._get_metadata(sourceObj["path"])
if not sourcemetadata:
sourcemetadata = dict()
destinationmetadata = self._get_metadata(destinationObj["path"])
if not destinationmetadata:
destinationmetadata = dict()
sourceObj.update(dict(metadata=sourcemetadata))
destinationObj.update(dict(metadata=destinationmetadata))
return sourceObj, destinationObj
return source_data, destination_data
def copy_folder(self, source, destination):
sourceObj, destinationObj = self._copyMove(source, destination)
if not os.path.isdir(sourceObj["fullpath"]):
raise RuntimeError("%s in %s is not a folder" % (sourceObj["name"], sourceObj["path"]))
source_data, destination_data = self._get_source_destination_data(source, destination)
try:
shutil.copytree(sourceObj["fullpath"], destinationObj["fullpath"])
shutil.copytree(source_data["fullpath"], destination_data["fullpath"])
except Exception as e:
raise RuntimeError("Could not copy %s in %s to %s in %s" % (sourceObj["name"], sourceObj["path"], destinationObj["name"], destinationObj["path"]), e)
raise StorageError("Could not copy %s in %s to %s in %s" % (source_data["name"], source_data["path"], destination_data["name"], destination_data["path"]), cause=e)
def move_folder(self, source, destination):
sourceObj, destinationObj = self._copyMove(source, destination)
if not os.path.isdir(sourceObj["fullpath"]):
raise RuntimeError("%s in %s is not a folder" % (sourceObj["name"], sourceObj["path"]))
source_data, destination_data = self._get_source_destination_data(source, destination)
try:
shutil.move(sourceObj["fullpath"], destinationObj["fullpath"])
shutil.move(source_data["fullpath"], destination_data["fullpath"])
except Exception as e:
raise RuntimeError("Could not move %s in %s to %s in %s" % (sourceObj["name"], sourceObj["path"], destinationObj["name"], destinationObj["path"]), e)
raise StorageError("Could not move %s in %s to %s in %s" % (source_data["name"], source_data["path"], destination_data["name"], destination_data["path"]), cause=e)
self._delete_metadata(source_data["fullpath"])
def add_file(self, path, file_object, printer_profile=None, links=None, allow_overwrite=False):
path, name = self.sanitize(path)
if not octoprint.filemanager.valid_file_type(name):
raise RuntimeError("{name} is an unrecognized file type".format(**locals()))
metadata = self._get_metadata(path)
if not metadata:
metadata = dict()
raise StorageError("{name} is an unrecognized file type".format(**locals()), code=StorageError.INVALID_FILE)
file_path = os.path.join(path, name)
if os.path.exists(file_path) and not os.path.isfile(file_path):
raise RuntimeError("{name} does already exist in {path} and is not a file".format(**locals()))
raise StorageError("{name} does already exist in {path} and is not a file".format(**locals()), code=StorageError.ALREADY_EXISTS)
if os.path.exists(file_path) and not allow_overwrite:
raise RuntimeError("{name} does already exist in {path} and overwriting is prohibited".format(**locals()))
raise StorageError("{name} does already exist in {path} and overwriting is prohibited".format(**locals()), code=StorageError.ALREADY_EXISTS)
# make sure folders exist
if not os.path.exists(path):
@ -567,13 +570,10 @@ class LocalFileStorage(StorageInterface):
# save the file's hash to the metadata of the folder
file_hash = self._create_hash(file_path)
if not name in metadata or not "hash" in metadata[name] or metadata[name]["hash"] != file_hash:
# make sure to create a new metadata entry if we've never seen that file with that content before
file_metadata = dict(
hash=file_hash
)
metadata[name] = file_metadata
self._save_metadata(path, metadata)
metadata = self._get_metadata_entry(path, name, default=dict())
if not "hash" in metadata or metadata["hash"] != file_hash:
metadata["hash"] = file_hash
self._update_metadata_entry(path, name, metadata)
# process any links that were also provided for adding to the file
if not links:
@ -592,76 +592,50 @@ class LocalFileStorage(StorageInterface):
def remove_file(self, path):
path, name = self.sanitize(path)
metadata = self._get_metadata(path)
file_path = os.path.join(path, name)
if not os.path.exists(file_path):
return
if not os.path.isfile(file_path):
raise RuntimeError("{name} in {path} is not a file".format(**locals()))
raise StorageError("{name} in {path} is not a file".format(**locals()), code=StorageError.INVALID_FILE)
try:
os.remove(file_path)
except Exception as e:
raise RuntimeError("Could not delete {name} in {path}".format(**locals()), e)
raise StorageError("Could not delete {name} in {path}".format(**locals()), cause=e)
if name in metadata:
if "hash" in metadata[name]:
hash = metadata[name]["hash"]
for m in metadata.values():
if not "links" in m:
continue
for link in m["links"]:
if "rel" in link and "hash" in link and (
link["rel"] == "model" or link["rel"] == "machinecode") and link["hash"] == hash:
m["links"].remove(link)
del metadata[name]
self._save_metadata(path, metadata)
self._remove_metadata_entry(path, name)
def copy_file(self, source, destination):
sourceObj, destinationObj = self._copyMoveWithMetadata(source, destination)
source_data, destination_data = self._get_source_destination_data(source, destination)
try:
shutil.copy2(sourceObj["fullpath"], destinationObj["fullpath"])
shutil.copy2(source_data["fullpath"], destination_data["fullpath"])
except Exception as e:
raise RuntimeError("Could not copy %s in %s to %s in %s" % (sourceObj["name"], sourceObj["path"], destinationObj["name"], destinationObj["path"]), e)
raise StorageError("Could not copy %s in %s to %s in %s" % (source_data["name"], source_data["path"], destination_data["name"], destination_data["path"]), cause=e)
if sourceObj["name"] in sourceObj["metadata"]:
destinationObj["metadata"][destinationObj["name"]] = sourceObj["metadata"][sourceObj["name"]]
destinationObj["metadata"][destinationObj["name"]]["hash"] = self._create_hash(destinationObj["fullpath"])
self._save_metadata(destinationObj["path"], destinationObj["metadata"])
self._copy_metadata_entry(source_data["path"], source_data["name"],
destination_data["path"], destination_data["name"])
def move_file(self, source, destination, allow_overwrite=False):
sourceObj, destinationObj = self._copyMoveWithMetadata(source, destination)
source_data, destination_data = self._get_source_destination_data(source, destination)
try:
shutil.move(sourceObj["fullpath"], destinationObj["fullpath"])
shutil.move(source_data["fullpath"], destination_data["fullpath"])
except Exception as e:
raise RuntimeError("Could not move %s in %s to %s in %s" % (sourceObj["name"], sourceObj["path"], destinationObj["name"], destinationObj["path"]), e)
raise StorageError("Could not move %s in %s to %s in %s" % (source_data["name"], source_data["path"], destination_data["name"], destination_data["path"]), cause=e)
if sourceObj["name"] in sourceObj["metadata"]:
metadata = sourceObj["metadata"][sourceObj["name"]]
del sourceObj["metadata"][sourceObj["name"]]
self._save_metadata(sourceObj["path"], sourceObj["metadata"])
destinationObj["metadata"][destinationObj["name"]] = metadata
destinationObj["metadata"][destinationObj["name"]]["hash"] = self._create_hash(destinationObj["fullpath"])
self._save_metadata(destinationObj["path"], destinationObj["metadata"])
self._copy_metadata_entry(source_data["path"], source_data["name"],
destination_data["path"], destination_data["name"],
delete_source=True)
def get_metadata(self, path):
path, name = self.sanitize(path)
metadata = self._get_metadata(path)
if name in metadata:
return metadata[name]
else:
return None
return self._get_metadata_entry(path, name)
def get_link(self, path, rel):
path, name = self.sanitize(path)
return self._get_links(name, path, rel)
def add_link(self, path, rel, data):
path, name = self.sanitize(path)
self._add_links(name, path, [(rel, data)])
@ -1154,13 +1128,53 @@ class LocalFileStorage(StorageInterface):
return hash.hexdigest()
def _get_metadata(self, path):
if path in self._metadata_cache:
return self._metadata_cache[path]
def _get_metadata_entry(self, path, name, default=None):
with self._get_metadata_lock(path):
metadata = self._get_metadata(path)
return metadata.get(name, default)
metadata_path = os.path.join(path, ".metadata.yaml")
if os.path.exists(metadata_path):
with self._metadata_lock:
def _remove_metadata_entry(self, path, name):
with self._get_metadata_lock(path):
metadata = self._get_metadata(path)
if not name in metadata:
return
if "hash" in metadata[name]:
hash = metadata[name]["hash"]
for m in metadata.values():
if not "links" in m:
continue
links_hash = lambda link: "hash" in link and link["hash"] == hash and "rel" in link and (link["rel"] == "model" or link["rel"] == "machinecode")
m["links"] = [link for link in m["links"] if not links_hash(link)]
del metadata[name]
self._save_metadata(path, metadata)
def _update_metadata_entry(self, path, name, data):
with self._get_metadata_lock(path):
metadata = self._get_metadata(path)
metadata[name] = data
self._save_metadata(path, metadata)
def _copy_metadata_entry(self, source_path, source_name, destination_path, destination_name, delete_source=False):
with self._get_metadata_lock(source_path):
source_data = self._get_metadata_entry(source_path, source_name, default=dict())
if not source_data:
return
if delete_source:
self._remove_metadata_entry(source_path, source_name)
with self._get_metadata_lock(destination_path):
self._update_metadata_entry(destination_path, destination_name, source_data)
def _get_metadata(self, path):
with self._get_metadata_lock(path):
if path in self._metadata_cache:
return deepcopy(self._metadata_cache[path])
metadata_path = os.path.join(path, ".metadata.yaml")
if os.path.exists(metadata_path):
with open(metadata_path) as f:
try:
import yaml
@ -1168,30 +1182,49 @@ class LocalFileStorage(StorageInterface):
except:
self._logger.exception("Error while reading .metadata.yaml from {path}".format(**locals()))
else:
self._metadata_cache[path] = metadata
self._metadata_cache[path] = deepcopy(metadata)
return metadata
return dict()
return dict()
def _save_metadata(self, path, metadata):
metadata_path = os.path.join(path, ".metadata.yaml")
with self._metadata_lock:
with self._get_metadata_lock(path):
metadata_path = os.path.join(path, ".metadata.yaml")
try:
import yaml
import shutil
file_obj = tempfile.NamedTemporaryFile(delete=False)
try:
yaml.safe_dump(metadata, stream=file_obj, default_flow_style=False, indent=" ", allow_unicode=True)
file_obj.close()
shutil.move(file_obj.name, metadata_path)
finally:
try:
if os.path.exists(file_obj.name):
os.remove(file_obj.name)
except Exception as e:
self._logger.warn("Could not delete file {}: {}".format(file_obj.name, str(e)))
with atomic_write(metadata_path) as f:
yaml.safe_dump(metadata, stream=f, default_flow_style=False, indent=" ", allow_unicode=True)
except:
self._logger.exception("Error while writing .metadata.yaml to {path}".format(**locals()))
else:
self._metadata_cache[path] = metadata
self._metadata_cache[path] = deepcopy(metadata)
def _delete_metadata(self, path):
with self._get_metadata_lock(path):
metadata_path = os.path.join(path, ".metadata.yaml")
if os.path.exists(metadata_path):
try:
os.remove(metadata_path)
except:
self._logger.exception("Error while deleting .metadata.yaml from {path}".format(**locals()))
if path in self._metadata_cache:
del self._metadata_cache[path]
@contextmanager
def _get_metadata_lock(self, path):
with self._metadata_lock_mutex:
if path not in self._metadata_locks:
import threading
self._metadata_locks[path] = (0, threading.RLock())
counter, lock = self._metadata_locks[path]
counter += 1
self._metadata_locks[path] = (counter, lock)
yield lock
counter = self._metadata_locks[path][0]
counter -= 1
if counter <= 0:
del self._metadata_locks[path]
else:
self._metadata_locks[path] = (counter, lock)

View file

@ -12,7 +12,7 @@ import os.path
from ddt import ddt, unpack, data
import octoprint.filemanager.storage
from octoprint.filemanager.storage import LocalFileStorage, StorageError
class FileWrapper(object):
@ -43,7 +43,7 @@ class LocalStorageTest(unittest.TestCase):
def setUp(self):
import tempfile
self.basefolder = os.path.realpath(os.path.abspath(tempfile.mkdtemp()))
self.storage = octoprint.filemanager.storage.LocalFileStorage(self.basefolder)
self.storage = LocalFileStorage(self.basefolder)
# mock file manager module
self.filemanager_patcher = mock.patch("octoprint.filemanager")
@ -67,24 +67,24 @@ class LocalStorageTest(unittest.TestCase):
self.filemanager_patcher.stop()
def test_add_file(self):
self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
def test_add_file_overwrite(self):
self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
try:
self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, overwrite=False)
self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, overwrite=False)
except:
pass
self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, overwrite=True)
self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, overwrite=True)
def test_add_file_with_web(self):
import time
href = "http://www.example.com"
retrieved = time.time()
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, links=[("web", dict(href=href, retrieved=retrieved))])
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, links=[("web", dict(href=href, retrieved=retrieved))])
stl_metadata = self.storage.get_metadata(stl_name)
self.assertIsNotNone(stl_metadata)
@ -97,8 +97,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(retrieved, link["retrieved"])
def test_add_file_with_association(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=stl_name))])
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=stl_name))])
stl_metadata = self.storage.get_metadata(stl_name)
gcode_metadata = self.storage.get_metadata(gcode_name)
@ -122,8 +122,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(FILE_BP_CASE_GCODE.hash, link["hash"])
def test_remove_file(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=stl_name))])
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=stl_name))])
stl_metadata = self.storage.get_metadata(stl_name)
gcode_metadata = self.storage.get_metadata(gcode_name)
@ -142,23 +142,94 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(0, len(gcode_metadata["links"]))
def test_copy_file(self):
self._add_file("bp_case.stl", FILE_BP_CASE_STL)
self._add_folder("test")
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "bp_case.stl")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "test")))
self.storage.copy_file("bp_case.stl", "test/copied.stl")
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "bp_case.stl")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "test", "copied.stl")))
stl_metadata = self.storage.get_metadata("bp_case.stl")
copied_metadata = self.storage.get_metadata("test/copied.stl")
self.assertIsNotNone(stl_metadata)
self.assertIsNotNone(copied_metadata)
self.assertDictEqual(stl_metadata, copied_metadata)
def test_move_file(self):
self._add_file("bp_case.stl", FILE_BP_CASE_STL)
self._add_folder("test")
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "bp_case.stl")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "test")))
before_stl_metadata = self.storage.get_metadata("bp_case.stl")
self.storage.move_file("bp_case.stl", "test/copied.stl")
self.assertFalse(os.path.isfile(os.path.join(self.basefolder, "bp_case.stl")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "test", "copied.stl")))
after_stl_metadata = self.storage.get_metadata("bp_case.stl")
copied_metadata = self.storage.get_metadata("test/copied.stl")
self.assertIsNotNone(before_stl_metadata)
self.assertIsNone(after_stl_metadata)
self.assertIsNotNone(copied_metadata)
self.assertDictEqual(before_stl_metadata, copied_metadata)
@data("copy_file", "move_file")
def test_copy_move_file_missing_source(self, operation):
try:
getattr(self.storage, operation)("bp_case.stl", "test/copied.stl")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_SOURCE)
@data("copy_file", "move_file")
def test_copy_move_file_missing_destination_folder(self, operation):
self._add_file("bp_case.stl", FILE_BP_CASE_STL)
try:
getattr(self.storage, operation)("bp_case.stl", "test/copied.stl")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_DESTINATION)
@data("copy_file", "move_file")
def test_copy_move_file_existing_destination_path(self, operation):
self._add_file("bp_case.stl", FILE_BP_CASE_STL)
self._add_folder("test")
self._add_file("test/crazyradio.stl", FILE_CRAZYRADIO_STL)
try:
getattr(self.storage, operation)("bp_case.stl", "test/crazyradio.stl")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_DESTINATION)
def test_add_folder(self):
self._add_folder("test", "test")
self._add_and_verify_folder("test", "test")
def test_add_subfolder(self):
folder_name = self._add_folder("folder with some spaces", "folder_with_some_spaces")
subfolder_name = self._add_folder((folder_name, "subfolder"), folder_name + "/subfolder")
stl_name = self._add_file((subfolder_name, "bp_case.stl"), subfolder_name + "/bp_case.stl", FILE_BP_CASE_STL)
folder_name = self._add_and_verify_folder("folder with some spaces", "folder_with_some_spaces")
subfolder_name = self._add_and_verify_folder((folder_name, "subfolder"), folder_name + "/subfolder")
stl_name = self._add_and_verify_file((subfolder_name, "bp_case.stl"), subfolder_name + "/bp_case.stl", FILE_BP_CASE_STL)
self.assertTrue(os.path.exists(os.path.join(self.basefolder, folder_name)))
self.assertTrue(os.path.exists(os.path.join(self.basefolder, subfolder_name)))
self.assertTrue(os.path.exists(os.path.join(self.basefolder, stl_name)))
def test_remove_folder(self):
content_folder = self._add_folder("content", "content")
other_stl_name = self._add_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
content_folder = self._add_and_verify_folder("content", "content")
other_stl_name = self._add_and_verify_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
empty_folder = self._add_folder("empty", "empty")
empty_folder = self._add_and_verify_folder("empty", "empty")
try:
self.storage.remove_folder(content_folder, recursive=False)
@ -177,20 +248,94 @@ class LocalStorageTest(unittest.TestCase):
self.assertFalse(os.path.isdir(os.path.join(self.basefolder, empty_folder)))
def test_remove_folder_with_metadata(self):
content_folder = self._add_folder("content", "content")
other_stl_name = self._add_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
content_folder = self._add_and_verify_folder("content", "content")
other_stl_name = self._add_and_verify_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
self.storage.remove_file(other_stl_name)
self.storage.remove_folder(content_folder, recursive=False)
def test_copy_folder(self):
self._add_folder("source")
self._add_folder("destination")
self._add_file("source/crazyradio.stl", FILE_CRAZYRADIO_STL)
source_metadata = self.storage.get_metadata("source/crazyradio.stl")
self.storage.copy_folder("source", "destination/copied")
copied_metadata = self.storage.get_metadata("destination/copied/crazyradio.stl")
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "source")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "source", "crazyradio.stl")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "destination")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "destination", "copied")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "destination", "copied", ".metadata.yaml")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "destination", "copied", "crazyradio.stl")))
self.assertIsNotNone(source_metadata)
self.assertIsNotNone(copied_metadata)
self.assertDictEqual(source_metadata, copied_metadata)
def test_move_folder(self):
self._add_folder("source")
self._add_folder("destination")
self._add_file("source/crazyradio.stl", FILE_CRAZYRADIO_STL)
before_source_metadata = self.storage.get_metadata("source/crazyradio.stl")
self.storage.move_folder("source", "destination/copied")
after_source_metadata = self.storage.get_metadata("source/crazyradio.stl")
copied_metadata = self.storage.get_metadata("destination/copied/crazyradio.stl")
self.assertFalse(os.path.isdir(os.path.join(self.basefolder, "source")))
self.assertFalse(os.path.isfile(os.path.join(self.basefolder, "source", "crazyradio.stl")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "destination")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "destination", "copied")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "destination", "copied", ".metadata.yaml")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "destination", "copied", "crazyradio.stl")))
self.assertIsNotNone(before_source_metadata)
self.assertIsNone(after_source_metadata)
self.assertIsNotNone(copied_metadata)
self.assertDictEqual(before_source_metadata, copied_metadata)
@data("copy_folder", "move_folder")
def test_copy_move_folder_missing_source(self, operation):
try:
getattr(self.storage, operation)("source", "destination/copied")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_SOURCE)
@data("copy_folder", "move_folder")
def test_copy_move_folder_missing_destination_folder(self, operation):
self._add_folder("source")
self._add_file("source/crazyradio.stl", FILE_CRAZYRADIO_STL)
try:
getattr(self.storage, operation)("source", "destination/copied")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_DESTINATION)
@data("copy_folder", "move_folder")
def test_copy_move_folder_existing_destination_path(self, operation):
self._add_folder("source")
self._add_file("source/crazyradio.stl", FILE_CRAZYRADIO_STL)
self._add_folder("destination")
self._add_folder("destination/copied")
try:
getattr(self.storage, operation)("source", "destination/copied")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_DESTINATION)
def test_list(self):
bp_case_stl = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=bp_case_stl))])
bp_case_stl = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=bp_case_stl))])
content_folder = self._add_folder("content", "content")
self._add_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
content_folder = self._add_and_verify_folder("content", "content")
self._add_and_verify_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
self._add_folder("empty", "empty")
self._add_and_verify_folder("empty", "empty")
file_list = self.storage.list_files()
self.assertEquals(4, len(file_list))
@ -215,8 +360,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(0, len(file_list["empty"]["children"]))
def test_add_link_model(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
self.storage.add_link(gcode_name, "model", dict(name=stl_name))
@ -242,8 +387,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(FILE_BP_CASE_GCODE.hash, link["hash"])
def test_add_link_machinecode(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
self.storage.add_link(stl_name, "machinecode", dict(name=gcode_name))
@ -269,7 +414,7 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(FILE_BP_CASE_GCODE.hash, link["hash"])
def test_remove_link(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self.storage.add_link(stl_name, "web", dict(href="http://www.example.com"))
self.storage.add_link(stl_name, "web", dict(href="http://www.example2.com"))
@ -288,8 +433,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(1, len(stl_metadata["links"]))
def test_remove_link_bidirectional(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
self.storage.add_link(stl_name, "machinecode", dict(name=gcode_name))
self.storage.add_link(stl_name, "web", dict(href="http://www.example.com"))
@ -379,8 +524,23 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(expected_path, actual_path)
self.assertEquals(expected_name, actual_name)
def _add_file(self, path, expected_path, file_object, links=None, overwrite=False):
def _add_and_verify_file(self, path, expected_path, file_object, links=None, overwrite=False):
"""Adds a file to the storage and verifies the sanitized path."""
sanitized_path = self._add_file(path, file_object, links=links, overwrite=overwrite)
self.assertEquals(expected_path, sanitized_path)
return sanitized_path
def _add_file(self, path, file_object, links=None, overwrite=False):
"""
Adds a file to the storage.
Ensures file is present, metadata is present, hash and links (if applicable)
are populated correctly.
Returns sanitized path.
"""
sanitized_path = self.storage.add_file(path, file_object, links=links, allow_overwrite=overwrite)
split_path = sanitized_path.split("/")
if len(split_path) == 1:
file_path = os.path.join(self.basefolder, split_path[0])
@ -389,9 +549,8 @@ class LocalStorageTest(unittest.TestCase):
file_path = os.path.join(self.basefolder, os.path.join(*split_path))
folder_path = os.path.join(self.basefolder, os.path.join(*split_path[:-1]))
self.assertEquals(expected_path, sanitized_path)
self.assertTrue(os.path.exists(file_path))
self.assertTrue(os.path.exists(os.path.join(folder_path, ".metadata.yaml")))
self.assertTrue(os.path.isfile(file_path))
self.assertTrue(os.path.isfile(os.path.join(folder_path, ".metadata.yaml")))
metadata = self.storage.get_metadata(sanitized_path)
self.assertIsNotNone(metadata)
@ -406,11 +565,21 @@ class LocalStorageTest(unittest.TestCase):
return sanitized_path
def _add_folder(self, path, expected_path):
sanitized_path = self.storage.add_folder(path)
def _add_and_verify_folder(self, path, expected_path):
"""Adds a folder to the storage and verifies sanitized path."""
sanitized_path = self._add_folder(path)
self.assertEquals(expected_path, sanitized_path)
self.assertTrue(os.path.exists(os.path.join(self.basefolder, os.path.join(*sanitized_path.split("/")))))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, os.path.join(*sanitized_path.split("/")))))
return sanitized_path
def _add_folder(self, path):
"""
Adds a folder to the storage.
Verifies existance of folder.
Returns sanitized path.
"""
sanitized_path = self.storage.add_folder(path)
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, os.path.join(*sanitized_path.split("/")))))
return sanitized_path