From 0860625f6b4ff9b94c5f6f64e0433bc904873038 Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 13:45:54 +0700 Subject: [PATCH 01/12] fix: resolve error message --- core/asset/service.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/asset/service.go b/core/asset/service.go index 05406436..d6b1b561 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -149,8 +149,10 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) return 0, err } - if !request.DryRun { - newCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + if !request.DryRun && total > 0 { + // TODO: make timeout configurable + timeout := 5 * time.Minute + newCtx, cancel := context.WithTimeout(context.Background(), timeout) s.cancelFnList = append(s.cancelFnList, cancel) go s.executeDeleteAssets(newCtx, deleteSQLExpr) } @@ -161,16 +163,16 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) func (s *Service) executeDeleteAssets(ctx context.Context, deleteSQLExpr queryexpr.ExprStr) { deletedURNs, err := s.assetRepository.DeleteByQueryExpr(ctx, deleteSQLExpr) if err != nil { - s.logger.Error("Asset deletion failed, skipping Elasticsearch and Lineage deletions. Err:", err) + s.logger.Error("asset deletion failed, skipping elasticsearch and lineage deletions", "err:", err) return } if err := s.lineageRepository.DeleteByURNs(ctx, deletedURNs); err != nil { - s.logger.Error("Error occurred during Lineage deletion:", err) + s.logger.Error("error occurred during lineage deletion", "err:", err) } if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(ctx, deleteSQLExpr.String()); err != nil { - s.logger.Error("Error occurred during Elasticsearch deletion:", err) + s.logger.Error("error occurred during elasticsearch deletion", "err:", err) } } From 715011a07f6b6ef0bf23465e8b94420d2d875a15 Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 15:33:12 +0700 Subject: [PATCH 02/12] feat: add config --- cli/config.go | 4 +++- cli/server.go | 1 + compass.yaml.example | 1 + core/asset/service.go | 8 +++++--- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/cli/config.go b/cli/config.go index 75c66100..32c7ab5e 100644 --- a/cli/config.go +++ b/cli/config.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "os" + "time" "github.com/MakeNowJust/heredoc" "github.com/goto/compass/core/asset" @@ -108,7 +109,8 @@ type Config struct { } type Asset struct { - AdditionalTypes []string `mapstructure:"additional_types"` + AdditionalTypes []string `mapstructure:"additional_types"` + DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout"` } func LoadConfig() (*Config, error) { diff --git a/cli/server.go b/cli/server.go index f3070021..d779bd89 100644 --- a/cli/server.go +++ b/cli/server.go @@ -154,6 +154,7 @@ func runServer(ctx context.Context, cfg *Config) error { LineageRepo: lineageRepository, Worker: wrkr, Logger: logger, + AssetConfig: cfg.Asset, }) defer cancel() diff --git a/compass.yaml.example b/compass.yaml.example index 9d723980..40418db1 100644 --- a/compass.yaml.example +++ b/compass.yaml.example @@ -78,3 +78,4 @@ client: asset: additional_types: - fact_source + delete_assets_timeout: 10m \ No newline at end of file diff --git a/core/asset/service.go b/core/asset/service.go index d6b1b561..d8d589f1 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -6,6 +6,7 @@ import ( "time" "github.com/google/uuid" + "github.com/goto/compass/cli" "github.com/goto/compass/pkg/queryexpr" "github.com/goto/salt/log" "go.opentelemetry.io/otel" @@ -19,6 +20,7 @@ type Service struct { lineageRepository LineageRepository worker Worker logger log.Logger + assetConfig cli.Asset cancelFnList []func() assetOpCounter metric.Int64Counter @@ -40,6 +42,7 @@ type ServiceDeps struct { LineageRepo LineageRepository Worker Worker Logger log.Logger + AssetConfig cli.Asset } func NewService(deps ServiceDeps) (service *Service, cancel func()) { @@ -55,6 +58,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) { lineageRepository: deps.LineageRepo, worker: deps.Worker, logger: deps.Logger, + assetConfig: deps.AssetConfig, cancelFnList: make([]func(), 0), assetOpCounter: assetOpCounter, @@ -150,9 +154,7 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) } if !request.DryRun && total > 0 { - // TODO: make timeout configurable - timeout := 5 * time.Minute - newCtx, cancel := context.WithTimeout(context.Background(), timeout) + newCtx, cancel := context.WithTimeout(context.Background(), s.assetConfig.DeleteAssetsTimeout) s.cancelFnList = append(s.cancelFnList, cancel) go s.executeDeleteAssets(newCtx, deleteSQLExpr) } From 9ee9390b84ddcbc2aad2ea4ed60cfa943aa7a3f7 Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 15:48:23 +0700 Subject: [PATCH 03/12] fix: remove cycle import and set default timout --- cli/config.go | 2 +- cli/server.go | 12 ++++++------ core/asset/service.go | 19 +++++++++---------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/cli/config.go b/cli/config.go index 32c7ab5e..b81642e7 100644 --- a/cli/config.go +++ b/cli/config.go @@ -110,7 +110,7 @@ type Config struct { type Asset struct { AdditionalTypes []string `mapstructure:"additional_types"` - DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout"` + DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"` } func LoadConfig() (*Config, error) { diff --git a/cli/server.go b/cli/server.go index d779bd89..4ee43270 100644 --- a/cli/server.go +++ b/cli/server.go @@ -149,12 +149,12 @@ func runServer(ctx context.Context, cfg *Config) error { }() assetService, cancel := asset.NewService(asset.ServiceDeps{ - AssetRepo: assetRepository, - DiscoveryRepo: discoveryRepository, - LineageRepo: lineageRepository, - Worker: wrkr, - Logger: logger, - AssetConfig: cfg.Asset, + AssetRepo: assetRepository, + DiscoveryRepo: discoveryRepository, + LineageRepo: lineageRepository, + Worker: wrkr, + Logger: logger, + DeleteAssetsTimeout: cfg.Asset.DeleteAssetsTimeout, }) defer cancel() diff --git a/core/asset/service.go b/core/asset/service.go index d8d589f1..d5e668d3 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -6,7 +6,6 @@ import ( "time" "github.com/google/uuid" - "github.com/goto/compass/cli" "github.com/goto/compass/pkg/queryexpr" "github.com/goto/salt/log" "go.opentelemetry.io/otel" @@ -20,7 +19,7 @@ type Service struct { lineageRepository LineageRepository worker Worker logger log.Logger - assetConfig cli.Asset + deleteAssetsTimeout time.Duration cancelFnList []func() assetOpCounter metric.Int64Counter @@ -37,12 +36,12 @@ type Worker interface { } type ServiceDeps struct { - AssetRepo Repository - DiscoveryRepo DiscoveryRepository - LineageRepo LineageRepository - Worker Worker - Logger log.Logger - AssetConfig cli.Asset + AssetRepo Repository + DiscoveryRepo DiscoveryRepository + LineageRepo LineageRepository + Worker Worker + Logger log.Logger + DeleteAssetsTimeout time.Duration } func NewService(deps ServiceDeps) (service *Service, cancel func()) { @@ -58,7 +57,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) { lineageRepository: deps.LineageRepo, worker: deps.Worker, logger: deps.Logger, - assetConfig: deps.AssetConfig, + deleteAssetsTimeout: deps.DeleteAssetsTimeout, cancelFnList: make([]func(), 0), assetOpCounter: assetOpCounter, @@ -154,7 +153,7 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) } if !request.DryRun && total > 0 { - newCtx, cancel := context.WithTimeout(context.Background(), s.assetConfig.DeleteAssetsTimeout) + newCtx, cancel := context.WithTimeout(context.Background(), s.deleteAssetsTimeout) s.cancelFnList = append(s.cancelFnList, cancel) go s.executeDeleteAssets(newCtx, deleteSQLExpr) } From 81a39539604c83b716e0e8ba525a537ec742f3df Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 15:53:13 +0700 Subject: [PATCH 04/12] docs: update as the default value --- compass.yaml.example | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/compass.yaml.example b/compass.yaml.example index 40418db1..bcd99f44 100644 --- a/compass.yaml.example +++ b/compass.yaml.example @@ -78,4 +78,4 @@ client: asset: additional_types: - fact_source - delete_assets_timeout: 10m \ No newline at end of file + delete_assets_timeout: 5m \ No newline at end of file From 546644e75b917f97583135877be5cc6e54b165aa Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 16:58:37 +0700 Subject: [PATCH 05/12] feat: add validation for Assets config --- cli/config.go | 8 ++++++++ cli/server.go | 4 ++++ 2 files changed, 12 insertions(+) diff --git a/cli/config.go b/cli/config.go index b81642e7..084cb629 100644 --- a/cli/config.go +++ b/cli/config.go @@ -113,6 +113,14 @@ type Asset struct { DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"` } +func (a *Asset) Validate() error { + if a.DeleteAssetsTimeout == 0 { + return errors.New("delete assets timeout is required") + } + + return nil +} + func LoadConfig() (*Config, error) { var cfg Config defer func() { diff --git a/cli/server.go b/cli/server.go index 4ee43270..736ba67e 100644 --- a/cli/server.go +++ b/cli/server.go @@ -148,6 +148,10 @@ func runServer(ctx context.Context, cfg *Config) error { } }() + if err = cfg.Asset.Validate(); err != nil { + return err + } + assetService, cancel := asset.NewService(asset.ServiceDeps{ AssetRepo: assetRepository, DiscoveryRepo: discoveryRepository, From 0f0b609aa221ef1dcc319b6c93c94ed00f16ea77 Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 17:03:08 +0700 Subject: [PATCH 06/12] feat: move validate code in upper runServer func --- cli/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cli/server.go b/cli/server.go index 736ba67e..1f79deaa 100644 --- a/cli/server.go +++ b/cli/server.go @@ -83,6 +83,10 @@ func serverMigrateCommand(cfg *Config) *cobra.Command { } func runServer(ctx context.Context, cfg *Config) error { + if err := cfg.Asset.Validate(); err != nil { + return err + } + logger := initLogger(cfg.LogLevel) logger.Info("compass starting", "version", Version) @@ -148,10 +152,6 @@ func runServer(ctx context.Context, cfg *Config) error { } }() - if err = cfg.Asset.Validate(); err != nil { - return err - } - assetService, cancel := asset.NewService(asset.ServiceDeps{ AssetRepo: assetRepository, DiscoveryRepo: discoveryRepository, From a938f0fe5eb77e30d325e3852966341aa02452ab Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 17:07:17 +0700 Subject: [PATCH 07/12] feat: change error message --- cli/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/config.go b/cli/config.go index 084cb629..f0ed057a 100644 --- a/cli/config.go +++ b/cli/config.go @@ -115,7 +115,7 @@ type Asset struct { func (a *Asset) Validate() error { if a.DeleteAssetsTimeout == 0 { - return errors.New("delete assets timeout is required") + return errors.New("delete assets timeout must greater than 0 second") } return nil From 74eb7a821ece31d5f1763e73e43605d5681aeece Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 18:03:45 +0700 Subject: [PATCH 08/12] feat: make err as var variable --- cli/config.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cli/config.go b/cli/config.go index f0ed057a..d5b88697 100644 --- a/cli/config.go +++ b/cli/config.go @@ -22,6 +22,8 @@ import ( const configFlag = "config" +var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second") + func configCommand(cfg *Config) *cobra.Command { cmd := &cobra.Command{ Use: "config ", @@ -115,7 +117,7 @@ type Asset struct { func (a *Asset) Validate() error { if a.DeleteAssetsTimeout == 0 { - return errors.New("delete assets timeout must greater than 0 second") + return errDeleteAssetsTimeoutIsZero } return nil From b934a40d145e1d8591de835f8e68ba16176df5db Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 18:37:11 +0700 Subject: [PATCH 09/12] feat: move asset config into service --- cli/config.go | 18 ------------------ cli/root.go | 2 +- cli/server.go | 14 +++++++------- compass.yaml.example | 9 ++++----- core/asset/service.go | 19 ++++++++++--------- internal/server/server.go | 17 +++++++++++++++++ 6 files changed, 39 insertions(+), 40 deletions(-) diff --git a/cli/config.go b/cli/config.go index d5b88697..bc1b4436 100644 --- a/cli/config.go +++ b/cli/config.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "os" - "time" "github.com/MakeNowJust/heredoc" "github.com/goto/compass/core/asset" @@ -22,8 +21,6 @@ import ( const configFlag = "config" -var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second") - func configCommand(cfg *Config) *cobra.Command { cmd := &cobra.Command{ Use: "config ", @@ -106,21 +103,6 @@ type Config struct { // Column search excluded keyword list ColSearchExclusionKeywords string `yaml:"col_search_excluded_keywords" mapstructure:"col_search_excluded_keywords"` - - Asset Asset `mapstructure:"asset"` -} - -type Asset struct { - AdditionalTypes []string `mapstructure:"additional_types"` - DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"` -} - -func (a *Asset) Validate() error { - if a.DeleteAssetsTimeout == 0 { - return errDeleteAssetsTimeoutIsZero - } - - return nil } func LoadConfig() (*Config, error) { diff --git a/cli/root.go b/cli/root.go index 56e0f366..9c2c9f49 100644 --- a/cli/root.go +++ b/cli/root.go @@ -46,7 +46,7 @@ func New(cliConfig *Config) *cobra.Command { } } - if err := registerAdditionalAssetTypes(cliConfig.Asset.AdditionalTypes); err != nil { + if err := registerAdditionalAssetTypes(cliConfig.Service.Asset.AdditionalTypes); err != nil { return fmt.Errorf("error registering additional asset types: %w", err) } diff --git a/cli/server.go b/cli/server.go index 1f79deaa..0f7ec8f2 100644 --- a/cli/server.go +++ b/cli/server.go @@ -83,7 +83,7 @@ func serverMigrateCommand(cfg *Config) *cobra.Command { } func runServer(ctx context.Context, cfg *Config) error { - if err := cfg.Asset.Validate(); err != nil { + if err := cfg.Service.Asset.Validate(); err != nil { return err } @@ -153,12 +153,12 @@ func runServer(ctx context.Context, cfg *Config) error { }() assetService, cancel := asset.NewService(asset.ServiceDeps{ - AssetRepo: assetRepository, - DiscoveryRepo: discoveryRepository, - LineageRepo: lineageRepository, - Worker: wrkr, - Logger: logger, - DeleteAssetsTimeout: cfg.Asset.DeleteAssetsTimeout, + AssetRepo: assetRepository, + DiscoveryRepo: discoveryRepository, + LineageRepo: lineageRepository, + Worker: wrkr, + Logger: logger, + AssetConfig: cfg.Service.Asset, }) defer cancel() diff --git a/compass.yaml.example b/compass.yaml.example index bcd99f44..b5302f15 100644 --- a/compass.yaml.example +++ b/compass.yaml.example @@ -52,6 +52,10 @@ service: max_send_msg_size: 33554432 max_recv_msg_size: 33554432 request_timeout: 5s + asset: + additional_types: + - fact_source + delete_assets_timeout: 5m worker: enabled: true @@ -74,8 +78,3 @@ client: host: localhost:8081 serverheaderkey_uuid: Compass-User-UUID // if ommited, will use value on service.identity.headerkey_uuid serverheadervalue_uuid: gotocompany@email.com - -asset: - additional_types: - - fact_source - delete_assets_timeout: 5m \ No newline at end of file diff --git a/core/asset/service.go b/core/asset/service.go index d5e668d3..ad7e2163 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -6,6 +6,7 @@ import ( "time" "github.com/google/uuid" + "github.com/goto/compass/internal/server" "github.com/goto/compass/pkg/queryexpr" "github.com/goto/salt/log" "go.opentelemetry.io/otel" @@ -19,7 +20,7 @@ type Service struct { lineageRepository LineageRepository worker Worker logger log.Logger - deleteAssetsTimeout time.Duration + assetConfig server.AssetConfig cancelFnList []func() assetOpCounter metric.Int64Counter @@ -36,12 +37,12 @@ type Worker interface { } type ServiceDeps struct { - AssetRepo Repository - DiscoveryRepo DiscoveryRepository - LineageRepo LineageRepository - Worker Worker - Logger log.Logger - DeleteAssetsTimeout time.Duration + AssetRepo Repository + DiscoveryRepo DiscoveryRepository + LineageRepo LineageRepository + Worker Worker + Logger log.Logger + AssetConfig server.AssetConfig } func NewService(deps ServiceDeps) (service *Service, cancel func()) { @@ -57,7 +58,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) { lineageRepository: deps.LineageRepo, worker: deps.Worker, logger: deps.Logger, - deleteAssetsTimeout: deps.DeleteAssetsTimeout, + assetConfig: deps.AssetConfig, cancelFnList: make([]func(), 0), assetOpCounter: assetOpCounter, @@ -153,7 +154,7 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) } if !request.DryRun && total > 0 { - newCtx, cancel := context.WithTimeout(context.Background(), s.deleteAssetsTimeout) + newCtx, cancel := context.WithTimeout(context.Background(), s.assetConfig.DeleteAssetsTimeout) s.cancelFnList = append(s.cancelFnList, cancel) go s.executeDeleteAssets(newCtx, deleteSQLExpr) } diff --git a/internal/server/server.go b/internal/server/server.go index 380faecc..d2be1bdd 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -30,6 +30,8 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) +var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second") + type Config struct { Host string `mapstructure:"host" default:"0.0.0.0"` Port int `mapstructure:"port" default:"8080"` @@ -40,6 +42,21 @@ type Config struct { RequestTimeout time.Duration `mapstructure:"request_timeout" default:"10s"` // GRPC Config GRPC GRPCConfig `mapstructure:"grpc"` + + Asset AssetConfig `mapstructure:"asset"` +} + +type AssetConfig struct { + AdditionalTypes []string `mapstructure:"additional_types"` + DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"` +} + +func (a *AssetConfig) Validate() error { + if a.DeleteAssetsTimeout == 0 { + return errDeleteAssetsTimeoutIsZero + } + + return nil } func (cfg Config) addr() string { return fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) } From b4c695bbea736a9cb3c3a4476888b70539994ad5 Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 18:48:48 +0700 Subject: [PATCH 10/12] feat: move asset config into core/asset --- cli/config.go | 2 ++ cli/root.go | 2 +- cli/server.go | 4 ++-- core/asset/config.go | 21 +++++++++++++++++++++ core/asset/service.go | 5 ++--- internal/server/server.go | 17 ----------------- 6 files changed, 28 insertions(+), 23 deletions(-) create mode 100644 core/asset/config.go diff --git a/cli/config.go b/cli/config.go index bc1b4436..5407cf5c 100644 --- a/cli/config.go +++ b/cli/config.go @@ -103,6 +103,8 @@ type Config struct { // Column search excluded keyword list ColSearchExclusionKeywords string `yaml:"col_search_excluded_keywords" mapstructure:"col_search_excluded_keywords"` + + Asset asset.Config `mapstructure:"asset"` } func LoadConfig() (*Config, error) { diff --git a/cli/root.go b/cli/root.go index 9c2c9f49..56e0f366 100644 --- a/cli/root.go +++ b/cli/root.go @@ -46,7 +46,7 @@ func New(cliConfig *Config) *cobra.Command { } } - if err := registerAdditionalAssetTypes(cliConfig.Service.Asset.AdditionalTypes); err != nil { + if err := registerAdditionalAssetTypes(cliConfig.Asset.AdditionalTypes); err != nil { return fmt.Errorf("error registering additional asset types: %w", err) } diff --git a/cli/server.go b/cli/server.go index 0f7ec8f2..e698f933 100644 --- a/cli/server.go +++ b/cli/server.go @@ -83,7 +83,7 @@ func serverMigrateCommand(cfg *Config) *cobra.Command { } func runServer(ctx context.Context, cfg *Config) error { - if err := cfg.Service.Asset.Validate(); err != nil { + if err := cfg.Asset.Validate(); err != nil { return err } @@ -158,7 +158,7 @@ func runServer(ctx context.Context, cfg *Config) error { LineageRepo: lineageRepository, Worker: wrkr, Logger: logger, - AssetConfig: cfg.Service.Asset, + AssetConfig: cfg.Asset, }) defer cancel() diff --git a/core/asset/config.go b/core/asset/config.go new file mode 100644 index 00000000..bc14d641 --- /dev/null +++ b/core/asset/config.go @@ -0,0 +1,21 @@ +package asset + +import ( + "errors" + "time" +) + +var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second") + +type Config struct { + AdditionalTypes []string `mapstructure:"additional_types"` + DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"` +} + +func (c *Config) Validate() error { + if c.DeleteAssetsTimeout == 0 { + return errDeleteAssetsTimeoutIsZero + } + + return nil +} diff --git a/core/asset/service.go b/core/asset/service.go index ad7e2163..9fd97b89 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -6,7 +6,6 @@ import ( "time" "github.com/google/uuid" - "github.com/goto/compass/internal/server" "github.com/goto/compass/pkg/queryexpr" "github.com/goto/salt/log" "go.opentelemetry.io/otel" @@ -20,7 +19,7 @@ type Service struct { lineageRepository LineageRepository worker Worker logger log.Logger - assetConfig server.AssetConfig + assetConfig Config cancelFnList []func() assetOpCounter metric.Int64Counter @@ -42,7 +41,7 @@ type ServiceDeps struct { LineageRepo LineageRepository Worker Worker Logger log.Logger - AssetConfig server.AssetConfig + AssetConfig Config } func NewService(deps ServiceDeps) (service *Service, cancel func()) { diff --git a/internal/server/server.go b/internal/server/server.go index d2be1bdd..380faecc 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -30,8 +30,6 @@ import ( "google.golang.org/protobuf/encoding/protojson" ) -var errDeleteAssetsTimeoutIsZero = errors.New("delete assets timeout must greater than 0 second") - type Config struct { Host string `mapstructure:"host" default:"0.0.0.0"` Port int `mapstructure:"port" default:"8080"` @@ -42,21 +40,6 @@ type Config struct { RequestTimeout time.Duration `mapstructure:"request_timeout" default:"10s"` // GRPC Config GRPC GRPCConfig `mapstructure:"grpc"` - - Asset AssetConfig `mapstructure:"asset"` -} - -type AssetConfig struct { - AdditionalTypes []string `mapstructure:"additional_types"` - DeleteAssetsTimeout time.Duration `mapstructure:"delete_assets_timeout" default:"5m"` -} - -func (a *AssetConfig) Validate() error { - if a.DeleteAssetsTimeout == 0 { - return errDeleteAssetsTimeoutIsZero - } - - return nil } func (cfg Config) addr() string { return fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) } From 7b5b0c188002a5d3e9dd49371d28854d791ada31 Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 19:03:08 +0700 Subject: [PATCH 11/12] feat: update config --- compass.yaml.example | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/compass.yaml.example b/compass.yaml.example index b5302f15..bcd99f44 100644 --- a/compass.yaml.example +++ b/compass.yaml.example @@ -52,10 +52,6 @@ service: max_send_msg_size: 33554432 max_recv_msg_size: 33554432 request_timeout: 5s - asset: - additional_types: - - fact_source - delete_assets_timeout: 5m worker: enabled: true @@ -78,3 +74,8 @@ client: host: localhost:8081 serverheaderkey_uuid: Compass-User-UUID // if ommited, will use value on service.identity.headerkey_uuid serverheadervalue_uuid: gotocompany@email.com + +asset: + additional_types: + - fact_source + delete_assets_timeout: 5m \ No newline at end of file From fbc4a5ee385d1e2072a4eac31d8e4469c3e2a1c4 Mon Sep 17 00:00:00 2001 From: Muhammad Luthfi Fahlevi Date: Mon, 2 Sep 2024 19:10:47 +0700 Subject: [PATCH 12/12] refactor: rename the variable --- cli/server.go | 2 +- core/asset/service.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cli/server.go b/cli/server.go index e698f933..446328a9 100644 --- a/cli/server.go +++ b/cli/server.go @@ -158,7 +158,7 @@ func runServer(ctx context.Context, cfg *Config) error { LineageRepo: lineageRepository, Worker: wrkr, Logger: logger, - AssetConfig: cfg.Asset, + Config: cfg.Asset, }) defer cancel() diff --git a/core/asset/service.go b/core/asset/service.go index 9fd97b89..7fe5b3ff 100644 --- a/core/asset/service.go +++ b/core/asset/service.go @@ -19,7 +19,7 @@ type Service struct { lineageRepository LineageRepository worker Worker logger log.Logger - assetConfig Config + config Config cancelFnList []func() assetOpCounter metric.Int64Counter @@ -41,7 +41,7 @@ type ServiceDeps struct { LineageRepo LineageRepository Worker Worker Logger log.Logger - AssetConfig Config + Config Config } func NewService(deps ServiceDeps) (service *Service, cancel func()) { @@ -57,7 +57,7 @@ func NewService(deps ServiceDeps) (service *Service, cancel func()) { lineageRepository: deps.LineageRepo, worker: deps.Worker, logger: deps.Logger, - assetConfig: deps.AssetConfig, + config: deps.Config, cancelFnList: make([]func(), 0), assetOpCounter: assetOpCounter, @@ -153,7 +153,7 @@ func (s *Service) DeleteAssets(ctx context.Context, request DeleteAssetsRequest) } if !request.DryRun && total > 0 { - newCtx, cancel := context.WithTimeout(context.Background(), s.assetConfig.DeleteAssetsTimeout) + newCtx, cancel := context.WithTimeout(context.Background(), s.config.DeleteAssetsTimeout) s.cancelFnList = append(s.cancelFnList, cancel) go s.executeDeleteAssets(newCtx, deleteSQLExpr) }