ci: release v1.11.0

Commit a5d5aea (1 parent ebd9580), committed by Laisky on Sep 5, 2019.
Showing 33 changed files with 630 additions and 618 deletions.
2 changes: 1 addition & 1 deletion .docker/Dockerfile
@@ -1,4 +1,4 @@
-FROM registry:5000/gobase:1.12.9-alpine3.10 AS gobuild
+FROM registry:5000/gobase:1.13.0-alpine3.10 AS gobuild

ENV GO111MODULE=on
WORKDIR /go-fluentd
4 changes: 2 additions & 2 deletions .docker/README.md
@@ -17,8 +17,8 @@ go mod download
go mod vendor

# build base image
-docker build . -f ./.docker/gobase.Dockerfile -t registry:5000/gobase:1.12.9-alpine3.10
-docker push registry:5000/gobase:1.12.9-alpine3.10
+docker build . -f ./.docker/gobase.Dockerfile -t registry:5000/gobase:1.13.0-alpine3.10
+docker push registry:5000/gobase:1.13.0-alpine3.10

# build image
docker build . -f ./.docker/Dockerfile -t registry:5000/go-fluentd:1.8.2
6 changes: 3 additions & 3 deletions .docker/gobase.Dockerfile
@@ -1,6 +1,6 @@
-# docker build . -f ./.docker/gobase.Dockerfile -t registry:5000/gobase:1.12.9-alpine3.10
-# docker push registry:5000/gobase:1.12.9-alpine3.10
-FROM golang:1.12.9-alpine3.10
+# docker build . -f ./.docker/gobase.Dockerfile -t registry:5000/gobase:1.13.0-alpine3.10
+# docker push registry:5000/gobase:1.13.0-alpine3.10
+FROM golang:1.13.0-alpine3.10

# run dependencies
RUN apk update && apk upgrade && \
4 changes: 3 additions & 1 deletion .docker/test.Dockerfile
@@ -1,8 +1,10 @@
# docker build . -f ./.docker/test.Dockerfile -t registry:5000/go-fluentd-test:v1
# docker push registry:5000/go-fluentd-test:v1
-FROM registry:5000/gobase:1.12.9-alpine3.10
+FROM registry:5000/gobase:1.13.0-alpine3.10
ENV GO111MODULE=on



WORKDIR /go-fluentd
COPY go.mod .
COPY go.sum .
2 changes: 1 addition & 1 deletion .gitlab-ci.yml
@@ -47,7 +47,7 @@ build:app:
# - public
# image: registry:5000/docker:latest
# script:
-# - docker pull registry:5000/gobase:1.12.9-alpine3.10 # pull latest image
+# - docker pull registry:5000/gobase:1.13.0-alpine3.10 # pull latest image
# - docker build . -f ./.docker/test.Dockerfile -t go-fluentd-test:$DOCKER_TAG
# retry: 1

16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,22 @@
*CURRENT*
---

+- 2019-09-05 (Laisky) ci: upgrade go-utils v1.7.5
+- 2019-09-05 (Laisky) fix: shrink deps
+- 2019-09-05 (Laisky) ci: upgrade go-utils v1.7.4
+- 2019-09-04 (Laisky) fix: upgrade go-utils
+- 2019-09-04 (Laisky) fix: add context in postpipeline
+- 2019-09-04 (Laisky) build: upgrade go v1.13.0
+- 2019-09-04 (Laisky) fix: add context in accptor
+- 2019-09-04 (Laisky) fix: add context in acceptorPipeline
+- 2019-09-04 (Laisky) fix: add context in producer
+- 2019-09-04 (Laisky) fix: refactor tagPipeline
+- 2019-09-03 (Laisky) ci: upgrade go-utils v1.7.3
+- 2019-09-03 (Laisky) docs: update changelog

*v1.10.8*
---

- 2019-09-03 (Laisky) fix: ctx.Done return

*v1.10.7*
5 changes: 3 additions & 2 deletions acceptor.go
@@ -1,6 +1,7 @@
package concator

import (
"context"
"fmt"
"sync"

@@ -45,7 +46,7 @@ func NewAcceptor(cfg *AcceptorCfg, recvs ...recvs.AcceptorRecvItf) *Acceptor {

// Run starting acceptor to listening and receive messages,
// you can use `acceptor.MessageChan()` to load messages`
-func (a *Acceptor) Run() {
+func (a *Acceptor) Run(ctx context.Context) {
// got exists max id from legacy
utils.Logger.Info("process legacy data...")
maxID, err := a.Journal.LoadMaxID()
@@ -64,7 +65,7 @@ func (a *Acceptor) Run() {
recv.SetSyncOutChan(a.syncOutChan)
recv.SetMsgPool(a.MsgPool)
recv.SetCounter(couter)
-		go recv.Run()
+		go recv.Run(ctx)
}
}

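The change above threads a context.Context through Acceptor.Run and into every receiver's Run, so one cancellation stops the whole receive tree. A minimal, self-contained sketch of that pattern follows; the Recv type, channel, and messages are invented for illustration and are not the repo's actual recvs API:

package main

import (
	"context"
	"fmt"
	"time"
)

// Recv stands in for a receiver plugin; the real recvs.AcceptorRecvItf differs.
type Recv struct {
	in chan string
}

// Run blocks until the parent context is cancelled, so the caller
// can stop every receiver goroutine by cancelling a single context.
func (r *Recv) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("receiver exit:", ctx.Err())
			return
		case msg := <-r.in:
			fmt.Println("received:", msg)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	r := &Recv{in: make(chan string, 1)}
	r.in <- "hello"
	go r.Run(ctx)
	time.Sleep(50 * time.Millisecond)
	cancel() // one call shuts the receiver down
	time.Sleep(50 * time.Millisecond)
}

Passing ctx as the first parameter, rather than storing it in the struct, matches the convention the diff follows: `go recv.Run(ctx)`.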
24 changes: 16 additions & 8 deletions acceptorFilters/pipeline.go
@@ -1,6 +1,7 @@
package acceptorFilters

import (
"context"
"fmt"
"sync"
"time"
@@ -78,7 +79,7 @@ func (f *AcceptorPipeline) DiscardMsg(msg *libs.FluentMsg) {
f.MsgPool.Put(msg)
}

-func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (outChan, skipDumpChan chan *libs.FluentMsg) {
+func (f *AcceptorPipeline) Wrap(ctx context.Context, asyncInChan, syncInChan chan *libs.FluentMsg) (outChan, skipDumpChan chan *libs.FluentMsg) {
outChan = make(chan *libs.FluentMsg, f.OutChanSize)
skipDumpChan = make(chan *libs.FluentMsg, f.OutChanSize)

@@ -92,11 +93,13 @@ func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (o
filter AcceptorFilterItf
msg *libs.FluentMsg
)
defer utils.Logger.Panic("quit acceptorPipeline asyncChan", zap.String("msg", fmt.Sprint(msg)))
defer utils.Logger.Info("quit acceptorPipeline asyncChan", zap.String("last_msg", fmt.Sprint(msg)))

+	NEXT_ASYNC_MSG:
		for {
-		NEXT_ASYNC_MSG:
			select {
+			case <-ctx.Done():
+				return
			case msg = <-f.reEnterChan: // CAUTION: do not put msg into reEnterChan forever
			case msg = <-asyncInChan:
			}
@@ -112,7 +115,7 @@ func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (o

for _, filter = range f.filters {
if msg = filter.Filter(msg); msg == nil { // quit filters for this msg
-				goto NEXT_ASYNC_MSG
+				continue NEXT_ASYNC_MSG
}
}

@@ -132,9 +135,15 @@ func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (o
filter AcceptorFilterItf
msg *libs.FluentMsg
)
defer utils.Logger.Panic("quit acceptorPipeline syncChan", zap.String("msg", fmt.Sprint(msg)))
defer utils.Logger.Info("quit acceptorPipeline syncChan", zap.String("last_msg", fmt.Sprint(msg)))

-		for msg = range syncInChan {
+	NEXT_SYNC_MSG:
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case msg = <-syncInChan:
+			}
// utils.Logger.Debug("AcceptorPipeline got blockable msg")
f.counter.Count()

@@ -148,12 +157,11 @@ func (f *AcceptorPipeline) Wrap(asyncInChan, syncInChan chan *libs.FluentMsg) (o
if msg = filter.Filter(msg); msg == nil { // quit filters for this msg
// do not discard in pipeline
// filter can make decision to bypass or discard msg
-					goto NEXT_SYNC_MSG
+					continue NEXT_SYNC_MSG
}
}

outChan <- msg
-		NEXT_SYNC_MSG:
}

}()
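Three related changes land in this file: each worker loop now selects on ctx.Done() so the pipeline shuts down cleanly, the deferred exit log drops from Panic to Info because returning on cancellation is now a normal exit rather than a bug, and the backward `goto NEXT_*_MSG` jumps become labeled `continue`s, the idiomatic way to skip the rest of the filter chain and fetch the next message. A runnable sketch of the combined pattern, with an invented filter and int messages standing in for *libs.FluentMsg:

package main

import (
	"context"
	"fmt"
	"time"
)

// dropOdd mimics AcceptorFilterItf.Filter: returning nil means
// "this msg is consumed/discarded, stop filtering it".
func dropOdd(msg *int) *int {
	if *msg%2 == 1 {
		return nil
	}
	return msg
}

func pipeline(ctx context.Context, in <-chan *int, out chan<- *int) {
	filters := []func(*int) *int{dropOdd}
NEXT_MSG:
	for {
		var msg *int
		select {
		case <-ctx.Done(): // normal shutdown path, not a panic
			return
		case msg = <-in:
		}
		for _, f := range filters {
			if msg = f(msg); msg == nil {
				continue NEXT_MSG // skip remaining filters, wait for next msg
			}
		}
		out <- msg
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan *int)
	out := make(chan *int, 4)
	go pipeline(ctx, in, out)
	for i := 0; i < 4; i++ {
		v := i
		in <- &v
	}
	cancel()
	time.Sleep(50 * time.Millisecond)
	for len(out) > 0 {
		fmt.Println("passed:", *<-out) // 0 and 2 survive the filter
	}
}

The label sits directly above the outer `for`, which is what lets `continue NEXT_MSG` target it; the old code's label inside the loop body only worked with `goto`.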
72 changes: 40 additions & 32 deletions controllor.go
@@ -19,8 +19,8 @@ import (
"github.com/Laisky/zap"
)

const (
ctxKey utils.CtxKeyT = "key"
var (
ctxKey = utils.CtxKeyT{}
)

// Controllor is an IoC that manage all roles
@@ -121,28 +121,29 @@ func (c *Controllor) initRecvs(env string) []recvs.AcceptorRecvItf {
MaxAllowedAheadSec: utils.Settings.GetDuration("settings.acceptor.recvs.plugins."+name+".max_allowed_ahead_sec") * time.Second,
}))
case "kafka":
receivers = append(receivers, recvs.NewKafkaRecv(&recvs.KafkaCfg{
KMsgPool: sharingKMsgPool,
Meta: utils.FallBack(
func() interface{} {
return utils.Settings.Get("settings.acceptor.recvs.plugins." + name + ".meta").(map[string]interface{})
}, map[string]interface{}{}).(map[string]interface{}),
Name: name,
MsgKey: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".msg_key"),
Brokers: utils.Settings.GetStringSlice("settings.acceptor.recvs.plugins." + name + ".brokers." + env),
Topics: []string{utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".topics." + env)},
Group: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".groups." + env),
Tag: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tags." + env),
IsJSONFormat: utils.Settings.GetBool("settings.acceptor.recvs.plugins." + name + ".is_json_format"),
TagKey: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tag_key"),
JSONTagKey: utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".json_tag_key"),
RewriteTag: recvs.GetKafkaRewriteTag(utils.Settings.GetString("settings.acceptor.recvs.plugins."+name+".rewrite_tag"), env),
NConsumer: utils.Settings.GetInt("settings.acceptor.recvs.plugins." + name + ".nconsumer"),
KafkaCommitCfg: &recvs.KafkaCommitCfg{
IntervalNum: utils.Settings.GetInt("settings.acceptor.recvs.plugins." + name + ".interval_num"),
IntervalDuration: utils.Settings.GetDuration("settings.acceptor.recvs.plugins."+name+".interval_sec") * time.Second,
},
}))
kafkaCfg := recvs.NewKafkaCfg()
kafkaCfg.KMsgPool = sharingKMsgPool
kafkaCfg.Meta = utils.FallBack(
func() interface{} {
return utils.Settings.Get("settings.acceptor.recvs.plugins." + name + ".meta").(map[string]interface{})
}, map[string]interface{}{}).(map[string]interface{})
kafkaCfg.Name = name
kafkaCfg.MsgKey = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".msg_key")
kafkaCfg.Brokers = utils.Settings.GetStringSlice("settings.acceptor.recvs.plugins." + name + ".brokers." + env)
kafkaCfg.Topics = []string{utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".topics." + env)}
kafkaCfg.Group = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".groups." + env)
kafkaCfg.Tag = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tags." + env)
kafkaCfg.IsJSONFormat = utils.Settings.GetBool("settings.acceptor.recvs.plugins." + name + ".is_json_format")
kafkaCfg.TagKey = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".tag_key")
kafkaCfg.JSONTagKey = utils.Settings.GetString("settings.acceptor.recvs.plugins." + name + ".json_tag_key")
kafkaCfg.RewriteTag = recvs.GetKafkaRewriteTag(utils.Settings.GetString("settings.acceptor.recvs.plugins."+name+".rewrite_tag"), env)
kafkaCfg.NConsumer = utils.Settings.GetInt("settings.acceptor.recvs.plugins." + name + ".nconsumer")
kafkaCfg.KafkaCommitCfg = &recvs.KafkaCommitCfg{
IntervalNum: utils.Settings.GetInt("settings.acceptor.recvs.plugins." + name + ".interval_num"),
IntervalDuration: utils.Settings.GetDuration("settings.acceptor.recvs.plugins."+name+".interval_sec") * time.Second,
}

receivers = append(receivers, recvs.NewKafkaRecv(kafkaCfg))
default:
utils.Logger.Panic("unknown recv type",
zap.String("recv_type", utils.Settings.GetString("settings.acceptor.recvs.plugins."+name+".type")),
@@ -171,7 +172,7 @@ func (c *Controllor) initAcceptor(journal *Journal, receivers []recvs.AcceptorRe
receivers...,
)

-	acceptor.Run()
+	acceptor.Run(c.ctx)
return acceptor
}

@@ -290,7 +291,7 @@ func (c *Controllor) initTagPipeline(env string, waitCommitChan chan<- *libs.Flu
})}, fs...)
}

return tagFilters.NewTagPipeline(&tagFilters.TagPipelineCfg{
return tagFilters.NewTagPipeline(c.ctx, &tagFilters.TagPipelineCfg{
MsgPool: c.msgPool,
CommitedChan: waitCommitChan,
DefaultInternalChanSize: utils.Settings.GetInt("settings.tag_filters.internal_chan_size"),
@@ -306,7 +307,7 @@ func (c *Controllor) initDispatcher(waitDispatchChan chan *libs.FluentMsg, tagPi
NFork: utils.Settings.GetInt("settings.dispatcher.nfork"),
OutChanSize: utils.Settings.GetInt("settings.dispatcher.out_chan_size"),
})
-	dispatcher.Run()
+	dispatcher.Run(c.ctx)

return dispatcher
}
@@ -485,8 +486,15 @@ func (c *Controllor) initProducer(env string, waitProduceChan chan *libs.FluentM
)
}

-func (c *Controllor) runHeartBeat() {
+func (c *Controllor) runHeartBeat(ctx context.Context) {
+	defer utils.Logger.Info("heartbeat exit")
	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
utils.Logger.Info("heartbeat",
zap.Int("goroutine", runtime.NumGoroutine()),
)
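runHeartBeat now takes a context and checks it with a non-blocking select: the `default` branch falls through immediately while the context is live, so the loop keeps its normal log cadence and exits on the first pass after cancellation. A minimal sketch; the interval is invented, and the real loop logs goroutine counts via utils.Logger rather than fmt:

package main

import (
	"context"
	"fmt"
	"runtime"
	"time"
)

func runHeartBeat(ctx context.Context) {
	defer fmt.Println("heartbeat exit")
	for {
		select {
		case <-ctx.Done():
			return
		default: // context still live: fall through without blocking
		}
		fmt.Println("heartbeat, goroutines:", runtime.NumGoroutine())
		time.Sleep(30 * time.Millisecond) // invented interval
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go runHeartBeat(ctx)
	time.Sleep(100 * time.Millisecond)
	cancel()
	time.Sleep(50 * time.Millisecond) // give the goroutine time to exit
}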
@@ -509,7 +517,7 @@ func (c *Controllor) Run() {
waitCommitChan := journal.GetCommitChan()
waitAccepPipelineSyncChan := acceptor.GetSyncOutChan()
waitAccepPipelineAsyncChan := acceptor.GetAsyncOutChan()
-	waitDumpChan, skipDumpChan := acceptorPipeline.Wrap(waitAccepPipelineAsyncChan, waitAccepPipelineSyncChan)
+	waitDumpChan, skipDumpChan := acceptorPipeline.Wrap(c.ctx, waitAccepPipelineAsyncChan, waitAccepPipelineSyncChan)

// after `journal.DumpMsgFlow`, every discarded msg should commit to waitCommitChan
waitDispatchChan := journal.DumpMsgFlow(c.ctx, c.msgPool, waitDumpChan, skipDumpChan)
@@ -518,12 +526,12 @@ func (c *Controllor) Run() {
dispatcher := c.initDispatcher(waitDispatchChan, tagPipeline)
waitPostPipelineChan := dispatcher.GetOutChan()
postPipeline := c.initPostPipeline(env, waitCommitChan)
-	waitProduceChan := postPipeline.Wrap(waitPostPipelineChan)
+	waitProduceChan := postPipeline.Wrap(c.ctx, waitPostPipelineChan)
producerSenders := c.initSenders(env)
producer := c.initProducer(env, waitProduceChan, waitCommitChan, producerSenders)

// heartbeat
-	go c.runHeartBeat()
+	go c.runHeartBeat(c.ctx)

// monitor
monitor.AddMetric("controllor", func() map[string]interface{} {
@@ -549,6 +557,6 @@ func (c *Controllor) Run() {
})
monitor.BindHTTP(server)

-	go producer.Run()
+	go producer.Run(c.ctx)
RunServer(utils.Settings.GetString("addr"))
}
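One more change in controllor.go worth calling out: the context key moves from a string-typed constant (`ctxKey utils.CtxKeyT = "key"`) to an empty struct value (`ctxKey = utils.CtxKeyT{}`), which implies the go-utils upgrade changed CtxKeyT itself to a struct type. Struct-typed keys are the safer idiom: a value stored under an unexported struct key can never be read or clobbered through a plain string key. A sketch with a local stand-in for utils.CtxKeyT:

package main

import (
	"context"
	"fmt"
)

// ctxKeyT plays the role of utils.CtxKeyT. Because the type is
// unexported, no other package can construct an equal key.
type ctxKeyT struct{}

var ctxKey = ctxKeyT{}

func main() {
	ctx := context.WithValue(context.Background(), ctxKey, "payload")
	fmt.Println(ctx.Value(ctxKey)) // payload
	fmt.Println(ctx.Value("key"))  // <nil>: a string key no longer collides
}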