Skip to content

Commit

Permalink
Merge pull request #8 from halprin/try-conurrency
Browse files Browse the repository at this point in the history
Limit Concurrency
  • Loading branch information
halprin authored Feb 27, 2021
2 parents f3a9844 + fca14ef commit dfc2c64
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 15 deletions.
80 changes: 80 additions & 0 deletions dynamo/concurrency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package dynamo

import (
"github.com/aws/aws-sdk-go/service/dynamodb"
"log"
"math"
"runtime"
)

// determineConcurrency works out how many delete workers to run against tableName.
//
// On demand tables get the value of getOnDemandConcurrency. For provisioned tables the
// provisioned write capacity is divided by the estimated size of one full batch
// (maxItemsPerBatchRequest items of the average item size, in KB), with a floor of 1.
// An error is returned only when the table cannot be described.
func determineConcurrency(tableName string) (int, error) {
	description, err := describeTable(tableName)
	if err != nil {
		return 0, err
	}

	if isOnDemand(description) {
		onDemandWorkers := getOnDemandConcurrency()
		log.Printf("Given on demand concurrency, set concurrency to %d\n", onDemandWorkers)
		return onDemandWorkers, nil
	}

	// from here on the table is known to use provisioned capacity
	itemCount := getNumberOfItems(description)
	sizeInBytes := getTableSizeInBytes(description)

	// size of one full batch, using the rounded up average item size
	batchSize := float64(maxItemsPerBatchRequest) * calculateAverageItemSize(sizeInBytes, itemCount)

	provisionedWrites := getWriteCapacityUnits(description)
	batchesPerCapacity := float64(provisionedWrites) / batchSize

	// never go below a single worker; the conversion may truncate to the size of int
	workers := 1
	if batchesPerCapacity > 1 {
		workers = int(batchesPerCapacity)
	}

	sizeInKilobytes := float64(sizeInBytes) / float64(1024)
	log.Printf("Given provisioned write capacity of %d, number of items %d, and table size %f KB, set concurrency to %d\n", provisionedWrites, itemCount, sizeInKilobytes, workers)

	return workers, nil
}

// isOnDemand reports whether the described table uses pay-per-request billing.
//
// When the description carries a billing mode, that value decides the answer. When the
// billing mode summary (or the mode inside it) is absent, the table is treated as on
// demand if it has zero write capacity units.
func isOnDemand(describeTable *dynamodb.DescribeTableOutput) bool {
	billingModeSummary := describeTable.Table.BillingModeSummary
	// BillingMode is an optional pointer; guard it so a summary without a mode does not panic
	if billingModeSummary != nil && billingModeSummary.BillingMode != nil {
		return *billingModeSummary.BillingMode == dynamodb.BillingModePayPerRequest
	}

	// no billing mode was reported, so infer it from the write capacity
	return getWriteCapacityUnits(describeTable) == 0
}

// getWriteCapacityUnits returns the write capacity units from the table description.
//
// It returns 0 when the description carries no provisioned throughput or no write
// capacity value, instead of dereferencing a nil pointer. isOnDemand treats 0 as on demand.
func getWriteCapacityUnits(describeTable *dynamodb.DescribeTableOutput) int64 {
	provisionedThroughput := describeTable.Table.ProvisionedThroughput
	if provisionedThroughput == nil || provisionedThroughput.WriteCapacityUnits == nil {
		return 0
	}

	return *provisionedThroughput.WriteCapacityUnits
}

// getNumberOfItems returns the item count from the table description.
//
// It returns 0 when the description carries no item count, instead of dereferencing a
// nil pointer. calculateAverageItemSize already copes with a count of 0.
func getNumberOfItems(describeTable *dynamodb.DescribeTableOutput) int64 {
	itemCount := describeTable.Table.ItemCount
	if itemCount == nil {
		return 0
	}

	return *itemCount
}

// getTableSizeInBytes returns the table size in bytes from the table description.
//
// It returns 0 when the description carries no size, instead of dereferencing a nil
// pointer. calculateAverageItemSize already copes with a size of 0.
func getTableSizeInBytes(describeTable *dynamodb.DescribeTableOutput) int64 {
	tableSizeBytes := describeTable.Table.TableSizeBytes
	if tableSizeBytes == nil {
		return 0
	}

	return *tableSizeBytes
}

