Skip to content

Commit

Permalink
Merge pull request #12 from halprin/stream-scan-request
Browse files Browse the repository at this point in the history
Stream Scan Request
  • Loading branch information
halprin authored Mar 23, 2021
2 parents f3ebbd2 + 1445047 commit bcd521f
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 38 deletions.
11 changes: 1 addition & 10 deletions dynamo/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
var maxItemsPerBatchRequest = 25
var tableKeys []*dynamodb.KeySchemaElement

func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string) error {
func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName string, goroutinePool *parallel.Pool) error {

var err error
tableKeys, err = getTableKeys(tableName)
Expand All @@ -22,15 +22,6 @@ func deleteItems(dynamoItems []map[string]*dynamodb.AttributeValue, tableName st

dynamoItemsChunks := chunkItems(dynamoItems)

concurrency, err := determineConcurrency(tableName)
if err != nil {
log.Println("Unable determine the concurrency")
return err
}

goroutinePool := parallel.NewPool(concurrency, len(dynamoItemsChunks))
defer goroutinePool.Release()

var errorChannels []chan error

for _, currentItemsChunk := range dynamoItemsChunks {
Expand Down
18 changes: 15 additions & 3 deletions dynamo/dynamo.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/halprin/delete-dynamodb-items/config"
"github.com/halprin/delete-dynamodb-items/parallel"
"log"
)

Expand All @@ -25,13 +26,24 @@ func DeleteAllItemsInTable() error {

tableName := *config.GetTableName()

items, err := getItems(tableName)
concurrency, err := determineConcurrency(tableName)
if err != nil {
log.Println("Unable determine the concurrency")
return err
}

err = deleteItems(items, tableName)
return err
// 1024 * 1024 / 25 = 41,943.04 ~= 41,944
goroutinePool := parallel.NewPool(concurrency, 41944)
defer goroutinePool.Release()

for subItemList := range getItemsGoroutine(tableName) {
err = deleteItems(subItemList, tableName, goroutinePool)
if err != nil {
return err
}
}

return nil
}

func describeTable(tableName string) (*dynamodb.DescribeTableOutput, error) {
Expand Down
47 changes: 24 additions & 23 deletions dynamo/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,35 @@ import (
"log"
)

func getItems(tableName string) ([]map[string]*dynamodb.AttributeValue, error) {
func getItemsGoroutine(tableName string) chan []map[string]*dynamodb.AttributeValue {
yield := make(chan []map[string]*dynamodb.AttributeValue)

var scannedItems []map[string]*dynamodb.AttributeValue

scanInput := &dynamodb.ScanInput{
TableName: aws.String(tableName),
}
go func() {
scanInput := &dynamodb.ScanInput{
TableName: aws.String(tableName),
}

for {
log.Println("Scanning items")
for {
log.Println("Scanning items")

scanOutput, err := dynamoService.Scan(scanInput)
if err != nil {
log.Println("Failed to scan the items")
return nil, err
}
scanOutput, err := dynamoService.Scan(scanInput)
if err != nil {
log.Println("Failed to scan the items")
break
}

scannedItems = append(scannedItems, scanOutput.Items...)
yield <- scanOutput.Items

if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
//there are still items to scan, so set the key to start scanning from again
scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
} else {
//no more items to scan, break out
break
if scanOutput.LastEvaluatedKey != nil && len(scanOutput.LastEvaluatedKey) > 0 {
//there are still items to scan, so set the key to start scanning from again
scanInput.ExclusiveStartKey = scanOutput.LastEvaluatedKey
} else {
//no more items to scan, break out
break
}
}
}
close(yield)
}()

return scannedItems, nil
return yield
}

3 changes: 2 additions & 1 deletion generate_mass_data.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ aws dynamodb create-table --table-name "${table_name}" --attribute-definitions A
items_preamble="{\"${table_name}\": ["
items_middle=""
items_ending=']}'
lorem_ipsum='Lorem ipsum dolor sit amet, consectetur adipiscing elit. Donec a efficitur nunc. Morbi fermentum sem metus, vel venenatis leo porttitor quis. Etiam maximus neque a pharetra viverra. Sed turpis lacus, blandit ac tortor elementum, scelerisque feugiat risus. Nam malesuada augue et purus aliquet, et semper dolor cursus. Suspendisse volutpat dolor nec efficitur rutrum. Aliquam leo libero, posuere eget vulputate in, luctus nec nibh. Donec eu tellus eu libero scelerisque molestie. Ut sed pretium nibh. Donec suscipit eget dui quis lacinia. Aliquam non pulvinar massa, nec blandit lectus. Cras sollicitudin rhoncus ex. Nunc ipsum dui, dictum in risus nec, convallis rutrum justo. In tempor dui nisl, in fringilla massa vehicula ac. Donec a ipsum luctus, venenatis magna ut, venenatis risus. Vivamus eu dapibus odio. Aenean dapibus urna orci, sed pharetra nunc dapibus ac. Praesent ornare, felis sit amet mattis faucibus, odio arcu laoreet arcu, eu blandit nisi turpis cursus enim.'

for ((index = 1 ; index <= num_items ; index++)); do
current_request="{\"PutRequest\": {\"Item\": {\"id\": {\"S\": \"$(uuidgen)\"}}}}"
current_request="{\"PutRequest\": {\"Item\": {\"id\": {\"S\": \"$(uuidgen)\"}, \"text\": {\"S\": \"${lorem_ipsum}\"}}}}"
items_middle="${items_middle}${current_request},"
if [[ $((index % 25)) == 0 ]]; then
items_middle=${items_middle::${#items_middle}-1}
Expand Down
2 changes: 1 addition & 1 deletion parallel/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Pool struct {
waitGroup sync.WaitGroup
}

//poolSize needs to be bigger than taskQueueSize
//taskQueueSize needs to be bigger than poolSize if you want to saturate the pool
func NewPool(poolSize int, taskQueueSize int) *Pool {
newPool := &Pool{
ingestionPoolChannel: make(chan func(), taskQueueSize),
Expand Down

0 comments on commit bcd521f

Please sign in to comment.