Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions cmd/workflow/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,21 @@ package workflow

import (
"errors"
"slices"
"strings"

"github.com/dapr/cli/pkg/workflow"
"github.com/dapr/kit/signals"
"github.com/spf13/cobra"
)

var (
flagPurgeOlderThan string
flagPurgeAll bool
flagPurgeConn *connFlag
flagPurgeForce bool
schedulerNamespace string
flagPurgeOlderThan string
flagPurgeAll bool
flagPurgeConn *connFlag
flagPurgeForce bool
flagPurgeFilterStatus string
schedulerNamespace string
)

var PurgeCmd = &cobra.Command{
Expand All @@ -41,6 +44,9 @@ var PurgeCmd = &cobra.Command{
return errors.New("no arguments are accepted when using purge all flags")
}
default:
if cmd.Flags().Changed("all-filter-status") {
return errors.New("--all-filter-status can only be used with --all-older-than")
}
if len(args) == 0 {
return errors.New("one or more workflow instance ID arguments are required when not using purge all flags")
}
Expand Down Expand Up @@ -75,14 +81,38 @@ var PurgeCmd = &cobra.Command{
}
}

if cmd.Flags().Changed("all-filter-status") {
opts.AllFilterStatus = &flagPurgeFilterStatus
}

return workflow.Purge(ctx, opts)
},
}

var purgeFilterStatuses = workflow.RuntimeStatuses

