From cefa444f896886d1ac4b620d2247ad966d88ed1a Mon Sep 17 00:00:00 2001
From: Andrey Pozolotin
Date: Mon, 15 Jun 2020 11:22:10 +0300
Subject: [PATCH] Added testing and examples for products lister

---
 examples/products/main.go        |  48 +++++-
 pkg/api/common/listing.go        |  16 +-
 pkg/api/common/listing_test.go   | 268 +++++++++++++++++++++++--------
 pkg/api/products/listing_test.go |  79 ++++++++-
 4 files changed, 331 insertions(+), 80 deletions(-)

diff --git a/examples/products/main.go b/examples/products/main.go
index f1308aa..245bf4f 100644
--- a/examples/products/main.go
+++ b/examples/products/main.go
@@ -6,6 +6,7 @@ import (
     "fmt"
     "github.com/erply/api-go-wrapper/pkg/api"
     "github.com/erply/api-go-wrapper/pkg/api/auth"
+    sharedCommon "github.com/erply/api-go-wrapper/pkg/api/common"
     "github.com/erply/api-go-wrapper/pkg/api/products"
     "net/http"
     "time"
@@ -32,7 +33,13 @@ func main() {
         panic(err)
     }
 
-    fmt.Printf("%+v\n", prods)
+    fmt.Printf("GetProductsBulk:\n%+v\n", prods)
+
+    prods, err = GetProductsInParallel(apiClient)
+    if err != nil {
+        panic(err)
+    }
+    fmt.Printf("GetProductsInParallel:\n%+v\n", prods)
 }
 
 func GetProductsBulk(cl *api.Client) (prods []products.Product, err error) {
@@ -65,3 +72,42 @@ func GetProductsBulk(cl *api.Client) (prods []products.Product, err error) {
 
     return
 }
+
+func GetProductsInParallel(cl *api.Client) ([]products.Product, error) {
+    productsDataProvider := products.NewListingDataProvider(cl.ProductManager)
+
+    lister := sharedCommon.NewLister(
+        sharedCommon.ListingSettings{
+            MaxRequestsCountPerSecond: 5,
+            StreamBufferLength:        10,
+            MaxItemsPerRequest:        10,
+            MaxFetchersCount:          2,
+        },
+        productsDataProvider,
+        func(sleepTime time.Duration) {
+            time.Sleep(sleepTime)
+        },
+    )
+
+    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+    defer cancel()
+
+    prodsChan := lister.Get(ctx, map[string]interface{}{})
+
+    prods := make([]products.Product, 0)
+    var err error
+    doneChan := make(chan struct{}, 1)
+    go func() {
+        defer close(doneChan)
+        for prod := range prodsChan {
+            if prod.Err != nil {
+                err = prod.Err
+                return
+            }
+            prods = append(prods, prod.Payload.(products.Product))
+        }
+    }()
+
+    <-doneChan
+    return prods, err
+}
diff --git a/pkg/api/common/listing.go b/pkg/api/common/listing.go
index 1cfbaed..fe9f734 100644
--- a/pkg/api/common/listing.go
+++ b/pkg/api/common/listing.go
@@ -26,7 +26,6 @@ type ItemsStream chan Item
 type Item struct {
     Err           error
     TotalCount    int
-    ProgressCount int
     Payload       interface{}
 }
 
@@ -35,8 +34,8 @@ func setListingSettingsDefaults(settingsFromInput ListingSettings) ListingSettin
         settingsFromInput.MaxRequestsCountPerSecond = DefaultMaxRequestsCountPerSecond
     }
 
-    if settingsFromInput.MaxItemsPerRequest == 0 || settingsFromInput.MaxItemsPerRequest > MaxCountPerBulkRequestItem {
-        settingsFromInput.MaxItemsPerRequest = MaxCountPerBulkRequestItem
+    if settingsFromInput.MaxItemsPerRequest == 0 || settingsFromInput.MaxItemsPerRequest > MaxCountPerBulkRequestItem*MaxBulkRequestsCount {
+        settingsFromInput.MaxItemsPerRequest = MaxCountPerBulkRequestItem * MaxBulkRequestsCount
     }
 
     if settingsFromInput.MaxFetchersCount == 0 {
@@ -131,25 +130,21 @@ func (p *Lister) getCursors(ctx context.Context, totalCount int) chan []Cursor {
         p.listingSettings.MaxItemsPerRequest = MaxCountPerBulkRequestItem*MaxBulkRequestsCount
     }
 
-    for ; leftCount > 0; { //leftCount 1000, p.listingSettings.MaxItemsPerRequest 100
+    for leftCount > 0 {
         countToFetchForBulkRequest := leftCount
         if leftCount > p.listingSettings.MaxItemsPerRequest {
             countToFetchForBulkRequest = p.listingSettings.MaxItemsPerRequest
         }
 
-        //countToFetchForBulkRequest 100
-
         bulkItemsCount := CeilDivisionInt(countToFetchForBulkRequest, MaxCountPerBulkRequestItem)
         if bulkItemsCount > MaxBulkRequestsCount {
             bulkItemsCount = MaxBulkRequestsCount
         }
-        //bulkItemsCount 1
 
         limit := CeilDivisionInt(p.listingSettings.MaxItemsPerRequest, bulkItemsCount)
         if limit > MaxCountPerBulkRequestItem {
            limit = MaxCountPerBulkRequestItem
         }
-        //limit 100
 
         cursorsForBulkRequest := make([]Cursor, 0, bulkItemsCount)
         for i := 0; i < bulkItemsCount; i++ {
@@ -244,8 +239,3 @@ func (p *Lister) mergeChannels(ctx context.Context, childChans ...ItemsStream) I
 func CeilDivisionInt(x, y int) int {
     return int(math.Ceil(float64(x) / float64(y)))
 }
-
-func AddDefaultPaginationOption(filters map[string]interface{}, limit, page int) {
-    filters["recordsOnPage"] = limit
-    filters["pageNo"] = page
-}
diff --git a/pkg/api/common/listing_test.go b/pkg/api/common/listing_test.go
index daa8014..d4f4cce 100644
--- a/pkg/api/common/listing_test.go
+++ b/pkg/api/common/listing_test.go
@@ -4,12 +4,13 @@ import (
     "context"
     "errors"
     "github.com/stretchr/testify/assert"
+    "sort"
     "sync"
     "testing"
     "time"
 )
 
-var nullSleeper = func(sleepTime time.Duration) {}
+var NullSleeper = func(sleepTime time.Duration) {}
 
 type payloadMock struct {
     ID int
@@ -64,83 +65,175 @@ func (dpm *DataProviderMock) Read(ctx context.Context, bulkFilters []map[string]
 }
 
 func TestReadingSuccess(t *testing.T) {
-    inputProds := []payloadMock{
-        {ID: 1},
-    }
-    dp := &DataProviderMock{
-        CountOutputCount: 10,
-        ProductsToRead: inputProds,
-        countLock: sync.Mutex{},
-        CountFiltersInput: map[string]interface{}{},
-        readLock: sync.Mutex{},
-        ReadBulkFilters: [][]map[string]interface{}{},
-    }
-
-    lister := NewLister(
-        ListingSettings{
-            MaxRequestsCountPerSecond: 0,
-            StreamBufferLength: 0,
-            MaxItemsPerRequest: 2,
-            MaxFetchersCount: 2,
-        },
-        dp,
-        nullSleeper,
-    )
-
-    ctx, cancel := context.WithCancel(context.Background())
-    defer cancel()
-    prodsChan := lister.Get(ctx, map[string]interface{}{"filterKey": "filterVal"})
-
-    actualProds := collectProdsFromChannel(prodsChan)
-
-    assert.Equal(t, map[string]interface{}{"filterKey": "filterVal", "pageNo": 1, "recordsOnPage": 1}, dp.CountFiltersInput)
-    assert.Equal(t, ctx, dp.CountContextInput)
-    assert.Equal(t, ctx, dp.ReadContextInput)
-
-    expectedBulkFilterInputs := [][]map[string]interface{}{
+    testCases := []struct {
+        name                     string
+        total                    int
+        inputProds               []payloadMock
+        listingSettings          ListingSettings
+        expectedBulkFilterInputs func() [][]map[string]interface{}
+        expectedProdIDs          []int
+    }{
         {
-            {
-                "filterKey": "filterVal",
-                "pageNo": 1,
-                "recordsOnPage": 2,
+            name:  "too small request limit",
+            total: 10,
+            inputProds: []payloadMock{
+                {ID: 1},
+                {ID: 2},
             },
-        },
-        {
-            {
-                "filterKey": "filterVal",
-                "pageNo": 2,
-                "recordsOnPage": 2,
+            expectedProdIDs: []int{1, 1, 1, 1, 1, 2, 2, 2, 2, 2},
+            listingSettings: ListingSettings{
+                MaxRequestsCountPerSecond: 0,
+                StreamBufferLength: 0,
+                MaxItemsPerRequest: 2,
+                MaxFetchersCount: 2,
+            },
+            expectedBulkFilterInputs: func() (res [][]map[string]interface{}) {
+                expectedBulkRequestsCount := 5
+                res = make([][]map[string]interface{}, 0, expectedBulkRequestsCount)
+                expectedBulkInputCount := 1
+                for i := 0; i < expectedBulkRequestsCount; i++ {
+                    bulkItems := make([]map[string]interface{}, 0, expectedBulkInputCount)
+                    for y := 0; y < expectedBulkInputCount; y++ {
+                        bulkItems = append(bulkItems, map[string]interface{}{
+                            "filterKey": "filterVal",
+                            "pageNo": i + 1,
+                            "recordsOnPage": 2,
+                        })
+                    }
+                    res = append(res, bulkItems)
+                }
+                return
             },
         },
         {
-            {
-                "filterKey": "filterVal",
-                "pageNo": 3,
-                "recordsOnPage": 2,
+            name:  "max request limit",
+            total: 10001,
+            inputProds: []payloadMock{
+                {ID: 3},
+            },
+            expectedProdIDs: []int{3, 3},
+            listingSettings: ListingSettings{
+                MaxRequestsCountPerSecond: 0,
+                StreamBufferLength: 10,
+                MaxItemsPerRequest: 10000,
+                MaxFetchersCount: 2,
+            },
+            expectedBulkFilterInputs: func() (res [][]map[string]interface{}) {
+                res = make([][]map[string]interface{}, 0, 2)
+                bulkItems := make([]map[string]interface{}, 0, 100)
+                for y := 0; y < 100; y++ {
+                    bulkItems = append(bulkItems, map[string]interface{}{
+                        "filterKey": "filterVal",
+                        "pageNo": y + 1,
+                        "recordsOnPage": 100,
+                    })
+                }
+                res = append(res, bulkItems)
+                res = append(res, []map[string]interface{}{
+                    {
+                        "filterKey": "filterVal",
+                        "pageNo": 101,
+                        "recordsOnPage": 100,
+                    },
+                })
+                return
             },
         },
         {
-            {
-                "filterKey": "filterVal",
-                "pageNo": 4,
-                "recordsOnPage": 2,
+            name:  "fetch all in one request",
+            total: 1000,
+            inputProds: []payloadMock{
+                {ID: 4},
+            },
+            expectedProdIDs: []int{4},
+            listingSettings: ListingSettings{
+                MaxRequestsCountPerSecond: 0,
+                StreamBufferLength: 10,
+                MaxItemsPerRequest: 10000,
+                MaxFetchersCount: 10,
+            },
+            expectedBulkFilterInputs: func() (res [][]map[string]interface{}) {
+                res = make([][]map[string]interface{}, 0, 2)
+                bulkItems := make([]map[string]interface{}, 0, 100)
+                for y := 0; y < 10; y++ {
+                    bulkItems = append(bulkItems, map[string]interface{}{
+                        "filterKey": "filterVal",
+                        "pageNo": y + 1,
+                        "recordsOnPage": 100,
+                    })
+                }
+                res = append(res, bulkItems)
+                return
             },
         },
         {
-            {
-                "filterKey": "filterVal",
-                "pageNo": 5,
-                "recordsOnPage": 2,
+            name:  "max items per request is impossible",
+            total: 100,
+            inputProds: []payloadMock{
+                {ID: 5},
+            },
+            expectedProdIDs: []int{5},
+            listingSettings: ListingSettings{
+                MaxRequestsCountPerSecond: 0,
+                StreamBufferLength: 10,
+                MaxItemsPerRequest: 10001,
+                MaxFetchersCount: 10,
+            },
+            expectedBulkFilterInputs: func() (res [][]map[string]interface{}) {
+                return [][]map[string]interface{}{
+                    {
+                        {
+                            "filterKey": "filterVal",
+                            "pageNo": 1,
+                            "recordsOnPage": 100,
+                        },
+                    },
+                }
             },
         },
     }
 
-    assert.ElementsMatch(t, expectedBulkFilterInputs, dp.ReadBulkFilters)
+    for _, testCase := range testCases {
+        t.Run(testCase.name, func(t *testing.T) {
+            dp := &DataProviderMock{
+                CountOutputCount: testCase.total,
+                ProductsToRead: testCase.inputProds,
+                countLock: sync.Mutex{},
+                CountFiltersInput: map[string]interface{}{},
+                readLock: sync.Mutex{},
+                ReadBulkFilters: [][]map[string]interface{}{},
+            }
+            lister := NewLister(
+                testCase.listingSettings,
+                dp,
+                NullSleeper,
+            )
+
+            ctx, cancel := context.WithCancel(context.Background())
+            defer cancel()
+            prodsChan := lister.Get(ctx, map[string]interface{}{"filterKey": "filterVal"})
+
+            actualProds := collectProdsFromChannel(prodsChan)
+
+            assert.Equal(t, map[string]interface{}{"filterKey": "filterVal", "pageNo": 1, "recordsOnPage": 1}, dp.CountFiltersInput)
+            assert.Equal(t, ctx, dp.CountContextInput)
+            assert.Equal(t, ctx, dp.ReadContextInput)
+            assert.ElementsMatch(t, testCase.expectedBulkFilterInputs(), dp.ReadBulkFilters)
+
+            actualProgressCounts := make([]int, 0, len(actualProds))
+            actualProdIDs := make([]int, 0, len(actualProds))
+            for _, prod := range actualProds {
+                assert.NoError(t, prod.Err)
+                assert.Equal(t, testCase.total, prod.TotalCount)
+                assert.IsType(t, prod.Payload, payloadMock{})
+                actualProdIDs = append(actualProdIDs, prod.Payload.(payloadMock).ID)
+            }
 
-    for _, prod := range actualProds {
-        assert.NoError(t, prod.Err)
-        assert.Equal(t, 10, prod.TotalCount)
-        assert.Equal(t, payloadMock{ID: 1}, prod.Payload)
+            sort.Ints(actualProgressCounts)
+            sort.Ints(actualProdIDs)
+
+            assert.Equal(t, testCase.expectedProdIDs, actualProdIDs)
+        })
     }
 }
 
@@ -150,7 +243,7 @@ func TestReadCountError(t *testing.T) {
         CountOutputErrorStr: "some count error",
     }
 
-    lister := NewLister(ListingSettings{}, dp, nullSleeper)
+    lister := NewLister(ListingSettings{}, dp, NullSleeper)
 
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
@@ -162,6 +255,55 @@ func TestReadCountError(t *testing.T) {
     assert.EqualError(t, actualProds[0].Err, "some count error")
 }
 
+func TestCancelReading(t *testing.T) {
+    dp := &DataProviderMock{
+        CountOutputCount: 10,
+        ProductsToRead: []payloadMock{
+            {ID: 4},
+        },
+        countLock: sync.Mutex{},
+        CountFiltersInput: map[string]interface{}{},
+        readLock: sync.Mutex{},
+        ReadBulkFilters: [][]map[string]interface{}{},
+    }
+    lister := NewLister(
+        ListingSettings{
+            MaxRequestsCountPerSecond: 0,
+            StreamBufferLength: 10,
+            MaxItemsPerRequest: 100,
+            MaxFetchersCount: 10,
+        },
+        dp,
+        NullSleeper,
+    )
+
+    ctx, cancel := context.WithCancel(context.Background())
+    cancel()
+    prodsChan := lister.Get(ctx, map[string]interface{}{"filterKey": "filterVal"})
+
+    actualProds := collectProdsFromChannel(prodsChan)
+    assert.Len(t, actualProds, 0)
+    assert.Len(t, dp.ReadBulkFilters, 0)
+}
+
+func TestReadItemsError(t *testing.T) {
+    dp := &DataProviderMock{
+        CountOutputCount: 1,
+        ReadErrorStr: "some read items error",
+    }
+
+    lister := NewLister(ListingSettings{}, dp, NullSleeper)
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    prodsChan := lister.Get(ctx, map[string]interface{}{})
+
+    actualProds := collectProdsFromChannel(prodsChan)
+
+    assert.Len(t, actualProds, 1)
+    assert.EqualError(t, actualProds[0].Err, "some read items error")
+}
+
 func collectProdsFromChannel(prodsChan ItemsStream) []Item {
     actualProds := make([]Item, 0)
     doneChan := make(chan struct{}, 1)
diff --git a/pkg/api/products/listing_test.go b/pkg/api/products/listing_test.go
index 91cbdbb..c67b8d5 100644
--- a/pkg/api/products/listing_test.go
+++ b/pkg/api/products/listing_test.go
@@ -10,7 +10,9 @@ import (
     "github.com/stretchr/testify/assert"
     "net/http"
     "net/http/httptest"
+    "sort"
     "testing"
+    "time"
 )
 
 func extractBulkFiltersFromRequest(r *http.Request) (res map[string]interface{}, err error) {
@@ -201,8 +203,8 @@ func TestReadSuccess(t *testing.T) {
         context.Background(),
         []map[string]interface{}{
             {
-                "somekey": "smeval",
-                "pageNo": 1,
+                "somekey":       "smeval",
+                "pageNo":        1,
                 "recordsOnPage": 2,
             },
         },
@@ -216,7 +218,7 @@ func TestReadSuccess(t *testing.T) {
         return
     }
 
-    assert.Equal(t, []int{1,2,3,4,5}, actualProdIDs)
+    assert.Equal(t, []int{1, 2, 3, 4, 5}, actualProdIDs)
 }
 
 func TestReadError(t *testing.T) {
@@ -245,3 +247,74 @@
 
     assert.Contains(t, err.Error(), errors.MalformedRequest.String())
 }
+
+func TestReadSuccessIntegration(t *testing.T) {
+    const totalCount = 11
+    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        parsedRequest, err := extractBulkFiltersFromRequest(r)
+        assert.NoError(t, err)
+        if err != nil {
+            return
+        }
+
+        requests := parsedRequest["requests"].([]map[string]interface{})
+        assert.Len(t, requests, 1)
+
+        if requests[0]["pageNo"] == float64(1) {
+            err = sendRequest(w, 0, totalCount, [][]int{{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}})
+        } else {
+            err = sendRequest(w, 0, totalCount, [][]int{{11}})
+        }
+
+        assert.NoError(t, err)
+        if err != nil {
+            return
+        }
+    }))
+
+    baseClient := common.NewClient("somesess", "someclient", "", nil, nil)
+    baseClient.Url = srv.URL
+    productsClient := NewClient(baseClient)
+    productsDataProvider := NewListingDataProvider(productsClient)
+
+    lister := sharedCommon.NewLister(
+        sharedCommon.ListingSettings{
+            MaxRequestsCountPerSecond: 0,
+            StreamBufferLength: 10,
+            MaxItemsPerRequest: 10,
+            MaxFetchersCount: 10,
+        },
+        productsDataProvider,
+        func(sleepTime time.Duration) {},
+    )
+
+    prodsChan := lister.Get(context.Background(), map[string]interface{}{})
+
+    actualProdIDs := collectProdIDsFromChannel(prodsChan)
+    sort.Ints(actualProdIDs)
+
+    assert.Equal(t, []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, actualProdIDs)
+}
+
+func collectProdIDsFromChannel(prodsChan sharedCommon.ItemsStream) []int {
+    actualProdIDs := make([]int, 0)
+    doneChan := make(chan struct{}, 1)
+    go func() {
+        defer close(doneChan)
+        for prod := range prodsChan {
+            actualProdIDs = append(actualProdIDs, prod.Payload.(Product).ProductID)
+        }
+    }()
+
+mainLoop:
+    for {
+        select {
+        case <-doneChan:
+            break mainLoop
+        case <-time.After(time.Second * 5):
+            break mainLoop
+        }
+    }
+
+    return actualProdIDs
+}
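
Usage sketch to complement the example added above (illustration only, not part of the diff): GetProductsInParallel buffers every product into a slice before returning. A caller that wants to process items as they arrive can hand each one to a callback and stop on the first error instead. The snippet assumes the same products.NewListingDataProvider / sharedCommon.NewLister API that the patch exercises, and it assumes the lister stops its fetchers once the passed context is cancelled; the StreamProducts name and the handle callback are hypothetical.

    package example

    import (
        "context"
        "time"

        "github.com/erply/api-go-wrapper/pkg/api"
        sharedCommon "github.com/erply/api-go-wrapper/pkg/api/common"
        "github.com/erply/api-go-wrapper/pkg/api/products"
    )

    // StreamProducts is an illustrative sketch, not part of the patch: it feeds
    // each product to a caller-supplied callback instead of buffering the whole
    // result set, and relies on the deferred cancel to signal the remaining
    // fetcher goroutines when it returns early on an error.
    func StreamProducts(ctx context.Context, cl *api.Client, handle func(products.Product) error) error {
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()

        lister := sharedCommon.NewLister(
            sharedCommon.ListingSettings{
                MaxRequestsCountPerSecond: 5,
                StreamBufferLength:        10,
                MaxItemsPerRequest:        100,
                MaxFetchersCount:          2,
            },
            products.NewListingDataProvider(cl.ProductManager),
            func(sleepTime time.Duration) { time.Sleep(sleepTime) },
        )

        for item := range lister.Get(ctx, map[string]interface{}{}) {
            if item.Err != nil {
                return item.Err // deferred cancel() asks the fetchers to stop
            }
            if err := handle(item.Payload.(products.Product)); err != nil {
                return err
            }
        }
        return nil
    }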