From b8b049fc9d784c91509d7168e574b2624efa4756 Mon Sep 17 00:00:00 2001 From: Ryan Hu Date: Mon, 18 Nov 2019 17:00:28 +1300 Subject: [PATCH] Initial commit --- .gitignore | 33 ++++ LICENSE | 13 ++ osdp/bus.py | 187 ++++++++++++++++++ osdp/connections/osdp_connection.py | 29 +++ .../serial_port_osdp_connection.py | 31 +++ .../connections/tcp_client_osdp_connection.py | 42 ++++ .../connections/tcp_server_osdp_connection.py | 42 ++++ osdp/control_panel.py | 157 +++++++++++++++ osdp/device.py | 72 +++++++ osdp/messages/ack_reply.py | 13 ++ osdp/messages/command.py | 62 ++++++ osdp/messages/control.py | 16 ++ osdp/messages/device_capabilities_command.py | 19 ++ osdp/messages/id_report_command.py | 19 ++ osdp/messages/input_status_report_command.py | 19 ++ osdp/messages/message.py | 86 ++++++++ osdp/messages/output_control_command.py | 21 ++ osdp/messages/output_status_report_command.py | 19 ++ osdp/messages/poll_command.py | 19 ++ osdp/messages/reader_led_control_command.py | 20 ++ osdp/messages/reader_status_report_command.py | 19 ++ osdp/messages/reply.py | 154 +++++++++++++++ osdp/messages/reply_type.py | 21 ++ osdp/messages/security_block_type.py | 11 ++ ...security_initialization_request_command.py | 20 ++ osdp/messages/server_cryptogram_command.py | 20 ++ osdp/messages/unknown_reply.py | 22 +++ osdp/model/command_data/output_controls.py | 32 +++ .../model/command_data/reader_led_controls.py | 74 +++++++ osdp/model/reply_data/device_capabilities.py | 57 ++++++ .../model/reply_data/device_identification.py | 33 ++++ osdp/model/reply_data/input_status.py | 15 ++ osdp/model/reply_data/nak.py | 34 ++++ osdp/model/reply_data/out_status.py | 15 ++ osdp/model/reply_data/raw_card_data.py | 29 +++ osdp/model/reply_data/reader_status.py | 20 ++ osdp/secure_channel.py | 113 +++++++++++ 37 files changed, 1608 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 osdp/bus.py create mode 100644 osdp/connections/osdp_connection.py create mode 100644 osdp/connections/serial_port_osdp_connection.py create mode 100644 osdp/connections/tcp_client_osdp_connection.py create mode 100644 osdp/connections/tcp_server_osdp_connection.py create mode 100644 osdp/control_panel.py create mode 100644 osdp/device.py create mode 100644 osdp/messages/ack_reply.py create mode 100644 osdp/messages/command.py create mode 100644 osdp/messages/control.py create mode 100644 osdp/messages/device_capabilities_command.py create mode 100644 osdp/messages/id_report_command.py create mode 100644 osdp/messages/input_status_report_command.py create mode 100644 osdp/messages/message.py create mode 100644 osdp/messages/output_control_command.py create mode 100644 osdp/messages/output_status_report_command.py create mode 100644 osdp/messages/poll_command.py create mode 100644 osdp/messages/reader_led_control_command.py create mode 100644 osdp/messages/reader_status_report_command.py create mode 100644 osdp/messages/reply.py create mode 100644 osdp/messages/reply_type.py create mode 100644 osdp/messages/security_block_type.py create mode 100644 osdp/messages/security_initialization_request_command.py create mode 100644 osdp/messages/server_cryptogram_command.py create mode 100644 osdp/messages/unknown_reply.py create mode 100644 osdp/model/command_data/output_controls.py create mode 100644 osdp/model/command_data/reader_led_controls.py create mode 100644 osdp/model/reply_data/device_capabilities.py create mode 100644 osdp/model/reply_data/device_identification.py create mode 100644 osdp/model/reply_data/input_status.py create mode 100644 osdp/model/reply_data/nak.py create mode 100644 osdp/model/reply_data/out_status.py create mode 100644 osdp/model/reply_data/raw_card_data.py create mode 100644 osdp/model/reply_data/reader_status.py create mode 100644 osdp/secure_channel.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..14980cf --- /dev/null +++ b/.gitignore @@ -0,0 +1,33 @@ +*.deb +*.egg +*.egg-info/ +*.egg/ +*.ignore +*.py[co] +*.py[oc] +*.spl +*.vagrant +.DS_Store +.coverage +.eggs/ +.eggs/* +.idea +.idea/ +.pt +.vagrant/ +RELEASE-VERSION.txt +build/ +cover/ +dist/ +dump.rdb +flake8.log +local/ +local_* +metadata/ +nosetests.xml +output.xml +pylint.log +redis-server.log +redis-server/ +__pycache__ +*/__pycache__ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c51dad3 --- /dev/null +++ b/LICENSE @@ -0,0 +1,13 @@ +Copyright 2019 Ryan Hu and Contributors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/osdp/bus.py b/osdp/bus.py new file mode 100644 index 0000000..6b72cb2 --- /dev/null +++ b/osdp/bus.py @@ -0,0 +1,187 @@ +import logging +from datetime import datetime, timedelta +import time +from queue import Queue +from threading import Lock +from uuid import UUID, uuid4 + +log = logging.getLogger('osdp') + +''' +A group of OSDP devices sharing communications +''' +class Bus: + + DRIVER_BYTE = 0xFF + + def __init__(self, connection: OsdpConnection, on_reply_received): + self._connection = connection + self._on_reply_received = on_reply_received + self._configured_devices = {} + self._configured_devices_lock = Lock() + self._read_timeout = timedelta(milliseconds=200) + self.id = uuid4() + self._is_shutting_down = False + + @property + def idle_line_delay(self) -> timedelta: + return timedelta(milliseconds=(1000.0/self._connection.baud_rate * 16.0)) + + def close(self): + self._is_shutting_down = True + self._connection.close() + + def send_command(self, command: Command): + found_device = self._configured_devices.get(command.address) + if found_device is not None: + found_device.send_command(command) + + def add_device(self, address: int, use_crc: bool, use_secure_channel: bool): + found_device = self._configured_devices.get(address) + self._configured_devices_lock.acquire() + if found_device is not None: + self._configured_devices.pop(address) + self._configured_devices[address] = Device(address, use_crc, use_secure_channel) + self._configured_devices_lock.release() + + def remove_device(self, address: int): + found_device = self._configured_devices.get(address) + self._configured_devices_lock.acquire() + if found_device is not None: + self._configured_devices.pop(address) + self._configured_devices_lock.release() + + def is_online(self, address: int) -> bool: + found_device = self._configured_devices.get(address) + if found_device is None: + return False + else: + return found_device.is_online + + def run_polling_loop(self): + last_message_sent_time = datetime.min + while not self._is_shutting_down: + if not self._connection.is_open: + try: + self._connection.open() + except: + log.exception("Error while opening connection %s", self._connection) + + time_difference = timedelta(milliseconds=100) - (datetime.now() - last_message_sent_time) + time.sleep(max(time_difference, timedelta(seconds=0)).total_seconds()) + + if not self._configured_devices: + last_message_sent_time = datetime.now() + continue + + for device in list(self._configured_devices.values()): + data = bytearray([self.DRIVER_BYTE]) + command = device.get_next_command_data() + + last_message_sent_time = datetime.now() + + reply: Reply = None + try: + reply = self.send_command_and_receive_reply(data, command, device) + except: + log.exception("Error while sending command %s and receiving reply", command) + self._connection.close() + continue + + try: + self.process_reply(reply, device) + except: + log.exception("Error while processing reply %s", reply) + self._connection.close() + continue + + time.sleep(self.idle_line_delay.total_seconds()) + + def process_reply(self, reply: Reply, device: Device): + if not reply.is_valid_reply: + return + + if reply.is_secure_message: + mac = device.generate_mac(reply.message_for_mac_generation, False) + if not reply.is_valid_mac(mac): + device.reset_security() + return + + if reply.type != ReplyType.Busy: + device.valid_reply_has_been_received(reply.sequence) + + extract_reply_data = reply.extract_reply_data + error_code = ErrorCode(extract_reply_data[0]) + if reply.type == ReplyType.Nak and (error_code==ErrorCode.DoesNotSupportSecurityBlock || error_code==ErrorCode.DoesNotSupportSecurityBlock): + device.reset_security() + + if reply.type==ReplyType.CrypticData: + device.initialize_secure_channel(reply) + elif reply.type==ReplyType.InitialRMac: + device.validate_secure_channel_establishment(reply) + + if self._on_reply_received is not None: + self._on_reply_received(reply) + + def send_command_and_receive_reply(data: bytearray, command: Command, device: Device) -> Reply: + command_data: bytes = None + try: + command_data = command.build_command(device) + except: + log.exception("Error while building command %s", command) + raise + data.extend(command_data) + + log.debug("Raw write data: %s", command_data.hex()) + + self._connection.write(bytes(data)) + + reply_buffer = bytearray() + + if not self.wait_for_start_of_message(reply_buffer): + raise TimeoutError("Timeout waiting for reply message") + + if not self.wait_for_message_length(reply_buffer): + raise TimeoutError("Timeout waiting for reply message length") + + if not self.wait_for_rest_of_message(reply_buffer, self.extract_message_length(reply_buffer)): + raise TimeoutError("Timeout waiting for rest of reply message") + + log.debug("Raw reply data: %s", reply_buffer.hex()) + + return Reply.parse(reply_buffer, self.id, command, device) + + def extract_message_length(self, reply_buffer: bytearray) -> int: + return Message.convert_bytes_to_short(bytes(reply_buffer[2:3])) + + def wait_for_rest_of_message(self, buffer: bytearray, reply_length: int): + while len(buffer) < reply_length: + bytes_read = self._connection.read(reply_length-len(buffer)) + if len(bytes_read)>0: + buffer.extend(bytes_read) + else: + return False + return True + + def wait_for_message_length(self, buffer: bytearray) -> bool: + while len(buffer) < 4: + bytes_read = self._connection.read(4) + if len(bytes_read)>0: + buffer.extend(bytes_read) + else: + return False + return True + + def wait_for_start_of_message(self, buffer: bytearray) -> bool: + while True: + bytes_read = self._connection.read(1) + if len(bytes_read)==0: + return False + + if bytes_read[0]!=Message.SOM: + continue + + buffer.extend(bytes_read) + break + return True + \ No newline at end of file diff --git a/osdp/connections/osdp_connection.py b/osdp/connections/osdp_connection.py new file mode 100644 index 0000000..dc778f0 --- /dev/null +++ b/osdp/connections/osdp_connection.py @@ -0,0 +1,29 @@ +from abc import ABC, abstractmethod + +class OsdpConnection(ABC): + + @property + @abstractmethod + def baud_rate(self) -> int: + pass + + @property + @abstractmethod + def is_open(self) -> bool: + pass + + @abstractmethod + def open(self): + pass + + @abstractmethod + def close(self): + pass + + @abstractmethod + def write(self, buf: bytes): + pass + + @abstractmethod + def read(self, size: int=1) -> bytes: + pass \ No newline at end of file diff --git a/osdp/connections/serial_port_osdp_connection.py b/osdp/connections/serial_port_osdp_connection.py new file mode 100644 index 0000000..39dda72 --- /dev/null +++ b/osdp/connections/serial_port_osdp_connection.py @@ -0,0 +1,31 @@ +import serial + + +class SerialPortOsdpConnection(OsdpConnection): + + def __init__(self, port: str, baud_rate: int): + self._port = port + self._baud_rate = baud_rate + self.serial_port = None + + @property + def baud_rate(self) -> int: + return self._baud_rate + + @property + def is_open(self) -> bool: + self.serial_port is not None and self.serial_port.is_open + + def open(self): + self.serial_port = serial.Serial(port=self._port, baudrate=self._baud_rate, timeout=2.0) + + def close(self): + if self.serial_port is not None: + self.serial_port.close() + self.serial_port = None + + def write(self, buf: bytes): + self.serial_port.write(buf) + + def read(self, size: int=1) -> bytes: + return self.serial_port.read(size) \ No newline at end of file diff --git a/osdp/connections/tcp_client_osdp_connection.py b/osdp/connections/tcp_client_osdp_connection.py new file mode 100644 index 0000000..4a5b232 --- /dev/null +++ b/osdp/connections/tcp_client_osdp_connection.py @@ -0,0 +1,42 @@ +import socket +import sys + + +class TcpClientOsdpConnection(OsdpConnection): + + def __init__(self, server: str, port_number: int): + self._server = server + self._port_number = port_number + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(2) + self.is_connected = False + + @property + def baud_rate(self) -> int: + return 9600 + + @property + def is_open(self) -> bool: + return self.is_connected + + def open(self): + server_address = (self._server, self._port_number) + self.sock.connect(server_address) + self.is_connected = True + + def close(self): + self.sock.close() + self.is_connected = False + + def write(self, buf: bytes): + try: + self.sock.send(buf) + except socket.timeout as e: + self.is_connected = False + + def read(self, size: int=1) -> bytes: + try: + return self.sock.recv(size) + except socket.timeout as e: + self.is_connected = False + return b'' \ No newline at end of file diff --git a/osdp/connections/tcp_server_osdp_connection.py b/osdp/connections/tcp_server_osdp_connection.py new file mode 100644 index 0000000..b28fe2f --- /dev/null +++ b/osdp/connections/tcp_server_osdp_connection.py @@ -0,0 +1,42 @@ +import socket +import sys + + +class TcpServerOsdpConnection(OsdpConnection): + + def __init__(self, port_number: int): + self._port_number = port_number + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(2) + server_address = ('0.0.0.0', self._port_number) + self.sock.bind(server_address) + self.connection = None + + @property + def baud_rate(self) -> int: + return 9600 + + @property + def is_open(self) -> bool: + return self.connection is not None + + def open(self): + self.sock.listen(1) + self.connection, _ = self.sock.accept() + + def close(self): + self.connection.close() + self.connection = None + + def write(self, buf: bytes): + try: + self.connection.sendall(buf) + except socket.timeout as e: + self.close() + + def read(self, size: int=1) -> bytes: + try: + return self.connection.recv(size) + except socket.timeout as e: + self.close() + return b'' \ No newline at end of file diff --git a/osdp/control_panel.py b/osdp/control_panel.py new file mode 100644 index 0000000..0622724 --- /dev/null +++ b/osdp/control_panel.py @@ -0,0 +1,157 @@ +import logging +from queue import Queue +from datetime import datetime, timedelta +from uuid import UUID, uuid4 +from threading import Thread, Event + +log = logging.getLogger('osdp') + +class ControlPanel: + + def __init__(self): + self._buses = {} + self._reply_handlers = [] + self._reply_timeout = 5.0 + + def start_connection(self, connection: OsdpConnection) -> UUID: + bus = Bus(connection, self.on_reply_received) + self._buses[bus.id] = bus + thread = Thread(target = bus.run_polling_loop) + thread.start() + return bus.id + + def send_custom_command(self, connection_id: UUID, command: Command): + self.send_command(connection_id, command) + + def id_report(self, connection_id: UUID, address: int) -> DeviceIdentification: + return DeviceIdentification.parse_data(self.send_command(connection_id, IdReportCommand(address))) + + def device_capabilities(self, connection_id: UUID, address: int) -> DeviceCapabilities: + return DeviceCapabilities.parse_data(self.send_command(connection_id, DeviceCapabilitiesCommand(address))) + + def local_status(self, connection_id: UUID, address: int) -> LocalStatus: + return LocalStatus.parse_data(self.send_command(connection_id, LocalStatusReportCommand(address))) + + def input_status(self, connection_id: UUID, address: int) -> InputStatus: + return InputStatus.parse_data(self.send_command(connection_id, InputStatusReportCommand(address))) + + def output_status(self, connection_id: UUID, address: int) -> OutputStatus: + return OutputStatus.parse_data(self.send_command(connection_id, OutputStatusReportCommand(address))) + + def reader_status(self, connection_id: UUID, address: int) -> ReaderStatus: + return ReaderStatus.parse_data(self.send_command(connection_id, ReaderStatusReportCommand(address))) + + def output_control(self, connection_id: UUID, address: int, output_controls: OutputControls) -> bool: + reply = self.send_command(connection_id, OutputControlCommand(address, output_controls)) + return reply.type == ReplyType.Ack || reply.type == ReplyType.OutputStatusReport + + def reader_led_control(self, connection_id: UUID, address: int, reader_led_controls: ReaderLedControls) -> bool: + reply = self.send_command(connection_id, ReaderLedControlCommand(address, reader_led_controls)) + return reply.type == ReplyType.Ack + + def is_online(self, connection_id: UUID, address: int) -> bool: + bus = self._buses.get(connection_id) + if bus is None: + return False + else: + return bus.is_online(address) + + def send_command(self, connection_id: UUID, command: Command) -> Reply: + event = DataEvent() + def reply_fetcher(reply: Reply): + if reply.match_issuing_command(command): + self._reply_handlers.remove(reply_fetcher) + event.set_data(reply) + + self._reply_handlers.append(reply_fetcher) + bus = self._buses[connection_id] + bus.send_command(command) + result = event.wait_data(self._reply_timeout) + if event.is_set(): + return result + else: + self._reply_handlers.remove(reply_fetcher) + raise TimeoutError() + + def shutdown(self): + for bus in self._buses: + bus.close() + + def add_device(self, connection_id: UUID, address: int, use_crc: bool, use_secure_channel: bool): + bus = self._buses.get(connection_id) + if bus is not None: + bus.add_device(address, use_crc, use_secure_channel) + + def remove_device(self, connection_id: UUID, address: int): + bus = self._buses.get(connection_id) + if bus is not None: + bus.remove_device(address) + + def on_reply_received(self, reply: Reply): + for reply_hander in self._reply_handlers: + reply_hander(reply) + + if reply.type==ReplyType.Nak: + self.on_nak_reply_received(reply.address, Nak.parse_data(reply)) + + elif reply.type==ReplyType.LocalStatusReport: + self.on_local_status_report_reply_received(reply.address, LocalStatus.parse_data(reply)) + + elif reply.type==ReplyType.InputStatusReport: + self.on_input_status_report_reply_received(reply.address, InputStatus.parse_data(reply)) + + elif reply.type==ReplyType.OutputStatusReport: + self.on_output_status_report_reply_received(reply.address, OutputStatus.parse_data(reply)) + + elif reply.type==ReplyType.ReaderStatusReport: + self.on_reader_status_report_reply_received(reply.address, ReaderStatus.parse_data(reply)) + + elif reply.type==ReplyType.FormattedReaderData: + self.on_formatted_reader_data_reply_received(reply.address, reply.extract_reply_data) + + elif reply.type==ReplyType.RawReaderData: + self.on_raw_card_data_reply_received(reply.address, RawCardData.parse_data(reply)) + + + def on_nak_reply_received(self, address: int, nak: Nak): + log.debug("%s < Nak received %s", address, nak) + + def on_local_status_report_reply_received(self, address: int, local_status: LocalStatus): + log.debug("%s < Local status received %s", address, local_status) + + def on_input_status_report_reply_received(self, address: int, input_status: InputStatus): + log.debug("%s < Input status received %s", address, input_status) + + def on_output_status_report_reply_received(self, address: int, output_status: OutputStatus): + log.debug("%s < Output status received %s", address, output_status) + + def on_reader_status_report_reply_received(self, address: int, reader_status: ReaderStatus): + log.debug("%s < Reader status received %s", address, reader_status) + + def on_formatted_reader_data_reply_received(self, address: int, formatted_reader_data: bytes): + log.debug("%s < Formatted reader data received %s", address, formatted_reader_data) + + def on_raw_card_data_reply_received(self, address: int, raw_card_data: RawCardData): + log.debug("%s < Raw reader data received %s", address, raw_card_data) + + +class DataEvent(Event): + + def __init__(self): + super().__init__() + self.data = None + + def set_data(self, data): + self.data = data + super().set() + + def clear_data(self): + self.data = None + super().clear() + + def wait_data(self, timeout=None): + self.wait(timeout) + if self.is_set(): + return self.data + else: + return None \ No newline at end of file diff --git a/osdp/device.py b/osdp/device.py new file mode 100644 index 0000000..b2c3e42 --- /dev/null +++ b/osdp/device.py @@ -0,0 +1,72 @@ +import queue +import datetime +from message import Message +from reply import Reply +from secure_channel import SecureChannel +from control import Control + + +class Device(object): + + def __init__(self, address: int, use_crc: bool, use_secure_channel: bool): + self._use_secure_channel = use_secure_channel + this.address = address + self.message_control = Control(0, use_crc, use_secure_channel) + + self._commands = queue.Queue() + self._secure_channel = SecureChannel() + self._last_valid_reply = datetime.datetime.now() + + @property + def is_security_established(self) -> bool: + return self.message_control.has_security_control_block and self._secureChannel.is_established + + @property + def is_online(self) -> bool: + return self._last_valid_reply + datetime.timedelta(seconds=5) >= datetime.datetime.now() + + def get_next_command_data(self) -> Command: + if self.message_control.sequence==0: + return PollCommand(self.address) + + if self._use_secure_channel && !self._secure_channel.is_initialized: + return SecurityInitializationRequestCommand(self.address, self._secure_channel.serverRandomNumber) + + if self._use_secure_channel && !self._secure_channel.is_established: + return ServerCryptogramCommand(self.address, self._secure_channel.serverCryptogram) + + if self._commands.empty(): + return PollCommand(self.address) + else: + command = self._commands.get(False) + return command + + def send_command(self, command: Command): + self._commands.put(command) + + def valid_reply_has_been_received(self, sequence: int): + self.message_control.increment_sequence(sequence) + self._last_valid_reply = datetime.datetime.now() + + def initialize_secure_channel(self, reply: Reply): + reply_data = reply.extract_reply_data + _secureChannel.initialize(reply_data[:8], reply_data[8:16], reply_data[16:32]) + + def validate_secure_channel_establishment(self, reply: Reply) -> bool: + if !reply.secure_cryptogram_has_been_accepted(): + return False + + _secureChannel.establish(reply.extract_reply_data); + return True + + def generate_mac(self, message: bytes, is_command: bool): + return self._secure_channel.generate_mac(message, is_command) + + def reset_security(self): + self._secure_channel.reset() + + def encrypt_data(self, data: bytes): + return self._secure_channel.encrypt_data(data) + + def decrypt_data(self, data: bytes): + return self._secure_channel.decrypt_data(data) diff --git a/osdp/messages/ack_reply.py b/osdp/messages/ack_reply.py new file mode 100644 index 0000000..c0a436a --- /dev/null +++ b/osdp/messages/ack_reply.py @@ -0,0 +1,13 @@ +from reply import Reply + + +class AckReply(Command): + + def reply_code(self) -> int: + return 0x40 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x16 ]) + + def data() -> bytes: + return bytes([ ]) diff --git a/osdp/messages/command.py b/osdp/messages/command.py new file mode 100644 index 0000000..a86ffe3 --- /dev/null +++ b/osdp/messages/command.py @@ -0,0 +1,62 @@ +from abc import ABC, abstractmethod +from message import Message + +class Command(Message): + + def __init__(self): + self._address = None + self._code = None + + @property + @abstractmethod + def command_code(self) -> int: + pass + + @abstractmethod + def security_control_block(self) -> bytes: + pass + + @abstractmethod + def custom_command_update(self, command_buffer: bytearray): + pass + + def build_command(self, device: Device) -> bytes: + command_buffer = bytearray([ + self.SOM, + self.address, + 0x00, + 0x00, + device.message_control.control_byte + ]) + + if device.message_control.has_security_control_block: + command_buffer.extend(self.security_control_block()) + + command_buffer.append(self.command_code) + + if device.is_security_established: + command_buffer.extend(self.encrypted_data(device)) + + # TODO: I don't think this needed + # include mac and crc/checksum in length before generating mac + # additional_length = 4 + (device.message_control.use_crc ? 2 : 1) + # self.add_packet_length(command_buffer, additional_length) + + command_buffer.extend(device.generate_mac(bytes(command_buffer), True)[0:4]) + else: + command_buffer.extend(self.data()) + + command_buffer.append(0x00) + if device.message_control.use_crc: + command_buffer.append(0x00) + + self.add_packet_length(command_buffer) + + if device.message_control.use_crc: + self.add_crc(command_buffer) + else: + self.add_checksum(command_buffer) + + custom_command_update(command_buffer) + + return bytes(command_buffer) diff --git a/osdp/messages/control.py b/osdp/messages/control.py new file mode 100644 index 0000000..c7c0a53 --- /dev/null +++ b/osdp/messages/control.py @@ -0,0 +1,16 @@ + + +class Control: + + def __init__(self, sequence: int, use_crc: bool, has_security_control_block: bool): + self.sequence = sequence + self.use_crc = use_crc + self.has_security_control_block = has_security_control_block + + @property + def control_byte(self) -> int: + return (self.sequence & 0x03 | (0x04 if self.use_crc else 0) | (0x08 if self.has_security_control_block else 0)) & 0xFF + + def increment_sequence(self, _sequence: int): + _sequence = (_sequence+1)%3 + 1 + self.sequence = _sequence diff --git a/osdp/messages/device_capabilities_command.py b/osdp/messages/device_capabilities_command.py new file mode 100644 index 0000000..2970abf --- /dev/null +++ b/osdp/messages/device_capabilities_command.py @@ -0,0 +1,19 @@ +from command import Command + + +class DeviceCapabilitiesCommand(Command): + + def __init__(self, address: int): + self.address = address + + def command_code(self) -> int: + return 0x62 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x17 ]) + + def data() -> bytes: + return bytes([ 0x00 ]) + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/id_report_command.py b/osdp/messages/id_report_command.py new file mode 100644 index 0000000..2b59ad5 --- /dev/null +++ b/osdp/messages/id_report_command.py @@ -0,0 +1,19 @@ +from command import Command + + +class DeviceCapabilitiesCommand(Command): + + def __init__(self, address: int): + self.address = address + + def command_code(self) -> int: + return 0x61 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x17 ]) + + def data() -> bytes: + return bytes([ 0x00 ]) + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/input_status_report_command.py b/osdp/messages/input_status_report_command.py new file mode 100644 index 0000000..263331d --- /dev/null +++ b/osdp/messages/input_status_report_command.py @@ -0,0 +1,19 @@ +from command import Command + + +class InputStatusReportCommand(Command): + + def __init__(self, address: int): + self.address = address + + def command_code(self) -> int: + return 0x65 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x15 ]) + + def data() -> bytes: + return bytes([ ]) + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/message.py b/osdp/messages/message.py new file mode 100644 index 0000000..01666e8 --- /dev/null +++ b/osdp/messages/message.py @@ -0,0 +1,86 @@ +from abc import ABC, abstractmethod + +class Message(ABC): + + SOM = 0x53 + + crc_table = [ + 0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7, 0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, + 0xD1AD, 0xE1CE, 0xF1EF, 0x1231, 0x0210, 0x3273, 0x2252, 0x52B5, 0x4294, 0x72F7, 0x62D6, 0x9339, 0x8318, + 0xB37B, 0xA35A, 0xD3BD, 0xC39C, 0xF3FF, 0xE3DE, 0x2462, 0x3443, 0x0420, 0x1401, 0x64E6, 0x74C7, 0x44A4, + 0x5485, 0xA56A, 0xB54B, 0x8528, 0x9509, 0xE5EE, 0xF5CF, 0xC5AC, 0xD58D, 0x3653, 0x2672, 0x1611, 0x0630, + 0x76D7, 0x66F6, 0x5695, 0x46B4, 0xB75B, 0xA77A, 0x9719, 0x8738, 0xF7DF, 0xE7FE, 0xD79D, 0xC7BC, 0x48C4, + 0x58E5, 0x6886, 0x78A7, 0x0840, 0x1861, 0x2802, 0x3823, 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF, 0x8948, 0x9969, + 0xA90A, 0xB92B, 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96, 0x1A71, 0x0A50, 0x3A33, 0x2A12, 0xDBFD, 0xCBDC, 0xFBBF, + 0xEB9E, 0x9B79, 0x8B58, 0xBB3B, 0xAB1A, 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5, 0x2C22, 0x3C03, 0x0C60, 0x1C41, + 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD, 0xAD2A, 0xBD0B, 0x8D68, 0x9D49, 0x7E97, 0x6EB6, 0x5ED5, 0x4EF4, 0x3E13, + 0x2E32, 0x1E51, 0x0E70, 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC, 0xBF1B, 0xAF3A, 0x9F59, 0x8F78, 0x9188, 0x81A9, + 0xB1CA, 0xA1EB, 0xD10C, 0xC12D, 0xF14E, 0xE16F, 0x1080, 0x00A1, 0x30C2, 0x20E3, 0x5004, 0x4025, 0x7046, + 0x6067, 0x83B9, 0x9398, 0xA3FB, 0xB3DA, 0xC33D, 0xD31C, 0xE37F, 0xF35E, 0x02B1, 0x1290, 0x22F3, 0x32D2, + 0x4235, 0x5214, 0x6277, 0x7256, 0xB5EA, 0xA5CB, 0x95A8, 0x8589, 0xF56E, 0xE54F, 0xD52C, 0xC50D, 0x34E2, + 0x24C3, 0x14A0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405, 0xA7DB, 0xB7FA, 0x8799, 0x97B8, 0xE75F, 0xF77E, + 0xC71D, 0xD73C, 0x26D3, 0x36F2, 0x0691, 0x16B0, 0x6657, 0x7676, 0x4615, 0x5634, 0xD94C, 0xC96D, 0xF90E, + 0xE92F, 0x99C8, 0x89E9, 0xB98A, 0xA9AB, 0x5844, 0x4865, 0x7806, 0x6827, 0x18C0, 0x08E1, 0x3882, 0x28A3, + 0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E, 0x8BF9, 0x9BD8, 0xABBB, 0xBB9A, 0x4A75, 0x5A54, 0x6A37, 0x7A16, 0x0AF1, + 0x1AD0, 0x2AB3, 0x3A92, 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D, 0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9, 0x7C26, 0x6C07, + 0x5C64, 0x4C45, 0x3CA2, 0x2C83, 0x1CE0, 0x0CC1, 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C, 0xAF9B, 0xBFBA, 0x8FD9, + 0x9FF8, 0x6E17, 0x7E36, 0x4E55, 0x5E74, 0x2E93, 0x3EB2, 0x0ED1, 0x1EF0 + ] + + def __init__(self): + self._address = None + + @property + def address(self) -> int: + return self._address + + @address.setter + def address(self, value: int): + return self._address = value + + @abstractmethod + def data() -> bytes: + pass + + def convert_short_to_bytes(self, value: int) -> bytes: + return value.to_bytes(2, byteorder='little') + + def convert_bytes_to_short(self, data) -> int: + return int.from_bytes(data, byteorder='little') + + def calculate_crc(self, data: bytes) -> int: + crc = 0x1D0F + for t in data: + crc = ((crc << 8) ^ self.crc_table[((crc >> 8) ^ t) & 0xFF]) & 0xFFFF + return crc + + def calculate_checksum(self, data: bytes) -> int: + return (0x100 - sum(data) & 0xFF) + + def add_packet_length(self, packet: bytearray, additional_length: int = 0): + packet_length = self.convert_short_to_bytes(len(packet) + additional_length) + packet[2] = packet_length[0] + packet[3] = packet_length[1] + + def add_crc(self, packet: bytearray): + crc = self.calculate_crc(bytes(packet[:-2])) + crc_bytes = self.convert_short_to_bytes(crc) + packet[-2] = crc_bytes[0] + packet[-1] = crc_bytes[1] + + def add_checksum(self, packet: bytearray): + checksum = self.calculate_checksum(bytes(packet[:-1])) + packet[-1] = checksum & 0xFF + + def encrypted_data(self, device: Device) -> bytes: + data = self.data() + if len(data)>0: + return device.encrypt_data(data) + else: + return data + + def convert_int_to_bytes(self, value: int) -> bytes: + return value.to_bytes(2, byteorder='little') + + def convert_bytes_to_int(self, data) -> int: + return int.from_bytes(data, byteorder='little') diff --git a/osdp/messages/output_control_command.py b/osdp/messages/output_control_command.py new file mode 100644 index 0000000..c0e8a12 --- /dev/null +++ b/osdp/messages/output_control_command.py @@ -0,0 +1,21 @@ +from command import Command +from output_controls import OutputControls + + +class OutputControlCommand(Command): + + def __init__(self, address: int, output_controls: OutputControls): + self.address = address + self.output_controls = output_controls + + def command_code(self) -> int: + return 0x68 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x17 ]) + + def data() -> bytes: + return self.output_controls.build_data() + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/output_status_report_command.py b/osdp/messages/output_status_report_command.py new file mode 100644 index 0000000..ed53e0a --- /dev/null +++ b/osdp/messages/output_status_report_command.py @@ -0,0 +1,19 @@ +from command import Command + + +class OutputStatusReportCommand(Command): + + def __init__(self, address: int): + self.address = address + + def command_code(self) -> int: + return 0x66 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x15 ]) + + def data() -> bytes: + return bytes([]) + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/poll_command.py b/osdp/messages/poll_command.py new file mode 100644 index 0000000..bd8c96f --- /dev/null +++ b/osdp/messages/poll_command.py @@ -0,0 +1,19 @@ +from command import Command + + +class PollCommand(Command): + + def __init__(self, address: int): + self.address = address + + def command_code(self) -> int: + return 0x60 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x15 ]) + + def data() -> bytes: + return bytes([]) + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/reader_led_control_command.py b/osdp/messages/reader_led_control_command.py new file mode 100644 index 0000000..9db9930 --- /dev/null +++ b/osdp/messages/reader_led_control_command.py @@ -0,0 +1,20 @@ +from command import Command + + +class ReaderLedControlCommand(Command): + + def __init__(self, address: int, reader_led_controls ReaderLedControls): + self.address = address + self.reader_led_controls = reader_led_controls + + def command_code(self) -> int: + return 0x69 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x17 ]) + + def data() -> bytes: + return self.reader_led_controls.build_data() + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/reader_status_report_command.py b/osdp/messages/reader_status_report_command.py new file mode 100644 index 0000000..2e24ee0 --- /dev/null +++ b/osdp/messages/reader_status_report_command.py @@ -0,0 +1,19 @@ +from command import Command + + +class ReaderStatusReportCommand(Command): + + def __init__(self, address: int): + self.address = address + + def command_code(self) -> int: + return 0x67 + + def security_control_block(self) -> bytes: + return bytes([ 0x02, 0x15 ]) + + def data() -> bytes: + return bytes([]) + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/reply.py b/osdp/messages/reply.py new file mode 100644 index 0000000..8c3f327 --- /dev/null +++ b/osdp/messages/reply.py @@ -0,0 +1,154 @@ +from abc import ABC, abstractmethod +from uuid import UUID, uuid4 +from message import Message +from command import Command +from replay_type import ReplyType +from security_block_type import SecurityBlockType + +class Reply(Message): + + ADDRESS_MASK = 0x7F + REPLY_MESSAGE_HEADER_SIZE = 5 + REPLY_TYPE_INDEX = 5 + MAC_SIZE = 4 + SecureSessionMessages = [ + SecurityBlockType.CommandMessageWithNoDataSecurity, + SecurityBlockType.ReplyMessageWithNoDataSecurity, + SecurityBlockType.CommandMessageWithDataSecurity, + SecurityBlockType.ReplyMessageWithDataSecurity + ] + + def __init__(self, data: bytes, connection_id: UUID, issuing_command: Command, device Device): + self._address = data[1] & self.ADDRESS_MASK + self._sequence = data[4] & 0x03 + + is_using_crc: bool = (data[4] & 0x04)!=0 + reply_message_footer_size: int = 2 if is_using_crc else 1 + + is_secure_control_block_present: bool = (data[4] & 0x08)!=0 + secure_block_size: int = (data[5] & 0xFF) if is_secure_control_block_present else 0 + self._security_block_type = (data[6] & 0xFF) if is_secure_control_block_present else 0 + self._secure_block_data = data[(self.REPLY_MESSAGE_HEADER_SIZE + 2):][:(secure_block_size-2)] + + mac_size: int = self.MAC_SIZE if self.is_secure_message else 0 + message_length: int = len(data) - (reply_message_footer_size + mac_size) + + self._mac = data[message_length:][:mac_size] + self._type = ReplyType(data[self.REPLY_TYPE_INDEX + secure_block_size] & 0xFF) + + data_start: int = self.REPLY_MESSAGE_HEADER_SIZE + secure_block_size + data_end: int = - reply_message_footer_size - mac_size + self._extract_reply_data = data[data_start:data_end] + + if SecurityBlockType(self.security_block_type)==SecurityBlockType.ReplyMessageWithDataSecurity: + self._extract_reply_data = self.decrypt_data(device); + + if is_using_crc: + self._is_data_correct = self.calculate_crc(data[:-2])==self.convert_bytes_to_short(data[-2:]) + else: + self._is_data_correct = self.calculate_checksum(data[:-1])==self.convert_bytes_to_short(data[-1:]) + + self._message_for_mac_generation = data[:message_length] + + self._connection_id = connection_id + self._issuing_command = issuing_command + + @property + def security_block_type(self) -> int: + return self._security_block_type + + @property + def secure_block_data(self) -> bytes: + return self._secure_block_data + + @property + def mac(self) -> bytes: + return self._mac + + @property + def is_data_correct(self) -> bool: + return self._is_data_correct + + @property + def sequence(self) -> int: + return self._sequence + + @property + def is_correct_address(self) -> bool: + return self._issuing_command.address == self.address + + @property + def type(self) -> ReplyType: + return self._type + + @property + def extract_reply_data(self) -> bytes: + return return self._extract_reply_data + + @property + def message_for_mac_generation(self) -> bytes: + return return self._message_for_mac_generation + + @property + def is_secure_message(self) -> bool: + return SecurityBlockType(self.security_block_type) in self.SecureSessionMessages + + @property + @abstractmethod + def reply_code(self) -> int: + pass + + @property + def is_valid_reply(self) -> bool: + return self.is_correct_address and self.is_data_correct + + @staticmethod + def parse(data: bytes, connection_id: UUID, issuing_command: Command, device Device) -> Reply: + reply = UnknownReply(data, connection_id, issuing_command, Device) + return reply + + def secure_cryptogram_has_been_accepted(self) -> bool: + return self.secure_block_data[0]!=0 + + def match_issuing_command(self, command: Command) -> bool: + return command==self._issuingCommand + + def is_valid_mac(self, mac: bytes) -> bool: + return mac[:self.MAC_SIZE]==self.mac + + def build_reply(self, address: int, control: Control) -> bytes: + command_buffer = bytearray([ + self.SOM, + (self.address | 0x80), + 0x00, + 0x00, + control.control_byte + ]) + + if control.has_security_control_block: + command_buffer.extend(self.security_control_block()) + + command_buffer.append(self.reply_code) + command_buffer.extend(self.data()) + + command_buffer.append(0x00) + if control.use_crc: + command_buffer.append(0x00) + self.add_packet_length(command_buffer) + + if control.use_crc: + self.add_crc(command_buffer) + else: + self.add_checksum(command_buffer) + + return bytes(command_buffer) + + @abstractmethod + def security_control_block(self) -> bytes: + pass + + def __repr__(self): + return "Connection ID: {0} Address: {1} Type: {2}".format(self._connection_id, self.address, self.type) + + def decrypt_data(self, device: Device) -> bytes: + return device.decrypt_data(self.extract_reply_data) \ No newline at end of file diff --git a/osdp/messages/reply_type.py b/osdp/messages/reply_type.py new file mode 100644 index 0000000..ecce8e4 --- /dev/null +++ b/osdp/messages/reply_type.py @@ -0,0 +1,21 @@ +from enum import Enum + +class ReplyType(Enum): + Ack = 0x40 + Nak = 0x41 + PdIdReport = 0x45 + PdCapabilitiesReport = 0x46 + LocalStatusReport = 0x48 + InputStatusReport = 0x49 + OutputStatusReport = 0x4A + ReaderStatusReport = 0x4B + RawReaderData = 0x50 + FormattedReaderData = 0x51 + KeypadData = 0x53 + PdCommunicationsConfigurationReport = 0x54 + BiometricData = 0x57 + BiometricMatchResult = 0x58 + CrypticData = 0x76 + InitialRMac = 0x78 + Busy = 0x79 + ManufactureSpecific = 0x90 diff --git a/osdp/messages/security_block_type.py b/osdp/messages/security_block_type.py new file mode 100644 index 0000000..cd1a7e2 --- /dev/null +++ b/osdp/messages/security_block_type.py @@ -0,0 +1,11 @@ +from enum import Enum + +class SecurityBlockType(Enum): + BeginNewSecureConnectionSequence = 0x11 + SecureConnectionSequenceStep2 = 0x12 + SecureConnectionSequenceStep3 = 0x13 + SecureConnectionSequenceStep4 = 0x14 + CommandMessageWithNoDataSecurity = 0x15 + ReplyMessageWithNoDataSecurity = 0x16 + CommandMessageWithDataSecurity = 0x17 + ReplyMessageWithDataSecurity = 0x18 diff --git a/osdp/messages/security_initialization_request_command.py b/osdp/messages/security_initialization_request_command.py new file mode 100644 index 0000000..8d7d7a9 --- /dev/null +++ b/osdp/messages/security_initialization_request_command.py @@ -0,0 +1,20 @@ +from command import Command + + +class SecurityInitializationRequestCommand(Command): + + def __init__(self, address: int, server_random_number: bytes): + self.address = address + self.server_random_number = server_random_number + + def command_code(self) -> int: + return 0x76 + + def security_control_block(self) -> bytes: + return bytes([ 0x03, 0x11, 0x00 ]) + + def data() -> bytes: + return self.server_random_number + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/server_cryptogram_command.py b/osdp/messages/server_cryptogram_command.py new file mode 100644 index 0000000..7bb7e43 --- /dev/null +++ b/osdp/messages/server_cryptogram_command.py @@ -0,0 +1,20 @@ +from command import Command + + +class ServerCryptogramCommand(Command): + + def __init__(self, address: int, server_cryptogram: bytes): + self.address = address + self.server_cryptogram = server_cryptogram + + def command_code(self) -> int: + return 0x77 + + def security_control_block(self) -> bytes: + return bytes([ 0x03, 0x13, 0x00 ]) + + def data() -> bytes: + return self.server_cryptogram + + def custom_command_update(self, command_buffer: bytearray): + pass \ No newline at end of file diff --git a/osdp/messages/unknown_reply.py b/osdp/messages/unknown_reply.py new file mode 100644 index 0000000..18649df --- /dev/null +++ b/osdp/messages/unknown_reply.py @@ -0,0 +1,22 @@ +from uuid import UUID, uuid4 +from message import Message +from command import Command +from reply import Reply + + +class UnknownReply(Command): + + def __init__(self, data: bytes, connection_id: UUID, issuing_command: Command, device: Device): + super().__init__(data, connection_id, issuing_command, device) + + def reply_code(self) -> int: + return self.type.value + + def security_control_block(self) -> bytes: + security_block_length = len(self.secure_block_data) + 2 + secbk = bytearray([self.security_block_type, security_block_length]) + secbk.extend(self.secure_block_data) + return bytes(secbk) + + def data() -> bytes: + return self.extract_reply_data diff --git a/osdp/model/command_data/output_controls.py b/osdp/model/command_data/output_controls.py new file mode 100644 index 0000000..f8b0fac --- /dev/null +++ b/osdp/model/command_data/output_controls.py @@ -0,0 +1,32 @@ +from enum import Enum + +class OutputControls: + + def __init__(self, controls): + self.controls = controls + + def build_data(self) -> bytes: + data = bytearray() + for control in controls: + data.extend(control.build_data()) + return bytes(data) + +class OutputControl: + + def __init__(self, output_number: int, output_control_code: OutputControlCode, timer: int): + self.output_number = output_number + self.output_control_code = output_control_code + self.timer = timer + + def build_data(self) -> bytes: + timer_bytes = Message.convert_short_to_bytes(self.timer) + return bytes([self.output_number, self.output_control_code.value, timer_bytes[0], timer_bytes[1]]) + +class OutputControlCode(Enum): + Nop = 0x00 + PermanentStateOffAbortTimedOperation = 0x01 + PermanentStateOnAbortTimedOperation = 0x02 + PermanentStateOffAllowTimedOperation = 0x03 + PermanentStateOnAllowTimedOperation = 0x04 + TemporaryStateOnResumePermanentState = 0x05 + TemporaryStateOffResumePermanentState = 0x06 diff --git a/osdp/model/command_data/reader_led_controls.py b/osdp/model/command_data/reader_led_controls.py new file mode 100644 index 0000000..26ff288 --- /dev/null +++ b/osdp/model/command_data/reader_led_controls.py @@ -0,0 +1,74 @@ +from enum import Enum + + +class ReaderLedControls: + + def __init__(self, controls): + self.controls = controls + + def build_data(self) -> bytes: + data = bytearray() + for control in controls: + data.extend(control.build_data()) + return bytes(data) + +class ReaderLedControl: + + def __init__(self, reader_number: int, led_number: int, + temporary_mode: TemporaryReaderControlCode, + temporary_on_time: int, + temporary_off_time: int, + temporary_on_color: LedColor, + temporary_off_color: LedColor, + temporary_timer: int, + permanent_mode: PermanentReaderControlCode, + permanent_on_time: int, + permanent_off_time: int, + permanent_on_color: LedColor, + permanent_off_color: LedColor): + self.reader_number = reader_number + self.led_number = led_number + self.temporary_mode = temporary_mode + self.temporary_on_time = temporary_on_time + self.temporary_off_time = temporary_off_time + self.temporary_on_color = temporary_on_color + self.temporary_off_color = temporary_off_color + self.temporary_timer = temporary_timer + self.permanent_mode = permanent_mode + self.permanent_on_time = permanent_on_time + self.permanent_off_time = permanent_off_time + self.permanent_on_color = permanent_on_color + self.permanent_off_color = permanent_off_color + + def build_data(self) -> bytes: + temporary_timer_bytes = Message.convert_short_to_bytes(self.temporary_timer) + return bytes([self.reader_number, self.led_number, + self.temporary_mode.value, + self.temporary_on_time, + self.temporary_off_time, + self.temporary_on_color.value, + self.temporary_off_color.value, + temporary_timer_bytes[0], + temporary_timer_bytes[1], + self.permanent_mode, + self.permanent_on_time, + self.permanent_off_time, + self.permanent_on_color.value, + self.permanent_off_color.value + ]) + +class TemporaryReaderControlCode(Enum): + Nop = 0x00 + CancelAnyTemporaryAndDisplayPermanent = 0x01 + SetTemporaryAndStartTimer = 0x02 + +class PermanentReaderControlCode(Enum): + Nop = 0x00 + SetPermanentState = 0x02 + +class LedColor(Enum): + Black = 0 + Red = 1 + Green = 2 + Amber = 3 + Blue = 4 \ No newline at end of file diff --git a/osdp/model/reply_data/device_capabilities.py b/osdp/model/reply_data/device_capabilities.py new file mode 100644 index 0000000..a4ca1b1 --- /dev/null +++ b/osdp/model/reply_data/device_capabilities.py @@ -0,0 +1,57 @@ +from enum import Enum + +class DeviceCapabilities: + + def __init__(self, capabilities): + self.capabilities = capabilities + + @staticmethod + def parse_data(reply: Reply) -> DeviceCapabilities: + data = reply.extract_reply_data + if len(data)%3!=0: + raise ValueError("Invalid size for the data") + + capabilities = [] + for i in range(0, len(data), 3): + capabilities.append(DeviceCapability.parse_data(data[i:i+3])) + return DeviceCapabilities(capabilities) + + def __repr__(self): + return '\n\n'.join([str(capability) for capability in self.capabilities]) + +class DeviceCapability: + + def __init__(self, function: CapabilityFunction, compliance: int, number_of: int): + self.function = function + self.compliance = compliance + self.number_of = number_of + + @staticmethod + def parse_data(data: bytes) -> DeviceCapability: + function = CapabilityFunction(data[0]) if data[0]<=14 else CapabilityFunction.Unknown + compliance = data[1] + number_of = data[2] + return DeviceCapability(function, compliance, number_of) + + def __repr__(self): + if(self.function==CapabilityFunction.ReceiveBufferSize or self.function==CapabilityFunction.LargestCombinedMessageSize): + return " Function: {0}\n Size: {1}".format(self.function.name, Message.convert_bytes_to_short(bytes([self.compliance, self.number_of]))) + else: + return " Function: {0}\nCompliance: {1}\n Number Of: {2}".format(self.function.name, self.compliance, self.number_of) + +class CapabilityFunction(Enum): + Unknown = 0 + ContactStatusMonitoring = 1 + OutputControl = 2 + CardDataFormat = 3 + ReaderLEDControl = 4 + ReaderAudibleOutput = 5 + ReaderTextOutput = 6 + TimeKeeping = 7 + CheckCharacterSupport = 8 + CommunicationSecurity = 9 + ReceiveBufferSize = 10 + LargestCombinedMessageSize = 11 + SmartCardSupport = 12 + Readers = 13 + Biometrics = 14 \ No newline at end of file diff --git a/osdp/model/reply_data/device_identification.py b/osdp/model/reply_data/device_identification.py new file mode 100644 index 0000000..e01dfcb --- /dev/null +++ b/osdp/model/reply_data/device_identification.py @@ -0,0 +1,33 @@ + + +class DeviceIdentification: + + def __init__(self, vendor_code: bytes, model_number: int, version: int, serial_number: int, firmware_major: int, firmware_minor: int, firmware_build: int): + self.vendor_code = vendor_code + self.model_number = model_number + self.version = version + self.serial_number = serial_number + self.firmware_major = firmware_major + self.firmware_minor = firmware_minor + self.firmware_build = firmware_build + + @staticmethod + def parse_data(reply: Reply) -> DeviceIdentification: + data = reply.extract_reply_data + if len(data)!=12: + raise ValueError("Invalid size for the data") + + vendor_code = data[0:3] + model_number = data[3] + version = data[4] + serial_number = Message.convert_bytes_to_int(data[5:9]) + firmware_major = data[9] + firmware_minor = data[10] + firmware_build = data[11] + return DeviceIdentification(vendor_code, model_number, version, serial_number, firmware_major, firmware_minor, firmware_build) + + def __repr__(self): + return " Vendor Code: {0}\n Model Number: {1}\n Version: {2}\n Serial Number: {3}\nFirmware Version: {4}.{5}.{6}".format( + self.vendor_code.hex(), self.model_number, self.version, Message.convert_int_to_bytes(self.serial_number).hex(), self.firmware_major, self.firmware_minor, self.firmware_build) + + diff --git a/osdp/model/reply_data/input_status.py b/osdp/model/reply_data/input_status.py new file mode 100644 index 0000000..0ae24b0 --- /dev/null +++ b/osdp/model/reply_data/input_status.py @@ -0,0 +1,15 @@ + + +class InputStatus: + + def __init__(self, statuses): + self.statuses = statuses + + @staticmethod + def parse_data(reply: Reply) -> InputStatus: + data = reply.extract_reply_data + statuses = map(lambda b : b!=0, data) + return InputStatus(statuses) + + def __repr__(self): + return 'Input: [' + ', '.join([str(status) for status in self.statuses]) + ']' \ No newline at end of file diff --git a/osdp/model/reply_data/nak.py b/osdp/model/reply_data/nak.py new file mode 100644 index 0000000..0eb46e7 --- /dev/null +++ b/osdp/model/reply_data/nak.py @@ -0,0 +1,34 @@ +from enum import Enum + +class Nak: + + def __init__(self, error_code: ErrorCode, extra_data: bytes): + self.error_code = error_code + self.extra_data = extra_data + + @staticmethod + def parse_data(reply: Reply) -> Nak: + data = reply.extract_reply_data + if len(data)<1: + raise ValueError("Invalid size for the data") + + error_code = ErrorCode(data[0]) + extra_data = data[1:] + return Nak(error_code, extra_data) + + def __repr__(self): + return "Error: {0}\n Data: {1}".format(self.error_code.name, self.extra_data.hex()) + + +class ErrorCode(Enum): + NoError = 0x0 + BadChecksumOrCrc = 0x1 + InvalidCommandLength = 0x2 + UnknownCommandCode = 0x3 + UnexpectedSequenceNumber = 0x4 + DoesNotSupportSecurityBlock = 0x5 + CommunicationSecurityNotMet = 0x6 + BioTypeNotSupported = 0x7 + BioFormatNotSupported = 0x8 + UnableToProcessCommand = 0x9 + GenericError = 0xFF \ No newline at end of file diff --git a/osdp/model/reply_data/out_status.py b/osdp/model/reply_data/out_status.py new file mode 100644 index 0000000..5220d45 --- /dev/null +++ b/osdp/model/reply_data/out_status.py @@ -0,0 +1,15 @@ + + +class OutputStatus: + + def __init__(self, statuses): + self.statuses = statuses + + @staticmethod + def parse_data(reply: Reply) -> OutputStatus: + data = reply.extract_reply_data + statuses = map(lambda b : b!=0, data) + return OutputStatus(statuses) + + def __repr__(self): + return 'Output: [' + ', '.join([str(status) for status in self.statuses]) + ']' \ No newline at end of file diff --git a/osdp/model/reply_data/raw_card_data.py b/osdp/model/reply_data/raw_card_data.py new file mode 100644 index 0000000..682347a --- /dev/null +++ b/osdp/model/reply_data/raw_card_data.py @@ -0,0 +1,29 @@ +from enum import Enum + + +class RawCardData: + + def __init__(self, reader_number: int, format_code: FormatCode, bit_count: int, data: bytes): + self.reader_number = reader_number + self.format_code = format_code + self.bit_count = bit_count + self.data = data + + @staticmethod + def parse_data(reply: Reply) -> Nak: + data = reply.extract_reply_data + if len(data)<4: + raise ValueError("Invalid size for the data") + + reader_number = data[0] + format_code = FormatCode(data[1]) + bit_count = Message.convert_bytes_to_short(data[2:4]) + data = data[4:] + return RawCardData(reader_number, format_code, bit_count, data) + + def __repr__(self): + return "Reader Number: {0}\n Format Code: {1}\n Bit Count: {2}\n Data: {3}".format(self.reader_number, self.format_code.name, self.bit_count, self.data.hex()) + +class FormatCode(Enum): + NotSpecified = 0x0 + Wiegand = 0x1 \ No newline at end of file diff --git a/osdp/model/reply_data/reader_status.py b/osdp/model/reply_data/reader_status.py new file mode 100644 index 0000000..c6aed14 --- /dev/null +++ b/osdp/model/reply_data/reader_status.py @@ -0,0 +1,20 @@ + + +class ReaderStatus: + + def __init__(self, statuses): + self.statuses = statuses + + @staticmethod + def parse_data(reply: Reply) -> ReaderStatus: + data = reply.extract_reply_data + statuses = map(lambda b : ReaderTamperStatus(b), data) + return ReaderStatus(statuses) + + def __repr__(self): + return 'Reader Status: [' + ', '.join([str(status) for status in self.statuses]) + ']' + +class ReaderTamperStatus(Enum): + Normal = 0x00 + NotConnected = 0x01 + Tamper = 0x02 \ No newline at end of file diff --git a/osdp/secure_channel.py b/osdp/secure_channel.py new file mode 100644 index 0000000..e53c1ed --- /dev/null +++ b/osdp/secure_channel.py @@ -0,0 +1,113 @@ + +from Crypto import Random +from Crypto.Cipher import AES + + +class SecureChannel: + + default_secure_channel_key = bytes([0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, 0x3C, 0x3D, 0x3E, 0x3F]) + + def __init__(self): + self._cmac = None + self._enc = None + self._rmac = None + self._smac1 = None + self._smac2 = None + + self.server_random_number = None + self.server_cryptogram = None + self.is_initialized = False + self.is_established = False + self.reset() + + def initialize(self, cuid: bytes, client_random_number: bytes, client_cryptogram: bytes): + self._enc = self.generate_key( + bytes([0x01, 0x82, self.server_random_number[0], self.server_random_number[1], self.server_random_number[2], self.server_random_number[3], self.server_random_number[4], self.server_random_number[5]]), + bytes([0x00]*8), self.default_secure_channel_key + ) + + if client_cryptogram!=self.generate_key(self.server_random_number, client_cryptogram, self._enc): + raise Exception("Invalid client cryptogram") + + + self._smac1 = self.generate_key( + bytes([0x01, 0x01, self.server_random_number[0], self.server_random_number[1], self.server_random_number[2], self.server_random_number[3], self.server_random_number[4], self.server_random_number[5]]), + bytes([0x00]*8), self.default_secure_channel_key + ) + self._smac2 = self.generate_key( + bytes([0x01, 0x02, self.server_random_number[0], self.server_random_number[1], self.server_random_number[2], self.server_random_number[3], self.server_random_number[4], self.server_random_number[5]]), + bytes([0x00]*8), self.default_secure_channel_key + ) + self.server_cryptogram = self.generate_key(client_random_number, self._server_random_number, self._enc) + self.is_initialized = True + + + def establish(self, rmac: bytes): + self._rmac = rmac + self.is_established = True + + def generate_mac(self, message: bytes, is_command: bool): + crypto_length = 16 + padding_start = 0x80 + + mac = b'\x00' * crypto_length + current_location = 0 + key = self._smac1 + iv = self._rmac if is_command else self._cmac + + while current_location len(message): + key = self._smac2 + if len(message)%crypto_length!=0: + input_buffer[len(message)%crypto_length] = padding_start + + cipher = AES.new(key, AES.MODE_CBC, iv) + mac = cipher.encrypt(bytes(input_buffer)) + iv = mac + + if is_command: + self._cmac = mac + else: + self._rmac = mac + return mac + + def decrypt_data(self, data: bytes) -> bytes: + padding_start = 0x80 + + key = self._enc + iv = bytes([ (~b) & 0xFF for b in self._cmac ]) + cipher = AES.new(key, AES.MODE_CBC, iv) + padded_data = cipher.decrypt(data) + decrypted_data = bytearray(padded_data) + while len(decrypted_data)>0 and decrypted_data[-1]!=padding_start: + decrypted_data.pop() + if len(decrypted_data)>0 and decrypted_data[-1]==padding_start: + decrypted_data.pop() + return bytes(decrypted_data) + + def encrypt_data(self, data: bytes) -> bytes: + crypto_length = 16 + padding_start = 0x80 + padded_data = bytearray(data) + padded_data.append(padding_start) + while len(padded_data)%crypto_length!=0: + padded_data.append(0x00) + + key = self._enc + iv = bytes([ (~b) & 0xFF for b in self._rmac ]) + cipher = AES.new(key, AES.MODE_CBC, iv) + return cipher.encrypt(padded_data) + + def reset(self): + self.server_random_number = Random.new().read(8) + self.is_initialized = False + self.is_established = False + + def generate_key(self, first: bytes, second: bytes, key: bytes) -> bytes: + cipher = AES.new(key, AES.MODE_ECB) + return cipher.encrypt(first + second) \ No newline at end of file