// getOnDemandConcurrency returns the concurrency used for on demand tables:
// one worker per logical CPU reported by the runtime.
func getOnDemandConcurrency() int {
	logicalCPUs := runtime.NumCPU()
	return logicalCPUs
}

// calculateAverageItemSize returns the average item size in kilobytes, rounded up to a
// whole number.
//
// tableSize is in bytes. When either tableSize or numberOfItems is 0 (the reported
// values aren't always up to date) the average is taken to be 1.
func calculateAverageItemSize(tableSize int64, numberOfItems int64) float64 {
	const bytesPerKilobyte = 1024

	if tableSize != 0 && numberOfItems != 0 {
		// converting to float64 loses some digits if tableSize is very large
		tableSizeInKilobytes := float64(tableSize) / float64(bytesPerKilobyte)
		return math.Ceil(tableSizeInKilobytes / float64(numberOfItems))
	}

	// nothing to average over
	return 1.0
}
48 changes: 35 additions & 13 deletions dynamo/delete.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package dynamo

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/halprin/delete-dynamodb-items/parallel"
"log"
"math/rand"
"time"
)

Expand All @@ -22,14 +22,33 @@ func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName st

dynamoItemsChunks := chunkItems(dynamoItems)

concurrency, err := determineConcurrency(tableName)
if err != nil {
log.Println("Unable determine the concurrency")
return err
}

goroutinePool := parallel.NewPool(concurrency, len(dynamoItemsChunks))
defer goroutinePool.Release()

var errorChannels []chan error

for _, currentItemsChunk := range dynamoItemsChunks {
errorChannel := make(chan error)

errorChannel := make(chan error, 1)
errorChannels = append(errorChannels, errorChannel)
go deleteChunkGoroutine(currentItemsChunk, tableName, errorChannel)

//wrapping in a function to make a copy of the currentItemsChunk and errorChannel arguments that are passed in,
//else all executions try to delete the same chunk of items
func(currentItemsChunk []map[string]*dynamodb.AttributeValue, errorChannel chan error) {
goroutinePool.Submit(func() {
deleteChunkGoroutine(currentItemsChunk, tableName, errorChannel)
})
}(currentItemsChunk, errorChannel)
}

log.Println("Waiting for all deletion goroutines to complete")

for errorFromGoroutine := range parallel.MergeErrorChannels(errorChannels) {
if errorFromGoroutine != nil {
log.Println("One of the delete goroutines failed")
Expand Down Expand Up @@ -62,16 +81,10 @@ func deleteChunk(currentItemsChunk []map[string]*dynamodb.AttributeValue, tableN
}

// getTableKeys returns the key schema of tableName.
//
// The lookup goes through describeTable, which already logs a failed describe call, so
// the error is passed back to the caller without being logged a second time here.
func getTableKeys(tableName string) ([]*dynamodb.KeySchemaElement, error) {
	tableInfo, err := describeTable(tableName)
	if err != nil {
		return nil, err
	}

	return tableInfo.Table.KeySchema, nil
}

Expand Down Expand Up @@ -124,7 +137,15 @@ func convertItemToKey(item map[string]*dynamodb.AttributeValue) map[string]*dyna
}

func incrementallyBatchDelete(requestItems map[string][]*dynamodb.WriteRequest) error {
millisecondsToWait := 20
//used to induce jitter
randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano()))

baseMillisecondsToWait := 20
maxMillisecondsToWait := 40
millisecondsToWait := randomGenerator.Intn(maxMillisecondsToWait)

//start of waiting so all the goroutines don't call batch delete at the same time
time.Sleep(time.Duration(millisecondsToWait) * time.Millisecond)

for {
batchWriteItemInput := &dynamodb.BatchWriteItemInput{
Expand All @@ -149,9 +170,10 @@ func incrementallyBatchDelete(requestItems map[string][]*dynamodb.WriteRequest)
break
}

//do an exponential back-off
//do an exponential back-off with jitter
time.Sleep(time.Duration(millisecondsToWait) * time.Millisecond)
millisecondsToWait *= 2
maxMillisecondsToWait *= 2
millisecondsToWait = baseMillisecondsToWait + randomGenerator.Intn(maxMillisecondsToWait)
}

return nil
Expand Down
15 changes: 15 additions & 0 deletions dynamo/dynamo.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dynamo

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"log"
Expand All @@ -23,3 +24,17 @@ func DeleteAllItemsInTable(tableName string) error {
err = deleteItems(items, tableName)
return err
}

