-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathprobes.go
204 lines (167 loc) · 4.48 KB
/
probes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package main
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"log"
"net"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/btf"
"github.com/cilium/ebpf/ringbuf"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
)
// TcProbe owns the loaded TC eBPF objects, the ring buffer reader that
// receives packet events from them, and one Probe per known interface.
type TcProbe struct {
	// reader consumes events from the TcCaptureEvents ring buffer; set by Start.
	reader *ringbuf.Reader
	// bpf holds the loaded eBPF programs and maps; set by Start.
	bpf *TCFilterObjects
	// probes has one entry per interface handed to NewTcProbe.
	probes []*Probe
	// output prints the header and every decoded event.
	output *Output
}
// Probe records the TC filters attached to a single network interface.
type Probe struct {
	// ifIndex is passed to netlink.LinkByIndex to resolve the interface.
	ifIndex int
	// ifName is compared against Flags.FilterInterface and used in log output.
	ifName string
	// bpf is not assigned anywhere in the visible code.
	bpf *TCFilterObjects
	// ingressFileter is the ingress filter created by Start; it stays nil for
	// interfaces Start skipped.
	ingressFileter *netlink.BpfFilter
	// egressFileter is the egress filter created by Start; it stays nil for
	// interfaces Start skipped.
	egressFileter *netlink.BpfFilter
}
// NewTcProbe builds a TcProbe that writes to o and carries one Probe for
// every entry of neti.interfaces. Nothing is loaded or attached until Start
// is called.
func NewTcProbe(neti *NetInterface, o *Output) *TcProbe {
	probes := make([]*Probe, 0, len(neti.interfaces))
	for idx, iface := range neti.interfaces {
		// NOTE(review): the range key is stored as the interface index and is
		// later given to netlink.LinkByIndex - presumably neti.interfaces is
		// keyed by kernel ifindex; confirm against NetInterface.
		probes = append(probes, &Probe{ifName: iface.name, ifIndex: idx})
	}
	return &TcProbe{
		output: o,
		probes: probes,
	}
}
// Manual troubleshooting with the tc CLI:
//
//	show the attached filters:   tc filter show dev eth0 ingress(egress)
//	delete a left-over filter:   tc filter del dev eth0 ingress(egress)

