Skip to content

Commit

Permalink
Added reply test cases, added keypad data
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanhz committed Nov 19, 2019
1 parent a8667c9 commit 049a306
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 13 deletions.
4 changes: 2 additions & 2 deletions osdp/_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def send_command_and_receive_reply(data: bytearray, command: Command, device: De
raise
data.extend(command_data)

log.debug("Raw write data: %s", command_data.hex())
log.debug("Raw command data: %s", command_data.hex())

self._connection.write(bytes(data))

Expand All @@ -154,7 +154,7 @@ def send_command_and_receive_reply(data: bytearray, command: Command, device: De

log.debug("Raw reply data: %s", reply_buffer.hex())

return Reply.parse(reply_buffer, self.id, command, device)
return Reply.parse(bytes(reply_buffer), self.id, command, device)

def extract_message_length(self, reply_buffer: bytearray) -> int:
return int.from_bytes(bytes(reply_buffer[2:3]), byteorder='little')
Expand Down
6 changes: 5 additions & 1 deletion osdp/_control_panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def on_reply_received(self, reply: Reply):
elif reply.type==ReplyType.RawReaderData:
self.on_raw_card_data_reply_received(reply.address, RawCardData.parse_data(reply))

elif reply.type==ReplyType.KeypadData:
self.on_keypad_data_reply_received(reply.address, KeypadData.parse_data(reply))


def on_nak_reply_received(self, address: int, nak: Nak):
log.debug("%s < Nak received %s", address, nak)
Expand All @@ -139,4 +142,5 @@ def on_formatted_reader_data_reply_received(self, address: int, formatted_reader
def on_raw_card_data_reply_received(self, address: int, raw_card_data: RawCardData):
log.debug("%s < Raw reader data received %s", address, raw_card_data)


def on_keypad_data_reply_received(self, address: int, keypad_data: KeypadData):
log.debug("%s < Keypad data received %s", address, keypad_data)
19 changes: 10 additions & 9 deletions osdp/_reply.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ class Reply(Message):
REPLY_TYPE_INDEX = 5
MAC_SIZE = 4
SecureSessionMessages = [
SecurityBlockType.CommandMessageWithNoDataSecurity,
SecurityBlockType.ReplyMessageWithNoDataSecurity,
SecurityBlockType.CommandMessageWithDataSecurity,
SecurityBlockType.ReplyMessageWithDataSecurity
SecurityBlockType.CommandMessageWithNoDataSecurity.value,
SecurityBlockType.ReplyMessageWithNoDataSecurity.value,
SecurityBlockType.CommandMessageWithDataSecurity.value,
SecurityBlockType.ReplyMessageWithDataSecurity.value
]

def __init__(self, data: bytes, connection_id: UUID, issuing_command: Command, device: Device):
Expand All @@ -29,19 +29,19 @@ def __init__(self, data: bytes, connection_id: UUID, issuing_command: Command, d
is_secure_control_block_present: bool = (data[4] & 0x08)!=0
secure_block_size: int = (data[5] & 0xFF) if is_secure_control_block_present else 0
self._security_block_type = (data[6] & 0xFF) if is_secure_control_block_present else 0
self._secure_block_data = data[(self.REPLY_MESSAGE_HEADER_SIZE + 2):][:(secure_block_size-2)]
self._secure_block_data = data[(self.REPLY_MESSAGE_HEADER_SIZE + 2):][:(secure_block_size-2)] if is_secure_control_block_present else b''

mac_size: int = self.MAC_SIZE if self.is_secure_message else 0
message_length: int = len(data) - (reply_message_footer_size + mac_size)

self._mac = data[message_length:][:mac_size]
self._type = ReplyType(data[self.REPLY_TYPE_INDEX + secure_block_size] & 0xFF)

data_start: int = self.REPLY_MESSAGE_HEADER_SIZE + secure_block_size
data_start: int = self.REPLY_MESSAGE_HEADER_SIZE + secure_block_size + 1
data_end: int = - reply_message_footer_size - mac_size
self._extract_reply_data = data[data_start:data_end]

if SecurityBlockType(self.security_block_type)==SecurityBlockType.ReplyMessageWithDataSecurity:
if self.security_block_type==SecurityBlockType.ReplyMessageWithDataSecurity.value:
self._extract_reply_data = self.decrypt_data(device);

if is_using_crc:
Expand Down Expand Up @@ -92,10 +92,9 @@ def message_for_mac_generation(self) -> bytes:

@property
def is_secure_message(self) -> bool:
return SecurityBlockType(self.security_block_type) in self.SecureSessionMessages
return self.security_block_type in self.SecureSessionMessages

@property
@abstractmethod
def reply_code(self) -> int:
pass

Expand Down Expand Up @@ -157,6 +156,7 @@ def decrypt_data(self, device: Device) -> bytes:

class AckReply(Reply):

@property
def reply_code(self) -> int:
return 0x40

Expand All @@ -172,6 +172,7 @@ class UnknownReply(Reply):
def __init__(self, data: bytes, connection_id: UUID, issuing_command: Command, device: Device):
super().__init__(data, connection_id, issuing_command, device)

@property
def reply_code(self) -> int:
return self.type.value

Expand Down
24 changes: 23 additions & 1 deletion osdp/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,29 @@ def parse_data(reply) -> Nak:
return RawCardData(reader_number, format_code, bit_count, data)

def __repr__(self):
return "Reader Number: {0}\n Format Code: {1}\n Bit Count: {2}\n Data: {3}".format(self.reader_number, self.format_code.name, self.bit_count, self.data.hex())
return "Reader Number: {0}\n Format Code: {1}\n Bit Count: {2}\n Data: {3}".format(self.reader_number, self.format_code.name, self.bit_count, self.data.hex().upper())

class KeypadData:

def __init__(self, reader_number: int, bit_count: int, data: bytes):
self.reader_number = reader_number
self.bit_count = bit_count
self.data = data

@staticmethod
def parse_data(reply) -> Nak:
data = reply.extract_reply_data
if len(data)<2:
raise ValueError("Invalid size for the data")

reader_number = data[0]
bit_count = int.from_bytes(data[1:2], byteorder='little')
data = data[2:]
return KeypadData(reader_number, bit_count, data)

def __repr__(self):
return "Reader Number: {0}\n Bit Count: {1}\n Data: {2}".format(self.reader_number, self.bit_count, self.data.hex().upper())


class DataEvent(Event):

Expand Down
100 changes: 100 additions & 0 deletions tests/test_reply.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Tests for OSDP Replies"""

import os
import sys
import unittest
import datetime
from uuid import UUID, uuid4

sys.path.insert(0, os.path.abspath('..'))
from osdp import *


class CommandTestCase(unittest.TestCase):

"""Test commands for OSDP Python Module."""

def setUp(self):
"""Setup."""

def tearDown(self):
"""Teardown."""

def test_poll_reply_no_data_checksum(self):
bus_id = uuid4()
device = Device(address=0x7F, use_crc=False, use_secure_channel=False)

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 1)

command = PollCommand(address=0x7F)
data = bytes.fromhex('53 FF 07 00 01 40 66')
reply = Reply.parse(data, bus_id, command, device)
self.assertEqual(reply.type, ReplyType.Ack)
self.assertEqual(reply.extract_reply_data, b'')

message = reply.build_reply(address=0x7F, control=device.message_control).hex().upper()
self.assertEqual(message, data.hex().upper())

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 2)

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 3)

def test_poll_reply_card_data_checksum(self):
bus_id = uuid4()
device = Device(address=0x7F, use_crc=False, use_secure_channel=False)

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 1)

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 2)

command = PollCommand(address=0x7F)
data = bytes.fromhex('53 FF 0F 00 02 50 FF 01 1A 00 CD 22 C7 16 67')
reply = Reply.parse(data, bus_id, command, device)
self.assertEqual(reply.type, ReplyType.RawReaderData)
self.assertEqual(reply.extract_reply_data.hex().upper(), 'FF011A00CD22C716')

card_data = RawCardData.parse_data(reply)
self.assertEqual(card_data.data.hex().upper(), 'CD22C716')

message = reply.build_reply(address=0x7F, control=device.message_control).hex().upper()
self.assertEqual(message, data.hex().upper())

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 3)

def test_poll_reply_key_data_checksum(self):
bus_id = uuid4()
device = Device(address=0x7F, use_crc=False, use_secure_channel=False)

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 1)

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 2)

command = PollCommand(address=0x7F)
data = bytes.fromhex('53 FF 0D 00 02 53 FF 04 31 32 33 34 7F')
reply = Reply.parse(data, bus_id, command, device)
self.assertEqual(reply.type, ReplyType.KeypadData)
self.assertEqual(reply.extract_reply_data.hex().upper(), 'FF0431323334')

keypad_data = KeypadData.parse_data(reply)
self.assertEqual(keypad_data.data.hex().upper(), '31323334')

message = reply.build_reply(address=0x7F, control=device.message_control).hex().upper()
self.assertEqual(message, data.hex().upper())

device.message_control.increment_sequence(device.message_control.sequence)
self.assertEqual(device.message_control.sequence, 3)


if __name__ == '__main__':
unittest.main()

0 comments on commit 049a306

Please sign in to comment.