// describeTable asks DynamoDB for the description of tableName through the package's
// dynamoService client.
//
// On failure it logs which table could not be described and returns the error from the
// client unchanged; the returned description is nil in that case.
func describeTable(tableName string) (*dynamodb.DescribeTableOutput, error) {
	describeTableInput := &dynamodb.DescribeTableInput{
		TableName: aws.String(tableName),
	}

	tableInfo, err := dynamoService.DescribeTable(describeTableInput)
	if err != nil {
		// name the table so the log line is useful when several tables are processed
		log.Printf("Unable to describe the table %q\n", tableName)
		return nil, err
	}

	return tableInfo, nil
}
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/aws/aws-sdk-go v1.37.15 h1:W7l7gLLMcYRlg6a+uvf3Zz4jYwdqYzhe5ymqwWoOhp4=
github.com/aws/aws-sdk-go v1.37.15/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go v1.37.17 h1:Ga33kM38f58l7X+Z2B6JNdz9dFqxjR8AXHBbK3bXYc0=
github.com/aws/aws-sdk-go v1.37.17/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
Expand Down
47 changes: 47 additions & 0 deletions parallel/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package parallel

import (
"sync"
)

// Pool runs submitted tasks on a fixed number of worker goroutines.
//
// Tasks travel through two channels: Submit places a task on ingestionPoolChannel, a
// single forwarding goroutine moves it to executionPoolChannel, and the workers receive
// from executionPoolChannel and run it. A Pool must not be copied after creation.
type Pool struct {
	ingestionPoolChannel chan func()
	executionPoolChannel chan func()
	//counts the worker goroutines that are still running
	waitGroup sync.WaitGroup
	//makes Release safe to call more than once
	releaseOnce sync.Once
}

// NewPool creates a Pool with poolSize worker goroutines and room for taskQueueSize
// tasks that have been submitted but not yet forwarded to the workers.
//
// Submit blocks once that room is used up, so a caller that wants to queue every task
// up front should pass the number of tasks as taskQueueSize. A poolSize below 1 is
// raised to 1 and a negative taskQueueSize is treated as 0.
func NewPool(poolSize int, taskQueueSize int) *Pool {
	if poolSize < 1 {
		//a pool without workers would never run anything
		poolSize = 1
	}
	if taskQueueSize < 0 {
		taskQueueSize = 0
	}

	newPool := &Pool{
		ingestionPoolChannel: make(chan func(), taskQueueSize),
		executionPoolChannel: make(chan func(), poolSize),
	}

	//register the workers before they start so Wait can never miss one
	newPool.waitGroup.Add(poolSize)

	go newPool.submitIngestionGoroutine()
	for workerIndex := 0; workerIndex < poolSize; workerIndex++ {
		go newPool.submitExecutionGoroutine()
	}

	return newPool
}

// Submit queues task for execution. It blocks while the ingestion queue is full and
// panics if it is called after Release.
func (p *Pool) Submit(task func()) {
	p.ingestionPoolChannel <- task
}

// Release tells the pool that no more tasks will be submitted. It does not block.
//
// Tasks that were already submitted are still forwarded and run; the goroutines exit
// once the queues are drained. Calling Release more than once is safe.
func (p *Pool) Release() {
	p.releaseOnce.Do(func() {
		//only the ingestion channel is closed here: the forwarding goroutine is the sole
		//sender on the execution channel and closes it itself once it has drained the queue
		close(p.ingestionPoolChannel)
	})
}

// Wait blocks until every worker goroutine has exited, which happens after Release has
// been called and all submitted tasks have finished.
func (p *Pool) Wait() {
	p.waitGroup.Wait()
}

// submitIngestionGoroutine forwards tasks from the ingestion channel to the execution
// channel until the ingestion channel is closed and empty, then closes the execution
// channel so the workers stop.
func (p *Pool) submitIngestionGoroutine() {
	defer close(p.executionPoolChannel)

	for submittedTask := range p.ingestionPoolChannel {
		p.executionPoolChannel <- submittedTask
	}
}

// submitExecutionGoroutine is the body of one worker: it runs tasks from the execution
// channel until that channel is closed and empty.
func (p *Pool) submitExecutionGoroutine() {
	defer p.waitGroup.Done()

	for submittedTask := range p.executionPoolChannel {
		submittedTask()
	}
}

0 comments on commit dfc2c64

Please sign in to comment.