Skip to content

Commit

Permalink
Merge pull request #22 from breathbath/master
Browse files Browse the repository at this point in the history
Added effective go routine based lister for large API collections
  • Loading branch information
Dysar authored Jun 15, 2020
2 parents 82ccd3d + 1decd0f commit ec62b8a
Show file tree
Hide file tree
Showing 14 changed files with 1,086 additions and 21 deletions.
61 changes: 59 additions & 2 deletions examples/products/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"context"
"crypto/tls"
"flag"
"fmt"
"github.com/erply/api-go-wrapper/pkg/api"
"github.com/erply/api-go-wrapper/pkg/api/auth"
sharedCommon "github.com/erply/api-go-wrapper/pkg/api/common"
"github.com/erply/api-go-wrapper/pkg/api/products"
"net/http"
"time"
Expand All @@ -17,12 +19,20 @@ func main() {
clientCode := flag.String("cc", "", "client code")
flag.Parse()

connectionTimeout := 60 * time.Second
transport := &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
ResponseHeaderTimeout: connectionTimeout,
}
httpCl := &http.Client{Transport: transport}

sessionKey, err := auth.VerifyUser(*username, *password, *clientCode, http.DefaultClient)
if err != nil {
panic(err)
}

apiClient, err := api.NewClient(sessionKey, *clientCode, nil)
apiClient, err := api.NewClient(sessionKey, *clientCode, httpCl)
if err != nil {
panic(err)
}
Expand All @@ -32,7 +42,13 @@ func main() {
panic(err)
}

fmt.Printf("%+v\n", prods)
fmt.Printf("GetProductsBulk:\n%+v\n", prods)

prods, err = GetProductsInParallel(apiClient)
if err != nil {
panic(err)
}
fmt.Printf("GetProductsInParallel:\n%+v\n", prods)
}

