From ad5c729013adbadc4eb4034d60034981d2798a78 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 11 Nov 2024 18:03:25 +0200 Subject: [PATCH] fixes --- cmd.go | 15 ++++++++------- cmd_commit.go | 7 +++++++ engine.go | 18 ++++++++++++------ log.go | 16 +++++++--------- state.go | 7 ++++++- testcases/call_flow_with_commit.go | 14 ++++++-------- testcases/fork_join_first_wins.go | 11 +++++------ testcases/fork_join_last_wins.go | 11 +++++------ testcases/log.go | 15 ++++++++++++++- 9 files changed, 70 insertions(+), 44 deletions(-) diff --git a/cmd.go b/cmd.go index 395e596..4288c6d 100644 --- a/cmd.go +++ b/cmd.go @@ -1,20 +1,21 @@ package flowstate type Command interface { - setDoID(doID int64) - doID() int64 + setSessID(id int64) + SessID() int64 cmd() } type command struct { - propDoID int64 + sessID int64 } func (_ *command) cmd() {} -func (cmd *command) setDoID(doID int64) { - cmd.propDoID = doID +func (cmd *command) setSessID(doID int64) { + cmd.sessID = doID } -func (cmd *command) doID() int64 { - return cmd.propDoID + +func (cmd *command) SessID() int64 { + return cmd.sessID } diff --git a/cmd_commit.go b/cmd_commit.go index 9d56054..a4f9f38 100644 --- a/cmd_commit.go +++ b/cmd_commit.go @@ -15,6 +15,13 @@ type CommitCommand struct { Commands []Command } +func (cmd *CommitCommand) setSessID(id int64) { + cmd.command.setSessID(id) + for _, subCmd := range cmd.Commands { + subCmd.setSessID(id) + } +} + func CommitStateCtx(stateCtx *StateCtx) *CommitStateCtxCommand { return &CommitStateCtxCommand{ StateCtx: stateCtx, diff --git a/engine.go b/engine.go index e0cdd81..3d10a43 100644 --- a/engine.go +++ b/engine.go @@ -11,7 +11,7 @@ import ( ) var ErrFlowNotFound = errors.New("flow not found") -var doIDS = &atomic.Int64{} +var sessIDS = &atomic.Int64{} type Engine struct { d Doer @@ -48,6 +48,8 @@ func (e *Engine) Execute(stateCtx *StateCtx) error { defer e.wg.Done() } + sessID := sessIDS.Add(1) + stateCtx.sessID = sessID stateCtx.e = e if stateCtx.Current.ID == `` { @@ -76,7 +78,7 @@ func (e *Engine) Execute(stateCtx *StateCtx) error { return err } - cmd0.setDoID(doIDS.Add(1)) + cmd0.setSessID(sessID) if cmd, ok := cmd0.(*ExecuteCommand); ok { cmd.sync = true @@ -107,14 +109,18 @@ func (e *Engine) Do(cmds ...Command) error { return fmt.Errorf("no commands to do") } - doID := doIDS.Add(1) + var sessID int64 for _, cmd := range cmds { - if cmd.doID() == 0 { - cmd.setDoID(doID) + if cmd.SessID() == 0 { + if sessID == 0 { + sessID = sessIDS.Add(1) + } + + cmd.setSessID(sessID) if cmtCmd, ok := cmd.(*CommitCommand); ok { for _, subCmd := range cmtCmd.Commands { - subCmd.setDoID(doID) + subCmd.setSessID(sessID) } } } diff --git a/log.go b/log.go index 6c48a06..797064d 100644 --- a/log.go +++ b/log.go @@ -2,13 +2,12 @@ package flowstate import ( "fmt" - "log" "log/slog" - "strconv" ) func logExecute(stateCtx *StateCtx, l *slog.Logger) { args := []any{ + "sess", stateCtx.sessID, "flow", stateCtx.Current.Transition.ToID, "id", stateCtx.Current.ID, "rev", stateCtx.Current.Rev, @@ -28,18 +27,17 @@ func logExecute(stateCtx *StateCtx, l *slog.Logger) { args = append(args, "recovered", currTs.Annotations[RecoveryAttemptAnnotation]) } - l.Info("execute", args...) + l.Info("execute flow", args...) } func logDo(cmd0 Command, l *slog.Logger) { - var args []any + args := []any{"sess", cmd0.SessID()} switch cmd := cmd0.(type) { case *CommitCommand: - for _, c := range cmd.Commands { - log.Printf("commit: %T", c) - } - + //for _, c := range cmd.Commands { + // log.Printf("commit: %T", c) + //} args = append(args, "cmd", "commit", "len", len(cmd.Commands)) case *CommitStateCtxCommand: args = append(args, "cmd", "commit_state_ctx", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev) @@ -94,6 +92,6 @@ func logDo(cmd0 Command, l *slog.Logger) { args = append(args, "cmd", fmt.Sprintf("%T", cmd)) } - l.Info("do #"+strconv.FormatInt(cmd0.doID(), 10), args...) + l.Info("do commands", args...) } diff --git a/state.go b/state.go index ccbf5e1..9eefab8 100644 --- a/state.go +++ b/state.go @@ -75,7 +75,12 @@ type StateCtx struct { // Transitions between committed and current states Transitions []Transition `json:"transitions"` - e *Engine `json:"-"` + sessID int64 `json:"-"` + e *Engine `json:"-"` +} + +func (s *StateCtx) SessID() int64 { + return s.sessID } func (s *StateCtx) CopyTo(to *StateCtx) *StateCtx { diff --git a/testcases/call_flow_with_commit.go b/testcases/call_flow_with_commit.go index 72fd18e..a684e2a 100644 --- a/testcases/call_flow_with_commit.go +++ b/testcases/call_flow_with_commit.go @@ -12,13 +12,6 @@ import ( func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - var nextStateCtx *flowstate.StateCtx - stateCtx := &flowstate.StateCtx{ - Current: flowstate.State{ - ID: "aTID", - }, - } - endedCh := make(chan struct{}) trkr := &Tracker{} @@ -29,7 +22,7 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { return flowstate.Transit(stateCtx, `callEnd`), nil } - nextStateCtx = &flowstate.StateCtx{ + nextStateCtx := &flowstate.StateCtx{ Current: flowstate.State{ ID: "aNextTID", }, @@ -99,6 +92,11 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) { require.NoError(t, e.Shutdown(sCtx)) }() + stateCtx := &flowstate.StateCtx{ + Current: flowstate.State{ + ID: "aTID", + }, + } err = e.Do(flowstate.Commit( flowstate.Transit(stateCtx, `call`), )) diff --git a/testcases/fork_join_first_wins.go b/testcases/fork_join_first_wins.go index 24248c0..253399a 100644 --- a/testcases/fork_join_first_wins.go +++ b/testcases/fork_join_first_wins.go @@ -12,8 +12,6 @@ import ( func ForkJoin_FirstWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - var forkedStateCtx *flowstate.StateCtx - var forkedTwoStateCtx *flowstate.StateCtx stateCtx := &flowstate.StateCtx{ Current: flowstate.State{ ID: "aTID", @@ -26,16 +24,17 @@ func ForkJoin_FirstWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { Track(stateCtx, trkr) stateCtx.Current.SetLabel(`theForkJoinLabel`, string(stateCtx.Current.ID)) - forkedStateCtx = stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{}) - forkedTwoStateCtx = stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{}) + forkedStateCtx := stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{}) + forkedTwoStateCtx := stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{}) + copyStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{}) if err := e.Do( flowstate.Commit( - flowstate.Transit(stateCtx, `forked`), + flowstate.Transit(copyStateCtx, `forked`), flowstate.Transit(forkedStateCtx, `forked`), flowstate.Transit(forkedTwoStateCtx, `forked`), ), - flowstate.Execute(stateCtx), + flowstate.Execute(copyStateCtx), flowstate.Execute(forkedStateCtx), flowstate.Execute(forkedTwoStateCtx), ); err != nil { diff --git a/testcases/fork_join_last_wins.go b/testcases/fork_join_last_wins.go index 7258a66..68aeab7 100644 --- a/testcases/fork_join_last_wins.go +++ b/testcases/fork_join_last_wins.go @@ -12,8 +12,6 @@ import ( func ForkJoin_LastWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) - var forkedStateCtx *flowstate.StateCtx - var forkedTwoStateCtx *flowstate.StateCtx stateCtx := &flowstate.StateCtx{ Current: flowstate.State{ ID: "aTID", @@ -27,16 +25,17 @@ func ForkJoin_LastWins(t TestingT, d flowstate.Doer, fr FlowRegistry) { stateCtx.Current.SetLabel(`theForkJoinLabel`, string(stateCtx.Current.ID)) - forkedStateCtx = stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{}) - forkedTwoStateCtx = stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{}) + forkedStateCtx := stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{}) + forkedTwoStateCtx := stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{}) + copyStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{}) if err := e.Do( flowstate.Commit( - flowstate.Transit(stateCtx, `forked`), + flowstate.Transit(copyStateCtx, `forked`), flowstate.Transit(forkedStateCtx, `forked`), flowstate.Transit(forkedTwoStateCtx, `forked`), ), - flowstate.Execute(stateCtx), + flowstate.Execute(copyStateCtx), flowstate.Execute(forkedStateCtx), flowstate.Execute(forkedTwoStateCtx), ); err != nil { diff --git a/testcases/log.go b/testcases/log.go index c98a312..dc458cc 100644 --- a/testcases/log.go +++ b/testcases/log.go @@ -2,12 +2,25 @@ package testcases import ( "log/slog" + "os" "github.com/thejerf/slogassert" ) func newTestLogger(t TestingT) (*slog.Logger, *slogassert.Handler) { - h := slogassert.New(t, slog.LevelWarn, nil) + var wrappedH slog.Handler + if os.Getenv(`TEST_OUTPUT_LOG`) == `true` { + wrappedH = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{ + ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { + if a.Key == `time` { + return slog.Attr{} + } + return a + }, + }) + } + + h := slogassert.New(t, slog.LevelDebug, wrappedH) l := slog.New(h) return l, h