diff --git a/tests/safety_replay/.gitignore b/tests/safety_replay/.gitignore deleted file mode 100644 index 192fb0945e..0000000000 --- a/tests/safety_replay/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.bz2 diff --git a/tests/safety_replay/__init__.py b/tests/safety_replay/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tests/safety_replay/helpers.py b/tests/safety_replay/helpers.py deleted file mode 100644 index e8dac6888d..0000000000 --- a/tests/safety_replay/helpers.py +++ /dev/null @@ -1,85 +0,0 @@ -from opendbc.car.toyota.values import ToyotaSafetyFlags -from opendbc.car.structs import CarParams -from opendbc.safety.tests.libsafety import libsafety_py - -def to_signed(d, bits): - ret = d - if d >= (1 << (bits - 1)): - ret = d - (1 << bits) - return ret - -def is_steering_msg(mode, param, addr): - ret = False - if mode in (CarParams.SafetyModel.hondaNidec, CarParams.SafetyModel.hondaBosch): - ret = (addr == 0xE4) or (addr == 0x194) or (addr == 0x33D) or (addr == 0x33DA) or (addr == 0x33DB) - elif mode == CarParams.SafetyModel.toyota: - ret = addr == (0x191 if param & ToyotaSafetyFlags.LTA else 0x2E4) - elif mode == CarParams.SafetyModel.gm: - ret = addr == 384 - elif mode == CarParams.SafetyModel.hyundai: - ret = addr == 832 - elif mode == CarParams.SafetyModel.chrysler: - ret = addr == 0x292 - elif mode == CarParams.SafetyModel.subaru: - ret = addr == 0x122 - elif mode == CarParams.SafetyModel.ford: - ret = addr == 0x3d3 - elif mode == CarParams.SafetyModel.nissan: - ret = addr == 0x169 - elif mode == CarParams.SafetyModel.rivian: - ret = addr == 0x120 - return ret - -def get_steer_value(mode, param, to_send): - torque, angle = 0, 0 - if mode in (CarParams.SafetyModel.hondaNidec, CarParams.SafetyModel.hondaBosch): - torque = (to_send.data[0] << 8) | to_send.data[1] - torque = to_signed(torque, 16) - elif mode == CarParams.SafetyModel.toyota: - if param & ToyotaSafetyFlags.LTA: - angle = (to_send.data[1] << 8) | to_send.data[2] - angle = to_signed(angle, 16) - else: - torque = (to_send.data[1] << 8) | (to_send.data[2]) - torque = to_signed(torque, 16) - elif mode == CarParams.SafetyModel.gm: - torque = ((to_send.data[0] & 0x7) << 8) | to_send.data[1] - torque = to_signed(torque, 11) - elif mode == CarParams.SafetyModel.hyundai: - torque = (((to_send.data[3] & 0x7) << 8) | to_send.data[2]) - 1024 - elif mode == CarParams.SafetyModel.chrysler: - torque = (((to_send.data[0] & 0x7) << 8) | to_send.data[1]) - 1024 - elif mode == CarParams.SafetyModel.subaru: - torque = ((to_send.data[3] & 0x1F) << 8) | to_send.data[2] - torque = -to_signed(torque, 13) - elif mode == CarParams.SafetyModel.ford: - angle = ((to_send.data[0] << 3) | (to_send.data[1] >> 5)) - 1000 - elif mode == CarParams.SafetyModel.nissan: - angle = (to_send.data[0] << 10) | (to_send.data[1] << 2) | (to_send.data[2] >> 6) - angle = -angle + (1310 * 100) - elif mode == CarParams.SafetyModel.rivian: - torque = ((to_send.data[2] << 3) | (to_send.data[3] >> 5)) - 1024 - return torque, angle - -def package_can_msg(msg): - return libsafety_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) - -def init_segment(safety, lr, mode, param): - sendcan = (msg for msg in lr if msg.which() == 'sendcan') - steering_msgs = (can for msg in sendcan for can in msg.sendcan if is_steering_msg(mode, param, can.address)) - - msg = next(steering_msgs, None) - if msg is None: - # no steering msgs - return - - to_send = package_can_msg(msg) - torque, angle = get_steer_value(mode, param, to_send) - if torque != 0: - safety.set_controls_allowed(1) - safety.set_desired_torque_last(torque) - elif angle != 0: - safety.set_controls_allowed(1) - safety.set_desired_angle_last(angle) - safety.set_angle_meas(angle, angle) - assert safety.safety_tx_hook(to_send), "failed to initialize panda safety for segment" diff --git a/tests/safety_replay/replay_drive.py b/tests/safety_replay/replay_drive.py deleted file mode 100755 index db20b67e7f..0000000000 --- a/tests/safety_replay/replay_drive.py +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env python3 -import argparse -import os -from collections import Counter - -from opendbc.safety.tests.libsafety import libsafety_py -from panda.tests.safety_replay.helpers import package_can_msg, init_segment - -# replay a drive to check for safety violations -def replay_drive(lr, safety_mode, param, alternative_experience, segment=False): - safety = libsafety_py.libsafety - - err = safety.set_safety_hooks(safety_mode, param) - assert err == 0, "invalid safety mode: %d" % safety_mode - safety.set_alternative_experience(alternative_experience) - - if segment: - init_segment(safety, lr, safety_mode, param) - lr.reset() - - rx_tot, rx_invalid, tx_tot, tx_blocked, tx_controls, tx_controls_blocked = 0, 0, 0, 0, 0, 0 - safety_tick_rx_invalid = False - blocked_addrs = Counter() - invalid_addrs = set() - - can_msgs = [m for m in lr if m.which() in ('can', 'sendcan')] - start_t = can_msgs[0].logMonoTime - end_t = can_msgs[-1].logMonoTime - for msg in can_msgs: - safety.set_timer((msg.logMonoTime // 1000) % 0xFFFFFFFF) - - # skip start and end of route, warm up/down period - if msg.logMonoTime - start_t > 1e9 and end_t - msg.logMonoTime > 1e9: - safety.safety_tick_current_safety_config() - safety_tick_rx_invalid |= not safety.safety_config_valid() or safety_tick_rx_invalid - - if msg.which() == 'sendcan': - for canmsg in msg.sendcan: - to_send = package_can_msg(canmsg) - sent = safety.safety_tx_hook(to_send) - if not sent: - tx_blocked += 1 - tx_controls_blocked += safety.get_controls_allowed() - blocked_addrs[canmsg.address] += 1 - - if "DEBUG" in os.environ: - print("blocked bus %d msg %d at %f" % (canmsg.src, canmsg.address, (msg.logMonoTime - start_t) / 1e9)) - tx_controls += safety.get_controls_allowed() - tx_tot += 1 - elif msg.which() == 'can': - # ignore msgs we sent - for canmsg in filter(lambda m: m.src < 128, msg.can): - to_push = package_can_msg(canmsg) - recv = safety.safety_rx_hook(to_push) - if not recv: - rx_invalid += 1 - invalid_addrs.add(canmsg.address) - rx_tot += 1 - - print("\nRX") - print("total rx msgs:", rx_tot) - print("invalid rx msgs:", rx_invalid) - print("safety tick rx invalid:", safety_tick_rx_invalid) - print("invalid addrs:", invalid_addrs) - print("\nTX") - print("total openpilot msgs:", tx_tot) - print("total msgs with controls allowed:", tx_controls) - print("blocked msgs:", tx_blocked) - print("blocked with controls allowed:", tx_controls_blocked) - print("blocked addrs:", blocked_addrs) - - return tx_controls_blocked == 0 and rx_invalid == 0 and not safety_tick_rx_invalid - -if __name__ == "__main__": - from openpilot.tools.lib.logreader import LogReader - - parser = argparse.ArgumentParser(description="Replay CAN messages from a route or segment through a safety mode", - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument("route_or_segment_name", nargs='+') - parser.add_argument("--mode", type=int, help="Override the safety mode from the log") - parser.add_argument("--param", type=int, help="Override the safety param from the log") - parser.add_argument("--alternative-experience", type=int, help="Override the alternative experience from the log") - args = parser.parse_args() - - lr = LogReader(args.route_or_segment_name[0], sort_by_time=True) - - if None in (args.mode, args.param, args.alternative_experience): - CP = lr.first('carParams') - if args.mode is None: - args.mode = CP.safetyConfigs[-1].safetyModel.raw - if args.param is None: - args.param = CP.safetyConfigs[-1].safetyParam - if args.alternative_experience is None: - args.alternative_experience = CP.alternativeExperience - - print(f"replaying {args.route_or_segment_name[0]} with safety mode {args.mode}, param {args.param}, alternative experience {args.alternative_experience}") - replay_drive(lr, args.mode, args.param, args.alternative_experience, segment=len(lr.logreader_identifiers) == 1)