Skip to content

Commit

Permalink
refactor: make interface for query expr and implement to postgresql a…
Browse files Browse the repository at this point in the history
…nd elasticsearch
  • Loading branch information
Muhammad Luthfi Fahlevi committed Aug 9, 2024
1 parent 8e4782f commit f068a05
Show file tree
Hide file tree
Showing 16 changed files with 550 additions and 563 deletions.
4 changes: 2 additions & 2 deletions core/asset/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type Repository interface {
GetAll(context.Context, Filter) ([]Asset, error)
GetCount(context.Context, Filter) (int, error)
GetCountByQuery(context.Context, string) (int, error)
GetCountByQueryExpr(context.Context, string, bool) (int, error)
GetByID(ctx context.Context, id string) (Asset, error)
GetByURN(ctx context.Context, urn string) (Asset, error)
GetVersionHistory(ctx context.Context, flt Filter, id string) ([]Asset, error)
Expand All @@ -22,7 +22,7 @@ type Repository interface {
Upsert(ctx context.Context, ast *Asset) (string, error)
DeleteByID(ctx context.Context, id string) error
DeleteByURN(ctx context.Context, urn string) error
DeleteByQuery(ctx context.Context, whereCondition string) ([]string, error)
DeleteByQueryExpr(ctx context.Context, queryExpr string) ([]string, error)
AddProbe(ctx context.Context, assetURN string, probe *Probe) error
GetProbes(ctx context.Context, assetURN string) ([]Probe, error)
GetProbesWithFilter(ctx context.Context, flt ProbesFilter) (map[string][]Probe, error)
Expand Down
2 changes: 1 addition & 1 deletion core/asset/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type DiscoveryRepository interface {
Upsert(context.Context, Asset) error
DeleteByID(ctx context.Context, assetID string) error
DeleteByURN(ctx context.Context, assetURN string) error
DeleteByQuery(ctx context.Context, filterQuery string) error
DeleteByQueryExpr(ctx context.Context, filterQuery string) error
Search(ctx context.Context, cfg SearchConfig) (results []SearchResult, err error)
Suggest(ctx context.Context, cfg SearchConfig) (suggestions []string, err error)
GroupAssets(ctx context.Context, cfg GroupConfig) (results []GroupResult, err error)
Expand Down
34 changes: 4 additions & 30 deletions core/asset/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/goto/compass/pkg/generic_helper"
"github.com/goto/compass/pkg/translator"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -26,7 +24,7 @@ type Service struct {
type Worker interface {
EnqueueIndexAssetJob(ctx context.Context, ast Asset) error
EnqueueDeleteAssetJob(ctx context.Context, urn string) error
EnqueueDeleteAssetsByQueryJob(ctx context.Context, filterQuery string) error
EnqueueDeleteAssetsByQueryExprJob(ctx context.Context, queryExpr string) error
EnqueueSyncAssetJob(ctx context.Context, service string) error
Close() error
}
Expand Down Expand Up @@ -129,26 +127,7 @@ func (s *Service) DeleteAsset(ctx context.Context, id string) (err error) {
}

func (s *Service) DeleteAssets(ctx context.Context, queryExpr string, dryRun bool) (affectedRows uint32, err error) {
queryExprTranslator := &translator.QueryExprTranslator{
QueryExpr: queryExpr,
}

// Check existence of required identifiers
identifiers := queryExprTranslator.GetIdentifiers()
mustExist := generic_helper.Contains(identifiers, "refreshed_at") &&
generic_helper.Contains(identifiers, "type") &&
generic_helper.Contains(identifiers, "service")
if !mustExist {
return 0, fmt.Errorf(
"must exists these identifiers: refreshed_at, type. Current identifiers: %v", identifiers)
}

sqlWhereCondition, err := queryExprTranslator.ConvertToSQL()
if err != nil {
return 0, err
}

total, err := s.assetRepository.GetCountByQuery(ctx, sqlWhereCondition)
total, err := s.assetRepository.GetCountByQueryExpr(ctx, queryExpr, true)
if err != nil {
return 0, err
}
Expand All @@ -157,18 +136,13 @@ func (s *Service) DeleteAssets(ctx context.Context, queryExpr string, dryRun boo
return uint32(total), nil
}

esFilterQuery, err := queryExprTranslator.ConvertToEsQuery()
if err != nil {
return 0, err
}

go func() {
urns, err := s.assetRepository.DeleteByQuery(ctx, sqlWhereCondition)
urns, err := s.assetRepository.DeleteByQueryExpr(ctx, queryExpr)
if err != nil {
return
}

if err := s.worker.EnqueueDeleteAssetsByQueryJob(ctx, esFilterQuery); err != nil {
if err := s.worker.EnqueueDeleteAssetsByQueryExprJob(ctx, queryExpr); err != nil {
return
}

Expand Down
38 changes: 35 additions & 3 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"errors"
"fmt"
generichelper "github.com/goto/compass/pkg/generic_helper"

Check failure on line 9 in internal/store/elasticsearch/discovery_repository.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
queryexpr "github.com/goto/compass/pkg/query_expr"
"io"
"net/url"
"strings"
Expand All @@ -24,6 +26,26 @@ type DiscoveryRepository struct {
columnSearchExclusionList []string
}

type DeleteAssetESExpr struct {
queryexpr.ESExpr
}

func (d *DeleteAssetESExpr) Validate() error {
identifiers, err := queryexpr.GetIdentifiers(d.QueryExpr)
if err != nil {
return err
}

mustExist := generichelper.Contains(identifiers, "refreshed_at") &&
generichelper.Contains(identifiers, "type") &&
generichelper.Contains(identifiers, "service")
if !mustExist {
return fmt.Errorf("must exists these identifiers: refreshed_at, type. Current identifiers: %v", identifiers)
}

return nil
}

func NewDiscoveryRepository(cli *Client, logger log.Logger, requestTimeout time.Duration, colSearchExclusionList []string) *DiscoveryRepository {
return &DiscoveryRepository{
cli: cli,
Expand Down Expand Up @@ -144,12 +166,22 @@ func (repo *DiscoveryRepository) DeleteByURN(ctx context.Context, assetURN strin
return repo.deleteWithQuery(ctx, "DeleteByURN", fmt.Sprintf(`{"query":{"term":{"urn.keyword": %q}}}`, assetURN))
}

func (repo *DiscoveryRepository) DeleteByQuery(ctx context.Context, filterQuery string) error {
if filterQuery == "" {
func (repo *DiscoveryRepository) DeleteByQueryExpr(ctx context.Context, queryExpr string) error {
if queryExpr == "" {
return asset.ErrEmptyQuery
}

return repo.deleteWithQuery(ctx, "DeleteByQuery", filterQuery)
deleteAssetESExpr := &DeleteAssetESExpr{
queryexpr.ESExpr{
QueryExpr: queryExpr,
},
}
esQuery, err := queryexpr.ValidateAndGetQueryFromExpr(deleteAssetESExpr)
if err != nil {
return err
}

return repo.deleteWithQuery(ctx, "DeleteByQueryExpr", esQuery)
}

func (repo *DiscoveryRepository) deleteWithQuery(ctx context.Context, discoveryOp, qry string) (err error) {
Expand Down
83 changes: 71 additions & 12 deletions internal/store/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"errors"
"fmt"
generichelper "github.com/goto/compass/pkg/generic_helper"

Check failure on line 9 in internal/store/postgres/asset_repository.go

View workflow job for this annotation

GitHub Actions / golangci

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
queryexpr "github.com/goto/compass/pkg/query_expr"
"log"
"strings"
"time"
Expand All @@ -17,9 +19,7 @@ import (
"github.com/r3labs/diff/v2"
)

const (
batchSize = 1000
)
const batchSize = 1000

// AssetRepository is a type that manages user operation to the primary database
type AssetRepository struct {
Expand All @@ -29,6 +29,26 @@ type AssetRepository struct {
defaultUserProvider string
}

type DeleteAssetSQLExpr struct {
queryexpr.SQLExpr
}

func (d *DeleteAssetSQLExpr) Validate() error {
identifiers, err := queryexpr.GetIdentifiers(d.QueryExpr)
if err != nil {
return err
}

mustExist := generichelper.Contains(identifiers, "refreshed_at") &&
generichelper.Contains(identifiers, "type") &&
generichelper.Contains(identifiers, "service")
if !mustExist {
return fmt.Errorf("must exists these identifiers: refreshed_at, type. Current identifiers: %v", identifiers)
}

return nil
}

// GetAll retrieves list of assets with filters
func (r *AssetRepository) GetAll(ctx context.Context, flt asset.Filter) ([]asset.Asset, error) {
builder := r.getAssetSQL().Offset(uint64(flt.Offset))
Expand Down Expand Up @@ -117,8 +137,41 @@ func (r *AssetRepository) GetCount(ctx context.Context, flt asset.Filter) (int,
return total, nil
}

// GetCountByQuery retrieves number of assets for every type based on query
func (r *AssetRepository) GetCountByQuery(ctx context.Context, sqlQuery string) (int, error) {
// GetCountByQueryExpr retrieves number of assets for every type based on query expr
func (r *AssetRepository) GetCountByQueryExpr(ctx context.Context, queryExpr string, isDeleteExpr bool) (int, error) {
var sqlQuery string
if isDeleteExpr {
deleteExpr := &DeleteAssetSQLExpr{
queryexpr.SQLExpr{
QueryExpr: queryExpr,
},
}
query, err := queryexpr.ValidateAndGetQueryFromExpr(deleteExpr)
if err != nil {
return 0, err
}
sqlQuery = query
} else {
sqlExpr := &queryexpr.SQLExpr{
QueryExpr: queryExpr,
}
query, err := queryexpr.ValidateAndGetQueryFromExpr(sqlExpr)
if err != nil {
return 0, err
}
sqlQuery = query
}

total, err := r.getCountByQuery(ctx, sqlQuery)
if err != nil {
return 0, err
}

return total, err
}

// GetCountByQueryExpr retrieves number of assets for every type based on query expr
func (r *AssetRepository) getCountByQuery(ctx context.Context, sqlQuery string) (int, error) {
builder := sq.Select("count(1)").
From("assets").
Where(sqlQuery)
Expand Down Expand Up @@ -371,13 +424,23 @@ func (r *AssetRepository) DeleteByURN(ctx context.Context, urn string) error {
return nil
}

func (r *AssetRepository) DeleteByQuery(ctx context.Context, whereCondition string) ([]string, error) {
func (r *AssetRepository) DeleteByQueryExpr(ctx context.Context, queryExpr string) ([]string, error) {
var allURNs []string
err := r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
deleteExpr := &DeleteAssetSQLExpr{
queryexpr.SQLExpr{
QueryExpr: queryExpr,
},
}
query, err := queryexpr.ValidateAndGetQueryFromExpr(deleteExpr)
if err != nil {
return err
}

var lastID string
for {
// Fetch a batch of rows to delete using the last ID as a marker
urns, nextLastID, err := r.getAllURNsWithBatch(ctx, whereCondition, lastID)
urns, nextLastID, err := r.getAllURNsWithBatch(ctx, query, lastID)
if err != nil {
log.Printf("Failed to get batch to delete: %v", err)
return err
Expand Down Expand Up @@ -647,11 +710,7 @@ func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset,
onlyRefreshed := len(clog) == 1 && clog[0].Path[0] == "RefreshedAt"
if onlyRefreshed {
return r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
if err := r.updateAsset(ctx, tx, assetID, newAsset); err != nil {
return err
}

return nil
return r.updateAsset(ctx, tx, assetID, newAsset)
})
}

Expand Down
18 changes: 9 additions & 9 deletions internal/workermanager/discovery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type DiscoveryRepository interface {
Upsert(context.Context, asset.Asset) error
DeleteByURN(ctx context.Context, assetURN string) error
DeleteByQuery(ctx context.Context, filterQuery string) error
DeleteByQueryExpr(ctx context.Context, queryExpr string) error
SyncAssets(ctx context.Context, indexName string) (cleanupFn func() error, err error)
}

Expand Down Expand Up @@ -159,21 +159,21 @@ func (m *Manager) DeleteAsset(ctx context.Context, job worker.JobSpec) error {
return nil
}

func (m *Manager) EnqueueDeleteAssetsByQueryJob(ctx context.Context, filterQuery string) error {
func (m *Manager) EnqueueDeleteAssetsByQueryExprJob(ctx context.Context, queryExpr string) error {
err := m.worker.Enqueue(ctx, worker.JobSpec{
Type: jobDeleteAssetsByQuery,
Payload: ([]byte)(filterQuery),
Payload: ([]byte)(queryExpr),
})
if err != nil {
return fmt.Errorf("enqueue delete asset job: %w: urn '%s'", err, filterQuery)
return fmt.Errorf("enqueue delete asset job: %w: query expr '%s'", err, queryExpr)
}

return nil
}

func (m *Manager) deleteAssetsByQueryHandler() worker.JobHandler {
return worker.JobHandler{
Handle: m.DeleteAssetsByQuery,
Handle: m.DeleteAssetsByQueryExpr,
JobOpts: worker.JobOptions{
MaxAttempts: 3,
Timeout: m.indexTimeout,
Expand All @@ -182,11 +182,11 @@ func (m *Manager) deleteAssetsByQueryHandler() worker.JobHandler {
}
}

func (m *Manager) DeleteAssetsByQuery(ctx context.Context, job worker.JobSpec) error {
filterQuery := (string)(job.Payload)
if err := m.discoveryRepo.DeleteByQuery(ctx, filterQuery); err != nil {
func (m *Manager) DeleteAssetsByQueryExpr(ctx context.Context, job worker.JobSpec) error {
queryExpr := (string)(job.Payload)
if err := m.discoveryRepo.DeleteByQueryExpr(ctx, queryExpr); err != nil {
return &worker.RetryableError{
Cause: fmt.Errorf("delete asset from discovery repo: %w: query '%s'", err, filterQuery),
Cause: fmt.Errorf("delete asset from discovery repo: %w: query expr '%s'", err, queryExpr),
}
}
return nil
Expand Down
7 changes: 7 additions & 0 deletions internal/workermanager/in_situ_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func (m *InSituWorker) EnqueueDeleteAssetJob(ctx context.Context, urn string) er
return nil
}

func (m *InSituWorker) EnqueueDeleteAssetsByQueryExprJob(ctx context.Context, queryExpr string) error {
if err := m.discoveryRepo.DeleteByQueryExpr(ctx, queryExpr); err != nil {
return fmt.Errorf("delete asset from discovery repo: %w: query expr '%s'", err, queryExpr)
}
return nil
}

func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) error {
const batchSize = 1000

Expand Down
2 changes: 1 addition & 1 deletion pkg/generic_helper/generic_helper.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package generic_helper
package generichelper

// Contains checks if a target item exists in an array of any type.
func Contains[T comparable](arr []T, target T) bool {
Expand Down
Loading

0 comments on commit f068a05

Please sign in to comment.