diff --git a/cmd.go b/cmd.go index cdbcbd7..4288c6d 100644 --- a/cmd.go +++ b/cmd.go @@ -1,10 +1,21 @@ package flowstate type Command interface { + setSessID(id int64) + SessID() int64 cmd() } type command struct { + sessID int64 } func (_ *command) cmd() {} + +func (cmd *command) setSessID(doID int64) { + cmd.sessID = doID +} + +func (cmd *command) SessID() int64 { + return cmd.sessID +} diff --git a/cmd_commit.go b/cmd_commit.go index 9d56054..a4f9f38 100644 --- a/cmd_commit.go +++ b/cmd_commit.go @@ -15,6 +15,13 @@ type CommitCommand struct { Commands []Command } +func (cmd *CommitCommand) setSessID(id int64) { + cmd.command.setSessID(id) + for _, subCmd := range cmd.Commands { + subCmd.setSessID(id) + } +} + func CommitStateCtx(stateCtx *StateCtx) *CommitStateCtxCommand { return &CommitStateCtxCommand{ StateCtx: stateCtx, diff --git a/engine.go b/engine.go index 2af00ed..bf1dfaf 100644 --- a/engine.go +++ b/engine.go @@ -4,22 +4,26 @@ import ( "context" "errors" "fmt" - "log" + "log/slog" "sync" + "sync/atomic" ) var ErrFlowNotFound = errors.New("flow not found") +var sessIDS = &atomic.Int64{} type Engine struct { d Doer + l *slog.Logger wg *sync.WaitGroup doneCh chan struct{} } -func NewEngine(d Doer) (*Engine, error) { +func NewEngine(d Doer, l *slog.Logger) (*Engine, error) { e := &Engine{ d: d, + l: l, wg: &sync.WaitGroup{}, doneCh: make(chan struct{}), @@ -43,6 +47,8 @@ func (e *Engine) Execute(stateCtx *StateCtx) error { defer e.wg.Done() } + sessID := sessIDS.Add(1) + stateCtx.sessID = sessID stateCtx.e = e if stateCtx.Current.ID == `` { @@ -65,11 +71,14 @@ func (e *Engine) Execute(stateCtx *StateCtx) error { return err } + logExecute(stateCtx, e.l) cmd0, err := f.Execute(stateCtx, e) if err != nil { return err } + cmd0.setSessID(sessID) + if cmd, ok := cmd0.(*ExecuteCommand); ok { cmd.sync = true } @@ -77,7 +86,12 @@ func (e *Engine) Execute(stateCtx *StateCtx) error { conflictErr := &ErrCommitConflict{} if err = e.do(cmd0); errors.As(err, conflictErr) { - log.Printf("INFO: engine: execute: %s\n", conflictErr) + e.l.Info("engine: do conflict", + "sess", cmd0.SessID(), + "conflict", err.Error(), + "id", stateCtx.Current.ID, + "rev", stateCtx.Current.Rev, + ) return nil } else if err != nil { return err @@ -99,7 +113,22 @@ func (e *Engine) Do(cmds ...Command) error { return fmt.Errorf("no commands to do") } + var sessID int64 for _, cmd := range cmds { + if cmd.SessID() == 0 { + if sessID == 0 { + sessID = sessIDS.Add(1) + } + + cmd.setSessID(sessID) + + if cmtCmd, ok := cmd.(*CommitCommand); ok { + for _, subCmd := range cmtCmd.Commands { + subCmd.setSessID(sessID) + } + } + } + if err := e.do(cmd); err != nil { return err } @@ -136,6 +165,8 @@ func (e *Engine) Shutdown(ctx context.Context) error { } func (e *Engine) do(cmd0 Command) error { + logDo(cmd0, e.l) + switch cmd := cmd0.(type) { case *ExecuteCommand: if cmd.sync { @@ -143,11 +174,19 @@ func (e *Engine) do(cmd0 Command) error { } go func() { - if err := e.Execute(cmd.StateCtx); err != nil { - log.Printf("ERROR: engine: go execute: %s\n", err) + stateCtx := cmd.StateCtx + if err := e.Execute(stateCtx); err != nil { + e.l.Error("execute failed", + "sess", stateCtx.SessID(), + "error", err, + "id", stateCtx.Current.ID, + "rev", stateCtx.Current.Rev, + ) } }() return nil + case *CommitCommand: + return e.d.Do(cmd0) default: return e.d.Do(cmd0) } diff --git a/go.mod b/go.mod index de7a680..afa7ceb 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/jackc/pgx/v5 v5.6.0 github.com/mattn/go-sqlite3 v1.14.22 github.com/stretchr/testify v1.9.0 + github.com/thejerf/slogassert v0.3.4 github.com/xo/dburl v0.23.2 go.uber.org/goleak v1.3.0 golang.org/x/time v0.5.0 diff --git a/go.sum b/go.sum index 2e19f69..f98afe3 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/thejerf/slogassert v0.3.4 h1:VoTsXixRbXMrRSSxDjYTiEDCM4VWbsYPW5rB/hX24kM= +github.com/thejerf/slogassert v0.3.4/go.mod h1:0zn9ISLVKo1aPMTqcGfG1o6dWwt+Rk574GlUxHD4rs8= github.com/xo/dburl v0.23.2 h1:Fl88cvayrgE56JA/sqhNMLljCW/b7RmG1mMkKMZUFgA= github.com/xo/dburl v0.23.2/go.mod h1:uazlaAQxj4gkshhfuuYyvwCBouOmNnG2aDxTCFZpmL4= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/log.go b/log.go new file mode 100644 index 0000000..e94827d --- /dev/null +++ b/log.go @@ -0,0 +1,94 @@ +package flowstate + +import ( + "fmt" + "log/slog" +) + +func logExecute(stateCtx *StateCtx, l *slog.Logger) { + args := []any{ + "sess", stateCtx.sessID, + "flow", stateCtx.Current.Transition.ToID, + "id", stateCtx.Current.ID, + "rev", stateCtx.Current.Rev, + } + + currTs := stateCtx.Current.Transition + + if currTs.Annotations[StateAnnotation] != `` { + args = append(args, "state", currTs.Annotations[StateAnnotation]) + } + + if currTs.Annotations[DelayDurationAnnotation] != `` { + args = append(args, "delayed", "true") + } + + if currTs.Annotations[RecoveryAttemptAnnotation] != `` { + args = append(args, "recovered", currTs.Annotations[RecoveryAttemptAnnotation]) + } + + l.Info("engine: execute", args...) +} + +func logDo(cmd0 Command, l *slog.Logger) { + args := []any{"sess", cmd0.SessID()} + + switch cmd := cmd0.(type) { + case *CommitCommand: + args = append(args, "cmd", "commit", "len", len(cmd.Commands)) + case *CommitStateCtxCommand: + args = append(args, "cmd", "commit_state_ctx", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev) + case *TransitCommand: + args = append(args, "cmd", "transit", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev, "to", cmd.FlowID) + case *PauseCommand: + args = append(args, "cmd", "pause", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev) + if cmd.FlowID != `` { + args = append(args, "to", cmd.FlowID) + } + case *ResumeCommand: + args = append(args, "cmd", "resume", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev) + case *EndCommand: + args = append(args, "cmd", "end", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev) + case *DelayCommand: + args = append(args, "cmd", "delay", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev, "dur", cmd.Duration) + case *ExecuteCommand: + args = append(args, "cmd", "execute", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev) + case *NoopCommand: + args = append(args, "cmd", "noop", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev) + case *ReferenceDataCommand: + args = append(args, + "cmd", "reference_data", + "id", cmd.StateCtx.Current.ID, + "rev", cmd.StateCtx.Current.Rev, + "data_id", cmd.Data.ID, + "data_rev", cmd.Data.Rev, + "annot", cmd.Annotation, + ) + case *DereferenceDataCommand: + args = append(args, + "cmd", "dereference_data", + "id", cmd.StateCtx.Current.ID, + "rev", cmd.StateCtx.Current.Rev, + "data_id", cmd.Data.ID, + "data_rev", cmd.Data.Rev, + "annot", cmd.Annotation, + ) + case *StoreDataCommand: + args = append(args, "cmd", "store_data", "data_id", cmd.Data.ID, "data_rev", cmd.Data.Rev) + case *GetDataCommand: + args = append(args, "cmd", "get_data", "data_id", cmd.Data.ID, "data_rev", cmd.Data.Rev) + case *DeserializeCommand: + args = append(args, "cmd", "deserialize") + case *SerializeCommand: + args = append(args, "cmd", "serialize") + case *GetFlowCommand: + args = append(args, "cmd", "get_flow", "flow_id", cmd.StateCtx.Current.Transition.ToID) + case *WatchCommand: + args = append(args, "cmd", "watch") + default: + args = append(args, "cmd", fmt.Sprintf("%T", cmd)) + } + + l.Info("engine: do", args...) + +} diff --git a/memdriver/delayer.go b/memdriver/delayer.go index caf72e5..c85b516 100644 --- a/memdriver/delayer.go +++ b/memdriver/delayer.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "log" + "log/slog" "sync" "time" @@ -17,11 +17,13 @@ type Delayer struct { e *flowstate.Engine stopCh chan struct{} wg sync.WaitGroup + l *slog.Logger } -func NewDelayer() *Delayer { +func NewDelayer(l *slog.Logger) *Delayer { return &Delayer{ stopCh: make(chan struct{}), + l: l, } } @@ -43,26 +45,46 @@ func (d *Delayer) Do(cmd0 flowstate.Command) error { t := time.NewTimer(cmd.Duration) defer t.Stop() + stateCtx := cmd.DelayStateCtx + select { case <-t.C: - if cmd.DelayStateCtx.Current.Transition.Annotations[flowstate.DelayCommitAnnotation] == `true` { + if stateCtx.Current.Transition.Annotations[flowstate.DelayCommitAnnotation] == `true` { conflictErr := &flowstate.ErrCommitConflict{} if err := d.e.Do(flowstate.Commit( flowstate.CommitStateCtx(cmd.DelayStateCtx), )); errors.As(err, conflictErr) { - log.Printf(`ERROR: memdriver: delayer: engine: commit: %s\n`, conflictErr) + d.l.Info("delayer: commit conflict", + "sess", cmd.SessID(), + "conflict", err.Error(), + "id", stateCtx.Current.ID, + "rev", stateCtx.Current.Rev, + ) return } else if err != nil { - log.Printf(`ERROR: memdriver: delayer: engine: commit: %s`, err) + d.l.Error("delayer: commit failed", + "sess", cmd.SessID(), + "error", err, + "id", stateCtx.Current.ID, + "rev", stateCtx.Current.Rev, + ) return } } - if err := d.e.Execute(cmd.DelayStateCtx); err != nil { - log.Printf(`ERROR: memdriver: delayer: engine: execute: %s`, err) + if err := d.e.Execute(stateCtx); err != nil { + d.l.Error("delayer: execute failed", + "sess", stateCtx.SessID(), + "error", err, + "id", stateCtx.Current.ID, + "rev", stateCtx.Current.Rev, + ) } case <-d.stopCh: - log.Printf(`ERROR: memdriver: delayer: state delay was terminated`) + d.l.Error("delayer: delaying terminated", + "id", stateCtx.Current.ID, + "rev", stateCtx.Current.Rev, + ) return } }() diff --git a/memdriver/driver.go b/memdriver/driver.go index fe93e72..189c6a7 100644 --- a/memdriver/driver.go +++ b/memdriver/driver.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "log/slog" + "os" "github.com/makasim/flowstate" ) @@ -11,17 +13,25 @@ import ( type Driver struct { *FlowRegistry - l *Log + log *Log doers []flowstate.Doer recoverer flowstate.Doer + + l *slog.Logger } func New(opts ...Option) *Driver { - l := &Log{} + log := &Log{} d := &Driver{ - l: l, + log: log, FlowRegistry: &FlowRegistry{}, + + l: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{})), + } + + for _, opt := range opts { + opt(d) } doers := []flowstate.Doer{ @@ -37,17 +47,13 @@ func New(opts ...Option) *Driver { NewDataLog(), NewFlowGetter(d.FlowRegistry), - NewCommiter(l), - NewGetter(l), - NewWatcher(l), - NewDelayer(), + NewCommiter(log), + NewGetter(log), + NewWatcher(log), + NewDelayer(d.l), } d.doers = doers - for _, opt := range opts { - opt(d) - } - return d } @@ -105,3 +111,9 @@ func WithRecoverer(r flowstate.Doer) Option { d.recoverer = r } } + +func WithLogger(l *slog.Logger) Option { + return func(d *Driver) { + d.l = l + } +} diff --git a/memdriver/suite_test.go b/memdriver/suite_test.go index e2d60ba..58009a6 100644 --- a/memdriver/suite_test.go +++ b/memdriver/suite_test.go @@ -10,7 +10,8 @@ import ( func TestSuite(t *testing.T) { s := testcases.Get(func(t testcases.TestingT) (flowstate.Doer, testcases.FlowRegistry) { - d := memdriver.New() + l, _ := testcases.NewTestLogger(t) + d := memdriver.New(memdriver.WithLogger(l)) return d, d }) diff --git a/pgdriver/delayer.go b/pgdriver/delayer.go index fb5c390..d7c86bd 100644 --- a/pgdriver/delayer.go +++ b/pgdriver/delayer.go @@ -147,7 +147,11 @@ func (*Delayer) key() string { } func (d *Delayer) Shutdown(_ context.Context) error { - close(d.doneCh) - - return nil + select { + case <-d.doneCh: + return fmt.Errorf(`already shutdown`) + default: + close(d.doneCh) + return nil + } } diff --git a/pgdriver/driver.go b/pgdriver/driver.go index 63a21c2..a693aeb 100644 --- a/pgdriver/driver.go +++ b/pgdriver/driver.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "log/slog" + "os" "time" "github.com/makasim/flowstate" @@ -18,6 +20,7 @@ type Driver struct { doers []flowstate.Doer recoverer flowstate.Doer + l *slog.Logger } func New(conn conn, opts ...Option) *Driver { @@ -26,6 +29,11 @@ func New(conn conn, opts ...Option) *Driver { q: &queries{}, FlowRegistry: &memdriver.FlowRegistry{}, + l: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{})), + } + + for _, opt := range opts { + opt(d) } d.doers = []flowstate.Doer{ @@ -50,10 +58,6 @@ func New(conn conn, opts ...Option) *Driver { NewDelayer(d.conn, d.q, time.Now), } - for _, opt := range opts { - opt(d) - } - return d } @@ -111,3 +115,9 @@ func WithRecoverer(r flowstate.Doer) Option { d.recoverer = r } } + +func WithLogger(l *slog.Logger) Option { + return func(d *Driver) { + d.l = l + } +} diff --git a/pgdriver/suite_test.go b/pgdriver/suite_test.go index 64ff4d7..5251e57 100644 --- a/pgdriver/suite_test.go +++ b/pgdriver/suite_test.go @@ -32,7 +32,8 @@ func TestSuite(t *testing.T) { conn.Close() }) - d := pgdriver.New(conn) + l, _ := testcases.NewTestLogger(t) + d := pgdriver.New(conn, pgdriver.WithLogger(l)) return d, d }) diff --git a/state.go b/state.go index ccbf5e1..9eefab8 100644 --- a/state.go +++ b/state.go @@ -75,7 +75,12 @@ type StateCtx struct { // Transitions between committed and current states Transitions []Transition `json:"transitions"` - e *Engine `json:"-"` + sessID int64 `json:"-"` + e *Engine `json:"-"` +} + +func (s *StateCtx) SessID() int64 { + return s.sessID } func (s *StateCtx) CopyTo(to *StateCtx) *StateCtx { diff --git a/testcases/actor.go b/testcases/actor.go index 206ebfb..8f73da6 100644 --- a/testcases/actor.go +++ b/testcases/actor.go @@ -49,7 +49,8 @@ func Actor(t TestingT, d flowstate.Doer, fr FlowRegistry) { return nil, fmt.Errorf("must never be executed") })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/call_flow.go b/testcases/call_flow.go index d8f7907..74ed62d 100644 --- a/testcases/call_flow.go +++ b/testcases/call_flow.go @@ -79,7 +79,8 @@ func CallFlow(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/call_flow_with_commit.go b/testcases/call_flow_with_commit.go index 323a690..ca37031 100644 --- a/testcases/call_flow_with_commit.go +++ b/testcases/call_flow_with_commit.go @@ -12,13 +12,6 @@ import ( func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - var nextStateCtx *flowstate.StateCtx - stateCtx := &flowstate.StateCtx{ - Current: flowstate.State{ - ID: "aTID", - }, - } - endedCh := make(chan struct{}) trkr := &Tracker{} @@ -29,7 +22,7 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.Transit(stateCtx, `callEnd`), nil } - nextStateCtx = &flowstate.StateCtx{ + nextStateCtx := &flowstate.StateCtx{ Current: flowstate.State{ ID: "aNextTID", }, @@ -89,7 +82,8 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) @@ -98,6 +92,11 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { require.NoError(t, e.Shutdown(sCtx)) }() + stateCtx := &flowstate.StateCtx{ + Current: flowstate.State{ + ID: "aTID", + }, + } err = e.Do(flowstate.Commit( flowstate.Transit(stateCtx, `call`), )) diff --git a/testcases/call_flow_with_watch.go b/testcases/call_flow_with_watch.go index 79912c0..00b0d41 100644 --- a/testcases/call_flow_with_watch.go +++ b/testcases/call_flow_with_watch.go @@ -97,7 +97,8 @@ func CallFlowWithWatch(t TestingT, d flowstate.Doer, fr FlowRegistry) { ), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/condition.go b/testcases/condition.go index f881cc0..84c5802 100644 --- a/testcases/condition.go +++ b/testcases/condition.go @@ -33,7 +33,8 @@ func Condition(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/cron.go b/testcases/cron.go index cb4f76b..75d5133 100644 --- a/testcases/cron.go +++ b/testcases/cron.go @@ -87,7 +87,8 @@ func Cron(t TestingT, d flowstate.Doer, fr FlowRegistry) { ), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/data_flow_config.go b/testcases/data_flow_config.go index fb11c9b..240ef6f 100644 --- a/testcases/data_flow_config.go +++ b/testcases/data_flow_config.go @@ -125,7 +125,8 @@ func DataFlowConfig(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/data_store_get.go b/testcases/data_store_get.go index 7d5a448..952b612 100644 --- a/testcases/data_store_get.go +++ b/testcases/data_store_get.go @@ -63,7 +63,8 @@ func DataStoreGet(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/data_store_get_with_commit.go b/testcases/data_store_get_with_commit.go index b775b02..1fe9a4f 100644 --- a/testcases/data_store_get_with_commit.go +++ b/testcases/data_store_get_with_commit.go @@ -63,7 +63,8 @@ func DataStoreGetWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/delay_delayed_win_with_commit.go b/testcases/delay_delayed_win_with_commit.go index 4ce3df8..8d0979a 100644 --- a/testcases/delay_delayed_win_with_commit.go +++ b/testcases/delay_delayed_win_with_commit.go @@ -48,7 +48,8 @@ func Delay_DelayedWin_WithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) ), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/delay_engine_do.go b/testcases/delay_engine_do.go index 3d075fb..98567c3 100644 --- a/testcases/delay_engine_do.go +++ b/testcases/delay_engine_do.go @@ -33,7 +33,8 @@ func Delay_EngineDo(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/delay_paused.go b/testcases/delay_paused.go index ed8eb7a..e02313d 100644 --- a/testcases/delay_paused.go +++ b/testcases/delay_paused.go @@ -38,7 +38,8 @@ func Delay_Paused(t TestingT, d flowstate.Doer, fr FlowRegistry) { ), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/delay_paused_with_commit.go b/testcases/delay_paused_with_commit.go index 175e22d..fbdeaea 100644 --- a/testcases/delay_paused_with_commit.go +++ b/testcases/delay_paused_with_commit.go @@ -40,7 +40,8 @@ func Delay_PausedWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { ), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/delay_return.go b/testcases/delay_return.go index 9245f68..a6ea7ec 100644 --- a/testcases/delay_return.go +++ b/testcases/delay_return.go @@ -27,7 +27,8 @@ func Delay_Return(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/delay_transited_win_with_commit.go b/testcases/delay_transited_win_with_commit.go index 81f2731..357effc 100644 --- a/testcases/delay_transited_win_with_commit.go +++ b/testcases/delay_transited_win_with_commit.go @@ -44,7 +44,8 @@ func Delay_TransitedWin_WithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry ), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/fork.go b/testcases/fork.go index 57f0702..7a6bb3c 100644 --- a/testcases/fork.go +++ b/testcases/fork.go @@ -51,7 +51,8 @@ func Fork(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/fork_join_first_wins.go b/testcases/fork_join_first_wins.go index fe8d469..d860464 100644 --- a/testcases/fork_join_first_wins.go +++ b/testcases/fork_join_first_wins.go @@ -12,8 +12,6 @@ import ( func ForkJoin_FirstWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - var forkedStateCtx *flowstate.StateCtx - var forkedTwoStateCtx *flowstate.StateCtx stateCtx := &flowstate.StateCtx{ Current: flowstate.State{ ID: "aTID", @@ -26,16 +24,17 @@ func ForkJoin_FirstWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { Track(stateCtx, trkr) stateCtx.Current.SetLabel(`theForkJoinLabel`, string(stateCtx.Current.ID)) - forkedStateCtx = stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{}) - forkedTwoStateCtx = stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{}) + forkedStateCtx := stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{}) + forkedTwoStateCtx := stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{}) + copyStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{}) if err := e.Do( flowstate.Commit( - flowstate.Transit(stateCtx, `forked`), + flowstate.Transit(copyStateCtx, `forked`), flowstate.Transit(forkedStateCtx, `forked`), flowstate.Transit(forkedTwoStateCtx, `forked`), ), - flowstate.Execute(stateCtx), + flowstate.Execute(copyStateCtx), flowstate.Execute(forkedStateCtx), flowstate.Execute(forkedTwoStateCtx), ); err != nil { @@ -104,7 +103,8 @@ func ForkJoin_FirstWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/fork_join_last_wins.go b/testcases/fork_join_last_wins.go index 6db87b1..e2b4b9b 100644 --- a/testcases/fork_join_last_wins.go +++ b/testcases/fork_join_last_wins.go @@ -12,8 +12,6 @@ import ( func ForkJoin_LastWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - var forkedStateCtx *flowstate.StateCtx - var forkedTwoStateCtx *flowstate.StateCtx stateCtx := &flowstate.StateCtx{ Current: flowstate.State{ ID: "aTID", @@ -27,16 +25,17 @@ func ForkJoin_LastWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { stateCtx.Current.SetLabel(`theForkJoinLabel`, string(stateCtx.Current.ID)) - forkedStateCtx = stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{}) - forkedTwoStateCtx = stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{}) + forkedStateCtx := stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{}) + forkedTwoStateCtx := stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{}) + copyStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{}) if err := e.Do( flowstate.Commit( - flowstate.Transit(stateCtx, `forked`), + flowstate.Transit(copyStateCtx, `forked`), flowstate.Transit(forkedStateCtx, `forked`), flowstate.Transit(forkedTwoStateCtx, `forked`), ), - flowstate.Execute(stateCtx), + flowstate.Execute(copyStateCtx), flowstate.Execute(forkedStateCtx), flowstate.Execute(forkedTwoStateCtx), ); err != nil { @@ -105,7 +104,8 @@ func ForkJoin_LastWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/fork_with_commit.go b/testcases/fork_with_commit.go index 648f39d..1bf0d67 100644 --- a/testcases/fork_with_commit.go +++ b/testcases/fork_with_commit.go @@ -53,7 +53,8 @@ func Fork_WithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/get_by_id_and_rev.go b/testcases/get_by_id_and_rev.go index bc1f8d3..444ea67 100644 --- a/testcases/get_by_id_and_rev.go +++ b/testcases/get_by_id_and_rev.go @@ -12,7 +12,8 @@ import ( func GetByIDAndRev(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/get_latest_by_id.go b/testcases/get_latest_by_id.go index 24c02db..d315b79 100644 --- a/testcases/get_latest_by_id.go +++ b/testcases/get_latest_by_id.go @@ -12,7 +12,8 @@ import ( func GetLatestByID(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/get_latest_by_label.go b/testcases/get_latest_by_label.go index eed7937..541eeb5 100644 --- a/testcases/get_latest_by_label.go +++ b/testcases/get_latest_by_label.go @@ -12,7 +12,8 @@ import ( func GetLatestByLabel(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/get_not_found.go b/testcases/get_not_found.go index 615a0a6..ab4c432 100644 --- a/testcases/get_not_found.go +++ b/testcases/get_not_found.go @@ -12,7 +12,8 @@ import ( func GetNotFound(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/log.go b/testcases/log.go new file mode 100644 index 0000000..80266c3 --- /dev/null +++ b/testcases/log.go @@ -0,0 +1,27 @@ +package testcases + +import ( + "log/slog" + "os" + + "github.com/thejerf/slogassert" +) + +func NewTestLogger(t TestingT) (*slog.Logger, *slogassert.Handler) { + var wrappedH slog.Handler + if os.Getenv(`TEST_OUTPUT_LOG`) == `true` { + wrappedH = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { + if a.Key == `time` { + return slog.Attr{} + } + return a + }, + }) + } + + h := slogassert.New(t, slog.LevelDebug, wrappedH) + l := slog.New(h) + + return l, h +} diff --git a/testcases/mutex.go b/testcases/mutex.go index 743aa5f..bb06cec 100644 --- a/testcases/mutex.go +++ b/testcases/mutex.go @@ -118,7 +118,8 @@ func Mutex(t TestingT, d flowstate.Doer, fr FlowRegistry) { states = append(states, statesCtx) } - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/queue.go b/testcases/queue.go index 8436f9f..5b0ab3c 100644 --- a/testcases/queue.go +++ b/testcases/queue.go @@ -69,7 +69,8 @@ func Queue(t TestingT, d flowstate.Doer, fr FlowRegistry) { ), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/rate_limit.go b/testcases/rate_limit.go index 0c9d2bc..1377f69 100644 --- a/testcases/rate_limit.go +++ b/testcases/rate_limit.go @@ -92,7 +92,8 @@ func RateLimit(t TestingT, d flowstate.Doer, fr FlowRegistry) { states = append(states, stateCtx) } - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/recovery_always_fail.go b/testcases/recovery_always_fail.go index ac53bb9..6bf3764 100644 --- a/testcases/recovery_always_fail.go +++ b/testcases/recovery_always_fail.go @@ -20,7 +20,8 @@ func RecoveryAlwaysFail(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.Noop(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/recovery_first_attempt_fail.go b/testcases/recovery_first_attempt_fail.go index a4754f3..c7fa826 100644 --- a/testcases/recovery_first_attempt_fail.go +++ b/testcases/recovery_first_attempt_fail.go @@ -27,7 +27,8 @@ func RecoveryFirstAttemptFail(t TestingT, d flowstate.Doer, fr FlowRegistry) { ), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/single_node.go b/testcases/single_node.go index 61ddfcb..459d001 100644 --- a/testcases/single_node.go +++ b/testcases/single_node.go @@ -19,7 +19,8 @@ func SingleNode(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/suite.go b/testcases/suite.go index b08adb5..d4974fd 100644 --- a/testcases/suite.go +++ b/testcases/suite.go @@ -7,6 +7,7 @@ import ( ) type TestingT interface { + Helper() Error(...interface{}) Errorf(format string, args ...interface{}) Fatalf(format string, args ...any) diff --git a/testcases/three_consequent_nodes.go b/testcases/three_consequent_nodes.go index 449d38e..40107c7 100644 --- a/testcases/three_consequent_nodes.go +++ b/testcases/three_consequent_nodes.go @@ -27,7 +27,8 @@ func ThreeConsequentNodes(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/two_consequent_nodes.go b/testcases/two_consequent_nodes.go index 76e9f57..026150e 100644 --- a/testcases/two_consequent_nodes.go +++ b/testcases/two_consequent_nodes.go @@ -23,7 +23,8 @@ func TwoConsequentNodes(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/two_consequent_nodes_with_commit.go b/testcases/two_consequent_nodes_with_commit.go index 4123e51..d4f07ed 100644 --- a/testcases/two_consequent_nodes_with_commit.go +++ b/testcases/two_consequent_nodes_with_commit.go @@ -29,7 +29,8 @@ func TwoConsequentNodesWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) return flowstate.End(stateCtx), nil })) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) require.NoError(t, err) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) diff --git a/testcases/watch_labels.go b/testcases/watch_labels.go index 2b18283..dd1897f 100644 --- a/testcases/watch_labels.go +++ b/testcases/watch_labels.go @@ -13,7 +13,8 @@ import ( func WatchLabels(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) defer sCtxCancel() diff --git a/testcases/watch_or_labels.go b/testcases/watch_or_labels.go index 9dd51d0..e0f653b 100644 --- a/testcases/watch_or_labels.go +++ b/testcases/watch_or_labels.go @@ -12,7 +12,8 @@ import ( func WatchORLabels(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) defer sCtxCancel() diff --git a/testcases/watch_since_latest.go b/testcases/watch_since_latest.go index e65b0ff..bed6420 100644 --- a/testcases/watch_since_latest.go +++ b/testcases/watch_since_latest.go @@ -12,7 +12,8 @@ import ( func WatchSinceLatest(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) defer sCtxCancel() diff --git a/testcases/watch_since_rev.go b/testcases/watch_since_rev.go index f22ae0e..5ffd478 100644 --- a/testcases/watch_since_rev.go +++ b/testcases/watch_since_rev.go @@ -12,7 +12,8 @@ import ( func WatchSinceRev(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) defer sCtxCancel() diff --git a/testcases/watch_since_time.go b/testcases/watch_since_time.go index 9c1466e..6460e65 100644 --- a/testcases/watch_since_time.go +++ b/testcases/watch_since_time.go @@ -12,7 +12,8 @@ import ( func WatchSinceTime(t TestingT, d flowstate.Doer, _ FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - e, err := flowstate.NewEngine(d) + l, _ := NewTestLogger(t) + e, err := flowstate.NewEngine(d, l) defer func() { sCtx, sCtxCancel := context.WithTimeout(context.Background(), time.Second*5) defer sCtxCancel()