Skip to content

Commit 576b9b5

Browse files
author
Muhammad Luthfi Fahlevi
committed
fix: insert logic and make it single responsibility for upsert patch and upsert post
1 parent 675a75c commit 576b9b5

File tree

2 files changed

+65
-35
lines changed

2 files changed

+65
-35
lines changed

internal/store/postgres/asset_repository.go

+55-22
Original file line numberDiff line numberDiff line change
@@ -322,22 +322,61 @@ func (r *AssetRepository) getByVersion(
322322
return ast.toVersionedAsset(latest)
323323
}
324324

325+
// UpsertPatch creates a new asset if it does not exist yet.
326+
// It updates if asset does exist.
327+
// Checking existence is done using "urn", "type", and "service" fields
328+
// And will revalidate again with additional: "data" and "name" fields.
325329
func (r *AssetRepository) UpsertPatch(ctx context.Context, ast *asset.Asset, patchData map[string]interface{}) (asset.Asset, error) {
326-
return r.upsertWithPatchOption(ctx, ast, patchData)
327-
}
330+
var upsertedAsset asset.Asset
331+
err := r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
332+
fetchedAsset, err := r.GetByURNWithTx(ctx, tx, ast.URN)
333+
if errors.As(err, new(asset.NotFoundError)) {
334+
err = nil
335+
}
336+
if err != nil {
337+
return fmt.Errorf("error getting asset by URN: %w", err)
338+
}
328339

329-
func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (asset.Asset, error) {
330-
return r.upsertWithPatchOption(ctx, ast, nil)
340+
if fetchedAsset.ID == "" {
341+
// insert flow
342+
upsertedAsset, err = r.insert(ctx, tx, ast)
343+
if err != nil {
344+
return fmt.Errorf("error inserting asset to DB: %w", err)
345+
}
346+
return nil
347+
}
348+
349+
// update flow
350+
if err := copier.CopyWithOption(&ast, &fetchedAsset, copier.Option{DeepCopy: true}); err != nil {
351+
return err
352+
}
353+
ast.Patch(patchData)
354+
if err := r.validateAsset(*ast); err != nil {
355+
return err
356+
}
357+
changelog, err := fetchedAsset.Diff(ast)
358+
if err != nil {
359+
return fmt.Errorf("error diffing two assets: %w", err)
360+
}
361+
362+
upsertedAsset, err = r.update(ctx, tx, ast, &fetchedAsset, changelog)
363+
if err != nil {
364+
return fmt.Errorf("error updating asset to DB: %w", err)
365+
}
366+
367+
return nil
368+
})
369+
if err != nil {
370+
return asset.Asset{}, err
371+
}
372+
373+
return upsertedAsset, nil
331374
}
332375