// Start loads the TC eBPF objects, attaches the ingress and egress
// classifiers to every selected interface and launches a goroutine that
// forwards ring buffer events to the output.
//
// When f.KernelBTF is non-empty the BTF spec is read from that path,
// otherwise the running kernel's spec is used. When f.FilterInterface is
// non-empty only the interface with that name gets filters; the other probes
// keep nil filters. Any failure during setup terminates the process through
// log.Fatalf. The goroutine ends once the reader reports ringbuf.ErrClosed.
func (p *TcProbe) Start(f *Flags) {
	var (
		kernelTypes *btf.Spec
		err         error
	)
	switch f.KernelBTF {
	case "":
		kernelTypes, err = btf.LoadKernelSpec()
	default:
		kernelTypes, err = btf.LoadSpec(f.KernelBTF)
	}
	if err != nil {
		log.Fatalf("Failed to load BTF spec: %s", err)
	}

	spec, err := LoadTCFilter()
	if err != nil {
		log.Fatalf("loading objects: %v", err)
	}

	// The filter configuration is written into the FCG constant of the spec
	// before the programs are loaded.
	constants := map[string]interface{}{"FCG": GetConfig(f)}
	if err := spec.RewriteConstants(constants); err != nil {
		log.Fatalf("Failed to rewrite filter config: %v", err)
	}

	loadOpts := &ebpf.CollectionOptions{}
	loadOpts.Programs.KernelTypes = kernelTypes
	objs := new(TCFilterObjects)
	if err := spec.LoadAndAssign(objs, loadOpts); err != nil {
		log.Fatalf("Failed to load bpf objects: %v", err)
	}
	p.bpf = objs

	for _, probe := range p.probes {
		selected := f.FilterInterface == "" || probe.ifName == f.FilterInterface
		if !selected {
			continue
		}

		link, err := netlink.LinkByIndex(probe.ifIndex)
		if err != nil {
			log.Fatalf("create net link failed: %v", err)
		}

		probe.ingressFileter, err = attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
		if err != nil {
			log.Fatalf("attach tc ingress failed, %v", err)
		}

		probe.egressFileter, err = attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
		if err != nil {
			log.Fatalf("attach tc egress failed, %v", err)
		}

		log.Printf("create probe on interface %d - %s \n", probe.ifIndex, probe.ifName)
	}

	events, err := ringbuf.NewReader(objs.TcCaptureEvents)
	if err != nil {
		log.Fatalf("opening ringbuf reader: %s", err)
	}
	p.reader = events

	log.Println("Waiting for events..")
	p.output.PrintHeader()

	go func() {
		// One event value is reused for every record; each successful decode
		// overwrites it before it is printed.
		var event TCFilterNetPacketEvent
		for {
			record, err := events.Read()
			if errors.Is(err, ringbuf.ErrClosed) {
				log.Println("Received signal, exiting..")
				return
			}
			if err != nil {
				log.Printf("reading from reader: %s", err)
				continue
			}

			// Decode the raw sample as a little-endian TCFilterNetPacketEvent.
			sample := bytes.NewReader(record.RawSample)
			if err := binary.Read(sample, binary.LittleEndian, &event); err != nil {
				log.Printf("parsing ringbuf event: %s", err)
				continue
			}
			p.output.Print(event)
		}
	}()
}
// Stop closes the loaded eBPF objects and the ring buffer reader, then
// deletes every TC filter that Start attached.
//
// It is safe to call when Start never ran or attached only some interfaces:
// Start leaves the filters of interfaces excluded by Flags.FilterInterface
// nil, and handing a nil *netlink.BpfFilter to netlink.FilterDel would
// dereference it. Cleanup is best effort: failures are logged and the
// remaining resources are still released.
func (p *TcProbe) Stop() {
	if p.bpf != nil {
		if err := p.bpf.Close(); err != nil {
			log.Printf("closing bpf objects: %v", err)
		}
	}
	if p.reader != nil {
		// Closing the reader makes the goroutine started by Start observe
		// ringbuf.ErrClosed and return.
		if err := p.reader.Close(); err != nil {
			log.Printf("closing ringbuf reader: %v", err)
		}
	}

	// removeFilter deletes one filter and skips filters that were never attached.
	removeFilter := func(direction, ifName string, filter *netlink.BpfFilter) {
		if filter == nil {
			return
		}
		if err := netlink.FilterDel(filter); err != nil {
			log.Printf("deleting tc %s filter on interface %s: %v", direction, ifName, err)
		}
	}
	for _, tcPro := range p.probes {
		if tcPro == nil {
			continue
		}
		removeFilter("ingress", tcPro.ifName, tcPro.ingressFileter)
		removeFilter("egress", tcPro.ifName, tcPro.egressFileter)
	}
}
// replaceQdisc installs (or replaces) the clsact qdisc on link so that BPF
// filters can be attached to it. It returns the error reported by
// netlink.QdiscReplace.
func replaceQdisc(link netlink.Link) error {
	return netlink.QdiscReplace(&netlink.GenericQdisc{
		QdiscType: "clsact",
		QdiscAttrs: netlink.QdiscAttrs{
			LinkIndex: link.Attrs().Index,
			Handle:    netlink.MakeHandle(0xffff, 0),
			Parent:    netlink.HANDLE_CLSACT,
		},
	})
}
// attachTC makes sure link carries a clsact qdisc and then installs prog on
// it as a direct-action BPF filter under qdiscParent.
//
// The filter is named "<progName>-<interface name>", matches every protocol
// (ETH_P_ALL) and uses handle 1 and priority 1. The installed filter is
// returned so the caller can delete it later; on failure the filter is nil
// and the error is wrapped with the step that failed.
func attachTC(link netlink.Link, prog *ebpf.Program, progName string, qdiscParent uint32) (*netlink.BpfFilter, error) {
	linkAttrs := link.Attrs()

	if err := replaceQdisc(link); err != nil {
		return nil, fmt.Errorf("replacing clsact qdisc for interface %s: %w", linkAttrs.Name, err)
	}

	attrs := netlink.FilterAttrs{
		LinkIndex: linkAttrs.Index,
		Parent:    qdiscParent,
		Handle:    1,
		Protocol:  unix.ETH_P_ALL,
		Priority:  1,
	}
	bpfFilter := &netlink.BpfFilter{
		FilterAttrs:  attrs,
		Fd:           prog.FD(),
		Name:         progName + "-" + linkAttrs.Name,
		DirectAction: true,
	}

	err := netlink.FilterReplace(bpfFilter)
	if err != nil {
		return nil, fmt.Errorf("replacing tc filter: %w", err)
	}
	return bpfFilter, nil
}
// intToIP turns a 32-bit IPv4 number into a 4-byte net.IP. The bytes are
// written in little-endian order, so the least significant byte of ipNum
// becomes the first octet of the address.
func intToIP(ipNum uint32) net.IP {
	var octets [net.IPv4len]byte
	binary.LittleEndian.PutUint32(octets[:], ipNum)
	return net.IP(octets[:])
}