-
Notifications
You must be signed in to change notification settings - Fork 669
/
Copy pathdelete_handler.go
109 lines (93 loc) · 2.96 KB
/
delete_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright (c) Abstract Machines
// SPDX-License-Identifier: Apache-2.0
// The DeleteHandler is a periodic background job that permanently removes users that have been marked as deleted
// for longer than a configured period, together with the user's domain memberships and policies from the auth service.
// The handler runs in a separate goroutine and, on every check interval, looks for users that are marked as deleted.
// If a user has been marked as deleted for more than the specified period,
// the handler removes the user from domains, deletes the user's policies from the auth service and deletes the user from the database.
package users
import (
"context"
"log/slog"
"time"
grpcDomainsV1 "github.com/absmach/supermq/api/grpc/domains/v1"
svcerr "github.com/absmach/supermq/pkg/errors/service"
"github.com/absmach/supermq/pkg/policies"
)
// defLimit is the page size (Page.Limit) used when listing users in
// DeletedStatus during a cleanup pass.
const defLimit = uint64(100)
// handler bundles the dependencies and timing settings of the periodic
// job that removes users which have been marked as deleted.
type handler struct {
// users is the repository from which deleted users are listed and removed.
users Repository
// domains is the gRPC client used to remove a user from domains.
domains grpcDomainsV1.DomainsServiceClient
// policies is used to delete the policies that have the user as subject.
policies policies.Service
// checkInterval is the period of the ticker that triggers a cleanup pass.
checkInterval time.Duration
// deleteAfter is the minimum time since a user's UpdatedAt before the
// user is actually deleted.
deleteAfter time.Duration
// logger receives the error and info records emitted by the job.
logger *slog.Logger
}
// NewDeleteHandler wires the given dependencies into a handler and starts it
// in a background goroutine. The call returns immediately; the goroutine runs
// a cleanup pass every defCheckInterval and exits once ctx is done.
func NewDeleteHandler(ctx context.Context, users Repository, policyService policies.Service, domainsClient grpcDomainsV1.DomainsServiceClient, defCheckInterval, deleteAfter time.Duration, logger *slog.Logger) {
	h := &handler{
		logger:        logger,
		deleteAfter:   deleteAfter,
		checkInterval: defCheckInterval,
		policies:      policyService,
		domains:       domainsClient,
		users:         users,
	}

	go h.run(ctx)
}

// run blocks until ctx is done, invoking handle on every tick of a ticker
// whose period is h.checkInterval.
func (h *handler) run(ctx context.Context) {
	tick := time.NewTicker(h.checkInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			h.handle(ctx)
		case <-ctx.Done():
			return
		}
	}
}
// handle performs one cleanup pass over the users in DeletedStatus.
//
// Users are fetched page by page, defLimit at a time. For every user whose
// UpdatedAt lies at least deleteAfter in the past, the handler removes the
// user from domains, deletes the policies that have the user as subject and
// finally deletes the user from the repository. A failure at any step is
// logged and the user is left in place for a later pass.
//
// The page offset is advanced by one for every user that remains in the
// repository after being looked at (too recent, or a step failed). Users that
// were deleted do not advance the offset, because their removal shifts the
// remaining rows forward. The pass ends on a retrieval error, on an empty
// page, or when ctx is cancelled, so it cannot spin on users that are not yet
// eligible for deletion.
// NOTE(review): this assumes RetrieveAll returns users in a stable order
// between calls — confirm against the repository implementation.
func (h *handler) handle(ctx context.Context) {
	pm := Page{Limit: defLimit, Offset: 0, Status: DeletedStatus}

	for {
		// Stop promptly on shutdown instead of issuing further calls.
		if ctx.Err() != nil {
			return
		}

		dbUsers, err := h.users.RetrieveAll(ctx, pm)
		if err != nil {
			h.logger.Error("failed to retrieve users", slog.Any("error", err))
			return
		}
		// An empty page means everything past the current offset has been seen.
		if dbUsers.Total == 0 || len(dbUsers.Users) == 0 {
			return
		}

		for _, u := range dbUsers.Users {
			// Not marked as deleted for long enough yet: keep and step over it.
			if time.Since(u.UpdatedAt) < h.deleteAfter {
				pm.Offset++
				continue
			}

			deletedRes, err := h.domains.DeleteUserFromDomains(ctx, &grpcDomainsV1.DeleteUserReq{
				Id: u.ID,
			})
			if err != nil {
				h.logger.Error("failed to delete user from domains", slog.Any("error", err))
				pm.Offset++
				continue
			}
			if !deletedRes.Deleted {
				h.logger.Error("failed to delete user from domains", slog.Any("error", svcerr.ErrAuthorization))
				pm.Offset++
				continue
			}

			req := policies.Policy{
				Subject:     u.ID,
				SubjectType: policies.UserType,
			}
			if err := h.policies.DeletePolicyFilter(ctx, req); err != nil {
				h.logger.Error("failed to delete user policies", slog.Any("error", err))
				pm.Offset++
				continue
			}

			if err := h.users.Delete(ctx, u.ID); err != nil {
				h.logger.Error("failed to delete user", slog.Any("error", err))
				pm.Offset++
				continue
			}

			// The user is gone from the result set, so the offset stays put.
			h.logger.Info("user deleted", slog.Group("user",
				slog.String("id", u.ID),
				slog.String("first_name", u.FirstName),
				slog.String("last_name", u.LastName),
			))
		}
	}
}