Python RPI button board

 Clash Royale CLAN TAG#URR8PPP
Clash Royale CLAN TAG#URR8PPP
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;
up vote
7
down vote
favorite
My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.
The code is split into three modules:
- matrix.py: Gets user input from the hardware
 The buttons are connected in a 2D matrix to save available pins. Currently; there are 6 columns and 2 rows; the first row contains the 6 choices, the second row 4 action buttons.
- controller.py: Contains button logic, handles selection
 The module reads the- config.inifor information on how to handle the input from- matrix.py. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection to- database_commit.py.
- database_commit.py: Puts given input into a database
 Received information is commited to a database according to- config.ini.
as well as a config.ini file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?
The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.
Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.
matrix.py
# -*- coding: UTF-8 -*-
"""Reads input from GPIO with a reduced number of pins."""
import time
import RPi.GPIO as GPIO
from configparser import ConfigParser
class Matrix():
 def __init__(self):
 """Sets up constants and pin numbers."""
 # BCM (Broadcom) numbers are used
 GPIO.setmode(GPIO.BCM)
 parser = ConfigParser()
 filename = 'config.ini'
 parser.read(filename)
 gpio = parser['gpio']
 selection = parser['selection']
 self.rows = list(map(int, gpio['button_rows'].split(',')))
 self.columns = list(map(int, gpio['button_columns'].split(',')))
 topics = selection['topics'].replace(" ","").split(',')
 actions = selection['actions'].replace(" ","").split(',')
 while len(actions) <= len(topics):
 actions.append('unassigned')
 self.options_array = [
 topics,
 actions
 ]
 def get_button(self):
 """Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
 # To search for the row of a pressed button:
 # - set all rows to input
 # - set all columns to output low
 # - save index in row_position
 row_position = -1
 for i in range(len(self.columns)):
 GPIO.setup(self.columns[i], GPIO.OUT)
 GPIO.output(self.columns[i], GPIO.LOW)
 for i in range(len(self.rows)):
 GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
 for i in range(len(self.rows)):
 temp_read = GPIO.input(self.rows[i])
 if temp_read == 0:
 row_position = i
 # cancel if no input was found
 if row_position < 0 or row_position >= len(self.rows):
 self.refresh()
 return
 # To search for the column of a pressed button in the row:
 # - set all columns to input
 # - rows[row_position] to output
 column_position = -1
 for j in range(len(self.columns)):
 GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
 GPIO.setup(self.rows[row_position], GPIO.OUT)
 GPIO.output(self.rows[row_position], GPIO.HIGH)
 for j in range(len(self.columns)):
 temp_read = GPIO.input(self.columns[j])
 if temp_read == 1:
 column_position = j
 # cancel if no input was found
 if column_position < 0 or column_position >= len(self.columns):
 self.refresh()
 return
 # returns coordinates of pressed button
 self.refresh()
 return self.options_array[row_position][column_position]
 def refresh(self):
 """Resets all pin states."""
 for i in range(len(self.rows)):
 GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
 for j in range(len(self.columns)):
 GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
if __name__ == '__main__':
 # For testing matrix functionality
 while True:
 user_input = None
 while user_input is None:
 user_input = Matrix().get_button()
 print(user_input)
 time.sleep(.2)
controller.py:
The TODOs are there to hint the concept and will be implemented later
#!/usr/bin/python3
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database
class Controller():
 """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
 def __init__(self):
 """Sets up constants."""
 # Use Broadcom-Numbering
 GPIO.setmode(GPIO.BCM)
 # Get constants from config.ini
 parser = ConfigParser()
 filename = 'config.ini'
 parser.read(filename)
 # Pin adresses
 gpio = parser['gpio']
 self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
 self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
 self.CANCEL_LED = self.ACTIONS_LED[0]
 self.CONFIRM_LED = self.ACTIONS_LED[1]
 self.POST_LED = self.ACTIONS_LED[2]
 self.NAGIOS_LED = self.ACTIONS_LED[3]
 # names, defaults, timers
 selection = parser['selection']
 self.TOPICS = selection['topics'].replace(" ", "").split(',')
 self.ACTIONS = selection['actions'].replace(" ", "").split(',')
 self.CANCEL = self.ACTIONS[0]
 self.CONFIRM = self.ACTIONS[1]
 self.POST = self.ACTIONS[2]
 self.NAGIOS = self.ACTIONS[3]
 self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
 self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
 self.SLEEP = float(selection['sleeptime'])
 self.BLINK = float(selection['blinktime'])
 self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
 # reminder schedule
 schedule = parser['schedule']
 self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
 self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
 self.current_selection = self.DEFAULT_SELECTION[:]
 self.led_state = self.DEFAULT_LED_STATE[:]
 # Setup the LED pins
 for i in range(len(self.TOPICS_LED)):
 GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
 GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
 for i in range(len(self.ACTIONS_LED)):
 GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
 GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
 # Switch on default selection LEDs
 for i in range(len(self.DEFAULT_SELECTION)):
 for j in range(len(self.TOPICS)):
 if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
 GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
 self.led_state[j] = True
 # Notification status 
 self.post_notification = False
 self.nagios_notification = False
 def led_reset_topics(self):
 """Disable all LEDs and enable default selection LEDs"""
 # All off
 for i in range(len(self.TOPICS_LED)):
 GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
 # Default an
 for i in range(len(self.DEFAULT_SELECTION)):
 for j in range(len(self.TOPICS)):
 if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
 GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
 self.led_state[j] = True
 return
 def continous_led_blink(blink_event, blink_thread, led):
 """Blink action LED continuously"""
 while not blink_event.isSet():
 GPIO.output(led, GPIO.HIGH)
 time.sleep(self.BLINK)
 event_is_set = blink_event.wait(blink_thread)
 if event_is_set:
 GPIO.output(led, GPIO.LOW)
 else:
 GPIO.output(led, GPIO.LOW)
 time.sleep(self.BLINK)
 def led_blink(self, led_pin):
 """Blink action LED
 """
 t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
 GPIO.output(led_pin, GPIO.HIGH)
 #time.sleep(.2)
 #GPIO.output(led_pin, GPIO.LOW)
 t.start()
 return
 def dismiss_notification(self, led_pin):
 """Disables notification LED
 :param led_pin: GPIO pin number
 """
 GPIO.output(led_pin, GPIO.LOW)
 def led_toggle_topic(self, led_index):
 """Toggles LED state of led_index
 :param led_index: Index in TOPICS_LED; contains GPIO pin number"""
 self.led_state[led_index] = not self.led_state[led_index]
 if self.led_state[led_index]:
 GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
 else:
 GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
 return
 def handle_selection(self, user_input):
 """
 Handles de/selection of topics
 Returns False if selection is altered
 Returns True if CANCEL or COMMIT is chosen
 :param user_input: String, name of pressed button
 """
 # If cancelled, reset to default selection
 if user_input == self.CANCEL:
 self.led_blink(self.CANCEL_LED)
 self.current_selection = self.DEFAULT_SELECTION[:]
 self.led_reset_topics()
 return True
 # If selection is non-empty, commit selection
 # - selection array is always sorted
 # If selection is empty, reset to default selection
 # and send default selection if SEND_DEFAULT_ON_EMPTY is True
 elif user_input == self.CONFIRM:
 self.led_blink(self.CONFIRM_LED)
 if self.current_selection != :
 #self.current_selection.sort()
 database().make_commit(self.current_selection)
 self.current_selection = self.DEFAULT_SELECTION[:]
 else:
 self.current_selection = self.DEFAULT_SELECTION[:]
 if self.SEND_DEFAULT_ON_EMPTY:
 if len(self.DEFAULT_SELECTION) > 0:
 database().make_commit(self.current_selection)
 return True
 # TODO
 # Disable "Post" notification
 elif user_input == self.POST:
 """self.post_notification = False
 self.blink_thread.start()
 self.dismiss_notification(self.POST_LED)
 return False"""
 # TODO
 # Disable "Nagios" notification
 elif user_input == self.NAGIOS:
 """self.nagios_notification = False
 self.dismiss_notification(self.NAGIOS_LED)
 return False"""
 else:
 # Compare input with all TOPICS
 for i in range(len(self.TOPICS)):
 if user_input == self.TOPICS[i]:
 self.led_toggle_topic(i)
 already_exists = False
 # Delete if exists already
 for j in range(len(self.current_selection)):
 if user_input == self.current_selection[j]:
 already_exists = True
 self.current_selection.pop(j)
 print(user_input + " -")
 break
 # Add if does not exits yet
 if not already_exists:
 self.current_selection.append(user_input)
 print(user_input + " +")
 # Sort array
 # Same order as in TOPICS
 self.temp_selection = 
 for l in range(len(self.TOPICS)):
 self.temp_selection.append("")
 for i in range(len(self.TOPICS)):
 for j in range(len(self.current_selection)):
 if self.TOPICS[i] == self.current_selection[j]:
 self.temp_selection[i]=self.TOPICS[i]
 self.temp_selection = list(filter(None, self.temp_selection))
 self.current_selection = self.temp_selection[:]
 return False
 def wait_for_input(self):
 """Wartet auf Input und gibt an handle_selection weiter."""
 done_with_input = False
 while not done_with_input:
 user_input = None
 while user_input is None:
 user_input = Matrix().get_button()
 done_with_input = self.handle_selection(user_input)
 time.sleep(self.SLEEP)
 return
