Skip to content

Commit

Permalink
Add classmethod get_entity to send Vehicle ID Req without TCP connect…
Browse files Browse the repository at this point in the history
…ion #17 (#28)

Supports vehicle identification broadcast request on IPV4 in addition to the existing await method.
  • Loading branch information
HarshaLaxman authored Jul 6, 2023
1 parent c479ade commit 3b34d70
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 42 deletions.
10 changes: 10 additions & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ Announcement broadcast message (sent at powerup) as follows:
ip, port = address
print(ip, port, logical_address)
Alternatively, you can request a Vehicle Identification Response message:

.. code-block:: python
from doipclient import DoIPClient
address, announcement = DoIPClient.get_entity()
logical_address = announcement.logical_address
ip, port = address
print(ip, port, logical_address)
Once you have the IP address and Logical Address for your ECU, you can connect
to it and begin interacting.

Expand Down
138 changes: 99 additions & 39 deletions doipclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from enum import IntEnum
from typing import Union
from .constants import (
A_DOIP_CTRL,
TCP_DATA_UNSECURED,
UDP_DISCOVERY,
A_PROCESSING_TIME,
Expand Down Expand Up @@ -116,11 +117,12 @@ class DoIPClient:
:param ecu_ip_address: This is the IP address of the target ECU. This should be a string representing an IPv4
address like "192.168.1.1" or an IPv6 address like "2001:db8::". Like the logical_address, if you don't know the
value for your ECU, utilize the await_vehicle_announcement() method.
value for your ECU, utilize the get_entity() or await_vehicle_announcement() method.
:type ecu_ip_address: str
:param ecu_logical_address: The logical address of the target ECU. This should be an integer. According to the
specification, the correct range is 0x0001 to 0x0DFF ("VM specific"). If you don't know the logical address,
use the await_vehicle_announcement() method and power cycle the ECU - it should identify itself on bootup.
either use the get_entity() method OR the await_vehicle_announcement() method and power
cycle the ECU - it should identify itself on bootup.
:type ecu_logical_address: int
:param tcp_port: The destination TCP port for DoIP data communication. By default this is 13400 for unsecure and
3496 when using TLS.
Expand Down Expand Up @@ -205,33 +207,8 @@ def __enter__(self):
def __exit__(self, type, value, traceback):
self.close()

@classmethod
def await_vehicle_announcement(
cls, udp_port=UDP_DISCOVERY, timeout=None, ipv6=False, source_interface=None
):
"""Receive Vehicle Announcement Message
When an ECU first turns on, it's supposed to broadcast a Vehicle Announcement Message over UDP 3 times
to assist DoIP clients in determining ECU IP's and Logical Addresses. Will use an IPv4 socket by default,
though this can be overridden with the `ipv6` parameter.
:param udp_port: The UDP port to listen on. Per the spec this should be 13400, but some VM's use a custom
one.
:type udp_port: int, optional
:param timeout: Maximum amount of time to wait for message
:type timeout: float, optional
:param ipv6: Bool forcing IPV6 socket instead of IPV4 socket
:type ipv6: bool, optional
:return: IP Address of ECU and VehicleAnnouncementMessage object
:rtype: tuple
:param source_interface: Interface name (like "eth0") to bind to for use with IPv6. Defaults to None which
will use the default interface (which may not be the one connected to the ECU). Does nothing for IPv4,
which will bind to all interfaces uses INADDR_ANY.
:type source_interface: str, optional
:raises TimeoutError: If vehicle announcement not received in time
"""
start_time = time.time()

@staticmethod
def _create_udp_socket(ipv6=False, udp_port=UDP_DISCOVERY, timeout=None, source_interface=None):
if ipv6:
sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)

