Skip to content

Commit

Permalink
Merge pull request #63 from makasim/log-exec-sess-id
Browse files Browse the repository at this point in the history
Log exec sess
  • Loading branch information
makasim authored Nov 23, 2024
2 parents cccf72f + a07609d commit dda1265
Show file tree
Hide file tree
Showing 47 changed files with 149 additions and 120 deletions.
2 changes: 1 addition & 1 deletion cmd_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func Watch(labels map[string]string) *WatchCommand {
return (&WatchCommand{}).WithORLabels(labels)
}

func DoWatch(e *Engine, cmd *WatchCommand) (WatchListener, error) {
func DoWatch(e Engine, cmd *WatchCommand) (WatchListener, error) {
if err := e.Do(cmd); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions doer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
var ErrCommandNotSupported = errors.New("command not supported")

type Doer interface {
Init(e *Engine) error
Init(e Engine) error
Do(cmd Command) error
Shutdown(ctx context.Context) error
}
Expand All @@ -19,7 +19,7 @@ func (d DoerFunc) Do(cmd Command) error {
return d(cmd)
}

func (d DoerFunc) Init(_ *Engine) error {
func (d DoerFunc) Init(_ Engine) error {
return nil
}

Expand Down
49 changes: 35 additions & 14 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@ import (
var ErrFlowNotFound = errors.New("flow not found")
var sessIDS = &atomic.Int64{}

type Engine struct {
type Engine interface {
Execute(stateCtx *StateCtx) error
Do(cmds ...Command) error
Shutdown(ctx context.Context) error
}

type engine struct {
d Doer
l *slog.Logger

wg *sync.WaitGroup
doneCh chan struct{}
}

func NewEngine(d Doer, l *slog.Logger) (*Engine, error) {
e := &Engine{
func NewEngine(d Doer, l *slog.Logger) (Engine, error) {
e := &engine{
d: d,
l: l,

Expand All @@ -38,18 +44,20 @@ func NewEngine(d Doer, l *slog.Logger) (*Engine, error) {
return e, nil
}

func (e *Engine) Execute(stateCtx *StateCtx) error {
func (e *engine) Execute(stateCtx *StateCtx) error {
select {
case <-e.doneCh:
return nil
return fmt.Errorf("engine stopped")
default:
e.wg.Add(1)
defer e.wg.Done()
}

sessID := sessIDS.Add(1)
sessE := &execEngine{engine: e, sessID: sessID}
stateCtx.sessID = sessID
stateCtx.e = e
stateCtx.doneCh = e.doneCh

if stateCtx.Current.ID == `` {
return fmt.Errorf(`state id empty`)
Expand All @@ -72,7 +80,7 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
}

logExecute(stateCtx, e.l)
cmd0, err := f.Execute(stateCtx, e)
cmd0, err := f.Execute(stateCtx, sessE)
if err != nil {
return err
}
Expand All @@ -85,7 +93,7 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {

conflictErr := &ErrCommitConflict{}

if err = e.do(cmd0); errors.As(err, conflictErr) {
if err = e.doCmd(stateCtx.SessID(), cmd0); errors.As(err, conflictErr) {
e.l.Info("engine: do conflict",
"sess", cmd0.SessID(),
"conflict", err.Error(),
Expand All @@ -108,7 +116,11 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
}
}

func (e *Engine) Do(cmds ...Command) error {
func (e *engine) Do(cmds ...Command) error {
return e.do(0, cmds...)
}

func (e *engine) do(execSessID int64, cmds ...Command) error {
if len(cmds) == 0 {
return fmt.Errorf("no commands to do")
}
Expand All @@ -129,15 +141,15 @@ func (e *Engine) Do(cmds ...Command) error {
}
}

if err := e.do(cmd); err != nil {
if err := e.doCmd(execSessID, cmd); err != nil {
return err
}
}

return nil
}

func (e *Engine) Shutdown(ctx context.Context) error {
func (e *engine) Shutdown(ctx context.Context) error {
select {
case <-e.doneCh:
return nil
Expand All @@ -164,8 +176,8 @@ func (e *Engine) Shutdown(ctx context.Context) error {
return e.d.Shutdown(ctx)
}

func (e *Engine) do(cmd0 Command) error {
logDo(cmd0, e.l)
func (e *engine) doCmd(execSessID int64, cmd0 Command) error {
logDo(execSessID, cmd0, e.l)

switch cmd := cmd0.(type) {
case *ExecuteCommand:
Expand All @@ -192,7 +204,7 @@ func (e *Engine) do(cmd0 Command) error {
}
}

func (e *Engine) getFlow(stateCtx *StateCtx) (Flow, error) {
func (e *engine) getFlow(stateCtx *StateCtx) (Flow, error) {
cmd := GetFlow(stateCtx)
if err := e.d.Do(cmd); err != nil {
return nil, err
Expand All @@ -201,7 +213,7 @@ func (e *Engine) getFlow(stateCtx *StateCtx) (Flow, error) {
return cmd.Flow, nil
}

func (e *Engine) continueExecution(cmd0 Command) (*StateCtx, error) {
func (e *engine) continueExecution(cmd0 Command) (*StateCtx, error) {
switch cmd := cmd0.(type) {
case *CommitCommand:
if len(cmd.Commands) != 1 {
Expand All @@ -227,3 +239,12 @@ func (e *Engine) continueExecution(cmd0 Command) (*StateCtx, error) {
return nil, fmt.Errorf("unknown command 123 %T", cmd0)
}
}

type execEngine struct {
*engine
sessID int64
}

func (e *execEngine) Do(cmds ...Command) error {
return e.do(e.sessID, cmds...)
}
6 changes: 3 additions & 3 deletions flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package flowstate
type FlowID string

type Flow interface {
Execute(stateCtx *StateCtx, e *Engine) (Command, error)
Execute(stateCtx *StateCtx, e Engine) (Command, error)
}

type FlowFunc func(stateCtx *StateCtx, e *Engine) (Command, error)
type FlowFunc func(stateCtx *StateCtx, e Engine) (Command, error)

func (f FlowFunc) Execute(stateCtx *StateCtx, e *Engine) (Command, error) {
func (f FlowFunc) Execute(stateCtx *StateCtx, e Engine) (Command, error) {
return f(stateCtx, e)
}
11 changes: 9 additions & 2 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package flowstate
import (
"fmt"
"log/slog"
"strconv"
)

func logExecute(stateCtx *StateCtx, l *slog.Logger) {
Expand Down Expand Up @@ -32,8 +33,14 @@ func logExecute(stateCtx *StateCtx, l *slog.Logger) {
l.Info("engine: execute", args...)
}

func logDo(cmd0 Command, l *slog.Logger) {
args := []any{"sess", cmd0.SessID()}
func logDo(execSessID int64, cmd0 Command, l *slog.Logger) {
var args []any

if execSessID > 0 {
args = []any{"sess", strconv.FormatInt(execSessID, 10) + ":" + strconv.FormatInt(cmd0.SessID(), 10)}
} else {
args = []any{"sess", cmd0.SessID()}
}

switch cmd := cmd0.(type) {
case *CommitCommand:
Expand Down
4 changes: 2 additions & 2 deletions memdriver/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var _ flowstate.Doer = &Commiter{}

type Commiter struct {
l *Log
e *flowstate.Engine
e flowstate.Engine
}

func NewCommiter(l *Log) *Commiter {
Expand Down Expand Up @@ -76,7 +76,7 @@ func (d *Commiter) Do(cmd0 flowstate.Command) error {
return nil
}

func (d *Commiter) Init(e *flowstate.Engine) error {
func (d *Commiter) Init(e flowstate.Engine) error {
d.e = e
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion memdriver/data_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewDataLog() *DataLog {
return &DataLog{}
}

func (l *DataLog) Init(_ *flowstate.Engine) error {
func (l *DataLog) Init(_ flowstate.Engine) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions memdriver/delayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
var _ flowstate.Doer = &Delayer{}

type Delayer struct {
e *flowstate.Engine
e flowstate.Engine
stopCh chan struct{}
wg sync.WaitGroup
l *slog.Logger
Expand Down Expand Up @@ -92,7 +92,7 @@ func (d *Delayer) Do(cmd0 flowstate.Command) error {
return nil
}

func (d *Delayer) Init(e *flowstate.Engine) error {
func (d *Delayer) Init(e flowstate.Engine) error {
d.e = e
d.wg.Add(1)
return nil
Expand Down
2 changes: 1 addition & 1 deletion memdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *Driver) Do(cmd0 flowstate.Command) error {
return fmt.Errorf("no doer for command %T", cmd0)
}

func (d *Driver) Init(e *flowstate.Engine) error {
func (d *Driver) Init(e flowstate.Engine) error {
for _, doer := range d.doers {
if err := doer.Init(e); err != nil {
return fmt.Errorf("%T: init: %w", doer, err)
Expand Down
2 changes: 1 addition & 1 deletion memdriver/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d *Getter) Do(cmd0 flowstate.Command) error {
return nil
}

func (d *Getter) Init(e *flowstate.Engine) error {
func (d *Getter) Init(_ flowstate.Engine) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions memdriver/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var _ flowstate.WatchListener = &listener{}

type Watcher struct {
l *Log
e *flowstate.Engine
e flowstate.Engine
}

func NewWatcher(l *Log) *Watcher {
Expand Down Expand Up @@ -63,7 +63,7 @@ func (w *Watcher) Do(cmd0 flowstate.Command) error {
return nil
}

func (w *Watcher) Init(e *flowstate.Engine) error {
func (w *Watcher) Init(e flowstate.Engine) error {
w.e = e
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pgdriver/commiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type commiterQueries interface {
type Commiter struct {
conn conn
q commiterQueries
e *flowstate.Engine
e flowstate.Engine
}

func NewCommiter(conn conn, db commiterQueries) *Commiter {
Expand Down Expand Up @@ -122,7 +122,7 @@ func (cmtr *Commiter) Do(cmd0 flowstate.Command) error {
return tx.Commit(context.Background())
}

func (cmtr *Commiter) Init(e *flowstate.Engine) error {
func (cmtr *Commiter) Init(e flowstate.Engine) error {
cmtr.e = e
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pgdriver/dataer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewDataer(conn conn, q dataerQueries) *Dataer {
}
}

func (d *Dataer) Init(_ *flowstate.Engine) error {
func (d *Dataer) Init(_ flowstate.Engine) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pgdriver/delayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Delayer struct {
now func() time.Time
doneCh chan struct{}

e *flowstate.Engine
e flowstate.Engine
}

func NewDelayer(conn conn, q delayerQueries, now func() time.Time) *Delayer {
Expand Down Expand Up @@ -71,7 +71,7 @@ func (d *Delayer) Do(cmd0 flowstate.Command) error {
return nil
}

func (d *Delayer) Init(e *flowstate.Engine) error {
func (d *Delayer) Init(e flowstate.Engine) error {
d.e = e

go func() {
Expand Down
2 changes: 1 addition & 1 deletion pgdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (d *Driver) Do(cmd0 flowstate.Command) error {
return fmt.Errorf("no doer for command %T", cmd0)
}

func (d *Driver) Init(e *flowstate.Engine) error {
func (d *Driver) Init(e flowstate.Engine) error {
for _, doer := range d.doers {
if err := doer.Init(e); err != nil {
return fmt.Errorf("%T: init: %w", doer, err)
Expand Down
2 changes: 1 addition & 1 deletion pgdriver/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (d *Getter) Do(cmd0 flowstate.Command) error {
return nil
}

func (d *Getter) Init(_ *flowstate.Engine) error {
func (d *Getter) Init(_ flowstate.Engine) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pgdriver/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (w *Watcher) Do(cmd0 flowstate.Command) error {
return nil
}

func (w *Watcher) Init(_ *flowstate.Engine) error {
func (w *Watcher) Init(_ flowstate.Engine) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions recoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type RecovererDoer struct {
failoverDur time.Duration

wl WatchListener
e *Engine
e Engine
doneCh chan struct{}
stoppedCh chan struct{}
log []State
Expand All @@ -30,7 +30,7 @@ func (d *RecovererDoer) Do(_ Command) error {
return ErrCommandNotSupported
}

func (d *RecovererDoer) Init(e *Engine) error {
func (d *RecovererDoer) Init(e Engine) error {
cmd := Watch(nil)
if err := e.Do(cmd); err != nil {
return err
Expand Down
Loading

0 comments on commit dda1265

Please sign in to comment.