func init() {
PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.")
PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.")
PurgeCmd.Flags().StringVar(&flagPurgeFilterStatus, "all-filter-status", "", "Filter purge to only workflow instances with the given runtime status. Must be used with --all-older-than. One of "+strings.Join(purgeFilterStatuses, ", "))
PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all")
PurgeCmd.MarkFlagsMutuallyExclusive("all-filter-status", "all")

pre := PurgeCmd.PreRunE
PurgeCmd.PreRunE = func(cmd *cobra.Command, args []string) error {
if cmd.Flags().Changed("all-filter-status") {
if !slices.Contains(purgeFilterStatuses, flagPurgeFilterStatus) {
return errors.New("invalid value for --all-filter-status. Supported values are " + strings.Join(purgeFilterStatuses, ", "))
}
if !slices.Contains(workflow.TerminalStatuses, flagPurgeFilterStatus) && !flagPurgeForce {
return errors.New("--force is required when using --all-filter-status with a non-terminal status (" + flagPurgeFilterStatus + ")")
}
}
if pre != nil {
return pre(cmd, args)
}
return nil
}
PurgeCmd.Flags().BoolVar(&flagPurgeForce, "force", false, "force will force a purge of a workflow, regardless of its current runtime state, or whether an active worker can process it, the backend will attempt to delete it anyway. This necessarily means the purging is executed out side of the workflow state machine, and therefore, can lead to corrupt state or broken workflow execution. Usage of this should _only_ be used when you know the workflow is not being currently processed. It is highly recommended to avoid using this flag unless absolutely necessary.")

PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set")
Expand Down
54 changes: 54 additions & 0 deletions cmd/workflow/purge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workflow

import (
"testing"

"github.com/dapr/cli/pkg/workflow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPurgeFilterStatuses(t *testing.T) {
assert.Equal(t, workflow.RuntimeStatuses, purgeFilterStatuses)
}

func TestPurgeCmdFlags(t *testing.T) {
t.Run("all-filter-status flag is registered", func(t *testing.T) {
f := PurgeCmd.Flags().Lookup("all-filter-status")
assert.NotNil(t, f)
assert.Equal(t, "string", f.Value.Type())
assert.Contains(t, f.Usage, "Must be used with --all-older-than")
})

t.Run("all-filter-status and all are mutually exclusive", func(t *testing.T) {
WorkflowCmd.SetArgs([]string{"purge", "--all", "--all-filter-status", "COMPLETED"})
err := WorkflowCmd.Execute()
require.Error(t, err)
assert.Contains(t, err.Error(), "if any flags in the group [all-filter-status all] are set none of the others can be")
})

t.Run("all-older-than flag is registered", func(t *testing.T) {
f := PurgeCmd.Flags().Lookup("all-older-than")
assert.NotNil(t, f)
})

t.Run("non-terminal status without force errors", func(t *testing.T) {
WorkflowCmd.SetArgs([]string{"purge", "--all-older-than", "1s", "--all-filter-status", "RUNNING"})
err := WorkflowCmd.Execute()
require.Error(t, err)
assert.Contains(t, err.Error(), "--force is required when using --all-filter-status with a non-terminal status")
})
}
11 changes: 1 addition & 10 deletions cmd/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,7 @@ func filterCmd(cmd *cobra.Command) *workflow.Filter {
status string
maxAge string

listStatuses = []string{
"RUNNING",
"COMPLETED",
"CONTINUED_AS_NEW",
"FAILED",
"CANCELED",
"TERMINATED",
"PENDING",
"SUSPENDED",
}
listStatuses = workflow.RuntimeStatuses
)

cmd.Flags().StringVarP(&name, "filter-name", "w", "", "Filter only the workflows with the given name")
Expand Down
22 changes: 22 additions & 0 deletions pkg/workflow/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ type Filter struct {
Terminal bool
}

// RuntimeStatuses is the canonical list of workflow runtime statuses.
var RuntimeStatuses = []string{
"RUNNING",
"COMPLETED",
"CONTINUED_AS_NEW",
"FAILED",
"CANCELED",
"TERMINATED",
"PENDING",
"SUSPENDED",
}

// TerminalStatuses is the subset of RuntimeStatuses that represent terminal
// (completed) workflow states.
var TerminalStatuses = []string{
"COMPLETED",
"CONTINUED_AS_NEW",
"FAILED",
"CANCELED",
"TERMINATED",
}

type ListOutputShort struct {
Namespace string `csv:"-" json:"namespace" yaml:"namespace"`
AppID string `csv:"-" json:"appID" yaml:"appID"`
Expand Down
21 changes: 18 additions & 3 deletions pkg/workflow/purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,36 @@ type PurgeOptions struct {
AppID string
InstanceIDs []string
AllOlderThan *time.Time
AllFilterStatus *string
All bool
Force bool

ConnectionString *string
TableName *string
}

// BuildPurgeFilter constructs the Filter used when listing workflow instances
// for bulk purge. When AllFilterStatus is set, it filters by that status
// instead of using the default terminal-only filter.
func BuildPurgeFilter(allFilterStatus *string) Filter {
filter := Filter{
Terminal: true,
}
if allFilterStatus != nil {
filter.Terminal = false
filter.Status = allFilterStatus
}
return filter
}

func Purge(ctx context.Context, opts PurgeOptions) error {
var toPurge []string

if len(opts.InstanceIDs) > 0 {
toPurge = opts.InstanceIDs
} else {
filter := BuildPurgeFilter(opts.AllFilterStatus)

var list []*ListOutputWide
var err error
list, err = ListWide(ctx, ListOptions{
Expand All @@ -53,9 +70,7 @@ func Purge(ctx context.Context, opts PurgeOptions) error {
AppID: opts.AppID,
ConnectionString: opts.ConnectionString,
TableName: opts.TableName,
Filter: Filter{
Terminal: true,
},
Filter: filter,
})
if err != nil {
return err
Expand Down
48 changes: 48 additions & 0 deletions pkg/workflow/purge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2026 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workflow

import (
"testing"

"github.com/dapr/kit/ptr"
"github.com/stretchr/testify/assert"
)

func TestBuildPurgeFilter(t *testing.T) {
t.Run("nil status uses terminal filter", func(t *testing.T) {
filter := BuildPurgeFilter(nil)
assert.True(t, filter.Terminal)
assert.Nil(t, filter.Status)
})

t.Run("with status uses status filter instead of terminal", func(t *testing.T) {
filter := BuildPurgeFilter(ptr.Of("FAILED"))
assert.False(t, filter.Terminal)
assert.NotNil(t, filter.Status)
assert.Equal(t, "FAILED", *filter.Status)
})

t.Run("with COMPLETED status", func(t *testing.T) {
filter := BuildPurgeFilter(ptr.Of("COMPLETED"))
assert.False(t, filter.Terminal)
assert.Equal(t, "COMPLETED", *filter.Status)
})

t.Run("with RUNNING status", func(t *testing.T) {
filter := BuildPurgeFilter(ptr.Of("RUNNING"))
assert.False(t, filter.Terminal)
assert.Equal(t, "RUNNING", *filter.Status)
})
}
Loading
Loading