Skip to content

Commit 9a656d4

Browse files
committed
worflows: activity retry policy
Signed-off-by: Fabian Martinez <[email protected]>
1 parent dd9a2d5 commit 9a656d4

File tree

7 files changed

+44
-9
lines changed

7 files changed

+44
-9
lines changed

examples/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ require (
2626
github.com/golang/protobuf v1.5.4 // indirect
2727
github.com/kr/pretty v0.3.1 // indirect
2828
github.com/marusama/semaphore/v2 v2.5.0 // indirect
29-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect
29+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect
3030
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
3131
go.opentelemetry.io/otel v1.27.0 // indirect
3232
go.opentelemetry.io/otel/metric v1.27.0 // indirect

examples/go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
3939
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
4040
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
4141
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
42-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
43-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
42+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
43+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
4444
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
4545
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
4646
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=

examples/workflow/main.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,12 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
182182
return nil, err
183183
}
184184

185-
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
185+
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input), workflow.RetryPolicy(workflow.ActivityRetryPolicy{
186+
MaxAttempts: 3,
187+
InitialRetryInterval: 1 * time.Second,
188+
BackoffCoefficient: 2,
189+
MaxRetryInterval: 3 * time.Second,
190+
})).Await(&output); err != nil {
186191
return nil, err
187192
}
188193

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/go-chi/chi/v5 v5.1.0
88
github.com/golang/mock v1.6.0
99
github.com/google/uuid v1.6.0
10-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d
10+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428
1111
github.com/stretchr/testify v1.9.0
1212
google.golang.org/grpc v1.65.0
1313
google.golang.org/protobuf v1.34.2

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
2828
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
2929
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
3030
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
31-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw=
32-
github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
31+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
32+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
3333
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
3434
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3535
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=

workflow/activity_context.go

+31-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package workflow
1717
import (
1818
"context"
1919
"encoding/json"
20+
"time"
2021

2122
"google.golang.org/protobuf/types/known/wrapperspb"
2223

@@ -38,7 +39,16 @@ func (wfac *ActivityContext) Context() context.Context {
3839
type callActivityOption func(*callActivityOptions) error
3940

4041
type callActivityOptions struct {
41-
rawInput *wrapperspb.StringValue
42+
rawInput *wrapperspb.StringValue
43+
retryPolicy *ActivityRetryPolicy
44+
}
45+
46+
type ActivityRetryPolicy struct {
47+
MaxAttempts int
48+
InitialRetryInterval time.Duration
49+
BackoffCoefficient float64
50+
MaxRetryInterval time.Duration
51+
RetryTimeout time.Duration
4252
}
4353

4454
// ActivityInput is an option to pass a JSON-serializable input
@@ -61,6 +71,26 @@ func ActivityRawInput(input string) callActivityOption {
6171
}
6272
}
6373

74+
func RetryPolicy(policy ActivityRetryPolicy) callActivityOption {
75+
return func(opts *callActivityOptions) error {
76+
opts.retryPolicy = &policy
77+
return nil
78+
}
79+
}
80+
81+
func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy {
82+
if opts.retryPolicy == nil {
83+
return nil
84+
}
85+
return &task.ActivityRetryPolicy{
86+
MaxAttempts: opts.retryPolicy.MaxAttempts,
87+
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
88+
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
89+
MaxRetryInterval: opts.retryPolicy.MaxRetryInterval,
90+
RetryTimeout: opts.retryPolicy.RetryTimeout,
91+
}
92+
}
93+
6494
func marshalData(input any) ([]byte, error) {
6595
if input == nil {
6696
return nil, nil

workflow/context.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActiv
6868
}
6969
}
7070

71-
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()))
71+
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy()))
7272
}
7373

7474
// CallChildWorkflow returns a completable task for a given workflow.

0 commit comments

Comments
 (0)