Python RPI button board

The name of the pictureThe name of the pictureThe name of the pictureClash Royale CLAN TAG#URR8PPP





.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty margin-bottom:0;







up vote
7
down vote

favorite












My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.



The code is split into three modules:




  • matrix.py: Gets user input from the hardware

    The buttons are connected in a 2D matrix to save available pins. Currently, there are 6 columns and 2 rows: the first row contains the 6 choices, and the second row contains the 4 action buttons.


  • controller.py: Contains button logic, handles selection

    The module reads the config.ini for information on how to handle the input from matrix.py. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection to database_commit.py.


  • database_commit.py: Puts given input into a database

    Received information is committed to a database according to config.ini.

as well as a config.ini file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?

The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.



Note: The comments were translated to English. If anything is unclear, I'll gladly clarify.




matrix.py



# -*- coding: UTF-8 -*-

"""Reads input from GPIO with a reduced number of pins."""

import time
import RPi.GPIO as GPIO
from configparser import ConfigParser

class Matrix():
    """Scans a push-button matrix on GPIO pins and names the pressed button.

    Pin numbers and button names come from the ``gpio`` and ``selection``
    sections of ``config.ini``. Row 0 of the matrix holds the topics,
    row 1 holds the actions.
    """

    def __init__(self):
        """Read pin numbers and button names from config.ini."""
        # BCM (Broadcom) numbers are used
        GPIO.setmode(GPIO.BCM)
        parser = ConfigParser()
        parser.read('config.ini')
        gpio = parser['gpio']
        selection = parser['selection']
        self.rows = [int(pin) for pin in gpio['button_rows'].split(',')]
        self.columns = [int(pin) for pin in gpio['button_columns'].split(',')]
        width = len(self.columns)
        topics = self._pad_row(
            selection['topics'].replace(" ", "").split(','), width)
        actions = self._pad_row(
            selection['actions'].replace(" ", "").split(','),
            max(width, len(topics)))
        # options_array[row][column] is the name of the button at that spot.
        self.options_array = [topics, actions]

    @staticmethod
    def _pad_row(names, width):
        """Return names extended with 'unassigned' up to width entries.

        Padding guarantees that every physical column has a name, so a
        press on an unconfigured button cannot raise IndexError.
        """
        return names + ['unassigned'] * (width - len(names))

    def get_button(self):
        """Find a pressed button and return its configured name.

        :return: the entry of options_array for the pressed button, or
                 None if no button is pressed. If several rows or columns
                 read as pressed, the one with the highest index wins.
        """
        try:
            # To search for the row of a pressed button:
            # - set all columns to output low
            # - set all rows to input (pull-up)
            # - a row that reads 0 contains a pressed button
            for pin in self.columns:
                GPIO.setup(pin, GPIO.OUT)
                GPIO.output(pin, GPIO.LOW)
            for pin in self.rows:
                GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

            row_position = -1
            for index, pin in enumerate(self.rows):
                if GPIO.input(pin) == 0:
                    row_position = index
            if row_position < 0:
                return None

            # To search for the column of the pressed button in that row:
            # - set all columns to input (pull-down)
            # - drive rows[row_position] high
            # - a column that reads 1 is the pressed one
            for pin in self.columns:
                GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
            GPIO.setup(self.rows[row_position], GPIO.OUT)
            GPIO.output(self.rows[row_position], GPIO.HIGH)

            column_position = -1
            for index, pin in enumerate(self.columns):
                if GPIO.input(pin) == 1:
                    column_position = index
            if column_position < 0:
                return None

            return self.options_array[row_position][column_position]
        finally:
            # Always leave the pins in their idle state, even on errors.
            self.refresh()

    def refresh(self):
        """Reset every matrix pin to an input with pull-up."""
        for pin in self.rows + self.columns:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

if __name__ == '__main__':
    # For testing matrix functionality: print every pressed button.
    # The matrix is created once; building it re-reads config.ini.
    matrix = Matrix()
    try:
        while True:
            user_input = None
            while user_input is None:
                user_input = matrix.get_button()
            print(user_input)
            time.sleep(.2)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the GPIO pins when the test loop is interrupted.
        GPIO.cleanup()


controller.py:
The TODOs are there to hint at the concept and will be implemented later.



#!/usr/bin/python3

"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database

