Skip to content

Commit 638d5e6

Browse files
committed
GH-64: Add DynamoDbMetaDataStore implementation
Fixes #64 * Add `DynamoDbRunning` for testing against locally ran DynamoDB * Upgrade to Gradle 4.0.1, SI-4.3.11 * Switch on some Checkstyle rules for tests
1 parent d187c83 commit 638d5e6

File tree

10 files changed

+554
-7
lines changed

10 files changed

+554
-7
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,13 @@ flag to `true` ot switch it to the gateway mode.
497497
By default the `SnsMessageHandler` is one-way `MessageHandler`.
498498
499499
500+
## Metadata Store for Amazon DynamoDB
501+
502+
The `DynamoDbMetaDataStore`, a `ConcurrentMetadataStore` implementation, is provided to keep the metadata for Spring Integration components in the distributed Amazon DynamoDB store.
503+
The implementation is based on a simple table with `KEY` and `VALUE` attributes, both are string types and the `KEY` is primary key of the table.
504+
By default the `SpringIntegrationMetadataStore` table is used and it is created during `DynamoDbMetaDataStore` initialization if that doesn't exist yet.
505+
The `DynamoDbMetaDataStore` can be used for the `KinesisMessageDrivenChannelAdapter` as a cloud-based `cehckpointStore`.
506+
500507
[Spring Cloud AWS]: https://github.com/spring-cloud/spring-cloud-aws
501508
[AWS SDK for Java]: http://aws.amazon.com/sdkforjava/
502509
[Amazon Web Services]: http://aws.amazon.com/

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ ext {
3535
servletApiVersion = '3.1.0'
3636
slf4jVersion = '1.7.25'
3737
springCloudAwsVersion = '1.2.1.RELEASE'
38-
springIntegrationVersion = '4.3.10.RELEASE'
38+
springIntegrationVersion = '4.3.11.RELEASE'
3939

4040
idPrefix = 'aws'
4141

@@ -108,6 +108,7 @@ dependencies {
108108
compile('org.springframework.integration:spring-integration-http', optional)
109109

110110
compile('com.amazonaws:aws-java-sdk-kinesis', optional)
111+
compile('com.amazonaws:aws-java-sdk-dynamodb', optional)
111112

112113
compile("javax.servlet:javax.servlet-api:$servletApiVersion", provided)
113114

gradle/wrapper/gradle-wrapper.jar

6 Bytes
Binary file not shown.
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
#Mon Jul 10 20:22:20 EDT 2017
1+
#Wed Jul 26 16:30:33 EDT 2017
22
distributionBase=GRADLE_USER_HOME
33
distributionPath=wrapper/dists
44
zipStoreBase=GRADLE_USER_HOME
55
zipStorePath=wrapper/dists
6-
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0-bin.zip
6+
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.1-bin.zip

src/checkstyle/checkstyle-suppressions.xml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
55
<suppressions>
66
<suppress files="package-info\.java" checks=".*" />
7-
<suppress files="[\\/]test[\\/]" checks="RequireThis" />
8-
<suppress files="[\\/]test[\\/]" checks="FinalClass" />
97
<suppress files="[\\/]test[\\/]" checks="AvoidStaticImport" />
108
<suppress files="[\\/]test[\\/]" checks="InnerTypeLast" />
119
<suppress files="[\\/]test[\\/]" checks="Javadoc*" />

src/main/java/org/springframework/integration/aws/inbound/S3InboundFileSynchronizer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ protected long getModified(S3ObjectSummary file) {
8484
}
8585

8686
@Override
87-
protected void copyFileToLocalDirectory(String remoteDirectoryPath, S3ObjectSummary remoteFile,
87+
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, S3ObjectSummary remoteFile,
8888
File localDirectory, Session<S3ObjectSummary> session) throws IOException {
89-
super.copyFileToLocalDirectory(((S3Session) session).normalizeBucketName(remoteDirectoryPath),
89+
return super.copyFileToLocalDirectory(((S3Session) session).normalizeBucketName(remoteDirectoryPath),
9090
remoteFile, localDirectory, session);
9191
}
9292

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
/*
2+
* Copyright 2017 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.aws.metadata;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import org.apache.commons.logging.Log;
23+
import org.apache.commons.logging.LogFactory;
24+
25+
import org.springframework.beans.factory.InitializingBean;
26+
import org.springframework.integration.metadata.ConcurrentMetadataStore;
27+
import org.springframework.util.Assert;
28+
29+
import com.amazonaws.handlers.AsyncHandler;
30+
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
31+
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync;
32+
import com.amazonaws.services.dynamodbv2.document.AttributeUpdate;
33+
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
34+
import com.amazonaws.services.dynamodbv2.document.Expected;
35+
import com.amazonaws.services.dynamodbv2.document.Item;
36+
import com.amazonaws.services.dynamodbv2.document.Table;
37+
import com.amazonaws.services.dynamodbv2.document.spec.DeleteItemSpec;
38+
import com.amazonaws.services.dynamodbv2.document.spec.UpdateItemSpec;
39+
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
40+
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
41+
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
42+
import com.amazonaws.services.dynamodbv2.model.CreateTableResult;
43+
import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest;
44+
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
45+
import com.amazonaws.services.dynamodbv2.model.KeyType;
46+
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
47+
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
48+
import com.amazonaws.services.dynamodbv2.model.ReturnValue;
49+
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
50+
import com.amazonaws.waiters.FixedDelayStrategy;
51+
import com.amazonaws.waiters.MaxAttemptsRetryStrategy;
52+
import com.amazonaws.waiters.PollingStrategy;
53+
import com.amazonaws.waiters.Waiter;
54+
import com.amazonaws.waiters.WaiterHandler;
55+
import com.amazonaws.waiters.WaiterParameters;
56+
57+
/**
58+
* The {@link ConcurrentMetadataStore} for the {@link AmazonDynamoDB}.
59+
*
60+
* @author Artem Bilan
61+
*
62+
* @since 1.1
63+
*/
64+
public class DynamoDbMetaDataStore implements ConcurrentMetadataStore, InitializingBean {
65+
66+
/**
67+
* The {@value DEFAULT_TABLE_NAME} default name for the metadata table in the DynamoDB.
68+
*/
69+
public static final String DEFAULT_TABLE_NAME = "SpringIntegrationMetadataStore";
70+
71+
private static final Log logger = LogFactory.getLog(DynamoDbMetaDataStore.class);
72+
73+
private static final String KEY = "KEY";
74+
75+
private static final String VALUE = "VALUE";
76+
77+
private final AmazonDynamoDBAsync dynamoDB;
78+
79+
private final Table table;
80+
81+
private final CountDownLatch createTableLatch = new CountDownLatch(1);
82+
83+
private Long readCapacity = 10000L;
84+
85+
private Long writeCapacity = 10000L;
86+
87+
public DynamoDbMetaDataStore(AmazonDynamoDBAsync dynamoDB) {
88+
this(dynamoDB, DEFAULT_TABLE_NAME);
89+
}
90+
91+
public DynamoDbMetaDataStore(AmazonDynamoDBAsync dynamoDB, String tableName) {
92+
Assert.notNull(dynamoDB, "'dynamoDB' must not be null.");
93+
Assert.hasText(tableName, "'tableName' must not be empty.");
94+
this.dynamoDB = dynamoDB;
95+
this.table =
96+
new DynamoDB(this.dynamoDB)
97+
.getTable(tableName);
98+
99+
}
100+
101+
public void setReadCapacity(Long readCapacity) {
102+
this.readCapacity = readCapacity;
103+
}
104+
105+
public void setWriteCapacity(Long writeCapacity) {
106+
this.writeCapacity = writeCapacity;
107+
}
108+
109+
@Override
110+
public void afterPropertiesSet() throws Exception {
111+
try {
112+
this.table.describe();
113+
createTableLatch.countDown();
114+
return;
115+
}
116+
catch (ResourceNotFoundException e) {
117+
if (logger.isInfoEnabled()) {
118+
logger.info("No table '" + this.table.getTableName() + "'. Creating one...");
119+
}
120+
}
121+
122+
CreateTableRequest createTableRequest =
123+
new CreateTableRequest()
124+
.withTableName(this.table.getTableName())
125+
.withKeySchema(new KeySchemaElement(KEY, KeyType.HASH))
126+
.withAttributeDefinitions(new AttributeDefinition(KEY, ScalarAttributeType.S))
127+
.withProvisionedThroughput(new ProvisionedThroughput(this.readCapacity, this.writeCapacity));
128+
129+
130+
this.dynamoDB.createTableAsync(createTableRequest,
131+
new AsyncHandler<CreateTableRequest, CreateTableResult>() {
132+
133+
@Override
134+
public void onError(Exception e) {
135+
logger.error("Cannot create DynamoDb table: " +
136+
DynamoDbMetaDataStore.this.table.getTableName(), e);
137+
DynamoDbMetaDataStore.this.createTableLatch.countDown();
138+
}
139+
140+
@Override
141+
public void onSuccess(CreateTableRequest request, CreateTableResult createTableResult) {
142+
Waiter<DescribeTableRequest> waiter =
143+
DynamoDbMetaDataStore.this.dynamoDB.waiters()
144+
.tableExists();
145+
146+
WaiterParameters<DescribeTableRequest> waiterParameters =
147+
new WaiterParameters<>(
148+
new DescribeTableRequest(DynamoDbMetaDataStore.this.table.getTableName()))
149+
.withPollingStrategy(
150+
new PollingStrategy(new MaxAttemptsRetryStrategy(25),
151+
new FixedDelayStrategy(1)));
152+
153+
waiter.runAsync(waiterParameters, new WaiterHandler<DescribeTableRequest>() {
154+
155+
@Override
156+
public void onWaitSuccess(DescribeTableRequest request) {
157+
DynamoDbMetaDataStore.this.createTableLatch.countDown();
158+
DynamoDbMetaDataStore.this.table.describe();
159+
}
160+
161+
@Override
162+
public void onWaitFailure(Exception e) {
163+
logger.error("Cannot describe DynamoDb table: " +
164+
DynamoDbMetaDataStore.this.table.getTableName(), e);
165+
DynamoDbMetaDataStore.this.createTableLatch.countDown();
166+
}
167+
168+
});
169+
}
170+
171+
});
172+
}
173+
174+
private void awaitForActive() {
175+
try {
176+
this.createTableLatch.await(10, TimeUnit.SECONDS);
177+
}
178+
catch (InterruptedException e) {
179+
180+
}
181+
}
182+
183+
@Override
184+
public void put(String key, String value) {
185+
Assert.hasText(key, "'key' must not be empty.");
186+
Assert.hasText(value, "'value' must not be empty.");
187+
188+
awaitForActive();
189+
190+
this.table.putItem(
191+
new Item()
192+
.withPrimaryKey(KEY, key)
193+
.withString(VALUE, value));
194+
}
195+
196+
@Override
197+
public String get(String key) {
198+
Assert.hasText(key, "'key' must not be empty.");
199+
200+
201+
awaitForActive();
202+
203+
Item item = this.table.getItem(KEY, key);
204+
205+
return getValueIfAny(item);
206+
}
207+
208+
@Override
209+
public String putIfAbsent(String key, String value) {
210+
Assert.hasText(key, "'key' must not be empty.");
211+
Assert.hasText(value, "'value' must not be empty.");
212+
213+
awaitForActive();
214+
215+
try {
216+
this.table.updateItem(
217+
new UpdateItemSpec()
218+
.withPrimaryKey(KEY, key)
219+
.withAttributeUpdate(
220+
new AttributeUpdate(VALUE)
221+
.put(value))
222+
.withExpected(
223+
new Expected(KEY)
224+
.notExist()));
225+
226+
return null;
227+
}
228+
catch (ConditionalCheckFailedException e) {
229+
return get(key);
230+
}
231+
}
232+
233+
@Override
234+
public boolean replace(String key, String oldValue, String newValue) {
235+
Assert.hasText(key, "'key' must not be empty.");
236+
Assert.hasText(oldValue, "'value' must not be empty.");
237+
Assert.hasText(newValue, "'newValue' must not be empty.");
238+
239+
awaitForActive();
240+
241+
try {
242+
return this.table.updateItem(
243+
new UpdateItemSpec()
244+
.withPrimaryKey(KEY, key)
245+
.withAttributeUpdate(
246+
new AttributeUpdate(VALUE)
247+
.put(newValue))
248+
.withExpected(
249+
new Expected(VALUE)
250+
.eq(oldValue))
251+
.withReturnValues(ReturnValue.UPDATED_NEW))
252+
.getItem() != null;
253+
}
254+
catch (ConditionalCheckFailedException e) {
255+
return false;
256+
}
257+
}
258+
259+
@Override
260+
public String remove(String key) {
261+
Assert.hasText(key, "'key' must not be empty.");
262+
263+
awaitForActive();
264+
265+
Item item = this.table.deleteItem(
266+
new DeleteItemSpec()
267+
.withPrimaryKey(KEY, key)
268+
.withReturnValues(ReturnValue.ALL_OLD))
269+
.getItem();
270+
271+
return getValueIfAny(item);
272+
}
273+
274+
private static String getValueIfAny(Item item) {
275+
if (item != null) {
276+
return item.getString(VALUE);
277+
}
278+
else {
279+
return null;
280+
}
281+
}
282+
283+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes supporting metadata stores.
3+
*/
4+
package org.springframework.integration.aws.metadata;

0 commit comments

Comments
 (0)