Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor RequestBatchBuffer to seperate out flush policy and batch storage[DRAFT] #5926

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public void setUp() {

@AfterEach
public void tearDown() {
purgeQueue(defaultQueueUrl);
batchManager.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.sqs;

import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/**
* Tests SQS message sending with improved memory management.
*/
@Ignore
public class SqsSendMessageApp {
private static final String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/248213382692/myTestQueue0";
private static final int MESSAGE_SIZE = 12_000;
private static final int MESSAGE_COUNT = 270000;
private static final int DELAY_MS = 30;
private static final int BATCH_SIZE = 1000;

@Test
void testBatchSize() throws Exception {
// Create SQS client and batch manager
SqsAsyncClient sqsAsyncClient = SqsAsyncClient.builder().build();
SqsAsyncBatchManager batchManager = sqsAsyncClient.batchManager();

// Create message template
String messageBody = createLargeString('a', MESSAGE_SIZE);
SendMessageRequest messageTemplate = SendMessageRequest.builder()
.queueUrl(QUEUE_URL)
.messageBody(messageBody)
.build();


while (true) {
batchManager.sendMessage(messageTemplate).whenComplete((response, error) -> {
if (error != null) {
System.err.println("Error sending message: " + error.getMessage());
} else {
System.out.println("Message sent successfully: " + response.messageId());
}
});

Thread.sleep(DELAY_MS);
}
}

/**
* Creates a string of specified length filled with the given character.
*/
private String createLargeString(char ch, int length) {
char[] chars = new char[length];
java.util.Arrays.fill(chars, ch);
return new String(chars);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.sqs;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.junit.Ignore;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

/**
* Tests SQS message sending with size monitoring.
*/
@Ignore
public class SqsSendMessageOld {

String QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/248213382692/myTestQueue0";
private static final int MESSAGE_SIZE = 12_000;
private static final int MESSAGE_COUNT = 270000;
private static final int DELAY_MS = 30;

@Test
void testBatchSize() throws Exception {
ExecutionInterceptor captureMessageSizeInterceptor = new CaptureMessageSizeInterceptor();

SqsAsyncClient sqsAsyncClient = SqsAsyncClient.builder()
// .overrideConfiguration(o -> o.addExecutionInterceptor(captureMessageSizeInterceptor))
.build();

String messageBody = createLargeString('a', MESSAGE_SIZE);
SqsAsyncBatchManager sqsAsyncBatchManager = sqsAsyncClient.batchManager();

SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl(QUEUE_URL)
.messageBody(messageBody)
.delaySeconds(20)
.build();

List<CompletableFuture<SendMessageResponse>> futures = sendMessages(
sqsAsyncBatchManager, sendMessageRequest, MESSAGE_COUNT);

CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();

System.out.println("All messages sent successfully");
}

/**
* Sends multiple messages with a delay between each.
*
* @param batchManager The batch manager to use
* @param messageRequest The message request template
* @param count Number of messages to send
* @return List of futures for the send operations
* @throws InterruptedException If thread is interrupted during sleep
*/
private List<CompletableFuture<SendMessageResponse>> sendMessages(
SqsAsyncBatchManager batchManager,
SendMessageRequest messageRequest,
int count) throws InterruptedException {

List<CompletableFuture<SendMessageResponse>> futures = new ArrayList<>();

for (int i = 0; i < count; i++) {
CompletableFuture<SendMessageResponse> future = batchManager.sendMessage(messageRequest)
.whenComplete((response, error) -> {
if (error != null) {
error.printStackTrace();
} else {
System.out.println("Message sent with ID: " + response.messageId());
}
});

futures.add(future);

if (i < count - 1) {
Thread.sleep(DELAY_MS);
}
}

return futures;
}

/**
* Creates a string of specified length filled with the given character.
*
* @param ch Character to fill the string with
* @param length Length of the string to create
* @return The generated string
*/
private String createLargeString(char ch, int length) {
StringBuilder sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
sb.append(ch);
}
return sb.toString();
}

/**
* Interceptor that captures and logs message sizes in batch requests.
*/
static class CaptureMessageSizeInterceptor implements ExecutionInterceptor {
@Override
public void beforeTransmission(Context.BeforeTransmission context, ExecutionAttributes executionAttributes) {
if (context.request() instanceof SendMessageBatchRequest) {
SendMessageBatchRequest batchRequest = (SendMessageBatchRequest) context.request();

System.out.println("Batch contains " + batchRequest.entries().size() + " messages");

int totalMessageBodySize = 0;
for (SendMessageBatchRequestEntry entry : batchRequest.entries()) {
int messageSize = entry.messageBody().length();
totalMessageBodySize += messageSize;
System.out.println("Message body size: " + messageSize + " bytes");
}

System.out.println("Total message bodies size: " + totalMessageBodySize + " bytes");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import software.amazon.awssdk.annotations.SdkInternalApi;

/**
* Manages the generation of unique IDs for batch entries.
*/
@SdkInternalApi
class BatchEntryIdGenerator {
private int nextId = 0;
private int nextBatchEntry = 0;
private final ReentrantLock idLock = new ReentrantLock();

public String nextId() {
idLock.lock();
try {
if (nextId == Integer.MAX_VALUE) {
nextId = 0;
}
return Integer.toString(nextId++);
} finally {
idLock.unlock();
}
}

public boolean hasNextBatchEntry(Map<String, ?> contextMap) {
return contextMap.containsKey(Integer.toString(nextBatchEntry));
}

public String nextBatchEntry() {
idLock.lock();
try {
if (nextBatchEntry == Integer.MAX_VALUE) {
nextBatchEntry = 0;
}
return Integer.toString(nextBatchEntry++);
} finally {
idLock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, Req
if (batchContextMap.size() == maxBatchKeys) {
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
}
return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize, maxBufferSize);
return RequestBatchBuffer.<RequestT, ResponseT>builder()
.scheduledFlush(scheduleFlush.get())
.maxBatchItems(maxBatchSize)
.maxBatchSizeInBytes(maxBatchBytesSize)
.maxBufferSize(maxBufferSize)
.build();
}).put(request, response);
}

Expand All @@ -68,28 +73,26 @@ public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>>
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey) {
return batchContextMap.get(batchKey).flushableRequests();
return batchContextMap.get(batchKey).extractBatchIfNeeded();
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(String batchKey,
RequestT request) {
return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request);
return batchContextMap.get(batchKey).getFlushableBatchIfSizeExceeded(request);
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(String batchKey,
int maxBatchItems) {
return batchContextMap.get(batchKey).flushableScheduledRequests(maxBatchItems);
return batchContextMap.get(batchKey).extractEntriesForScheduledFlush(maxBatchItems);
}

public void cancelScheduledFlush(String batchKey) {
batchContextMap.get(batchKey).cancelScheduledFlush();
}

public void clear() {
for (Map.Entry<String, RequestBatchBuffer<RequestT, ResponseT>> entry : batchContextMap.entrySet()) {
String key = entry.getKey();
entry.getValue().clear();
batchContextMap.remove(key);
for (RequestBatchBuffer<RequestT, ResponseT> buffer : batchContextMap.values()) {
buffer.clear();
}
batchContextMap.clear();
}
Expand Down
Loading
Loading