Skip to content

Commit

Permalink
slog poc
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Nov 10, 2024
1 parent 1bb0532 commit ae7d76a
Show file tree
Hide file tree
Showing 43 changed files with 97 additions and 48 deletions.
13 changes: 4 additions & 9 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ import (
"log"
"log/slog"
"sync"
"sync/atomic"
)

var ErrFlowNotFound = errors.New("flow not found")
var execIDS = &atomic.Int64{}

type Engine struct {
d Doer
l *slog.Logger

wg *sync.WaitGroup
doneCh chan struct{}
}

func NewEngine(d Doer) (*Engine, error) {
func NewEngine(d Doer, l *slog.Logger) (*Engine, error) {
e := &Engine{
d: d,
l: l,

wg: &sync.WaitGroup{},
doneCh: make(chan struct{}),
Expand All @@ -46,11 +46,6 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
defer e.wg.Done()
}

if stateCtx.ExecID == 0 {
stateCtx.ExecID = execIDS.Add(1)
}
execID := stateCtx.ExecID

stateCtx.e = e

if stateCtx.Current.ID == `` {
Expand All @@ -73,7 +68,7 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
return err
}

slog.Info("executing", "exec_id", execID, "state_id", stateCtx.Current.ID, "state_rev", stateCtx.Current.Rev, "flow", stateCtx.Current.Transition.ToID)
e.l.Info("flowstate: executing", logWithStateCtx(stateCtx, []any{"flow", stateCtx.Current.Transition.ToID}))

Check failure on line 71 in engine.go

View workflow job for this annotation

GitHub Actions / build

undefined: logWithStateCtx
cmd0, err := f.Execute(stateCtx, e)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/jackc/pgx/v5 v5.6.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/stretchr/testify v1.9.0
github.com/thejerf/slogassert v0.3.4
github.com/xo/dburl v0.23.2
go.uber.org/goleak v1.3.0
golang.org/x/time v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/thejerf/slogassert v0.3.4 h1:VoTsXixRbXMrRSSxDjYTiEDCM4VWbsYPW5rB/hX24kM=
github.com/thejerf/slogassert v0.3.4/go.mod h1:0zn9ISLVKo1aPMTqcGfG1o6dWwt+Rk574GlUxHD4rs8=
github.com/xo/dburl v0.23.2 h1:Fl88cvayrgE56JA/sqhNMLljCW/b7RmG1mMkKMZUFgA=
github.com/xo/dburl v0.23.2/go.mod h1:uazlaAQxj4gkshhfuuYyvwCBouOmNnG2aDxTCFZpmL4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
3 changes: 1 addition & 2 deletions state.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ type StateCtx struct {
// Transitions between committed and current states
Transitions []Transition `json:"transitions"`

ExecID int64 `json:"-"`
e *Engine `json:"-"`
e *Engine `json:"-"`
}

func (s *StateCtx) CopyTo(to *StateCtx) *StateCtx {
Expand Down
3 changes: 2 additions & 1 deletion testcases/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func Actor(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return nil, fmt.Errorf("must never be executed")
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/call_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func CallFlow(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/call_flow_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/call_flow_with_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func CallFlowWithWatch(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func Condition(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ func Cron(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/data_flow_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func DataFlowConfig(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/data_store_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func DataStoreGet(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/data_store_get_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func DataStoreGetWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/delay_delayed_win_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func Delay_DelayedWin_WithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry)
), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/delay_engine_do.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func Delay_EngineDo(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/delay_paused.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ func Delay_Paused(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/delay_paused_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func Delay_PausedWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/delay_return.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func Delay_Return(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/delay_transited_win_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func Delay_TransitedWin_WithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry
), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ func Fork(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/fork_join_first_wins.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func ForkJoin_FirstWins(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/fork_join_last_wins.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ func ForkJoin_LastWins(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/fork_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ func Fork_WithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/get_by_id_and_rev.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
func GetByIDAndRev(t TestingT, d flowstate.Doer, _ FlowRegistry) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/get_latest_by_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
func GetLatestByID(t TestingT, d flowstate.Doer, _ FlowRegistry) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/get_latest_by_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
func GetLatestByLabel(t TestingT, d flowstate.Doer, _ FlowRegistry) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/get_not_found.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (
func GetNotFound(t TestingT, d flowstate.Doer, _ FlowRegistry) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
14 changes: 14 additions & 0 deletions testcases/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package testcases

import (
"log/slog"

"github.com/thejerf/slogassert"
)

func newTestLogger(t TestingT) (*slog.Logger, *slogassert.Handler) {
h := slogassert.New(t, slog.LevelWarn, nil)
l := slog.New(h)

return l, h
}
3 changes: 2 additions & 1 deletion testcases/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func Mutex(t TestingT, d flowstate.Doer, fr FlowRegistry) {
states = append(states, statesCtx)
}

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func Queue(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ func RateLimit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
states = append(states, stateCtx)
}

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/recovery_always_fail.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ func RecoveryAlwaysFail(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.Noop(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/recovery_first_attempt_fail.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func RecoveryFirstAttemptFail(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
3 changes: 2 additions & 1 deletion testcases/single_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ func SingleNode(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
1 change: 1 addition & 0 deletions testcases/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
)

type TestingT interface {
Helper()
Error(...interface{})
Errorf(format string, args ...interface{})
Fatalf(format string, args ...any)
Expand Down
3 changes: 2 additions & 1 deletion testcases/three_consequent_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func ThreeConsequentNodes(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

e, err := flowstate.NewEngine(d)
l, _ := newTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5)
Expand Down
Loading

0 comments on commit ae7d76a

Please sign in to comment.