diff --git a/.docker/Dockerfile b/.docker/Dockerfile index 7eff902..cfd1486 100644 --- a/.docker/Dockerfile +++ b/.docker/Dockerfile @@ -1,4 +1,4 @@ -FROM registry:5000/gobase:1.12.9-alpine3.10 AS gobuild +FROM registry:5000/gobase:1.13.0-alpine3.10 AS gobuild ENV GO111MODULE=on WORKDIR /go-fluentd diff --git a/.docker/README.md b/.docker/README.md index 311e978..d5f295a 100644 --- a/.docker/README.md +++ b/.docker/README.md @@ -17,8 +17,8 @@ go mod download go mod vendor # build base image -docker build . -f ./.docker/gobase.Dockerfile -t registry:5000/gobase:1.12.9-alpine3.10 -docker push registry:5000/gobase:1.12.9-alpine3.10 +docker build . -f ./.docker/gobase.Dockerfile -t registry:5000/gobase:1.13.0-alpine3.10 +docker push registry:5000/gobase:1.13.0-alpine3.10 # build image docker build . -f ./.docker/Dockerfile -t registry:5000/go-fluentd:1.8.2 diff --git a/.docker/gobase.Dockerfile b/.docker/gobase.Dockerfile index 6dd428e..316e9f0 100644 --- a/.docker/gobase.Dockerfile +++ b/.docker/gobase.Dockerfile @@ -1,6 +1,6 @@ -# docker build . -f ./.docker/gobase.Dockerfile -t registry:5000/gobase:1.12.9-alpine3.10 -# docker push registry:5000/gobase:1.12.9-alpine3.10 -FROM golang:1.12.9-alpine3.10 +# docker build . -f ./.docker/gobase.Dockerfile -t registry:5000/gobase:1.13.0-alpine3.10 +# docker push registry:5000/gobase:1.13.0-alpine3.10 +FROM golang:1.13.0-alpine3.10 # run dependencies RUN apk update && apk upgrade && \ diff --git a/.docker/test.Dockerfile b/.docker/test.Dockerfile index ecb59a9..69e8536 100644 --- a/.docker/test.Dockerfile +++ b/.docker/test.Dockerfile @@ -1,8 +1,10 @@ # docker build . -f ./.docker/test.Dockerfile -t registry:5000/go-fluentd-test:v1 # docker push registry:5000/go-fluentd-test:v1 -FROM registry:5000/gobase:1.12.9-alpine3.10 +FROM registry:5000/gobase:1.13.0-alpine3.10 ENV GO111MODULE=on + + WORKDIR /go-fluentd COPY go.mod . COPY go.sum . diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index f890d10..f3000bc 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -47,7 +47,7 @@ build:app: # - public # image: registry:5000/docker:latest # script: -# - docker pull registry:5000/gobase:1.12.9-alpine3.10 # pull latest image +# - docker pull registry:5000/gobase:1.13.0-alpine3.10 # pull latest image # - docker build . -f ./.docker/test.Dockerfile -t go-fluentd-test:$DOCKER_TAG # retry: 1 diff --git a/CHANGELOG.md b/CHANGELOG.md index e17f933..b382117 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,22 @@ *CURRENT* --- +- 2019-09-05 (Laisky) ci: upgrade go-utils v1.7.5 +- 2019-09-05 (Laisky) fix: shrink deps +- 2019-09-05 (Laisky) ci: upgrade go-utils v1.7.4 +- 2019-09-04 (Laisky) fix: upgrade go-utils +- 2019-09-04 (Laisky) fix: add context in postpipeline +- 2019-09-04 (Laisky) build: upgrade go v1.13.0 +- 2019-09-04 (Laisky) fix: add context in accptor +- 2019-09-04 (Laisky) fix: add context in acceptorPipeline +- 2019-09-04 (Laisky) fix: add context in producer +- 2019-09-04 (Laisky) fix: refactor tagPipeline +- 2019-09-03 (Laisky) ci: upgrade go-utils v1.7.3 +- 2019-09-03 (Laisky) docs: update changelog + +*v1.10.8* +--- + - 2019-09-03 (Laisky) fix: ctx.Done return *v1.10.7* diff --git a/acceptor.go b/acceptor.go index 9be2cea..176d5e5 100644 --- a/acceptor.go +++ b/acceptor.go @@ -1,6 +1,7 @@ package concator import ( + "context" "fmt" "sync" @@ -45,7 +46,7 @@ func NewAcceptor(cfg *AcceptorCfg, recvs ...recvs.AcceptorRecvItf) *Acceptor { // Run starting acceptor to listening and receive messages, // you can use `acceptor.MessageChan()` to load messages` -func (a *Acceptor) Run() { +func (a *Acceptor) Run(ctx context.Context) { // got exists max id from legacy utils.Logger.Info("process legacy data...") maxID, err := a.Journal.LoadMaxID() @@ -64,7 +65,7 @@ func (a *Acceptor) Run() { recv.SetSyncOutChan(a.syncOutChan) recv.SetMsgPool(a.MsgPool) recv.SetCounter(couter) - go recv.Run() + go recv.Run(ctx) } } diff --git a/acceptorFilters/pipeline.go b/acceptorFilters/pipeline.go index 420710e..374fa53 100644 --- a/acceptorFilters/pipeline.go +++ b/acceptorFilters/pipeline.go @@ -1,6 +1,7 @@ package acceptorFilters import ( + "context" "fmt" "sync" "time" @@ -78,7 +79,7 @@ func (f *AcceptorPipeline) DiscardMsg(msg *libs.FluentMsg) { f.MsgPool.Put(msg) } -func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (outChan, skipDumpChan chan *libs.FluentMsg) { +func (f *AcceptorPipeline) Wrap(ctx context.Context, asyncInChan, syncInChan chan *libs.FluentMsg) (outChan, skipDumpChan chan *libs.FluentMsg) { outChan = make(chan *libs.FluentMsg, f.OutChanSize) skipDumpChan = make(chan *libs.FluentMsg, f.OutChanSize) @@ -92,11 +93,13 @@ func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (o filter AcceptorFilterItf msg *libs.FluentMsg ) - defer utils.Logger.Panic("quit acceptorPipeline asyncChan", zap.String("msg", fmt.Sprint(msg))) + defer utils.Logger.Info("quit acceptorPipeline asyncChan", zap.String("last_msg", fmt.Sprint(msg))) + NEXT_ASYNC_MSG: for { - NEXT_ASYNC_MSG: select { + case <-ctx.Done(): + return case msg = <-f.reEnterChan: // CAUTION: do not put msg into reEnterChan forever case msg = <-asyncInChan: } @@ -112,7 +115,7 @@ func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (o for _, filter = range f.filters { if msg = filter.Filter(msg); msg == nil { // quit filters for this msg - goto NEXT_ASYNC_MSG + continue NEXT_ASYNC_MSG } } @@ -132,9 +135,15 @@ func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (o filter AcceptorFilterItf msg *libs.FluentMsg ) - defer utils.Logger.Panic("quit acceptorPipeline syncChan", zap.String("msg", fmt.Sprint(msg))) + defer utils.Logger.Info("quit acceptorPipeline syncChan", zap.String("last_msg", fmt.Sprint(msg))) - for msg = range syncInChan { + NEXT_SYNC_MSG: + for { + select { + case <-ctx.Done(): + return + case msg = <-syncInChan: + } // utils.Logger.Debug("AcceptorPipeline got blockable msg") f.counter.Count() @@ -148,12 +157,11 @@ func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (o if msg = filter.Filter(msg); msg == nil { // quit filters for this msg // do not discard in pipeline // filter can make decision to bypass or discard msg - goto NEXT_SYNC_MSG + continue NEXT_SYNC_MSG } } outChan <- msg - NEXT_SYNC_MSG: } }() diff --git a/controllor.go b/controllor.go index 80de64e..a4c3db1 100644 --- a/controllor.go +++ b/controllor.go @@ -19,8 +19,8 @@ import ( "github.com/Laisky/zap" ) -const ( - ctxKey utils.CtxKeyT = "key" +var ( + ctxKey = utils.CtxKeyT{} ) // Controllor is an IoC that manage all roles @@ -121,28 +121,29 @@ func (c *Controllor) initRecvs(env string) []recvs.AcceptorRecvItf { MaxAllowedAheadSec: utils.Settings.GetDuration("settings.acceptor.recvs.plugins."+name+".max_allowed_ahead_sec") * time.Second, })) case "kafka": - receivers = append(receivers, recvs.NewKafkaRecv(&recvs.KafkaCfg{ - KMsgPool: sharingKMsgPool, - Meta: utils.FallBack( - func() interface{} { - return utils.Settings.Get("settings.acceptor.recvs.plugins." + name + ".meta").(map[string]interface{}) - }, map[string]interface{}{}).(map[string]interface{}), - Name: name, - MsgKey: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".msg_key"), - Brokers: utils.Settings.GetStringSlice("settings.acceptor.recvs.plugins." + name + ".brokers." + env), - Topics: []string{utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".topics." + env)}, - Group: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".groups." + env), - Tag: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tags." + env), - IsJSONFormat: utils.Settings.GetBool("settings.acceptor.recvs.plugins." + name + ".is_json_format"), - TagKey: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tag_key"), - JSONTagKey: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".json_tag_key"), - RewriteTag: recvs.GetKafkaRewriteTag(utils.Settings.GetString("settings.acceptor.recvs.plugins."+name+".rewrite_tag"), env), - NConsumer: utils.Settings.GetInt("settings.acceptor.recvs.plugins." + name + ".nconsumer"), - KafkaCommitCfg: &recvs.KafkaCommitCfg{ - IntervalNum: utils.Settings.GetInt("settings.acceptor.recvs.plugins." + name + ".interval_num"), - IntervalDuration: utils.Settings.GetDuration("settings.acceptor.recvs.plugins."+name+".interval_sec") * time.Second, - }, - })) + kafkaCfg := recvs.NewKafkaCfg() + kafkaCfg.KMsgPool = sharingKMsgPool + kafkaCfg.Meta = utils.FallBack( + func() interface{} { + return utils.Settings.Get("settings.acceptor.recvs.plugins." + name + ".meta").(map[string]interface{}) + }, map[string]interface{}{}).(map[string]interface{}) + kafkaCfg.Name = name + kafkaCfg.MsgKey = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".msg_key") + kafkaCfg.Brokers = utils.Settings.GetStringSlice("settings.acceptor.recvs.plugins." + name + ".brokers." + env) + kafkaCfg.Topics = []string{utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".topics." + env)} + kafkaCfg.Group = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".groups." + env) + kafkaCfg.Tag = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tags." + env) + kafkaCfg.IsJSONFormat = utils.Settings.GetBool("settings.acceptor.recvs.plugins." + name + ".is_json_format") + kafkaCfg.TagKey = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tag_key") + kafkaCfg.JSONTagKey = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".json_tag_key") + kafkaCfg.RewriteTag = recvs.GetKafkaRewriteTag(utils.Settings.GetString("settings.acceptor.recvs.plugins."+name+".rewrite_tag"), env) + kafkaCfg.NConsumer = utils.Settings.GetInt("settings.acceptor.recvs.plugins." + name + ".nconsumer") + kafkaCfg.KafkaCommitCfg = &recvs.KafkaCommitCfg{ + IntervalNum: utils.Settings.GetInt("settings.acceptor.recvs.plugins." + name + ".interval_num"), + IntervalDuration: utils.Settings.GetDuration("settings.acceptor.recvs.plugins."+name+".interval_sec") * time.Second, + } + + receivers = append(receivers, recvs.NewKafkaRecv(kafkaCfg)) default: utils.Logger.Panic("unknown recv type", zap.String("recv_type", utils.Settings.GetString("settings.acceptor.recvs.plugins."+name+".type")), @@ -171,7 +172,7 @@ func (c *Controllor) initAcceptor(journal *Journal, receivers []recvs.AcceptorRe receivers..., ) - acceptor.Run() + acceptor.Run(c.ctx) return acceptor } @@ -290,7 +291,7 @@ func (c *Controllor) initTagPipeline(env string, waitCommitChan chan<- *libs.Flu })}, fs...) } - return tagFilters.NewTagPipeline(&tagFilters.TagPipelineCfg{ + return tagFilters.NewTagPipeline(c.ctx, &tagFilters.TagPipelineCfg{ MsgPool: c.msgPool, CommitedChan: waitCommitChan, DefaultInternalChanSize: utils.Settings.GetInt("settings.tag_filters.internal_chan_size"), @@ -306,7 +307,7 @@ func (c *Controllor) initDispatcher(waitDispatchChan chan *libs.FluentMsg, tagPi NFork: utils.Settings.GetInt("settings.dispatcher.nfork"), OutChanSize: utils.Settings.GetInt("settings.dispatcher.out_chan_size"), }) - dispatcher.Run() + dispatcher.Run(c.ctx) return dispatcher } @@ -485,8 +486,15 @@ func (c *Controllor) initProducer(env string, waitProduceChan chan *libs.FluentM ) } -func (c *Controllor) runHeartBeat() { +func (c *Controllor) runHeartBeat(ctx context.Context) { + defer utils.Logger.Info("heartbeat exit") for { + select { + case <-ctx.Done(): + return + default: + } + utils.Logger.Info("heartbeat", zap.Int("goroutine", runtime.NumGoroutine()), ) @@ -509,7 +517,7 @@ func (c *Controllor) Run() { waitCommitChan := journal.GetCommitChan() waitAccepPipelineSyncChan := acceptor.GetSyncOutChan() waitAccepPipelineAsyncChan := acceptor.GetAsyncOutChan() - waitDumpChan, skipDumpChan := acceptorPipeline.Wrap(waitAccepPipelineAsyncChan, waitAccepPipelineSyncChan) + waitDumpChan, skipDumpChan := acceptorPipeline.Wrap(c.ctx, waitAccepPipelineAsyncChan, waitAccepPipelineSyncChan) // after `journal.DumpMsgFlow`, every discarded msg should commit to waitCommitChan waitDispatchChan := journal.DumpMsgFlow(c.ctx, c.msgPool, waitDumpChan, skipDumpChan) @@ -518,12 +526,12 @@ func (c *Controllor) Run() { dispatcher := c.initDispatcher(waitDispatchChan, tagPipeline) waitPostPipelineChan := dispatcher.GetOutChan() postPipeline := c.initPostPipeline(env, waitCommitChan) - waitProduceChan := postPipeline.Wrap(waitPostPipelineChan) + waitProduceChan := postPipeline.Wrap(c.ctx, waitPostPipelineChan) producerSenders := c.initSenders(env) producer := c.initProducer(env, waitProduceChan, waitCommitChan, producerSenders) // heartbeat - go c.runHeartBeat() + go c.runHeartBeat(c.ctx) // monitor monitor.AddMetric("controllor", func() map[string]interface{} { @@ -549,6 +557,6 @@ func (c *Controllor) Run() { }) monitor.BindHTTP(server) - go producer.Run() + go producer.Run(c.ctx) RunServer(utils.Settings.GetString("addr")) } diff --git a/dispacher.go b/dispacher.go index 5d2e89c..4af67c2 100644 --- a/dispacher.go +++ b/dispacher.go @@ -1,8 +1,8 @@ package concator import ( + "context" "fmt" - "regexp" "sync" "time" @@ -15,24 +15,18 @@ import ( type DispatcherCfg struct { InChan chan *libs.FluentMsg - TagPipeline *tagFilters.TagPipeline + TagPipeline tagFilters.TagPipelineItf NFork, OutChanSize int } // Dispatcher dispatch messages by tag to different concator type Dispatcher struct { *DispatcherCfg - concatorMap *sync.Map // tag:msgchan - tagsCounter *sync.Map // tag:counter - outChan chan *libs.FluentMsg // skip concator, direct to producer - counter *utils.Counter -} - -// ConcatorFactoryItf interface of ConcatorFactory, -// decoupling with specific ConcatorFactory -type ConcatorFactoryItf interface { - Spawn(string, string, *regexp.Regexp) chan<- *libs.FluentMsg - MessageChan() chan *libs.FluentMsg + tag2Concator *sync.Map // tag:msgchan + tag2Counter *sync.Map // tag:counter + tag2Cancel *sync.Map // tag:cancel + outChan chan *libs.FluentMsg // skip concator, direct to producer + counter *utils.Counter } // NewDispatcher create new Dispatcher @@ -46,14 +40,15 @@ func NewDispatcher(cfg *DispatcherCfg) *Dispatcher { return &Dispatcher{ DispatcherCfg: cfg, outChan: make(chan *libs.FluentMsg, cfg.OutChanSize), - concatorMap: &sync.Map{}, - tagsCounter: &sync.Map{}, + tag2Concator: &sync.Map{}, + tag2Counter: &sync.Map{}, + tag2Cancel: &sync.Map{}, counter: utils.NewCounter(), } } // Run dispacher to dispatch messages to different concators -func (d *Dispatcher) Run() { +func (d *Dispatcher) Run(ctx context.Context) { utils.Logger.Info("run dispacher...") d.registerMonitor() lock := &sync.Mutex{} @@ -68,24 +63,43 @@ func (d *Dispatcher) Run() { counterI interface{} msg *libs.FluentMsg ) - defer utils.Logger.Panic("dispatcher exit with msg", zap.String("msg", fmt.Sprint(msg))) + // send each message to appropriate tagfilter by `tag` - for msg = range d.InChan { + for { + select { + case <-ctx.Done(): + utils.Logger.Info("dispatcher exit with msg", zap.String("msg", fmt.Sprint(msg))) + return + case msg = <-d.InChan: + } + d.counter.Count() - if inChanForEachTagi, ok = d.concatorMap.Load(msg.Tag); !ok { + if inChanForEachTagi, ok = d.tag2Concator.Load(msg.Tag); !ok { // create new inChanForEachTag lock.Lock() - if inChanForEachTagi, ok = d.concatorMap.Load(msg.Tag); !ok { // double check + if inChanForEachTagi, ok = d.tag2Concator.Load(msg.Tag); !ok { // double check // new tag, create new tagfilter and its inchan utils.Logger.Info("got new tag", zap.String("tag", msg.Tag)) - if inChanForEachTag, err = d.TagPipeline.Spawn(msg.Tag, d.outChan); err != nil { + ctx2Tag, cancel := context.WithCancel(ctx) + if inChanForEachTag, err = d.TagPipeline.Spawn(ctx2Tag, msg.Tag, d.outChan); err != nil { utils.Logger.Error("try to spawn new tagpipeline got error", zap.Error(err), zap.String("tag", msg.Tag)) + cancel() continue } else { - d.concatorMap.Store(msg.Tag, inChanForEachTag) - d.tagsCounter.Store(msg.Tag, utils.NewCounter()) + d.tag2Concator.Store(msg.Tag, inChanForEachTag) + d.tag2Counter.Store(msg.Tag, utils.NewCounter()) + d.tag2Cancel.Store(msg.Tag, cancel) + go func(tag string) { + <-ctx2Tag.Done() + utils.Logger.Info("remove tag in dispatcher", zap.String("tag", tag)) + lock.Lock() + d.tag2Concator.Delete(tag) + d.tag2Counter.Delete(tag) + d.tag2Cancel.Delete(tag) + lock.Unlock() + }(msg.Tag) } } else { inChanForEachTag = inChanForEachTagi.(chan<- *libs.FluentMsg) @@ -97,7 +111,7 @@ func (d *Dispatcher) Run() { } // count - if counterI, ok = d.tagsCounter.Load(msg.Tag); !ok { + if counterI, ok = d.tag2Counter.Load(msg.Tag); !ok { utils.Logger.Panic("counter must exists", zap.String("tag", msg.Tag)) } counterI.(*utils.Counter).Count() @@ -120,14 +134,14 @@ func (d *Dispatcher) registerMonitor() { "msgPerSec": utils.Round(float64(d.counter.Get())/(time.Now().Sub(lastT).Seconds()), .5, 1), } d.counter.Set(0) - d.tagsCounter.Range(func(tagi interface{}, ci interface{}) bool { + d.tag2Counter.Range(func(tagi interface{}, ci interface{}) bool { metrics[tagi.(string)+".MsgPerSec"] = utils.Round(float64(ci.(*utils.Counter).Get())/(time.Now().Sub(lastT).Seconds()), .5, 1) ci.(*utils.Counter).Set(0) return true }) lastT = time.Now() - d.concatorMap.Range(func(tagi interface{}, ci interface{}) bool { + d.tag2Concator.Range(func(tagi interface{}, ci interface{}) bool { metrics[tagi.(string)+".ChanLen"] = len(ci.(chan<- *libs.FluentMsg)) metrics[tagi.(string)+".ChanCap"] = cap(ci.(chan<- *libs.FluentMsg)) return true diff --git a/go.mod b/go.mod index 0ae9724..a170c3a 100644 --- a/go.mod +++ b/go.mod @@ -1,20 +1,28 @@ -module pateo.com/go-fluentd +module github.com/Laisky/go-fluentd go 1.12 require ( - github.com/Depado/ginprom v1.1.2 - github.com/Laisky/go-fluentd v1.10.6 github.com/Laisky/go-syslog v2.3.3+incompatible - github.com/Laisky/go-utils v1.7.2 + github.com/Laisky/go-utils v1.7.5 github.com/Laisky/zap v1.9.2 + github.com/OneOfOne/xxhash v1.2.5 // indirect github.com/Shopify/sarama v1.22.1 github.com/cespare/xxhash v1.1.0 github.com/gin-contrib/pprof v1.2.0 github.com/gin-gonic/gin v1.4.0 + github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 // indirect + github.com/go-logfmt/logfmt v0.4.0 // indirect + github.com/golang/protobuf v1.3.2 // indirect github.com/json-iterator/go v1.1.6 + github.com/pelletier/go-toml v1.4.0 // indirect + github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/pkg/errors v0.8.1 + github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect + github.com/spf13/afero v1.2.2 // indirect + github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.3 github.com/spf13/viper v1.4.0 github.com/tinylib/msgp v1.1.0 + github.com/ugorji/go v1.1.7 // indirect ) diff --git a/go.sum b/go.sum index d5c3f83..4534175 100644 --- a/go.sum +++ b/go.sum @@ -5,22 +5,12 @@ github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58 github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= github.com/DataDog/zstd v1.3.8 h1:wMrT3Ulre3EsZQi6lPUYWFoA/+fPTW2hYc+GxtXjQEg= github.com/DataDog/zstd v1.3.8/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/Depado/ginprom v1.1.2 h1:oD7wQrCkLj+mzBbwBO1iqkprQUt8bTpYR8EagH82lW4= -github.com/Depado/ginprom v1.1.2/go.mod h1:OPTeCiC3QLr6QueEA0UNChKGWyxKOqmC4k4WUJUh5lg= -github.com/Joker/hpp v0.0.0-20180418125244-6893e659854a h1:PiDAizhfJbwZMISZ1Itx1ZTFeOFCml89Ofmz3V8rhoU= -github.com/Joker/hpp v0.0.0-20180418125244-6893e659854a/go.mod h1:MzD2WMdSxvbHw5fM/OXOFily/lipJWRc9C1px0Mt0ZE= -github.com/Joker/jade v1.0.0 h1:lOCEPvTAtWfLpSZYMOv/g44MGQFAolbKh2khHHGu0Kc= -github.com/Joker/jade v1.0.0/go.mod h1:efZIdO0py/LtcJRSa/j2WEklMSAw84WV0zZVMxNToB8= -github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Laisky/go-chaining v0.0.0-20180507092046-43dcdc5a21be h1:7Rxhm6IjOtDAyj8eScOFntevwzkWhx94zi48lxo4m4w= github.com/Laisky/go-chaining v0.0.0-20180507092046-43dcdc5a21be/go.mod h1:1mdzaETo0kjvCQICPSePsoaatJN4l7JvEA1200lyevo= -github.com/Laisky/go-fluentd v1.10.6 h1:5H38VpCOd0ydQmtQdmptuZGLp3IJevK2XQDKhQEGPLM= -github.com/Laisky/go-fluentd v1.10.6/go.mod h1:rQO40AxzljNgzGdD9PW6KuG5VqOCNQMZ6x4qNTaS3z0= github.com/Laisky/go-syslog v2.3.3+incompatible h1:TSHhP3iadAPDzC5efyYLPnGkv2pvUtuUInm7poVRkFA= github.com/Laisky/go-syslog v2.3.3+incompatible/go.mod h1:PPmESkLU3DEbJ3fRXam2hqJTNQVFMggsDXBnOtu2ITk= -github.com/Laisky/go-utils v1.6.4/go.mod h1:YjXldX4UO0Mr0JdwMEAVgWT2M9bKwcki2LIF5GgF1T0= -github.com/Laisky/go-utils v1.7.2 h1:fdGlB+7ioKh4LKYpR7e1fzCVI9LKdIIk+ytPlws5nrE= -github.com/Laisky/go-utils v1.7.2/go.mod h1:YjXldX4UO0Mr0JdwMEAVgWT2M9bKwcki2LIF5GgF1T0= +github.com/Laisky/go-utils v1.7.5 h1:/JYfZwIpzs88ml1WAFJtBQMCOw0Wz9sGvcDQBzrrHw8= +github.com/Laisky/go-utils v1.7.5/go.mod h1:E38nNGyiDATLe8KWFGlV6AoJwTGk8x+Uf6p5hZVL28Q= github.com/Laisky/zap v1.9.2 h1:7dTtABboHk8DnT0d6Dc8A9Opu2cyIEaMlL9JO11zvag= github.com/Laisky/zap v1.9.2/go.mod h1:CQdLb2wEfqBvoNLmfOp7wnKTOMvhc4DQRc3xfshL4EQ= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= @@ -29,45 +19,26 @@ github.com/OneOfOne/xxhash v1.2.5 h1:zl/OfRA6nftbBK9qTohYBJ5xvw6C/oNKizR7cZGl3cI github.com/OneOfOne/xxhash v1.2.5/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/RoaringBitmap/roaring v0.4.18 h1:nh8Ngxctxt5QAoMLuR7MHJe4jEqpn+EnsdgDWPryQWo= github.com/RoaringBitmap/roaring v0.4.18/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI= -github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398 h1:WDC6ySpJzbxGWFh4aMxFFC28wwGp5pEuoTtvA4q/qQ4= -github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= github.com/Shopify/sarama v1.22.1 h1:exyEsKLGyCsDiqpV5Lr4slFi8ev2KiM3cP1KZ6vnCQ0= github.com/Shopify/sarama v1.22.1/go.mod h1:FRzlvRpMFO/639zY1SDxUxkqH97Y0ndM5CbGj6oG3As= github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= -github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU= -github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= -github.com/appleboy/gofight/v2 v2.0.0+incompatible h1:mfTmEhmo9BZnzAtFpRL8MEkAxuWEzfJ3Gpgn6Zuzgw4= -github.com/appleboy/gofight/v2 v2.0.0+incompatible/go.mod h1:6E7pthKhmwss84j/zEixBNim8Q6ahhHcYOtmW5ts5vA= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/astaxie/beego v1.11.1/go.mod h1:i69hVzgauOPSw5qeyF4GVZhn7Od0yG5bbCGzmhbWxgQ= -github.com/aymerick/raymond v2.0.2+incompatible h1:VEp3GpgdAnv9B2GFyTvqgcKvY+mfKMjPOA3SbKLtnU0= -github.com/aymerick/raymond v2.0.2+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= -github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ= -github.com/beego/x2j v0.0.0-20131220205130-a0352aadc542/go.mod h1:kSeGC/p1AbBiEp5kat81+DSQrZenVBZXklMLaELspWU= -github.com/belogik/goes v0.0.0-20151229125003-e54d722c3aff/go.mod h1:PhH1ZhyCzHKt4uAasyx+ljRCgoezetRNf59CUtwUkqY= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= -github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60= github.com/bsm/sarama-cluster v2.1.15+incompatible h1:RkV6WiNRnqEEbp81druK8zYhmnIgdOjqSVi0+9Cnl2A= github.com/bsm/sarama-cluster v2.1.15+incompatible/go.mod h1:r7ao+4tTNXvWm+VRpRJchr2kQhqxgmAp2iEX5W96gMM= -github.com/casbin/casbin v1.7.0/go.mod h1:c67qKN6Oum3UF5Q1+BByfFxkwKvhwW57ITjqwtzR1KE= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/couchbase/go-couchbase v0.0.0-20181122212707-3e9b6e1258bb/go.mod h1:TWI8EKQMs5u5jLKW/tsb9VwauIrMIxQG1r5fMsswK5U= -github.com/couchbase/gomemcached v0.0.0-20181122193126-5125a94a666c/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c= -github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs= -github.com/cupcake/rdb v0.0.0-20161107195141-43ba34106c76/go.mod h1:vYwsqCOLxGiisLwp9rITslkFNpZD5rz43tf41QFkTWY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -80,18 +51,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= -github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= -github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 h1:clC1lXBpe2kTj2VHdaIu9ajZQe4kcEY9j0NsnDDBZ3o= -github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385/go.mod h1:0vRUJqYpeSZifjYj7uP3BG/gKcuzL9xWVV/Y+cK33KM= -github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= -github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= -github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= -github.com/flosch/pongo2 v0.0.0-20190505152737-8914e1cf9164 h1:/HMcOGZC5Bi8JPgfbwz13ELWn/91+vY59HXS3z0qY5w= -github.com/flosch/pongo2 v0.0.0-20190505152737-8914e1cf9164/go.mod h1:tbAXHifHQWNSpWbiJHpJTZH5fi3XHhDMdP//vuz9WS4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 h1:Vh7rylVZRZCj6W41lRlP17xPk4Nq260H4Xo/DDYmEZk= -github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424/go.mod h1:vmp8DIyckQMXOPl0AQVHt+7n5h7Gb7hS6CUydiV8QeA= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/pprof v1.2.0 h1:tmuopx+/U4t4ImlPfWDpMcZF0x+egx0iA10e4/rR+PE= github.com/gin-contrib/pprof v1.2.0/go.mod h1:gOv+/B5pK9Hm3wuppCDM+WUwmGff5bwenVHS9/qpYYk= @@ -106,13 +67,9 @@ github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 h1:gclg6gY70GLy3PbkQ1AERPfmLMMagS60DKF78eWwLn8= github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= -github.com/go-check/check v1.0.0-20180628173108-788fd7840127 h1:3dbHpVjNKf7Myfit4Xmw4BA0JbCt47OJPhMQ5w8O3E8= -github.com/go-check/check v1.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-redis/redis v6.14.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= -github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -124,20 +81,13 @@ github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= -github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= -github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= -github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= -github.com/gorilla/pat v0.0.0-20180118222023-199c85a7f6d1/go.mod h1:YeAe0gNeiNT5hoiZRI4yiOky6jVdNvfO2N6Kav/HmxY= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= @@ -146,45 +96,16 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N1Vk= -github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= -github.com/iris-contrib/blackfriday v2.0.0+incompatible h1:o5sHQHHm0ToHUlAJSTjW9UWicjJSDDauOOQ2AHuIVp4= -github.com/iris-contrib/blackfriday v2.0.0+incompatible/go.mod h1:UzZ2bDEoaSGPbkg6SAB4att1aAwTmVIx/5gCVqeyUdI= -github.com/iris-contrib/formBinder v0.0.0-20190104093907-fbd5963f41e1 h1:7GsNnSLoVceNylMpwcfy5aFNz/S5/TV25crb34I5PEo= -github.com/iris-contrib/formBinder v0.0.0-20190104093907-fbd5963f41e1/go.mod h1:i8kTYUOEstd/S8TG0ChTXQdf4ermA/e8vJX0+QruD9w= -github.com/iris-contrib/go.uuid v2.0.0+incompatible h1:XZubAYg61/JwnJNbZilGjf3b3pB80+OQg2qf6c8BfWE= -github.com/iris-contrib/go.uuid v2.0.0+incompatible/go.mod h1:iz2lgM/1UnEf1kP0L/+fafWORmlnuysV2EMP8MW+qe0= -github.com/iris-contrib/httpexpect v0.0.0-20180314041918-ebe99fcebbce h1:q8Ka/exfHNgK7izJE+aUOZd7KZXJ7oQbnJWiZakEiMo= -github.com/iris-contrib/httpexpect v0.0.0-20180314041918-ebe99fcebbce/go.mod h1:VER17o2JZqquOx41avolD/wMGQSFEFBKWmhag9/RQRY= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= -github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5 h1:rhqTjzJlm7EbkELJDKMTU7udov+Se0xZkWmugr6zGok= -github.com/juju/errors v0.0.0-20181118221551-089d3ea4e4d5/go.mod h1:W54LbzXuIE0boCoNJfwqpmkKJ1O4TCTZMetAt6jGk7Q= -github.com/juju/loggo v0.0.0-20180524022052-584905176618 h1:MK144iBQF9hTSwBW/9eJm034bVoG30IshVm688T2hi8= -github.com/juju/loggo v0.0.0-20180524022052-584905176618/go.mod h1:vgyd7OREkbtVEN/8IXZe5Ooef3LQePvuBm9UWj6ZL8U= -github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073 h1:WQM1NildKThwdP7qWrNAFGzp4ijNLw8RlgENkaI4MJs= -github.com/juju/testing v0.0.0-20180920084828-472a3e8b2073/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= -github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM= -github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= -github.com/kataras/golog v0.0.0-20180321173939-03be10146386 h1:VT6AeCHO/mc+VedKBMhoqb5eAK8B1i9F6nZl7EGlHvA= -github.com/kataras/golog v0.0.0-20180321173939-03be10146386/go.mod h1:PcaEvfvhGsqwXZ6S3CgCbmjcp+4UDUh2MIfF2ZEul8M= -github.com/kataras/iris v11.1.1+incompatible h1:c2iRKvKLpTYMXKdVB8YP/+A67NtZFt9kFFy+ZwBhWD0= -github.com/kataras/iris v11.1.1+incompatible/go.mod h1:ki9XPua5SyAJbIxDdsssxevgGrbpBmmvoQmo/A0IodY= -github.com/kataras/pio v0.0.0-20190103105442-ea782b38602d h1:V5Rs9ztEWdp58oayPq/ulmlqJJZeJP6pP79uP3qjcao= -github.com/kataras/pio v0.0.0-20190103105442-ea782b38602d/go.mod h1:NV88laa9UiiDuX9AhMbDPkGYSPugBOV6yTZB1l2K9Z0= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.7.1 h1:VRD0WLa8rweLB7alA5WMSVkoAtrI8xou5RrNd4JUlR0= -github.com/klauspost/compress v1.7.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/compress v1.7.4 h1:4UqAIzZ1Ns2epCTyJ1d2xMWvxtX+FNSCYWeOFogK9nc= -github.com/klauspost/compress v1.7.4/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= -github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= @@ -192,37 +113,22 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/labstack/echo v3.3.10+incompatible h1:pGRcYk231ExFAyoAjAfD85kQzRJCRI8bbnE7CX5OEgg= -github.com/labstack/echo v3.3.10+incompatible/go.mod h1:0INS7j/VjnFxD4E2wkz67b8cVwCLbBmJyDaka6Cmk1s= -github.com/labstack/gommon v0.2.8 h1:JvRqmeZcfrHC5u6uVleB4NxxNbzx6gpbJiQknDbKQu0= -github.com/labstack/gommon v0.2.8/go.mod h1:/tj9csK2iPSBvn+3NLM9e52usepMtrd5ilFYA+wQNJ4= -github.com/labstack/gommon v0.2.9 h1:heVeuAYtevIQVYkGj6A41dtfT91LrvFG220lavpWhrU= -github.com/labstack/gommon v0.2.9/go.mod h1:E8ZTmW9vw5az5/ZyHWCp0Lw4OH2ecsaBP1C/NKavGG4= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= -github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU= -github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.8 h1:HLtExJ+uU2HOZ+wI0Tt5DtUDrx8yhUqDcp7fYERX4CE= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= -github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/microcosm-cc/bluemonday v1.0.2 h1:5lPfLTTAvAbtS0VqT+94yOtFnGfUWYyx0+iToC3Os3s= -github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/moul/http2curl v1.0.0 h1:dRMWoAtb+ePxMlLkrCbAqh4TlPHXvoGUSQ323/9Zahs= -github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae h1:VeRdUYdCw49yizlSbMEn2SZ+gT+3IUKx8BqxyQdz+BY= github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae/go.mod h1:qAyveg+e4CE+eKJXWVjKXM4ck2QobLqTDytGJbLLhJg= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -255,7 +161,6 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1: github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= -github.com/prometheus/common v0.0.0-20181120120127-aeab699e26f4/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw= @@ -271,15 +176,7 @@ github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40T github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= -github.com/ryanuber/columnize v2.1.0+incompatible h1:j1Wcmh8OrK4Q7GXY+V7SVSY8nUWQxHW5TkBe7YUl+2s= -github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= -github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= -github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= -github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= -github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= -github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= -github.com/siddontang/ledisdb v0.0.0-20181029004158-becf5f38d373/go.mod h1:mF1DpOSOUiJRMR+FDqaqu3EBqrybQtrDDszLUZ6oxPg= -github.com/siddontang/rdb v0.0.0-20150307021120-fc89ed2e418d/go.mod h1:AMEsy7v5z92TR1JKMkLLoaOQk++LVnOKL3ScbJ8GNGA= +github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= @@ -301,24 +198,11 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU= github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= -github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec/go.mod h1:QBvMkMya+gXctz3kmljlUCu/yB3GZ6oee+dUozsezQE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= -github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174= -github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA= -github.com/tidwall/gjson v1.2.2 h1:DZhDNHhghPN1YawsV8qUna8ayu8+E95vbukEBThzoFU= -github.com/tidwall/gjson v1.2.2/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls= -github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= -github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= -github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 h1:rQ229MBgvW68s1/g6f1/63TgYwYxfF4E+bi/KC19P8g= -github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= -github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -333,31 +217,14 @@ github.com/ugorji/go/codec v1.1.5-pre h1:5YV9PsFAN+ndcCtTM7s60no7nY7eTG3LPtxhSwu github.com/ugorji/go/codec v1.1.5-pre/go.mod h1:tULtS6Gy1AE1yCENaw4Vb//HLH5njI2tfCQDUqRd8fI= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= -github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= -github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc= github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc= github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= -github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= -github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= -github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= -github.com/xeipuuv/gojsonschema v1.1.0 h1:ngVtJC9TY/lg0AA/1k48FYhBrhRoFlEmWzsehpNAaZg= -github.com/xeipuuv/gojsonschema v1.1.0/go.mod h1:5yf86TLmAcydyeJq5YvxkGPE2fm/u4myDekKRoLuqhs= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= -github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 h1:6fRhSjgLCkTD3JnJxvaJ4Sj+TYblw757bqYgZaOq5ZY= -github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI= -github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA= -github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= -github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= -github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= -github.com/yudai/pp v2.0.1+incompatible h1:Q4//iY4pNF6yPLZIigmvcl7k/bPgrcTPIFIcmawg5bI= -github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc= +github.com/zsais/go-gin-prometheus v0.1.0 h1:bkLv1XCdzqVgQ36ScgRi09MA2UC1t3tAB6nsfErsGO4= +github.com/zsais/go-gin-prometheus v0.1.0/go.mod h1:Slirjzuz8uM8Cw0jmPNqbneoqcUtY2GGjn2bEd4NRLY= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -365,7 +232,6 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8 h1:1wopBVtVdWnn03fZelqdXTqk7U7zPQCb+T4rbU9ZEoU= @@ -374,7 +240,6 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -406,7 +271,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20181221001348-537d06c36207/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= diff --git a/journal.go b/journal.go index 3cc8ac5..e5dba9a 100644 --- a/journal.go +++ b/journal.go @@ -94,7 +94,13 @@ func (j *Journal) CloseTag(tag string) error { if fi, ok := j.tag2CtxCancelMap.Load(tag); !ok { return fmt.Errorf("tag %v not exists in tag2CtxCancelMap", tag) } else { + j.jjLock.Lock() fi.(func())() + j.tag2JMap.Delete(tag) + j.tag2JJInchanMap.Delete(tag) + j.tag2JJCommitChanMap.Delete(tag) + j.tag2CtxCancelMap.Delete(tag) + j.jjLock.Unlock() } return nil @@ -144,7 +150,7 @@ func (j *Journal) ProcessLegacyMsg(dumpChan chan *libs.FluentMsg) (maxID int64, if !j.legacyLock.TryLock() { return 0, fmt.Errorf("another legacy is running") } - defer j.legacyLock.ForceRealse() + defer j.legacyLock.ForceRelease() utils.Logger.Debug("starting to process legacy data...") var ( @@ -242,17 +248,6 @@ func (j *Journal) createJournalRunner(ctx context.Context, tag string) { if _, ok = j.tag2CtxCancelMap.LoadOrStore(tag, cancel); ok { utils.Logger.Panic("tag already exists in tag2CtxCancelMap", zap.String("tag", tag)) } - go func(ctx context.Context) { - <-ctx.Done() - j.jjLock.Lock() - defer j.jjLock.Unlock() - - utils.Logger.Info("close journal for tag", zap.String("tag", tag)) - j.tag2JMap.Delete(tag) - j.tag2JJInchanMap.Delete(tag) - j.tag2JJCommitChanMap.Delete(tag) - j.tag2CtxCancelMap.Delete(tag) - }(ctxForTag) jcfg := journal.NewConfig() jcfg.BufDirPath = j.baseJournalCfg.BufDirPath @@ -397,11 +392,11 @@ func (j *Journal) ConvertMsg2Buf(msg *libs.FluentMsg, data *map[string]interface func (j *Journal) DumpMsgFlow(ctx context.Context, msgPool *sync.Pool, dumpChan, skipDumpChan chan *libs.FluentMsg) chan *libs.FluentMsg { // deal with legacy go func() { + defer utils.Logger.Info("legacy processor exit") var err error for { // try to starting legacy loading select { case <-ctx.Done(): - utils.Logger.Info("legacy processor exit") return default: } @@ -415,10 +410,10 @@ func (j *Journal) DumpMsgFlow(ctx context.Context, msgPool *sync.Pool, dumpChan, // start periodic gc go func() { + defer utils.Logger.Info("gc runner exit") for { select { case <-ctx.Done(): - utils.Logger.Info("gc runner exit") return default: } @@ -430,11 +425,11 @@ func (j *Journal) DumpMsgFlow(ctx context.Context, msgPool *sync.Pool, dumpChan, // deal with msgs that skip dump go func() { + defer utils.Logger.Info("skipDumpChan goroutine exit") var msg *libs.FluentMsg for { select { case <-ctx.Done(): - utils.Logger.Info("skipDumpChan goroutine exit") return case msg = <-skipDumpChan: } @@ -444,6 +439,7 @@ func (j *Journal) DumpMsgFlow(ctx context.Context, msgPool *sync.Pool, dumpChan, }() go func() { + defer utils.Logger.Info("legacy dumper exit") var ( ok bool jji interface{} @@ -452,7 +448,6 @@ func (j *Journal) DumpMsgFlow(ctx context.Context, msgPool *sync.Pool, dumpChan, for { select { case <-ctx.Done(): - utils.Logger.Info("legacy dumper exit") return case msg = <-dumpChan: } @@ -485,6 +480,7 @@ func (j *Journal) GetCommitChan() chan<- *libs.FluentMsg { func (j *Journal) startCommitRunner(ctx context.Context) { go func() { + defer utils.Logger.Info("id commitor exit") var ( ok bool chani interface{} @@ -493,7 +489,6 @@ func (j *Journal) startCommitRunner(ctx context.Context) { for { select { case <-ctx.Done(): - utils.Logger.Info("id commitor exit") return case msg = <-j.commitChan: } diff --git a/postFilters/pipeline.go b/postFilters/pipeline.go index f4999f0..3017d0c 100644 --- a/postFilters/pipeline.go +++ b/postFilters/pipeline.go @@ -1,9 +1,12 @@ package postFilters import ( + "context" "fmt" "sync" + "github.com/Laisky/zap" + "github.com/Laisky/go-fluentd/libs" "github.com/Laisky/go-utils" ) @@ -42,33 +45,35 @@ func NewPostPipeline(cfg *PostPipelineCfg, filters ...PostFilterItf) *PostPipeli return pp } -func (f *PostPipeline) Wrap(inChan chan *libs.FluentMsg) (outChan chan *libs.FluentMsg) { +func (f *PostPipeline) Wrap(ctx context.Context, inChan chan *libs.FluentMsg) (outChan chan *libs.FluentMsg) { outChan = make(chan *libs.FluentMsg, f.OutChanSize) for i := 0; i < f.NFork; i++ { - go func() { - defer panic(fmt.Errorf("quit postPipeline")) + go func(i int) { + defer utils.Logger.Info("quit postPipeline", zap.Int("i", i)) var ( filter PostFilterItf msg *libs.FluentMsg ) + NEW_MSG: for { - NEW_MSG: select { + case <-ctx.Done(): + return case msg = <-f.reEnterChan: case msg = <-inChan: } for _, filter = range f.filters { if msg = filter.Filter(msg); msg == nil { // quit filters for this msg - goto NEW_MSG + continue NEW_MSG } } outChan <- msg } - }() + }(i) } return outChan diff --git a/producer.go b/producer.go index 874cff3..147f5d1 100644 --- a/producer.go +++ b/producer.go @@ -1,6 +1,7 @@ package concator import ( + "context" "fmt" "sync" "time" @@ -22,7 +23,8 @@ type ProducerCfg struct { // Producer send messages to downstream type Producer struct { *ProducerCfg - producerTagChanMap *sync.Map + sync.Mutex + tag2SenderChan *sync.Map senders []senders.SenderItf discardChan, discardWithoutCommitChan chan *libs.FluentMsg // discardMsgCountMap = map[&msg]discardedCount @@ -35,6 +37,10 @@ type Producer struct { discardMsgCountMap *sync.Map counter *utils.Counter pMsgPool *sync.Pool // pending msg pool + + unSupportedTags *sync.Map + tag2NSender *sync.Map // map[tag]nSender + tag2Cancel *sync.Map // map[tag]nSender } type ProducerPendingDiscardMsg struct { @@ -54,7 +60,7 @@ func NewProducer(cfg *ProducerCfg, senders ...senders.SenderItf) *Producer { p := &Producer{ ProducerCfg: cfg, senders: senders, - producerTagChanMap: &sync.Map{}, + tag2SenderChan: &sync.Map{}, discardChan: make(chan *libs.FluentMsg, cfg.DiscardChanSize), discardWithoutCommitChan: make(chan *libs.FluentMsg, cfg.DiscardChanSize), discardMsgCountMap: &sync.Map{}, @@ -66,6 +72,10 @@ func NewProducer(cfg *ProducerCfg, senders ...senders.SenderItf) *Producer { } }, }, + + unSupportedTags: &sync.Map{}, + tag2NSender: &sync.Map{}, // map[tag]nSender + tag2Cancel: &sync.Map{}, // map[tag]nSender } p.registerMonitor() @@ -90,7 +100,7 @@ func (p *Producer) registerMonitor() { p.counter.Set(0) lastT = time.Now() - p.producerTagChanMap.Range(func(tagi, smi interface{}) bool { + p.tag2SenderChan.Range(func(tagi, smi interface{}) bool { smi.(*sync.Map).Range(func(si, ci interface{}) bool { metrics[tagi.(string)+"."+si.(senders.SenderItf).GetName()+".ChanLen"] = len(ci.(chan<- *libs.FluentMsg)) metrics[tagi.(string)+"."+si.(senders.SenderItf).GetName()+".ChanCap"] = cap(ci.(chan<- *libs.FluentMsg)) @@ -123,7 +133,8 @@ func (p *Producer) DiscardMsg(pmsg *ProducerPendingDiscardMsg) { p.pMsgPool.Put(pmsg) } -func (p *Producer) RunMsgCollector(nSenderForTagMap *sync.Map, discardChan chan *libs.FluentMsg) { +func (p *Producer) RunMsgCollector(ctx context.Context, tag2NSender *sync.Map, discardChan chan *libs.FluentMsg) { + defer utils.Logger.Info("msg collector exit") var ( cntToDiscard int ok, isCommit bool @@ -134,6 +145,8 @@ func (p *Producer) RunMsgCollector(nSenderForTagMap *sync.Map, discardChan chan for { select { + case <-ctx.Done(): + return case msg = <-p.discardChan: isCommit = true case msg = <-p.discardWithoutCommitChan: @@ -141,8 +154,8 @@ func (p *Producer) RunMsgCollector(nSenderForTagMap *sync.Map, discardChan chan } // ⚠️Notify: Do not change tag in any sender - if itf, ok = nSenderForTagMap.Load(msg.Tag); !ok { - utils.Logger.Error("[panic] nSenderForTagMap should contains tag", + if itf, ok = tag2NSender.Load(msg.Tag); !ok { + utils.Logger.Panic("[panic] tag2NSender should contains tag", zap.String("tag", msg.Tag), zap.String("msg", fmt.Sprint(msg))) cntToDiscard = 1 @@ -174,68 +187,79 @@ func (p *Producer) RunMsgCollector(nSenderForTagMap *sync.Map, discardChan chan } // Run starting Producer to send messages -func (p *Producer) Run() { +func (p *Producer) Run(ctx context.Context) { utils.Logger.Info("start producer") - var ( - unSupportedTags = &sync.Map{} - nSenderForTagMap = &sync.Map{} // map[tag]nSender - ) - - go p.RunMsgCollector(nSenderForTagMap, p.discardChan) + go p.RunMsgCollector(ctx, p.tag2NSender, p.discardChan) for i := 0; i < p.NFork; i++ { - go func() { + go func(i int) { + defer utils.Logger.Info("producer exit", zap.Int("i", i)) var ( ok, isSkip bool s senders.SenderItf itf interface{} - senderChanMap *sync.Map // sender: chan + sender2Inchan *sync.Map // sender: chan nSender int + msg *libs.FluentMsg ) - for msg := range p.InChan { + for { + select { + case <-ctx.Done(): + return + case msg = <-p.InChan: + } + // utils.Logger.Info(fmt.Sprintf("send msg %p", msg)) p.counter.Count() - if _, ok = unSupportedTags.Load(msg.Tag); ok { + if _, ok = p.unSupportedTags.Load(msg.Tag); ok { utils.Logger.Warn("do not produce since of unsupported tag", zap.String("tag", msg.Tag)) p.discardChan <- msg continue } msg.Message["msgid"] = msg.Id // set id - if _, ok = p.producerTagChanMap.Load(msg.Tag); !ok { - // create sender chans for new tag - isSkip = true - nSender = 0 - senderChanMap = &sync.Map{} - for _, s = range p.senders { - if s.IsTagSupported(msg.Tag) { - isSkip = false - nSender++ - utils.Logger.Info("spawn new producer sender", - zap.String("name", s.GetName()), - zap.String("tag", msg.Tag)) - senderChanMap.Store(s, s.Spawn(msg.Tag)) + if _, ok = p.tag2SenderChan.Load(msg.Tag); !ok { + p.Lock() + if _, ok = p.tag2SenderChan.Load(msg.Tag); !ok { // double check + // create sender chans for new tag + isSkip = true + nSender = 0 + sender2Inchan = &sync.Map{} + ctx2Tag, cancel := context.WithCancel(ctx) + for _, s = range p.senders { + if s.IsTagSupported(msg.Tag) { + isSkip = false + nSender++ + utils.Logger.Info("spawn new producer sender", + zap.String("name", s.GetName()), + zap.String("tag", msg.Tag)) + sender2Inchan.Store(s, s.Spawn(ctx2Tag, msg.Tag)) + } } - } - if isSkip { - // no sender support this tag - utils.Logger.Warn("do not produce since of unsupported tag", zap.String("tag", msg.Tag)) - nSenderForTagMap.Store(msg.Tag, 1) - unSupportedTags.Store(msg.Tag, struct{}{}) // mark as unsupported - p.discardChan <- msg - continue - } + if isSkip { + // no sender support this tag + utils.Logger.Warn("do not produce since of unsupported tag", zap.String("tag", msg.Tag)) + p.tag2NSender.Store(msg.Tag, 1) + p.unSupportedTags.Store(msg.Tag, struct{}{}) // mark as unsupported + p.discardChan <- msg + cancel() + p.Unlock() + continue + } - utils.Logger.Info("register the number of senders for tag", - zap.String("tag", msg.Tag), - zap.Int("n", nSender)) - nSenderForTagMap.Store(msg.Tag, nSender) - p.producerTagChanMap.Store(msg.Tag, senderChanMap) + utils.Logger.Info("register the number of senders for tag", + zap.String("tag", msg.Tag), + zap.Int("n", nSender)) + p.tag2Cancel.Store(msg.Tag, cancel) + p.tag2NSender.Store(msg.Tag, nSender) + p.tag2SenderChan.Store(msg.Tag, sender2Inchan) + } + p.Unlock() } - if itf, ok = p.producerTagChanMap.Load(msg.Tag); !ok { - utils.Logger.Error("[panic] producerTagChanMap should contains tag", zap.String("tag", msg.Tag)) + if itf, ok = p.tag2SenderChan.Load(msg.Tag); !ok { + utils.Logger.Panic("[panic] tag2SenderChan should contains tag", zap.String("tag", msg.Tag)) continue } @@ -260,6 +284,6 @@ func (p *Producer) Run() { return true }) } - }() + }(i) } } diff --git a/recvs/base.go b/recvs/base.go index 9ce0a1b..752c082 100644 --- a/recvs/base.go +++ b/recvs/base.go @@ -6,6 +6,7 @@ package recvs import ( + "context" "sync" "github.com/Laisky/go-fluentd/libs" @@ -24,7 +25,7 @@ type AcceptorRecvItf interface { SetAsyncOutChan(chan<- *libs.FluentMsg) SetMsgPool(*sync.Pool) SetCounter(libs.CounterIft) - Run() + Run(context.Context) GetName() string } diff --git a/recvs/fluentd.go b/recvs/fluentd.go index 745c4ba..617476f 100644 --- a/recvs/fluentd.go +++ b/recvs/fluentd.go @@ -2,6 +2,7 @@ package recvs import ( "bytes" + "context" "fmt" "io" "net" @@ -84,7 +85,7 @@ func (r *FluentdRecv) GetName() string { } // Run starting this recv -func (r *FluentdRecv) Run() { +func (r *FluentdRecv) Run(ctx context.Context) { utils.Logger.Info("run FluentdRecv") var ( conn net.Conn @@ -96,7 +97,14 @@ func (r *FluentdRecv) Run() { utils.Logger.Error("try to bind addr got error", zap.Error(err)) } + defer utils.Logger.Info("fluentd recv exit") for { + select { + case <-ctx.Done(): + return + default: + } + conn, err = ln.Accept() if err != nil { utils.Logger.Error("try to accept connection got error", zap.Error(err)) @@ -281,6 +289,7 @@ func (r *FluentdRecv) closeConcatCtx(ctx *concatCtx) { for _, ctx.pmsg = range ctx.identifier2LastMsg { r.SendMsg(ctx.pmsg.msg) + r.pendingMsgPool.Put(ctx.pmsg) } } diff --git a/recvs/fluentd_test.go b/recvs/fluentd_test.go index 5c836b7..a0d5c0c 100644 --- a/recvs/fluentd_test.go +++ b/recvs/fluentd_test.go @@ -1,6 +1,7 @@ package recvs_test import ( + "context" "net" "testing" "time" @@ -32,7 +33,7 @@ func TestFluentdRecv(t *testing.T) { recv.SetSyncOutChan(syncOutChan) go func() { - recv.Run() + recv.Run(context.Background()) }() time.Sleep(100 * time.Millisecond) cnt := 0 diff --git a/recvs/http.go b/recvs/http.go index 5fa7de1..9ef8640 100644 --- a/recvs/http.go +++ b/recvs/http.go @@ -1,6 +1,7 @@ package recvs import ( + "context" "crypto/md5" "encoding/hex" "fmt" @@ -78,7 +79,7 @@ func (r *HTTPRecv) GetName() string { } // Run useless, just capatable for RecvItf -func (r *HTTPRecv) Run() { +func (r *HTTPRecv) Run(ctx context.Context) { utils.Logger.Info("run HTTPRecv") } diff --git a/recvs/kafka.go b/recvs/kafka.go index 3f2a0ea..b1f12dc 100644 --- a/recvs/kafka.go +++ b/recvs/kafka.go @@ -1,6 +1,7 @@ package recvs import ( + "context" "sync" "time" @@ -11,6 +12,10 @@ import ( "github.com/pkg/errors" ) +const ( + defaultKafkaReconnectInterval = 1 * time.Hour +) + func GetKafkaRewriteTag(rewriteTag, env string) string { if rewriteTag == "" { return "" @@ -24,6 +29,12 @@ type KafkaCommitCfg struct { IntervalDuration time.Duration } +func NewKafkaCfg() *KafkaCfg { + return &KafkaCfg{ + ReconnectInterval: defaultKafkaReconnectInterval, + } +} + /*KafkaCfg kafka client configuration Args: @@ -33,9 +44,9 @@ Args: Name: name of this recv plugin KMsgPool: sync.Pool for `*utils.kafka.KafkaMsg` Meta: add new field and value into `msg.Message` - IsJSONFormat: is unmarshal kafka msg JSONTagKey: load tag from kafka message(only work when IsJSONFormat is true) RewriteTag: rewrite `msg.Tag`, `msg.Message["tag"]` will keep origin value + ReconnectInterval: restart consumer periodically */ type KafkaCfg struct { *KafkaCommitCfg @@ -47,6 +58,7 @@ type KafkaCfg struct { IsJSONFormat bool JSONTagKey string RewriteTag string + ReconnectInterval time.Duration } type KafkaRecv struct { @@ -58,6 +70,9 @@ func NewKafkaRecv(cfg *KafkaCfg) *KafkaRecv { if cfg.MsgKey == "" && !cfg.IsJSONFormat { utils.Logger.Panic("at least set one of MsgKey and IsJSONFormat") } + if cfg.ReconnectInterval < defaultKafkaReconnectInterval { + utils.Logger.Warn("ReconnectInterval too small", zap.Duration("ReconnectInterval", cfg.ReconnectInterval)) + } utils.Logger.Info("create KafkaRecv", zap.Strings("topics", cfg.Topics), @@ -76,13 +91,19 @@ func (r *KafkaRecv) GetName() string { return r.Name } -func (r *KafkaRecv) Run() { +func (r *KafkaRecv) Run(ctx context.Context) { utils.Logger.Info("run KafkaRecv") for i := 0; i < r.NConsumer; i++ { go func(i int) { - defer utils.Logger.Panic("kafka reciver exit", zap.Int("n", i)) + defer utils.Logger.Info("kafka reciver exit", zap.Int("n", i)) for { - cli, err := kafka.NewKafkaCliWithGroupId(&kafka.KafkaCliCfg{ + select { + case <-ctx.Done(): + return + default: + } + + cli, err := kafka.NewKafkaCliWithGroupId(ctx, &kafka.KafkaCliCfg{ Brokers: r.Brokers, Topics: r.Topics, Groupid: r.Group, @@ -103,10 +124,25 @@ func (r *KafkaRecv) Run() { zap.String("group", r.Group)) var ( - kmsg *kafka.KafkaMsg - msg *libs.FluentMsg + ok bool + kmsg *kafka.KafkaMsg + msg *libs.FluentMsg + ctx2Consumer, cancal = context.WithTimeout(ctx, r.ReconnectInterval) ) - for kmsg = range cli.Messages() { // receive new kmsg, and convert to fluent msg + + CONSUMER_LOOP: + for { // receive new kmsg, and convert to fluent msg + select { + case <-ctx2Consumer.Done(): + break CONSUMER_LOOP + case kmsg, ok = <-cli.Messages(ctx): + if !ok { + utils.Logger.Info("consumer break") + cancal() + break CONSUMER_LOOP + } + } + utils.Logger.Debug("got new message from kafka", zap.Int("n", i), zap.ByteString("msg", kmsg.Message), @@ -123,7 +159,6 @@ func (r *KafkaRecv) Run() { r.syncOutChan <- msg // blockable cli.CommitWithMsg(kmsg) } - cli.Close() } }(i) diff --git a/recvs/rsyslog.go b/recvs/rsyslog.go index 17c52f5..44af00d 100644 --- a/recvs/rsyslog.go +++ b/recvs/rsyslog.go @@ -1,8 +1,11 @@ package recvs import ( + "context" "time" + "github.com/Laisky/go-syslog/format" + "github.com/Laisky/go-fluentd/libs" "github.com/Laisky/go-syslog" "github.com/Laisky/go-utils" @@ -43,17 +46,27 @@ func (r *RsyslogRecv) GetName() string { return r.Name } -func (r *RsyslogRecv) Run() { +func (r *RsyslogRecv) Run(ctx context.Context) { utils.Logger.Info("Run RsyslogRecv") go func() { - defer utils.Logger.Panic("rsyslog reciver exit", zap.String("name", r.GetName())) + defer utils.Logger.Info("rsyslog reciver exit", zap.String("name", r.GetName())) var ( - err error - msg *libs.FluentMsg - tag = "emqtt." + r.Env + err error + msg *libs.FluentMsg + tag = "emqtt." + r.Env + logPart format.LogParts + ctx2Srv context.Context + cancel func() ) for { + select { + case <-ctx.Done(): + return + default: + } + + ctx2Srv, cancel = context.WithCancel(ctx) srv, inchan := NewRsyslogSrv(r.Addr) utils.Logger.Info("listening rsyslog", zap.String("addr", r.Addr)) if err = srv.Boot(&syslog.BLBCfg{ @@ -61,10 +74,27 @@ func (r *RsyslogRecv) Run() { SYN: "hello", }); err != nil { utils.Logger.Error("try to start rsyslog server got error", zap.Error(err)) + cancel() continue } + go func(srv *syslog.Server, cancel func()) { + srv.Wait() + cancel() + }(srv, cancel) + + LOG_LOOP: + for { + select { + case <-ctx2Srv.Done(): + utils.Logger.Info("rsyslog server exit") + break LOG_LOOP + case logPart = <-inchan: + if logPart == nil { + utils.Logger.Info("rsyslog channel closed") + break LOG_LOOP + } + } - for logPart := range inchan { switch logPart[r.TimeKey].(type) { case time.Time: logPart[r.NewTimeKey] = logPart[r.TimeKey].(time.Time).UTC().Format(r.NewTimeFormat) diff --git a/senders/base.go b/senders/base.go index 692c41b..6882150 100644 --- a/senders/base.go +++ b/senders/base.go @@ -1,17 +1,19 @@ package senders import ( + "context" "sync" "time" "github.com/Laisky/go-fluentd/libs" + utils "github.com/Laisky/go-utils" jsoniter "github.com/json-iterator/go" ) var json = jsoniter.ConfigCompatibleWithStandardLibrary type SenderItf interface { - Spawn(string) chan<- *libs.FluentMsg // Spawn(tag) inChan + Spawn(context.Context, string) chan<- *libs.FluentMsg // Spawn(tag) inChan IsTagSupported(string) bool DiscardWhenBlocked() bool GetName() string @@ -33,8 +35,14 @@ type BaseSender struct { IsDiscardWhenBlocked bool } -func (s *BaseSender) runFlusher(inChan chan *libs.FluentMsg) { +func (s *BaseSender) runFlusher(ctx context.Context, inChan chan *libs.FluentMsg) { + defer utils.Logger.Info("flusher exit") for { + select { + case <-ctx.Done(): + return + default: + } time.Sleep(3 * time.Second) inChan <- nil } diff --git a/senders/elasticsearch.go b/senders/elasticsearch.go index c6d9c6f..2e3f528 100644 --- a/senders/elasticsearch.go +++ b/senders/elasticsearch.go @@ -3,6 +3,7 @@ package senders import ( "bytes" "compress/gzip" + "context" "fmt" "io/ioutil" "net/http" @@ -92,35 +93,35 @@ func (s *ElasticSearchSender) getMsgStarting(msg *libs.FluentMsg) ([]byte, error return []byte("{\"index\": {\"_index\": \"" + index + "\", \"_type\": \"logs\"}}\n"), nil } -func (s *ElasticSearchSender) SendBulkMsgs(ctx *bulkOpCtx, msgs []*libs.FluentMsg) (err error) { - ctx.cnt = ctx.cnt[:0] - for _, ctx.msg = range msgs { - if ctx.starting, err = s.getMsgStarting(ctx.msg); err != nil { +func (s *ElasticSearchSender) SendBulkMsgs(bulkCtx *bulkOpCtx, msgs []*libs.FluentMsg) (err error) { + bulkCtx.cnt = bulkCtx.cnt[:0] + for _, bulkCtx.msg = range msgs { + if bulkCtx.starting, err = s.getMsgStarting(bulkCtx.msg); err != nil { utils.Logger.Warn("try to generate bulk index got error", zap.Error(err)) continue } - b, err := json.Marshal(ctx.msg.Message) + b, err := json.Marshal(bulkCtx.msg.Message) if err != nil { return errors.Wrap(err, "try to marshal messages got error") } utils.Logger.Debug("prepare bulk content send to es", - zap.ByteString("starting", ctx.starting), + zap.ByteString("starting", bulkCtx.starting), zap.ByteString("body", b)) - ctx.cnt = append(ctx.cnt, ctx.starting...) - ctx.cnt = append(ctx.cnt, b...) - ctx.cnt = append(ctx.cnt, '\n') + bulkCtx.cnt = append(bulkCtx.cnt, bulkCtx.starting...) + bulkCtx.cnt = append(bulkCtx.cnt, b...) + bulkCtx.cnt = append(bulkCtx.cnt, '\n') } - ctx.buf.Reset() - ctx.gzWriter.Reset(ctx.buf) - if _, err = ctx.gzWriter.Write(ctx.cnt); err != nil { + bulkCtx.buf.Reset() + bulkCtx.gzWriter.Reset(bulkCtx.buf) + if _, err = bulkCtx.gzWriter.Write(bulkCtx.cnt); err != nil { return errors.Wrap(err, "try to compress messages got error") } - ctx.gzWriter.Close() - req, err := http.NewRequest("POST", s.Addr, ctx.buf) + bulkCtx.gzWriter.Close() + req, err := http.NewRequest("POST", s.Addr, bulkCtx.buf) if err != nil { return errors.Wrap(err, "try to init es request got error") } @@ -191,14 +192,17 @@ func (s *ElasticSearchSender) checkResp(resp *http.Response) (err error) { return nil } -func (s *ElasticSearchSender) Spawn(tag string) chan<- *libs.FluentMsg { +func (s *ElasticSearchSender) Spawn(ctx context.Context, tag string) chan<- *libs.FluentMsg { utils.Logger.Info("SpawnForTag", zap.String("tag", tag)) inChan := make(chan *libs.FluentMsg, s.InChanSize) // for each tag - go s.runFlusher(inChan) + go s.runFlusher(ctx, inChan) for i := 0; i < s.NFork; i++ { // parallel to each tag - go func() { - defer utils.Logger.Error("producer exits", zap.String("tag", tag), zap.String("name", s.GetName())) + go func(i int) { + defer utils.Logger.Info("producer exits", + zap.String("tag", tag), + zap.Int("i", i), + zap.String("name", s.GetName())) var ( maxRetry = 3 @@ -208,16 +212,22 @@ func (s *ElasticSearchSender) Spawn(tag string) chan<- *libs.FluentMsg { iBatch = 0 lastT = time.Unix(0, 0) err error - ctx = &bulkOpCtx{ + bulkCtx = &bulkOpCtx{ cnt: []byte{}, } nRetry, j int ) - ctx.buf = &bytes.Buffer{} - ctx.gzWriter = gzip.NewWriter(ctx.buf) + bulkCtx.buf = &bytes.Buffer{} + bulkCtx.gzWriter = gzip.NewWriter(bulkCtx.buf) + + for { + select { + case <-ctx.Done(): + return + case msg = <-inChan: + } - for msg = range inChan { if msg != nil { msgBatch[iBatch] = msg iBatch++ @@ -246,7 +256,7 @@ func (s *ElasticSearchSender) Spawn(tag string) chan<- *libs.FluentMsg { } SEND_MSG: - if err = s.SendBulkMsgs(ctx, msgBatchDelivery); err != nil { + if err = s.SendBulkMsgs(bulkCtx, msgBatchDelivery); err != nil { nRetry++ if nRetry > maxRetry { utils.Logger.Error("try send message got error", @@ -269,7 +279,7 @@ func (s *ElasticSearchSender) Spawn(tag string) chan<- *libs.FluentMsg { s.discardChan <- msg } } - }() + }(i) } return inChan diff --git a/senders/fluentd.go b/senders/fluentd.go index 52ddeb0..ceabf58 100644 --- a/senders/fluentd.go +++ b/senders/fluentd.go @@ -1,6 +1,7 @@ package senders import ( + "context" "fmt" "net" "time" @@ -50,14 +51,17 @@ func (s *FluentSender) GetName() string { return s.Name } -func (s *FluentSender) Spawn(tag string) chan<- *libs.FluentMsg { +func (s *FluentSender) Spawn(ctx context.Context, tag string) chan<- *libs.FluentMsg { utils.Logger.Info("spawn for tag", zap.String("tag", tag)) inChan := make(chan *libs.FluentMsg, s.InChanSize) // for each tag - go s.runFlusher(inChan) + go s.runFlusher(ctx, inChan) for i := 0; i < s.NFork; i++ { // parallel to each tag - go func() { - defer utils.Logger.Panic("producer exits", zap.String("tag", tag), zap.String("name", s.GetName())) + go func(i int) { + defer utils.Logger.Info("producer exits", + zap.String("tag", tag), + zap.String("name", s.GetName()), + zap.Int("i", i)) var ( nRetry int @@ -83,7 +87,13 @@ func (s *FluentSender) Spawn(tag string) chan<- *libs.FluentMsg { zap.String("tag", tag)) encoder = libs.NewFluentEncoder(conn) // one encoder for each connection - for msg = range inChan { + for { + select { + case <-ctx.Done(): + return + case msg = <-inChan: + } + if msg != nil { msgBatch[iBatch] = msg iBatch++ @@ -145,7 +155,7 @@ func (s *FluentSender) Spawn(tag string) chan<- *libs.FluentMsg { s.discardChan <- msg } } - }() + }(i) } return inChan diff --git a/senders/httpforward.go b/senders/httpforward.go index 5304da6..d2663d6 100644 --- a/senders/httpforward.go +++ b/senders/httpforward.go @@ -1,6 +1,7 @@ package senders import ( + "context" "fmt" "net/http" "time" @@ -56,14 +57,17 @@ func (s *HTTPSender) GetName() string { return s.Name } -func (s *HTTPSender) Spawn(tag string) chan<- *libs.FluentMsg { +func (s *HTTPSender) Spawn(ctx context.Context, tag string) chan<- *libs.FluentMsg { utils.Logger.Info("SpawnForTag", zap.String("tag", tag)) inChan := make(chan *libs.FluentMsg, s.InChanSize) // for each tag - go s.runFlusher(inChan) + go s.runFlusher(ctx, inChan) for i := 0; i < s.NFork; i++ { // parallel to each tag - go func() { - defer utils.Logger.Panic("producer exits", zap.String("tag", tag), zap.String("name", s.GetName())) + go func(i int) { + defer utils.Logger.Info("producer exits", + zap.String("tag", tag), + zap.Int("i", i), + zap.String("name", s.GetName())) var ( nRetry int @@ -130,7 +134,7 @@ func (s *HTTPSender) Spawn(tag string) chan<- *libs.FluentMsg { s.discardChan <- msg } } - }() + }(i) } return inChan diff --git a/senders/kafka.go b/senders/kafka.go index 65ebfbf..0d6b5e9 100644 --- a/senders/kafka.go +++ b/senders/kafka.go @@ -1,6 +1,7 @@ package senders import ( + "context" "fmt" "time" @@ -58,14 +59,17 @@ func (s *KafkaSender) GetName() string { return s.Name } -func (s *KafkaSender) Spawn(tag string) chan<- *libs.FluentMsg { +func (s *KafkaSender) Spawn(ctx context.Context, tag string) chan<- *libs.FluentMsg { utils.Logger.Info("SpawnForTag", zap.String("tag", tag)) inChan := make(chan *libs.FluentMsg, s.InChanSize) - go s.runFlusher(inChan) + go s.runFlusher(ctx, inChan) for i := 0; i < s.NFork; i++ { - go func() { - defer utils.Logger.Panic("kafka sender exit", zap.String("tag", tag), zap.String("name", s.GetName())) + go func(i int) { + defer utils.Logger.Info("kafka sender exit", + zap.String("tag", tag), + zap.String("name", s.GetName()), + zap.Int("i", i)) var ( jb []byte nRetry int @@ -77,6 +81,7 @@ func (s *KafkaSender) Spawn(tag string) chan<- *libs.FluentMsg { lastT = time.Unix(0, 0) err error j int + msg *libs.FluentMsg ) for j = 0; j < s.BatchSize; j++ { @@ -93,7 +98,13 @@ func (s *KafkaSender) Spawn(tag string) chan<- *libs.FluentMsg { zap.Strings("brokers", s.Brokers), zap.String("tag", tag)) - for msg := range inChan { + for { + select { + case <-ctx.Done(): + return + case msg = <-inChan: + } + if msg != nil { // msg.Message[s.TagKey] = msg.Tag // change msg tag msgBatch[iBatch] = msg @@ -168,7 +179,7 @@ func (s *KafkaSender) Spawn(tag string) chan<- *libs.FluentMsg { s.discardChan <- msg } } - }() + }(i) } return inChan diff --git a/senders/null.go b/senders/null.go index 302844e..36d2825 100644 --- a/senders/null.go +++ b/senders/null.go @@ -1,6 +1,7 @@ package senders import ( + "context" "fmt" "github.com/Laisky/go-fluentd/libs" @@ -55,36 +56,36 @@ func (s *NullSender) GetName() string { } // Spawn fork -func (s *NullSender) Spawn(tag string) chan<- *libs.FluentMsg { +func (s *NullSender) Spawn(ctx context.Context, tag string) chan<- *libs.FluentMsg { utils.Logger.Info("spawn for tag", zap.String("tag", tag)) inChan := make(chan *libs.FluentMsg, s.InChanSize) // for each tag for i := 0; i < s.NFork; i++ { go func() { - defer func() { - if err := recover(); err != nil { - utils.Logger.Panic("null sender exit", zap.Error(err.(error))) + defer utils.Logger.Info("null sender exit") + var msg *libs.FluentMsg + for { + select { + case <-ctx.Done(): + return + case msg = <-inChan: } - }() - for { - for msg := range inChan { - switch s.LogLevel { - case "info": - utils.Logger.Info("consume msg", - zap.String("tag", msg.Tag), - zap.String("msg", fmt.Sprint(msg.Message))) - case "debug": - utils.Logger.Debug("consume msg", - zap.String("tag", msg.Tag), - zap.String("msg", fmt.Sprint(msg.Message))) - } + switch s.LogLevel { + case "info": + utils.Logger.Info("consume msg", + zap.String("tag", msg.Tag), + zap.String("msg", fmt.Sprint(msg.Message))) + case "debug": + utils.Logger.Debug("consume msg", + zap.String("tag", msg.Tag), + zap.String("msg", fmt.Sprint(msg.Message))) + } - if s.IsCommit { - s.discardChan <- msg - } else { - s.discardWithoutCommitChan <- msg - } + if s.IsCommit { + s.discardChan <- msg + } else { + s.discardWithoutCommitChan <- msg } } }() diff --git a/server.go b/server.go index 55c6531..3049e50 100644 --- a/server.go +++ b/server.go @@ -3,7 +3,7 @@ package concator import ( "net/http" - "github.com/Depado/ginprom" + middlewares "github.com/Laisky/go-utils/gin-middlewares" "github.com/gin-contrib/pprof" @@ -30,12 +30,7 @@ func RunServer(addr string) { // supported action: // cmdline, profile, symbol, goroutine, heap, threadcreate, block pprof.Register(server, "pprof") - p := ginprom.New( - ginprom.Engine(server), - ginprom.Subsystem("gin"), - ginprom.Path("/metrics"), - ) - server.Use(p.Instrument()) + middlewares.BindPrometheus(server) utils.Logger.Info("listening on http", zap.String("addr", addr)) utils.Logger.Panic("server exit", zap.Error(server.Run(addr))) diff --git a/tagFilters/base.go b/tagFilters/base.go index f25f141..af2348e 100644 --- a/tagFilters/base.go +++ b/tagFilters/base.go @@ -1,6 +1,7 @@ package tagFilters import ( + "context" "fmt" "sync" @@ -15,7 +16,7 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary type TagFilterFactoryItf interface { IsTagSupported(string) bool - Spawn(string, chan<- *libs.FluentMsg) chan<- *libs.FluentMsg // Spawn(tag, outChan) inChan + Spawn(context.Context, string, chan<- *libs.FluentMsg) chan<- *libs.FluentMsg // Spawn(tag, outChan) inChan GetName() string SetMsgPool(*sync.Pool) @@ -46,20 +47,27 @@ func (f *BaseTagFilterFactory) DiscardMsg(msg *libs.FluentMsg) { f.committedChan <- msg } -func (f *BaseTagFilterFactory) runLB(lbkey string, inChan chan *libs.FluentMsg, inchans []chan *libs.FluentMsg) { +func (f *BaseTagFilterFactory) runLB(ctx context.Context, lbkey string, inChan chan *libs.FluentMsg, inchans []chan *libs.FluentMsg) { + defer utils.Logger.Info("concator lb exit") if len(inchans) < 1 { utils.Logger.Panic("nfork or inchans's length error", zap.Int("inchans_len", len(inchans))) } - defer utils.Logger.Panic("concator lb exit") var ( nfork = len(inchans) hashkey uint64 emptyHashkey = xxhash.Sum64String("") downChan = inchans[0] + msg *libs.FluentMsg ) - for msg := range inChan { + for { + select { + case <-ctx.Done(): + return + case msg = <-inChan: + } + if nfork != 1 { switch msg.Message[lbkey].(type) { case []byte: diff --git a/tagFilters/concator_f.go b/tagFilters/concator_f.go index 23b1e63..5786072 100644 --- a/tagFilters/concator_f.go +++ b/tagFilters/concator_f.go @@ -1,6 +1,7 @@ package tagFilters import ( + "context" "fmt" "regexp" "sync" @@ -12,19 +13,9 @@ import ( ) type ConcatorCfg struct { - Cf TagFilterFactoryItf - MaxLen int - Tag, MsgKey, Identifier string - OutChan chan<- *libs.FluentMsg - MsgPool, PMsgPool *sync.Pool - Regexp *regexp.Regexp -} - -// Concator work for one tag, contains many identifier("container_id") -// Warn: Concator should not blocking -type Concator struct { - *ConcatorCfg - slot map[string]*PendingMsg + MsgKey, + Identifier string + Regexp *regexp.Regexp } // LoadConcatorTagConfigs return the configurations about dispatch rules @@ -48,26 +39,14 @@ type PendingMsg struct { lastT time.Time } -// NewConcator create new Concator -func NewConcator(cfg *ConcatorCfg) *Concator { - utils.Logger.Info("create new concator", - zap.String("tag", cfg.Tag), - zap.String("identifier", cfg.Identifier), - zap.String("msgKey", cfg.MsgKey)) - - return &Concator{ - ConcatorCfg: cfg, - slot: map[string]*PendingMsg{}, - } -} - -// Run starting Concator to concatenate messages, +// StartNewConcator starting Concator to concatenate messages, // you should not run concator directly, // it's better to create and run Concator by ConcatorFactory // // TODO: concator for each tag now, // maybe set one concator for each identifier in the future for better performance -func (c *Concator) Run(inChan <-chan *libs.FluentMsg) { +func (c *ConcatorFactory) StartNewConcator(ctx context.Context, cfg *ConcatorCfg, outChan chan<- *libs.FluentMsg, inChan <-chan *libs.FluentMsg) { + defer utils.Logger.Info("concator exit") var ( msg *libs.FluentMsg pmsg *PendingMsg @@ -87,30 +66,36 @@ func (c *Concator) Run(inChan <-chan *libs.FluentMsg) { for { if len(c.slot) == 0 { // no msg waitting in slot utils.Logger.Debug("slot clear, waitting for new msg") - msg = <-inChan + select { + case <-ctx.Done(): + return + case msg = <-inChan: + } } else { select { + case <-ctx.Done(): + return case msg = <-inChan: default: // no new msg for identifier, pmsg = range c.slot { if utils.Clock.GetUTCNow().Sub(pmsg.lastT) > concatTimeoutTs { // timeout to flush // PAAS-210: I have no idea why this line could throw error - // utils.Logger.Debug("timeout flush", zap.ByteString("log", pmsg.msg.Message[c.MsgKey].([]byte))) + // utils.Logger.Debug("timeout flush", zap.ByteString("log", pmsg.msg.Message[cfg.MsgKey].([]byte))) - switch pmsg.msg.Message[c.MsgKey].(type) { + switch pmsg.msg.Message[cfg.MsgKey].(type) { case []byte: utils.Logger.Debug("timeout flush", - zap.ByteString("log", pmsg.msg.Message[c.MsgKey].([]byte)), + zap.ByteString("log", pmsg.msg.Message[cfg.MsgKey].([]byte)), zap.String("tag", pmsg.msg.Tag)) default: - utils.Logger.Error("[panic] unknown type of `pmsg.msg.Message[c.MsgKey]`", + utils.Logger.Panic("[panic] unknown type of `pmsg.msg.Message[cfg.MsgKey]`", zap.String("tag", pmsg.msg.Tag), - zap.String("log", fmt.Sprint(pmsg.msg.Message[c.MsgKey])), + zap.String("log", fmt.Sprint(pmsg.msg.Message[cfg.MsgKey])), zap.String("msg", fmt.Sprint(pmsg.msg))) } - c.PutDownstream(pmsg.msg) - c.PMsgPool.Put(pmsg) + outChan <- pmsg.msg + c.pMsgPool.Put(pmsg) delete(c.slot, identifier) } } @@ -123,40 +108,40 @@ func (c *Concator) Run(inChan <-chan *libs.FluentMsg) { timer.Reset(utils.Clock.GetUTCNow()) // unknown identifier - switch msg.Message[c.Identifier].(type) { + switch msg.Message[cfg.Identifier].(type) { case []byte: - identifier = string(msg.Message[c.Identifier].([]byte)) + identifier = string(msg.Message[cfg.Identifier].([]byte)) case string: - identifier = msg.Message[c.Identifier].(string) + identifier = msg.Message[cfg.Identifier].(string) default: utils.Logger.Warn("unknown identifier or unknown type", zap.String("tag", msg.Tag), - zap.String("identifier_key", c.Identifier), - zap.String("identifier", fmt.Sprint(msg.Message[c.Identifier]))) - c.PutDownstream(msg) + zap.String("identifier_key", cfg.Identifier), + zap.String("identifier", fmt.Sprint(msg.Message[cfg.Identifier]))) + outChan <- msg continue } // unknon msg key - switch msg.Message[c.MsgKey].(type) { + switch msg.Message[cfg.MsgKey].(type) { case []byte: - log = msg.Message[c.MsgKey].([]byte) + log = msg.Message[cfg.MsgKey].([]byte) case string: - log = []byte(msg.Message[c.MsgKey].(string)) - msg.Message[c.MsgKey] = log + log = []byte(msg.Message[cfg.MsgKey].(string)) + msg.Message[cfg.MsgKey] = log default: utils.Logger.Warn("unknown msg key or unknown type", zap.String("tag", msg.Tag), - zap.String("msg_key", c.MsgKey), + zap.String("msg_key", cfg.MsgKey), zap.String("msg", fmt.Sprint(msg.Message))) - c.PutDownstream(msg) + outChan <- msg continue } if pmsg, ok = c.slot[identifier]; !ok { // new identifier // new line with incorrect format, skip - if !c.Regexp.Match(log) { - c.PutDownstream(msg) + if !cfg.Regexp.Match(log) { + outChan <- msg continue } @@ -164,7 +149,7 @@ func (c *Concator) Run(inChan <-chan *libs.FluentMsg) { utils.Logger.Debug("got new identifier", zap.String("identifier", identifier), zap.ByteString("log", log)) - pmsg = c.PMsgPool.Get().(*PendingMsg) + pmsg = c.pMsgPool.Get().(*PendingMsg) pmsg.lastT = utils.Clock.GetUTCNow() pmsg.msg = msg c.slot[identifier] = pmsg @@ -172,11 +157,11 @@ func (c *Concator) Run(inChan <-chan *libs.FluentMsg) { } // replace exists msg in slot - if c.Regexp.Match(log) { // new line + if cfg.Regexp.Match(log) { // new line utils.Logger.Debug("got new line", zap.ByteString("log", log), zap.String("tag", msg.Tag)) - c.PutDownstream(c.slot[identifier].msg) + outChan <- c.slot[identifier].msg c.slot[identifier].msg = msg c.slot[identifier].lastT = utils.Clock.GetUTCNow() continue @@ -185,11 +170,11 @@ func (c *Concator) Run(inChan <-chan *libs.FluentMsg) { // need to concat utils.Logger.Debug("concat lines", zap.String("tag", msg.Tag), - zap.ByteString("log", msg.Message[c.MsgKey].([]byte))) - // c.slot[identifier].msg.Message[c.MsgKey] = - // append(c.slot[identifier].msg.Message[c.MsgKey].([]byte), '\n') - c.slot[identifier].msg.Message[c.MsgKey] = - append(c.slot[identifier].msg.Message[c.MsgKey].([]byte), msg.Message[c.MsgKey].([]byte)...) + zap.ByteString("log", msg.Message[cfg.MsgKey].([]byte))) + // c.slot[identifier].msg.Message[cfg.MsgKey] = + // append(c.slot[identifier].msg.Message[cfg.MsgKey].([]byte), '\n') + c.slot[identifier].msg.Message[cfg.MsgKey] = + append(c.slot[identifier].msg.Message[cfg.MsgKey].([]byte), msg.Message[cfg.MsgKey].([]byte)...) if c.slot[identifier].msg.ExtIds == nil { c.slot[identifier].msg.ExtIds = []int64{} // create ids, wait to append tail-msg's id } @@ -197,22 +182,18 @@ func (c *Concator) Run(inChan <-chan *libs.FluentMsg) { c.slot[identifier].lastT = utils.Clock.GetUTCNow() // too long to send - if len(c.slot[identifier].msg.Message[c.MsgKey].([]byte)) >= c.MaxLen { - utils.Logger.Debug("too long to send", zap.String("msgKey", c.MsgKey), zap.String("tag", msg.Tag)) - c.PutDownstream(c.slot[identifier].msg) - c.PMsgPool.Put(c.slot[identifier]) + if len(c.slot[identifier].msg.Message[cfg.MsgKey].([]byte)) >= c.MaxLen { + utils.Logger.Debug("too long to send", zap.String("msgKey", cfg.MsgKey), zap.String("tag", msg.Tag)) + outChan <- c.slot[identifier].msg + c.pMsgPool.Put(c.slot[identifier]) delete(c.slot, identifier) } // discard concated msg - c.Cf.DiscardMsg(msg) + c.DiscardMsg(msg) } } -func (c *Concator) PutDownstream(msg *libs.FluentMsg) { - c.OutChan <- msg -} - type ConcatorFactCfg struct { NFork, MaxLen int LBKey string @@ -223,7 +204,9 @@ type ConcatorFactCfg struct { type ConcatorFactory struct { *BaseTagFilterFactory *ConcatorFactCfg + pMsgPool *sync.Pool + slot map[string]*PendingMsg } // NewConcatorFact create new ConcatorFactory @@ -243,6 +226,7 @@ func NewConcatorFact(cfg *ConcatorFactCfg) *ConcatorFactory { cf := &ConcatorFactory{ BaseTagFilterFactory: &BaseTagFilterFactory{}, ConcatorFactCfg: cfg, + slot: map[string]*PendingMsg{}, pMsgPool: &sync.Pool{ New: func() interface{} { return &PendingMsg{} @@ -263,27 +247,19 @@ func (cf *ConcatorFactory) IsTagSupported(tag string) bool { } // Spawn create and run new Concator for new tag -func (cf *ConcatorFactory) Spawn(tag string, outChan chan<- *libs.FluentMsg) chan<- *libs.FluentMsg { +func (cf *ConcatorFactory) Spawn(ctx context.Context, tag string, outChan chan<- *libs.FluentMsg) chan<- *libs.FluentMsg { utils.Logger.Info("spawn concator tagfilter", zap.String("tag", tag)) var ( inChan = make(chan *libs.FluentMsg, cf.defaultInternalChanSize) inchans = []chan *libs.FluentMsg{} - cfg *ConcatorCfg + cfg = cf.Plugins[tag] ) for i := 0; i < cf.NFork; i++ { - cfg = cf.Plugins[tag] - cfg.Cf = cf - cfg.MaxLen = cf.MaxLen - cfg.Tag = tag - cfg.OutChan = outChan - cfg.PMsgPool = cf.pMsgPool - - concator := NewConcator(cfg) eachInchan := make(chan *libs.FluentMsg, cf.defaultInternalChanSize) - go concator.Run(eachInchan) + go cf.StartNewConcator(ctx, cfg, outChan, eachInchan) inchans = append(inchans, eachInchan) } - go cf.runLB(cf.LBKey, inChan, inchans) + go cf.runLB(ctx, cf.LBKey, inChan, inchans) return inChan } diff --git a/tagFilters/concator_test.go b/tagFilters/concator_test.go index 67987d8..4e03dc8 100644 --- a/tagFilters/concator_test.go +++ b/tagFilters/concator_test.go @@ -1,11 +1,8 @@ package tagFilters_test import ( - "math/rand" "regexp" - "sync" "testing" - "time" "github.com/Laisky/go-fluentd/libs" "github.com/Laisky/go-fluentd/tagFilters" @@ -31,81 +28,81 @@ func (f *Factory) Spawn(tag string, outChan chan<- *libs.FluentMsg) chan<- *libs return make(chan *libs.FluentMsg, 1000) } -func BenchmarkConcator(b *testing.B) { - // utils.SetupLogger("debug") - cf := &Factory{ - BaseTagFilterFactory: &tagFilters.BaseTagFilterFactory{}, - } - pMsgPool := &sync.Pool{ - New: func() interface{} { - return &tagFilters.PendingMsg{} - }, - } - msgPool := &sync.Pool{ - New: func() interface{} { - return &libs.FluentMsg{} - }, - } - inChan := make(chan *libs.FluentMsg, 2000) - outChan := make(chan *libs.FluentMsg, 10000) - - go func() { - i := 0.0 - now := time.Now() - for msg := range outChan { - i++ - if time.Now().Sub(now) > 1*time.Second { - now = time.Now() - b.Logf(">> msg: %v\n", string(msg.Message["log"].([]byte))) - b.Logf("%v/s\n", i/time.Now().Sub(now).Seconds()) - i = 0 - } - } - }() - - c := tagFilters.NewConcator(&tagFilters.ConcatorCfg{ - Cf: cf, - MaxLen: 100000, - Tag: "spring.sit", - MsgKey: "log", - Identifier: "container_id", - MsgPool: msgPool, - PMsgPool: pMsgPool, - OutChan: outChan, - Regexp: regexp.MustCompile(`^\d{4}-\d{2}-\d{2} +\d{2}:\d{2}:\d{2}\.\d{3} {0,}\|`), - }) - go c.Run(inChan) - - var ( - msg1, msg2 *libs.FluentMsg - ) - - b.Run("concator", func(b *testing.B) { - for i := 0; i < b.N; i++ { - if rand.Float64() <= 0.5 { - msg1 = msgPool.Get().(*libs.FluentMsg) - msg1.Tag = "app.spring.sit" - msg1.Id = 1 - msg1.Message = map[string]interface{}{ - "log": "2018-11-21 17:05:22.514 | test | INFO | http-nio-8080-exec-1 | com.pateo.qingcloud.cp.core.service.impl.CPBusiness.reflectAdapterRequest | 84: 123454321", - "container_id": "docker", - } - inChan <- msg1 - } else { - msg2 = msgPool.Get().(*libs.FluentMsg) - msg2.Tag = "app.spring.sit" - msg2.Id = 2 - msg2.Message = map[string]interface{}{ - "log": "12345", - "container_id": "docker", - } - inChan <- msg2 - } - } - }) - - b.Error("done") -} +// func BenchmarkConcator(b *testing.B) { +// // utils.SetupLogger("debug") +// cf := &Factory{ +// BaseTagFilterFactory: &tagFilters.BaseTagFilterFactory{}, +// } +// pMsgPool := &sync.Pool{ +// New: func() interface{} { +// return &tagFilters.PendingMsg{} +// }, +// } +// msgPool := &sync.Pool{ +// New: func() interface{} { +// return &libs.FluentMsg{} +// }, +// } +// inChan := make(chan *libs.FluentMsg, 2000) +// outChan := make(chan *libs.FluentMsg, 10000) + +// go func() { +// i := 0.0 +// now := time.Now() +// for msg := range outChan { +// i++ +// if time.Now().Sub(now) > 1*time.Second { +// now = time.Now() +// b.Logf(">> msg: %v\n", string(msg.Message["log"].([]byte))) +// b.Logf("%v/s\n", i/time.Now().Sub(now).Seconds()) +// i = 0 +// } +// } +// }() + +// c := tagFilters.NewConcator(&tagFilters.ConcatorCfg{ +// Cf: cf, +// MaxLen: 100000, +// Tag: "spring.sit", +// MsgKey: "log", +// Identifier: "container_id", +// MsgPool: msgPool, +// PMsgPool: pMsgPool, +// OutChan: outChan, +// Regexp: regexp.MustCompile(`^\d{4}-\d{2}-\d{2} +\d{2}:\d{2}:\d{2}\.\d{3} {0,}\|`), +// }) +// go c.Run(inChan) + +// var ( +// msg1, msg2 *libs.FluentMsg +// ) + +// b.Run("concator", func(b *testing.B) { +// for i := 0; i < b.N; i++ { +// if rand.Float64() <= 0.5 { +// msg1 = msgPool.Get().(*libs.FluentMsg) +// msg1.Tag = "app.spring.sit" +// msg1.Id = 1 +// msg1.Message = map[string]interface{}{ +// "log": "2018-11-21 17:05:22.514 | test | INFO | http-nio-8080-exec-1 | com.pateo.qingcloud.cp.core.service.impl.CPBusiness.reflectAdapterRequest | 84: 123454321", +// "container_id": "docker", +// } +// inChan <- msg1 +// } else { +// msg2 = msgPool.Get().(*libs.FluentMsg) +// msg2.Tag = "app.spring.sit" +// msg2.Id = 2 +// msg2.Message = map[string]interface{}{ +// "log": "12345", +// "container_id": "docker", +// } +// inChan <- msg2 +// } +// } +// }) + +// b.Error("done") +// } func BenchmarkRegexp(b *testing.B) { log := "2018-11-21 17:05:22.514 | test | INFO | http-nio-8080-exec-1 | com.pateo.qingcloud.cp.core.service.impl.CPBusiness.reflectAdapterRequest | 84: 123454321" diff --git a/tagFilters/parser_f.go b/tagFilters/parser_f.go index 3016a32..0395397 100644 --- a/tagFilters/parser_f.go +++ b/tagFilters/parser_f.go @@ -1,6 +1,7 @@ package tagFilters import ( + "context" "fmt" "regexp" "strings" @@ -32,37 +33,8 @@ func ParseAddCfg(env string, cfg interface{}) map[string]map[string]string { return ret } -type ParserCfg struct { - Cf *ParserFact - Tag, MsgKey string - Regexp *regexp.Regexp - OutChan chan<- *libs.FluentMsg - MsgPool *sync.Pool - IsRemoveOrigLog bool - Add map[string]map[string]string - ParseJsonKey, - MustInclude, - TimeKey, - TimeFormat, - NewTimeKey, - AppendTimeZone, - NewTimeFormat string - ReservedTimeKey bool -} - -// Parser is generanal parser -type Parser struct { - *ParserCfg -} - -func NewParser(cfg *ParserCfg) *Parser { - utils.Logger.Info("create new Parser tagfilter") - return &Parser{ - ParserCfg: cfg, - } -} - -func (f *Parser) Run(inChan <-chan *libs.FluentMsg) { +func (f *ParserFact) StartNewParser(ctx context.Context, outChan chan<- *libs.FluentMsg, inChan <-chan *libs.FluentMsg) { + defer utils.Logger.Info("parser runner exit") var ( err error ok bool @@ -71,9 +43,15 @@ func (f *Parser) Run(inChan <-chan *libs.FluentMsg) { k, v string t time.Time ) - for msg = range inChan { - if !f.Cf.IsTagSupported(msg.Tag) { - f.OutChan <- msg + for { + select { + case <-ctx.Done(): + return + case msg = <-inChan: + } + + if !f.IsTagSupported(msg.Tag) { + outChan <- msg } if f.MsgKey != "" { @@ -86,7 +64,7 @@ func (f *Parser) Run(inChan <-chan *libs.FluentMsg) { zap.String("tag", msg.Tag), zap.String("msg", fmt.Sprint(msg.Message)), zap.String("msg_key", f.MsgKey)) - f.OutChan <- msg + outChan <- msg continue } @@ -96,7 +74,7 @@ func (f *Parser) Run(inChan <-chan *libs.FluentMsg) { utils.Logger.Warn("message format not matched", zap.String("tag", msg.Tag), zap.ByteString("log", msg.Message[f.MsgKey].([]byte))) - f.Cf.DiscardMsg(msg) + f.DiscardMsg(msg) continue } } @@ -111,7 +89,7 @@ func (f *Parser) Run(inChan <-chan *libs.FluentMsg) { if f.MustInclude != "" { if _, ok = msg.Message[f.MustInclude]; !ok { utils.Logger.Warn("dicard since of missing key", zap.String("key", f.MustInclude)) - f.Cf.DiscardMsg(msg) + f.DiscardMsg(msg) continue } } @@ -196,7 +174,7 @@ func (f *Parser) Run(inChan <-chan *libs.FluentMsg) { zap.String("time_key", f.TimeKey), zap.String("time_format", f.TimeFormat), zap.String("append_time_zone", f.AppendTimeZone)) - f.Cf.DiscardMsg(msg) + f.DiscardMsg(msg) continue } @@ -209,7 +187,7 @@ func (f *Parser) Run(inChan <-chan *libs.FluentMsg) { zap.String("time_key", f.TimeKey), zap.String("time_format", f.TimeFormat), zap.String("append_time_zone", f.AppendTimeZone)) - f.Cf.DiscardMsg(msg) + f.DiscardMsg(msg) continue } @@ -220,22 +198,27 @@ func (f *Parser) Run(inChan <-chan *libs.FluentMsg) { msg.Message[f.NewTimeKey] = t.UTC().Format(f.NewTimeFormat) } - f.OutChan <- msg + outChan <- msg } } type ParserFactCfg struct { - NFork int - Name, LBKey string - Tags []string - Env, MsgKey string - Regexp *regexp.Regexp - MsgPool *sync.Pool - IsRemoveOrigLog bool - Add map[string]map[string]string - ParseJsonKey, MustInclude string - TimeKey, TimeFormat, NewTimeKey, AppendTimeZone, NewTimeFormat string - ReservedTimeKey bool + NFork int + Name, LBKey string + Tags []string + Env, MsgKey string + Regexp *regexp.Regexp + MsgPool *sync.Pool + IsRemoveOrigLog bool + Add map[string]map[string]string + ParseJsonKey, + MustInclude string + TimeKey, + TimeFormat, + NewTimeKey, + AppendTimeZone, + NewTimeFormat string + ReservedTimeKey bool } type ParserFact struct { @@ -274,35 +257,17 @@ func (cf *ParserFact) IsTagSupported(tag string) (ok bool) { return ok } -func (cf *ParserFact) Spawn(tag string, outChan chan<- *libs.FluentMsg) chan<- *libs.FluentMsg { +func (cf *ParserFact) Spawn(ctx context.Context, tag string, outChan chan<- *libs.FluentMsg) chan<- *libs.FluentMsg { utils.Logger.Info("spawn parser tagfilter", zap.String("tag", tag)) inChan := make(chan *libs.FluentMsg, cf.defaultInternalChanSize) inchans := []chan *libs.FluentMsg{} for i := 0; i < cf.NFork; i++ { - f := NewParser(&ParserCfg{ - Cf: cf, - Tag: tag, - OutChan: outChan, - MsgKey: cf.MsgKey, - MsgPool: cf.MsgPool, - Regexp: cf.Regexp, - IsRemoveOrigLog: cf.IsRemoveOrigLog, - Add: cf.Add, - ParseJsonKey: cf.ParseJsonKey, - MustInclude: cf.MustInclude, - TimeKey: cf.TimeKey, - TimeFormat: cf.TimeFormat, - NewTimeKey: cf.NewTimeKey, - AppendTimeZone: cf.AppendTimeZone, - NewTimeFormat: cf.NewTimeFormat, - ReservedTimeKey: cf.ReservedTimeKey, - }) eachInchan := make(chan *libs.FluentMsg, cf.defaultInternalChanSize) - go f.Run(eachInchan) + go cf.StartNewParser(ctx, outChan, eachInchan) inchans = append(inchans, eachInchan) } - go cf.runLB(cf.LBKey, inChan, inchans) + go cf.runLB(ctx, cf.LBKey, inChan, inchans) return inChan } diff --git a/tagFilters/pipeline.go b/tagFilters/pipeline.go index 8593b0a..d2c18b6 100644 --- a/tagFilters/pipeline.go +++ b/tagFilters/pipeline.go @@ -1,6 +1,7 @@ package tagFilters import ( + "context" "sync" "github.com/Laisky/go-fluentd/libs" @@ -9,6 +10,10 @@ import ( "github.com/Laisky/zap" ) +type TagPipelineItf interface { + Spawn(context.Context, string, chan<- *libs.FluentMsg) (chan<- *libs.FluentMsg, error) +} + type TagPipelineCfg struct { DefaultInternalChanSize int MsgPool *sync.Pool @@ -22,7 +27,7 @@ type TagPipeline struct { } // NewTagPipeline create new TagPipeline -func NewTagPipeline(cfg *TagPipelineCfg, itfs ...TagFilterFactoryItf) *TagPipeline { +func NewTagPipeline(ctx context.Context, cfg *TagPipelineCfg, itfs ...TagFilterFactoryItf) *TagPipeline { utils.Logger.Info("create tag pipeline") if cfg.DefaultInternalChanSize <= 0 { cfg.DefaultInternalChanSize = 1000 @@ -44,7 +49,7 @@ func NewTagPipeline(cfg *TagPipelineCfg, itfs ...TagFilterFactoryItf) *TagPipeli } // Spawn create and run new Concator for new tag, return inchan -func (p *TagPipeline) Spawn(tag string, outChan chan<- *libs.FluentMsg) (chan<- *libs.FluentMsg, error) { +func (p *TagPipeline) Spawn(ctx context.Context, tag string, outChan chan<- *libs.FluentMsg) (chan<- *libs.FluentMsg, error) { utils.Logger.Info("spawn tagpipeline", zap.String("tag", tag)) var ( f TagFilterFactoryItf @@ -59,7 +64,7 @@ func (p *TagPipeline) Spawn(tag string, outChan chan<- *libs.FluentMsg) (chan<- zap.String("name", f.GetName()), zap.String("tag", tag)) isTagSupported = true - downstreamChan = f.Spawn(tag, downstreamChan) // downstream's inChan is upstream's outChan + downstreamChan = f.Spawn(ctx, tag, downstreamChan) // downstream's inChan is upstream's outChan p.monitorChans[tag+"."+f.GetName()] = downstreamChan // instream } }