diff --git a/osdp/__init__.py b/osdp/__init__.py index c6e5fa6..6ee2128 100644 --- a/osdp/__init__.py +++ b/osdp/__init__.py @@ -1,7 +1,4 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - # Python OSDP Module. """ @@ -14,18 +11,31 @@ """ -from ._types import ReplyType, SecurityBlockType, Control, ErrorCode, Nak, DeviceIdentification, CapabilityFunction, DeviceCapability, DeviceCapabilities, InputStatus, OutputStatus, LocalStatus, ReaderTamperStatus, ReaderStatus, OutputControlCode, OutputControl, OutputControls, TemporaryReaderControlCode, PermanentReaderControlCode, LedColor, ReaderLedControl, ReaderLedControls, ToneCode, ReaderBuzzerControl, TextCommand, ReaderTextOutput, FormatCode, RawCardData, KeypadData, DataEvent -from ._connection import OsdpConnection, SerialPortOsdpConnection, TcpClientOsdpConnection, TcpServerOsdpConnection +from ._types import ( + ReplyType, SecurityBlockType, Control, ErrorCode, Nak, DeviceIdentification, CapabilityFunction, + DeviceCapability, DeviceCapabilities, InputStatus, OutputStatus, LocalStatus, ReaderTamperStatus, + ReaderStatus, OutputControlCode, OutputControl, OutputControls, TemporaryReaderControlCode, + PermanentReaderControlCode, LedColor, ReaderLedControl, ReaderLedControls, ToneCode, ReaderBuzzerControl, + TextCommand, ReaderTextOutput, FormatCode, RawCardData, KeypadData, DataEvent +) +from ._connection import ( + OsdpConnection, SerialPortOsdpConnection, TcpClientOsdpConnection, TcpServerOsdpConnection +) from ._device import Device from ._message import Message -from ._command import Command, PollCommand, IdReportCommand, DeviceCapabilitiesCommand, LocalStatusReportCommand, InputStatusReportCommand, OutputStatusReportCommand, ReaderStatusReportCommand, OutputControlCommand, ReaderLedControlCommand, ReaderBuzzerControlCommand, ReaderTextOutputCommand, SetDateTimeCommand, SecurityInitializationRequestCommand, ServerCryptogramCommand, ManufacturerSpecificCommand +from ._command import ( + Command, PollCommand, IdReportCommand, DeviceCapabilitiesCommand, LocalStatusReportCommand, + InputStatusReportCommand, OutputStatusReportCommand, ReaderStatusReportCommand, + OutputControlCommand, ReaderLedControlCommand, ReaderBuzzerControlCommand, + ReaderTextOutputCommand, SetDateTimeCommand, SecurityInitializationRequestCommand, + ServerCryptogramCommand, ManufacturerSpecificCommand +) from ._reply import Reply, AckReply, UnknownReply from ._secure_channel import SecureChannel from ._bus import Bus from ._control_panel import ControlPanel - __author__ = 'Ryan Hu' __copyright__ = 'Copyright 2019 Ryan Hu and Contributors' -__license__ = 'Apache License, Version 2.0' \ No newline at end of file +__license__ = 'Apache License, Version 2.0' diff --git a/osdp/_bus.py b/osdp/_bus.py index 9f350ab..941d478 100644 --- a/osdp/_bus.py +++ b/osdp/_bus.py @@ -1,11 +1,10 @@ import logging from datetime import datetime, timedelta import time -from queue import Queue from threading import Lock -from uuid import UUID, uuid4 +from uuid import uuid4 -from ._types import * +from ._types import ReplyType, ErrorCode from ._connection import OsdpConnection from ._device import Device from ._message import Message @@ -14,11 +13,11 @@ log = logging.getLogger('osdp') -''' -A group of OSDP devices sharing communications -''' -class Bus: +class Bus: + ''' + A group of OSDP devices sharing communications + ''' DRIVER_BYTE = 0xFF def __init__(self, connection: OsdpConnection, on_reply_received): @@ -32,7 +31,7 @@ def __init__(self, connection: OsdpConnection, on_reply_received): @property def idle_line_delay(self) -> timedelta: - return timedelta(milliseconds=(1000.0/self._connection.baud_rate * 16.0) * 100) + return timedelta(milliseconds=(1000.0 / self._connection.baud_rate * 16.0) * 100) def close(self): self._is_shutting_down = True @@ -124,12 +123,12 @@ def process_reply(self, reply: Reply, device: Device): if reply.type == ReplyType.Nak: extract_reply_data = reply.extract_reply_data error_code = ErrorCode(extract_reply_data[0]) - if error_code==ErrorCode.DoesNotSupportSecurityBlock or error_code==ErrorCode.DoesNotSupportSecurityBlock: + if error_code == ErrorCode.DoesNotSupportSecurityBlock or error_code == ErrorCode.DoesNotSupportSecurityBlock: device.reset_security() - if reply.type==ReplyType.CrypticData: + if reply.type == ReplyType.CrypticData: device.initialize_secure_channel(reply) - elif reply.type==ReplyType.InitialRMac: + elif reply.type == ReplyType.InitialRMac: device.validate_secure_channel_establishment(reply) if self._on_reply_received is not None: @@ -168,8 +167,8 @@ def extract_message_length(self, reply_buffer: bytearray) -> int: def wait_for_rest_of_message(self, buffer: bytearray, reply_length: int): while len(buffer) < reply_length: - bytes_read = self._connection.read(reply_length-len(buffer)) - if len(bytes_read)>0: + bytes_read = self._connection.read(reply_length - len(buffer)) + if len(bytes_read) > 0: buffer.extend(bytes_read) else: return False @@ -178,7 +177,7 @@ def wait_for_rest_of_message(self, buffer: bytearray, reply_length: int): def wait_for_message_length(self, buffer: bytearray) -> bool: while len(buffer) < 4: bytes_read = self._connection.read(4) - if len(bytes_read)>0: + if len(bytes_read) > 0: buffer.extend(bytes_read) else: return False @@ -187,13 +186,12 @@ def wait_for_message_length(self, buffer: bytearray) -> bool: def wait_for_start_of_message(self, buffer: bytearray) -> bool: while True: bytes_read = self._connection.read(1) - if len(bytes_read)==0: + if len(bytes_read) == 0: return False - if bytes_read[0]!=Message.SOM: + if bytes_read[0] != Message.SOM: continue buffer.extend(bytes_read) break - return True - \ No newline at end of file + return True diff --git a/osdp/_command.py b/osdp/_command.py index 3f9a054..2c31fbd 100644 --- a/osdp/_command.py +++ b/osdp/_command.py @@ -1,9 +1,10 @@ -from abc import ABC, abstractmethod +from abc import abstractmethod -from ._types import * +from ._types import OutputControls, ReaderLedControls, ReaderBuzzerControl, ReaderTextOutput from ._message import Message import datetime + class Command(Message): def __init__(self): @@ -63,6 +64,7 @@ def build_command(self, device) -> bytes: return bytes(command_buffer) + class PollCommand(Command): def __init__(self, address: int): @@ -73,7 +75,7 @@ def command_code(self) -> int: return 0x60 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x15 ]) + return bytes([0x02, 0x15]) def data(self) -> bytes: return bytes([]) @@ -81,6 +83,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class IdReportCommand(Command): def __init__(self, address: int): @@ -91,14 +94,15 @@ def command_code(self) -> int: return 0x61 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x17 ]) + return bytes([0x02, 0x17]) def data(self) -> bytes: - return bytes([ 0x00 ]) + return bytes([0x00]) def custom_command_update(self, command_buffer: bytearray): pass + class DeviceCapabilitiesCommand(Command): def __init__(self, address: int): @@ -109,14 +113,15 @@ def command_code(self) -> int: return 0x62 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x17 ]) + return bytes([0x02, 0x17]) def data(self) -> bytes: - return bytes([ 0x00 ]) + return bytes([0x00]) def custom_command_update(self, command_buffer: bytearray): pass + class LocalStatusReportCommand(Command): def __init__(self, address: int): @@ -127,7 +132,7 @@ def command_code(self) -> int: return 0x64 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x15 ]) + return bytes([0x02, 0x15]) def data(self) -> bytes: return bytes([]) @@ -135,6 +140,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class InputStatusReportCommand(Command): def __init__(self, address: int): @@ -145,14 +151,15 @@ def command_code(self) -> int: return 0x65 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x15 ]) + return bytes([0x02, 0x15]) def data(self) -> bytes: - return bytes([ ]) + return bytes([]) def custom_command_update(self, command_buffer: bytearray): pass + class OutputStatusReportCommand(Command): def __init__(self, address: int): @@ -163,7 +170,7 @@ def command_code(self) -> int: return 0x66 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x15 ]) + return bytes([0x02, 0x15]) def data(self) -> bytes: return bytes([]) @@ -171,6 +178,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class ReaderStatusReportCommand(Command): def __init__(self, address: int): @@ -181,7 +189,7 @@ def command_code(self) -> int: return 0x67 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x15 ]) + return bytes([0x02, 0x15]) def data(self) -> bytes: return bytes([]) @@ -189,6 +197,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class OutputControlCommand(Command): def __init__(self, address: int, output_controls: OutputControls): @@ -200,7 +209,7 @@ def command_code(self) -> int: return 0x68 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x17 ]) + return bytes([0x02, 0x17]) def data(self) -> bytes: return self.output_controls.build_data() @@ -208,6 +217,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class ReaderLedControlCommand(Command): def __init__(self, address: int, reader_led_controls: ReaderLedControls): @@ -219,7 +229,7 @@ def command_code(self) -> int: return 0x69 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x17 ]) + return bytes([0x02, 0x17]) def data(self) -> bytes: return self.reader_led_controls.build_data() @@ -227,6 +237,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class ReaderBuzzerControlCommand(Command): def __init__(self, address: int, reader_buzzer_control: ReaderBuzzerControl): @@ -238,7 +249,7 @@ def command_code(self) -> int: return 0x6A def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x17 ]) + return bytes([0x02, 0x17]) def data(self) -> bytes: return self.reader_buzzer_control.build_data() @@ -246,6 +257,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class ReaderTextOutputCommand(Command): def __init__(self, address: int, reader_text_output: ReaderTextOutput): @@ -257,7 +269,7 @@ def command_code(self) -> int: return 0x6B def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x17 ]) + return bytes([0x02, 0x17]) def data(self) -> bytes: return self.reader_text_output.build_data() @@ -265,6 +277,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class SetDateTimeCommand(Command): def __init__(self, address: int, timestamp: datetime.datetime): @@ -276,7 +289,7 @@ def command_code(self) -> int: return 0x6D def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x17 ]) + return bytes([0x02, 0x17]) def data(self) -> bytes: year_bytes = self.timestamp.year.to_bytes(2, byteorder='little') @@ -293,6 +306,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class ManufacturerSpecificCommand(Command): def __init__(self, address: int, manufacturer_data: bytes): @@ -304,7 +318,7 @@ def command_code(self) -> int: return 0x80 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x17 ]) + return bytes([0x02, 0x17]) def data(self) -> bytes: return self.manufacturer_data @@ -312,6 +326,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class SecurityInitializationRequestCommand(Command): def __init__(self, address: int, server_random_number: bytes): @@ -323,7 +338,7 @@ def command_code(self) -> int: return 0x76 def security_control_block(self) -> bytes: - return bytes([ 0x03, 0x11, 0x00 ]) + return bytes([0x03, 0x11, 0x00]) def data(self) -> bytes: return self.server_random_number @@ -331,6 +346,7 @@ def data(self) -> bytes: def custom_command_update(self, command_buffer: bytearray): pass + class ServerCryptogramCommand(Command): def __init__(self, address: int, server_cryptogram: bytes): @@ -342,11 +358,10 @@ def command_code(self) -> int: return 0x77 def security_control_block(self) -> bytes: - return bytes([ 0x03, 0x13, 0x00 ]) + return bytes([0x03, 0x13, 0x00]) def data(self) -> bytes: return self.server_cryptogram def custom_command_update(self, command_buffer: bytearray): pass - diff --git a/osdp/_connection.py b/osdp/_connection.py index 27c4055..4be91d2 100644 --- a/osdp/_connection.py +++ b/osdp/_connection.py @@ -1,9 +1,9 @@ -from abc import ABC, abstractmethod +from abc import ABC, abstractmethod import serial import fcntl import struct import socket -import sys + class OsdpConnection(ABC): @@ -30,12 +30,12 @@ def write(self, buf: bytes): pass @abstractmethod - def read(self, size: int=1) -> bytes: + def read(self, size: int = 1) -> bytes: pass class SerialPortOsdpConnection(OsdpConnection): - + def __init__(self, port: str, baud_rate: int): self._port = port self._baud_rate = baud_rate @@ -66,11 +66,12 @@ def close(self): def write(self, buf: bytes): self.serial_port.write(buf) - def read(self, size: int=1) -> bytes: + def read(self, size: int = 1) -> bytes: return self.serial_port.read(size) + class TcpClientOsdpConnection(OsdpConnection): - + def __init__(self, server: str, port_number: int): self._server = server self._port_number = port_number @@ -98,18 +99,19 @@ def close(self): def write(self, buf: bytes): try: self.sock.send(buf) - except socket.timeout as e: - self.is_connected = False + except socket.timeout: + self.is_connected = False - def read(self, size: int=1) -> bytes: + def read(self, size: int = 1) -> bytes: try: return self.sock.recv(size) - except socket.timeout as e: + except socket.timeout: self.is_connected = False return b'' + class TcpServerOsdpConnection(OsdpConnection): - + def __init__(self, port_number: int): self._port_number = port_number self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -137,12 +139,12 @@ def close(self): def write(self, buf: bytes): try: self.connection.sendall(buf) - except socket.timeout as e: + except socket.timeout: self.close() - def read(self, size: int=1) -> bytes: + def read(self, size: int = 1) -> bytes: try: return self.connection.recv(size) - except socket.timeout as e: + except socket.timeout: self.close() return b'' diff --git a/osdp/_control_panel.py b/osdp/_control_panel.py index 27d74a6..09b0a30 100644 --- a/osdp/_control_panel.py +++ b/osdp/_control_panel.py @@ -1,13 +1,16 @@ import logging -from queue import Queue -from datetime import datetime, timedelta -from uuid import UUID, uuid4 +from uuid import UUID from threading import Thread -from ._types import * +from ._types import ( + DeviceIdentification, DeviceCapabilities, LocalStatus, InputStatus, OutputStatus, ReaderStatus, + OutputControls, ReplyType, ReaderLedControls, Reply, DataEvent, Nak, RawCardData, KeypadData +) from ._connection import OsdpConnection -from ._command import * -from ._reply import * +from ._command import ( + Command, IdReportCommand, DeviceCapabilitiesCommand, LocalStatusReportCommand, InputStatusReportCommand, + OutputStatusReportCommand, ReaderStatusReportCommand, OutputControlCommand, ReaderLedControlCommand +) from ._bus import Bus @@ -15,6 +18,7 @@ console_handler = logging.StreamHandler() log.addHandler(console_handler) + class ControlPanel: def __init__(self): @@ -25,7 +29,7 @@ def __init__(self): def start_connection(self, connection: OsdpConnection) -> UUID: bus = Bus(connection, self.on_reply_received) self._buses[bus.id] = bus - thread = Thread(target = bus.run_polling_loop) + thread = Thread(target=bus.run_polling_loop) thread.start() return bus.id @@ -67,6 +71,7 @@ def is_online(self, connection_id: UUID, address: int) -> bool: def send_command(self, connection_id: UUID, command: Command) -> Reply: event = DataEvent() + def reply_fetcher(reply: Reply): if reply.match_issuing_command(command): self._reply_handlers.remove(reply_fetcher) @@ -85,7 +90,7 @@ def reply_fetcher(reply: Reply): def shutdown(self): for bus in list(self._buses.values()): bus.close() - + def add_device(self, connection_id: UUID, address: int, use_crc: bool, use_secure_channel: bool): bus = self._buses.get(connection_id) if bus is not None: @@ -98,33 +103,32 @@ def remove_device(self, connection_id: UUID, address: int): def on_reply_received(self, reply: Reply): for reply_hander in self._reply_handlers: - reply_hander(reply) + reply_hander(reply) - if reply.type==ReplyType.Nak: + if reply.type == ReplyType.Nak: self.on_nak_reply_received(reply.address, Nak.parse_data(reply)) - elif reply.type==ReplyType.LocalStatusReport: + elif reply.type == ReplyType.LocalStatusReport: self.on_local_status_report_reply_received(reply.address, LocalStatus.parse_data(reply)) - elif reply.type==ReplyType.InputStatusReport: + elif reply.type == ReplyType.InputStatusReport: self.on_input_status_report_reply_received(reply.address, InputStatus.parse_data(reply)) - elif reply.type==ReplyType.OutputStatusReport: + elif reply.type == ReplyType.OutputStatusReport: self.on_output_status_report_reply_received(reply.address, OutputStatus.parse_data(reply)) - elif reply.type==ReplyType.ReaderStatusReport: + elif reply.type == ReplyType.ReaderStatusReport: self.on_reader_status_report_reply_received(reply.address, ReaderStatus.parse_data(reply)) - elif reply.type==ReplyType.FormattedReaderData: + elif reply.type == ReplyType.FormattedReaderData: self.on_formatted_reader_data_reply_received(reply.address, reply.extract_reply_data) - elif reply.type==ReplyType.RawReaderData: + elif reply.type == ReplyType.RawReaderData: self.on_raw_card_data_reply_received(reply.address, RawCardData.parse_data(reply)) - elif reply.type==ReplyType.KeypadData: + elif reply.type == ReplyType.KeypadData: self.on_keypad_data_reply_received(reply.address, KeypadData.parse_data(reply)) - def on_nak_reply_received(self, address: int, nak: Nak): log.debug("%s < Nak received %s", address, nak) diff --git a/osdp/_device.py b/osdp/_device.py index 53f4542..7ec27bf 100644 --- a/osdp/_device.py +++ b/osdp/_device.py @@ -1,13 +1,15 @@ import queue import datetime -from ._types import * -from ._command import * +from ._types import Control +from ._command import ( + PollCommand, SecurityInitializationRequestCommand, ServerCryptogramCommand +) from ._secure_channel import SecureChannel class Device(object): - + def __init__(self, address: int, use_crc: bool, use_secure_channel: bool): self._use_secure_channel = use_secure_channel self.address = address @@ -19,18 +21,18 @@ def __init__(self, address: int, use_crc: bool, use_secure_channel: bool): @property def is_security_established(self) -> bool: - return self.message_control.has_security_control_block and self._secureChannel.is_established + return self.message_control.has_security_control_block and self._secure_channel.is_established @property def is_online(self) -> bool: return self._last_valid_reply + datetime.timedelta(seconds=5) >= datetime.datetime.now() def get_next_command_data(self): - if self.message_control.sequence==0: + if self.message_control.sequence == 0: return PollCommand(self.address) if self._use_secure_channel and not self._secure_channel.is_initialized: - return SecurityInitializationRequestCommand(self.address, self._secure_channel.serverRandomNumber) + return SecurityInitializationRequestCommand(self.address, self._secure_channel.server_random_number) if self._use_secure_channel and not self._secure_channel.is_established: return ServerCryptogramCommand(self.address, self._secure_channel.serverCryptogram) @@ -50,13 +52,13 @@ def valid_reply_has_been_received(self): def initialize_secure_channel(self, reply): reply_data = reply.extract_reply_data - _secureChannel.initialize(reply_data[:8], reply_data[8:16], reply_data[16:32]) + self._secure_channel.initialize(reply_data[:8], reply_data[8:16], reply_data[16:32]) def validate_secure_channel_establishment(self, reply) -> bool: if not reply.secure_cryptogram_has_been_accepted(): return False - _secureChannel.establish(reply.extract_reply_data); + self._secure_channel.establish(reply.extract_reply_data) return True def generate_mac(self, message: bytes, is_command: bool): diff --git a/osdp/_message.py b/osdp/_message.py index c8fca7f..ced9b7f 100644 --- a/osdp/_message.py +++ b/osdp/_message.py @@ -1,7 +1,8 @@ from abc import ABC, abstractmethod + class Message(ABC): - + SOM = 0x53 crc_table = [ @@ -68,8 +69,7 @@ def add_checksum(self, packet: bytearray): def encrypted_data(self, device) -> bytes: data = self.data() - if len(data)>0: + if len(data) > 0: return device.encrypt_data(data) else: return data - diff --git a/osdp/_reply.py b/osdp/_reply.py index eb0176e..c633ce6 100644 --- a/osdp/_reply.py +++ b/osdp/_reply.py @@ -1,11 +1,12 @@ -from abc import ABC, abstractmethod +from abc import abstractmethod from uuid import UUID -from ._types import * +from ._types import SecurityBlockType, ReplyType, Control from ._message import Message from ._command import Command from ._device import Device + class Reply(Message): ADDRESS_MASK = 0x7F @@ -23,13 +24,16 @@ def __init__(self, data: bytes, connection_id: UUID, issuing_command: Command, d self._address = data[1] & self.ADDRESS_MASK self._sequence = data[4] & 0x03 - is_using_crc = (data[4] & 0x04)!=0 + is_using_crc = (data[4] & 0x04) != 0 reply_message_footer_size = 2 if is_using_crc else 1 - is_secure_control_block_present = (data[4] & 0x08)!=0 + is_secure_control_block_present = (data[4] & 0x08) != 0 secure_block_size = (data[5] & 0xFF) if is_secure_control_block_present else 0 self._security_block_type = (data[6] & 0xFF) if is_secure_control_block_present else 0 - self._secure_block_data = data[(self.REPLY_MESSAGE_HEADER_SIZE + 2):][:(secure_block_size-2)] if is_secure_control_block_present else b'' + if is_secure_control_block_present: + self._secure_block_data = data[(self.REPLY_MESSAGE_HEADER_SIZE + 2):][:(secure_block_size - 2)] + else: + self._secure_block_data = b'' mac_size = self.MAC_SIZE if self.is_secure_message else 0 message_length = len(data) - (reply_message_footer_size + mac_size) @@ -41,13 +45,13 @@ def __init__(self, data: bytes, connection_id: UUID, issuing_command: Command, d data_end = - reply_message_footer_size - mac_size self._extract_reply_data = data[data_start:data_end] - if self.security_block_type==SecurityBlockType.ReplyMessageWithDataSecurity.value: - self._extract_reply_data = self.decrypt_data(device); + if self.security_block_type == SecurityBlockType.ReplyMessageWithDataSecurity.value: + self._extract_reply_data = self.decrypt_data(device) if is_using_crc: - self._is_data_correct = self.calculate_crc(data[:-2])==int.from_bytes(data[-2:], byteorder='little') + self._is_data_correct = self.calculate_crc(data[:-2]) == int.from_bytes(data[-2:], byteorder='little') else: - self._is_data_correct = self.calculate_checksum(data[:-1])==int.from_bytes(data[-1:], byteorder='little') + self._is_data_correct = self.calculate_checksum(data[:-1]) == int.from_bytes(data[-1:], byteorder='little') self._message_for_mac_generation = data[:message_length] @@ -108,13 +112,13 @@ def parse(data: bytes, connection_id: UUID, issuing_command: Command, device: De return reply def secure_cryptogram_has_been_accepted(self) -> bool: - return self.secure_block_data[0]!=0 + return self.secure_block_data[0] != 0 def match_issuing_command(self, command: Command) -> bool: - return command==self._issuing_command + return command == self._issuing_command def is_valid_mac(self, mac: bytes) -> bool: - return mac[:self.MAC_SIZE]==self.mac + return mac[:self.MAC_SIZE] == self.mac def build_reply(self, address: int, control: Control) -> bytes: command_buffer = bytearray([ @@ -161,10 +165,10 @@ def reply_code(self) -> int: return 0x40 def security_control_block(self) -> bytes: - return bytes([ 0x02, 0x16 ]) + return bytes([0x02, 0x16]) def data(self) -> bytes: - return bytes([ ]) + return bytes([]) class UnknownReply(Reply): @@ -184,4 +188,3 @@ def security_control_block(self) -> bytes: def data(self) -> bytes: return self.extract_reply_data - diff --git a/osdp/_secure_channel.py b/osdp/_secure_channel.py index e53c1ed..38fd9f3 100644 --- a/osdp/_secure_channel.py +++ b/osdp/_secure_channel.py @@ -5,7 +5,10 @@ class SecureChannel: - default_secure_channel_key = bytes([0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x3A, 0x3B, 0x3C, 0x3D, 0x3E, 0x3F]) + default_secure_channel_key = bytes([ + 0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, + 0x38, 0x39, 0x3A, 0x3B, 0x3C, 0x3D, 0x3E, 0x3F + ]) def __init__(self): self._cmac = None @@ -22,26 +25,44 @@ def __init__(self): def initialize(self, cuid: bytes, client_random_number: bytes, client_cryptogram: bytes): self._enc = self.generate_key( - bytes([0x01, 0x82, self.server_random_number[0], self.server_random_number[1], self.server_random_number[2], self.server_random_number[3], self.server_random_number[4], self.server_random_number[5]]), - bytes([0x00]*8), self.default_secure_channel_key + bytes([ + 0x01, 0x82, + self.server_random_number[0], self.server_random_number[1], self.server_random_number[2], + self.server_random_number[3], self.server_random_number[4], self.server_random_number[5] + ]), + bytes([0x00] * 8), + self.default_secure_channel_key ) - if client_cryptogram!=self.generate_key(self.server_random_number, client_cryptogram, self._enc): + if client_cryptogram != self.generate_key(self.server_random_number, client_cryptogram, self._enc): raise Exception("Invalid client cryptogram") - self._smac1 = self.generate_key( - bytes([0x01, 0x01, self.server_random_number[0], self.server_random_number[1], self.server_random_number[2], self.server_random_number[3], self.server_random_number[4], self.server_random_number[5]]), - bytes([0x00]*8), self.default_secure_channel_key + bytes([ + 0x01, 0x01, + self.server_random_number[0], self.server_random_number[1], self.server_random_number[2], + self.server_random_number[3], self.server_random_number[4], self.server_random_number[5] + ]), + bytes([0x00] * 8), + self.default_secure_channel_key ) self._smac2 = self.generate_key( - bytes([0x01, 0x02, self.server_random_number[0], self.server_random_number[1], self.server_random_number[2], self.server_random_number[3], self.server_random_number[4], self.server_random_number[5]]), - bytes([0x00]*8), self.default_secure_channel_key + bytes([ + 0x01, 0x02, + self.server_random_number[0], self.server_random_number[1], + self.server_random_number[2], self.server_random_number[3], + self.server_random_number[4], self.server_random_number[5] + ]), + bytes([0x00] * 8), + self.default_secure_channel_key + ) + self.server_cryptogram = self.generate_key( + client_random_number, + self._server_random_number, + self._enc ) - self.server_cryptogram = self.generate_key(client_random_number, self._server_random_number, self._enc) self.is_initialized = True - def establish(self, rmac: bytes): self._rmac = rmac self.is_established = True @@ -55,16 +76,16 @@ def generate_mac(self, message: bytes, is_command: bool): key = self._smac1 iv = self._rmac if is_command else self._cmac - while current_location len(message): key = self._smac2 - if len(message)%crypto_length!=0: - input_buffer[len(message)%crypto_length] = padding_start + if len(message) % crypto_length != 0: + input_buffer[len(message) % crypto_length] = padding_start cipher = AES.new(key, AES.MODE_CBC, iv) mac = cipher.encrypt(bytes(input_buffer)) @@ -80,13 +101,13 @@ def decrypt_data(self, data: bytes) -> bytes: padding_start = 0x80 key = self._enc - iv = bytes([ (~b) & 0xFF for b in self._cmac ]) + iv = bytes([(~b) & 0xFF for b in self._cmac]) cipher = AES.new(key, AES.MODE_CBC, iv) padded_data = cipher.decrypt(data) decrypted_data = bytearray(padded_data) - while len(decrypted_data)>0 and decrypted_data[-1]!=padding_start: + while len(decrypted_data) > 0 and decrypted_data[-1] != padding_start: decrypted_data.pop() - if len(decrypted_data)>0 and decrypted_data[-1]==padding_start: + if len(decrypted_data) > 0 and decrypted_data[-1] == padding_start: decrypted_data.pop() return bytes(decrypted_data) @@ -95,11 +116,11 @@ def encrypt_data(self, data: bytes) -> bytes: padding_start = 0x80 padded_data = bytearray(data) padded_data.append(padding_start) - while len(padded_data)%crypto_length!=0: + while len(padded_data) % crypto_length != 0: padded_data.append(0x00) key = self._enc - iv = bytes([ (~b) & 0xFF for b in self._rmac ]) + iv = bytes([(~b) & 0xFF for b in self._rmac]) cipher = AES.new(key, AES.MODE_CBC, iv) return cipher.encrypt(padded_data) @@ -110,4 +131,4 @@ def reset(self): def generate_key(self, first: bytes, second: bytes, key: bytes) -> bytes: cipher = AES.new(key, AES.MODE_ECB) - return cipher.encrypt(first + second) \ No newline at end of file + return cipher.encrypt(first + second) diff --git a/osdp/_types.py b/osdp/_types.py index 75b47dd..93949d3 100644 --- a/osdp/_types.py +++ b/osdp/_types.py @@ -1,6 +1,7 @@ from enum import Enum from threading import Event + class ReplyType(Enum): Ack = 0x40 Nak = 0x41 @@ -21,6 +22,7 @@ class ReplyType(Enum): Busy = 0x79 ManufactureSpecific = 0x90 + class SecurityBlockType(Enum): BeginNewSecureConnectionSequence = 0x11 SecureConnectionSequenceStep2 = 0x12 @@ -31,6 +33,7 @@ class SecurityBlockType(Enum): CommandMessageWithDataSecurity = 0x17 ReplyMessageWithDataSecurity = 0x18 + class Control: def __init__(self, sequence: int, use_crc: bool, has_security_control_block: bool): @@ -40,10 +43,14 @@ def __init__(self, sequence: int, use_crc: bool, has_security_control_block: boo @property def control_byte(self) -> int: - return (self.sequence & 0x03 | (0x04 if self.use_crc else 0) | (0x08 if self.has_security_control_block else 0)) & 0xFF + cb = self.sequence & 0x03 + cb |= (0x04 if self.use_crc else 0) + cb |= (0x08 if self.has_security_control_block else 0) + return cb & 0xFF def increment_sequence(self): - self.sequence = self.sequence%3 + 1 + self.sequence = self.sequence % 3 + 1 + class ErrorCode(Enum): NoError = 0x0 @@ -58,6 +65,7 @@ class ErrorCode(Enum): UnableToProcessCommand = 0x9 GenericError = 0xFF + class Nak: def __init__(self, error_code: ErrorCode, extra_data: bytes): @@ -67,7 +75,7 @@ def __init__(self, error_code: ErrorCode, extra_data: bytes): @staticmethod def parse_data(reply): data = reply.extract_reply_data - if len(data)<1: + if len(data) < 1: raise ValueError("Invalid size for the data") error_code = ErrorCode(data[0]) @@ -77,9 +85,13 @@ def parse_data(reply): def __repr__(self): return "Error: {0}\n Data: {1}".format(self.error_code.name, self.extra_data.hex()) + class DeviceIdentification: - def __init__(self, vendor_code: bytes, model_number: int, version: int, serial_number: int, firmware_major: int, firmware_minor: int, firmware_build: int): + def __init__( + self, vendor_code: bytes, model_number: int, version: int, serial_number: int, + firmware_major: int, firmware_minor: int, firmware_build: int + ): self.vendor_code = vendor_code self.model_number = model_number self.version = version @@ -91,7 +103,7 @@ def __init__(self, vendor_code: bytes, model_number: int, version: int, serial_n @staticmethod def parse_data(reply): data = reply.extract_reply_data - if len(data)!=12: + if len(data) != 12: raise ValueError("Invalid size for the data") vendor_code = data[0:3] @@ -101,11 +113,33 @@ def parse_data(reply): firmware_major = data[9] firmware_minor = data[10] firmware_build = data[11] - return DeviceIdentification(vendor_code, model_number, version, serial_number, firmware_major, firmware_minor, firmware_build) + return DeviceIdentification( + vendor_code, + model_number, + version, + serial_number, + firmware_major, + firmware_minor, + firmware_build + ) def __repr__(self): - return " Vendor Code: {0}\n Model Number: {1}\n Version: {2}\n Serial Number: {3}\nFirmware Version: {4}.{5}.{6}".format( - self.vendor_code.hex(), self.model_number, self.version, self.serial_number.to_bytes(4, byteorder='little').hex(), self.firmware_major, self.firmware_minor, self.firmware_build) + return \ + " Vendor Code: {0}\n"\ + " Model Number: {1}\n"\ + " Version: {2}\n"\ + " Serial Number: {3}\n"\ + "Firmware Version: {4}.{5}.{6}"\ + .format( + self.vendor_code.hex(), + self.model_number, + self.version, + self.serial_number.to_bytes(4, byteorder='little').hex(), + self.firmware_major, + self.firmware_minor, + self.firmware_build + ) + class CapabilityFunction(Enum): Unknown = 0 @@ -124,6 +158,7 @@ class CapabilityFunction(Enum): Readers = 13 Biometrics = 14 + class DeviceCapability: def __init__(self, function: CapabilityFunction, compliance: int, number_of: int): @@ -133,16 +168,30 @@ def __init__(self, function: CapabilityFunction, compliance: int, number_of: int @staticmethod def parse_data(data: bytes): - function = CapabilityFunction(data[0]) if data[0]<=14 else CapabilityFunction.Unknown + function = CapabilityFunction(data[0]) if data[0] <= 14 else CapabilityFunction.Unknown compliance = data[1] number_of = data[2] return DeviceCapability(function, compliance, number_of) def __repr__(self): - if(self.function==CapabilityFunction.ReceiveBufferSize or self.function==CapabilityFunction.LargestCombinedMessageSize): - return " Function: {0}\n Size: {1}".format(self.function.name, int.from_bytes(bytes([self.compliance, self.number_of]), byteorder='little')) + if self.function == CapabilityFunction.ReceiveBufferSize or \ + self.function == CapabilityFunction.LargestCombinedMessageSize: + return \ + " Function: {0}\n"\ + " Size: {1}".format( + self.function.name, + int.from_bytes(bytes([self.compliance, self.number_of]), byteorder='little') + ) else: - return " Function: {0}\nCompliance: {1}\n Number Of: {2}".format(self.function.name, self.compliance, self.number_of) + return \ + " Function: {0}\n"\ + "Compliance: {1}\n"\ + " Number Of: {2}".format( + self.function.name, + self.compliance, + self.number_of + ) + class DeviceCapabilities: @@ -152,17 +201,18 @@ def __init__(self, capabilities): @staticmethod def parse_data(reply): data = reply.extract_reply_data - if len(data)%3!=0: + if len(data) % 3 != 0: raise ValueError("Invalid size for the data") capabilities = [] for i in range(0, len(data), 3): - capabilities.append(DeviceCapability.parse_data(data[i:i+3])) + capabilities.append(DeviceCapability.parse_data(data[i:(i + 3)])) return DeviceCapabilities(capabilities) def __repr__(self): return '\n\n'.join([str(capability) for capability in self.capabilities]) + class InputStatus: def __init__(self, statuses): @@ -171,12 +221,13 @@ def __init__(self, statuses): @staticmethod def parse_data(reply): data = reply.extract_reply_data - statuses = map(lambda b : b!=0, data) + statuses = map(lambda b: b != 0, data) return InputStatus(statuses) def __repr__(self): return 'Input: [' + ', '.join([str(status) for status in self.statuses]) + ']' + class OutputStatus: def __init__(self, statuses): @@ -185,12 +236,13 @@ def __init__(self, statuses): @staticmethod def parse_data(reply): data = reply.extract_reply_data - statuses = map(lambda b : b!=0, data) + statuses = map(lambda b: b != 0, data) return OutputStatus(statuses) def __repr__(self): return 'Output: [' + ', '.join([str(status) for status in self.statuses]) + ']' + class LocalStatus: def __init__(self, tamper: bool, power_failure: bool): @@ -200,21 +252,23 @@ def __init__(self, tamper: bool, power_failure: bool): @staticmethod def parse_data(reply): data = reply.extract_reply_data - if len(data)<2: + if len(data) < 2: raise ValueError("Invalid size for the data") - tamper = data[0]!=0 - power_failure = data[1]!=0 + tamper = data[0] != 0 + power_failure = data[1] != 0 return LocalStatus(tamper, power_failure) def __repr__(self): return " Tamper: {0}\nCPower Failure: {1}".format(self.tamper, self.power_failure) + class ReaderTamperStatus(Enum): Normal = 0x00 NotConnected = 0x01 Tamper = 0x02 + class ReaderStatus: def __init__(self, statuses): @@ -223,12 +277,13 @@ def __init__(self, statuses): @staticmethod def parse_data(reply): data = reply.extract_reply_data - statuses = map(lambda b : ReaderTamperStatus(b), data) + statuses = map(lambda b: ReaderTamperStatus(b), data) return ReaderStatus(statuses) def __repr__(self): return 'Reader Status: [' + ', '.join([str(status) for status in self.statuses]) + ']' + class OutputControlCode(Enum): Nop = 0x00 PermanentStateOffAbortTimedOperation = 0x01 @@ -238,6 +293,7 @@ class OutputControlCode(Enum): TemporaryStateOnResumePermanentState = 0x05 TemporaryStateOffResumePermanentState = 0x06 + class OutputControl: def __init__(self, output_number: int, output_control_code: OutputControlCode, timer: int): @@ -249,6 +305,7 @@ def build_data(self) -> bytes: timer_bytes = self.timer.to_bytes(2, byteorder='little') return bytes([self.output_number, self.output_control_code.value, timer_bytes[0], timer_bytes[1]]) + class OutputControls: def __init__(self, controls): @@ -260,36 +317,42 @@ def build_data(self) -> bytes: data.extend(control.build_data()) return bytes(data) + class TemporaryReaderControlCode(Enum): Nop = 0x00 CancelAnyTemporaryAndDisplayPermanent = 0x01 SetTemporaryAndStartTimer = 0x02 + class PermanentReaderControlCode(Enum): Nop = 0x00 SetPermanentState = 0x02 + class LedColor(Enum): Black = 0 Red = 1 - Green = 2 + Green = 2 Amber = 3 Blue = 4 + class ReaderLedControl: - def __init__(self, reader_number: int, led_number: int, - temporary_mode: TemporaryReaderControlCode, - temporary_on_time: int, - temporary_off_time: int, - temporary_on_color: LedColor, - temporary_off_color: LedColor, - temporary_timer: int, - permanent_mode: PermanentReaderControlCode, - permanent_on_time: int, - permanent_off_time: int, - permanent_on_color: LedColor, - permanent_off_color: LedColor): + def __init__( + self, reader_number: int, led_number: int, + temporary_mode: TemporaryReaderControlCode, + temporary_on_time: int, + temporary_off_time: int, + temporary_on_color: LedColor, + temporary_off_color: LedColor, + temporary_timer: int, + permanent_mode: PermanentReaderControlCode, + permanent_on_time: int, + permanent_off_time: int, + permanent_on_color: LedColor, + permanent_off_color: LedColor + ): self.reader_number = reader_number self.led_number = led_number self.temporary_mode = temporary_mode @@ -306,7 +369,8 @@ def __init__(self, reader_number: int, led_number: int, def build_data(self) -> bytes: temporary_timer_bytes = self.temporary_timer.to_bytes(2, byteorder='little') - return bytes([self.reader_number, self.led_number, + return bytes([ + self.reader_number, self.led_number, self.temporary_mode.value, self.temporary_on_time, self.temporary_off_time, @@ -321,6 +385,7 @@ def build_data(self) -> bytes: self.permanent_off_color.value ]) + class ReaderLedControls: def __init__(self, controls): @@ -332,20 +397,24 @@ def build_data(self) -> bytes: data.extend(control.build_data()) return bytes(data) + class ToneCode(Enum): NoTone = 0 Off = 1 - DefaultTone = 2 - TBD = 3 + DefaultTone = 2 + TBD = 3 + class ReaderBuzzerControl: - def __init__(self, - reader_number: int, - tone_code: ToneCode, - on_time: int, - off_time: int, - count: int): + def __init__( + self, + reader_number: int, + tone_code: ToneCode, + on_time: int, + off_time: int, + count: int + ): self.reader_number = reader_number self.tone_code = tone_code self.on_time = on_time @@ -361,21 +430,25 @@ def build_data(self) -> bytes: self.count ]) + class TextCommand(Enum): PermanentTextNoWrap = 0x01 PermanentTextWithWrap = 0x02 TempTextNoWrap = 0x02 TempTextWithWrap = 0x04 + class ReaderTextOutput: - def __init__(self, - reader_number: int, - text_command: TextCommand, - temp_text_time: int, - row: int, - column: int, - text: str): + def __init__( + self, + reader_number: int, + text_command: TextCommand, + temp_text_time: int, + row: int, + column: int, + text: str + ): self.reader_number = reader_number self.text_command = text_command self.temp_text_time = temp_text_time @@ -394,10 +467,12 @@ def build_data(self) -> bytes: text_length, ]) + self.text.encode("ascii") + class FormatCode(Enum): NotSpecified = 0x0 Wiegand = 0x1 + class RawCardData: def __init__(self, reader_number: int, format_code: FormatCode, bit_count: int, data: bytes): @@ -409,7 +484,7 @@ def __init__(self, reader_number: int, format_code: FormatCode, bit_count: int, @staticmethod def parse_data(reply) -> Nak: data = reply.extract_reply_data - if len(data)<4: + if len(data) < 4: raise ValueError("Invalid size for the data") reader_number = data[0] @@ -419,7 +494,17 @@ def parse_data(reply) -> Nak: return RawCardData(reader_number, format_code, bit_count, data) def __repr__(self): - return "Reader Number: {0}\n Format Code: {1}\n Bit Count: {2}\n Data: {3}".format(self.reader_number, self.format_code.name, self.bit_count, self.data.hex().upper()) + return \ + "Reader Number: {0}\n"\ + " Format Code: {1}\n"\ + " Bit Count: {2}\n"\ + " Data: {3}".format( + self.reader_number, + self.format_code.name, + self.bit_count, + self.data.hex().upper() + ) + class KeypadData: @@ -431,7 +516,7 @@ def __init__(self, reader_number: int, bit_count: int, data: bytes): @staticmethod def parse_data(reply) -> Nak: data = reply.extract_reply_data - if len(data)<2: + if len(data) < 2: raise ValueError("Invalid size for the data") reader_number = data[0] @@ -440,7 +525,14 @@ def parse_data(reply) -> Nak: return KeypadData(reader_number, bit_count, data) def __repr__(self): - return "Reader Number: {0}\n Bit Count: {1}\n Data: {2}".format(self.reader_number, self.bit_count, self.data.hex().upper()) + return \ + "Reader Number: {0}\n"\ + " Bit Count: {1}\n"\ + " Data: {2}".format( + self.reader_number, + self.bit_count, + self.data.hex().upper() + ) class DataEvent(Event): diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..515c827 --- /dev/null +++ b/tox.ini @@ -0,0 +1,5 @@ +[flake8] +ignore = W191,E117,E722,C901,F401 +max-line-length = 120 +exclude = tests/* +max-complexity = 10 \ No newline at end of file