Skip to content

Commit

Permalink
feat(agent): send events to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Jan 30, 2025
1 parent 304f90a commit 77d35e5
Show file tree
Hide file tree
Showing 7 changed files with 2,735 additions and 7 deletions.
2 changes: 2 additions & 0 deletions _hack/k8s/vega-agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ spec:
value: "grpc"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "http://otel-collector.monitoring.svc.cluster.local:4317"
- name: KAFKA_ADDR
value: "queue-kafka.kafka.svc.cluster.local:9092"
volumeMounts:
- mountPath: /var/run/tetragon
name: tetragon
Expand Down
2,485 changes: 2,485 additions & 0 deletions _hack/kafka.values.yml

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions _hack/kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Helm values for the Kafka deployment used by the vega agent.
# NOTE(review): nesting reconstructed from a flattened copy; verify against the
# chart's values schema before applying.
listeners:
  client:
    # Client listener protocol; presumably unauthenticated/unencrypted — confirm.
    protocol: PLAINTEXT

provisioning:
  enabled: true
  replicationFactor: 1
  # Topics the agent produces to ("hubble" and "tetragon").
  topics:
    - name: hubble
      replicationFactor: 1
    - name: tetragon
      replicationFactor: 1
16 changes: 11 additions & 5 deletions cmd/vega-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
func main() {
app.Run(func(ctx context.Context, lg *zap.Logger, m *app.Telemetry) error {
ctx = zctx.WithOpenTelemetryZap(ctx)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
<-ctx.Done()
return ctx.Err()
})
meter := m.MeterProvider().Meter("vega-agent")
kafkaProducer, err := NewKafkaProducer(lg, m.MeterProvider())
if err != nil {
return errors.Wrap(err, "create kafka producer")
}
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
// Hubble component.
const (
Expand Down Expand Up @@ -86,6 +86,9 @@ func main() {
zap.String("node", resp.NodeName),
)
flowsCount.Add(ctx, 1)
if err := kafkaProducer.Produce(ctx, "hubble", resp); err != nil {
return errors.Wrap(err, "produce")
}
}
})
g.Go(func() error {
Expand Down Expand Up @@ -142,6 +145,9 @@ func main() {
zap.String("node", resp.NodeName),
)
eventsCount.Add(ctx, 1)
if err := kafkaProducer.Produce(ctx, "tetragon", resp); err != nil {
return errors.Wrap(err, "produce")
}
}
})
return g.Wait()
Expand Down
190 changes: 190 additions & 0 deletions cmd/vega-agent/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package main

