Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix inconsistency of target.ips output #870

Merged
merged 20 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dissect/target/plugins/general/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,22 @@ def interfaces(self) -> Iterator[InterfaceRecord]:
@export
def ips(self) -> list[IPAddress]:
"""Return IP addresses as list of :class:`IPAddress`."""
return list(self._get_record_type("ip"))
return list(set(self._get_record_type("ip")))

@export
def gateways(self) -> list[IPAddress]:
"""Return gateways as list of :class:`IPAddress`."""
return list(self._get_record_type("gateway"))
return list(set(self._get_record_type("gateway")))

@export
def macs(self) -> list[str]:
"""Return MAC addresses as list of :class:`str`."""
return list(self._get_record_type("mac"))
return list(set(self._get_record_type("mac")))

@export
def dns(self) -> list[str]:
def dns(self) -> list[str | IPAddress]:
"""Return DNS addresses as list of :class:`str`."""
return list(self._get_record_type("dns"))
return list(set(self._get_record_type("dns")))

@internal
def with_ip(self, ip_addr: str) -> Iterator[InterfaceRecord]:
Expand Down
5 changes: 1 addition & 4 deletions dissect/target/plugins/os/unix/bsd/osx/_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,7 @@ def hostname(self) -> Optional[str]:

@export(property=True)
def ips(self) -> Optional[list[str]]:
ips = set()
for ip in self.target.network.ips():
ips.add(str(ip))
return list(ips)
return list(set(map(str, self.target.network.ips())))

@export(property=True)
def version(self) -> Optional[str]:
Expand Down
10 changes: 4 additions & 6 deletions dissect/target/plugins/os/unix/linux/_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,15 @@ def detect(cls, target: Target) -> Filesystem | None:
@export(property=True)
def ips(self) -> list[str]:
"""Returns a list of static IP addresses and DHCP lease IP addresses found on the host system."""
ips = []
ips = set()

for ip_set in self.network_manager.get_config_value("ips"):
for ip in ip_set:
ips.append(ip)
ips.update(ip_set)

for ip in parse_unix_dhcp_log_messages(self.target, iter_all=False):
if ip not in ips:
ips.append(ip)
ips.add(ip)

return ips
return list(ips)

@export(property=True)
def dns(self) -> list[str]:
Expand Down
2 changes: 1 addition & 1 deletion dissect/target/plugins/os/windows/_os.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def hostname(self) -> str | None:

@export(property=True)
def ips(self) -> list[str]:
return self.target.network.ips()
return list(set(map(str, self.target.network.ips())))

def _get_version_reg_value(self, value_name: str) -> Any:
try:
Expand Down
188 changes: 102 additions & 86 deletions dissect/target/plugins/os/windows/network.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from enum import IntEnum
from functools import lru_cache
from typing import Iterator

from dissect.util.ts import wintimestamp
Expand All @@ -12,6 +13,7 @@
from dissect.target.helpers.record import WindowsInterfaceRecord
from dissect.target.helpers.regutil import RegistryKey
from dissect.target.plugins.general.network import NetworkPlugin
from dissect.target.target import Target


class IfTypes(IntEnum):
Expand Down Expand Up @@ -222,15 +224,32 @@
return None


def _get_config_value(key: RegistryKey, name: str) -> set:
value = _try_value(key, name)
if not value or value in ("", "0.0.0.0", None, [], ["0.0.0.0"]):
return set()

if isinstance(value, list):
return set(value)

return {value}


class WindowsNetworkPlugin(NetworkPlugin):
"""Windows network interface plugin."""

def __init__(self, target: Target):
super().__init__(target)
self._extract_network_device_config = lru_cache(128)(self._extract_network_device_config)

def _interfaces(self) -> Iterator[WindowsInterfaceRecord]:
"""Yields found Windows interfaces used by :meth:`NetworkPlugin.interfaces() <dissect.target.plugins.general.network.NetworkPlugin.interfaces>`.""" # noqa: E501

