From b2516c8a27c88f45fa9e3fffd2da606d3f2b08a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Mieczkowski?= Date: Fri, 8 Mar 2024 22:07:49 +0100 Subject: [PATCH] Make formatter and unknown fields public in ProtoProducerMessage --- producer/proto/custom.go | 32 ++++++++++++++++++++++++++++++++ producer/proto/messages.go | 21 +++++++++++++++++---- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/producer/proto/custom.go b/producer/proto/custom.go index 348e46e7..0f6a36ce 100644 --- a/producer/proto/custom.go +++ b/producer/proto/custom.go @@ -80,6 +80,38 @@ type FormatterConfigMapper struct { isSlice map[string]bool } +func (m *FormatterConfigMapper) GetFields() []string { + return m.fields +} + +func (m *FormatterConfigMapper) GetKey() []string { + return m.key +} + +func (m *FormatterConfigMapper) GetReMap() map[string]string { + return m.reMap +} + +func (m *FormatterConfigMapper) GetRenameMap() map[string]string { + return m.rename +} + +func (m *FormatterConfigMapper) GetRenderers() map[string]RenderFunc { + return m.render +} + +func (m *FormatterConfigMapper) GetPbMap() map[string]ProtobufFormatterConfig { + return m.pbMap +} + +func (m *FormatterConfigMapper) GetNumToPbMap() map[int32]ProtobufFormatterConfig { + return m.numToPb +} + +func (m *FormatterConfigMapper) GetIsSliceMap() map[string]bool { + return m.isSlice +} + type NetFlowMapper struct { data map[string]DataMap // maps field to destination } diff --git a/producer/proto/messages.go b/producer/proto/messages.go index 5595b9a8..503bbcad 100644 --- a/producer/proto/messages.go +++ b/producer/proto/messages.go @@ -8,6 +8,7 @@ import ( "reflect" "strings" "sync" + "sync/atomic" "google.golang.org/protobuf/encoding/protodelim" "google.golang.org/protobuf/encoding/protowire" @@ -18,7 +19,8 @@ import ( type ProtoProducerMessage struct { flowmessage.FlowMessage - formatter *FormatterConfigMapper + formatter *FormatterConfigMapper + unknownFields atomic.Pointer[map[string]interface{}] // thread safe } var protoMessagePool = sync.Pool{ @@ -41,7 +43,7 @@ func (m *ProtoProducerMessage) baseKey(h hash.Hash) { vfm := reflect.ValueOf(m) vfm = reflect.Indirect(vfm) - unkMap := m.mapUnknown() // todo: should be able to reuse if set in structure + unkMap := m.MapUnknown() for _, s := range m.formatter.key { fieldName := s @@ -67,6 +69,10 @@ func (m *ProtoProducerMessage) baseKey(h hash.Hash) { } } +func (m *ProtoProducerMessage) GetFormatter() *FormatterConfigMapper { + return m.formatter +} + func (m *ProtoProducerMessage) Key() []byte { if m.formatter == nil || len(m.formatter.key) == 0 { return nil @@ -97,7 +103,12 @@ func ExtractTag(name, original string, tag reflect.StructTag) string { return before } -func (m *ProtoProducerMessage) mapUnknown() map[string]interface{} { +func (m *ProtoProducerMessage) MapUnknown() map[string]interface{} { + unknownFields := m.unknownFields.Load() + if unknownFields != nil { + return *unknownFields + } + unkMap := make(map[string]interface{}) fmr := m.ProtoReflect() @@ -142,6 +153,8 @@ func (m *ProtoProducerMessage) mapUnknown() map[string]interface{} { } } + + m.unknownFields.Store(&unkMap) return unkMap } @@ -152,7 +165,7 @@ func (m *ProtoProducerMessage) FormatMessageReflectCustom(ext, quotes, sep, sign var i int fstr := make([]string, len(m.formatter.fields)) // todo: reuse with pool - unkMap := m.mapUnknown() + unkMap := m.MapUnknown() // iterate through the fields requested by the user for _, s := range m.formatter.fields {