Skip to content

Commit

Permalink
gpool
Browse files Browse the repository at this point in the history
  • Loading branch information
snail007 committed Dec 22, 2023
1 parent 0ff0b5d commit 4b9fb25
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 1 deletion.
13 changes: 12 additions & 1 deletion util/gpool/gpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/rand"
"encoding/hex"
"errors"
"fmt"
gcore "github.com/snail007/gmc/core"
gerror "github.com/snail007/gmc/module/error"
glist "github.com/snail007/gmc/util/list"
Expand Down Expand Up @@ -56,6 +57,8 @@ type Option struct {
IdleDuration time.Duration
// start the worker when the pool created
PreAlloc bool
// PanicHandler is used to handle panics from each job function.
PanicHandler func(e interface{})
}

// Blocking the count of queued job to run reach the max, if blocking Submit call
Expand Down Expand Up @@ -238,7 +241,15 @@ func (s *Pool) run(fn func()) {
defer func() {
s.g.Done()
if e := recover(); e != nil {
s.log("Pool: a job stopped unexpectedly, err: %s", gcore.ProviderError()().StackError(e))
msg := fmt.Sprintf("Pool: a job stopped unexpectedly, err: %s", gcore.ProviderError()().StackError(e))
if s.opt.Logger != nil {
s.opt.Logger.Error(msg)
} else {
fmt.Println(msg)
}
if s.opt.PanicHandler != nil {
s.opt.PanicHandler(e)
}
}
}()
fn()
Expand Down
84 changes: 84 additions & 0 deletions util/gpool/gpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
package gpool_test

import (
"bytes"
_ "github.com/snail007/gmc"
gcore "github.com/snail007/gmc/core"
gerror "github.com/snail007/gmc/module/error"
glog "github.com/snail007/gmc/module/log"
gfile "github.com/snail007/gmc/util/file"
"github.com/snail007/gmc/util/gpool"
gloop "github.com/snail007/gmc/util/loop"
gatomic "github.com/snail007/gmc/util/sync/atomic"
Expand All @@ -19,13 +21,65 @@ import (
"time"
)

func TestDebugLogging(t *testing.T) {
t.Parallel()
p := gpool.New(1)
p.SetDebug(true)
buf := bytes.NewBuffer(nil)
l := glog.New()
l.SetOutput(glog.NewLoggerWriter(buf))
p.SetLogger(l)
p.Submit(func() { panic("hello_gmc") })
time.Sleep(time.Second)
assert2.Contains(t, buf.String(), "hello_gmc")
p.Stop()
time.Sleep(time.Second)
}

func TestPanicLogging(t *testing.T) {
t.Parallel()
p := gpool.New(1)
buf := bytes.NewBuffer(nil)
l := glog.New()
l.SetOutput(glog.NewLoggerWriter(buf))
p.SetLogger(l)
p.Submit(func() { panic("hello_gmc") })
time.Sleep(time.Second)
assert2.Contains(t, buf.String(), "hello_gmc")

fn := "tmp_panic.txt"
gfile.Write(fn, []byte("123"), false)
defer os.Remove(fn)
f, err := os.OpenFile(fn, os.O_RDWR, 0777)
assert2.Nil(t, err)

os.Stdout = f
p.SetLogger(nil)
p.Submit(func() { panic("hello_gmc") })
time.Sleep(time.Second * 3)
assert2.Contains(t, string(gfile.Bytes(fn)), "hello_gmc")
}

func TestPanicHandler(t *testing.T) {
t.Parallel()
i := 0
p := gpool.NewWithOption(1, &gpool.Option{PanicHandler: func(e interface{}) {
i = 1
}})
p.Submit(func() { panic("") })
time.Sleep(time.Second)
assert2.Equal(t, 1, i)
}

func TestPreAlloc(t *testing.T) {
t.Parallel()
p := gpool.NewWithPreAlloc(3)
time.Sleep(time.Millisecond * 50)
assert2.Equal(t, 3, p.IdleWorkerCount())
}

func TestBlocking(t *testing.T) {
t.Parallel()
p := gpool.New(1)
p.SetMaxJobCount(1)
p.SetBlocking(true)
Expand All @@ -51,6 +105,7 @@ func TestBlocking(t *testing.T) {
}

func TestIdle(t *testing.T) {
t.Parallel()
p := gpool.New(3)
p.SetDebug(true)
p.SetIdleDuration(time.Second)
Expand All @@ -66,9 +121,29 @@ func TestIdle(t *testing.T) {
time.Sleep(time.Second * 2)
assert2.Equal(t, 0, p.WorkerCount())
p.Stop()
time.Sleep(time.Second)
}

func TestIdleNoIdle(t *testing.T) {
t.Parallel()
p := gpool.New(3)
p.SetDebug(true)
p.SetIdleDuration(time.Second * 10)
cnt := gatomic.NewInt(0)
gloop.For(3, func(loopIndex int) {
p.Submit(func() {
cnt.Increase(loopIndex)
})
})
time.Sleep(time.Millisecond * 500)
assert2.Equal(t, 3, cnt.Val())
p.Stop()
time.Sleep(time.Second)
assert2.Equal(t, 0, p.WorkerCount())
}

func TestNewGPool(t *testing.T) {
t.Parallel()
p := gpool.New(3)
if p != nil {
t.Log("New is okay")
Expand All @@ -79,6 +154,7 @@ func TestNewGPool(t *testing.T) {
}

func TestSubmit(t *testing.T) {
t.Parallel()
p := gpool.New(3)
a := make(chan bool)
p.Submit(func() {
Expand All @@ -98,6 +174,7 @@ func TestSubmit(t *testing.T) {
time.Sleep(time.Second)
}
func TestStop(t *testing.T) {
t.Parallel()
p := gpool.New(3)
a := make(chan bool)
p.Submit(func() {
Expand All @@ -114,11 +191,13 @@ func TestStop(t *testing.T) {
p.Stop()
}
func TestSetLogger(t *testing.T) {
t.Parallel()
p := gpool.New(3)
p.SetLogger(nil)
p.Stop()
}
func TestWaitDone(t *testing.T) {
t.Parallel()
start := time.Now()
p := gpool.NewWithPreAlloc(10)
gloop.For(10, func(loopIndex int) {
Expand All @@ -132,6 +211,7 @@ func TestWaitDone(t *testing.T) {
}

func TestRunning(t *testing.T) {
t.Parallel()
p := gpool.NewWithPreAlloc(3)
p.Submit(func() {
time.Sleep(time.Second)
Expand All @@ -155,6 +235,7 @@ func TestRunning(t *testing.T) {
}

func TestIncrease(t *testing.T) {
t.Parallel()
assert := assert2.New(t)
p := gpool.NewWithPreAlloc(3)
for i := 0; i < 3; i++ {
Expand All @@ -174,6 +255,7 @@ func TestIncrease(t *testing.T) {
}

func TestDecrease(t *testing.T) {
t.Parallel()
assert := assert2.New(t)
p := gpool.NewWithPreAlloc(2)
for i := 0; i < 6; i++ {
Expand All @@ -190,6 +272,7 @@ func TestDecrease(t *testing.T) {
}

func TestQueuedJobCount(t *testing.T) {
t.Parallel()
p := gpool.NewWithPreAlloc(3)
p.Submit(func() {
time.Sleep(time.Second)
Expand All @@ -213,6 +296,7 @@ func TestQueuedJobCount(t *testing.T) {
}

func TestGPool_MaxWaitCount(t *testing.T) {
t.Parallel()
assert := assert2.New(t)
p := gpool.New(1)
//p.SetLogger(glog.New())
Expand Down

0 comments on commit 4b9fb25

Please sign in to comment.