import (
"context"
"os"
"strings"
"sync"
"time"

"github.com/go-faster/errors"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

// KafkaAddrs parses the KAFKA_ADDR environment variable as a comma-separated
// list of addresses. Whitespace around each entry is trimmed and empty entries
// are dropped; the result is nil when no entry remains.
func KafkaAddrs() []string {
	var addrs []string
	for _, part := range strings.Split(os.Getenv("KAFKA_ADDR"), ",") {
		if addr := strings.TrimSpace(part); addr != "" {
			addrs = append(addrs, addr)
		}
	}
	return addrs
}

// KafkaDialer builds a kafka.Dialer with a 10 second keep-alive and a 3 second
// timeout. When KAFKA_USER is non-empty, a SASL plain mechanism is attached
// using that user name and the value of KAFKA_PASSWORD.
func KafkaDialer() *kafka.Dialer {
	dialer := &kafka.Dialer{
		Timeout:   3 * time.Second,
		KeepAlive: 10 * time.Second,
	}
	user := os.Getenv("KAFKA_USER")
	if user == "" {
		// No credentials configured: connect without SASL.
		return dialer
	}
	dialer.SASLMechanism = plain.Mechanism{
		Username: user,
		Password: os.Getenv("KAFKA_PASSWORD"),
	}
	return dialer
}

// KafkaTransport builds a kafka.Transport that reuses the dial function, the
// timeout and the SASL mechanism of the dialer returned by KafkaDialer.
func KafkaTransport() *kafka.Transport {
	dialer := KafkaDialer()

	transport := new(kafka.Transport)
	transport.Dial = dialer.DialFunc
	transport.DialTimeout = dialer.Timeout
	transport.SASL = dialer.SASLMechanism
	return transport
}

// KafkaBalancer selects a kafka.Balancer from the KAFKA_BALANCER environment
// variable: "least_bytes" (also used when the variable is empty), "hash" or
// "round_robin". Any other value causes a panic.
func KafkaBalancer() kafka.Balancer {
	name := os.Getenv("KAFKA_BALANCER")
	switch name {
	case "hash":
		return &kafka.Hash{}
	case "round_robin":
		return &kafka.RoundRobin{}
	case "", "least_bytes":
		return &kafka.LeastBytes{}
	}
	panic("unknown balancer: " + name)
}

// KafkaProducer sends messages to Kafka, keeping one writer per topic and
// counting sent and failed messages.
type KafkaProducer struct {
	// lg is the base logger; per-topic loggers are derived from it.
	lg *zap.Logger
	// addrs are the addresses handed to every writer.
	addrs []string

	// mux is held while the writers map is looked up or extended.
	mux sync.Mutex
	// writers caches the writer created for each topic name.
	writers map[string]*kafka.Writer

	// messagesSent and messagesFailed are incremented from the writers'
	// completion callbacks.
	messagesSent   metric.Int64Counter
	messagesFailed metric.Int64Counter
}

// writer returns the cached writer for topic, creating and caching a new one
// when the topic has not been seen before. The lookup and the insertion both
// happen while k.mux is held.
func (k *KafkaProducer) writer(topic string) (*kafka.Writer, error) {
	k.mux.Lock()
	defer k.mux.Unlock()

	if cached, found := k.writers[topic]; found {
		return cached, nil
	}

	created, err := k.newWriter(topic)
	if err != nil {
		return nil, errors.Wrap(err, "create kafka writer")
	}
	k.writers[topic] = created
	return created, nil
}

func (k *KafkaProducer) newWriter(topic string) (*kafka.Writer, error) {

Check failure on line 95 in cmd/vega-agent/producer.go

View workflow job for this annotation

GitHub Actions / lint / run

(*KafkaProducer).newWriter - result 1 (error) is always nil (unparam)
lg := k.lg.Named(topic)
return &kafka.Writer{
Async: true,
BatchSize: 10_000,
BatchTimeout: time.Second,

Addr: kafka.TCP(k.addrs...),
Topic: topic,
Balancer: KafkaBalancer(),
Transport: KafkaTransport(),

Logger: fnLogger(func(s string, i ...interface{}) {
lg.Sugar().Debugf(s, i...)
}),
ErrorLogger: fnLogger(func(s string, i ...interface{}) {
lg.Sugar().Errorf(s, i...)
}),
Completion: func(messages []kafka.Message, err error) {
ctx := context.Background()
count := int64(len(messages))
withAttributes := metric.WithAttributes(
attribute.String("topic", topic),
)
if err == nil {
k.messagesSent.Add(ctx, count, withAttributes)
lg.Debug("Kafka messages completed",
zap.Error(err), zap.Int64("messages.count", count),
)
} else {
k.messagesFailed.Add(ctx, count, withAttributes)
lg.Error("Kafka messages failed",
zap.Error(err), zap.Int64("messages.count", count),
)
}
},
}, nil
}

// fnLogger adapts a printf-style function so it can be used wherever a value
// with a Printf method is expected.
type fnLogger func(string, ...any)

// Printf passes the format string and its arguments through to the wrapped
// function unchanged.
func (fn fnLogger) Printf(format string, args ...any) {
	fn(format, args...)
}

// NewKafkaProducer creates a producer that targets the addresses returned by
// KafkaAddrs. It logs the address list and registers the
// "kafka.messages.sent" and "kafka.messages.failed" counters on the
// "kafka.producer" meter of provider. An error is returned if either counter
// cannot be registered.
func NewKafkaProducer(lg *zap.Logger, provider metric.MeterProvider) (*KafkaProducer, error) {
	brokers := KafkaAddrs()
	lg.Info("Initializing kafka producer",
		zap.Strings("addrs", brokers),
		zap.Int("addrs.count", len(brokers)),
	)

	meter := provider.Meter("kafka.producer")

	sent, err := meter.Int64Counter("kafka.messages.sent")
	if err != nil {
		return nil, errors.Wrap(err, "register metric")
	}

	failed, err := meter.Int64Counter("kafka.messages.failed")
	if err != nil {
		return nil, errors.Wrap(err, "register metric")
	}

	return &KafkaProducer{
		lg:             lg,
		addrs:          brokers,
		writers:        map[string]*kafka.Writer{},
		messagesSent:   sent,
		messagesFailed: failed,
	}, nil
}

// Produce marshals msg with proto.Marshal and hands the bytes, as the value of
// a single Kafka message, to the writer for topic. It returns an error when
// the writer cannot be obtained, when marshalling fails, or when the writer's
// WriteMessages call reports one.
func (k *KafkaProducer) Produce(ctx context.Context, topic string, msg proto.Message) error {
	w, err := k.writer(topic)
	if err != nil {
		return errors.Wrap(err, "get kafka writer")
	}

	payload, err := proto.Marshal(msg)
	if err != nil {
		return errors.Wrap(err, "marshal message")
	}

	record := kafka.Message{Value: payload}
	return w.WriteMessages(ctx, record)
}

// Close closes every writer created so far and returns their errors combined
// into one, or nil when all writers closed cleanly.
func (k *KafkaProducer) Close() error {
	// The writers map is modified under k.mux by writer(); take the same lock
	// so iterating here cannot race with a concurrent Produce call.
	k.mux.Lock()
	defer k.mux.Unlock()

	var errs []error
	for _, writer := range k.writers {
		if err := writer.Close(); err != nil {
			errs = append(errs, err)
		}
	}
	return multierr.Combine(errs...)
}
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,18 @@ require (
github.com/go-faster/tetragon v1.3.2
github.com/ogen-go/ent2ogen v0.0.0-20230913015246-1d588150cabc
github.com/ogen-go/ogen v1.9.0
github.com/segmentio/kafka-go v0.4.47
github.com/spf13/cobra v1.8.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.uber.org/mock v0.5.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.10.0
google.golang.org/grpc v1.70.0
google.golang.org/protobuf v1.36.3
k8s.io/api v0.32.1
k8s.io/apimachinery v0.32.1
k8s.io/client-go v0.32.1
Expand Down Expand Up @@ -122,6 +125,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63 // indirect
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7 // indirect
github.com/pingcap/tidb/parser v0.0.0-20220817134052-9709249e523a // indirect
Expand Down Expand Up @@ -169,7 +173,6 @@ require (
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
gocloud.dev v0.27.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/exp v0.0.0-20241004190924-225e2abe05e6 // indirect
Expand All @@ -186,7 +189,6 @@ require (
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect
google.golang.org/protobuf v1.36.3 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand Down
Loading

0 comments on commit 77d35e5

Please sign in to comment.