Skip to content

Commit

Permalink
Dev (#11)
Browse files Browse the repository at this point in the history
* Added rx_obj() and tx_obj() members

* Update setup.py

* Overhaul example code
  • Loading branch information
PowerBroker2 authored Apr 9, 2020
1 parent ab167c6 commit 4d9acd1
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 40 deletions.
134 changes: 107 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,122 @@ If using this package to communicate with Arduinos, see https://github.com/Power
pip install pySerialTransfer
```

# Example Sketch
# Example Python Script
```python
from time import sleep
import time
from pySerialTransfer import pySerialTransfer as txfer


if __name__ == '__main__':
try:
link = txfer.SerialTransfer('COM13')
link = txfer.SerialTransfer('COM17')

link.open()
sleep(2) # allow some time for the Arduino to completely reset

link.txBuff[0] = 'h'
link.txBuff[1] = 'i'
link.txBuff[2] = '\n'

link.send(3)

while not link.available():
if link.status < 0:
if link.status == -1:
print('ERROR: CRC_ERROR')
elif link.status == -2:
print('ERROR: PAYLOAD_ERROR')
elif link.status == -3:
print('ERROR: STOP_BYTE_ERROR')

print('Response received:')
time.sleep(2) # allow some time for the Arduino to completely reset

response = ''
for index in range(link.bytesRead):
response += chr(link.rxBuff[index])

print(response)
while True:
send_size = 0

###################################################################
# Send a list
###################################################################
list_ = [1, 3]
list_size = link.tx_obj(list_)
send_size += list_size

###################################################################
# Send a string
###################################################################
str_ = 'hello'
str_size = link.tx_obj(str_, send_size) - send_size
send_size += str_size

###################################################################
# Send a float
###################################################################
float_ = 5.234
float_size = link.tx_obj(float_, send_size) - send_size
send_size += float_size

###################################################################
# Transmit all the data to send in a single packet
###################################################################
link.send(send_size)

###################################################################
# Wait for a response and report any errors while receiving packets
###################################################################
while not link.available():
if link.status < 0:
if link.status == -1:
print('ERROR: CRC_ERROR')
elif link.status == -2:
print('ERROR: PAYLOAD_ERROR')
elif link.status == -3:
print('ERROR: STOP_BYTE_ERROR')

###################################################################
# Parse response list
###################################################################
rec_list_ = link.rx_obj(obj_type=type(list_),
obj_byte_size=list_size,
list_format='i')

###################################################################
# Parse response string
###################################################################
rec_str_ = link.rx_obj(obj_type=type(str_),
obj_byte_size=str_size,
start_pos=list_size)

###################################################################
# Parse response float
###################################################################
rec_float_ = link.rx_obj(obj_type=type(float_),
obj_byte_size=float_size,
start_pos=(list_size + str_size))

###################################################################
# Display the received data
###################################################################
print('SENT: {} {} {}'.format(list_, str_, float_))
print('RCVD: {} {} {}'.format(rec_list_, rec_str_, rec_float_))
print(' ')

except KeyboardInterrupt:
link.close()

except:
import traceback
traceback.print_exc()

except KeyboardInterrupt:
link.close()
```

# Example Arduino Sketch
```C++
#include "SerialTransfer.h"


SerialTransfer myTransfer;


void setup()
{
Serial.begin(115200);
myTransfer.begin(Serial);
}


void loop()
{
if(myTransfer.available())
{
// send all received data back to Python
for(uint16_t i=0; i < myTransfer.bytesRead; i++)
myTransfer.txBuff[i] = myTransfer.rxBuff[i];

myTransfer.sendData(myTransfer.bytesRead);
}
}
```
137 changes: 126 additions & 11 deletions pySerialTransfer/pySerialTransfer.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
import os
import json
import struct
import serial
import serial.tools.list_ports
from array import array
from .CRC import CRC


class InvalidSerialPort(Exception):
pass


CONTINUE = 2
NEW_DATA = 1
NO_DATA = 0
CRC_ERROR = -1
PAYLOAD_ERROR = -2
CONTINUE = 2
NEW_DATA = 1
NO_DATA = 0
CRC_ERROR = -1
PAYLOAD_ERROR = -2
STOP_BYTE_ERROR = -3

START_BYTE = 0x7E
STOP_BYTE = 0x81
STOP_BYTE = 0x81

MAX_PACKET_SIZE = 0xFE

find_start_byte = 0
find_start_byte = 0
find_overhead_byte = 1
find_payload_len = 2
find_payload = 3
find_crc = 4
find_end_byte = 5
find_payload_len = 2
find_payload = 3
find_crc = 4
find_end_byte = 5


def msb(val):
Expand Down Expand Up @@ -155,6 +158,118 @@ def close(self):
'''
if self.connection.is_open:
self.connection.close()

def tx_obj(self, val, start_pos=0):
'''
Description:
-----------
Insert an arbitrary variable's value into the TX buffer starting at the
specified index
:param val: n/a - value to be inserted into TX buffer
:param start_pos: int - index of TX buffer where the first byte
of the value is to be stored in
:return: int - index of the last byte of the value in the TX buffer + 1,
None if operation failed
'''

if type(val) == str:
val = val.encode()
format_str = '%ds' % len(val)

elif type(val) == dict:
val = json.dumps(val).encode()
format_str = '%ds' % len(val)

elif type(val) == float:
format_str = 'f'

elif type(val) == int:
format_str = 'i'

elif type(val) == bool:
format_str = '?'

elif type(val) == list:
for el in val:
start_pos = self.tx_obj(el, start_pos)

return start_pos

else:
return None

val_bytes = struct.pack(format_str, val)

for index in range(len(val_bytes)):
self.txBuff[index + start_pos] = val_bytes[index]

return start_pos + len(val_bytes)

def rx_obj(self, obj_type, obj_byte_size, start_pos=0, list_format=None):
'''
Description:
------------
Extract an arbitrary variable's value from the RX buffer starting at
the specified index. If object_type is list, it is assumed that the
list to be extracted has homogeneous element types where the common
element type can neither be list, dict, nor string longer than a
single char
:param obj_type: type - type of object to extract from the RX buffer
:param obj_byte_size: int - number of bytes making up extracted object
:param start_pos: int - index of TX buffer where the first byte
of the value is to be stored in
:param list_format: char - array.array format char to represent the
common list element type - 'c' for a char
list is supported
:return unpacked_response: obj - object extracted from the RX buffer,
None if operation failed
'''

buff = bytes(self.rxBuff[start_pos:(start_pos+obj_byte_size)])

if (obj_type == str) or (obj_type == dict):
format_str = '%ds' % len(buff)

elif obj_type == float:
format_str = 'f'

elif obj_type == int:
format_str = 'i'

elif obj_type == bool:
format_str = '?'

elif obj_type == list:
if list_format == 'c':
arr = array('B', buff)
temp_list = arr.tolist()

for index in range(len(temp_list)):
temp_list[index] = chr(temp_list[index])
return temp_list

elif list_format:
arr = array(list_format, buff)
return arr.tolist()

else:
return None
else:
return None

unpacked_response = struct.unpack(format_str, buff)[0]

if (obj_type == str) or (obj_type == dict):
unpacked_response = unpacked_response.decode('utf-8')

if obj_type == dict:
unpacked_response = json.loads(unpacked_response)

return unpacked_response

def calc_overhead(self, pay_len):
'''
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
setup(
name = 'pySerialTransfer',
packages = ['pySerialTransfer'],
version = '1.1.11',
version = '1.2.0',
description = 'Python package used to transmit and receive low overhead byte packets - especially useful for PC<-->Arduino USB communication (compatible with https://github.com/PowerBroker2/SerialTransfer)',
author = 'Power_Broker',
author_email = '[email protected]',
url = 'https://github.com/PowerBroker2/pySerialTransfer',
download_url = 'https://github.com/PowerBroker2/pySerialTransfer/archive/1.1.11.tar.gz',
download_url = 'https://github.com/PowerBroker2/pySerialTransfer/archive/1.2.0.tar.gz',
keywords = ['Arduino', 'serial', 'usb', 'protocol', 'communication'],
classifiers = [],
)

0 comments on commit 4d9acd1

Please sign in to comment.