Merge branch 'dev/folderSupport' into devel

This commit is contained in:
Gina Häußge 2015-11-19 12:52:49 +01:00
commit a4e805685a
13 changed files with 1142 additions and 328 deletions

View file

@ -340,9 +340,15 @@ class FileManager(object):
def get_busy_files(self):
return self._slicing_jobs.keys()
def file_in_path(self, destination, path, file):
return self._storage(destination).file_in_path(path, file)
def file_exists(self, destination, path):
return self._storage(destination).file_exists(path)
def folder_exists(self, destination, path):
return self._storage(destination).folder_exists(path)
def list_files(self, destinations=None, path=None, filter=None, recursive=None):
if not destinations:
destinations = self._storage_managers.keys()
@ -385,6 +391,14 @@ class FileManager(object):
self._storage(destination).remove_file(path)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def copy_file(self, destination, source, dst):
self._storage(destination).copy_file(source, dst)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def move_file(self, destination, source, dst):
self._storage(destination).move_file(source, dst)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def add_folder(self, destination, path, ignore_existing=True):
folder_path = self._storage(destination).add_folder(path, ignore_existing=ignore_existing)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
@ -394,6 +408,14 @@ class FileManager(object):
self._storage(destination).remove_folder(path, recursive=recursive)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def copy_folder(self, destination, source, dst):
self._storage(destination).copy_folder(source, dst)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def move_folder(self, destination, source, dst):
self._storage(destination).move_folder(source, dst)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def get_metadata(self, destination, path):
return self._storage(destination).get_metadata(path)

View file

