Skip to content

Commit

Permalink
Make formatter and unknown fields public in ProtoProducerMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
Paweł Mieczkowski committed Mar 8, 2024
1 parent 74000ef commit b2516c8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
32 changes: 32 additions & 0 deletions producer/proto/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,38 @@ type FormatterConfigMapper struct {
isSlice map[string]bool
}

func (m *FormatterConfigMapper) GetFields() []string {
return m.fields
}

func (m *FormatterConfigMapper) GetKey() []string {
return m.key
}

func (m *FormatterConfigMapper) GetReMap() map[string]string {
return m.reMap
}

func (m *FormatterConfigMapper) GetRenameMap() map[string]string {
return m.rename
}

func (m *FormatterConfigMapper) GetRenderers() map[string]RenderFunc {
return m.render
}

func (m *FormatterConfigMapper) GetPbMap() map[string]ProtobufFormatterConfig {
return m.pbMap
}

func (m *FormatterConfigMapper) GetNumToPbMap() map[int32]ProtobufFormatterConfig {
return m.numToPb
}

func (m *FormatterConfigMapper) GetIsSliceMap() map[string]bool {
return m.isSlice
}

type NetFlowMapper struct {
data map[string]DataMap // maps field to destination
}
Expand Down
21 changes: 17 additions & 4 deletions producer/proto/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"

"google.golang.org/protobuf/encoding/protodelim"
"google.golang.org/protobuf/encoding/protowire"
Expand All @@ -18,7 +19,8 @@ import (
type ProtoProducerMessage struct {
flowmessage.FlowMessage

formatter *FormatterConfigMapper
formatter *FormatterConfigMapper
unknownFields atomic.Pointer[map[string]interface{}] // thread safe
}

var protoMessagePool = sync.Pool{
Expand All @@ -41,7 +43,7 @@ func (m *ProtoProducerMessage) baseKey(h hash.Hash) {
vfm := reflect.ValueOf(m)
vfm = reflect.Indirect(vfm)

unkMap := m.mapUnknown() // todo: should be able to reuse if set in structure
unkMap := m.MapUnknown()

for _, s := range m.formatter.key {
fieldName := s
Expand All @@ -67,6 +69,10 @@ func (m *ProtoProducerMessage) baseKey(h hash.Hash) {
}
}

func (m *ProtoProducerMessage) GetFormatter() *FormatterConfigMapper {
return m.formatter
}

func (m *ProtoProducerMessage) Key() []byte {
if m.formatter == nil || len(m.formatter.key) == 0 {
return nil
Expand Down Expand Up @@ -97,7 +103,12 @@ func ExtractTag(name, original string, tag reflect.StructTag) string {
return before
}

func (m *ProtoProducerMessage) mapUnknown() map[string]interface{} {
func (m *ProtoProducerMessage) MapUnknown() map[string]interface{} {
unknownFields := m.unknownFields.Load()
if unknownFields != nil {
return *unknownFields
}

unkMap := make(map[string]interface{})

fmr := m.ProtoReflect()
Expand Down Expand Up @@ -142,6 +153,8 @@ func (m *ProtoProducerMessage) mapUnknown() map[string]interface{} {

}
}

m.unknownFields.Store(&unkMap)
return unkMap
}

Expand All @@ -152,7 +165,7 @@ func (m *ProtoProducerMessage) FormatMessageReflectCustom(ext, quotes, sep, sign
var i int
fstr := make([]string, len(m.formatter.fields)) // todo: reuse with pool

unkMap := m.mapUnknown()
unkMap := m.MapUnknown()

// iterate through the fields requested by the user
for _, s := range m.formatter.fields {
Expand Down

0 comments on commit b2516c8

Please sign in to comment.