Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Nov 11, 2024
1 parent 5c8e9c2 commit ad5c729
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 44 deletions.
15 changes: 8 additions & 7 deletions cmd.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package flowstate

type Command interface {
setDoID(doID int64)
doID() int64
setSessID(id int64)
SessID() int64
cmd()
}

type command struct {
propDoID int64
sessID int64
}

func (_ *command) cmd() {}

func (cmd *command) setDoID(doID int64) {
cmd.propDoID = doID
func (cmd *command) setSessID(doID int64) {
cmd.sessID = doID
}
func (cmd *command) doID() int64 {
return cmd.propDoID

func (cmd *command) SessID() int64 {
return cmd.sessID
}
7 changes: 7 additions & 0 deletions cmd_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ type CommitCommand struct {
Commands []Command
}

func (cmd *CommitCommand) setSessID(id int64) {
cmd.command.setSessID(id)
for _, subCmd := range cmd.Commands {
subCmd.setSessID(id)
}
}

func CommitStateCtx(stateCtx *StateCtx) *CommitStateCtxCommand {
return &CommitStateCtxCommand{
StateCtx: stateCtx,
Expand Down
18 changes: 12 additions & 6 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

var ErrFlowNotFound = errors.New("flow not found")
var doIDS = &atomic.Int64{}
var sessIDS = &atomic.Int64{}

type Engine struct {
d Doer
Expand Down Expand Up @@ -48,6 +48,8 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
defer e.wg.Done()
}

sessID := sessIDS.Add(1)
stateCtx.sessID = sessID
stateCtx.e = e

if stateCtx.Current.ID == `` {
Expand Down Expand Up @@ -76,7 +78,7 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
return err
}

cmd0.setDoID(doIDS.Add(1))
cmd0.setSessID(sessID)

if cmd, ok := cmd0.(*ExecuteCommand); ok {
cmd.sync = true
Expand Down Expand Up @@ -107,14 +109,18 @@ func (e *Engine) Do(cmds ...Command) error {
return fmt.Errorf("no commands to do")
}

doID := doIDS.Add(1)
var sessID int64
for _, cmd := range cmds {
if cmd.doID() == 0 {
cmd.setDoID(doID)
if cmd.SessID() == 0 {
if sessID == 0 {
sessID = sessIDS.Add(1)
}

cmd.setSessID(sessID)

if cmtCmd, ok := cmd.(*CommitCommand); ok {
for _, subCmd := range cmtCmd.Commands {
subCmd.setDoID(doID)
subCmd.setSessID(sessID)
}
}
}
Expand Down
16 changes: 7 additions & 9 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@ package flowstate

import (
"fmt"
"log"
"log/slog"
"strconv"
)

func logExecute(stateCtx *StateCtx, l *slog.Logger) {
args := []any{
"sess", stateCtx.sessID,
"flow", stateCtx.Current.Transition.ToID,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
Expand All @@ -28,18 +27,17 @@ func logExecute(stateCtx *StateCtx, l *slog.Logger) {
args = append(args, "recovered", currTs.Annotations[RecoveryAttemptAnnotation])
}

l.Info("execute", args...)
l.Info("execute flow", args...)
}

func logDo(cmd0 Command, l *slog.Logger) {
var args []any
args := []any{"sess", cmd0.SessID()}

switch cmd := cmd0.(type) {
case *CommitCommand:
for _, c := range cmd.Commands {
log.Printf("commit: %T", c)
}

//for _, c := range cmd.Commands {
// log.Printf("commit: %T", c)
//}
args = append(args, "cmd", "commit", "len", len(cmd.Commands))
case *CommitStateCtxCommand:
args = append(args, "cmd", "commit_state_ctx", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
Expand Down Expand Up @@ -94,6 +92,6 @@ func logDo(cmd0 Command, l *slog.Logger) {
args = append(args, "cmd", fmt.Sprintf("%T", cmd))
}

l.Info("do #"+strconv.FormatInt(cmd0.doID(), 10), args...)
l.Info("do commands", args...)

}
7 changes: 6 additions & 1 deletion state.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ type StateCtx struct {
// Transitions between committed and current states
Transitions []Transition `json:"transitions"`

e *Engine `json:"-"`
sessID int64 `json:"-"`
e *Engine `json:"-"`
}

func (s *StateCtx) SessID() int64 {
return s.sessID
}

func (s *StateCtx) CopyTo(to *StateCtx) *StateCtx {
Expand Down
14 changes: 6 additions & 8 deletions testcases/call_flow_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ import (
func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

var nextStateCtx *flowstate.StateCtx
stateCtx := &flowstate.StateCtx{
Current: flowstate.State{
ID: "aTID",
},
}

endedCh := make(chan struct{})
trkr := &Tracker{}

Expand All @@ -29,7 +22,7 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.Transit(stateCtx, `callEnd`), nil
}

nextStateCtx = &flowstate.StateCtx{
nextStateCtx := &flowstate.StateCtx{
Current: flowstate.State{
ID: "aNextTID",
},
Expand Down Expand Up @@ -99,6 +92,11 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
require.NoError(t, e.Shutdown(sCtx))
}()

stateCtx := &flowstate.StateCtx{
Current: flowstate.State{
ID: "aTID",
},
}
err = e.Do(flowstate.Commit(
flowstate.Transit(stateCtx, `call`),
))
Expand Down
11 changes: 5 additions & 6 deletions testcases/fork_join_first_wins.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
func ForkJoin_FirstWins(t TestingT, d flowstate.Doer, fr FlowRegistry) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

var forkedStateCtx *flowstate.StateCtx
var forkedTwoStateCtx *flowstate.StateCtx
stateCtx := &flowstate.StateCtx{
Current: flowstate.State{
ID: "aTID",
Expand All @@ -26,16 +24,17 @@ func ForkJoin_FirstWins(t TestingT, d flowstate.Doer, fr FlowRegistry) {
Track(stateCtx, trkr)

stateCtx.Current.SetLabel(`theForkJoinLabel`, string(stateCtx.Current.ID))
forkedStateCtx = stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{})
forkedTwoStateCtx = stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{})
forkedStateCtx := stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{})
forkedTwoStateCtx := stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{})
copyStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{})

if err := e.Do(
flowstate.Commit(
flowstate.Transit(stateCtx, `forked`),
flowstate.Transit(copyStateCtx, `forked`),
flowstate.Transit(forkedStateCtx, `forked`),
flowstate.Transit(forkedTwoStateCtx, `forked`),
),
flowstate.Execute(stateCtx),
flowstate.Execute(copyStateCtx),
flowstate.Execute(forkedStateCtx),
flowstate.Execute(forkedTwoStateCtx),
); err != nil {
Expand Down
11 changes: 5 additions & 6 deletions testcases/fork_join_last_wins.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
func ForkJoin_LastWins(t TestingT, d flowstate.Doer, fr FlowRegistry) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

var forkedStateCtx *flowstate.StateCtx
var forkedTwoStateCtx *flowstate.StateCtx
stateCtx := &flowstate.StateCtx{
Current: flowstate.State{
ID: "aTID",
Expand All @@ -27,16 +25,17 @@ func ForkJoin_LastWins(t TestingT, d flowstate.Doer, fr FlowRegistry) {

stateCtx.Current.SetLabel(`theForkJoinLabel`, string(stateCtx.Current.ID))

forkedStateCtx = stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{})
forkedTwoStateCtx = stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{})
forkedStateCtx := stateCtx.NewTo(`forkedTID`, &flowstate.StateCtx{})
forkedTwoStateCtx := stateCtx.NewTo(`forkedTwoTID`, &flowstate.StateCtx{})
copyStateCtx := stateCtx.CopyTo(&flowstate.StateCtx{})

if err := e.Do(
flowstate.Commit(
flowstate.Transit(stateCtx, `forked`),
flowstate.Transit(copyStateCtx, `forked`),
flowstate.Transit(forkedStateCtx, `forked`),
flowstate.Transit(forkedTwoStateCtx, `forked`),
),
flowstate.Execute(stateCtx),
flowstate.Execute(copyStateCtx),
flowstate.Execute(forkedStateCtx),
flowstate.Execute(forkedTwoStateCtx),
); err != nil {
Expand Down
15 changes: 14 additions & 1 deletion testcases/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,25 @@ package testcases

import (
"log/slog"
"os"

"github.com/thejerf/slogassert"
)

func newTestLogger(t TestingT) (*slog.Logger, *slogassert.Handler) {
h := slogassert.New(t, slog.LevelWarn, nil)
var wrappedH slog.Handler
if os.Getenv(`TEST_OUTPUT_LOG`) == `true` {
wrappedH = slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
if a.Key == `time` {
return slog.Attr{}
}
return a
},
})
}

h := slogassert.New(t, slog.LevelDebug, wrappedH)
l := slog.New(h)

return l, h
Expand Down

0 comments on commit ad5c729

Please sign in to comment.