From 10ecba93c91bbf69fa37fda6022bb6b0e129cf22 Mon Sep 17 00:00:00 2001 From: dilner <74708120+dilner@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:30:43 +0100 Subject: [PATCH] Trace test * tests: on_target: extract values from uart log Signed-off-by: David Dilner * tests: on_target: adding modem trace test Signed-off-by: David Dilner --------- Signed-off-by: David Dilner --- .github/workflows/on_target.yml | 10 ++++ tests/on_target/tests/conftest.py | 16 +++++- tests/on_target/tests/pytest.ini | 1 + tests/on_target/tests/test_location.py | 13 ++--- tests/on_target/tests/test_traces.py | 71 ++++++++++++++++++++++++++ tests/on_target/utils/test_uart.py | 42 +++++++++++++++ tests/on_target/utils/uart.py | 33 ++++++++---- 7 files changed, 167 insertions(+), 19 deletions(-) create mode 100644 tests/on_target/tests/test_traces.py diff --git a/.github/workflows/on_target.yml b/.github/workflows/on_target.yml index 9b9389de..3ccdd844 100644 --- a/.github/workflows/on_target.yml +++ b/.github/workflows/on_target.yml @@ -192,6 +192,16 @@ jobs: env: SEGGER: ${{ secrets.SEGGER_DUT_1 }} + - name: Run modem trace test + working-directory: thingy91x-oob/tests/on_target + run: | + mkdir -p results + pytest -s -v -m "dut1 and traces" \ + --junit-xml=results/test-results-traces-location.xml \ + tests + env: + SEGGER: ${{ secrets.SEGGER_DUT_1 }} + - name: Results if: always() uses: pmeier/pytest-results-action@v0.7.1 diff --git a/tests/on_target/tests/conftest.py b/tests/on_target/tests/conftest.py index fd848038..ef1db340 100644 --- a/tests/on_target/tests/conftest.py +++ b/tests/on_target/tests/conftest.py @@ -8,7 +8,7 @@ import pytest import types from utils.flash_tools import recover_device -from utils.uart import Uart +from utils.uart import Uart, UartBinary from utils.hellonrfcloud_fota import HelloNrfCloudFOTA import sys sys.path.append(os.getcwd()) @@ -59,7 +59,6 @@ def t91x_board(): if not all_uarts: pytest.fail("No UARTs found") log_uart_string = all_uarts[0] - uart = Uart(log_uart_string, timeout=UART_TIMEOUT) fota = HelloNrfCloudFOTA(device_id=f"oob-{FOTADEVICE_IMEI}", \ fingerprint=FOTADEVICE_FINGERPRINT) @@ -86,6 +85,19 @@ def t91x_board(): scan_log_for_assertions(uart_log) +@pytest.fixture(scope="module") +def t91x_traces(t91x_board): + all_uarts = get_uarts() + trace_uart_string = all_uarts[1] + uart_trace = UartBinary(trace_uart_string) + + yield types.SimpleNamespace( + trace=uart_trace, + uart=t91x_board.uart + ) + + uart_trace.stop() + @pytest.fixture(scope="session") def hex_file(): # Search for the firmware hex file in the artifacts folder diff --git a/tests/on_target/tests/pytest.ini b/tests/on_target/tests/pytest.ini index 9b3e6386..8ecdf2af 100644 --- a/tests/on_target/tests/pytest.ini +++ b/tests/on_target/tests/pytest.ini @@ -9,3 +9,4 @@ markers = fullmfw_fota: fullmfw fota tests wifi: wifi location tests gnss: device used for GNSS tests + traces: modem trace tests diff --git a/tests/on_target/tests/test_location.py b/tests/on_target/tests/test_location.py index e2228dea..82d563b5 100644 --- a/tests/on_target/tests/test_location.py +++ b/tests/on_target/tests/test_location.py @@ -6,7 +6,6 @@ import pytest import time import os -import re from utils.flash_tools import flash_device, reset_device import sys sys.path.append(os.getcwd()) @@ -45,13 +44,11 @@ def run_location(t91x_board, hex_file, location_method): t91x_board.uart.wait_for_str(patterns_location, timeout=180) # Extract coordinates from UART output - location_pattern = re.compile( \ + values = t91x_board.uart.extract_value( \ r"location_event_handler: Got location: lat: ([\d.]+), lon: ([\d.]+), acc: ([\d.]+), method: ([\d.]+)") - match = location_pattern.search(t91x_board.uart.log) - assert match, "Failed to extract coordinates from UART output" - lat, lon, acc, method = match.groups() - assert abs(float(lat) - 61.5) < 2 and abs(float(lon) - 10.5) < 1, \ - f"Coordinates ({lat}, {lon}) are not in the same part of the world as the test servers." + assert values + lat, lon, acc, method = values + assert abs(float(lat) - 61.5) < 2 and abs(float(lon) - 10.5) < 1 method = int(method) expected_method = 4 if location_method == "Wi-Fi" else 1 - assert method == expected_method, f"Unexpected location method used {method}, expected: {expected_method}" + assert method == expected_method diff --git a/tests/on_target/tests/test_traces.py b/tests/on_target/tests/test_traces.py new file mode 100644 index 00000000..3fcf61a3 --- /dev/null +++ b/tests/on_target/tests/test_traces.py @@ -0,0 +1,71 @@ +########################################################################################## +# Copyright (c) 2024 Nordic Semiconductor +# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause +########################################################################################## + +import pytest +import time +import os +from utils.flash_tools import flash_device, reset_device +import sys +sys.path.append(os.getcwd()) +from utils.logger import get_logger + +logger = get_logger() + +trace_collection_time = 60 +trace_read_timeout = 60 +threshold_lost_traces = 200 + +@pytest.mark.traces +@pytest.mark.dut1 +def test_traces(t91x_traces, hex_file): + flash_device(os.path.abspath(hex_file)) + time.sleep(5) + t91x_traces.uart.xfactoryreset() + t91x_traces.uart.flush() + reset_device() + + t91x_traces.uart.wait_for_str("nrf_modem_lib_trace: Trace thread ready", timeout=60) + t91x_traces.uart.write("modem_trace size\r\n") + t91x_traces.uart.wait_for_str("Modem trace data size:", timeout=5) + modem_trace_size_1 = t91x_traces.uart.extract_value(r"Modem trace data size: (\d+) bytes") + assert modem_trace_size_1 + + logger.info("Collecting modem traces for 60 seconds") + time.sleep(trace_collection_time) + t91x_traces.uart.write("modem_trace stop\r\n") + log_len = t91x_traces.uart.wait_for_str("Modem trace stop command issued. This may produce a few more traces. Please wait at least 1 second before attempting to read out trace data.", timeout=10) + time.sleep(1) + t91x_traces.uart.write("modem_trace size\r\n") + t91x_traces.uart.wait_for_str("Modem trace data size:", timeout=5, start_pos=log_len) + modem_trace_size_2 = t91x_traces.uart.extract_value(r"Modem trace data size: (\d+) bytes", start_pos=log_len) + assert modem_trace_size_2 + assert int(modem_trace_size_2[0]) > int(modem_trace_size_1[0]) + + logger.info("Dumping modem traces to uart1") + uart1_before_dump = t91x_traces.trace.get_size() + t91x_traces.uart.write("modem_trace dump_uart\r\n") + t91x_traces.uart.wait_for_str_ordered(["Reading out", "bytes of trace data"], timeout=5) + modem_trace_size_to_read = t91x_traces.uart.extract_value(r"Reading out (\d+) bytes of trace data") + assert modem_trace_size_to_read + assert int(modem_trace_size_to_read[0]) == int(modem_trace_size_2[0]) + start_t = time.time() + while True: + uart1_size = t91x_traces.trace.get_size() + logger.debug(f"Uart1 trace size {uart1_size} bytes after {time.time()-start_t} seconds") + if uart1_size - uart1_before_dump == int(modem_trace_size_to_read[0]): + break + if start_t + trace_read_timeout < time.time(): + if abs(uart1_size - uart1_before_dump - int(modem_trace_size_to_read[0])) < threshold_lost_traces: + logger.warning(f"Uart1 trace size {uart1_size}, expected {int(modem_trace_size_to_read[0]) + uart1_before_dump}\n \ + uart1 trace size before dump: {uart1_before_dump}") + break + raise AssertionError(f"Timeout waiting for uart1 trace size to match the expected size \n \ + Expected size: {int(modem_trace_size_to_read[0]) + uart1_before_dump} bytes \n \ + Actual size: {uart1_size} bytes \n \ + uart1 trace size before dump: {uart1_before_dump} bytes") + time.sleep(1) + modem_trace_size_read = t91x_traces.uart.extract_value(r"Total trace bytes read from flash: (\d+)") + assert modem_trace_size_read + assert int(modem_trace_size_read[0]) == int(modem_trace_size_to_read[0]) diff --git a/tests/on_target/utils/test_uart.py b/tests/on_target/utils/test_uart.py index 1d7d9ef3..5cadb9bb 100644 --- a/tests/on_target/utils/test_uart.py +++ b/tests/on_target/utils/test_uart.py @@ -105,3 +105,45 @@ def test_wait_ordered_7_none(time_sleep, time_time): with pytest.raises(AssertionError) as ex_info: u.wait_for_str_ordered(["abc", "def", "ghi", "jkl"], timeout=2) assert "abc missing" in str(ex_info.value) + +@patch("time.time", side_effect=counter()) +@patch("time.sleep") +def test_wait_8_get_current_size(time_sleep, time_time): + u = mocked_uart() + u.log = "foo123\nbar123\nbaz123\n" + current_log_size = u.wait_for_str(["bar"], timeout=3) + assert current_log_size == len(u.log) + +@patch("time.time", side_effect=counter()) +@patch("time.sleep") +def test_wait_9_start_position(time_sleep, time_time): + u = mocked_uart() + u.log = "foo123\nbar123\nbaz123\n" + bar_pos = u.log.find("bar") + with pytest.raises(AssertionError) as ex_info: + u.wait_for_str(["baz", "foo", "bar"], timeout=3, start_pos=bar_pos) + assert "foo" in str(ex_info.value) + u.wait_for_str(["bar", "baz"], timeout=3, start_pos=bar_pos) + +@patch("time.time", side_effect=counter()) +@patch("time.sleep") +def test_wait_10_extract_one_value(time_sleep, time_time): + u = mocked_uart() + u.log = "foo: 123.45\n bar: 23.45 \n baz: 0.1234\n" + assert float(u.extract_value(r"bar: (\d.+)")[0]) == 23.45 + +@patch("time.time", side_effect=counter()) +@patch("time.sleep") +def test_wait_11_extract_three_values(time_sleep, time_time): + u = mocked_uart() + u.log = "foo: 123.45 bar: 23.45 baz: 0.1234" + extrated_values = u.extract_value(r"foo: (\d.+) bar: (\d.+) baz: (\d.+)") + assert [float(x) for x in extrated_values] == [123.45, 23.45, 0.1234] + +@patch("time.time", side_effect=counter()) +@patch("time.sleep") +def test_wait_12_extract_missing_values(time_sleep, time_time): + u = mocked_uart() + u.log = "foo: 123.45 baz: 23.45 bar: 0.1234" + extrated_values = u.extract_value(r"foo: (\d.+) foo: (\d.+) foo: (\d.+)") + assert extrated_values is None diff --git a/tests/on_target/utils/uart.py b/tests/on_target/utils/uart.py index f1d874d3..df9b14ed 100644 --- a/tests/on_target/utils/uart.py +++ b/tests/on_target/utils/uart.py @@ -9,6 +9,7 @@ import queue import os import sys +import re sys.path.append(os.getcwd()) from utils.logger import get_logger from typing import Union @@ -172,12 +173,15 @@ def start(self, timeout: int = DEFAULT_UART_TIMEOUT) -> None: self._selfdestruct = threading.Timer(timeout , self.selfdestruct) self._selfdestruct.start() + def get_size(self) -> int: + # Return the current size of the log + return len(self.log) + def wait_for_str_ordered( self, msgs: list, error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT ) -> None: - start = time.time() + start_t = time.time() while True: - time.sleep(1) missing = None pos = 0 for msg in msgs: @@ -189,26 +193,34 @@ def wait_for_str_ordered( pos += 1 else: break - if start + timeout < time.time(): + if start_t + timeout < time.time(): raise AssertionError( f"{missing if missing else msgs} missing in UART log in the expected order. {error_msg}\nUart log:\n{self.log}" ) if self._evt.is_set(): raise RuntimeError(f"Uart thread stopped, log:\n{self.log}") + time.sleep(1) - def wait_for_str(self, msgs: Union[str, list], error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT) -> None: - start = time.time() + def wait_for_str(self, msgs: Union[str, list], error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT, start_pos: int = 0) -> None: + start_t = time.time() msgs = msgs if isinstance(msgs, (list, tuple)) else [msgs] while True: - time.sleep(1) - missing_msgs = [x for x in msgs if x not in self.log] + missing_msgs = [x for x in msgs if x not in self.log[start_pos:]] if missing_msgs == []: - break - if start + timeout < time.time(): + return self.get_size() + if start_t + timeout < time.time(): raise AssertionError(f"{missing_msgs} missing in UART log. {error_msg}\nUart log:\n{self.log}") if self._evt.is_set(): raise RuntimeError(f"Uart thread stopped, log:\n{self.log}") + time.sleep(1) + + def extract_value(self, pattern: str, start_pos: int = 0): + pattern = re.compile(pattern) + match = pattern.search(self.log[start_pos:]) + if match: + return match.groups() + return None class UartBinary(Uart): def __init__( @@ -263,3 +275,6 @@ def save_to_file(self, filename: str) -> None: return with open(filename, "wb" ) as f: f.write(self.data) + + def get_size(self) -> int: + return len(self.data)