Skip to content

Commit 4cbcaf2

Browse files
authored
feat: log malformed data during upsert ES docs (#70)
Co-authored-by: Hermawan Wijaya <[email protected]>
1 parent 39a1cee commit 4cbcaf2

File tree

2 files changed

+13
-0
lines changed

2 files changed

+13
-0
lines changed

internal/workermanager/discovery_worker.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"strings"
78
"time"
89

910
"github.com/goto/compass/core/asset"
@@ -108,6 +109,10 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error {
108109

109110
for _, ast := range assets {
110111
if err := m.discoveryRepo.Upsert(ctx, ast); err != nil {
112+
if strings.Contains(err.Error(), "illegal_argument_exception") {
113+
m.logger.Error(err.Error())
114+
continue
115+
}
111116
return err
112117
}
113118
}

internal/workermanager/in_situ_worker.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,25 @@ package workermanager
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"sync"
78

89
"github.com/goto/compass/core/asset"
10+
"github.com/goto/salt/log"
911
)
1012

1113
type InSituWorker struct {
1214
discoveryRepo DiscoveryRepository
1315
assetRepo asset.Repository
1416
mutex sync.Mutex
17+
logger log.Logger
1518
}
1619

1720
func NewInSituWorker(deps Deps) *InSituWorker {
1821
return &InSituWorker{
1922
discoveryRepo: deps.DiscoveryRepo,
2023
assetRepo: deps.AssetRepo,
24+
logger: deps.Logger,
2125
}
2226
}
2327

@@ -61,6 +65,10 @@ func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string)
6165

6266
for _, ast := range assets {
6367
if err := m.discoveryRepo.Upsert(ctx, ast); err != nil {
68+
if strings.Contains(err.Error(), "illegal_argument_exception") {
69+
m.logger.Error(err.Error())
70+
continue
71+
}
6472
return err
6573
}
6674
}

0 commit comments

Comments
 (0)