From 93914d500e766b6076fb3deb3d4a2f09c87051b1 Mon Sep 17 00:00:00 2001 From: beachbumbob <42321005+beachbumbob@users.noreply.github.com> Date: Sat, 21 Dec 2024 16:38:02 +0100 Subject: [PATCH 1/3] Update README.md Simulate press a button --- README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.md b/README.md index a788a49..5dfb538 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,17 @@ make the suitable changes and from the root directory of this repository, instal ```bash $ sudo ./scripts/install.sh ``` +## Simulate input + +This make it possible to simulate input to a GPIO pin, e.g. pressing a button. + +echo "[GPIO]:[HI|LOW]" > /dev/tcp/{host}/{port} + +The port is hardcoded to 5566 + +example: +echo "18:LOW" > /dev/tcp/192.168.122.143/5566 + ## Contribute From b4367ddb4262ced6ab86fdf6689e6aa63ce7367c Mon Sep 17 00:00:00 2001 From: beachbumbob <42321005+beachbumbob@users.noreply.github.com> Date: Sat, 21 Dec 2024 16:40:32 +0100 Subject: [PATCH 2/3] Add files via upload Simulate press a button --- Mock/PiBoard.py | 245 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 245 insertions(+) create mode 100644 Mock/PiBoard.py diff --git a/Mock/PiBoard.py b/Mock/PiBoard.py new file mode 100644 index 0000000..1d303c5 --- /dev/null +++ b/Mock/PiBoard.py @@ -0,0 +1,245 @@ +from threading import ThreadError +import threading +import time +import socket as sk +#from GPIO.py import GPIO + + +class Board: + channelConfigs = {} + channelEvents = {} + gpio_direction = {} + __instance = None + serviceThread = None + outFile = "/tmp/PiBoard.out" + pin_to_gpio_rev1 = {-1, -1, -1, 0, -1, 1, -1, 4, 14, -1, 15, 17, 18, 21, -1, 22, 23, -1, 24, 10, -1, 9, 25, 11, 8, -1, 7, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 } + pin_to_gpio_rev2 = {-1, -1, -1, 2, -1, 3, -1, 4, 14, -1, 15, 17, 18, 27, -1, 22, 23, -1, 24, 10, -1, 9, 25, 11, 8, -1, 7, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1 } + pin_to_gpio_rev3 = {-1, -1, -1, 2, -1, 3, -1, 4, 14, -1, 15, 17, 18, 27, -1, 22, 23, -1, 24, 10, -1, 9, 25, 11, 8, -1, 7, -1, -1, 5, -1, 6, 12, 13, -1, 19, 16, 26, 20, -1, 21 } + + + @staticmethod + def getInstance(): + """ Static access method. """ + if Board.__instance == None: + Board() + return Board.__instance + + def __init__(self): +# print('__init__') + if Board.__instance != None: + raise Exception("This class is a singleton!") + else: + Board.__instance = self + + i = 0 + while i < 54: + Board.__instance.gpio_direction[i] = -1 + i += 1 + ''' + if Board.__instance.rpiinfo.p1_revision == 1: + Board.__instance.pin_to_gpio = Board.__instance.pin_to_gpio_rev1 + elif Board.__instance.rpiinfo.p1_revision == 2: + Board.__instance.pin_to_gpio = Board.__instance.pin_to_gpio_rev2 + else # assume model B+ or A+ or 2B + ''' + # Based on GPIO.RPI_REVISION = 3 + Board.__instance.pin_to_gpio = Board.__instance.pin_to_gpio_rev3 + + Board.__instance.serviceThread = ServiceThread() + Board.__instance.serviceThread.setPiBoardCallback(Board.__instance.piBoardCallback) + Board.__instance.serviceThread.threadify() + + def piBoardCallback(_piBoardInstance, _value): + v=_value.decode("UTF-8") + print('piBoardCallback: ' + v) + global channelEvents + # This assumes that _value is in format {channel:[HI|LOW]}, i.e. 22:HI + values = v.split(":") + print('-'+ str(values[0])+'-' ) + tmpV=values[0] + print("tmpV: -" + tmpV + "-") + channel = int(tmpV) + edge = values[1] + + _dir = Board.__instance.gpio_direction[int(channel)] + + if not _dir == -1 and not _dir == 1: + raise ValueError("Wrong direction for " + channel) + + edge_dec = -1 + print('edge: -'+edge.rstrip() + '-' ) + if edge.rstrip() == "LOW": + edge_dec = 0 + elif edge.rstrip() == "HI": + edge_dec = 1 + else: + raise ValueError("Edge must be either HI or LOW") + + event = _piBoardInstance.channelEvents[int(channel)] + cc = _piBoardInstance.channelConfigs[int(channel)] + # TODO: Handle logic on wether to call event callback or not. + if not int(cc.current) == edge_dec: + cc.current = edge_dec + _piBoardInstance.channelConfigs[int(channel)] = cc + event.eventCallback(event.channel) + + def setChannelConfig(_piBoardInstance, channel): + if channel != None: + _piBoardInstance.channelConfigs[channel.chanel] = channel + + def setChannelEvent(_piBoardInstance, _channel, _edge, _channelEventCallback): + + if _channel != None: + event = Event(_edge, _channelEventCallback, _channel) + _piBoardInstance.channelEvents[_channel] = event + + def cleanUp(__X): + if len(Board.__instance.channelEvents) == 0 and len(Board.__instance.channelConfigs): + t = Board.__instance.serviceThread.thread + print('t: ' + str(t)) + if t == None: + exit() + currThreadIdent = t.ident() + currThreadIdent = currThreadIdent + "kill" + t1 = thread_with_exception('Thread 1') + t1.start() + time.sleep(2) + t1.raise_exception() + t1.join() + if Board.__instance.serviceThread.thread.is_alive(): + raise ThreadError("Failed to stop " + currThreadIdent) + + def logToFile(_piBoardInstance,_channel,_value): + with open(_piBoardInstance.outFile, 'a') as f: + loggMsg = "event:" + str(Board.__instance) + ":" +str(_channel) + ":" + str(_value) + "\n" + print(loggMsg) + f.write(loggMsg) + f.close() + + +class Event: + + eventCallback = None + edge = None + channel = None + + def __init__(self, _edge, _eventCallback, _channel): + self.eventCallback = _eventCallback + self.edge = _edge + self.channel = _channel + + +class Service: + + serviceThreadCallback = None + + def __init__(self): + print(self) + + def listen(self, _serviceThreadCallback): + global serviceThreadCallback + serviceThreadCallback = _serviceThreadCallback + connection = sk.socket(sk.AF_INET, sk.SOCK_STREAM) + connection.setsockopt(sk.SOL_SOCKET, sk.SO_REUSEADDR, 1) + try: + connection.bind(('0.0.0.0', 5566)) + except sk.error: + print('sk.error - return 0') + return 0 + connection.listen(10) + while True: + current_connection, address = connection.accept() + while True: + data = current_connection.recv(2048) + + if data == 'quit\\n': + current_connection.shutdown(1) + current_connection.close() + break + + elif data == 'stop\\n': + current_connection.shutdown(1) + current_connection.close() + exit() + + elif data: + _serviceThreadCallback(data) + + else: + break + + def setCallback(_serviceThreadCallback): + global serviceThreadCallback + serviceThreadCallback = _serviceThreadCallback + + +class ServiceThread: + + thread = None + svc = None + piBoardCallback = None + + def __init__(self, interval=1): + self.interval = interval + + def run(self): + global piBoardCallback + self.svc = Service() + self.svc.listen(piBoardCallback) + + def setPiBoardCallback(_serviceThread, _piBoardCallback): + global piBoardCallback + piBoardCallback = _piBoardCallback + + def threadify(self): + global thread + thread = threading.Thread(target=self.run) + thread.daemon = True # Daemonize thread + thread.start() # Start the execution + + +def ext_callback(_event): + print("ext_callback") + print(__event.channel) + print(_event.edge) + print(_event.eventCallback) + + + + +### Only for testing + + +def getBoard(): + _rpib = Board.getInstance() + if _rpib == None: + _rpib = Board() + return _rpib + +class Channel: + def __init__(self,channel, direction, initial=0,pull_up_down=0): + self.chanel = channel + self.direction = direction + self.initial = initial + self.current = initial + self.pull_up_down = pull_up_down + + +if __name__ == '__main__': + + try: + while True: + +# GPIO.setmode(GPIO.BCM) +# GPIO.setup(22, GPIO.IN, 0, GPIO.PUD_UP) +# GPIO.add_event_detect(22, GPIO.FALLING, ext_callback, bouncetime=1500) + + + rpib = getBoard() +# rpib.setChannelConfig(Channel(22, 32, 0, 0)) +# rpib.setChannelEvent(22, 32, ext_callback) + rpib.logToFile(22, GPIO.HIGH) +# time.sleep(1000) + #rpib.cleanUp() + except KeyboardInterrupt: + pass From f164e8ab73709e4b1d1c88ce6d098eac5f3595a4 Mon Sep 17 00:00:00 2001 From: beachbumbob <42321005+beachbumbob@users.noreply.github.com> Date: Sat, 21 Dec 2024 16:42:41 +0100 Subject: [PATCH 3/3] Update GPIO.py Simulate press a button --- Mock/GPIO.py | 221 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 192 insertions(+), 29 deletions(-) diff --git a/Mock/GPIO.py b/Mock/GPIO.py index 4639fd4..e6aecc7 100644 --- a/Mock/GPIO.py +++ b/Mock/GPIO.py @@ -1,10 +1,13 @@ -""" -Mock Library for RPi.GPIO -""" +#Mock Library for RPi.GPIO + -import time import logging import os +import time +import types +# import yaml + +from RPi import PiBoard logger = logging.getLogger(__name__) @@ -39,6 +42,7 @@ IN = 1 LOW = 0 OUT = 0 +INPUT = 1 PUD_DOWN = 21 PUD_OFF = 20 PUD_UP = 22 @@ -49,7 +53,31 @@ SPI = 41 UNKNOWN = -1 VERSION = '0.7.0' - +PY_EVENT_CONST_OFFSET = 30 + +BCM2708_PERI_BASE_DEFAULT = 0x20000000 +BCM2709_PERI_BASE_DEFAULT = 0x3f000000 +GPIO_BASE_OFFSET = 0x200000 +FSEL_OFFSET = 0 # 0x0000 +SET_OFFSET = 7 # 0x001c / 4 +CLR_OFFSET = 10 # 0x0028 / 4 +PINLEVEL_OFFSET = 13 # 0x0034 / 4 +EVENT_DETECT_OFFSET = 16 # 0x0040 / 4 +RISING_ED_OFFSET = 19 # 0x004c / 4 +FALLING_ED_OFFSET = 22 # 0x0058 / 4 +HIGH_DETECT_OFFSET = 25 # 0x0064 / 4 +LOW_DETECT_OFFSET = 28 # 0x0070 / 4 +PULLUPDN_OFFSET = 37 # 0x0094 / 4 +PULLUPDNCLK_OFFSET = 38 # 0x0098 / 4 +PULLUPDN_OFFSET_2711_0 = 57 +PULLUPDN_OFFSET_2711_1 = 58 +PULLUPDN_OFFSET_2711_2 = 59 +PULLUPDN_OFFSET_2711_3 = 60 + +NO_EDGE = 0 +RISING_EDGE = 1 +FALLING_EDGE = 2 +BOTH_EDGE = 3 _mode = 0 channel_config = {} @@ -59,9 +87,10 @@ class Channel: def __init__(self,channel, direction, initial=0,pull_up_down=PUD_OFF): - self.channel = channel + self.chanel = channel self.direction = direction self.initial = initial + self.current = initial self.pull_up_down = pull_up_down @@ -72,7 +101,11 @@ def setmode(mode): BOARD - Use Raspberry Pi board numbers BCM - Use Broadcom GPIO 00..nn numbers """ + + board = getBoard() + # GPIO = GPIO() + time.sleep(1) if(mode == BCM): setModeDone = True _mode = mode @@ -87,26 +120,45 @@ def getmode(): Get numbering mode used for channel numbers. Returns BOARD, BCM or None """ + board = getBoard() return _mode def setwarnings(flag): """ Enable or disable warning messages """ - logger.info("Set warnings as {}".format(flag)) + board = getBoard() + logger.info("Set Warings as {}".format(flag)) def setup(channel, direction, initial=0,pull_up_down=PUD_OFF): """ - Set up a GPIO channel or list of channels with a direction and (optional) pull up/down control + Set up a GPIO channel or list of channels with a direction and (optional) pull/up down control channel - either board pin number or BCM number depending on which mode is set. direction - IN or OUT [pull_up_down] - PUD_OFF (default), PUD_UP or PUD_DOWN [initial] - Initial value for an output channel """ - logger.info("Setup channel : {} as {} with initial :{} and pull_up_down {}".format(channel,direction,initial,pull_up_down)) + logger.info("setup channel : {} as {} with intial :{} and pull_up_dowm {}".format(channel,direction,initial,pull_up_down)) + board = getBoard() global channel_config - channel_config[channel] = Channel(channel, direction, initial, pull_up_down) + + if isinstance(channel, list): + print("channel is list") + for c in channel: + print ("processing channel " + str(c)) + channel_config[c] = Channel(c, direction, initial, pull_up_down) + board.setChannelConfig(channel_config[c]) + board.gpio_direction[c] = direction + elif isinstance(channel, int): + print("channel is int") + channel_config[channel] = Channel(channel, direction, initial, pull_up_down) + board.setChannelConfig(channel_config[channel]) + board.gpio_direction[channel] = direction + else: + raise TypeError("channel is of wrong type: " + channel ) + + def output(channel, value): """ @@ -115,14 +167,17 @@ def output(channel, value): value - 0/1 or False/True or LOW/HIGH """ - logger.info("Output channel : {} with value : {}".format(channel, value)) + board = getBoard() + board.logToFile(channel,value) + logger.info("output channel : {} with value : {}".format(channel, value)) def input(channel): """ Input from a GPIO channel. Returns HIGH=1=True or LOW=0=False channel - either board pin number or BCM number depending on which mode is set. """ - logger.info("Reading from channel {}".format(channel)) + board = getBoard() + logger.info("reading from chanel {}".format(channel)) def wait_for_edge(channel,edge,bouncetime,timeout): """ @@ -132,10 +187,70 @@ def wait_for_edge(channel,edge,bouncetime,timeout): [bouncetime] - time allowed between calls to allow for switchbounce [timeout] - timeout in ms """ - logger.info("Waiting for edge : {} on channel : {} with bounce time : {} and Timeout :{}".format(edge,channel,bouncetime,timeout)) + board = getBoard() + + _gpio +# unsigned int gpio; +# int channel, edge, result; + _channel + _edge +# bouncetime = -666; // None + _bouncetime = None +# int timeout = -1; // None + _timeout = None + +# static char *kwlist[] = {"channel", "edge", "bouncetime", "timeout", None} + _kwlist = {} + _kwlist[0] = "channel" + _kwlist[1] = "edge" + _kwlist[2] = "bouncetime" + _kwlist[3] = "timeout" + _kwlist[4] = None + + if not PyArg_ParseTupleAndKeywords(args, _kwargs, "ii|ii", _kwlist, _channel, _edge, _bouncetime, _timeout): + return None + + # TODO verify this + _get_gpio_numbers = get_gpio_number(_channel, _gpio).split(":") + _gpio = int(_get_gpio_numbers[1]) + if int(_get_gpio_numbers[0]) == 0: + return None + +# check channel is setup as an input + + if not board.gpio_direction[_gpio] == INPUT: + raise RuntimeError("You must setup() the GPIO channel as an input first") + return None + +# // is edge a valid value? + _edge -= PY_EVENT_CONST_OFFSET + if not _edge == RISING_EDGE and not _edge == FALLING_EDGE and _edge == BOTH_EDGE: + raise ValueError("The edge must be set to RISING, FALLING or BOTH") + return None + + if _bouncetime <= 0 and not _bouncetime == None: + raise ValueError("Bouncetime must be greater than 0") + return None + + if _timeout <= 0 and not _timeout != None: + raise ValueError("Timeout must be greater than 0") + return None + + if _result == 0: + return None + elif _result == -1: + raise RuntimeError("Conflicting edge detection events already exist for this GPIO channel") + return None + elif _result == -2: + raise RuntimeError("Error waiting for edge") + return None + else: + return Py_BuildValue("i", _channel) + + logger.info("waiting for edge : {} on channel : {} with bounce time : {} and Timeout :{}".format(edge,channel,bouncetime,timeout)) -def add_event_detect(channel,edge,callback=None,bouncetime=None): +def add_event_detect(channel,edge,callback,bouncetime): """ Enable edge detection events for a particular GPIO channel. channel - either board pin number or BCM number depending on which mode is set. @@ -143,13 +258,15 @@ def add_event_detect(channel,edge,callback=None,bouncetime=None): [callback] - A callback function for the event (optional) [bouncetime] - Switch bounce timeout in ms for callback """ - logger.info("Event detect added for edge : {} on channel : {} with bounce time : {} and callback {}".format(edge,channel,bouncetime,callback)) + getBoard().setChannelEvent(channel, edge, callback) + logger.info("Event detect added for edge : {} on channel : {} with bouce time : {} and callback {}".format(edge,channel,bouncetime,callback)) def event_detected(channel): """ Returns True if an edge has occurred on a given GPIO. You need to enable edge detection using add_event_detect() first. channel - either board pin number or BCM number depending on which mode is set. """ + board = getBoard() logger.info("Waiting for even detection on channel :{}".format(channel)) def add_event_callback(channel,callback): @@ -158,21 +275,23 @@ def add_event_callback(channel,callback): channel - either board pin number or BCM number depending on which mode is set. callback - a callback function """ - logger.info("Event callback : {} added for channel : {}".format(callback,channel)) + logger.info("Event Calback : {} added for channel : {}".format(callback,channel)) def remove_event_detect(channel): """ Remove edge detection for a particular GPIO channel channel - either board pin number or BCM number depending on which mode is set. """ - logger.info("Event detect removed for channel : {}".format(channel)) + board = getBoard() + logger.info("Event Detect Removed for channel : {}".format(channel)) def gpio_function(channel): """ Return the current GPIO function (IN, OUT, PWM, SERIAL, I2C, SPI) channel - either board pin number or BCM number depending on which mode is set. """ - logger.info("GPIO function of channel : {} is {}".format(channel,channel_config[channel].direction)) + board = getBoard() + logger.info("GPIO function of Channel : {} is {}".format(channel,channel_config[channel].direction)) class PWM: @@ -181,13 +300,14 @@ def __init__(self, channel, frequency): """ x.__init__(...) initializes x; see help(type(x)) for signature """ - self.channel = channel + self.chanel = channel self.frequency = frequency self.dutycycle = 0 global channel_config channel_config[channel] = Channel(channel,PWM,) - logger.info("Initialized PWM for channel : {} at frequency : {}".format(channel,frequency)) - + board = getBoard() + logger.info("Initialized PWM for Channel : {} at frequency : {}".format(channel,frequency)) + # where dc is the duty cycle (0.0 <= dc <= 100.0) def start(self, dutycycle): """ @@ -195,15 +315,17 @@ def start(self, dutycycle): dutycycle - the duty cycle (0.0 to 100.0) """ self.dutycycle = dutycycle - logger.info("Start pwm on channel : {} with duty cycle : {}".format(self.channel,dutycycle)) - + board = getBoard() + logger.info("start pwm on channel : {} with Duty cycle : {}".format(self.chanel,dutycycle)) + # where freq is the new frequency in Hz def ChangeFrequency(self, frequency): """ Change the frequency frequency - frequency in Hz (freq > 1.0) """ - logger.info("Freqency changed for channel : {} from : {} -> to : {}".format(self.channel,self.frequency,frequency)) + board = getBoard() + logger.info("Freqency Changed for channel : {} from : {} -> to : {}".format(self.chanel,self.frequency,frequency)) self.frequency = frequency # where 0.0 <= dc <= 100.0 @@ -212,12 +334,14 @@ def ChangeDutyCycle(self, dutycycle): Change the duty cycle dutycycle - between 0.0 and 100.0 """ + board = getBoard() self.dutycycle = dutycycle - logger.info("Dutycycle changed for channel : {} from : {} -> to : {}".format(self.channel,self.dutycycle,dutycycle)) - + logger.info("Dutycycle Changed for channel : {} from : {} -> to : {}".format(self.chanel,self.dutycycle,dutycycle)) + # stop PWM generation def stop(self): - logger.info("Stop PWM on channel : {} with duty cycle : {}".format(self.channel,self.dutycycle)) + board = getBoard() + logger.info("Stop pwm on channel : {} with Duty cycle : {}".format(self.chanel,self.dutycycle)) def cleanup(channel=None): @@ -225,7 +349,46 @@ def cleanup(channel=None): Clean up by resetting all GPIO channels that have been used by this program to INPUT with no pullup/pulldown and no event detection [channel] - individual channel or list/tuple of channels to clean up. Default - clean every channel that has been used. """ + board = getBoard() if channel is not None: - logger.info("Cleaning up channel : {}".format(channel)) + logger.info("Cleaning Up Channel : {}".format(channel)) + setup_gpio(channel, INPUT, PUD_OFF) + board.gpio_direction[channel] = -1 + board.channelConfigs[channel] = None + board.channelEvents[channel] = None + + found = 1 else: - logger.info("Cleaning up all channels") + logger.info("Cleaning Up all channels") + board.channelConfigs[channel] = {} + board.channelEvents = {} + + i = 0 + while i < 54: + if not board.gpio_direction[i] == -1: + setup_gpio(i, INPUT, PUD_OFF) + board.gpio_direction[i] = -1 + found = 1 + i += 1 + board.cleanUp() + + +def getBoard(): + rpib = PiBoard.Board.getInstance() + if rpib == None: + rpib = PiBoard.Board() + return rpib + + +def get_gpio_number(_channel, _gpio): + ''' + Will only work with BCM + + ''' + return "0" + ":" + str(_channel) + + +def setup_gpio(chanel,direction,pud): + print(chanel) + print(direction) + print(pud)