if __name__ == '__main__':
 GPIO.setwarnings(False)
 database().external_prepare_table()
 while True:
 Controller().wait_for_input()
 GPIO.cleanup()
 sys.exit(0)
database_commit.py:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error
class DatabaseCommit():
 def read_config(section, filename='config.ini'):
 """ Reads config.ini and returns dictionary with config from section
 :param filename: name of config file
 :param section: name of section in config file
 :return: dictionary with database settings
 """
 parser = ConfigParser()
 parser.read(filename)
 config = 
 if parser.has_section(section):
 items = parser.items(section)
 for item in items:
 config[item[0]] = item[1].replace(" ", "")
 else:
 raise Exception('0 not found in 1 '.format(section, filename))
 return config
 def establish_connection(self):
 db_config = DatabaseCommit.read_config('mysql')
 conn = MySQLConnection(**db_config)
 if conn.is_connected():
 return conn
 else:
 print("Connection failed")
 def prepare_table(self, cursor):
 # TODO check for existing tables, add any if neccessary, fill with 0
 database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
 table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
 counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
 date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
 sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
 sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
 sql += date_column +' DATETIME'
 for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
 topic = topic.lower()
 sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
 sql += ');'
 print("n"+sql+"n")
 cursor.execute(sql)
 sql = ''
 return cursor
 def external_prepare_table(self):
 conn = self.establish_connection()
 cursor = conn.cursor()
 self.prepare_table(cursor)
 def insert_user_input(self, user_input, cursor, table_name):
 date_column = DatabaseCommit.read_config('table')['date_column']
 sql = 'INSERT INTO ' + table_name + '(' + date_column
 for topic in user_input:
 sql += ','+ topic
 sql += ')'
 sql += 'VALUES(NOW()'
 for topic in user_input:
 sql += ','+'TRUE'
 sql += ')'
 cursor.execute(sql)
 def make_commit(self, user_input):
 conn = self.establish_connection()
 cursor = conn.cursor()
 #self.prepare_table(cursor)
 table_name = DatabaseCommit.read_config('table')['table_name']
 self.insert_user_input(user_input, cursor, table_name)
 cursor.execute('COMMIT;')
 print("made commit :")
 print(user_input)
 cursor.close()
 conn.close()