class Controller():
    """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

    def __init__(self):
        """Read config.ini, set up the LED pins and show the default selection."""
        # Use Broadcom numbering
        GPIO.setmode(GPIO.BCM)

        # Get constants from config.ini
        parser = ConfigParser()
        parser.read('config.ini')

        # Pin addresses
        gpio = parser['gpio']
        self.TOPICS_LED = [int(pin) for pin in gpio['topics_led'].split(',')]
        self.ACTIONS_LED = [int(pin) for pin in gpio['actions_led'].split(',')]
        self.CANCEL_LED = self.ACTIONS_LED[0]
        self.CONFIRM_LED = self.ACTIONS_LED[1]
        self.POST_LED = self.ACTIONS_LED[2]
        self.NAGIOS_LED = self.ACTIONS_LED[3]

        # Names, defaults, timers
        selection = parser['selection']
        self.TOPICS = self._split_names(selection['topics'])
        self.ACTIONS = self._split_names(selection['actions'])
        self.CANCEL = self.ACTIONS[0]
        self.CONFIRM = self.ACTIONS[1]
        self.POST = self.ACTIONS[2]
        self.NAGIOS = self.ACTIONS[3]
        # Drop empty names so an empty config value yields [] and not [''].
        self.DEFAULT_SELECTION = [
            name
            for name in self._split_names(selection['default_selection'])
            if name
        ]
        self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
        self.SLEEP = float(selection['sleeptime'])
        self.BLINK = float(selection['blinktime'])
        # bool('False') is True; getboolean parses the text properly.
        self.SEND_DEFAULT_ON_EMPTY = selection.getboolean('send_default_on_empty')

        # Reminder schedule
        schedule = parser['schedule']
        self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
        self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')

        self.current_selection = self.DEFAULT_SELECTION[:]
        self.led_state = self.DEFAULT_LED_STATE[:]

        # Set up the LED pins, all switched off
        for pin in self.TOPICS_LED + self.ACTIONS_LED:
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, GPIO.LOW)

        # Switch on default selection LEDs
        self.led_reset_topics()

        # Notification status
        self.post_notification = False
        self.nagios_notification = False

    @staticmethod
    def _split_names(raw):
        """Split a comma-separated config value, removing all spaces."""
        return raw.replace(" ", "").split(',')

    @staticmethod
    def _in_topic_order(topics, selection):
        """Return the selected names in the order they appear in topics."""
        return [topic for topic in topics if topic in selection]

    def led_reset_topics(self):
        """Switch all topic LEDs off, then light the default selection.

        led_state is reset as well so that it always mirrors the LEDs.
        """
        self.led_state = self.DEFAULT_LED_STATE[:]
        for pin in self.TOPICS_LED:
            GPIO.output(pin, GPIO.LOW)
        for index, topic in enumerate(self.TOPICS):
            if topic in self.DEFAULT_SELECTION:
                GPIO.output(self.TOPICS_LED[index], GPIO.HIGH)
                self.led_state[index] = True

    def continous_led_blink(self, blink_event, blink_thread, led):
        """Blink an action LED until blink_event is set.

        :param blink_event: threading.Event that stops the blinking
        :param blink_thread: unused; kept so existing call sites still work
        :param led: GPIO pin number of the LED
        """
        while not blink_event.is_set():
            GPIO.output(led, GPIO.HIGH)
            # wait() doubles as an interruptible sleep of BLINK seconds
            stopped = blink_event.wait(self.BLINK)
            GPIO.output(led, GPIO.LOW)
            if not stopped:
                blink_event.wait(self.BLINK)

    def led_blink(self, led_pin):
        """Light an action LED and switch it off again after BLINK seconds.

        :param led_pin: GPIO pin number
        """
        timer = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
        GPIO.output(led_pin, GPIO.HIGH)
        timer.start()

    def dismiss_notification(self, led_pin):
        """Disables notification LED
        :param led_pin: GPIO pin number
        """
        GPIO.output(led_pin, GPIO.LOW)

    def led_toggle_topic(self, led_index):
        """Toggles LED state of led_index
        :param led_index: Index in TOPICS_LED; contains GPIO pin number
        """
        self.led_state[led_index] = not self.led_state[led_index]
        level = GPIO.HIGH if self.led_state[led_index] else GPIO.LOW
        GPIO.output(self.TOPICS_LED[led_index], level)

    def handle_selection(self, user_input):
        """Handle one button press.

        Topic buttons toggle the topic in the current selection. CANCEL
        restores the default selection. CONFIRM commits the selection to
        the database and restores the default selection.

        :param user_input: String, name of pressed button
        :return: True if CANCEL or CONFIRM was pressed, False otherwise
        """
        # If cancelled, reset to default selection
        if user_input == self.CANCEL:
            self.led_blink(self.CANCEL_LED)
            self.current_selection = self.DEFAULT_SELECTION[:]
            self.led_reset_topics()
            return True

        # If selection is non-empty, commit selection (it is kept sorted).
        # If selection is empty, reset to default selection and send it
        # if SEND_DEFAULT_ON_EMPTY is True.
        if user_input == self.CONFIRM:
            self.led_blink(self.CONFIRM_LED)
            if self.current_selection:
                database().make_commit(self.current_selection)
                self.current_selection = self.DEFAULT_SELECTION[:]
            else:
                self.current_selection = self.DEFAULT_SELECTION[:]
                if self.SEND_DEFAULT_ON_EMPTY and self.DEFAULT_SELECTION:
                    database().make_commit(self.current_selection)
            # Keep LEDs and led_state in step with the restored selection.
            self.led_reset_topics()
            return True

        # TODO: dismiss the "Post" / "Nagios" notification
        if user_input in (self.POST, self.NAGIOS):
            return False

        if user_input in self.TOPICS:
            self.led_toggle_topic(self.TOPICS.index(user_input))
            if user_input in self.current_selection:
                self.current_selection.remove(user_input)
                print(user_input + " -")
            else:
                self.current_selection.append(user_input)
                print(user_input + " +")
            # Keep the selection in the same order as TOPICS
            self.current_selection = self._in_topic_order(
                self.TOPICS, self.current_selection)
        return False

    def wait_for_input(self):
        """Poll the button matrix and pass every press to handle_selection.

        Returns once handle_selection reports CANCEL or CONFIRM.
        """
        # Created once per call; building a Matrix re-reads config.ini.
        matrix = Matrix()
        done_with_input = False
        while not done_with_input:
            user_input = None
            while user_input is None:
                user_input = matrix.get_button()
            done_with_input = self.handle_selection(user_input)
            time.sleep(self.SLEEP)

if __name__ == '__main__':
    GPIO.setwarnings(False)
    database().external_prepare_table()
    try:
        # A fresh Controller per round re-reads config.ini and resets
        # the LEDs to the default selection.
        while True:
            Controller().wait_for_input()
    except KeyboardInterrupt:
        pass
    finally:
        # Previously unreachable: release the pins when the loop ends.
        GPIO.cleanup()
    sys.exit(0)


database_commit.py:



#!/usr/bin/python
# -*- coding: UTF-8 -*-

import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error

class DatabaseCommit():
    """Writes committed selections to the MySQL table named in config.ini.

    Table and column names are taken from config.ini and concatenated into
    the SQL text unquoted, so the config file must be trusted.
    """

    @staticmethod
    def read_config(section, filename='config.ini'):
        """Read one section of the config file.

        :param section: name of section in config file
        :param filename: name of config file
        :return: dictionary of the section's settings, spaces removed
        :raises Exception: if the section does not exist in the file
        """
        parser = ConfigParser()
        parser.read(filename)
        if not parser.has_section(section):
            raise Exception('{0} not found in {1}'.format(section, filename))
        return {
            key: value.replace(" ", "")
            for key, value in parser.items(section)
        }

    def establish_connection(self):
        """Open a connection using the [mysql] section of config.ini.

        :return: an open MySQLConnection
        :raises ConnectionError: if the connection is not established
        """
        db_config = DatabaseCommit.read_config('mysql')
        conn = MySQLConnection(**db_config)
        if not conn.is_connected():
            # Callers use the connection right away; fail with a clear
            # error instead of handing them None.
            raise ConnectionError("Connection failed")
        return conn

    def prepare_table(self, cursor):
        """Create the table with one BOOLEAN column per topic if missing.

        :param cursor: open database cursor
        :return: the same cursor
        """
        # TODO check for existing tables, add any if necessary, fill with 0
        table = DatabaseCommit.read_config('table')
        topics = DatabaseCommit.read_config('selection')['topics'].split(',')
        columns = [
            table['counter_column'] + ' INT PRIMARY KEY AUTO_INCREMENT',
            table['date_column'] + ' DATETIME',
        ]
        columns += [
            topic.lower() + ' BOOLEAN NOT NULL DEFAULT 0' for topic in topics
        ]
        sql = ('CREATE TABLE IF NOT EXISTS ' + table['table_name']
               + ' (' + ', '.join(columns) + ');')
        print("\n" + sql + "\n")
        cursor.execute(sql)
        return cursor

    def external_prepare_table(self):
        """Open a connection, create the table if missing, and close again."""
        conn = self.establish_connection()
        try:
            cursor = conn.cursor()
            try:
                self.prepare_table(cursor)
            finally:
                cursor.close()
        finally:
            conn.close()

    def insert_user_input(self, user_input, cursor, table_name):
        """Insert one row with the current time and TRUE for each topic.

        :param user_input: list of selected topic names (column names)
        :param cursor: open database cursor
        :param table_name: name of the target table
        """
        date_column = DatabaseCommit.read_config('table')['date_column']
        columns = ','.join([date_column] + list(user_input))
        values = ','.join(['NOW()'] + ['TRUE'] * len(user_input))
        sql = ('INSERT INTO ' + table_name + '(' + columns + ')'
               + 'VALUES(' + values + ')')
        cursor.execute(sql)

    def make_commit(self, user_input):
        """Store the selection in the database.

        :param user_input: list of selected topic names
        """
        conn = self.establish_connection()
        try:
            cursor = conn.cursor()
            try:
                table_name = DatabaseCommit.read_config('table')['table_name']
                self.insert_user_input(user_input, cursor, table_name)
                conn.commit()
            finally:
                cursor.close()
        finally:
            # Close the connection even if the insert fails.
            conn.close()
        print("made commit :")
        print(user_input)


config.ini:
Topic names and other info have been changed for privacy purposes



[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26

[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True

[schedule]
post = 11:30
nagios = 09:30

[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password

[table]
table_name = test
counter_column = number
date_column = date






share|improve this question



















  • Is there a reason you need this many buttons?
    – Mast
    Mar 2 at 15:19










  • @Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
    – Orphevs
    Mar 2 at 17:37










  • Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
    – A. Romeu
    Mar 3 at 0:31
















up vote
7
down vote

favorite












My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.



The code is split into three modules:




  • matrix.py: Gets user input from the hardware

    The buttons are connected in a 2D matrix to save available pins. Currently, there are 6 columns and 2 rows: the first row contains the 6 choices, and the second row contains the 4 action buttons.


  • controller.py: Contains button logic, handles selection

    The module reads the config.ini for information on how to handle the input from matrix.py. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection to database_commit.py.


  • database_commit.py: Puts given input into a database

    Received information is committed to a database according to config.ini.

as well as a config.ini file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?

The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.



Note: The comments were translated to English. If anything is unclear, I'll gladly clarify.




matrix.py



# -*- coding: UTF-8 -*-

"""Reads input from GPIO with a reduced number of pins."""

import time
import RPi.GPIO as GPIO
from configparser import ConfigParser

class Matrix():
    """Scans a push-button matrix on GPIO pins and names the pressed button.

    Pin numbers and button names come from the ``gpio`` and ``selection``
    sections of ``config.ini``. Row 0 of the matrix holds the topics,
    row 1 holds the actions.
    """

    def __init__(self):
        """Read pin numbers and button names from config.ini."""
        # BCM (Broadcom) numbers are used
        GPIO.setmode(GPIO.BCM)
        parser = ConfigParser()
        parser.read('config.ini')
        gpio = parser['gpio']
        selection = parser['selection']
        self.rows = [int(pin) for pin in gpio['button_rows'].split(',')]
        self.columns = [int(pin) for pin in gpio['button_columns'].split(',')]
        width = len(self.columns)
        topics = self._pad_row(
            selection['topics'].replace(" ", "").split(','), width)
        actions = self._pad_row(
            selection['actions'].replace(" ", "").split(','),
            max(width, len(topics)))
        # options_array[row][column] is the name of the button at that spot.
        self.options_array = [topics, actions]

    @staticmethod
    def _pad_row(names, width):
        """Return names extended with 'unassigned' up to width entries.

        Padding guarantees that every physical column has a name, so a
        press on an unconfigured button cannot raise IndexError.
        """
        return names + ['unassigned'] * (width - len(names))

    def get_button(self):
        """Find a pressed button and return its configured name.

        :return: the entry of options_array for the pressed button, or
                 None if no button is pressed. If several rows or columns
                 read as pressed, the one with the highest index wins.
        """
        try:
            # To search for the row of a pressed button:
            # - set all columns to output low
            # - set all rows to input (pull-up)
            # - a row that reads 0 contains a pressed button
            for pin in self.columns:
                GPIO.setup(pin, GPIO.OUT)
                GPIO.output(pin, GPIO.LOW)
            for pin in self.rows:
                GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

            row_position = -1
            for index, pin in enumerate(self.rows):
                if GPIO.input(pin) == 0:
                    row_position = index
            if row_position < 0:
                return None

            # To search for the column of the pressed button in that row:
            # - set all columns to input (pull-down)
            # - drive rows[row_position] high
            # - a column that reads 1 is the pressed one
            for pin in self.columns:
                GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
            GPIO.setup(self.rows[row_position], GPIO.OUT)
            GPIO.output(self.rows[row_position], GPIO.HIGH)

            column_position = -1
            for index, pin in enumerate(self.columns):
                if GPIO.input(pin) == 1:
                    column_position = index
            if column_position < 0:
                return None

            return self.options_array[row_position][column_position]
        finally:
            # Always leave the pins in their idle state, even on errors.
            self.refresh()

    def refresh(self):
        """Reset every matrix pin to an input with pull-up."""
        for pin in self.rows + self.columns:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

if __name__ == '__main__':
    # For testing matrix functionality: print every pressed button.
    # The matrix is created once; building it re-reads config.ini.
    matrix = Matrix()
    try:
        while True:
            user_input = None
            while user_input is None:
                user_input = matrix.get_button()
            print(user_input)
            time.sleep(.2)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the GPIO pins when the test loop is interrupted.
        GPIO.cleanup()


controller.py:
The TODOs are there to hint at the concept and will be implemented later.



#!/usr/bin/python3

"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database

class Controller():
    """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

    def __init__(self):
        """Read config.ini, set up the LED pins and show the default selection."""
        # Use Broadcom numbering
        GPIO.setmode(GPIO.BCM)

        # Get constants from config.ini
        parser = ConfigParser()
        parser.read('config.ini')

        # Pin addresses
        gpio = parser['gpio']
        self.TOPICS_LED = [int(pin) for pin in gpio['topics_led'].split(',')]
        self.ACTIONS_LED = [int(pin) for pin in gpio['actions_led'].split(',')]
        self.CANCEL_LED = self.ACTIONS_LED[0]
        self.CONFIRM_LED = self.ACTIONS_LED[1]
        self.POST_LED = self.ACTIONS_LED[2]
        self.NAGIOS_LED = self.ACTIONS_LED[3]

        # Names, defaults, timers
        selection = parser['selection']
        self.TOPICS = self._split_names(selection['topics'])
        self.ACTIONS = self._split_names(selection['actions'])
        self.CANCEL = self.ACTIONS[0]
        self.CONFIRM = self.ACTIONS[1]
        self.POST = self.ACTIONS[2]
        self.NAGIOS = self.ACTIONS[3]
        # Drop empty names so an empty config value yields [] and not [''].
        self.DEFAULT_SELECTION = [
            name
            for name in self._split_names(selection['default_selection'])
            if name
        ]
        self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
        self.SLEEP = float(selection['sleeptime'])
        self.BLINK = float(selection['blinktime'])
        # bool('False') is True; getboolean parses the text properly.
        self.SEND_DEFAULT_ON_EMPTY = selection.getboolean('send_default_on_empty')

        # Reminder schedule
        schedule = parser['schedule']
        self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
        self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')

        self.current_selection = self.DEFAULT_SELECTION[:]
        self.led_state = self.DEFAULT_LED_STATE[:]

        # Set up the LED pins, all switched off
        for pin in self.TOPICS_LED + self.ACTIONS_LED:
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, GPIO.LOW)

        # Switch on default selection LEDs
        self.led_reset_topics()

        # Notification status
        self.post_notification = False
        self.nagios_notification = False

    @staticmethod
    def _split_names(raw):
        """Split a comma-separated config value, removing all spaces."""
        return raw.replace(" ", "").split(',')

    @staticmethod
    def _in_topic_order(topics, selection):
        """Return the selected names in the order they appear in topics."""
        return [topic for topic in topics if topic in selection]

    def led_reset_topics(self):
        """Switch all topic LEDs off, then light the default selection.

        led_state is reset as well so that it always mirrors the LEDs.
        """
        self.led_state = self.DEFAULT_LED_STATE[:]
        for pin in self.TOPICS_LED:
            GPIO.output(pin, GPIO.LOW)
        for index, topic in enumerate(self.TOPICS):
            if topic in self.DEFAULT_SELECTION:
                GPIO.output(self.TOPICS_LED[index], GPIO.HIGH)
                self.led_state[index] = True

    def continous_led_blink(self, blink_event, blink_thread, led):
        """Blink an action LED until blink_event is set.

        :param blink_event: threading.Event that stops the blinking
        :param blink_thread: unused; kept so existing call sites still work
        :param led: GPIO pin number of the LED
        """
        while not blink_event.is_set():
            GPIO.output(led, GPIO.HIGH)
            # wait() doubles as an interruptible sleep of BLINK seconds
            stopped = blink_event.wait(self.BLINK)
            GPIO.output(led, GPIO.LOW)
            if not stopped:
                blink_event.wait(self.BLINK)

    def led_blink(self, led_pin):
        """Light an action LED and switch it off again after BLINK seconds.

        :param led_pin: GPIO pin number
        """
        timer = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
        GPIO.output(led_pin, GPIO.HIGH)
        timer.start()

    def dismiss_notification(self, led_pin):
        """Disables notification LED
        :param led_pin: GPIO pin number
        """
        GPIO.output(led_pin, GPIO.LOW)

    def led_toggle_topic(self, led_index):
        """Toggles LED state of led_index
        :param led_index: Index in TOPICS_LED; contains GPIO pin number
        """
        self.led_state[led_index] = not self.led_state[led_index]
        level = GPIO.HIGH if self.led_state[led_index] else GPIO.LOW
        GPIO.output(self.TOPICS_LED[led_index], level)

    def handle_selection(self, user_input):
        """Handle one button press.

        Topic buttons toggle the topic in the current selection. CANCEL
        restores the default selection. CONFIRM commits the selection to
        the database and restores the default selection.

        :param user_input: String, name of pressed button
        :return: True if CANCEL or CONFIRM was pressed, False otherwise
        """
        # If cancelled, reset to default selection
        if user_input == self.CANCEL:
            self.led_blink(self.CANCEL_LED)
            self.current_selection = self.DEFAULT_SELECTION[:]
            self.led_reset_topics()
            return True

        # If selection is non-empty, commit selection (it is kept sorted).
        # If selection is empty, reset to default selection and send it
        # if SEND_DEFAULT_ON_EMPTY is True.
        if user_input == self.CONFIRM:
            self.led_blink(self.CONFIRM_LED)
            if self.current_selection:
                database().make_commit(self.current_selection)
                self.current_selection = self.DEFAULT_SELECTION[:]
            else:
                self.current_selection = self.DEFAULT_SELECTION[:]
                if self.SEND_DEFAULT_ON_EMPTY and self.DEFAULT_SELECTION:
                    database().make_commit(self.current_selection)
            # Keep LEDs and led_state in step with the restored selection.
            self.led_reset_topics()
            return True

        # TODO: dismiss the "Post" / "Nagios" notification
        if user_input in (self.POST, self.NAGIOS):
            return False

        if user_input in self.TOPICS:
            self.led_toggle_topic(self.TOPICS.index(user_input))
            if user_input in self.current_selection:
                self.current_selection.remove(user_input)
                print(user_input + " -")
            else:
                self.current_selection.append(user_input)
                print(user_input + " +")
            # Keep the selection in the same order as TOPICS
            self.current_selection = self._in_topic_order(
                self.TOPICS, self.current_selection)
        return False

    def wait_for_input(self):
        """Poll the button matrix and pass every press to handle_selection.

        Returns once handle_selection reports CANCEL or CONFIRM.
        """
        # Created once per call; building a Matrix re-reads config.ini.
        matrix = Matrix()
        done_with_input = False
        while not done_with_input:
            user_input = None
            while user_input is None:
                user_input = matrix.get_button()
            done_with_input = self.handle_selection(user_input)
            time.sleep(self.SLEEP)