# Get all the network interfaces
for keys in self.target.registry.keys(
for key in self.target.registry.keys(
"HKLM\\SYSTEM\\CurrentControlSet\\Control\\Class\\{4d36e972-e325-11ce-bfc1-08002be10318}"
):
for subkey in keys.subkeys():
for subkey in key.subkeys():
device_info = {}

if (net_cfg_instance_id := _try_value(subkey, "NetCfgInstanceId")) is None:
Expand All @@ -239,24 +258,26 @@

# Extract the network device configuration for given interface id
config = self._extract_network_device_config(net_cfg_instance_id)
if config is None or all(not conf for conf in config):
# if no configuration is found or all configurations are empty, skip this network interface
continue

# Extract the network device name for given interface id
name_key = self.target.registry.key(
f"HKLM\\SYSTEM\\CurrentControlSet\\Control\\Network\\"
f"{{4D36E972-E325-11CE-BFC1-08002BE10318}}\\{net_cfg_instance_id}\\Connection"
)
if value_name := _try_value(name_key, "Name"):
device_info["name"] = value_name

# Extract the metric value from the REGISTRY_KEY_INTERFACE key
interface_key = self.target.registry.key(
f"HKLM\\SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Interfaces\\{net_cfg_instance_id}"
)
if value_metric := _try_value(interface_key, "InterfaceMetric"):
device_info["metric"] = value_metric
# Extract a network device name for given interface id
try:
name_key = self.target.registry.key(
f"HKLM\\SYSTEM\\CurrentControlSet\\Control\\Network\\{{4D36E972-E325-11CE-BFC1-08002BE10318}}\\{net_cfg_instance_id}\\Connection" # noqa: E501
)
if value_name := _try_value(name_key, "Name"):
device_info["name"] = value_name
except RegistryKeyNotFoundError:
pass

# Extract the metric value from the interface registry key
try:
interface_key = self.target.registry.key(
f"HKLM\\SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Interfaces\\{net_cfg_instance_id}" # noqa: E501
)
if value_metric := _try_value(interface_key, "InterfaceMetric"):
device_info["metric"] = value_metric
except RegistryKeyNotFoundError:
pass

Check warning on line 280 in dissect/target/plugins/os/windows/network.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/plugins/os/windows/network.py#L279-L280

Added lines #L279 - L280 were not covered by tests

# Extract the rest of the device information
device_info["mac"] = _try_value(subkey, "NetworkAddress")
Expand All @@ -270,96 +291,91 @@

# Yield a record for each non-empty configuration
for conf in config:
if conf:
# Create a copy of device_info to avoid overwriting
record_info = device_info.copy()
record_info.update(conf)
yield WindowsInterfaceRecord(
**record_info,
source=f"HKLM\\SYSTEM\\{subkey.path}",
_target=self.target,
)
# If no configuration is found or all configurations are empty,
# skip this network interface.
if not conf or not any(
[
conf["dns"],
conf["ip"],
conf["gateway"],
conf["subnetmask"],
conf["search_domain"],
]
):
continue

# Create a copy of device_info to avoid overwriting
record_info = device_info.copy()
record_info.update(conf)
yield WindowsInterfaceRecord(
**record_info,
source=f"HKLM\\SYSTEM\\{subkey.path}",
_target=self.target,
)

def _extract_network_device_config(
self, interface_id: str
) -> list[dict[str, str | list], dict[str, str | list]] | None:
dhcp_config = {}
static_config = {}
"""Extract network device configuration from the given interface_id for all ControlSets on the system."""

dhcp_config = {
"gateway": set(),
"ip": set(),
"dns": set(),
"subnetmask": set(),
"search_domain": set(),
"network": set(),
}

static_config = {
"ip": set(),
"dns": set(),
"subnetmask": set(),
"search_domain": set(),
"gateway": set(),
"network": set(),
}

# Get the registry keys for the given interface id
try:
keys = self.target.registry.key(
f"HKLM\\SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Interfaces\\{interface_id}"
keys = list(
self.target.registry.keys(
f"HKLM\\SYSTEM\\CurrentControlSet\\Services\\Tcpip\\Parameters\\Interfaces\\{interface_id}"
)
)
except RegistryKeyNotFoundError:
return None

if not len(keys):
return None

# Extract DHCP configuration from the registry
dhcp_gateway = _try_value(keys, "DhcpDefaultGateway")
if dhcp_gateway not in ["", "0.0.0.0", None, []]:
dhcp_config["gateway"] = dhcp_gateway

dhcp_ip = _try_value(keys, "DhcpIPAddress")
if dhcp_ip not in ["", "0.0.0.0", None]:
dhcp_config["ip"] = [dhcp_ip]

dhcp_dns = _try_value(keys, "DhcpNameServer")
if dhcp_dns not in ["", "0.0.0.0", None]:
dhcp_config["dns"] = dhcp_dns.split(" ")

dhcp_subnetmask = _try_value(keys, "DhcpSubnetMask")
if dhcp_subnetmask not in ["", "0.0.0.0", None]:
dhcp_config["subnetmask"] = [dhcp_subnetmask]

dhcp_domain = _try_value(keys, "DhcpDomain")
if dhcp_domain not in ["", None]:
dhcp_config["search_domain"] = [dhcp_domain]
for key in keys:
# Extract DHCP configuration from the registry
dhcp_config["gateway"].update(_get_config_value(key, "DhcpDefaultGateway"))
dhcp_config["ip"].update(_get_config_value(key, "DhcpIPAddress"))
dhcp_config["subnetmask"].update(_get_config_value(key, "DhcpSubnetMask"))
dhcp_config["search_domain"].update(_get_config_value(key, "DhcpDomain"))
dhcp_config["dns"].update(_get_config_value(key, "DhcpNameServer"))

# Extract static configuration from the registry
static_config["gateway"].update(_get_config_value(key, "DefaultGateway"))
static_config["dns"].update(_get_config_value(key, "NameServer"))
static_config["search_domain"].update(_get_config_value(key, "Domain"))
static_config["ip"].update(_get_config_value(key, "IPAddress"))
static_config["subnetmask"].update(_get_config_value(key, "SubnetMask"))

if len(dhcp_config) > 0:
dhcp_enable = _try_value(keys, "EnableDHCP")
dhcp_config["enabled"] = dhcp_enable == 1
dhcp_config["enabled"] = _try_value(key, "EnableDHCP") == 1
dhcp_config["dhcp"] = True

# Extract static configuration from the registry
static_gateway = _try_value(keys, "DefaultGateway")
if static_gateway not in ["", None, []]:
static_config["gateway"] = static_gateway

static_ip = _try_value(keys, "IPAddress")
if static_ip not in ["", "0.0.0.0", ["0.0.0.0"], None, []]:
static_config["ip"] = static_ip if isinstance(static_ip, list) else [static_ip]

static_dns = _try_value(keys, "NameServer")
if static_dns not in ["", "0.0.0.0", None]:
static_config["dns"] = static_dns.split(",")

static_subnetmask = _try_value(keys, "SubnetMask")
if static_subnetmask not in ["", "0.0.0.0", ["0.0.0.0"], None, []]:
static_config["subnetmask"] = (
static_subnetmask if isinstance(static_subnetmask, list) else [static_subnetmask]
)

static_domain = _try_value(keys, "Domain")
if static_domain not in ["", None]:
static_config["search_domain"] = [static_domain]

if len(static_config) > 0:
static_config["enabled"] = None
static_config["dhcp"] = False

# Combine ip and subnetmask for extraction
combined_configs = [
(dhcp_config, dhcp_config.get("ip", []), dhcp_config.get("subnetmask", [])),
(static_config, static_config.get("ip", []), static_config.get("subnetmask", [])),
]

# Iterate over combined ip/subnet lists
for config, ips, subnet_masks in combined_configs:
for network_address in self.calculate_network(ips, subnet_masks):
config.setdefault("network", []).append(network_address)
for config in (dhcp_config, static_config):
if (ips := config.get("ip")) and (masks := config.get("subnetmask")):
config["network"].update(set(self.calculate_network(ips, masks)))

# Return both configurations
return [dhcp_config, static_config]
2 changes: 1 addition & 1 deletion dissect/target/tools/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
continue

if isinstance(value, list):
value = ", ".join(value)
value = ", ".join(map(str, value))

Check warning on line 140 in dissect/target/tools/info.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/tools/info.py#L140

Added line #L140 was not covered by tests

if isinstance(value, datetime):
value = value.isoformat(timespec="microseconds")
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies = [
"dissect.regf>=3.3,<4",
"dissect.util>=3,<4",
"dissect.volume>=2,<4",
"flow.record~=3.16.0",
"flow.record~=3.17.0",
"structlog",
]
dynamic = ["version"]
Expand Down
17 changes: 13 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,23 @@ def hive_hklm() -> Iterator[VirtualHive]:
hive = VirtualHive()

# set current control set to ControlSet001 and mock it
controlset_key = "SYSTEM\\ControlSet001"
change_controlset(hive, 1)

yield hive


def change_controlset(hive: VirtualHive, num: int) -> None:
"""Update the current control set of the given HKLM hive."""

if not isinstance(num, int) or num > 999 or num < 1:
raise ValueError("ControlSet integer must be between 1 and 999")

controlset_key = f"SYSTEM\\ControlSet{num:>03}"
hive.map_key(controlset_key, VirtualKey(hive, controlset_key))

select_key = "SYSTEM\\Select"
hive.map_key(select_key, VirtualKey(hive, select_key))
hive.map_value(select_key, "Current", VirtualValue(hive, "Current", 1))

yield hive
hive.map_value(select_key, "Current", VirtualValue(hive, "Current", num))


@pytest.fixture
Expand Down
2 changes: 1 addition & 1 deletion tests/plugins/general/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_base_network_plugin(target_bare: Target, network_record: InterfaceRecor
assert network.ips() == ["10.42.42.10"]
assert network.gateways() == ["10.42.42.1"]
assert network.macs() == ["DE:AD:BE:EF:00:00"]
assert network.dns() == ["8.8.8.8", "1.1.1.1"]
assert sorted(list(map(str, network.dns()))) == ["1.1.1.1", "8.8.8.8"]

assert len(list(network.in_cidr("10.42.42.0/24"))) == 1
assert len(list(network.in_cidr("10.43.42.0/24"))) == 0
Expand Down
Loading
Loading