Python RPI button board
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
7
down vote
favorite
My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.
The code is split into three modules:
matrix.py
: Gets user input from the hardware
The buttons are connected in a 2D matrix to save available pins. Currently; there are 6 columns and 2 rows; the first row contains the 6 choices, the second row 4 action buttons.controller.py
: Contains button logic, handles selection
The module reads theconfig.ini
for information on how to handle the input frommatrix.py
. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection todatabase_commit.py
.database_commit.py
: Puts given input into a database
Received information is commited to a database according toconfig.ini
.
as well as a config.ini
file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?
The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.
Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.
matrix.py
# -*- coding: UTF-8 -*-
"""Reads input from GPIO with a reduced number of pins."""
import time
import RPi.GPIO as GPIO
from configparser import ConfigParser
class Matrix():
def __init__(self):
"""Sets up constants and pin numbers."""
# BCM (Broadcom) numbers are used
GPIO.setmode(GPIO.BCM)
parser = ConfigParser()
filename = 'config.ini'
parser.read(filename)
gpio = parser['gpio']
selection = parser['selection']
self.rows = list(map(int, gpio['button_rows'].split(',')))
self.columns = list(map(int, gpio['button_columns'].split(',')))
topics = selection['topics'].replace(" ","").split(',')
actions = selection['actions'].replace(" ","").split(',')
while len(actions) <= len(topics):
actions.append('unassigned')
self.options_array = [
topics,
actions
]
def get_button(self):
"""Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
# To search for the row of a pressed button:
# - set all rows to input
# - set all columns to output low
# - save index in row_position
row_position = -1
for i in range(len(self.columns)):
GPIO.setup(self.columns[i], GPIO.OUT)
GPIO.output(self.columns[i], GPIO.LOW)
for i in range(len(self.rows)):
GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
for i in range(len(self.rows)):
temp_read = GPIO.input(self.rows[i])
if temp_read == 0:
row_position = i
# cancel if no input was found
if row_position < 0 or row_position >= len(self.rows):
self.refresh()
return
# To search for the column of a pressed button in the row:
# - set all columns to input
# - rows[row_position] to output
column_position = -1
for j in range(len(self.columns)):
GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(self.rows[row_position], GPIO.OUT)
GPIO.output(self.rows[row_position], GPIO.HIGH)
for j in range(len(self.columns)):
temp_read = GPIO.input(self.columns[j])
if temp_read == 1:
column_position = j
# cancel if no input was found
if column_position < 0 or column_position >= len(self.columns):
self.refresh()
return
# returns coordinates of pressed button
self.refresh()
return self.options_array[row_position][column_position]
def refresh(self):
"""Resets all pin states."""
for i in range(len(self.rows)):
GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
for j in range(len(self.columns)):
GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
if __name__ == '__main__':
# For testing matrix functionality
while True:
user_input = None
while user_input is None:
user_input = Matrix().get_button()
print(user_input)
time.sleep(.2)
controller.py
:
The TODOs are there to hint the concept and will be implemented later
#!/usr/bin/python3
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database
class Controller():
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
def __init__(self):
"""Sets up constants."""
# Use Broadcom-Numbering
GPIO.setmode(GPIO.BCM)
# Get constants from config.ini
parser = ConfigParser()
filename = 'config.ini'
parser.read(filename)
# Pin adresses
gpio = parser['gpio']
self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
self.CANCEL_LED = self.ACTIONS_LED[0]
self.CONFIRM_LED = self.ACTIONS_LED[1]
self.POST_LED = self.ACTIONS_LED[2]
self.NAGIOS_LED = self.ACTIONS_LED[3]
# names, defaults, timers
selection = parser['selection']
self.TOPICS = selection['topics'].replace(" ", "").split(',')
self.ACTIONS = selection['actions'].replace(" ", "").split(',')
self.CANCEL = self.ACTIONS[0]
self.CONFIRM = self.ACTIONS[1]
self.POST = self.ACTIONS[2]
self.NAGIOS = self.ACTIONS[3]
self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
self.SLEEP = float(selection['sleeptime'])
self.BLINK = float(selection['blinktime'])
self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
# reminder schedule
schedule = parser['schedule']
self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
self.current_selection = self.DEFAULT_SELECTION[:]
self.led_state = self.DEFAULT_LED_STATE[:]
# Setup the LED pins
for i in range(len(self.TOPICS_LED)):
GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
for i in range(len(self.ACTIONS_LED)):
GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
# Switch on default selection LEDs
for i in range(len(self.DEFAULT_SELECTION)):
for j in range(len(self.TOPICS)):
if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
self.led_state[j] = True
# Notification status
self.post_notification = False
self.nagios_notification = False
def led_reset_topics(self):
"""Disable all LEDs and enable default selection LEDs"""
# All off
for i in range(len(self.TOPICS_LED)):
GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
# Default an
for i in range(len(self.DEFAULT_SELECTION)):
for j in range(len(self.TOPICS)):
if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
self.led_state[j] = True
return
def continous_led_blink(blink_event, blink_thread, led):
"""Blink action LED continuously"""
while not blink_event.isSet():
GPIO.output(led, GPIO.HIGH)
time.sleep(self.BLINK)
event_is_set = blink_event.wait(blink_thread)
if event_is_set:
GPIO.output(led, GPIO.LOW)
else:
GPIO.output(led, GPIO.LOW)
time.sleep(self.BLINK)
def led_blink(self, led_pin):
"""Blink action LED
"""
t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
GPIO.output(led_pin, GPIO.HIGH)
#time.sleep(.2)
#GPIO.output(led_pin, GPIO.LOW)
t.start()
return
def dismiss_notification(self, led_pin):
"""Disables notification LED
:param led_pin: GPIO pin number
"""
GPIO.output(led_pin, GPIO.LOW)
def led_toggle_topic(self, led_index):
"""Toggles LED state of led_index
:param led_index: Index in TOPICS_LED; contains GPIO pin number"""
self.led_state[led_index] = not self.led_state[led_index]
if self.led_state[led_index]:
GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
else:
GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
return
def handle_selection(self, user_input):
"""
Handles de/selection of topics
Returns False if selection is altered
Returns True if CANCEL or COMMIT is chosen
:param user_input: String, name of pressed button
"""
# If cancelled, reset to default selection
if user_input == self.CANCEL:
self.led_blink(self.CANCEL_LED)
self.current_selection = self.DEFAULT_SELECTION[:]
self.led_reset_topics()
return True
# If selection is non-empty, commit selection
# - selection array is always sorted
# If selection is empty, reset to default selection
# and send default selection if SEND_DEFAULT_ON_EMPTY is True
elif user_input == self.CONFIRM:
self.led_blink(self.CONFIRM_LED)
if self.current_selection != :
#self.current_selection.sort()
database().make_commit(self.current_selection)
self.current_selection = self.DEFAULT_SELECTION[:]
else:
self.current_selection = self.DEFAULT_SELECTION[:]
if self.SEND_DEFAULT_ON_EMPTY:
if len(self.DEFAULT_SELECTION) > 0:
database().make_commit(self.current_selection)
return True
# TODO
# Disable "Post" notification
elif user_input == self.POST:
"""self.post_notification = False
self.blink_thread.start()
self.dismiss_notification(self.POST_LED)
return False"""
# TODO
# Disable "Nagios" notification
elif user_input == self.NAGIOS:
"""self.nagios_notification = False
self.dismiss_notification(self.NAGIOS_LED)
return False"""
else:
# Compare input with all TOPICS
for i in range(len(self.TOPICS)):
if user_input == self.TOPICS[i]:
self.led_toggle_topic(i)
already_exists = False
# Delete if exists already
for j in range(len(self.current_selection)):
if user_input == self.current_selection[j]:
already_exists = True
self.current_selection.pop(j)
print(user_input + " -")
break
# Add if does not exits yet
if not already_exists:
self.current_selection.append(user_input)
print(user_input + " +")
# Sort array
# Same order as in TOPICS
self.temp_selection =
for l in range(len(self.TOPICS)):
self.temp_selection.append("")
for i in range(len(self.TOPICS)):
for j in range(len(self.current_selection)):
if self.TOPICS[i] == self.current_selection[j]:
self.temp_selection[i]=self.TOPICS[i]
self.temp_selection = list(filter(None, self.temp_selection))
self.current_selection = self.temp_selection[:]
return False
def wait_for_input(self):
"""Wartet auf Input und gibt an handle_selection weiter."""
done_with_input = False
while not done_with_input:
user_input = None
while user_input is None:
user_input = Matrix().get_button()
done_with_input = self.handle_selection(user_input)
time.sleep(self.SLEEP)
return
if __name__ == '__main__':
GPIO.setwarnings(False)
database().external_prepare_table()
while True:
Controller().wait_for_input()
GPIO.cleanup()
sys.exit(0)
database_commit.py
:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error
class DatabaseCommit():
def read_config(section, filename='config.ini'):
""" Reads config.ini and returns dictionary with config from section
:param filename: name of config file
:param section: name of section in config file
:return: dictionary with database settings
"""
parser = ConfigParser()
parser.read(filename)
config =
if parser.has_section(section):
items = parser.items(section)
for item in items:
config[item[0]] = item[1].replace(" ", "")
else:
raise Exception('0 not found in 1 '.format(section, filename))
return config
def establish_connection(self):
db_config = DatabaseCommit.read_config('mysql')
conn = MySQLConnection(**db_config)
if conn.is_connected():
return conn
else:
print("Connection failed")
def prepare_table(self, cursor):
# TODO check for existing tables, add any if neccessary, fill with 0
database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
sql += date_column +' DATETIME'
for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
topic = topic.lower()
sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
sql += ');'
print("n"+sql+"n")
cursor.execute(sql)
sql = ''
return cursor
def external_prepare_table(self):
conn = self.establish_connection()
cursor = conn.cursor()
self.prepare_table(cursor)
def insert_user_input(self, user_input, cursor, table_name):
date_column = DatabaseCommit.read_config('table')['date_column']
sql = 'INSERT INTO ' + table_name + '(' + date_column
for topic in user_input:
sql += ','+ topic
sql += ')'
sql += 'VALUES(NOW()'
for topic in user_input:
sql += ','+'TRUE'
sql += ')'
cursor.execute(sql)
def make_commit(self, user_input):
conn = self.establish_connection()
cursor = conn.cursor()
#self.prepare_table(cursor)
table_name = DatabaseCommit.read_config('table')['table_name']
self.insert_user_input(user_input, cursor, table_name)
cursor.execute('COMMIT;')
print("made commit :")
print(user_input)
cursor.close()
conn.close()
config.ini
:
Topic names and other info have been changed for privacy purposes
[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26
[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True
[schedule]
post = 11:30
nagios = 09:30
[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password
[table]
table_name = test
counter_column = number
date_column = date
python beginner mysql raspberry-pi
add a comment |Â
up vote
7
down vote
favorite
My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.
The code is split into three modules:
matrix.py
: Gets user input from the hardware
The buttons are connected in a 2D matrix to save available pins. Currently; there are 6 columns and 2 rows; the first row contains the 6 choices, the second row 4 action buttons.controller.py
: Contains button logic, handles selection
The module reads theconfig.ini
for information on how to handle the input frommatrix.py
. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection todatabase_commit.py
.database_commit.py
: Puts given input into a database
Received information is commited to a database according toconfig.ini
.
as well as a config.ini
file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?
The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.
Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.
matrix.py
# -*- coding: UTF-8 -*-
"""Reads input from GPIO with a reduced number of pins."""
import time
import RPi.GPIO as GPIO
from configparser import ConfigParser
class Matrix():
def __init__(self):
"""Sets up constants and pin numbers."""
# BCM (Broadcom) numbers are used
GPIO.setmode(GPIO.BCM)
parser = ConfigParser()
filename = 'config.ini'
parser.read(filename)
gpio = parser['gpio']
selection = parser['selection']
self.rows = list(map(int, gpio['button_rows'].split(',')))
self.columns = list(map(int, gpio['button_columns'].split(',')))
topics = selection['topics'].replace(" ","").split(',')
actions = selection['actions'].replace(" ","").split(',')
while len(actions) <= len(topics):
actions.append('unassigned')
self.options_array = [
topics,
actions
]
def get_button(self):
"""Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
# To search for the row of a pressed button:
# - set all rows to input
# - set all columns to output low
# - save index in row_position
row_position = -1
for i in range(len(self.columns)):
GPIO.setup(self.columns[i], GPIO.OUT)
GPIO.output(self.columns[i], GPIO.LOW)
for i in range(len(self.rows)):
GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
for i in range(len(self.rows)):
temp_read = GPIO.input(self.rows[i])
if temp_read == 0:
row_position = i
# cancel if no input was found
if row_position < 0 or row_position >= len(self.rows):
self.refresh()
return
# To search for the column of a pressed button in the row:
# - set all columns to input
# - rows[row_position] to output
column_position = -1
for j in range(len(self.columns)):
GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(self.rows[row_position], GPIO.OUT)
GPIO.output(self.rows[row_position], GPIO.HIGH)
for j in range(len(self.columns)):
temp_read = GPIO.input(self.columns[j])
if temp_read == 1:
column_position = j
# cancel if no input was found
if column_position < 0 or column_position >= len(self.columns):
self.refresh()
return
# returns coordinates of pressed button
self.refresh()
return self.options_array[row_position][column_position]
def refresh(self):
"""Resets all pin states."""
for i in range(len(self.rows)):
GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
for j in range(len(self.columns)):
GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
if __name__ == '__main__':
# For testing matrix functionality
while True:
user_input = None
while user_input is None:
user_input = Matrix().get_button()
print(user_input)
time.sleep(.2)
controller.py
:
The TODOs are there to hint the concept and will be implemented later
#!/usr/bin/python3
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database
class Controller():
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
def __init__(self):
"""Sets up constants."""
# Use Broadcom-Numbering
GPIO.setmode(GPIO.BCM)
# Get constants from config.ini
parser = ConfigParser()
filename = 'config.ini'
parser.read(filename)
# Pin adresses
gpio = parser['gpio']
self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
self.CANCEL_LED = self.ACTIONS_LED[0]
self.CONFIRM_LED = self.ACTIONS_LED[1]
self.POST_LED = self.ACTIONS_LED[2]
self.NAGIOS_LED = self.ACTIONS_LED[3]
# names, defaults, timers
selection = parser['selection']
self.TOPICS = selection['topics'].replace(" ", "").split(',')
self.ACTIONS = selection['actions'].replace(" ", "").split(',')
self.CANCEL = self.ACTIONS[0]
self.CONFIRM = self.ACTIONS[1]
self.POST = self.ACTIONS[2]
self.NAGIOS = self.ACTIONS[3]
self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
self.SLEEP = float(selection['sleeptime'])
self.BLINK = float(selection['blinktime'])
self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
# reminder schedule
schedule = parser['schedule']
self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
self.current_selection = self.DEFAULT_SELECTION[:]
self.led_state = self.DEFAULT_LED_STATE[:]
# Setup the LED pins
for i in range(len(self.TOPICS_LED)):
GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
for i in range(len(self.ACTIONS_LED)):
GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
# Switch on default selection LEDs
for i in range(len(self.DEFAULT_SELECTION)):
for j in range(len(self.TOPICS)):
if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
self.led_state[j] = True
# Notification status
self.post_notification = False
self.nagios_notification = False
def led_reset_topics(self):
"""Disable all LEDs and enable default selection LEDs"""
# All off
for i in range(len(self.TOPICS_LED)):
GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
# Default an
for i in range(len(self.DEFAULT_SELECTION)):
for j in range(len(self.TOPICS)):
if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
self.led_state[j] = True
return
def continous_led_blink(blink_event, blink_thread, led):
"""Blink action LED continuously"""
while not blink_event.isSet():
GPIO.output(led, GPIO.HIGH)
time.sleep(self.BLINK)
event_is_set = blink_event.wait(blink_thread)
if event_is_set:
GPIO.output(led, GPIO.LOW)
else:
GPIO.output(led, GPIO.LOW)
time.sleep(self.BLINK)
def led_blink(self, led_pin):
"""Blink action LED
"""
t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
GPIO.output(led_pin, GPIO.HIGH)
#time.sleep(.2)
#GPIO.output(led_pin, GPIO.LOW)
t.start()
return
def dismiss_notification(self, led_pin):
"""Disables notification LED
:param led_pin: GPIO pin number
"""
GPIO.output(led_pin, GPIO.LOW)
def led_toggle_topic(self, led_index):
"""Toggles LED state of led_index
:param led_index: Index in TOPICS_LED; contains GPIO pin number"""
self.led_state[led_index] = not self.led_state[led_index]
if self.led_state[led_index]:
GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
else:
GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
return
def handle_selection(self, user_input):
"""
Handles de/selection of topics
Returns False if selection is altered
Returns True if CANCEL or COMMIT is chosen
:param user_input: String, name of pressed button
"""
# If cancelled, reset to default selection
if user_input == self.CANCEL:
self.led_blink(self.CANCEL_LED)
self.current_selection = self.DEFAULT_SELECTION[:]
self.led_reset_topics()
return True
# If selection is non-empty, commit selection
# - selection array is always sorted
# If selection is empty, reset to default selection
# and send default selection if SEND_DEFAULT_ON_EMPTY is True
elif user_input == self.CONFIRM:
self.led_blink(self.CONFIRM_LED)
if self.current_selection != :
#self.current_selection.sort()
database().make_commit(self.current_selection)
self.current_selection = self.DEFAULT_SELECTION[:]
else:
self.current_selection = self.DEFAULT_SELECTION[:]
if self.SEND_DEFAULT_ON_EMPTY:
if len(self.DEFAULT_SELECTION) > 0:
database().make_commit(self.current_selection)
return True
# TODO
# Disable "Post" notification
elif user_input == self.POST:
"""self.post_notification = False
self.blink_thread.start()
self.dismiss_notification(self.POST_LED)
return False"""
# TODO
# Disable "Nagios" notification
elif user_input == self.NAGIOS:
"""self.nagios_notification = False
self.dismiss_notification(self.NAGIOS_LED)
return False"""
else:
# Compare input with all TOPICS
for i in range(len(self.TOPICS)):
if user_input == self.TOPICS[i]:
self.led_toggle_topic(i)
already_exists = False
# Delete if exists already
for j in range(len(self.current_selection)):
if user_input == self.current_selection[j]:
already_exists = True
self.current_selection.pop(j)
print(user_input + " -")
break
# Add if does not exits yet
if not already_exists:
self.current_selection.append(user_input)
print(user_input + " +")
# Sort array
# Same order as in TOPICS
self.temp_selection =
for l in range(len(self.TOPICS)):
self.temp_selection.append("")
for i in range(len(self.TOPICS)):
for j in range(len(self.current_selection)):
if self.TOPICS[i] == self.current_selection[j]:
self.temp_selection[i]=self.TOPICS[i]
self.temp_selection = list(filter(None, self.temp_selection))
self.current_selection = self.temp_selection[:]
return False
def wait_for_input(self):
"""Wartet auf Input und gibt an handle_selection weiter."""
done_with_input = False
while not done_with_input:
user_input = None
while user_input is None:
user_input = Matrix().get_button()
done_with_input = self.handle_selection(user_input)
time.sleep(self.SLEEP)
return
if __name__ == '__main__':
GPIO.setwarnings(False)
database().external_prepare_table()
while True:
Controller().wait_for_input()
GPIO.cleanup()
sys.exit(0)
database_commit.py
:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error
class DatabaseCommit():
def read_config(section, filename='config.ini'):
""" Reads config.ini and returns dictionary with config from section
:param filename: name of config file
:param section: name of section in config file
:return: dictionary with database settings
"""
parser = ConfigParser()
parser.read(filename)
config =
if parser.has_section(section):
items = parser.items(section)
for item in items:
config[item[0]] = item[1].replace(" ", "")
else:
raise Exception('0 not found in 1 '.format(section, filename))
return config
def establish_connection(self):
db_config = DatabaseCommit.read_config('mysql')
conn = MySQLConnection(**db_config)
if conn.is_connected():
return conn
else:
print("Connection failed")
def prepare_table(self, cursor):
# TODO check for existing tables, add any if neccessary, fill with 0
database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
sql += date_column +' DATETIME'
for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
topic = topic.lower()
sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
sql += ');'
print("n"+sql+"n")
cursor.execute(sql)
sql = ''
return cursor
def external_prepare_table(self):
conn = self.establish_connection()
cursor = conn.cursor()
self.prepare_table(cursor)
def insert_user_input(self, user_input, cursor, table_name):
date_column = DatabaseCommit.read_config('table')['date_column']
sql = 'INSERT INTO ' + table_name + '(' + date_column
for topic in user_input:
sql += ','+ topic
sql += ')'
sql += 'VALUES(NOW()'
for topic in user_input:
sql += ','+'TRUE'
sql += ')'
cursor.execute(sql)
def make_commit(self, user_input):
conn = self.establish_connection()
cursor = conn.cursor()
#self.prepare_table(cursor)
table_name = DatabaseCommit.read_config('table')['table_name']
self.insert_user_input(user_input, cursor, table_name)
cursor.execute('COMMIT;')
print("made commit :")
print(user_input)
cursor.close()
conn.close()
config.ini
:
Topic names and other info have been changed for privacy purposes
[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26
[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True
[schedule]
post = 11:30
nagios = 09:30
[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password
[table]
table_name = test
counter_column = number
date_column = date
python beginner mysql raspberry-pi
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31
add a comment |Â
up vote
7
down vote
favorite
up vote
7
down vote
favorite
My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.
The code is split into three modules:
matrix.py
: Gets user input from the hardware
The buttons are connected in a 2D matrix to save available pins. Currently; there are 6 columns and 2 rows; the first row contains the 6 choices, the second row 4 action buttons.controller.py
: Contains button logic, handles selection
The module reads theconfig.ini
for information on how to handle the input frommatrix.py
. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection todatabase_commit.py
.database_commit.py
: Puts given input into a database
Received information is commited to a database according toconfig.ini
.
as well as a config.ini
file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?
The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.
Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.
matrix.py
# -*- coding: UTF-8 -*-
"""Reads input from GPIO with a reduced number of pins."""
import time
import RPi.GPIO as GPIO
from configparser import ConfigParser
class Matrix():
def __init__(self):
"""Sets up constants and pin numbers."""
# BCM (Broadcom) numbers are used
GPIO.setmode(GPIO.BCM)
parser = ConfigParser()
filename = 'config.ini'
parser.read(filename)
gpio = parser['gpio']
selection = parser['selection']
self.rows = list(map(int, gpio['button_rows'].split(',')))
self.columns = list(map(int, gpio['button_columns'].split(',')))
topics = selection['topics'].replace(" ","").split(',')
actions = selection['actions'].replace(" ","").split(',')
while len(actions) <= len(topics):
actions.append('unassigned')
self.options_array = [
topics,
actions
]
def get_button(self):
"""Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
# To search for the row of a pressed button:
# - set all rows to input
# - set all columns to output low
# - save index in row_position
row_position = -1
for i in range(len(self.columns)):
GPIO.setup(self.columns[i], GPIO.OUT)
GPIO.output(self.columns[i], GPIO.LOW)
for i in range(len(self.rows)):
GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
for i in range(len(self.rows)):
temp_read = GPIO.input(self.rows[i])
if temp_read == 0:
row_position = i
# cancel if no input was found
if row_position < 0 or row_position >= len(self.rows):
self.refresh()
return
# To search for the column of a pressed button in the row:
# - set all columns to input
# - rows[row_position] to output
column_position = -1
for j in range(len(self.columns)):
GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(self.rows[row_position], GPIO.OUT)
GPIO.output(self.rows[row_position], GPIO.HIGH)
for j in range(len(self.columns)):
temp_read = GPIO.input(self.columns[j])
if temp_read == 1:
column_position = j
# cancel if no input was found
if column_position < 0 or column_position >= len(self.columns):
self.refresh()
return
# returns coordinates of pressed button
self.refresh()
return self.options_array[row_position][column_position]
def refresh(self):
"""Resets all pin states."""
for i in range(len(self.rows)):
GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
for j in range(len(self.columns)):
GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
if __name__ == '__main__':
# For testing matrix functionality
while True:
user_input = None
while user_input is None:
user_input = Matrix().get_button()
print(user_input)
time.sleep(.2)
controller.py
:
The TODOs are there to hint the concept and will be implemented later
#!/usr/bin/python3
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database
class Controller():
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
def __init__(self):
"""Sets up constants."""
# Use Broadcom-Numbering
GPIO.setmode(GPIO.BCM)
# Get constants from config.ini
parser = ConfigParser()
filename = 'config.ini'
parser.read(filename)
# Pin adresses
gpio = parser['gpio']
self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
self.CANCEL_LED = self.ACTIONS_LED[0]
self.CONFIRM_LED = self.ACTIONS_LED[1]
self.POST_LED = self.ACTIONS_LED[2]
self.NAGIOS_LED = self.ACTIONS_LED[3]
# names, defaults, timers
selection = parser['selection']
self.TOPICS = selection['topics'].replace(" ", "").split(',')
self.ACTIONS = selection['actions'].replace(" ", "").split(',')
self.CANCEL = self.ACTIONS[0]
self.CONFIRM = self.ACTIONS[1]
self.POST = self.ACTIONS[2]
self.NAGIOS = self.ACTIONS[3]
self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
self.SLEEP = float(selection['sleeptime'])
self.BLINK = float(selection['blinktime'])
self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
# reminder schedule
schedule = parser['schedule']
self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
self.current_selection = self.DEFAULT_SELECTION[:]
self.led_state = self.DEFAULT_LED_STATE[:]
# Setup the LED pins
for i in range(len(self.TOPICS_LED)):
GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
for i in range(len(self.ACTIONS_LED)):
GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
# Switch on default selection LEDs
for i in range(len(self.DEFAULT_SELECTION)):
for j in range(len(self.TOPICS)):
if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
self.led_state[j] = True
# Notification status
self.post_notification = False
self.nagios_notification = False
def led_reset_topics(self):
"""Disable all LEDs and enable default selection LEDs"""
# All off
for i in range(len(self.TOPICS_LED)):
GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
# Default an
for i in range(len(self.DEFAULT_SELECTION)):
for j in range(len(self.TOPICS)):
if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
self.led_state[j] = True
return
def continous_led_blink(blink_event, blink_thread, led):
"""Blink action LED continuously"""
while not blink_event.isSet():
GPIO.output(led, GPIO.HIGH)
time.sleep(self.BLINK)
event_is_set = blink_event.wait(blink_thread)
if event_is_set:
GPIO.output(led, GPIO.LOW)
else:
GPIO.output(led, GPIO.LOW)
time.sleep(self.BLINK)
def led_blink(self, led_pin):
"""Blink action LED
"""
t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
GPIO.output(led_pin, GPIO.HIGH)
#time.sleep(.2)
#GPIO.output(led_pin, GPIO.LOW)
t.start()
return
def dismiss_notification(self, led_pin):
"""Disables notification LED
:param led_pin: GPIO pin number
"""
GPIO.output(led_pin, GPIO.LOW)
def led_toggle_topic(self, led_index):
"""Toggles LED state of led_index
:param led_index: Index in TOPICS_LED; contains GPIO pin number"""
self.led_state[led_index] = not self.led_state[led_index]
if self.led_state[led_index]:
GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
else:
GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
return
def handle_selection(self, user_input):
"""
Handles de/selection of topics
Returns False if selection is altered
Returns True if CANCEL or COMMIT is chosen
:param user_input: String, name of pressed button
"""
# If cancelled, reset to default selection
if user_input == self.CANCEL:
self.led_blink(self.CANCEL_LED)
self.current_selection = self.DEFAULT_SELECTION[:]
self.led_reset_topics()
return True
# If selection is non-empty, commit selection
# - selection array is always sorted
# If selection is empty, reset to default selection
# and send default selection if SEND_DEFAULT_ON_EMPTY is True
elif user_input == self.CONFIRM:
self.led_blink(self.CONFIRM_LED)
if self.current_selection != :
#self.current_selection.sort()
database().make_commit(self.current_selection)
self.current_selection = self.DEFAULT_SELECTION[:]
else:
self.current_selection = self.DEFAULT_SELECTION[:]
if self.SEND_DEFAULT_ON_EMPTY:
if len(self.DEFAULT_SELECTION) > 0:
database().make_commit(self.current_selection)
return True
# TODO
# Disable "Post" notification
elif user_input == self.POST:
"""self.post_notification = False
self.blink_thread.start()
self.dismiss_notification(self.POST_LED)
return False"""
# TODO
# Disable "Nagios" notification
elif user_input == self.NAGIOS:
"""self.nagios_notification = False
self.dismiss_notification(self.NAGIOS_LED)
return False"""
else:
# Compare input with all TOPICS
for i in range(len(self.TOPICS)):
if user_input == self.TOPICS[i]:
self.led_toggle_topic(i)
already_exists = False
# Delete if exists already
for j in range(len(self.current_selection)):
if user_input == self.current_selection[j]:
already_exists = True
self.current_selection.pop(j)
print(user_input + " -")
break
# Add if does not exits yet
if not already_exists:
self.current_selection.append(user_input)
print(user_input + " +")
# Sort array
# Same order as in TOPICS
self.temp_selection =
for l in range(len(self.TOPICS)):
self.temp_selection.append("")
for i in range(len(self.TOPICS)):
for j in range(len(self.current_selection)):
if self.TOPICS[i] == self.current_selection[j]:
self.temp_selection[i]=self.TOPICS[i]
self.temp_selection = list(filter(None, self.temp_selection))
self.current_selection = self.temp_selection[:]
return False
def wait_for_input(self):
"""Wartet auf Input und gibt an handle_selection weiter."""
done_with_input = False
while not done_with_input:
user_input = None
while user_input is None:
user_input = Matrix().get_button()
done_with_input = self.handle_selection(user_input)
time.sleep(self.SLEEP)
return
if __name__ == '__main__':
GPIO.setwarnings(False)
database().external_prepare_table()
while True:
Controller().wait_for_input()
GPIO.cleanup()
sys.exit(0)
database_commit.py
:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error
class DatabaseCommit():
def read_config(section, filename='config.ini'):
""" Reads config.ini and returns dictionary with config from section
:param filename: name of config file
:param section: name of section in config file
:return: dictionary with database settings
"""
parser = ConfigParser()
parser.read(filename)
config =
if parser.has_section(section):
items = parser.items(section)
for item in items:
config[item[0]] = item[1].replace(" ", "")
else:
raise Exception('0 not found in 1 '.format(section, filename))
return config
def establish_connection(self):
db_config = DatabaseCommit.read_config('mysql')
conn = MySQLConnection(**db_config)
if conn.is_connected():
return conn
else:
print("Connection failed")
def prepare_table(self, cursor):
# TODO check for existing tables, add any if neccessary, fill with 0
database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
sql += date_column +' DATETIME'
for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
topic = topic.lower()
sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
sql += ');'
print("n"+sql+"n")
cursor.execute(sql)
sql = ''
return cursor
def external_prepare_table(self):
conn = self.establish_connection()
cursor = conn.cursor()
self.prepare_table(cursor)
def insert_user_input(self, user_input, cursor, table_name):
date_column = DatabaseCommit.read_config('table')['date_column']
sql = 'INSERT INTO ' + table_name + '(' + date_column
for topic in user_input:
sql += ','+ topic
sql += ')'
sql += 'VALUES(NOW()'
for topic in user_input:
sql += ','+'TRUE'
sql += ')'
cursor.execute(sql)
def make_commit(self, user_input):
conn = self.establish_connection()
cursor = conn.cursor()
#self.prepare_table(cursor)
table_name = DatabaseCommit.read_config('table')['table_name']
self.insert_user_input(user_input, cursor, table_name)
cursor.execute('COMMIT;')
print("made commit :")
print(user_input)
cursor.close()
conn.close()
config.ini
:
Topic names and other info have been changed for privacy purposes
[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26
[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True
[schedule]
post = 11:30
nagios = 09:30
[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password
[table]
table_name = test
counter_column = number
date_column = date
python beginner mysql raspberry-pi
My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.
The code is split into three modules:
matrix.py
: Gets user input from the hardware
The buttons are connected in a 2D matrix to save available pins. Currently; there are 6 columns and 2 rows; the first row contains the 6 choices, the second row 4 action buttons.controller.py
: Contains button logic, handles selection
The module reads theconfig.ini
for information on how to handle the input frommatrix.py
. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection todatabase_commit.py
.database_commit.py
: Puts given input into a database
Received information is commited to a database according toconfig.ini
.
as well as a config.ini
file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?
The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.
Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.
matrix.py
# -*- coding: UTF-8 -*-
"""Reads input from GPIO with a reduced number of pins."""
import time
import RPi.GPIO as GPIO
from configparser import ConfigParser
class Matrix():
def __init__(self):
"""Sets up constants and pin numbers."""
# BCM (Broadcom) numbers are used
GPIO.setmode(GPIO.BCM)
parser = ConfigParser()
filename = 'config.ini'
parser.read(filename)
gpio = parser['gpio']
selection = parser['selection']
self.rows = list(map(int, gpio['button_rows'].split(',')))
self.columns = list(map(int, gpio['button_columns'].split(',')))
topics = selection['topics'].replace(" ","").split(',')
actions = selection['actions'].replace(" ","").split(',')
while len(actions) <= len(topics):
actions.append('unassigned')
self.options_array = [
topics,
actions
]
def get_button(self):
"""Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
# To search for the row of a pressed button:
# - set all rows to input
# - set all columns to output low
# - save index in row_position
row_position = -1
for i in range(len(self.columns)):
GPIO.setup(self.columns[i], GPIO.OUT)
GPIO.output(self.columns[i], GPIO.LOW)
for i in range(len(self.rows)):
GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
for i in range(len(self.rows)):
temp_read = GPIO.input(self.rows[i])
if temp_read == 0:
row_position = i
# cancel if no input was found
if row_position < 0 or row_position >= len(self.rows):
self.refresh()
return
# To search for the column of a pressed button in the row:
# - set all columns to input
# - rows[row_position] to output
column_position = -1
for j in range(len(self.columns)):
GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(self.rows[row_position], GPIO.OUT)
GPIO.output(self.rows[row_position], GPIO.HIGH)
for j in range(len(self.columns)):
temp_read = GPIO.input(self.columns[j])
if temp_read == 1:
column_position = j
# cancel if no input was found
if column_position < 0 or column_position >= len(self.columns):
self.refresh()
return
# returns coordinates of pressed button
self.refresh()
return self.options_array[row_position][column_position]
def refresh(self):
"""Resets all pin states."""
for i in range(len(self.rows)):
GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
for j in range(len(self.columns)):
GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
if __name__ == '__main__':
# For testing matrix functionality
while True:
user_input = None
while user_input is None:
user_input = Matrix().get_button()
print(user_input)
time.sleep(.2)
controller.py
:
The TODOs are there to hint the concept and will be implemented later
#!/usr/bin/python3
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database
class Controller():
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
def __init__(self):
"""Sets up constants."""
# Use Broadcom-Numbering
GPIO.setmode(GPIO.BCM)
# Get constants from config.ini
parser = ConfigParser()
filename = 'config.ini'
parser.read(filename)
# Pin adresses
gpio = parser['gpio']
self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
self.CANCEL_LED = self.ACTIONS_LED[0]
self.CONFIRM_LED = self.ACTIONS_LED[1]
self.POST_LED = self.ACTIONS_LED[2]
self.NAGIOS_LED = self.ACTIONS_LED[3]
# names, defaults, timers
selection = parser['selection']
self.TOPICS = selection['topics'].replace(" ", "").split(',')
self.ACTIONS = selection['actions'].replace(" ", "").split(',')
self.CANCEL = self.ACTIONS[0]
self.CONFIRM = self.ACTIONS[1]
self.POST = self.ACTIONS[2]
self.NAGIOS = self.ACTIONS[3]
self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
self.SLEEP = float(selection['sleeptime'])
self.BLINK = float(selection['blinktime'])
self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
# reminder schedule
schedule = parser['schedule']
self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
self.current_selection = self.DEFAULT_SELECTION[:]
self.led_state = self.DEFAULT_LED_STATE[:]
# Setup the LED pins
for i in range(len(self.TOPICS_LED)):
GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
for i in range(len(self.ACTIONS_LED)):
GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
# Switch on default selection LEDs
for i in range(len(self.DEFAULT_SELECTION)):
for j in range(len(self.TOPICS)):
if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
self.led_state[j] = True
# Notification status
self.post_notification = False
self.nagios_notification = False
def led_reset_topics(self):
"""Disable all LEDs and enable default selection LEDs"""
# All off
for i in range(len(self.TOPICS_LED)):
GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
# Default an
for i in range(len(self.DEFAULT_SELECTION)):
for j in range(len(self.TOPICS)):
if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
self.led_state[j] = True
return
def continous_led_blink(blink_event, blink_thread, led):
"""Blink action LED continuously"""
while not blink_event.isSet():
GPIO.output(led, GPIO.HIGH)
time.sleep(self.BLINK)
event_is_set = blink_event.wait(blink_thread)
if event_is_set:
GPIO.output(led, GPIO.LOW)
else:
GPIO.output(led, GPIO.LOW)
time.sleep(self.BLINK)
def led_blink(self, led_pin):
"""Blink action LED
"""
t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
GPIO.output(led_pin, GPIO.HIGH)
#time.sleep(.2)
#GPIO.output(led_pin, GPIO.LOW)
t.start()
return
def dismiss_notification(self, led_pin):
"""Disables notification LED
:param led_pin: GPIO pin number
"""
GPIO.output(led_pin, GPIO.LOW)
def led_toggle_topic(self, led_index):
"""Toggles LED state of led_index
:param led_index: Index in TOPICS_LED; contains GPIO pin number"""
self.led_state[led_index] = not self.led_state[led_index]
if self.led_state[led_index]:
GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
else:
GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
return
def handle_selection(self, user_input):
"""
Handles de/selection of topics
Returns False if selection is altered
Returns True if CANCEL or COMMIT is chosen
:param user_input: String, name of pressed button
"""
# If cancelled, reset to default selection
if user_input == self.CANCEL:
self.led_blink(self.CANCEL_LED)
self.current_selection = self.DEFAULT_SELECTION[:]
self.led_reset_topics()
return True
# If selection is non-empty, commit selection
# - selection array is always sorted
# If selection is empty, reset to default selection
# and send default selection if SEND_DEFAULT_ON_EMPTY is True
elif user_input == self.CONFIRM:
self.led_blink(self.CONFIRM_LED)
if self.current_selection != :
#self.current_selection.sort()
database().make_commit(self.current_selection)
self.current_selection = self.DEFAULT_SELECTION[:]
else:
self.current_selection = self.DEFAULT_SELECTION[:]
if self.SEND_DEFAULT_ON_EMPTY:
if len(self.DEFAULT_SELECTION) > 0:
database().make_commit(self.current_selection)
return True
# TODO
# Disable "Post" notification
elif user_input == self.POST:
"""self.post_notification = False
self.blink_thread.start()
self.dismiss_notification(self.POST_LED)
return False"""
# TODO
# Disable "Nagios" notification
elif user_input == self.NAGIOS:
"""self.nagios_notification = False
self.dismiss_notification(self.NAGIOS_LED)
return False"""
else:
# Compare input with all TOPICS
for i in range(len(self.TOPICS)):
if user_input == self.TOPICS[i]:
self.led_toggle_topic(i)
already_exists = False
# Delete if exists already
for j in range(len(self.current_selection)):
if user_input == self.current_selection[j]:
already_exists = True
self.current_selection.pop(j)
print(user_input + " -")
break
# Add if does not exits yet
if not already_exists:
self.current_selection.append(user_input)
print(user_input + " +")
# Sort array
# Same order as in TOPICS
self.temp_selection =
for l in range(len(self.TOPICS)):
self.temp_selection.append("")
for i in range(len(self.TOPICS)):
for j in range(len(self.current_selection)):
if self.TOPICS[i] == self.current_selection[j]:
self.temp_selection[i]=self.TOPICS[i]
self.temp_selection = list(filter(None, self.temp_selection))
self.current_selection = self.temp_selection[:]
return False
def wait_for_input(self):
"""Wartet auf Input und gibt an handle_selection weiter."""
done_with_input = False
while not done_with_input:
user_input = None
while user_input is None:
user_input = Matrix().get_button()
done_with_input = self.handle_selection(user_input)
time.sleep(self.SLEEP)
return
if __name__ == '__main__':
GPIO.setwarnings(False)
database().external_prepare_table()
while True:
Controller().wait_for_input()
GPIO.cleanup()
sys.exit(0)
database_commit.py
:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error
class DatabaseCommit():
def read_config(section, filename='config.ini'):
""" Reads config.ini and returns dictionary with config from section
:param filename: name of config file
:param section: name of section in config file
:return: dictionary with database settings
"""
parser = ConfigParser()
parser.read(filename)
config =
if parser.has_section(section):
items = parser.items(section)
for item in items:
config[item[0]] = item[1].replace(" ", "")
else:
raise Exception('0 not found in 1 '.format(section, filename))
return config
def establish_connection(self):
db_config = DatabaseCommit.read_config('mysql')
conn = MySQLConnection(**db_config)
if conn.is_connected():
return conn
else:
print("Connection failed")
def prepare_table(self, cursor):
# TODO check for existing tables, add any if neccessary, fill with 0
database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
sql += date_column +' DATETIME'
for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
topic = topic.lower()
sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
sql += ');'
print("n"+sql+"n")
cursor.execute(sql)
sql = ''
return cursor
def external_prepare_table(self):
conn = self.establish_connection()
cursor = conn.cursor()
self.prepare_table(cursor)
def insert_user_input(self, user_input, cursor, table_name):
date_column = DatabaseCommit.read_config('table')['date_column']
sql = 'INSERT INTO ' + table_name + '(' + date_column
for topic in user_input:
sql += ','+ topic
sql += ')'
sql += 'VALUES(NOW()'
for topic in user_input:
sql += ','+'TRUE'
sql += ')'
cursor.execute(sql)
def make_commit(self, user_input):
conn = self.establish_connection()
cursor = conn.cursor()
#self.prepare_table(cursor)
table_name = DatabaseCommit.read_config('table')['table_name']
self.insert_user_input(user_input, cursor, table_name)
cursor.execute('COMMIT;')
print("made commit :")
print(user_input)
cursor.close()
conn.close()
config.ini
:
Topic names and other info have been changed for privacy purposes
[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26
[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True
[schedule]
post = 11:30
nagios = 09:30
[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password
[table]
table_name = test
counter_column = number
date_column = date
python beginner mysql raspberry-pi
asked Mar 2 at 14:09
Orphevs
1363
1363
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31
add a comment |Â
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31
add a comment |Â
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f188680%2fpython-rpi-button-board%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31