Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
makasim committed Nov 11, 2024
1 parent 8962421 commit a82a7c5
Show file tree
Hide file tree
Showing 45 changed files with 112 additions and 69 deletions.
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (e *Engine) Execute(stateCtx *StateCtx) error {
conflictErr := &ErrCommitConflict{}

if err = e.do(cmd0); errors.As(err, conflictErr) {
e.l.Info("do conflict",
e.l.Info("engine: do conflict",
"sess", cmd0.SessID(),
"conflict", err.Error(),
"id", stateCtx.Current.ID,
Expand Down
7 changes: 2 additions & 5 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,14 @@ func logExecute(stateCtx *StateCtx, l *slog.Logger) {
args = append(args, "recovered", currTs.Annotations[RecoveryAttemptAnnotation])
}

l.Info("execute flow", args...)
l.Info("engine: execute", args...)
}

func logDo(cmd0 Command, l *slog.Logger) {
args := []any{"sess", cmd0.SessID()}

switch cmd := cmd0.(type) {
case *CommitCommand:
//for _, c := range cmd.Commands {
// log.Printf("commit: %T", c)
//}
args = append(args, "cmd", "commit", "len", len(cmd.Commands))
case *CommitStateCtxCommand:
args = append(args, "cmd", "commit_state_ctx", "id", cmd.StateCtx.Current.ID, "rev", cmd.StateCtx.Current.Rev)
Expand Down Expand Up @@ -92,6 +89,6 @@ func logDo(cmd0 Command, l *slog.Logger) {
args = append(args, "cmd", fmt.Sprintf("%T", cmd))
}

l.Info("do commands", args...)
l.Info("engine: do", args...)

}
38 changes: 30 additions & 8 deletions memdriver/delayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"sync"
"time"

Expand All @@ -17,11 +17,13 @@ type Delayer struct {
e *flowstate.Engine
stopCh chan struct{}
wg sync.WaitGroup
l *slog.Logger
}

func NewDelayer() *Delayer {
func NewDelayer(l *slog.Logger) *Delayer {
return &Delayer{
stopCh: make(chan struct{}),
l: l,
}
}

Expand All @@ -43,26 +45,46 @@ func (d *Delayer) Do(cmd0 flowstate.Command) error {
t := time.NewTimer(cmd.Duration)
defer t.Stop()

stateCtx := cmd.DelayStateCtx

select {
case <-t.C:
if cmd.DelayStateCtx.Current.Transition.Annotations[flowstate.DelayCommitAnnotation] == `true` {
if stateCtx.Current.Transition.Annotations[flowstate.DelayCommitAnnotation] == `true` {
conflictErr := &flowstate.ErrCommitConflict{}
if err := d.e.Do(flowstate.Commit(
flowstate.CommitStateCtx(cmd.DelayStateCtx),
)); errors.As(err, conflictErr) {
log.Printf(`ERROR: memdriver: delayer: engine: commit: %s\n`, conflictErr)
d.l.Info("delayer: commit conflict",
"sess", cmd.SessID(),
"conflict", err.Error(),
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
} else if err != nil {
log.Printf(`ERROR: memdriver: delayer: engine: commit: %s`, err)
d.l.Error("delayer: commit failed",
"sess", cmd.SessID(),
"error", err,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
}
}

if err := d.e.Execute(cmd.DelayStateCtx); err != nil {
log.Printf(`ERROR: memdriver: delayer: engine: execute: %s`, err)
if err := d.e.Execute(stateCtx); err != nil {
d.l.Error("delayer: execute failed",
"sess", stateCtx.SessID(),
"error", err,
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
}
case <-d.stopCh:
log.Printf(`ERROR: memdriver: delayer: state delay was terminated`)
d.l.Error("delayer: delaying terminated",
"id", stateCtx.Current.ID,
"rev", stateCtx.Current.Rev,
)
return
}
}()
Expand Down
34 changes: 23 additions & 11 deletions memdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,34 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"

"github.com/makasim/flowstate"
)

type Driver struct {
*FlowRegistry

l *Log
log *Log
doers []flowstate.Doer
recoverer flowstate.Doer

l *slog.Logger
}

func New(opts ...Option) *Driver {
l := &Log{}
log := &Log{}

d := &Driver{
l: l,
log: log,
FlowRegistry: &FlowRegistry{},

l: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{})),
}

for _, opt := range opts {
opt(d)
}

doers := []flowstate.Doer{
Expand All @@ -37,17 +47,13 @@ func New(opts ...Option) *Driver {

NewDataLog(),
NewFlowGetter(d.FlowRegistry),
NewCommiter(l),
NewGetter(l),
NewWatcher(l),
NewDelayer(),
NewCommiter(log),
NewGetter(log),
NewWatcher(log),
NewDelayer(d.l),
}
d.doers = doers

for _, opt := range opts {
opt(d)
}

return d
}

Expand Down Expand Up @@ -105,3 +111,9 @@ func WithRecoverer(r flowstate.Doer) Option {
d.recoverer = r
}
}

func WithLogger(l *slog.Logger) Option {
return func(d *Driver) {
d.l = l
}
}
3 changes: 2 additions & 1 deletion memdriver/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (

func TestSuite(t *testing.T) {
s := testcases.Get(func(t testcases.TestingT) (flowstate.Doer, testcases.FlowRegistry) {
d := memdriver.New()
l, _ := testcases.NewTestLogger(t)
d := memdriver.New(memdriver.WithLogger(l))
return d, d
})

Expand Down
18 changes: 14 additions & 4 deletions pgdriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"time"

"github.com/makasim/flowstate"
Expand All @@ -18,6 +20,7 @@ type Driver struct {
doers []flowstate.Doer

recoverer flowstate.Doer
l *slog.Logger
}

func New(conn conn, opts ...Option) *Driver {
Expand All @@ -26,6 +29,11 @@ func New(conn conn, opts ...Option) *Driver {

q: &queries{},
FlowRegistry: &memdriver.FlowRegistry{},
l: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{})),
}

for _, opt := range opts {
opt(d)
}

d.doers = []flowstate.Doer{
Expand All @@ -50,10 +58,6 @@ func New(conn conn, opts ...Option) *Driver {
NewDelayer(d.conn, d.q, time.Now),
}

for _, opt := range opts {
opt(d)
}

return d
}

Expand Down Expand Up @@ -111,3 +115,9 @@ func WithRecoverer(r flowstate.Doer) Option {
d.recoverer = r
}
}

func WithLogger(l *slog.Logger) Option {
return func(d *Driver) {
d.l = l
}
}
3 changes: 2 additions & 1 deletion pgdriver/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestSuite(t *testing.T) {
conn.Close()
})

d := pgdriver.New(conn)
l, _ := testcases.NewTestLogger(t)
d := pgdriver.New(conn, pgdriver.WithLogger(l))
return d, d
})

Expand Down
2 changes: 1 addition & 1 deletion testcases/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Actor(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return nil, fmt.Errorf("must never be executed")
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/call_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func CallFlow(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/call_flow_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func CallFlowWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/call_flow_with_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func CallFlowWithWatch(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Condition(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func Cron(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/data_flow_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func DataFlowConfig(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/data_store_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func DataStoreGet(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/data_store_get_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func DataStoreGetWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/delay_delayed_win_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func Delay_DelayedWin_WithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry)
), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/delay_engine_do.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Delay_EngineDo(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/delay_paused.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func Delay_Paused(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/delay_paused_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Delay_PausedWithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry) {
), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/delay_return.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func Delay_Return(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/delay_transited_win_with_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func Delay_TransitedWin_WithCommit(t TestingT, d flowstate.Doer, fr FlowRegistry
), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion testcases/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func Fork(t TestingT, d flowstate.Doer, fr FlowRegistry) {
return flowstate.End(stateCtx), nil
}))

l, _ := newTestLogger(t)
l, _ := NewTestLogger(t)
e, err := flowstate.NewEngine(d, l)
require.NoError(t, err)
defer func() {
Expand Down
Loading

0 comments on commit a82a7c5

Please sign in to comment.