333376
// Upsert creates a new asset if it does not exist yet.
334377
// It updates if asset does exist.
335-
// Checking existence is done using "urn", "type", and "service" fields.
336-
func (r *AssetRepository) upsertWithPatchOption( //nolint:gocognit
337-
ctx context.Context,
338-
ast *asset.Asset,
339-
patchData map[string]interface{},
340-
) (asset.Asset, error) {
378+
// Checking existence is done using "urn", "type", "name", "data", and "service" fields.
379+
func (r *AssetRepository) Upsert(ctx context.Context, ast *asset.Asset) (asset.Asset, error) {
341380
var upsertedAsset asset.Asset
342381
err := r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error {
343382
fetchedAsset, err := r.GetByURNWithTx(ctx, tx, ast.URN)
@@ -347,27 +386,17 @@ func (r *AssetRepository) upsertWithPatchOption( //nolint:gocognit
347386
if err != nil {
348387
return fmt.Errorf("error getting asset by URN: %w", err)
349388
}
350-
if err := r.validateAsset(fetchedAsset); err != nil {
351-
return err
352-
}
353389

354390
if fetchedAsset.ID == "" {
355391
// insert flow
356392
upsertedAsset, err = r.insert(ctx, tx, ast)
357393
if err != nil {
358394
return fmt.Errorf("error inserting asset to DB: %w", err)
359395
}
396+
360397
return nil
361398
}
362399

363-
// update flow
364-
if patchData != nil {
365-
err := copier.CopyWithOption(&ast, &fetchedAsset, copier.Option{DeepCopy: true})
366-
if err != nil {
367-
return err
368-
}
369-
ast.Patch(patchData)
370-
}
371400
changelog, err := fetchedAsset.Diff(ast)
372401
if err != nil {
373402
return fmt.Errorf("error diffing two assets: %w", err)
@@ -614,6 +643,7 @@ func (r *AssetRepository) insert(ctx context.Context, tx *sqlx.Tx, ast *asset.As
614643
"created_at", "updated_by", "updated_at", "refreshed_at", "version").
615644
Values(ast.URN, ast.Type, ast.Service, ast.Name, ast.Description, ast.Data, ast.URL, ast.Labels,
616645
ast.CreatedAt, ast.UpdatedBy.ID, ast.UpdatedAt, currentTime, asset.BaseVersion).
646+
Suffix("RETURNING \"id\"").
617647
PlaceholderFormat(sq.Dollar).
618648
ToSql()
619649
if err != nil {
@@ -622,9 +652,12 @@ func (r *AssetRepository) insert(ctx context.Context, tx *sqlx.Tx, ast *asset.As
622652

623653
ast.Version = asset.BaseVersion
624654

625-
if err := r.execContext(ctx, tx, query, args...); err != nil {
655+
var id string
656+
err = tx.QueryRowContext(ctx, query, args...).Scan(&id)
657+
if err != nil {
626658
return asset.Asset{}, fmt.Errorf("run insert query: %w", err)
627659
}
660+
ast.ID = id
628661

629662
users, err := r.createOrFetchUsers(ctx, tx, ast.Owners)
630663
if err != nil {

internal/store/postgres/asset_repository_test.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (r *AssetRepositoryTestSuite) createUsers(userRepo user.Repository) []user.
8787
return users
8888
}
8989

90-
func (r *AssetRepositoryTestSuite) BeforeTest() {
90+
func (r *AssetRepositoryTestSuite) BeforeTest(suiteName, testName string) {
9191
err := testutils.RunMigrationsWithClient(r.T(), r.client)
9292
r.NoError(err)
9393

@@ -607,18 +607,17 @@ func (r *AssetRepositoryTestSuite) TestGetByID() {
607607
}
608608

609609
var err error
610-
upsertedAsset, err := r.repository.Upsert(r.ctx, &asset1)
610+
upsertedAsset1, err := r.repository.Upsert(r.ctx, &asset1)
611611
r.Require().NoError(err)
612-
r.NotEmpty(upsertedAsset.ID)
612+
r.NotEmpty(upsertedAsset1.ID)
613613

614-
upsertedAsset, err = r.repository.Upsert(r.ctx, &asset2)
614+
upsertedAsset2, err := r.repository.Upsert(r.ctx, &asset2)
615615
r.Require().NoError(err)
616-
r.NotEmpty(upsertedAsset.ID)
616+
r.NotEmpty(upsertedAsset2.ID)
617617

618-
result, err := r.repository.GetByID(r.ctx, asset2.ID)
618+
result, err := r.repository.GetByID(r.ctx, upsertedAsset2.ID)
619619
r.NoError(err)
620-
asset2.UpdatedBy = r.users[1]
621-
r.assertAsset(&asset2, &result)
620+
r.assertAsset(&upsertedAsset2, &result)
622621
})
623622

624623
r.Run("return owners if any", func() {
@@ -637,10 +636,8 @@ func (r *AssetRepositoryTestSuite) TestGetByID() {
637636
r.Require().NoError(err)
638637
r.Require().NotEmpty(upsertedAsset.ID)
639638

640-
result, err := r.repository.GetByID(r.ctx, ast.ID)
641-
r.NoError(err)
642-
r.Len(result.Owners, len(ast.Owners))
643-
for i, owner := range result.Owners {
639+
r.Len(upsertedAsset.Owners, len(ast.Owners))
640+
for i, owner := range upsertedAsset.Owners {
644641
r.Equal(ast.Owners[i].ID, owner.ID)
645642
}
646643
})
@@ -1471,7 +1468,7 @@ func (r *AssetRepositoryTestSuite) TestAddProbe() {
14711468
})
14721469

14731470
r.Run("should populate CreatedAt and persist probe", func() {
1474-
r.BeforeTest()
1471+
r.BeforeTest("", "")
14751472
ast := asset.Asset{
14761473
URN: "urn-add-probe-1",
14771474
Type: typeJob,

0 commit comments

Comments
 (0)