Skip to content

Commit

Permalink
gpool
Browse files Browse the repository at this point in the history
  • Loading branch information
snail007 committed Dec 22, 2023
1 parent bbe446f commit 0ff0b5d
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 68 deletions.
68 changes: 62 additions & 6 deletions util/gpool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,38 @@ goos: darwin
goarch: amd64
pkg: github.com/snail007/gmc/util/gpool
cpu: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
BenchmarkSubmit-16 5806988 193.8 ns/op
BenchmarkWorker-16 5232226 232.5 ns/op
BenchmarkSubmit/pool_size:20-16 717519 3822 ns/op
BenchmarkSubmit/pool_size:40-16 932514 3944 ns/op
BenchmarkSubmit/pool_size:60-16 789867 4295 ns/op
BenchmarkSubmit/pool_size:80-16 1000000 5250 ns/op
BenchmarkSubmit/pool_size:100-16 972837 5719 ns/op
BenchmarkSubmit/pool_size:200-16 798679 6224 ns/op
BenchmarkSubmit/pool_size:400-16 683112 6566 ns/op
BenchmarkSubmit/pool_size:600-16 571062 5244 ns/op
BenchmarkSubmit/pool_size:800-16 664258 9264 ns/op
BenchmarkSubmit/pool_size:1000-16 495985 5359 ns/op
BenchmarkSubmit/pool_size:10000-16 564003 6340 ns/op
BenchmarkSubmit/pool_size:20000-16 563130 6611 ns/op
BenchmarkSubmit/pool_size:30000-16 572671 6293 ns/op
BenchmarkSubmit/pool_size:40000-16 529896 5777 ns/op
BenchmarkSubmit/pool_size:50000-16 495811 5074 ns/op
BenchmarkJob/pool_size:20-16 546973 4891 ns/op
BenchmarkJob/pool_size:40-16 525769 4606 ns/op
BenchmarkJob/pool_size:60-16 514962 5270 ns/op
BenchmarkJob/pool_size:80-16 522291 5347 ns/op
BenchmarkJob/pool_size:100-16 537969 4681 ns/op
BenchmarkJob/pool_size:200-16 609165 5018 ns/op
BenchmarkJob/pool_size:400-16 513234 5614 ns/op
BenchmarkJob/pool_size:600-16 591480 5476 ns/op
BenchmarkJob/pool_size:800-16 537184 5458 ns/op
BenchmarkJob/pool_size:1000-16 475809 5273 ns/op
BenchmarkJob/pool_size:10000-16 723447 6300 ns/op
BenchmarkJob/pool_size:20000-16 591313 4874 ns/op
BenchmarkJob/pool_size:30000-16 508342 4536 ns/op
BenchmarkJob/pool_size:40000-16 484904 5399 ns/op
BenchmarkJob/pool_size:50000-16 458240 5261 ns/op
PASS
ok github.com/snail007/gmc/util/gpool 3.071s
ok github.com/snail007/gmc/util/gpool 101.870s
```

```text
Expand All @@ -56,8 +84,36 @@ goos: darwin
goarch: amd64
pkg: github.com/snail007/gmc/util/gpool
cpu: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
BenchmarkSubmit-16 18357210 190.3 ns/op
BenchmarkWorker-16 15653331 232.4 ns/op
BenchmarkSubmit/pool_size:20-16 1000000 3702 ns/op
BenchmarkSubmit/pool_size:40-16 1000000 6413 ns/op
BenchmarkSubmit/pool_size:60-16 1000000 4236 ns/op
BenchmarkSubmit/pool_size:80-16 1000000 4683 ns/op
BenchmarkSubmit/pool_size:100-16 1000000 7908 ns/op
BenchmarkSubmit/pool_size:200-16 1000000 6421 ns/op
BenchmarkSubmit/pool_size:400-16 1000000 7677 ns/op
BenchmarkSubmit/pool_size:600-16 1000000 10708 ns/op
BenchmarkSubmit/pool_size:800-16 1000000 9914 ns/op
BenchmarkSubmit/pool_size:1000-16 1000000 7588 ns/op
BenchmarkSubmit/pool_size:10000-16 1000000 7316 ns/op
BenchmarkSubmit/pool_size:20000-16 1000000 8698 ns/op
BenchmarkSubmit/pool_size:30000-16 1000000 7268 ns/op
BenchmarkSubmit/pool_size:40000-16 1000000 7404 ns/op
BenchmarkSubmit/pool_size:50000-16 1000000 9545 ns/op
BenchmarkJob/pool_size:20-16 1000000 6091 ns/op
BenchmarkJob/pool_size:40-16 1000000 6476 ns/op
BenchmarkJob/pool_size:60-16 1000000 4791 ns/op
BenchmarkJob/pool_size:80-16 1000000 5697 ns/op
BenchmarkJob/pool_size:100-16 1000000 5325 ns/op
BenchmarkJob/pool_size:200-16 1000000 6210 ns/op
BenchmarkJob/pool_size:400-16 1000000 5936 ns/op
BenchmarkJob/pool_size:600-16 1000000 6310 ns/op
BenchmarkJob/pool_size:800-16 1000000 8020 ns/op
BenchmarkJob/pool_size:1000-16 1000000 7428 ns/op
BenchmarkJob/pool_size:10000-16 1000000 6842 ns/op
BenchmarkJob/pool_size:20000-16 1000000 7807 ns/op
BenchmarkJob/pool_size:30000-16 1000000 5834 ns/op
BenchmarkJob/pool_size:40000-16 1000000 5572 ns/op
BenchmarkJob/pool_size:50000-16 1000000 6033 ns/op
PASS
ok github.com/snail007/gmc/util/gpool 7.921s
ok github.com/snail007/gmc/util/gpool 204.891s
```
59 changes: 41 additions & 18 deletions util/gpool/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,57 @@
package gpool

import (
gcast "github.com/snail007/gmc/util/cast"
gloop "github.com/snail007/gmc/util/loop"
"sync"
"testing"
)

var pSubmit, pWorker *Pool

func BenchmarkSubmit(b *testing.B) {
pSubmit = New(1)
b.StartTimer()
for i := 0; i < b.N; i++ {
pSubmit.Submit(func() {
var do = func(max, step int) {
gloop.ForBy(max, step, func(loopIndex, loopValue int) {
size := loopValue + step
b.Run("pool size:"+gcast.ToString(size), func(b *testing.B) {
pSubmit = New(size)
b.StartTimer()
for i := 0; i < b.N; i++ {
pSubmit.Submit(func() {
})
}
b.StopTimer()
pSubmit.Stop()
})
})
}
b.StopTimer()
pSubmit.Stop()
do(100, 20)
do(1000, 200)
do(50000, 10000)

}
func BenchmarkWorker(b *testing.B) {
pWorker = New(1)
b.StopTimer()
g := sync.WaitGroup{}
g.Add(b.N)
b.StartTimer()
for i := 0; i < b.N; i++ {
pWorker.Submit(func() {
g.Done()
func BenchmarkJob(b *testing.B) {
var do = func(max, step int) {
gloop.ForBy(max, step, func(loopIndex, loopValue int) {
size := loopValue + step
b.Run("pool size:"+gcast.ToString(size), func(b *testing.B) {
pWorker = New(size)
b.StopTimer()
g := sync.WaitGroup{}
g.Add(b.N)
b.StartTimer()
for i := 0; i < b.N; i++ {
pWorker.Submit(func() {
g.Done()
})
}
g.Wait()
b.StopTimer()
pWorker.Stop()
})
})
}
g.Wait()
b.StopTimer()
pWorker.Stop()
do(100, 20)
do(1000, 200)
do(50000, 10000)
}
115 changes: 71 additions & 44 deletions util/gpool/gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
gmap "github.com/snail007/gmc/util/map"
"io"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -30,13 +31,15 @@ var (

// Pool is a goroutine pool, you can increase or decrease pool size in runtime.
type Pool struct {
maxWorkCount int
jobs *glist.List
workers *gmap.Map
g *sync.WaitGroup
submitBlockChanList *glist.List
submitLock *sync.Mutex
opt *Option
maxWorkCount int
jobs *glist.List
workers *gmap.Map
g *sync.WaitGroup
submitBlockChanList *glist.List
submitLock *sync.Mutex
opt *Option
idleWorkerCounter *int64
runningWorkerCounter *int64
}

// Option sets the pool
Expand Down Expand Up @@ -114,37 +117,52 @@ func NewWithPreAlloc(workerCount int) (p *Pool) {

func NewWithOption(workerCount int, opt *Option) (p *Pool) {
p = &Pool{
submitBlockChanList: glist.New(),
jobs: glist.New(),
workers: gmap.New(),
maxWorkCount: workerCount,
g: &sync.WaitGroup{},
submitLock: &sync.Mutex{},
opt: opt,
submitBlockChanList: glist.New(),
jobs: glist.New(),
workers: gmap.New(),
maxWorkCount: workerCount,
g: &sync.WaitGroup{},
submitLock: &sync.Mutex{},
opt: opt,
idleWorkerCounter: new(int64),
runningWorkerCounter: new(int64),
}
if opt.PreAlloc {
p.ResetTo(workerCount)
p.maxWorkCount = 0
p.Increase(workerCount)
}
return p
}

// Increase add the count of `workerCount` workers
func (s *Pool) Increase(workerCount int) {
s.maxWorkCount += workerCount
s.increase(workerCount, true)
}

func (s *Pool) increase(workerCount int, modifyCounter bool) {
if modifyCounter {
s.maxWorkCount += workerCount
}
s.addWorker(workerCount)
}

func (s *Pool) removeWorker(w *worker) {
w.Stop()
s.workers.Delete(w.id)
}
func (s *Pool) Decrease(workerCount int) {
s.decrease(workerCount, true)
}

// Decrease stop the count of `workerCount` workers
func (s *Pool) Decrease(workerCount int) {
s.maxWorkCount -= workerCount
if s.maxWorkCount < 0 {
s.maxWorkCount = 0
func (s *Pool) decrease(workerCount int, modifyCounter bool) {
if modifyCounter {
s.maxWorkCount -= workerCount
if s.maxWorkCount < 0 {
s.maxWorkCount = 0
}
}

// find idle workers
s.workers.Range(func(_, v interface{}) bool {
w := v.(*worker)
Expand Down Expand Up @@ -181,9 +199,9 @@ func (s *Pool) ResetTo(workerCount int) {
}
s.maxWorkCount = workerCount
if workerCount > length {
s.Increase(workerCount - length)
s.increase(workerCount-length, false)
} else {
s.Decrease(length - workerCount)
s.decrease(length-workerCount, false)
}
}

Expand Down Expand Up @@ -278,35 +296,16 @@ func (s *Pool) Stop() {

// RunningWorkCount returns the count of running workers
func (s *Pool) RunningWorkCount() (workerCount int) {
s.workers.RangeFast(func(_, v interface{}) bool {
if v.(*worker).Status() == statusRunning {
workerCount++
}
return true
})
return
return int(atomic.LoadInt64(s.runningWorkerCounter))
}

// IdleWorkerCount returns the count of idle workers
func (s *Pool) IdleWorkerCount() (workerCount int) {
s.workers.RangeFast(func(_, v interface{}) bool {
if v.(*worker).Status() == statusIdle {
workerCount++
}
return true
})
return
return int(atomic.LoadInt64(s.idleWorkerCounter))
}

func (s *Pool) hasIdleWorker() (has bool) {
s.workers.RangeFast(func(_, v interface{}) bool {
if v.(*worker).Status() == statusIdle {
has = true
return false
}
return true
})
return
return atomic.LoadInt64(s.idleWorkerCounter) > 0
}

// QueuedJobCount returns the count of queued job
Expand Down Expand Up @@ -356,6 +355,26 @@ func (w *worker) SetStatus(status int) {
}
}

func (w *worker) addWorkerCounter(cnt *int64, val int64) {
if val < 0 {
if atomic.LoadInt64(cnt)-val >= 0 {
atomic.AddInt64(cnt, val)
} else {
atomic.AddInt64(cnt, 0)
}
} else {
atomic.AddInt64(cnt, val)
}
}

func (w *worker) addIdleWorkerCounter(val int64) {
w.addWorkerCounter(w.pool.idleWorkerCounter, val)
}

func (w *worker) addRunningWorkerCounter(val int64) {
w.addWorkerCounter(w.pool.runningWorkerCounter, val)
}

func (w *worker) Wakeup() bool {
defer gerror.RecoverNop()
select {
Expand Down Expand Up @@ -407,6 +426,8 @@ func (w *worker) start() {
var fn func()
var doJob = func() (isReturn bool) {
w.SetStatus(statusRunning)
w.addRunningWorkerCounter(1)
defer w.addRunningWorkerCounter(-1)
w.pool.debugLog("Pool: worker[%s] running ...", w.id)
for {
//w.pool.debugLog("Pool: worker[%s] read break", w.id)
Expand All @@ -426,6 +447,7 @@ func (w *worker) start() {
}
for {
w.SetStatus(statusIdle)
w.addIdleWorkerCounter(1)
w.pool.debugLog("Pool: worker[%s] waiting ...", w.id)
if w.pool.opt.IdleDuration > 0 {
if t == nil {
Expand All @@ -436,11 +458,14 @@ func (w *worker) start() {
select {
case <-t.C:
w.pool.debugLog("Pool: worker[%s] idle timeout, exited", w.id)
w.addIdleWorkerCounter(-1)
return
case _, ok := <-w.wakeupSig:
if !ok {
w.addIdleWorkerCounter(-1)
return
}
w.addIdleWorkerCounter(-1)
if doJob() {
return
}
Expand All @@ -449,8 +474,10 @@ func (w *worker) start() {
select {
case _, ok := <-w.wakeupSig:
if !ok {
w.addIdleWorkerCounter(-1)
return
}
w.addIdleWorkerCounter(-1)
if doJob() {
return
}
Expand Down

0 comments on commit 0ff0b5d

Please sign in to comment.