Skip to content

Commit 4f6c0e1

Browse files
committed
worflows: activity retry policy
Signed-off-by: Fabian Martinez <[email protected]>
1 parent f9baae2 commit 4f6c0e1

File tree

7 files changed

+44
-9
lines changed

7 files changed

+44
-9
lines changed

examples/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ require (
2626
github.com/golang/protobuf v1.5.4 // indirect
2727
github.com/kr/pretty v0.3.1 // indirect
2828
github.com/marusama/semaphore/v2 v2.5.0 // indirect
29-
github.com/microsoft/durabletask-go v0.5.0 // indirect
29+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 // indirect
3030
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
3131
go.opentelemetry.io/otel v1.27.0 // indirect
3232
go.opentelemetry.io/otel/metric v1.27.0 // indirect

examples/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
3939
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
4040
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
4141
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
42-
github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18=
43-
github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
42+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
43+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
4444
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
4545
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
4646
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=

examples/workflow/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,12 @@ func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
314314
return nil, err
315315
}
316316

317-
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
317+
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input), workflow.RetryPolicy(workflow.ActivityRetryPolicy{
318+
MaxAttempts: 3,
319+
InitialRetryInterval: 1 * time.Second,
320+
BackoffCoefficient: 2,
321+
MaxRetryInterval: 3 * time.Second,
322+
})).Await(&output); err != nil {
318323
return nil, err
319324
}
320325

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ require (
77
github.com/go-chi/chi/v5 v5.1.0
88
github.com/golang/mock v1.6.0
99
github.com/google/uuid v1.6.0
10-
github.com/microsoft/durabletask-go v0.5.0
10+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428
1111
github.com/stretchr/testify v1.9.0
1212
google.golang.org/grpc v1.65.0
1313
google.golang.org/protobuf v1.34.2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
2828
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
2929
github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM=
3030
github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ=
31-
github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18=
32-
github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
31+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428 h1:I1yeX4tWqOdBzpRzSbY1TnHU2NI25Pdu6OXUm39emm0=
32+
github.com/microsoft/durabletask-go v0.5.1-0.20241024170039-0c4afbc95428/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE=
3333
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
3434
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
3535
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=

workflow/activity_context.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ package workflow
1717
import (
1818
"context"
1919
"encoding/json"
20+
"time"
2021

2122
"google.golang.org/protobuf/types/known/wrapperspb"
2223

@@ -38,7 +39,16 @@ func (wfac *ActivityContext) Context() context.Context {
3839
type callActivityOption func(*callActivityOptions) error
3940

4041
type callActivityOptions struct {
41-
rawInput *wrapperspb.StringValue
42+
rawInput *wrapperspb.StringValue
43+
retryPolicy *ActivityRetryPolicy
44+
}
45+
46+
type ActivityRetryPolicy struct {
47+
MaxAttempts int
48+
InitialRetryInterval time.Duration
49+
BackoffCoefficient float64
50+
MaxRetryInterval time.Duration
51+
RetryTimeout time.Duration
4252
}
4353

4454
// ActivityInput is an option to pass a JSON-serializable input
@@ -61,6 +71,26 @@ func ActivityRawInput(input string) callActivityOption {
6171
}
6272
}
6373

74+
func RetryPolicy(policy ActivityRetryPolicy) callActivityOption {
75+
return func(opts *callActivityOptions) error {
76+
opts.retryPolicy = &policy
77+
return nil
78+
}
79+
}
80+
81+
func (opts *callActivityOptions) getRetryPolicy() *task.ActivityRetryPolicy {
82+
if opts.retryPolicy == nil {
83+
return nil
84+
}
85+
return &task.ActivityRetryPolicy{
86+
MaxAttempts: opts.retryPolicy.MaxAttempts,
87+
InitialRetryInterval: opts.retryPolicy.InitialRetryInterval,
88+
BackoffCoefficient: opts.retryPolicy.BackoffCoefficient,
89+
MaxRetryInterval: opts.retryPolicy.MaxRetryInterval,
90+
RetryTimeout: opts.retryPolicy.RetryTimeout,
91+
}
92+
}
93+
6494
func marshalData(input any) ([]byte, error) {
6595
if input == nil {
6696
return nil, nil

workflow/context.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func (wfc *WorkflowContext) CallActivity(activity interface{}, opts ...callActiv
6363
}
6464
}
6565

66-
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()))
66+
return wfc.orchestrationContext.CallActivity(activity, task.WithRawActivityInput(options.rawInput.GetValue()), task.WithRetryPolicy(options.getRetryPolicy()))
6767
}
6868

6969
// CallChildWorkflow returns a completable task for a given workflow.

0 commit comments

Comments
 (0)