From 689df67bf9c3bc0e0c47fb61962f9d085febd438 Mon Sep 17 00:00:00 2001 From: Martin Belanger Date: Thu, 13 Jul 2023 13:43:14 -0400 Subject: [PATCH] avahi: add connectivity checker to verify IP addresses are reachable Signed-off-by: Martin Belanger --- NEWS.md | 3 + meson.build | 2 +- staslib/avahi.py | 278 ++++++++++++++++++++++++++++++++--------------- staslib/gutil.py | 98 +++++++++++++++++ 4 files changed, 291 insertions(+), 90 deletions(-) diff --git a/NEWS.md b/NEWS.md index 5c3a699..4583cea 100644 --- a/NEWS.md +++ b/NEWS.md @@ -5,11 +5,14 @@ New features: - Support for nBFT (NVMe-oF Boot Table). +- The Avahi driver will now verify reachability of services discovered through mDNS to make sure all discovered IP addresses can be connected to. This avoids invoking the NVMe kernel driver with invalid IP addresses and getting error messages in the syslog. +- The Avahi driver will now print an error message if the same IP address is found on multiple interfaces. This indicates a misconfiguration of the network. Bug fixes: * For TCP transport: use `sysfs` controller `src_addr` attribute when matching to a configured "candidate" controller. This is to determine when an existing controller (located under the `sysfs`) can be reused instead of creating a new one. This avoids creating unnecessary duplicate connections. * Udev event handling: use `systemctl restart` instead of `systemctl start`. There is a small chance that a `start` operation has not completed when a new `start` is required. Issuing a `start` while a `start` is being performed has no effect. However, a `restart` will be handled properly. +* `stafd`: Do not delete and recreate DC objects on kernel events indicating that an nvme device associated to a discovery controller was removed by the kernel. This was done to kick start the reconnect process, but was also causing the DLPE (Discovery Log Page Entries) cache to be lost. This could potentially result in `stacd` disconnecting from I/O controllers. Instead, keep the existing DC object which contains a valid DLPE cache and simply restart the "retry to connect" timer. This way the DLPE cache is maintained throughout the reconnect to DC process. ## Changes with release 2.2.3 diff --git a/meson.build b/meson.build index df52199..1e6f864 100644 --- a/meson.build +++ b/meson.build @@ -9,7 +9,7 @@ project( 'nvme-stas', meson_version: '>= 0.53.0', - version: '2.3-rc1', + version: '2.3-rc2', license: 'Apache-2.0', default_options: [ 'buildtype=release', diff --git a/staslib/avahi.py b/staslib/avahi.py index 26543d9..f29f89d 100644 --- a/staslib/avahi.py +++ b/staslib/avahi.py @@ -18,7 +18,7 @@ import dasbus.client.proxy import dasbus.client.observer from gi.repository import GLib -from staslib import defs, conf, gutil +from staslib import defs, conf, gutil, iputil def _txt2dict(txt: list): @@ -54,6 +54,141 @@ def _proto2trans(protocol): return None +def mk_service_key(interface, protocol, name, stype, domain): + '''Return a tuple used as a service key (unique identifier)''' + return (interface, protocol, name, stype, domain) + + +def fmt_service_str(interface, protocol, name, stype, domain, flags): # pylint: disable=too-many-arguments + '''Return service identifier as a string''' + return ( + f'interface={interface}:{(socket.if_indextoname(interface) + ","):<9} ' + f'protocol={Avahi.protocol_as_string(protocol)}, ' + f'stype={stype}, ' + f'domain={domain}, ' + f'flags={flags}:{(Avahi.result_flags_as_string(flags) + ","):<12} ' + f'name={name}' + ) + + +# ****************************************************************************** +class Service: # pylint: disable=too-many-instance-attributes + '''Object used to keep track of the services discovered from the avahi-daemon''' + + interface_name = property(lambda self: self._interface_name) + interface = property(lambda self: self._interface_id) + ip_family = property(lambda self: self._ip_family) + reachable = property(lambda self: self._reachable) + protocol = property(lambda self: self._protocol_id) + key_str = property(lambda self: self._key_str) + domain = property(lambda self: self._domain) + stype = property(lambda self: self._stype) + data = property(lambda self: self._data) + name = property(lambda self: self._name) + key = property(lambda self: self._key) + ip = property(lambda self: self._ip) + + def __init__(self, args, identified_cback): + self._identified_cback = identified_cback + self._interface_id = args[0] + self._protocol_id = args[1] + self._name = args[2] + self._stype = args[3] + self._domain = args[4] + self._flags = args[5] + self._ip_family = 4 if self._protocol_id == Avahi.PROTO_INET else 6 + + self._interface_name = socket.if_indextoname(self._interface_id).strip() + self._protocol_name = Avahi.protocol_as_string(self._protocol_id) + self._flags_str = '(' + Avahi.result_flags_as_string(self._flags) + '),' + + self._key = mk_service_key(self._interface_id, self._protocol_id, self._name, self._stype, self._domain) + self._key_str = f'({self._interface_name}, {self._protocol_name}, {self._name}.{self._domain}, {self._stype})' + + self._id = fmt_service_str( + self._interface_id, self._protocol_id, self._name, self._stype, self._domain, self._flags + ) + + self._ip = None + self._resolver = None + self._data = {} + self._reachable = False + self._connect_checker = None + + def info(self): + '''Return debug info''' + info = self._data + info['reachable'] = str(self._reachable) + return info + + def __str__(self): + return self._id + + def set_identity(self, transport, address, port, txt): # pylint: disable=too-many-arguments + '''Complete identification and check connectivity (if needed) + Return True if identification is complete. Return False if + we need to check connectivity. + ''' + traddr = address.strip() + trsvcid = str(port).strip() + # host-iface permitted for tcp alone and not rdma + host_iface = self._interface_name if transport == 'tcp' else '' + self._data = { + 'transport': transport, + 'traddr': traddr, + 'trsvcid': trsvcid, + # host-iface permitted for tcp alone and not rdma + 'host-iface': host_iface, + 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip() + if conf.NvmeOptions().discovery_supp + else defs.WELL_KNOWN_DISC_NQN, + } + + self._ip = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True) + + if transport != 'tcp': + self._reachable = True + self._identified_cback() + return + + self._reachable = False + connect_checker = gutil.TcpChecker(traddr, trsvcid, host_iface, self._tcp_connect_check_cback) + + try: + connect_checker.connect() + except RuntimeError as err: + logging.error('Unable to verify connectivity: %s', err) + connect_checker.close() + connect_checker = None + + self._connect_checker = connect_checker + + def _tcp_connect_check_cback(self, connected): + if self._connect_checker is not None: + self._connect_checker.close() + self._connect_checker = None + self._reachable = connected + self._identified_cback() + + def set_resolver(self, resolver): + '''Set the resolver object''' + self._resolver = resolver + + def close(self): + '''Close this object and release all resources''' + if self._connect_checker is not None: + self._connect_checker.close() + self._connect_checker = None + + if self._resolver is not None: + try: + self._resolver.Free() + dasbus.client.proxy.disconnect_proxy(self._resolver) + except (AttributeError, dasbus.error.DBusError) as ex: + logging.debug('Service.close() - Failed to Free() resolver. %s', ex) + self._resolver = None + + # ****************************************************************************** class Avahi: # pylint: disable=too-many-instance-attributes '''@brief Avahi Server proxy. Set up the D-Bus connection to the Avahi @@ -182,16 +317,10 @@ def kill(self): def info(self) -> dict: '''@brief return debug info about this object''' - services = dict() - for service, obj in self._services.items(): - interface, protocol, name, stype, domain = service - key = f'({socket.if_indextoname(interface)}, {Avahi.protos.get(protocol, "unknown")}, {name}.{domain}, {stype})' - services[key] = obj.get('data', {}) - info = { 'avahi wake up timer': str(self._kick_avahi_tmr), 'service types': list(self._stypes), - 'services': services, + 'services': {service.key_str: service.info() for service in self._services.values()}, } return info @@ -217,7 +346,7 @@ def get_controllers(self) -> list: [...] ] ''' - return [service['data'] for service in self._services.values() if len(service['data'])] + return [service.data for service in self._services.values() if service.reachable] def config_stypes(self, stypes: list): '''@brief Configure the service types that we want to discover. @@ -234,18 +363,17 @@ def kick_start(self): ''' self._kick_avahi_tmr.clear() + def _remove_service(self, service_to_rm: typing.Tuple[int, int, str, str, str]): + service = self._services.pop(service_to_rm) + if service is not None: + service.close() + def _disconnect(self): logging.debug('Avahi._disconnect()') for service in self._services.values(): - resolver = service.pop('resolver', None) - if resolver is not None: - try: - resolver.Free() - dasbus.client.proxy.disconnect_proxy(resolver) - except (AttributeError, dasbus.error.DBusError) as ex: - logging.debug('Avahi._disconnect() - Failed to Free() resolver. %s', ex) + service.close() - self._services = dict() + self._services.clear() for browser in self._service_browsers.values(): try: @@ -296,15 +424,9 @@ def _configure_browsers(self): logging.debug('Avahi._configure_browsers() - Failed to Free() browser. %s', ex) # Find the cached services corresponding to stype_to_rm and remove them - services_to_rm = [service for service in self._services if service[3] == stype_to_rm] - for service in services_to_rm: - resolver = self._services.pop(service, {}).pop('resolver', None) - if resolver is not None: - try: - resolver.Free() - dasbus.client.proxy.disconnect_proxy(resolver) - except (AttributeError, dasbus.error.DBusError) as ex: - logging.debug('Avahi._configure_browsers() - Failed to Free() resolver. %s', ex) + services_to_rm = [service.key for service in self._services.values() if service.stype == stype_to_rm] + for service_to_rm in services_to_rm: + self._remove_service(service_to_rm) for stype in stypes_to_add: try: @@ -329,31 +451,25 @@ def _service_discovered( args: typing.Tuple[int, int, str, str, str, int], *_user_data, ): - (interface, protocol, name, stype, domain, flags) = args - logging.debug( - 'Avahi._service_discovered() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s', - interface, - socket.if_indextoname(interface), - Avahi.protocol_as_string(protocol), - stype, - domain, - flags, - '(' + Avahi.result_flags_as_string(flags) + '),', - name, - ) + service = Service(args, self._change_cb) + logging.debug('Avahi._service_discovered() - %s', service) - service = (interface, protocol, name, stype, domain) - if service not in self._services: + if service.key not in self._services: try: obj_path = self._avahi.ServiceResolverNew( - interface, protocol, name, stype, domain, Avahi.PROTO_UNSPEC, Avahi.LOOKUP_USE_MULTICAST + service.interface, + service.protocol, + service.name, + service.stype, + service.domain, + Avahi.PROTO_UNSPEC, + Avahi.LOOKUP_USE_MULTICAST, ) - self._services[service] = { - 'resolver': self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path), - 'data': {}, - } + service.set_resolver(self._sysbus.get_proxy(Avahi.DBUS_NAME, obj_path)) except dasbus.error.DBusError as ex: - logging.warning('Failed to create resolver: "%s", "%s", "%s". %s', interface, name, stype, ex) + logging.warning('Failed to create resolver - %s: %s', service, ex) + + self._services[service.key] = service def _service_removed( self, @@ -367,27 +483,14 @@ def _service_removed( ): (interface, protocol, name, stype, domain, flags) = args logging.debug( - 'Avahi._service_removed() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s', - interface, - socket.if_indextoname(interface), - Avahi.protocol_as_string(protocol), - stype, - domain, - flags, - '(' + Avahi.result_flags_as_string(flags) + '),', - name, + 'Avahi._service_removed() - %s', + fmt_service_str(interface, protocol, name, stype, domain, flags), ) - service = (interface, protocol, name, stype, domain) - resolver = self._services.pop(service, {}).pop('resolver', None) - if resolver is not None: - try: - resolver.Free() - dasbus.client.proxy.disconnect_proxy(resolver) - except (AttributeError, dasbus.error.DBusError) as ex: - logging.debug('Avahi._service_removed() - Failed to Free() resolver. %s', ex) - - self._change_cb() + service_key = mk_service_key(interface, protocol, name, stype, domain) + self._remove_service(service_key) + if self._change_cb is not None: + self._change_cb() def _service_identified( # pylint: disable=too-many-locals self, @@ -402,38 +505,21 @@ def _service_identified( # pylint: disable=too-many-locals (interface, protocol, name, stype, domain, host, aprotocol, address, port, txt, flags) = args txt = _txt2dict(txt) logging.debug( - 'Avahi._service_identified() - interface=%s (%s), protocol=%s, stype=%s, domain=%s, flags=%s %-14s name=%s, host=%s, aprotocol=%s, address=%s, port=%s, txt=%s', - interface, - socket.if_indextoname(interface), - Avahi.protocol_as_string(protocol), - stype, - domain, - flags, - '(' + Avahi.result_flags_as_string(flags) + '),', - name, + 'Avahi._service_identified() - %s, host=%s, aprotocol=%s, port=%s, address=%s, txt=%s', + fmt_service_str(interface, protocol, name, stype, domain, flags), host, Avahi.protocol_as_string(aprotocol), - address, port, + address, txt, ) - service = (interface, protocol, name, stype, domain) - if service in self._services: + service_key = mk_service_key(interface, protocol, name, stype, domain) + service = self._services.get(service_key, None) + if service is not None: transport = _proto2trans(txt.get('p')) if transport is not None: - self._services[service]['data'] = { - 'transport': transport, - 'traddr': address.strip(), - 'trsvcid': str(port).strip(), - # host-iface permitted for tcp alone and not rdma - 'host-iface': socket.if_indextoname(interface).strip() if transport == 'tcp' else '', - 'subsysnqn': txt.get('nqn', defs.WELL_KNOWN_DISC_NQN).strip() - if conf.NvmeOptions().discovery_supp - else defs.WELL_KNOWN_DISC_NQN, - } - - self._change_cb() + service.set_identity(transport, address, port, txt) else: logging.error( 'Received invalid/undefined protocol in mDNS TXT field: address=%s, iface=%s, TXT=%s', @@ -442,6 +528,8 @@ def _service_identified( # pylint: disable=too-many-locals txt, ) + self._check_for_duplicate_ips() + def _failure_handler( # pylint: disable=no-self-use self, _connection, @@ -456,3 +544,15 @@ def _failure_handler( # pylint: disable=no-self-use if 'ServiceResolver' not in interface_name or 'TimeoutError' not in error: # ServiceResolver may fire a timeout event after being Free'd(). This seems to be normal. logging.error('Avahi._failure_handler() - name=%s, error=%s', interface_name, error) + + def _check_for_duplicate_ips(self): + '''This is to identify misconfigured networks where the + same IP addresses are discovered on two or more interfaces.''' + ips = {} + for service in self._services.values(): + if service.ip is not None: + ips.setdefault(service.ip.compressed, []).append(service.interface_name) + + for ip, ifaces in ips.items(): + if len(ifaces) > 1: + logging.error('IP address %s was found on multiple interfaces: %s', ip, ','.join(ifaces)) diff --git a/staslib/gutil.py b/staslib/gutil.py index 836674a..1730ac0 100644 --- a/staslib/gutil.py +++ b/staslib/gutil.py @@ -11,6 +11,7 @@ ''' import logging +import socket from gi.repository import Gio, GLib, GObject from staslib import conf, iputil, trid @@ -416,3 +417,100 @@ def cancel(self): if self.is_scheduled(): self._source.destroy() self._source = None + + +# ****************************************************************************** +class TcpChecker: # pylint: disable=too-many-instance-attributes + '''@brief Verify that a TCP connection can be established with an enpoint''' + + def __init__(self, traddr, trsvcid, host_iface, user_cback, *user_data): + self._user_cback = user_cback + self._host_iface = host_iface + self._user_data = user_data + self._trsvcid = trsvcid + self._traddr = iputil.get_ipaddress_obj(traddr, ipv4_mapped_convert=True) + self._cancellable = None + self._gio_sock = None + self._native_sock = None + + def connect(self): + '''Attempt to connect''' + self.close() + + # Gio has limited setsockopt() capabilities. To set SO_BINDTODEVICE + # we need to use a generic socket.socket() and then convert to a + # Gio.Socket() object to perform async connect operation within + # the GLib context. + family = socket.AF_INET if self._traddr.version == 4 else socket.AF_INET6 + self._native_sock = socket.socket(family, socket.SOCK_STREAM | socket.SOCK_NONBLOCK, socket.IPPROTO_TCP) + if isinstance(self._host_iface, str): + self._native_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, self._host_iface.encode('utf-8')) + + # Convert socket.socket() to a Gio.Socket() object + try: + self._gio_sock = Gio.Socket.new_from_fd(self._native_sock.fileno()) # returns None on error + except GLib.Error as err: + logging.error('Cannot create socket: %s', err.message) # pylint: disable=no-member + self._gio_sock = None + + if self._gio_sock is None: + self._native_sock.close() + raise RuntimeError(f'Unable to connect to {self._traddr}, {self._trsvcid}, {self._host_iface}') + + g_addr = Gio.InetSocketAddress.new_from_string(self._traddr.compressed, int(self._trsvcid)) + + self._cancellable = Gio.Cancellable() + + g_sockconn = self._gio_sock.connection_factory_create_connection() + g_sockconn.connect_async(g_addr, self._cancellable, self._connect_async_cback) + + def close(self): + '''Terminate/Cancel current connection attempt and free resources''' + if self._cancellable is not None: + self._cancellable.cancel() + self._cancellable = None + + if self._gio_sock is not None: + try: + self._gio_sock.close() + except GLib.Error as err: + logging.debug('TcpChecker.close() gio_sock.close - %s', err.message) # pylint: disable=no-member + + self._gio_sock = None + + if self._native_sock is not None: + try: + # This is expected to fail because the socket + # is already closed by self._gio_sock.close() above. + # This code is just for completeness. + self._native_sock.close() + except OSError: + pass + + self._native_sock = None + + def _connect_async_cback(self, source_object, result): + ''' + @param source_object: The Gio.SocketConnection object used to + invoke the connect_async() API. + ''' + try: + connected = source_object.connect_finish(result) + except GLib.Error as err: + connected = False + # We don't need to report "cancellation" errors. + if err.matches(Gio.io_error_quark(), Gio.IOErrorEnum.CANCELLED): + logging.debug('TcpChecker._connect_async_cback() - %s', err.message) # pylint: disable=no-member + else: + logging.info( + 'Unable to verify TCP connectivity - (%-10s %-14s %s): %s', + self._host_iface + ',', + self._traddr.compressed + ',', + self._trsvcid, + err.message, # pylint: disable=no-member + ) + + self.close() + + if self._user_cback is not None: + self._user_cback(connected, *self._user_data)