From 364b2006644ff937a8322c27d565aaeb6d7f0710 Mon Sep 17 00:00:00 2001 From: sfaiss <44982288+sfaiss@users.noreply.github.com> Date: Tue, 4 Jul 2023 21:59:39 +0200 Subject: [PATCH] Use SSL context for wrapping socket (#30) * Add test for SSL-exceptions * Add SSLError to "except" clause * Add tests for SSL-context * User SSL context for wrapping socket * Add section about encrypted communication --------- Co-authored-by: Simon Faiss --- doc/source/index.rst | 46 ++++++++++++++++++++++++++++++++++++++++++++ doipclient/client.py | 18 +++++++++++++---- tests/test_client.py | 34 ++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 4 deletions(-) diff --git a/doc/source/index.rst b/doc/source/index.rst index b63386b..afec62a 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -50,3 +50,49 @@ DoIPClient ---------- .. autoclass:: doipclient.DoIPClient :members: + + +Encrypted Communication +----------------------- +:abbr:`TLS (Transport Layer Security)`/:abbr:`SSL (Secure Sockets Layer)` can +be enabled by setting the `use_secure` parameter when creating an instance of +`DoIPClient`. + +.. code-block:: python + + client = DoIPClient( + ip, + logical_address, + use_secure=True, # Enable encryption + tcp_port=3496, + ) + +If more control is required, a preconfigured `SSL context`_ can be provided. +For instance, to enforce the use of TLSv1.2, create a context with the desired +protocol version: + +.. code-block:: python + + import ssl + + # Enforce use of TLSv1.2 + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + + client = DoIPClient( + ip, + logical_address, + use_secure=ssl_context, + tcp_port=3496, + ) + +.. note:: + Since the communication is encrypted, debugging without the pre-master + secret is not possible. To decrypt the TLS traffic for analysis, the + pre-master secret can be dumped to a file and `loaded into Wireshark`_. + This can be done via the `built-in mechanism`_ or with `sslkeylog`_ when + using Python 3.7 and earlier. + +.. _SSL context: https://docs.python.org/3/library/ssl.html#ssl-contexts +.. _loaded into Wireshark: https://wiki.wireshark.org/TLS#using-the-pre-master-secret +.. _built-in mechanism: https://docs.python.org/3/library/ssl.html#ssl.SSLContext.keylog_filename +.. _sslkeylog: https://pypi.org/project/sslkeylog/ diff --git a/doipclient/client.py b/doipclient/client.py index bd5eeb0..0b8adb6 100644 --- a/doipclient/client.py +++ b/doipclient/client.py @@ -5,6 +5,7 @@ import time import ssl from enum import IntEnum +from typing import Union from .constants import ( TCP_DATA_UNSECURED, UDP_DISCOVERY, @@ -139,8 +140,9 @@ class DoIPClient: Useful if you have multiple network adapters. Can be an IPv4 or IPv6 address just like `ecu_ip_address`, though the type should match. :type client_ip_address: str, optional - :param use_secure: Enables TLS if True. Untested. Should be combined with changing tcp_port to 3496. - :type use_secure: bool + :param use_secure: Enables TLS. If set to True, a default SSL context is used. For more control, a preconfigured + SSL context can be passed directly. Untested. Should be combined with changing tcp_port to 3496. + :type use_secure: Union[bool,ssl.SSLContext] :param log_level: Logging level :type log_level: int :param auto_reconnect_tcp: Attempt to automatically reconnect TCP sockets that were closed by peer @@ -382,7 +384,7 @@ def _tcp_socket_check(self, first_timeout=0.010): self._tcp_parser.push_bytes(data) # Subsequent reads, go to 0 timeout self._tcp_sock.settimeout(0) - except (BlockingIOError, socket.timeout): + except (BlockingIOError, socket.timeout, ssl.SSLError): pass except (ConnectionResetError, BrokenPipeError): logger.debug("TCP Connection broken, attempting to reset") @@ -698,7 +700,15 @@ def _connect(self): self._udp_sock.bind((self._client_ip_address, 0)) if self._use_secure: - self._tcp_sock = ssl.wrap_socket(self._tcp_sock) + if isinstance(self._use_secure, type(ssl.SSLContext)): + ssl_context = self._use_secure + else: + ssl_context = ssl.create_default_context() + self._wrap_socket(ssl_context) + + def _wrap_socket(self, ssl_context): + """Wrap the underlying socket in a SSL context.""" + self._tcp_sock = ssl_context.wrap_socket(self._tcp_sock) def close(self): """Close the DoIP client""" diff --git a/tests/test_client.py b/tests/test_client.py index 8f27c15..0cbe6d6 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,4 +1,5 @@ import socket +import ssl import pytest import logging from doipclient import DoIPClient @@ -640,3 +641,36 @@ def test_await_ipv4(mock_socket): assert mock_socket.opts == { socket.SOL_SOCKET: {socket.SO_REUSEADDR: True, socket.SO_BROADCAST: True}, } + + +def test_exception_from_blocking_ssl_socket(mock_socket, mocker): + """SSL sockets behave slightly different than regular sockets in + non-blocking mode. They won't raise BlockingIOError but SSLWantWriteError + or SSLWantReadError instead. + + See: https://docs.python.org/3/library/ssl.html#notes-on-non-blocking-sockets + """ + sut = DoIPClient(test_ip, test_logical_address) + + try: + sut._tcp_sock.recv = mocker.Mock(side_effect=ssl.SSLWantReadError) + sut._tcp_socket_check() + sut._tcp_sock.recv = mocker.Mock(side_effect=ssl.SSLWantWriteError) + sut._tcp_socket_check() + except (ssl.SSLWantReadError, ssl.SSLWantWriteError) as exc: + pytest.fail(f"Should not raise exception: {exc.__class__.__name__}") + + +def test_use_secure_uses_default_ssl_context(mock_socket, mocker): + """Wrap socket with default SSL-context when use_secure=True""" + mocked_context = mocker.patch.object(ssl, "SSLContext", autospec=True) + sut = DoIPClient(test_ip, test_logical_address, use_secure=True, activation_type=None) + mocked_wrap_socket = mocked_context.return_value.wrap_socket + mocked_wrap_socket.assert_called_once_with(mock_socket) + + +def test_use_secure_with_external_ssl_context(mock_socket, mocker): + """Wrap socket with user provided SSL-context when use_secure=ssl_context""" + mocked_context = mocker.patch.object(ssl, "SSLContext", autospec=True) + sut = DoIPClient(test_ip, test_logical_address, use_secure=mocked_context, activation_type=None) + mocked_context.wrap_socket.assert_called_once_with(mock_socket)