Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make time out in delete assets to be configurable #82

Merged
merged 12 commits into from
Sep 2, 2024
14 changes: 13 additions & 1 deletion cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/MakeNowJust/heredoc"
"github.com/goto/compass/core/asset"
Expand All @@ -21,6 +22,8 @@ import (

const configFlag = "config"

var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second")

func configCommand(cfg *Config) *cobra.Command {
cmd := &cobra.Command{
Use: "config <command>",
Expand Down Expand Up @@ -108,7 +111,16 @@ type Config struct {
}

type Asset struct {
AdditionalTypes []string `mapstructure:"additional_types"`
AdditionalTypes []string `mapstructure:"additional_types"`
DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"`
irainia marked this conversation as resolved.
Show resolved Hide resolved
}

func (a *Asset) Validate() error {
if a.DeleteAssetsTimeout == 0 {
return errDeleteAssetsTimeoutIsZero
}

return nil
}

func LoadConfig() (*Config, error) {
Expand Down
15 changes: 10 additions & 5 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func serverMigrateCommand(cfg *Config) *cobra.Command {
}

func runServer(ctx context.Context, cfg *Config) error {
if err := cfg.Asset.Validate(); err != nil {
return err
}

logger := initLogger(cfg.LogLevel)
logger.Info("compass starting", "version", Version)

Expand Down Expand Up @@ -149,11 +153,12 @@ func runServer(ctx context.Context, cfg *Config) error {
}()

assetService, cancel := asset.NewService(asset.ServiceDeps{
AssetRepo: assetRepository,
DiscoveryRepo: discoveryRepository,
LineageRepo: lineageRepository,
Worker: wrkr,
Logger: logger,
AssetRepo: assetRepository,
DiscoveryRepo: discoveryRepository,
LineageRepo: lineageRepository,
Worker: wrkr,
Logger: logger,
DeleteAssetsTimeout: cfg.Asset.DeleteAssetsTimeout,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's pass the cfg altogether

})
defer cancel()

Expand Down
1 change: 1 addition & 0 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,4 @@ client:
asset:
haveiss marked this conversation as resolved.
Show resolved Hide resolved
additional_types:
- fact_source
delete_assets_timeout: 5m
23 changes: 13 additions & 10 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Service struct {
lineageRepository LineageRepository
worker Worker
logger log.Logger
deleteAssetsTimeout time.Duration
cancelFnList []func()

assetOpCounter metric.Int64Counter
Expand All @@ -35,11 +36,12 @@ type Worker interface {
}

type ServiceDeps struct {
AssetRepo Repository
DiscoveryRepo DiscoveryRepository
LineageRepo LineageRepository
Worker Worker
Logger log.Logger
AssetRepo Repository
DiscoveryRepo DiscoveryRepository
LineageRepo LineageRepository
Worker Worker
Logger log.Logger
DeleteAssetsTimeout time.Duration
}

func NewService(deps ServiceDeps) (service *Service, cancel func()) {
Expand All @@ -55,6 +57,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) {
lineageRepository: deps.LineageRepo,
worker: deps.Worker,
logger: deps.Logger,
deleteAssetsTimeout: deps.DeleteAssetsTimeout,
cancelFnList: make([]func(), 0),

assetOpCounter: assetOpCounter,
Expand Down Expand Up @@ -149,8 +152,8 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
return 0, err
}

if !request.DryRun {
newCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
if !request.DryRun && total > 0 {
newCtx, cancel := context.WithTimeout(context.Background(), s.deleteAssetsTimeout)
s.cancelFnList = append(s.cancelFnList, cancel)
go s.executeDeleteAssets(newCtx, deleteSQLExpr)
}
Expand All @@ -161,16 +164,16 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) {
deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr)
if err != nil {
s.logger.Error("Asset deletion failed, skipping Elasticsearch and Lineage deletions. Err:", err)
s.logger.Error("asset deletion failed, skipping elasticsearch and lineage deletions", "err:", err)
return
}

if err := s.lineageRepository.DeleteByURNs(ctx, deletedURNs); err != nil {
s.logger.Error("Error occurred during Lineage deletion:", err)
s.logger.Error("error occurred during lineage deletion", "err:", err)
}

if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(ctx, deleteSQLExpr.String()); err != nil {
s.logger.Error("Error occurred during Elasticsearch deletion:", err)
s.logger.Error("error occurred during elasticsearch deletion", "err:", err)
}
}

Expand Down
Loading