Skip to content

Commit

Permalink
[Tested] added I2CDevice, SPIDevice seperated interface objects for c…
Browse files Browse the repository at this point in the history
…leaner coding
  • Loading branch information
i2cy committed Jan 8, 2024
1 parent caa577f commit 43223fe
Show file tree
Hide file tree
Showing 10 changed files with 511 additions and 38 deletions.
3 changes: 3 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ This project is the API library of CH347 USB-SPI/I2C bridge chip based on Python
This library provides full access of SPI/I2C settings and communication with CH347 USB-SPI
bridge chip in Python language.

For demonstration and code reference please refer to the `test.py` file in [source page](https://github.com/i2cy/CH347-HIDAPI/blob/master/test.py).
For demonstration and code reference please refer to the `demo.py` file in [source page](https://github.com/i2cy/CH347-HIDAPI/blob/master/demo.py).

[CH347-Chip Official Site](https://www.wch.cn/products/CH347.html)

Expand All @@ -45,6 +45,10 @@ THUS, THIS API MAY NOT FULLY CAPABLE OF EVERY FUNCTION IN OFFICIAL API FROM CH34

## Update Notes

#### 2024-01-08
1. Added independent I2C interface class objects (I2CDevice) and SPI interface class objects (SPIDevice)
2. Added new demo file `demo.py` to demonstrate the usage of classes added above (simplified code)

#### 2023-08-06
1. Now with fully compatible I2C support, I2C clock speed level: 0 -> 20KHz, 1 -> 100KHz, 2 -> 400KHz, 3 -> 750KHz
2. Added test.py for demonstration
161 changes: 125 additions & 36 deletions ch347api/__device.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,91 @@
# Project: CH347PythonLib
# Filename: device
# Created on: 2023/7/31

import time

import hid
import struct
from .__spi import CSConfig, SPIConfig
from .__i2c import convert_i2c_address, convert_int_to_bytes
from typing import Tuple, Any
from functools import wraps

VENDOR_ID: int = 6790
PRODUCT_ID: int = 21980


class CH347HIDDev(hid.device):

def __init__(self, vendor_id, product_id, interface_num):
def __init__(self, vendor_id=VENDOR_ID, product_id=PRODUCT_ID, interface_num=1,
enable_device_lock=True):
"""
Class of CH347 device based on hidapi
:param vendor_id: int
:param product_id: int
:param interface_num: int
:param vendor_id: the vendor ID of the device
:type vendor_id: int
:param product_id: the product ID of the device
:type product_id: int
:param interface_num: the interface number of the device
:type interface_num: int
:param enable_device_lock: whether to enable device in case of multithreading communication
:type enable_device_lock: bool
"""
super(CH347HIDDev, self).__init__()
target = None
for ele in hid.enumerate():
if ele["vendor_id"] == vendor_id and ele["product_id"] == product_id:
if ele['interface_number'] == interface_num:
target = ele['path']

self.open_path(target)

self.CS1_enabled = False
self.CS2_enabled = False
self.cs_activate_delay = 0
self.cs_deactivate_delay = 0
self.i2c_initiated = False
self.spi_initiated = False
self.lock_enabled = enable_device_lock
self.is_busy = False

def __busy_check(self, timeout=5) -> bool:
"""
Check if device is busy
:param timeout: time to wait for other thread to reset the busy flag
:type timeout: int
:return: busy status
:rtype: bool
"""
t0 = time.time()
while self.lock_enabled and self.is_busy and time.time() - t0 < timeout:
time.sleep(0.001)

return self.is_busy

def __device_lock(timeout: int = 5):
"""
device lock decorator
:param timeout: timeout to wait for device to unlock
:type timeout: int
:return:
"""

def aop(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# check busy flag
self.__busy_check(timeout)
# set busy flag
self.is_busy = True
ret = func(self, *args, **kwargs)
# unset busy flag
self.is_busy = False
return ret

return wrapper

return aop

@__device_lock(2)
def reset(self):
"""
reset device
Expand All @@ -48,23 +98,29 @@ def reset(self):
self.read(64)

# --*-- [ I2C ] --*--
def init_I2C(self, clock_speed_level: int = 1):
@__device_lock(2)
def init_I2C(self, clock_freq_level: int = 1):
"""
initialize I2C configuration
:param clock_speed_level: int, 0-20KHz, 1-100KHz, 2-400KHz, 3-750KHz
:return: None
:param clock_freq_level: 0-20KHz, 1-100KHz, 2-400KHz, 3-750KHz
:type clock_freq_level: int
:return:
"""
self.write(struct.pack("<BHBB", 0x00, 3, 0xaa, 0x60 | clock_speed_level))
self.write(struct.pack("<BHBB", 0x00, 3, 0xaa, 0x60 | clock_freq_level))
self.i2c_initiated = True

@__device_lock(2)
def i2c_write(self, addr: (int, bytes), data: (int, bytes)) -> bool:
"""
write data through I2C bus using 7-bits address
:param addr: Any[int, bytes], 7-bits of device address
:param data: Any[int, bytes], one byte or several bytes of data to send (62 bytes max),
:type addr: :obj:`int`, :obj:`bytes`
:param data: one byte or several bytes of data to send (62 bytes max),
this method will automatically convert it into bytes with byte order 'big' unsigned if data type is int,
e.g. 0x00f1f2 -> b'\xf1\xf2'
:return: bool, operation status
:type data: :obj:`int`, :obj:`bytes`
:return: operation status
:rtype: bool
"""
# convert address
addr = convert_i2c_address(addr, read=False)
Expand All @@ -75,20 +131,25 @@ def i2c_write(self, addr: (int, bytes), data: (int, bytes)) -> bool:
# assemble i2c frame
payload = addr + data
# send data through i2c stream
status, feedback = self.i2c_read_write_raw(payload)
status, feedback = self.__i2c_read_write_raw(payload)

return status

@__device_lock(2)
def i2c_read(self, addr: (int, bytes), read_length: int,
register_addr: (int, bytes) = None) -> Tuple[bool, bytes]:
"""
read byte(s) data from i2c bus with register address using 7-bits of device address
:param addr: Any[int, bytes], 7-bits of device address
:param read_length: int, length
:param addr: 7-bits of device address
:type addr: :obj:`int`, :obj:`bytes`
:param read_length: length of data to read from i2c bus
:type read_length: int
:param register_addr: Optional[int, bytes], one byte or several bytes of address of register,
this method will automatically convert it into bytes with byte order 'big' unsigned if data type is int,
e.g. 0x00f1f2 -> b'\xf1\xf2'
:type register_addr: :obj:`int`, :obj:`bytes` (optional)
:return: Tuple[operation_status bool, feedback bytes]
:rtype: (bool, bytes)
"""
# convert address
if register_addr is None:
Expand All @@ -105,16 +166,19 @@ def i2c_read(self, addr: (int, bytes), read_length: int,
payload = addr + register_addr

# send and receive data from i2c bus
status, feedback = self.i2c_read_write_raw(payload, read_len=read_length)
status, feedback = self.__i2c_read_write_raw(payload, read_len=read_length)

return status, feedback

def i2c_read_write_raw(self, data: bytes, read_len: int = 0) -> Tuple[bool, bytes]:
def __i2c_read_write_raw(self, data: bytes, read_len: int = 0) -> Tuple[bool, bytes]:
"""
read and write i2c bus through I2CStream
:param read_len: int, length of data to read (max 63B)
:param data: bytes
:param read_len: length of data to read (max 63B)
:type read_len: int
:param data: data to write
:type data: bytes
:return: tuple(<bool status>, <bytes feedback>)
:rtype: (:obj:`bool`, :obj:`bytes`)
"""
if not self.i2c_initiated:
raise Exception('I2C device initialization required')
Expand Down Expand Up @@ -150,39 +214,51 @@ def i2c_read_write_raw(self, data: bytes, read_len: int = 0) -> Tuple[bool, byte
return sum(ack_signals) == len(ack_signals), payload

# --*-- [ SPI ] --*--
def init_SPI(self, clock_speed_level: int = 1, is_MSB: bool = True,
mode: int = 3, write_read_interval: int = 0,
@__device_lock(2)
def init_SPI(self, clock_freq_level: int = 1, is_MSB: bool = True,
mode: int = 0, write_read_interval: int = 0,
CS1_high: bool = False,
CS2_high: bool = False):
"""
initialize SPI configuration
:param clock_speed_level: int, 0-7
:param is_MSB: bool
:param clock_freq_level: clock freq, 0=60M, 1=30M, 2=15M, 3=7.5M, 4=3.75M, 5=1.875M, 6=937.5K,7=468.75K
:type clock_freq_level: int
:param is_MSB: enable MSB mode
:type is_MSB: bool
:param mode: int, 0-3
:param write_read_interval: int, 0-65535
:param CS1_high: bool
:param CS2_high: bool
:type mode: int
:param write_read_interval: 0-65535
:type write_read_interval: int
:param CS1_high: set SPI CS1 port polarity, True=Active-High, False=Active-Low
:type CS1_high: bool
:param CS2_high: set SPI CS1 port polarity, True=Active-High, False=Active-Low
:type CS2_high: bool
:return:
"""
self.CS1_enabled = False
self.CS2_enabled = False
conf = SPIConfig()
conf.set_mode(mode)
conf.set_clockSpeed(clock_speed_level)
conf.set_clockSpeed(clock_freq_level)
conf.set_MSB(is_MSB)
conf.set_writeReadInterval(write_read_interval)
conf.set_CS1Polar(CS1_high)
conf.set_CS2Polar(CS2_high)
self.write(conf)
self.read(64)
self.spi_initiated = True

@__device_lock(2)
def set_CS1(self, enable: bool = True, active_delay_us: int = -1,
deactivate_delay_us: int = -1):
"""
set CS1 enable/disable with delay settings
:param enable: bool
:param active_delay_us: int
:param deactivate_delay_us: int
:param enable: enable/disable CS1
:type enable: bool
:param active_delay_us: delay for CS1 (in μs) to activate
:type active_delay_us: int
:param deactivate_delay_us: delay for CS1 (in ms) to deactivate
:type deactivate_delay_us: int
:return:
"""
if enable and self.CS2_enabled:
Expand All @@ -203,13 +279,17 @@ def set_CS1(self, enable: bool = True, active_delay_us: int = -1,
self.CS1_enabled = False
self.CS2_enabled = False

@__device_lock(2)
def set_CS2(self, enable: bool = True, active_delay_us: int = -1,
deactivate_delay_us: int = -1):
"""
set CS2 enable/disable with delay settings
:param enable: bool
:param active_delay_us: int
:param deactivate_delay_us: int
:param enable: enable/disable CS2
:type enable: bool
:param active_delay_us: delay for CS2 (in μs) to activate
:type active_delay_us: int
:param deactivate_delay_us: delay for CS2 (in μs) to deactivate
:type deactivate_delay_us: int
:return:
"""
if enable and self.CS1_enabled:
Expand All @@ -230,11 +310,14 @@ def set_CS2(self, enable: bool = True, active_delay_us: int = -1,
self.CS1_enabled = False
self.CS2_enabled = False

@__device_lock(2)
def spi_write(self, data: bytes) -> int:
"""
write data to SPI devices
:param data: bytes, max length up to 32768 bytes
:param data: max length up to 32768 Bytes
:type data: bytes
:return: int, length of sent data
:rtype: int
"""
if not (self.CS1_enabled or self.CS2_enabled):
raise Exception("no CS enabled yet")
Expand All @@ -250,11 +333,14 @@ def spi_write(self, data: bytes) -> int:

return length

@__device_lock(2)
def spi_read_write(self, data: bytes) -> list:
"""
write data to SPI devices
:param data: bytes, max length up to 32768 bytes
:return: int, length of sent data
:type data: bytes
:return: list of bytes received from SPI device
:rtype: list
"""
if not (self.CS1_enabled or self.CS2_enabled):
raise Exception("no CS enabled yet")
Expand Down Expand Up @@ -284,11 +370,14 @@ def spi_read_write(self, data: bytes) -> list:

return ret

@__device_lock(2)
def spi_read(self, length) -> list:
"""
read data from SPI device with given length
:param length: int
:return:
:param length: length of data to read
:type length: int
:return: list of data received from SPI device
:rtype: list
"""
if not (self.CS1_enabled or self.CS2_enabled):
raise Exception("no CS enabled yet")
Expand Down
7 changes: 7 additions & 0 deletions ch347api/__i2c.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
import struct


class I2CClockFreq:
f_20K = 0
f_100K = 1
f_400K = 2
f_750K = 3


def convert_i2c_address(addr: (int, bytes), read: bool = False) -> bytes:
"""
static method for address conversion
Expand Down
5 changes: 4 additions & 1 deletion ch347api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@
# Created on: 2022/11/11

from .__device import CH347HIDDev, VENDOR_ID, PRODUCT_ID

from .i2c import I2CDevice
from .spi import SPIDevice
from .__spi import SPIClockFreq
from .__i2c import I2CClockFreq
11 changes: 11 additions & 0 deletions ch347api/__spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@
# Created on: 2022/11/11


class SPIClockFreq:
f_60M = 0
f_30M = 1
f_15M = 2
f_7M5 = 3
f_3M75 = 4
f_1M875 = 5
f_937K5 = 6
f_468K75 = 7


class SPIConfig(list):

def __init__(self):
Expand Down
Loading

0 comments on commit 43223fe

Please sign in to comment.