Skip to content

Commit

Permalink
Trace test
Browse files Browse the repository at this point in the history
* tests: on_target: extract values from uart log

Signed-off-by: David Dilner <[email protected]>

* tests: on_target: adding modem trace test

Signed-off-by: David Dilner <[email protected]>

---------

Signed-off-by: David Dilner <[email protected]>
  • Loading branch information
dilner authored Nov 18, 2024
1 parent 4220a68 commit 10ecba9
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 19 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/on_target.yml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ jobs:
env:
SEGGER: ${{ secrets.SEGGER_DUT_1 }}

- name: Run modem trace test
working-directory: thingy91x-oob/tests/on_target
run: |
mkdir -p results
pytest -s -v -m "dut1 and traces" \
--junit-xml=results/test-results-traces-location.xml \
tests
env:
SEGGER: ${{ secrets.SEGGER_DUT_1 }}

- name: Results
if: always()
uses: pmeier/[email protected]
Expand Down
16 changes: 14 additions & 2 deletions tests/on_target/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest
import types
from utils.flash_tools import recover_device
from utils.uart import Uart
from utils.uart import Uart, UartBinary
from utils.hellonrfcloud_fota import HelloNrfCloudFOTA
import sys
sys.path.append(os.getcwd())
Expand Down Expand Up @@ -59,7 +59,6 @@ def t91x_board():
if not all_uarts:
pytest.fail("No UARTs found")
log_uart_string = all_uarts[0]

uart = Uart(log_uart_string, timeout=UART_TIMEOUT)
fota = HelloNrfCloudFOTA(device_id=f"oob-{FOTADEVICE_IMEI}", \
fingerprint=FOTADEVICE_FINGERPRINT)
Expand All @@ -86,6 +85,19 @@ def t91x_board():

scan_log_for_assertions(uart_log)

@pytest.fixture(scope="module")
def t91x_traces(t91x_board):
all_uarts = get_uarts()
trace_uart_string = all_uarts[1]
uart_trace = UartBinary(trace_uart_string)

yield types.SimpleNamespace(
trace=uart_trace,
uart=t91x_board.uart
)

uart_trace.stop()

@pytest.fixture(scope="session")
def hex_file():
# Search for the firmware hex file in the artifacts folder
Expand Down
1 change: 1 addition & 0 deletions tests/on_target/tests/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ markers =
fullmfw_fota: fullmfw fota tests
wifi: wifi location tests
gnss: device used for GNSS tests
traces: modem trace tests
13 changes: 5 additions & 8 deletions tests/on_target/tests/test_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pytest
import time
import os
import re
from utils.flash_tools import flash_device, reset_device
import sys
sys.path.append(os.getcwd())
Expand Down Expand Up @@ -45,13 +44,11 @@ def run_location(t91x_board, hex_file, location_method):
t91x_board.uart.wait_for_str(patterns_location, timeout=180)

