diff --git a/client/client.go b/client/client.go index 9874f1e..9347490 100644 --- a/client/client.go +++ b/client/client.go @@ -14,7 +14,7 @@ import ( "github.com/pkg/errors" ) -const maxRetries = 3 +const maxRetries = 10 func attentiveBackoff(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration { // Retry for rate limits and server errors. @@ -30,6 +30,11 @@ func attentiveBackoff(min, max time.Duration, attemptNum int, resp *http.Respons timeToWait := time.Until(retryAfterDate) + if timeToWait < 1*time.Second { + // by default lets back off at least 1 second + return 1 * time.Second + } + return timeToWait } diff --git a/go.mod b/go.mod index 94e175f..0082eea 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.20 require ( github.com/alecthomas/kingpin/v2 v2.3.2 github.com/bmatcuk/doublestar/v4 v4.6.0 - github.com/davecgh/go-spew v1.1.1 github.com/deepmap/oapi-codegen v1.12.4 github.com/fatih/color v1.12.0 github.com/ghodss/yaml v1.0.0 @@ -28,17 +27,21 @@ require ( github.com/rodaine/table v1.1.0 github.com/samber/lo v1.38.1 github.com/schollz/progressbar/v3 v3.13.1 + github.com/sourcegraph/conc v0.3.0 github.com/stretchr/objx v0.5.0 github.com/yargevad/filepathx v1.0.0 github.com/zyedidia/highlight v0.0.0-20200217010119-291680feaca1 golang.org/x/oauth2 v0.8.0 golang.org/x/sync v0.2.0 - golang.org/x/time v0.0.0-20220411224347-583f2d630306 gopkg.in/guregu/null.v3 v3.5.0 gopkg.in/yaml.v2 v2.4.0 ) -require gopkg.in/sourcemap.v1 v1.0.5 // indirect +require ( + go.uber.org/atomic v1.9.0 // indirect + go.uber.org/multierr v1.9.0 // indirect + gopkg.in/sourcemap.v1 v1.0.5 // indirect +) require ( github.com/ProtonMail/go-crypto v0.0.0-20230217124315-7d5c6f04bbb8 // indirect diff --git a/go.sum b/go.sum index 75e2fc2..0d6d162 100644 --- a/go.sum +++ b/go.sum @@ -146,6 +146,8 @@ github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXn github.com/schollz/progressbar/v3 v3.13.1 h1:o8rySDYiQ59Mwzy2FELeHY5ZARXZTVJC7iHD6PEFUiE= github.com/schollz/progressbar/v3 v3.13.1/go.mod h1:xvrbki8kfT1fzWzBT/UZd9L6GA+jdL7HAgq2RFnO6fQ= github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -171,6 +173,10 @@ github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5 github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA= github.com/zyedidia/highlight v0.0.0-20200217010119-291680feaca1 h1:8oQDIgT8V1yyEoEvvoXkSfoJgSst+dUEwunxq8fbs1c= github.com/zyedidia/highlight v0.0.0-20200217010119-291680feaca1/go.mod h1:c1r+Ob9tUTPB0FKWO1+x+Hsc/zNa45WdGq7Y38Ybip0= +go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= +go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= +go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= @@ -211,8 +217,6 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w= -golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.9.3 h1:Gn1I8+64MsuTb/HpH+LmQtNas23LhUVr3rYZ0eKuaMM= golang.org/x/tools v0.9.3/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= diff --git a/reconcile/entries.go b/reconcile/entries.go index bad6c0b..827cbcd 100644 --- a/reconcile/entries.go +++ b/reconcile/entries.go @@ -10,7 +10,8 @@ import ( "github.com/incident-io/catalog-importer/v2/output" "github.com/pkg/errors" "github.com/samber/lo" - "golang.org/x/sync/errgroup" + + "github.com/sourcegraph/conc/pool" ) type EntriesClient struct { @@ -105,18 +106,18 @@ func Entries(ctx context.Context, logger kitlog.Logger, cl EntriesClient, output logger.Log("msg", fmt.Sprintf("found %d entries in the catalog, deleting %d of them", len(entries), len(toDelete))) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(10) - + // Use a pool of workers to avoid hitting API limits but multiple other + // routines doing a smash and grab on the rate we do have available. if onStart := progress.OnDeleteStart; onStart != nil { onStart(len(toDelete)) } + p := pool.New().WithErrors().WithContext(ctx).WithMaxGoroutines(10) for _, entry := range toDelete { var ( entry = entry // avoid shadow loop variable ) - g.Go(func() error { + p.Go(func(ctx context.Context) error { if onProgress := progress.OnDeleteProgress; onProgress != nil { defer onProgress() } @@ -127,12 +128,12 @@ func Entries(ctx context.Context, logger kitlog.Logger, cl EntriesClient, output } logger.Log("msg", "destroyed catalog entry", "catalog_entry_id", entry.Id) - return nil }) } - if err := g.Wait(); err != nil { + err := p.Wait() + if err != nil { return errors.Wrap(err, "destroying catalog entries") } } @@ -157,19 +158,17 @@ func Entries(ctx context.Context, logger kitlog.Logger, cl EntriesClient, output logger.Log("msg", fmt.Sprintf("found %d entries that need creating", len(toCreate))) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(10) - if onStart := progress.OnCreateStart; onStart != nil { onStart(len(toCreate)) } + p := pool.New().WithErrors().WithContext(ctx).WithMaxGoroutines(10) for _, model := range toCreate { var ( model = model // capture loop variable ) - g.Go(func() error { + p.Go(func(ctx context.Context) error { if onProgress := progress.OnCreateProgress; onProgress != nil { defer onProgress() } @@ -192,8 +191,9 @@ func Entries(ctx context.Context, logger kitlog.Logger, cl EntriesClient, output }) } - if err := g.Wait(); err != nil { - return errors.Wrap(err, "creating new catalog entries") + err := p.Wait() + if err != nil { + return errors.Wrap(err, "destroying catalog entries") } } @@ -265,9 +265,7 @@ func Entries(ctx context.Context, logger kitlog.Logger, cl EntriesClient, output logger.Log("msg", fmt.Sprintf("found %d entries that need updating", len(toUpdate))) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(10) - + p := pool.New().WithErrors().WithContext(ctx).WithMaxGoroutines(10) if onStart := progress.OnUpdateStart; onStart != nil { onStart(len(toUpdate)) } @@ -278,7 +276,7 @@ func Entries(ctx context.Context, logger kitlog.Logger, cl EntriesClient, output entry = entriesByExternalID[model.ExternalID] // for ID ) - g.Go(func() error { + p.Go(func(ctx context.Context) error { if onProgress := progress.OnUpdateProgress; onProgress != nil { defer onProgress() } @@ -299,8 +297,9 @@ func Entries(ctx context.Context, logger kitlog.Logger, cl EntriesClient, output }) } - if err := g.Wait(); err != nil { - return errors.Wrap(err, "updating catalog entries") + err := p.Wait() + if err != nil { + return errors.Wrap(err, "destroying catalog entries") } }