Skip to content

Commit

Permalink
Merge pull request #394 from goxiaoy/workflow-context
Browse files Browse the repository at this point in the history
workflow execute add context
  • Loading branch information
yedf2 authored Feb 12, 2023
2 parents f83ca85 + 468d88a commit f4cd29a
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 10 deletions.
5 changes: 3 additions & 2 deletions client/workflow/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package workflow

import (
"context"
"fmt"

"github.com/dtm-labs/logger"
Expand All @@ -19,12 +20,12 @@ var defaultFac = workflowFactory{
handlers: map[string]*wfItem{},
}

func (w *workflowFactory) execute(name string, gid string, data []byte) ([]byte, error) {
func (w *workflowFactory) execute(ctx context.Context, name string, gid string, data []byte) ([]byte, error) {
handler := w.handlers[name]
if handler == nil {
return nil, fmt.Errorf("workflow '%s' not registered. please register at startup", name)
}
wf := w.newWorkflow(name, gid, data)
wf := w.newWorkflow(ctx, name, gid, data)
for _, fn := range handler.custom {
fn(wf)
}
Expand Down
3 changes: 2 additions & 1 deletion client/workflow/imp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (wf *Workflow) initProgress(progresses []*dtmgpb.DtmProgress) {

type wfMeta struct{}

func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Workflow {
func (w *workflowFactory) newWorkflow(ctx context.Context, name string, gid string, data []byte) *Workflow {
wf := &Workflow{
TransBase: dtmimp.NewTransBase(gid, "workflow", "not inited", ""),
Name: name,
Expand All @@ -58,6 +58,7 @@ func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Wor
currentOp: dtmimp.OpAction,
},
}
wf.Context = ctx
wf.Protocol = w.protocol
if w.protocol == dtmimp.ProtocolGRPC {
wf.Dtm = w.grpcDtm
Expand Down
2 changes: 1 addition & 1 deletion client/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ func (s *workflowServer) Execute(ctx context.Context, wd *wfpb.WorkflowData) (*e
return nil, status.Errorf(codes.Internal, "workflow server not inited. please call workflow.InitGrpc first")
}
tb := dtmgimp.TransBaseFromGrpc(ctx)
_, err := defaultFac.execute(tb.Op, tb.Gid, wd.Data)
_, err := defaultFac.execute(ctx, tb.Op, tb.Gid, wd.Data)
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err)
}
25 changes: 20 additions & 5 deletions client/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,39 @@ func Register2(name string, handler WfFunc2, custom ...func(wf *Workflow)) error
return defaultFac.register(name, handler, custom...)
}

// Execute will execute a workflow with the gid and specified params
// Execute is the same as ExecuteCtx, but with context.Background
func Execute(name string, gid string, data []byte) error {
return ExecuteCtx(context.Background(), name, gid, data)
}

// ExecuteCtx will execute a workflow with the gid and specified params
// if the workflow with the gid does not exist, then create a new workflow and execute it
// if the workflow with the gid exists, resume to execute it
func Execute(name string, gid string, data []byte) error {
_, err := defaultFac.execute(name, gid, data)
func ExecuteCtx(ctx context.Context, name string, gid string, data []byte) error {
_, err := defaultFac.execute(ctx, name, gid, data)
return err
}

// Execute2 is the same as Execute, but workflow func can return result
func Execute2(name string, gid string, data []byte) ([]byte, error) {
return defaultFac.execute(name, gid, data)
return Execute2Ctx(context.Background(), name, gid, data)
}

// Execute2Ctx is the same as Execute2, but with context.Background
func Execute2Ctx(ctx context.Context, name string, gid string, data []byte) ([]byte, error) {
return defaultFac.execute(ctx, name, gid, data)
}

// ExecuteByQS is like Execute, but name and gid will be obtained from qs
func ExecuteByQS(qs url.Values, body []byte) error {
return ExecuteByQSCtx(context.Background(), qs, body)
}

// ExecuteByQSCtx is the same as ExecuteByQS, but with context.Background
func ExecuteByQSCtx(ctx context.Context, qs url.Values, body []byte) error {
name := qs.Get("op")
gid := qs.Get("gid")
_, err := defaultFac.execute(name, gid, body)
_, err := defaultFac.execute(ctx, name, gid, body)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion client/workflow/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

func TestAbnormal(t *testing.T) {
fname := dtmimp.GetFuncName()
_, err := defaultFac.execute(fname, fname, nil)
_, err := defaultFac.execute(context.Background(), fname, fname, nil)
assert.Error(t, err)

err = defaultFac.register(fname, func(wf *Workflow, data []byte) ([]byte, error) { return nil, nil })
Expand Down

0 comments on commit f4cd29a

Please sign in to comment.