# Extract coordinates from UART output
location_pattern = re.compile( \
values = t91x_board.uart.extract_value( \
r"location_event_handler: Got location: lat: ([\d.]+), lon: ([\d.]+), acc: ([\d.]+), method: ([\d.]+)")
match = location_pattern.search(t91x_board.uart.log)
assert match, "Failed to extract coordinates from UART output"
lat, lon, acc, method = match.groups()
assert abs(float(lat) - 61.5) < 2 and abs(float(lon) - 10.5) < 1, \
f"Coordinates ({lat}, {lon}) are not in the same part of the world as the test servers."
assert values
lat, lon, acc, method = values
assert abs(float(lat) - 61.5) < 2 and abs(float(lon) - 10.5) < 1
method = int(method)
expected_method = 4 if location_method == "Wi-Fi" else 1
assert method == expected_method, f"Unexpected location method used {method}, expected: {expected_method}"
assert method == expected_method
71 changes: 71 additions & 0 deletions tests/on_target/tests/test_traces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
##########################################################################################
# Copyright (c) 2024 Nordic Semiconductor
# SPDX-License-Identifier: LicenseRef-Nordic-5-Clause
##########################################################################################

import pytest
import time
import os
from utils.flash_tools import flash_device, reset_device
import sys
sys.path.append(os.getcwd())
from utils.logger import get_logger

logger = get_logger()

trace_collection_time = 60
trace_read_timeout = 60
threshold_lost_traces = 200

@pytest.mark.traces
@pytest.mark.dut1
def test_traces(t91x_traces, hex_file):
flash_device(os.path.abspath(hex_file))
time.sleep(5)
t91x_traces.uart.xfactoryreset()
t91x_traces.uart.flush()
reset_device()

t91x_traces.uart.wait_for_str("nrf_modem_lib_trace: Trace thread ready", timeout=60)
t91x_traces.uart.write("modem_trace size\r\n")
t91x_traces.uart.wait_for_str("Modem trace data size:", timeout=5)
modem_trace_size_1 = t91x_traces.uart.extract_value(r"Modem trace data size: (\d+) bytes")
assert modem_trace_size_1

logger.info("Collecting modem traces for 60 seconds")
time.sleep(trace_collection_time)
t91x_traces.uart.write("modem_trace stop\r\n")
log_len = t91x_traces.uart.wait_for_str("Modem trace stop command issued. This may produce a few more traces. Please wait at least 1 second before attempting to read out trace data.", timeout=10)
time.sleep(1)
t91x_traces.uart.write("modem_trace size\r\n")
t91x_traces.uart.wait_for_str("Modem trace data size:", timeout=5, start_pos=log_len)
modem_trace_size_2 = t91x_traces.uart.extract_value(r"Modem trace data size: (\d+) bytes", start_pos=log_len)
assert modem_trace_size_2
assert int(modem_trace_size_2[0]) > int(modem_trace_size_1[0])

logger.info("Dumping modem traces to uart1")
uart1_before_dump = t91x_traces.trace.get_size()
t91x_traces.uart.write("modem_trace dump_uart\r\n")
t91x_traces.uart.wait_for_str_ordered(["Reading out", "bytes of trace data"], timeout=5)
modem_trace_size_to_read = t91x_traces.uart.extract_value(r"Reading out (\d+) bytes of trace data")
assert modem_trace_size_to_read
assert int(modem_trace_size_to_read[0]) == int(modem_trace_size_2[0])
start_t = time.time()
while True:
uart1_size = t91x_traces.trace.get_size()
logger.debug(f"Uart1 trace size {uart1_size} bytes after {time.time()-start_t} seconds")
if uart1_size - uart1_before_dump == int(modem_trace_size_to_read[0]):
break
if start_t + trace_read_timeout < time.time():
if abs(uart1_size - uart1_before_dump - int(modem_trace_size_to_read[0])) < threshold_lost_traces:
logger.warning(f"Uart1 trace size {uart1_size}, expected {int(modem_trace_size_to_read[0]) + uart1_before_dump}\n \
uart1 trace size before dump: {uart1_before_dump}")
break
raise AssertionError(f"Timeout waiting for uart1 trace size to match the expected size \n \
Expected size: {int(modem_trace_size_to_read[0]) + uart1_before_dump} bytes \n \
Actual size: {uart1_size} bytes \n \
uart1 trace size before dump: {uart1_before_dump} bytes")
time.sleep(1)
modem_trace_size_read = t91x_traces.uart.extract_value(r"Total trace bytes read from flash: (\d+)")
assert modem_trace_size_read
assert int(modem_trace_size_read[0]) == int(modem_trace_size_to_read[0])
42 changes: 42 additions & 0 deletions tests/on_target/utils/test_uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,45 @@ def test_wait_ordered_7_none(time_sleep, time_time):
with pytest.raises(AssertionError) as ex_info:
u.wait_for_str_ordered(["abc", "def", "ghi", "jkl"], timeout=2)
assert "abc missing" in str(ex_info.value)

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_8_get_current_size(time_sleep, time_time):
u = mocked_uart()
u.log = "foo123\nbar123\nbaz123\n"
current_log_size = u.wait_for_str(["bar"], timeout=3)
assert current_log_size == len(u.log)

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_9_start_position(time_sleep, time_time):
u = mocked_uart()
u.log = "foo123\nbar123\nbaz123\n"
bar_pos = u.log.find("bar")
with pytest.raises(AssertionError) as ex_info:
u.wait_for_str(["baz", "foo", "bar"], timeout=3, start_pos=bar_pos)
assert "foo" in str(ex_info.value)
u.wait_for_str(["bar", "baz"], timeout=3, start_pos=bar_pos)

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_10_extract_one_value(time_sleep, time_time):
u = mocked_uart()
u.log = "foo: 123.45\n bar: 23.45 \n baz: 0.1234\n"
assert float(u.extract_value(r"bar: (\d.+)")[0]) == 23.45

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_11_extract_three_values(time_sleep, time_time):
u = mocked_uart()
u.log = "foo: 123.45 bar: 23.45 baz: 0.1234"
extrated_values = u.extract_value(r"foo: (\d.+) bar: (\d.+) baz: (\d.+)")
assert [float(x) for x in extrated_values] == [123.45, 23.45, 0.1234]

@patch("time.time", side_effect=counter())
@patch("time.sleep")
def test_wait_12_extract_missing_values(time_sleep, time_time):
u = mocked_uart()
u.log = "foo: 123.45 baz: 23.45 bar: 0.1234"
extrated_values = u.extract_value(r"foo: (\d.+) foo: (\d.+) foo: (\d.+)")
assert extrated_values is None
33 changes: 24 additions & 9 deletions tests/on_target/utils/uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import queue
import os
import sys
import re
sys.path.append(os.getcwd())
from utils.logger import get_logger
from typing import Union
Expand Down Expand Up @@ -172,12 +173,15 @@ def start(self, timeout: int = DEFAULT_UART_TIMEOUT) -> None:
self._selfdestruct = threading.Timer(timeout , self.selfdestruct)
self._selfdestruct.start()

def get_size(self) -> int:
# Return the current size of the log
return len(self.log)

def wait_for_str_ordered(
self, msgs: list, error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT
) -> None:
start = time.time()
start_t = time.time()
while True:
time.sleep(1)
missing = None
pos = 0
for msg in msgs:
Expand All @@ -189,26 +193,34 @@ def wait_for_str_ordered(
pos += 1
else:
break
if start + timeout < time.time():
if start_t + timeout < time.time():
raise AssertionError(
f"{missing if missing else msgs} missing in UART log in the expected order. {error_msg}\nUart log:\n{self.log}"
)
if self._evt.is_set():
raise RuntimeError(f"Uart thread stopped, log:\n{self.log}")
time.sleep(1)

def wait_for_str(self, msgs: Union[str, list], error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT) -> None:
start = time.time()
def wait_for_str(self, msgs: Union[str, list], error_msg: str = "", timeout: int = DEFAULT_WAIT_FOR_STR_TIMEOUT, start_pos: int = 0) -> None:
start_t = time.time()
msgs = msgs if isinstance(msgs, (list, tuple)) else [msgs]

while True:
time.sleep(1)
missing_msgs = [x for x in msgs if x not in self.log]
missing_msgs = [x for x in msgs if x not in self.log[start_pos:]]
if missing_msgs == []:
break
if start + timeout < time.time():
return self.get_size()
if start_t + timeout < time.time():
raise AssertionError(f"{missing_msgs} missing in UART log. {error_msg}\nUart log:\n{self.log}")
if self._evt.is_set():
raise RuntimeError(f"Uart thread stopped, log:\n{self.log}")
time.sleep(1)

def extract_value(self, pattern: str, start_pos: int = 0):
pattern = re.compile(pattern)
match = pattern.search(self.log[start_pos:])
if match:
return match.groups()
return None

class UartBinary(Uart):
def __init__(
Expand Down Expand Up @@ -263,3 +275,6 @@ def save_to_file(self, filename: str) -> None:
return
with open(filename, "wb" ) as f:
f.write(self.data)

def get_size(self) -> int:
return len(self.data)

0 comments on commit 10ecba9

Please sign in to comment.