From 779cdcdecff89181749c08cb312d136c17096b69 Mon Sep 17 00:00:00 2001 From: mmaxiaolei <412171946@qq.com> Date: Fri, 21 Jan 2022 10:10:23 +0800 Subject: [PATCH 1/4] feat: abstract source --- pkg/core/source/abstract/source.go | 164 +++++++++++++++++++++++++++++ pkg/source/dev/source.go | 54 ++-------- pkg/source/file/source.go | 103 ++++++------------ 3 files changed, 209 insertions(+), 112 deletions(-) create mode 100644 pkg/core/source/abstract/source.go diff --git a/pkg/core/source/abstract/source.go b/pkg/core/source/abstract/source.go new file mode 100644 index 000000000..e0f36cebc --- /dev/null +++ b/pkg/core/source/abstract/source.go @@ -0,0 +1,164 @@ +package abstract + +import ( + "fmt" + "loggie.io/loggie/pkg/core/api" + "loggie.io/loggie/pkg/core/event" + "loggie.io/loggie/pkg/core/log" + "loggie.io/loggie/pkg/pipeline" +) + +type Source struct { + done chan struct{} + name string + typeName api.Type + eventPool *event.Pool + pipelineInfo pipeline.Info + context api.Context + + EventProvider func() api.Event + + startFunc func(context api.Context) + stopFunc func() + commitFunc func(events []api.Event) +} + +func ExtendsAbstractSource(info pipeline.Info, typeName api.Type) *Source { + return &Source{ + typeName: typeName, + eventPool: info.EventPool, + pipelineInfo: info, + EventProvider: func() api.Event { + return info.EventPool.Get() + }, + } +} + +// ------------------------------------------------------------------------ +// extension methods +// ------------------------------------------------------------------------ + +func (as *Source) Name() string { + return as.name +} + +func (as *Source) PipelineName() string { + return as.pipelineInfo.PipelineName +} + +func (as *Source) Epoch() pipeline.Epoch { + return as.pipelineInfo.Epoch +} + +func (as *Source) PipelineInfo() pipeline.Info { + return as.pipelineInfo +} + +func (as *Source) Event() api.Event { + return as.eventPool.Get() +} + +// ------------------------------------------------------------------------ +// implement methods of api.Source +// do not override +// ------------------------------------------------------------------------ + +func (as *Source) Category() api.Category { + return api.SOURCE +} + +func (as *Source) Type() api.Type { + return as.typeName +} + +func (as *Source) String() string { + return fmt.Sprintf("%s/%s/%s/%s", as.PipelineName(), as.Category(), as.Type(), as.Name()) +} + +func (as *Source) Init(context api.Context) { + as.name = context.Name() + as.context = context +} + +func (as *Source) Start() { + log.Info("start source: %s", as.String()) + if as.startFunc != nil { + as.startFunc(as.context) + } + log.Info("source has started: %s", as.String()) +} + +func (as *Source) Stop() { + log.Info("start stop source: %s", as.String()) + close(as.done) + if as.stopFunc != nil { + as.stopFunc() + } + log.Info("source has stopped: %s", as.String()) +} + +func (as *Source) Commit(events []api.Event) { + if as.commitFunc != nil { + as.commitFunc(events) + } + if len(events) == 0 { + return + } + // release events + as.eventPool.PutAll(events) +} + +// ------------------------------------------------------------------------ +// optional override methods +// ------------------------------------------------------------------------ + +// Config A pointer to config or nil should be returned +func (as *Source) Config() interface{} { + return nil +} + +func (as *Source) ProductLoop(productFunc api.ProductFunc) { +} + +func (as *Source) DoStart(context api.Context) { + +} + +func (as *Source) DoStop() { + +} + +func (as *Source) DoCommit(events []api.Event) { + +} + +// ------------------------------------------------------------------------ +// internal methods +// do not override +// ------------------------------------------------------------------------ + +func (as *Source) AbstractSource() *Source { + return as +} + +type SourceConvert interface { + api.Component + AbstractSource() *Source + + DoStart(context api.Context) + DoStop() + DoCommit(events []api.Event) +} + +type SourceRegisterFactory func(info pipeline.Info) SourceConvert + +func SourceRegister(t api.Type, factory SourceRegisterFactory) { + pipeline.Register(api.SOURCE, t, func(info pipeline.Info) api.Component { + convert := factory(info) + source := convert.AbstractSource() + source.startFunc = convert.DoStart + source.stopFunc = convert.DoStop + source.commitFunc = convert.DoCommit + return convert + }) +} diff --git a/pkg/source/dev/source.go b/pkg/source/dev/source.go index 901d79c41..8b0ad47ec 100644 --- a/pkg/source/dev/source.go +++ b/pkg/source/dev/source.go @@ -17,62 +17,28 @@ limitations under the License. package dev import ( - "fmt" "loggie.io/loggie/pkg/core/api" - "loggie.io/loggie/pkg/core/event" "loggie.io/loggie/pkg/core/log" + "loggie.io/loggie/pkg/core/source/abstract" "loggie.io/loggie/pkg/pipeline" ) const Type = "dev" func init() { - pipeline.Register(api.SOURCE, Type, makeSource) + abstract.SourceRegister(Type, makeSource) } -func makeSource(info pipeline.Info) api.Component { +func makeSource(info pipeline.Info) abstract.SourceConvert { return &Dev{ - stop: info.Stop, - eventPool: info.EventPool, + Source: abstract.ExtendsAbstractSource(info, Type), + stop: info.Stop, } } type Dev struct { - name string - stop bool - eventPool *event.Pool -} - -func (d *Dev) Config() interface{} { - return nil -} - -func (d *Dev) Category() api.Category { - return api.SOURCE -} - -func (d *Dev) Type() api.Type { - return Type -} - -func (d *Dev) String() string { - return fmt.Sprintf("%s/%s", api.SOURCE, Type) -} - -func (d *Dev) Init(context api.Context) { - d.name = context.Name() -} - -func (d *Dev) Start() { - -} - -func (d *Dev) Stop() { - -} - -func (d *Dev) Product() api.Event { - return nil + *abstract.Source + stop bool } func (d *Dev) ProductLoop(productFunc api.ProductFunc) { @@ -84,12 +50,12 @@ func (d *Dev) ProductLoop(productFunc api.ProductFunc) { header := make(map[string]interface{}) header["offset"] = 888 header["topic"] = "log-test" - e := d.eventPool.Get() + e := d.Event() e.Fill(e.Meta(), header, content) productFunc(e) } } -func (d *Dev) Commit(events []api.Event) { - d.eventPool.PutAll(events) +func (d *Dev) DoStart(context api.Context) { + log.Info("%s override start!", d.String()) } diff --git a/pkg/source/file/source.go b/pkg/source/file/source.go index 905ed5eca..0fe156807 100644 --- a/pkg/source/file/source.go +++ b/pkg/source/file/source.go @@ -17,10 +17,9 @@ limitations under the License. package file import ( - "fmt" "loggie.io/loggie/pkg/core/api" - "loggie.io/loggie/pkg/core/event" "loggie.io/loggie/pkg/core/log" + "loggie.io/loggie/pkg/core/source/abstract" "loggie.io/loggie/pkg/pipeline" "time" ) @@ -28,30 +27,20 @@ import ( const Type = "file" func init() { - pipeline.Register(api.SOURCE, Type, makeSource) + abstract.SourceRegister(Type, makeSource) } -func makeSource(info pipeline.Info) api.Component { - return &Source{ - pipelineName: info.PipelineName, - epoch: info.Epoch, - rc: info.R, - eventPool: info.EventPool, - sinkCount: info.SinkCount, - config: &Config{}, +func makeSource(info pipeline.Info) abstract.SourceConvert { + s := &Source{ + Source: abstract.ExtendsAbstractSource(info, Type), + config: &Config{}, } + return s } type Source struct { - pipelineName string - epoch pipeline.Epoch - rc *pipeline.RegisterCenter - eventPool *event.Pool + *abstract.Source config *Config - sinkCount int - name string - filename string - out chan api.Event productFunc api.ProductFunc r *Reader ackEnable bool @@ -65,26 +54,11 @@ type Source struct { mTask *MultiTask } -func (s *Source) Config() interface{} { - return s.config -} - -func (s *Source) Category() api.Category { - return api.SOURCE -} - -func (s *Source) Type() api.Type { - return Type -} - -func (s *Source) String() string { - return fmt.Sprintf("%s/%s/%s", s.Category(), s.Type(), s.name) -} - -func (s *Source) Init(context api.Context) { - s.name = context.Name() - s.out = make(chan api.Event, s.sinkCount) +// ------------------------------------------------------------------------ +// override methods +// ------------------------------------------------------------------------ +func (s *Source) DoStart(context api.Context) { s.ackEnable = s.config.AckConfig.Enable // init default multi agg timeout mutiTimeout := s.config.ReaderConfig.MultiConfig.Timeout @@ -104,23 +78,20 @@ func (s *Source) Init(context api.Context) { } s.isolation = Isolation{ - PipelineName: s.pipelineName, - SourceName: s.name, + PipelineName: s.PipelineName(), + SourceName: s.Name(), Level: IsolationLevel(s.config.Isolation), } -} -func (s *Source) Start() { - log.Info("start source: %s", s.String()) if s.config.ReaderConfig.MultiConfig.Active { s.multilineProcessor = GetOrCreateShareMultilineProcessor() } // register queue listener for ack if s.ackEnable { s.dbHandler = GetOrCreateShareDbHandler(s.config.DbConfig) - s.ackChainHandler = GetOrCreateShareAckChainHandler(s.sinkCount, s.config.AckConfig) - s.rc.RegisterListener(&AckListener{ - sourceName: s.name, + s.ackChainHandler = GetOrCreateShareAckChainHandler(s.PipelineInfo().SinkCount, s.config.AckConfig) + s.PipelineInfo().R.RegisterListener(&AckListener{ + sourceName: s.Name(), ackChainHandler: s.ackChainHandler, }) } @@ -131,10 +102,9 @@ func (s *Source) Start() { s.HandleHttp() } -func (s *Source) Stop() { - log.Info("start stop source: %s", s.String()) +func (s *Source) DoStop() { // Stop ack - if s.config.AckConfig.Enable { + if s.ackEnable { // stop append&ack source event s.ackChainHandler.StopTask(s.ackTask) log.Info("[%s] all ack jobs of source exit", s.String()) @@ -149,41 +119,38 @@ func (s *Source) Stop() { if s.config.ReaderConfig.MultiConfig.Active { s.multilineProcessor.StopTask(s.mTask) } - log.Info("source has stopped: %s", s.String()) } -func (s *Source) Product() api.Event { - return <-s.out +func (s *Source) DoCommit(events []api.Event) { + // ack events + if s.ackEnable { + ss := make([]*State, 0, len(events)) + for _, e := range events { + ss = append(ss, getState(e)) + } + s.ackChainHandler.ackChan <- ss + } +} + +func (s *Source) Config() interface{} { + return s.config } func (s *Source) ProductLoop(productFunc api.ProductFunc) { log.Info("%s start product loop", s.String()) s.productFunc = productFunc if s.config.ReaderConfig.MultiConfig.Active { - s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc) + s.mTask = NewMultiTask(s.Epoch(), s.Name(), s.config.ReaderConfig.MultiConfig, s.PipelineInfo().EventPool, s.productFunc) s.multilineProcessor.StartTask(s.mTask) s.productFunc = s.multilineProcessor.Process } if s.config.AckConfig.Enable { - s.ackTask = NewAckTask(s.epoch, s.pipelineName, s.name, func(state *State) { + s.ackTask = NewAckTask(s.Epoch(), s.PipelineName(), s.Name(), func(state *State) { s.dbHandler.state <- state }) s.ackChainHandler.StartTask(s.ackTask) } - s.watchTask = NewWatchTask(s.epoch, s.pipelineName, s.name, s.config.CollectConfig, s.eventPool, s.productFunc, s.r.jobChan) + s.watchTask = NewWatchTask(s.Epoch(), s.PipelineName(), s.Name(), s.config.CollectConfig, s.PipelineInfo().EventPool, s.productFunc, s.r.jobChan) // start watch source paths s.watcher.StartWatchTask(s.watchTask) } - -func (s *Source) Commit(events []api.Event) { - // ack events - if s.ackEnable { - ss := make([]*State, 0, len(events)) - for _, e := range events { - ss = append(ss, getState(e)) - } - s.ackChainHandler.ackChan <- ss - } - // release events - s.eventPool.PutAll(events) -} From 434d36f85bc94bedbf127b6c84acb6cc5e1073c4 Mon Sep 17 00:00:00 2001 From: mmaxiaolei <412171946@qq.com> Date: Fri, 21 Jan 2022 10:29:19 +0800 Subject: [PATCH 2/4] merge main --- pkg/core/source/abstract/source.go | 16 +++++----- pkg/source/dev/source.go | 51 +++++------------------------- pkg/source/file/source.go | 18 +++-------- 3 files changed, 20 insertions(+), 65 deletions(-) diff --git a/pkg/core/source/abstract/source.go b/pkg/core/source/abstract/source.go index e0f36cebc..450add4f5 100644 --- a/pkg/core/source/abstract/source.go +++ b/pkg/core/source/abstract/source.go @@ -2,10 +2,10 @@ package abstract import ( "fmt" - "loggie.io/loggie/pkg/core/api" - "loggie.io/loggie/pkg/core/event" - "loggie.io/loggie/pkg/core/log" - "loggie.io/loggie/pkg/pipeline" + "github.com/loggie-io/loggie/pkg/core/api" + "github.com/loggie-io/loggie/pkg/core/event" + "github.com/loggie-io/loggie/pkg/core/log" + "github.com/loggie-io/loggie/pkg/pipeline" ) type Source struct { @@ -18,7 +18,7 @@ type Source struct { EventProvider func() api.Event - startFunc func(context api.Context) + startFunc func() stopFunc func() commitFunc func(events []api.Event) } @@ -83,7 +83,7 @@ func (as *Source) Init(context api.Context) { func (as *Source) Start() { log.Info("start source: %s", as.String()) if as.startFunc != nil { - as.startFunc(as.context) + as.startFunc() } log.Info("source has started: %s", as.String()) } @@ -120,7 +120,7 @@ func (as *Source) Config() interface{} { func (as *Source) ProductLoop(productFunc api.ProductFunc) { } -func (as *Source) DoStart(context api.Context) { +func (as *Source) DoStart() { } @@ -145,7 +145,7 @@ type SourceConvert interface { api.Component AbstractSource() *Source - DoStart(context api.Context) + DoStart() DoStop() DoCommit(events []api.Event) } diff --git a/pkg/source/dev/source.go b/pkg/source/dev/source.go index d1b1f11aa..691b1d37a 100644 --- a/pkg/source/dev/source.go +++ b/pkg/source/dev/source.go @@ -18,14 +18,12 @@ package dev import ( "context" - "fmt" "math/rand" "github.com/loggie-io/loggie/pkg/core/api" - "github.com/loggie-io/loggie/pkg/core/event" "github.com/loggie-io/loggie/pkg/core/log" + "github.com/loggie-io/loggie/pkg/core/source/abstract" "github.com/loggie-io/loggie/pkg/pipeline" - "loggie.io/loggie/pkg/core/source/abstract" "golang.org/x/time/rate" ) @@ -41,44 +39,23 @@ func makeSource(info pipeline.Info) abstract.SourceConvert { return &Dev{ Source: abstract.ExtendsAbstractSource(info, Type), stop: info.Stop, - stop: info.Stop, - config: &Config{}, - eventPool: info.EventPool, + config: &Config{}, } } type Dev struct { *abstract.Source - stop bool - name string - stop bool - eventPool *event.Pool - config *Config - limiter *rate.Limiter - content []byte + stop bool + config *Config + limiter *rate.Limiter + content []byte } func (d *Dev) Config() interface{} { return d.config } -func (d *Dev) Category() api.Category { - return api.SOURCE -} - -func (d *Dev) Type() api.Type { - return Type -} - -func (d *Dev) String() string { - return fmt.Sprintf("%s/%s", api.SOURCE, Type) -} - -func (d *Dev) Init(context api.Context) { - d.name = context.Name() -} - -func (d *Dev) Start() { +func (d *Dev) DoStart() { d.limiter = rate.NewLimiter(rate.Limit(d.config.Qps), d.config.Qps) d.content = make([]byte, d.config.ByteSize) for i := range d.content { @@ -86,27 +63,15 @@ func (d *Dev) Start() { } } -func (d *Dev) Stop() { - -} - -func (d *Dev) Product() api.Event { - return nil -} - func (d *Dev) ProductLoop(productFunc api.ProductFunc) { ctx := context.Background() log.Info("%s start product loop", d.String()) content := d.content for !d.stop { header := make(map[string]interface{}) - e := d.eventPool.Get() + e := d.Event() e.Fill(e.Meta(), header, content) d.limiter.Wait(ctx) productFunc(e) } } - -func (d *Dev) DoStart(context api.Context) { - log.Info("%s override start!", d.String()) -} diff --git a/pkg/source/file/source.go b/pkg/source/file/source.go index 77efe732f..07042bcd0 100644 --- a/pkg/source/file/source.go +++ b/pkg/source/file/source.go @@ -17,15 +17,10 @@ limitations under the License. package file import ( - "fmt" "github.com/loggie-io/loggie/pkg/core/api" - "github.com/loggie-io/loggie/pkg/core/event" "github.com/loggie-io/loggie/pkg/core/log" + "github.com/loggie-io/loggie/pkg/core/source/abstract" "github.com/loggie-io/loggie/pkg/pipeline" - "loggie.io/loggie/pkg/core/api" - "loggie.io/loggie/pkg/core/log" - "loggie.io/loggie/pkg/core/source/abstract" - "loggie.io/loggie/pkg/pipeline" "time" ) @@ -63,7 +58,7 @@ type Source struct { // override methods // ------------------------------------------------------------------------ -func (s *Source) DoStart(context api.Context) { +func (s *Source) DoStart() { s.ackEnable = s.config.AckConfig.Enable // init default multi agg timeout mutiTimeout := s.config.ReaderConfig.MultiConfig.Timeout @@ -137,12 +132,8 @@ func (s *Source) DoCommit(events []api.Event) { } } -func (s *Source) Product() api.Event { - return <-s.out -} - func (s *Source) ProductLoop(productFunc api.ProductFunc) { - log.Info("%s start product loop", s.String()) + log.Info("[%s] start product loop", s.String()) s.productFunc = productFunc if s.config.ReaderConfig.MultiConfig.Active { s.mTask = NewMultiTask(s.Epoch(), s.Name(), s.config.ReaderConfig.MultiConfig, s.PipelineInfo().EventPool, s.productFunc) @@ -155,8 +146,7 @@ func (s *Source) ProductLoop(productFunc api.ProductFunc) { }) s.ackChainHandler.StartTask(s.ackTask) } - s.watchTask = NewWatchTask(s.Epoch(), s.PipelineName(), s.Name(), s.config.CollectConfig, s.PipelineInfo().EventPool, s.productFunc, s.r.jobChan) - s.watchTask = NewWatchTask(s.epoch, s.pipelineName, s.name, s.config.CollectConfig, s.eventPool, s.productFunc, s.r.jobChan, s.config.Fields) + s.watchTask = NewWatchTask(s.Epoch(), s.PipelineName(), s.Name(), s.config.CollectConfig, s.PipelineInfo().EventPool, s.productFunc, s.r.jobChan, s.config.Fields) // start watch source paths s.watcher.StartWatchTask(s.watchTask) } From 6afd3632852a239e8dd770b06e59ed9de2657dfd Mon Sep 17 00:00:00 2001 From: mmaxiaolei <412171946@qq.com> Date: Fri, 21 Jan 2022 14:55:57 +0800 Subject: [PATCH 3/4] refactor --- go.mod | 1 + pkg/core/source/abstract/source.go | 51 ++++++++++++++++++++---------- pkg/source/dev/source.go | 10 ++---- pkg/source/file/source.go | 19 +++++------ vendor/modules.txt | 1 + 5 files changed, 49 insertions(+), 33 deletions(-) diff --git a/go.mod b/go.mod index ffc8e3c3f..c165f420a 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/xhit/go-str2duration/v2 v2.0.0 go.uber.org/automaxprocs v0.0.0-20200415073007-b685be8c1c23 golang.org/x/net v0.0.0-20220111093109-d55c255bac03 + golang.org/x/time v0.0.0-20191024005414-555d28b269f0 google.golang.org/grpc v1.33.2 google.golang.org/protobuf v1.26.0-rc.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/pkg/core/source/abstract/source.go b/pkg/core/source/abstract/source.go index 450add4f5..f0f0f3584 100644 --- a/pkg/core/source/abstract/source.go +++ b/pkg/core/source/abstract/source.go @@ -9,18 +9,17 @@ import ( ) type Source struct { - done chan struct{} name string typeName api.Type eventPool *event.Pool pipelineInfo pipeline.Info context api.Context + productFunc api.ProductFunc - EventProvider func() api.Event - - startFunc func() - stopFunc func() - commitFunc func(events []api.Event) + startFunc func() + stopFunc func() + commitFunc func(events []api.Event) + internalProductFunc func() } func ExtendsAbstractSource(info pipeline.Info, typeName api.Type) *Source { @@ -28,9 +27,6 @@ func ExtendsAbstractSource(info pipeline.Info, typeName api.Type) *Source { typeName: typeName, eventPool: info.EventPool, pipelineInfo: info, - EventProvider: func() api.Event { - return info.EventPool.Get() - }, } } @@ -54,10 +50,27 @@ func (as *Source) PipelineInfo() pipeline.Info { return as.pipelineInfo } -func (as *Source) Event() api.Event { +func (as *Source) NewEvent() api.Event { return as.eventPool.Get() } +// ProductFunc only use in DoProduct() +func (as *Source) ProductFunc() api.ProductFunc { + return as.productFunc +} + +// Send only use in DoProduct() +func (as *Source) Send(e api.Event) api.Result { + return as.productFunc(e) +} + +// SendWithBody only use in DoProduct() +func (as *Source) SendWithBody(body []byte) api.Result { + e := as.NewEvent() + e.Fill(e.Meta(), e.Header(), body) + return as.Send(e) +} + // ------------------------------------------------------------------------ // implement methods of api.Source // do not override @@ -90,7 +103,6 @@ func (as *Source) Start() { func (as *Source) Stop() { log.Info("start stop source: %s", as.String()) - close(as.done) if as.stopFunc != nil { as.stopFunc() } @@ -108,6 +120,14 @@ func (as *Source) Commit(events []api.Event) { as.eventPool.PutAll(events) } +func (as *Source) ProductLoop(productFunc api.ProductFunc) { + as.productFunc = productFunc + log.Info("[%s] start product loop", as.String()) + if as.internalProductFunc != nil { + go as.internalProductFunc() + } +} + // ------------------------------------------------------------------------ // optional override methods // ------------------------------------------------------------------------ @@ -117,19 +137,16 @@ func (as *Source) Config() interface{} { return nil } -func (as *Source) ProductLoop(productFunc api.ProductFunc) { -} - func (as *Source) DoStart() { - } func (as *Source) DoStop() { +} +func (as *Source) DoProduct() { } func (as *Source) DoCommit(events []api.Event) { - } // ------------------------------------------------------------------------ @@ -147,6 +164,7 @@ type SourceConvert interface { DoStart() DoStop() + DoProduct() DoCommit(events []api.Event) } @@ -158,6 +176,7 @@ func SourceRegister(t api.Type, factory SourceRegisterFactory) { source := convert.AbstractSource() source.startFunc = convert.DoStart source.stopFunc = convert.DoStop + source.internalProductFunc = convert.DoProduct source.commitFunc = convert.DoCommit return convert }) diff --git a/pkg/source/dev/source.go b/pkg/source/dev/source.go index 691b1d37a..1a23a690b 100644 --- a/pkg/source/dev/source.go +++ b/pkg/source/dev/source.go @@ -20,8 +20,6 @@ import ( "context" "math/rand" - "github.com/loggie-io/loggie/pkg/core/api" - "github.com/loggie-io/loggie/pkg/core/log" "github.com/loggie-io/loggie/pkg/core/source/abstract" "github.com/loggie-io/loggie/pkg/pipeline" "golang.org/x/time/rate" @@ -63,15 +61,11 @@ func (d *Dev) DoStart() { } } -func (d *Dev) ProductLoop(productFunc api.ProductFunc) { +func (d *Dev) DoProduct() { ctx := context.Background() - log.Info("%s start product loop", d.String()) content := d.content for !d.stop { - header := make(map[string]interface{}) - e := d.Event() - e.Fill(e.Meta(), header, content) d.limiter.Wait(ctx) - productFunc(e) + d.SendWithBody(content) } } diff --git a/pkg/source/file/source.go b/pkg/source/file/source.go index 07042bcd0..419fbebcf 100644 --- a/pkg/source/file/source.go +++ b/pkg/source/file/source.go @@ -31,17 +31,15 @@ func init() { } func makeSource(info pipeline.Info) abstract.SourceConvert { - s := &Source{ + return &Source{ Source: abstract.ExtendsAbstractSource(info, Type), config: &Config{}, } - return s } type Source struct { *abstract.Source config *Config - productFunc api.ProductFunc r *Reader ackEnable bool ackChainHandler *AckChainHandler @@ -58,6 +56,10 @@ type Source struct { // override methods // ------------------------------------------------------------------------ +func (s *Source) Config() interface{} { + return s.config +} + func (s *Source) DoStart() { s.ackEnable = s.config.AckConfig.Enable // init default multi agg timeout @@ -132,13 +134,12 @@ func (s *Source) DoCommit(events []api.Event) { } } -func (s *Source) ProductLoop(productFunc api.ProductFunc) { - log.Info("[%s] start product loop", s.String()) - s.productFunc = productFunc +func (s *Source) DoProduct() { + productFunc := s.ProductFunc() if s.config.ReaderConfig.MultiConfig.Active { - s.mTask = NewMultiTask(s.Epoch(), s.Name(), s.config.ReaderConfig.MultiConfig, s.PipelineInfo().EventPool, s.productFunc) + s.mTask = NewMultiTask(s.Epoch(), s.Name(), s.config.ReaderConfig.MultiConfig, s.PipelineInfo().EventPool, productFunc) s.multilineProcessor.StartTask(s.mTask) - s.productFunc = s.multilineProcessor.Process + productFunc = s.multilineProcessor.Process } if s.config.AckConfig.Enable { s.ackTask = NewAckTask(s.Epoch(), s.PipelineName(), s.Name(), func(state *State) { @@ -146,7 +147,7 @@ func (s *Source) ProductLoop(productFunc api.ProductFunc) { }) s.ackChainHandler.StartTask(s.ackTask) } - s.watchTask = NewWatchTask(s.Epoch(), s.PipelineName(), s.Name(), s.config.CollectConfig, s.PipelineInfo().EventPool, s.productFunc, s.r.jobChan, s.config.Fields) + s.watchTask = NewWatchTask(s.Epoch(), s.PipelineName(), s.Name(), s.config.CollectConfig, s.PipelineInfo().EventPool, productFunc, s.r.jobChan, s.config.Fields) // start watch source paths s.watcher.StartWatchTask(s.watchTask) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 3ec39205c..1bf135fbe 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -237,6 +237,7 @@ golang.org/x/text/unicode/bidi golang.org/x/text/unicode/norm golang.org/x/text/width # golang.org/x/time v0.0.0-20191024005414-555d28b269f0 +## explicit golang.org/x/time/rate # golang.org/x/tools v0.0.0-20200616133436-c1934b75d054 golang.org/x/tools/go/ast/astutil From 3baa7b83f6723b9afecd00ac17556f036c8113ef Mon Sep 17 00:00:00 2001 From: mmaxiaolei <412171946@qq.com> Date: Fri, 21 Jan 2022 15:10:56 +0800 Subject: [PATCH 4/4] refactor kafka source --- pkg/source/kafka/kafka.go | 51 +++++++++++---------------------------- 1 file changed, 14 insertions(+), 37 deletions(-) diff --git a/pkg/source/kafka/kafka.go b/pkg/source/kafka/kafka.go index bf981950d..e02ea37b9 100644 --- a/pkg/source/kafka/kafka.go +++ b/pkg/source/kafka/kafka.go @@ -19,6 +19,7 @@ package kafka import ( "context" "fmt" + "github.com/loggie-io/loggie/pkg/core/source/abstract" "regexp" "sync" "time" @@ -28,7 +29,6 @@ import ( "github.com/segmentio/kafka-go/topics" "github.com/loggie-io/loggie/pkg/core/api" - "github.com/loggie-io/loggie/pkg/core/event" "github.com/loggie-io/loggie/pkg/core/log" "github.com/loggie-io/loggie/pkg/pipeline" ) @@ -36,47 +36,29 @@ import ( const Type = "kafka" func init() { - pipeline.Register(api.SOURCE, Type, makeSource) + abstract.SourceRegister(Type, makeSource) } -func makeSource(info pipeline.Info) api.Component { +func makeSource(info pipeline.Info) abstract.SourceConvert { return &Source{ - done: make(chan struct{}), - config: &Config{}, - eventPool: info.EventPool, + done: make(chan struct{}), + config: &Config{}, } } type Source struct { - name string + *abstract.Source done chan struct{} closeOnce sync.Once config *Config consumer *kafka.Reader - eventPool *event.Pool } func (k *Source) Config() interface{} { return k.config } -func (k *Source) Category() api.Category { - return api.SOURCE -} - -func (k *Source) Type() api.Type { - return Type -} - -func (k *Source) String() string { - return fmt.Sprintf("%s/%s", api.SOURCE, Type) -} - -func (k *Source) Init(context api.Context) { - k.name = context.Name() -} - -func (k *Source) Start() { +func (k *Source) DoStart() { topicRegx, err := regexp.Compile(k.config.Topic) if err != nil { log.Error("compile kafka topic regex %s error: %s", k.config.Topic, err.Error()) @@ -119,7 +101,7 @@ func (k *Source) Start() { k.consumer = kafka.NewReader(readerCfg) } -func (k *Source) Stop() { +func (k *Source) DoStop() { k.closeOnce.Do(func() { if k.consumer != nil { err := k.consumer.Close() @@ -132,7 +114,7 @@ func (k *Source) Stop() { }) } -func (k *Source) ProductLoop(productFunc api.ProductFunc) { +func (k *Source) DoProduct() { log.Info("%s start product loop", k.String()) for { @@ -141,7 +123,7 @@ func (k *Source) ProductLoop(productFunc api.ProductFunc) { return default: - err := k.consume(productFunc) + err := k.consume() if err != nil { log.Error("%+v", err) } @@ -149,7 +131,7 @@ func (k *Source) ProductLoop(productFunc api.ProductFunc) { } } -func (k *Source) consume(productFunc api.ProductFunc) error { +func (k *Source) consume() error { if k.consumer == nil { return fmt.Errorf("kakfa consumer not initialized yet") } @@ -168,11 +150,8 @@ func (k *Source) consume(productFunc api.ProductFunc) error { } } - e := k.eventPool.Get() + e := k.NewEvent() header := e.Header() - if header == nil { - header = make(map[string]interface{}) - } header["kafka"] = map[string]interface{}{ "offset": msg.Offset, "partition": msg.Partition, @@ -185,11 +164,11 @@ func (k *Source) consume(productFunc api.ProductFunc) error { } header["@timestamp"] = time.Now().Format(time.RFC3339) e.Fill(e.Meta(), header, msg.Value) - productFunc(e) + k.Send(e) return nil } -func (k *Source) Commit(events []api.Event) { +func (k *Source) DoCommit(events []api.Event) { // commit when sink ack if !k.config.EnableAutoCommit { var msgs []kafka.Message @@ -224,6 +203,4 @@ func (k *Source) Commit(events []api.Event) { log.Error("consumer manually commit messgage error: %v", err) } } - - k.eventPool.PutAll(events) }