From e51d7dc7a09db163db97b84f141ef09a02782a91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gina=20H=C3=A4u=C3=9Fge?= Date: Thu, 10 Jan 2013 21:02:47 +0100 Subject: [PATCH] Changed message queue... but still not happy with that... --- printer_webui/printer.py | 104 +++++++++++++++++++++++++++------- printer_webui/server.py | 20 +++---- printer_webui/static/js/ui.js | 38 ++++++++----- 3 files changed, 115 insertions(+), 47 deletions(-) diff --git a/printer_webui/printer.py b/printer_webui/printer.py index 226d9206..e6da6045 100644 --- a/printer_webui/printer.py +++ b/printer_webui/printer.py @@ -5,6 +5,7 @@ __license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agp import time from threading import Thread import Queue +import collections import printer_webui.util.comm as comm from printer_webui.util import gcodeInterpreter @@ -69,7 +70,16 @@ class Printer(): self._callbacks = [] self._lastProgressReport = None - self._updateQueue = Queue.Queue() + self._updateQueue = MessageQueue() + self._updateQueue.registerMessageType("zchange", self._sendZChangeCallbacks, overwrite=True) + self._updateQueue.registerMessageType("state", self._sendStateCallbacks) + self._updateQueue.registerMessageType("temperature", self._sendTemperatureCallbacks, mergeFunction=(lambda x,y: x + y)) + self._updateQueue.registerMessageType("log", self._sendLogCallbacks, mergeFunction=(lambda x, y: x + y)) + self._updateQueue.registerMessageType("message", self._sendMessageCallbacks, mergeFunction=(lambda x, y: x + y)) + self._updateQueue.registerMessageType("progress", self._sendProgressCallbacks, overwrite=True) + self._updateQueue.registerMessageType("job", self._sendJobCallbacks, throttling=0.5) + self._updateQueue.registerMessageType("gcode", self._sendGcodeCallbacks, throttling=0.5) + self._updateQueueWorker = Thread(target=self._processQueue) self._updateQueueWorker.start() @@ -95,17 +105,17 @@ class Printer(): def _sendTemperatureCallbacks(self, data): for callback in self._callbacks: - try: callback.temperatureChangeCB(data["currentTime"], data["temp"], data["bedTemp"], data["targetTemp"], data["targetBedTemp"]) + try: callback.temperatureChangeCB(data) except: pass def _sendLogCallbacks(self, data): for callback in self._callbacks: - try: callback.logChangeCB(data["log"]) + try: callback.logChangeCB(data) except: pass def _sendMessageCallbacks(self, data): for callback in self._callbacks: - try: callback.messageChangeCB(data["message"]) + try: callback.messageChangeCB(data) except: pass def _sendProgressCallbacks(self, data): @@ -124,12 +134,9 @@ class Printer(): except: pass - def _addUpdate(self, target, data): - self._updateQueue.put((target, data)) - def _processQueue(self): while True: - (target, data) = self._updateQueue.get() + (target, data) = self._updateQueue.read() target(data) self._updateQueue.task_done() @@ -234,11 +241,11 @@ class Printer(): def _setCurrentZ(self, currentZ): self._currentZ = currentZ - self._addUpdate(self._sendZChangeCallbacks, {"currentZ": self._currentZ}) + self._updateQueue.message("zchange", {"currentZ": self._currentZ}) def _setState(self, state): self._state = state - self._addUpdate(self._sendStateCallbacks, {"state": self._state, "stateString": self.getStateString(), "stateFlags": self._getStateFlags()}) + self._updateQueue.message("state", {"state": self._state, "stateString": self.getStateString(), "stateFlags": self._getStateFlags()}) def _addLog(self, log): """ @@ -247,13 +254,13 @@ class Printer(): self._latestLog = log self._log.append(log) self._log = self._log[-300:] - self._addUpdate(self._sendLogCallbacks, {"log": self._latestLog}) + self._updateQueue.message("log", [self._latestLog]) def _addMessage(self, message): self._latestMessage = message self._messages.append(message) self._messages = self._messages[-300:] - self._addUpdate(self._sendLogCallbacks, {"message": self._latestLog}) + self._updateQueue.message("message", [self._latestLog]) def _setProgressData(self, progress, printTime, printTimeLeft): self._progress = progress @@ -261,7 +268,7 @@ class Printer(): self._printTimeLeft = printTimeLeft #if not self._lastProgressReport or self._lastProgressReport + 0.5 <= time.time(): - self._addUpdate(self._sendProgressCallbacks, {"progress": self._progress, "printTime": self._printTime, "printTimeLeft": self._printTimeLeft}) + self._updateQueue.message("progress", {"progress": self._progress, "printTime": self._printTime, "printTimeLeft": self._printTimeLeft}) # self._lastProgressReport = time.time() def _addTemperatureData(self, temp, bedTemp, targetTemp, bedTargetTemp): @@ -288,7 +295,7 @@ class Printer(): self._targetTemp = targetTemp self._targetBedTemp = bedTargetTemp - self._addUpdate(self._sendTemperatureCallbacks, {"currentTime": currentTime, "temp": self._temp, "bedTemp": self._bedTemp, "targetTemp": self._targetTemp, "targetBedTemp": self._targetBedTemp, "history": self._temps}) + self._updateQueue.message("temperature", [{"currentTime": currentTime, "temp": self._temp, "bedTemp": self._bedTemp, "targetTemp": self._targetTemp, "targetBedTemp": self._targetBedTemp, "history": self._temps}]) def _setJobData(self, filename, gcode, gcodeList): self._filename = filename @@ -305,7 +312,7 @@ class Printer(): estimatedPrintTime = self._gcode.totalMoveTimeMinute filament = self._gcode.extrusionAmount - self._addUpdate(self._sendJobCallbacks, {"filename": self._filename, "lines": lines, "estimatedPrintTime": estimatedPrintTime, "filament": filament}) + self._updateQueue.message("job", {"filename": self._filename, "lines": lines, "estimatedPrintTime": estimatedPrintTime, "filament": filament}) def _sendInitialStateUpdate(self, callback): lines = None @@ -321,10 +328,8 @@ class Printer(): try: callback.zChangeCB(self._currentZ) callback.stateChangeCB(self._state, self.getStateString(), self._getStateFlags()) - callback.logChangeCB(self._latestLog) - callback.messageChangeCB(self._latestMessage) callback.progressChangeCB(self._progress, self._printTime, self._printTimeLeft) - callback.temperatureChangeCB(time.time() * 1000, self._temp, self._bedTemp, self._targetTemp, self._targetBedTemp) + callback.temperatureChangeCB([{"currentTime": time.time() * 1000, "temp": self._temp, "bedTemp": self._bedTemp, "targetTemp": self._targetTemp, "bedTargetTemp": self._targetBedTemp}]) callback.jobDataChangeCB(self._filename, lines, estimatedPrintTime, filament) callback.sendHistoryData(self._temps, self._log, self._messages) except Exception, err: @@ -402,7 +407,7 @@ class Printer(): #~~ callbacks triggered by gcodeLoader def onGcodeLoadingProgress(self, progress): - self._addUpdate(self._sendGcodeCallbacks, {"filename": self._gcodeLoader._filename, "progress": progress}) + self._updateQueue.message("gcode", {"filename": self._gcodeLoader._filename, "progress": progress}) def onGcodeLoaded(self): self._setJobData(self._gcodeLoader._filename, self._gcodeLoader._gcode, self._gcodeLoader._gcodeList) @@ -410,7 +415,7 @@ class Printer(): self._setProgressData(None, None, None) self._gcodeLoader = None - self._addUpdate(self._sendStateCallbacks, {"state": self._state, "stateString": self.getStateString(), "stateFlags": self._getStateFlags()}) + self._updateQueue.message("state", {"state": self._state, "stateString": self.getStateString(), "stateFlags": self._getStateFlags()}) #~~ state reports @@ -540,4 +545,61 @@ class PrinterCallback(object): pass def sendHistoryData(self, tempHistory, logHistory, messageHistory): - pass \ No newline at end of file + pass + + +class MessageQueue(Queue.Queue): + def __init__(self, maxsize=0): + Queue.Queue.__init__(self, maxsize) + self._messageTypes = dict() + self._lastSends = dict() + + def registerMessageType(self, messageType, callback, overwrite=False, throttling=None, mergeFunction=None): + self._messageTypes[messageType] = (callback, overwrite, throttling, mergeFunction) + if throttling is not None: + self._lastSends[messageType] = time.time() + + def message(self, messageType, data, timestamp=time.time()): + if not self._messageTypes.has_key(messageType): + return + + (callback, overwrite, throttling, merger) = self._messageTypes[messageType] + updated = False + try: + self.mutex.acquire() + if overwrite or throttling is not None or merger is not None: + for item in self.queue: + if item.type == messageType and ((throttling is not None and item.timestamp + throttling < time.time()) or overwrite or merger is not None): + if merger is not None: + item.payload = merger(item.payload, data) + else: + item.payload = data + updated = True + break + finally: + self.mutex.release() + + if not updated: + item = MessageQueueItem(messageType, timestamp, data) + self.put(item) + + def read(self): + item = None + while item is None: + item = self.get() + if not self._messageTypes.has_key(item.type): + self.task_done() + item = None + (callback, overwrite, throttling, merger) = self._messageTypes[item.type] + if throttling and self._lastSends[item.type] + throttling > time.time(): + self.message(item.type, item.payload, item.timestamp) + item = None + + self._lastSends[item.type] = time.time() + return (callback, item.payload) + +class MessageQueueItem(object): + def __init__(self, type, timestamp, payload): + self.type = type + self.timestamp = timestamp + self.payload = payload \ No newline at end of file diff --git a/printer_webui/server.py b/printer_webui/server.py index 79931e54..cd58206b 100644 --- a/printer_webui/server.py +++ b/printer_webui/server.py @@ -68,31 +68,25 @@ class PrinterStateConnection(tornadio2.SocketConnection, PrinterCallback): "printTimeLeft": formattedPrintTimeLeft }) - def temperatureChangeCB(self, currentTime, temp, bedTemp, targetTemp, targetBedTemp): + def temperatureChangeCB(self, temperatures): print("Sending temperatureChange...") - self.emit("temperature", { - "currentTime": currentTime, - "temp": temp, - "bedTemp": bedTemp, - "targetTemp": targetTemp, - "targetBedTemp": targetBedTemp - }) + self.emit("temperatures", temperatures) def stateChangeCB(self, state, stateString, booleanStates): print("Sending stateChange...") self.emit("state", {"currentState": stateString, "flags": booleanStates}) - def logChangeCB(self, line): + def logChangeCB(self, lines): print("Sending logChange...") - self.emit("log", {"line": line}) + self.emit("log", {"lines": lines}) - def messageChangeCB(self, line): + def messageChangeCB(self, lines): print("Sending messageChange...") - self.emit("message", {"line": line}) + self.emit("message", {"lines": lines}) def gcodeChangeCB(self, filename, progress): print("Sending gcodeChange...") - self.emit("jobData", {"filename": "Loading... (%d%%)" % (round(progress * 100)), "lineCount": None, "estimatedPrintTime": None, "filament": None}) + self.emit("gcode", {"filename": filename, "progress": progress}) def jobDataChangeCB(self, filename, lines, estimatedPrintTimeInMinutes, filamentLengthInMillimeters): formattedPrintTimeEstimation = None diff --git a/printer_webui/static/js/ui.js b/printer_webui/static/js/ui.js index e6d0196a..368302e7 100644 --- a/printer_webui/static/js/ui.js +++ b/printer_webui/static/js/ui.js @@ -157,6 +157,12 @@ function PrinterStateViewModel() { self.filament(data.filament); } + self.fromGcodeEvent = function(data) { + if (self.isLoading()) { + self.filename("Loading... (" + Math.round(data.progress * 100) + ")"); + } + } + self.fromProgressEvent = function(data) { self.currentLine(data.currentLine); self.printTime(data.printTime); @@ -246,12 +252,14 @@ function TemperatureViewModel() { } self.fromTemperatureEvent = function(data) { - self.temp(data.temp); - self.bedTemp(data.bedTemp); - self.targetTemp(data.targetTemp); - self.bedTargetTemp(data.bedTargetTemp); + if (data.length == 0) + return; + + self.temp(data[data.length - 1].temp); + self.bedTemp(data[data.length - 1].bedTemp); + self.targetTemp(data[data.length - 1].targetTemp); + self.bedTargetTemp(data[data.length - 1].bedTargetTemp); - // plot if (!self.temperatures) self.temperatures = []; if (!self.temperatures.actual) @@ -263,11 +271,12 @@ function TemperatureViewModel() { if (!self.temperatures.targetBed) self.temperatures.targetBed = []; - self.temperatures.actual.push([data.currentTime, data.temp]) - self.temperatures.target.push([data.currentTime, data.targetTemp]) - self.temperatures.actualBed.push([data.currentTime, data.bedTemp]) - self.temperatures.targetBed.push([data.currentTime, data.bedTargetTemp]) - + for (var i = 0; i < data.length; i++) { + self.temperatures.actual.push([data[i].currentTime, data[i].temp]) + self.temperatures.target.push([data[i].currentTime, data[i].targetTemp]) + self.temperatures.actualBed.push([data[i].currentTime, data[i].bedTemp]) + self.temperatures.targetBed.push([data[i].currentTime, data[i].bedTargetTemp]) + } self.temperatures.actual = self.temperatures.actual.slice(-300); self.temperatures.target = self.temperatures.target.slice(-300); self.temperatures.actualBed = self.temperatures.actualBed.slice(-300); @@ -361,7 +370,7 @@ function TerminalViewModel() { self.fromLogEvent = function(data) { if (!self.log) self.log = [] - self.log.push(data.line) + self.log.concat(data.line) self.updateOutput(); } @@ -547,13 +556,16 @@ function DataUpdater(connectionViewModel, printerStateViewModel, temperatureView self.speedViewModel.fromStateEvent(data); self.webcamViewModel.fromStateEvent(data); }) - self.socket.on("temperature", function(data) { + self.socket.on("temperatures", function(data) { self.temperatureViewModel.fromTemperatureEvent(data); }) self.socket.on("jobData", function(data) { self.printerStateViewModel.fromJobEvent(data); }) - self.socket.on("log", function(data) { + self.socket.on("gcode", function(data) { + self.printerStateViewModel.fromGcodeEvent(data); + }) + self.socket.on("logs", function(data) { self.terminalViewModel.fromLogEvent(data); }) self.socket.on("printProgress", function(data) {