-
Notifications
You must be signed in to change notification settings - Fork 190
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #189 from polybassa/master
add CAN support for Automotive Penetration Testing
- Loading branch information
Showing
2 changed files
with
224 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
#! /usr/bin/env python | ||
|
||
## This file is part of Scapy | ||
## See http://www.secdev.org/projects/scapy for more informations | ||
## Copyright (C) Nils Weiss <[email protected]> | ||
## This program is published under a GPLv2 license | ||
|
||
""" | ||
CANSocket. | ||
""" | ||
from scapy.packet import * | ||
from scapy.fields import * | ||
from scapy.supersocket import SuperSocket | ||
from scapy.sendrecv import sndrcv | ||
from scapy.arch.linux import get_last_packet_timestamp | ||
|
||
############ | ||
## Consts ## | ||
############ | ||
CAN_FRAME_SIZE = 16 | ||
LINKTYPE_CAN_SOCKETCAN = 227 # From pcap spec | ||
|
||
|
||
class CAN(Packet): | ||
name = 'SocketCAN' | ||
fields_desc = [ | ||
XIntField('id', 0), | ||
PadField(FieldLenField('dlc', None, length_of='data', fmt='B'), 4), | ||
PadField(StrLenField('data', '', length_from=lambda pkt: min(pkt.dlc, 8)), 8) | ||
] | ||
|
||
def extract_padding(self, p): | ||
return '', p | ||
|
||
|
||
class CANSocket(SuperSocket): | ||
desc = "read/write packets at a given CAN interface using PF_CAN sockets" | ||
can_frame_fmt = "<IB3x8s" | ||
|
||
def __init__(self, iface=None, receive_own_messages=False, filter=None, nofilter=0): | ||
if iface is None: | ||
iface = conf.CANiface | ||
self.ins = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) | ||
|
||
try: | ||
self.ins.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_RECV_OWN_MSGS, | ||
struct.pack('i', receive_own_messages)) | ||
except Exception as e: | ||
Scapy_Exception("Could not receive own messages (%s)", e) | ||
|
||
if filter is None or nofilter == 0: | ||
filter = [{ | ||
'can_id': 0, | ||
'can_mask': 0 | ||
}] | ||
|
||
can_filter_fmt = "={}I".format(2 * len(filter)) | ||
filter_data = [] | ||
for can_filter in filter: | ||
filter_data.append(can_filter['can_id']) | ||
filter_data.append(can_filter['can_mask']) | ||
|
||
self.ins.setsockopt(socket.SOL_CAN_RAW, | ||
socket.CAN_RAW_FILTER, | ||
struct.pack(can_filter_fmt, *filter_data) | ||
) | ||
|
||
self.ins.bind((iface,)) | ||
self.outs = self.ins | ||
|
||
def recv(self, x=CAN_FRAME_SIZE): | ||
# Fetching the Arb ID, DLC and Data | ||
try: | ||
pkt, sa_ll = self.ins.recvfrom(x) | ||
except BlockingIOError: | ||
warning('Captured no data, socket in non-blocking mode.') | ||
return None | ||
except socket.timeout: | ||
warning('Captured no data, socket read timed out.') | ||
return None | ||
except OSError: | ||
# something bad happened (e.g. the interface went down) | ||
warning("Captured no data.") | ||
return None | ||
|
||
can_id, can_dlc, data = struct.unpack(self.can_frame_fmt, pkt) | ||
|
||
q = CAN(id=can_id, dlc=can_dlc, data=data[:can_dlc]) | ||
q.time = get_last_packet_timestamp(self.ins) | ||
return q | ||
|
||
def send(self, x): | ||
can_dlc = len(x.data) | ||
data = x.data.ljust(8, b'\x00') | ||
sx = struct.pack(self.can_frame_fmt, x.id, can_dlc, data) | ||
if hasattr(x, "sent_time"): | ||
x.sent_time = time.time() | ||
return self.outs.send(sx) | ||
|
||
def sr(self, *args, **kargs): | ||
return sndrcv(self, *args, **kargs) | ||
|
||
def sr1(self, *args, **kargs): | ||
a, b = sndrcv(self, *args, **kargs) | ||
if len(a) > 0: | ||
return a[0][1] | ||
else: | ||
return None | ||
|
||
def sniff(self, *args, **kargs): | ||
return sniff(opened_socket=self, *args, **kargs) | ||
|
||
|
||
@conf.commands.register | ||
def srcan(pkt, iface=None, receive_own_messages=False, filter=None, nofilter=0, *args, **kargs): | ||
if not "timeout" in kargs: | ||
kargs["timeout"] = -1 | ||
s = conf.CANSocket(iface, receive_own_messages, filter, nofilter) | ||
a, b = sndrcv(s, pkt, *args, **kargs) | ||
s.close() | ||
return a, b | ||
|
||
|
||
@conf.commands.register | ||
def srcanloop(pkts, *args, **kargs): | ||
"""Send a packet at can layer in loop and print the answer each time | ||
srloop(pkts, [prn], [inter], [count], ...) --> None""" | ||
return scapy.sendrecv.__sr_loop(srcan, pkts, *args, **kargs) | ||
|
||
|
||
conf.l2types.register(LINKTYPE_CAN_SOCKETCAN, CAN) | ||
conf.CANiface = "can0" | ||
conf.CANSocket = CANSocket |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
# CAN unit tests | ||
# | ||
# Type the following command to launch start the tests: | ||
# $ sudo bash test/run_tests -t test/can.uts -F | ||
|
||
% CAN unit tests | ||
|
||
+ Configuration of scapy3 | ||
|
||
= Load CAN_addon | ||
~ conf command | ||
from scapy.layers.can import CAN, CANSocket, srcan | ||
|
||
= Setup string for vcan | ||
~ conf command | ||
bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" | ||
|
||
= Load os | ||
~ conf command | ||
import os | ||
import threading | ||
from time import sleep | ||
|
||
= Setup vcan0 | ||
~ conf command | ||
0 == os.system(bashCommand) | ||
|
||
+ Basic Packet Tests() | ||
= CAN Packet init | ||
|
||
canframe = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') | ||
bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08' | ||
|
||
= DLC greater than 8 | ||
canframe = CAN(id=0x7ff,dlc=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') | ||
print(len(canframe.data)) | ||
canframe.dlc = len(canframe.data) | ||
bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08' | ||
|
||
+ Basic Socket Tests() | ||
= CAN Socket Init | ||
|
||
sock1 = CANSocket(iface="vcan0") | ||
|
||
= CAN Socket send recv | ||
|
||
def sender(): | ||
sleep(0.1) | ||
sock2 = CANSocket(iface="vcan0") | ||
sock2.send(CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) | ||
|
||
thread = threading.Thread(target=sender) | ||
thread.start() | ||
|
||
rx = sock1.recv() | ||
rx == CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') | ||
|
||
= CAN Socket sr1 | ||
|
||
tx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') | ||
tx.sent_time == 0 | ||
thread = threading.Thread(target=sender) | ||
thread.start() | ||
rx = None | ||
rx = sock1.sr1(tx) | ||
tx == rx | ||
tx.sent_time > rx.time | ||
rx.time > 0 | ||
|
||
= srcan | ||
|
||
tx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') | ||
tx.sent_time == 0 | ||
thread = threading.Thread(target=sender) | ||
thread.start() | ||
rx = None | ||
rx = srcan(tx, "vcan0", timeout=1) | ||
rx = rx[0][0][1] | ||
tx == rx | ||
tx.sent_time > rx.time | ||
rx.time > 0 | ||
|
||
|
||
+ PCAP CAN Tests() | ||
= Write pcap file | ||
|
||
rx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') | ||
wrpcap('/tmp/scapyPcapTest.pcap', rx, append=False) | ||
readPack = rdpcap('/tmp/scapyPcapTest.pcap', 1) | ||
rx == readPack[0] | ||
|