diff --git a/README.md b/README.md
index 9934ea1..32b40d8 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
# CH347-HIDAPI Python Library
-_? [CH347-HIDAPI Github Page](https://github.com/i2cy/ch347-hidapi) ?_
+_+ [CH347-HIDAPI Github Page](https://github.com/i2cy/ch347-hidapi) +_
@@ -19,12 +19,15 @@ _? [CH347-HIDAPI Github Page](https://github.com/i2cy/ch347-hidapi) ?_
## Abstract
-This project is the API library of CH347 USB-SPI bridge chip based on Python.
-Standard USB-HID mode setting of CH347 chip supported only.
+This project is the API library of CH347 USB-SPI/I2C bridge chip based on Python.
-This library provides full access of SPI settings and communication with CH347 USB-SPI
+`Standard USB-HID mode setting of CH347 chip supported only`
+
+This library provides full access of SPI/I2C settings and communication with CH347 USB-SPI
bridge chip in Python language.
+For demonstration and code reference please refer to the `test.py` file in [source page](https://github.com/i2cy/CH347-HIDAPI/blob/master/test.py).
+
[CH347-Chip Official Site](https://www.wch.cn/products/CH347.html)
## Installation
@@ -40,4 +43,8 @@ demonstration APP. In other words that it was inferred from captured HID package
THUS, THIS API MAY NOT FULLY CAPABLE OF EVERY FUNCTION IN OFFICIAL API FROM CH347DLL.DLL.
-And I2C API is not ready. I2C bus example from demo app is frustrating and I need time to figure it out.
\ No newline at end of file
+## Update Notes
+
+#### 2023-08-06
+ 1. Now with fully compatible I2C support, I2C clock speed level: 0 -> 20KHz, 1 -> 100KHz, 2 -> 400KHz, 3 -> 750KHz
+ 2. Added test.py for demonstration
diff --git a/ch347api/__device.py b/ch347api/__device.py
index 0d50017..54018b8 100644
--- a/ch347api/__device.py
+++ b/ch347api/__device.py
@@ -9,6 +9,8 @@
import hid
import struct
from .__spi import CSConfig, SPIConfig
+from .__i2c import convert_i2c_address, convert_int_to_bytes
+from typing import Tuple, Any
VENDOR_ID: int = 6790
PRODUCT_ID: int = 21980
@@ -35,6 +37,7 @@ def __init__(self, vendor_id, product_id, interface_num):
self.CS2_enabled = False
self.cs_activate_delay = 0
self.cs_deactivate_delay = 0
+ self.i2c_initiated = False
def reset(self):
"""
@@ -52,20 +55,80 @@ def init_I2C(self, clock_speed_level: int = 1):
:return: None
"""
self.write(struct.pack(" tuple:
+ def i2c_write(self, addr: (int, bytes), data: (int, bytes)) -> bool:
+ """
+ write data through I2C bus using 7-bits address
+ :param addr: Any[int, bytes], 7-bits of device address
+ :param data: Any[int, bytes], one byte or several bytes of data to send (62 bytes max),
+ this method will automatically convert it into bytes with byte order 'big' unsigned if data type is int,
+ e.g. 0x00f1f2 -> b'\xf1\xf2'
+ :return: bool, operation status
+ """
+ # convert address
+ addr = convert_i2c_address(addr, read=False)
+
+ # convert data
+ data = convert_int_to_bytes(data)
+
+ # assemble i2c frame
+ payload = addr + data
+ # send data through i2c stream
+ status, feedback = self.i2c_read_write_raw(payload)
+
+ return status
+
+ def i2c_read(self, addr: (int, bytes), read_length: int,
+ register_addr: (int, bytes) = None) -> Tuple[bool, bytes]:
+ """
+ read byte(s) data from i2c bus with register address using 7-bits of device address
+ :param addr: Any[int, bytes], 7-bits of device address
+ :param read_length: int, length
+ :param register_addr: Optional[int, bytes], one byte or several bytes of address of register,
+ this method will automatically convert it into bytes with byte order 'big' unsigned if data type is int,
+ e.g. 0x00f1f2 -> b'\xf1\xf2'
+ :return: Tuple[operation_status bool, feedback bytes]
+ """
+ # convert address
+ if register_addr is None:
+ register_addr = b""
+ # convert address with reading signal
+ addr = convert_i2c_address(addr, read=True)
+
+ else:
+ register_addr = convert_int_to_bytes(register_addr)
+ # convert address with writing signal
+ addr = convert_i2c_address(addr, read=False)
+
+ # assemble payload
+ payload = addr + register_addr
+
+ # send and receive data from i2c bus
+ status, feedback = self.i2c_read_write_raw(payload, read_len=read_length)
+
+ return status, feedback
+
+ def i2c_read_write_raw(self, data: bytes, read_len: int = 0) -> Tuple[bool, bytes]:
"""
read and write i2c bus through I2CStream
:param read_len: int, length of data to read (max 63B)
:param data: bytes
:return: tuple(, )
"""
+ if not self.i2c_initiated:
+ raise Exception('I2C device initialization required')
+
if read_len == 0:
tail = b"\x75"
elif read_len == 1:
- tail = struct.pack(" 1:
+ tail = b"\x74\x81\xd1" + tail
elif read_len < 64:
- tail = struct.pack(" 1:
+ tail = b"\x74\x81\xd1" + tail
else:
raise Exception("read length exceeded max size of 63 Bytes")
payload = struct.pack(" tuple:
self.write(payload)
feedback = bytes(self.read(512))
- payload_length, rb1 = struct.unpack(" int:
raw = struct.pack(" bytes:
+ """
+ static method for address conversion
+ :param addr: Any[int, bytes], 7-bits of device address
+ :param read: bool, False -> write, True -> read
+ :return: bytes
+ """
+ if isinstance(addr, int):
+ addr = addr << 1
+ else:
+ addr = addr[0] << 1
+
+ if read:
+ addr += 1
+
+ return struct.pack('B', addr)
+
+
+def convert_int_to_bytes(inputs: (int, bytes)) -> bytes:
+ """
+ this method will automatically convert it into bytes with byte order 'big' unsigned if data type is int,
+ e.g. 0x00f1f2 -> b'\xf1\xf2'
+ :param inputs: Any[int, bytes]
+ :return: bytes
+ """
+ if isinstance(inputs, int):
+ b_len = 0
+ data_copy = inputs
+ while data_copy:
+ b_len += 1
+ data_copy = data_copy // 256
+ inputs = inputs.to_bytes(b_len, 'big', signed=False)
+
+ return inputs
diff --git a/ch347api/__init__.py b/ch347api/__init__.py
index 4e0ac12..90507dc 100644
--- a/ch347api/__init__.py
+++ b/ch347api/__init__.py
@@ -5,6 +5,5 @@
# Filename: __init__
# Created on: 2022/11/11
-from .__spi import *
from .__device import CH347HIDDev, VENDOR_ID, PRODUCT_ID
diff --git a/setup.py b/setup.py
index 355df4c..9874a11 100644
--- a/setup.py
+++ b/setup.py
@@ -12,10 +12,10 @@
setuptools.setup(
name="ch347api",
- version="0.0.4",
+ version="0.1.0",
author="I2cy Cloud",
author_email="i2cy@outlook.com",
- description="A Python Library provides full access of SPI settings and communication"
+ description="A Python Library provides full access of SPI/I2C settings and communication"
" with CH347 USB-SPI bridge chip in Python language.",
long_description=long_description,
long_description_content_type="text/markdown",
diff --git a/test.py b/test.py
index a367fc8..77197ae 100644
--- a/test.py
+++ b/test.py
@@ -34,6 +34,32 @@ def generate_random_data(length=50):
print("Product: %s" % test_dev.get_product_string())
print("Serial No: %s" % test_dev.get_serial_number_string())
+ # -*- [ i2c test ] -*-
+
+ # initialize I2C with speed 400KHz
+ test_dev.init_I2C(2)
+
+ input("(press ENTER to perform I2C test)")
+
+ # write 0x75 to device with address 0x68
+ print("I2C test address 0x68")
+ status = test_dev.i2c_write(0x68, 0x75)
+ print("I2C write 0x75 test: {}".format(status))
+
+ # read 2 bytes from device with address 0x68
+ status, feedback = test_dev.i2c_read(0x68, 2)
+ print("I2C read 2 bytes test: {}, {}".format(status, feedback.hex()))
+
+ # read 1 byte of register 0x75 from device with address 0x68
+ status, feedback = test_dev.i2c_read(0x68, 1, 0x75)
+ print("I2C read 1 byte with register address 0x75 test: {}, {}".format(status, feedback.hex()))
+
+ # read 2 bytes of register 0x74 from device with address 0x68
+ status, feedback = test_dev.i2c_read(0x68, 2, b"\x74")
+ print("I2C read 2 bytes with register address 0x74 test: {}, {}".format(status, feedback.hex()))
+
+ input("(press ENTER to perform SPI test)")
+
# initialize SPI settings
test_dev.init_SPI(0, mode=1) # CLK speed: 60Mhz, SPI mode: 0b11
test_dev.set_CS1() # enable CS1 for transmission
@@ -41,19 +67,6 @@ def generate_random_data(length=50):
test_data_frame_length = 32768
time.sleep(0.2)
- # i2c test
- test_dev.init_I2C(2)
-
- for i in range(1):
- status, feedback = test_dev.i2c_read_write_raw(b"\xd1\x75", 0)
- print("I2C test NO.{}: {}, {}".format(i + 1, status, feedback))
-
- try:
- input("(press ENTER to perform test)")
- except KeyboardInterrupt:
- test_dev.close()
- exit()
-
# generate test bytes
data = generate_random_data(test_data_frame_length)