From c4ede52609cb9a392d85f85d6836fbb2fccdae7e Mon Sep 17 00:00:00 2001 From: Birne Vogel Date: Fri, 12 Apr 2024 14:57:38 +0200 Subject: [PATCH] feat: support setting udp buffer size in run time --- dlt/dlt.py | 72 ++++++++++++++++++++++++++++++++++++-- dlt/dlt_broker_handlers.py | 12 ++++++- dlt/py_dlt_receive.py | 22 +++++++++++- 3 files changed, 101 insertions(+), 5 deletions(-) diff --git a/dlt/dlt.py b/dlt/dlt.py index 8d19577..888b0d4 100644 --- a/dlt/dlt.py +++ b/dlt/dlt.py @@ -56,6 +56,9 @@ DLT_EMPTY_FILE_ERROR = "DLT TRACE FILE IS EMPTY" cDLT_FILE_NOT_OPEN_ERROR = "Could not open DLT Trace file (libdlt)" # pylint: disable=invalid-name +DLT_UDP_MULTICAST_FD_BUFFER_SIZE = int(os.environ.get("PYDLT_UDP_MULTICAST_FD_BUFFER_SIZE", 2 * (2**20))) # 2 Mb +DLT_UDP_MULTICAST_BUFFER_SIZE = int(os.environ.get("PYDLT_UDP_MULTICAST_BUFFER_SIZE", 8 * (2**20))) # 8 Mb + class cached_property(object): # pylint: disable=invalid-name """ @@ -865,13 +868,23 @@ def __len__(self): class DLTClient(cDltClient): - """DLTClient class takes care about correct initialization and - cleanup - """ + """DLTClient class takes care about correct initialization and cleanup""" verbose = 0 def __init__(self, **kwords): + """Initialize a DLTClient. + + :param servIP: Optional[str] - dlt server IP. + :param hostIP: Optional[str] - Only available for udp multicast mode. + Set host interface address. + :param port: Optional[int] - dlt tcp daemon port. + :param verbose: Optional[bool] - Enable verbose output. + :param udp_fd_buffer_size_bytes: Optional[int] - Only available for udp + multicast mode. Set the UDP buffer size through setsockopt (unit: bytes). + :param udp_buffer_size_bytes: Optional[int] - Only available for udp + multicast mode. Set the DltReceiver's buffer size (unit: bytes). + """ self.is_udp_multicast = False self.verbose = kwords.pop("verbose", 0) if dltlib.dlt_client_init(ctypes.byref(self), self.verbose) == DLT_RETURN_ERROR: @@ -915,6 +928,9 @@ def __init__(self, **kwords): # it ourselves elsewhere self.port = kwords.get("port", DLT_DAEMON_TCP_PORT) + self._udp_fd_buffer_size_bytes = kwords.get("udp_fd_buffer_size_bytes", DLT_UDP_MULTICAST_FD_BUFFER_SIZE) + self._udp_buffer_size_bytes = kwords.get("udp_buffer_size_bytes", DLT_UDP_MULTICAST_BUFFER_SIZE) + def connect(self, timeout=None): """Connect to the server @@ -973,7 +989,9 @@ def connect(self, timeout=None): else: if self.verbose: logger.info("Connecting DLTClient using UDP Connection") + connected = dltlib.dlt_client_connect(ctypes.byref(self), self.verbose) + self._set_udp_multicast_buffer_size() if self.verbose: logger.info("DLT Connection return: %s", connected) @@ -1051,6 +1069,54 @@ def client_loop(self): dltlib.dlt_client_register_message_callback(self.msg_callback) dltlib.dlt_client_main_loop(ctypes.byref(self), None, self.verbose) + def _set_udp_multicast_buffer_size(self, custom_fd_buffer_size_bytes=None, custom_buffer_size_bytes=None) -> None: + fd_buffer_size = int(self._udp_fd_buffer_size_bytes or custom_fd_buffer_size_bytes or 0) + buffer_size_bytes = int(self._udp_buffer_size_bytes or custom_buffer_size_bytes or 0) + + if fd_buffer_size: + # Socket options are associated with an open file description. This + # means that file descriptors duplicated as a consequence of dup() + # (or similar) or fork() share the same set of socket options. + # -- Chapter 61.9 Socket Options. + # The Linux Programming Interface, p.1279 + # + # The buffer size can be changed with a new fd which is created by + # dup system call (it's the internal implementation in + # `socket.fromfd`), so the code creates a socket instance first + # configures it and directly close it. + with socket.fromfd(self.sock, socket.AF_INET, socket.SOCK_DGRAM) as conf_socket: + logger.debug("Set UDP Multicast socket buffer size: %s kbytes", fd_buffer_size / 1024) + conf_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, fd_buffer_size) + + real_buffer_size = int(conf_socket.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF) / 2) + if real_buffer_size != fd_buffer_size: + logger.warning( + ( + "Failed to set UDP Multicast buffer size. set_size: %s, real_size: %s. " + "Bypass the error and continue" + ), + fd_buffer_size / 1024, + real_buffer_size / 1024, + ) + logger.warning( + ( + "Please run command `sysctl -w net.core.rmem_max=%s` with root permission to " + "set the maximum size and restart dlt again." + ), + fd_buffer_size, + ) + + if buffer_size_bytes: + logger.debug("Set UDP Multicast DltReceiver buffer size: %s kbytes", buffer_size_bytes / 1024) + ret = dltlib.dlt_receiver_init( + ctypes.byref(self.receiver), self.sock, self.receiver.type, buffer_size_bytes + ) + if ret < 0: + raise RuntimeError( + f"Failed to set UDP Multicast DltReceiver buffer size. return code: {ret}, " + f"buffer_size_bytes: {buffer_size_bytes}" + ) + def py_dlt_file_main_loop(dlt_reader, limit=None, callback=None): """Main loop to read dlt messages from dlt file.""" diff --git a/dlt/dlt_broker_handlers.py b/dlt/dlt_broker_handlers.py index 684804a..9bae249 100644 --- a/dlt/dlt_broker_handlers.py +++ b/dlt/dlt_broker_handlers.py @@ -15,6 +15,8 @@ from dlt.dlt import ( DLTClient, DLT_DAEMON_TCP_PORT, + DLT_UDP_MULTICAST_BUFFER_SIZE, + DLT_UDP_MULTICAST_FD_BUFFER_SIZE, cDLT_FILE_NOT_OPEN_ERROR, load, py_dlt_client_main_loop, @@ -402,6 +404,8 @@ def __init__( self.tracefile = None self.last_connected = time.time() self.last_message = time.time() - 120.0 + self._udp_fd_buffer_size_bytes = client_cfg.get("udp_fd_buffer_size_bytes", DLT_UDP_MULTICAST_FD_BUFFER_SIZE) + self._udp_buffer_size_bytes = client_cfg.get("udp_buffer_size_bytes", DLT_UDP_MULTICAST_BUFFER_SIZE) def is_valid_message(self, message): return message and (message.apid != "" or message.ctid != "") @@ -420,7 +424,13 @@ def _client_connect(self): self._port, self._filename, ) - self._client = DLTClient(servIP=self._ip_address, port=self._port, verbose=self.verbose) + self._client = DLTClient( + servIP=self._ip_address, + port=self._port, + verbose=self.verbose, + udp_fd_buffer_size_bytes=self._udp_fd_buffer_size_bytes, + udp_buffer_size_bytes=self._udp_buffer_size_bytes, + ) connected = self._client.connect(self.timeout) if connected: logger.info("DLTClient connected to %s", self._client.servIP) diff --git a/dlt/py_dlt_receive.py b/dlt/py_dlt_receive.py index 9cad7ca..b389645 100644 --- a/dlt/py_dlt_receive.py +++ b/dlt/py_dlt_receive.py @@ -5,6 +5,7 @@ import logging import time +from dlt.dlt import DLT_UDP_MULTICAST_FD_BUFFER_SIZE, DLT_UDP_MULTICAST_BUFFER_SIZE from dlt.dlt_broker import DLTBroker logging.basicConfig(format="%(asctime)s %(name)s %(levelname)-8s %(message)s") @@ -18,13 +19,32 @@ def parse_args(): parser = argparse.ArgumentParser(description="Receive DLT messages") parser.add_argument("--host", required=True, help="hostname or ip address to connect to") parser.add_argument("--file", required=True, help="The file into which the messages will be written") + parser.add_argument( + "--udp-fd-buffer-size", + dest="udp_fd_buffer_size", + default=DLT_UDP_MULTICAST_FD_BUFFER_SIZE, + type=int, + help=f"Set the socket buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_FD_BUFFER_SIZE} bytes", + ) + parser.add_argument( + "--udp-buffer-size", + dest="udp_buffer_size", + default=DLT_UDP_MULTICAST_BUFFER_SIZE, + type=int, + help=f"Set the DltReceiver buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_BUFFER_SIZE} bytes", + ) return parser.parse_args() def dlt_receive(options): """Receive DLT messages via DLTBroker""" logger.info("Creating DLTBroker instance") - broker = DLTBroker(ip_address=options.host, filename=options.file) + broker = DLTBroker( + ip_address=options.host, + filename=options.file, + udp_fd_buffer_size_bytes=options.udp_buffer_size, + udp_buffer_size_bytes=options.udp_fd_buffer_size, + ) logger.info("Starting DLTBroker") broker.start() # start the loop