Skip to content

Commit 4953b12

Browse files
authored
workflows support reuse id policy (#637)
* workflows support reuse id policy Signed-off-by: Fabian Martinez <[email protected]> * add new type Signed-off-by: Fabian Martinez <[email protected]> * lint Signed-off-by: Fabian Martinez <[email protected]> * wrap all types Signed-off-by: Fabian Martinez <[email protected]> * use existing type Signed-off-by: Fabian Martinez <[email protected]> --------- Signed-off-by: Fabian Martinez <[email protected]>
1 parent a074ea7 commit 4953b12

File tree

3 files changed

+54
-1
lines changed

3 files changed

+54
-1
lines changed

workflow/client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,19 @@ type Client struct {
3131
taskHubClient *durabletaskclient.TaskHubGrpcClient
3232
}
3333

34+
type WorkflowIDReusePolicy struct {
35+
OperationStatus []Status
36+
Action CreateWorkflowAction
37+
}
38+
39+
type CreateWorkflowAction = api.CreateOrchestrationAction
40+
41+
const (
42+
ReuseIDActionError CreateWorkflowAction = api.REUSE_ID_ACTION_ERROR
43+
ReuseIDActionIgnore CreateWorkflowAction = api.REUSE_ID_ACTION_IGNORE
44+
ReuseIDActionTerminate CreateWorkflowAction = api.REUSE_ID_ACTION_TERMINATE
45+
)
46+
3447
// WithInstanceID is an option to set an InstanceID when scheduling a new workflow.
3548
func WithInstanceID(id string) api.NewOrchestrationOptions {
3649
return api.WithInstanceID(api.InstanceID(id))
@@ -53,6 +66,13 @@ func WithStartTime(time time.Time) api.NewOrchestrationOptions {
5366
return api.WithStartTime(time)
5467
}
5568

69+
func WithReuseIDPolicy(policy WorkflowIDReusePolicy) api.NewOrchestrationOptions {
70+
return api.WithOrchestrationIdReusePolicy(&api.OrchestrationIdReusePolicy{
71+
OperationStatus: convertStatusSlice(policy.OperationStatus),
72+
Action: policy.Action,
73+
})
74+
}
75+
5676
// WithFetchPayloads is an option to return the payload from a workflow.
5777
func WithFetchPayloads(fetchPayloads bool) api.FetchOrchestrationMetadataOptions {
5878
return api.WithFetchPayloads(fetchPayloads)

workflow/client_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ func TestClientMethods(t *testing.T) {
5454
}
5555
ctx := context.Background()
5656
t.Run("ScheduleNewWorkflow - empty wf name", func(t *testing.T) {
57-
id, err := testClient.ScheduleNewWorkflow(ctx, "", nil)
57+
id, err := testClient.ScheduleNewWorkflow(ctx, "", WithReuseIDPolicy(WorkflowIDReusePolicy{
58+
OperationStatus: []Status{StatusCompleted},
59+
Action: ReuseIDActionIgnore,
60+
}))
5861
require.Error(t, err)
5962
assert.Empty(t, id)
6063
})

workflow/state.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,28 @@ func (s Status) String() string {
4848
return status[s]
4949
}
5050

51+
func (s Status) RuntimeStatus() api.OrchestrationStatus {
52+
switch s {
53+
case StatusRunning:
54+
return api.RUNTIME_STATUS_RUNNING
55+
case StatusCompleted:
56+
return api.RUNTIME_STATUS_COMPLETED
57+
case StatusContinuedAsNew:
58+
return api.RUNTIME_STATUS_CONTINUED_AS_NEW
59+
case StatusFailed:
60+
return api.RUNTIME_STATUS_FAILED
61+
case StatusCanceled:
62+
return api.RUNTIME_STATUS_CANCELED
63+
case StatusTerminated:
64+
return api.RUNTIME_STATUS_TERMINATED
65+
case StatusPending:
66+
return api.RUNTIME_STATUS_PENDING
67+
case StatusSuspended:
68+
return api.RUNTIME_STATUS_SUSPENDED
69+
}
70+
return -1
71+
}
72+
5173
type WorkflowState struct {
5274
Metadata api.OrchestrationMetadata
5375
}
@@ -57,3 +79,11 @@ func (wfs *WorkflowState) RuntimeStatus() Status {
5779
s := Status(wfs.Metadata.RuntimeStatus.Number())
5880
return s
5981
}
82+
83+
func convertStatusSlice(ss []Status) []api.OrchestrationStatus {
84+
out := []api.OrchestrationStatus{}
85+
for _, s := range ss {
86+
out = append(out, s.RuntimeStatus())
87+
}
88+
return out
89+
}

0 commit comments

Comments
 (0)