if __name__ == '__main__':
    GPIO.setwarnings(False)
    database().external_prepare_table()
    try:
        # A fresh Controller per round re-reads config.ini and resets
        # the LEDs to the default selection.
        while True:
            Controller().wait_for_input()
    except KeyboardInterrupt:
        pass
    finally:
        # Previously unreachable: release the pins when the loop ends.
        GPIO.cleanup()
    sys.exit(0)


database_commit.py:



#!/usr/bin/python
# -*- coding: UTF-8 -*-

import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error

class DatabaseCommit():
    """Writes committed selections to the MySQL table named in config.ini.

    Table and column names are taken from config.ini and concatenated into
    the SQL text unquoted, so the config file must be trusted.
    """

    @staticmethod
    def read_config(section, filename='config.ini'):
        """Read one section of the config file.

        :param section: name of section in config file
        :param filename: name of config file
        :return: dictionary of the section's settings, spaces removed
        :raises Exception: if the section does not exist in the file
        """
        parser = ConfigParser()
        parser.read(filename)
        if not parser.has_section(section):
            raise Exception('{0} not found in {1}'.format(section, filename))
        return {
            key: value.replace(" ", "")
            for key, value in parser.items(section)
        }

    def establish_connection(self):
        """Open a connection using the [mysql] section of config.ini.

        :return: an open MySQLConnection
        :raises ConnectionError: if the connection is not established
        """
        db_config = DatabaseCommit.read_config('mysql')
        conn = MySQLConnection(**db_config)
        if not conn.is_connected():
            # Callers use the connection right away; fail with a clear
            # error instead of handing them None.
            raise ConnectionError("Connection failed")
        return conn

    def prepare_table(self, cursor):
        """Create the table with one BOOLEAN column per topic if missing.

        :param cursor: open database cursor
        :return: the same cursor
        """
        # TODO check for existing tables, add any if necessary, fill with 0
        table = DatabaseCommit.read_config('table')
        topics = DatabaseCommit.read_config('selection')['topics'].split(',')
        columns = [
            table['counter_column'] + ' INT PRIMARY KEY AUTO_INCREMENT',
            table['date_column'] + ' DATETIME',
        ]
        columns += [
            topic.lower() + ' BOOLEAN NOT NULL DEFAULT 0' for topic in topics
        ]
        sql = ('CREATE TABLE IF NOT EXISTS ' + table['table_name']
               + ' (' + ', '.join(columns) + ');')
        print("\n" + sql + "\n")
        cursor.execute(sql)
        return cursor

    def external_prepare_table(self):
        """Open a connection, create the table if missing, and close again."""
        conn = self.establish_connection()
        try:
            cursor = conn.cursor()
            try:
                self.prepare_table(cursor)
            finally:
                cursor.close()
        finally:
            conn.close()

    def insert_user_input(self, user_input, cursor, table_name):
        """Insert one row with the current time and TRUE for each topic.

        :param user_input: list of selected topic names (column names)
        :param cursor: open database cursor
        :param table_name: name of the target table
        """
        date_column = DatabaseCommit.read_config('table')['date_column']
        columns = ','.join([date_column] + list(user_input))
        values = ','.join(['NOW()'] + ['TRUE'] * len(user_input))
        sql = ('INSERT INTO ' + table_name + '(' + columns + ')'
               + 'VALUES(' + values + ')')
        cursor.execute(sql)

    def make_commit(self, user_input):
        """Store the selection in the database.

        :param user_input: list of selected topic names
        """
        conn = self.establish_connection()
        try:
            cursor = conn.cursor()
            try:
                table_name = DatabaseCommit.read_config('table')['table_name']
                self.insert_user_input(user_input, cursor, table_name)
                conn.commit()
            finally:
                cursor.close()
        finally:
            # Close the connection even if the insert fails.
            conn.close()
        print("made commit :")
        print(user_input)