@ -9,7 +9,11 @@ __copyright__ = "Copyright (C) 2014 The OctoPrint Project - Released under terms
import logging
import os
import pylru
import tempfile
import shutil
from octoprint.util import atomic_write
from contextlib import contextmanager
from copy import deepcopy
import octoprint.filemanager
@ -33,6 +37,15 @@ class StorageInterface(object):
return
yield
def file_in_path(self, path, filepath):
"""
Returns whether the file indicated by ``file`` is inside ``path`` or not.
:param string path: the path to check
:param string filepath: path to the file
:return: ``True`` if the file is inside the path, ``False`` otherwise
"""
return NotImplementedError()
def file_exists(self, path):
"""
Returns whether the file indicated by ``path`` exists or not.
@ -41,6 +54,14 @@ class StorageInterface(object):
"""
raise NotImplementedError()
def folder_exists(self, path):
"""
Returns whether the folder indicated by ``path`` exists or not.
:param string path: the path to check for existence
:return: ``True`` if the folder exists, ``False`` otherwise
"""
raise NotImplementedError()
def list_files(self, path=None, filter=None, recursive=True):
"""
List all files in storage starting at ``path``. If ``recursive`` is set to True (the default), also dives into
@ -112,6 +133,24 @@ class StorageInterface(object):
"""
raise NotImplementedError()
def copy_folder(self, source, destination):
"""
Copys the folder ``source`` to ``destination``
:param string source: path to the source folder
:param string destination: path to destination
"""
raise NotImplementedError()
def move_folder(self, source, destination):
"""
Moves the folder ``source`` to ``destination``
:param string source: path to the source folder
:param string destination: path to destination
"""
raise NotImplementedError()
def add_file(self, path, file_object, printer_profile=None, links=None, allow_overwrite=False):
"""
Adds the file ``file_object`` as ``path``
@ -136,6 +175,24 @@ class StorageInterface(object):
"""
raise NotImplementedError()
def copy_file(self, source, destination):
"""
Copys the file ``source`` to ``destination``
:param string source: path to the source file
:param string destination: path to destination
"""
raise NotImplementedError()
def move_file(self, source, destination):
"""
Moves the file ``source`` to ``destination``
:param string source: path to the source file
:param string destination: path to destination
"""
raise NotImplementedError()
def get_metadata(self, path):
"""
Retrieves the metadata for the file ``path``.
@ -277,6 +334,25 @@ class StorageInterface(object):
raise NotImplementedError()
class StorageError(BaseException):
UNKNOWN = "unknown"
INVALID_DIRECTORY = "invalid_directory"
INVALID_FILE = "invalid_file"
INVALID_SOURCE = "invalid_source"
INVALID_DESTINATION = "invalid_destination"
DOES_NOT_EXIST = "does_not_exist"
ALREADY_EXISTS = "already_exists"
NOT_EMPTY = "not_empty"
def __init__(self, message, code=None, cause=None):
BaseException.__init__(self)
self.message = message
self.cause = cause
if code is None:
code = StorageError.UNKNOWN
self.code = code
class LocalFileStorage(StorageInterface):
"""
@ -302,10 +378,11 @@ class LocalFileStorage(StorageInterface):
if not os.path.exists(self.basefolder) and create:
os.makedirs(self.basefolder)
if not os.path.exists(self.basefolder) or not os.path.isdir(self.basefolder):
raise RuntimeError("{basefolder} is not a valid directory".format(**locals()))
raise StorageError("{basefolder} is not a valid directory".format(**locals()), code=StorageError.INVALID_DIRECTORY)
import threading
self._metadata_lock = threading.Lock()
self._metadata_lock_mutex = threading.RLock()
self._metadata_locks = dict()
self._metadata_cache = pylru.lrucache(10)
@ -374,11 +451,22 @@ class LocalFileStorage(StorageInterface):
for sub_entry in self._analysis_backlog_generator(absolute_path):
yield self.join_path(entry, sub_entry[0]), sub_entry[1], sub_entry[2]
def file_in_path(self, path, filepath):
filepath = self.sanitize_path(filepath)
path = self.sanitize_path(path)
return filepath.startswith(path)
def file_exists(self, path):
path, name = self.sanitize(path)
file_path = os.path.join(path, name)
return os.path.exists(file_path) and os.path.isfile(file_path)
def folder_exists(self, path):
path, name = self.sanitize(path)
folder_path = os.path.join(path, name)
return os.path.exists(folder_path) and os.path.isdir(folder_path)
def list_files(self, path=None, filter=None, recursive=True):
if path:
path = self.sanitize_path(path)
@ -392,7 +480,7 @@ class LocalFileStorage(StorageInterface):
folder_path = os.path.join(path, name)
if os.path.exists(folder_path):
if not ignore_existing:
raise RuntimeError("{sanitized_foldername} does already exist in {virtual_path}".format(**locals()))
raise StorageError("{sanitized_foldername} does already exist in {virtual_path}".format(**locals()), code=StorageError.ALREADY_EXISTS)
else:
os.mkdir(folder_path)
@ -409,25 +497,69 @@ class LocalFileStorage(StorageInterface):
if ".metadata.yaml" in contents:
contents.remove(".metadata.yaml")
if contents and not recursive:
raise RuntimeError("{sanitized_foldername} in {virtual_path} is not empty".format(**locals()))
raise StorageError("{sanitized_foldername} in {virtual_path} is not empty".format(**locals()), code=StorageError.NOT_EMPTY)
import shutil
shutil.rmtree(folder_path)
self._delete_metadata(folder_path)
def _get_source_destination_data(self, source, destination):
"""Prepares data dicts about source and destination for copy/move."""
source_path, source_name = self.sanitize(source)
destination_path, destination_name = self.sanitize(destination)
source_fullpath = os.path.join(source_path, source_name)
destination_fullpath = os.path.join(destination_path, destination_name)
if not os.path.exists(source_fullpath):
raise StorageError("{} in {} does not exist".format(source_name, source_path), code=StorageError.INVALID_SOURCE)
if not os.path.isdir(destination_path):
raise StorageError("Destination path {} does not exist or is not a folder".format(destination_path), code=StorageError.INVALID_DESTINATION)
if os.path.exists(destination_fullpath):
raise StorageError("{} does already exist in {}".format(destination_name, destination_path), code=StorageError.INVALID_DESTINATION)
source_data = dict(
path=source_path,
name=source_name,
fullpath=source_fullpath,
)
destination_data = dict(
path=destination_path,
name=destination_name,
fullpath=destination_fullpath,
)
return source_data, destination_data
def copy_folder(self, source, destination):
source_data, destination_data = self._get_source_destination_data(source, destination)
try:
shutil.copytree(source_data["fullpath"], destination_data["fullpath"])
except Exception as e:
raise StorageError("Could not copy %s in %s to %s in %s" % (source_data["name"], source_data["path"], destination_data["name"], destination_data["path"]), cause=e)
def move_folder(self, source, destination):
source_data, destination_data = self._get_source_destination_data(source, destination)
try:
shutil.move(source_data["fullpath"], destination_data["fullpath"])
except Exception as e:
raise StorageError("Could not move %s in %s to %s in %s" % (source_data["name"], source_data["path"], destination_data["name"], destination_data["path"]), cause=e)
self._delete_metadata(source_data["fullpath"])
def add_file(self, path, file_object, printer_profile=None, links=None, allow_overwrite=False):
path, name = self.sanitize(path)
if not octoprint.filemanager.valid_file_type(name):
raise RuntimeError("{name} is an unrecognized file type".format(**locals()))
metadata = self._get_metadata(path)
if not metadata:
metadata = dict()
raise StorageError("{name} is an unrecognized file type".format(**locals()), code=StorageError.INVALID_FILE)
file_path = os.path.join(path, name)
if os.path.exists(file_path) and not os.path.isfile(file_path):
raise RuntimeError("{name} does already exist in {path} and is not a file".format(**locals()))
raise StorageError("{name} does already exist in {path} and is not a file".format(**locals()), code=StorageError.ALREADY_EXISTS)
if os.path.exists(file_path) and not allow_overwrite:
raise RuntimeError("{name} does already exist in {path} and overwriting is prohibited".format(**locals()))
raise StorageError("{name} does already exist in {path} and overwriting is prohibited".format(**locals()), code=StorageError.ALREADY_EXISTS)
# make sure folders exist
if not os.path.exists(path):
@ -438,13 +570,10 @@ class LocalFileStorage(StorageInterface):
# save the file's hash to the metadata of the folder
file_hash = self._create_hash(file_path)
if not name in metadata or not "hash" in metadata[name] or metadata[name]["hash"] != file_hash:
# make sure to create a new metadata entry if we've never seen that file with that content before
file_metadata = dict(
hash=file_hash
)
metadata[name] = file_metadata
self._save_metadata(path, metadata)
metadata = self._get_metadata_entry(path, name, default=dict())
if not "hash" in metadata or metadata["hash"] != file_hash:
metadata["hash"] = file_hash
self._update_metadata_entry(path, name, metadata)
# process any links that were also provided for adding to the file
if not links:
@ -463,45 +592,50 @@ class LocalFileStorage(StorageInterface):
def remove_file(self, path):
path, name = self.sanitize(path)
metadata = self._get_metadata(path)
file_path = os.path.join(path, name)
if not os.path.exists(file_path):
return
if not os.path.isfile(file_path):
raise RuntimeError("{name} in {path} is not a file".format(**locals()))
raise StorageError("{name} in {path} is not a file".format(**locals()), code=StorageError.INVALID_FILE)
try:
os.remove(file_path)
except Exception as e:
raise RuntimeError("Could not delete {name} in {path}".format(**locals()), e)
raise StorageError("Could not delete {name} in {path}".format(**locals()), cause=e)
if name in metadata:
if "hash" in metadata[name]:
hash = metadata[name]["hash"]
for m in metadata.values():
if not "links" in m:
continue
for link in m["links"]:
if "rel" in link and "hash" in link and (link["rel"] == "model" or link["rel"] == "machinecode") and link["hash"] == hash:
m["links"].remove(link)
del metadata[name]
self._save_metadata(path, metadata)
self._remove_metadata_entry(path, name)
def copy_file(self, source, destination):
source_data, destination_data = self._get_source_destination_data(source, destination)
try:
shutil.copy2(source_data["fullpath"], destination_data["fullpath"])
except Exception as e:
raise StorageError("Could not copy %s in %s to %s in %s" % (source_data["name"], source_data["path"], destination_data["name"], destination_data["path"]), cause=e)
self._copy_metadata_entry(source_data["path"], source_data["name"],
destination_data["path"], destination_data["name"])
def move_file(self, source, destination, allow_overwrite=False):
source_data, destination_data = self._get_source_destination_data(source, destination)
try:
shutil.move(source_data["fullpath"], destination_data["fullpath"])
except Exception as e:
raise StorageError("Could not move %s in %s to %s in %s" % (source_data["name"], source_data["path"], destination_data["name"], destination_data["path"]), cause=e)
self._copy_metadata_entry(source_data["path"], source_data["name"],
destination_data["path"], destination_data["name"],
delete_source=True)
def get_metadata(self, path):
path, name = self.sanitize(path)
metadata = self._get_metadata(path)
if name in metadata:
return metadata[name]
else:
return None
return self._get_metadata_entry(path, name)
def get_link(self, path, rel):
path, name = self.sanitize(path)
return self._get_links(name, path, rel)
def add_link(self, path, rel, data):
path, name = self.sanitize(path)
self._add_links(name, path, [(rel, data)])
@ -925,13 +1059,29 @@ class LocalFileStorage(StorageInterface):
# folder recursion
elif os.path.isdir(entry_path) and recursive:
sub_result = self._list_folder(entry_path, filter=filter)
result[entry] = dict(
sub_result = self._list_folder(entry_path, filter=filter, recursive=recursive)
entry_data = dict(
name=entry,
type="folder",
children=sub_result
)
if not filter or filter(entry, entry_data):
def get_size():
total_size = 0
for element in entry_data["children"].itervalues():
if "size" in element:
total_size += element["size"]
return total_size
# only add folders passing the optional filter
extended_entry_data = dict()
extended_entry_data.update(entry_data)
extended_entry_data["size"] = get_size()
result[entry] = extended_entry_data
# TODO recreate links if we have metadata less entries
# save metadata
@ -978,13 +1128,53 @@ class LocalFileStorage(StorageInterface):
return hash.hexdigest()
def _get_metadata(self, path):
if path in self._metadata_cache:
return self._metadata_cache[path]
def _get_metadata_entry(self, path, name, default=None):
with self._get_metadata_lock(path):
metadata = self._get_metadata(path)
return metadata.get(name, default)
metadata_path = os.path.join(path, ".metadata.yaml")
if os.path.exists(metadata_path):
with self._metadata_lock:
def _remove_metadata_entry(self, path, name):
with self._get_metadata_lock(path):
metadata = self._get_metadata(path)
if not name in metadata:
return
if "hash" in metadata[name]:
hash = metadata[name]["hash"]
for m in metadata.values():
if not "links" in m:
continue
links_hash = lambda link: "hash" in link and link["hash"] == hash and "rel" in link and (link["rel"] == "model" or link["rel"] == "machinecode")
m["links"] = [link for link in m["links"] if not links_hash(link)]
del metadata[name]
self._save_metadata(path, metadata)
def _update_metadata_entry(self, path, name, data):
with self._get_metadata_lock(path):
metadata = self._get_metadata(path)
metadata[name] = data
self._save_metadata(path, metadata)
def _copy_metadata_entry(self, source_path, source_name, destination_path, destination_name, delete_source=False):
with self._get_metadata_lock(source_path):
source_data = self._get_metadata_entry(source_path, source_name, default=dict())
if not source_data:
return
if delete_source:
self._remove_metadata_entry(source_path, source_name)
with self._get_metadata_lock(destination_path):
self._update_metadata_entry(destination_path, destination_name, source_data)
def _get_metadata(self, path):
with self._get_metadata_lock(path):
if path in self._metadata_cache:
return deepcopy(self._metadata_cache[path])
metadata_path = os.path.join(path, ".metadata.yaml")
if os.path.exists(metadata_path):
with open(metadata_path) as f:
try:
import yaml
@ -992,30 +1182,49 @@ class LocalFileStorage(StorageInterface):
except:
self._logger.exception("Error while reading .metadata.yaml from {path}".format(**locals()))
else:
self._metadata_cache[path] = metadata
self._metadata_cache[path] = deepcopy(metadata)
return metadata
return dict()
return dict()
def _save_metadata(self, path, metadata):
metadata_path = os.path.join(path, ".metadata.yaml")
with self._metadata_lock:
with self._get_metadata_lock(path):
metadata_path = os.path.join(path, ".metadata.yaml")
try:
import yaml
import shutil
file_obj = tempfile.NamedTemporaryFile(delete=False)
try:
yaml.safe_dump(metadata, stream=file_obj, default_flow_style=False, indent=" ", allow_unicode=True)
file_obj.close()
shutil.move(file_obj.name, metadata_path)
finally:
try:
if os.path.exists(file_obj.name):
os.remove(file_obj.name)
except Exception as e:
self._logger.warn("Could not delete file {}: {}".format(file_obj.name, str(e)))
with atomic_write(metadata_path) as f:
yaml.safe_dump(metadata, stream=f, default_flow_style=False, indent=" ", allow_unicode=True)
except:
self._logger.exception("Error while writing .metadata.yaml to {path}".format(**locals()))
else:
self._metadata_cache[path] = metadata
self._metadata_cache[path] = deepcopy(metadata)
def _delete_metadata(self, path):
with self._get_metadata_lock(path):
metadata_path = os.path.join(path, ".metadata.yaml")
if os.path.exists(metadata_path):
try:
os.remove(metadata_path)
except:
self._logger.exception("Error while deleting .metadata.yaml from {path}".format(**locals()))
if path in self._metadata_cache:
del self._metadata_cache[path]
@contextmanager
def _get_metadata_lock(self, path):
with self._metadata_lock_mutex:
if path not in self._metadata_locks:
import threading
self._metadata_locks[path] = (0, threading.RLock())
counter, lock = self._metadata_locks[path]
counter += 1
self._metadata_locks[path] = (counter, lock)
yield lock
counter = self._metadata_locks[path][0]
counter -= 1
if counter <= 0:
del self._metadata_locks[path]
else:
self._metadata_locks[path] = (counter, lock)

View file

@ -25,11 +25,12 @@ import psutil
@api.route("/files", methods=["GET"])
def readGcodeFiles():
filter = None
if "filter" in request.values:
filter = request.values["filter"]
files = _getFileList(FileDestinations.LOCAL, filter=filter)
filter = "filter" in request.values and request.values["recursive"] in valid_boolean_trues
recursive = "recursive" in request.values and request.values["recursive"] in valid_boolean_trues
files = _getFileList(FileDestinations.LOCAL, filter=filter, recursive=recursive)
files.extend(_getFileList(FileDestinations.SDCARD))
usage = psutil.disk_usage(settings().getBaseFolder("uploads"))
return jsonify(files=files, free=usage.free, total=usage.total)
@ -39,7 +40,11 @@ def readGcodeFilesForOrigin(origin):
if origin not in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
return make_response("Unknown origin: %s" % origin, 404)
files = _getFileList(origin)
recursive = False
if "recursive" in request.values:
recursive = request.values["recursive"] == 'true'
files = _getFileList(origin, recursive=recursive)
if origin == FileDestinations.LOCAL:
usage = psutil.disk_usage(settings().getBaseFolder("uploads"))
@ -48,15 +53,47 @@ def readGcodeFilesForOrigin(origin):
return jsonify(files=files)
def _getFileDetails(origin, filename):
files = _getFileList(origin)
for file in files:
if file["name"] == filename:
return file
def _getFileDetails(origin, path):
files = _getFileList(origin, recursive=True)
path = path.split('/')
if len(path) == 1:
# shortcut for files in the root folder
name = path[0]
for f in files:
if f["name"] == name:
return f
return None
node = files
while path:
segment = path.pop(0)
for f in node:
if not f["name"] == segment:
# wrong name => next!
continue
if not path:
# no path left and name matches => found it!
return f
if not f["type"] == "folder":
# path left but not a folder => that doesn't work
return None
# we'll use this folder's children as the next iteration
node = f["children"]
break
else:
# nothing matched the name, we can't find it
return None
# nothing returned until now => not found
return None
def _getFileList(origin, filter=None):
def _getFileList(origin, filter=None, recursive=False):
if origin == FileDestinations.SDCARD:
sdFileList = printer.get_sd_files()
@ -78,45 +115,59 @@ def _getFileList(origin, filter=None):
filter_func = None
if filter:
filter_func = lambda entry, entry_data: octoprint.filemanager.valid_file_type(entry, type=filter)
files = fileManager.list_files(origin, filter=filter_func, recursive=False)[origin].values()
for file in files:
file["origin"] = FileDestinations.LOCAL
if "analysis" in file and octoprint.filemanager.valid_file_type(file["name"], type="gcode"):
file["gcodeAnalysis"] = file["analysis"]
del file["analysis"]
files = fileManager.list_files(origin, filter=filter_func, recursive=recursive)[origin].values()
if "history" in file and octoprint.filemanager.valid_file_type(file["name"], type="gcode"):
# convert print log
history = file["history"]
del file["history"]
success = 0
failure = 0
last = None
for entry in history:
success += 1 if "success" in entry and entry["success"] else 0
failure += 1 if "success" in entry and not entry["success"] else 0
if not last or ("timestamp" in entry and "timestamp" in last and entry["timestamp"] > last["timestamp"]):
last = entry
if last:
prints = dict(
success=success,
failure=failure,
last=dict(
success=last["success"],
date=last["timestamp"]
)
)
if "printTime" in last:
prints["last"]["printTime"] = last["printTime"]
file["prints"] = prints
def analyse_recursively(files, path=None):
if path is None:
path = ""
for file in files:
file["origin"] = FileDestinations.LOCAL
if file["type"] == "folder":
file["children"] = analyse_recursively(file["children"].values(), path + file["name"] + "/")
else:
if "analysis" in file and octoprint.filemanager.valid_file_type(file["name"], type="gcode"):
file["gcodeAnalysis"] = file["analysis"]
del file["analysis"]
if "history" in file and octoprint.filemanager.valid_file_type(file["name"], type="gcode"):
# convert print log
history = file["history"]
del file["history"]
success = 0
failure = 0
last = None
for entry in history:
success += 1 if "success" in entry and entry["success"] else 0
failure += 1 if "success" in entry and not entry["success"] else 0
if not last or ("timestamp" in entry and "timestamp" in last and entry["timestamp"] > last["timestamp"]):
last = entry
if last:
prints = dict(
success=success,
failure=failure,
last=dict(
success=last["success"],
date=last["timestamp"]
)
)
if "printTime" in last:
prints["last"]["printTime"] = last["printTime"]
file["prints"] = prints
file.update({
"refs": {
"resource": url_for(".readGcodeFile", target=FileDestinations.LOCAL, filename=path + file["name"], _external=True),
"download": url_for("index", _external=True) + "downloads/files/" + FileDestinations.LOCAL + "/" + path + file["name"]
}
})
return files
analyse_recursively(files)
file.update({
"refs": {
"resource": url_for(".readGcodeFile", target=FileDestinations.LOCAL, filename=file["name"], _external=True),
"download": url_for("index", _external=True) + "downloads/files/" + FileDestinations.LOCAL + "/" + file["name"]
}
})
return files
@ -127,139 +178,180 @@ def _verifyFileExists(origin, filename):
return fileManager.file_exists(origin, filename)
def _verifyFolderExists(origin, foldername):
if origin == FileDestinations.SDCARD:
return False
else:
return fileManager.folder_exists(origin, foldername)
def _isBusy(target, path):
currentOrigin, currentFilename = _getCurrentFile()
if currentFilename is not None and currentOrigin == target and fileManager.file_in_path(FileDestinations.LOCAL, path, currentFilename) and (printer.is_printing() or printer.is_paused()):
return True
return any(target == x[0] and fileManager.file_in_path(FileDestinations.LOCAL, path, x[1]) for x in fileManager.get_busy_files())
@api.route("/files/<string:target>", methods=["POST"])
@restricted_access
def uploadGcodeFile(target):
if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
return make_response("Unknown target: %s" % target, 404)
input_name = "file"
input_upload_name = input_name + "." + settings().get(["server", "uploads", "nameSuffix"])
input_upload_path = input_name + "." + settings().get(["server", "uploads", "pathSuffix"])
if input_upload_name in request.values and input_upload_path in request.values:
if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
return make_response("Unknown target: %s" % target, 404)
upload = octoprint.filemanager.util.DiskFileWrapper(request.values[input_upload_name], request.values[input_upload_path])
else:
return make_response("No file included", 400)
# Store any additional user data the caller may have passed.
userdata = None
if "userdata" in request.values:
import json
# Store any additional user data the caller may have passed.
userdata = None
if "userdata" in request.values:
import json
try:
userdata = json.loads(request.values["userdata"])
except:
return make_response("userdata contains invalid JSON", 400)
if target == FileDestinations.SDCARD and not settings().getBoolean(["feature", "sdSupport"]):
return make_response("SD card support is disabled", 404)
sd = target == FileDestinations.SDCARD
selectAfterUpload = "select" in request.values.keys() and request.values["select"] in valid_boolean_trues
printAfterSelect = "print" in request.values.keys() and request.values["print"] in valid_boolean_trues
if sd:
# validate that all preconditions for SD upload are met before attempting it
if not (printer.is_operational() and not (printer.is_printing() or printer.is_paused())):
return make_response("Can not upload to SD card, printer is either not operational or already busy", 409)
if not printer.is_sd_ready():
return make_response("Can not upload to SD card, not yet initialized", 409)
# determine current job
currentPath = None
currentFilename = None
currentOrigin = None
currentJob = printer.get_current_job()
if currentJob is not None and "file" in currentJob.keys():
currentJobFile = currentJob["file"]
if currentJobFile is not None and "name" in currentJobFile.keys() and "origin" in currentJobFile.keys() and currentJobFile["name"] is not None and currentJobFile["origin"] is not None:
currentPath, currentFilename = fileManager.sanitize(FileDestinations.LOCAL, currentJobFile["name"])
currentOrigin = currentJobFile["origin"]
# determine future filename of file to be uploaded, abort if it can't be uploaded
try:
userdata = json.loads(request.values["userdata"])
# FileDestinations.LOCAL = should normally be target, but can't because SDCard handling isn't implemented yet
futurePath, futureFilename = fileManager.sanitize(FileDestinations.LOCAL, upload.filename)
except:
return make_response("userdata contains invalid JSON", 400)
futurePath = None
futureFilename = None
if target == FileDestinations.SDCARD and not settings().getBoolean(["feature", "sdSupport"]):
return make_response("SD card support is disabled", 404)
if futureFilename is None:
return make_response("Can not upload file %s, wrong format?" % upload.filename, 415)
sd = target == FileDestinations.SDCARD
selectAfterUpload = "select" in request.values.keys() and request.values["select"] in valid_boolean_trues
printAfterSelect = "print" in request.values.keys() and request.values["print"] in valid_boolean_trues
if "path" in request.values and request.values["path"]:
# FileDestinations.LOCAL = should normally be target, but can't because SDCard handling isn't implemented yet
futurePath = fileManager.sanitize_path(FileDestinations.LOCAL, request.values["path"])
if sd:
# validate that all preconditions for SD upload are met before attempting it
if not (printer.is_operational() and not (printer.is_printing() or printer.is_paused())):
return make_response("Can not upload to SD card, printer is either not operational or already busy", 409)
if not printer.is_sd_ready():
return make_response("Can not upload to SD card, not yet initialized", 409)
# prohibit overwriting currently selected file while it's being printed
if futurePath == currentPath and futureFilename == currentFilename and target == currentOrigin and (printer.is_printing() or printer.is_paused()):
return make_response("Trying to overwrite file that is currently being printed: %s" % currentFilename, 409)
# determine current job
currentFilename = None
currentOrigin = None
currentJob = printer.get_current_job()
if currentJob is not None and "file" in currentJob.keys():
currentJobFile = currentJob["file"]
if "name" in currentJobFile.keys() and "origin" in currentJobFile.keys():
currentFilename = currentJobFile["name"]
currentOrigin = currentJobFile["origin"]
def fileProcessingFinished(filename, absFilename, destination):
"""
Callback for when the file processing (upload, optional slicing, addition to analysis queue) has
finished.
# determine future filename of file to be uploaded, abort if it can't be uploaded
try:
futureFilename = fileManager.sanitize_name(FileDestinations.LOCAL, upload.filename)
except:
futureFilename = None
if futureFilename is None:
return make_response("Can not upload file %s, wrong format?" % upload.filename, 415)
Depending on the file's destination triggers either streaming to SD card or directly calls selectAndOrPrint.
"""
# prohibit overwriting currently selected file while it's being printed
if futureFilename == currentFilename and target == currentOrigin and printer.is_printing() or printer.is_paused():
return make_response("Trying to overwrite file that is currently being printed: %s" % currentFilename, 409)
if destination == FileDestinations.SDCARD and octoprint.filemanager.valid_file_type(filename, "gcode"):
return filename, printer.add_sd_file(filename, absFilename, selectAndOrPrint)
else:
selectAndOrPrint(filename, absFilename, destination)
return filename
def fileProcessingFinished(filename, absFilename, destination):
"""
Callback for when the file processing (upload, optional slicing, addition to analysis queue) has
finished.
def selectAndOrPrint(filename, absFilename, destination):
"""
Callback for when the file is ready to be selected and optionally printed. For SD file uploads this is only
the case after they have finished streaming to the printer, which is why this callback is also used
for the corresponding call to addSdFile.
Depending on the file's destination triggers either streaming to SD card or directly calls selectAndOrPrint.
"""
Selects the just uploaded file if either selectAfterUpload or printAfterSelect are True, or if the
exact file is already selected, such reloading it.
"""
if octoprint.filemanager.valid_file_type(added_file, "gcode") and (selectAfterUpload or printAfterSelect or (currentFilename == filename and currentOrigin == destination)):
printer.select_file(absFilename, destination == FileDestinations.SDCARD, printAfterSelect)
if destination == FileDestinations.SDCARD and octoprint.filemanager.valid_file_type(filename, "gcode"):
return filename, printer.add_sd_file(filename, absFilename, selectAndOrPrint)
# FileDestinations.LOCAL = should normally be target, but can't because SDCard handling isn't implemented yet
futureFullPath = fileManager.join_path(FileDestinations.LOCAL, futurePath, futureFilename)
added_file = fileManager.add_file(FileDestinations.LOCAL, futureFullPath, upload, allow_overwrite=True)
if added_file is None:
return make_response("Could not upload the file %s" % upload.filename, 500)
if octoprint.filemanager.valid_file_type(added_file, "stl"):
filename = added_file
done = True
else:
selectAndOrPrint(filename, absFilename, destination)
return filename
filename = fileProcessingFinished(added_file, fileManager.path_on_disk(FileDestinations.LOCAL, added_file), target)
done = True
def selectAndOrPrint(filename, absFilename, destination):
"""
Callback for when the file is ready to be selected and optionally printed. For SD file uploads this is only
the case after they have finished streaming to the printer, which is why this callback is also used
for the corresponding call to addSdFile.
if userdata is not None:
# upload included userdata, add this now to the metadata
fileManager.set_additional_metadata(FileDestinations.LOCAL, added_file, "userdata", userdata)
Selects the just uploaded file if either selectAfterUpload or printAfterSelect are True, or if the
exact file is already selected, such reloading it.
"""
if octoprint.filemanager.valid_file_type(added_file, "gcode") and (selectAfterUpload or printAfterSelect or (currentFilename == filename and currentOrigin == destination)):
printer.select_file(absFilename, destination == FileDestinations.SDCARD, printAfterSelect)
sdFilename = None
if isinstance(filename, tuple):
filename, sdFilename = filename
added_file = fileManager.add_file(FileDestinations.LOCAL, upload.filename, upload, allow_overwrite=True)
if added_file is None:
return make_response("Could not upload the file %s" % upload.filename, 500)
if octoprint.filemanager.valid_file_type(added_file, "stl"):
filename = added_file
done = True
else:
filename = fileProcessingFinished(added_file, fileManager.path_on_disk(FileDestinations.LOCAL, added_file), target)
done = True
eventManager.fire(Events.UPLOAD, {"file": filename, "target": target})
if userdata is not None:
# upload included userdata, add this now to the metadata
fileManager.set_additional_metadata(FileDestinations.LOCAL, added_file, "userdata", userdata)
sdFilename = None
if isinstance(filename, tuple):
filename, sdFilename = filename
eventManager.fire(Events.UPLOAD, {"file": filename, "target": target})
files = {}
location = url_for(".readGcodeFile", target=FileDestinations.LOCAL, filename=filename, _external=True)
files.update({
FileDestinations.LOCAL: {
"name": filename,
"origin": FileDestinations.LOCAL,
"refs": {
"resource": location,
"download": url_for("index", _external=True) + "downloads/files/" + FileDestinations.LOCAL + "/" + filename
}
}
})
if sd and sdFilename:
location = url_for(".readGcodeFile", target=FileDestinations.SDCARD, filename=sdFilename, _external=True)
files = {}
location = url_for(".readGcodeFile", target=FileDestinations.LOCAL, filename=filename, _external=True)
files.update({
FileDestinations.SDCARD: {
"name": sdFilename,
"origin": FileDestinations.SDCARD,
FileDestinations.LOCAL: {
"name": filename,
"origin": FileDestinations.LOCAL,
"refs": {
"resource": location
"resource": location,
"download": url_for("index", _external=True) + "downloads/files/" + FileDestinations.LOCAL + "/" + filename
}
}
})
r = make_response(jsonify(files=files, done=done), 201)
r.headers["Location"] = location
return r
if sd and sdFilename:
location = url_for(".readGcodeFile", target=FileDestinations.SDCARD, filename=sdFilename, _external=True)
files.update({
FileDestinations.SDCARD: {
"name": sdFilename,
"origin": FileDestinations.SDCARD,
"refs": {
"resource": location
}
}
})
r = make_response(jsonify(files=files, done=done), 201)
r.headers["Location"] = location
return r
elif "foldername" in request.values:
foldername = request.values["foldername"]
if not target in [FileDestinations.LOCAL]:
return make_response("Unknown target: %s" % target, 400)
futurePath, futureName = fileManager.sanitize(target, foldername)
futureFullPath = fileManager.join_path(target, futurePath, futureName)
if octoprint.filemanager.valid_file_type(futureName):
return make_response("Can't create a folder named %s, please try another name" % futureName, 409)
added_folder = fileManager.add_folder(target, futureFullPath)
if added_folder is None:
return make_response("Could not create folder %s" % futureName, 500)
else:
return make_response("No file to upload and no folder to create", 400)
return NO_CONTENT
@api.route("/files/<string:target>/<path:filename>", methods=["GET"])
@ -280,13 +372,12 @@ def gcodeFileCommand(filename, target):
if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
return make_response("Unknown target: %s" % target, 404)
if not _verifyFileExists(target, filename):
return make_response("File not found on '%s': %s" % (target, filename), 404)
# valid file commands, dict mapping command name to mandatory parameters
valid_commands = {
"select": [],
"slice": []
"slice": [],
"copy": ["destination"],
"move": ["destination"]
}
command, data, response = get_json_command_from_request(request, valid_commands)
@ -294,6 +385,9 @@ def gcodeFileCommand(filename, target):
return response
if command == "select":
if not _verifyFileExists(target, filename):
return make_response("File not found on '%s': %s" % (target, filename), 404)
# selects/loads a file
if not octoprint.filemanager.valid_file_type(filename, type="machinecode"):
return make_response("Cannot select {filename} for printing, not a machinecode file".format(**locals()), 415)
@ -313,6 +407,9 @@ def gcodeFileCommand(filename, target):
printer.select_file(filenameToSelect, sd, printAfterLoading)
elif command == "slice":
if not _verifyFileExists(target, filename):
return make_response("File not found on '%s': %s" % (target, filename), 404)
try:
if "slicer" in data:
slicer = data["slicer"]
@ -343,6 +440,13 @@ def gcodeFileCommand(filename, target):
name, _ = os.path.splitext(filename)
gcode_name = name + ".gco"
if "path" in data and data["path"]:
gcode_name = fileManager.join_path(target, data["path"], gcode_name)
else:
path, _ = fileManager.split_path(target, filename)
if path:
gcode_name = fileManager.join_path(target, path, gcode_name)
# prohibit overwriting the file that is currently being printed
currentOrigin, currentFilename = _getCurrentFile()
if currentFilename == gcode_name and currentOrigin == target and (printer.is_printing() or printer.is_paused()):
@ -419,35 +523,82 @@ def gcodeFileCommand(filename, target):
r.headers["Location"] = location
return r
elif command == "copy" or command == "move":
# Copy and move are only possible on local storage
if not target in [FileDestinations.LOCAL]:
return make_response("Unsupported target for {}: {}".format(command, target), 400)
if not _verifyFileExists(target, filename) and not _verifyFolderExists(target, filename):
return make_response("File/Folder not found on '%s': %s" % (target, filename), 404)
destination = data["destination"]
if _verifyFolderExists(target, destination):
path, name = fileManager.split_path(target, filename)
destination = fileManager.join_path(target, destination, name)
if _verifyFileExists(target, destination) or _verifyFolderExists(target, destination):
return make_response("File/Folder already exists: %s" % filename, 409)
if command == "copy":
if fileManager.file_exists(target, filename):
fileManager.copy_file(target, filename, destination)
elif fileManager.folder_exists(target, filename):
fileManager.copy_folder(target, filename, destination)
elif command == "move":
if _isBusy(target, filename):
return make_response("Trying to move a file/folder that is currently in use: %s" % filename, 409)
# deselect the file if it's currently selected
currentOrigin, currentFilename = _getCurrentFile()
if currentFilename is not None and filename == currentFilename:
printer.unselect_file()
if fileManager.file_exists(target, filename):
fileManager.move_file(target, filename, destination)
elif fileManager.folder_exists(target, filename):
fileManager.move_folder(target, filename, destination)
return NO_CONTENT
@api.route("/files/<string:target>/<path:filename>", methods=["DELETE"])
@restricted_access
def deleteGcodeFile(filename, target):
if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
return make_response("Unknown target: %s" % target, 404)
if not _verifyFileExists(target, filename) and not _verifyFolderExists(target, filename):
return make_response("File/Folder not found on '%s': %s" % (target, filename), 404)
if not _verifyFileExists(target, filename):
return make_response("File not found on '%s': %s" % (target, filename), 404)
if _verifyFileExists(target, filename):
if not target in [FileDestinations.LOCAL, FileDestinations.SDCARD]:
return make_response("Unknown target: %s" % target, 400)
# prohibit deleting files that are currently in use
currentOrigin, currentFilename = _getCurrentFile()
if currentFilename == filename and currentOrigin == target and (printer.is_printing() or printer.is_paused()):
make_response("Trying to delete file that is currently being printed: %s" % filename, 409)
if _isBusy(target, filename):
return make_response("Trying to delete a file that is currently in use: %s" % filename, 409)
if (target, filename) in fileManager.get_busy_files():
make_response("Trying to delete a file that is currently in use: %s" % filename, 409)
# deselect the file if it's currently selected
currentOrigin, currentFilename = _getCurrentFile()
if currentFilename is not None and currentOrigin == target and filename == currentFilename:
printer.unselect_file()
# deselect the file if it's currently selected
if currentFilename is not None and filename == currentFilename:
printer.unselect_file()
# delete it
if target == FileDestinations.SDCARD:
printer.delete_sd_file(filename)
else:
fileManager.remove_file(target, filename)
# delete it
if target == FileDestinations.SDCARD:
printer.delete_sd_file(filename)
else:
fileManager.remove_file(target, filename)
elif _verifyFolderExists(target, filename):
if not target in [FileDestinations.LOCAL]:
return make_response("Unknown target: %s" % target, 400)
if _isBusy(target, filename):
return make_response("Trying to delete a folder that contains a file that is currently in use: %s" % filename, 409)
# deselect the file if it's currently selected
currentOrigin, currentFilename = _getCurrentFile()
if currentFilename is not None and currentOrigin == target and fileManager.file_in_path(target, filename, currentFilename):
printer.unselect_file()
# delete it
fileManager.remove_folder(target, filename)
return NO_CONTENT

View file

@ -800,12 +800,16 @@ class LargeResponseHandler(tornado.web.StaticFileHandler):
self._access_validation(self.request)
if self._path_validation is not None:
self._path_validation(path)
if "cookie" in self.request.arguments:
self.set_cookie(self.request.arguments["cookie"][0], "true", path="/")
result = tornado.web.StaticFileHandler.get(self, path, include_body=include_body)
return result
def set_extra_headers(self, path):
if self._as_attachment:
self.set_header("Content-Disposition", "attachment")
self.set_header("Content-Disposition", "attachment; filename=%s" % os.path.basename(path))
if not self._allow_client_caching:
self.set_header("Cache-Control", "max-age=0, must-revalidate, private")

View file

@ -2,12 +2,12 @@
'use strict';
if (typeof define === 'function' && define.amd) {
// Register as an anonymous AMD module:
define(["OctoPrint", "jquery"], factory);
define(["OctoPrint", "jquery", "lodash"], factory);
} else {
// Browser globals:
factory(window.OctoPrint, window.jQuery);
factory(window.OctoPrint, window.jQuery, window._);
}
})(function(OctoPrint, $) {
})(function(OctoPrint, $, _) {
var url = "api/files";
var resourceForLocation = function(location) {
@ -27,15 +27,33 @@
return OctoPrint.get(resourceForFile(location, filename), opts);
};
var preProcessList = function(response) {
var recursiveCheck = function(element, index, list) {
if (!element.hasOwnProperty("parent")) element.parent = { children: list, parent: undefined };
if (!element.hasOwnProperty("size")) element.size = undefined;
if (!element.hasOwnProperty("date")) element.date = undefined;
if (element.type == "folder") {
_.each(element.children, function(e, i, l) {
e.parent = element;
recursiveCheck(e, i, l);
});
}
};
_.each(response.files, recursiveCheck);
};
OctoPrint.files = {
get: getFile,
list: function (opts) {
return OctoPrint.get(url, opts);
return OctoPrint.get(url, opts)
.done(preProcessList);
},
listForLocation: function (location, opts) {
return OctoPrint.get(resourceForLocation(location), opts);
return OctoPrint.get(resourceForLocation(location), opts)
.done(preProcessList);
},
select: function (location, filename, print, opts) {
@ -57,6 +75,24 @@
return OctoPrint.delete(resourceForFile(location, filename), opts);
},
copy: function(location, filename, destination, opts) {
return issueFileCommand(location, filename, "copy", { destination: destination }, opts);
},
move: function(location, filename, destination, opts) {
return issueFileCommand(location, filename, "move", { destination: destination }, opts);
},
createFolder: function (location, name, path) {
var data = "foldername=" + name;
if (path != undefined && path != "") {
data = "foldername:" + path + "/" + name;
}
return OctoPrint.post(resourceForLocation(location), data);
},
upload: function (location, file, data) {
data = data || {};
@ -80,6 +116,45 @@
deferred.reject.apply(null, arguments);
});
return deferred.promise();
},
pathForElement: function(element) {
if (!element || !element.hasOwnProperty("parent") || element.parent == undefined) {
return "";
}
var recursivePath = function(element, path) {
if (element.hasOwnProperty("parent") && element.parent != undefined) {
return recursivePath(element.parent, element.name + "/" + path);
}
return path;
};
return recursivePath(element.parent, element.name);
},
elementByPath: function(location, startElement) {
var recursiveSearch = function(location, element) {
if (location.length == 0) {
return element;
}
if (!element.hasOwnProperty("children")) {
return undefined;
}
var name = location.shift();
for(var i = 0; i < element.children.length; i++) {
if (name == element.children[i].name) {
return recursiveSearch(location, element.children[i]);
}
}
return undefined;
};
return recursiveSearch(location.split("/"), startElement);
}
}
});

