Skip to content

Commit 2ca3cad

Browse files
authored
feat(otredis): add cleanup command to redis module (#239)
* feat: add redis commands * feat: add redis commands * fix: cleanup code * chore: update minio env * fix: sync bug * fix: use waitGroup instead of errGroup for simplicity
1 parent 897906d commit 2ca3cad

File tree

4 files changed

+281
-2
lines changed

4 files changed

+281
-2
lines changed

.github/workflows/go.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ jobs:
6363
minio:
6464
image: bitnami/minio:latest
6565
env:
66-
MINIO_ACCESS_KEY: minioadmin
67-
MINIO_SECRET_KEY: minioadmin
66+
MINIO_ROOT_USER: minioadmin
67+
MINIO_ROOT_PASSWORD: minioadmin
6868
ports:
6969
- 9000:9000
7070
options: --name minio-server

otredis/cleanup_command.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
package otredis
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"os"
8+
"os/signal"
9+
"path/filepath"
10+
"strconv"
11+
"sync"
12+
"sync/atomic"
13+
"syscall"
14+
"time"
15+
16+
"github.com/DoNewsCode/core/logging"
17+
"github.com/go-kit/log"
18+
"github.com/spf13/cobra"
19+
)
20+
21+
// NewCleanupCommand creates a new command to clean up unused redis keys.
22+
func NewCleanupCommand(maker Maker, baseLogger log.Logger) *cobra.Command {
23+
const cursorFileName = "redis-scan-cursor"
24+
var (
25+
logger = logging.WithLevel(baseLogger)
26+
cursorPath string
27+
batchSize int64
28+
prefix string
29+
instance string
30+
)
31+
32+
type stats struct {
33+
scanned uint64
34+
removed uint64
35+
}
36+
37+
removeKeys := func(ctx context.Context, cursor *uint64, stats *stats, threshold time.Duration) error {
38+
var (
39+
keys []string
40+
err error
41+
)
42+
43+
logger.Info(fmt.Sprintf("scanning redis keys from cursor %d", *cursor))
44+
if err := os.WriteFile(cursorPath, []byte(fmt.Sprintf("%d", *cursor)), os.ModePerm); err != nil {
45+
logger.Err("cannot store cursor to cursor location", err, nil)
46+
}
47+
48+
redisClient, err := maker.Make(instance)
49+
if err != nil {
50+
return fmt.Errorf("cannot find redis instance under the name of %s: %w", instance, err)
51+
}
52+
keys, *cursor, err = redisClient.Scan(ctx, *cursor, prefix+"*", batchSize).Result()
53+
if err != nil {
54+
return err
55+
}
56+
stats.scanned += uint64(len(keys))
57+
58+
var wg sync.WaitGroup
59+
for _, key := range keys {
60+
wg.Add(1)
61+
go func(key string) {
62+
idleTime, _ := redisClient.ObjectIdleTime(ctx, key).Result()
63+
if idleTime > threshold {
64+
logger.Info(fmt.Sprintf("removing %s from redis as it is %s old", key, idleTime))
65+
redisClient.Del(ctx, key)
66+
atomic.AddUint64(&stats.removed, 1)
67+
}
68+
wg.Done()
69+
}(key)
70+
}
71+
wg.Wait()
72+
return nil
73+
}
74+
75+
initCursor := func() (uint64, error) {
76+
cursorPath = filepath.Join(os.TempDir(), fmt.Sprintf("%s.%s.txt", cursorFileName, instance))
77+
if _, err := os.Stat(cursorPath); os.IsNotExist(err) {
78+
return 0, nil
79+
}
80+
f, err := os.Open(cursorPath)
81+
if err != nil {
82+
return 0, err
83+
}
84+
defer f.Close()
85+
lastCursor, err := io.ReadAll(f)
86+
if err != nil {
87+
return 0, err
88+
}
89+
cursor, err := strconv.ParseUint(string(lastCursor), 10, 64)
90+
if err != nil {
91+
return 0, err
92+
}
93+
return cursor, nil
94+
}
95+
96+
cmd := &cobra.Command{
97+
Use: "cleanup",
98+
Short: "clean up idle keys in redis",
99+
Args: cobra.ExactArgs(1),
100+
Long: `clean up idle keys in redis based on the last access time.`,
101+
RunE: func(cmd *cobra.Command, args []string) error {
102+
var (
103+
cursor uint64
104+
stats stats
105+
duration time.Duration
106+
)
107+
108+
defer func() {
109+
if stats.scanned == 0 {
110+
return
111+
}
112+
logger.Info(fmt.Sprintf("%d keys scanned, %d keys removed(%.2f%%)", stats.scanned, stats.removed, float64(stats.removed)/float64(stats.scanned)), nil)
113+
}()
114+
115+
duration, err := time.ParseDuration(args[0])
116+
if err != nil {
117+
return fmt.Errorf("first argument must be valid duration string, got %w", err)
118+
}
119+
120+
cursor, err = initCursor()
121+
if err != nil {
122+
return fmt.Errorf("error restoring cursor from file: %w", err)
123+
}
124+
125+
shutdown := make(chan os.Signal, 1)
126+
signal.Notify(shutdown, syscall.SIGINT, syscall.SIGTERM)
127+
128+
for {
129+
select {
130+
case <-cmd.Context().Done():
131+
return cmd.Context().Err()
132+
case <-shutdown:
133+
return nil
134+
default:
135+
if err := removeKeys(cmd.Context(), &cursor, &stats, duration); err != nil {
136+
return fmt.Errorf("error while removing keys: %w", err)
137+
}
138+
}
139+
if cursor == 0 {
140+
break
141+
}
142+
}
143+
logger.Info("redis key clean-up completed", nil)
144+
os.Remove(cursorPath)
145+
return nil
146+
},
147+
}
148+
149+
cmd.Flags().Int64VarP(&batchSize, "batchSize", "b", 100, "specify the redis scan batch size")
150+
cmd.Flags().StringVarP(&cursorPath, "cursorPath", "c", cursorPath, "specify the location to store the cursor, so that the next execution can continue from where it's left off.")
151+
cmd.Flags().StringVarP(&prefix, "prefix", "p", "", "specify the prefix of redis keys to be scanned")
152+
cmd.Flags().StringVarP(&instance, "instance", "i", "default", "specify the redis instance to be scanned")
153+
154+
return cmd
155+
}

