Skip to content

Commit ef82ece

Browse files
dchakrav-githubdiwakar
and
diwakar
authored
Fix for removing delay and handshake mode from the codebase (#283)
* Allow for automated call graph generation to keep contexts unique within StdCallbackContext for replay deduping. Now developers do not need to provide a name when calling services. There is a unique call graph context maintained for each service call made. The context is request aware, so different requests made will have their own independent context for dedupe Dedupe identical requests ```java ProgressEvent<Model, StdCallbackContext> result = initiator.translateToServiceRequest(m -> createRepository) .makeServiceCall((r, c) -> c.injectCredentialsAndInvokeV2( r, c.client()::createRepository)) .success(); ProgressEvent<Model, StdCallbackContext> result_2 = // make same request call initiator.translateToServiceRequest(m -> createRepository) .makeServiceCall((r, c) -> c.injectCredentialsAndInvokeV2( r, c.client()::createRepository)) .success(); assertThat(result).isEqualsTo(result_2); ``` * Prevent ConcurrentModificationExceptions in stabilize calls if they access the map and attempt to modify it * - Remove 60s hard coded callback - Remove handshake mode - Adding wait strategy that can be changed, defaulting to re-schedules with no local waits for Lambda binding - changes unit tests to reflect removal of handshake mode, fixed unit tests for serialization errors Co-authored-by: diwakar <[email protected]>
1 parent 30e11d8 commit ef82ece

File tree

6 files changed

+117
-52
lines changed

6 files changed

+117
-52
lines changed

src/main/java/software/amazon/cloudformation/LambdaWrapper.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import software.amazon.cloudformation.proxy.OperationStatus;
6565
import software.amazon.cloudformation.proxy.ProgressEvent;
6666
import software.amazon.cloudformation.proxy.ResourceHandlerRequest;
67+
import software.amazon.cloudformation.proxy.WaitStrategy;
6768
import software.amazon.cloudformation.resource.ResourceTypeSchema;
6869
import software.amazon.cloudformation.resource.SchemaValidator;
6970
import software.amazon.cloudformation.resource.Serializer;
@@ -302,10 +303,9 @@ public void handleRequest(final InputStream inputStream, final OutputStream outp
302303
// in a non-AWS model)
303304
AmazonWebServicesClientProxy awsClientProxy = null;
304305
if (request.getRequestData().getCallerCredentials() != null) {
305-
awsClientProxy = new AmazonWebServicesClientProxy(callbackContext == null, this.loggerProxy,
306-
request.getRequestData().getCallerCredentials(),
307-
() -> (long) context.getRemainingTimeInMillis(),
308-
DelayFactory.CONSTANT_DEFAULT_DELAY_FACTORY);
306+
awsClientProxy = new AmazonWebServicesClientProxy(this.loggerProxy, request.getRequestData().getCallerCredentials(),
307+
DelayFactory.CONSTANT_DEFAULT_DELAY_FACTORY,
308+
WaitStrategy.scheduleForCallbackStrategy());
309309
}
310310

311311
ProgressEvent<ResourceT, CallbackT> handlerResponse = wrapInvocationAndHandleErrors(awsClientProxy,

src/main/java/software/amazon/cloudformation/proxy/AmazonWebServicesClientProxy.java

Lines changed: 9 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@
2121
import com.amazonaws.auth.AWSStaticCredentialsProvider;
2222
import com.amazonaws.auth.BasicSessionCredentials;
2323
import com.google.common.base.Preconditions;
24-
import com.google.common.util.concurrent.Uninterruptibles;
2524
import java.time.Duration;
2625
import java.time.Instant;
2726
import java.time.temporal.ChronoUnit;
2827
import java.util.Objects;
2928
import java.util.concurrent.CompletableFuture;
30-
import java.util.concurrent.TimeUnit;
3129
import java.util.function.BiFunction;
3230
import java.util.function.Function;
3331
import java.util.function.Supplier;
@@ -64,10 +62,9 @@ public class AmazonWebServicesClientProxy implements CallChain {
6462

6563
private final AWSCredentialsProvider v1CredentialsProvider;
6664
private final AwsCredentialsProvider v2CredentialsProvider;
67-
private final Supplier<Long> remainingTimeInMillis;
68-
private final boolean inHandshakeMode;
6965
private final LoggerProxy loggerProxy;
7066
private final DelayFactory override;
67+
private final WaitStrategy waitStrategy;
7168

7269
public AmazonWebServicesClientProxy(final LoggerProxy loggerProxy,
7370
final Credentials credentials,
@@ -79,18 +76,14 @@ public AmazonWebServicesClientProxy(final LoggerProxy loggerProxy,
7976
final Credentials credentials,
8077
final Supplier<Long> remainingTimeToExecute,
8178
final DelayFactory override) {
82-
this(false, loggerProxy, credentials, remainingTimeToExecute, override);
79+
this(loggerProxy, credentials, override, WaitStrategy.newLocalLoopAwaitStrategy(remainingTimeToExecute));
8380
}
8481

85-
public AmazonWebServicesClientProxy(final boolean inHandshakeMode,
86-
final LoggerProxy loggerProxy,
82+
public AmazonWebServicesClientProxy(final LoggerProxy loggerProxy,
8783
final Credentials credentials,
88-
final Supplier<Long> remainingTimeToExecute,
89-
final DelayFactory override) {
90-
this.inHandshakeMode = inHandshakeMode;
84+
final DelayFactory override,
85+
final WaitStrategy waitStrategy) {
9186
this.loggerProxy = loggerProxy;
92-
this.remainingTimeInMillis = remainingTimeToExecute;
93-
9487
BasicSessionCredentials basicSessionCredentials = new BasicSessionCredentials(credentials.getAccessKeyId(),
9588
credentials.getSecretAccessKey(),
9689
credentials.getSessionToken());
@@ -100,6 +93,7 @@ public AmazonWebServicesClientProxy(final boolean inHandshakeMode,
10093
credentials.getSecretAccessKey(), credentials.getSessionToken());
10194
this.v2CredentialsProvider = StaticCredentialsProvider.create(awsSessionCredentials);
10295
this.override = Objects.requireNonNull(override);
96+
this.waitStrategy = Objects.requireNonNull(waitStrategy);
10397
}
10498

10599
public <ClientT> ProxyClient<ClientT> newProxy(@Nonnull Supplier<ClientT> client) {
@@ -395,14 +389,6 @@ public ProgressEvent<ModelT, CallbackT> done(Callback<RequestT, ResponseT, Clien
395389
event = exceptionHandler.invoke(req, e, client, model, context);
396390
}
397391

398-
if (event != null && (event.isFailed() || event.isSuccess())) {
399-
return event;
400-
}
401-
402-
if (inHandshakeMode) {
403-
return ProgressEvent.defaultInProgressHandler(context, 60, model);
404-
}
405-
406392
if (event != null) {
407393
return event;
408394
}
@@ -422,15 +408,10 @@ public ProgressEvent<ModelT, CallbackT> done(Callback<RequestT, ResponseT, Clien
422408
return ProgressEvent.failed(model, context, HandlerErrorCode.NotStabilized,
423409
"Exceeded attempts to wait");
424410
}
425-
long remainingTime = getRemainingTimeInMillis();
426-
long localWait = next.toMillis() + 2 * elapsed + 100;
427-
if (remainingTime > localWait) {
428-
loggerProxy.log("Waiting for " + next.getSeconds() + " for call " + callGraph);
429-
Uninterruptibles.sleepUninterruptibly(next.getSeconds(), TimeUnit.SECONDS);
430-
continue;
411+
event = AmazonWebServicesClientProxy.this.waitStrategy.await(elapsed, next, context, model);
412+
if (event != null) {
413+
return event;
431414
}
432-
return ProgressEvent.defaultInProgressHandler(context, Math.max((int) next.getSeconds(), 60),
433-
model);
434415
}
435416
} finally {
436417
//
@@ -455,10 +436,6 @@ public ProgressEvent<ModelT, CallbackT> done(Function<ResponseT, ProgressEvent<M
455436

456437
}
457438

458-
public final long getRemainingTimeInMillis() {
459-
return remainingTimeInMillis.get();
460-
}
461-
462439
public <RequestT extends AmazonWebServiceRequest, ResultT extends AmazonWebServiceResult<ResponseMetadata>>
463440
ResultT
464441
injectCredentialsAndInvoke(final RequestT request, final Function<RequestT, ResultT> requestFunction) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
package software.amazon.cloudformation.proxy;
16+
17+
import com.google.common.util.concurrent.Uninterruptibles;
18+
import java.time.Duration;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.function.Supplier;
21+
22+
public interface WaitStrategy {
23+
<ModelT, CallbackT>
24+
ProgressEvent<ModelT, CallbackT>
25+
await(long operationElapsedTime, Duration nextAttempt, CallbackT context, ModelT model);
26+
27+
static WaitStrategy newLocalLoopAwaitStrategy(final Supplier<Long> remainingTimeToExecute) {
28+
return new WaitStrategy() {
29+
@Override
30+
public <ModelT, CallbackT>
31+
ProgressEvent<ModelT, CallbackT>
32+
await(long operationElapsedTime, Duration next, CallbackT context, ModelT model) {
33+
long remainingTime = remainingTimeToExecute.get();
34+
long localWait = next.toMillis() + 2 * operationElapsedTime + 100;
35+
if (remainingTime > localWait) {
36+
Uninterruptibles.sleepUninterruptibly(next.getSeconds(), TimeUnit.SECONDS);
37+
return null;
38+
}
39+
return ProgressEvent.defaultInProgressHandler(context, (int) next.getSeconds(), model);
40+
}
41+
};
42+
}
43+
44+
static WaitStrategy scheduleForCallbackStrategy() {
45+
return new WaitStrategy() {
46+
@Override
47+
public <ModelT, CallbackT>
48+
ProgressEvent<ModelT, CallbackT>
49+
await(long operationElapsedTime, Duration next, CallbackT context, ModelT model) {
50+
return ProgressEvent.defaultInProgressHandler(context, (int) next.getSeconds(), model);
51+
}
52+
};
53+
}
54+
}

src/test/java/software/amazon/cloudformation/proxy/End2EndCallChainTest.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ public AwsErrorDetails awsErrorDetails() {
333333
@Order(30)
334334
@Test
335335
public void createHandlerThrottleException() throws Exception {
336-
final HandlerRequest<Model, StdCallbackContext> request = prepareRequest(Model.builder().repoName("repository").build());
336+
HandlerRequest<Model, StdCallbackContext> request = prepareRequest(Model.builder().repoName("repository").build());
337337
request.setAction(Action.CREATE);
338338
final Serializer serializer = new Serializer();
339339
final InputStream stream = prepareStream(serializer, request);
@@ -364,25 +364,27 @@ public AwsErrorDetails awsErrorDetails() {
364364

365365
};
366366
when(client.describeRepository(eq(describeRequest))).thenThrow(throttleException);
367+
when(client.createRepository(any())).thenReturn(mock(CreateResponse.class));
367368

368369
final SdkHttpClient httpClient = mock(SdkHttpClient.class);
369370
final ServiceHandlerWrapper wrapper = new ServiceHandlerWrapper(providerLoggingCredentialsProvider,
370371
mock(CloudWatchLogPublisher.class),
371372
mock(LogPublisher.class), mock(MetricsPublisher.class),
372373
new Validator(), serializer, client, httpClient);
373374

374-
// Bail early for the handshake. Reinvoke handler again
375-
wrapper.handleRequest(stream, output, cxt);
376-
ProgressEvent<Model, StdCallbackContext> event = serializer.deserialize(output.toString("UTF8"),
377-
new TypeReference<ProgressEvent<Model, StdCallbackContext>>() {
378-
});
379-
request.setCallbackContext(event.getCallbackContext());
380-
output = new ByteArrayOutputStream(2048);
381-
wrapper.handleRequest(prepareStream(serializer, request), output, cxt);
382-
383-
// Handshake mode 1 try, Throttle retries 4 times (1, 0s), (2, 3s), (3, 6s), (4,
384-
// 9s)
385-
verify(client, times(5)).describeRepository(eq(describeRequest));
375+
ProgressEvent<Model, StdCallbackContext> progress;
376+
do {
377+
output = new ByteArrayOutputStream(2048);
378+
wrapper.handleRequest(prepareStream(serializer, request), output, cxt);
379+
progress = serializer.deserialize(output.toString(StandardCharsets.UTF_8.name()),
380+
new TypeReference<ProgressEvent<Model, StdCallbackContext>>() {
381+
});
382+
request = prepareRequest(progress.getResourceModel());
383+
request.setCallbackContext(progress.getCallbackContext());
384+
} while (progress.isInProgressCallbackDelay());
385+
386+
// Throttle retries 4 times (1, 0s), (2, 3s), (3, 6s), (4, 9s)
387+
verify(client, times(4)).describeRepository(eq(describeRequest));
386388

387389
ProgressEvent<Model, StdCallbackContext> response = serializer.deserialize(output.toString(StandardCharsets.UTF_8.name()),
388390
new TypeReference<ProgressEvent<Model, StdCallbackContext>>() {

src/test/java/software/amazon/cloudformation/proxy/service/CreateRequest.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,34 @@
1414
*/
1515
package software.amazon.cloudformation.proxy.service;
1616

17+
import java.util.Arrays;
1718
import java.util.Collections;
1819
import java.util.List;
1920
import software.amazon.awssdk.awscore.AwsRequest;
2021
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2122
import software.amazon.awssdk.core.SdkField;
2223
import software.amazon.awssdk.core.SdkPojo;
24+
import software.amazon.awssdk.core.protocol.MarshallLocation;
25+
import software.amazon.awssdk.core.protocol.MarshallingType;
26+
import software.amazon.awssdk.core.traits.LocationTrait;
27+
import software.amazon.awssdk.utils.builder.SdkBuilder;
2328

2429
@lombok.Getter
2530
@lombok.EqualsAndHashCode(callSuper = false)
2631
@lombok.ToString(callSuper = true)
2732
public class CreateRequest extends AwsRequest {
2833

34+
private static final SdkField<String> REPO_NAME_FIELD = SdkField.<String>builder(MarshallingType.STRING)
35+
.getter((obj) -> ((CreateRequest) obj).repoName).setter((obj, val) -> ((Builder) obj).repoName(val))
36+
.traits(LocationTrait.builder().location(MarshallLocation.PAYLOAD).locationName("repoName").build()).build();
37+
38+
private static final SdkField<String> USER_NAME_FIELD = SdkField.<String>builder(MarshallingType.STRING)
39+
.getter((obj) -> ((CreateRequest) obj).getUserName()).setter((obj, val) -> ((Builder) obj).repoName(val))
40+
.traits(LocationTrait.builder().location(MarshallLocation.PAYLOAD).locationName("userName").build()).build();
41+
42+
private static final List<
43+
SdkField<?>> SDK_FIELDS = Collections.unmodifiableList(Arrays.asList(REPO_NAME_FIELD, USER_NAME_FIELD));
44+
2945
private final String repoName;
3046
private final String userName;
3147

@@ -49,7 +65,7 @@ public List<SdkField<?>> sdkFields() {
4965
@lombok.Getter
5066
@lombok.EqualsAndHashCode(callSuper = true)
5167
@lombok.ToString(callSuper = true)
52-
public static class Builder extends BuilderImpl implements SdkPojo {
68+
public static class Builder extends BuilderImpl implements SdkPojo, SdkBuilder<Builder, CreateRequest> {
5369
private String repoName;
5470
private String userName;
5571

@@ -76,7 +92,7 @@ public Builder overrideConfiguration(AwsRequestOverrideConfiguration awsRequestO
7692

7793
@Override
7894
public List<SdkField<?>> sdkFields() {
79-
return Collections.emptyList();
95+
return SDK_FIELDS;
8096
}
8197
}
8298

src/test/java/software/amazon/cloudformation/proxy/service/CreateResponse.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,32 @@
1414
*/
1515
package software.amazon.cloudformation.proxy.service;
1616

17+
import java.util.Arrays;
1718
import java.util.Collections;
1819
import java.util.List;
1920
import software.amazon.awssdk.awscore.AwsResponse;
2021
import software.amazon.awssdk.core.SdkField;
2122
import software.amazon.awssdk.core.SdkPojo;
23+
import software.amazon.awssdk.core.protocol.MarshallLocation;
24+
import software.amazon.awssdk.core.protocol.MarshallingType;
25+
import software.amazon.awssdk.core.traits.LocationTrait;
26+
import software.amazon.awssdk.utils.builder.SdkBuilder;
2227

2328
@lombok.Getter
2429
@lombok.EqualsAndHashCode(callSuper = true)
2530
@lombok.ToString
2631
public class CreateResponse extends AwsResponse {
32+
33+
private static final SdkField<String> REPO_NAME_FIELD = SdkField.<String>builder(MarshallingType.STRING)
34+
.getter((obj) -> ((CreateResponse) obj).getRepoName()).setter((obj, val) -> ((CreateResponse.Builder) obj).repoName(val))
35+
.traits(LocationTrait.builder().location(MarshallLocation.PAYLOAD).locationName("repoName").build()).build();
36+
37+
private static final SdkField<String> ERROR_FIELD = SdkField.<String>builder(MarshallingType.STRING)
38+
.getter((obj) -> ((CreateResponse) obj).getError()).setter((obj, val) -> ((Builder) obj).error(val))
39+
.traits(LocationTrait.builder().location(MarshallLocation.PAYLOAD).locationName("userName").build()).build();
40+
41+
private static final List<SdkField<?>> SDK_FIELDS = Collections.unmodifiableList(Arrays.asList(REPO_NAME_FIELD, ERROR_FIELD));
42+
2743
private final String repoName;
2844
private final String error;
2945

@@ -43,7 +59,7 @@ public List<SdkField<?>> sdkFields() {
4359
return Collections.emptyList();
4460
}
4561

46-
public static class Builder extends BuilderImpl implements SdkPojo {
62+
public static class Builder extends BuilderImpl implements SdkPojo, SdkBuilder<Builder, CreateResponse> {
4763
private String repoName;
4864
private String error;
4965

0 commit comments

Comments
 (0)