2018-01-08 17:26:18 +00:00
'''
This program polls the database every few seconds to search for new events.
Every new event that is found is sent to an MQTT server.
'''
import time
import sqlite3
import configparser
import random
import json
import subprocess
2018-02-02 10:33:54 +00:00
import socket
2018-02-02 19:36:18 +00:00
import struct
2018-01-08 17:26:18 +00:00
import logging as log
# Configure the root logger once at start-up: "<time> - <level> - <message>",
# showing INFO and above. The format string carries no surrounding padding so
# log lines start directly with the timestamp.
log.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=log.INFO)
def _initilize_DB ( _sqlite_location ) :
_db_conn = sqlite3 . connect ( _sqlite_location , timeout = 60.0 )
cursor = _db_conn . cursor ( )
cursor . execute ( " SELECT name FROM sqlite_master WHERE type= ' table ' AND name= ' Events ' " )
if cursor . fetchone ( ) == None :
cursor . execute ( ''' CREATE TABLE Events
( UTCUnixTime INTEGER , SubSeconds REAL , TemperatureC REAL , Humidity REAL , AccelX REAL ,
AccelY REAL , AccelZ REAL , MagX REAL , MagY REAL , MagZ REAL , Pressure REAL , Longitude REAL ,
Latitude REAL , DetectorName TEXT , DetectorVersion TEXT ) ; ''' )
_db_conn . commit ( )
def getserial(cpuinfo_path='/proc/cpuinfo'):
    """Extract the CPU serial number from a cpuinfo file.

    Scans the file for lines starting with ``Serial`` and returns characters
    10..25 of the last such line (the 16 characters that follow
    ``"Serial\\t\\t: "``).

    Args:
        cpuinfo_path: File to read; defaults to ``/proc/cpuinfo``.

    Returns:
        The serial string, ``"0000000000000000"`` when the file contains no
        ``Serial`` line, or ``"ERROR000000000"`` when the file cannot be read.
    """
    cpuserial = "0000000000000000"
    try:
        # The context manager closes the file even if reading fails midway.
        with open(cpuinfo_path, 'r') as cpuinfo:
            for line in cpuinfo:
                if line[0:6] == 'Serial':
                    cpuserial = line[10:26]
    except (OSError, UnicodeError):
        # Best effort: an unreadable file must not stop the program.
        cpuserial = "ERROR000000000"
    return cpuserial
2018-02-02 10:33:54 +00:00
def get_ip_address(ifname):
    """Return the IPv4 address of a network interface as a dotted string.

    Uses the ``SIOCGIFADDR`` ioctl (request ``0x8915``) through ``fcntl``.

    Args:
        ifname: Interface name, e.g. ``"wlan0"``; ``str`` or ``bytes``.
            Only the first 15 bytes are passed to the ioctl.

    Returns:
        The address, or ``"no IP on <ifname>"`` when the ioctl fails or the
        ``fcntl`` module is not available on this OS.
    """
    try:
        import fcntl
    except ImportError:
        print("Getting the IP address on this OS is not implemented")
        return "no IP on {}".format(ifname)

    # struct's 's' format only accepts bytes on Python 3; a str name would
    # raise struct.error, which the handlers below do not catch.
    ifname_bytes = ifname.encode('utf-8') if isinstance(ifname, str) else ifname

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        reply = fcntl.ioctl(
            s.fileno(),
            0x8915,  # SIOCGIFADDR
            struct.pack('256s', ifname_bytes[:15])
        )
        # Bytes 20..23 of the reply are handed to inet_ntoa as the packed address.
        result = socket.inet_ntoa(reply[20:24])
    except IOError:
        print("Error getting the IP address, either you are not doing this on raspbian or the queried device does not exist.")
        result = "no IP on {}".format(ifname)
    finally:
        # The original leaked one socket per call.
        s.close()
    return result
