diff --git a/src/octoprint/filemanager/__init__.py b/src/octoprint/filemanager/__init__.py index eb8e2140..c7702993 100644 --- a/src/octoprint/filemanager/__init__.py +++ b/src/octoprint/filemanager/__init__.py @@ -397,13 +397,10 @@ class FileManager(object): if hook_file_object is not None: file_object = hook_file_object file_path = self._storage(destination).add_file(path, file_object, links=links, printer_profile=printer_profile, allow_overwrite=allow_overwrite) - absolute_path = self._storage(destination).path_on_disk(file_path) - _, file_name = self._storage(destination).split_path(file_path) if analysis is None: - file_type = get_file_type(absolute_path) - if file_type: - queue_entry = QueueEntry(file_name, file_path, file_type[-1], destination, absolute_path, printer_profile) + queue_entry = self._analysis_queue_entry(destination, file_path, printer_profile=printer_profile) + if queue_entry: self._analysis_queue.enqueue(queue_entry, high_priority=True) else: self._add_analysis_result(destination, path, analysis) @@ -412,6 +409,8 @@ class FileManager(object): return file_path def remove_file(self, destination, path): + queue_entry = self._analysis_queue_entry(destination, path) + self._analysis_queue.dequeue(queue_entry) self._storage(destination).remove_file(path) eventManager().fire(Events.UPDATED_FILES, dict(type="printables")) @@ -420,6 +419,8 @@ class FileManager(object): eventManager().fire(Events.UPDATED_FILES, dict(type="printables")) def move_file(self, destination, source, dst): + queue_entry = self._analysis_queue_entry(destination, source) + self._analysis_queue.dequeue(queue_entry) self._storage(destination).move_file(source, dst) eventManager().fire(Events.UPDATED_FILES, dict(type="printables")) @@ -429,7 +430,10 @@ class FileManager(object): return folder_path def remove_folder(self, destination, path, recursive=True): + self._analysis_queue.dequeue_folder(destination, path) + self._analysis_queue.pause() self._storage(destination).remove_folder(path, recursive=recursive) + self._analysis_queue.resume() eventManager().fire(Events.UPDATED_FILES, dict(type="printables")) def copy_folder(self, destination, source, dst): @@ -437,7 +441,10 @@ class FileManager(object): eventManager().fire(Events.UPDATED_FILES, dict(type="printables")) def move_folder(self, destination, source, dst): + self._analysis_queue.dequeue_folder(destination, source) + self._analysis_queue.pause() self._storage(destination).move_folder(source, dst) + self._analysis_queue.resume() eventManager().fire(Events.UPDATED_FILES, dict(type="printables")) def get_metadata(self, destination, path): @@ -542,3 +549,15 @@ class FileManager(object): def _on_analysis_finished(self, entry, result): self._add_analysis_result(entry.location, entry.path, result) + def _analysis_queue_entry(self, destination, path, printer_profile=None): + if printer_profile is None: + printer_profile = self._printer_profile_manager.get_current_or_default() + + absolute_path = self._storage(destination).path_on_disk(path) + _, file_name = self._storage(destination).split_path(path) + file_type = get_file_type(absolute_path) + + if file_type: + return QueueEntry(file_name, path, file_type[-1], destination, absolute_path, printer_profile) + else: + return None diff --git a/src/octoprint/filemanager/analysis.py b/src/octoprint/filemanager/analysis.py index 3685ab5f..5c92bd8b 100644 --- a/src/octoprint/filemanager/analysis.py +++ b/src/octoprint/filemanager/analysis.py @@ -40,6 +40,12 @@ class QueueEntry(collections.namedtuple("QueueEntry", "name, path, type, locatio return "{location}:{path}".format(location=self.location, path=self.path) +class AnalysisAborted(Exception): + def __init__(self, reenqueue=True, *args, **kwargs): + Exception.__init__(self, *args, **kwargs) + self.reenqueue = reenqueue + + class AnalysisQueue(object): """ OctoPrint's :class:`AnalysisQueue` can manage various :class:`AbstractAnalysisQueue` implementations, mapped @@ -77,6 +83,16 @@ class AnalysisQueue(object): self._queues[entry.type].enqueue(entry, high_priority=high_priority) return True + def dequeue(self, entry): + if not entry.type in self._queues: + return False + + self._queues[entry.type].dequeue(entry.location, entry.path) + + def dequeue_folder(self, destination, path): + for queue in self._queues.values(): + queue.dequeue_folder(destination, path) + def pause(self): for queue in self._queues.values(): queue.pause() @@ -112,8 +128,10 @@ class AbstractAnalysisQueue(object): .. automethod:: _do_abort """ - LOW_PRIO = 0 - HIGH_PRIO = 100 + LOW_PRIO = 100 + LOW_PRIO_ABORTED = 75 + HIGH_PRIO = 50 + HIGH_PRIO_ABORTED = 0 def __init__(self, finished_callback): self._logger = logging.getLogger(__name__) @@ -123,11 +141,15 @@ class AbstractAnalysisQueue(object): self._active = threading.Event() self._active.set() + self._done = threading.Event() + self._done.clear() + self._currentFile = None self._currentProgress = None self._queue = queue.PriorityQueue() self._current = None + self._current_highprio = False self._worker = threading.Thread(target=self._work) self._worker.daemon = True @@ -153,7 +175,24 @@ class AbstractAnalysisQueue(object): self._logger.debug("Adding entry {entry} to analysis queue with low priority".format(entry=entry)) prio = self.__class__.LOW_PRIO - self._queue.put((prio, entry)) + self._queue.put((prio, entry, high_priority)) + if high_priority and self._current is not None and not self._current_highprio: + self._logger.debug("Aborting current analysis in favor of high priority one") + self._do_abort() + + def dequeue(self, location, path): + if self._current is not None and self._current.location == location \ + and self._current.path == path: + self._do_abort(reenqueue=False) + self._done.wait() + self._done.clear() + + def dequeue_folder(self, location, path): + if self._current is not None and self._current.location == location \ + and self._current.path.startswith(path + "/"): + self._do_abort(reenqueue=False) + self._done.wait() + self._done.clear() def pause(self): """ @@ -175,24 +214,23 @@ class AbstractAnalysisQueue(object): self._active.set() def _work(self): - aborted = None while True: - if aborted is not None: - entry = aborted - aborted = None - self._logger.debug("Got an aborted analysis job for entry {entry}, processing this instead of first item in queue".format(**locals())) - else: - (priority, entry) = self._queue.get() - self._logger.debug("Processing entry {entry} from queue (priority {priority})".format(**locals())) - + (priority, entry, high_priority) = self._queue.get() + self._logger.debug("Processing entry {} from queue (priority {})".format(entry, priority)) self._active.wait() try: - self._analyze(entry, high_priority=(priority == self.__class__.HIGH_PRIO)) + self._analyze(entry, high_priority=high_priority) self._queue.task_done() - except gcodeInterpreter.AnalysisAborted: - aborted = entry - self._logger.debug("Running analysis of entry {entry} aborted".format(**locals())) + self._done.set() + except AnalysisAborted as ex: + if ex.reenqueue: + self._queue.put((self.__class__.HIGH_PRIO_ABORTED if high_priority else self.__class__.LOW_PRIO_ABORTED, + entry, + high_priority)) + self._logger.debug("Running analysis of entry {} aborted".format(entry)) + self._queue.task_done() + self._done.set() else: time.sleep(1.0) @@ -202,10 +240,12 @@ class AbstractAnalysisQueue(object): return self._current = entry + self._current_highprio = high_priority self._current_progress = 0 try: - self._logger.info("Starting analysis of {entry}".format(**locals())) + start_time = time.time() + self._logger.info("Starting analysis of {}".format(entry)) eventManager().fire(Events.METADATA_ANALYSIS_STARTED, {"name": entry.name, "path": entry.path, "origin": entry.location, @@ -217,7 +257,7 @@ class AbstractAnalysisQueue(object): result = self._do_analysis(high_priority=high_priority) except TypeError: result = self._do_analysis() - self._logger.debug("Analysis of entry {entry} finished, notifying callback".format(**locals())) + self._logger.info("Analysis of entry {} finished, needed {:.2f}s".format(entry, time.time() - start_time)) self._finished_callback(self._current, result) finally: self._current = None @@ -237,7 +277,7 @@ class AbstractAnalysisQueue(object): """ return None - def _do_abort(self): + def _do_abort(self, reenqueue=True): """ Aborts analysis of the current entry. Needs to be overridden by sub classes. """ @@ -289,9 +329,11 @@ class GcodeAnalysisQueue(AbstractAnalysisQueue): "volume": self._gcode.extrusionVolume[i] } return result + except gcodeInterpreter.AnalysisAborted as ex: + raise AnalysisAborted(reenqueue=ex.reenqueue) finally: self._gcode = None - def _do_abort(self): + def _do_abort(self, reenqueue=True): if self._gcode: - self._gcode.abort() + self._gcode.abort(reenqueue=reenqueue) diff --git a/src/octoprint/util/gcodeInterpreter.py b/src/octoprint/util/gcodeInterpreter.py index 432eb0db..f0548159 100644 --- a/src/octoprint/util/gcodeInterpreter.py +++ b/src/octoprint/util/gcodeInterpreter.py @@ -172,7 +172,9 @@ class MinMax3D(object): class AnalysisAborted(Exception): - pass + def __init__(self, reenqueue=True, *args, **kwargs): + self.reenqueue = reenqueue + Exception.__init__(self, *args, **kwargs) class gcode(object): @@ -185,6 +187,7 @@ class gcode(object): self.filename = None self.progressCallback = None self._abort = False + self._reenqueue = True self._filamentDiameter = 0 self._minMax = MinMax3D() @@ -213,8 +216,9 @@ class gcode(object): with codecs.open(filename, encoding="utf-8", errors="replace") as f: self._load(f, printer_profile, throttle=throttle) - def abort(self): + def abort(self, reenqueue=True): self._abort = True + self._reenqueue = reenqueue def _load(self, gcodeFile, printer_profile, throttle=None): filePos = 0 @@ -240,7 +244,7 @@ class gcode(object): for line in gcodeFile: if self._abort: - raise AnalysisAborted() + raise AnalysisAborted(reenqueue=self._reenqueue) filePos += 1 readBytes += len(line) diff --git a/tests/filemanager/test_filemanager.py b/tests/filemanager/test_filemanager.py index 2c888be8..4490a0bd 100644 --- a/tests/filemanager/test_filemanager.py +++ b/tests/filemanager/test_filemanager.py @@ -171,9 +171,13 @@ class FileManagerTest(unittest.TestCase): self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables")) def test_remove_file(self): + self.local_storage.path_on_disk.return_value = "prefix/test.file" + self.local_storage.split_path.return_value = ("", "test.file") + self.file_manager.remove_file(octoprint.filemanager.FileDestinations.LOCAL, "test.file") self.local_storage.remove_file.assert_called_once_with("test.file") + self.analysis_queue.dequeue.assert_called_once() self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables")) def test_add_folder(self): @@ -199,11 +203,13 @@ class FileManagerTest(unittest.TestCase): self.file_manager.remove_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder") self.local_storage.remove_folder.assert_called_once_with("test_folder", recursive=True) + self.analysis_queue.dequeue_folder.assert_called_once_with(octoprint.filemanager.FileDestinations.LOCAL, "test_folder") self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables")) def test_remove_folder_nonrecursive(self): self.file_manager.remove_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder", recursive=False) self.local_storage.remove_folder.assert_called_once_with("test_folder", recursive=False) + self.analysis_queue.dequeue_folder.assert_called_once_with(octoprint.filemanager.FileDestinations.LOCAL, "test_folder") @mock.patch("octoprint.util.atomic_write", create=True) @mock.patch("yaml.safe_dump", create=True)