View file

@ -336,11 +336,17 @@ $(function() {
},
update: function(element, valueAccessor, allBindings, viewModel, bindingContext) {
setTimeout(function() {
$(element).slimScroll({scrollBy: 0});
if (element.nodeName == "#comment") {
// foreach is bound to a virtual element
$(element.parentElement).slimScroll({scrollBy: 0});
} else {
$(element).slimScroll({scrollBy: 0});
}
}, 10);
return ko.bindingHandlers.foreach.update(element, valueAccessor(), allBindings, viewModel, bindingContext);
}
};
ko.virtualElements.allowedBindings.slimScrolledForeach = true;
ko.bindingHandlers.qrcode = {
update: function(element, valueAccessor, allBindings, viewModel, bindingContext) {

View file

@ -55,6 +55,16 @@ $(function() {
self.uploadButton = undefined;
self.uploadSdButton = undefined;
self.addFolderDialog = undefined;
self.addFolderName = ko.observable(undefined);
self.enableAddFolder = ko.computed(function() {
return self.loginState.isUser() && self.addFolderName() && self.addFolderName().trim() != "";
});
self.allItems = ko.observable(undefined);
self.listStyle = ko.observable("folders_files");
self.currentPath = ko.observable("");
// initialize list helper
self.listHelper = new ItemListHelper(
"gcodeFiles",
@ -79,28 +89,57 @@ $(function() {
}
},
{
"printed": function(file) {
return !(file["prints"] && file["prints"]["success"] && file["prints"]["success"] > 0);
"printed": function(data) {
return !(data["prints"] && data["prints"]["success"] && data["prints"]["success"] > 0) || (data["type"] && data["type"] == "folder");
},
"sd": function(file) {
return file["origin"] && file["origin"] == "sdcard";
"sd": function(data) {
return data["origin"] && data["origin"] == "sdcard";
},
"local": function(file) {
return !(file["origin"] && file["origin"] == "sdcard");
"local": function(data) {
return !(data["origin"] && data["origin"] == "sdcard");
},
"machinecode": function(file) {
return file["type"] && file["type"] == "machinecode";
"machinecode": function(data) {
return data["type"] && (data["type"] == "machinecode" || data["type"] == "folder");
},
"model": function(file) {
return file["type"] && file["type"] == "model";
"model": function(data) {
return data["type"] && (data["type"] == "model" || data["type"] == "folder");
},
"emptyFolder": function(data) {
return data["type"] && (data["type"] != "folder" || data["children"].length != 0);
}
},
"name",
[],
["emptyFolder"],
[["sd", "local"], ["machinecode", "model"]],
0
);
self.foldersOnlyList = ko.dependentObservable(function() {
var filter = function(data) { return data["type"] && data["type"] == "folder"; };
return _.filter(self.listHelper.paginatedItems(), filter);
});
self.filesOnlyList = ko.dependentObservable(function() {
var filter = function(data) { return data["type"] && data["type"] != "folder"; };
return _.filter(self.listHelper.paginatedItems(), filter);
});
self.filesAndFolders = ko.dependentObservable(function() {
var style = self.listStyle();
if (style == "folders_files" || style == "files_folders") {
var files = self.filesOnlyList();
var folders = self.foldersOnlyList();
if (style == "folders_files") {
return folders.concat(files);
} else {
return files.concat(folders);
}
} else {
return self.listHelper.paginatedItems();
}
});
self.isLoadActionPossible = ko.computed(function() {
return self.loginState.isUser() && !self.isPrinting() && !self.isPaused() && !self.isLoading();
});
@ -113,12 +152,20 @@ $(function() {
self.highlightFilename(newValue);
});
self.highlightCurrentFilename = function() {
self.highlightFilename(self.printerState.filename());
};
self.highlightFilename = function(filename) {
if (filename == undefined) {
self.listHelper.selectNone();
} else {
self.listHelper.selectItem(function(item) {
return item.name == filename;
if (item.type == "folder") {
return _.startsWith(filename, OctoPrint.files.pathForElement(item) + "/");
} else {
return OctoPrint.files.pathForElement(item) == filename;
}
});
}
};
@ -143,26 +190,30 @@ $(function() {
};
self._otherRequestInProgress = false;
self.requestData = function(filenameToFocus, locationToFocus) {
self.requestData = function(filenameToFocus, locationToFocus, switchToPath) {
if (self._otherRequestInProgress) return;
self._otherRequestInProgress = true;
OctoPrint.files.list()
OctoPrint.files.list({ data: { recursive: true} })
.done(function(response) {
self.fromResponse(response, filenameToFocus, locationToFocus);
self.fromResponse(response, filenameToFocus, locationToFocus, switchToPath);
})
.always(function() {
self._otherRequestInProgress = false;
});
};
self.fromResponse = function(response, filenameToFocus, locationToFocus) {
self.fromResponse = function(response, filenameToFocus, locationToFocus, switchToPath) {
var files = response.files;
_.each(files, function(element, index, list) {
if (!element.hasOwnProperty("size")) element.size = undefined;
if (!element.hasOwnProperty("date")) element.date = undefined;
});
self.listHelper.updateItems(files);
self.allItems(files);
self.currentPath("");
if (!switchToPath) {
self.listHelper.updateItems(files);
} else {
self.changeFolderByPath(switchToPath);
}
if (filenameToFocus) {
// got a file to scroll to
@ -184,14 +235,55 @@ $(function() {
self.totalSpace(response.total);
}
self.highlightFilename(self.printerState.filename());
self.highlightCurrentFilename();
};
self.changeFolder = function(data) {
self.currentPath(OctoPrint.files.pathForElement(data));
self.listHelper.updateItems(data.children);
self.highlightCurrentFilename();
};
self.navigateUp = function() {
var path = self.currentPath().split("/");
path.pop();
self.changeFolderByPath(path.join("/"));
};
self.changeFolderByPath = function(path) {
var element = OctoPrint.files.elementByPath(path, { children: self.allItems() });
if (element) {
self.currentPath(path);
self.listHelper.updateItems(element.children);
} else{
self.currentPath("");
self.listHelper.updateItems(self.allItems());
}
self.highlightCurrentFilename();
};
self.showAddFolderDialog = function() {
if (self.addFolderDialog) {
self.addFolderDialog.modal("show");
}
};
self.addFolder = function() {
var name = self.addFolderName();
// "local" only for now since we only support local and sdcard,
// and sdcard doesn't support creating folders...
OctoPrint.files.createFolder("local", name, self.currentPath())
.done(function() {
self.addFolderDialog.modal("hide");
});
};
self.loadFile = function(file, printAfterLoad) {
if (!file) {
return;
}
OctoPrint.files.select(file.origin, file.name)
OctoPrint.files.select(file.origin, OctoPrint.files.pathForElement(file))
.done(function() {
if (printAfterLoad) {
OctoPrint.job.start();
@ -203,9 +295,24 @@ $(function() {
if (!file) {
return;
}
OctoPrint.files.delete(file.origin, file.name)
var index = self.listHelper.paginatedItems().indexOf(file) + 1;
if (index >= self.listHelper.paginatedItems().length) {
index = index - 2;
}
if (index < 0) {
index = 0;
}
var filenameToFocus = undefined;
var fileToFocus = self.listHelper.paginatedItems()[index];
if (fileToFocus) {
filenameToFocus = fileToFocus.name;
}
OctoPrint.files.delete(file.origin, OctoPrint.files.pathForElement(file))
.done(function() {
self.requestData();
self.requestData(undefined, filenameToFocus, OctoPrint.files.pathForElement(file.parent));
})
};
@ -214,7 +321,7 @@ $(function() {
return;
}
self.slicing.show(file.origin, file.name, true);
self.slicing.show(file.origin, OctoPrint.files.pathForElement(file), true);
};
self.initSdCard = function() {
@ -327,9 +434,20 @@ $(function() {
var query = self.searchQuery();
if (query !== undefined && query.trim() != "") {
query = query.toLocaleLowerCase();
self.listHelper.changeSearchFunction(function(entry) {
return entry && entry["name"].toLocaleLowerCase().indexOf(query) > -1;
});
var recursiveSearch = function(entry) {
if (entry === undefined) {
return false;
}
if (entry["type"] == "folder" && entry["children"]) {
return _.any(entry["children"], recursiveSearch);
} else {
return entry["name"].toLocaleLowerCase().indexOf(query) > -1;
}
};
self.listHelper.changeSearchFunction(recursiveSearch);
} else {
self.listHelper.resetSearch();
}
@ -338,7 +456,7 @@ $(function() {
};
self.onDataUpdaterReconnect = function() {
self.requestData();
self.requestData(undefined, undefined, self.currentPath());
};
self.onUserLoggedIn = function(user) {
@ -376,6 +494,8 @@ $(function() {
scrollBy: "102px"
});
self.addFolderDialog = $("#add_folder_dialog");
//~~ Gcode upload
self.uploadButton = $("#gcode_upload");
@ -414,7 +534,7 @@ $(function() {
filename = data.result.files.local.name;
location = "local";
}
self.requestData(filename, location);
self.requestData(filename, location, self.currentPath());
if (_.endsWith(filename.toLowerCase(), ".stl")) {
self.slicing.show(location, filename);
@ -462,6 +582,9 @@ $(function() {
done: gcode_upload_done,
fail: gcode_upload_fail,
progressall: gcode_upload_progress
}).bind('fileuploadsubmit', function(e, data) {
if (self.currentPath() != "")
data.formData = { path: self.currentPath() };
});
}
@ -538,26 +661,26 @@ $(function() {
self.onEventUpdatedFiles = function(payload) {
if (payload.type == "gcode") {
self.requestData();
self.requestData(undefined, undefined, self.currentPath());
}
};
self.onEventSlicingDone = function(payload) {
self.requestData();
self.requestData(undefined, undefined, self.currentPath());
};
self.onEventMetadataAnalysisFinished = function(payload) {
self.requestData();
self.requestData(undefined, undefined, self.currentPath());
};
self.onEventMetadataStatisticsUpdated = function(payload) {
self.requestData();
self.requestData(undefined, undefined, self.currentPath());
};
}
OCTOPRINT_VIEWMODELS.push([
GcodeFilesViewModel,
["settingsViewModel", "loginStateViewModel", "printerStateViewModel", "slicingViewModel"],
"#files_wrapper"
["#files_wrapper", "#add_folder_dialog"]
]);
});

View file

@ -7,6 +7,7 @@ $(function() {
self.target = undefined;
self.file = undefined;
self.path = undefined;
self.data = undefined;
self.defaultSlicer = undefined;
@ -34,16 +35,23 @@ $(function() {
];
self.afterSlicing = ko.observable("none");
self.show = function(target, file, force) {
self.show = function(target, file, force, path) {
if (!self.enableSlicingDialog() && !force) {
return;
}
var filename = file.substr(0, file.lastIndexOf("."));
if (filename.lastIndexOf("/") != 0) {
path = path || filename.substr(0, filename.lastIndexOf("/"));
filename = filename.substr(filename.lastIndexOf("/") + 1);
}
self.requestData();
self.target = target;
self.file = file;
self.title(_.sprintf(gettext("Slicing %(filename)s"), {filename: self.file}));
self.gcodeFilename(self.file.substr(0, self.file.lastIndexOf(".")));
self.path = path;
self.title(_.sprintf(gettext("Slicing %(filename)s"), {filename: filename}));
self.gcodeFilename(filename);
self.printerProfile(self.printerProfiles.currentProfile());
self.afterSlicing("none");
$("#slicing_configuration_dialog").modal("show");
@ -149,6 +157,10 @@ $(function() {
gcode: gcodeFilename
};
if (self.path != undefined) {
data["path"] = self.path;
}
if (self.afterSlicing() == "print") {
data["print"] = true;
} else if (self.afterSlicing() == "select") {

View file

@ -0,0 +1,21 @@
<div id="add_folder_dialog" class="modal hide fade">
<div class="modal-header">
<a href="#" class="close" data-dismiss="modal" aria-hidden="true">&times;</a>
<h3>Create Folder</h3>
</div>
<div class="modal-body">
<p>{{ _('Please specify the name of the folder to create.') }}</p>
<form class="form-horizontal">
<div class="control-group">
<label class="control-label">{{ _('Folder name') }}</label>
<div class="controls">
<input type="text" data-bind="value: addFolderName, valueUpdate: 'afterkeydown'">
</div>
</div>
</form>
</div>
<div class="modal-footer">
<a href="#" class="btn" data-dismiss="modal" aria-hidden="true">{{ _('Cancel') }}</a>
<a href="#" class="btn btn-primary" data-bind="click: function() { if ($root.enableAddFolder()) { $root.addFolder() } }, enabled: $root.enableAddFolder(), css: {disabled: !$root.enableAddFolder()}">{{ _('Create') }}</a>
</div>
</div>

View file

@ -130,6 +130,7 @@
{% include 'dialogs/usersettings.jinja2' %}
{% include 'dialogs/wizard.jinja2' %}
{% include 'dialogs/about.jinja2' %}
{% include 'dialogs/files.jinja2' %}
<!-- End of dialogs -->
<!-- Overlays -->

View file

@ -1,8 +1,12 @@
<form class="form-search" data-bind="submit: performSearch">
<input type="text" class="input-block search-query" data-bind="value: searchQuery, valueUpdate: 'input'" placeholder="{{ _('Search...') }}">
</form>
<div class="gcode_files" data-bind="slimScrolledForeach: listHelper.paginatedItems">
<div class="gcode_files">
<div class="entry" data-bind="visible: currentPath() != '', click: function() { $root.navigateUp(); }" style="display: none"><i class="icon-arrow-left"></i> {{ _('Back') }}</div>
<!-- ko slimScrolledForeach: filesAndFolders -->
<div class="entry" data-bind="attr: { id: $root.getEntryId($data) }, template: { name: $root.templateFor($data), data: $data }"></div>
<!-- /ko -->
<script type="text/html" id="files_template_machinecode">
<div class="title" data-bind="css: $root.getSuccessClass($data), style: { 'font-weight': $root.listHelper.isSelected($data) ? 'bold' : 'normal' }, text: name"></div>
@ -30,13 +34,21 @@
</script>
<script type="text/html" id="files_template_folder">
<div class="title" data-bind="text: name"></div>
<div data-bind="click: $root.changeFolder">
<div class="title" data-bind="style: { 'font-weight': $root.listHelper.isSelected($data) ? 'bold' : 'normal' }"><i class="icon-folder-open"></i> <span data-bind="text: name"></span></div>
<div class="size">{{ _('Size') }}: <span data-bind="text: formatSize(size)"></span></div>
</div>
</script>
</div>
<div class="text-right muted" data-bind="attr: {title: diskusageString}, css: {'text-error': diskusageCritical}, style: {'font-weight': diskusageCritical() || diskusageWarning() ? 'bold' : 'normal'}">
<small>{{ _('Free') }}: <span data-bind="text: freeSpaceString"></span> / {{ _('Total') }}: <span data-bind="text: totalSpaceString"></span> <i class="icon-exclamation-sign" data-bind="visible: diskusageWarning" style="display: none"></i></small>
</div>
<div style="display: none;" data-bind="visible: loginState.isUser">
<div class="row-fluid folder-button">
<span class="btn addfolder-button span12" data-bind="click: function() { if ($root.loginState.isUser()) { $root.showAddFolderDialog(); } }, css: {disabled: !$root.loginState.isUser()}">
<i class="icon-folder-close"></i> {{ _('Create folder...') }}
</span>
</div>
<div class="row-fluid upload-buttons">
{% if enableSdSupport %}
<span class="btn btn-primary fileinput-button span6" data-bind="css: {disabled: !$root.loginState.isUser()}" style="margin-bottom: 10px">

View file

@ -6,6 +6,14 @@
<li><a href="#" data-bind="click: function() { $root.listHelper.changeSorting('name'); }"><i class="icon-ok" data-bind="style: {visibility: listHelper.currentSorting() == 'name' ? 'visible' : 'hidden'}"></i> {{ _('Sort by name') }} ({{ _('ascending') }})</a></li>
<li><a href="#" data-bind="click: function() { $root.listHelper.changeSorting('upload'); }"><i class="icon-ok" data-bind="style: {visibility: listHelper.currentSorting() == 'upload' ? 'visible' : 'hidden'}"></i> {{ _('Sort by upload date') }} ({{ _('descending') }})</a></li>
<li><a href="#" data-bind="click: function() { $root.listHelper.changeSorting('size'); }"><i class="icon-ok" data-bind="style: {visibility: listHelper.currentSorting() == 'size' ? 'visible' : 'hidden'}"></i> {{ _('Sort by file size') }} ({{ _('descending') }})</a></li>
<li class="dropdown-submenu">
<a href="#"><i class="icon-ok" style="visibility: hidden"></i> {{ _('Folders') }}</a>
<ul class="dropdown-menu">
<li><a href="#" data-bind="click: function() { $root.listStyle('folders_files'); }"><i class="icon-ok" data-bind="style: {visibility: listStyle() == 'folders_files' ? 'visible' : 'hidden'}"></i> {{ _('Sort by Folders, Files') }}</a></li>
<li><a href="#" data-bind="click: function() { $root.listStyle('files_folders'); }"><i class="icon-ok" data-bind="style: {visibility: listStyle() == 'files_folders' ? 'visible' : 'hidden'}"></i> {{ _('Sort by Files, Folders') }}</a></li>
<li><a href="#" data-bind="click: function() { $root.listStyle('mixed'); }"><i class="icon-ok" data-bind="style: {visibility: listStyle() == 'mixed' ? 'visible' : 'hidden'}"></i> {{ _('Mixed') }}</a></li>
</ul>
</li>
<li class="divider"></li>
<li><a href="#" data-bind="click: function() { $root.listHelper.toggleFilter('machinecode'); }"><i class="icon-ok" data-bind="style: {visibility: _.contains(listHelper.currentFilters(), 'machinecode') ? 'visible' : 'hidden'}"></i> {{ _('Only show GCode files') }}</a></li>
<li><a href="#" data-bind="click: function() { $root.listHelper.toggleFilter('model'); }"><i class="icon-ok" data-bind="style: {visibility: _.contains(listHelper.currentFilters(), 'model') ? 'visible' : 'hidden'}"></i> {{ _('Only show STL files') }}</a></li>
@ -15,6 +23,7 @@
<li><a href="#" data-bind="click: function() { $root.listHelper.toggleFilter('sd'); }"><i class="icon-ok" data-bind="style: {visibility: _.contains(listHelper.currentFilters(), 'sd') ? 'visible' : 'hidden'}"></i> {{ _('Only show files stored on SD') }}</a></li>
{% endif %}
<li class="divider"></li>
<li><a href="#" data-bind="click: function() { $root.listHelper.toggleFilter('emptyFolder'); }"><i class="icon-ok" data-bind="style: {visibility: _.contains(listHelper.currentFilters(), 'emptyFolder') ? 'visible' : 'hidden'}"></i> {{ _('Hide empty folders') }}</a></li>
<li><a href="#" data-bind="click: function() { $root.listHelper.toggleFilter('printed'); }"><i class="icon-ok" data-bind="style: {visibility: _.contains(listHelper.currentFilters(), 'printed') ? 'visible' : 'hidden'}"></i> {{ _('Hide successfully printed files') }}</a></li>
</ul>
</div>

View file

@ -12,7 +12,7 @@ import os.path
from ddt import ddt, unpack, data
import octoprint.filemanager.storage
from octoprint.filemanager.storage import LocalFileStorage, StorageError
class FileWrapper(object):
@ -43,7 +43,7 @@ class LocalStorageTest(unittest.TestCase):
def setUp(self):
import tempfile
self.basefolder = os.path.realpath(os.path.abspath(tempfile.mkdtemp()))
self.storage = octoprint.filemanager.storage.LocalFileStorage(self.basefolder)
self.storage = LocalFileStorage(self.basefolder)
# mock file manager module
self.filemanager_patcher = mock.patch("octoprint.filemanager")
@ -67,24 +67,24 @@ class LocalStorageTest(unittest.TestCase):
self.filemanager_patcher.stop()
def test_add_file(self):
self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
def test_add_file_overwrite(self):
self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
try:
self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, overwrite=False)
self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, overwrite=False)
except:
pass
self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, overwrite=True)
self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, overwrite=True)
def test_add_file_with_web(self):
import time
href = "http://www.example.com"
retrieved = time.time()
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, links=[("web", dict(href=href, retrieved=retrieved))])
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL, links=[("web", dict(href=href, retrieved=retrieved))])
stl_metadata = self.storage.get_metadata(stl_name)
self.assertIsNotNone(stl_metadata)
@ -97,8 +97,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(retrieved, link["retrieved"])
def test_add_file_with_association(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=stl_name))])
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=stl_name))])
stl_metadata = self.storage.get_metadata(stl_name)
gcode_metadata = self.storage.get_metadata(gcode_name)
@ -122,8 +122,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(FILE_BP_CASE_GCODE.hash, link["hash"])
def test_remove_file(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=stl_name))])
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=stl_name))])
stl_metadata = self.storage.get_metadata(stl_name)
gcode_metadata = self.storage.get_metadata(gcode_name)
@ -142,23 +142,94 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(0, len(gcode_metadata["links"]))
def test_copy_file(self):
self._add_file("bp_case.stl", FILE_BP_CASE_STL)
self._add_folder("test")
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "bp_case.stl")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "test")))
self.storage.copy_file("bp_case.stl", "test/copied.stl")
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "bp_case.stl")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "test", "copied.stl")))
stl_metadata = self.storage.get_metadata("bp_case.stl")
copied_metadata = self.storage.get_metadata("test/copied.stl")
self.assertIsNotNone(stl_metadata)
self.assertIsNotNone(copied_metadata)
self.assertDictEqual(stl_metadata, copied_metadata)
def test_move_file(self):
self._add_file("bp_case.stl", FILE_BP_CASE_STL)
self._add_folder("test")
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "bp_case.stl")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "test")))
before_stl_metadata = self.storage.get_metadata("bp_case.stl")
self.storage.move_file("bp_case.stl", "test/copied.stl")
self.assertFalse(os.path.isfile(os.path.join(self.basefolder, "bp_case.stl")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "test", "copied.stl")))
after_stl_metadata = self.storage.get_metadata("bp_case.stl")
copied_metadata = self.storage.get_metadata("test/copied.stl")
self.assertIsNotNone(before_stl_metadata)
self.assertIsNone(after_stl_metadata)
self.assertIsNotNone(copied_metadata)
self.assertDictEqual(before_stl_metadata, copied_metadata)
@data("copy_file", "move_file")
def test_copy_move_file_missing_source(self, operation):
try:
getattr(self.storage, operation)("bp_case.stl", "test/copied.stl")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_SOURCE)
@data("copy_file", "move_file")
def test_copy_move_file_missing_destination_folder(self, operation):
self._add_file("bp_case.stl", FILE_BP_CASE_STL)
try:
getattr(self.storage, operation)("bp_case.stl", "test/copied.stl")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_DESTINATION)
@data("copy_file", "move_file")
def test_copy_move_file_existing_destination_path(self, operation):
self._add_file("bp_case.stl", FILE_BP_CASE_STL)
self._add_folder("test")
self._add_file("test/crazyradio.stl", FILE_CRAZYRADIO_STL)
try:
getattr(self.storage, operation)("bp_case.stl", "test/crazyradio.stl")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_DESTINATION)
def test_add_folder(self):
self._add_folder("test", "test")
self._add_and_verify_folder("test", "test")
def test_add_subfolder(self):
folder_name = self._add_folder("folder with some spaces", "folder_with_some_spaces")
subfolder_name = self._add_folder((folder_name, "subfolder"), folder_name + "/subfolder")
stl_name = self._add_file((subfolder_name, "bp_case.stl"), subfolder_name + "/bp_case.stl", FILE_BP_CASE_STL)
folder_name = self._add_and_verify_folder("folder with some spaces", "folder_with_some_spaces")
subfolder_name = self._add_and_verify_folder((folder_name, "subfolder"), folder_name + "/subfolder")
stl_name = self._add_and_verify_file((subfolder_name, "bp_case.stl"), subfolder_name + "/bp_case.stl", FILE_BP_CASE_STL)
self.assertTrue(os.path.exists(os.path.join(self.basefolder, folder_name)))
self.assertTrue(os.path.exists(os.path.join(self.basefolder, subfolder_name)))
self.assertTrue(os.path.exists(os.path.join(self.basefolder, stl_name)))
def test_remove_folder(self):
content_folder = self._add_folder("content", "content")
other_stl_name = self._add_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
content_folder = self._add_and_verify_folder("content", "content")
other_stl_name = self._add_and_verify_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
empty_folder = self._add_folder("empty", "empty")
empty_folder = self._add_and_verify_folder("empty", "empty")
try:
self.storage.remove_folder(content_folder, recursive=False)
@ -177,20 +248,94 @@ class LocalStorageTest(unittest.TestCase):
self.assertFalse(os.path.isdir(os.path.join(self.basefolder, empty_folder)))
def test_remove_folder_with_metadata(self):
content_folder = self._add_folder("content", "content")
other_stl_name = self._add_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
content_folder = self._add_and_verify_folder("content", "content")
other_stl_name = self._add_and_verify_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
self.storage.remove_file(other_stl_name)
self.storage.remove_folder(content_folder, recursive=False)
def test_copy_folder(self):
self._add_folder("source")
self._add_folder("destination")
self._add_file("source/crazyradio.stl", FILE_CRAZYRADIO_STL)
source_metadata = self.storage.get_metadata("source/crazyradio.stl")
self.storage.copy_folder("source", "destination/copied")
copied_metadata = self.storage.get_metadata("destination/copied/crazyradio.stl")
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "source")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "source", "crazyradio.stl")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "destination")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "destination", "copied")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "destination", "copied", ".metadata.yaml")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "destination", "copied", "crazyradio.stl")))
self.assertIsNotNone(source_metadata)
self.assertIsNotNone(copied_metadata)
self.assertDictEqual(source_metadata, copied_metadata)
def test_move_folder(self):
self._add_folder("source")
self._add_folder("destination")
self._add_file("source/crazyradio.stl", FILE_CRAZYRADIO_STL)
before_source_metadata = self.storage.get_metadata("source/crazyradio.stl")
self.storage.move_folder("source", "destination/copied")
after_source_metadata = self.storage.get_metadata("source/crazyradio.stl")
copied_metadata = self.storage.get_metadata("destination/copied/crazyradio.stl")
self.assertFalse(os.path.isdir(os.path.join(self.basefolder, "source")))
self.assertFalse(os.path.isfile(os.path.join(self.basefolder, "source", "crazyradio.stl")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "destination")))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, "destination", "copied")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "destination", "copied", ".metadata.yaml")))
self.assertTrue(os.path.isfile(os.path.join(self.basefolder, "destination", "copied", "crazyradio.stl")))
self.assertIsNotNone(before_source_metadata)
self.assertIsNone(after_source_metadata)
self.assertIsNotNone(copied_metadata)
self.assertDictEqual(before_source_metadata, copied_metadata)
@data("copy_folder", "move_folder")
def test_copy_move_folder_missing_source(self, operation):
try:
getattr(self.storage, operation)("source", "destination/copied")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_SOURCE)
@data("copy_folder", "move_folder")
def test_copy_move_folder_missing_destination_folder(self, operation):
self._add_folder("source")
self._add_file("source/crazyradio.stl", FILE_CRAZYRADIO_STL)
try:
getattr(self.storage, operation)("source", "destination/copied")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_DESTINATION)
@data("copy_folder", "move_folder")
def test_copy_move_folder_existing_destination_path(self, operation):
self._add_folder("source")
self._add_file("source/crazyradio.stl", FILE_CRAZYRADIO_STL)
self._add_folder("destination")
self._add_folder("destination/copied")
try:
getattr(self.storage, operation)("source", "destination/copied")
self.fail("Expected an exception")
except StorageError as e:
self.assertEqual(e.code, StorageError.INVALID_DESTINATION)
def test_list(self):
bp_case_stl = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=bp_case_stl))])
bp_case_stl = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE, links=[("model", dict(name=bp_case_stl))])
content_folder = self._add_folder("content", "content")
self._add_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
content_folder = self._add_and_verify_folder("content", "content")
self._add_and_verify_file((content_folder, "crazyradio.stl"), content_folder + "/crazyradio.stl", FILE_CRAZYRADIO_STL)
self._add_folder("empty", "empty")
self._add_and_verify_folder("empty", "empty")
file_list = self.storage.list_files()
self.assertEquals(4, len(file_list))
@ -215,8 +360,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(0, len(file_list["empty"]["children"]))
def test_add_link_model(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
self.storage.add_link(gcode_name, "model", dict(name=stl_name))
@ -242,8 +387,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(FILE_BP_CASE_GCODE.hash, link["hash"])
def test_add_link_machinecode(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
self.storage.add_link(stl_name, "machinecode", dict(name=gcode_name))
@ -269,7 +414,7 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(FILE_BP_CASE_GCODE.hash, link["hash"])
def test_remove_link(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
self.storage.add_link(stl_name, "web", dict(href="http://www.example.com"))
self.storage.add_link(stl_name, "web", dict(href="http://www.example2.com"))
@ -288,8 +433,8 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(1, len(stl_metadata["links"]))
def test_remove_link_bidirectional(self):
stl_name = self._add_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
stl_name = self._add_and_verify_file("bp_case.stl", "bp_case.stl", FILE_BP_CASE_STL)
gcode_name = self._add_and_verify_file("bp_case.gcode", "bp_case.gcode", FILE_BP_CASE_GCODE)
self.storage.add_link(stl_name, "machinecode", dict(name=gcode_name))
self.storage.add_link(stl_name, "web", dict(href="http://www.example.com"))
@ -379,8 +524,23 @@ class LocalStorageTest(unittest.TestCase):
self.assertEquals(expected_path, actual_path)
self.assertEquals(expected_name, actual_name)
def _add_file(self, path, expected_path, file_object, links=None, overwrite=False):
def _add_and_verify_file(self, path, expected_path, file_object, links=None, overwrite=False):
"""Adds a file to the storage and verifies the sanitized path."""
sanitized_path = self._add_file(path, file_object, links=links, overwrite=overwrite)
self.assertEquals(expected_path, sanitized_path)
return sanitized_path
def _add_file(self, path, file_object, links=None, overwrite=False):
"""
Adds a file to the storage.
Ensures file is present, metadata is present, hash and links (if applicable)
are populated correctly.
Returns sanitized path.
"""
sanitized_path = self.storage.add_file(path, file_object, links=links, allow_overwrite=overwrite)
split_path = sanitized_path.split("/")
if len(split_path) == 1:
file_path = os.path.join(self.basefolder, split_path[0])
@ -389,9 +549,8 @@ class LocalStorageTest(unittest.TestCase):
file_path = os.path.join(self.basefolder, os.path.join(*split_path))
folder_path = os.path.join(self.basefolder, os.path.join(*split_path[:-1]))
self.assertEquals(expected_path, sanitized_path)
self.assertTrue(os.path.exists(file_path))
self.assertTrue(os.path.exists(os.path.join(folder_path, ".metadata.yaml")))
self.assertTrue(os.path.isfile(file_path))
self.assertTrue(os.path.isfile(os.path.join(folder_path, ".metadata.yaml")))
metadata = self.storage.get_metadata(sanitized_path)
self.assertIsNotNone(metadata)
@ -406,11 +565,21 @@ class LocalStorageTest(unittest.TestCase):
return sanitized_path
def _add_folder(self, path, expected_path):
sanitized_path = self.storage.add_folder(path)
def _add_and_verify_folder(self, path, expected_path):
"""Adds a folder to the storage and verifies sanitized path."""
sanitized_path = self._add_folder(path)
self.assertEquals(expected_path, sanitized_path)
self.assertTrue(os.path.exists(os.path.join(self.basefolder, os.path.join(*sanitized_path.split("/")))))
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, os.path.join(*sanitized_path.split("/")))))
return sanitized_path
def _add_folder(self, path):
"""
Adds a folder to the storage.
Verifies existance of folder.
Returns sanitized path.
"""
sanitized_path = self.storage.add_folder(path)
self.assertTrue(os.path.isdir(os.path.join(self.basefolder, os.path.join(*sanitized_path.split("/")))))
return sanitized_path