Various analysis queue fixes

* Fix priority queue sorting (sorts ascending, not descending!)
  * Abort low priority jobs when a high priority job comes in
  * Utilize queue for tracking aborted jobs too
  * Abort analysis for items that are to be deleted/moved (also
    fixes issue under Windows where it was impossible to delete
    a file for which the analysis was still running).
This commit is contained in:
Gina Häußge 2017-01-12 18:05:01 +01:00
parent d69c166f10
commit e6dd37e26e
4 changed files with 100 additions and 29 deletions

View file

@ -397,13 +397,10 @@ class FileManager(object):
if hook_file_object is not None:
file_object = hook_file_object
file_path = self._storage(destination).add_file(path, file_object, links=links, printer_profile=printer_profile, allow_overwrite=allow_overwrite)
absolute_path = self._storage(destination).path_on_disk(file_path)
_, file_name = self._storage(destination).split_path(file_path)
if analysis is None:
file_type = get_file_type(absolute_path)
if file_type:
queue_entry = QueueEntry(file_name, file_path, file_type[-1], destination, absolute_path, printer_profile)
queue_entry = self._analysis_queue_entry(destination, file_path, printer_profile=printer_profile)
if queue_entry:
self._analysis_queue.enqueue(queue_entry, high_priority=True)
else:
self._add_analysis_result(destination, path, analysis)
@ -412,6 +409,8 @@ class FileManager(object):
return file_path
def remove_file(self, destination, path):
queue_entry = self._analysis_queue_entry(destination, path)
self._analysis_queue.dequeue(queue_entry)
self._storage(destination).remove_file(path)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
@ -420,6 +419,8 @@ class FileManager(object):
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def move_file(self, destination, source, dst):
queue_entry = self._analysis_queue_entry(destination, source)
self._analysis_queue.dequeue(queue_entry)
self._storage(destination).move_file(source, dst)
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
@ -429,7 +430,10 @@ class FileManager(object):
return folder_path
def remove_folder(self, destination, path, recursive=True):
self._analysis_queue.dequeue_folder(destination, path)
self._analysis_queue.pause()
self._storage(destination).remove_folder(path, recursive=recursive)
self._analysis_queue.resume()
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def copy_folder(self, destination, source, dst):
@ -437,7 +441,10 @@ class FileManager(object):
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def move_folder(self, destination, source, dst):
self._analysis_queue.dequeue_folder(destination, source)
self._analysis_queue.pause()
self._storage(destination).move_folder(source, dst)
self._analysis_queue.resume()
eventManager().fire(Events.UPDATED_FILES, dict(type="printables"))
def get_metadata(self, destination, path):
@ -542,3 +549,15 @@ class FileManager(object):
def _on_analysis_finished(self, entry, result):
self._add_analysis_result(entry.location, entry.path, result)
def _analysis_queue_entry(self, destination, path, printer_profile=None):
if printer_profile is None:
printer_profile = self._printer_profile_manager.get_current_or_default()
absolute_path = self._storage(destination).path_on_disk(path)
_, file_name = self._storage(destination).split_path(path)
file_type = get_file_type(absolute_path)
if file_type:
return QueueEntry(file_name, path, file_type[-1], destination, absolute_path, printer_profile)
else:
return None

View file

