Skip to content

Commit f4cd29a

Browse files
authored
Merge pull request #394 from goxiaoy/workflow-context
workflow execute add context
2 parents f83ca85 + 468d88a commit f4cd29a

File tree

5 files changed

+27
-10
lines changed

5 files changed

+27
-10
lines changed

Diff for: client/workflow/factory.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package workflow
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/dtm-labs/logger"
@@ -19,12 +20,12 @@ var defaultFac = workflowFactory{
1920
handlers: map[string]*wfItem{},
2021
}
2122

22-
func (w *workflowFactory) execute(name string, gid string, data []byte) ([]byte, error) {
23+
func (w *workflowFactory) execute(ctx context.Context, name string, gid string, data []byte) ([]byte, error) {
2324
handler := w.handlers[name]
2425
if handler == nil {
2526
return nil, fmt.Errorf("workflow '%s' not registered. please register at startup", name)
2627
}
27-
wf := w.newWorkflow(name, gid, data)
28+
wf := w.newWorkflow(ctx, name, gid, data)
2829
for _, fn := range handler.custom {
2930
fn(wf)
3031
}

Diff for: client/workflow/imp.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (wf *Workflow) initProgress(progresses []*dtmgpb.DtmProgress) {
4747

4848
type wfMeta struct{}
4949

50-
func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Workflow {
50+
func (w *workflowFactory) newWorkflow(ctx context.Context, name string, gid string, data []byte) *Workflow {
5151
wf := &Workflow{
5252
TransBase: dtmimp.NewTransBase(gid, "workflow", "not inited", ""),
5353
Name: name,
@@ -58,6 +58,7 @@ func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Wor
5858
currentOp: dtmimp.OpAction,
5959
},
6060
}
61+
wf.Context = ctx
6162
wf.Protocol = w.protocol
6263
if w.protocol == dtmimp.ProtocolGRPC {
6364
wf.Dtm = w.grpcDtm

Diff for: client/workflow/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ func (s *workflowServer) Execute(ctx context.Context, wd *wfpb.WorkflowData) (*e
2121
return nil, status.Errorf(codes.Internal, "workflow server not inited. please call workflow.InitGrpc first")
2222
}
2323
tb := dtmgimp.TransBaseFromGrpc(ctx)
24-
_, err := defaultFac.execute(tb.Op, tb.Gid, wd.Data)
24+
_, err := defaultFac.execute(ctx, tb.Op, tb.Gid, wd.Data)
2525
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err)
2626
}

Diff for: client/workflow/workflow.go

+20-5
Original file line numberDiff line numberDiff line change
@@ -53,24 +53,39 @@ func Register2(name string, handler WfFunc2, custom ...func(wf *Workflow)) error
5353
return defaultFac.register(name, handler, custom...)
5454
}
5555

56-
// Execute will execute a workflow with the gid and specified params
56+
// Execute is the same as ExecuteCtx, but with context.Background
57+
func Execute(name string, gid string, data []byte) error {
58+
return ExecuteCtx(context.Background(), name, gid, data)
59+
}
60+
61+
// ExecuteCtx will execute a workflow with the gid and specified params
5762
// if the workflow with the gid does not exist, then create a new workflow and execute it
5863
// if the workflow with the gid exists, resume to execute it
59-
func Execute(name string, gid string, data []byte) error {
60-
_, err := defaultFac.execute(name, gid, data)
64+
func ExecuteCtx(ctx context.Context, name string, gid string, data []byte) error {
65+
_, err := defaultFac.execute(ctx, name, gid, data)
6166
return err
6267
}
6368

6469
// Execute2 is the same as Execute, but workflow func can return result
6570
func Execute2(name string, gid string, data []byte) ([]byte, error) {
66-
return defaultFac.execute(name, gid, data)
71+
return Execute2Ctx(context.Background(), name, gid, data)
72+
}
73+
74+
// Execute2Ctx is the same as Execute2, but with context.Background
75+
func Execute2Ctx(ctx context.Context, name string, gid string, data []byte) ([]byte, error) {
76+
return defaultFac.execute(ctx, name, gid, data)
6777
}
6878

6979
// ExecuteByQS is like Execute, but name and gid will be obtained from qs
7080
func ExecuteByQS(qs url.Values, body []byte) error {
81+
return ExecuteByQSCtx(context.Background(), qs, body)
82+
}
83+
84+
// ExecuteByQSCtx is the same as ExecuteByQS, but with context.Background
85+
func ExecuteByQSCtx(ctx context.Context, qs url.Values, body []byte) error {
7186
name := qs.Get("op")
7287
gid := qs.Get("gid")
73-
_, err := defaultFac.execute(name, gid, body)
88+
_, err := defaultFac.execute(ctx, name, gid, body)
7489
return err
7590
}
7691

Diff for: client/workflow/workflow_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
func TestAbnormal(t *testing.T) {
1212
fname := dtmimp.GetFuncName()
13-
_, err := defaultFac.execute(fname, fname, nil)
13+
_, err := defaultFac.execute(context.Background(), fname, fname, nil)
1414
assert.Error(t, err)
1515

1616
err = defaultFac.register(fname, func(wf *Workflow, data []byte) ([]byte, error) { return nil, nil })

0 commit comments

Comments
 (0)