Skip to content

Commit ab44801

Browse files
author
Muhammad Luthfi Fahlevi
committed
refactor: remove unnecessary changes
1 parent 8c7f841 commit ab44801

File tree

3 files changed

+71
-72
lines changed

3 files changed

+71
-72
lines changed

core/asset/asset.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ type Repository interface {
2020
GetByVersionWithID(ctx context.Context, id, version string) (Asset, error)
2121
GetByVersionWithURN(ctx context.Context, urn, version string) (Asset, error)
2222
GetTypes(ctx context.Context, flt Filter) (map[Type]int, error)
23-
Upsert(ctx context.Context, ast *Asset) (Asset, error)
24-
UpsertPatch(ctx context.Context, ast *Asset, patchData map[string]interface{}) (Asset, error)
23+
Upsert(ctx context.Context, ast *Asset) (string, error)
24+
UpsertPatch(ctx context.Context, ast *Asset, patchData map[string]interface{}) (string, error)
2525
DeleteByID(ctx context.Context, id string) error
2626
DeleteByURN(ctx context.Context, urn string) error
2727
DeleteByQueryExpr(ctx context.Context, queryExpr queryexpr.ExprStr) ([]string, error)

core/asset/service.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -106,16 +106,16 @@ func (s *Service) UpsertAssetWithoutLineage(ctx context.Context, ast *Asset) (st
106106
currentTime := time.Now()
107107
ast.RefreshedAt = &currentTime
108108

109-
newAsset, err := s.assetRepository.Upsert(ctx, ast)
109+
assetID, err := s.assetRepository.Upsert(ctx, ast)
110110
if err != nil {
111111
return "", err
112112
}
113113

114-
if err := s.worker.EnqueueIndexAssetJob(ctx, newAsset); err != nil {
114+
if err := s.worker.EnqueueIndexAssetJob(ctx, *ast); err != nil {
115115
return "", err
116116
}
117117

118-
return newAsset.ID, nil
118+
return assetID, nil
119119
}
120120

121121
func (s *Service) UpsertPatchAsset(ctx context.Context, ast *Asset, upstreams, downstreams []string, patchData map[string]interface{}) (string, error) {
@@ -135,16 +135,16 @@ func (s *Service) UpsertPatchAssetWithoutLineage(ctx context.Context, ast *Asset
135135
currentTime := time.Now()
136136
ast.RefreshedAt = &currentTime
137137

138-
newAsset, err := s.assetRepository.UpsertPatch(ctx, ast, patchData)
138+
assetID, err := s.assetRepository.UpsertPatch(ctx, ast, patchData)
139139
if err != nil {
140140
return "", err
141141
}
142142

143-
if err := s.worker.EnqueueIndexAssetJob(ctx, newAsset); err != nil {
143+
if err := s.worker.EnqueueIndexAssetJob(ctx, *ast); err != nil {
144144
return "", err
145145
}
146146

147-
return newAsset.ID, nil
147+
return assetID, nil
148148
}
149149

150150
func (s *Service) DeleteAsset(ctx context.Context, id string) (err error) {

internal/store/postgres/asset_repository.go

Lines changed: 63 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,15 @@ func (r *AssetRepository) GetByID(ctx context.Context, id string) (asset.Asset,
165165
}
166166

167167
func (r *AssetRepository) GetByURN(ctx context.Context, urn string) (asset.Asset, error) {
168-
return r.GetByURNWithTx(ctx, nil, urn)
168+
ast, err := r.getWithPredicateWithTx(ctx, nil, sq.Eq{"a.urn": urn})
169+
if errors.Is(err, sql.ErrNoRows) {
170+
return asset.Asset{}, asset.NotFoundError{URN: urn}
171+
}
172+
if err != nil {
173+
return asset.Asset{}, fmt.Errorf("error getting asset with URN = %q: %w", urn, err)
174+
}
175+
176+
return ast, nil
169177
}
170178

171179
func (r *AssetRepository) GetByURNWithTx(ctx context.Context, tx *sqlx.Tx, urn string) (asset.Asset, error) {
@@ -174,7 +182,7 @@ func (r *AssetRepository) GetByURNWithTx(ctx context.Context, tx *sqlx.Tx, urn s
174182
return asset.Asset{}, asset.NotFoundError{URN: urn}
175183
}
176184
if err != nil {
177-
return asset.Asset{}, fmt.Errorf("error getting asset with URN = %q: %w", urn, err)
185+
return asset.Asset{}, fmt.Errorf("error getting asset with Tx for URN = %q: %w", urn, err)
178186
}
179187

180188
return ast, nil
@@ -322,13 +330,11 @@ func (r *AssetRepository) getByVersion(
322330
return ast.toVersionedAsset(latest)
323331
}
324332

325-
// UpsertPatch creates a new asset if it does not exist yet.
333+
// Upsert creates a new asset if it does not exist yet.
326334
// It updates if asset does exist.
327-
// Checking existence is done using "urn", "type", and "service" fields
328-
// And will revalidate again with additional: "data" and "name" fields.
329-
func (r *AssetRepository) UpsertPatch(ctx context.Context, ast *asset.Asset, patchData map[string]interface{}) (asset.Asset, error) {
330-
var upsertedAsset asset.Asset
331-
err := r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
335+
// Checking existence is done using "urn", "type", "name", "data", and "service" fields.
336+
func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (assetID string, err error) {
337+
err = r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) (err error) {
332338
fetchedAsset, err := r.GetByURNWithTx(ctx, tx, ast.URN)
333339
if errors.As(err, new(asset.NotFoundError)) {
334340
err = nil
@@ -339,46 +345,43 @@ func (r *AssetRepository) UpsertPatch(ctx context.Context, ast *asset.Asset, pat
339345

340346
if fetchedAsset.ID == "" {
341347
// insert flow
342-
upsertedAsset, err = r.insert(ctx, tx, ast)
348+
assetID, err = r.insert(ctx, tx, ast)
343349
if err != nil {
344350
return fmt.Errorf("error inserting asset to DB: %w", err)
345351
}
352+
346353
return nil
347354
}
348355

349-
// update flow
350-
if err := copier.CopyWithOption(&ast, &fetchedAsset, copier.Option{DeepCopy: true}); err != nil {
351-
return err
352-
}
353-
ast.Patch(patchData)
354-
if err := r.validateAsset(*ast); err != nil {
355-
return err
356-
}
357356
changelog, err := fetchedAsset.Diff(ast)
358357
if err != nil {
359358
return fmt.Errorf("error diffing two assets: %w", err)
360359
}
361360

362-
upsertedAsset, err = r.update(ctx, tx, ast, &fetchedAsset, changelog)
363-
if err != nil {
361+
if err := r.update(ctx, tx, ast, &fetchedAsset, changelog); err != nil {
364362
return fmt.Errorf("error updating asset to DB: %w", err)
365363
}
364+
assetID = fetchedAsset.ID
366365

367366
return nil
368367
})
369368
if err != nil {
370-
return asset.Asset{}, err
369+
return "", err
371370
}
372371

373-
return upsertedAsset, nil
372+
return assetID, nil
374373
}
375374

376-
// Upsert creates a new asset if it does not exist yet.
375+
// UpsertPatch creates a new asset if it does not exist yet.
377376
// It updates if asset does exist.
378-
// Checking existence is done using "urn", "type", "name", "data", and "service" fields.
379-
func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (asset.Asset, error) {
380-
var upsertedAsset asset.Asset
381-
err := r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
377+
// Checking existence is done using "urn", "type", and "service" fields
378+
// And will revalidate again with additional: "data" and "name" fields.
379+
func (r *AssetRepository) UpsertPatch( //nolint:gocognit
380+
ctx context.Context,
381+
ast *asset.Asset,
382+
patchData map[string]interface{},
383+
) (assetID string, err error) {
384+
err = r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) (err error) {
382385
fetchedAsset, err := r.GetByURNWithTx(ctx, tx, ast.URN)
383386
if errors.As(err, new(asset.NotFoundError)) {
384387
err = nil
@@ -389,31 +392,41 @@ func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (asset.A
389392

390393
if fetchedAsset.ID == "" {
391394
// insert flow
392-
upsertedAsset, err = r.insert(ctx, tx, ast)
395+
if err := r.validateAsset(*ast); err != nil {
396+
return err
397+
}
398+
assetID, err = r.insert(ctx, tx, ast)
393399
if err != nil {
394400
return fmt.Errorf("error inserting asset to DB: %w", err)
395401
}
396-
397402
return nil
398403
}
399404

405+
// update flow
406+
if err := copier.CopyWithOption(&ast, &fetchedAsset, copier.Option{DeepCopy: true}); err != nil {
407+
return err
408+
}
409+
ast.Patch(patchData)
410+
if err := r.validateAsset(*ast); err != nil {
411+
return err
412+
}
400413
changelog, err := fetchedAsset.Diff(ast)
401414
if err != nil {
402415
return fmt.Errorf("error diffing two assets: %w", err)
403416
}
404417

405-
upsertedAsset, err = r.update(ctx, tx, ast, &fetchedAsset, changelog)
406-
if err != nil {
418+
if err := r.update(ctx, tx, ast, &fetchedAsset, changelog); err != nil {
407419
return fmt.Errorf("error updating asset to DB: %w", err)
408420
}
421+
assetID = fetchedAsset.ID
409422

410423
return nil
411424
})
412425
if err != nil {
413-
return asset.Asset{}, err
426+
return "", err
414427
}
415428

416-
return upsertedAsset, nil
429+
return assetID, nil
417430
}
418431

419432
func (*AssetRepository) validateAsset(ast asset.Asset) error {
@@ -631,7 +644,7 @@ func (r *AssetRepository) deleteWithPredicate(ctx context.Context, pred sq.Eq) (
631644
return affectedRows, nil
632645
}
633646

634-
func (r *AssetRepository) insert(ctx context.Context, tx *sqlx.Tx, ast *asset.Asset) (asset.Asset, error) {
647+
func (r *AssetRepository) insert(ctx context.Context, tx *sqlx.Tx, ast *asset.Asset) (string, error) {
635648
currentTime := time.Now()
636649
if ast.RefreshedAt != nil {
637650
currentTime = *ast.RefreshedAt
@@ -647,44 +660,35 @@ func (r *AssetRepository) insert(ctx context.Context, tx *sqlx.Tx, ast *asset.As
647660
PlaceholderFormat(sq.Dollar).
648661
ToSql()
649662
if err != nil {
650-
return asset.Asset{}, fmt.Errorf("build insert query: %w", err)
663+
return "", fmt.Errorf("build insert query: %w", err)
651664
}
652665

653666
ast.Version = asset.BaseVersion
654667

655668
var id string
656669
err = tx.QueryRowContext(ctx, query, args...).Scan(&id)
657670
if err != nil {
658-
return asset.Asset{}, fmt.Errorf("run insert query: %w", err)
671+
return "", fmt.Errorf("run insert query: %w", err)
659672
}
660673
ast.ID = id
661674

662675
users, err := r.createOrFetchUsers(ctx, tx, ast.Owners)
663676
if err != nil {
664-
return asset.Asset{}, fmt.Errorf("create and fetch owners: %w", err)
677+
return "", fmt.Errorf("create and fetch owners: %w", err)
665678
}
666679

667680
err = r.insertOwners(ctx, tx, ast.ID, users)
668681
if err != nil {
669-
return asset.Asset{}, fmt.Errorf("run insert owners query: %w", err)
670-
}
671-
672-
if err := r.insertAssetVersion(ctx, tx, ast, diff.Changelog{}); err != nil {
673-
return asset.Asset{}, err
682+
return "", fmt.Errorf("run insert owners query: %w", err)
674683
}
675684

676-
insertedAsset, err := r.GetByURNWithTx(ctx, tx, ast.URN)
677-
if err != nil {
678-
return asset.Asset{}, err
679-
}
680-
681-
return insertedAsset, nil
685+
return id, r.insertAssetVersion(ctx, tx, ast, diff.Changelog{})
682686
}
683687

684-
func (r *AssetRepository) update(ctx context.Context, tx *sqlx.Tx, newAsset, oldAsset *asset.Asset, clog diff.Changelog) (asset.Asset, error) {
688+
func (r *AssetRepository) update(ctx context.Context, tx *sqlx.Tx, newAsset, oldAsset *asset.Asset, clog diff.Changelog) error {
685689
assetID := oldAsset.ID
686690
if !isValidUUID(assetID) {
687-
return asset.Asset{}, asset.InvalidError{AssetID: assetID}
691+
return asset.InvalidError{AssetID: assetID}
688692
}
689693

690694
currentTime := time.Now()
@@ -696,50 +700,45 @@ func (r *AssetRepository) update(ctx context.Context, tx *sqlx.Tx, newAsset, old
696700

697701
if len(clog) == 0 {
698702
if newAsset.RefreshedAt == nil || newAsset.RefreshedAt == oldAsset.RefreshedAt {
699-
return *newAsset, nil
703+
return nil
700704
}
701705

702-
return *newAsset, r.updateAssetRefreshedAt(ctx, tx, assetID, currentTime)
706+
return r.updateAssetRefreshedAt(ctx, tx, assetID, currentTime)
703707
}
704708

705709
// update assets
706710
newVersion, err := asset.IncreaseMinorVersion(oldAsset.Version)
707711
if err != nil {
708-
return asset.Asset{}, err
712+
return err
709713
}
710714
newAsset.Version = newVersion
711-
newAsset.ID = oldAsset.ID
715+
newAsset.ID = assetID
712716
newAsset.UpdatedAt = currentTime
713717
newAsset.RefreshedAt = &currentTime
714718

715719
if err := r.updateAsset(ctx, tx, assetID, newAsset); err != nil {
716-
return asset.Asset{}, err
720+
return err
717721
}
718722

719723
// insert versions
720724
if err := r.insertAssetVersion(ctx, tx, newAsset, clog); err != nil {
721-
return asset.Asset{}, err
725+
return err
722726
}
723727

724728
// managing owners
725729
newAssetOwners, err := r.createOrFetchUsers(ctx, tx, newAsset.Owners)
726730
if err != nil {
727-
return asset.Asset{}, fmt.Errorf("error creating and fetching owners: %w", err)
731+
return fmt.Errorf("error creating and fetching owners: %w", err)
728732
}
729733
toInserts, toRemoves := r.compareOwners(oldAsset.Owners, newAssetOwners)
730734
if err := r.insertOwners(ctx, tx, assetID, toInserts); err != nil {
731-
return asset.Asset{}, fmt.Errorf("error inserting asset's new owners: %w", err)
735+
return fmt.Errorf("error inserting asset's new owners: %w", err)
732736
}
733737
if err := r.removeOwners(ctx, tx, assetID, toRemoves); err != nil {
734-
return asset.Asset{}, fmt.Errorf("error removing asset's old owners: %w", err)
738+
return fmt.Errorf("error removing asset's old owners: %w", err)
735739
}
736740

737-
updatedAsset, err := r.GetByURNWithTx(ctx, tx, newAsset.URN)
738-
if err != nil {
739-
return asset.Asset{}, err
740-
}
741-
742-
return updatedAsset, nil
741+
return nil
743742
}
744743

745744
func (r *AssetRepository) updateAsset(ctx context.Context, tx *sqlx.Tx, assetID string, newAsset *asset.Asset) error {

0 commit comments

Comments
 (0)