func GetProductsBulk(cl *api.Client) (prods []products.Product, err error) {
Expand Down Expand Up @@ -65,3 +81,44 @@ func GetProductsBulk(cl *api.Client) (prods []products.Product, err error) {

return
}

func GetProductsInParallel(cl *api.Client) ([]products.Product, error) {
productsDataProvider := products.NewListingDataProvider(cl.ProductManager)

lister := sharedCommon.NewLister(
sharedCommon.ListingSettings{
MaxRequestsCountPerSecond: 5,
StreamBufferLength: 10,
MaxItemsPerRequest: 300,
MaxFetchersCount: 10,
},
productsDataProvider,
func(sleepTime time.Duration) {
time.Sleep(sleepTime)
},
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second * 5)
defer cancel()

prodsChan := lister.Get(ctx, map[string]interface{}{
"changedSince": time.Date(2020, 2, 15, 0, 0, 0, 0, time.UTC).Unix(),
})

prods := make([]products.Product, 0)
var err error
doneChan := make(chan struct{}, 1)
go func() {
defer close(doneChan)
for prod := range prodsChan {
if prod.Err != nil {
err = prod.Err
return
}
prods = append(prods, prod.Payload.(products.Product))
}
}()

<-doneChan
return prods, err
}
4 changes: 2 additions & 2 deletions pkg/api/addresses/addressRequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (cli *Client) SaveAddress(ctx context.Context, filters map[string]string) (
func (cli *Client) SaveAddressesBulk(ctx context.Context, addrMap []map[string]interface{}, attrs map[string]string) (SaveAddressesResponseBulk, error) {
var saveAddressesResponseBulk SaveAddressesResponseBulk

if len(addrMap) > common.MaxBulkRequestsCount {
return saveAddressesResponseBulk, fmt.Errorf("cannot save more than %d addresses in one request", common.MaxBulkRequestsCount)
if len(addrMap) > sharedCommon.MaxBulkRequestsCount {
return saveAddressesResponseBulk, fmt.Errorf("cannot save more than %d addresses in one request", sharedCommon.MaxBulkRequestsCount)
}

bulkInputs := make([]common.BulkInput, 0, len(addrMap))
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Client struct {
CustomerManager customers.Manager
//POS related requests
PosManager pos.Manager
//Products related requests
//ListingDataProvider related requests
ProductManager products.Manager
//SalesDocuments, Payments, Projects, ShoppingCart, VatRates
SalesManager sales.Manager
Expand Down
1 change: 1 addition & 0 deletions internal/common/bulk.go → pkg/api/common/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package common

const (
MaxBulkRequestsCount = 100
MaxCountPerBulkRequestItem = 100
)
241 changes: 241 additions & 0 deletions pkg/api/common/listing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package common

import (
"context"
"math"
"sync"
)

const DefaultMaxFetchersCount = 1
const DefaultMaxRequestsCountPerSecond = 0

type ListingSettings struct {
MaxRequestsCountPerSecond int
StreamBufferLength int
MaxFetchersCount int
MaxItemsPerRequest int
}

type Cursor struct {
Limit int
Offset int
}

type ItemsStream chan Item

type Item struct {
Err error
TotalCount int
Payload interface{}
}

func setListingSettingsDefaults(settingsFromInput ListingSettings) ListingSettings {
if settingsFromInput.MaxRequestsCountPerSecond == 0 {
settingsFromInput.MaxRequestsCountPerSecond = DefaultMaxRequestsCountPerSecond
}

if settingsFromInput.MaxItemsPerRequest == 0 || settingsFromInput.MaxItemsPerRequest > MaxCountPerBulkRequestItem * MaxCountPerBulkRequestItem {
settingsFromInput.MaxItemsPerRequest = MaxCountPerBulkRequestItem * MaxCountPerBulkRequestItem
}

if settingsFromInput.MaxFetchersCount == 0 {
settingsFromInput.MaxFetchersCount = DefaultMaxFetchersCount
}

return settingsFromInput
}

type DataProvider interface {
Count(ctx context.Context, filters map[string]interface{}) (int, error)
Read(ctx context.Context, bulkFilters []map[string]interface{}, callback func(item interface{})) error
}

type Lister struct {
listingSettings ListingSettings
reqThrottler Throttler
listingDataProvider DataProvider
}

func NewLister(settings ListingSettings, dataProvider DataProvider, sl Sleeper) *Lister {
settings = setListingSettingsDefaults(settings)

thrl := NewSleepThrottler(settings.MaxRequestsCountPerSecond, sl)

return &Lister{
listingSettings: settings,
reqThrottler: thrl,
listingDataProvider: dataProvider,
}
}

func (p *Lister) Get(ctx context.Context, filters map[string]interface{}) ItemsStream {
p.reqThrottler.Throttle()

filters["recordsOnPage"] = 1
filters["pageNo"] = 1

totalCount, err := p.listingDataProvider.Count(ctx, filters)
if err != nil {
outputChan := make(ItemsStream, 1)
defer close(outputChan)

outputChan <- Item{
Err: err,
TotalCount: totalCount,
Payload: nil,
}
return outputChan
}

cursorsChan := p.getCursors(ctx, totalCount)

childChans := make([]ItemsStream, 0, p.listingSettings.MaxFetchersCount)
for i := 0; i < p.listingSettings.MaxFetchersCount; i++ {
childChan := p.fetchProductsChunk(ctx, cursorsChan, totalCount, filters)
childChans = append(childChans, childChan)
}

return p.mergeChannels(ctx, childChans...)
}

func (p *Lister) fetchProductsChunk(ctx context.Context, cursorChan chan []Cursor, totalCount int, filters map[string]interface{}) ItemsStream {
prodStream := make(chan Item, p.listingSettings.StreamBufferLength)
go func() {
defer close(prodStream)
for cursors := range cursorChan {
p.fetchProductsFromAPI(ctx, cursors, totalCount, prodStream, filters)

select {
case <-ctx.Done():
return
default:
continue
}
}
}()

return prodStream
}

func (p *Lister) getCursors(ctx context.Context, totalCount int) chan []Cursor {
out := make(chan []Cursor, p.listingSettings.MaxFetchersCount)

leftCount := totalCount

go func() {
defer close(out)

curPage := 1
if p.listingSettings.MaxItemsPerRequest > MaxCountPerBulkRequestItem*MaxBulkRequestsCount {
p.listingSettings.MaxItemsPerRequest = MaxCountPerBulkRequestItem*MaxBulkRequestsCount
}

for ; leftCount > 0; {
countToFetchForBulkRequest := leftCount
if leftCount > p.listingSettings.MaxItemsPerRequest {
countToFetchForBulkRequest = p.listingSettings.MaxItemsPerRequest
}

bulkItemsCount := CeilDivisionInt(countToFetchForBulkRequest, MaxCountPerBulkRequestItem)
if bulkItemsCount > MaxBulkRequestsCount {
bulkItemsCount = MaxBulkRequestsCount
}

limit := CeilDivisionInt(p.listingSettings.MaxItemsPerRequest, bulkItemsCount)
if limit > MaxCountPerBulkRequestItem {
limit = MaxCountPerBulkRequestItem
}

cursorsForBulkRequest := make([]Cursor, 0, bulkItemsCount)
for i := 0; i < bulkItemsCount; i++ {
cursorsForBulkRequest = append(
cursorsForBulkRequest,
Cursor{
Limit: limit,
Offset: curPage,
},
)
curPage++
leftCount -= limit
}
select {
case out <- cursorsForBulkRequest:
continue
case <-ctx.Done():
return
}
}
}()

return out
}

func (p *Lister) fetchProductsFromAPI(
ctx context.Context,
cursors []Cursor,
totalCount int,
outputChan ItemsStream,
filters map[string]interface{},
) {
bulkFilters := make([]map[string]interface{}, 0, len(cursors))
for _, cursor := range cursors {
bulkFilter := make(map[string]interface{})
for filterKey, filterValue := range filters {
bulkFilter[filterKey] = filterValue
}
bulkFilter["recordsOnPage"] = cursor.Limit
bulkFilter["pageNo"] = cursor.Offset
bulkFilters = append(bulkFilters, bulkFilter)
}

p.reqThrottler.Throttle()

err := p.listingDataProvider.Read(ctx, bulkFilters, func(item interface{}) {
outputChan <- Item{
Err: nil,
TotalCount: totalCount,
Payload: item,
}
})

if err != nil {
outputChan <- Item{
Err: err,
TotalCount: totalCount,
Payload: nil,
}
return
}
}

func (p *Lister) mergeChannels(ctx context.Context, childChans ...ItemsStream) ItemsStream {
parentChan := make(ItemsStream, p.listingSettings.StreamBufferLength)

var wg sync.WaitGroup
wg.Add(len(childChans))

for _, childChan := range childChans {
go func(productsChildChan <-chan Item) {
defer wg.Done()
for prod := range productsChildChan {
select {
case parentChan <- prod:
continue
case <-ctx.Done():
return
}
}
}(childChan)
}

go func() {
wg.Wait()
close(parentChan)
}()

return parentChan
}

func CeilDivisionInt(x, y int) int {
return int(math.Ceil(float64(x) / float64(y)))
}
Loading

0 comments on commit ec62b8a

Please sign in to comment.