Skip to content

Commit 104ee68

Browse files
committed
Apply Nullability into AMQP module
Related to: #10083 * Add `@org.jspecify.annotations.NullMarked` to all packages of SI-AMQP module * Fix raised concerns about `@Nullable`, including tests * Propagate `@Nullable` to some SI-core classes where that was concerned from the AMQP classes * The change to the `RequestReplyHeaderMapper` has led to fixes in the `DefaultSoapHeaderMapper` and its usage in the `AbstractWebServiceOutboundGateway` * Fix Javadocs in the `AvroTestClass1` & `AvroTestClass2` to avoid compilation warning even if they are claimed to be generated
1 parent 95202f7 commit 104ee68

File tree

38 files changed

+410
-387
lines changed

38 files changed

+410
-387
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/AbstractAmqpChannel.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616

1717
package org.springframework.integration.amqp.channel;
1818

19+
import java.util.Objects;
20+
21+
import org.jspecify.annotations.Nullable;
22+
1923
import org.springframework.amqp.core.AmqpAdmin;
2024
import org.springframework.amqp.core.AmqpTemplate;
2125
import org.springframework.amqp.core.MessageDeliveryMode;
@@ -53,21 +57,21 @@ public abstract class AbstractAmqpChannel extends AbstractMessageChannel impleme
5357

5458
private final AmqpTemplate amqpTemplate;
5559

56-
private final RabbitTemplate rabbitTemplate;
60+
private final @Nullable RabbitTemplate rabbitTemplate;
5761

5862
private final AmqpHeaderMapper outboundHeaderMapper;
5963

6064
private final AmqpHeaderMapper inboundHeaderMapper;
6165

62-
private AmqpAdmin admin;
66+
private @Nullable AmqpAdmin admin;
6367

64-
private ConnectionFactory connectionFactory;
68+
private @Nullable ConnectionFactory connectionFactory;
6569

6670
private boolean extractPayload;
6771

6872
private boolean loggingEnabled = true;
6973

70-
private MessageDeliveryMode defaultDeliveryMode;
74+
private @Nullable MessageDeliveryMode defaultDeliveryMode;
7175

7276
private boolean headersMappedLast;
7377

