Skip to content

Commit

Permalink
gpool
Browse files Browse the repository at this point in the history
  • Loading branch information
snail007 committed Dec 22, 2023
1 parent 22a8f3b commit 61e0658
Show file tree
Hide file tree
Showing 8 changed files with 402 additions and 66 deletions.
3 changes: 2 additions & 1 deletion demos/website/initialize/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bytes"
"fmt"
"github.com/snail007/gmc"
gcore "github.com/snail007/gmc/core"
"github.com/snail007/gmc/demos/website/router"
"net"
"strings"
Expand Down Expand Up @@ -55,6 +56,6 @@ func Initialize(s *gmc.HTTPServer) (err error) {
}
buf.WriteString("http://127.0.0.1:" + port + path + "\n")
}
s.Logger().Writer().Write(buf.Bytes())
s.Logger().Writer().Write(buf.Bytes(), gcore.LogLeveInfo)
return
}
2 changes: 1 addition & 1 deletion util/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (s *Executor) waitFirst(checkSuccess bool) (value interface{}, err error) {
//a task returned, call rootCancel to cancel others task.
go s.rootCancel()
return v.value, v.err
case <-gsync.Wait(&g):
case <-gsync.WaitGroupToChan(&g):
// all task done, return the last err
return nil, allResult.Pop().(taskResult).err
}
Expand Down
3 changes: 2 additions & 1 deletion util/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gexec
import (
"bytes"
"context"
"errors"
"fmt"
gcore "github.com/snail007/gmc/core"
gerror "github.com/snail007/gmc/module/error"
Expand Down Expand Up @@ -221,7 +222,7 @@ func (s *Command) combinedOutput(cmd *exec.Cmd) ([]byte, error) {
// ExecAsync async execute command on linux system.
func (s *Command) ExecAsync() (e error) {
if s.async {
panic("ExecAsync can not run with Async is enabled")
return errors.New("ExecAsync can not run with Async is enabled")
}
s.execAsync = true
_, e = s.Exec()
Expand Down
181 changes: 138 additions & 43 deletions util/gpool/gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ package gpool
import (
"crypto/rand"
"encoding/hex"
"errors"
gcore "github.com/snail007/gmc/core"
gerror "github.com/snail007/gmc/module/error"
glist "github.com/snail007/gmc/util/list"
gmap "github.com/snail007/gmc/util/map"
"io"
"sync"
"time"
)

const (
Expand All @@ -22,16 +24,44 @@ const (
statusStopped
)

var (
MaxWaitCountReached = errors.New("max await job count reached")
)

// GPool is a goroutine pool, you can increase or decrease pool size in runtime.
type GPool struct {
tasks *glist.List
logger gcore.Logger
workers *gmap.Map
debug bool
maxWaitCount int
lazy sync.Once
initWorkCount int
g *sync.WaitGroup
tasks *glist.List
logger gcore.Logger
workers *gmap.Map
debug bool
maxWaitCount int
maxWorkCount int
g *sync.WaitGroup
idleDuration time.Duration
submitBlockChanList *glist.List
blockingOnMaxWait bool
}

// BlockingOnMaxWait if the count of await job to run reach the max,
// if blocking Submit call
func (s *GPool) BlockingOnMaxWait() bool {
return s.blockingOnMaxWait
}

func (s *GPool) SetBlockingOnMaxWait(blockingOnMaxWait bool) {
s.blockingOnMaxWait = blockingOnMaxWait
}

// IdleDuration is the idle time duration before the worker exit,
// duration 0 means the work will not exit.
func (s *GPool) IdleDuration() time.Duration {
return s.idleDuration
}

// SetIdleDuration set the idle time duration before the worker exit,
// duration 0 means the work will not exit.
func (s *GPool) SetIdleDuration(idleDuration time.Duration) {
s.idleDuration = idleDuration
}

// MaxTaskAwaitCount returns the max waiting task count.
Expand Down Expand Up @@ -61,28 +91,37 @@ func New(workerCount int) (p *GPool) {

func NewWithLogger(workerCount int, logger gcore.Logger) (p *GPool) {
p = &GPool{
tasks: glist.New(),
logger: logger,
workers: gmap.New(),
initWorkCount: workerCount,
g: &sync.WaitGroup{},
submitBlockChanList: glist.New(),
tasks: glist.New(),
logger: logger,
workers: gmap.New(),
maxWorkCount: workerCount,
g: &sync.WaitGroup{},
}
return
}

// Increase add the count of `workerCount` workers
func (s *GPool) Increase(workerCount int) {
s.maxWorkCount += workerCount
s.addWorker(workerCount)
}
func (s *GPool) removeWorker(w *worker) {
w.Stop()
s.workers.Delete(w.id)
}

// Decrease stop the count of `workerCount` workers
func (s *GPool) Decrease(workerCount int) {
// find idle workers
s.maxWorkCount -= workerCount
if s.maxWorkCount < 0 {
s.maxWorkCount = 0
}
// find awaiting workers
s.workers.Range(func(_, v interface{}) bool {
w := v.(*worker)
if w.Status() == statusWaiting {
w.Stop()
s.workers.Delete(w.id)
s.removeWorker(w)
workerCount--
if workerCount == 0 {
return false
Expand All @@ -95,8 +134,7 @@ func (s *GPool) Decrease(workerCount int) {
s.workers.Range(func(_, v interface{}) bool {
w := v.(*worker)
if w.Status() == statusRunning {
v.(*worker).Stop()
s.workers.Delete(w.id)
s.removeWorker(w)
workerCount--
if workerCount == 0 {
return false
Expand All @@ -113,6 +151,7 @@ func (s *GPool) ResetTo(workerCount int) {
if length == workerCount {
return
}
s.maxWorkCount = workerCount
if workerCount > length {
s.Increase(workerCount - length)
} else {
Expand All @@ -131,6 +170,9 @@ func (s *GPool) WaitDone() {
}

func (s *GPool) addWorker(cnt int) {
if s.WorkerCount() >= s.maxWorkCount {
return
}
for i := 0; i < cnt; i++ {
w := newWorker(s)
s.workers.Store(w.id, w)
Expand All @@ -157,17 +199,23 @@ func (s *GPool) run(fn func()) {
}

// Submit adds a function as a task ready to run
func (s *GPool) Submit(task func()) bool {
s.lazy.Do(func() {
s.addWorker(s.initWorkCount)
})
if s.maxWaitCount > 0 && s.tasks.Len() > s.maxWaitCount {
return false
func (s *GPool) Submit(task func()) error {
if s.WorkerCount() < s.maxWorkCount {
s.addWorker(1)
}
if s.maxWaitCount > 0 && s.tasks.Len() >= s.maxWaitCount {
if s.blockingOnMaxWait {
ch := make(chan bool)
s.submitBlockChanList.Add(ch)
<-ch
} else {
return MaxWaitCountReached
}
}
s.g.Add(1)
s.tasks.Add(task)
s.notifyAll()
return true
return nil
}

// notify all workers, only idle workers be awakened
Expand All @@ -187,7 +235,7 @@ func (s *GPool) pop() (fn func()) {
return
}

// Stop stop and remove all workers in the pool
// Stop and remove all workers in the pool
func (s *GPool) Stop() {
s.workers.Range(func(_, v interface{}) bool {
v.(*worker).Stop()
Expand All @@ -196,8 +244,8 @@ func (s *GPool) Stop() {
s.workers.Clear()
}

// Running returns the count of running workers
func (s *GPool) Running() (workerCount int) {
// RunningWork returns the count of running workers
func (s *GPool) RunningWork() (workerCount int) {
s.workers.Range(func(_, v interface{}) bool {
if v.(*worker).Status() == statusRunning {
workerCount++
Expand All @@ -207,6 +255,17 @@ func (s *GPool) Running() (workerCount int) {
return
}

// AwaitingWorker returns the count of awaiting workers
func (s *GPool) AwaitingWorker() (workerCount int) {
s.workers.Range(func(_, v interface{}) bool {
if v.(*worker).Status() == statusWaiting {
workerCount++
}
return true
})
return
}

// Awaiting returns the count of task ready to run
func (s *GPool) Awaiting() (taskCount int) {
return s.tasks.Len()
Expand Down Expand Up @@ -245,6 +304,11 @@ func (w *worker) Status() int {

func (w *worker) SetStatus(status int) {
w.status = status
if status == statusWaiting && w.pool.submitBlockChanList.Len() > 0 {
if ch := w.pool.submitBlockChanList.Pop(); ch != nil {
ch.(chan bool) <- true
}
}
}

func (w *worker) Wakeup() bool {
Expand Down Expand Up @@ -285,34 +349,65 @@ func (w *worker) Stop() {
func (w *worker) start() {
go func() {
w.Wakeup()
var t *time.Timer
defer func() {
if t != nil {
t.Stop()
}
w.SetStatus(statusStopped)
w.pool.removeWorker(w)
w.pool.debugLog("GPool: worker[%s] stopped", w.id)
}()
w.pool.debugLog("GPool: worker[%s] started ...", w.id)
var fn func()
var doJob = func() (isReturn bool) {
w.SetStatus(statusRunning)
w.pool.debugLog("GPool: worker[%s] running ...", w.id)
for {
//w.pool.debugLog("GPool: worker[%s] read break", w.id)
if w.isBreak() {
w.pool.debugLog("GPool: worker[%s] break", w.id)
return true
}
if fn = w.pool.pop(); fn != nil {
//w.pool.debugLog("GPool: worker[%s] called", w.id)
w.pool.run(fn)
} else {
w.pool.debugLog("GPool: worker[%s] no task, break", w.id)
break
}
}
return
}
for {
w.SetStatus(statusWaiting)
w.pool.debugLog("GPool: worker[%s] waiting ...", w.id)
select {
case _, ok := <-w.wakeupSig:
if !ok {
if w.pool.idleDuration > 0 {
if t == nil {
t = time.NewTimer(w.pool.idleDuration)
} else {
t.Reset(w.pool.idleDuration)
}
select {
case <-t.C:
w.pool.debugLog("GPool: worker[%s] idle timeout, exited", w.id)
return
case _, ok := <-w.wakeupSig:
if !ok {
return
}
if doJob() {
return
}
}
w.SetStatus(statusRunning)
w.pool.debugLog("GPool: worker[%s] running ...", w.id)
for {
//w.pool.debugLog("GPool: worker[%s] read break", w.id)
if w.isBreak() {
w.pool.debugLog("GPool: worker[%s] break", w.id)
} else {
select {
case _, ok := <-w.wakeupSig:
if !ok {
return
}
if fn = w.pool.pop(); fn != nil {
//w.pool.debugLog("GPool: worker[%s] called", w.id)
w.pool.run(fn)
} else {
w.pool.debugLog("GPool: worker[%s] no task, break", w.id)
break
if doJob() {
return
}
}
}
Expand Down
Loading

0 comments on commit 61e0658

Please sign in to comment.