config.ini:
Topic names and other info have been changed for privacy purposes
[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26
[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True
[schedule]
post = 11:30
nagios = 09:30
[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password
[table]
table_name = test
counter_column = number
date_column = date
python beginner mysql raspberry-pi
add a comment |Â
up vote
7
down vote
favorite
My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.
The code is split into three modules:
- matrix.py: Gets user input from the hardware
 The buttons are connected in a 2D matrix to save available pins. Currently; there are 6 columns and 2 rows; the first row contains the 6 choices, the second row 4 action buttons.
- controller.py: Contains button logic, handles selection
 The module reads the- config.inifor information on how to handle the input from- matrix.py. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection to- database_commit.py.
- database_commit.py: Puts given input into a database
 Received information is commited to a database according to- config.ini.
as well as a config.ini file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?
The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.
Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.
matrix.py
# -*- coding: UTF-8 -*-
"""Reads input from GPIO with a reduced number of pins."""
import time
import RPi.GPIO as GPIO
from configparser import ConfigParser
class Matrix():
 def __init__(self):
 """Sets up constants and pin numbers."""
 # BCM (Broadcom) numbers are used
 GPIO.setmode(GPIO.BCM)
 parser = ConfigParser()
 filename = 'config.ini'
 parser.read(filename)
 gpio = parser['gpio']
 selection = parser['selection']
 self.rows = list(map(int, gpio['button_rows'].split(',')))
 self.columns = list(map(int, gpio['button_columns'].split(',')))
 topics = selection['topics'].replace(" ","").split(',')
 actions = selection['actions'].replace(" ","").split(',')
 while len(actions) <= len(topics):
 actions.append('unassigned')
 self.options_array = [
 topics,
 actions
 ]
 def get_button(self):
 """Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
 # To search for the row of a pressed button:
 # - set all rows to input
 # - set all columns to output low
 # - save index in row_position
 row_position = -1
 for i in range(len(self.columns)):
 GPIO.setup(self.columns[i], GPIO.OUT)
 GPIO.output(self.columns[i], GPIO.LOW)
 for i in range(len(self.rows)):
 GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
 for i in range(len(self.rows)):
 temp_read = GPIO.input(self.rows[i])
 if temp_read == 0:
 row_position = i
 # cancel if no input was found
 if row_position < 0 or row_position >= len(self.rows):
 self.refresh()
 return
 # To search for the column of a pressed button in the row:
 # - set all columns to input
 # - rows[row_position] to output
 column_position = -1
 for j in range(len(self.columns)):
 GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
 GPIO.setup(self.rows[row_position], GPIO.OUT)
 GPIO.output(self.rows[row_position], GPIO.HIGH)
 for j in range(len(self.columns)):
 temp_read = GPIO.input(self.columns[j])
 if temp_read == 1:
 column_position = j
 # cancel if no input was found
 if column_position < 0 or column_position >= len(self.columns):
 self.refresh()
 return
 # returns coordinates of pressed button
 self.refresh()
 return self.options_array[row_position][column_position]
 def refresh(self):
 """Resets all pin states."""
 for i in range(len(self.rows)):
 GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
 for j in range(len(self.columns)):
 GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
if __name__ == '__main__':
 # For testing matrix functionality
 while True:
 user_input = None
 while user_input is None:
 user_input = Matrix().get_button()
 print(user_input)
 time.sleep(.2)
controller.py:
The TODOs are there to hint the concept and will be implemented later
#!/usr/bin/python3
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database
class Controller():
 """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
 def __init__(self):
 """Sets up constants."""
 # Use Broadcom-Numbering
 GPIO.setmode(GPIO.BCM)
 # Get constants from config.ini
 parser = ConfigParser()
 filename = 'config.ini'
 parser.read(filename)
 # Pin adresses
 gpio = parser['gpio']
 self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
 self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
 self.CANCEL_LED = self.ACTIONS_LED[0]
 self.CONFIRM_LED = self.ACTIONS_LED[1]
 self.POST_LED = self.ACTIONS_LED[2]
 self.NAGIOS_LED = self.ACTIONS_LED[3]
 # names, defaults, timers
 selection = parser['selection']
 self.TOPICS = selection['topics'].replace(" ", "").split(',')
 self.ACTIONS = selection['actions'].replace(" ", "").split(',')
 self.CANCEL = self.ACTIONS[0]
 self.CONFIRM = self.ACTIONS[1]
 self.POST = self.ACTIONS[2]
 self.NAGIOS = self.ACTIONS[3]
 self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
 self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
 self.SLEEP = float(selection['sleeptime'])
 self.BLINK = float(selection['blinktime'])
 self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
 # reminder schedule
 schedule = parser['schedule']
 self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
 self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
 self.current_selection = self.DEFAULT_SELECTION[:]
 self.led_state = self.DEFAULT_LED_STATE[:]
 # Setup the LED pins
 for i in range(len(self.TOPICS_LED)):
 GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
 GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
 for i in range(len(self.ACTIONS_LED)):
 GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
 GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
 # Switch on default selection LEDs
 for i in range(len(self.DEFAULT_SELECTION)):
 for j in range(len(self.TOPICS)):
 if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
 GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
 self.led_state[j] = True
 # Notification status 
 self.post_notification = False
 self.nagios_notification = False
 def led_reset_topics(self):
 """Disable all LEDs and enable default selection LEDs"""
 # All off
 for i in range(len(self.TOPICS_LED)):
 GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
 # Default an
 for i in range(len(self.DEFAULT_SELECTION)):
 for j in range(len(self.TOPICS)):
 if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
 GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
 self.led_state[j] = True
 return
 def continous_led_blink(blink_event, blink_thread, led):
 """Blink action LED continuously"""
 while not blink_event.isSet():
 GPIO.output(led, GPIO.HIGH)
 time.sleep(self.BLINK)
 event_is_set = blink_event.wait(blink_thread)
 if event_is_set:
 GPIO.output(led, GPIO.LOW)
 else:
 GPIO.output(led, GPIO.LOW)
 time.sleep(self.BLINK)
 def led_blink(self, led_pin):
 """Blink action LED
 """
 t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
 GPIO.output(led_pin, GPIO.HIGH)
 #time.sleep(.2)
 #GPIO.output(led_pin, GPIO.LOW)
 t.start()
 return
 def dismiss_notification(self, led_pin):
 """Disables notification LED
 :param led_pin: GPIO pin number
 """
 GPIO.output(led_pin, GPIO.LOW)
 def led_toggle_topic(self, led_index):
 """Toggles LED state of led_index
 :param led_index: Index in TOPICS_LED; contains GPIO pin number"""
 self.led_state[led_index] = not self.led_state[led_index]
 if self.led_state[led_index]:
 GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
 else:
 GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
 return
 def handle_selection(self, user_input):
 """
 Handles de/selection of topics
 Returns False if selection is altered
 Returns True if CANCEL or COMMIT is chosen
 :param user_input: String, name of pressed button
 """
 # If cancelled, reset to default selection
 if user_input == self.CANCEL:
 self.led_blink(self.CANCEL_LED)
 self.current_selection = self.DEFAULT_SELECTION[:]
 self.led_reset_topics()
 return True
 # If selection is non-empty, commit selection
 # - selection array is always sorted
 # If selection is empty, reset to default selection
 # and send default selection if SEND_DEFAULT_ON_EMPTY is True
 elif user_input == self.CONFIRM:
 self.led_blink(self.CONFIRM_LED)
 if self.current_selection != :
 #self.current_selection.sort()
 database().make_commit(self.current_selection)
 self.current_selection = self.DEFAULT_SELECTION[:]
 else:
 self.current_selection = self.DEFAULT_SELECTION[:]
 if self.SEND_DEFAULT_ON_EMPTY:
 if len(self.DEFAULT_SELECTION) > 0:
 database().make_commit(self.current_selection)
 return True
 # TODO
 # Disable "Post" notification
 elif user_input == self.POST:
 """self.post_notification = False
 self.blink_thread.start()
 self.dismiss_notification(self.POST_LED)
 return False"""
 # TODO
 # Disable "Nagios" notification
 elif user_input == self.NAGIOS:
 """self.nagios_notification = False
 self.dismiss_notification(self.NAGIOS_LED)
 return False"""
 else:
 # Compare input with all TOPICS
 for i in range(len(self.TOPICS)):
 if user_input == self.TOPICS[i]:
 self.led_toggle_topic(i)
 already_exists = False
 # Delete if exists already
 for j in range(len(self.current_selection)):
 if user_input == self.current_selection[j]:
 already_exists = True
 self.current_selection.pop(j)
 print(user_input + " -")
 break
 # Add if does not exits yet
 if not already_exists:
 self.current_selection.append(user_input)
 print(user_input + " +")
 # Sort array
 # Same order as in TOPICS
 self.temp_selection = 
 for l in range(len(self.TOPICS)):
 self.temp_selection.append("")
 for i in range(len(self.TOPICS)):
 for j in range(len(self.current_selection)):
 if self.TOPICS[i] == self.current_selection[j]:
 self.temp_selection[i]=self.TOPICS[i]
 self.temp_selection = list(filter(None, self.temp_selection))
 self.current_selection = self.temp_selection[:]
 return False
 def wait_for_input(self):
 """Wartet auf Input und gibt an handle_selection weiter."""
 done_with_input = False
 while not done_with_input:
 user_input = None
 while user_input is None:
 user_input = Matrix().get_button()
 done_with_input = self.handle_selection(user_input)
 time.sleep(self.SLEEP)
 return
if __name__ == '__main__':
 GPIO.setwarnings(False)
 database().external_prepare_table()
 while True:
 Controller().wait_for_input()
 GPIO.cleanup()
 sys.exit(0)
database_commit.py:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error
class DatabaseCommit():
 def read_config(section, filename='config.ini'):
 """ Reads config.ini and returns dictionary with config from section
 :param filename: name of config file
 :param section: name of section in config file
 :return: dictionary with database settings
 """
 parser = ConfigParser()
 parser.read(filename)
 config = 
 if parser.has_section(section):
 items = parser.items(section)
 for item in items:
 config[item[0]] = item[1].replace(" ", "")
 else:
 raise Exception('0 not found in 1 '.format(section, filename))
 return config
 def establish_connection(self):
 db_config = DatabaseCommit.read_config('mysql')
 conn = MySQLConnection(**db_config)
 if conn.is_connected():
 return conn
 else:
 print("Connection failed")
 def prepare_table(self, cursor):
 # TODO check for existing tables, add any if neccessary, fill with 0
 database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
 table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
 counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
 date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
 sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
 sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
 sql += date_column +' DATETIME'
 for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
 topic = topic.lower()
 sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
 sql += ');'
 print("n"+sql+"n")
 cursor.execute(sql)
 sql = ''
 return cursor
 def external_prepare_table(self):
 conn = self.establish_connection()
 cursor = conn.cursor()
 self.prepare_table(cursor)
 def insert_user_input(self, user_input, cursor, table_name):
 date_column = DatabaseCommit.read_config('table')['date_column']
 sql = 'INSERT INTO ' + table_name + '(' + date_column
 for topic in user_input:
 sql += ','+ topic
 sql += ')'
 sql += 'VALUES(NOW()'
 for topic in user_input:
 sql += ','+'TRUE'
 sql += ')'
 cursor.execute(sql)
 def make_commit(self, user_input):
 conn = self.establish_connection()
 cursor = conn.cursor()
 #self.prepare_table(cursor)
 table_name = DatabaseCommit.read_config('table')['table_name']
 self.insert_user_input(user_input, cursor, table_name)
 cursor.execute('COMMIT;')
 print("made commit :")
 print(user_input)
 cursor.close()
 conn.close()
config.ini:
Topic names and other info have been changed for privacy purposes
[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26
[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True
[schedule]
post = 11:30
nagios = 09:30
[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password
[table]
table_name = test
counter_column = number
date_column = date
python beginner mysql raspberry-pi
 
 
 
 
 
 
 Is there a reason you need this many buttons?
 â Mast
 Mar 2 at 15:19
 
 
 
 
 
 
 
 
 
 @Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
 â Orphevs
 Mar 2 at 17:37
 
 
 
 
 
 
 
 
 
 Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
 â A. Romeu
 Mar 3 at 0:31
 
 
 
add a comment |Â
up vote
7
down vote
favorite
up vote
7
down vote
favorite
My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.
The code is split into three modules:
- matrix.py: Gets user input from the hardware
 The buttons are connected in a 2D matrix to save available pins. Currently; there are 6 columns and 2 rows; the first row contains the 6 choices, the second row 4 action buttons.
- controller.py: Contains button logic, handles selection
 The module reads the- config.inifor information on how to handle the input from- matrix.py. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection to- database_commit.py.
- database_commit.py: Puts given input into a database
 Received information is commited to a database according to- config.ini.
as well as a config.ini file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?
The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.
Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.
matrix.py
# -*- coding: UTF-8 -*-
"""Reads input from GPIO with a reduced number of pins."""
import time
import RPi.GPIO as GPIO
from configparser import ConfigParser
class Matrix():
 def __init__(self):
 """Sets up constants and pin numbers."""
 # BCM (Broadcom) numbers are used
 GPIO.setmode(GPIO.BCM)
 parser = ConfigParser()
 filename = 'config.ini'
 parser.read(filename)
 gpio = parser['gpio']
 selection = parser['selection']
 self.rows = list(map(int, gpio['button_rows'].split(',')))
 self.columns = list(map(int, gpio['button_columns'].split(',')))
 topics = selection['topics'].replace(" ","").split(',')
 actions = selection['actions'].replace(" ","").split(',')
 while len(actions) <= len(topics):
 actions.append('unassigned')
 self.options_array = [
 topics,
 actions
 ]
 def get_button(self):
 """Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
 # To search for the row of a pressed button:
 # - set all rows to input
 # - set all columns to output low
 # - save index in row_position
 row_position = -1
 for i in range(len(self.columns)):
 GPIO.setup(self.columns[i], GPIO.OUT)
 GPIO.output(self.columns[i], GPIO.LOW)
 for i in range(len(self.rows)):
 GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
 for i in range(len(self.rows)):
 temp_read = GPIO.input(self.rows[i])
 if temp_read == 0:
 row_position = i
 # cancel if no input was found
 if row_position < 0 or row_position >= len(self.rows):
 self.refresh()
 return
 # To search for the column of a pressed button in the row:
 # - set all columns to input
 # - rows[row_position] to output
 column_position = -1
 for j in range(len(self.columns)):
 GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
 GPIO.setup(self.rows[row_position], GPIO.OUT)
 GPIO.output(self.rows[row_position], GPIO.HIGH)
 for j in range(len(self.columns)):
 temp_read = GPIO.input(self.columns[j])
 if temp_read == 1:
 column_position = j
 # cancel if no input was found
 if column_position < 0 or column_position >= len(self.columns):
 self.refresh()
 return
 # returns coordinates of pressed button
 self.refresh()
 return self.options_array[row_position][column_position]
 def refresh(self):
 """Resets all pin states."""
 for i in range(len(self.rows)):
 GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
 for j in range(len(self.columns)):
 GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
if __name__ == '__main__':
 # For testing matrix functionality
 while True:
 user_input = None
 while user_input is None:
 user_input = Matrix().get_button()
 print(user_input)
 time.sleep(.2)
controller.py:
The TODOs are there to hint the concept and will be implemented later
#!/usr/bin/python3
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database
class Controller():
 """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
 def __init__(self):
 """Sets up constants."""
 # Use Broadcom-Numbering
 GPIO.setmode(GPIO.BCM)
 # Get constants from config.ini
 parser = ConfigParser()
 filename = 'config.ini'
 parser.read(filename)
 # Pin adresses
 gpio = parser['gpio']
 self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
 self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
 self.CANCEL_LED = self.ACTIONS_LED[0]
 self.CONFIRM_LED = self.ACTIONS_LED[1]
 self.POST_LED = self.ACTIONS_LED[2]
 self.NAGIOS_LED = self.ACTIONS_LED[3]
 # names, defaults, timers
 selection = parser['selection']
 self.TOPICS = selection['topics'].replace(" ", "").split(',')
 self.ACTIONS = selection['actions'].replace(" ", "").split(',')
 self.CANCEL = self.ACTIONS[0]
 self.CONFIRM = self.ACTIONS[1]
 self.POST = self.ACTIONS[2]
 self.NAGIOS = self.ACTIONS[3]
 self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
 self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
 self.SLEEP = float(selection['sleeptime'])
 self.BLINK = float(selection['blinktime'])
 self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
 # reminder schedule
 schedule = parser['schedule']
 self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
 self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
 self.current_selection = self.DEFAULT_SELECTION[:]
 self.led_state = self.DEFAULT_LED_STATE[:]
 # Setup the LED pins
 for i in range(len(self.TOPICS_LED)):
 GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
 GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
 for i in range(len(self.ACTIONS_LED)):
 GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
 GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
 # Switch on default selection LEDs
 for i in range(len(self.DEFAULT_SELECTION)):
 for j in range(len(self.TOPICS)):
 if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
 GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
 self.led_state[j] = True
 # Notification status 
 self.post_notification = False
 self.nagios_notification = False
 def led_reset_topics(self):
 """Disable all LEDs and enable default selection LEDs"""
 # All off
 for i in range(len(self.TOPICS_LED)):
 GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
 # Default an
 for i in range(len(self.DEFAULT_SELECTION)):
 for j in range(len(self.TOPICS)):
 if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
 GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
 self.led_state[j] = True
 return
 def continous_led_blink(blink_event, blink_thread, led):
 """Blink action LED continuously"""
 while not blink_event.isSet():
 GPIO.output(led, GPIO.HIGH)
 time.sleep(self.BLINK)
 event_is_set = blink_event.wait(blink_thread)
 if event_is_set:
 GPIO.output(led, GPIO.LOW)
 else:
 GPIO.output(led, GPIO.LOW)
 time.sleep(self.BLINK)
 def led_blink(self, led_pin):
 """Blink action LED
 """
 t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
 GPIO.output(led_pin, GPIO.HIGH)
 #time.sleep(.2)
 #GPIO.output(led_pin, GPIO.LOW)
 t.start()
 return
 def dismiss_notification(self, led_pin):
 """Disables notification LED
 :param led_pin: GPIO pin number
 """
 GPIO.output(led_pin, GPIO.LOW)
 def led_toggle_topic(self, led_index):
 """Toggles LED state of led_index
 :param led_index: Index in TOPICS_LED; contains GPIO pin number"""
 self.led_state[led_index] = not self.led_state[led_index]
 if self.led_state[led_index]:
 GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
 else:
 GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
 return
 def handle_selection(self, user_input):
 """
 Handles de/selection of topics
 Returns False if selection is altered
 Returns True if CANCEL or COMMIT is chosen
 :param user_input: String, name of pressed button
 """
 # If cancelled, reset to default selection
 if user_input == self.CANCEL:
 self.led_blink(self.CANCEL_LED)
 self.current_selection = self.DEFAULT_SELECTION[:]
 self.led_reset_topics()
 return True
 # If selection is non-empty, commit selection
 # - selection array is always sorted
 # If selection is empty, reset to default selection
 # and send default selection if SEND_DEFAULT_ON_EMPTY is True
 elif user_input == self.CONFIRM:
 self.led_blink(self.CONFIRM_LED)
 if self.current_selection != :
 #self.current_selection.sort()
 database().make_commit(self.current_selection)
 self.current_selection = self.DEFAULT_SELECTION[:]
 else:
 self.current_selection = self.DEFAULT_SELECTION[:]
 if self.SEND_DEFAULT_ON_EMPTY:
 if len(self.DEFAULT_SELECTION) > 0:
 database().make_commit(self.current_selection)
 return True
 # TODO
 # Disable "Post" notification
 elif user_input == self.POST:
 """self.post_notification = False
 self.blink_thread.start()
 self.dismiss_notification(self.POST_LED)
 return False"""
 # TODO
 # Disable "Nagios" notification
 elif user_input == self.NAGIOS:
 """self.nagios_notification = False
 self.dismiss_notification(self.NAGIOS_LED)
 return False"""
 else:
 # Compare input with all TOPICS
 for i in range(len(self.TOPICS)):
 if user_input == self.TOPICS[i]:
 self.led_toggle_topic(i)
 already_exists = False
 # Delete if exists already
 for j in range(len(self.current_selection)):
 if user_input == self.current_selection[j]:
 already_exists = True
 self.current_selection.pop(j)
 print(user_input + " -")
 break
 # Add if does not exits yet
 if not already_exists:
 self.current_selection.append(user_input)
 print(user_input + " +")
 # Sort array
 # Same order as in TOPICS
 self.temp_selection = 
 for l in range(len(self.TOPICS)):
 self.temp_selection.append("")
 for i in range(len(self.TOPICS)):
 for j in range(len(self.current_selection)):
 if self.TOPICS[i] == self.current_selection[j]:
 self.temp_selection[i]=self.TOPICS[i]
 self.temp_selection = list(filter(None, self.temp_selection))
 self.current_selection = self.temp_selection[:]
 return False
 def wait_for_input(self):
 """Wartet auf Input und gibt an handle_selection weiter."""
 done_with_input = False
 while not done_with_input:
 user_input = None
 while user_input is None:
 user_input = Matrix().get_button()
 done_with_input = self.handle_selection(user_input)
 time.sleep(self.SLEEP)
 return
if __name__ == '__main__':
 GPIO.setwarnings(False)
 database().external_prepare_table()
 while True:
 Controller().wait_for_input()
 GPIO.cleanup()
 sys.exit(0)
database_commit.py:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error
class DatabaseCommit():
 def read_config(section, filename='config.ini'):
 """ Reads config.ini and returns dictionary with config from section
 :param filename: name of config file
 :param section: name of section in config file
 :return: dictionary with database settings
 """
 parser = ConfigParser()
 parser.read(filename)
 config = 
 if parser.has_section(section):
 items = parser.items(section)
 for item in items:
 config[item[0]] = item[1].replace(" ", "")
 else:
 raise Exception('0 not found in 1 '.format(section, filename))
 return config
 def establish_connection(self):
 db_config = DatabaseCommit.read_config('mysql')
 conn = MySQLConnection(**db_config)
 if conn.is_connected():
 return conn
 else:
 print("Connection failed")
 def prepare_table(self, cursor):
 # TODO check for existing tables, add any if neccessary, fill with 0
 database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
 table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
 counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
 date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
 sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
 sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
 sql += date_column +' DATETIME'
 for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
 topic = topic.lower()
 sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
 sql += ');'
 print("n"+sql+"n")
 cursor.execute(sql)
 sql = ''
 return cursor
 def external_prepare_table(self):
 conn = self.establish_connection()
 cursor = conn.cursor()
 self.prepare_table(cursor)
 def insert_user_input(self, user_input, cursor, table_name):
 date_column = DatabaseCommit.read_config('table')['date_column']
 sql = 'INSERT INTO ' + table_name + '(' + date_column
 for topic in user_input:
 sql += ','+ topic
 sql += ')'
 sql += 'VALUES(NOW()'
 for topic in user_input:
 sql += ','+'TRUE'
 sql += ')'
 cursor.execute(sql)
 def make_commit(self, user_input):
 conn = self.establish_connection()
 cursor = conn.cursor()
 #self.prepare_table(cursor)
 table_name = DatabaseCommit.read_config('table')['table_name']
 self.insert_user_input(user_input, cursor, table_name)
 cursor.execute('COMMIT;')
 print("made commit :")
 print(user_input)
 cursor.close()
 conn.close()
config.ini:
Topic names and other info have been changed for privacy purposes
[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26
[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True
[schedule]
post = 11:30
nagios = 09:30
[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password
[table]
table_name = test
counter_column = number
date_column = date
python beginner mysql raspberry-pi
My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.
The code is split into three modules:
- matrix.py: Gets user input from the hardware
 The buttons are connected in a 2D matrix to save available pins. Currently; there are 6 columns and 2 rows; the first row contains the 6 choices, the second row 4 action buttons.
- controller.py: Contains button logic, handles selection
 The module reads the- config.inifor information on how to handle the input from- matrix.py. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection to- database_commit.py.
- database_commit.py: Puts given input into a database
 Received information is commited to a database according to- config.ini.
as well as a config.ini file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?
The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.
Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.
matrix.py
# -*- coding: UTF-8 -*-
"""Reads input from GPIO with a reduced number of pins."""
import time
import RPi.GPIO as GPIO
from configparser import ConfigParser
class Matrix():
 def __init__(self):
 """Sets up constants and pin numbers."""
 # BCM (Broadcom) numbers are used
 GPIO.setmode(GPIO.BCM)
 parser = ConfigParser()
 filename = 'config.ini'
 parser.read(filename)
 gpio = parser['gpio']
 selection = parser['selection']
 self.rows = list(map(int, gpio['button_rows'].split(',')))
 self.columns = list(map(int, gpio['button_columns'].split(',')))
 topics = selection['topics'].replace(" ","").split(',')
 actions = selection['actions'].replace(" ","").split(',')
 while len(actions) <= len(topics):
 actions.append('unassigned')
 self.options_array = [
 topics,
 actions
 ]
 def get_button(self):
 """Findet gedrueckte Knoepfe und returnt Koordinaten falls gefunden."""
 # To search for the row of a pressed button:
 # - set all rows to input
 # - set all columns to output low
 # - save index in row_position
 row_position = -1
 for i in range(len(self.columns)):
 GPIO.setup(self.columns[i], GPIO.OUT)
 GPIO.output(self.columns[i], GPIO.LOW)
 for i in range(len(self.rows)):
 GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
 for i in range(len(self.rows)):
 temp_read = GPIO.input(self.rows[i])
 if temp_read == 0:
 row_position = i
 # cancel if no input was found
 if row_position < 0 or row_position >= len(self.rows):
 self.refresh()
 return
 # To search for the column of a pressed button in the row:
 # - set all columns to input
 # - rows[row_position] to output
 column_position = -1
 for j in range(len(self.columns)):
 GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
 GPIO.setup(self.rows[row_position], GPIO.OUT)
 GPIO.output(self.rows[row_position], GPIO.HIGH)
 for j in range(len(self.columns)):
 temp_read = GPIO.input(self.columns[j])
 if temp_read == 1:
 column_position = j
 # cancel if no input was found
 if column_position < 0 or column_position >= len(self.columns):
 self.refresh()
 return
 # returns coordinates of pressed button
 self.refresh()
 return self.options_array[row_position][column_position]
 def refresh(self):
 """Resets all pin states."""
 for i in range(len(self.rows)):
 GPIO.setup(self.rows[i], GPIO.IN, pull_up_down=GPIO.PUD_UP)
 for j in range(len(self.columns)):
 GPIO.setup(self.columns[j], GPIO.IN, pull_up_down=GPIO.PUD_UP)
if __name__ == '__main__':
 # For testing matrix functionality
 while True:
 user_input = None
 while user_input is None:
 user_input = Matrix().get_button()
 print(user_input)
 time.sleep(.2)
controller.py:
The TODOs are there to hint the concept and will be implemented later
#!/usr/bin/python3
"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database
class Controller():
 """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""
 def __init__(self):
 """Sets up constants."""
 # Use Broadcom-Numbering
 GPIO.setmode(GPIO.BCM)
 # Get constants from config.ini
 parser = ConfigParser()
 filename = 'config.ini'
 parser.read(filename)
 # Pin adresses
 gpio = parser['gpio']
 self.TOPICS_LED = list(map(int, gpio['topics_led'].split(',')))
 self.ACTIONS_LED = list(map(int, gpio['actions_led'].split(',')))
 self.CANCEL_LED = self.ACTIONS_LED[0]
 self.CONFIRM_LED = self.ACTIONS_LED[1]
 self.POST_LED = self.ACTIONS_LED[2]
 self.NAGIOS_LED = self.ACTIONS_LED[3]
 # names, defaults, timers
 selection = parser['selection']
 self.TOPICS = selection['topics'].replace(" ", "").split(',')
 self.ACTIONS = selection['actions'].replace(" ", "").split(',')
 self.CANCEL = self.ACTIONS[0]
 self.CONFIRM = self.ACTIONS[1]
 self.POST = self.ACTIONS[2]
 self.NAGIOS = self.ACTIONS[3]
 self.DEFAULT_SELECTION = selection['default_selection'].replace(" ", "").split(',')
 self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
 self.SLEEP = float(selection['sleeptime'])
 self.BLINK = float(selection['blinktime'])
 self.SEND_DEFAULT_ON_EMPTY = bool(selection['send_default_on_empty'])
 # reminder schedule
 schedule = parser['schedule']
 self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
 self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')
 self.current_selection = self.DEFAULT_SELECTION[:]
 self.led_state = self.DEFAULT_LED_STATE[:]
 # Setup the LED pins
 for i in range(len(self.TOPICS_LED)):
 GPIO.setup(self.TOPICS_LED[i], GPIO.OUT)
 GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
 for i in range(len(self.ACTIONS_LED)):
 GPIO.setup(self.ACTIONS_LED[i], GPIO.OUT)
 GPIO.output(self.ACTIONS_LED[i], GPIO.LOW)
 # Switch on default selection LEDs
 for i in range(len(self.DEFAULT_SELECTION)):
 for j in range(len(self.TOPICS)):
 if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
 GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
 self.led_state[j] = True
 # Notification status 
 self.post_notification = False
 self.nagios_notification = False
 def led_reset_topics(self):
 """Disable all LEDs and enable default selection LEDs"""
 # All off
 for i in range(len(self.TOPICS_LED)):
 GPIO.output(self.TOPICS_LED[i], GPIO.LOW)
 # Default an
 for i in range(len(self.DEFAULT_SELECTION)):
 for j in range(len(self.TOPICS)):
 if self.DEFAULT_SELECTION[i] == self.TOPICS[j]:
 GPIO.output(self.TOPICS_LED[j], GPIO.HIGH)
 self.led_state[j] = True
 return
 def continous_led_blink(blink_event, blink_thread, led):
 """Blink action LED continuously"""
 while not blink_event.isSet():
 GPIO.output(led, GPIO.HIGH)
 time.sleep(self.BLINK)
 event_is_set = blink_event.wait(blink_thread)
 if event_is_set:
 GPIO.output(led, GPIO.LOW)
 else:
 GPIO.output(led, GPIO.LOW)
 time.sleep(self.BLINK)
 def led_blink(self, led_pin):
 """Blink action LED
 """
 t = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
 GPIO.output(led_pin, GPIO.HIGH)
 #time.sleep(.2)
 #GPIO.output(led_pin, GPIO.LOW)
 t.start()
 return
 def dismiss_notification(self, led_pin):
 """Disables notification LED
 :param led_pin: GPIO pin number
 """
 GPIO.output(led_pin, GPIO.LOW)
 def led_toggle_topic(self, led_index):
 """Toggles LED state of led_index
 :param led_index: Index in TOPICS_LED; contains GPIO pin number"""
 self.led_state[led_index] = not self.led_state[led_index]
 if self.led_state[led_index]:
 GPIO.output(self.TOPICS_LED[led_index], GPIO.HIGH)
 else:
 GPIO.output(self.TOPICS_LED[led_index], GPIO.LOW)
 return
 def handle_selection(self, user_input):
 """
 Handles de/selection of topics
 Returns False if selection is altered
 Returns True if CANCEL or COMMIT is chosen
 :param user_input: String, name of pressed button
 """
 # If cancelled, reset to default selection
 if user_input == self.CANCEL:
 self.led_blink(self.CANCEL_LED)
 self.current_selection = self.DEFAULT_SELECTION[:]
 self.led_reset_topics()
 return True
 # If selection is non-empty, commit selection
 # - selection array is always sorted
 # If selection is empty, reset to default selection
 # and send default selection if SEND_DEFAULT_ON_EMPTY is True
 elif user_input == self.CONFIRM:
 self.led_blink(self.CONFIRM_LED)
 if self.current_selection != :
 #self.current_selection.sort()
 database().make_commit(self.current_selection)
 self.current_selection = self.DEFAULT_SELECTION[:]
 else:
 self.current_selection = self.DEFAULT_SELECTION[:]
 if self.SEND_DEFAULT_ON_EMPTY:
 if len(self.DEFAULT_SELECTION) > 0:
 database().make_commit(self.current_selection)
 return True
 # TODO
 # Disable "Post" notification
 elif user_input == self.POST:
 """self.post_notification = False
 self.blink_thread.start()
 self.dismiss_notification(self.POST_LED)
 return False"""
 # TODO
 # Disable "Nagios" notification
 elif user_input == self.NAGIOS:
 """self.nagios_notification = False
 self.dismiss_notification(self.NAGIOS_LED)
 return False"""
 else:
 # Compare input with all TOPICS
 for i in range(len(self.TOPICS)):
 if user_input == self.TOPICS[i]:
 self.led_toggle_topic(i)
 already_exists = False
 # Delete if exists already
 for j in range(len(self.current_selection)):
 if user_input == self.current_selection[j]:
 already_exists = True
 self.current_selection.pop(j)
 print(user_input + " -")
 break
 # Add if does not exits yet
 if not already_exists:
 self.current_selection.append(user_input)
 print(user_input + " +")
 # Sort array
 # Same order as in TOPICS
 self.temp_selection = 
 for l in range(len(self.TOPICS)):
 self.temp_selection.append("")
 for i in range(len(self.TOPICS)):
 for j in range(len(self.current_selection)):
 if self.TOPICS[i] == self.current_selection[j]:
 self.temp_selection[i]=self.TOPICS[i]
 self.temp_selection = list(filter(None, self.temp_selection))
 self.current_selection = self.temp_selection[:]
 return False
 def wait_for_input(self):
 """Wartet auf Input und gibt an handle_selection weiter."""
 done_with_input = False
 while not done_with_input:
 user_input = None
 while user_input is None:
 user_input = Matrix().get_button()
 done_with_input = self.handle_selection(user_input)
 time.sleep(self.SLEEP)
 return
if __name__ == '__main__':
 GPIO.setwarnings(False)
 database().external_prepare_table()
 while True:
 Controller().wait_for_input()
 GPIO.cleanup()
 sys.exit(0)
database_commit.py:
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error
class DatabaseCommit():
 def read_config(section, filename='config.ini'):
 """ Reads config.ini and returns dictionary with config from section
 :param filename: name of config file
 :param section: name of section in config file
 :return: dictionary with database settings
 """
 parser = ConfigParser()
 parser.read(filename)
 config = 
 if parser.has_section(section):
 items = parser.items(section)
 for item in items:
 config[item[0]] = item[1].replace(" ", "")
 else:
 raise Exception('0 not found in 1 '.format(section, filename))
 return config
 def establish_connection(self):
 db_config = DatabaseCommit.read_config('mysql')
 conn = MySQLConnection(**db_config)
 if conn.is_connected():
 return conn
 else:
 print("Connection failed")
 def prepare_table(self, cursor):
 # TODO check for existing tables, add any if neccessary, fill with 0
 database_name = DatabaseCommit.read_config('mysql')['database'].replace(" ", "")
 table_name = DatabaseCommit.read_config('table')['table_name'].replace(" ", "")
 counter_column = DatabaseCommit.read_config('table')['counter_column'].replace(" ", "")
 date_column = DatabaseCommit.read_config('table')['date_column'].replace(" ", "")
 sql = 'CREATE TABLE IF NOT EXISTS '+ table_name +' ('
 sql += counter_column +' INT PRIMARY KEY AUTO_INCREMENT, '
 sql += date_column +' DATETIME'
 for topic in DatabaseCommit.read_config('selection')['topics'].replace(" ", "").split(','):
 topic = topic.lower()
 sql += ', '+ topic +' BOOLEAN NOT NULL DEFAULT 0'
 sql += ');'
 print("n"+sql+"n")
 cursor.execute(sql)
 sql = ''
 return cursor
 def external_prepare_table(self):
 conn = self.establish_connection()
 cursor = conn.cursor()
 self.prepare_table(cursor)
 def insert_user_input(self, user_input, cursor, table_name):
 date_column = DatabaseCommit.read_config('table')['date_column']
 sql = 'INSERT INTO ' + table_name + '(' + date_column
 for topic in user_input:
 sql += ','+ topic
 sql += ')'
 sql += 'VALUES(NOW()'
 for topic in user_input:
 sql += ','+'TRUE'
 sql += ')'
 cursor.execute(sql)
 def make_commit(self, user_input):
 conn = self.establish_connection()
 cursor = conn.cursor()
 #self.prepare_table(cursor)
 table_name = DatabaseCommit.read_config('table')['table_name']
 self.insert_user_input(user_input, cursor, table_name)
 cursor.execute('COMMIT;')
 print("made commit :")
 print(user_input)
 cursor.close()
 conn.close()
config.ini:
Topic names and other info have been changed for privacy purposes
[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26
[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True
[schedule]
post = 11:30
nagios = 09:30
[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password
[table]
table_name = test
counter_column = number
date_column = date
python beginner mysql raspberry-pi
asked Mar 2 at 14:09
Orphevs
1363
1363
 
 
 
 
 
 
 Is there a reason you need this many buttons?
 â Mast
 Mar 2 at 15:19
 
 
 
 
 
 
 
 
 
 @Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
 â Orphevs
 Mar 2 at 17:37
 
 
 
 
 
 
 
 
 
 Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
 â A. Romeu
 Mar 3 at 0:31
 
 
 
add a comment |Â
 
 
 
 
 
 
 Is there a reason you need this many buttons?
 â Mast
 Mar 2 at 15:19
 
 
 
 
 
 
 
 
 
 @Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
 â Orphevs
 Mar 2 at 17:37
 
 
 
 
 
 
 
 
 
 Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
 â A. Romeu
 Mar 3 at 0:31
 
 
 
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31
add a comment |Â
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
active
oldest
votes
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f188680%2fpython-rpi-button-board%23new-answer', 'question_page');
);
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Is there a reason you need this many buttons?
â Mast
Mar 2 at 15:19
@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
â Orphevs
Mar 2 at 17:37
Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
â A. Romeu
Mar 3 at 0:31