Skip to content

Commit 6df32ee

Browse files
author
Sumeet Rai
committed
added support for sync.Map to cancel and delete function within context
1 parent c0186c9 commit 6df32ee

File tree

3 files changed

+54
-24
lines changed

3 files changed

+54
-24
lines changed

cli/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,15 @@ func runServer(ctx context.Context, cfg *Config) error {
152152
}
153153
}()
154154

155-
assetService := asset.NewService(asset.ServiceDeps{
155+
assetService, cancel := asset.NewService(asset.ServiceDeps{
156156
AssetRepo: assetRepository,
157157
DiscoveryRepo: discoveryRepository,
158158
LineageRepo: lineageRepository,
159159
Worker: wrkr,
160160
Logger: logger,
161161
Config: cfg.Asset,
162162
})
163+
defer cancel()
163164

164165
// init discussion
165166
discussionRepository, err := postgres.NewDiscussionRepository(pgClient, 0)

core/asset/service.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package asset
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
"time"
78

89
"github.com/google/uuid"
@@ -20,8 +21,8 @@ type Service struct {
2021
worker Worker
2122
logger log.Logger
2223
config Config
23-
24-
assetOpCounter metric.Int64Counter
24+
cancelFnMap sync.Map
25+
assetOpCounter metric.Int64Counter
2526
}
2627

2728
//go:generate mockery --name=Worker -r --case underscore --with-expecter --structname Worker --filename worker_mock.go --output=./mocks
@@ -43,7 +44,7 @@ type ServiceDeps struct {
4344
Config Config
4445
}
4546

46-
func NewService(deps ServiceDeps) (service *Service) {
47+
func NewService(deps ServiceDeps) (service *Service, cancel func()) {
4748
assetOpCounter, err := otel.Meter("github.com/goto/compass/core/asset").
4849
Int64Counter("compass.asset.operation")
4950
if err != nil {
@@ -57,11 +58,18 @@ func NewService(deps ServiceDeps) (service *Service) {
5758
worker: deps.Worker,
5859
logger: deps.Logger,
5960
config: deps.Config,
60-
61-
assetOpCounter: assetOpCounter,
61+
cancelFnMap: sync.Map{},
62+
assetOpCounter: assetOpCounter,
6263
}
6364

64-
return newService
65+
return newService, func() {
66+
newService.cancelFnMap.Range(func(_, value interface{}) bool {
67+
if cancelFn, ok := value.(func()); ok {
68+
cancelFn()
69+
}
70+
return true
71+
})
72+
}
6573
}
6674

6775
func (s *Service) GetAllAssets(ctx context.Context, flt Filter, withTotal bool) ([]Asset, uint32, error) {
@@ -148,15 +156,22 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
148156

149157
if !request.DryRun && total > 0 {
150158
newCtx, cancel := context.WithTimeout(context.Background(), s.config.DeleteAssetsTimeout)
151-
go func() {
152-
defer cancel()
159+
cancelID := generateUniqueCancelID()
160+
s.cancelFnMap.Store(cancelID, cancel)
161+
go func(id string) {
153162
s.executeDeleteAssets(newCtx, deleteSQLExpr)
154-
}()
163+
cancel()
164+
s.cancelFnMap.Delete(id)
165+
}(cancelID)
155166
}
156167

157168
return uint32(total), nil
158169
}
159170

171+
func generateUniqueCancelID() string {
172+
return uuid.New().String()
173+
}
174+
160175
func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) {
161176
deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr)
162177
if err != nil {

core/asset/service_test.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,12 @@ func TestService_GetAllAssets(t *testing.T) {
9090
tc.Setup(ctx, mockAssetRepo, mockDiscoveryRepo, mockLineageRepo)
9191
}
9292

93-
svc := asset.NewService(asset.ServiceDeps{
93+
svc, cancel := asset.NewService(asset.ServiceDeps{
9494
AssetRepo: mockAssetRepo,
9595
DiscoveryRepo: mockDiscoveryRepo,
9696
LineageRepo: mockLineageRepo,
9797
})
98+
defer cancel()
9899

99100
got, cnt, err := svc.GetAllAssets(ctx, tc.Filter, tc.WithTotal)
100101
if err != nil && errors.Is(tc.Err, err) {
@@ -160,7 +161,8 @@ func TestService_GetTypes(t *testing.T) {
160161
tc.Setup(ctx, mockAssetRepo)
161162
}
162163

163-
svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
164+
svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
165+
defer cancel()
164166

165167
got, err := svc.GetTypes(ctx, tc.Filter)
166168
if err != nil && errors.Is(tc.Err, err) {
@@ -245,12 +247,13 @@ func TestService_UpsertAsset(t *testing.T) {
245247
tc.Setup(ctx, assetRepo, discoveryRepo, lineageRepo)
246248
}
247249

248-
svc := asset.NewService(asset.ServiceDeps{
250+
svc, cancel := asset.NewService(asset.ServiceDeps{
249251
AssetRepo: assetRepo,
250252
DiscoveryRepo: discoveryRepo,
251253
LineageRepo: lineageRepo,
252254
Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}),
253255
})
256+
defer cancel()
254257

255258
rid, err := svc.UpsertAsset(ctx, tc.Asset, tc.Upstreams, tc.Downstreams)
256259
if tc.Err != nil {
@@ -309,12 +312,13 @@ func TestService_UpsertAssetWithoutLineage(t *testing.T) {
309312
tc.Setup(ctx, assetRepo, discoveryRepo)
310313
}
311314

312-
svc := asset.NewService(asset.ServiceDeps{
315+
svc, cancel := asset.NewService(asset.ServiceDeps{
313316
AssetRepo: assetRepo,
314317
DiscoveryRepo: discoveryRepo,
315318
LineageRepo: mocks.NewLineageRepository(t),
316319
Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}),
317320
})
321+
defer cancel()
318322

319323
rid, err := svc.UpsertAssetWithoutLineage(ctx, tc.Asset)
320324
if tc.Err != nil {
@@ -436,12 +440,13 @@ func TestService_DeleteAsset(t *testing.T) {
436440
tc.Setup(ctx, assetRepo, discoveryRepo, lineageRepo)
437441
}
438442

439-
svc := asset.NewService(asset.ServiceDeps{
443+
svc, cancel := asset.NewService(asset.ServiceDeps{
440444
AssetRepo: assetRepo,
441445
DiscoveryRepo: discoveryRepo,
442446
LineageRepo: lineageRepo,
443447
Worker: workermanager.NewInSituWorker(workermanager.Deps{DiscoveryRepo: discoveryRepo}),
444448
})
449+
defer cancel()
445450

446451
err := svc.DeleteAsset(ctx, tc.ID)
447452
if err != nil && errors.Is(tc.Err, err) {
@@ -519,12 +524,13 @@ func TestService_DeleteAssets(t *testing.T) {
519524
tc.Setup(ctx, assetRepo, worker, lineageRepo)
520525
}
521526

522-
svc := asset.NewService(asset.ServiceDeps{
527+
svc, cancel := asset.NewService(asset.ServiceDeps{
523528
AssetRepo: assetRepo,
524529
DiscoveryRepo: discoveryRepo,
525530
LineageRepo: lineageRepo,
526531
Worker: worker,
527532
})
533+
defer cancel()
528534

529535
affectedRows, err := svc.DeleteAssets(ctx, tc.Request)
530536
time.Sleep(1 * time.Second)
@@ -646,11 +652,12 @@ func TestService_GetAssetByID(t *testing.T) {
646652
tc.Setup(ctx, mockAssetRepo)
647653
}
648654

649-
svc := asset.NewService(asset.ServiceDeps{
655+
svc, cancel := asset.NewService(asset.ServiceDeps{
650656
AssetRepo: mockAssetRepo,
651657
DiscoveryRepo: mocks.NewDiscoveryRepository(t),
652658
LineageRepo: mocks.NewLineageRepository(t),
653659
})
660+
defer cancel()
654661

655662
actual, err := svc.GetAssetByID(ctx, tc.ID)
656663
if tc.Expected != nil {
@@ -755,11 +762,12 @@ func TestService_GetAssetByIDWithoutProbes(t *testing.T) {
755762
tc.Setup(ctx, mockAssetRepo)
756763
}
757764

758-
svc := asset.NewService(asset.ServiceDeps{
765+
svc, cancel := asset.NewService(asset.ServiceDeps{
759766
AssetRepo: mockAssetRepo,
760767
DiscoveryRepo: mocks.NewDiscoveryRepository(t),
761768
LineageRepo: mocks.NewLineageRepository(t),
762769
})
770+
defer cancel()
763771

764772
actual, err := svc.GetAssetByIDWithoutProbes(ctx, tc.ID)
765773
if tc.Expected != nil {
@@ -828,11 +836,12 @@ func TestService_GetAssetByVersion(t *testing.T) {
828836
tc.Setup(ctx, mockAssetRepo)
829837
}
830838

831-
svc := asset.NewService(asset.ServiceDeps{
839+
svc, cancel := asset.NewService(asset.ServiceDeps{
832840
AssetRepo: mockAssetRepo,
833841
DiscoveryRepo: mocks.NewDiscoveryRepository(t),
834842
LineageRepo: mocks.NewLineageRepository(t),
835843
})
844+
defer cancel()
836845

837846
_, err := svc.GetAssetByVersion(ctx, tc.ID, "v0.0.2")
838847
if tc.ExpectedErr != nil {
@@ -881,11 +890,12 @@ func TestService_GetAssetVersionHistory(t *testing.T) {
881890
tc.Setup(ctx, mockAssetRepo)
882891
}
883892

884-
svc := asset.NewService(asset.ServiceDeps{
893+
svc, cancel := asset.NewService(asset.ServiceDeps{
885894
AssetRepo: mockAssetRepo,
886895
DiscoveryRepo: mockDiscoveryRepo,
887896
LineageRepo: mockLineageRepo,
888897
})
898+
defer cancel()
889899

890900
_, err := svc.GetAssetVersionHistory(ctx, asset.Filter{}, tc.ID)
891901
if err != nil && errors.Is(tc.Err, err) {
@@ -1067,11 +1077,12 @@ func TestService_GetLineage(t *testing.T) {
10671077
tc.Setup(ctx, mockAssetRepo, mockDiscoveryRepo, mockLineageRepo)
10681078
}
10691079

1070-
svc := asset.NewService(asset.ServiceDeps{
1080+
svc, cancel := asset.NewService(asset.ServiceDeps{
10711081
AssetRepo: mockAssetRepo,
10721082
DiscoveryRepo: mockDiscoveryRepo,
10731083
LineageRepo: mockLineageRepo,
10741084
})
1085+
defer cancel()
10751086

10761087
actual, err := svc.GetLineage(ctx, "urn-source-1", tc.Query)
10771088
if tc.Err == nil {
@@ -1141,11 +1152,12 @@ func TestService_SearchSuggestGroupAssets(t *testing.T) {
11411152
tc.Setup(ctx, mockDiscoveryRepo)
11421153
}
11431154

1144-
svc := asset.NewService(asset.ServiceDeps{
1155+
svc, cancel := asset.NewService(asset.ServiceDeps{
11451156
AssetRepo: mockAssetRepo,
11461157
DiscoveryRepo: mockDiscoveryRepo,
11471158
LineageRepo: mockLineageRepo,
11481159
})
1160+
defer cancel()
11491161

11501162
_, err := svc.SearchAssets(ctx, asset.SearchConfig{})
11511163
if err != nil && !assert.Equal(t, tc.ErrSearch, err) {
@@ -1177,7 +1189,8 @@ func TestService_CreateAssetProbe(t *testing.T) {
11771189
mockAssetRepo := mocks.NewAssetRepository(t)
11781190
mockAssetRepo.EXPECT().AddProbe(ctx, assetURN, &probe).Return(nil)
11791191

1180-
svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
1192+
svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
1193+
defer cancel()
11811194

11821195
err := svc.AddProbe(ctx, assetURN, &probe)
11831196
assert.NoError(t, err)
@@ -1189,7 +1202,8 @@ func TestService_CreateAssetProbe(t *testing.T) {
11891202
mockAssetRepo := mocks.NewAssetRepository(t)
11901203
mockAssetRepo.EXPECT().AddProbe(ctx, assetURN, &probe).Return(expectedErr)
11911204

1192-
svc := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
1205+
svc, cancel := asset.NewService(asset.ServiceDeps{AssetRepo: mockAssetRepo})
1206+
defer cancel()
11931207

11941208
err := svc.AddProbe(ctx, assetURN, &probe)
11951209
assert.Equal(t, expectedErr, err)

0 commit comments

Comments
 (0)