Skip to content

Commit be09caf

Browse files
luthfifahleviMuhammad Luthfi Fahlevi
andauthored
feat: make time out in delete assets to be configurable (#82)
* fix: resolve error message * feat: add config * fix: remove cycle import and set default timout * docs: update as the default value * feat: add validation for Assets config * feat: move validate code in upper runServer func * feat: change error message * feat: make err as var variable * feat: move asset config into service * feat: move asset config into core/asset * feat: update config * refactor: rename the variable --------- Co-authored-by: Muhammad Luthfi Fahlevi <[email protected]>
1 parent 54777a8 commit be09caf

File tree

5 files changed

+36
-10
lines changed

5 files changed

+36
-10
lines changed

cli/config.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,7 @@ type Config struct {
104104
// Column search excluded keyword list
105105
ColSearchExclusionKeywords string `yaml:"col_search_excluded_keywords" mapstructure:"col_search_excluded_keywords"`
106106

107-
Asset Asset `mapstructure:"asset"`
108-
}
109-
110-
type Asset struct {
111-
AdditionalTypes []string `mapstructure:"additional_types"`
107+
Asset asset.Config `mapstructure:"asset"`
112108
}
113109

114110
func LoadConfig() (*Config, error) {

cli/server.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ func serverMigrateCommand(cfg *Config) *cobra.Command {
8383
}
8484

8585
func runServer(ctx context.Context, cfg *Config) error {
86+
if err := cfg.Asset.Validate(); err != nil {
87+
return err
88+
}
89+
8690
logger := initLogger(cfg.LogLevel)
8791
logger.Info("compass starting", "version", Version)
8892

@@ -154,6 +158,7 @@ func runServer(ctx context.Context, cfg *Config) error {
154158
LineageRepo: lineageRepository,
155159
Worker: wrkr,
156160
Logger: logger,
161+
Config: cfg.Asset,
157162
})
158163
defer cancel()
159164

compass.yaml.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,4 @@ client:
7878
asset:
7979
additional_types:
8080
- fact_source
81+
delete_assets_timeout: 5m

core/asset/config.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package asset
2+
3+
import (
4+
"errors"
5+
"time"
6+
)
7+
8+
var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second")
9+
10+
type Config struct {
11+
AdditionalTypes []string `mapstructure:"additional_types"`
12+
DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"`
13+
}
14+
15+
func (c *Config) Validate() error {
16+
if c.DeleteAssetsTimeout == 0 {
17+
return errDeleteAssetsTimeoutIsZero
18+
}
19+
20+
return nil
21+
}

core/asset/service.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ type Service struct {
1919
lineageRepository LineageRepository
2020
worker Worker
2121
logger log.Logger
22+
config Config
2223
cancelFnList []func()
2324

2425
assetOpCounter metric.Int64Counter
@@ -40,6 +41,7 @@ type ServiceDeps struct {
4041
LineageRepo LineageRepository
4142
Worker Worker
4243
Logger log.Logger
44+
Config Config
4345
}
4446

4547
func NewService(deps ServiceDeps) (service *Service, cancel func()) {
@@ -55,6 +57,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) {
5557
lineageRepository: deps.LineageRepo,
5658
worker: deps.Worker,
5759
logger: deps.Logger,
60+
config: deps.Config,
5861
cancelFnList: make([]func(), 0),
5962

6063
assetOpCounter: assetOpCounter,
@@ -149,8 +152,8 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
149152
return 0, err
150153
}
151154

152-
if !request.DryRun {
153-
newCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
155+
if !request.DryRun && total > 0 {
156+
newCtx, cancel := context.WithTimeout(context.Background(), s.config.DeleteAssetsTimeout)
154157
s.cancelFnList = append(s.cancelFnList, cancel)
155158
go s.executeDeleteAssets(newCtx, deleteSQLExpr)
156159
}
@@ -161,16 +164,16 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
161164
func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) {
162165
deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr)
163166
if err != nil {
164-
s.logger.Error("Asset deletion failed, skipping Elasticsearch and Lineage deletions. Err:", err)
167+
s.logger.Error("asset deletion failed, skipping elasticsearch and lineage deletions", "err:", err)
165168
return
166169
}
167170

168171
if err := s.lineageRepository.DeleteByURNs(ctx, deletedURNs); err != nil {
169-
s.logger.Error("Error occurred during Lineage deletion:", err)
172+
s.logger.Error("error occurred during lineage deletion", "err:", err)
170173
}
171174

172175
if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(ctx, deleteSQLExpr.String()); err != nil {
173-
s.logger.Error("Error occurred during Elasticsearch deletion:", err)
176+
s.logger.Error("error occurred during elasticsearch deletion", "err:", err)
174177
}
175178
}
176179

0 commit comments

Comments
 (0)