Skip to content

Commit

Permalink
[TESTED] now with UART support
Browse files Browse the repository at this point in the history
  • Loading branch information
i2cy committed Jan 12, 2024
1 parent 156fb25 commit e4bb12e
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .idea/CH347PythonLib.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ _+ [CH347-HIDAPI Github Page](https://github.com/i2cy/ch347-hidapi) +_
</p>

## Abstract
This project is the API library of CH347 USB-SPI/I2C bridge chip based on Python.
This project is the API library of CH347 USB-SPI/I2C/UART bridge chip based on Python.

`Standard USB-HID mode setting of CH347 chip supported only`

This library provides full access of SPI/I2C settings and communication with CH347 USB-SPI
This library provides full access of SPI/I2C/UART settings and communication with CH347 USB-SPI/I2C/UART
bridge chip in Python language.

__For demonstration and code reference please refer to the `demo.py` file in [source page](https://github.com/i2cy/CH347-HIDAPI/blob/master/demo.py).__
Expand All @@ -45,6 +45,11 @@ THUS, THIS API MAY NOT FULLY CAPABLE OF EVERY FUNCTION IN OFFICIAL API FROM CH34

## Update Notes

#### 2024-01-12
1. Now with fully compatible UART (UART1 with pins TXD1/RXD1/RTS1/CTS1/DTR1) support under mode 3 (which is HID mode),
2. Baudrate supports ranging from 1.2K to 9M
3. Multithread receiver for UART (optional, default is on) to receive the data in parallel

#### 2024-01-08
1. Added independent I2C interface class objects (I2CDevice) and SPI interface class objects (SPIDevice)
2. Added new demo file `demo.py` to demonstrate the usage of classes added above (simplified code)
Expand Down
106 changes: 102 additions & 4 deletions ch347api/__device.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,129 @@
from .__i2c import convert_i2c_address, convert_int_to_bytes
from typing import Tuple, Any
from functools import wraps
import warnings

VENDOR_ID: int = 6790
PRODUCT_ID: int = 21980


class CH347HIDUART1(hid.device):

def __init__(self, vendor_id=VENDOR_ID, product_id=PRODUCT_ID):
"""
Class of CH347 UART1 interface based on hidapi
:param vendor_id: the vender ID of the device
:type vendor_id: int
:param product_id: the product ID of the device
:type product_id: int
"""
super(CH347HIDUART1, self).__init__()
target = None
for ele in hid.enumerate():
if ele["vendor_id"] == vendor_id and ele["product_id"] == product_id:
if ele['interface_number'] == 0: # UART interface ID: 0
target = ele['path']

self.open_path(target)

def init_UART(self, baudrate: int = 115200, stop_bits: int = 1, verify_bits: int = 0, timeout: int = 32) -> bool:
"""
Initialize the configuration of the UART interface
:param baudrate: the baudrate of the device, default is 115200
:type baudrate: int
:param stop_bits: number of stop bits, default is 1
:type stop_bits: int
:param verify_bits: number of verify bits, default is 0
:type verify_bits: int
:param timeout: timeout in milliseconds, default is 32, this value should not exceed the maximum of 255
:type timeout: int
:return: operation status
:rtype: bool
"""
header = b"\x00\xcb\x08\x00"
stop_bits = stop_bits * 2 - 2

if baudrate < 1200 or baudrate > 7_500_000:
raise ValueError("Invalid baudrate, correct value should ranging from 1200 to 7500000")

if timeout > 255:
raise Exception("timeout should not exceed the maximum number of 255")

payload = header + struct.pack("<IBBBB", baudrate, stop_bits, verify_bits, 0x08, timeout)
response = bool(self.send_feature_report(payload))

# response = self.get_feature_report(0, 16)
return response

def write_raw(self, data: bytes) -> int:
"""
Write data to the device
:param data: data to write
:type data: bytes
:return: wrote length
:rtype: int
"""
offset = 0
while len(data) - offset > 510:
payload = struct.pack("<BH", 0x00, 510) + data[offset:offset + 510]
self.write(payload)
offset += 510
payload = struct.pack("<BH", 0x00, len(data) - offset) + data[offset:len(data)]
self.write(payload)
offset += len(data) - offset

return offset

def read_raw(self, length: int = -1) -> list:
"""
Read data from the device if any byte is available
:param length: maximum length of the data, default is -1 which means read all bytes that received
:type length: int
:return: list of bytes read
:rtype: list
"""
self.set_nonblocking(1)

ret = []
while len(ret) < length or length < 0:
chunk = self.read(512)
if chunk:
chunk_len = struct.unpack("<H", bytes(chunk[0:2]))[0]
ret.extend(chunk[2:chunk_len+2])
else:
break

self.set_nonblocking(0)

return ret


class CH347HIDDev(hid.device):

def __init__(self, vendor_id=VENDOR_ID, product_id=PRODUCT_ID, interface_num=1,
def __init__(self, vendor_id=VENDOR_ID, product_id=PRODUCT_ID, interface_num=None,
enable_device_lock=True):
"""
Class of CH347 device based on hidapi
Class of CH347 SPI/I2C/GPIO interface based on hidapi
:param vendor_id: the vendor ID of the device
:type vendor_id: int
:param product_id: the product ID of the device
:type product_id: int
:param interface_num: the interface number of the device
:param interface_num: the interface number of the device (deprecated)
:type interface_num: int
:param enable_device_lock: whether to enable device in case of multithreading communication
:type enable_device_lock: bool
"""

if interface_num is not None:
warnings.warn("interface_num is deprecated and will be removed in future releases."
" From now on interface_num will be set automatically to 1",
DeprecationWarning)

super(CH347HIDDev, self).__init__()
target = None
for ele in hid.enumerate():
if ele["vendor_id"] == vendor_id and ele["product_id"] == product_id:
if ele['interface_number'] == interface_num:
if ele['interface_number'] == 1: # SPI/I2C/GPIO interface ID: 1
target = ele['path']

self.open_path(target)
Expand Down
1 change: 1 addition & 0 deletions ch347api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
from .__device import CH347HIDDev, VENDOR_ID, PRODUCT_ID
from .i2c import I2CDevice
from .spi import SPIDevice
from .uart import UARTDevice
from .__spi import SPIClockFreq
from .__i2c import I2CClockFreq
125 changes: 125 additions & 0 deletions ch347api/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,128 @@
# Project: CH347-HIDAPI
# Filename: uart
# Created on: 2024/1/8

from .__device import CH347HIDUART1
import threading
import time


class UARTDevice:

def __init__(self, baudrate: int = 115200, stop_bits: int = 1, verify_bits: int = 0, timeout: int = 32,
multithreading: bool = True):
"""
Initialize the configuration of the UART interface
:param baudrate: the baudrate of the device, default is 115200, (Min: 1200, Max: 9M)
:type baudrate: int
:param stop_bits: number of stop bits, default is 1
:type stop_bits: int
:param verify_bits: number of verify bits, default is 0
:type verify_bits: int
:param timeout: timeout in milliseconds, default is 32, this value should not exceed the maximum of 255
:type timeout: int
:param multithreading: whether to use multithreading to receive the data in parallel, default is True
:type multithreading: bool
"""
self.dev = CH347HIDUART1()
self.__multithreading = False
status = self.dev.init_UART(baudrate=baudrate, stop_bits=stop_bits, verify_bits=verify_bits, timeout=timeout)
if not status:
raise Exception("failed to initialize UART-1 interface on CH347")

self.__multithreading = multithreading

self.__received_data = []
self.__live = True
self.__thread = threading.Thread(target=self.__receiver_thread)
self.__data_lock = threading.Lock()

if self.__multithreading:
# enable multithreading receiver
self.__thread.start()

def __receiver_thread(self):
"""
Receive data from the UART interface and put it in a queue to store the received data
:return:
"""
while self.__live:
data = self.dev.read_raw()
if data:
self.__data_lock.acquire()
# print("received data:", bytes(data).hex())
self.__received_data.extend(data)
self.__data_lock.release()
else:
time.sleep(0.01)

def __del__(self) -> None:
if self.__multithreading:
self.__live = False
try:
self.__data_lock.release()
self.__thread.join()
except RuntimeError:
pass

def kill(self):
"""
Kill receiver thread if multithreading is on
:return:
"""
if self.__multithreading:
self.__live = False
try:
self.__data_lock.release()
self.__thread.join()
except RuntimeError:
pass

def write(self, data: bytes) -> int:
"""
Write data to the device
:param data: data to write
:type data: bytes
:return: wrote length
:rtype: int
"""
return self.dev.write_raw(data)

def read(self, length: int = -1, timeout: int = 5) -> list:
"""
Read data from the device if any byte is available
:param length: maximum length of the data, default is -1 which means read all bytes that received
:type length: int
:param timeout: timeout in seconds, default is 5, set to 0 means return data from buffer immediately instead of
waiting for the buffer to collect enough data (available only for multithreading)
:type timeout: int
:return: list of bytes read
:rtype: list
"""
if self.__multithreading:
# wait for data
t0 = time.time()

if length < 0:
while time.time() - t0 < timeout and len(self.__received_data) == 0:
time.sleep(0.002)
else:
while time.time() - t0 < timeout and length > len(self.__received_data):
time.sleep(0.002)

self.__data_lock.acquire()

if len(self.__received_data) > length > 0:
ret = self.__received_data[:length]
self.__received_data = self.__received_data[length:]

else:
ret = self.__received_data
self.__received_data = []

self.__data_lock.release()

else:
ret = self.dev.read_raw(length)

return ret
50 changes: 44 additions & 6 deletions demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import random
import time
from ch347api import CH347HIDDev, I2CDevice, SPIDevice, SPIClockFreq, I2CClockFreq
from ch347api import CH347HIDDev, I2CDevice, SPIDevice, UARTDevice, SPIClockFreq, I2CClockFreq


def i2c_demo():
Expand Down Expand Up @@ -61,7 +61,7 @@ def spi_demo():
# i2c = I2CDevice(addr=0x68, ch347_device=dev)

# write test (activate CS -> write data -> deactivate CS)
print("performing SPI write test")
print("[SPI] performing SPI write test")
spi.write_CS1(b"hello world")
spi.write_CS2(b"this is ch347")
spi.write_CS1([0, 1, 2, 3])
Expand All @@ -72,16 +72,54 @@ def spi_demo():
spi.write_CS1(b"this is ch347")

# read test (activate CS -> read data -> deactivate CS)
print("performing SPI read test")
print("received 16 bytes from SPI bus on CS1:", bytes(spi.read_CS1(16)))
print("[SPI] performing SPI read test")
print("[SPI] received 16 bytes from SPI bus on CS1:", bytes(spi.read_CS1(16)))

# write&read test (activate CS -> read data -> deactivate CS)
random_bytes = random.randbytes(512)
print("write read test result (with MOSI, MISO short connected): {}".format(
random_bytes = b"\xa5\x5a\x5a\xa5" * 128
print("[SPI] write read test result (with MOSI, MISO short connected): {}".format(
bytes(spi.writeRead_CS1(random_bytes)) == random_bytes
))


def uart_demo():
# while performing this test please make sure TX and RX pin short connected

# initialize an uart communication object
# -*- Way 1 -*-
uart = UARTDevice(baudrate=7_500_000)

# -*- Way 2 -*- (with no multithreading receiver)
# uart = UARTDevice(baudrate=115200, stop_bits=1, verify_bits=0, timeout=128, multithreading=False)

# uart write test
test_b1 = b"hello world, this is ch347. "
test_b2 = b"using CH347-HIDAPI"
test_b3 = b"\xa5\x5a\x5a\xa5\x00\x01\x02\x03\xfc\xfd\xfe\xff\xa5\x5a\x5a\xa5"
wrote = uart.write(test_b1)
print("[UART] wrote {} bytes with content \"{}\"".format(wrote, test_b1.decode("utf-8")))
wrote = uart.write(test_b2)
print("[UART] wrote {} bytes with content \"{}\"".format(wrote, test_b2.decode("utf-8")))

# uart read test
read = uart.read(len(test_b1 + test_b2))
print("[UART] read {} bytes of data test result: {}".format(len(read), bytes(read) == test_b1 + test_b2))
print("[UART] received: {}".format(bytes(read)))

# uart accuracy test
print("[UART] continuous sending and receiving test with 4MB size in progress..")
payload = test_b3 * 64 * 1024 * 4
t0 = time.time()
uart.write(payload)
read = uart.read(len(payload), timeout=15)
print("[UART] 4MB payload received, time spent: {:.2f} ms, accuracy test result: {}".format(
(time.time() - t0) * 1000, bytes(read) == payload))

# [VITAL] kill sub-thread(receiver thread) for safe exit
uart.kill()


if __name__ == "__main__":
i2c_demo()
spi_demo()
uart_demo()
Loading

0 comments on commit e4bb12e

Please sign in to comment.