Skip to content

Commit 937da13

Browse files
committed
GH-9112: Workaround for Paho stopReconnectCycle
Fixes: #9112 `Mqttv5ClientManager` hangs in `stop()` if never was connected. The scheduled reconnect timer in the client is never cancelled. * Call `stopReconnectCycle()` on the client via reflection when we disconnect from the client in Spring Integration MQTT components **Auto-cherry-pick to `6.2.x` & `6.1.x`**
1 parent da29e2d commit 937da13

File tree

11 files changed

+151
-7
lines changed

11 files changed

+151
-7
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.mqtt.aot;
18+
19+
import java.util.stream.Stream;
20+
21+
import org.springframework.aot.hint.ExecutableMode;
22+
import org.springframework.aot.hint.ReflectionHints;
23+
import org.springframework.aot.hint.RuntimeHints;
24+
import org.springframework.aot.hint.RuntimeHintsRegistrar;
25+
import org.springframework.util.ClassUtils;
26+
import org.springframework.util.ReflectionUtils;
27+
28+
/**
29+
* {@link RuntimeHintsRegistrar} for Spring Integration MQTT module.
30+
*
31+
* @author Artem Bilan
32+
*
33+
* @since 6.1.9
34+
*/
35+
class MqttRuntimeHints implements RuntimeHintsRegistrar {
36+
37+
@Override
38+
public void registerHints(RuntimeHints hints, ClassLoader classLoader) {
39+
ReflectionHints reflectionHints = hints.reflection();
40+
// TODO until the real fix in Paho library.
41+
Stream.of("org.eclipse.paho.client.mqttv3.MqttAsyncClient", "org.eclipse.paho.mqttv5.client.MqttAsyncClient")
42+
.filter((typeName) -> ClassUtils.isPresent(typeName, classLoader))
43+
.map((typeName) -> loadClassByName(typeName, classLoader))
44+
.flatMap((type) -> Stream.ofNullable(ReflectionUtils.findMethod(type, "stopReconnectCycle")))
45+
.forEach(method -> reflectionHints.registerMethod(method, ExecutableMode.INVOKE));
46+
}
47+
48+
private static Class<?> loadClassByName(String typeName, ClassLoader classLoader) {
49+
try {
50+
return ClassUtils.forName(typeName, classLoader);
51+
}
52+
catch (ClassNotFoundException ex) {
53+
throw new IllegalArgumentException(ex);
54+
}
55+
}
56+
57+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/**
2+
* Provides classes to support Spring AOT.
3+
*/
4+
@org.springframework.lang.NonNullApi
5+
@org.springframework.lang.NonNullFields
6+
package org.springframework.integration.mqtt.aot;

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv3ClientManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import org.eclipse.paho.client.mqttv3.MqttMessage;
2727

2828
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
29+
import org.springframework.integration.mqtt.support.MqttUtils;
2930
import org.springframework.util.Assert;
3031

3132
/**
@@ -149,6 +150,9 @@ public void stop() {
149150
}
150151
try {
151152
client.disconnectForcibly(getDisconnectCompletionTimeout());
153+
if (getConnectionInfo().isAutomaticReconnect()) {
154+
MqttUtils.stopClientReconnectCycle(client);
155+
}
152156
}
153157
catch (MqttException e) {
154158
logger.error("Could not disconnect from the client", e);

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/core/Mqttv5ClientManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
2929

3030
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
31+
import org.springframework.integration.mqtt.support.MqttUtils;
3132
import org.springframework.util.Assert;
3233

3334
/**
@@ -151,6 +152,9 @@ public void stop() {
151152

152153
try {
153154
client.disconnectForcibly(getDisconnectCompletionTimeout());
155+
if (getConnectionInfo().isAutomaticReconnect()) {
156+
MqttUtils.stopClientReconnectCycle(client);
157+
}
154158
}
155159
catch (MqttException e) {
156160
logger.error("Could not disconnect from the client", e);

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/MqttPahoMessageDrivenChannelAdapter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,9 @@ protected void doStop() {
228228

229229
try {
230230
this.client.disconnectForcibly(getDisconnectCompletionTimeout());
231+
if (getConnectionInfo().isAutomaticReconnect()) {
232+
MqttUtils.stopClientReconnectCycle(this.client);
233+
}
231234
}
232235
catch (MqttException ex) {
233236
logger.error(ex, "Exception while disconnecting");

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
5454
import org.springframework.integration.mqtt.support.MqttHeaders;
5555
import org.springframework.integration.mqtt.support.MqttMessageConverter;
56+
import org.springframework.integration.mqtt.support.MqttUtils;
5657
import org.springframework.lang.Nullable;
5758
import org.springframework.messaging.Message;
5859
import org.springframework.messaging.MessageHeaders;
@@ -299,6 +300,9 @@ protected void doStop() {
299300
}
300301
if (getClientManager() == null) {
301302
this.mqttClient.disconnectForcibly(getDisconnectCompletionTimeout());
303+
if (getConnectionInfo().isAutomaticReconnect()) {
304+
MqttUtils.stopClientReconnectCycle(this.mqttClient);
305+
}
302306
}
303307
}
304308
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/MqttPahoMessageHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -176,6 +176,9 @@ protected void doStop() {
176176
IMqttAsyncClient theClient = this.client;
177177
if (theClient != null) {
178178
theClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout());
179+
if (getConnectionInfo().isAutomaticReconnect()) {
180+
MqttUtils.stopClientReconnectCycle(theClient);
181+
}
179182
theClient.close();
180183
this.client = null;
181184
}

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/outbound/Mqttv5PahoMessageHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.integration.mqtt.event.MqttProtocolErrorEvent;
4242
import org.springframework.integration.mqtt.support.MqttHeaderMapper;
4343
import org.springframework.integration.mqtt.support.MqttMessageConverter;
44+
import org.springframework.integration.mqtt.support.MqttUtils;
4445
import org.springframework.lang.Nullable;
4546
import org.springframework.messaging.Message;
4647
import org.springframework.messaging.MessageHandlingException;
@@ -184,6 +185,9 @@ protected void doStop() {
184185
try {
185186
if (getClientManager() == null) {
186187
this.mqttClient.disconnect().waitForCompletion(getDisconnectCompletionTimeout());
188+
if (getConnectionInfo().isAutomaticReconnect()) {
189+
MqttUtils.stopClientReconnectCycle(this.mqttClient);
190+
}
187191
}
188192
}
189193
catch (MqttException ex) {

spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/support/MqttUtils.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,13 @@
1616

1717
package org.springframework.integration.mqtt.support;
1818

19+
import java.lang.reflect.Method;
20+
1921
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
2022

2123
import org.springframework.beans.BeanUtils;
24+
import org.springframework.util.ClassUtils;
25+
import org.springframework.util.ReflectionUtils;
2226

2327
/**
2428
* MQTT Utilities.
@@ -30,6 +34,38 @@
3034
*/
3135
public final class MqttUtils {
3236

37+
private static final boolean PAHO_MQTTV3_PRESENT =
38+
ClassUtils.isPresent("org.eclipse.paho.client.mqttv3.MqttAsyncClient", null);
39+
40+
private static final boolean PAHO_MQTTV5_PRESENT =
41+
ClassUtils.isPresent("org.eclipse.paho.mqttv5.client.MqttAsyncClient", null);
42+
43+
private static final Method V3_STOP_RECONNECT_CYCLE_METHOD;
44+
45+
private static final Method V5_STOP_RECONNECT_CYCLE_METHOD;
46+
47+
static {
48+
if (PAHO_MQTTV3_PRESENT) {
49+
V3_STOP_RECONNECT_CYCLE_METHOD =
50+
ReflectionUtils.findMethod(org.eclipse.paho.client.mqttv3.MqttAsyncClient.class,
51+
"stopReconnectCycle");
52+
ReflectionUtils.makeAccessible(V3_STOP_RECONNECT_CYCLE_METHOD);
53+
}
54+
else {
55+
V3_STOP_RECONNECT_CYCLE_METHOD = null;
56+
}
57+
58+
if (PAHO_MQTTV5_PRESENT) {
59+
V5_STOP_RECONNECT_CYCLE_METHOD =
60+
ReflectionUtils.findMethod(org.eclipse.paho.mqttv5.client.MqttAsyncClient.class,
61+
"stopReconnectCycle");
62+
ReflectionUtils.makeAccessible(V5_STOP_RECONNECT_CYCLE_METHOD);
63+
}
64+
else {
65+
V5_STOP_RECONNECT_CYCLE_METHOD = null;
66+
}
67+
}
68+
3369
private MqttUtils() {
3470
}
3571

@@ -47,4 +83,26 @@ public static MqttConnectOptions cloneConnectOptions(MqttConnectOptions options)
4783
return options2;
4884
}
4985

86+
/**
87+
* Perform a {@code stopReconnectCycle()} (via reflection) method on the provided client
88+
* to clean up resources on client stop.
89+
* TODO until the real fix in Paho library.
90+
* @param client the MQTTv3 Paho client instance.
91+
* @since 6.1.9
92+
*/
93+
public static void stopClientReconnectCycle(org.eclipse.paho.client.mqttv3.IMqttAsyncClient client) {
94+
ReflectionUtils.invokeMethod(V3_STOP_RECONNECT_CYCLE_METHOD, client);
95+
}
96+
97+
/**
98+
* Perform a {@code stopReconnectCycle()} (via reflection) method on the provided client
99+
* to clean up resources on client stop.
100+
* TODO until the real fix in Paho library.
101+
* @param client the MQTTv5 Paho client instance.
102+
* @since 6.1.9
103+
*/
104+
public static void stopClientReconnectCycle(org.eclipse.paho.mqttv5.client.IMqttAsyncClient client) {
105+
ReflectionUtils.invokeMethod(V5_STOP_RECONNECT_CYCLE_METHOD, client);
106+
}
107+
50108
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
org.springframework.aot.hint.RuntimeHintsRegistrar=org.springframework.integration.mqtt.aot.MqttRuntimeHints

spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/MqttAdapterTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -488,8 +488,8 @@ public void testDifferentQos() throws Exception {
488488
given(token.getGrantedQos()).willReturn(new int[] {2, 0});
489489
willReturn(token).given(client).subscribe(any(String[].class), any(int[].class), any());
490490

491-
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("foo", "bar", factory,
492-
"baz", "fix");
491+
MqttPahoMessageDrivenChannelAdapter adapter =
492+
new MqttPahoMessageDrivenChannelAdapter("tcp://mqtt.host", "bar", factory, "baz", "fix");
493493
AtomicReference<Method> method = new AtomicReference<>();
494494
ReflectionUtils.doWithMethods(MqttPahoMessageDrivenChannelAdapter.class, m -> {
495495
m.setAccessible(true);

0 commit comments

Comments
 (0)