Skip to content

Commit

Permalink
Merge pull request #57 from makasim/slog
Browse files Browse the repository at this point in the history
[Draft] slog poc
  • Loading branch information
makasim authored Nov 12, 2024
2 parents 2abddbf + a82a7c5 commit 4ad8716
Show file tree
Hide file tree
Showing 52 changed files with 361 additions and 91 deletions.
11 changes: 11 additions & 0 deletions cmd.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package flowstate

type Command interface {
setSessID(id int64)
SessID() int64
cmd()
}

type command struct {
sessID int64
}

func (_ *command) cmd() {}

func (cmd *command) setSessID(doID int64) {
cmd.sessID = doID
}

func (cmd *command) SessID() int64 {
return cmd.sessID
}
7 changes: 7 additions & 0 deletions cmd_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ type CommitCommand struct {
Commands []Command
}

func (cmd *CommitCommand) setSessID(id int64) {
cmd.command.setSessID(id)
for _, subCmd := range cmd.Commands {
subCmd.setSessID(id)
}
}

func CommitStateCtx(stateCtx *StateCtx) *CommitStateCtxCommand {
return &CommitStateCtxCommand{
StateCtx: stateCtx,
Expand Down
49 changes: 44 additions & 5 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"sync"
"sync/atomic"
)

var ErrFlowNotFound = errors.New("flow not found")
var sessIDS = &atomic.Int64{}

type Engine struct {
d Doer
l *slog.Logger

wg *sync.WaitGroup
doneCh chan struct{}
}

func NewEngine(d Doer) (*Engine, error) {
func NewEngine(d Doer, l *slog.Logger) (*Engine, error) {
e := &Engine{
d: d,
l: l,

wg: &sync.WaitGroup{},
doneCh: make(chan struct{}),
Expand All @@ -43,6 +47,8 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
defer e.wg.Done()
}

sessID := sessIDS.Add(1)
stateCtx.sessID = sessID
stateCtx.e = e

if stateCtx.Current.ID == `` {
Expand All @@ -65,19 +71,27 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
return err
}

logExecute(stateCtx, e.l)
cmd0, err := f.Execute(stateCtx, e)
if err != nil {
return err
}

cmd0.setSessID(sessID)

if cmd, ok := cmd0.(*ExecuteCommand); ok {
cmd.sync = true
}

conflictErr := &ErrCommitConflict{}

if err = e.do(cmd0); errors.As(err, conflictErr) {
log.Printf("INFO: engine: execute: %s\n", conflictErr)
e.l.Info("engine: do conflict",
"sess", cmd0.SessID(),
"conflict", err.Error(),
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return nil
} else if err != nil {
return err
Expand All @@ -99,7 +113,22 @@ func (e *Engine) Do(cmds ...Command) error {
return fmt.Errorf("no commands to do")
}

var sessID int64
for _, cmd := range cmds {
if cmd.SessID() == 0 {
if sessID == 0 {
sessID = sessIDS.Add(1)
}

cmd.setSessID(sessID)

if cmtCmd, ok := cmd.(*CommitCommand); ok {
for _, subCmd := range cmtCmd.Commands {
subCmd.setSessID(sessID)
}
}
}

if err := e.do(cmd); err != nil {
return err
}
Expand Down Expand Up @@ -136,18 +165,28 @@ func (e *Engine) Shutdown(ctx context.Context) error {
}

func (e *Engine) do(cmd0 Command) error {
logDo(cmd0, e.l)

switch cmd := cmd0.(type) {
case *ExecuteCommand:
if cmd.sync {
return nil
}

go func() {
if err := e.Execute(cmd.StateCtx); err != nil {
log.Printf("ERROR: engine: go execute: %s\n", err)
stateCtx := cmd.StateCtx
if err := e.Execute(stateCtx); err != nil {
e.l.Error("execute failed",
"sess", stateCtx.SessID(),
"error", err,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
}
}()
return nil
case *CommitCommand:
return e.d.Do(cmd0)
default:
return e.d.Do(cmd0)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/jackc/pgx/v5 v5.6.0
github.com/mattn/go-sqlite3 v1.14.22
github.com/stretchr/testify v1.9.0
github.com/thejerf/slogassert v0.3.4
github.com/xo/dburl v0.23.2
go.uber.org/goleak v1.3.0
golang.org/x/time v0.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/thejerf/slogassert v0.3.4 h1:VoTsXixRbXMrRSSxDjYTiEDCM4VWbsYPW5rB/hX24kM=
github.com/thejerf/slogassert v0.3.4/go.mod h1:0zn9ISLVKo1aPMTqcGfG1o6dWwt+Rk574GlUxHD4rs8=
github.com/xo/dburl v0.23.2 h1:Fl88cvayrgE56JA/sqhNMLljCW/b7RmG1mMkKMZUFgA=
github.com/xo/dburl v0.23.2/go.mod h1:uazlaAQxj4gkshhfuuYyvwCBouOmNnG2aDxTCFZpmL4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
94 changes: 94 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package flowstate

import (
"fmt"
"log/slog"
)

func logExecute(stateCtx *StateCtx, l *slog.Logger) {
args := []any{
"sess", stateCtx.sessID,
"flow", stateCtx.Current.Transition.ToID,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
}

currTs := stateCtx.Current.Transition

if currTs.Annotations[StateAnnotation] != `` {
args = append(args, "state", currTs.Annotations[StateAnnotation])
}

if currTs.Annotations[DelayDurationAnnotation] != `` {
args = append(args, "delayed", "true")
}

if currTs.Annotations[RecoveryAttemptAnnotation] != `` {
args = append(args, "recovered", currTs.Annotations[RecoveryAttemptAnnotation])
}

l.Info("engine: execute", args...)
}

func logDo(cmd0 Command, l *slog.Logger) {
args := []any{"sess", cmd0.SessID()}

switch cmd := cmd0.(type) {
case *CommitCommand:
args = append(args, "cmd", "commit", "len", len(cmd.Commands))
case *CommitStateCtxCommand:
args = append(args, "cmd", "commit_state_ctx", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *TransitCommand:
args = append(args, "cmd", "transit", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev, "to", cmd.FlowID)
case *PauseCommand:
args = append(args, "cmd", "pause", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
if cmd.FlowID != `` {
args = append(args, "to", cmd.FlowID)
}
case *ResumeCommand:
args = append(args, "cmd", "resume", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *EndCommand:
args = append(args, "cmd", "end", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *DelayCommand:
args = append(args, "cmd", "delay", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev, "dur", cmd.Duration)
case *ExecuteCommand:
args = append(args, "cmd", "execute", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *NoopCommand:
args = append(args, "cmd", "noop", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
case *ReferenceDataCommand:
args = append(args,
"cmd", "reference_data",
"id", cmd.StateCtx.Current.ID,
"rev", cmd.StateCtx.Current.Rev,
"data_id", cmd.Data.ID,
"data_rev", cmd.Data.Rev,
"annot", cmd.Annotation,
)
case *DereferenceDataCommand:
args = append(args,
"cmd", "dereference_data",
"id", cmd.StateCtx.Current.ID,
"rev", cmd.StateCtx.Current.Rev,
"data_id", cmd.Data.ID,
"data_rev", cmd.Data.Rev,
"annot", cmd.Annotation,
)
case *StoreDataCommand:
args = append(args, "cmd", "store_data", "data_id", cmd.Data.ID, "data_rev", cmd.Data.Rev)
case *GetDataCommand:
args = append(args, "cmd", "get_data", "data_id", cmd.Data.ID, "data_rev", cmd.Data.Rev)
case *DeserializeCommand:
args = append(args, "cmd", "deserialize")
case *SerializeCommand:
args = append(args, "cmd", "serialize")
case *GetFlowCommand:
args = append(args, "cmd", "get_flow", "flow_id", cmd.StateCtx.Current.Transition.ToID)
case *WatchCommand:
args = append(args, "cmd", "watch")
default:
args = append(args, "cmd", fmt.Sprintf("%T", cmd))
}

l.Info("engine: do", args...)

}
38 changes: 30 additions & 8 deletions memdriver/delayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"sync"
"time"

Expand All @@ -17,11 +17,13 @@ type Delayer struct {
e *flowstate.Engine
stopCh chan struct{}
wg sync.WaitGroup
l *slog.Logger
}

func NewDelayer() *Delayer {
func NewDelayer(l *slog.Logger) *Delayer {
return &Delayer{
stopCh: make(chan struct{}),
l: l,
}
}

Expand All @@ -43,26 +45,46 @@ func (d *Delayer) Do(cmd0 flowstate.Command) error {
t := time.NewTimer(cmd.Duration)
defer t.Stop()

stateCtx := cmd.DelayStateCtx

select {
case <-t.C:
if cmd.DelayStateCtx.Current.Transition.Annotations[flowstate.DelayCommitAnnotation] == `true` {
if stateCtx.Current.Transition.Annotations[flowstate.DelayCommitAnnotation] == `true` {
conflictErr := &flowstate.ErrCommitConflict{}
if err := d.e.Do(flowstate.Commit(
flowstate.CommitStateCtx(cmd.DelayStateCtx),
)); errors.As(err, conflictErr) {
log.Printf(`ERROR: memdriver: delayer: engine: commit: %s\n`, conflictErr)
d.l.Info("delayer: commit conflict",
"sess", cmd.SessID(),
"conflict", err.Error(),
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
} else if err != nil {
log.Printf(`ERROR: memdriver: delayer: engine: commit: %s`, err)
d.l.Error("delayer: commit failed",
"sess", cmd.SessID(),
"error", err,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
}
}

if err := d.e.Execute(cmd.DelayStateCtx); err != nil {
log.Printf(`ERROR: memdriver: delayer: engine: execute: %s`, err)
if err := d.e.Execute(stateCtx); err != nil {
d.l.Error("delayer: execute failed",
"sess", stateCtx.SessID(),
"error", err,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
}
case <-d.stopCh:
log.Printf(`ERROR: memdriver: delayer: state delay was terminated`)
d.l.Error("delayer: delaying terminated",
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
}
}()
Expand Down
Loading

0 comments on commit 4ad8716

Please sign in to comment.