
Commit

Stream the scan results 1 page at a time so we don't load the entire table into memory

halprin committed Mar 23, 2021
1 parent e96c9a4 commit 1445047
Showing 4 changed files with 41 additions and 37 deletions.
11 changes: 1 addition & 10 deletions dynamo/delete.go
@@ -11,7 +11,7 @@ import (
 var maxItemsPerBatchRequest = 25
 var tableKeys []*dynamodb.KeySchemaElement
 
-func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string) error {
+func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string, goroutinePool *parallel.Pool) error {
 
     var err error
     tableKeys, err = getTableKeys(tableName)
@@ -22,15 +22,6 @@ func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName st
 
     dynamoItemsChunks := chunkItems(dynamoItems)
 
-    concurrency, err := determineConcurrency(tableName)
-    if err != nil {
-        log.Println("Unable determine the concurrency")
-        return err
-    }
-
-    goroutinePool := parallel.NewPool(concurrency, len(dynamoItemsChunks))
-    defer goroutinePool.Release()
-
     var errorChannels []chan error
 
     for _, currentItemsChunk := range dynamoItemsChunks {
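
As context for the chunking in the hunk above: DynamoDB's BatchWriteItem call accepts at most 25 requests, which is why maxItemsPerBatchRequest is 25 and the items are split with chunkItems before deletion. A minimal sketch of that kind of chunking, written generically under the assumption that chunkItems behaves roughly this way (chunk and size are illustrative names, not from this repository):

package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements,
// e.g. batches of 25 for BatchWriteItem. Hypothetical sketch only; the
// repository's chunkItems may differ.
func chunk[T any](items []T, size int) [][]T {
    if len(items) == 0 {
        return nil
    }
    var chunks [][]T
    for len(items) > size {
        chunks = append(chunks, items[:size])
        items = items[size:]
    }
    return append(chunks, items)
}

func main() {
    items := make([]int, 60)
    fmt.Println(len(chunk(items, 25))) // 3 chunks: 25 + 25 + 10
}
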
18 changes: 15 additions & 3 deletions dynamo/dynamo.go
@@ -5,6 +5,7 @@ import (
     "github.com/aws/aws-sdk-go/aws/session"
     "github.com/aws/aws-sdk-go/service/dynamodb"
     "github.com/halprin/delete-dynamodb-items/config"
+    "github.com/halprin/delete-dynamodb-items/parallel"
     "log"
 )

@@ -25,13 +26,24 @@ func DeleteAllItemsInTable() error {
 
     tableName := *config.GetTableName()
 
-    items, err := getItems(tableName)
+    concurrency, err := determineConcurrency(tableName)
     if err != nil {
+        log.Println("Unable determine the concurrency")
         return err
     }
 
-    err = deleteItems(items, tableName)
-    return err
+    // 1024 * 1024 / 25 = 41,943.04 ~= 41,944
+    goroutinePool := parallel.NewPool(concurrency, 41944)
+    defer goroutinePool.Release()
+
+    for subItemList := range getItemsGoroutine(tableName) {
+        err = deleteItems(subItemList, tableName, goroutinePool)
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
 }
 
 func describeTable(tableName string) (*dynamodb.DescribeTableOutput, error) {
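
A note on the hard-coded queue size in the hunk above: the inline comment suggests it comes from two DynamoDB limits, namely that a Scan page returns at most 1 MB of data and that a BatchWriteItem request deletes at most 25 items, so a single page can never produce more than ceil(1,048,576 / 25) = 41,944 chunks even if every item were one byte. A small sketch of that arithmetic, reflecting my reading of the comment rather than code from the repository:

package main

import "fmt"

func main() {
    // Assumed limits behind the 41,944 task-queue size: a 1 MB Scan page can
    // hold at most 1024*1024 one-byte items, and BatchWriteItem accepts at
    // most 25 delete requests per call.
    const scanPageLimitBytes = 1024 * 1024
    const maxItemsPerBatchRequest = 25

    // Ceiling division: the most chunks one scanned page could yield.
    maxChunksPerPage := (scanPageLimitBytes + maxItemsPerBatchRequest - 1) / maxItemsPerBatchRequest
    fmt.Println(maxChunksPerPage) // 41944, matching parallel.NewPool(concurrency, 41944)
}
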
47 changes: 24 additions & 23 deletions dynamo/scan.go
@@ -6,34 +6,35 @@ import (
     "log"
 )
 
-func getItems(tableName string) ([]map[string]*dynamodb.AttributeValue, error) {
+func getItemsGoroutine(tableName string) chan []map[string]*dynamodb.AttributeValue {
+    yield := make(chan []map[string]*dynamodb.AttributeValue)
 
-    var scannedItems []map[string]*dynamodb.AttributeValue
-
-    scanInput := &dynamodb.ScanInput{
-        TableName: aws.String(tableName),
-    }
+    go func() {
+        scanInput := &dynamodb.ScanInput{
+            TableName: aws.String(tableName),
+        }
 
-    for {
-        log.Println("Scanning items")
+        for {
+            log.Println("Scanning items")
 
-        scanOutput, err := dynamoService.Scan(scanInput)
-        if err != nil {
-            log.Println("Failed to scan the items")
-            return nil, err
-        }
+            scanOutput, err := dynamoService.Scan(scanInput)
+            if err != nil {
+                log.Println("Failed to scan the items")
+                break
+            }
 
-        scannedItems = append(scannedItems, scanOutput.Items...)
+            yield <- scanOutput.Items
 
-        if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
-            //there are still items to scan, the the key to start scanning from again
-            scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
-        } else {
-            //no more items to scan, break out
-            break
+            if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
+                //there are still items to scan, the the key to start scanning from again
+                scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
+            } else {
+                //no more items to scan, break out
+                break
+            }
         }
-    }
+        close(yield)
+    }()
 
-    return scannedItems, nil
+    return yield
 }
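
The rewrite above is the usual producer/consumer shape in Go: a goroutine pages through Scan, sends each page on an unbuffered channel as soon as it arrives, and closes the channel when it finishes, so only about one page of items is held in memory at a time. One visible trade-off is that the function can no longer return an error; a failed Scan is logged, the loop breaks, and the caller just sees the channel close. A self-contained sketch of the same pattern with no DynamoDB dependency (streamPages and fetch are made-up names used only for illustration):

package main

import "fmt"

// streamPages illustrates the pattern: fetch returns one page of items plus a
// cursor for the next page ("" means there are no more pages). Each page is
// sent on the channel as soon as it is fetched, so the whole data set is never
// held in memory at once.
func streamPages(fetch func(cursor string) (items []string, next string)) <-chan []string {
    yield := make(chan []string)

    go func() {
        defer close(yield)
        cursor := ""
        for {
            items, next := fetch(cursor)
            yield <- items
            if next == "" {
                return
            }
            cursor = next
        }
    }()

    return yield
}

func main() {
    // Fake two-page data source standing in for a paginated Scan.
    pages := map[string][]string{"": {"a", "b"}, "next": {"c"}}
    cursors := map[string]string{"": "next", "next": ""}
    fetch := func(cursor string) ([]string, string) { return pages[cursor], cursors[cursor] }

    for page := range streamPages(fetch) {
        fmt.Println(page) // the consumer handles one page at a time
    }
}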

2 changes: 1 addition & 1 deletion parallel/pool.go
@@ -10,7 +10,7 @@ type Pool struct {
     waitGroup sync.WaitGroup
 }
 
-//poolSize needs to be bigger than taskQueueSize
+//taskQueueSize needs to be bigger than poolSize if you want to saturate the pool
 func NewPool(poolSize int, taskQueueSize int) *Pool {
     newPool := &Pool{
         ingestionPoolChannel: make(chan func(), taskQueueSize),
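
The corrected comment matches how a fixed-size worker pool behaves: tasks wait in a buffered channel (ingestionPoolChannel) until a worker is free, so the queue capacity (taskQueueSize) presumably needs to be at least as large as poolSize for there to be a task ready for every worker. A generic sketch of that pattern, assuming parallel.Pool works roughly this way; newWorkerPool, submit, and release are illustrative names, not the repository's API:

package main

import (
    "fmt"
    "sync"
)

// newWorkerPool starts poolSize workers that drain a buffered task channel of
// capacity queueSize. Hypothetical sketch of the pattern, not parallel.Pool.
func newWorkerPool(poolSize, queueSize int) (submit func(func()), release func()) {
    tasks := make(chan func(), queueSize)
    var wg sync.WaitGroup

    for i := 0; i < poolSize; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    submit = func(task func()) { tasks <- task }
    release = func() {
        close(tasks) // no more tasks; the workers drain the queue and exit
        wg.Wait()
    }
    return submit, release
}

func main() {
    submit, release := newWorkerPool(3, 10) // queue larger than the pool, per the fixed comment
    for i := 0; i < 10; i++ {
        i := i
        submit(func() { fmt.Println("deleting batch", i) })
    }
    release() // waits for every queued task to finish
}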
