diff --git a/examples/go.mod b/examples/go.mod index 3a05d251..d504837f 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -26,7 +26,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/marusama/semaphore/v2 v2.5.0 // indirect - github.com/microsoft/durabletask-go v0.5.0 // indirect + github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect go.opentelemetry.io/otel v1.27.0 // indirect go.opentelemetry.io/otel/metric v1.27.0 // indirect diff --git a/examples/go.sum b/examples/go.sum index 28f207fb..b3b2f562 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -39,8 +39,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= -github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18= -github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw= +github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= diff --git a/examples/workflow/README.md b/examples/workflow/README.md index 85e01e74..54b9e5fe 100644 --- a/examples/workflow/README.md +++ b/examples/workflow/README.md @@ -26,16 +26,9 @@ expected_stdout_lines: - '== APP == workflow purged' - '== APP == stage: 2' - '== APP == workflow started with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' + - '== APP == workflow status: RUNNING' - '== APP == workflow terminated' - '== APP == workflow purged' - - '== APP == workflow client test' - - '== APP == [wfclient] started workflow with id: a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9' - - '== APP == [wfclient] workflow status: RUNNING' - - '== APP == [wfclient] stage: 1' - - '== APP == [wfclient] event raised' - - '== APP == [wfclient] stage: 2' - - '== APP == [wfclient] workflow terminated' - - '== APP == [wfclient] workflow purged' - '== APP == workflow worker successfully shutdown' background: true diff --git a/examples/workflow/main.go b/examples/workflow/main.go index 99c16407..0e5677dd 100644 --- a/examples/workflow/main.go +++ b/examples/workflow/main.go @@ -20,16 +20,11 @@ import ( "log" "time" - "github.com/dapr/go-sdk/client" "github.com/dapr/go-sdk/workflow" ) var stage = 0 -const ( - workflowComponent = "dapr" -) - func main() { w, err := workflow.NewWorker() if err != nil { @@ -54,70 +49,49 @@ func main() { } fmt.Println("runner started") - daprClient, err := client.NewClient() + wfClient, err := workflow.NewClient() if err != nil { log.Fatalf("failed to intialise client: %v", err) } - defer daprClient.Close() + defer wfClient.Close() ctx := context.Background() // Start workflow test - respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - WorkflowName: "TestWorkflow", - Options: nil, - Input: 1, - SendRawInput: false, - }) + instanceID, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) if err != nil { log.Fatalf("failed to start workflow: %v", err) } - fmt.Printf("workflow started with id: %v\n", respStart.InstanceID) + fmt.Printf("workflow started with id: %v\n", instanceID) // Pause workflow test - err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - + err = wfClient.SuspendWorkflow(ctx, instanceID, "") if err != nil { log.Fatalf("failed to pause workflow: %v", err) } - respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + respFetch, err := wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) if err != nil { - log.Fatalf("failed to get workflow: %v", err) + log.Fatalf("failed to fetch workflow: %v", err) } - if respGet.RuntimeStatus != workflow.StatusSuspended.String() { - log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus) + if respFetch.RuntimeStatus != workflow.StatusSuspended { + log.Fatalf("workflow not paused: %v", respFetch.RuntimeStatus) } fmt.Printf("workflow paused\n") // Resume workflow test - err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - + err = wfClient.ResumeWorkflow(ctx, instanceID, "") if err != nil { log.Fatalf("failed to resume workflow: %v", err) } - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) if err != nil { log.Fatalf("failed to get workflow: %v", err) } - if respGet.RuntimeStatus != workflow.StatusRunning.String() { + if respFetch.RuntimeStatus != workflow.StatusRunning { log.Fatalf("workflow not running") } @@ -127,14 +101,7 @@ func main() { // Raise Event Test - err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - EventName: "testEvent", - EventData: "testData", - SendRawData: false, - }) - + err = wfClient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData")) if err != nil { fmt.Printf("failed to raise event: %v", err) } @@ -145,31 +112,22 @@ func main() { fmt.Printf("stage: %d\n", stage) - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) if err != nil { log.Fatalf("failed to get workflow: %v", err) } - fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus) + fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus) // Purge workflow test - err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + err = wfClient.PurgeWorkflow(ctx, instanceID) if err != nil { log.Fatalf("failed to purge workflow: %v", err) } - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - if err != nil && respGet != nil { - log.Fatal("failed to purge workflow") + respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true)) + if err == nil || respFetch != nil { + log.Fatalf("failed to purge workflow: %v", err) } fmt.Println("workflow purged") @@ -177,120 +135,30 @@ func main() { fmt.Printf("stage: %d\n", stage) // Terminate workflow test - respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - WorkflowName: "TestWorkflow", - Options: nil, - Input: 1, - SendRawInput: false, - }) + id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) if err != nil { log.Fatalf("failed to start workflow: %v", err) } + fmt.Printf("workflow started with id: %v\n", instanceID) - fmt.Printf("workflow started with id: %s\n", respStart.InstanceID) - - err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + metadata, err := wfClient.WaitForWorkflowStart(ctx, id) if err != nil { - log.Fatalf("failed to terminate workflow: %v", err) + log.Fatalf("failed to get workflow: %v", err) } + fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String()) - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) + err = wfClient.TerminateWorkflow(ctx, id) if err != nil { - log.Fatalf("failed to get workflow: %v", err) - } - if respGet.RuntimeStatus != workflow.StatusTerminated.String() { - log.Fatal("failed to terminate workflow") + log.Fatalf("failed to terminate workflow: %v", err) } - fmt.Println("workflow terminated") - err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - - respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{ - InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9", - WorkflowComponent: workflowComponent, - }) - if err == nil || respGet != nil { + err = wfClient.PurgeWorkflow(ctx, id) + if err != nil { log.Fatalf("failed to purge workflow: %v", err) } - fmt.Println("workflow purged") - // WFClient - // TODO: Expand client validation - - stage = 0 - fmt.Println("workflow client test") - - wfClient, err := workflow.NewClient() - if err != nil { - log.Fatalf("[wfclient] faield to initialize: %v", err) - } - - id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1)) - if err != nil { - log.Fatalf("[wfclient] failed to start workflow: %v", err) - } - - fmt.Printf("[wfclient] started workflow with id: %s\n", id) - - metadata, err := wfClient.FetchWorkflowMetadata(ctx, id) - if err != nil { - log.Fatalf("[wfclient] failed to get worfklow: %v", err) - } - - fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String()) - - if stage != 1 { - log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage) - } - - fmt.Printf("[wfclient] stage: %d\n", stage) - - // TODO: WaitForWorkflowStart - // TODO: WaitForWorkflowCompletion - - // raise event - - if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil { - log.Fatalf("[wfclient] failed to raise event: %v", err) - } - - fmt.Println("[wfclient] event raised") - - // Sleep to allow the workflow to advance - time.Sleep(time.Second) - - if stage != 2 { - log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage) - } - - fmt.Printf("[wfclient] stage: %d\n", stage) - - // stop workflow - if err := wfClient.TerminateWorkflow(ctx, id); err != nil { - log.Fatalf("[wfclient] failed to terminate workflow: %v", err) - } - - fmt.Println("[wfclient] workflow terminated") - - if err := wfClient.PurgeWorkflow(ctx, id); err != nil { - log.Fatalf("[wfclient] failed to purge workflow: %v", err) - } - - fmt.Println("[wfclient] workflow purged") - // stop workflow runtime if err := w.Shutdown(); err != nil { log.Fatalf("failed to shutdown runtime: %v", err) diff --git a/go.mod b/go.mod index 59ef8c0e..139a636c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/go-chi/chi/v5 v5.1.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 - github.com/microsoft/durabletask-go v0.5.0 + github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d github.com/stretchr/testify v1.9.0 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 diff --git a/go.sum b/go.sum index b2628cee..0db5a317 100644 --- a/go.sum +++ b/go.sum @@ -28,8 +28,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= -github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18= -github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d h1:Phnx8/wPd9BM6RPIjlqNl8kAaFjtU+Sdw9CzmZd8Wsw= +github.com/microsoft/durabletask-go v0.5.1-0.20241014200046-fac9dd959f4d/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= diff --git a/service/http/topic.go b/service/http/topic.go index 9e04eaa6..8cb87a69 100644 --- a/service/http/topic.go +++ b/service/http/topic.go @@ -190,9 +190,11 @@ func (s *Server) registerBaseHandler() { err := runtime.GetActorRuntimeInstanceContext().Deactivate(r.Context(), actorType, actorID) if err == actorErr.ErrActorTypeNotFound || err == actorErr.ErrActorIDNotFound { w.WriteHeader(http.StatusNotFound) + return } if err != actorErr.Success { w.WriteHeader(http.StatusInternalServerError) + return } w.WriteHeader(http.StatusOK) } @@ -207,9 +209,11 @@ func (s *Server) registerBaseHandler() { err := runtime.GetActorRuntimeInstanceContext().InvokeReminder(r.Context(), actorType, actorID, reminderName, reqData) if err == actorErr.ErrActorTypeNotFound { w.WriteHeader(http.StatusNotFound) + return } if err != actorErr.Success { w.WriteHeader(http.StatusInternalServerError) + return } w.WriteHeader(http.StatusOK) } @@ -224,9 +228,11 @@ func (s *Server) registerBaseHandler() { err := runtime.GetActorRuntimeInstanceContext().InvokeTimer(r.Context(), actorType, actorID, timerName, reqData) if err == actorErr.ErrActorTypeNotFound { w.WriteHeader(http.StatusNotFound) + return } if err != actorErr.Success { w.WriteHeader(http.StatusInternalServerError) + return } w.WriteHeader(http.StatusOK) } diff --git a/workflow/client.go b/workflow/client.go index 9a9a053f..1cbe0229 100644 --- a/workflow/client.go +++ b/workflow/client.go @@ -23,14 +23,29 @@ import ( "github.com/microsoft/durabletask-go/api" "github.com/microsoft/durabletask-go/backend" durabletaskclient "github.com/microsoft/durabletask-go/client" + "google.golang.org/grpc" dapr "github.com/dapr/go-sdk/client" ) type Client struct { + conn *grpc.ClientConn taskHubClient *durabletaskclient.TaskHubGrpcClient } +type WorkflowIDReusePolicy struct { + OperationStatus []Status + Action CreateWorkflowAction +} + +type CreateWorkflowAction = api.CreateOrchestrationAction + +const ( + ReuseIDActionError CreateWorkflowAction = api.REUSE_ID_ACTION_ERROR + ReuseIDActionIgnore CreateWorkflowAction = api.REUSE_ID_ACTION_IGNORE + ReuseIDActionTerminate CreateWorkflowAction = api.REUSE_ID_ACTION_TERMINATE +) + // WithInstanceID is an option to set an InstanceID when scheduling a new workflow. func WithInstanceID(id string) api.NewOrchestrationOptions { return api.WithInstanceID(api.InstanceID(id)) @@ -53,6 +68,13 @@ func WithStartTime(time time.Time) api.NewOrchestrationOptions { return api.WithStartTime(time) } +func WithReuseIDPolicy(policy WorkflowIDReusePolicy) api.NewOrchestrationOptions { + return api.WithOrchestrationIdReusePolicy(&api.OrchestrationIdReusePolicy{ + OperationStatus: convertStatusSlice(policy.OperationStatus), + Action: policy.Action, + }) +} + // WithFetchPayloads is an option to return the payload from a workflow. func WithFetchPayloads(fetchPayloads bool) api.FetchOrchestrationMetadataOptions { return api.WithFetchPayloads(fetchPayloads) @@ -78,6 +100,16 @@ func WithRawOutput(data string) api.TerminateOptions { return api.WithRawOutput(data) } +// WithRecursiveTerminate configures whether to terminate all sub-workflows created by the target workflow. +func WithRecursiveTerminate(recursive bool) api.TerminateOptions { + return api.WithRecursiveTerminate(recursive) +} + +// WithRecursivePurge configures whether to purge all sub-workflows created by the target workflow. +func WithRecursivePurge(recursive bool) api.PurgeOptions { + return api.WithRecursivePurge(recursive) +} + type clientOption func(*clientOptions) error type clientOptions struct { @@ -113,9 +145,11 @@ func NewClient(opts ...clientOption) (*Client, error) { return &Client{}, fmt.Errorf("failed to initialise dapr.Client: %v", err) } - taskHubClient := durabletaskclient.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger()) + conn := daprClient.GrpcClientConn() + taskHubClient := durabletaskclient.NewTaskHubGrpcClient(conn, backend.DefaultLogger()) return &Client{ + conn: conn, taskHubClient: taskHubClient, }, nil } @@ -205,9 +239,13 @@ func (c *Client) ResumeWorkflow(ctx context.Context, id, reason string) error { // PurgeWorkflow will purge a given workflow and return an error output. // NOTE: The workflow must be in a terminated or completed state. -func (c *Client) PurgeWorkflow(ctx context.Context, id string) error { +func (c *Client) PurgeWorkflow(ctx context.Context, id string, opts ...api.PurgeOptions) error { if id == "" { return errors.New("no workflow id specified") } - return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id)) + return c.taskHubClient.PurgeOrchestrationState(ctx, api.InstanceID(id), opts...) +} + +func (c *Client) Close() { + _ = c.conn.Close() } diff --git a/workflow/client_test.go b/workflow/client_test.go index ee6ce64e..8cc57ca2 100644 --- a/workflow/client_test.go +++ b/workflow/client_test.go @@ -54,7 +54,10 @@ func TestClientMethods(t *testing.T) { } ctx := context.Background() t.Run("ScheduleNewWorkflow - empty wf name", func(t *testing.T) { - id, err := testClient.ScheduleNewWorkflow(ctx, "", nil) + id, err := testClient.ScheduleNewWorkflow(ctx, "", WithReuseIDPolicy(WorkflowIDReusePolicy{ + OperationStatus: []Status{StatusCompleted}, + Action: ReuseIDActionIgnore, + })) require.Error(t, err) assert.Empty(t, id) }) diff --git a/workflow/context.go b/workflow/context.go index 7bec4f25..5bf77cbe 100644 --- a/workflow/context.go +++ b/workflow/context.go @@ -50,6 +50,11 @@ func (wfc *WorkflowContext) IsReplaying() bool { return wfc.orchestrationContext.IsReplaying } +// SetCustomStatus sets custom status to the workflow context +func (wfc *WorkflowContext) SetCustomStatus(cs string) { + wfc.orchestrationContext.SetCustomStatus(cs) +} + // CallActivity returns a completable task for a given activity. // You must call Await(output any) on the returned Task to block the workflow and wait for the task to complete. // The value passed to the Await method must be a pointer or can be nil to ignore the returned value. diff --git a/workflow/state.go b/workflow/state.go index 969b9dad..7dce21af 100644 --- a/workflow/state.go +++ b/workflow/state.go @@ -48,6 +48,28 @@ func (s Status) String() string { return status[s] } +func (s Status) RuntimeStatus() api.OrchestrationStatus { + switch s { + case StatusRunning: + return api.RUNTIME_STATUS_RUNNING + case StatusCompleted: + return api.RUNTIME_STATUS_COMPLETED + case StatusContinuedAsNew: + return api.RUNTIME_STATUS_CONTINUED_AS_NEW + case StatusFailed: + return api.RUNTIME_STATUS_FAILED + case StatusCanceled: + return api.RUNTIME_STATUS_CANCELED + case StatusTerminated: + return api.RUNTIME_STATUS_TERMINATED + case StatusPending: + return api.RUNTIME_STATUS_PENDING + case StatusSuspended: + return api.RUNTIME_STATUS_SUSPENDED + } + return -1 +} + type WorkflowState struct { Metadata api.OrchestrationMetadata } @@ -57,3 +79,11 @@ func (wfs *WorkflowState) RuntimeStatus() Status { s := Status(wfs.Metadata.RuntimeStatus.Number()) return s } + +func convertStatusSlice(ss []Status) []api.OrchestrationStatus { + out := []api.OrchestrationStatus{} + for _, s := range ss { + out = append(out, s.RuntimeStatus()) + } + return out +}