Allow overriding file name sanitization by sub classes of LocalStorage
This commit is contained in:
parent
2b22d26eac
commit
a7bd770180
1 changed files with 36 additions and 25 deletions
|
|
@ -391,7 +391,7 @@ class LocalFileStorage(StorageInterface):
|
|||
path = self.sanitize_path(path)
|
||||
else:
|
||||
path = self.basefolder
|
||||
return self._list_folder(path, filter=filter, recursive=recursive)
|
||||
return self._list_folder(path, entry_filter=filter, recursive=recursive)
|
||||
|
||||
def add_folder(self, path, ignore_existing=True):
|
||||
path, name = self.sanitize(path)
|
||||
|
|
@ -650,6 +650,30 @@ class LocalFileStorage(StorageInterface):
|
|||
raise ValueError("path not contained in base folder: {path}".format(**locals()))
|
||||
return path
|
||||
|
||||
def _sanitize_entry(self, entry, path, entry_path):
|
||||
sanitized = self.sanitize_name(entry)
|
||||
if sanitized != entry:
|
||||
# entry is not sanitized yet, let's take care of that
|
||||
sanitized_path = os.path.join(path, sanitized)
|
||||
sanitized_name, sanitized_ext = os.path.splitext(sanitized)
|
||||
|
||||
counter = 1
|
||||
while os.path.exists(sanitized_path):
|
||||
counter += 1
|
||||
sanitized = self.sanitize_name("{}_({}){}".format(sanitized_name, counter, sanitized_ext))
|
||||
sanitized_path = os.path.join(path, sanitized)
|
||||
|
||||
try:
|
||||
shutil.move(entry_path, sanitized_path)
|
||||
|
||||
self._logger.info("Sanitized \"{}\" to \"{}\"".format(entry_path, sanitized_path))
|
||||
return sanitized, sanitized_path
|
||||
except:
|
||||
self._logger.exception("Error while trying to rename \"{}\" to \"{}\", ignoring file".format(entry_path, sanitized_path))
|
||||
raise
|
||||
|
||||
return entry, entry_path
|
||||
|
||||
def path_in_storage(self, path):
|
||||
if isinstance(path, (tuple, list)):
|
||||
path = self.join_path(*path)
|
||||
|
|
@ -889,7 +913,10 @@ class LocalFileStorage(StorageInterface):
|
|||
if metadata_dirty:
|
||||
self._save_metadata(path, metadata)
|
||||
|
||||
def _list_folder(self, path, filter=None, recursive=True):
|
||||
def _list_folder(self, path, entry_filter=None, recursive=True, **kwargs):
|
||||
if entry_filter is None:
|
||||
entry_filter = kwargs.get("filter", None)
|
||||
|
||||
metadata = self._get_metadata(path)
|
||||
if not metadata:
|
||||
metadata = dict()
|
||||
|
|
@ -903,27 +930,11 @@ class LocalFileStorage(StorageInterface):
|
|||
|
||||
entry_path = os.path.join(path, entry)
|
||||
|
||||
sanitized = self.sanitize_name(entry)
|
||||
if sanitized != entry:
|
||||
# entry is not sanitized yet, let's take care of that
|
||||
sanitized_path = os.path.join(path, sanitized)
|
||||
sanitized_name, sanitized_ext = os.path.splitext(sanitized)
|
||||
|
||||
counter = 1
|
||||
while os.path.exists(sanitized_path):
|
||||
counter += 1
|
||||
sanitized = self.sanitize_name("{}_({}){}".format(sanitized_name, counter, sanitized_ext))
|
||||
sanitized_path = os.path.join(path, sanitized)
|
||||
|
||||
try:
|
||||
shutil.move(entry_path, sanitized_path)
|
||||
|
||||
self._logger.info("Sanitized \"{}\" to \"{}\"".format(entry_path, sanitized_path))
|
||||
entry = sanitized
|
||||
entry_path = sanitized_path
|
||||
except:
|
||||
self._logger.exception("Error while trying to rename \"{}\" to \"{}\", ignoring file".format(entry_path, sanitized_path))
|
||||
continue
|
||||
try:
|
||||
entry, entry_path = self._sanitize_entry(entry, path, entry_path)
|
||||
except:
|
||||
# error while trying to rename the file, we'll continue here and ignore it
|
||||
continue
|
||||
|
||||
# file handling
|
||||
if os.path.isfile(entry_path):
|
||||
|
|
@ -942,7 +953,7 @@ class LocalFileStorage(StorageInterface):
|
|||
|
||||
# TODO extract model hash from source if possible to recreate link
|
||||
|
||||
if not filter or filter(entry, entry_data):
|
||||
if not entry_filter or entry_filter(entry, entry_data):
|
||||
# only add files passing the optional filter
|
||||
extended_entry_data = dict()
|
||||
extended_entry_data.update(entry_data)
|
||||
|
|
@ -957,7 +968,7 @@ class LocalFileStorage(StorageInterface):
|
|||
|
||||
# folder recursion
|
||||
elif os.path.isdir(entry_path) and recursive:
|
||||
sub_result = self._list_folder(entry_path, filter=filter)
|
||||
sub_result = self._list_folder(entry_path, entry_filter=entry_filter)
|
||||
result[entry] = dict(
|
||||
name=entry,
|
||||
type="folder",
|
||||
|
|
|
|||
Loading…
Reference in a new issue