Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make time out in delete assets to be configurable #82

Merged
merged 12 commits into from
Sep 2, 2024
4 changes: 3 additions & 1 deletion cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/MakeNowJust/heredoc"
"github.com/goto/compass/core/asset"
Expand Down Expand Up @@ -108,7 +109,8 @@ type Config struct {
}

type Asset struct {
AdditionalTypes []string `mapstructure:"additional_types"`
AdditionalTypes []string `mapstructure:"additional_types"`
DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout"`
haveiss marked this conversation as resolved.
Show resolved Hide resolved
}

func LoadConfig() (*Config, error) {
Expand Down
1 change: 1 addition & 0 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func runServer(ctx context.Context, cfg *Config) error {
LineageRepo: lineageRepository,
Worker: wrkr,
Logger: logger,
AssetConfig: cfg.Asset,
})
defer cancel()

Expand Down
1 change: 1 addition & 0 deletions compass.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,4 @@ client:
asset:
haveiss marked this conversation as resolved.
Show resolved Hide resolved
additional_types:
- fact_source
delete_assets_timeout: 10m
14 changes: 9 additions & 5 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/google/uuid"
"github.com/goto/compass/cli"
"github.com/goto/compass/pkg/queryexpr"
"github.com/goto/salt/log"
"go.opentelemetry.io/otel"
Expand All @@ -19,6 +20,7 @@ type Service struct {
lineageRepository LineageRepository
worker Worker
logger log.Logger
assetConfig cli.Asset
cancelFnList []func()

assetOpCounter metric.Int64Counter
Expand All @@ -40,6 +42,7 @@ type ServiceDeps struct {
LineageRepo LineageRepository
Worker Worker
Logger log.Logger
AssetConfig cli.Asset
}

func NewService(deps ServiceDeps) (service *Service, cancel func()) {
Expand All @@ -55,6 +58,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) {
lineageRepository: deps.LineageRepo,
worker: deps.Worker,
logger: deps.Logger,
assetConfig: deps.AssetConfig,
cancelFnList: make([]func(), 0),

assetOpCounter: assetOpCounter,
Expand Down Expand Up @@ -149,8 +153,8 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
return 0, err
}

if !request.DryRun {
newCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
if !request.DryRun && total > 0 {
newCtx, cancel := context.WithTimeout(context.Background(), s.assetConfig.DeleteAssetsTimeout)
s.cancelFnList = append(s.cancelFnList, cancel)
go s.executeDeleteAssets(newCtx, deleteSQLExpr)
}
Expand All @@ -161,16 +165,16 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest)
func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) {
deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr)
if err != nil {
s.logger.Error("Asset deletion failed, skipping Elasticsearch and Lineage deletions. Err:", err)
s.logger.Error("asset deletion failed, skipping elasticsearch and lineage deletions", "err:", err)
return
}

if err := s.lineageRepository.DeleteByURNs(ctx, deletedURNs); err != nil {
s.logger.Error("Error occurred during Lineage deletion:", err)
s.logger.Error("error occurred during lineage deletion", "err:", err)
}

if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(ctx, deleteSQLExpr.String()); err != nil {
s.logger.Error("Error occurred during Elasticsearch deletion:", err)
s.logger.Error("error occurred during elasticsearch deletion", "err:", err)
}
}

Expand Down
Loading