Skip to content

Commit aabfc8e

Browse files
authored
[Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365)
1 parent 8ea9080 commit aabfc8e

File tree

5 files changed

+98
-3
lines changed

5 files changed

+98
-3
lines changed

docs/en/connector-v2/sink/Rabbitmq.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,21 @@ convenience method for setting the fields in an AMQP URI: host, port, username,
5757

5858
the queue to write the message to
5959

60+
### durable [boolean]
61+
62+
true: The queue will survive a server restart.
63+
false: The queue will be deleted on server restart.
64+
65+
### exclusive [boolean]
66+
67+
true: The queue is used only by the current connection and will be deleted when the connection closes.
68+
false: The queue can be used by multiple connections.
69+
70+
### auto_delete [boolean]
71+
72+
true: The queue will be deleted automatically when the last consumer unsubscribes.
73+
false: The queue will not be automatically deleted.
74+
6075
### schema [Config]
6176

6277
#### fields [Config]
@@ -112,6 +127,30 @@ sink {
112127
}
113128
```
114129

130+
### Example 2
131+
132+
queue with durable, exclusive, auto_delete:
133+
134+
```hocon
135+
sink {
136+
RabbitMQ {
137+
host = "rabbitmq-e2e"
138+
port = 5672
139+
virtual_host = "/"
140+
username = "guest"
141+
password = "guest"
142+
queue_name = "test1"
143+
durable = "true"
144+
exclusive = "false"
145+
auto_delete = "false"
146+
rabbitmq.config = {
147+
requested-heartbeat = 10
148+
connection-timeout = 10
149+
}
150+
}
151+
}
152+
```
153+
115154
## Changelog
116155

117156
### next version

seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/client/RabbitmqClient.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,16 @@ public void close() {
189189

190190
protected void setupQueue() throws IOException {
191191
if (config.getQueueName() != null) {
192-
declareQueueDefaults(channel, config.getQueueName());
192+
declareQueueDefaults(channel, config);
193193
}
194194
}
195195

196-
private void declareQueueDefaults(Channel channel, String queueName) throws IOException {
197-
channel.queueDeclare(queueName, true, false, false, null);
196+
private void declareQueueDefaults(Channel channel, RabbitmqConfig config) throws IOException {
197+
channel.queueDeclare(
198+
config.getQueueName(),
199+
config.getDurable(),
200+
config.getExclusive(),
201+
config.getAutoDelete(),
202+
null);
198203
}
199204
}

seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public class RabbitmqConfig implements Serializable {
5353
private Integer prefetchCount;
5454
private long deliveryTimeout;
5555
private String queueName;
56+
private Boolean durable;
57+
private Boolean exclusive;
58+
private Boolean autoDelete;
5659
private String routingKey;
5760
private boolean logFailuresOnly = false;
5861
private String exchange = "";
@@ -195,6 +198,30 @@ public class RabbitmqConfig implements Serializable {
195198
"Whether the messages received are supplied with a unique"
196199
+ "id to deduplicate messages (in case of failed acknowledgments).");
197200

201+
public static final Option<Boolean> DURABLE =
202+
Options.key("durable")
203+
.booleanType()
204+
.defaultValue(true)
205+
.withDescription(
206+
"true: The queue will survive a server restart."
207+
+ "false: The queue will be deleted on server restart.");
208+
209+
public static final Option<Boolean> EXCLUSIVE =
210+
Options.key("exclusive")
211+
.booleanType()
212+
.defaultValue(false)
213+
.withDescription(
214+
"true: The queue is used only by the current connection and will be deleted when the connection closes."
215+
+ "false: The queue can be used by multiple connections.");
216+
217+
public static final Option<Boolean> AUTO_DELETE =
218+
Options.key("auto_delete")
219+
.booleanType()
220+
.defaultValue(false)
221+
.withDescription(
222+
"true: The queue will be deleted automatically when the last consumer unsubscribes."
223+
+ "false: The queue will not be automatically deleted.");
224+
198225
private void parseSinkOptionProperties(Config pluginConfig) {
199226
if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) {
200227
pluginConfig
@@ -259,6 +286,15 @@ public RabbitmqConfig(Config config) {
259286
if (config.hasPath(USE_CORRELATION_ID.key())) {
260287
this.usesCorrelationId = config.getBoolean(USE_CORRELATION_ID.key());
261288
}
289+
if (config.hasPath(DURABLE.key())) {
290+
this.durable = config.getBoolean(DURABLE.key());
291+
}
292+
if (config.hasPath(EXCLUSIVE.key())) {
293+
this.exclusive = config.getBoolean(EXCLUSIVE.key());
294+
}
295+
if (config.hasPath(AUTO_DELETE.key())) {
296+
this.autoDelete = config.getBoolean(AUTO_DELETE.key());
297+
}
262298
parseSinkOptionProperties(config);
263299
}
264300

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rabbitmq/RabbitmqIT.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ public class RabbitmqIT extends TestSuiteBase implements TestResource {
7575
private static final String SINK_QUEUE_NAME = "test1";
7676
private static final String USERNAME = "guest";
7777
private static final String PASSWORD = "guest";
78+
private static final Boolean DURABLE = true;
79+
private static final Boolean EXCLUSIVE = false;
80+
private static final Boolean AUTO_DELETE = false;
7881

7982
private static final Pair<SeaTunnelRowType, List<SeaTunnelRow>> TEST_DATASET =
8083
generateTestDataSet();
@@ -185,6 +188,9 @@ private void initRabbitMQ() {
185188
config.setVirtualHost("/");
186189
config.setUsername(USERNAME);
187190
config.setPassword(PASSWORD);
191+
config.setDurable(DURABLE);
192+
config.setExclusive(EXCLUSIVE);
193+
config.setAutoDelete(AUTO_DELETE);
188194
rabbitmqClient = new RabbitmqClient(config);
189195
} catch (Exception e) {
190196
throw new RuntimeException("init Rabbitmq error", e);
@@ -201,6 +207,9 @@ private RabbitmqClient initSinkRabbitMQ() {
201207
config.setVirtualHost("/");
202208
config.setUsername(USERNAME);
203209
config.setPassword(PASSWORD);
210+
config.setDurable(DURABLE);
211+
config.setExclusive(EXCLUSIVE);
212+
config.setAutoDelete(AUTO_DELETE);
204213
return new RabbitmqClient(config);
205214
} catch (Exception e) {
206215
throw new RuntimeException("init Rabbitmq error", e);

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rabbitmq-e2e/src/test/resources/rabbitmq-to-rabbitmq.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ source {
2828
username = "guest"
2929
password = "guest"
3030
queue_name = "test"
31+
durable = "true"
32+
exclusive = "false"
33+
auto_delete = "false"
3134
for_e2e_testing = true
3235
schema = {
3336
fields {
@@ -61,6 +64,9 @@ sink {
6164
virtual_host = "/"
6265
username = "guest"
6366
password = "guest"
67+
durable = "true"
68+
exclusive = "false"
69+
auto_delete = "false"
6470
queue_name = "test1"
6571
}
6672
}

0 commit comments

Comments
 (0)