diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index b182b53..324b832 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -29,7 +29,7 @@ jobs:
# Note: Currently using pytest with retries due to intermittent (~10% of the time) failure when
# running unit tests on macos runners due to "exec format error" when setting socket options (???).
- name: Run Pytest
- run: poetry run pytest --retries 3
+ run: poetry run pytest --retries 3 --capture=no
# This job exists to run mypy on Windows and Mac, as sometimes it can turn up different errors in these cases.
my-py:
diff --git a/.gitignore b/.gitignore
index 2e892eb..ab21c3c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,7 @@ __pycache__/
*$py.class
# builds
-dist/*
\ No newline at end of file
+dist/*
+
+# PyCharm project
+.idea/*
\ No newline at end of file
diff --git a/.idea/.gitignore b/.idea/.gitignore
deleted file mode 100644
index 26d3352..0000000
--- a/.idea/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-# Default ignored files
-/shelf/
-/workspace.xml
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
deleted file mode 100644
index 105ce2d..0000000
--- a/.idea/inspectionProfiles/profiles_settings.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
deleted file mode 100644
index bb7e9d6..0000000
--- a/.idea/misc.xml
+++ /dev/null
@@ -1,7 +0,0 @@
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
deleted file mode 100644
index 8ff09de..0000000
--- a/.idea/modules.xml
+++ /dev/null
@@ -1,8 +0,0 @@
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/multicast_expert.iml b/.idea/multicast_expert.iml
deleted file mode 100644
index f31c208..0000000
--- a/.idea/multicast_expert.iml
+++ /dev/null
@@ -1,11 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
deleted file mode 100644
index 94a25f7..0000000
--- a/.idea/vcs.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3ef6a76..2c43464 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -55,6 +55,25 @@ Lorem Ipsum dolor sit amet.
_______________________________________________________________________________
+## [1.5.0] - 2025-02-XX
+
+This release is focused on addressing the performance issues caused by multicast_expert rescanning the network interfaces on the machine each time a socket is opened. You can now avoid this overhead by using the new scan functions and then passing their result into the socket constructors' `iface` argument.
+
+### Added
+- `multicast_expert.scan_interfaces()` provides a more complete wrapper around `netifaces` It will scan all the interface details from the machine into a list of `IfaceInfo` dataclass objects.
+- `multicast_expert.find_interfaces()` provides an easy way to locate interfaces matching a given specifier. The specifier may be an IPv4 or IPv6 address of the interface (as a string or an object), or an interface name (e.g. eth0).
+
+### Changed
+- `McastTxSocket` and `McastRxSocket` now accept an `IfaceSpecifier` for their interface IP arguments instead of just an IP address string. This is compatible with old usage but allows a wider range of values to be used, including an interface obtained from `scan_interfaces()` or `find_interfaces()`.
+- `McastTxSocket` and `McastRxSocket` now accept IPv4Address and IPv6Address objects in addition to strings for the `mcast_ips` and `source_ips` constructor arguments.
+ - Note that the sendto() and recvfrom() functions still accept and return only string addresses, as this matches the behavior of the `socket` module (surprisingly).
+
+### Fixed
+- It is now possible to unambiguously open a socket on an interface that has the same IP as another interface (in most cases). Previously, it was undefined which interface you'd get if this happened.
+- It is now possible to open a `McastRxSocket` socket on a machine with interfaces with multiple IPs. This previously crashed unless you explicitly specified one interface to use.
+
+_______________________________________________________________________________
+
## [1.4.2] - 2025-01-31
### Fixed
diff --git a/README.rst b/README.rst
index dc44364..443b655 100644
--- a/README.rst
+++ b/README.rst
@@ -78,15 +78,16 @@ Using Multicast Expert
Now let's get into some actual code examples. Now first, before we can create any sockets, we need to find the interface address we want to use (see above). Luckily, Multicast Expert comes with a convenient function to list all available network interfaces:
>>> import multicast_expert
->>> multicast_expert.get_interface_ips(include_ipv4=True, include_ipv6=False)
-['192.168.0.248', '192.168.153.1', '127.0.0.1']
+>>> multicast_expert.scan_interfaces()
+IfaceInfo(machine_name='{E61AD7AD-0125-4162-9967-98BE8A9CB330}', index=20, ip4_addrs=[], ip4_networks=[], ip6_addrs=[IPv6Address('fe80::1234:5678:%20')], ip6_networks=[IPv6Network('fe80::/64')])
+IfaceInfo(machine_name='{195D3CB7-6D21-4C5A-8514-C4F01494FDC0}', index=37, ip4_addrs=[IPv4Address('192.168.1.5')], ip4_networks=[IPv4Network('192.168.1.0/24')], ip6_addrs=[IPv6Address('fe80::1111:2222%37')], ip6_networks=[IPv6Network('fe80::/64')])
(note that this function is a wrapper around the netifaces library, which provides quite a bit more functionality if you need it)
But which of those is the interface we actually want to use? Well, that depends on your specific network setup, but to make an educated guess, we also have a function to get the interface your machine uses to contact the internet. This is not always correct but will work for many network setups.
->>> multicast_expert.get_default_gateway_iface_ip_v4()
-'192.168.0.248'
+>>> multicast_expert.get_default_gateway_iface(socket.AF_INET).ip4_addrs
+[IPv4Address('192.168.1.5')]
Transmitting Multicasts
=======================
@@ -96,23 +97,25 @@ To send some data to a multicast, use the McastTxSocket class. This wraps a soc
The following block shows how to create a Tx socket and send some data:
>>> import socket
->>> with multicast_expert.McastTxSocket(socket.AF_INET, mcast_ips=['239.1.2.3'], iface_ip='192.168.0.248') as mcast_tx_sock:
+>>> with multicast_expert.McastTxSocket(socket.AF_INET, mcast_ips=['239.1.2.3'], iface='192.168.0.248') as mcast_tx_sock:
... mcast_tx_sock.sendto(b'Hello World', ('239.1.2.3', 12345))
Note: when you construct the socket, you have to pass in all of the multicast IPs that you will want to use the socket to send to. These must be known in advance in order to configure socket options correctly.
-Note 2: If you omitted the iface_ip= argument, the get_default_gateway_iface_ip_v4() function would have been called to guess the iface ip. So, we could have omitted this argument for the same result.
+Note 2: If you omitted the iface= argument, the get_default_gateway_iface() function would have been called to guess the interface to use. So, we could have omitted this argument for the same result. However, you should pass this argument in most real usage, or at least make it configurable by the user. In addition to the interface IP address, you can pass the interface name or an IfaceInfo dataclass received from scan_interfaces().
Receiving Multicasts
====================
To receive from one or more multicast addresses, use the McastRxSocket class. For example:
->>> with multicast_expert.McastRxSocket(socket.AF_INET, mcast_ips=['239.1.2.3'], port=12345, iface_ip='192.168.0.248') as mcast_rx_sock:
+>>> with multicast_expert.McastRxSocket(socket.AF_INET, mcast_ips=['239.1.2.3'], port=12345) as mcast_rx_sock:
... bytes, src_address = mcast_rx_sock.recvfrom()
The above code will listen on the 239.1.2.3 multicast address, and will block until a packet is received. To change the blocking behavior, use the settimeout() function.
+For receiving multicasts, you often don't need to pass an interface name, as the default is to listen on all possible interfaces of the machine. However, you can pass one or more specific interfaces to listen on if you like, for performance or functionality reasons.
+
Full Example
============
For a complete example of how to use this library, see the system test script `here `_.
diff --git a/examples/mcast_communicator.py b/examples/mcast_communicator.py
index 33c2d11..ccb0469 100755
--- a/examples/mcast_communicator.py
+++ b/examples/mcast_communicator.py
@@ -70,25 +70,25 @@ def listener_thread(machine_index: int, addr_family: int, iface_ip: str) -> None
sys.exit(1)
if len(sys.argv) > 3:
- iface_ip = sys.argv[3]
+ iface = sys.argv[3]
else:
- iface_ip = multicast_expert.get_default_gateway_iface_ip(addr_family)
- if iface_ip is None:
+ iface = multicast_expert.get_default_gateway_iface_ip(addr_family)
+ if iface is None:
print("Unable to determine default gateway. Please specify interface in arguments.")
sys.exit(1)
# Start listener thread
listener_thread_obj = threading.Thread(
- target=listener_thread, name="Multicast Listener Thread", args=(machine_number, addr_family, iface_ip), daemon=True
+ target=listener_thread, name="Multicast Listener Thread", args=(machine_number, addr_family, iface), daemon=True
)
listener_thread_obj.start()
# Start transmitting
-print(f"Communicator starting on interface {iface_ip}. Press Ctrl-C to exit")
+print(f"Communicator starting on interface {iface}. Press Ctrl-C to exit")
with multicast_expert.McastTxSocket(
addr_family=addr_family,
mcast_ips=[MULTICAST_ADDRESSES[addr_family][0], MULTICAST_ADDRESSES[addr_family][machine_number]],
- iface_ip=iface_ip,
+ iface=iface,
) as tx_socket:
while True:
time.sleep(1.0)
diff --git a/multicast_expert/__init__.py b/multicast_expert/__init__.py
index 0337d3e..991e0ae 100644
--- a/multicast_expert/__init__.py
+++ b/multicast_expert/__init__.py
@@ -29,12 +29,18 @@
LOCALHOST_IPV6 = "::1"
+from multicast_expert.interfaces import IfaceInfo as IfaceInfo
+from multicast_expert.interfaces import IfaceSpecifier as IfaceSpecifier
+from multicast_expert.interfaces import find_interfaces as find_interfaces
+from multicast_expert.interfaces import get_default_gateway_iface as get_default_gateway_iface
+from multicast_expert.interfaces import get_default_gateway_iface_ip as get_default_gateway_iface_ip
+from multicast_expert.interfaces import get_default_gateway_iface_ip_v4 as get_default_gateway_iface_ip_v4
+from multicast_expert.interfaces import get_default_gateway_iface_ip_v6 as get_default_gateway_iface_ip_v6
+from multicast_expert.interfaces import get_interface_ips as get_interface_ips
+from multicast_expert.interfaces import scan_interfaces as scan_interfaces
from multicast_expert.rx_socket import McastRxSocket as McastRxSocket
from multicast_expert.tx_socket import McastTxSocket as McastTxSocket
from multicast_expert.utils import IPv4Or6Address as IPv4Or6Address
+from multicast_expert.utils import MulticastAddress as MulticastAddress
from multicast_expert.utils import MulticastExpertError as MulticastExpertError
-from multicast_expert.utils import get_default_gateway_iface_ip as get_default_gateway_iface_ip
-from multicast_expert.utils import get_default_gateway_iface_ip_v4 as get_default_gateway_iface_ip_v4
-from multicast_expert.utils import get_default_gateway_iface_ip_v6 as get_default_gateway_iface_ip_v6
-from multicast_expert.utils import get_interface_ips as get_interface_ips
from multicast_expert.utils import validate_mcast_ip as validate_mcast_ip
diff --git a/multicast_expert/interfaces.py b/multicast_expert/interfaces.py
new file mode 100644
index 0000000..ad4ddea
--- /dev/null
+++ b/multicast_expert/interfaces.py
@@ -0,0 +1,359 @@
+from __future__ import annotations
+
+import ipaddress
+import socket
+import warnings
+from collections.abc import Iterable, Sequence
+from dataclasses import dataclass
+from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
+from typing import Union, cast
+
+import netifaces
+
+from multicast_expert import LOCALHOST_IPV4, LOCALHOST_IPV6
+from multicast_expert.name_to_index import iface_name_to_index
+from multicast_expert.utils import MulticastExpertError, ip_interface_to_ip_string
+
+
+@dataclass(frozen=True)
+class IfaceInfo:
+ """
+ Class to store data about an interface.
+ """
+
+ machine_name: str
+ """
+ Unique machine-readable name of this interface.
+
+ On UNIX platforms this looks like 'eno1' or 'enp5s0f0'. On Windows platforms this is GUID,
+ like '{E61AD7AD-0125-4162-9967-98BE8A9CB330}'
+ """
+
+ # NOTE: I would love to add a new member like 'friendly_name' which would have the interface
+ # human readable name in Windows. However, this functionality is not available in netifaces.
+ # It is available in psutil but psutil doesn't have a way to get the interface index OR the
+ # GUID, which we need one of.
+ # It will have to wait for netifaces-2 support...
+
+ index: int
+ """ Unique integer index of this interface. """
+
+ link_layer_address: str | None
+ """
+ Link layer address of this interface, as a string, or None if it could not be detected.
+
+ For most network connections, this is a 6 octet MAC address, e.g. 00:01:02:03:04:05. However,
+ it might also be something else if that's what your link uses. See the netifaces README for details.
+
+ Note: the loopback interface generally does not have a link layer address.
+ """
+
+ ip4_addrs: Sequence[IPv4Interface]
+ """
+ IPv4 addresses assigned to this interface.
+
+ Most interfaces only have one IPv4 address, but some can have multiple.
+ """
+
+ ip6_addrs: Sequence[IPv6Interface]
+ """
+ IPv6 addresses assigned to this interface.
+
+ Most interfaces only have one IPv6 address, but some can have multiple.
+ """
+
+ def is_localhost(self) -> bool:
+ """
+ :return: Whether this interface is the loopback/localhost interface based on its IP address
+ """
+ # Note: on Mac, the localhost interface appears to have two IPv6 IPs, "::1" and "fe80::1%lo0".
+ # But this still works because we just need ::1 to be one of its IPs.
+
+ return IPv6Interface(LOCALHOST_IPV6) in self.ip6_addrs or IPv4Interface(LOCALHOST_IPV4) in self.ip4_addrs
+
+ def ip_addrs(self, family: int) -> Sequence[IPv4Interface] | Sequence[IPv6Interface]:
+ """
+ Get the IP addresses of this interface on a given address family.
+
+ :param family: Address family (socket.AF_INET or socket.AF_INET6)
+
+ :return: IP addresses of the given addr family
+ """
+ if family == socket.AF_INET:
+ return self.ip4_addrs
+ elif family == socket.AF_INET6:
+ return self.ip6_addrs
+ message = "Unknown family!"
+ raise KeyError(message)
+
+
+IfaceSpecifier = Union[str, IPv4Address, IPv6Address, IfaceInfo]
+""" Type for something that can specify an interface in multicast expert.
+
+May be:
+ - An IPv4 address assigned to the interface, as a string or IPv4Address
+ - An IPv6 address assigned to the interface, as a string or IPv6Address. Scope ID is optional, i.e.
+ '1234::abcd%5' and '1234::abcd' will both work. Scope ID is required if you wish to uniquely identify
+ an interface on a machine with multiple IPv6 interfaces with the same address.
+ - An interface machine readable name
+ - An IfaceInfo object
+"""
+
+
+def scan_interfaces() -> list[IfaceInfo]:
+ """
+ Scan the IP interfaces on the machine and return a list containing info for each interface.
+
+ This is a wrapper around netifaces functionality. It allows all of the interface info to be queried up
+ front, saving the significant amount of CPU needed to query it each time a socket is created.
+
+ .. note::
+ If an interface is currently down, it will appear in the list, but it is undefined whether
+ it will show any IP addresses or not. This is a limitation of the underlying netifaces library,
+ and we hope to clarify this behavior eventually.
+
+ :return: IfaceInfo objects scanned from the current machine.
+ """
+ result = []
+ for iface_name in netifaces.interfaces():
+ ip4_addrs = []
+ ip6_addrs = []
+
+ addresses_at_each_level = netifaces.ifaddresses(iface_name)
+
+ # Note: Check needed because some interfaces do not have an ipv4 or ipv6 address
+ if netifaces.AF_INET in addresses_at_each_level:
+ for addr_info in addresses_at_each_level[netifaces.AF_INET]:
+ ip4_addrs.append(IPv4Interface((addr_info["addr"], addr_info["netmask"])))
+
+ if netifaces.AF_INET6 in addresses_at_each_level:
+ for addr_info in addresses_at_each_level[netifaces.AF_INET6]:
+ # Netifaces implements its own method of converting IPv6 netmasks to strings that produces strings like
+ # "ffff:ffff:ffff:ffff::/64". As far as I can tell, this is not a standard notation, and in fact
+ # the concept of a "subnet mask string" for an IPv6 address is... dubious at best, standards-wise.
+ # Meanwhile, IPv6Address just wants the prefix length in bits.
+ prefix_len = int(addr_info["netmask"].split("/")[1])
+
+ ip6_addrs.append(IPv6Interface((addr_info["addr"], prefix_len)))
+
+ if netifaces.AF_LINK in addresses_at_each_level and len(addresses_at_each_level[netifaces.AF_LINK]) > 0:
+ link_layer_addr = addresses_at_each_level[netifaces.AF_LINK][0]["addr"]
+ else:
+ link_layer_addr = None
+
+ result.append(
+ IfaceInfo(
+ machine_name=iface_name,
+ index=iface_name_to_index(iface_name),
+ link_layer_address=link_layer_addr,
+ ip4_addrs=ip4_addrs,
+ ip6_addrs=ip6_addrs,
+ )
+ )
+
+ return result
+
+
+def _find_interfaces_for_specifier(specifier: IfaceSpecifier, ifaces: Sequence[IfaceInfo]) -> list[IfaceInfo]:
+ """
+ Find one or more IfaceInfos based on an interface specifier.
+
+ If no interfaces match the specifier, a MulticastExpertError is raised.
+
+ :param specifier: Specifier for the interface you want to find. If this is an IfaceInfo already, it will simply be returned.
+ :param ifaces: If ``specifier`` is not an IfaceInfo, these interfaces will be searched using the specifier.
+
+ :return: Found interface(s). Note that multiple interfaces can only be returned if the specifier
+ is an IP address.
+ """
+ # Easy case, we already have the interface
+ if isinstance(specifier, IfaceInfo):
+ return [specifier]
+
+ # Try to match the specifier to any known interface name.
+ # (please ${DEITY} do not let anyone name an interface with an IP address)
+ if isinstance(specifier, str):
+ for iface in ifaces:
+ if iface.machine_name == specifier:
+ # Found a match!
+ return [iface]
+
+ if isinstance(specifier, str):
+ try:
+ specifier_ip_addr = ipaddress.ip_address(specifier)
+ except Exception as ex:
+ message = f"Specifier '{specifier}' does not appear to be a valid interface name or IP address!"
+ raise MulticastExpertError(message) from ex
+ else:
+ specifier_ip_addr = specifier
+
+ result = []
+ for iface in ifaces:
+ # Annoyingly IPv[4/6]Network does not compare as equal to IPv[4/6]Address, so we have to convert
+ addrs_to_check: set[IPv4Address | IPv6Address] = set()
+ if isinstance(specifier_ip_addr, IPv6Address):
+ for addr in iface.ip6_addrs:
+ # go through string to work around https://github.com/python/cpython/issues/129538
+ addr_string = ip_interface_to_ip_string(addr)
+ if specifier_ip_addr.scope_id is None:
+ # Trim off the scope ID from the address string
+ addr_string = addr_string.split("%")[0]
+ addrs_to_check.add(IPv6Address(addr_string))
+ else:
+ addrs_to_check.update(addr.ip for addr in iface.ip4_addrs)
+
+ if specifier_ip_addr in addrs_to_check:
+ result.append(iface)
+
+ if len(result) == 0:
+ message = f"No matches found for interface IP address {specifier_ip_addr!s}"
+ raise MulticastExpertError(message)
+
+ return result
+
+
+def find_interfaces(
+ specifiers: Iterable[IfaceSpecifier], *, ifaces: Sequence[IfaceInfo] | None = None
+) -> list[IfaceInfo]:
+ """
+ Find interfaces (represented by IfaceInfo objects) based on a collection of interface specifiers.
+
+ If no interfaces match any individual specifier, a MulticastExpertError is raised.
+
+ :param specifiers: Specifier for each interface you want to find. If a specifier is an IfaceInfo already,
+ it will simply be added to the result list.
+ :param ifaces: If set, these interfaces will be searched using the specifier.
+ If not set, then the current set of interfaces will be scanned from the machine.
+
+ :return: Found interface(s). This function will usually return as many IfaceInfos as the number of
+ specifiers you passed in. However, if one IP address is used on multiple interfaces, and you
+ pass in that IP address as a specifier, multiple interfaces will be matched for that specifier. Also, if
+ multiple specifiers matched the same interface, the results will be deduplicated.
+ """
+ # First check if we were passed all IfaceInfos. If so we can return early without scanning interfaces
+ specifiers = list(specifiers)
+ if all(isinstance(x, IfaceInfo) for x in specifiers):
+ return cast(list[IfaceInfo], specifiers)
+
+ # Now we must scan interfaces if not passed them earlier
+ if ifaces is None:
+ ifaces = scan_interfaces()
+
+ # Find candidate interfaces for each specifier
+ results = [iface for specifier in specifiers for iface in _find_interfaces_for_specifier(specifier, ifaces=ifaces)]
+ result_dict = {iface.index: iface for iface in results} # deduplicate interfaces
+ return list(result_dict.values())
+
+
+def get_interface_ips(include_ipv4: bool = True, include_ipv6: bool = True) -> list[str]:
+ """
+ Use this function to get a list of all the interface IP addresses available on this machine.
+
+ This is the legacy way to query this information; it implicitly assumes a 1:1 mapping between interfaces and
+ IP addresses. It is recommended to use scan_interfaces() instead because that function can disambiguate multiple
+ interfaces with the same IP.
+
+ :param include_ipv4: If true, IPv4 addresses will be included in the results
+ :param include_ipv6: If true, IPv6 addresses will be included in the results
+
+ .. note::
+ If two interfaces on this machine have the same IP address, this function will warn and return only one of the interfaces.
+
+ :return: List of the interface IP of every interface on your machine, as a string.
+ """
+ interfaces = scan_interfaces()
+ ip_set: set[str] = set()
+ for iface in interfaces:
+ this_iface_addresses: list[str] = []
+ if include_ipv4:
+ this_iface_addresses.extend(ip_interface_to_ip_string(addr) for addr in iface.ip4_addrs)
+ if include_ipv6:
+ this_iface_addresses.extend(ip_interface_to_ip_string(addr) for addr in iface.ip6_addrs)
+
+ for addr_str in this_iface_addresses:
+ if addr_str in ip_set:
+ warnings.warn(
+ f"Interface {iface.machine_name} has IP {addr_str} which is also used by another interface. "
+ f"Passing this interface IP to multicast_expert will result in an error. We recommend using "
+ f"multicast_expert.scan_interfaces() instead to handle this situation cleanly.",
+ stacklevel=2,
+ )
+ ip_set.add(addr_str)
+
+ return list(ip_set)
+
+
+def get_default_gateway_iface_ip_v6(*, ifaces: list[IfaceInfo] | None = None) -> str | None:
+ """
+ Get the IP address of the interface that connects to the default IPv6 gateway.
+
+ :param ifaces: List of interfaces to search. If not provided, interfaces will be scanned.
+ :return: IP address as a string, or None if it cannot be determined.
+ """
+ return get_default_gateway_iface_ip(netifaces.AF_INET6, ifaces=ifaces)
+
+
+def get_default_gateway_iface_ip_v4(*, ifaces: list[IfaceInfo] | None = None) -> str | None:
+ """
+ Get the IP address of the interface that connects to the default IPv4 gateway.
+
+ :param ifaces: List of interfaces to search. If not provided, interfaces will be scanned.
+ :return: IP address as a string, or None if it cannot be determined.
+ """
+ return get_default_gateway_iface_ip(netifaces.AF_INET, ifaces=ifaces)
+
+
+def get_default_gateway_iface(addr_family: int, *, ifaces: list[IfaceInfo] | None = None) -> IfaceInfo | None:
+ """
+ Get the info of the interface that connects to the default gateway of the given addr_family.
+
+ :param addr_family: Address family to use (netifaces.AF_INET or netifaces.AF_INET6).
+ :param ifaces: List of interfaces to search. If not provided, interfaces will be scanned.
+ :return: IfaceInfo for the default gateway interface, or None if it cannot be determined.
+ """
+ # Enumerate all gateways using netifaces
+ try:
+ gateways = netifaces.gateways()
+ except OSError:
+ return None
+
+ # If it can, it will identify one of those as the default gateway for traffic.
+ # If not, return none.
+ if not "default" in gateways:
+ return None
+ if not addr_family in gateways["default"]:
+ return None
+
+ default_gateway = gateways["default"][addr_family]
+ default_gateway_iface = default_gateway[1] # element 1 is the iface name, per the docs
+
+ # Now, use the interface info to get the IP address of that interface.
+ if ifaces is None:
+ ifaces = scan_interfaces()
+
+ try:
+ # Note: guaranteed to return only 1 element because we are passing the iface name
+ return _find_interfaces_for_specifier(default_gateway_iface, ifaces=ifaces)[0]
+ except MulticastExpertError:
+ return None
+
+
+def get_default_gateway_iface_ip(addr_family: int, *, ifaces: list[IfaceInfo] | None = None) -> str | None:
+ """
+ Get the IP address of the interface that connects to the default gateway of the given addr_family.
+
+ :param addr_family: Address family to use (netifaces.AF_INET or netifaces.AF_INET6).
+ :param ifaces: List of interfaces to search
+
+ :return: IP address as a string, or None if it cannot be determined.
+ """
+ iface_info = get_default_gateway_iface(addr_family, ifaces=ifaces)
+ if iface_info is None:
+ return None
+
+ if addr_family == netifaces.AF_INET and len(iface_info.ip4_addrs) > 0:
+ return ip_interface_to_ip_string(iface_info.ip4_addrs[0])
+ elif addr_family == netifaces.AF_INET6 and len(iface_info.ip6_addrs) > 0:
+ return ip_interface_to_ip_string(iface_info.ip6_addrs[0])
+ else:
+ return None
diff --git a/multicast_expert/name_to_index.py b/multicast_expert/name_to_index.py
new file mode 100644
index 0000000..e0091b2
--- /dev/null
+++ b/multicast_expert/name_to_index.py
@@ -0,0 +1,40 @@
+# Windows-compatible version of if_nametoindex() using ctypes
+
+from __future__ import annotations
+
+import ctypes
+import socket
+import sys
+
+# Import needed Win32 DLL functions
+if sys.platform == "win32":
+ iphlpapi = ctypes.WinDLL("iphlpapi")
+ win32_GetAdapterIndex = iphlpapi.GetAdapterIndex # noqa: N816
+ win32_GetAdapterIndex.argtypes = [ctypes.c_wchar_p, ctypes.POINTER(ctypes.c_ulong)]
+
+
+def iface_name_to_index(iface_name: str) -> int:
+ """
+ Convert a network interface's name into its interface index.
+
+ :param iface_name: Machine-readable name of the interface
+ :return: Interface index
+ """
+ if sys.platform == "win32":
+ # To get the if index on Windows we have to use the GetAdapterIndex() function.
+ # docs here: https://docs.microsoft.com/en-us/windows/win32/api/iphlpapi/nf-iphlpapi-getadapterindex
+
+ if_idx = ctypes.c_ulong() # value is returned here
+
+ # For reasons I don't really understand, this string has to be prepended to the names returned by netifaces
+ # in order for the win32 API to recognize them.
+ iface_name_string = ctypes.c_wchar_p("\\DEVICE\\TCPIP_" + iface_name)
+
+ ret = win32_GetAdapterIndex(iface_name_string, ctypes.byref(if_idx))
+ if ret != 0:
+ raise ctypes.WinError(ret, "GetAdapterIndex() failed")
+
+ return if_idx.value
+ else:
+ # Unix implementation is easy, we can just use the socket function
+ return socket.if_nametoindex(iface_name)
diff --git a/multicast_expert/os_multicast.py b/multicast_expert/os_multicast.py
index 9fc4a2b..4dc3266 100644
--- a/multicast_expert/os_multicast.py
+++ b/multicast_expert/os_multicast.py
@@ -8,10 +8,9 @@
import socket
import struct
import sys
-from dataclasses import dataclass
-
-import netifaces
+from ipaddress import IPv4Address, IPv6Address
+from multicast_expert.interfaces import IfaceInfo
from multicast_expert.utils import is_mac, is_windows
# Import needed Win32 DLL functions
@@ -21,27 +20,6 @@
win32_GetAdapterIndex.argtypes = [ctypes.c_wchar_p, ctypes.POINTER(ctypes.c_ulong)]
-def iface_ip_to_name(iface_ip: str) -> str:
- """
- Convert a network interface's interface IP into its interface name.
- """
- # Go from IP to interface name using netifaces. To do that, we have to iterate through
- # all of the machine's interfaces
- iface_name: str | None = None
- for interface in netifaces.interfaces():
- addresses_at_each_level: dict[int, list[dict[str, str]]] = netifaces.ifaddresses(interface)
- for address_family in [netifaces.AF_INET, netifaces.AF_INET6]:
- if address_family in addresses_at_each_level:
- for address in addresses_at_each_level[address_family]:
- if address["addr"] == iface_ip:
- iface_name = interface
-
- if iface_name is None:
- raise KeyError("Could not find network address with local IP " + iface_ip)
-
- return iface_name
-
-
def iface_name_to_index(iface_name: str) -> int:
"""
Convert a network interface's name into its interface index.
@@ -66,36 +44,7 @@ def iface_name_to_index(iface_name: str) -> int:
return socket.if_nametoindex(iface_name)
-@dataclass
-class IfaceInfo:
- """
- Class to store data about an interface.
-
- Parameters
- ----------
- iface_ip:
- IP address of the interface as a string
- iface_name:
- Name of the interface as returned by netifaces.
- On Windows this is a guid, on unix it's the name you'd get from e.g. `ip link`.
- iface_idx:
- Index of this interface with the OS. This is an internal value used in some system calls.
- """
-
- iface_ip: str
- iface_name: str
- iface_idx: int
-
-
-def get_iface_info(iface_ip: str) -> IfaceInfo:
- """
- Return an IfaceInfo object for a network interface from its IP address
- """
- iface_name = iface_ip_to_name(iface_ip)
- return IfaceInfo(iface_ip, iface_name, iface_name_to_index(iface_name))
-
-
-def make_ip_mreqn_struct(mcast_addr: str, iface_info: IfaceInfo) -> bytes:
+def make_ip_mreqn_struct(mcast_addr: IPv4Address | IPv6Address, iface_info: IfaceInfo) -> bytes:
"""
Generates an ip_mreqn structure (used for setsockopt) from an IPv4 address and an interface.
"""
@@ -105,23 +54,23 @@ def make_ip_mreqn_struct(mcast_addr: str, iface_info: IfaceInfo) -> bytes:
# the imr_ifindex.
return struct.pack(
"@4s4si",
- socket.inet_aton(mcast_addr), # imr_multiaddr
- socket.inet_aton(iface_info.iface_ip), # imr_address
- iface_info.iface_idx,
+ mcast_addr.packed, # imr_multiaddr
+ iface_info.ip4_addrs[0].packed, # imr_address
+ iface_info.index,
) # imr_ifindex
-def make_ipv6_mreq_struct(mcast_ip: str, iface_info: IfaceInfo) -> bytes:
+def make_ipv6_mreq_struct(mcast_ip: IPv4Address | IPv6Address, iface_info: IfaceInfo) -> bytes:
"""
Generates an ipv6_mreq structure to be used with IPV6_ADD_MEMBERSHIP.
"""
# Structure documented here on Windows: https://docs.microsoft.com/en-us/windows/win32/api/ws2ipdef/ns-ws2ipdef-ipv6_mreq
# and here on Linux: https://github.com/torvalds/linux/blob/3d7cb6b04c3f3115719235cc6866b10326de34cd/include/uapi/linux/in6.h#L60
- return struct.pack("=16sI", socket.inet_pton(socket.AF_INET6, mcast_ip), iface_info.iface_idx)
+ return struct.pack("=16sI", mcast_ip.packed, iface_info.index)
def set_multicast_if(
- mcast_socket: socket.socket, mcast_ips: list[str], iface_info: IfaceInfo, addr_family: int
+ mcast_socket: socket.socket, mcast_ips: list[IPv4Address | IPv6Address], iface_info: IfaceInfo, addr_family: int
) -> None:
"""
Set the IP_MULTICAST_IF / IPV6_MULTICAST_IF socket option to an interface on a given socket.
@@ -133,12 +82,10 @@ def set_multicast_if(
# See docs here: https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options
if addr_family == socket.AF_INET:
# IPv4 has if index in *network* byte order
- mcast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, struct.pack("!I", iface_info.iface_idx))
+ mcast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, struct.pack("!I", iface_info.index))
else: # IPv6
# IPv6 has if index in *host* byte order
- mcast_socket.setsockopt(
- socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, struct.pack("I", iface_info.iface_idx)
- )
+ mcast_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, struct.pack("I", iface_info.index))
elif addr_family == socket.AF_INET:
# On Linux/Mac IPv4, IP_MULTICAST_IF takes an ip_mreq struct and needs to be specified for each
@@ -149,10 +96,12 @@ def set_multicast_if(
# Linux/Mac IPv6 is same as Windows IPv6.
# Note: Documentation is very misleading, it does not take a pointer from the Python perspective,
# it just takes an int!
- mcast_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, struct.pack("I", iface_info.iface_idx))
+ mcast_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, struct.pack("I", iface_info.index))
-def add_memberships(mcast_socket: socket.socket, mcast_ips: list[str], iface_info: IfaceInfo, addr_family: int) -> None:
+def add_memberships(
+ mcast_socket: socket.socket, mcast_ips: list[IPv4Address | IPv6Address], iface_info: IfaceInfo, addr_family: int
+) -> None:
"""
Add a non-source-specific membership for the given multicast IPs on the given socket.
"""
@@ -161,7 +110,7 @@ def add_memberships(mcast_socket: socket.socket, mcast_ips: list[str], iface_inf
# See docs here: https://docs.microsoft.com/en-us/windows/win32/winsock/ipproto-ip-socket-options
if addr_family == socket.AF_INET:
# For IPv4, we pass the mcast addr and the if index in *network* byte order
- mreq_bytes = struct.pack("!4sI", socket.inet_aton(mcast_ip), iface_info.iface_idx)
+ mreq_bytes = struct.pack("!4sI", mcast_ip.packed, iface_info.index)
mcast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq_bytes)
else: # IPv6
# Note: Docs call the option "IPV6_ADD_MEMBERSHIP" but Python only has "IPV6_JOIN_GROUP"
@@ -181,7 +130,10 @@ def add_memberships(mcast_socket: socket.socket, mcast_ips: list[str], iface_inf
def add_source_specific_memberships(
- mcast_socket: socket.socket, mcast_ips: list[str], source_ips: list[str], iface_info: IfaceInfo
+ mcast_socket: socket.socket,
+ mcast_ips: list[IPv4Address | IPv6Address],
+ source_ips: list[IPv4Address | IPv6Address],
+ iface_info: IfaceInfo,
) -> None:
"""
Add a source-specific membership for the given multicast IPs on the given socket, with the given sources.
@@ -196,27 +148,27 @@ def add_source_specific_memberships(
# Note: imr_interface is in network byte order
mreq_source_bytes = struct.pack(
"!4s4sI",
- socket.inet_aton(mcast_ip), # imr_multiaddr
- socket.inet_aton(source_ip), # imr_sourceaddr
- iface_info.iface_idx,
- ) # imr_interface
+ mcast_ip.packed, # imr_multiaddr
+ source_ip.packed, # imr_sourceaddr
+ iface_info.index, # imr_interface
+ )
elif is_mac:
# Struct documented here:
- # https://opensource.apple.com/source/xnu/xnu-4570.1.46/bsd/netinet/in.h.auto.html
+ # https://github.com/apple/darwin-xnu/blob/main/bsd/netinet/in.h#L583
mreq_source_bytes = struct.pack(
"@4s4s4s",
- socket.inet_aton(mcast_ip), # imr_multiaddr
- socket.inet_aton(source_ip), # imr_sourceaddr
- socket.inet_aton(iface_info.iface_ip),
- ) # imr_interface
+ mcast_ip.packed, # imr_multiaddr
+ source_ip.packed, # imr_sourceaddr
+ iface_info.ip4_addrs[0].packed, # imr_interface
+ )
else:
# Struct documented here: https://linux.die.net/man/7/ip
mreq_source_bytes = struct.pack(
"@4s4s4s",
- socket.inet_aton(mcast_ip), # imr_multiaddr
- socket.inet_aton(iface_info.iface_ip), # imr_interface
- socket.inet_aton(source_ip),
- ) # imr_sourceaddr
+ mcast_ip.packed, # imr_multiaddr
+ iface_info.ip4_addrs[0].packed, # imr_interface
+ source_ip.packed, # imr_sourceaddr
+ )
mcast_socket.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_SOURCE_MEMBERSHIP, mreq_source_bytes) # type: ignore[attr-defined]
diff --git a/multicast_expert/rx_socket.py b/multicast_expert/rx_socket.py
index f8a9a1b..258e870 100644
--- a/multicast_expert/rx_socket.py
+++ b/multicast_expert/rx_socket.py
@@ -1,18 +1,22 @@
from __future__ import annotations
+import ipaddress
import select
import socket
+from collections.abc import Sequence
from types import TracebackType
from typing import TYPE_CHECKING, cast
if TYPE_CHECKING:
from typing_extensions import Self
-from multicast_expert import LOCALHOST_IPV4, LOCALHOST_IPV6, os_multicast
+from multicast_expert import os_multicast
+from multicast_expert.interfaces import IfaceInfo, IfaceSpecifier, find_interfaces, scan_interfaces
from multicast_expert.utils import (
IPv4Or6Address,
+ MulticastAddress,
MulticastExpertError,
- get_interface_ips,
+ ip_interface_to_ip_string,
is_windows,
validate_mcast_ip,
)
@@ -26,21 +30,30 @@ class McastRxSocket:
def __init__(
self,
addr_family: int,
- mcast_ips: list[str],
+ mcast_ips: Sequence[MulticastAddress],
port: int,
- iface_ip: str | None = None,
- iface_ips: list[str] | None = None,
- source_ips: list[str] | None = None,
+ iface: IfaceSpecifier | None = None,
+ ifaces: Sequence[IfaceSpecifier] | None = None,
+ source_ips: Sequence[str | ipaddress.IPv4Address | ipaddress.IPv6Address] | None = None,
timeout: float | None = None,
blocking: bool | None = None,
enable_external_loopback: bool = False,
+ iface_ip: str | None = None,
+ iface_ips: Sequence[str] | None = None,
):
"""
Create a socket which receives UDP datagrams over multicast.
The socket must be opened (e.g. using a with statement) before it can be used.
- Note: This socket can only receive multicast traffic, not regular unicast traffic.
+ By default (if no arguments are passed), the Rx socket will listen on all non-loopback interfaces of
+ the machine that have addresses in the given family (IPv4 or IPv6). If you wish to select a specific
+ interface or interfaces, pass them using the ``iface`` or ``ifaces`` arguments.
+
+ ``multicast_expert.scan_interfaces()`` may be used to obtain a list of interfaces on the machine,
+ and ``multicast_expert.find_interfaces()`` may be used to find interfaces matching a given specifier.
+ Passing in an IfaceInfo obtained from one of those functions to this function will make opening multiple
+ sockets more performant as there will be no need to scan interface info from the machine again.
:param addr_family: Sets IPv4 or IPv6 operation. Either socket.AF_INET or socket.AF_INET6.
:param mcast_ips: List of all possible multicast IPs that this socket can receive from.
@@ -48,8 +61,8 @@ def __init__(
:param iface_ips: Interface IPs that this socket receives from. If left as None, multicast_expert will
attempt to listen on all (non-loopback) interfaces discovered on your machine. Be careful, this default
may not be desired in many cases. See the docs for details.
- :param iface_ip: Legacy alias for iface_ips. If this is given and iface_ips is not, this adds the
- given interface IP to iface_ips.
+ :param iface: Specify one interface to open the Rx socket on.
+ :param ifaces: Specify multiple interfaces to open the Rx socket on.
:param source_ips: Optional, list of source addresses to restrict the multicast subscription to. The
OS will drop messages not from one of these IPs, and may even use special IGMPv3 source-specific
subscription packets to ask for only those specific multicasts from switches/routers.
@@ -61,12 +74,17 @@ def __init__(
:param enable_external_loopback: Enable loopback of multicast packets sent on external interfaces. If and only
if this option is set to True, McastRxSockets will be able to receive packets sent by McastTxSockets open on
the same address, port, and interface.
+ :param iface_ip: Legacy alias for iface.
+ :param iface_ips: Legacy alias for iface_ips
"""
self.addr_family = addr_family
- self.mcast_ips = mcast_ips
- self.port = port
- self.source_ips = source_ips
+ # Convert all addresses from strings to IP addresses
+ self.mcast_ips = [ipaddress.ip_address(ip) for ip in mcast_ips]
+ if source_ips is not None:
+ self.source_ips = [ipaddress.ip_address(ip) for ip in source_ips]
+
+ self.port = port
self.is_opened = False
# blocking overrides timeout if set
@@ -75,40 +93,49 @@ def __init__(
else:
self.timeout = timeout
- # Handle legacy iface_ip argument if given
- self.iface_ips: list[str]
+ # Handle legacy iface_ip arguments
+ iface_specifiers: list[IfaceSpecifier] = [] if ifaces is None else list(ifaces)
+ if iface is not None:
+ if len(iface_specifiers) > 0:
+ message = "'iface' may not be specified at the same time as 'ifaces'"
+ raise MulticastExpertError(message)
+ iface_specifiers = [iface]
+ if iface_ips is not None:
+ if len(iface_specifiers) > 0:
+ message = "'iface_ips' may not be specified at the same time as other interface arguments"
+ raise MulticastExpertError(message)
+ iface_specifiers.extend(iface_ips)
if iface_ip is not None:
- if iface_ips is not None:
- message = "Both iface_ips and iface_ip may not be specified at the same time!"
+ if len(iface_specifiers) > 0:
+ message = "'iface_ip' may not be specified at the same time as other interface arguments"
raise MulticastExpertError(message)
-
- self.iface_ips = [iface_ip]
-
- elif iface_ips is None:
- self.iface_ips = get_interface_ips(addr_family == socket.AF_INET, addr_family == socket.AF_INET6)
-
- # Don't include the localhost IPs when listening on all interfaces, as that would cause
- # us to receive all mcasts sent by the current machine.
- if self.addr_family == socket.AF_INET6 and LOCALHOST_IPV6 in self.iface_ips:
- self.iface_ips.remove(LOCALHOST_IPV6)
- if self.addr_family == socket.AF_INET and LOCALHOST_IPV4 in self.iface_ips:
- self.iface_ips.remove(LOCALHOST_IPV4)
-
- if len(self.iface_ips) == 0:
+ iface_specifiers = [iface_ip]
+
+ # Scanning the interfaces is expensive, so only do it if we need to
+ scanned_ifaces = None
+ if len(iface_specifiers) == 0 or any(not isinstance(specifier, IfaceInfo) for specifier in iface_specifiers):
+ scanned_ifaces = scan_interfaces()
+
+ # If no interfaces passed, select all interfaces with IP addresses in the desired family
+ if len(iface_specifiers) == 0:
+ # Tell mypy that scanned_ifaces cannot be None
+ scanned_ifaces = cast(list[IfaceInfo], scanned_ifaces)
+
+ for scanned_iface in scanned_ifaces:
+ if (addr_family == socket.AF_INET and len(scanned_iface.ip4_addrs) > 0) or (
+ addr_family == socket.AF_INET6 and len(scanned_iface.ip6_addrs) > 0
+ ):
+ # Don't include the localhost IPs when listening on all interfaces, as that would cause
+ # us to receive all mcasts sent by the current machine.
+ if not scanned_iface.is_localhost():
+ iface_specifiers.append(scanned_iface)
+
+ if len(iface_specifiers) == 0:
message = "Unable to discover any listenable interfaces on this machine."
raise MulticastExpertError(message)
- else:
- self.iface_ips = iface_ips
-
- # Resolve the interfaces now. This prevents having to do this relatively expensive call
- # multiple times later.
- self.iface_infos = {}
- for ip in self.iface_ips:
- try:
- self.iface_infos[ip] = os_multicast.get_iface_info(ip)
- except KeyError as ex:
- message = f"Interface IP {ip} does not seem to correspond to a valid interface. Valid interfaces: {', '.join(get_interface_ips())}"
- raise MulticastExpertError(message) from ex
+
+ # Resolve the interfaces now.
+ self._iface_infos = find_interfaces(iface_specifiers, ifaces=scanned_ifaces)
# Sanity check multicast addresses
for mcast_ip in self.mcast_ips:
@@ -134,26 +161,24 @@ def __enter__(self) -> Self:
# to all multicast addresses on each of those sockets
# On Unix, we need one socket bound to each multicast address.
if is_windows:
- for iface_ip in self.iface_ips:
+ for iface_info in self._iface_infos:
new_socket = socket.socket(family=self.addr_family, type=socket.SOCK_DGRAM)
new_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- new_socket.bind((iface_ip, self.port))
+ new_socket.bind((ip_interface_to_ip_string(iface_info.ip_addrs(self.addr_family)[0]), self.port))
if self.is_source_specific:
os_multicast.add_source_specific_memberships(
- new_socket, self.mcast_ips, cast(list[str], self.source_ips), self.iface_infos[iface_ip]
+ new_socket, self.mcast_ips, self.source_ips, iface_info
)
else:
- os_multicast.add_memberships(
- new_socket, self.mcast_ips, self.iface_infos[iface_ip], self.addr_family
- )
+ os_multicast.add_memberships(new_socket, self.mcast_ips, iface_info, self.addr_family)
# On Windows, by default, sent packets are looped back to local sockets on the same interface, even for interfaces
- # that are not loopback.Change this by disabling IP_MULTICAST_LOOP unless the loopback interface is used or
+ # that are not loopback. Change this by disabling IP_MULTICAST_LOOP unless the loopback interface is used or
# if enable_external_loopback is set.
# Note: multicast_expert submitted a PR to clarify this in the Windows docs, and it was accepted!
- loop_enabled = self.enable_external_loopback or iface_ip in (LOCALHOST_IPV4, LOCALHOST_IPV6)
+ loop_enabled = self.enable_external_loopback or iface_info.is_localhost()
if self.addr_family == socket.AF_INET:
new_socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, loop_enabled)
else:
@@ -162,33 +187,37 @@ def __enter__(self) -> Self:
self.sockets.append(new_socket)
else:
for mcast_ip in self.mcast_ips:
- # For IPv6 on Unix, we need to create one socket for each mcast_ip - iface_ip permutation.
+ # Determine sockets to create and ifaces to subscribe to.
+ # Outer list = sockets to create.
+ # Inner list = ifaces to subscribe to on each socket
+ sockets_and_ifaces: list[list[IfaceInfo]]
+
+ # For IPv6 on Unix, we need to create one socket for each mcast_ip - iface permutation.
# For IPv4, on the systems I tested at least, you can get away with subscribing to multiple
# interfaces on one socket.
if self.addr_family == socket.AF_INET6:
- iface_ip_groups = [[iface_ip] for iface_ip in self.iface_ips]
+ sockets_and_ifaces = [[iface_info] for iface_info in self._iface_infos]
else:
- iface_ip_groups = [self.iface_ips]
+ sockets_and_ifaces = [self._iface_infos]
- for iface_ips_this_group in iface_ip_groups:
+ for ifaces_this_socket in sockets_and_ifaces:
new_socket = socket.socket(family=self.addr_family, type=socket.SOCK_DGRAM)
new_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if self.addr_family == socket.AF_INET6:
# Note: for Unix IPv6, need to specify the scope ID in the bind address in order for link-local mcast addresses to work.
- new_socket.bind((mcast_ip, self.port, 0, self.iface_infos[iface_ips_this_group[0]].iface_idx))
+ # Also, for IPv6, len(ifaces_this_socket) is always 1.
+ new_socket.bind((str(mcast_ip), self.port, 0, ifaces_this_socket[0].index))
else:
- new_socket.bind((mcast_ip, self.port))
+ new_socket.bind((str(mcast_ip), self.port))
- for iface_ip in iface_ips_this_group:
+ for iface_info in ifaces_this_socket:
if self.is_source_specific:
os_multicast.add_source_specific_memberships(
- new_socket, [mcast_ip], cast(list[str], self.source_ips), self.iface_infos[iface_ip]
+ new_socket, [mcast_ip], self.source_ips, iface_info
)
else:
- os_multicast.add_memberships(
- new_socket, [mcast_ip], self.iface_infos[iface_ip], self.addr_family
- )
+ os_multicast.add_memberships(new_socket, [mcast_ip], iface_info, self.addr_family)
self.sockets.append(new_socket)
@@ -277,3 +306,8 @@ def settimeout(self, timeout: float | None) -> None:
"""
self.timeout = timeout
+
+ @property
+ def network_interfaces(self) -> list[IfaceInfo]:
+ """Get the interface(s) used by this socket."""
+ return self._iface_infos
diff --git a/multicast_expert/tx_socket.py b/multicast_expert/tx_socket.py
index d4a8dbf..e047082 100644
--- a/multicast_expert/tx_socket.py
+++ b/multicast_expert/tx_socket.py
@@ -1,18 +1,21 @@
from __future__ import annotations
+import ipaddress
import socket
+from collections.abc import Sequence
from types import TracebackType
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing_extensions import Self
-from multicast_expert import LOCALHOST_IPV4, LOCALHOST_IPV6, os_multicast
+from multicast_expert import os_multicast
+from multicast_expert.interfaces import IfaceInfo, IfaceSpecifier, find_interfaces, get_default_gateway_iface_ip
from multicast_expert.utils import (
IPv4Or6Address,
+ MulticastAddress,
MulticastExpertError,
- get_default_gateway_iface_ip,
- get_interface_ips,
+ ip_interface_to_ip_string,
is_mac,
is_windows,
validate_mcast_ip,
@@ -27,49 +30,71 @@ class McastTxSocket:
def __init__(
self,
addr_family: int,
- mcast_ips: list[str],
- iface_ip: str | None = None,
+ mcast_ips: Sequence[MulticastAddress],
+ iface: IfaceSpecifier | None = None,
ttl: int = 1,
enable_external_loopback: bool = False,
+ iface_ip: IfaceSpecifier | None = None,
):
"""
Create a socket which transmits UDP datagrams over multicast.
The socket must be opened (e.g. using a with statement) before it can be used.
+ It is recommended to manually specify the interface to open the socket on. If no interface is passed, multicast_expert will
+ attempt to guess an interface from your default gateway (aka the interface your PC uses to access the internet).
+ Be careful, this default may not be desired in many cases. See the docs for details.
+
+ ``multicast_expert.scan_interfaces()`` may be used to obtain a list of interfaces on the machine,
+ and ``multicast_expert.find_interfaces()`` may be used to find interfaces matching a given specifier.
+ Passing in an IfaceInfo obtained from one of those functions to this function will make opening multiple
+ sockets more performant as there will be no need to scan interface info from the machine again.
+
+ .. note::
+ If two interfaces on this machine have the same IP address, passing an IP address for the interface
+ argument will result in an exception, because this situation is ambiguous.
+
:param addr_family: Sets IPv4 or IPv6 operation. Either socket.AF_INET or socket.AF_INET6.
:param mcast_ips: List of all possible multicast IPs that this socket can send to.
- :param iface_ip: Interface IP that this socket sends on. If left as None, multicast_expert will
- attempt to guess an interface by using the interface your default gateway is on (aka
- the one your PC uses to access the internet). Be careful, this default may not be desired
- in many cases. See the docs for details.
+ :param iface: Interface that this socket sends on. May be an interface IP (as a string or object),
+ an interface name, or an IfaceInfo object.
:param ttl: Time-to-live parameter to set on the packets sent by this socket, AKA hop limit for ipv6.
This controls the number of routers that the packets may pass through until they are dropped. The
default value of 1 prevents the multicast packets from passing through any routers.
:param enable_external_loopback: Enable receiving multicast packets sent over external interfaces. If and only
if this option is set to True, McastRxSockets will be able to receive packets sent by McastTxSockets open on
the same address, port, and interface.
+ :param iface_ip: Legacy alias for ``iface``. Deprecated.
"""
self.addr_family = addr_family
- self.iface_ip = iface_ip
- self.mcast_ips = mcast_ips
- self.mcast_ips_set = set(mcast_ips) # Used for checking IPs in send()
+ self.mcast_ips = [ipaddress.ip_address(mcast_ip) for mcast_ip in mcast_ips]
+ self.mcast_ips_set = {str(ip) for ip in self.mcast_ips} # Used for checking IPs in send()
self.ttl = ttl
self.is_opened = False
- if self.iface_ip is None:
- self.iface_ip = get_default_gateway_iface_ip(self.addr_family)
+ # Figure out what interface to use
+ if iface is not None and iface_ip is not None:
+ message = "iface and iface_ip may not both be specified!"
+ raise MulticastExpertError(message)
+ if iface is None:
+ iface = iface_ip
- if self.iface_ip is None:
- message = "iface_ip not specified but unable to determine the default gateway on this machine"
+ if iface is None:
+ iface = get_default_gateway_iface_ip(self.addr_family)
+ if iface is None:
+ message = "iface not specified but unable to determine the default gateway on this machine"
raise MulticastExpertError(message)
- # Resolve the interface
- try:
- self.iface_info = os_multicast.get_iface_info(self.iface_ip)
- except KeyError as ex:
- message = f"iface_ip {self.iface_ip} does not seem to correspond to a valid interface. Valid interfaces: {', '.join(get_interface_ips())}"
- raise MulticastExpertError(message) from ex
+ found_interfaces = find_interfaces([iface])
+ if len(found_interfaces) > 1:
+ message = (
+ f"Interface specifier {iface!s} matches multiple interfaces ({found_interfaces[0].machine_name} "
+ f"and {found_interfaces[1].machine_name})! To disambiguate in this situation, you need to pass "
+ f"an IfaceInfo object returned by scan_interfaces() or find_interfaces() instead of the "
+ f"interface address."
+ )
+ raise MulticastExpertError(message)
+ self._iface_info = found_interfaces[0]
# Sanity check multicast addresses
for mcast_ip in self.mcast_ips:
@@ -85,14 +110,22 @@ def __enter__(self) -> Self:
# Open the socket and set options
self.socket = socket.socket(family=self.addr_family, type=socket.SOCK_DGRAM)
- # Note: for Unix IPv6, need to specify the scope ID in the bind address
+ # Bind the socket to the interface address.
+ # If the interface has multiple addresses, just pick one, as I am 99% sure it doesn't matter to multicast which one we use.
+ iface_ips = self._iface_info.ip_addrs(self.addr_family)
+ if len(iface_ips) == 0:
+ message = (
+ "Interface does not have at least one IP address of the selected address family, cannot open socket!"
+ )
+ raise MulticastExpertError(message)
if self.addr_family == socket.AF_INET6 and not is_windows:
- self.socket.bind((self.iface_ip, 0, 0, self.iface_info.iface_idx))
+ # Note: for Unix IPv6, need to specify the scope ID in the bind address
+ self.socket.bind((ip_interface_to_ip_string(iface_ips[0]), 0, 0, self._iface_info.index))
else:
- self.socket.bind((self.iface_ip, 0)) # Bind to any available port
+ self.socket.bind((ip_interface_to_ip_string(iface_ips[0]), 0)) # Bind to any available port
# Use the IP_MULTICAST_IF option to set the interface to use.
- os_multicast.set_multicast_if(self.socket, self.mcast_ips, self.iface_info, self.addr_family)
+ os_multicast.set_multicast_if(self.socket, self.mcast_ips, self._iface_info, self.addr_family)
# Now set the time-to-live (thank goodness, this is the same on all platforms)
if self.addr_family == socket.AF_INET:
@@ -105,9 +138,7 @@ def __enter__(self) -> Self:
if not is_windows:
# Enable loopback if enable_external_loopback is set or if using a loopback interface on Mac.
# Otherwise, disable loopback.
- enable_loopback = self.enable_external_loopback or (
- is_mac and (self.iface_ip in (LOCALHOST_IPV4, LOCALHOST_IPV6))
- )
+ enable_loopback = self.enable_external_loopback or (is_mac and self._iface_info.is_localhost())
if self.addr_family == socket.AF_INET:
self.socket.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, enable_loopback)
@@ -157,3 +188,8 @@ def getsockname(self) -> tuple[str, int]:
:return: Tuple of local IP and local port
"""
return self.socket.getsockname() # type: ignore[no-any-return]
+
+ @property
+ def network_interface(self) -> IfaceInfo:
+ """Get the interface used by this socket."""
+ return self._iface_info
diff --git a/multicast_expert/utils.py b/multicast_expert/utils.py
index 03a466f..d866910 100644
--- a/multicast_expert/utils.py
+++ b/multicast_expert/utils.py
@@ -5,120 +5,60 @@
import socket
from typing import Union
-import netifaces
-
# docsig: disable
is_windows = platform.system() == "Windows"
is_mac = platform.system() == "Darwin"
-# Type which can represent an IPv4 or an IPv6 address.
+# Type which can represent an IPv4 or an IPv6 source/dest address for a TCP/UDP packet.
# For IPv4, address is a tuple of IP address (str) and port number.
# For IPv6, address is a tuple of IP address (str), port number, flow info (int), and scope ID (int).
# NOTE: Since we still support python 3.9, we cannot use new style type annotations here. We have to use the
# old Union annotation which works in 3.9
IPv4Or6Address = Union[tuple[str, int], tuple[str, int, int, int]]
+# Type which represents a multicast address passed to a socket.
+# Accepts a string or object form of an IP address.
+MulticastAddress = Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address]
+
# Exception class for this library
class MulticastExpertError(RuntimeError):
pass
-def get_interface_ips(include_ipv4: bool = True, include_ipv6: bool = True) -> list[str]:
- """
- Use this function to get a list of all the interface IP addresses available on this machine.
-
- Should be useful for generating help menus / info messages / etc.
- This is a thin wrapper around the netifaces library's functionality.
-
- :param include_ipv4: If true, IPv4 addresses will be included in the results
- :param include_ipv6: If true, IPv6 addresses will be included in the results
-
- :return: List of the interface IP of every interface on your machine, as a string.
- """
- ip_list: list[str] = []
- for interface in netifaces.interfaces():
- all_addresses: list[dict[str, str]] = []
- addresses_at_each_level = netifaces.ifaddresses(interface)
-
- if include_ipv4:
- # Note: Check needed because some interfaces do not have an ipv4 or ipv6 address
- if netifaces.AF_INET in addresses_at_each_level:
- all_addresses.extend(addresses_at_each_level[netifaces.AF_INET])
-
- if include_ipv6:
- if netifaces.AF_INET6 in addresses_at_each_level:
- all_addresses.extend(addresses_at_each_level[netifaces.AF_INET6])
-
- ip_list.extend(address_dict["addr"] for address_dict in all_addresses)
-
- return ip_list
-
-
-def get_default_gateway_iface_ip_v6() -> str | None:
- """
- Get the IP address of the interface that connects to the default IPv6 gateway.
-
- If this cannot be determined, None is returned.
- """
- return get_default_gateway_iface_ip(netifaces.AF_INET6)
-
-
-def get_default_gateway_iface_ip_v4() -> str | None:
- """
- Get the IP address of the interface that connects to the default IPv4 gateway.
-
- If this cannot be determined, None is returned.
- """
- return get_default_gateway_iface_ip(netifaces.AF_INET)
-
-
-def get_default_gateway_iface_ip(addr_family: int) -> str | None:
- """
- Get the IP address of the interface that connects to the default gateway of the given addr_family.
-
- If this cannot be determined, None is returned.
- """
- # Enumerate all gateways using netifaces
- try:
- gateways = netifaces.gateways()
- except OSError:
- return None
-
- # If it can, it will identify one of those as the default gateway for traffic.
- # If not, return none.
- if not "default" in gateways:
- return None
- if not addr_family in gateways["default"]:
- return None
-
- default_gateway = gateways["default"][addr_family]
- default_gateway_iface = default_gateway[1] # element 1 is the iface name, per the docs
-
- # Now, use the interface name to get the IP address of that interface
- interface_addresses: dict[int, list[dict[str, str]]] = netifaces.ifaddresses(default_gateway_iface)
- if addr_family not in interface_addresses:
- return None
- return interface_addresses[addr_family][0]["addr"]
-
-
-def validate_mcast_ip(mcast_ip: str, addr_family: int) -> None:
+def validate_mcast_ip(mcast_ip: MulticastAddress, addr_family: int) -> None:
"""
Validate that the given mcast_ip is a valid multicast address in the given addr family (IPv4 or IPv6).
An exception is thrown if validation fails.
"""
- address_obj = ipaddress.ip_address(mcast_ip)
- if not address_obj.is_multicast:
+ ip_as_obj = ipaddress.ip_address(mcast_ip)
+ if not ip_as_obj.is_multicast:
message = f"mcast_ip {mcast_ip} is not a multicast address!"
raise MulticastExpertError(message)
- if address_obj is ipaddress.IPv4Address and addr_family == socket.AF_INET6:
+ if isinstance(ip_as_obj, ipaddress.IPv4Address) and addr_family == socket.AF_INET6:
message = f"mcast_ip {mcast_ip} is IPv4 but this is an AF_INET6 socket!"
raise MulticastExpertError(message)
- if address_obj is ipaddress.IPv6Address and addr_family == socket.AF_INET:
+ if isinstance(ip_as_obj, ipaddress.IPv6Address) and addr_family == socket.AF_INET:
message = f"mcast_ip {mcast_ip} is IPv6 but this is an AF_INET socket!"
raise MulticastExpertError(message)
+
+
+def ip_interface_to_ip_string(ip_interface: ipaddress.IPv4Interface | ipaddress.IPv6Interface) -> str:
+ """
+ Convert IPvxInterface object to string containing the IP address.
+
+ Workaround for https://github.com/python/cpython/issues/88178
+
+ :param ip_interface: IP interface object
+ :return: String
+ """
+ if isinstance(ip_interface, ipaddress.IPv4Interface):
+ return str(ip_interface.ip)
+ else:
+ # Use str() function from superclass, which prints the scope ID but ignores the prefix info
+ return ipaddress.IPv6Address.__str__(ip_interface)
diff --git a/mypy.ini b/mypy.ini
deleted file mode 100644
index 04620cf..0000000
--- a/mypy.ini
+++ /dev/null
@@ -1,6 +0,0 @@
-[mypy]
-strict = True
-
-# Don't enforce types for netifaces which has no type information
-[mypy-netifaces]
-ignore_missing_imports = True
\ No newline at end of file
diff --git a/poetry.lock b/poetry.lock
index 99070c7..ab04997 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -29,18 +29,18 @@ typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.11\""}
[[package]]
name = "babel"
-version = "2.16.0"
+version = "2.17.0"
description = "Internationalization utilities"
optional = false
python-versions = ">=3.8"
groups = ["dev"]
files = [
- {file = "babel-2.16.0-py3-none-any.whl", hash = "sha256:368b5b98b37c06b7daf6696391c3240c938b37767d4584413e8438c5c435fa8b"},
- {file = "babel-2.16.0.tar.gz", hash = "sha256:d1f3554ca26605fe173f3de0c65f750f5a42f924499bf134de6423582298e316"},
+ {file = "babel-2.17.0-py3-none-any.whl", hash = "sha256:4d0b53093fdfb4b21c92b5213dba5a1b23885afa8383709427046b21c366e5f2"},
+ {file = "babel-2.17.0.tar.gz", hash = "sha256:0c54cffb19f690cdcc52a3b50bcbf71e07a808d1c80d549f2459b9d2cf0afb9d"},
]
[package.extras]
-dev = ["freezegun (>=1.0,<2.0)", "pytest (>=6.0)", "pytest-cov"]
+dev = ["backports.zoneinfo", "freezegun (>=1.0,<2.0)", "jinja2 (>=3.0)", "pytest (>=6.0)", "pytest-cov", "pytz", "setuptools", "tzdata"]
[[package]]
name = "bracex"
@@ -56,14 +56,14 @@ files = [
[[package]]
name = "certifi"
-version = "2024.12.14"
+version = "2025.1.31"
description = "Python package for providing Mozilla's CA Bundle."
optional = false
python-versions = ">=3.6"
groups = ["dev"]
files = [
- {file = "certifi-2024.12.14-py3-none-any.whl", hash = "sha256:1275f7a45be9464efc1173084eaa30f866fe2e47d389406136d332ed4967ec56"},
- {file = "certifi-2024.12.14.tar.gz", hash = "sha256:b650d30f370c2b724812bee08008be0c4163b163ddaec3f2546c1caf65f191db"},
+ {file = "certifi-2025.1.31-py3-none-any.whl", hash = "sha256:ca78db4565a652026a4db2bcdf68f2fb589ea80d0be70e03929ed730746b84fe"},
+ {file = "certifi-2025.1.31.tar.gz", hash = "sha256:3d5da6925056f6f18f119200434a4780a94263f10d1c21d032a6f6b2baa20651"},
]
[[package]]
@@ -183,14 +183,14 @@ files = [
[[package]]
name = "docsig"
-version = "0.67.0"
+version = "0.68.0"
description = "Check signature params for proper documentation"
optional = false
python-versions = "<4.0.0,>=3.8.1"
groups = ["dev"]
files = [
- {file = "docsig-0.67.0-py3-none-any.whl", hash = "sha256:72af6e75c008b882f6a7546a2e97934c1cf843dfb099b943125147995c936460"},
- {file = "docsig-0.67.0.tar.gz", hash = "sha256:127ba9ac16983491010047590aa15d0d72485e84017e1c32fd28dac27ed57278"},
+ {file = "docsig-0.68.0-py3-none-any.whl", hash = "sha256:8f6a2038d83c6afd96c36fb7f222024cdbe67686698b41f8efd5ab848ae98837"},
+ {file = "docsig-0.68.0.tar.gz", hash = "sha256:2ace2276adac132ca06956a35fe468eda21670e50f12c71ead778066d3837c8c"},
]
[package.dependencies]
@@ -383,50 +383,44 @@ files = [
[[package]]
name = "mypy"
-version = "1.14.1"
+version = "1.15.0"
description = "Optional static typing for Python"
optional = false
-python-versions = ">=3.8"
+python-versions = ">=3.9"
groups = ["dev"]
files = [
- {file = "mypy-1.14.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:52686e37cf13d559f668aa398dd7ddf1f92c5d613e4f8cb262be2fb4fedb0fcb"},
- {file = "mypy-1.14.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1fb545ca340537d4b45d3eecdb3def05e913299ca72c290326be19b3804b39c0"},
- {file = "mypy-1.14.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:90716d8b2d1f4cd503309788e51366f07c56635a3309b0f6a32547eaaa36a64d"},
- {file = "mypy-1.14.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2ae753f5c9fef278bcf12e1a564351764f2a6da579d4a81347e1d5a15819997b"},
- {file = "mypy-1.14.1-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:e0fe0f5feaafcb04505bcf439e991c6d8f1bf8b15f12b05feeed96e9e7bf1427"},
- {file = "mypy-1.14.1-cp310-cp310-win_amd64.whl", hash = "sha256:7d54bd85b925e501c555a3227f3ec0cfc54ee8b6930bd6141ec872d1c572f81f"},
- {file = "mypy-1.14.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:f995e511de847791c3b11ed90084a7a0aafdc074ab88c5a9711622fe4751138c"},
- {file = "mypy-1.14.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d64169ec3b8461311f8ce2fd2eb5d33e2d0f2c7b49116259c51d0d96edee48d1"},
- {file = "mypy-1.14.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ba24549de7b89b6381b91fbc068d798192b1b5201987070319889e93038967a8"},
- {file = "mypy-1.14.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:183cf0a45457d28ff9d758730cd0210419ac27d4d3f285beda038c9083363b1f"},
- {file = "mypy-1.14.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:f2a0ecc86378f45347f586e4163d1769dd81c5a223d577fe351f26b179e148b1"},
- {file = "mypy-1.14.1-cp311-cp311-win_amd64.whl", hash = "sha256:ad3301ebebec9e8ee7135d8e3109ca76c23752bac1e717bc84cd3836b4bf3eae"},
- {file = "mypy-1.14.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:30ff5ef8519bbc2e18b3b54521ec319513a26f1bba19a7582e7b1f58a6e69f14"},
- {file = "mypy-1.14.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:cb9f255c18052343c70234907e2e532bc7e55a62565d64536dbc7706a20b78b9"},
- {file = "mypy-1.14.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8b4e3413e0bddea671012b063e27591b953d653209e7a4fa5e48759cda77ca11"},
- {file = "mypy-1.14.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:553c293b1fbdebb6c3c4030589dab9fafb6dfa768995a453d8a5d3b23784af2e"},
- {file = "mypy-1.14.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:fad79bfe3b65fe6a1efaed97b445c3d37f7be9fdc348bdb2d7cac75579607c89"},
- {file = "mypy-1.14.1-cp312-cp312-win_amd64.whl", hash = "sha256:8fa2220e54d2946e94ab6dbb3ba0a992795bd68b16dc852db33028df2b00191b"},
- {file = "mypy-1.14.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:92c3ed5afb06c3a8e188cb5da4984cab9ec9a77ba956ee419c68a388b4595255"},
- {file = "mypy-1.14.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:dbec574648b3e25f43d23577309b16534431db4ddc09fda50841f1e34e64ed34"},
- {file = "mypy-1.14.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:8c6d94b16d62eb3e947281aa7347d78236688e21081f11de976376cf010eb31a"},
- {file = "mypy-1.14.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d4b19b03fdf54f3c5b2fa474c56b4c13c9dbfb9a2db4370ede7ec11a2c5927d9"},
- {file = "mypy-1.14.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:0c911fde686394753fff899c409fd4e16e9b294c24bfd5e1ea4675deae1ac6fd"},
- {file = "mypy-1.14.1-cp313-cp313-win_amd64.whl", hash = "sha256:8b21525cb51671219f5307be85f7e646a153e5acc656e5cebf64bfa076c50107"},
- {file = "mypy-1.14.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:7084fb8f1128c76cd9cf68fe5971b37072598e7c31b2f9f95586b65c741a9d31"},
- {file = "mypy-1.14.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:8f845a00b4f420f693f870eaee5f3e2692fa84cc8514496114649cfa8fd5e2c6"},
- {file = "mypy-1.14.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:44bf464499f0e3a2d14d58b54674dee25c031703b2ffc35064bd0df2e0fac319"},
- {file = "mypy-1.14.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c99f27732c0b7dc847adb21c9d47ce57eb48fa33a17bc6d7d5c5e9f9e7ae5bac"},
- {file = "mypy-1.14.1-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:bce23c7377b43602baa0bd22ea3265c49b9ff0b76eb315d6c34721af4cdf1d9b"},
- {file = "mypy-1.14.1-cp38-cp38-win_amd64.whl", hash = "sha256:8edc07eeade7ebc771ff9cf6b211b9a7d93687ff892150cb5692e4f4272b0837"},
- {file = "mypy-1.14.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:3888a1816d69f7ab92092f785a462944b3ca16d7c470d564165fe703b0970c35"},
- {file = "mypy-1.14.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:46c756a444117c43ee984bd055db99e498bc613a70bbbc120272bd13ca579fbc"},
- {file = "mypy-1.14.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:27fc248022907e72abfd8e22ab1f10e903915ff69961174784a3900a8cba9ad9"},
- {file = "mypy-1.14.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:499d6a72fb7e5de92218db961f1a66d5f11783f9ae549d214617edab5d4dbdbb"},
- {file = "mypy-1.14.1-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:57961db9795eb566dc1d1b4e9139ebc4c6b0cb6e7254ecde69d1552bf7613f60"},
- {file = "mypy-1.14.1-cp39-cp39-win_amd64.whl", hash = "sha256:07ba89fdcc9451f2ebb02853deb6aaaa3d2239a236669a63ab3801bbf923ef5c"},
- {file = "mypy-1.14.1-py3-none-any.whl", hash = "sha256:b66a60cc4073aeb8ae00057f9c1f64d49e90f918fbcef9a977eb121da8b8f1d1"},
- {file = "mypy-1.14.1.tar.gz", hash = "sha256:7ec88144fe9b510e8475ec2f5f251992690fcf89ccb4500b214b4226abcd32d6"},
+ {file = "mypy-1.15.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:979e4e1a006511dacf628e36fadfecbcc0160a8af6ca7dad2f5025529e082c13"},
+ {file = "mypy-1.15.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c4bb0e1bd29f7d34efcccd71cf733580191e9a264a2202b0239da95984c5b559"},
+ {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:be68172e9fd9ad8fb876c6389f16d1c1b5f100ffa779f77b1fb2176fcc9ab95b"},
+ {file = "mypy-1.15.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c7be1e46525adfa0d97681432ee9fcd61a3964c2446795714699a998d193f1a3"},
+ {file = "mypy-1.15.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:2e2c2e6d3593f6451b18588848e66260ff62ccca522dd231cd4dd59b0160668b"},
+ {file = "mypy-1.15.0-cp310-cp310-win_amd64.whl", hash = "sha256:6983aae8b2f653e098edb77f893f7b6aca69f6cffb19b2cc7443f23cce5f4828"},
+ {file = "mypy-1.15.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:2922d42e16d6de288022e5ca321cd0618b238cfc5570e0263e5ba0a77dbef56f"},
+ {file = "mypy-1.15.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2ee2d57e01a7c35de00f4634ba1bbf015185b219e4dc5909e281016df43f5ee5"},
+ {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:973500e0774b85d9689715feeffcc980193086551110fd678ebe1f4342fb7c5e"},
+ {file = "mypy-1.15.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5a95fb17c13e29d2d5195869262f8125dfdb5c134dc8d9a9d0aecf7525b10c2c"},
+ {file = "mypy-1.15.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:1905f494bfd7d85a23a88c5d97840888a7bd516545fc5aaedff0267e0bb54e2f"},
+ {file = "mypy-1.15.0-cp311-cp311-win_amd64.whl", hash = "sha256:c9817fa23833ff189db061e6d2eff49b2f3b6ed9856b4a0a73046e41932d744f"},
+ {file = "mypy-1.15.0-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:aea39e0583d05124836ea645f412e88a5c7d0fd77a6d694b60d9b6b2d9f184fd"},
+ {file = "mypy-1.15.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2f2147ab812b75e5b5499b01ade1f4a81489a147c01585cda36019102538615f"},
+ {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ce436f4c6d218a070048ed6a44c0bbb10cd2cc5e272b29e7845f6a2f57ee4464"},
+ {file = "mypy-1.15.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8023ff13985661b50a5928fc7a5ca15f3d1affb41e5f0a9952cb68ef090b31ee"},
+ {file = "mypy-1.15.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1124a18bc11a6a62887e3e137f37f53fbae476dc36c185d549d4f837a2a6a14e"},
+ {file = "mypy-1.15.0-cp312-cp312-win_amd64.whl", hash = "sha256:171a9ca9a40cd1843abeca0e405bc1940cd9b305eaeea2dda769ba096932bb22"},
+ {file = "mypy-1.15.0-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:93faf3fdb04768d44bf28693293f3904bbb555d076b781ad2530214ee53e3445"},
+ {file = "mypy-1.15.0-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:811aeccadfb730024c5d3e326b2fbe9249bb7413553f15499a4050f7c30e801d"},
+ {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:98b7b9b9aedb65fe628c62a6dc57f6d5088ef2dfca37903a7d9ee374d03acca5"},
+ {file = "mypy-1.15.0-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c43a7682e24b4f576d93072216bf56eeff70d9140241f9edec0c104d0c515036"},
+ {file = "mypy-1.15.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:baefc32840a9f00babd83251560e0ae1573e2f9d1b067719479bfb0e987c6357"},
+ {file = "mypy-1.15.0-cp313-cp313-win_amd64.whl", hash = "sha256:b9378e2c00146c44793c98b8d5a61039a048e31f429fb0eb546d93f4b000bedf"},
+ {file = "mypy-1.15.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e601a7fa172c2131bff456bb3ee08a88360760d0d2f8cbd7a75a65497e2df078"},
+ {file = "mypy-1.15.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:712e962a6357634fef20412699a3655c610110e01cdaa6180acec7fc9f8513ba"},
+ {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f95579473af29ab73a10bada2f9722856792a36ec5af5399b653aa28360290a5"},
+ {file = "mypy-1.15.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:8f8722560a14cde92fdb1e31597760dc35f9f5524cce17836c0d22841830fd5b"},
+ {file = "mypy-1.15.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:1fbb8da62dc352133d7d7ca90ed2fb0e9d42bb1a32724c287d3c76c58cbaa9c2"},
+ {file = "mypy-1.15.0-cp39-cp39-win_amd64.whl", hash = "sha256:d10d994b41fb3497719bbf866f227b3489048ea4bbbb5015357db306249f7980"},
+ {file = "mypy-1.15.0-py3-none-any.whl", hash = "sha256:5469affef548bd1895d86d3bf10ce2b44e33d86923c29e4d675b3e323437ea3e"},
+ {file = "mypy-1.15.0.tar.gz", hash = "sha256:404534629d51d3efea5c800ee7c42b72a6554d6c400e6a79eafe15d11341fd43"},
]
[package.dependencies]
@@ -571,6 +565,24 @@ tomli = {version = ">=1", markers = "python_version < \"3.11\""}
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
+[[package]]
+name = "pytest-mock"
+version = "3.14.0"
+description = "Thin-wrapper around the mock package for easier use with pytest"
+optional = false
+python-versions = ">=3.8"
+groups = ["dev"]
+files = [
+ {file = "pytest-mock-3.14.0.tar.gz", hash = "sha256:2719255a1efeceadbc056d6bf3df3d1c5015530fb40cf347c0f9afac88410bd0"},
+ {file = "pytest_mock-3.14.0-py3-none-any.whl", hash = "sha256:0b72c38033392a5f4621342fe11e9219ac11ec9d375f8e2a0c164539e0d70f6f"},
+]
+
+[package.dependencies]
+pytest = ">=6.2.5"
+
+[package.extras]
+dev = ["pre-commit", "pytest-asyncio", "tox"]
+
[[package]]
name = "pytest-retry"
version = "1.7.0"
@@ -613,30 +625,30 @@ use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]
[[package]]
name = "ruff"
-version = "0.9.3"
+version = "0.9.4"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
groups = ["dev"]
files = [
- {file = "ruff-0.9.3-py3-none-linux_armv6l.whl", hash = "sha256:7f39b879064c7d9670197d91124a75d118d00b0990586549949aae80cdc16624"},
- {file = "ruff-0.9.3-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:a187171e7c09efa4b4cc30ee5d0d55a8d6c5311b3e1b74ac5cb96cc89bafc43c"},
- {file = "ruff-0.9.3-py3-none-macosx_11_0_arm64.whl", hash = "sha256:c59ab92f8e92d6725b7ded9d4a31be3ef42688a115c6d3da9457a5bda140e2b4"},
- {file = "ruff-0.9.3-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2dc153c25e715be41bb228bc651c1e9b1a88d5c6e5ed0194fa0dfea02b026439"},
- {file = "ruff-0.9.3-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:646909a1e25e0dc28fbc529eab8eb7bb583079628e8cbe738192853dbbe43af5"},
- {file = "ruff-0.9.3-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5a5a46e09355695fbdbb30ed9889d6cf1c61b77b700a9fafc21b41f097bfbba4"},
- {file = "ruff-0.9.3-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:c4bb09d2bbb394e3730d0918c00276e79b2de70ec2a5231cd4ebb51a57df9ba1"},
- {file = "ruff-0.9.3-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:96a87ec31dc1044d8c2da2ebbed1c456d9b561e7d087734336518181b26b3aa5"},
- {file = "ruff-0.9.3-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9bb7554aca6f842645022fe2d301c264e6925baa708b392867b7a62645304df4"},
- {file = "ruff-0.9.3-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cabc332b7075a914ecea912cd1f3d4370489c8018f2c945a30bcc934e3bc06a6"},
- {file = "ruff-0.9.3-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:33866c3cc2a575cbd546f2cd02bdd466fed65118e4365ee538a3deffd6fcb730"},
- {file = "ruff-0.9.3-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:006e5de2621304c8810bcd2ee101587712fa93b4f955ed0985907a36c427e0c2"},
- {file = "ruff-0.9.3-py3-none-musllinux_1_2_i686.whl", hash = "sha256:ba6eea4459dbd6b1be4e6bfc766079fb9b8dd2e5a35aff6baee4d9b1514ea519"},
- {file = "ruff-0.9.3-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:90230a6b8055ad47d3325e9ee8f8a9ae7e273078a66401ac66df68943ced029b"},
- {file = "ruff-0.9.3-py3-none-win32.whl", hash = "sha256:eabe5eb2c19a42f4808c03b82bd313fc84d4e395133fb3fc1b1516170a31213c"},
- {file = "ruff-0.9.3-py3-none-win_amd64.whl", hash = "sha256:040ceb7f20791dfa0e78b4230ee9dce23da3b64dd5848e40e3bf3ab76468dcf4"},
- {file = "ruff-0.9.3-py3-none-win_arm64.whl", hash = "sha256:800d773f6d4d33b0a3c60e2c6ae8f4c202ea2de056365acfa519aa48acf28e0b"},
- {file = "ruff-0.9.3.tar.gz", hash = "sha256:8293f89985a090ebc3ed1064df31f3b4b56320cdfcec8b60d3295bddb955c22a"},
+ {file = "ruff-0.9.4-py3-none-linux_armv6l.whl", hash = "sha256:64e73d25b954f71ff100bb70f39f1ee09e880728efb4250c632ceed4e4cdf706"},
+ {file = "ruff-0.9.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6ce6743ed64d9afab4fafeaea70d3631b4d4b28b592db21a5c2d1f0ef52934bf"},
+ {file = "ruff-0.9.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:54499fb08408e32b57360f6f9de7157a5fec24ad79cb3f42ef2c3f3f728dfe2b"},
+ {file = "ruff-0.9.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:37c892540108314a6f01f105040b5106aeb829fa5fb0561d2dcaf71485021137"},
+ {file = "ruff-0.9.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:de9edf2ce4b9ddf43fd93e20ef635a900e25f622f87ed6e3047a664d0e8f810e"},
+ {file = "ruff-0.9.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:87c90c32357c74f11deb7fbb065126d91771b207bf9bfaaee01277ca59b574ec"},
+ {file = "ruff-0.9.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:56acd6c694da3695a7461cc55775f3a409c3815ac467279dfa126061d84b314b"},
+ {file = "ruff-0.9.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e0c93e7d47ed951b9394cf352d6695b31498e68fd5782d6cbc282425655f687a"},
+ {file = "ruff-0.9.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1d4c8772670aecf037d1bf7a07c39106574d143b26cfe5ed1787d2f31e800214"},
+ {file = "ruff-0.9.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bfc5f1d7afeda8d5d37660eeca6d389b142d7f2b5a1ab659d9214ebd0e025231"},
+ {file = "ruff-0.9.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:faa935fc00ae854d8b638c16a5f1ce881bc3f67446957dd6f2af440a5fc8526b"},
+ {file = "ruff-0.9.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:a6c634fc6f5a0ceae1ab3e13c58183978185d131a29c425e4eaa9f40afe1e6d6"},
+ {file = "ruff-0.9.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:433dedf6ddfdec7f1ac7575ec1eb9844fa60c4c8c2f8887a070672b8d353d34c"},
+ {file = "ruff-0.9.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:d612dbd0f3a919a8cc1d12037168bfa536862066808960e0cc901404b77968f0"},
+ {file = "ruff-0.9.4-py3-none-win32.whl", hash = "sha256:db1192ddda2200671f9ef61d9597fcef89d934f5d1705e571a93a67fb13a4402"},
+ {file = "ruff-0.9.4-py3-none-win_amd64.whl", hash = "sha256:05bebf4cdbe3ef75430d26c375773978950bbf4ee3c95ccb5448940dc092408e"},
+ {file = "ruff-0.9.4-py3-none-win_arm64.whl", hash = "sha256:585792f1e81509e38ac5123492f8875fbc36f3ede8185af0a26df348e5154f41"},
+ {file = "ruff-0.9.4.tar.gz", hash = "sha256:6907ee3529244bb0ed066683e075f09285b38dd5b4039370df6ff06041ca19e7"},
]
[[package]]
@@ -899,4 +911,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = "~=3.9"
-content-hash = "76a800d6fdcca6d23312176b536438ab027db6e18143ea6fa1cac6d787208229"
+content-hash = "ba1a1734f3be39ee4342946c7548061e71a248976ec400d6d78cfb458a43b0d3"
diff --git a/pyproject.toml b/pyproject.toml
index 652a3d0..0ab1d20 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ name = "multicast_expert"
# "~=3.9", so Poetry won't solve the versions since it cannot find a version of docsig that works for Python >=4
requires-python = "~=3.9"
-version = "1.4.2"
+version = "1.5.0"
description = "A library to take the fiddly parts out of multicast networking!"
authors = [
{name = "Jamie Smith", email = "jsmith@crackofdawn.onmicrosoft.com"}
@@ -50,6 +50,7 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.group.dev.dependencies]
pytest = ">=6.2"
pytest-retry = ">=1.7.0"
+pytest-mock = ">=3.13.0"
mypy = ">=1.8"
ruff = '>=0.9'
docsig = ">=0.67.0"
@@ -126,9 +127,10 @@ ignore = [
'TRY300', # Returns within try blocks
'PLR2004', # Magic value used in comparison
'SIM102', # Allow multiple nested if statements. Sometimes it's nice for readability
- 'TC003', # Don't require TYPE_CHECKING checks in imports. Waaaay too annoying to manage.
+ 'TC001', 'TC003', # Don't require TYPE_CHECKING checks in imports. Waaaay too annoying to manage.
'PERF203', # Allow try-except within loops. Sometimes this is needed!
'ERA001', # Allow commented code. This lint gets a false positive on docsig ignore comments!
+ 'PERF401', # Allow for loops that could be comprehensions. Sometimes this makes code easier to understand.
]
[tool.ruff.lint.flake8-quotes]
@@ -170,4 +172,11 @@ ban-relative-imports = 'all'
# Allow __init__ to not have a return type annotation
[tool.ruff.lint.flake8-annotations]
-mypy-init-return = true
\ No newline at end of file
+mypy-init-return = true
+
+[tool.mypy]
+strict = true
+mypy_path = "$MYPY_CONFIG_FILE_DIR/stubs"
+
+[tool.pyright]
+stubPath = "stubs"
\ No newline at end of file
diff --git a/stubs/netifaces/__init__.pyi b/stubs/netifaces/__init__.pyi
new file mode 100644
index 0000000..b9eef0d
--- /dev/null
+++ b/stubs/netifaces/__init__.pyi
@@ -0,0 +1,16 @@
+from typing import Final, Literal
+
+AF_INET: Final[int]
+AF_INET6: Final[int]
+AF_LINK: Final[int]
+AF_PACKET: Final[int]
+AF_UNSPEC: Final[int]
+
+address_families: Final[dict[int, str]]
+version: Final[str]
+
+def gateways() -> dict[
+ int | Literal["default"], list[tuple[str, str, bool] | tuple[str, str]] | dict[int, tuple[str, str]]
+]: ...
+def ifaddresses(ifname: str, /) -> dict[int, list[dict[str, str]]]: ...
+def interfaces() -> list[str]: ...
diff --git a/tests/test_interfaces.py b/tests/test_interfaces.py
new file mode 100644
index 0000000..fbf8e17
--- /dev/null
+++ b/tests/test_interfaces.py
@@ -0,0 +1,133 @@
+from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
+
+import multicast_expert
+import netifaces
+import pytest
+import pytest_mock
+from multicast_expert import IfaceInfo
+
+TEST_IFACES = [
+ # eth0 has v4 and v6 addresses
+ IfaceInfo(
+ machine_name="eth0",
+ index=20,
+ link_layer_address="ad:b8:90:13:16:12",
+ ip4_addrs=[IPv4Interface("192.168.1.10/24")],
+ ip6_addrs=[IPv6Interface("fe80::2dc9%20/64")],
+ ),
+ # eth1 has two V4 addresses only (one of which duplicates eth0), and no detectable MAC address
+ IfaceInfo(
+ machine_name="eth1",
+ index=21,
+ link_layer_address=None,
+ ip4_addrs=[IPv4Interface("192.168.1.11/17"), IPv4Interface("192.168.1.10/24")],
+ ip6_addrs=[],
+ ),
+ # wlan0 has no addresses (down)
+ IfaceInfo(machine_name="wlan0", index=22, link_layer_address="ac:b8:91:14:16:12", ip4_addrs=[], ip6_addrs=[]),
+]
+
+
+def test_scan_interfaces(mocker: pytest_mock.MockerFixture):
+ """
+ Test scanning interfaces to create IfaceInfo objects
+ """
+ mocker.patch("netifaces.interfaces").return_value = ["eth0", "eth1", "wlan0"]
+
+ test_if_addrs = {
+ "eth0": {
+ netifaces.AF_LINK: [{"addr": "ad:b8:90:13:16:12"}],
+ netifaces.AF_INET6: [
+ {
+ "addr": "fe80::2dc9%20",
+ "netmask": "ffff:ffff:ffff:ffff::/64",
+ "broadcast": "fe80::ffff:ffff:ffff:ffff%20",
+ }
+ ],
+ netifaces.AF_INET: [{"addr": "192.168.1.10", "netmask": "255.255.255.0", "broadcast": "192.168.1.255"}],
+ },
+ "eth1": {
+ netifaces.AF_INET: [
+ {"addr": "192.168.1.11", "netmask": "255.255.128.0", "broadcast": "192.168.127.255"},
+ {"addr": "192.168.1.10", "netmask": "255.255.255.0", "broadcast": "192.168.1.255"},
+ ],
+ },
+ "wlan0": {netifaces.AF_LINK: [{"addr": "ac:b8:91:14:16:12"}]},
+ }
+ test_if_indexes = {"eth0": 20, "eth1": 21, "wlan0": 22}
+ mocker.patch("netifaces.ifaddresses").side_effect = lambda name: test_if_addrs[name]
+ mocker.patch("multicast_expert.interfaces.iface_name_to_index").side_effect = lambda name: test_if_indexes[name]
+
+ assert multicast_expert.scan_interfaces() == TEST_IFACES
+
+ # Also test calling find_interfaces() without passing an ifaces= argument
+ assert multicast_expert.find_interfaces(["eth0", "eth1", "wlan0"]) == TEST_IFACES
+
+
+def test_find_iface_by_name():
+ """
+ Test that finding an interface by name works
+ """
+
+ assert multicast_expert.find_interfaces(["wlan0"], ifaces=TEST_IFACES) == [TEST_IFACES[2]]
+ assert multicast_expert.find_interfaces(["eth1"], ifaces=TEST_IFACES) == [TEST_IFACES[1]]
+
+ with pytest.raises(
+ multicast_expert.MulticastExpertError, match="does not appear to be a valid interface name or IP"
+ ):
+ multicast_expert.find_interfaces(["blargh"])
+
+
+def test_find_iface_by_ip():
+ """
+ Test that finding an interface by IP address works
+ """
+
+ assert multicast_expert.find_interfaces(["192.168.1.11"], ifaces=TEST_IFACES) == [TEST_IFACES[1]]
+ assert multicast_expert.find_interfaces([IPv4Address("192.168.1.10")], ifaces=TEST_IFACES) == [
+ TEST_IFACES[0],
+ TEST_IFACES[1],
+ ]
+
+ # Test with and without scope ID
+ assert multicast_expert.find_interfaces([IPv6Address("fe80::2dc9%20")], ifaces=TEST_IFACES) == [TEST_IFACES[0]]
+ assert multicast_expert.find_interfaces([IPv6Address("fe80::2dc9")], ifaces=TEST_IFACES) == [TEST_IFACES[0]]
+
+ with pytest.raises(multicast_expert.MulticastExpertError, match="No matches found for interface IP address"):
+ multicast_expert.find_interfaces([IPv4Address("192.168.1.12")])
+
+ with pytest.raises(multicast_expert.MulticastExpertError, match="No matches found for interface IP address"):
+ multicast_expert.find_interfaces(["1234::5678"])
+
+
+def test_find_iface_deduplication():
+ """
+ Test that passing multiple IPs of the same interface only returns that interface once
+ """
+ assert multicast_expert.find_interfaces(["192.168.1.11", "192.168.1.10"], ifaces=TEST_IFACES) == [
+ TEST_IFACES[1],
+ TEST_IFACES[0],
+ ]
+
+
+def test_find_iface_mixed():
+ """
+ Test that we can pass IfaceInfos and IPs at the same time to find_interfaces()
+ """
+ assert multicast_expert.find_interfaces(["192.168.1.11", TEST_IFACES[2]], ifaces=TEST_IFACES) == [
+ TEST_IFACES[1],
+ TEST_IFACES[2],
+ ]
+
+
+def test_find_iface_does_not_scan_if_passed_ifaceinfos(mocker: pytest_mock.MockerFixture):
+ """
+ Test that find_interfaces() does not scan interfaces if all arguments are interface infos
+ """
+
+ ifaces_mock = mocker.patch("netifaces.interfaces")
+ ifaces_mock.return_value = []
+ result = multicast_expert.find_interfaces([TEST_IFACES[1], TEST_IFACES[2]])
+
+ assert not ifaces_mock.called
+ assert result == [TEST_IFACES[1], TEST_IFACES[2]]
diff --git a/tests/test_multicast_expert.py b/tests/test_multicast_sockets.py
similarity index 81%
rename from tests/test_multicast_expert.py
rename to tests/test_multicast_sockets.py
index b9beb3d..6b064e5 100644
--- a/tests/test_multicast_expert.py
+++ b/tests/test_multicast_sockets.py
@@ -1,9 +1,12 @@
import platform
import socket
import warnings
+from ipaddress import IPv4Address, IPv6Address
import multicast_expert
+import netifaces
import pytest
+from multicast_expert import IfaceInfo
# Test constants
mcast_address_v4 = "239.2.2.2"
@@ -16,37 +19,46 @@
@pytest.fixture
-def nonloopback_iface_ipv6() -> str:
+def nonloopback_iface_ipv6() -> multicast_expert.IfaceInfo:
"""Try to obtain a non-loopback IPv6 interface. If the default interface cannot be found, then use an arbitrary interface."""
- nonloopback_iface_ipv6 = multicast_expert.get_default_gateway_iface_ip_v6()
+ nonloopback_iface_ipv6 = multicast_expert.get_default_gateway_iface(netifaces.AF_INET6)
if nonloopback_iface_ipv6 is None:
- for iface_ip in multicast_expert.get_interface_ips(include_ipv4=False, include_ipv6=True):
- if iface_ip != multicast_expert.LOCALHOST_IPV6:
- nonloopback_iface_ipv6 = iface_ip
+ for iface in multicast_expert.scan_interfaces():
+ if not iface.is_localhost() and len(iface.ip6_addrs) > 0:
+ nonloopback_iface_ipv6 = iface
break
if nonloopback_iface_ipv6 is None:
raise RuntimeError("Couldn't find an ipv6 interface to use for the test!")
warnings.warn(
- f"netifaces was not able to determine the default ipv6 gateway on this machine. Using arbitrarily selected interface {nonloopback_iface_ipv6} instead.",
+ f"netifaces was not able to determine the default ipv6 gateway on this machine. Using arbitrarily selected interface {nonloopback_iface_ipv6!s} instead.",
stacklevel=2,
)
return nonloopback_iface_ipv6
-def test_get_ifaces() -> None:
+def test_get_iface_ips() -> None:
"""
- Simple test, just prints the interfaces available on the current machine
+ Simple test, just prints the interface IPs available on the current machine
:return:
"""
- print("\nIPv4 Interfaces: -----------------")
+ print("\nIPv4 Interface IPs: -----------------")
print("\n".join(multicast_expert.get_interface_ips(include_ipv4=True, include_ipv6=False)))
- print("\nIPv6 Interfaces: -----------------")
+ print("\nIPv6 Interface IPs: -----------------")
print("\n".join(multicast_expert.get_interface_ips(include_ipv4=False, include_ipv6=True)))
+def test_scan_ifaces() -> None:
+ """
+ Simple test, just prints the interfaces available on the current machine
+ :return:
+ """
+ print("\nInterfaces: -----------------")
+ print("\n".join(str(iface_info) for iface_info in multicast_expert.scan_interfaces()))
+
+
def test_get_default_gateway() -> None:
"""
Simple test, just prints the default gateway ifaces on the current machine
@@ -65,18 +77,18 @@ def test_tx_v4_can_be_used() -> None:
mcast_sock.sendto(b"Hello IPv4", (mcast_address_v4, port))
-def test_tx_v6_can_be_used(nonloopback_iface_ipv6: str) -> None:
+def test_tx_v6_can_be_used(nonloopback_iface_ipv6: IfaceInfo) -> None:
"""
Sanity check that a Tx IPv6 socket can be opened and used using the default gateway
:return:
"""
with multicast_expert.McastTxSocket(
- socket.AF_INET6, mcast_ips=[mcast_address_v6], iface_ip=nonloopback_iface_ipv6
+ socket.AF_INET6, mcast_ips=[mcast_address_v6], iface=nonloopback_iface_ipv6
) as mcast_sock:
mcast_sock.sendto(b"Hello IPv6", (mcast_address_v6, port))
-def test_non_mcast_raises_error(nonloopback_iface_ipv6: str) -> None:
+def test_non_mcast_raises_error(nonloopback_iface_ipv6: IfaceInfo) -> None:
"""
Check that trying to use a non-multicast address raises an error
"""
@@ -84,7 +96,7 @@ def test_non_mcast_raises_error(nonloopback_iface_ipv6: str) -> None:
multicast_expert.McastTxSocket(socket.AF_INET, mcast_ips=["239.2.2.2", "192.168.5.1"])
with pytest.raises(multicast_expert.MulticastExpertError, match="not a multicast address"):
- multicast_expert.McastTxSocket(socket.AF_INET6, mcast_ips=["abcd::"], iface_ip=nonloopback_iface_ipv6)
+ multicast_expert.McastTxSocket(socket.AF_INET6, mcast_ips=["abcd::"], iface=nonloopback_iface_ipv6)
def test_rx_v4_can_be_opened() -> None:
@@ -105,12 +117,12 @@ def test_rx_v4_ssm_can_be_opened() -> None:
pass
-def test_rx_v6_can_be_opened(nonloopback_iface_ipv6: str) -> None:
+def test_rx_v6_can_be_opened(nonloopback_iface_ipv6: IfaceInfo) -> None:
"""
Sanity check that a Rx IPv6 socket can be opened using the default gateway
"""
with multicast_expert.McastRxSocket(
- socket.AF_INET6, mcast_ips=[mcast_address_v6], port=port, iface_ips=[nonloopback_iface_ipv6]
+ socket.AF_INET6, mcast_ips=[mcast_address_v6], port=port, ifaces=[nonloopback_iface_ipv6]
):
pass
@@ -144,6 +156,35 @@ def test_v4_loopback() -> None:
assert packet[1] == (multicast_expert.LOCALHOST_IPV4, mcast_tx_sock.getsockname()[1])
+def test_v4_loopback_with_ipaddrs() -> None:
+ """
+ Same as above test, but uses IPv4Address objects.
+ """
+
+ with (
+ multicast_expert.McastRxSocket(
+ socket.AF_INET,
+ mcast_ips=[IPv4Address(mcast_address_v4)],
+ port=port,
+ iface=IPv4Address(multicast_expert.LOCALHOST_IPV4),
+ timeout=1.0,
+ ) as mcast_rx_sock,
+ multicast_expert.McastTxSocket(
+ socket.AF_INET,
+ mcast_ips=[IPv4Address(mcast_address_v4)],
+ iface_ip=IPv4Address(multicast_expert.LOCALHOST_IPV4),
+ ) as mcast_tx_sock,
+ ):
+ mcast_tx_sock.sendto(test_string, (mcast_address_v4, port))
+
+ packet = mcast_rx_sock.recvfrom()
+
+ print("\nRx: " + repr(packet))
+ assert packet is not None
+ assert packet[0] == test_string
+ assert packet[1] == (multicast_expert.LOCALHOST_IPV4, mcast_tx_sock.getsockname()[1])
+
+
def test_v4_ssm_loopback() -> None:
"""
Check that a packet can be sent to the loopback address and received using IPv4 source-specific multicast.
@@ -202,6 +243,35 @@ def test_v6_loopback() -> None:
assert packet[1][0:2] == (multicast_expert.LOCALHOST_IPV6, mcast_tx_sock.getsockname()[1])
+def test_v6_loopback_with_ipaddrs() -> None:
+ """
+ Same as above test, but uses IPv6Address objects
+ """
+
+ with (
+ multicast_expert.McastRxSocket(
+ socket.AF_INET6,
+ mcast_ips=[IPv6Address(mcast_address_v6)],
+ port=port,
+ iface=IPv6Address(multicast_expert.LOCALHOST_IPV6),
+ timeout=1.0,
+ ) as mcast_rx_sock,
+ multicast_expert.McastTxSocket(
+ socket.AF_INET6,
+ mcast_ips=[IPv6Address(mcast_address_v6)],
+ iface_ip=IPv6Address(multicast_expert.LOCALHOST_IPV6),
+ ) as mcast_tx_sock,
+ ):
+ mcast_tx_sock.sendto(test_string, (mcast_address_v6, port))
+
+ packet = mcast_rx_sock.recvfrom()
+
+ print("\nRx: " + repr(packet))
+ assert packet is not None
+ assert packet[0] == test_string
+ assert packet[1][0:2] == (multicast_expert.LOCALHOST_IPV6, mcast_tx_sock.getsockname()[1])
+
+
def test_blocking_false() -> None:
"""
Check that the old style ``blocking`` argument still works.
@@ -351,13 +421,12 @@ def test_external_loopback_v4() -> None:
with multicast_expert.McastTxSocket(
socket.AF_INET, mcast_ips=[mcast_address_v4], enable_external_loopback=True
) as tx_socket:
- assert tx_socket.iface_ip is not None
- assert tx_socket.iface_ip != multicast_expert.LOCALHOST_IPV4
+ assert not tx_socket.network_interface.is_localhost()
with multicast_expert.McastRxSocket(
socket.AF_INET,
mcast_ips=[mcast_address_v4],
- iface_ips=[tx_socket.iface_ip],
+ iface=tx_socket.network_interface,
port=port,
timeout=1,
enable_external_loopback=True,
@@ -367,7 +436,7 @@ def test_external_loopback_v4() -> None:
assert data == test_string
-def test_external_loopback_v6(nonloopback_iface_ipv6: str) -> None:
+def test_external_loopback_v6(nonloopback_iface_ipv6: IfaceInfo) -> None:
"""
Check that packets sent over external interface can be received when `enable_external_loopback` is set.
"""
@@ -381,7 +450,7 @@ def test_external_loopback_v6(nonloopback_iface_ipv6: str) -> None:
multicast_expert.McastRxSocket(
socket.AF_INET6,
mcast_ips=[mcast_address_v6],
- iface_ips=[nonloopback_iface_ipv6],
+ ifaces=[nonloopback_iface_ipv6],
port=port,
timeout=1,
enable_external_loopback=True,
@@ -399,13 +468,12 @@ def test_external_loopback_disabled_v4() -> None:
with multicast_expert.McastTxSocket(
socket.AF_INET, mcast_ips=[mcast_address_v4], enable_external_loopback=False
) as tx_socket:
- assert tx_socket.iface_ip is not None
- assert tx_socket.iface_ip != multicast_expert.LOCALHOST_IPV4
+ assert not tx_socket.network_interface.is_localhost()
with multicast_expert.McastRxSocket(
socket.AF_INET,
mcast_ips=[mcast_address_v4],
- iface_ips=[tx_socket.iface_ip],
+ iface=tx_socket.network_interface,
port=port,
timeout=1,
enable_external_loopback=False,
@@ -416,7 +484,7 @@ def test_external_loopback_disabled_v4() -> None:
assert data == None
-def test_external_loopback_disabled_v6(nonloopback_iface_ipv6: str) -> None:
+def test_external_loopback_disabled_v6(nonloopback_iface_ipv6: IfaceInfo) -> None:
"""
Check that packets sent over external interface are not received when `enable_external_loopback` is set to False.
"""
@@ -430,7 +498,7 @@ def test_external_loopback_disabled_v6(nonloopback_iface_ipv6: str) -> None:
multicast_expert.McastRxSocket(
socket.AF_INET6,
mcast_ips=[mcast_address_v6],
- iface_ips=[nonloopback_iface_ipv6],
+ ifaces=[nonloopback_iface_ipv6],
port=port,
timeout=1,
enable_external_loopback=False,