diff --git a/backend/mqtt_publisher.py b/backend/mqtt_publisher.py new file mode 100644 index 0000000..7fa440a --- /dev/null +++ b/backend/mqtt_publisher.py @@ -0,0 +1,98 @@ +''' +This program will check the database about every minute to search for a new event. +If one is found it will be sent to an mqtt server + +''' + + +import time +import sqlite3 +import configparser +import random +import json +import subprocess + +import logging as log +log.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=log.INFO) + + +def _initilize_DB(_sqlite_location): + _db_conn = sqlite3.connect(_sqlite_location, timeout=60.0) + cursor = _db_conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='Events'") + if cursor.fetchone() == None: + cursor.execute('''CREATE TABLE Events + (UTCUnixTime INTEGER, SubSeconds REAL, TemperatureC REAL, Humidity REAL, AccelX REAL, + AccelY REAL, AccelZ REAL, MagX REAL, MagY REAL, MagZ REAL, Pressure REAL, Longitude REAL, + Latitude REAL, DetectorName TEXT, DetectorVersion TEXT);''') + _db_conn.commit() + + +def getserial(): + # Extract serial from cpuinfo file + cpuserial = "0000000000000000" + try: + f = open('/proc/cpuinfo','r') + for line in f: + if line[0:6]=='Serial': + cpuserial = line[10:26] + f.close() + except: + cpuserial = "ERROR000000000" + + return cpuserial + +def send_via_mqtt(broker_address, broker_topic, message): + execution_string = "mosquitto_pub -h {} -t '{}' -m '{}'".format(broker_address, broker_topic, message) + log.debug("Executing the following: {}".format(execution_string)) + # ToDo: This is unsafe, change it! + subprocess.call(execution_string, shell=True) + +# settings files +CONFIG_FILE = "../config/CosmicPi.config" + +# read configuration +# Todo: Put the config parser into a propper class +# Todo: Implement proper error catching for configparser (e.g. non existent keys or file) +# read configuration +config = configparser.ConfigParser() +config.read(CONFIG_FILE) +sqlite_location = config.get("Storage", "sqlite_location") +broker_address = config.get("MQTT", "broker_address") +broker_topic = "{}/{}".format(config.get("MQTT", "broker_topic"), getserial()) +last_sent_event_timestamp = 0 + + +# setup the program +_initilize_DB(sqlite_location) + +# start the cleaning loop +while(True): + global last_sent_event_timestamp + # establish a connection + db_conn = sqlite3.connect(sqlite_location, timeout=60.0) + # we would like to be able to use have our rows as dictionaries + db_conn.row_factory = sqlite3.Row + cursor = db_conn.cursor() + + # get the most recent time + cursor.execute("SELECT * FROM Events ORDER BY UTCUnixTime DESC, SubSeconds DESC;") + latest_time = cursor.fetchone()[0] + + # Get the next events that will be sent + cursor.execute("SELECT * FROM Events WHERE UTCUnixTime > ?;", (last_sent_event_timestamp,)) + # send the next events + available_events = cursor.fetchall() + log.info("Searched for events since: {}; Found the following number of new events: {}".format(last_sent_event_timestamp, len(available_events))) + for event in available_events: + message = json.dumps(dict(event)) + send_via_mqtt(broker_address, broker_topic, message) + db_conn.close() + + # save the event time + last_sent_event_timestamp = latest_time + + # sleep for a semi random time + time_to_wait = int(random.randrange(30, 90)) + log.info("Sleeping for: {} [s]".format(time_to_wait)) + time.sleep(time_to_wait) diff --git a/install_files/CosmicPi-mqtt.service b/install_files/CosmicPi-mqtt.service new file mode 100644 index 0000000..e35094d --- /dev/null +++ b/install_files/CosmicPi-mqtt.service @@ -0,0 +1,10 @@ +[Unit] +Description=Software for sending locally stored events from the CosmicPi to an MQTT srver + +[Service] +ExecStart=/usr/bin/python PATH_TO_EXECUTABLE/backend/mqtt_publisher.py +WorkingDirectory=PATH_TO_EXECUTABLE/backend/ +Restart=on-failure + +[Install] +WantedBy=multi-user.target