@ -40,6 +40,12 @@ class QueueEntry(collections.namedtuple("QueueEntry", "name, path, type, locatio
return "{location}:{path}".format(location=self.location, path=self.path)
class AnalysisAborted(Exception):
def __init__(self, reenqueue=True, *args, **kwargs):
Exception.__init__(self, *args, **kwargs)
self.reenqueue = reenqueue
class AnalysisQueue(object):
"""
OctoPrint's :class:`AnalysisQueue` can manage various :class:`AbstractAnalysisQueue` implementations, mapped
@ -77,6 +83,16 @@ class AnalysisQueue(object):
self._queues[entry.type].enqueue(entry, high_priority=high_priority)
return True
def dequeue(self, entry):
if not entry.type in self._queues:
return False
self._queues[entry.type].dequeue(entry.location, entry.path)
def dequeue_folder(self, destination, path):
for queue in self._queues.values():
queue.dequeue_folder(destination, path)
def pause(self):
for queue in self._queues.values():
queue.pause()
@ -112,8 +128,10 @@ class AbstractAnalysisQueue(object):
.. automethod:: _do_abort
"""
LOW_PRIO = 0
HIGH_PRIO = 100
LOW_PRIO = 100
LOW_PRIO_ABORTED = 75
HIGH_PRIO = 50
HIGH_PRIO_ABORTED = 0
def __init__(self, finished_callback):
self._logger = logging.getLogger(__name__)
@ -123,11 +141,15 @@ class AbstractAnalysisQueue(object):
self._active = threading.Event()
self._active.set()
self._done = threading.Event()
self._done.clear()
self._currentFile = None
self._currentProgress = None
self._queue = queue.PriorityQueue()
self._current = None
self._current_highprio = False
self._worker = threading.Thread(target=self._work)
self._worker.daemon = True
@ -153,7 +175,24 @@ class AbstractAnalysisQueue(object):
self._logger.debug("Adding entry {entry} to analysis queue with low priority".format(entry=entry))
prio = self.__class__.LOW_PRIO
self._queue.put((prio, entry))
self._queue.put((prio, entry, high_priority))
if high_priority and self._current is not None and not self._current_highprio:
self._logger.debug("Aborting current analysis in favor of high priority one")
self._do_abort()
def dequeue(self, location, path):
if self._current is not None and self._current.location == location \
and self._current.path == path:
self._do_abort(reenqueue=False)
self._done.wait()
self._done.clear()
def dequeue_folder(self, location, path):
if self._current is not None and self._current.location == location \
and self._current.path.startswith(path + "/"):
self._do_abort(reenqueue=False)
self._done.wait()
self._done.clear()
def pause(self):
"""
@ -175,24 +214,23 @@ class AbstractAnalysisQueue(object):
self._active.set()
def _work(self):
aborted = None
while True:
if aborted is not None:
entry = aborted
aborted = None
self._logger.debug("Got an aborted analysis job for entry {entry}, processing this instead of first item in queue".format(**locals()))
else:
(priority, entry) = self._queue.get()
self._logger.debug("Processing entry {entry} from queue (priority {priority})".format(**locals()))
(priority, entry, high_priority) = self._queue.get()
self._logger.debug("Processing entry {} from queue (priority {})".format(entry, priority))
self._active.wait()
try:
self._analyze(entry, high_priority=(priority == self.__class__.HIGH_PRIO))
self._analyze(entry, high_priority=high_priority)
self._queue.task_done()
except gcodeInterpreter.AnalysisAborted:
aborted = entry
self._logger.debug("Running analysis of entry {entry} aborted".format(**locals()))
self._done.set()
except AnalysisAborted as ex:
if ex.reenqueue:
self._queue.put((self.__class__.HIGH_PRIO_ABORTED if high_priority else self.__class__.LOW_PRIO_ABORTED,
entry,
high_priority))
self._logger.debug("Running analysis of entry {} aborted".format(entry))
self._queue.task_done()
self._done.set()
else:
time.sleep(1.0)
@ -202,10 +240,12 @@ class AbstractAnalysisQueue(object):
return
self._current = entry
self._current_highprio = high_priority
self._current_progress = 0
try:
self._logger.info("Starting analysis of {entry}".format(**locals()))
start_time = time.time()
self._logger.info("Starting analysis of {}".format(entry))
eventManager().fire(Events.METADATA_ANALYSIS_STARTED, {"name": entry.name,
"path": entry.path,
"origin": entry.location,
@ -217,7 +257,7 @@ class AbstractAnalysisQueue(object):
result = self._do_analysis(high_priority=high_priority)
except TypeError:
result = self._do_analysis()
self._logger.debug("Analysis of entry {entry} finished, notifying callback".format(**locals()))
self._logger.info("Analysis of entry {} finished, needed {:.2f}s".format(entry, time.time() - start_time))
self._finished_callback(self._current, result)
finally:
self._current = None
@ -237,7 +277,7 @@ class AbstractAnalysisQueue(object):
"""
return None
def _do_abort(self):
def _do_abort(self, reenqueue=True):
"""
Aborts analysis of the current entry. Needs to be overridden by sub classes.
"""
@ -289,9 +329,11 @@ class GcodeAnalysisQueue(AbstractAnalysisQueue):
"volume": self._gcode.extrusionVolume[i]
}
return result
except gcodeInterpreter.AnalysisAborted as ex:
raise AnalysisAborted(reenqueue=ex.reenqueue)
finally:
self._gcode = None
def _do_abort(self):
def _do_abort(self, reenqueue=True):
if self._gcode:
self._gcode.abort()
self._gcode.abort(reenqueue=reenqueue)

View file

@ -172,7 +172,9 @@ class MinMax3D(object):
class AnalysisAborted(Exception):
pass
def __init__(self, reenqueue=True, *args, **kwargs):
self.reenqueue = reenqueue
Exception.__init__(self, *args, **kwargs)
class gcode(object):
@ -185,6 +187,7 @@ class gcode(object):
self.filename = None
self.progressCallback = None
self._abort = False
self._reenqueue = True
self._filamentDiameter = 0
self._minMax = MinMax3D()
@ -213,8 +216,9 @@ class gcode(object):
with codecs.open(filename, encoding="utf-8", errors="replace") as f:
self._load(f, printer_profile, throttle=throttle)
def abort(self):
def abort(self, reenqueue=True):
self._abort = True
self._reenqueue = reenqueue
def _load(self, gcodeFile, printer_profile, throttle=None):
filePos = 0
@ -240,7 +244,7 @@ class gcode(object):
for line in gcodeFile:
if self._abort:
raise AnalysisAborted()
raise AnalysisAborted(reenqueue=self._reenqueue)
filePos += 1
readBytes += len(line)

View file

@ -171,9 +171,13 @@ class FileManagerTest(unittest.TestCase):
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
def test_remove_file(self):
self.local_storage.path_on_disk.return_value = "prefix/test.file"
self.local_storage.split_path.return_value = ("", "test.file")
self.file_manager.remove_file(octoprint.filemanager.FileDestinations.LOCAL, "test.file")
self.local_storage.remove_file.assert_called_once_with("test.file")
self.analysis_queue.dequeue.assert_called_once()
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
def test_add_folder(self):
@ -199,11 +203,13 @@ class FileManagerTest(unittest.TestCase):
self.file_manager.remove_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")
self.local_storage.remove_folder.assert_called_once_with("test_folder", recursive=True)
self.analysis_queue.dequeue_folder.assert_called_once_with(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")
self.fire_event.assert_called_once_with(octoprint.filemanager.Events.UPDATED_FILES, dict(type="printables"))
def test_remove_folder_nonrecursive(self):
self.file_manager.remove_folder(octoprint.filemanager.FileDestinations.LOCAL, "test_folder", recursive=False)
self.local_storage.remove_folder.assert_called_once_with("test_folder", recursive=False)
self.analysis_queue.dequeue_folder.assert_called_once_with(octoprint.filemanager.FileDestinations.LOCAL, "test_folder")
@mock.patch("octoprint.util.atomic_write", create=True)
@mock.patch("yaml.safe_dump", create=True)