temperature-logger-interface/temp_log.py

111 lines
3.3 KiB
Python
Executable File

#! /usr/bin/env python
import glob
import io
import platform
import Queue
import serial
import sys
import threading
import time
# Serial device main() opens when no port name is given in its arguments.
defaultSerialPort = '/dev/ttyACM0'
# Line-format version used by default when parsing; parse_line() only
# implements version 0.
defaultVersion = 0
def list_serial_ports():
    """Return the serial ports found on this machine.

    Windows: tries to open port indices 0-255 and returns the indices that
    opened. Darwin: returns every /dev/tty* and /dev/cu* path. Any other
    system is treated like Linux: /dev/ttyS*, /dev/ttyUSB* and /dev/ttyACM*
    paths, in that order.

    Approach taken from
    http://stackoverflow.com/questions/11303850/what-is-the-cross-platform-method-of-enumerating-serial-ports-in-python-includi
    """
    os_name = platform.system()

    if os_name == "Darwin":
        # Mac
        return glob.glob('/dev/tty*') + glob.glob('/dev/cu*')

    if os_name != "Windows":
        # Assume Linux or something else
        found = []
        for pattern in ('/dev/ttyS*', '/dev/ttyUSB*', '/dev/ttyACM*'):
            found = found + glob.glob(pattern)
        return found

    # Windows: scan for available ports by opening each index in turn.
    openable = []
    for index in range(256):
        try:
            port = serial.Serial(index)
            openable.append(index)
            port.close()
        except serial.SerialException:
            # Port missing or busy: move on to the next index
            continue
    return openable
class ParseException(Exception):
    """Error raised by the line parser, e.g. for an unimplemented line version."""
def parse_line(line, version=defaultVersion):
    """Parse one logger line into a dict of named readings.

    For version 0 the line is split on single spaces and the second-to-last
    field is converted to a float.

    Args:
        line: the text line to parse.
        version: line-format version; only 0 is implemented.

    Returns:
        {'temperature': <float>} on success, or an empty dict when the line
        has fewer than two fields or the field is not a number.

    Raises:
        ParseException: if `version` is not an implemented line version.
    """
    # @TODO This could be a class based versioning?
    if version != 0:
        raise ParseException('Unimplemented line version', version)

    data = {}
    fields = line.split(' ')
    try:
        data['temperature'] = float(fields[-2])
    except (IndexError, ValueError):
        # Too few fields or a non-numeric value: deliberately best-effort,
        # so a malformed line yields no data. Any other error is a real bug
        # and is allowed to propagate.
        pass
    return data
def threaded_reader(source, dataQueue, commandQueue):
    """Read logger lines from a serial port and push readings onto a queue.

    Opens `source` at 9600 baud, 8 data bits, no parity, 1 stop bit, then
    loops: each line read is parsed with parse_line() and, when it yields
    data, queued as {'time': <time.time() value>, 'data': <parsed dict>}.
    Failures to open, read or parse are queued as {'exception': <message>}
    instead of being raised.

    Args:
        source: serial port identifier passed to serial.Serial.
        dataQueue: queue that receives reading and exception items.
        commandQueue: queue polled without blocking once per loop pass; the
            value 'stop' ends the loop.

    NOTE(review): no read timeout is configured, so readline() presumably
    blocks until a line arrives and 'stop' is only noticed between lines.
    """
    try:
        ser = serial.Serial(source, baudrate=9600, parity=serial.PARITY_NONE,
                            stopbits=serial.STOPBITS_ONE,
                            bytesize=serial.EIGHTBITS)
    except Exception as e:
        dataQueue.put({'exception': str(e)})
        return

    try:
        while True:
            try:
                cmd = commandQueue.get_nowait()
            except Queue.Empty:
                cmd = ''
            if cmd == 'stop':
                break

            try:
                data = parse_line(ser.readline(), defaultVersion)
                runTime = time.time()
            except Exception as e:
                dataQueue.put({'exception': str(e)})
                # No fresh reading on this pass: skip queuing so a stale (or
                # never-assigned) reading is not reported.
                continue

            if data:
                dataQueue.put({'time': runTime, 'data': data})
    finally:
        # close out the fds, even if the loop exits on an unexpected error
        ser.close()
def main(args=sys.argv):
    """Print timestamped temperature readings from a serial port as CSV.

    Opens the port at 9600 baud, 8 data bits, no parity, 1 stop bit and
    loops until interrupted: each line read is parsed with parse_line() and,
    when it yields data, "<time.time() value>,<temperature>" is printed to
    stdout and flushed. Errors raised while reading or parsing are written
    to stderr and the loop carries on.

    Args:
        args: argument list; args[1], when present, names the serial port.
            Otherwise defaultSerialPort is used.
    """
    # @TODO switch to parse args or similar
    # @TODO parse of temperature logger line version
    lineVersion = defaultVersion
    try:
        # Use the args parameter rather than reading sys.argv directly, so
        # callers that pass their own list are honoured.
        serialPort = args[1]
    except (IndexError, TypeError):
        # No port argument supplied (or args is not indexable)
        serialPort = defaultSerialPort

    ser = serial.Serial(port=serialPort, baudrate=9600,
                        parity=serial.PARITY_NONE,
                        stopbits=serial.STOPBITS_ONE,
                        bytesize=serial.EIGHTBITS)
    try:
        while True:
            try:
                data = parse_line(ser.readline(), lineVersion)
                if data:
                    print('%s,%s' % (time.time(), data['temperature']))
                    sys.stdout.flush()
            except Exception as e:
                sys.stderr.write(str(e) + '\n')
    finally:
        # Release the port when the loop is interrupted (e.g. Ctrl-C)
        ser.close()
# Start logging when the file is executed as a script, handing main() the
# command-line arguments.
if __name__ == '__main__':
    main(sys.argv)