Expand Down Expand Up @@ -264,8 +241,53 @@ def await_vehicle_announcement(
if timeout is not None:
sock.settimeout(timeout)

return sock

@staticmethod
def _pack_doip(protocol_version, payload_type, payload_data):
data_bytes = struct.pack(
"!BBHL",
protocol_version,
0xFF ^ protocol_version,
payload_type,
len(payload_data)
)
data_bytes += payload_data

return data_bytes

@classmethod
def await_vehicle_announcement(
cls, udp_port=UDP_DISCOVERY, timeout=None, ipv6=False, source_interface=None, sock=None
):
"""Receive Vehicle Announcement Message
When an ECU first turns on, it's supposed to broadcast a Vehicle Announcement Message over UDP 3 times
to assist DoIP clients in determining ECU IP's and Logical Addresses. Will use an IPv4 socket by default,
though this can be overridden with the `ipv6` parameter.
:param udp_port: The UDP port to listen on. Per the spec this should be 13400, but some VM's use a custom
one.
:type udp_port: int, optional
:param timeout: Maximum amount of time to wait for message
:type timeout: float, optional
:param ipv6: Bool forcing IPV6 socket instead of IPV4 socket
:type ipv6: bool, optional
:return: IP Address of ECU and VehicleAnnouncementMessage object
:rtype: tuple
:param source_interface: Interface name (like "eth0") to bind to for use with IPv6. Defaults to None which
will use the default interface (which may not be the one connected to the ECU). Does nothing for IPv4,
which will bind to all interfaces uses INADDR_ANY.
:type source_interface: str, optional
:raises TimeoutError: If vehicle announcement not received in time
"""
start_time = time.time()

parser = Parser()

if not sock:
sock = cls._create_udp_socket(ipv6=ipv6, udp_port=udp_port, timeout=timeout, source_interface=source_interface)

while True:
remaining = None
if timeout:
Expand All @@ -290,6 +312,52 @@ def await_vehicle_announcement(
if result and type(result) == VehicleIdentificationResponse:
return addr, result

@classmethod
def get_entity(cls, ecu_ip_address='255.255.255.255', protocol_version=0x02, eid=None, vin=None):
"""Sends a VehicleIdentificationRequest and awaits a VehicleIdentificationResponse from the ECU,
either with a specified VIN, EIN, or nothing. Equivalent to the request_vehicle_identification() method
but can be called without instantiation.
:param ecu_ip_address: This is the IP address of the target ECU for unicast. Defaults to broadcast if
the address is not known.
:type ecu_ip_address: str, optional
:param protocol_version: The DoIP protocol version to use for communication. Represents the version of the ISO 13400
specification to follow. 0x02 (2012) is probably correct for most ECU's at the time of writing, though technically
this implementation is against 0x03 (2019).
:type protocol_version: int, optional
:param eid: EID of the Vehicle
:type eid: bytes, optional
:param vin: VIN of the Vehicle
:type vin: str, optional
:return: The vehicle identification response message
:rtype: VehicleIdentificationResponse
"""

# UDP_TEST_EQUIPMENT_REQUEST is dynamically assigned using udp_port=0
sock = cls._create_udp_socket(udp_port=0, timeout=A_DOIP_CTRL)

if eid:
message = VehicleIdentificationRequestWithEID(eid)
elif vin:
message = VehicleIdentificationRequestWithVIN(vin)
else:
message = VehicleIdentificationRequest()

payload_data = message.pack()
payload_type = payload_message_to_type[type(message)]

data_bytes = cls._pack_doip(protocol_version, payload_type, payload_data)
logger.debug(
"Sending DoIP Vehicle Identification Request: Type: 0x{:X}, Payload Size: {}, Payload: {}".format(
payload_type,
len(payload_data),
" ".join(f"{byte:02X}" for byte in payload_data),
)
)
sock.sendto(data_bytes, (ecu_ip_address, UDP_DISCOVERY))

return cls.await_vehicle_announcement(timeout=A_DOIP_CTRL, sock=sock)

def empty_rxqueue(self):
"""Implemented for compatibility with udsoncan library. Nothing useful to be done yet"""
pass
Expand Down Expand Up @@ -414,14 +482,7 @@ def send_doip(

retry = self._auto_reconnect_tcp and not disable_retry

data_bytes = struct.pack(
"!BBHL",
self._protocol_version,
0xFF ^ self._protocol_version,
payload_type,
len(payload_data),
)
data_bytes += payload_data
data_bytes = self._pack_doip(self._protocol_version, payload_type, payload_data)
logger.debug(
"Sending DoIP Message: Type: 0x{:X}, Payload Size: {}, Payload: {}".format(
payload_type,
Expand Down Expand Up @@ -529,9 +590,8 @@ def request_activation(
)

def request_vehicle_identification(self, eid=None, vin=None):
"""Requests a VehicleIdentificationResponse from the ECU, either with a specified VIN, EIN,
"""Sends a VehicleIdentificationRequest and awaits a VehicleIdentificationResponse from the ECU, either with a specified VIN, EIN,
or nothing.
:param eid: EID of the Vehicle
:type eid: bytes, optional
:param vin: VIN of the Vehicle
Expand Down
37 changes: 34 additions & 3 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,16 +527,47 @@ def test_request_vehicle_identification_with_ein(mock_socket):
assert mock_socket.tx_queue[-1] == vehicle_identification_request_with_ein
assert result.vin == "1" * 17
assert result.logical_address == 0x1234


def test_request_vehicle_identification_with_vin(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(vehicle_identification_response)
result = sut.request_vehicle_identification(vin="1" * 17)
assert mock_socket.tx_queue[-1] == vehicle_identification_request_with_vin
assert result.vin == "1" * 17
assert result.logical_address == 0x1234
assert result.eid == b"1" * 6
assert result.gid == b"2" * 6
assert result.further_action_required == 0x00
assert result.vin_sync_status == 0x00

def test_get_entity(mock_socket):
mock_socket.rx_queue.append(vehicle_identification_response)
_, result = DoIPClient.get_entity()
assert mock_socket.tx_queue[-1] == vehicle_identification_request
assert result.vin == "1" * 17
assert result.logical_address == 0x1234
assert result.eid == b"1" * 6
assert result.gid == b"2" * 6
assert result.further_action_required == 0x00
assert result.vin_sync_status == 0x00

def test_request_vehicle_identification_with_vin(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)

def test_get_entity_with_ein(mock_socket):
mock_socket.rx_queue.append(vehicle_identification_response)
result = sut.request_vehicle_identification(vin="1" * 17)
_, result = DoIPClient.get_entity(eid=b"1" * 6)
assert mock_socket.tx_queue[-1] == vehicle_identification_request_with_ein
assert result.vin == "1" * 17
assert result.logical_address == 0x1234
assert result.eid == b"1" * 6
assert result.gid == b"2" * 6
assert result.further_action_required == 0x00
assert result.vin_sync_status == 0x00


def test_get_entity_with_vin(mock_socket):
mock_socket.rx_queue.append(vehicle_identification_response)
_, result = DoIPClient.get_entity(vin="1" * 17)
assert mock_socket.tx_queue[-1] == vehicle_identification_request_with_vin
assert result.vin == "1" * 17
assert result.logical_address == 0x1234
Expand Down

0 comments on commit 3b34d70

Please sign in to comment.