@@ -149,8 +153,6 @@ public void setDefaultDeliveryMode(MessageDeliveryMode defaultDeliveryMode) {
149153
public void setExtractPayload(boolean extractPayload) {
150154
if (extractPayload) {
151155
Assert.isTrue(this.rabbitTemplate != null, "amqpTemplate must be a RabbitTemplate for 'extractPayload'");
152-
Assert.state(this.outboundHeaderMapper != null && this.inboundHeaderMapper != null,
153-
"'extractPayload' requires both inbound and outbound header mappers");
154156
}
155157
this.extractPayload = extractPayload;
156158
}
@@ -211,6 +213,7 @@ protected AmqpTemplate getAmqpTemplate() {
211213
}
212214

213215
protected RabbitTemplate getRabbitTemplate() {
216+
Assert.notNull(this.rabbitTemplate, "The 'RabbitTemplate' must be provided.");
214217
return this.rabbitTemplate;
215218
}
216219

@@ -222,11 +225,11 @@ protected final void setConnectionFactory(ConnectionFactory connectionFactory) {
222225
this.connectionFactory = connectionFactory;
223226
}
224227

225-
protected AmqpAdmin getAdmin() {
228+
protected @Nullable AmqpAdmin getAdmin() {
226229
return this.admin;
227230
}
228231

229-
protected ConnectionFactory getConnectionFactory() {
232+
protected @Nullable ConnectionFactory getConnectionFactory() {
230233
return this.connectionFactory;
231234
}
232235

@@ -251,8 +254,8 @@ public void destroy() {
251254
protected boolean doSend(Message<?> message, long timeout) {
252255
if (this.extractPayload) {
253256
this.amqpTemplate.send(getExchangeName(), getRoutingKey(), MappingUtils.mapMessage(message,
254-
this.rabbitTemplate.getMessageConverter(), this.outboundHeaderMapper, this.defaultDeliveryMode,
255-
this.headersMappedLast));
257+
getRabbitTemplate().getMessageConverter(), Objects.requireNonNull(this.outboundHeaderMapper),
258+
this.defaultDeliveryMode, this.headersMappedLast));
256259
}
257260
else {
258261
this.amqpTemplate.convertAndSend(getExchangeName(), getRoutingKey(), message);
@@ -261,7 +264,7 @@ protected boolean doSend(Message<?> message, long timeout) {
261264
}
262265

263266
@Override
264-
public void onCreate(Connection connection) {
267+
public void onCreate(@Nullable Connection connection) {
265268
doDeclares();
266269
}
267270

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/AbstractSubscribableAmqpChannel.java

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.commons.logging.Log;
2222
import org.apache.commons.logging.LogFactory;
23+
import org.jspecify.annotations.Nullable;
2324

2425
import org.springframework.amqp.AmqpConnectException;
2526
import org.springframework.amqp.core.AmqpTemplate;
@@ -28,6 +29,7 @@
2829
import org.springframework.amqp.rabbit.core.RabbitAdmin;
2930
import org.springframework.amqp.rabbit.core.RabbitTemplate;
3031
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
32+
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
3133
import org.springframework.amqp.support.converter.MessageConverter;
3234
import org.springframework.amqp.support.converter.SimpleMessageConverter;
3335
import org.springframework.integration.MessageDispatchingException;
@@ -39,7 +41,6 @@
3941
import org.springframework.integration.support.MessageBuilderFactory;
4042
import org.springframework.integration.support.management.ManageableSmartLifecycle;
4143
import org.springframework.messaging.Message;
42-
import org.springframework.messaging.MessageDeliveryException;
4344
import org.springframework.messaging.MessageHandler;
4445
import org.springframework.messaging.SubscribableChannel;
4546
import org.springframework.util.Assert;
@@ -60,11 +61,12 @@ abstract class AbstractSubscribableAmqpChannel extends AbstractAmqpChannel
6061

6162
private final AbstractMessageListenerContainer container;
6263

64+
@SuppressWarnings("NullAway.Init")
6365
private volatile AbstractDispatcher dispatcher;
6466

6567
private final boolean isPubSub;
6668

67-
private volatile Integer maxSubscribers;
69+
private volatile @Nullable Integer maxSubscribers;
6870

6971
private volatile boolean declared;
7072

@@ -151,8 +153,9 @@ protected AbstractSubscribableAmqpChannel(String channelName,
151153
*/
152154
public void setMaxSubscribers(int maxSubscribers) {
153155
this.maxSubscribers = maxSubscribers;
154-
if (this.dispatcher != null) {
155-
this.dispatcher.setMaxSubscribers(this.maxSubscribers);
156+
AbstractDispatcher dispatcherToUse = this.dispatcher;
157+
if (dispatcherToUse != null) {
158+
dispatcherToUse.setMaxSubscribers(maxSubscribers);
156159
}
157160
}
158161

@@ -169,14 +172,18 @@ public boolean unsubscribe(MessageHandler handler) {
169172
@Override
170173
public void onInit() {
171174
super.onInit();
172-
this.dispatcher = this.createDispatcher();
173-
if (this.maxSubscribers == null) {
174-
this.maxSubscribers =
175+
this.dispatcher = createDispatcher();
176+
Integer maxSubscribersToCheck = this.maxSubscribers;
177+
if (maxSubscribersToCheck == null) {
178+
int newMaxSubscribers =
175179
this.isPubSub
176180
? getIntegrationProperties().getChannelsMaxBroadcastSubscribers()
177181
: getIntegrationProperties().getChannelsMaxUnicastSubscribers();
182+
setMaxSubscribers(newMaxSubscribers);
183+
}
184+
else {
185+
this.dispatcher.setMaxSubscribers(maxSubscribersToCheck);
178186
}
179-
setMaxSubscribers(this.maxSubscribers);
180187
String queue = obtainQueueName(this.channelName);
181188
this.container.setQueueNames(queue);
182189
MessageConverter converter =
@@ -199,17 +206,17 @@ public void onInit() {
199206

200207
@Override
201208
public boolean isAutoStartup() {
202-
return (this.container != null) && this.container.isAutoStartup();
209+
return this.container.isAutoStartup();
203210
}
204211

205212
@Override
206213
public int getPhase() {
207-
return (this.container != null) ? this.container.getPhase() : 0;
214+
return this.container.getPhase();
208215
}
209216

210217
@Override
211218
public boolean isRunning() {
212-
return (this.container != null) && this.container.isRunning();
219+
return this.container.isRunning();
213220
}
214221

215222
@Override
@@ -224,22 +231,20 @@ public void start() {
224231
"Postponed to the next connection create...");
225232
}
226233
}
227-
if (this.container != null) {
228-
this.container.start();
229-
}
234+
this.container.start();
230235
}
231236

232237
@Override
233238
public void stop() {
234-
if (this.container != null) {
239+
if (isRunning()) {
235240
this.container.stop();
236241
this.declared = false;
237242
}
238243
}
239244

240245
@Override
241246
public void stop(Runnable callback) {
242-
if (this.container != null) {
247+
if (isRunning()) {
243248
this.container.stop(callback);
244249
this.declared = false;
245250
}
@@ -251,10 +256,8 @@ public void stop(Runnable callback) {
251256
@Override
252257
public void destroy() {
253258
super.destroy();
254-
if (this.container != null) {
255-
this.container.destroy();
256-
this.declared = false;
257-
}
259+
this.container.destroy();
260+
this.declared = false;
258261
}
259262

260263
protected abstract AbstractDispatcher createDispatcher();
@@ -301,25 +304,18 @@ public void onMessage(org.springframework.amqp.core.Message message) {
301304
this.dispatcher.dispatch(messageToSend);
302305
}
303306
catch (MessageDispatchingException e) {
304-
String exceptionMessage = e.getMessage() + " for amqp-channel '"
305-
+ this.channel.getFullChannelName() + "'.";
306-
if (this.isPubSub) {
307-
// log only for backwards compatibility with pub/sub
308-
if (this.logger.isWarnEnabled()) {
309-
this.logger.warn(exceptionMessage, e);
310-
}
311-
}
312-
else {
313-
throw new MessageDeliveryException(messageToSend, exceptionMessage, e);
314-
}
307+
String exceptionMessage =
308+
e.getMessage() + " for amqp-channel '" + this.channel.getFullChannelName() + "'.";
309+
310+
throw new ListenerExecutionFailedException(exceptionMessage, e, message);
315311
}
316312
}
317313

318314
private Message<Object> buildMessage(org.springframework.amqp.core.Message message, Object converted) {
319315
AbstractIntegrationMessageBuilder<Object> messageBuilder =
320316
this.messageBuilderFactory.withPayload(converted);
321317
if (this.channel.isExtractPayload()) {
322-
Map<String, Object> headers =
318+
Map<String, @Nullable Object> headers =
323319
this.inboundHeaderMapper.toHeadersFromRequest(message.getMessageProperties());
324320
messageBuilder.copyHeaders(headers);
325321
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PointToPointSubscribableAmqpChannel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
*/
4040
public class PointToPointSubscribableAmqpChannel extends AbstractSubscribableAmqpChannel {
4141

42+
@SuppressWarnings("NullAway.Init")
4243
private volatile Queue queue;
4344

4445
/**

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PollableAmqpChannel.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,10 @@ public class PollableAmqpChannel extends AbstractAmqpChannel
5757

5858
private final String channelName;
5959

60+
@SuppressWarnings("NullAway.Init")
6061
private Queue queue;
6162

62-
private CounterFacade receiveCounter;
63+
private @Nullable CounterFacade receiveCounter;
6364

6465
private volatile int executorInterceptorsSize;
6566

@@ -148,19 +149,16 @@ protected void doDeclares() {
148149
}
149150

150151
@Override
151-
@Nullable
152-
public Message<?> receive() {
152+
public @Nullable Message<?> receive() {
153153
return doReceive(null);
154154
}
155155

156156
@Override
157-
@Nullable
158-
public Message<?> receive(long timeout) {
157+
public @Nullable Message<?> receive(long timeout) {
159158
return doReceive(timeout);
160159
}
161160

162-
@Nullable
163-
protected Message<?> doReceive(Long timeout) {
161+
protected @Nullable Message<?> doReceive(@Nullable Long timeout) {
164162
ChannelInterceptorList interceptorList = getIChannelInterceptorList();
165163
Deque<ChannelInterceptor> interceptorStack = null;
166164
AtomicBoolean counted = new AtomicBoolean();
@@ -194,7 +192,7 @@ protected Message<?> doReceive(Long timeout) {
194192
}
195193

196194
@Nullable
197-
protected Object performReceive(Long timeout) {
195+
protected Object performReceive(@Nullable Long timeout) {
198196
if (!this.declared) {
199197
doDeclares();
200198
this.declared = true;
@@ -220,7 +218,7 @@ protected Object performReceive(Long timeout) {
220218

221219
if (message != null) {
222220
Object payload = rabbitTemplate.getMessageConverter().fromMessage(message);
223-
Map<String, Object> headers = getInboundHeaderMapper()
221+
Map<String, @Nullable Object> headers = getInboundHeaderMapper()
224222
.toHeadersFromRequest(message.getMessageProperties());
225223
return getMessageBuilderFactory()
226224
.withPayload(payload)
@@ -233,7 +231,8 @@ protected Object performReceive(Long timeout) {
233231
}
234232
}
235233

236-
private Message<?> buildMessageFromResult(@Nullable Object object, boolean traceEnabled, AtomicBoolean counted) {
234+
private @Nullable Message<?> buildMessageFromResult(
235+
@Nullable Object object, boolean traceEnabled, AtomicBoolean counted) {
237236

238237
Message<?> message = null;
239238
if (object != null) {
@@ -323,8 +322,7 @@ public boolean removeInterceptor(ChannelInterceptor interceptor) {
323322
}
324323

325324
@Override
326-
@Nullable
327-
public ChannelInterceptor removeInterceptor(int index) {
325+
public @Nullable ChannelInterceptor removeInterceptor(int index) {
328326
ChannelInterceptor interceptor = super.removeInterceptor(index);
329327
if (interceptor instanceof ExecutorChannelInterceptor) {
330328
this.executorInterceptorsSize--;

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/channel/PublishSubscribeAmqpChannel.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ public class PublishSubscribeAmqpChannel extends AbstractSubscribableAmqpChannel
4242

4343
private final Queue queue = new AnonymousQueue();
4444

45+
@SuppressWarnings("NullAway.Init")
4546
private volatile FanoutExchange exchange;
4647

48+
@SuppressWarnings("NullAway.Init")
4749
private volatile Binding binding;
4850

4951
/**
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides classes related to AMQP-backed channels.
33
*/
4+
@org.jspecify.annotations.NullMarked
45
package org.springframework.integration.amqp.channel;

0 commit comments

Comments
 (0)