otredis/module.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package otredis
2+
3+
import (
4+
"github.com/DoNewsCode/core/di"
5+
"github.com/go-kit/log"
6+
"github.com/spf13/cobra"
7+
)
8+
9+
// Module is the registration unit for package core. It provides redis command.
10+
type Module struct {
11+
maker Maker
12+
logger log.Logger
13+
}
14+
15+
// ModuleIn contains the input parameters needed for creating the new module.
16+
type ModuleIn struct {
17+
di.In
18+
19+
Maker Maker
20+
Logger log.Logger
21+
}
22+
23+
// New creates a Module.
24+
func New(in ModuleIn) Module {
25+
return Module{
26+
maker: in.Maker,
27+
logger: in.Logger,
28+
}
29+
}
30+
31+
// ProvideCommand provides redis commands.
32+
func (m Module) ProvideCommand(command *cobra.Command) {
33+
cleanupCmd := NewCleanupCommand(m.maker, m.logger)
34+
redisCmd := &cobra.Command{
35+
Use: "redis",
36+
Short: "manage redis",
37+
Long: "manage redis, such as cleaning up redis cache",
38+
}
39+
redisCmd.AddCommand(cleanupCmd)
40+
command.AddCommand(redisCmd)
41+
}

otredis/module_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package otredis
2+
3+
import (
4+
"context"
5+
"os"
6+
"strings"
7+
"testing"
8+
"time"
9+
10+
"github.com/DoNewsCode/core"
11+
"github.com/DoNewsCode/core/di"
12+
"github.com/spf13/cobra"
13+
"github.com/stretchr/testify/assert"
14+
)
15+
16+
func TestModule_ProvideCommand(t *testing.T) {
17+
if os.Getenv("REDIS_ADDR") == "" {
18+
t.Skip("set REDIS_ADDR to run TestModule_ProvideRunGroup")
19+
return
20+
}
21+
addrs := strings.Split(os.Getenv("REDIS_ADDR"), ",")
22+
23+
c := core.New(core.WithInline("redis.default.addrs", addrs))
24+
c.ProvideEssentials()
25+
c.Provide(di.Deps{
26+
provideRedisFactory(&providersOption{}),
27+
di.Bind(new(Factory), new(Maker)),
28+
})
29+
c.AddModuleFunc(New)
30+
rootCmd := cobra.Command{}
31+
c.ApplyRootCommand(&rootCmd)
32+
assert.True(t, rootCmd.HasSubCommands())
33+
34+
cases := []struct {
35+
name string
36+
args []string
37+
before func(t *testing.T, maker Maker)
38+
after func(t *testing.T, maker Maker)
39+
}{
40+
{
41+
"cleanup",
42+
[]string{"redis", "cleanup", "1ms"},
43+
func(t *testing.T, maker Maker) {
44+
client, _ := maker.Make("default")
45+
client.Set(context.Background(), "foo", "bar", 0)
46+
time.Sleep(time.Second)
47+
},
48+
func(t *testing.T, maker Maker) {
49+
client, _ := maker.Make("default")
50+
results, _ := client.Keys(context.Background(), "*").Result()
51+
assert.Empty(t, results)
52+
},
53+
},
54+
{
55+
"cleanup",
56+
[]string{"redis", "cleanup", "1ms", "-p", "bar"},
57+
func(t *testing.T, maker Maker) {
58+
client, _ := maker.Make("default")
59+
client.Set(context.Background(), "foo", "bar", 0)
60+
client.Set(context.Background(), "bar", "bar", 0)
61+
time.Sleep(time.Second)
62+
},
63+
func(t *testing.T, maker Maker) {
64+
client, _ := maker.Make("default")
65+
results, _ := client.Keys(context.Background(), "*").Result()
66+
assert.Len(t, results, 1)
67+
},
68+
},
69+
}
70+
71+
for _, cc := range cases {
72+
t.Run(cc.name, func(t *testing.T) {
73+
c.Invoke(func(maker Maker) {
74+
cc.before(t, maker)
75+
})
76+
rootCmd.SetArgs(cc.args)
77+
rootCmd.Execute()
78+
c.Invoke(func(maker Maker) {
79+
cc.after(t, maker)
80+
})
81+
})
82+
}
83+
}

0 commit comments

Comments
 (0)