Skip to content

Commit

Permalink
Use SSL context for wrapping socket (#30)
Browse files Browse the repository at this point in the history
* Add test for SSL-exceptions
* Add SSLError to "except" clause
* Add tests for SSL-context
* User SSL context for wrapping socket
* Add section about encrypted communication

---------

Co-authored-by: Simon Faiss <[email protected]>
  • Loading branch information
sfaiss and Simon Faiss authored Jul 4, 2023
1 parent 839663a commit 364b200
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 4 deletions.
46 changes: 46 additions & 0 deletions doc/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,49 @@ DoIPClient
----------
.. autoclass:: doipclient.DoIPClient
:members:


Encrypted Communication
-----------------------
:abbr:`TLS (Transport Layer Security)`/:abbr:`SSL (Secure Sockets Layer)` can
be enabled by setting the `use_secure` parameter when creating an instance of
`DoIPClient`.

.. code-block:: python
client = DoIPClient(
ip,
logical_address,
use_secure=True, # Enable encryption
tcp_port=3496,
)
If more control is required, a preconfigured `SSL context`_ can be provided.
For instance, to enforce the use of TLSv1.2, create a context with the desired
protocol version:

.. code-block:: python
import ssl
# Enforce use of TLSv1.2
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
client = DoIPClient(
ip,
logical_address,
use_secure=ssl_context,
tcp_port=3496,
)
.. note::
Since the communication is encrypted, debugging without the pre-master
secret is not possible. To decrypt the TLS traffic for analysis, the
pre-master secret can be dumped to a file and `loaded into Wireshark`_.
This can be done via the `built-in mechanism`_ or with `sslkeylog`_ when
using Python 3.7 and earlier.

.. _SSL context: https://docs.python.org/3/library/ssl.html#ssl-contexts
.. _loaded into Wireshark: https://wiki.wireshark.org/TLS#using-the-pre-master-secret
.. _built-in mechanism: https://docs.python.org/3/library/ssl.html#ssl.SSLContext.keylog_filename
.. _sslkeylog: https://pypi.org/project/sslkeylog/
18 changes: 14 additions & 4 deletions doipclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import ssl
from enum import IntEnum
from typing import Union
from .constants import (
TCP_DATA_UNSECURED,
UDP_DISCOVERY,
Expand Down Expand Up @@ -139,8 +140,9 @@ class DoIPClient:
Useful if you have multiple network adapters. Can be an IPv4 or IPv6 address just like `ecu_ip_address`, though
the type should match.
:type client_ip_address: str, optional
:param use_secure: Enables TLS if True. Untested. Should be combined with changing tcp_port to 3496.
:type use_secure: bool
:param use_secure: Enables TLS. If set to True, a default SSL context is used. For more control, a preconfigured
SSL context can be passed directly. Untested. Should be combined with changing tcp_port to 3496.
:type use_secure: Union[bool,ssl.SSLContext]
:param log_level: Logging level
:type log_level: int
:param auto_reconnect_tcp: Attempt to automatically reconnect TCP sockets that were closed by peer
Expand Down Expand Up @@ -382,7 +384,7 @@ def _tcp_socket_check(self, first_timeout=0.010):
self._tcp_parser.push_bytes(data)
# Subsequent reads, go to 0 timeout
self._tcp_sock.settimeout(0)
except (BlockingIOError, socket.timeout):
except (BlockingIOError, socket.timeout, ssl.SSLError):
pass
except (ConnectionResetError, BrokenPipeError):
logger.debug("TCP Connection broken, attempting to reset")
Expand Down Expand Up @@ -698,7 +700,15 @@ def _connect(self):
self._udp_sock.bind((self._client_ip_address, 0))

if self._use_secure:
self._tcp_sock = ssl.wrap_socket(self._tcp_sock)
if isinstance(self._use_secure, type(ssl.SSLContext)):
ssl_context = self._use_secure
else:
ssl_context = ssl.create_default_context()
self._wrap_socket(ssl_context)

def _wrap_socket(self, ssl_context):
"""Wrap the underlying socket in a SSL context."""
self._tcp_sock = ssl_context.wrap_socket(self._tcp_sock)

def close(self):
"""Close the DoIP client"""
Expand Down
34 changes: 34 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import socket
import ssl
import pytest
import logging
from doipclient import DoIPClient
Expand Down Expand Up @@ -640,3 +641,36 @@ def test_await_ipv4(mock_socket):
assert mock_socket.opts == {
socket.SOL_SOCKET: {socket.SO_REUSEADDR: True, socket.SO_BROADCAST: True},
}


def test_exception_from_blocking_ssl_socket(mock_socket, mocker):
"""SSL sockets behave slightly different than regular sockets in
non-blocking mode. They won't raise BlockingIOError but SSLWantWriteError
or SSLWantReadError instead.
See: https://docs.python.org/3/library/ssl.html#notes-on-non-blocking-sockets
"""
sut = DoIPClient(test_ip, test_logical_address)

try:
sut._tcp_sock.recv = mocker.Mock(side_effect=ssl.SSLWantReadError)
sut._tcp_socket_check()
sut._tcp_sock.recv = mocker.Mock(side_effect=ssl.SSLWantWriteError)
sut._tcp_socket_check()
except (ssl.SSLWantReadError, ssl.SSLWantWriteError) as exc:
pytest.fail(f"Should not raise exception: {exc.__class__.__name__}")


def test_use_secure_uses_default_ssl_context(mock_socket, mocker):
"""Wrap socket with default SSL-context when use_secure=True"""
mocked_context = mocker.patch.object(ssl, "SSLContext", autospec=True)
sut = DoIPClient(test_ip, test_logical_address, use_secure=True, activation_type=None)
mocked_wrap_socket = mocked_context.return_value.wrap_socket
mocked_wrap_socket.assert_called_once_with(mock_socket)


def test_use_secure_with_external_ssl_context(mock_socket, mocker):
"""Wrap socket with user provided SSL-context when use_secure=ssl_context"""
mocked_context = mocker.patch.object(ssl, "SSLContext", autospec=True)
sut = DoIPClient(test_ip, test_logical_address, use_secure=mocked_context, activation_type=None)
mocked_context.wrap_socket.assert_called_once_with(mock_socket)

0 comments on commit 364b200

Please sign in to comment.