From 2eb10de9f5589bc2c59e0eaca2904d8b53a070c4 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 17 Mar 2026 12:45:41 +0000 Subject: [PATCH 1/3] Add `wf purge --all-filter-status` flag Allow filtering by runtime status when purging workflow instances with `--all-older-than`. This enables purging only instances in a specific state (e.g., COMPLETED, FAILED, TERMINATED) rather than all terminal instances. The flag is mutually exclusive with `--all` and requires `--all-older-than` to be set. Signed-off-by: joshvanl --- cmd/workflow/purge.go | 46 ++++++++-- cmd/workflow/purge_test.go | 58 ++++++++++++ pkg/workflow/purge.go | 13 ++- pkg/workflow/purge_test.go | 124 ++++++++++++++++++++++++++ tests/e2e/standalone/workflow_test.go | 87 ++++++++++++++++++ 5 files changed, 320 insertions(+), 8 deletions(-) create mode 100644 cmd/workflow/purge_test.go create mode 100644 pkg/workflow/purge_test.go diff --git a/cmd/workflow/purge.go b/cmd/workflow/purge.go index 3197fafb4..cc193540c 100644 --- a/cmd/workflow/purge.go +++ b/cmd/workflow/purge.go @@ -15,6 +15,8 @@ package workflow import ( "errors" + "slices" + "strings" "github.com/dapr/cli/pkg/workflow" "github.com/dapr/kit/signals" @@ -22,11 +24,12 @@ import ( ) var ( - flagPurgeOlderThan string - flagPurgeAll bool - flagPurgeConn *connFlag - flagPurgeForce bool - schedulerNamespace string + flagPurgeOlderThan string + flagPurgeAll bool + flagPurgeConn *connFlag + flagPurgeForce bool + flagPurgeFilterStatus string + schedulerNamespace string ) var PurgeCmd = &cobra.Command{ @@ -41,6 +44,9 @@ var PurgeCmd = &cobra.Command{ return errors.New("no arguments are accepted when using purge all flags") } default: + if cmd.Flags().Changed("all-filter-status") { + return errors.New("--all-filter-status can only be used with --all-older-than") + } if len(args) == 0 { return errors.New("one or more workflow instance ID arguments are required when not using purge all flags") } @@ -75,14 +81,44 @@ var PurgeCmd = &cobra.Command{ } } + if cmd.Flags().Changed("all-filter-status") { + opts.AllFilterStatus = &flagPurgeFilterStatus + } + return workflow.Purge(ctx, opts) }, } +var purgeFilterStatuses = []string{ + "RUNNING", + "COMPLETED", + "CONTINUED_AS_NEW", + "FAILED", + "CANCELED", + "TERMINATED", + "PENDING", + "SUSPENDED", +} + func init() { PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.") PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.") + PurgeCmd.Flags().StringVar(&flagPurgeFilterStatus, "all-filter-status", "", "Filter purge to only workflow instances with the given runtime status. Must be used with --all-older-than. One of "+strings.Join(purgeFilterStatuses, ", ")) PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all") + PurgeCmd.MarkFlagsMutuallyExclusive("all-filter-status", "all") + + pre := PurgeCmd.PreRunE + PurgeCmd.PreRunE = func(cmd *cobra.Command, args []string) error { + if cmd.Flags().Changed("all-filter-status") { + if !slices.Contains(purgeFilterStatuses, flagPurgeFilterStatus) { + return errors.New("invalid value for --all-filter-status. Supported values are " + strings.Join(purgeFilterStatuses, ", ")) + } + } + if pre != nil { + return pre(cmd, args) + } + return nil + } PurgeCmd.Flags().BoolVar(&flagPurgeForce, "force", false, "force will force a purge of a workflow, regardless of its current runtime state, or whether an active worker can process it, the backend will attempt to delete it anyway. This necessarily means the purging is executed out side of the workflow state machine, and therefore, can lead to corrupt state or broken workflow execution. Usage of this should _only_ be used when you know the workflow is not being currently processed. It is highly recommended to avoid using this flag unless absolutely necessary.") PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set") diff --git a/cmd/workflow/purge_test.go b/cmd/workflow/purge_test.go new file mode 100644 index 000000000..d0d2f10ff --- /dev/null +++ b/cmd/workflow/purge_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPurgeFilterStatuses(t *testing.T) { + expected := []string{ + "RUNNING", + "COMPLETED", + "CONTINUED_AS_NEW", + "FAILED", + "CANCELED", + "TERMINATED", + "PENDING", + "SUSPENDED", + } + assert.Equal(t, expected, purgeFilterStatuses) +} + +func TestPurgeCmdFlags(t *testing.T) { + t.Run("all-filter-status flag is registered", func(t *testing.T) { + f := PurgeCmd.Flags().Lookup("all-filter-status") + assert.NotNil(t, f) + assert.Equal(t, "string", f.Value.Type()) + assert.Contains(t, f.Usage, "Must be used with --all-older-than") + }) + + t.Run("all-filter-status and all are mutually exclusive", func(t *testing.T) { + // The mutual exclusivity is registered via MarkFlagsMutuallyExclusive. + // We verify the flag group exists by checking that the command + // has both flags and that they are correctly configured. + allFlag := PurgeCmd.Flags().Lookup("all") + assert.NotNil(t, allFlag) + filterFlag := PurgeCmd.Flags().Lookup("all-filter-status") + assert.NotNil(t, filterFlag) + }) + + t.Run("all-older-than flag is registered", func(t *testing.T) { + f := PurgeCmd.Flags().Lookup("all-older-than") + assert.NotNil(t, f) + }) +} diff --git a/pkg/workflow/purge.go b/pkg/workflow/purge.go index 711473009..5cdacca5f 100644 --- a/pkg/workflow/purge.go +++ b/pkg/workflow/purge.go @@ -32,6 +32,7 @@ type PurgeOptions struct { AppID string InstanceIDs []string AllOlderThan *time.Time + AllFilterStatus *string All bool Force bool @@ -45,6 +46,14 @@ func Purge(ctx context.Context, opts PurgeOptions) error { if len(opts.InstanceIDs) > 0 { toPurge = opts.InstanceIDs } else { + filter := Filter{ + Terminal: true, + } + if opts.AllFilterStatus != nil { + filter.Terminal = false + filter.Status = opts.AllFilterStatus + } + var list []*ListOutputWide var err error list, err = ListWide(ctx, ListOptions{ @@ -53,9 +62,7 @@ func Purge(ctx context.Context, opts PurgeOptions) error { AppID: opts.AppID, ConnectionString: opts.ConnectionString, TableName: opts.TableName, - Filter: Filter{ - Terminal: true, - }, + Filter: filter, }) if err != nil { return err diff --git a/pkg/workflow/purge_test.go b/pkg/workflow/purge_test.go new file mode 100644 index 000000000..392ac2099 --- /dev/null +++ b/pkg/workflow/purge_test.go @@ -0,0 +1,124 @@ +/* +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "testing" + "time" + + "github.com/dapr/kit/ptr" + "github.com/stretchr/testify/assert" +) + +func TestPurgeOptions_AllFilterStatus(t *testing.T) { + t.Run("AllFilterStatus sets filter status instead of terminal", func(t *testing.T) { + opts := PurgeOptions{ + AllOlderThan: ptr.Of(time.Now()), + AllFilterStatus: ptr.Of("COMPLETED"), + } + + assert.NotNil(t, opts.AllFilterStatus) + assert.Equal(t, "COMPLETED", *opts.AllFilterStatus) + assert.NotNil(t, opts.AllOlderThan) + }) + + t.Run("nil AllFilterStatus defaults to terminal filtering", func(t *testing.T) { + opts := PurgeOptions{ + AllOlderThan: ptr.Of(time.Now()), + } + + assert.Nil(t, opts.AllFilterStatus) + }) + + t.Run("AllFilterStatus with various statuses", func(t *testing.T) { + statuses := []string{ + "RUNNING", "COMPLETED", "CONTINUED_AS_NEW", + "FAILED", "CANCELED", "TERMINATED", + "PENDING", "SUSPENDED", + } + + for _, status := range statuses { + t.Run(status, func(t *testing.T) { + opts := PurgeOptions{ + AllOlderThan: ptr.Of(time.Now()), + AllFilterStatus: ptr.Of(status), + } + assert.Equal(t, status, *opts.AllFilterStatus) + }) + } + }) +} + +func TestPurgeFilterBuildLogic(t *testing.T) { + // Tests the filter construction logic that Purge uses internally. + // When AllFilterStatus is set, Terminal should be false and Status should + // be the provided value. When AllFilterStatus is nil, Terminal should be true. + + t.Run("without AllFilterStatus uses terminal filter", func(t *testing.T) { + opts := PurgeOptions{ + All: true, + } + + filter := Filter{Terminal: true} + if opts.AllFilterStatus != nil { + filter.Terminal = false + filter.Status = opts.AllFilterStatus + } + + assert.True(t, filter.Terminal) + assert.Nil(t, filter.Status) + }) + + t.Run("with AllFilterStatus uses status filter", func(t *testing.T) { + opts := PurgeOptions{ + AllOlderThan: ptr.Of(time.Now()), + AllFilterStatus: ptr.Of("FAILED"), + } + + filter := Filter{Terminal: true} + if opts.AllFilterStatus != nil { + filter.Terminal = false + filter.Status = opts.AllFilterStatus + } + + assert.False(t, filter.Terminal) + assert.NotNil(t, filter.Status) + assert.Equal(t, "FAILED", *filter.Status) + }) + + t.Run("AllOlderThan filters by created time", func(t *testing.T) { + now := time.Now() + cutoff := now.Add(-1 * time.Hour) + opts := PurgeOptions{ + AllOlderThan: &cutoff, + AllFilterStatus: ptr.Of("COMPLETED"), + } + + // Simulate the filtering logic from Purge + list := []*ListOutputWide{ + {InstanceID: "old-1", Created: now.Add(-2 * time.Hour), RuntimeStatus: "COMPLETED"}, + {InstanceID: "new-1", Created: now.Add(-30 * time.Minute), RuntimeStatus: "COMPLETED"}, + {InstanceID: "old-2", Created: now.Add(-3 * time.Hour), RuntimeStatus: "COMPLETED"}, + } + + var toPurge []string + for _, w := range list { + if w.Created.Before(*opts.AllOlderThan) { + toPurge = append(toPurge, w.InstanceID) + } + } + + assert.Equal(t, []string{"old-1", "old-2"}, toPurge) + }) +} diff --git a/tests/e2e/standalone/workflow_test.go b/tests/e2e/standalone/workflow_test.go index a4698e7d1..1fe3cdcab 100644 --- a/tests/e2e/standalone/workflow_test.go +++ b/tests/e2e/standalone/workflow_test.go @@ -319,6 +319,93 @@ func TestWorkflowPurge(t *testing.T) { assert.NotContains(t, output, "purge-older") }) + t.Run("purge older than with filter status only purges matching status", func(t *testing.T) { + // Create one workflow that will complete (SimpleWorkflow) and one that + // will be terminated (LongWorkflow) so they have different statuses. + output, err := cmdWorkflowRun(appID, "SimpleWorkflow", + "--instance-id=filter-completed") + require.NoError(t, err, output) + + output, err = cmdWorkflowRun(appID, "LongWorkflow", + "--instance-id=filter-terminated") + require.NoError(t, err, output) + + time.Sleep(3 * time.Second) + + // Terminate one so we have two different terminal statuses. + _, err = cmdWorkflowTerminate(appID, "filter-terminated") + require.NoError(t, err) + + time.Sleep(5 * time.Second) + + // Purge only COMPLETED instances older than 1s. + output, err = cmdWorkflowPurge(appID, redisConnString, + "--all-older-than", "1s", "--all-filter-status", "COMPLETED") + require.NoError(t, err, output) + assert.Contains(t, output, `Purged workflow instance "filter-completed"`) + assert.NotContains(t, output, "filter-terminated") + + // Verify filter-terminated still exists. + output, err = cmdWorkflowList(appID, "-o", "json", redisConnString) + require.NoError(t, err, output) + assert.NotContains(t, output, "filter-completed") + assert.Contains(t, output, "filter-terminated") + + // Clean up the remaining instance. + _, _ = cmdWorkflowPurge(appID, "filter-terminated") + }) + + t.Run("purge older than with filter status TERMINATED", func(t *testing.T) { + output, err := cmdWorkflowRun(appID, "SimpleWorkflow", + "--instance-id=fs-completed") + require.NoError(t, err, output) + + output, err = cmdWorkflowRun(appID, "LongWorkflow", + "--instance-id=fs-terminated") + require.NoError(t, err, output) + + time.Sleep(3 * time.Second) + + _, err = cmdWorkflowTerminate(appID, "fs-terminated") + require.NoError(t, err) + + time.Sleep(5 * time.Second) + + // Purge only TERMINATED instances older than 1s. + output, err = cmdWorkflowPurge(appID, redisConnString, + "--all-older-than", "1s", "--all-filter-status", "TERMINATED") + require.NoError(t, err, output) + assert.Contains(t, output, `Purged workflow instance "fs-terminated"`) + assert.NotContains(t, output, "fs-completed") + + // Verify fs-completed still exists. + output, err = cmdWorkflowList(appID, "-o", "json", redisConnString) + require.NoError(t, err, output) + assert.Contains(t, output, "fs-completed") + assert.NotContains(t, output, "fs-terminated") + + // Clean up. + _, _ = cmdWorkflowPurge(appID, "fs-completed") + }) + + t.Run("all-filter-status without all-older-than errors", func(t *testing.T) { + _, err := cmdWorkflowPurge(appID, redisConnString, + "--all-filter-status", "COMPLETED") + require.Error(t, err) + }) + + t.Run("all-filter-status with invalid value errors", func(t *testing.T) { + _, err := cmdWorkflowPurge(appID, redisConnString, + "--all-older-than", "1s", "--all-filter-status", "INVALID") + require.Error(t, err) + }) + + t.Run("all-filter-status with all flag errors", func(t *testing.T) { + _, err := cmdWorkflowPurge(appID, redisConnString, + "--all", "--all-filter-status", "COMPLETED") + require.Error(t, err) + }) + t.Run("also purge scheduler", func(t *testing.T) { output, err := cmdWorkflowRun(appID, "EventWorkflow", "--instance-id=also-sched") From bfcf4038a5bf2597ee39ac8031cc7f1c44145a87 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 17 Mar 2026 13:03:53 +0000 Subject: [PATCH 2/3] Review comments Signed-off-by: joshvanl --- cmd/workflow/purge.go | 11 +-- cmd/workflow/purge_test.go | 25 ++----- cmd/workflow/workflow.go | 11 +-- pkg/workflow/list.go | 12 +++ pkg/workflow/purge.go | 22 ++++-- pkg/workflow/purge_test.go | 104 ++++---------------------- tests/e2e/standalone/workflow_test.go | 10 ++- 7 files changed, 58 insertions(+), 137 deletions(-) diff --git a/cmd/workflow/purge.go b/cmd/workflow/purge.go index cc193540c..1261445f5 100644 --- a/cmd/workflow/purge.go +++ b/cmd/workflow/purge.go @@ -89,16 +89,7 @@ var PurgeCmd = &cobra.Command{ }, } -var purgeFilterStatuses = []string{ - "RUNNING", - "COMPLETED", - "CONTINUED_AS_NEW", - "FAILED", - "CANCELED", - "TERMINATED", - "PENDING", - "SUSPENDED", -} +var purgeFilterStatuses = workflow.RuntimeStatuses func init() { PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.") diff --git a/cmd/workflow/purge_test.go b/cmd/workflow/purge_test.go index d0d2f10ff..7ea2c2f75 100644 --- a/cmd/workflow/purge_test.go +++ b/cmd/workflow/purge_test.go @@ -16,21 +16,13 @@ package workflow import ( "testing" + "github.com/dapr/cli/pkg/workflow" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestPurgeFilterStatuses(t *testing.T) { - expected := []string{ - "RUNNING", - "COMPLETED", - "CONTINUED_AS_NEW", - "FAILED", - "CANCELED", - "TERMINATED", - "PENDING", - "SUSPENDED", - } - assert.Equal(t, expected, purgeFilterStatuses) + assert.Equal(t, workflow.RuntimeStatuses, purgeFilterStatuses) } func TestPurgeCmdFlags(t *testing.T) { @@ -42,13 +34,10 @@ func TestPurgeCmdFlags(t *testing.T) { }) t.Run("all-filter-status and all are mutually exclusive", func(t *testing.T) { - // The mutual exclusivity is registered via MarkFlagsMutuallyExclusive. - // We verify the flag group exists by checking that the command - // has both flags and that they are correctly configured. - allFlag := PurgeCmd.Flags().Lookup("all") - assert.NotNil(t, allFlag) - filterFlag := PurgeCmd.Flags().Lookup("all-filter-status") - assert.NotNil(t, filterFlag) + WorkflowCmd.SetArgs([]string{"purge", "--all", "--all-filter-status", "COMPLETED"}) + err := WorkflowCmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "if any flags in the group [all-filter-status all] are set none of the others can be") }) t.Run("all-older-than flag is registered", func(t *testing.T) { diff --git a/cmd/workflow/workflow.go b/cmd/workflow/workflow.go index bbfb91ff5..f16de974d 100644 --- a/cmd/workflow/workflow.go +++ b/cmd/workflow/workflow.go @@ -148,16 +148,7 @@ func filterCmd(cmd *cobra.Command) *workflow.Filter { status string maxAge string - listStatuses = []string{ - "RUNNING", - "COMPLETED", - "CONTINUED_AS_NEW", - "FAILED", - "CANCELED", - "TERMINATED", - "PENDING", - "SUSPENDED", - } + listStatuses = workflow.RuntimeStatuses ) cmd.Flags().StringVarP(&name, "filter-name", "w", "", "Filter only the workflows with the given name") diff --git a/pkg/workflow/list.go b/pkg/workflow/list.go index 477512e3c..106341f3a 100644 --- a/pkg/workflow/list.go +++ b/pkg/workflow/list.go @@ -44,6 +44,18 @@ type Filter struct { Terminal bool } +// RuntimeStatuses is the canonical list of workflow runtime statuses. +var RuntimeStatuses = []string{ + "RUNNING", + "COMPLETED", + "CONTINUED_AS_NEW", + "FAILED", + "CANCELED", + "TERMINATED", + "PENDING", + "SUSPENDED", +} + type ListOutputShort struct { Namespace string `csv:"-" json:"namespace" yaml:"namespace"` AppID string `csv:"-" json:"appID" yaml:"appID"` diff --git a/pkg/workflow/purge.go b/pkg/workflow/purge.go index 5cdacca5f..7a9c220c1 100644 --- a/pkg/workflow/purge.go +++ b/pkg/workflow/purge.go @@ -40,19 +40,27 @@ type PurgeOptions struct { TableName *string } +// BuildPurgeFilter constructs the Filter used when listing workflow instances +// for bulk purge. When AllFilterStatus is set, it filters by that status +// instead of using the default terminal-only filter. +func BuildPurgeFilter(allFilterStatus *string) Filter { + filter := Filter{ + Terminal: true, + } + if allFilterStatus != nil { + filter.Terminal = false + filter.Status = allFilterStatus + } + return filter +} + func Purge(ctx context.Context, opts PurgeOptions) error { var toPurge []string if len(opts.InstanceIDs) > 0 { toPurge = opts.InstanceIDs } else { - filter := Filter{ - Terminal: true, - } - if opts.AllFilterStatus != nil { - filter.Terminal = false - filter.Status = opts.AllFilterStatus - } + filter := BuildPurgeFilter(opts.AllFilterStatus) var list []*ListOutputWide var err error diff --git a/pkg/workflow/purge_test.go b/pkg/workflow/purge_test.go index 392ac2099..28a44a8ce 100644 --- a/pkg/workflow/purge_test.go +++ b/pkg/workflow/purge_test.go @@ -15,110 +15,34 @@ package workflow import ( "testing" - "time" "github.com/dapr/kit/ptr" "github.com/stretchr/testify/assert" ) -func TestPurgeOptions_AllFilterStatus(t *testing.T) { - t.Run("AllFilterStatus sets filter status instead of terminal", func(t *testing.T) { - opts := PurgeOptions{ - AllOlderThan: ptr.Of(time.Now()), - AllFilterStatus: ptr.Of("COMPLETED"), - } - - assert.NotNil(t, opts.AllFilterStatus) - assert.Equal(t, "COMPLETED", *opts.AllFilterStatus) - assert.NotNil(t, opts.AllOlderThan) - }) - - t.Run("nil AllFilterStatus defaults to terminal filtering", func(t *testing.T) { - opts := PurgeOptions{ - AllOlderThan: ptr.Of(time.Now()), - } - - assert.Nil(t, opts.AllFilterStatus) - }) - - t.Run("AllFilterStatus with various statuses", func(t *testing.T) { - statuses := []string{ - "RUNNING", "COMPLETED", "CONTINUED_AS_NEW", - "FAILED", "CANCELED", "TERMINATED", - "PENDING", "SUSPENDED", - } - - for _, status := range statuses { - t.Run(status, func(t *testing.T) { - opts := PurgeOptions{ - AllOlderThan: ptr.Of(time.Now()), - AllFilterStatus: ptr.Of(status), - } - assert.Equal(t, status, *opts.AllFilterStatus) - }) - } - }) -} - -func TestPurgeFilterBuildLogic(t *testing.T) { - // Tests the filter construction logic that Purge uses internally. - // When AllFilterStatus is set, Terminal should be false and Status should - // be the provided value. When AllFilterStatus is nil, Terminal should be true. - - t.Run("without AllFilterStatus uses terminal filter", func(t *testing.T) { - opts := PurgeOptions{ - All: true, - } - - filter := Filter{Terminal: true} - if opts.AllFilterStatus != nil { - filter.Terminal = false - filter.Status = opts.AllFilterStatus - } - +func TestBuildPurgeFilter(t *testing.T) { + t.Run("nil status uses terminal filter", func(t *testing.T) { + filter := BuildPurgeFilter(nil) assert.True(t, filter.Terminal) assert.Nil(t, filter.Status) }) - t.Run("with AllFilterStatus uses status filter", func(t *testing.T) { - opts := PurgeOptions{ - AllOlderThan: ptr.Of(time.Now()), - AllFilterStatus: ptr.Of("FAILED"), - } - - filter := Filter{Terminal: true} - if opts.AllFilterStatus != nil { - filter.Terminal = false - filter.Status = opts.AllFilterStatus - } - + t.Run("with status uses status filter instead of terminal", func(t *testing.T) { + filter := BuildPurgeFilter(ptr.Of("FAILED")) assert.False(t, filter.Terminal) assert.NotNil(t, filter.Status) assert.Equal(t, "FAILED", *filter.Status) }) - t.Run("AllOlderThan filters by created time", func(t *testing.T) { - now := time.Now() - cutoff := now.Add(-1 * time.Hour) - opts := PurgeOptions{ - AllOlderThan: &cutoff, - AllFilterStatus: ptr.Of("COMPLETED"), - } - - // Simulate the filtering logic from Purge - list := []*ListOutputWide{ - {InstanceID: "old-1", Created: now.Add(-2 * time.Hour), RuntimeStatus: "COMPLETED"}, - {InstanceID: "new-1", Created: now.Add(-30 * time.Minute), RuntimeStatus: "COMPLETED"}, - {InstanceID: "old-2", Created: now.Add(-3 * time.Hour), RuntimeStatus: "COMPLETED"}, - } - - var toPurge []string - for _, w := range list { - if w.Created.Before(*opts.AllOlderThan) { - toPurge = append(toPurge, w.InstanceID) - } - } + t.Run("with COMPLETED status", func(t *testing.T) { + filter := BuildPurgeFilter(ptr.Of("COMPLETED")) + assert.False(t, filter.Terminal) + assert.Equal(t, "COMPLETED", *filter.Status) + }) - assert.Equal(t, []string{"old-1", "old-2"}, toPurge) + t.Run("with RUNNING status", func(t *testing.T) { + filter := BuildPurgeFilter(ptr.Of("RUNNING")) + assert.False(t, filter.Terminal) + assert.Equal(t, "RUNNING", *filter.Status) }) } diff --git a/tests/e2e/standalone/workflow_test.go b/tests/e2e/standalone/workflow_test.go index 1fe3cdcab..6a0bf2c3d 100644 --- a/tests/e2e/standalone/workflow_test.go +++ b/tests/e2e/standalone/workflow_test.go @@ -352,7 +352,10 @@ func TestWorkflowPurge(t *testing.T) { assert.Contains(t, output, "filter-terminated") // Clean up the remaining instance. - _, _ = cmdWorkflowPurge(appID, "filter-terminated") + t.Cleanup(func() { + _, err := cmdWorkflowPurge(appID, "filter-terminated") + assert.NoError(t, err) + }) }) t.Run("purge older than with filter status TERMINATED", func(t *testing.T) { @@ -385,7 +388,10 @@ func TestWorkflowPurge(t *testing.T) { assert.NotContains(t, output, "fs-terminated") // Clean up. - _, _ = cmdWorkflowPurge(appID, "fs-completed") + t.Cleanup(func() { + _, err := cmdWorkflowPurge(appID, "fs-completed") + assert.NoError(t, err) + }) }) t.Run("all-filter-status without all-older-than errors", func(t *testing.T) { From c2e76b29f305b05d0506308eec2f8ba99c692bbe Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 18 Mar 2026 17:07:39 +0000 Subject: [PATCH 3/3] Adds review comments Signed-off-by: joshvanl --- cmd/workflow/purge.go | 3 ++ cmd/workflow/purge_test.go | 9 +++- pkg/workflow/list.go | 10 +++++ pkg/workflow/purge_test.go | 2 +- tests/e2e/standalone/workflow_test.go | 64 ++++++++++++++++++++++++--- 5 files changed, 80 insertions(+), 8 deletions(-) diff --git a/cmd/workflow/purge.go b/cmd/workflow/purge.go index 1261445f5..415003091 100644 --- a/cmd/workflow/purge.go +++ b/cmd/workflow/purge.go @@ -104,6 +104,9 @@ func init() { if !slices.Contains(purgeFilterStatuses, flagPurgeFilterStatus) { return errors.New("invalid value for --all-filter-status. Supported values are " + strings.Join(purgeFilterStatuses, ", ")) } + if !slices.Contains(workflow.TerminalStatuses, flagPurgeFilterStatus) && !flagPurgeForce { + return errors.New("--force is required when using --all-filter-status with a non-terminal status (" + flagPurgeFilterStatus + ")") + } } if pre != nil { return pre(cmd, args) diff --git a/cmd/workflow/purge_test.go b/cmd/workflow/purge_test.go index 7ea2c2f75..9a30ae9ed 100644 --- a/cmd/workflow/purge_test.go +++ b/cmd/workflow/purge_test.go @@ -1,5 +1,5 @@ /* -Copyright 2025 The Dapr Authors +Copyright 2026 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at @@ -44,4 +44,11 @@ func TestPurgeCmdFlags(t *testing.T) { f := PurgeCmd.Flags().Lookup("all-older-than") assert.NotNil(t, f) }) + + t.Run("non-terminal status without force errors", func(t *testing.T) { + WorkflowCmd.SetArgs([]string{"purge", "--all-older-than", "1s", "--all-filter-status", "RUNNING"}) + err := WorkflowCmd.Execute() + require.Error(t, err) + assert.Contains(t, err.Error(), "--force is required when using --all-filter-status with a non-terminal status") + }) } diff --git a/pkg/workflow/list.go b/pkg/workflow/list.go index 106341f3a..9fbbd5376 100644 --- a/pkg/workflow/list.go +++ b/pkg/workflow/list.go @@ -56,6 +56,16 @@ var RuntimeStatuses = []string{ "SUSPENDED", } +// TerminalStatuses is the subset of RuntimeStatuses that represent terminal +// (completed) workflow states. +var TerminalStatuses = []string{ + "COMPLETED", + "CONTINUED_AS_NEW", + "FAILED", + "CANCELED", + "TERMINATED", +} + type ListOutputShort struct { Namespace string `csv:"-" json:"namespace" yaml:"namespace"` AppID string `csv:"-" json:"appID" yaml:"appID"` diff --git a/pkg/workflow/purge_test.go b/pkg/workflow/purge_test.go index 28a44a8ce..120d17a08 100644 --- a/pkg/workflow/purge_test.go +++ b/pkg/workflow/purge_test.go @@ -1,5 +1,5 @@ /* -Copyright 2025 The Dapr Authors +Copyright 2026 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/tests/e2e/standalone/workflow_test.go b/tests/e2e/standalone/workflow_test.go index 6a0bf2c3d..5a308be76 100644 --- a/tests/e2e/standalone/workflow_test.go +++ b/tests/e2e/standalone/workflow_test.go @@ -330,13 +330,39 @@ func TestWorkflowPurge(t *testing.T) { "--instance-id=filter-terminated") require.NoError(t, err, output) - time.Sleep(3 * time.Second) + // Wait for SimpleWorkflow to complete. + require.EventuallyWithT(t, func(c *assert.CollectT) { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + require.NoError(c, err) + var list []map[string]interface{} + require.NoError(c, json.Unmarshal([]byte(out), &list)) + for _, item := range list { + if item["instanceID"] == "filter-completed" { + assert.Equal(c, "COMPLETED", item["runtimeStatus"]) + return + } + } + assert.Fail(c, "filter-completed not found") + }, 30*time.Second, 500*time.Millisecond) // Terminate one so we have two different terminal statuses. _, err = cmdWorkflowTerminate(appID, "filter-terminated") require.NoError(t, err) - time.Sleep(5 * time.Second) + // Wait for the terminate to take effect. + require.EventuallyWithT(t, func(c *assert.CollectT) { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + require.NoError(c, err) + var list []map[string]interface{} + require.NoError(c, json.Unmarshal([]byte(out), &list)) + for _, item := range list { + if item["instanceID"] == "filter-terminated" { + assert.Equal(c, "TERMINATED", item["runtimeStatus"]) + return + } + } + assert.Fail(c, "filter-terminated not found") + }, 30*time.Second, 500*time.Millisecond) // Purge only COMPLETED instances older than 1s. output, err = cmdWorkflowPurge(appID, redisConnString, @@ -353,7 +379,7 @@ func TestWorkflowPurge(t *testing.T) { // Clean up the remaining instance. t.Cleanup(func() { - _, err := cmdWorkflowPurge(appID, "filter-terminated") + _, err := cmdWorkflowPurge(appID, redisConnString, "filter-terminated") assert.NoError(t, err) }) }) @@ -367,12 +393,38 @@ func TestWorkflowPurge(t *testing.T) { "--instance-id=fs-terminated") require.NoError(t, err, output) - time.Sleep(3 * time.Second) + // Wait for SimpleWorkflow to complete. + require.EventuallyWithT(t, func(c *assert.CollectT) { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + require.NoError(c, err) + var list []map[string]interface{} + require.NoError(c, json.Unmarshal([]byte(out), &list)) + for _, item := range list { + if item["instanceID"] == "fs-completed" { + assert.Equal(c, "COMPLETED", item["runtimeStatus"]) + return + } + } + assert.Fail(c, "fs-completed not found") + }, 30*time.Second, 500*time.Millisecond) _, err = cmdWorkflowTerminate(appID, "fs-terminated") require.NoError(t, err) - time.Sleep(5 * time.Second) + // Wait for the terminate to take effect. + require.EventuallyWithT(t, func(c *assert.CollectT) { + out, err := cmdWorkflowList(appID, redisConnString, "-o", "json") + require.NoError(c, err) + var list []map[string]interface{} + require.NoError(c, json.Unmarshal([]byte(out), &list)) + for _, item := range list { + if item["instanceID"] == "fs-terminated" { + assert.Equal(c, "TERMINATED", item["runtimeStatus"]) + return + } + } + assert.Fail(c, "fs-terminated not found") + }, 30*time.Second, 500*time.Millisecond) // Purge only TERMINATED instances older than 1s. output, err = cmdWorkflowPurge(appID, redisConnString, @@ -389,7 +441,7 @@ func TestWorkflowPurge(t *testing.T) { // Clean up. t.Cleanup(func() { - _, err := cmdWorkflowPurge(appID, "fs-completed") + _, err := cmdWorkflowPurge(appID, redisConnString, "fs-completed") assert.NoError(t, err) }) })