2018-01-08 17:26:18 +00:00
def send_via_mqtt(broker_address, broker_topic, message):
    """Publish one message to an MQTT broker using ``mosquitto_pub``.

    The command is passed as an argument list without a shell, so quotes or
    other shell metacharacters in the topic or message are delivered
    literally and can neither break the command nor inject code.

    Args:
        broker_address: Host of the MQTT broker (``-h``).
        broker_topic: Topic to publish to (``-t``).
        message: Payload to publish (``-m``).

    Returns:
        None. Failures are logged rather than raised, matching the previous
        best-effort behaviour.
    """
    command = [
        "mosquitto_pub",
        "-h", str(broker_address),
        "-t", str(broker_topic),
        "-m", str(message),
    ]
    log.debug("Executing the following: {}".format(command))
    try:
        return_code = subprocess.call(command)
    except OSError as error:
        # Without a shell a missing binary raises instead of returning 127;
        # keep the call non-fatal as it was before.
        log.error("Could not execute mosquitto_pub: {}".format(error))
        return
    if return_code != 0:
        log.warning("mosquitto_pub exited with return code {}".format(return_code))
# settings file
CONFIG_FILE = "../config/CosmicPi.config"

# read configuration
# Todo: Put the config parser into a proper class
# Todo: Implement proper error catching for configparser (e.g. non-existent keys or file)
config = configparser.ConfigParser()
config.read(CONFIG_FILE)
sqlite_location = config.get("Storage", "sqlite_location")
broker_address = config.get("MQTT", "broker_address")
# every detector publishes below its own sub-topic, named after its CPU serial
broker_topic = "{}/{}".format(config.get("MQTT", "broker_topic"), getserial())

# UTCUnixTime of the newest event already handed to the broker (0 = nothing sent yet).
# This is a plain module-level name: a `global` statement for it inside the loop
# is unnecessary and is a SyntaxError on Python 3.6+.
last_sent_event_timestamp = 0

# set up the program
_initilize_DB(sqlite_location)

# start the polling loop
while True:
    # establish a connection
    db_conn = sqlite3.connect(sqlite_location, timeout=60.0)
    try:
        # we would like to be able to use our rows as dictionaries
        db_conn.row_factory = sqlite3.Row
        cursor = db_conn.cursor()

        # get the most recent event time
        cursor.execute("SELECT * FROM Events ORDER BY UTCUnixTime DESC, SubSeconds DESC LIMIT 1;")
        time_row = cursor.fetchone()

        if time_row is None:
            # empty table: nothing to send
            available_events = None
        else:
            latest_time = time_row["UTCUnixTime"]
            # Get the next events that will be sent. The upper bound keeps rows
            # inserted after the query above out of this batch; without it they
            # would be sent now and again in the next round.
            # NOTE: events that arrive later but share the second `latest_time`
            # are still skipped, because only whole seconds are tracked.
            cursor.execute(
                "SELECT * FROM Events WHERE UTCUnixTime > ? AND UTCUnixTime <= ? "
                "ORDER BY UTCUnixTime ASC, SubSeconds ASC;",
                (last_sent_event_timestamp, latest_time),
            )
            # copy the rows into plain dicts so nothing depends on the connection afterwards
            available_events = [dict(row) for row in cursor.fetchall()]
    finally:
        # always release the connection, including on the empty-DB path
        db_conn.close()

    if available_events is None:
        log.info("Got a none type, retrying in a bit")
        # sleep for a semi random time
        time_to_wait = random.randrange(30, 90)
        log.info("Sleeping for: {} [s]".format(time_to_wait))
        time.sleep(time_to_wait)
        continue

    log.info("Searched for events since: {} ; Found the following number of new events: {}".format(last_sent_event_timestamp, len(available_events)))

    # send the next events
    if available_events:
        # look the address up once per batch instead of once per event
        local_ip = get_ip_address("wlan0")
        for event in available_events:
            message_dict = {
                "data": event,
                "meta": {
                    "local IP": local_ip
                }
            }
            message = json.dumps(message_dict)
            send_via_mqtt(broker_address, broker_topic, message)

    # save the event time
    last_sent_event_timestamp = latest_time

    # sleep for a semi random time
    time_to_wait = random.randrange(1, 5)
    log.info("Sleeping for: {} [s]".format(time_to_wait))
    time.sleep(time_to_wait)