-
Notifications
You must be signed in to change notification settings - Fork 75
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add driver for Atx Led Dali Hat (AL-DALI-HAT). (#135)
* Add driver for Atx Led Dali Hat (AL-DALI-HAT). This uses the Raspberry PI on-board serial port to communicate at 19200 baud to the Dali Hat, the hardware on the hat adapts the UART serial data stream into DALI encoding * Remove path editing on import and __main__ from atxled.py and use read_until in read * Remove __main__ since it serves no real purpose
- Loading branch information
1 parent
0ebccd7
commit 15bcee3
Showing
3 changed files
with
270 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
from dali.command import Command | ||
from dali.driver.base import SyncDALIDriver, DALIDriver | ||
from dali.frame import BackwardFrame | ||
import logging | ||
import sys | ||
|
||
import serial | ||
import threading | ||
import time | ||
|
||
DALI_PACKET_SIZE = {"j": 8, "h": 16, "l": 24, "m": 25} | ||
DALI_PACKET_PREFIX = {v: k for k, v in DALI_PACKET_SIZE.items()} | ||
|
||
|
||
class DaliHatSerialDriver(DALIDriver): | ||
"""Driver for communicating with DALI devices over a serial connection.""" | ||
|
||
def __init__(self, port="/dev/ttyS0", LOG=None): | ||
"""Initialize the serial connection to the DALI interface.""" | ||
self.port = port | ||
self.lock = threading.RLock() | ||
self.buffer = [] | ||
if not LOG: | ||
self.LOG = logging.getLogger("AtxLedDaliDriver") | ||
handler = logging.StreamHandler(sys.stdout) | ||
self.LOG.addHandler(handler) | ||
else: | ||
self.LOG = LOG | ||
try: | ||
self.conn = serial.Serial( | ||
port=self.port, | ||
baudrate=19200, | ||
parity=serial.PARITY_NONE, | ||
stopbits=serial.STOPBITS_ONE, | ||
bytesize=serial.EIGHTBITS, | ||
timeout=0.2, | ||
) | ||
except Exception as e: | ||
self.LOG.exception("Could not open serial connection: %s", e) | ||
self.conn = None | ||
|
||
def read_line(self): | ||
"""Read the next line from the buffer, refilling the buffer if necessary.""" | ||
with self.lock: | ||
while not self.buffer: | ||
line = self.conn.read_until(b"\n").decode("ascii") | ||
if not line: | ||
return "" | ||
self.buffer.append(line) | ||
return self.buffer.pop(0) | ||
|
||
def construct(self, command): | ||
"""Construct a DALI command to be sent over the serial connection.""" | ||
assert isinstance(command, Command) | ||
f = command.frame | ||
packet_size = len(f) | ||
prefix = DALI_PACKET_PREFIX[packet_size] | ||
if command.sendtwice and packet_size == 16: | ||
prefix = "t" | ||
data = "".join(["{:02X}".format(byte) for byte in f.pack]) | ||
command_str = (f"{prefix}{data}\n").encode("ascii") | ||
return command_str | ||
|
||
def extract(self, data): | ||
"""Parse the response from the serial device and return the corresponding frame.""" | ||
if data.startswith("J"): | ||
try: | ||
data = int(data[1:], 16) | ||
return BackwardFrame(data) | ||
except ValueError as e: | ||
self.LOG.error(f"Failed to parse response '{data}': {e}") | ||
return None | ||
|
||
def close(self): | ||
"""Close the serial connection.""" | ||
if self.conn: | ||
self.conn.close() | ||
|
||
|
||
class SyncDaliHatDriver(DaliHatSerialDriver, SyncDALIDriver): | ||
"""Synchronous DALI driver.""" | ||
|
||
def send(self, command: Command): | ||
"""Send a command to the DALI interface and wait for a response.""" | ||
with self.lock: | ||
lines = [] | ||
last_resp = None | ||
send_twice = command.sendtwice | ||
cmd = self.construct(command) | ||
self.LOG.debug("command string sent: %r", cmd) | ||
self.conn.write(cmd) | ||
REPS = 5 | ||
i = 0 | ||
already_resent = False | ||
resent_times = 0 | ||
resp = None | ||
while i < REPS: | ||
i += 1 | ||
resp = self.read_line() | ||
self.LOG.debug("raw response received: %r", resp) | ||
resend = False | ||
if cmd[:3] not in ["hB1", "hB3", "hB5"]: | ||
if resp and resp[0] in {"N", "J"}: | ||
if send_twice: | ||
if last_resp: | ||
if last_resp == resp: | ||
resp = self.extract(resp) | ||
break | ||
resend = True | ||
last_resp = None | ||
else: | ||
last_resp = resp | ||
else: | ||
resp = self.extract(resp) | ||
break | ||
elif resp and resp[0] in {"X", "Z", ""}: | ||
time.sleep(0.1) | ||
collision_bytes = None | ||
while collision_bytes != "": | ||
collision_bytes = self.read_line() | ||
if resp[0] == "X": | ||
break | ||
self.LOG.info( | ||
"got conflict (%s) sending %r, sending again", resp, cmd | ||
) | ||
last_resp = None | ||
resend = True | ||
elif resp: | ||
lines.append(resp) | ||
|
||
resp = None | ||
if resend and not already_resent: | ||
self.conn.write((cmd).encode("ascii")) | ||
REPS += 1 + send_twice | ||
already_resent = True | ||
else: | ||
if resp and resp[0] == "N": | ||
resp = self.extract(resp) | ||
break | ||
elif resp and resp[0] in {"X", "Z", ""}: | ||
time.sleep(0.1) | ||
collision_bytes = None | ||
while collision_bytes != "": | ||
collision_bytes = self.read_line() | ||
elif resp: | ||
last_resp = None | ||
resend = True | ||
if resend and resent_times < 5: | ||
self.conn.write(cmd.encode("ascii")) | ||
REPS += 1 + send_twice | ||
resent_times += 1 | ||
if command.is_query: | ||
return command.response(resp) | ||
return resp |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
#!/usr/bin/env python3 | ||
|
||
from dali.gear.general import ( | ||
DAPC, | ||
QueryControlGearPresent, | ||
QueryGroupsZeroToSeven, | ||
QueryGroupsEightToFifteen, | ||
QueryActualLevel, | ||
Off, | ||
QueryMinLevel, | ||
QueryMaxLevel, | ||
QueryPhysicalMinimum, | ||
) | ||
from dali.driver.base import SyncDALIDriver | ||
from dali.driver.atxled import SyncDaliHatDriver | ||
from dali.address import GearShort | ||
|
||
import logging | ||
|
||
|
||
LOG = logging.getLogger("DaliHatTest") | ||
|
||
|
||
class DaliHatTest: | ||
def __init__(self, driver: SyncDALIDriver): | ||
self.driver = driver | ||
|
||
def scan_devices(self): | ||
present_devices = [] | ||
for address in range(0, 64): | ||
try: | ||
response = self.driver.send(QueryControlGearPresent(GearShort(address))) | ||
if response.value is True: | ||
present_devices.append(address) | ||
LOG.info(f"Device found at address: {address}") | ||
else: | ||
LOG.info(f"Response from address {address}: {response.value}") | ||
|
||
except Exception as e: | ||
LOG.info(f"Error while querying address {address}: {e}") | ||
|
||
return present_devices | ||
|
||
def set_device_level(self, address, level, fade_time=0): | ||
try: | ||
self.driver.send(DAPC(GearShort(address), level)) | ||
LOG.info( | ||
f"Set device at address {address} to level {level} with fade time {fade_time}" | ||
) | ||
except Exception as e: | ||
LOG.info(f"Error while setting level for address {address}: {e}") | ||
|
||
def query_device_info(self, address): | ||
current_command = None | ||
try: | ||
current_command = "QueryGroupsZeroToSeven" | ||
groups_0_7 = self.driver.send( | ||
QueryGroupsZeroToSeven(GearShort(address)) | ||
).value | ||
LOG.info(f"Device {address} groups 0-7: {groups_0_7}") | ||
|
||
current_command = "QueryGroupsEightToFifteen" | ||
groups_8_15 = self.driver.send( | ||
QueryGroupsEightToFifteen(GearShort(address)) | ||
).value | ||
LOG.info(f"Device {address} groups 8-15: {groups_8_15}") | ||
|
||
current_command = "QueryMinLevel" | ||
min_level = self.driver.send(QueryMinLevel(GearShort(address))).value | ||
LOG.info(f"Device {address} minimum level: {min_level}") | ||
|
||
current_command = "QueryMaxLevel" | ||
max_level = self.driver.send(QueryMaxLevel(GearShort(address))).value | ||
LOG.info(f"Device {address} maximum level: {max_level}") | ||
|
||
current_command = "QueryPhysicalMinimum" | ||
physical_minimum = self.driver.send( | ||
QueryPhysicalMinimum(GearShort(address)) | ||
).value | ||
LOG.info(f"Device {address} physical minimum: {physical_minimum}") | ||
|
||
current_command = "QueryActualLevel" | ||
actual_level = self.driver.send(QueryActualLevel(GearShort(address))).value | ||
LOG.info(f"Device {address} actual level: {actual_level}") | ||
|
||
except Exception as e: | ||
LOG.info( | ||
f"Error while querying device {address} with command '{current_command}': {e}" | ||
) | ||
|
||
def turn_off_device(self, address): | ||
try: | ||
self.driver.send(Off(GearShort(address))) | ||
LOG.info(f"Turned off device at address {address}") | ||
except Exception as e: | ||
LOG.info(f"Error while turning off device {address}: {e}") | ||
|
||
|
||
if __name__ == "__main__": | ||
logging.basicConfig(level=logging.INFO) | ||
dali_driver = SyncDaliHatDriver() | ||
|
||
dali_test = DaliHatTest(dali_driver) | ||
found_devices = [] | ||
|
||
found_devices = dali_test.scan_devices() | ||
LOG.info(f"Scanned and found {len(found_devices)} devices.") | ||
|
||
for device in found_devices: | ||
dali_test.query_device_info(device) | ||
dali_test.set_device_level(device, 128) | ||
dali_test.turn_off_device(device) | ||
|
||
dali_driver.close() |