Skip to content

Commit

Permalink
feat(ingest): add tetragon ingester
Browse files Browse the repository at this point in the history
Also add hubble DDL.
  • Loading branch information
ernado committed Jan 30, 2025
1 parent 421f0b7 commit 21f7954
Show file tree
Hide file tree
Showing 14 changed files with 2,445 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,5 +125,8 @@ issues:
linters: [ gosec ]
text: "G204" # Subprocess launched with variable

- linters: [ gosec ]
text: "G115" # uint32 -> uint16

- linters: [ revive ]
text: "comment on exported const .+ should be of the form"
34 changes: 34 additions & 0 deletions _hack/k8s/vega-ingest.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: vega-ingest
namespace: vega
spec:
replicas: 1
selector:
matchLabels:
app: vega-ingest
template:
metadata:
labels:
app: vega-ingest
spec:
containers:
- name: ingest
image: vega-ingest
imagePullPolicy: Never
env:
- name: OTEL_EXPORTER_OTLP_PROTOCOL
value: "grpc"
- name: OTEL_EXPORTER_OTLP_ENDPOINT
value: "http://otel-collector.monitoring.svc.cluster.local:4317"
- name: KAFKA_ADDR
value: "queue-kafka.kafka.svc.cluster.local:9092"
- name: CLICKHOUSE_ADDR
value: "chi-clickhouse-default-0-0.clickhouse:9000"
- name: CLICKHOUSE_USER
value: "admin"
- name: CLICKHOUSE_PASSWORD
value: "admin"
- name: CLICKHOUSE_DB
value: "default"
62 changes: 5 additions & 57 deletions cmd/vega-agent/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,19 @@ package main

import (
"context"
"os"
"strings"
"sync"
"time"

"github.com/go-faster/errors"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)

// KafkaAddrs returns kafka addresses from environment variable.
func KafkaAddrs() []string {
v := os.Getenv("KAFKA_ADDR")
var list []string
for _, s := range strings.Split(v, ",") {
s = strings.TrimSpace(s)
if s != "" {
list = append(list, s)
}
}
return list
}

func KafkaDialer() *kafka.Dialer {
d := &kafka.Dialer{
KeepAlive: time.Second * 10,
Timeout: time.Second * 3,
}
if user := os.Getenv("KAFKA_USER"); user != "" {
d.SASLMechanism = plain.Mechanism{
Username: user,
Password: os.Getenv("KAFKA_PASSWORD"),
}
}
return d
}

// KafkaTransport from environment.
func KafkaTransport() *kafka.Transport {
d := KafkaDialer()
return &kafka.Transport{
Dial: d.DialFunc,
DialTimeout: d.Timeout,
SASL: d.SASLMechanism,
}
}

// KafkaBalancer from environment.
func KafkaBalancer() kafka.Balancer {
switch os.Getenv("KAFKA_BALANCER") {
case "", "least_bytes":
return &kafka.LeastBytes{}
case "hash":
return &kafka.Hash{}
case "round_robin":
return &kafka.RoundRobin{}
default:
panic("unknown balancer: " + os.Getenv("KAFKA_BALANCER"))
}
}
"github.com/go-faster/vega/internal/kfk"
)

type KafkaProducer struct {
addrs []string
Expand Down Expand Up @@ -97,8 +45,8 @@ func (k *KafkaProducer) newWriter(topic string) *kafka.Writer {

Addr: kafka.TCP(k.addrs...),
Topic: topic,
Balancer: KafkaBalancer(),
Transport: KafkaTransport(),
Balancer: kfk.KafkaBalancer(),
Transport: kfk.Transport(),

Logger: fnLogger(func(s string, i ...interface{}) {
lg.Sugar().Debugf(s, i...)
Expand Down Expand Up @@ -134,7 +82,7 @@ func (f fnLogger) Printf(s string, i ...interface{}) {
}

func NewKafkaProducer(lg *zap.Logger, provider metric.MeterProvider) (*KafkaProducer, error) {
addrs := KafkaAddrs()
addrs := kfk.Addrs()
lg.Info("Initializing kafka producer",
zap.Strings("addrs", addrs),
zap.Int("addrs.count", len(addrs)),
Expand Down
Loading

0 comments on commit 21f7954

Please sign in to comment.