config.ini:
Topic names and other info have been changed for privacy purposes



[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26

[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True

[schedule]
post = 11:30
nagios = 09:30

[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password

[table]
table_name = test
counter_column = number
date_column = date






share|improve this question



















  • Is there a reason you need this many buttons?
    – Mast
    Mar 2 at 15:19










  • @Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
    – Orphevs
    Mar 2 at 17:37










  • Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
    – A. Romeu
    Mar 3 at 0:31












up vote
7
down vote

favorite









up vote
7
down vote

favorite











My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.



The code is split into three modules:




  • matrix.py: Gets user input from the hardware

    The buttons are connected in a 2D matrix to save available pins. Currently, there are 6 columns and 2 rows: the first row contains the 6 choices, and the second row contains the 4 action buttons.


  • controller.py: Contains button logic, handles selection

    The module reads the config.ini for information on how to handle the input from matrix.py. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection to database_commit.py.


  • database_commit.py: Puts given input into a database

    Received information is committed to a database according to config.ini.

as well as a config.ini file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?

The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.



Note: The comments were translated to English. If anything is unclear, I'll gladly clarify.




matrix.py



# -*- coding: UTF-8 -*-

"""Reads input from GPIO with a reduced number of pins."""

import time
import RPi.GPIO as GPIO
from configparser import ConfigParser

class Matrix():
    """Scan a row/column button matrix connected to the GPIO pins.

    Pin numbers are read from the ``[gpio]`` section of config.ini and the
    button names from its ``[selection]`` section.
    """

    def __init__(self):
        """Set up pin numbering and read pins and button names from config.ini."""
        # BCM (Broadcom) numbers are used
        GPIO.setmode(GPIO.BCM)
        parser = ConfigParser()
        parser.read('config.ini')
        gpio = parser['gpio']
        selection = parser['selection']
        self.rows = [int(pin) for pin in gpio['button_rows'].split(',')]
        self.columns = [int(pin) for pin in gpio['button_columns'].split(',')]
        topics = selection['topics'].replace(" ", "").split(',')
        actions = selection['actions'].replace(" ", "").split(',')
        # Pad both rows to the number of columns so that every wired
        # position maps to a name and the lookup in get_button cannot
        # raise IndexError.
        for names in (topics, actions):
            names.extend(['unassigned'] * (len(self.columns) - len(names)))
        # Row 0 holds the topic names, row 1 the action names.
        self.options_array = [topics, actions]

    def get_button(self):
        """Scan the matrix once and return the name of the pressed button.

        :return: the entry of options_array for the pressed button, or
            None if no button is pressed
        """
        try:
            row_position = self._find_pressed_row()
            if row_position is None:
                return None
            column_position = self._find_pressed_column(row_position)
            if column_position is None:
                return None
            return self.options_array[row_position][column_position]
        finally:
            # Always leave the pins in their idle state, whatever happened.
            self.refresh()

    def _find_pressed_row(self):
        """Return the index of the row that contains a pressed button, or None."""
        # All columns drive low and all rows are pulled up, so a pressed
        # button pulls its row down to 0.
        for pin in self.columns:
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, GPIO.LOW)
        for pin in self.rows:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        pressed = None
        for index, pin in enumerate(self.rows):
            if GPIO.input(pin) == 0:
                # If several rows read low, the last one wins.
                pressed = index
        return pressed

    def _find_pressed_column(self, row_position):
        """Return the index of the pressed column in the given row, or None."""
        # All columns are pulled down and only the found row drives high,
        # so the pressed button raises its column to 1.
        for pin in self.columns:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
        GPIO.setup(self.rows[row_position], GPIO.OUT)
        GPIO.output(self.rows[row_position], GPIO.HIGH)

        pressed = None
        for index, pin in enumerate(self.columns):
            if GPIO.input(pin) == 1:
                # If several columns read high, the last one wins.
                pressed = index
        return pressed

    def refresh(self):
        """Reset all row and column pins to inputs with pull-up."""
        for pin in self.rows:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        for pin in self.columns:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

if __name__ == '__main__':
    # For testing matrix functionality: print the name of every pressed button.
    # The matrix is created once so config.ini is not re-read on every poll.
    matrix = Matrix()
    try:
        while True:
            user_input = None
            while user_input is None:
                user_input = matrix.get_button()
            print(user_input)
            # Crude debounce so one press is not reported many times.
            time.sleep(.2)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the pins when the test is stopped with Ctrl+C.
        GPIO.cleanup()


controller.py:
The TODOs are there to hint at the concept and will be implemented later.



#!/usr/bin/python3

"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database

class Controller():
    """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

    def __init__(self):
        """Read config.ini, set up the LED pins and show the default selection."""
        # Use Broadcom numbering
        GPIO.setmode(GPIO.BCM)

        # Get constants from config.ini
        parser = ConfigParser()
        parser.read('config.ini')

        # Pin addresses
        gpio = parser['gpio']

        self.TOPICS_LED = [int(pin) for pin in gpio['topics_led'].split(',')]
        self.ACTIONS_LED = [int(pin) for pin in gpio['actions_led'].split(',')]
        self.CANCEL_LED = self.ACTIONS_LED[0]
        self.CONFIRM_LED = self.ACTIONS_LED[1]
        self.POST_LED = self.ACTIONS_LED[2]
        self.NAGIOS_LED = self.ACTIONS_LED[3]

        # Names, defaults, timers
        selection = parser['selection']

        self.TOPICS = selection['topics'].replace(" ", "").split(',')
        self.ACTIONS = selection['actions'].replace(" ", "").split(',')
        self.CANCEL = self.ACTIONS[0]
        self.CONFIRM = self.ACTIONS[1]
        self.POST = self.ACTIONS[2]
        self.NAGIOS = self.ACTIONS[3]
        # An empty config value splits into [''], so empty names are dropped.
        self.DEFAULT_SELECTION = [
            name
            for name in selection['default_selection'].replace(" ", "").split(',')
            if name
        ]
        self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
        self.SLEEP = float(selection['sleeptime'])
        self.BLINK = float(selection['blinktime'])
        # bool("False") is True, so the value has to be parsed as a boolean.
        self.SEND_DEFAULT_ON_EMPTY = selection.getboolean('send_default_on_empty')

        # Reminder schedule
        schedule = parser['schedule']

        self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
        self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')

        self.current_selection = self.DEFAULT_SELECTION[:]
        self.led_state = self.DEFAULT_LED_STATE[:]

        # Set up the LED pins, everything switched off
        for pin in self.TOPICS_LED + self.ACTIONS_LED:
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, GPIO.LOW)

        # Switch on default selection LEDs
        self.led_reset_topics()

        # Notification status
        self.post_notification = False
        self.nagios_notification = False

    def led_reset_topics(self):
        """Disable all topic LEDs, then enable the default selection LEDs."""
        # Forget earlier toggles so led_state matches the LEDs again.
        self.led_state = self.DEFAULT_LED_STATE[:]
        # All off
        for pin in self.TOPICS_LED:
            GPIO.output(pin, GPIO.LOW)
        # Defaults on
        for index, topic in enumerate(self.TOPICS):
            if topic in self.DEFAULT_SELECTION:
                GPIO.output(self.TOPICS_LED[index], GPIO.HIGH)
                self.led_state[index] = True

    def continous_led_blink(self, blink_event, blink_thread, led):
        """Blink an action LED until blink_event is set.

        :param blink_event: threading.Event that stops the blinking
        :param blink_thread: passed to blink_event.wait() as its timeout
            NOTE(review): the name suggests a thread object; confirm the
            intended value once the notification TODOs are implemented
        :param led: GPIO pin number of the LED
        """
        while not blink_event.is_set():
            GPIO.output(led, GPIO.HIGH)
            time.sleep(self.BLINK)
            event_is_set = blink_event.wait(blink_thread)
            GPIO.output(led, GPIO.LOW)
            if not event_is_set:
                time.sleep(self.BLINK)

    def led_blink(self, led_pin):
        """Switch an action LED on and off again after BLINK seconds.

        :param led_pin: GPIO pin number
        """
        # The timer switches the LED off without blocking the input loop.
        switch_off = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
        GPIO.output(led_pin, GPIO.HIGH)
        switch_off.start()

    def dismiss_notification(self, led_pin):
        """Disable a notification LED.

        :param led_pin: GPIO pin number
        """
        GPIO.output(led_pin, GPIO.LOW)

    def led_toggle_topic(self, led_index):
        """Toggle the LED state of led_index.

        :param led_index: index in TOPICS_LED, which contains the GPIO pin number
        """
        self.led_state[led_index] = not self.led_state[led_index]
        level = GPIO.HIGH if self.led_state[led_index] else GPIO.LOW
        GPIO.output(self.TOPICS_LED[led_index], level)

    def handle_selection(self, user_input):
        """Handle selection and deselection of topics and the action buttons.

        :param user_input: string, name of the pressed button
        :return: True if CANCEL or CONFIRM was chosen, False otherwise
        """
        # If cancelled, reset to default selection
        if user_input == self.CANCEL:
            self.led_blink(self.CANCEL_LED)
            self.current_selection = self.DEFAULT_SELECTION[:]
            self.led_reset_topics()
            return True

        # If selection is non-empty, commit selection
        # (the selection is always kept in TOPICS order).
        # If selection is empty, send the default selection, but only
        # if SEND_DEFAULT_ON_EMPTY is True.
        # In both cases the selection is reset to the default afterwards.
        if user_input == self.CONFIRM:
            self.led_blink(self.CONFIRM_LED)
            if self.current_selection:
                database().make_commit(self.current_selection)
            elif self.SEND_DEFAULT_ON_EMPTY and self.DEFAULT_SELECTION:
                database().make_commit(self.DEFAULT_SELECTION[:])
            self.current_selection = self.DEFAULT_SELECTION[:]
            # Keep the LEDs in step with the reset selection.
            self.led_reset_topics()
            return True

        # TODO: disable "Post" notification (clear post_notification and
        # call dismiss_notification(self.POST_LED)).
        if user_input == self.POST:
            return False

        # TODO: disable "Nagios" notification (clear nagios_notification and
        # call dismiss_notification(self.NAGIOS_LED)).
        if user_input == self.NAGIOS:
            return False

        # Anything else is either a topic or an unassigned button.
        if user_input in self.TOPICS:
            self.led_toggle_topic(self.TOPICS.index(user_input))
            if user_input in self.current_selection:
                # Delete if it exists already
                self.current_selection.remove(user_input)
                print(user_input + " -")
            else:
                # Add if it does not exist yet
                self.current_selection.append(user_input)
                print(user_input + " +")
            # Keep the selection in the same order as TOPICS
            self.current_selection = [
                topic for topic in self.TOPICS if topic in self.current_selection
            ]
        return False

    def wait_for_input(self):
        """Wait for input and pass it on to handle_selection until it is done."""
        # One Matrix for the whole round; config.ini is not re-read per poll.
        matrix = Matrix()
        done_with_input = False
        while not done_with_input:
            user_input = None
            while user_input is None:
                user_input = matrix.get_button()
            done_with_input = self.handle_selection(user_input)
            # Crude debounce so one press is not handled several times.
            time.sleep(self.SLEEP)

if __name__ == '__main__':
    GPIO.setwarnings(False)
    database().external_prepare_table()
    try:
        while True:
            # A fresh Controller per round re-reads config.ini and resets
            # the selection and the LEDs to their defaults.
            Controller().wait_for_input()
    except KeyboardInterrupt:
        pass
    finally:
        # Previously unreachable after the endless loop: release the pins
        # when the program is stopped.
        GPIO.cleanup()
    sys.exit(0)


database_commit.py:



#!/usr/bin/python
# -*- coding: UTF-8 -*-

import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error

class DatabaseCommit():
    """Writes committed selections to the MySQL table described in config.ini."""

    @staticmethod
    def read_config(section, filename='config.ini'):
        """Read config.ini and return a dictionary with the config from section.

        :param section: name of section in config file
        :param filename: name of config file
        :return: dictionary mapping option names to their values
        :raises Exception: if the section does not exist in the file
        """
        parser = ConfigParser()
        parser.read(filename)
        if not parser.has_section(section):
            raise Exception('{0} not found in {1}'.format(section, filename))
        # NOTE: all spaces are removed from every value, so that lists such
        # as "a, b" become "a,b"; values can therefore not contain spaces.
        return {key: value.replace(" ", "") for key, value in parser.items(section)}

    def establish_connection(self):
        """Open a connection using the ``[mysql]`` section of config.ini.

        :return: the open MySQLConnection
        :raises ConnectionError: if the connection is not established
        """
        db_config = DatabaseCommit.read_config('mysql')
        conn = MySQLConnection(**db_config)
        if not conn.is_connected():
            # Fail here instead of handing None to the caller.
            raise ConnectionError("Connection failed")
        return conn

    def prepare_table(self, cursor):
        """Create the table with one BOOLEAN column per topic if it is missing.

        :param cursor: open database cursor
        :return: the cursor that was passed in
        """
        # TODO check for existing tables, add any if necessary, fill with 0
        table = DatabaseCommit.read_config('table')
        topics = DatabaseCommit.read_config('selection')['topics'].split(',')
        # Identifiers cannot be bound as query parameters; they all come
        # from config.ini.
        columns = [
            table['counter_column'] + ' INT PRIMARY KEY AUTO_INCREMENT',
            table['date_column'] + ' DATETIME',
        ]
        columns.extend(
            topic.lower() + ' BOOLEAN NOT NULL DEFAULT 0' for topic in topics
        )
        sql = ('CREATE TABLE IF NOT EXISTS ' + table['table_name']
               + ' (' + ', '.join(columns) + ');')
        print("\n" + sql + "\n")
        cursor.execute(sql)
        return cursor

    def external_prepare_table(self):
        """Open a connection, prepare the table and close the connection again."""
        conn = self.establish_connection()
        try:
            cursor = conn.cursor()
            try:
                self.prepare_table(cursor)
            finally:
                cursor.close()
        finally:
            conn.close()

    def insert_user_input(self, user_input, cursor, table_name):
        """Insert one row with the current time and TRUE for every given topic.

        :param user_input: list of selected topic names (column names)
        :param cursor: open database cursor
        :param table_name: name of the table to insert into
        """
        date_column = DatabaseCommit.read_config('table')['date_column']
        columns = ','.join([date_column] + list(user_input))
        values = ','.join(['NOW()'] + ['TRUE'] * len(user_input))
        sql = 'INSERT INTO ' + table_name + '(' + columns + ')VALUES(' + values + ')'
        cursor.execute(sql)

    def make_commit(self, user_input):
        """Store the selection in the database.

        :param user_input: list of selected topic names
        """
        conn = self.establish_connection()
        try:
            cursor = conn.cursor()
            try:
                table_name = DatabaseCommit.read_config('table')['table_name']
                self.insert_user_input(user_input, cursor, table_name)
                conn.commit()
                print("made commit :")
                print(user_input)
            finally:
                cursor.close()
        finally:
            # Close the connection even if the insert failed.
            conn.close()


config.ini:
Topic names and other info have been changed for privacy purposes



[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26

[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True

[schedule]
post = 11:30
nagios = 09:30

[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password

[table]
table_name = test
counter_column = number
date_column = date






share|improve this question











My project consists of a Raspberry Pi 2B V1.1 connected to a number of push-buttons via GPIO pins. The goal is to create a panel of buttons, allowing the user to make a selection of one or more items from a list, and then either cancel or commit their selection.
I am new to Python, and have only worked a little with Java and JS before.



The code is split into three modules:




  • matrix.py: Gets user input from the hardware

    The buttons are connected in a 2D matrix to save available pins. Currently, there are 6 columns and 2 rows; the first row contains the 6 choices, the second row the 4 action buttons.


  • controller.py: Contains button logic, handles selection

    The module reads the config.ini for information on how to handle the input from matrix.py. This includes selecting/deselecting items, switching the LEDs on/off and forwarding the selection to database_commit.py.


  • database_commit.py: Puts given input into a database

    Received information is committed to a database according to config.ini.

as well as a config.ini file. In the following, the selectable items are called topics. "cancel", "commit", and the notification buttons are actions.
As I developed this while learning Python, the project is not well structured. What conceptual changes do I need to make in order to provide good readability, maintainability and extendability? What else should I improve?

The finished project will be in place for a long time, and I will not be able to maintain it; someone else will have to. I will supply a readme for the hardware maintenance, and have added info about the code there too. In my opinion, the code should be easily understandable without a manual though.



Note: The comments were translated to english. If anything is unclear, I'll gladly clarify.




matrix.py



# -*- coding: UTF-8 -*-

"""Reads input from GPIO with a reduced number of pins."""

import time
import RPi.GPIO as GPIO
from configparser import ConfigParser

class Matrix():
    """Scan a row/column button matrix connected to the GPIO pins.

    Pin numbers are read from the ``[gpio]`` section of config.ini and the
    button names from its ``[selection]`` section.
    """

    def __init__(self):
        """Set up pin numbering and read pins and button names from config.ini."""
        # BCM (Broadcom) numbers are used
        GPIO.setmode(GPIO.BCM)
        parser = ConfigParser()
        parser.read('config.ini')
        gpio = parser['gpio']
        selection = parser['selection']
        self.rows = [int(pin) for pin in gpio['button_rows'].split(',')]
        self.columns = [int(pin) for pin in gpio['button_columns'].split(',')]
        topics = selection['topics'].replace(" ", "").split(',')
        actions = selection['actions'].replace(" ", "").split(',')
        # Pad both rows to the number of columns so that every wired
        # position maps to a name and the lookup in get_button cannot
        # raise IndexError.
        for names in (topics, actions):
            names.extend(['unassigned'] * (len(self.columns) - len(names)))
        # Row 0 holds the topic names, row 1 the action names.
        self.options_array = [topics, actions]

    def get_button(self):
        """Scan the matrix once and return the name of the pressed button.

        :return: the entry of options_array for the pressed button, or
            None if no button is pressed
        """
        try:
            row_position = self._find_pressed_row()
            if row_position is None:
                return None
            column_position = self._find_pressed_column(row_position)
            if column_position is None:
                return None
            return self.options_array[row_position][column_position]
        finally:
            # Always leave the pins in their idle state, whatever happened.
            self.refresh()

    def _find_pressed_row(self):
        """Return the index of the row that contains a pressed button, or None."""
        # All columns drive low and all rows are pulled up, so a pressed
        # button pulls its row down to 0.
        for pin in self.columns:
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, GPIO.LOW)
        for pin in self.rows:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        pressed = None
        for index, pin in enumerate(self.rows):
            if GPIO.input(pin) == 0:
                # If several rows read low, the last one wins.
                pressed = index
        return pressed

    def _find_pressed_column(self, row_position):
        """Return the index of the pressed column in the given row, or None."""
        # All columns are pulled down and only the found row drives high,
        # so the pressed button raises its column to 1.
        for pin in self.columns:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
        GPIO.setup(self.rows[row_position], GPIO.OUT)
        GPIO.output(self.rows[row_position], GPIO.HIGH)

        pressed = None
        for index, pin in enumerate(self.columns):
            if GPIO.input(pin) == 1:
                # If several columns read high, the last one wins.
                pressed = index
        return pressed

    def refresh(self):
        """Reset all row and column pins to inputs with pull-up."""
        for pin in self.rows:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        for pin in self.columns:
            GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

if __name__ == '__main__':
    # For testing matrix functionality: print the name of every pressed button.
    # The matrix is created once so config.ini is not re-read on every poll.
    matrix = Matrix()
    try:
        while True:
            user_input = None
            while user_input is None:
                user_input = matrix.get_button()
            print(user_input)
            # Crude debounce so one press is not reported many times.
            time.sleep(.2)
    except KeyboardInterrupt:
        pass
    finally:
        # Release the pins when the test is stopped with Ctrl+C.
        GPIO.cleanup()


controller.py:
The TODOs are there to hint at the concept and will be implemented later.



#!/usr/bin/python3

"""Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

import time
import signal
import sys
import threading
import RPi.GPIO as GPIO
from threading import Timer
from configparser import ConfigParser
from matrix import Matrix
from database_commit import DatabaseCommit as database

class Controller():
    """Waits for input from matrix.py and hands over selection to database_commit.py on confirmation."""

    def __init__(self):
        """Read config.ini, set up the LED pins and show the default selection."""
        # Use Broadcom numbering
        GPIO.setmode(GPIO.BCM)

        # Get constants from config.ini
        parser = ConfigParser()
        parser.read('config.ini')

        # Pin addresses
        gpio = parser['gpio']

        self.TOPICS_LED = [int(pin) for pin in gpio['topics_led'].split(',')]
        self.ACTIONS_LED = [int(pin) for pin in gpio['actions_led'].split(',')]
        self.CANCEL_LED = self.ACTIONS_LED[0]
        self.CONFIRM_LED = self.ACTIONS_LED[1]
        self.POST_LED = self.ACTIONS_LED[2]
        self.NAGIOS_LED = self.ACTIONS_LED[3]

        # Names, defaults, timers
        selection = parser['selection']

        self.TOPICS = selection['topics'].replace(" ", "").split(',')
        self.ACTIONS = selection['actions'].replace(" ", "").split(',')
        self.CANCEL = self.ACTIONS[0]
        self.CONFIRM = self.ACTIONS[1]
        self.POST = self.ACTIONS[2]
        self.NAGIOS = self.ACTIONS[3]
        # An empty config value splits into [''], so empty names are dropped.
        self.DEFAULT_SELECTION = [
            name
            for name in selection['default_selection'].replace(" ", "").split(',')
            if name
        ]
        self.DEFAULT_LED_STATE = [False] * len(self.TOPICS_LED)
        self.SLEEP = float(selection['sleeptime'])
        self.BLINK = float(selection['blinktime'])
        # bool("False") is True, so the value has to be parsed as a boolean.
        self.SEND_DEFAULT_ON_EMPTY = selection.getboolean('send_default_on_empty')

        # Reminder schedule
        schedule = parser['schedule']

        self.POST_TIME = time.strptime(schedule['post'].replace(":", " "), '%H %M')
        self.NAGIOS_TIME = time.strptime(schedule['nagios'].replace(":", " "), '%H %M')

        self.current_selection = self.DEFAULT_SELECTION[:]
        self.led_state = self.DEFAULT_LED_STATE[:]

        # Set up the LED pins, everything switched off
        for pin in self.TOPICS_LED + self.ACTIONS_LED:
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, GPIO.LOW)

        # Switch on default selection LEDs
        self.led_reset_topics()

        # Notification status
        self.post_notification = False
        self.nagios_notification = False

    def led_reset_topics(self):
        """Disable all topic LEDs, then enable the default selection LEDs."""
        # Forget earlier toggles so led_state matches the LEDs again.
        self.led_state = self.DEFAULT_LED_STATE[:]
        # All off
        for pin in self.TOPICS_LED:
            GPIO.output(pin, GPIO.LOW)
        # Defaults on
        for index, topic in enumerate(self.TOPICS):
            if topic in self.DEFAULT_SELECTION:
                GPIO.output(self.TOPICS_LED[index], GPIO.HIGH)
                self.led_state[index] = True

    def continous_led_blink(self, blink_event, blink_thread, led):
        """Blink an action LED until blink_event is set.

        :param blink_event: threading.Event that stops the blinking
        :param blink_thread: passed to blink_event.wait() as its timeout
            NOTE(review): the name suggests a thread object; confirm the
            intended value once the notification TODOs are implemented
        :param led: GPIO pin number of the LED
        """
        while not blink_event.is_set():
            GPIO.output(led, GPIO.HIGH)
            time.sleep(self.BLINK)
            event_is_set = blink_event.wait(blink_thread)
            GPIO.output(led, GPIO.LOW)
            if not event_is_set:
                time.sleep(self.BLINK)

    def led_blink(self, led_pin):
        """Switch an action LED on and off again after BLINK seconds.

        :param led_pin: GPIO pin number
        """
        # The timer switches the LED off without blocking the input loop.
        switch_off = Timer(self.BLINK, GPIO.output, [led_pin, GPIO.LOW])
        GPIO.output(led_pin, GPIO.HIGH)
        switch_off.start()

    def dismiss_notification(self, led_pin):
        """Disable a notification LED.

        :param led_pin: GPIO pin number
        """
        GPIO.output(led_pin, GPIO.LOW)

    def led_toggle_topic(self, led_index):
        """Toggle the LED state of led_index.

        :param led_index: index in TOPICS_LED, which contains the GPIO pin number
        """
        self.led_state[led_index] = not self.led_state[led_index]
        level = GPIO.HIGH if self.led_state[led_index] else GPIO.LOW
        GPIO.output(self.TOPICS_LED[led_index], level)

    def handle_selection(self, user_input):
        """Handle selection and deselection of topics and the action buttons.

        :param user_input: string, name of the pressed button
        :return: True if CANCEL or CONFIRM was chosen, False otherwise
        """
        # If cancelled, reset to default selection
        if user_input == self.CANCEL:
            self.led_blink(self.CANCEL_LED)
            self.current_selection = self.DEFAULT_SELECTION[:]
            self.led_reset_topics()
            return True

        # If selection is non-empty, commit selection
        # (the selection is always kept in TOPICS order).
        # If selection is empty, send the default selection, but only
        # if SEND_DEFAULT_ON_EMPTY is True.
        # In both cases the selection is reset to the default afterwards.
        if user_input == self.CONFIRM:
            self.led_blink(self.CONFIRM_LED)
            if self.current_selection:
                database().make_commit(self.current_selection)
            elif self.SEND_DEFAULT_ON_EMPTY and self.DEFAULT_SELECTION:
                database().make_commit(self.DEFAULT_SELECTION[:])
            self.current_selection = self.DEFAULT_SELECTION[:]
            # Keep the LEDs in step with the reset selection.
            self.led_reset_topics()
            return True

        # TODO: disable "Post" notification (clear post_notification and
        # call dismiss_notification(self.POST_LED)).
        if user_input == self.POST:
            return False

        # TODO: disable "Nagios" notification (clear nagios_notification and
        # call dismiss_notification(self.NAGIOS_LED)).
        if user_input == self.NAGIOS:
            return False

        # Anything else is either a topic or an unassigned button.
        if user_input in self.TOPICS:
            self.led_toggle_topic(self.TOPICS.index(user_input))
            if user_input in self.current_selection:
                # Delete if it exists already
                self.current_selection.remove(user_input)
                print(user_input + " -")
            else:
                # Add if it does not exist yet
                self.current_selection.append(user_input)
                print(user_input + " +")
            # Keep the selection in the same order as TOPICS
            self.current_selection = [
                topic for topic in self.TOPICS if topic in self.current_selection
            ]
        return False

    def wait_for_input(self):
        """Wait for input and pass it on to handle_selection until it is done."""
        # One Matrix for the whole round; config.ini is not re-read per poll.
        matrix = Matrix()
        done_with_input = False
        while not done_with_input:
            user_input = None
            while user_input is None:
                user_input = matrix.get_button()
            done_with_input = self.handle_selection(user_input)
            # Crude debounce so one press is not handled several times.
            time.sleep(self.SLEEP)

if __name__ == '__main__':
    GPIO.setwarnings(False)
    database().external_prepare_table()
    try:
        while True:
            # A fresh Controller per round re-reads config.ini and resets
            # the selection and the LEDs to their defaults.
            Controller().wait_for_input()
    except KeyboardInterrupt:
        pass
    finally:
        # Previously unreachable after the endless loop: release the pins
        # when the program is stopped.
        GPIO.cleanup()
    sys.exit(0)


database_commit.py:



#!/usr/bin/python
# -*- coding: UTF-8 -*-

import datetime
import time
from configparser import ConfigParser
from mysql.connector import MySQLConnection, Error

class DatabaseCommit():
    """Writes committed selections to the MySQL table described in config.ini."""

    @staticmethod
    def read_config(section, filename='config.ini'):
        """Read config.ini and return a dictionary with the config from section.

        :param section: name of section in config file
        :param filename: name of config file
        :return: dictionary mapping option names to their values
        :raises Exception: if the section does not exist in the file
        """
        parser = ConfigParser()
        parser.read(filename)
        if not parser.has_section(section):
            raise Exception('{0} not found in {1}'.format(section, filename))
        # NOTE: all spaces are removed from every value, so that lists such
        # as "a, b" become "a,b"; values can therefore not contain spaces.
        return {key: value.replace(" ", "") for key, value in parser.items(section)}

    def establish_connection(self):
        """Open a connection using the ``[mysql]`` section of config.ini.

        :return: the open MySQLConnection
        :raises ConnectionError: if the connection is not established
        """
        db_config = DatabaseCommit.read_config('mysql')
        conn = MySQLConnection(**db_config)
        if not conn.is_connected():
            # Fail here instead of handing None to the caller.
            raise ConnectionError("Connection failed")
        return conn

    def prepare_table(self, cursor):
        """Create the table with one BOOLEAN column per topic if it is missing.

        :param cursor: open database cursor
        :return: the cursor that was passed in
        """
        # TODO check for existing tables, add any if necessary, fill with 0
        table = DatabaseCommit.read_config('table')
        topics = DatabaseCommit.read_config('selection')['topics'].split(',')
        # Identifiers cannot be bound as query parameters; they all come
        # from config.ini.
        columns = [
            table['counter_column'] + ' INT PRIMARY KEY AUTO_INCREMENT',
            table['date_column'] + ' DATETIME',
        ]
        columns.extend(
            topic.lower() + ' BOOLEAN NOT NULL DEFAULT 0' for topic in topics
        )
        sql = ('CREATE TABLE IF NOT EXISTS ' + table['table_name']
               + ' (' + ', '.join(columns) + ');')
        print("\n" + sql + "\n")
        cursor.execute(sql)
        return cursor

    def external_prepare_table(self):
        """Open a connection, prepare the table and close the connection again."""
        conn = self.establish_connection()
        try:
            cursor = conn.cursor()
            try:
                self.prepare_table(cursor)
            finally:
                cursor.close()
        finally:
            conn.close()

    def insert_user_input(self, user_input, cursor, table_name):
        """Insert one row with the current time and TRUE for every given topic.

        :param user_input: list of selected topic names (column names)
        :param cursor: open database cursor
        :param table_name: name of the table to insert into
        """
        date_column = DatabaseCommit.read_config('table')['date_column']
        columns = ','.join([date_column] + list(user_input))
        values = ','.join(['NOW()'] + ['TRUE'] * len(user_input))
        sql = 'INSERT INTO ' + table_name + '(' + columns + ')VALUES(' + values + ')'
        cursor.execute(sql)

    def make_commit(self, user_input):
        """Store the selection in the database.

        :param user_input: list of selected topic names
        """
        conn = self.establish_connection()
        try:
            cursor = conn.cursor()
            try:
                table_name = DatabaseCommit.read_config('table')['table_name']
                self.insert_user_input(user_input, cursor, table_name)
                conn.commit()
                print("made commit :")
                print(user_input)
            finally:
                cursor.close()
        finally:
            # Close the connection even if the insert failed.
            conn.close()


config.ini:
Topic names and other info have been changed for privacy purposes



[gpio]
button_rows = 4, 17
button_columns = 18, 23, 24, 25, 12, 16
topics_led = 2, 3, 27, 22, 10, 9
actions_led = 11, 5, 6, 13, 19, 26

[selection]
topics = topic_1, topic_2, topic_3, topic_4, topic_5, topic_6
actions = Cancel, Commit, Post, Rundgang, unassigned_1, unassigned_2
default_selection = topic_2
sleeptime = .3
blinktime = .2
send_default_on_empty = True

[schedule]
post = 11:30
nagios = 09:30

[mysql]
host = 127.0.0.1
database = mydatabase
user = rpi
password = password

[table]
table_name = test
counter_column = number
date_column = date








share|improve this question










share|improve this question




share|improve this question









asked Mar 2 at 14:09









Orphevs

1363




1363











  • Is there a reason you need this many buttons?
    – Mast
    Mar 2 at 15:19










  • @Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
    – Orphevs
    Mar 2 at 17:37










  • Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
    – A. Romeu
    Mar 3 at 0:31
















  • Is there a reason you need this many buttons?
    – Mast
    Mar 2 at 15:19










  • @Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
    – Orphevs
    Mar 2 at 17:37










  • Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
    – A. Romeu
    Mar 3 at 0:31















Is there a reason you need this many buttons?
– Mast
Mar 2 at 15:19




Is there a reason you need this many buttons?
– Mast
Mar 2 at 15:19












@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
– Orphevs
Mar 2 at 17:37




@Mast Yes, the 6 selections plus 2 actions and 2 special buttons are part of the requirement. The intended purpose cannot be fulfilled with fewer buttons.
– Orphevs
Mar 2 at 17:37












Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
– A. Romeu
Mar 3 at 0:31




Will definitely recommend you to do a small reading on SQLAlchemy to have an easier way to interact with the database and maintain much less boilerplate code for db operations sqlalchemy.org
– A. Romeu
Mar 3 at 0:31















active

oldest

votes











Your Answer




StackExchange.ifUsing("editor", function ()
return StackExchange.using("mathjaxEditing", function ()
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix)
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
);
);
, "mathjax-editing");

StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "196"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
convertImagesToLinks: false,
noModals: false,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);








 

draft saved


draft discarded


















StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f188680%2fpython-rpi-button-board%23new-answer', 'question_page');

);

Post as a guest



































active

oldest

votes













active

oldest

votes









active

oldest

votes






active

oldest

votes










 

draft saved


draft discarded


























 


draft saved


draft discarded














StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f188680%2fpython-rpi-button-board%23new-answer', 'question_page');

);

Post as a guest













































































Popular posts from this blog

Greedy Best First Search implementation in Rust

Function to Return a JSON Like Objects Using VBA Collections and Arrays

C++11 CLH Lock Implementation