Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ tests run on unique queue and exchange names #2443

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private MapBasedConfig dataconfig() {
.with("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.queue.durable", true)
.with("mp.messaging.incoming.data.queue.name", "(server.auto)")
.with("mp.messaging.incoming.data.exchange.name", exchange)
.with("mp.messaging.incoming.data.exchange.name", exchangeName)
.with("mp.messaging.incoming.data.exchange.type", "direct")
.with("mp.messaging.incoming.data.max-outstanding-messages", 1)
.with("mp.messaging.incoming.data.concurrency", 3)
Expand All @@ -44,11 +44,11 @@ private MapBasedConfig dataconfig() {

private void produceMessages() {
AtomicInteger counter = new AtomicInteger(0);
usage.produce(exchange, null, "foo", 4, counter::getAndIncrement,
usage.produce(exchangeName, null, "foo", 4, counter::getAndIncrement,
new AMQP.BasicProperties.Builder().contentType("text/plain").headers(Map.of("key", "foo")).build());
usage.produce(exchange, null, "bar", 3, counter::getAndIncrement,
usage.produce(exchangeName, null, "bar", 3, counter::getAndIncrement,
new AMQP.BasicProperties.Builder().contentType("text/plain").headers(Map.of("key", "bar")).build());
usage.produce(exchange, null, "qux", 3, counter::getAndIncrement,
usage.produce(exchangeName, null, "qux", 3, counter::getAndIncrement,
new AMQP.BasicProperties.Builder().contentType("text/plain").headers(Map.of("key", "qux")).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ public class ConsumerBackPressure extends WeldTestBase {
private MapBasedConfig getBaseConfig() {
return new MapBasedConfig()
.with("mp.messaging.outgoing.to-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.outgoing.to-rabbitmq.exchange.name", exchange)
.with("mp.messaging.outgoing.to-rabbitmq.exchange.name", exchangeName)

.with("mp.messaging.incoming.from-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queue)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queueName)
.with("mp.messaging.incoming.from-rabbitmq.queue.durable", true)

.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchange);
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchangeName);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ public class LocalPropagationAckTest extends WeldTestBase {
private MapBasedConfig dataconfig() {
return commonConfig()
.with("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.queue.name", queue)
.with("mp.messaging.incoming.data.exchange.name", exchange)
.with("mp.messaging.incoming.data.queue.name", queueName)
.with("mp.messaging.incoming.data.exchange.name", exchangeName)
.with("mp.messaging.incoming.data.exchange.routing-keys", routingKeys)
.with("mp.messaging.incoming.data.tracing.enabled", false);
}

private void produceIntegers() {
AtomicInteger counter = new AtomicInteger(1);
usage.produce(exchange, queue, routingKeys, 5, counter::getAndIncrement);
usage.produce(exchangeName, queueName, routingKeys, 5, counter::getAndIncrement);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ public class LocalPropagationTest extends WeldTestBase {
private MapBasedConfig dataconfig() {
return commonConfig()
.with("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.queue.name", queue)
.with("mp.messaging.incoming.data.exchange.name", exchange)
.with("mp.messaging.incoming.data.queue.name", queueName)
.with("mp.messaging.incoming.data.exchange.name", exchangeName)
.with("mp.messaging.incoming.data.exchange.routing-keys", routingKeys)
.with("mp.messaging.incoming.data.tracing.enabled", false);
}

private void produceIntegers() {
AtomicInteger counter = new AtomicInteger(1);
usage.produce(exchange, queue, routingKeys, 5, counter::getAndIncrement);
usage.produce(exchangeName, queueName, routingKeys, 5, counter::getAndIncrement);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@
import static org.awaitility.Awaitility.await;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.jboss.weld.environment.se.Weld;
import org.jboss.weld.environment.se.WeldContainer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
import io.vertx.core.json.JsonObject;
Expand All @@ -24,15 +20,6 @@ public class RabbitMQArgumentsCDIConfigTest extends RabbitMQBrokerTestBase {

private WeldContainer container;

String queueName;

@BeforeEach
public void initQueue(TestInfo testInfo) {
String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString());
String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString());
queueName = cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits();
}

@Test
public void testConfigByCDIQueueArguments() throws IOException {
Weld weld = new Weld();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.smallrye.reactive.messaging.rabbitmq;

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.UUID;

import jakarta.enterprise.inject.Any;
import jakarta.enterprise.inject.se.SeContainer;
Expand All @@ -11,6 +13,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
Expand Down Expand Up @@ -50,6 +53,9 @@ public class RabbitMQBrokerTestBase {
protected RabbitMQUsage usage;
ExecutionHolder executionHolder;

protected String exchangeName;
protected String queueName;

@BeforeAll
public static void startBroker() {
RABBIT.start();
Expand Down Expand Up @@ -80,6 +86,14 @@ public void setup() {
MapBasedConfig.cleanup();
}

@BeforeEach
public void initQueueExchange(TestInfo testInfo) {
String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString());
String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString());
queueName = "queue" + cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits();
exchangeName = "exchange" + cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits();
}

@AfterEach
public void tearDown() {
usage.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ private Proxy createContainerProxy(ToxiproxyContainer toxiproxy, int toxiPort) {

@Test
void testSendingMessagesToRabbitMQ_connection_fails() {
final String exchangeName = "exchg1";
final String routingKey = "normal";

List<Integer> received = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -104,7 +103,6 @@ void testSendingMessagesToRabbitMQ_connection_fails() {

@Test
void testSendingMessagesToRabbitMQ_connection_fails_after_connection() {
final String exchangeName = "exchg1";
final String routingKey = "normal";

List<Integer> received = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -154,8 +152,6 @@ void testSendingMessagesToRabbitMQ_connection_fails_after_connection() {
*/
@Test
void testReceivingMessagesFromRabbitMQ_connection_fails() {
final String exchangeName = "exchg2";
final String queueName = "q2";
final String routingKey = "xyzzy";
try (ToxiproxyContainer toxiproxy = new ToxiproxyContainer(DockerImageName.parse("ghcr.io/shopify/toxiproxy:latest")
.asCompatibleSubstituteFor("shopify/toxiproxy"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public void cleanup() {
@Test
void testOutgoingDeclarations() throws Exception {

final String exchangeName = "exchgOutgoingDeclareTest";
final boolean exchangeDurable = false;
final boolean exchangeAutoDelete = true;
final String exchangeType = "fanout";
Expand Down Expand Up @@ -97,12 +96,10 @@ void testOutgoingDeclarations() throws Exception {
*/
@Test
void testIncomingDeclarations() throws Exception {
final String exchangeName = "exchgIncomingDeclareTest";
final boolean exchangeDurable = false;
final boolean exchangeAutoDelete = true;
final String exchangeType = "fanout";

final String queueName = "qIncomingDeclareTest";
final boolean queueDurable = false;
final boolean queueExclusive = true;
final boolean queueAutoDelete = true;
Expand Down Expand Up @@ -207,12 +204,10 @@ void testIncomingDeclarations() throws Exception {
*/
@Test
void testIncomingDeclarationsWithDLQ() throws Exception {
final String exchangeName = "exchgIncomingDeclareTestWithDLQ";
final boolean exchangeDurable = false;
final boolean exchangeAutoDelete = true;
final String exchangeType = "fanout";

final String queueName = "qIncomingDeclareTestWithDLQ";
final boolean queueDurable = false;
final boolean queueExclusive = true;
final boolean queueAutoDelete = true;
Expand Down Expand Up @@ -361,7 +356,6 @@ void testIncomingDeclarationsWithDLQ() throws Exception {
@Test
void testIncomingDeclarationsWithQuorum() throws Exception {

final String queueName = "qIncomingDeclareTestWithDeliveryLimit";
final boolean queueDurable = true;
final String queueType = "quorum";
final long queueDeliveryLimit = 10;
Expand Down Expand Up @@ -405,7 +399,6 @@ void testIncomingDeclarationsWithQuorum() throws Exception {
*/
@Test
void testSendingMessagesToRabbitMQ() throws InterruptedException {
final String exchangeName = "exchg1";
final String routingKey = "normal";

CountDownLatch latch = new CountDownLatch(10);
Expand Down Expand Up @@ -439,7 +432,6 @@ void testSendingMessagesToRabbitMQ() throws InterruptedException {
*/
@Test
void testSendingMessagesToRabbitMQPublishConfirms() throws InterruptedException {
final String exchangeName = "exchg1";
final String routingKey = "normal";

List<Long> receivedTags = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -503,7 +495,6 @@ public List<Long> getDeliveryTags() {
*/
@Test
void testSendingNullPayloadsToRabbitMQ() throws InterruptedException {
final String exchangeName = "exchg1";
final String routingKey = "normal";

CountDownLatch latch = new CountDownLatch(10);
Expand Down Expand Up @@ -535,8 +526,6 @@ void testSendingNullPayloadsToRabbitMQ() throws InterruptedException {
*/
@Test
void testReceivingMessagesFromRabbitMQ() {
final String exchangeName = "exchg2";
final String queueName = "q2";
final String routingKey = "xyzzy";
new MapBasedConfig()
.put("mp.messaging.incoming.data.exchange.name", exchangeName)
Expand Down Expand Up @@ -576,8 +565,6 @@ void testReceivingMessagesFromRabbitMQ() {
*/
@Test
void testReceivingMessagesFromRabbitMQWithInvalidContentType() {
final String exchangeName = "exchg3";
final String queueName = "q3";
final String routingKey = "xyzzy";
new MapBasedConfig()
.put("mp.messaging.incoming.data.exchange.name", exchangeName)
Expand Down Expand Up @@ -617,8 +604,6 @@ void testReceivingMessagesFromRabbitMQWithInvalidContentType() {
*/
@Test
void testReceivingMessagesFromRabbitMQWithOverriddenContentType() {
final String exchangeName = "exchg4";
final String queueName = "q4";
final String routingKey = "xyzzy";
new MapBasedConfig()
.put("mp.messaging.incoming.data.exchange.name", exchangeName)
Expand Down Expand Up @@ -697,8 +682,6 @@ void testDefaultExchangeName() {
*/
@Test
void testNackWithRejectAndRequeue() {
final String exchangeName = "exchg6";
final String queueName = "q6";
final String dlxName = "dlx6";
final String dlqName = "dlq6";
final String routingKey = "xyzzy";
Expand Down Expand Up @@ -770,8 +753,6 @@ void testNackWithRejectAndRequeue() {
*/
@Test
void testConsumerArguments() {
final String exchangeName = "exchg7";
final String queueName = "q7";
final String routingKey = "xyzzy";
new MapBasedConfig()
.put("mp.messaging.incoming.data.exchange.name", exchangeName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ static void shutdown() {
void incoming() {
IncomingTracing tracing = runApplication(commonConfig()
.with("mp.messaging.incoming.from-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queue)
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchange)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queueName)
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchangeName)
.with("mp.messaging.incoming.from-rabbitmq.exchange.routing-keys", routingKeys)
.with("mp.messaging.incoming.from-rabbitmq.tracing.enabled", true),
IncomingTracing.class);

AtomicInteger counter = new AtomicInteger(1);
usage.produce(exchange, queue, routingKeys, 5, counter::getAndIncrement,
usage.produce(exchangeName, queueName, routingKeys, 5, counter::getAndIncrement,
new AMQP.BasicProperties().builder().expiration("10000").contentType("text/plain").build());
await().atMost(5, SECONDS).until(() -> tracing.getResults().size() == 5);

Expand All @@ -104,19 +104,19 @@ void incoming() {
assertEquals("rabbitmq", consumer.getAttributes().get(MESSAGING_SYSTEM));
assertEquals("receive", consumer.getAttributes().get(MESSAGING_OPERATION));
assertEquals("normal", consumer.getAttributes().get(MESSAGING_RABBITMQ_ROUTING_KEY));
assertEquals(queue, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertEquals(queueName, consumer.getAttributes().get(MESSAGING_DESTINATION_NAME));
assertNull(consumer.getAttributes().get(MESSAGING_PROTOCOL));
assertNull(consumer.getAttributes().get(MESSAGING_PROTOCOL_VERSION));
assertEquals(queue + " receive", consumer.getName());
assertEquals(queueName + " receive", consumer.getName());
});
}

@Test
void incomingClientPropagate() {
IncomingTracing tracing = runApplication(commonConfig()
.with("mp.messaging.incoming.from-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queue)
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchange)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queueName)
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchangeName)
.with("mp.messaging.incoming.from-rabbitmq.exchange.routing-keys", routingKeys)
.with("mp.messaging.incoming.from-rabbitmq.tracing.enabled", true),
IncomingTracing.class);
Expand All @@ -137,7 +137,7 @@ void incomingClientPropagate() {
.headers(headers).build();

AtomicInteger counter = new AtomicInteger(1);
usage.produce(exchange, queue, routingKeys, 5, counter::getAndIncrement, properties);
usage.produce(exchangeName, queueName, routingKeys, 5, counter::getAndIncrement, properties);
await().atMost(5, SECONDS).until(() -> tracing.getResults().size() == 5);

CompletableResultCode completableResultCode = tracerProvider.forceFlush();
Expand All @@ -154,13 +154,13 @@ void incomingOutgoing() {

IncomingOutgoingTracing tracing = runApplication(commonConfig()
.with("mp.messaging.outgoing.to-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.outgoing.to-rabbitmq.queue.name", queue)
.with("mp.messaging.outgoing.to-rabbitmq.exchange.name", exchange)
.with("mp.messaging.outgoing.to-rabbitmq.queue.name", queueName)
.with("mp.messaging.outgoing.to-rabbitmq.exchange.name", exchangeName)
.with("mp.messaging.outgoing.to-rabbitmq.exchange.routing-keys", routingKeys)
.with("mp.messaging.outgoing.to-rabbitmq.tracing.enabled", true)
.with("mp.messaging.incoming.from-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queue)
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchange)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queueName)
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchangeName)
.with("mp.messaging.incoming.from-rabbitmq.exchange.routing-keys", routingKeys)
.with("mp.messaging.incoming.from-rabbitmq.tracing.enabled", true),
IncomingOutgoingTracing.class);
Expand Down Expand Up @@ -191,14 +191,14 @@ void incomingOutgoing() {
void incomingOutgoingSink() {
IncomingOutgoingSinkTracing tracing = runApplication(commonConfig()
.with("mp.messaging.incoming.from-rabbitmq.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queue)
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchange)
.with("mp.messaging.incoming.from-rabbitmq.queue.name", queueName)
.with("mp.messaging.incoming.from-rabbitmq.exchange.name", exchangeName)
.with("mp.messaging.incoming.from-rabbitmq.exchange.routing-keys", routingKeys)
.with("mp.messaging.incoming.from-rabbitmq.tracing.enabled", true),
IncomingOutgoingSinkTracing.class);

AtomicInteger counter = new AtomicInteger(1);
usage.produce(exchange, queue, routingKeys, 5, counter::getAndIncrement,
usage.produce(exchangeName, queueName, routingKeys, 5, counter::getAndIncrement,
new AMQP.BasicProperties().builder().expiration("10000").contentType("text/plain").build());
await().atMost(5, SECONDS).until(() -> tracing.getResults().size() == 5);

Expand Down
Loading
Loading