Support antctl command for packetcapture
Signed-off-by: Hang Yan <[email protected]>
hangyan committed Feb 6, 2025
1 parent 4911802 commit b27b72b
Showing 5 changed files with 701 additions and 2 deletions.
6 changes: 6 additions & 0 deletions pkg/antctl/antctl.go
@@ -23,6 +23,7 @@ import (
checkinstallation "antrea.io/antrea/pkg/antctl/raw/check/installation"
"antrea.io/antrea/pkg/antctl/raw/featuregates"
"antrea.io/antrea/pkg/antctl/raw/multicluster"
"antrea.io/antrea/pkg/antctl/raw/packetcapture"
"antrea.io/antrea/pkg/antctl/raw/proxy"
"antrea.io/antrea/pkg/antctl/raw/set"
"antrea.io/antrea/pkg/antctl/raw/supportbundle"
@@ -750,6 +751,11 @@ $ antctl get podmulticaststats pod -n namespace`,
supportAgent: true,
supportController: true,
},
{
cobraCommand: packetcapture.Command,
supportAgent: true,
supportController: true,
},
{
cobraCommand: proxy.Command,
supportAgent: false,
4 changes: 2 additions & 2 deletions pkg/antctl/command_list_test.go
@@ -65,12 +65,12 @@ func TestGetDebugCommands(t *testing.T) {
{
name: "Antctl running against controller mode",
mode: "controller",
expected: [][]string{{"version"}, {"get", "networkpolicy"}, {"get", "appliedtogroup"}, {"get", "addressgroup"}, {"get", "controllerinfo"}, {"supportbundle"}, {"traceflow"}, {"get", "featuregates"}},
expected: [][]string{{"version"}, {"get", "networkpolicy"}, {"get", "appliedtogroup"}, {"get", "addressgroup"}, {"get", "controllerinfo"}, {"supportbundle"}, {"traceflow"}, {"packetcapture"}, {"get", "featuregates"}},
},
{
name: "Antctl running against agent mode",
mode: "agent",
expected: [][]string{{"version"}, {"get", "podmulticaststats"}, {"log-level"}, {"get", "networkpolicy"}, {"get", "appliedtogroup"}, {"get", "addressgroup"}, {"get", "agentinfo"}, {"get", "podinterface"}, {"get", "ovsflows"}, {"trace-packet"}, {"get", "serviceexternalip"}, {"get", "memberlist"}, {"get", "bgppolicy"}, {"get", "bgppeers"}, {"get", "bgproutes"}, {"supportbundle"}, {"traceflow"}, {"get", "featuregates"}},
expected: [][]string{{"version"}, {"get", "podmulticaststats"}, {"log-level"}, {"get", "networkpolicy"}, {"get", "appliedtogroup"}, {"get", "addressgroup"}, {"get", "agentinfo"}, {"get", "podinterface"}, {"get", "ovsflows"}, {"trace-packet"}, {"get", "serviceexternalip"}, {"get", "memberlist"}, {"get", "bgppolicy"}, {"get", "bgppeers"}, {"get", "bgproutes"}, {"supportbundle"}, {"traceflow"}, {"packetcapture"}, {"get", "featuregates"}},
},
{
name: "Antctl running against flow-aggregator mode",
325 changes: 325 additions & 0 deletions pkg/antctl/raw/packetcapture/command.go
@@ -0,0 +1,325 @@
// Copyright 2025 Antrea Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package packetcapture

import (
"context"
"errors"
"fmt"
"net"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/spf13/afero"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/ptr"

"antrea.io/antrea/pkg/antctl/raw"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
antrea "antrea.io/antrea/pkg/client/clientset/versioned"
"antrea.io/antrea/pkg/util/env"
)

var (
defaultTimeout = time.Second * 60
Command *cobra.Command
getClients = getConfigAndClients
getCopier = getPodFile
defaultFS = afero.NewOsFs()
)
var option = &struct {
source string
dest string
nowait bool
timeout time.Duration
number int32
flow string
outputDir string
}{}

var packetCaptureExample = strings.TrimSpace(`
Start capturing packets from pod1 to pod2, with both Pods in Namespace default
$ antctl packetcapture -S pod1 -D pod2 -n 10
Start capturing packets from pod1 in Namespace ns1 to a destination IP
$ antctl packetcapture -S ns1/pod1 -D 192.168.123.123 -n 10
Start capturing UDP packets from pod1 to pod2, with destination port 1234
$ antctl packetcapture -S pod1 -D pod2 -n 10 -f udp,udp_dst=1234
Save the captured packets file to a specified directory
$ antctl packetcapture -S 192.168.123.123 -D pod2 -n 10 -f tcp,tcp_dst=80 -o /tmp
`)

func init() {
Command = &cobra.Command{
Use: "packetcapture",
Short: "Start capturing packets",
Long: "Start capturing packets on the target flow.",
Aliases: []string{"pc", "packetcaptures"},
Example: packetCaptureExample,
RunE: packetCaptureRunE,
}

Command.Flags().StringVarP(&option.source, "source", "S", "", "source of the PacketCapture: Namespace/Pod, Pod, or IP")
Command.Flags().StringVarP(&option.dest, "destination", "D", "", "destination of the PacketCapture: Namespace/Pod, Pod, or IP")
Command.Flags().Int32VarP(&option.number, "number", "n", 0, "number of packets to capture")
Command.Flags().StringVarP(&option.flow, "flow", "f", "", "specify the flow (packet headers) of the PacketCapture, including tcp_src, tcp_dst, udp_src and udp_dst")
Command.Flags().BoolVarP(&option.nowait, "nowait", "", false, "if set, the command returns without waiting for or retrieving the capture results")
Command.Flags().StringVarP(&option.outputDir, "output-dir", "o", ".", "directory to save the captured packets file to")
}

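// protocols maps the protocol names accepted in the flow expression to their IP protocol numbers.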
var protocols = map[string]int32{
"icmp": 1,
"tcp": 6,
"udp": 17,
}

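// getFlowFields parses a comma-separated flow expression into a map of field names to values.
// For example, "tcp,tcp_dst=80" yields {"tcp": 0, "tcp_dst": 80}.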
func getFlowFields(flow string) (map[string]int, error) {
fields := map[string]int{}
for _, v := range strings.Split(flow, ",") {
kv := strings.Split(v, "=")
if len(kv) == 2 && len(kv[0]) != 0 && len(kv[1]) != 0 {
r, err := strconv.Atoi(kv[1])
if err != nil {
return nil, err
}
fields[kv[0]] = r
} else if len(kv) == 1 {
if len(kv[0]) != 0 {
fields[v] = 0
}
} else {
return nil, fmt.Errorf("%s is not valid in flow", v)
}
}
return fields, nil
}

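// getConfigAndClients resolves the kubeconfig for the command and creates the Kubernetes and Antrea clientsets.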
func getConfigAndClients(cmd *cobra.Command) (*rest.Config, kubernetes.Interface, antrea.Interface, error) {
kubeConfig, err := raw.ResolveKubeconfig(cmd)
if err != nil {
return nil, nil, nil, err
}
k8sClientset, client, err := raw.SetupClients(kubeConfig)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to create clientset: %w", err)
}
return kubeConfig, k8sClientset, client, nil
}

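// getPodFile returns a PodFileCopy that can copy the capture file out of an antrea-agent Pod.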
func getPodFile(cmd *cobra.Command) (PodFileCopy, error) {
config, client, _, err := getClients(cmd)
if err != nil {
return nil, err
}
return &podFile{
restConfig: config,
restInterface: client.CoreV1().RESTClient(),
}, nil
}

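// packetCaptureRunE creates a PacketCapture CR for the requested flow, waits for it to complete
// (unless --nowait is set), and copies the resulting packet file from the antrea-agent Pod into
// the output directory.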
func packetCaptureRunE(cmd *cobra.Command, args []string) error {
option.timeout, _ = cmd.Flags().GetDuration("timeout")
if option.timeout > time.Hour {
return errors.New("timeout cannot be longer than 1 hour")
}
if option.timeout == 0 {
option.timeout = defaultTimeout
}
if option.number <= 0 {
return errors.New("packet number should be larger than 0")
}

_, _, antreaClient, err := getClients(cmd)
if err != nil {
return err
}
pc, err := newPacketCapture()
if err != nil {
return fmt.Errorf("error when filling up PacketCapture config: %w", err)
}
createCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if _, err := antreaClient.CrdV1alpha1().PacketCaptures().Create(createCtx, pc, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("error when creating PacketCapture (is the PacketCapture feature gate enabled?): %w", err)
}
defer func() {
if !option.nowait {
if err = antreaClient.CrdV1alpha1().PacketCaptures().Delete(context.TODO(), pc.Name, metav1.DeleteOptions{}); err != nil {
fmt.Fprintf(cmd.OutOrStdout(), "error when deleting PacketCapture: %s\n", err.Error())
}
}
}()

if option.nowait {
fmt.Fprintf(cmd.OutOrStdout(), "PacketCapture Name: %s\n", pc.Name)
return nil
}

var latestPC *v1alpha1.PacketCapture
err = wait.PollUntilContextTimeout(context.TODO(), 1*time.Second, option.timeout, false, func(ctx context.Context) (bool, error) {
res, err := antreaClient.CrdV1alpha1().PacketCaptures().Get(ctx, pc.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
for _, cond := range res.Status.Conditions {
if cond.Type == v1alpha1.PacketCaptureComplete && cond.Status == metav1.ConditionTrue {
latestPC = res
return true, nil
}
}
return false, nil

})

if wait.Interrupted(err) {
err = errors.New("timed out waiting for the PacketCapture to complete")
if latestPC == nil {
return err
}
} else if err != nil {
return fmt.Errorf("error when retrieving PacketCapture: %w", err)
}

splits := strings.Split(latestPC.Status.FilePath, ":")
fileName := filepath.Base(splits[1])
copier, err := getCopier(cmd)
if err != nil {
return err
}
err = copier.CopyFromPod(context.TODO(), env.GetAntreaNamespace(), splits[0], "antrea-agent", splits[1], option.outputDir)
if err == nil {
fmt.Fprintf(cmd.OutOrStdout(), "Packet File: %s\n", filepath.Join(option.outputDir, fileName))
}
return err
}

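// parseEndpoint interprets an endpoint string as either an IPv4 address or a Pod reference
// ("Pod" or "Namespace/Pod", defaulting to the "default" Namespace).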
func parseEndpoint(endpoint string) (pod *v1alpha1.PodReference, ip *string) {
parsedIP := net.ParseIP(endpoint)
if parsedIP != nil && parsedIP.To4() != nil {
ip = ptr.To(parsedIP.String())
} else {
split := strings.Split(endpoint, "/")
if len(split) == 1 {
pod = &v1alpha1.PodReference{
Namespace: "default",
Name: split[0],
}
} else if len(split) == 2 && len(split[0]) != 0 && len(split[1]) != 0 {
pod = &v1alpha1.PodReference{
Namespace: split[0],
Name: split[1],
}
}
}
return
}

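// getPCName derives the PacketCapture name from the source and destination, appending a random
// suffix unless --nowait is set.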
func getPCName(src, dest string) string {
replace := func(s string) string {
return strings.ReplaceAll(s, "/", "-")
}
prefix := fmt.Sprintf("%s-%s", replace(src), replace(dest))
if option.nowait {
return prefix
}
return fmt.Sprintf("%s-%s", prefix, rand.String(8))
}

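// parseFlow converts the --flow expression into a Packet spec, setting the protocol and any
// TCP/UDP source or destination ports.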
func parseFlow() (*v1alpha1.Packet, error) {
cleanFlow := strings.ReplaceAll(option.flow, " ", "")
fields, err := getFlowFields(cleanFlow)
if err != nil {
return nil, fmt.Errorf("error when parsing the flow: %w", err)
}
var pkt v1alpha1.Packet
pkt.IPFamily = v1.IPv4Protocol
for k, v := range protocols {
if _, ok := fields[k]; ok {
pkt.Protocol = ptr.To(intstr.FromInt32(v))
break
}
}
if r, ok := fields["tcp_src"]; ok {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
pkt.TransportHeader.TCP.SrcPort = ptr.To(int32(r))
}
if r, ok := fields["tcp_dst"]; ok {
if pkt.TransportHeader.TCP == nil {
pkt.TransportHeader.TCP = new(v1alpha1.TCPHeader)
}
pkt.TransportHeader.TCP.DstPort = ptr.To(int32(r))
}
if r, ok := fields["udp_src"]; ok {
pkt.TransportHeader.UDP = new(v1alpha1.UDPHeader)
pkt.TransportHeader.UDP.SrcPort = ptr.To(int32(r))
}
if r, ok := fields["udp_dst"]; ok {
if pkt.TransportHeader.UDP == nil {
pkt.TransportHeader.UDP = new(v1alpha1.UDPHeader)
}
pkt.TransportHeader.UDP.DstPort = ptr.To(int32(r))
}
return &pkt, nil
}

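// newPacketCapture builds the PacketCapture CR from the command-line options, validating that
// at least one of the source and destination is a Pod.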
func newPacketCapture() (*v1alpha1.PacketCapture, error) {
var src v1alpha1.Source
if option.source != "" {
src.Pod, src.IP = parseEndpoint(option.source)
if src.Pod == nil && src.IP == nil {
return nil, fmt.Errorf("source should be in the format of Namespace/Pod, Pod, or IPv4")
}
}

var dst v1alpha1.Destination
if option.dest != "" {
dst.Pod, dst.IP = parseEndpoint(option.dest)
if dst.Pod == nil && dst.IP == nil {
return nil, fmt.Errorf("destination should be in the format of Namespace/Pod, Pod, or IPv4")
}
}

if src.Pod == nil && dst.Pod == nil {
return nil, errors.New("one of source and destination must be a Pod")
}
pkt, err := parseFlow()
if err != nil {
return nil, fmt.Errorf("failed to parse flow: %w", err)
}

name := getPCName(option.source, option.dest)
pc := &v1alpha1.PacketCapture{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: v1alpha1.PacketCaptureSpec{
Source: src,
Destination: dst,
Packet: pkt,
CaptureConfig: v1alpha1.CaptureConfig{
FirstN: &v1alpha1.PacketCaptureFirstNConfig{
Number: option.number,
},
},
},
}
return pc, nil
}
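
The PodFileCopy interface and podFile type used by getPodFile are defined in one of the changed files that is not shown above. A minimal sketch of the shape implied by the call sites, with parameter names assumed rather than taken from the commit:

// Sketch only: inferred from getPodFile and the CopyFromPod call in packetCaptureRunE;
// the actual definition is part of this commit but not displayed here.
type PodFileCopy interface {
// CopyFromPod copies srcPath from the named container of a Pod in the given
// Namespace into the local directory dstDir.
CopyFromPod(ctx context.Context, namespace, podName, containerName, srcPath, dstDir string) error
}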