Skip to content

Commit 921a6a7

Browse files
authored
update durabletask to use fork and child workflow retries (#656)
* update durabletask to use fork and child workflow retries Signed-off-by: Fabian Martinez <[email protected]> * lint Signed-off-by: Fabian Martinez <[email protected]> --------- Signed-off-by: Fabian Martinez <[email protected]>
1 parent 282a58b commit 921a6a7

15 files changed

+82
-33
lines changed

Diff for: examples/go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ require (
1818
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
1919
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
2020
github.com/cespare/xxhash/v2 v2.3.0 // indirect
21-
github.com/dapr/dapr v1.14.5-0.20241120233620-c86a77f6db5f // indirect
21+
github.com/dapr/dapr v1.15.0-rc.1 // indirect
22+
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198 // indirect
2223
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
2324
github.com/go-chi/chi/v5 v5.1.0 // indirect
2425
github.com/go-logr/logr v1.4.2 // indirect
2526
github.com/go-logr/stdr v1.2.2 // indirect
2627
github.com/golang/protobuf v1.5.4 // indirect
2728
github.com/kr/pretty v0.3.1 // indirect
2829
github.com/marusama/semaphore/v2 v2.5.0 // indirect
29-
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect
3030
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
3131
go.opentelemetry.io/otel v1.30.0 // indirect
3232
go.opentelemetry.io/otel/metric v1.30.0 // indirect

Diff for: examples/go.sum

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
77
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
88
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
99
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
10-
github.com/dapr/dapr v1.14.5-0.20241120233620-c86a77f6db5f h1:wXPHK2o5FIABU5BvKk/21MN6GKaoUvWc7fESH/hwVls=
11-
github.com/dapr/dapr v1.14.5-0.20241120233620-c86a77f6db5f/go.mod h1:WlsLcudco11+BhaIvg2XyGxD+2GcZf8OTOawd94dAQs=
10+
github.com/dapr/dapr v1.15.0-rc.1 h1:7JP3zSannxQwV27A9pPR2b/DSNmgcSjJOhRDwM4eFpQ=
11+
github.com/dapr/dapr v1.15.0-rc.1/go.mod h1:SycZrBWgfmog+C5T4p0X6VIpnREQ3xajrYxdih+gn9w=
12+
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198 h1:vTq9HmTXow4iVb/1dO4qVZhK7XC5KlwbLl2iNIE23Qc=
13+
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198/go.mod h1:C4ykYCSNv1k2C4wvZv11h2ClkE/qsXw0tv6idOWVmDc=
1214
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1315
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
1416
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -39,8 +41,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
3941
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
4042
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
4143
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
42-
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
43-
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
4444
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
4545
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
4646
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=

Diff for: go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ go 1.23.3
44

55
require (
66
github.com/dapr/dapr v1.15.0-rc.1
7+
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198
78
github.com/go-chi/chi/v5 v5.1.0
89
github.com/golang/mock v1.6.0
910
github.com/google/uuid v1.6.0
10-
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428
1111
github.com/stretchr/testify v1.9.0
1212
google.golang.org/grpc v1.67.0
1313
google.golang.org/protobuf v1.34.2

Diff for: go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
33
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
44
github.com/dapr/dapr v1.15.0-rc.1 h1:7JP3zSannxQwV27A9pPR2b/DSNmgcSjJOhRDwM4eFpQ=
55
github.com/dapr/dapr v1.15.0-rc.1/go.mod h1:SycZrBWgfmog+C5T4p0X6VIpnREQ3xajrYxdih+gn9w=
6+
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198 h1:vTq9HmTXow4iVb/1dO4qVZhK7XC5KlwbLl2iNIE23Qc=
7+
github.com/dapr/durabletask-go v0.5.1-0.20241127212625-4232880fd198/go.mod h1:C4ykYCSNv1k2C4wvZv11h2ClkE/qsXw0tv6idOWVmDc=
68
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
79
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
810
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
@@ -28,8 +30,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
2830
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
2931
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
3032
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
31-
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
32-
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
3333
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
3434
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3535
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=

Diff for: workflow/activity_context.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121

2222
"google.golang.org/protobuf/types/known/wrapperspb"
2323

24-
"github.com/microsoft/durabletask-go/task"
24+
"github.com/dapr/durabletask-go/task"
2525
)
2626

2727
type ActivityContext struct {
@@ -78,11 +78,11 @@ func ActivityRetryPolicy(policy RetryPolicy) callActivityOption {
7878
}
7979
}
8080

81-
func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy {
81+
func (opts *callActivityOptions) getRetryPolicy() *task.RetryPolicy {
8282
if opts.retryPolicy == nil {
8383
return nil
8484
}
85-
return &task.ActivityRetryPolicy{
85+
return &task.RetryPolicy{
8686
MaxAttempts: opts.retryPolicy.MaxAttempts,
8787
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
8888
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,

Diff for: workflow/activity_context_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ import (
2121
"testing"
2222
"time"
2323

24-
"github.com/microsoft/durabletask-go/task"
2524
"github.com/stretchr/testify/assert"
2625
"github.com/stretchr/testify/require"
26+
27+
"github.com/dapr/durabletask-go/task"
2728
)
2829

2930
type testingTaskActivityContext struct {
@@ -79,7 +80,7 @@ func TestCallActivityOptions(t *testing.T) {
7980
BackoffCoefficient: 2,
8081
MaxRetryInterval: 2 * time.Second,
8182
}))
82-
assert.Equal(t, &task.ActivityRetryPolicy{
83+
assert.Equal(t, &task.RetryPolicy{
8384
MaxAttempts: 3,
8485
InitialRetryInterval: 100 * time.Millisecond,
8586
BackoffCoefficient: 2,

Diff for: workflow/client.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ import (
2020
"fmt"
2121
"time"
2222

23-
"github.com/microsoft/durabletask-go/api"
24-
"github.com/microsoft/durabletask-go/backend"
25-
durabletaskclient "github.com/microsoft/durabletask-go/client"
2623
"google.golang.org/grpc"
2724

25+
"github.com/dapr/durabletask-go/api"
26+
"github.com/dapr/durabletask-go/backend"
27+
durabletaskclient "github.com/dapr/durabletask-go/client"
28+
2829
dapr "github.com/dapr/go-sdk/client"
2930
)
3031

Diff for: workflow/context.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
"fmt"
1919
"time"
2020

21-
"github.com/microsoft/durabletask-go/task"
21+
"github.com/dapr/durabletask-go/task"
2222
)
2323

2424
type WorkflowContext struct {
@@ -68,7 +68,7 @@ func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActiv
6868
}
6969
}
7070

71-
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy()))
71+
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithActivityRetryPolicy(options.getRetryPolicy()))
7272
}
7373

7474
// CallChildWorkflow returns a completable task for a given workflow.
@@ -84,9 +84,9 @@ func (wfc *WorkflowContext) CallChildWorkflow(workflow interface{}, opts ...call
8484
}
8585
}
8686
if options.instanceID != "" {
87-
return wfc.orchestrationContext.CallSubOrchestrator(workflow, task.WithRawSubOrchestratorInput(options.rawInput.GetValue()), task.WithSubOrchestrationInstanceID(options.instanceID))
87+
return wfc.orchestrationContext.CallSubOrchestrator(workflow, task.WithRawSubOrchestratorInput(options.rawInput.GetValue()), task.WithSubOrchestrationInstanceID(options.instanceID), task.WithSubOrchestrationRetryPolicy(options.getRetryPolicy()))
8888
}
89-
return wfc.orchestrationContext.CallSubOrchestrator(workflow, task.WithRawSubOrchestratorInput(options.rawInput.GetValue()))
89+
return wfc.orchestrationContext.CallSubOrchestrator(workflow, task.WithRawSubOrchestratorInput(options.rawInput.GetValue()), task.WithSubOrchestrationRetryPolicy(options.getRetryPolicy()))
9090
}
9191

9292
// CreateTimer returns a completable task that blocks for a given duration.

Diff for: workflow/context_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ import (
1818
"testing"
1919
"time"
2020

21-
"github.com/microsoft/durabletask-go/task"
2221
"github.com/stretchr/testify/assert"
2322
"github.com/stretchr/testify/require"
23+
24+
"github.com/dapr/durabletask-go/task"
2425
)
2526

2627
func TestContext(t *testing.T) {

Diff for: workflow/state.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ limitations under the License.
1414
*/
1515
package workflow
1616

17-
import "github.com/microsoft/durabletask-go/api"
17+
import "github.com/dapr/durabletask-go/api"
1818

1919
type Status int
2020

Diff for: workflow/state_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ package workflow
1717
import (
1818
"testing"
1919

20-
"github.com/microsoft/durabletask-go/api"
2120
"github.com/stretchr/testify/assert"
21+
22+
"github.com/dapr/durabletask-go/api"
2223
)
2324

2425
func TestString(t *testing.T) {

Diff for: workflow/worker.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ import (
2525

2626
dapr "github.com/dapr/go-sdk/client"
2727

28-
"github.com/microsoft/durabletask-go/backend"
29-
durabletaskclient "github.com/microsoft/durabletask-go/client"
30-
"github.com/microsoft/durabletask-go/task"
28+
"github.com/dapr/durabletask-go/backend"
29+
durabletaskclient "github.com/dapr/durabletask-go/client"
30+
"github.com/dapr/durabletask-go/task"
3131
)
3232

3333
type WorkflowWorker struct {

Diff for: workflow/worker_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
daprClient "github.com/dapr/go-sdk/client"
2121

22-
"github.com/microsoft/durabletask-go/task"
22+
"github.com/dapr/durabletask-go/task"
2323

2424
"github.com/stretchr/testify/assert"
2525
"github.com/stretchr/testify/require"

Diff for: workflow/workflow.go

+26-4
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ import (
1818
"fmt"
1919
"time"
2020

21-
"github.com/microsoft/durabletask-go/api"
22-
"github.com/microsoft/durabletask-go/task"
2321
"google.golang.org/protobuf/types/known/wrapperspb"
22+
23+
"github.com/dapr/durabletask-go/api"
24+
"github.com/dapr/durabletask-go/task"
2425
)
2526

2627
type Metadata struct {
@@ -87,8 +88,9 @@ func convertMetadata(orchestrationMetadata *api.OrchestrationMetadata) *Metadata
8788
}
8889

8990
type callChildWorkflowOptions struct {
90-
instanceID string
91-
rawInput *wrapperspb.StringValue
91+
instanceID string
92+
rawInput *wrapperspb.StringValue
93+
retryPolicy *RetryPolicy
9294
}
9395

9496
type callChildWorkflowOption func(*callChildWorkflowOptions) error
@@ -121,6 +123,26 @@ func ChildWorkflowInstanceID(instanceID string) callChildWorkflowOption {
121123
}
122124
}
123125

126+
func ChildWorkflowRetryPolicy(policy RetryPolicy) callChildWorkflowOption {
127+
return func(opts *callChildWorkflowOptions) error {
128+
opts.retryPolicy = &policy
129+
return nil
130+
}
131+
}
132+
133+
func (opts *callChildWorkflowOptions) getRetryPolicy() *task.RetryPolicy {
134+
if opts.retryPolicy == nil {
135+
return nil
136+
}
137+
return &task.RetryPolicy{
138+
MaxAttempts: opts.retryPolicy.MaxAttempts,
139+
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
140+
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
141+
MaxRetryInterval: opts.retryPolicy.MaxRetryInterval,
142+
RetryTimeout: opts.retryPolicy.RetryTimeout,
143+
}
144+
}
145+
124146
// NewTaskSlice returns a slice of tasks which can be executed in parallel
125147
func NewTaskSlice(length int) []task.Task {
126148
taskSlice := make([]task.Task, length)

Diff for: workflow/workflow_test.go

+24-1
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ package workflow
22

33
import (
44
"testing"
5+
"time"
56

6-
"github.com/microsoft/durabletask-go/api"
77
"github.com/stretchr/testify/assert"
8+
9+
"github.com/dapr/durabletask-go/api"
10+
"github.com/dapr/durabletask-go/task"
811
)
912

1013
func TestConvertMetadata(t *testing.T) {
@@ -37,6 +40,26 @@ func TestCallChildWorkflowOptions(t *testing.T) {
3740
opts := returnCallChildWorkflowOptions(ChildWorkflowInput(make(chan int)))
3841
assert.Empty(t, opts.rawInput.GetValue())
3942
})
43+
44+
t.Run("child workflow retry policy - set", func(t *testing.T) {
45+
opts := returnCallChildWorkflowOptions(ChildWorkflowRetryPolicy(RetryPolicy{
46+
MaxAttempts: 3,
47+
InitialRetryInterval: 100 * time.Millisecond,
48+
BackoffCoefficient: 2,
49+
MaxRetryInterval: 2 * time.Second,
50+
}))
51+
assert.Equal(t, &task.RetryPolicy{
52+
MaxAttempts: 3,
53+
InitialRetryInterval: 100 * time.Millisecond,
54+
BackoffCoefficient: 2,
55+
MaxRetryInterval: 2 * time.Second,
56+
}, opts.getRetryPolicy())
57+
})
58+
59+
t.Run("child workflow retry policy - empty", func(t *testing.T) {
60+
opts := returnCallChildWorkflowOptions()
61+
assert.Empty(t, opts.getRetryPolicy())
62+
})
4063
}
4164

4265
func returnCallChildWorkflowOptions(opts ...callChildWorkflowOption) callChildWorkflowOptions {

0 commit comments

Comments
 (0)