Commit 0cbf1ab

Pull har in for synth feed. (#636)

i3149 authored Nov 30, 2023
1 parent 03641f4 commit 0cbf1ab

Showing 8 changed files with 185 additions and 44 deletions.
66 changes: 66 additions & 0 deletions pkg/cat/jchf.go
@@ -531,6 +531,56 @@ func (kc *KTranslate) doEnrichments(ctx context.Context, msgs []*kt.JCHF) []*kt.
msg.CustomStr["src_host"] = kc.resolver.Resolve(ctx, msg.SrcAddr, false)
msg.CustomStr["dst_host"] = kc.resolver.Resolve(ctx, msg.DstAddr, false)
}

// This data is typically JSON; try to parse it and pull out useful fields.
if msg.EventType == kt.KENTIK_EVENT_SYNTH {
if rawStr, ok := msg.CustomStr["error_cause/trace_route"]; ok {
if rawStr != "" {
strData := []interface{}{}
if err := json.Unmarshal([]byte(rawStr), &strData); err == nil {
if len(strData) > 0 {
switch sd := strData[0].(type) {
case map[string]interface{}:
for key, val := range sd {
switch tv := val.(type) {
case string:
msg.CustomStr[key] = tv
case int:
msg.CustomInt[key] = int32(tv)
case int64:
msg.CustomBigInt[key] = tv
case float64:
msg.CustomBigInt[key] = int64(tv)
case map[string]interface{}:
if hv, ok := tv["har"]; ok {
switch av := hv.(type) {
case []interface{}:
if len(av) > 0 {
switch iner := av[0].(type) {
case map[string]interface{}:
if path, ok := iner["path"]; ok {
switch pt := path.(type) {
case string:
kc.getHar(ctx, pt, msg)
}
}
}
}
}
}
case nil:
// Noop here.
default:
// And noop here.
}
}
delete(msg.CustomStr, "error_cause/trace_route")
}
}
}
}
}
}
}

// If there's an outside enrichment service, send the messages over to it here.
@@ -545,3 +595,19 @@ func (kc *KTranslate) doEnrichments(ctx context.Context, msgs []*kt.JCHF) []*kt.

return msgs
}

// Pulls in a HAR file if possible.
func (kc *KTranslate) getHar(ctx context.Context, path string, msg *kt.JCHF) {
if kc.objmgr != nil {
data, err := kc.objmgr.Get(ctx, path)
if err != nil {
kc.log.Errorf("Cannot get path %s %v", path, err)
return
}

var har kt.HarFile
if err := json.Unmarshal(data, &har); err == nil {
msg.Har = &har
}
}
}
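
To make the nested type switches above concrete, here is a minimal, runnable sketch of the payload shape the walker expects: an array whose first element is an object, where one of the values is itself an object holding a "har" array whose first entry carries the object-store path of the HAR file. Everything except the "har" and "path" keys is a hypothetical example value.

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical synth payload, as might arrive in msg.CustomStr["error_cause/trace_route"].
	raw := `[{"status": "completed", "result": {"har": [{"path": "synth/agent-1/check-2.har"}]}}]`

	strData := []interface{}{}
	if err := json.Unmarshal([]byte(raw), &strData); err != nil || len(strData) == 0 {
		return
	}
	sd, ok := strData[0].(map[string]interface{})
	if !ok {
		return
	}
	for _, val := range sd {
		// Only map-valued entries can hold a har reference.
		if tv, ok := val.(map[string]interface{}); ok {
			if av, ok := tv["har"].([]interface{}); ok && len(av) > 0 {
				if iner, ok := av[0].(map[string]interface{}); ok {
					if path, ok := iner["path"].(string); ok {
						fmt.Println("would call kc.getHar with path:", path)
					}
				}
			}
		}
	}
}
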
18 changes: 18 additions & 0 deletions pkg/cat/kkc.go
@@ -26,6 +26,7 @@ import (
"github.com/kentik/ktranslate/pkg/maps"
"github.com/kentik/ktranslate/pkg/rollup"
ss "github.com/kentik/ktranslate/pkg/sinks"
"github.com/kentik/ktranslate/pkg/sinks/s3"
"github.com/kentik/ktranslate/pkg/util/enrich"
"github.com/kentik/ktranslate/pkg/util/gopatricia/patricia"
"github.com/kentik/ktranslate/pkg/util/resolv"
@@ -206,6 +207,15 @@ func NewKTranslate(config *ktranslate.Config, log logger.ContextL, registry go_m
// Get some randomness
rand.Seed(time.Now().UnixNano())

// If configured, install the S3 sink as a cloud object manager.
if config.S3Sink.Bucket != "" {
s3mgr, err := s3.NewSink(log.GetLogger().GetUnderlyingLogger(), registry, config.S3Sink)
if err != nil {
return nil, err
}
kc.objmgr = s3mgr
}

return kc, nil
}

@@ -618,6 +628,14 @@ func (kc *KTranslate) Run(ctx context.Context) error {
}
}

// If there's an objmgr, initialize it as well.
if kc.objmgr != nil {
err := kc.objmgr.Init(ctx, format, compression, kc.format)
if err != nil {
return err
}
}

// Set up the API auth system if configured. Allows kproxy|kprobe|kappa|ksynth and others to use this without phoning home to Kentik.
if kc.authConfig != nil {
authr, err := auth.NewServer(kc.authConfig, kc.config.SNMPInput.SNMPFile, kc.log, snmp.ServiceName)
1 change: 1 addition & 0 deletions pkg/cat/types.go
@@ -81,6 +81,7 @@ type KTranslate struct {
authConfig *auth.AuthConfig
confMgr config.ConfigManager
shutdown func(string)
objmgr sinks.CloudObjectManager
}

type CustomMapper struct {
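
The sinks.CloudObjectManager interface itself is not part of this diff. Inferred from its call sites (Init in kkc.go's Run, Get in jchf.go's getHar, and the matching Put added to S3Sink), a plausible shape is the sketch below; treat the exact signatures as assumptions rather than the repository's actual definition.

package sinks

import (
	"context"

	"github.com/kentik/ktranslate/pkg/formats"
	"github.com/kentik/ktranslate/pkg/kt"
)

// CloudObjectManager is a guess at the interface satisfied by *s3.S3Sink,
// reconstructed from the calls visible in this commit. Not authoritative.
type CloudObjectManager interface {
	Init(ctx context.Context, format formats.Format, compression kt.Compression, fmtr formats.Formatter) error
	Get(ctx context.Context, path string) ([]byte, error)
	Put(ctx context.Context, path string, data []byte) error
}
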
19 changes: 1 addition & 18 deletions pkg/formats/nrm/nrm.go
@@ -378,6 +378,7 @@ var (
"responseStart": true,
"secureConnectionStart": true,
"tlsProtocol": true,
"har_file": true,
}
)

@@ -425,7 +426,6 @@ func (f *NRMFormat) fromKSynth(in *kt.JCHF) []NRMetric {
return nil // Don't worry about timeouts and errors for now.
}

rawStr := in.CustomStr["error_cause/trace_route"] // Pull this out early.
metrics := util.GetSynMetricNameSet(in.CustomInt["result_type"])
attr := map[string]interface{}{}
f.mux.RLock()
@@ -441,23 +441,6 @@ func (f *NRMFormat) fromKSynth(in *kt.JCHF) []NRMetric {
attr["instrumentation.name"] = tt
}

// If there's str00 data, try to deserialize it and pass in useful bits.
if rawStr != "" {
strData := []interface{}{}
if err := json.Unmarshal([]byte(rawStr), &strData); err == nil {
if len(strData) > 0 {
switch sd := strData[0].(type) {
case map[string]interface{}:
for key, _ := range synthWLAttr {
if val, ok := sd[key]; ok {
attr[key] = val
}
}
}
}
}
}

for k, v := range attr { // Whitelist only a few attributes here.
if !synthWLAttr[k] {
delete(attr, k)
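
The loop above whitelists attributes: anything not present in synthWLAttr is deleted. Registering "har_file" in that map is what lets the HAR attached in util.SetAttr survive the filter, now that the raw error_cause/trace_route parsing has moved into the enrichment pass in jchf.go. A toy version of the filter, with made-up attribute values:

package main

import "fmt"

func main() {
	// Stand-in for the synthWLAttr whitelist above.
	synthWLAttr := map[string]bool{
		"tlsProtocol": true,
		"har_file":    true,
	}

	attr := map[string]interface{}{
		"tlsProtocol": "TLSv1.3",              // whitelisted, kept
		"har_file":    struct{}{},             // placeholder for *kt.HarFile, kept
		"scratch":     "internal-only detail", // not whitelisted, dropped
	}

	for k := range attr {
		if !synthWLAttr[k] {
			delete(attr, k)
		}
	}
	fmt.Println(attr) // map[har_file:{} tlsProtocol:TLSv1.3]
}
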
5 changes: 5 additions & 0 deletions pkg/formats/util/util.go
@@ -75,6 +75,11 @@ func SetAttr(attr map[string]interface{}, in *kt.JCHF, metrics map[string]kt.Met
}
}

// Copy this over as a deep struct.
if in.Har != nil {
attr["har_file"] = &in.Har
}

if lastMetadata != nil {
for k, v := range lastMetadata.DeviceInfo {
attr[k] = v
33 changes: 32 additions & 1 deletion pkg/kt/types.go
@@ -183,7 +183,8 @@ type JCHF struct {
CustomMetrics map[string]MetricInfo `json:"-"`
CustomTables map[string]DeviceTableMetadata `json:"-"`
MatchAttr map[string]*regexp.Regexp `json:"-"`
ApplySample bool `json:"-"` // Should this value be subject to sampling?
Har *HarFile `json:"har_file"` // Lets you attach a HAR file to this object if needed.
}

type MetricInfo struct {
@@ -416,3 +417,33 @@ func (o *Output) BodyLen() int {
}
return len(o.Body)
}

type HarFile struct {
HarLog HarLog `json:"log"`
}

type HarLog struct {
Entries []Entry `json:"entries"`
}

type Entry struct {
Request Request `json:"request"`
Response Response `json:"response"`
Time float64 `json:"time"`
}

type Request struct {
Method string `json:"method"`
Url string `json:"url"`
}

type Response struct {
Status int `json:"status"`
Content Content `json:"content"`
}

type Content struct {
MimeType string `json:"mimeType"`
Size int64 `json:"size"`
Compression int64 `json:"compression"`
}
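
These structs map a subset of a standard HAR document onto Go types. A minimal, runnable example of decoding one, with made-up request/response values:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/kentik/ktranslate/pkg/kt"
)

func main() {
	// Minimal HAR document covering just the fields the types above keep.
	raw := `{
		"log": {
			"entries": [{
				"request": {"method": "GET", "url": "https://example.com/"},
				"response": {"status": 200, "content": {"mimeType": "text/html", "size": 5120, "compression": 0}},
				"time": 42.7
			}]
		}
	}`

	var har kt.HarFile
	if err := json.Unmarshal([]byte(raw), &har); err != nil {
		panic(err)
	}
	e := har.HarLog.Entries[0]
	fmt.Printf("%s %s -> %d in %.1fms\n", e.Request.Method, e.Request.Url, e.Response.Status, e.Time)
}
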
81 changes: 56 additions & 25 deletions pkg/sinks/s3/s3.go
@@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
go_metrics "github.com/kentik/go-metrics"
"github.com/kentik/ktranslate"
@@ -55,6 +56,7 @@ type S3Sink struct {
buf *bytes.Buffer
mux sync.RWMutex
config *ktranslate.S3SinkConfig
dl *s3manager.Downloader
}

type S3Metric struct {
@@ -107,6 +109,7 @@ func (s *S3Sink) Init(ctx context.Context, format formats.Format, compression kt
}

s.client = s3manager.NewUploader(sess)
s.dl = s3manager.NewDownloader(sess)

} else if s.config.AssumeRoleARN != "" || s.config.EC2InstanceProfile {
if err := s.get_tmp_credentials(ctx); err != nil {
@@ -116,6 +119,7 @@
sess := session.Must(session.NewSession())
s.Infof("Session is created using default settings")
s.client = s3manager.NewUploader(sess)
s.dl = s3manager.NewDownloader(sess)
}

switch compression {
Expand Down Expand Up @@ -162,6 +166,27 @@ func (s *S3Sink) Send(ctx context.Context, payload *kt.Output) {
s.buf.Write(payload.Body)
}

func (s *S3Sink) Get(ctx context.Context, path string) ([]byte, error) {
buf := aws.NewWriteAtBuffer([]byte{})
size, err := s.dl.DownloadWithContext(ctx, buf, &s3.GetObjectInput{
Bucket: aws.String(s.Bucket),
Key: aws.String(path),
})
if err != nil {
return nil, err
}
return buf.Bytes()[0:size], nil
}

func (s *S3Sink) Put(ctx context.Context, path string, data []byte) error {
_, err := s.client.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(s.Bucket),
Body: bytes.NewBuffer(data),
Key: aws.String(path),
})
return err
}

func (s *S3Sink) send(ctx context.Context, payload []byte) {
_, err := s.client.UploadWithContext(ctx, &s3manager.UploadInput{
Bucket: aws.String(s.Bucket),
@@ -189,34 +214,40 @@ func (s *S3Sink) HttpInfo() map[string]float64 {

func (s *S3Sink) get_tmp_credentials(ctx context.Context) error {
// First, make sure we can get some credentials:
creds, err := s.tmp_credentials(ctx)
creds, dlc, err := s.tmp_credentials(ctx)
if err != nil {
return err
}
s.client = creds
s.dl = dlc

// Now loop forever getting new creds.
newCredTick := time.NewTicker(time.Duration(s.config.AssumeRoleOrInstanceProfileIntervalSeconds) * time.Second)
defer newCredTick.Stop()

for {
select {
case _ = <-newCredTick.C:
creds, err := s.tmp_credentials(ctx)
if err != nil {
// In this case, keep trying without replacing the old creds; maybe the next attempt will work.
s.Errorf("Cannot get new AWS creds: %v", err)
} else {
s.client = creds
}
go func() {
newCredTick := time.NewTicker(time.Duration(s.config.AssumeRoleOrInstanceProfileIntervalSeconds) * time.Second)
defer newCredTick.Stop()

for {
select {
case _ = <-newCredTick.C:
creds, dlc, err := s.tmp_credentials(ctx)
if err != nil {
// In this case, keep trying without replacing the old creds; maybe the next attempt will work.
s.Errorf("Cannot get new AWS creds: %v", err)
} else {
s.client = creds
s.dl = dlc
}

case <-ctx.Done():
return nil
case <-ctx.Done():
return
}
}
}
}()

return nil
}

func (s *S3Sink) tmp_credentials(ctx context.Context) (*s3manager.Uploader, error) {
func (s *S3Sink) tmp_credentials(ctx context.Context) (*s3manager.Uploader, *s3manager.Downloader, error) {

if s.config.EC2InstanceProfile && s.config.AssumeRoleARN != "" {

@@ -231,14 +262,14 @@
_, err_role := ec2_role_creds.Get()
if err_role != nil {
s.Errorf("Not able to retrieve credentials via Instance Profile. ARN: %v. ERROR: %v", s.config.AssumeRoleARN, err_role)
return nil, err_role
return nil, nil, err_role
}

creds := stscreds.NewCredentials(sess_tmp, s.config.AssumeRoleARN)
_, err_creds := creds.Get()
if err_creds != nil {
s.Errorf("Assume Role ARN doesn't work. ARN: %v. ERROR: %v", s.config.AssumeRoleARN, err_creds)
return nil, err_creds
return nil, nil, err_creds
}

// Creating a new session from assume role
@@ -250,12 +281,12 @@
)
if err != nil {
s.Errorf("Session is not created ERROR: %v", err)
return nil, err
return nil, nil, err
} else {
s.Infof("Session is created using assume role based on EC2 Instance Profile")
}

return s3manager.NewUploader(sess), nil
return s3manager.NewUploader(sess), s3manager.NewDownloader(sess), nil

} else {

@@ -269,7 +300,7 @@
_, err := creds.Get()
if err != nil {
s.Errorf("Assume Role ARN doesn't work. ARN: %v, %v", s.config.AssumeRoleARN, err)
return nil, err
return nil, nil, err
}

// Creating a new session from assume role
@@ -281,11 +312,11 @@
)
if err != nil {
s.Errorf("Session is not created with region %v, %v", s.config.Region, err)
return nil, err
return nil, nil, err
} else {
s.Infof("Session is created using assume role via shared configuration")
}

return s3manager.NewUploader(sess), nil
return s3manager.NewUploader(sess), s3manager.NewDownloader(sess), nil
}
}
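
Two things stand out in the S3 changes. First, Get and Put give the sink a symmetric object-store API alongside the existing streaming send; a hypothetical round trip, assuming an already-initialized *s3.S3Sink (created via s3.NewSink and Init, as wired up in kkc.go), might look like the sketch below, where the path and payload are invented for illustration. Second, the credential-refresh loop in get_tmp_credentials now runs in a background goroutine, so the function returns after the first successful fetch instead of blocking its caller; on a failed refresh it keeps the previous uploader and downloader and retries on the next tick.

package main

import (
	"context"

	"github.com/kentik/ktranslate/pkg/sinks/s3"
)

// roundTrip is a hypothetical helper: upload a HAR payload, then read it back.
func roundTrip(ctx context.Context, sink *s3.S3Sink, harBytes []byte) ([]byte, error) {
	if err := sink.Put(ctx, "synth/agent-1/check-2.har", harBytes); err != nil {
		return nil, err
	}
	return sink.Get(ctx, "synth/agent-1/check-2.har")
}
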