Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make time out in delete assets to be configurable #82

Merged
merged 12 commits into from
Sep 2, 2024
6 changes: 1 addition & 5 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,7 @@ type Config struct {
// Column search excluded keyword list
ColSearchExclusionKeywords string `yaml:"col_search_excluded_keywords" mapstructure:"col_search_excluded_keywords"`

Asset Asset `mapstructure:"asset"`
}

type Asset struct {
AdditionalTypes []string `mapstructure:"additional_types"`
Asset asset.Config `mapstructure:"asset"`
}

func LoadConfig() (*Config, error) {
Expand Down
5 changes: 5 additions & 0 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func serverMigrateCommand(cfg *Config) *cobra.Command {
}

func runServer(ctx context.Context, cfg *Config) error {
if err := cfg.Asset.Validate(); err != nil {
return err
}

logger := initLogger(cfg.LogLevel)
logger.Info("compass starting", "version", Version)

Expand Down Expand Up @@ -154,6 +158,7 @@ func runServer(ctx context.Context, cfg *Config) error {
LineageRepo: lineageRepository,
Worker: wrkr,
Logger: logger,
Config: cfg.Asset,
})
defer cancel()

Expand Down
1 change: 1 addition & 0 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,4 @@ client:
asset:
haveiss marked this conversation as resolved.
Show resolved Hide resolved
additional_types:
- fact_source
delete_assets_timeout: 5m
21 changes: 21 additions & 0 deletions core/asset/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package asset

import (
"errors"
"time"
)

var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second")

type Config struct {
AdditionalTypes []string `mapstructure:"additional_types"`
DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"`
}

func (c *Config) Validate() error {
if c.DeleteAssetsTimeout == 0 {
return errDeleteAssetsTimeoutIsZero
}

return nil
}
13 changes: 8 additions & 5 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Service struct {
lineageRepository LineageRepository
worker Worker
logger log.Logger
config Config
cancelFnList []func()

assetOpCounter metric.Int64Counter
Expand All @@ -40,6 +41,7 @@ type ServiceDeps struct {
LineageRepo LineageRepository
Worker Worker
Logger log.Logger
Config Config
}

func NewService(deps ServiceDeps) (service *Service, cancel func()) {
Expand All @@ -55,6 +57,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) {
lineageRepository: deps.LineageRepo,
worker: deps.Worker,
logger: deps.Logger,
config: deps.Config,
cancelFnList: make([]func(), 0),

assetOpCounter: assetOpCounter,
Expand Down Expand Up @@ -149,8 +152,8 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
return 0, err
}

if !request.DryRun {
newCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
if !request.DryRun && total > 0 {
newCtx, cancel := context.WithTimeout(context.Background(), s.config.DeleteAssetsTimeout)
s.cancelFnList = append(s.cancelFnList, cancel)
go s.executeDeleteAssets(newCtx, deleteSQLExpr)
}
Expand All @@ -161,16 +164,16 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) {
deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr)
if err != nil {
s.logger.Error("Asset deletion failed, skipping Elasticsearch and Lineage deletions. Err:", err)
s.logger.Error("asset deletion failed, skipping elasticsearch and lineage deletions", "err:", err)
return
}

if err := s.lineageRepository.DeleteByURNs(ctx, deletedURNs); err != nil {
s.logger.Error("Error occurred during Lineage deletion:", err)
s.logger.Error("error occurred during lineage deletion", "err:", err)
}

if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(ctx, deleteSQLExpr.String()); err != nil {
s.logger.Error("Error occurred during Elasticsearch deletion:", err)
s.logger.Error("error occurred during elasticsearch deletion", "err:", err)
}
}

Expand Down
Loading