Skip to content

Commit

Permalink
Enable sqs guide (#1419)
Browse files Browse the repository at this point in the history
* update sqs guide

* disable groovy guide

* remove @nonnull annotation in kotlin class

* configuration note

* Change LOGGER to LOG as in all other guides

Change LOGGER to LOG

* Remove unused imports

* Add license

* Whitespace

whitespace

* Kotlinisation

* Remove SQSClient factory from guide

* Add pullout keys for kotlin class

---------

Co-authored-by: Tim Yates <[email protected]>
  • Loading branch information
sdelamo and timyates authored Jan 19, 2024
1 parent 5374358 commit af4dfd9
Show file tree
Hide file tree
Showing 30 changed files with 565 additions and 553 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,25 @@
package example.micronaut

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.jms.annotations.JMSListener
import io.micronaut.jms.annotations.Queue
import io.micronaut.messaging.annotation.MessageBody
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import java.util.concurrent.atomic.AtomicInteger

import static io.micronaut.jms.sqs.configuration.SqsConfiguration.CONNECTION_FACTORY_BEAN_NAME

@CompileStatic
@Slf4j('LOGGER')
@JMSListener(CONNECTION_FACTORY_BEAN_NAME) // <1>
class DemoConsumer {

private static final Logger LOG = LoggerFactory.getLogger(DemoConsumer.class);
private final AtomicInteger messageCount = new AtomicInteger(0);

@Queue(value = "demo_queue", concurrency = "1-3") // <2>
@Queue(value = "demo_queue") // <2>
void receive(@MessageBody String body) { // <3>
LOGGER.info("Message consumed: {}", body);
LOG.info("Message consumed: {}", body);
messageCount.incrementAndGet()
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,53 @@
*/
package example.micronaut

import io.micronaut.core.annotation.NonNull
import io.micronaut.http.HttpRequest
import io.micronaut.http.HttpResponse
import io.micronaut.http.client.HttpClient
import io.micronaut.http.client.annotation.Client
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import io.micronaut.test.support.TestPropertyProvider
import jakarta.inject.Inject
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.utility.DockerImageName
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

@MicronautTest
class MicronautguideSpec extends Specification implements TestPropertyProvider {
@MicronautTest // <1>
class MicronautguideSpec extends Specification implements TestPropertyProvider { // <3>
private static DockerImageName localstackImage = DockerImageName.parse("localstack/localstack:latest")
@AutoCleanup
@Shared
private static LocalStackContainer localstack = new LocalStackContainer(localstackImage)
.withServices(LocalStackContainer.Service.SQS)

@Override
@NonNull
Map<String, String> getProperties() {
if (!localstack.isRunning()) {
localstack.start()
}
Map.of("aws.access-key-id", localstack.accessKey,
"aws.secret-key", localstack.secretKey,
"aws.region", localstack.region,
"aws.services.sqs.endpoint-override", localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())
}

@Inject
@Client("/")
HttpClient client
HttpClient httpClient

@Inject
DemoConsumer demoConsumer

void 'test it works'() {
when:
int messageCount = demoConsumer.getMessageCount()

then:
messageCount == 0

void "verify jms consumes message"() {
when:
HttpResponse<?> response = client.toBlocking().exchange(HttpRequest.POST('/demo', [:]))

httpClient.toBlocking().exchange(HttpRequest.POST("/demo", Collections.emptyMap()))
then:
new PollingConditions(initialDelay: 3, timeout: 10).eventually {
demoConsumer.getMessageCount() == 1
new PollingConditions().eventually {
assert 1 == demoConsumer.messageCount
}
}

def cleanupSpec() {
LocalStack.close()
}

@Override
Map<String, String> getProperties() {
LocalStack.getProperties()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.micronaut

import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import io.micronaut.core.annotation.NonNull
import jakarta.inject.Singleton
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClientBuilder

@Singleton // <1>
class SqsClientBuilderListener implements BeanCreatedEventListener<SqsClientBuilder> { // <2>

private final SqsConfig sqsConfig

SqsClientBuilderListener(SqsConfig sqsConfig) { // <3>
this.sqsConfig = sqsConfig
}

@Override
SqsClientBuilder onCreated(@NonNull BeanCreatedEvent<SqsClientBuilder> event) {
SqsClientBuilder builder = event.bean
try {
return builder
.endpointOverride(new URI(sqsConfig.sqs.endpointOverride))
.credentialsProvider(
StaticCredentialsProvider.create(
AwsBasicCredentials.create(sqsConfig.accessKeyId, sqsConfig.secretKey)
)
)
.region(Region.of(sqsConfig.region))
} catch (URISyntaxException e) {
throw new RuntimeException(e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.micronaut

import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import jakarta.inject.Singleton
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest

@Singleton
public class SqsClientCreatedEventListener implements BeanCreatedEventListener<SqsClient> {
private static final String QUEUE_NAME = "demo_queue"
@Override
SqsClient onCreated(BeanCreatedEvent<SqsClient> event) {
SqsClient client = event.getBean()
if (client.listQueues().queueUrls().stream().noneMatch(it -> it.contains(QUEUE_NAME))) {
client.createQueue(
CreateQueueRequest.builder()
.queueName(QUEUE_NAME)
.build()
)
}
client
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package example.micronaut

import io.micronaut.context.annotation.ConfigurationBuilder
import io.micronaut.context.annotation.ConfigurationProperties

@ConfigurationProperties("aws") // <1>
class SqsConfig {
String accessKeyId
String secretKey
String region

@ConfigurationBuilder(configurationPrefix = "services.sqs")
final Sqs sqs = new Sqs()

static class Sqs {
String endpointOverride
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@

@JMSListener(CONNECTION_FACTORY_BEAN_NAME) // <1>
public class DemoConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoConsumer.class);

private static final Logger LOG = LoggerFactory.getLogger(DemoConsumer.class);

private final AtomicInteger messageCount = new AtomicInteger(0);

@Queue(value = "demo_queue", concurrency = "1-3") // <2>
@Queue(value = "demo_queue") // <2>
public void receive(@MessageBody String body) { // <3>
LOGGER.info("Message has been consumed. Message body: {}", body);
LOG.info("Message has been consumed. Message body: {}", body);
messageCount.incrementAndGet();
}

Expand Down

This file was deleted.

Loading

0 comments on commit af4dfd9

Please sign in to comment.