Skip to content

Commit

Permalink
improve rewrite performance by up to ~3x
Browse files Browse the repository at this point in the history
  • Loading branch information
chengshiwen committed Aug 8, 2024
1 parent 0ece2fe commit a99b8d5
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 17 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ The configuration settings are as follows:
* `flush_time`: default is `1`, wait 1 second write whether point count has bigger than flush_size config
* `check_interval`: default is `1`, check backend active every 1 second
* `rewrite_interval`: default is `10`, rewrite every 10 seconds
* `rewrite_threads`: default is `5`, rewrite under 5 threads
* `conn_pool_size`: default is `20`, create a connection pool which size is 20
* `write_timeout`: default is `10`, write timeout until 10 seconds
* `idle_timeout`: default is `10`, keep-alives wait time until 10 seconds
Expand Down
46 changes: 31 additions & 15 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/panjf2000/ants/v2"
"golang.org/x/sync/errgroup"
)

type CacheBuffer struct {
Expand All @@ -30,6 +31,7 @@ type Backend struct {
flushSize int
flushTime int
rewriteInterval int
rewriteThreads int
rewriteTicker *time.Ticker
chWrite chan *LinePoint
chTimer <-chan time.Time
Expand All @@ -43,6 +45,7 @@ func NewBackend(cfg *BackendConfig, pxcfg *ProxyConfig) (ib *Backend) {
flushSize: pxcfg.FlushSize,
flushTime: pxcfg.FlushTime,
rewriteInterval: pxcfg.RewriteInterval,
rewriteThreads: pxcfg.RewriteThreads,
rewriteTicker: time.NewTicker(time.Duration(pxcfg.RewriteInterval) * time.Second),
chWrite: make(chan *LinePoint, 16),
buffers: make(map[string]map[string]*CacheBuffer),
Expand Down Expand Up @@ -233,15 +236,39 @@ func (ib *Backend) RewriteLoop() {
}

func (ib *Backend) Rewrite() (err error) {
b, err := ib.fb.Read()
blocks, err := ib.fb.ReadN(ib.rewriteThreads)
if err != nil {
log.Print("rewrite read file error: ", err)
return
}
if b == nil {
if len(blocks) == 0 {
return
}

var g errgroup.Group
for _, b := range blocks {
b := b
g.Go(func() error {
return ib.rewrite(b)
})
}

if err = g.Wait(); err != nil {
err = ib.fb.RollbackMeta()
if err != nil {
log.Printf("rollback meta error: %s", err)
}
return
}

err = ib.fb.UpdateMeta()
if err != nil {
log.Printf("update meta error: %s", err)
}
return
}

func (ib *Backend) rewrite(b []byte) (err error) {
p := bytes.SplitN(b, []byte{' '}, 3)
if len(p) < 3 {
log.Print("rewrite read invalid data with length: ", len(p))
Expand All @@ -263,23 +290,12 @@ func (ib *Backend) Rewrite() (err error) {
case nil:
case ErrBadRequest:
log.Printf("bad request, drop all data")
// err = nil
err = nil
case ErrNotFound:
log.Printf("bad backend, drop all data")
// err = nil
err = nil
default:
log.Printf("rewrite http error, url: %s, db: %s, rp: %s, plen: %d", ib.Url, db, rp, len(p[1]))

err = ib.fb.RollbackMeta()
if err != nil {
log.Printf("rollback meta error: %s", err)
}
return
}

err = ib.fb.UpdateMeta()
if err != nil {
log.Printf("update meta error: %s", err)
}
return
}
Expand Down
4 changes: 4 additions & 0 deletions backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type ProxyConfig struct {
FlushTime int `mapstructure:"flush_time"`
CheckInterval int `mapstructure:"check_interval"`
RewriteInterval int `mapstructure:"rewrite_interval"`
RewriteThreads int `mapstructure:"rewrite_threads"`
ConnPoolSize int `mapstructure:"conn_pool_size"`
WriteTimeout int `mapstructure:"write_timeout"`
IdleTimeout int `mapstructure:"idle_timeout"`
Expand Down Expand Up @@ -110,6 +111,9 @@ func (cfg *ProxyConfig) setDefault() {
if cfg.RewriteInterval <= 0 {
cfg.RewriteInterval = 10
}
if cfg.RewriteThreads <= 0 {
cfg.RewriteThreads = 5
}
if cfg.ConnPoolSize <= 0 {
cfg.ConnPoolSize = 20
}
Expand Down
18 changes: 16 additions & 2 deletions backend/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (fb *FileBackend) Read() (p []byte, err error) {

err = binary.Read(fb.consumer, binary.BigEndian, &length)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
if errors.Is(err, io.EOF) {
fb.setData(false)
} else {
log.Print("read length error: ", err)
Expand All @@ -137,7 +137,7 @@ func (fb *FileBackend) Read() (p []byte, err error) {

_, err = io.ReadFull(fb.consumer, p)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
if errors.Is(err, io.EOF) {
fb.setData(false)
} else {
log.Print("readfull error: ", err)
Expand All @@ -147,6 +147,20 @@ func (fb *FileBackend) Read() (p []byte, err error) {
return
}

func (fb *FileBackend) ReadN(n int) (blocks [][]byte, err error) {
for i := 0; i < n; i++ {
p, err := fb.Read()
if err != nil {
if errors.Is(err, io.EOF) && len(blocks) > 0 {
return blocks, nil
}
return nil, err
}
blocks = append(blocks, p)
}
return
}

func (fb *FileBackend) RollbackMeta() (err error) {
fb.lock.Lock()
defer fb.lock.Unlock()
Expand Down
1 change: 1 addition & 0 deletions conf/proxy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ flush_size = 10000
flush_time = 1
check_interval = 1
rewrite_interval = 10
rewrite_threads = 5
conn_pool_size = 20
write_timeout = 10
idle_timeout = 10
Expand Down
1 change: 1 addition & 0 deletions conf/proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ flush_size: 10000
flush_time: 1
check_interval: 1
rewrite_interval: 10
rewrite_threads: 5
conn_pool_size: 20
write_timeout: 10
idle_timeout: 10
Expand Down
1 change: 1 addition & 0 deletions docker/quick/proxy.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"flush_time": 1,
"check_interval": 1,
"rewrite_interval": 10,
"rewrite_threads": 5,
"conn_pool_size": 20,
"write_timeout": 10,
"idle_timeout": 10,
Expand Down
1 change: 1 addition & 0 deletions proxy.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"flush_time": 1,
"check_interval": 1,
"rewrite_interval": 10,
"rewrite_threads": 5,
"conn_pool_size": 20,
"write_timeout": 10,
"idle_timeout": 10,
Expand Down

0 comments on commit a99b8d5

Please sign in to comment.