Skip to content

Commit 86519a8

Browse files
committed
workflow execute context
1 parent f735287 commit 86519a8

12 files changed

+38
-31
lines changed

client/workflow/factory.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package workflow
22

33
import (
4+
"context"
45
"fmt"
56

67
"github.com/dtm-labs/logger"
@@ -19,12 +20,12 @@ var defaultFac = workflowFactory{
1920
handlers: map[string]*wfItem{},
2021
}
2122

22-
func (w *workflowFactory) execute(name string, gid string, data []byte) ([]byte, error) {
23+
func (w *workflowFactory) execute(ctx context.Context, name string, gid string, data []byte) ([]byte, error) {
2324
handler := w.handlers[name]
2425
if handler == nil {
2526
return nil, fmt.Errorf("workflow '%s' not registered. please register at startup", name)
2627
}
27-
wf := w.newWorkflow(name, gid, data)
28+
wf := w.newWorkflow(ctx, name, gid, data)
2829
for _, fn := range handler.custom {
2930
fn(wf)
3031
}

client/workflow/imp.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (wf *Workflow) initProgress(progresses []*dtmgpb.DtmProgress) {
4747

4848
type wfMeta struct{}
4949

50-
func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Workflow {
50+
func (w *workflowFactory) newWorkflow(ctx context.Context, name string, gid string, data []byte) *Workflow {
5151
wf := &Workflow{
5252
TransBase: dtmimp.NewTransBase(gid, "workflow", "not inited", ""),
5353
Name: name,
@@ -58,6 +58,7 @@ func (w *workflowFactory) newWorkflow(name string, gid string, data []byte) *Wor
5858
currentOp: dtmimp.OpAction,
5959
},
6060
}
61+
wf.Context = ctx
6162
wf.Protocol = w.protocol
6263
if w.protocol == dtmimp.ProtocolGRPC {
6364
wf.Dtm = w.grpcDtm

client/workflow/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ func (s *workflowServer) Execute(ctx context.Context, wd *wfpb.WorkflowData) (*e
2121
return nil, status.Errorf(codes.Internal, "workflow server not inited. please call workflow.InitGrpc first")
2222
}
2323
tb := dtmgimp.TransBaseFromGrpc(ctx)
24-
_, err := defaultFac.execute(tb.Op, tb.Gid, wd.Data)
24+
_, err := defaultFac.execute(ctx, tb.Op, tb.Gid, wd.Data)
2525
return &emptypb.Empty{}, dtmgrpc.DtmError2GrpcError(err)
2626
}

client/workflow/workflow.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,21 +56,21 @@ func Register2(name string, handler WfFunc2, custom ...func(wf *Workflow)) error
5656
// Execute will execute a workflow with the gid and specified params
5757
// if the workflow with the gid does not exist, then create a new workflow and execute it
5858
// if the workflow with the gid exists, resume to execute it
59-
func Execute(name string, gid string, data []byte) error {
60-
_, err := defaultFac.execute(name, gid, data)
59+
func Execute(ctx context.Context, name string, gid string, data []byte) error {
60+
_, err := defaultFac.execute(ctx, name, gid, data)
6161
return err
6262
}
6363

6464
// Execute2 is the same as Execute, but workflow func can return result
65-
func Execute2(name string, gid string, data []byte) ([]byte, error) {
66-
return defaultFac.execute(name, gid, data)
65+
func Execute2(ctx context.Context, name string, gid string, data []byte) ([]byte, error) {
66+
return defaultFac.execute(ctx, name, gid, data)
6767
}
6868

6969
// ExecuteByQS is like Execute, but name and gid will be obtained from qs
70-
func ExecuteByQS(qs url.Values, body []byte) error {
70+
func ExecuteByQS(ctx context.Context, qs url.Values, body []byte) error {
7171
name := qs.Get("op")
7272
gid := qs.Get("gid")
73-
_, err := defaultFac.execute(name, gid, body)
73+
_, err := defaultFac.execute(ctx, name, gid, body)
7474
return err
7575
}
7676

client/workflow/workflow_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010

1111
func TestAbnormal(t *testing.T) {
1212
fname := dtmimp.GetFuncName()
13-
_, err := defaultFac.execute(fname, fname, nil)
13+
_, err := defaultFac.execute(context.Background(), fname, fname, nil)
1414
assert.Error(t, err)
1515

1616
err = defaultFac.register(fname, func(wf *Workflow, data []byte) ([]byte, error) { return nil, nil })

test/busi/base_http.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func BaseAddRoute(app *gin.Engine) {
8888
app.POST(BusiAPI+"/workflow/resume", dtmutil.WrapHandler(func(ctx *gin.Context) interface{} {
8989
data, err := ioutil.ReadAll(ctx.Request.Body)
9090
logger.FatalIfError(err)
91-
return workflow.ExecuteByQS(ctx.Request.URL.Query(), data)
91+
return workflow.ExecuteByQS(ctx, ctx.Request.URL.Query(), data)
9292
}))
9393
app.POST(BusiAPI+"/TransIn", dtmutil.WrapHandler(func(c *gin.Context) interface{} {
9494
return handleGeneralBusiness(c, MainSwitch.TransInResult.Fetch(), reqFrom(c).TransInResult, "transIn")

test/dtmsvr_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestUpdateBranchAsync(t *testing.T) {
7070
return err
7171
})
7272
assert.Nil(t, err)
73-
err = workflow.Execute(gid, gid, nil)
73+
err = workflow.Execute(context.Background(), gid, gid, nil)
7474
assert.Nil(t, err)
7575

7676
time.Sleep(dtmsvr.UpdateBranchAsyncInterval)

test/workflow_grpc_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package test
88

99
import (
10+
"context"
1011
"database/sql"
1112
"testing"
1213

@@ -33,7 +34,7 @@ func TestWorkflowGrpcSimple(t *testing.T) {
3334
_, err = busi.BusiCli.TransInBSaga(wf.NewBranchCtx(), &req)
3435
return err
3536
})
36-
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
37+
err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
3738
assert.Error(t, err)
3839
assert.Equal(t, StatusFailed, getTransStatus(gid))
3940
}
@@ -61,7 +62,7 @@ func TestWorkflowGrpcRollback(t *testing.T) {
6162
return err
6263
})
6364
before := getBeforeBalances("mysql")
64-
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
65+
err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
6566
assert.Error(t, err, dtmcli.ErrFailure)
6667
assert.Equal(t, StatusFailed, getTransStatus(gid))
6768
assertSameBalance(t, before, "mysql")
@@ -106,7 +107,7 @@ func TestWorkflowMixed(t *testing.T) {
106107
assert.Nil(t, err)
107108
before := getBeforeBalances("mysql")
108109
req := &busi.ReqGrpc{Amount: 30}
109-
err = workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
110+
err = workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
110111
assert.Nil(t, err)
111112
assert.Equal(t, StatusSucceed, getTransStatus(gid))
112113
assertNotSameBalance(t, before, "mysql")
@@ -127,7 +128,7 @@ func TestWorkflowGrpcError(t *testing.T) {
127128
_, err = busi.BusiCli.TransIn(wf.NewBranchCtx(), &req)
128129
return err
129130
})
130-
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
131+
err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
131132
assert.Error(t, err)
132133
cronTransOnceForwardCron(t, gid, 1000)
133134
assert.Equal(t, StatusSucceed, getTransStatus(gid))

test/workflow_http_ret_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package test
22

33
import (
4+
"context"
45
"testing"
56

67
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
@@ -21,13 +22,13 @@ func TestWorkflowRet(t *testing.T) {
2122
return []byte("result of workflow"), err
2223
})
2324

24-
ret, err := workflow.Execute2(gid, gid, dtmimp.MustMarshal(req))
25+
ret, err := workflow.Execute2(context.Background(), gid, gid, dtmimp.MustMarshal(req))
2526
assert.Nil(t, err)
2627
assert.Equal(t, "result of workflow", string(ret))
2728
assert.Equal(t, StatusSucceed, getTransStatus(gid))
2829

2930
// the second execute will return result directly
30-
ret, err = workflow.Execute2(gid, gid, dtmimp.MustMarshal(req))
31+
ret, err = workflow.Execute2(context.Background(), gid, gid, dtmimp.MustMarshal(req))
3132
assert.Nil(t, err)
3233
assert.Equal(t, "result of workflow", string(ret))
3334
assert.Equal(t, StatusSucceed, getTransStatus(gid))

test/workflow_http_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package test
88

99
import (
10+
"context"
1011
"database/sql"
1112
"testing"
1213

@@ -41,7 +42,7 @@ func TestWorkflowNormal(t *testing.T) {
4142
return nil
4243
})
4344

44-
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
45+
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
4546
assert.Nil(t, err)
4647
assert.Equal(t, StatusSucceed, getTransStatus(gid))
4748
}
@@ -82,7 +83,7 @@ func TestWorkflowRollback(t *testing.T) {
8283
})
8384
before := getBeforeBalances("mysql")
8485

85-
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
86+
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
8687
assert.Error(t, err, dtmcli.ErrFailure)
8788
assert.Equal(t, StatusFailed, getTransStatus(gid))
8889
assertSameBalance(t, before, "mysql")
@@ -120,7 +121,7 @@ func TestWorkflowTcc(t *testing.T) {
120121
})
121122

122123
before := getBeforeBalances("mysql")
123-
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
124+
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
124125
assert.Nil(t, err)
125126
assert.Equal(t, StatusSucceed, getTransStatus(gid))
126127
assertNotSameBalance(t, before, "mysql")
@@ -158,7 +159,7 @@ func TestWorkflowTccRollback(t *testing.T) {
158159
})
159160

160161
before := getBeforeBalances("mysql")
161-
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
162+
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
162163
assert.Error(t, err)
163164
assert.Equal(t, StatusFailed, getTransStatus(gid))
164165
assertSameBalance(t, before, "mysql")
@@ -177,7 +178,7 @@ func TestWorkflowError(t *testing.T) {
177178
return err
178179
})
179180

180-
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
181+
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
181182
assert.Error(t, err)
182183
cronTransOnceForwardCron(t, gid, 1000)
183184
assert.Equal(t, StatusSucceed, getTransStatus(gid))
@@ -196,7 +197,7 @@ func TestWorkflowOngoing(t *testing.T) {
196197
return err
197198
})
198199

199-
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
200+
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
200201
assert.Error(t, err)
201202
cronTransOnceForwardCron(t, gid, 1000)
202203
assert.Equal(t, StatusSucceed, getTransStatus(gid))
@@ -224,7 +225,7 @@ func TestWorkflowResumeSkip(t *testing.T) {
224225
return err
225226
})
226227

227-
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
228+
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
228229
assert.Error(t, err)
229230
cronTransOnceForwardCron(t, gid, 1000)
230231
assert.Equal(t, StatusSucceed, getTransStatus(gid))

test/workflow_ongoing_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package test
88

99
import (
10+
"context"
1011
"database/sql"
1112
"testing"
1213

@@ -47,7 +48,7 @@ func TestWorkflowSimpleResume(t *testing.T) {
4748
return err
4849
})
4950

50-
err := workflow.Execute(gid, gid, dtmimp.MustMarshal(req))
51+
err := workflow.Execute(context.Background(), gid, gid, dtmimp.MustMarshal(req))
5152
assert.Error(t, err)
5253
cronTransOnceForwardNow(t, gid, 1000)
5354
assert.Equal(t, StatusSucceed, getTransStatus(gid))
@@ -94,7 +95,7 @@ func TestWorkflowGrpcRollbackResume(t *testing.T) {
9495
})
9596
before := getBeforeBalances("mysql")
9697
req := &busi.ReqGrpc{Amount: 30, TransInResult: "FAILURE"}
97-
err := workflow.Execute(gid, gid, dtmgimp.MustProtoMarshal(req))
98+
err := workflow.Execute(context.Background(), gid, gid, dtmgimp.MustProtoMarshal(req))
9899
assert.Error(t, err, dtmcli.ErrOngoing)
99100
assert.Equal(t, StatusPrepared, getTransStatus(gid))
100101
cronTransOnceForwardNow(t, gid, 1000)
@@ -140,7 +141,7 @@ func TestWorkflowXaResume(t *testing.T) {
140141
return err
141142
})
142143
before := getBeforeBalances("mysql")
143-
err := workflow.Execute(gid, gid, nil)
144+
err := workflow.Execute(context.Background(), gid, gid, nil)
144145
assert.Equal(t, dtmcli.ErrOngoing, err)
145146

146147
cronTransOnceForwardNow(t, gid, 1000)

test/workflow_xa_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package test
88

99
import (
10+
"context"
1011
"database/sql"
1112
"testing"
1213

@@ -34,7 +35,7 @@ func TestWorkflowXaAction(t *testing.T) {
3435
return err
3536
})
3637
before := getBeforeBalances("mysql")
37-
err := workflow.Execute(gid, gid, nil)
38+
err := workflow.Execute(context.Background(), gid, gid, nil)
3839
assert.Nil(t, err)
3940
assert.Equal(t, StatusSucceed, getTransStatus(gid))
4041
assertNotSameBalance(t, before, "mysql")
@@ -58,7 +59,7 @@ func TestWorkflowXaRollback(t *testing.T) {
5859
return err
5960
})
6061
before := getBeforeBalances("mysql")
61-
err := workflow.Execute(gid, gid, nil)
62+
err := workflow.Execute(context.Background(), gid, gid, nil)
6263
assert.Equal(t, dtmcli.ErrFailure, err)
6364
assert.Equal(t, StatusFailed, getTransStatus(gid))
6465
assertSameBalance(t, before, "mysql")

0 commit comments

Comments
 (0)