Skip to content

Commit 981eb05

Browse files
authored
Add graceful restart to client (#452)
This commit introduces a restartable client that participates in SmartLifecycle and handles stop and start gracefully. Resolves #422
1 parent 97d524e commit 981eb05

File tree

15 files changed

+1258
-13
lines changed

15 files changed

+1258
-13
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ nohttp {
3434
source.exclude "**/build/**"
3535
source.exclude "**/out/**"
3636
source.exclude "**/target/**"
37+
source.exclude "**/*.dylib"
3738
}
3839

3940
check {

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactory.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Collections;
2020
import java.util.List;
2121
import java.util.Objects;
22+
import java.util.concurrent.atomic.AtomicReference;
2223

2324
import org.apache.pulsar.client.api.PulsarClient;
2425
import org.apache.pulsar.client.api.Schema;
@@ -31,6 +32,7 @@
3132
import org.springframework.core.log.LogAccessor;
3233
import org.springframework.lang.Nullable;
3334
import org.springframework.pulsar.core.DefaultTopicResolver;
35+
import org.springframework.pulsar.core.PulsarClientProxy;
3436
import org.springframework.pulsar.core.TopicResolver;
3537
import org.springframework.util.Assert;
3638
import org.springframework.util.CollectionUtils;
@@ -42,10 +44,15 @@
4244
* @author Christophe Bornet
4345
* @author Chris Bono
4446
*/
45-
public final class DefaultReactivePulsarSenderFactory<T> implements ReactivePulsarSenderFactory<T> {
47+
public final class DefaultReactivePulsarSenderFactory<T>
48+
implements ReactivePulsarSenderFactory<T>, RestartableComponentSupport {
49+
50+
private static final int LIFECYCLE_PHASE = (Integer.MIN_VALUE / 2) - 100;
4651

4752
private final LogAccessor logger = new LogAccessor(this.getClass());
4853

54+
private final AtomicReference<State> currentState = RestartableComponentSupport.initialState();
55+
4956
private final ReactivePulsarClient reactivePulsarClient;
5057

5158
private final TopicResolver topicResolver;
@@ -110,6 +117,9 @@ public ReactiveMessageSender<T> createSender(Schema<T> schema, @Nullable String
110117
private ReactiveMessageSender<T> doCreateReactiveMessageSender(Schema<T> schema, @Nullable String topic,
111118
@Nullable List<ReactiveMessageSenderBuilderCustomizer<T>> customizers) {
112119
Objects.requireNonNull(schema, "Schema must be specified");
120+
121+
this.logger.warn(() -> "**** Du CreateMessageSender for topic=" + topic);
122+
113123
String resolvedTopic = this.topicResolver.resolveTopic(topic, () -> getDefaultTopic()).orElseThrow();
114124
this.logger.trace(() -> "Creating reactive message sender for '%s' topic".formatted(resolvedTopic));
115125

@@ -139,6 +149,41 @@ public String getDefaultTopic() {
139149
return this.defaultTopic;
140150
}
141151

152+
/**
153+
* Return the phase that this lifecycle object is supposed to run in.
154+
* <p>
155+
* This component has a phase that comes after the {@link PulsarClientProxy
156+
* restartable client} but before other lifecycle and smart lifecycle components whose
157+
* phase values are &quot;0&quot; and &quot;max&quot;, respectively.
158+
* @return a phase that is after the restartable client and before other default
159+
* components.
160+
* @see PulsarClientProxy#getPhase()
161+
*/
162+
@Override
163+
public int getPhase() {
164+
return LIFECYCLE_PHASE;
165+
}
166+
167+
@Override
168+
public AtomicReference<State> currentState() {
169+
return this.currentState;
170+
}
171+
172+
@Override
173+
public LogAccessor logger() {
174+
return this.logger;
175+
}
176+
177+
@Override
178+
public void doStop() {
179+
try {
180+
this.reactiveMessageSenderCache.close();
181+
}
182+
catch (Exception e) {
183+
throw new RuntimeException(e);
184+
}
185+
}
186+
142187
/**
143188
* Builder for {@link DefaultReactivePulsarSenderFactory}.
144189
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Copyright 2022-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.reactive.core;
18+
19+
import java.util.concurrent.atomic.AtomicReference;
20+
21+
import org.springframework.beans.factory.DisposableBean;
22+
import org.springframework.context.SmartLifecycle;
23+
import org.springframework.core.log.LogAccessor;
24+
import org.springframework.lang.Nullable;
25+
26+
/**
27+
* Provides a simple base implementation for a component that can be restarted (stopped
28+
* then started) and still be in a usable state.
29+
* <p>
30+
* This is an interface that provides default methods that rely on the current component
31+
* state which must be maintained by the implementing component.
32+
* <p>
33+
* This can serve as a base implementation for coordinated checkpoint and restore by
34+
* simply implementing the {@link #doStart() start} and/or {@link #doStop() stop} callback
35+
* to re-acquire and release resources, respectively.
36+
* <p>
37+
* Implementors are required to provide the component state and a logger.
38+
*
39+
* @author Chris Bono
40+
*/
41+
interface RestartableComponentSupport extends SmartLifecycle, DisposableBean {
42+
43+
/**
44+
* Gets the initial state for the implementing component.
45+
* @return the initial component state
46+
*/
47+
static AtomicReference<State> initialState() {
48+
return new AtomicReference<>(State.CREATED);
49+
}
50+
51+
/**
52+
* Callback to get the current state from the component.
53+
* @return the current state of the component
54+
*/
55+
AtomicReference<State> currentState();
56+
57+
/**
58+
* Callback to get the component specific logger.
59+
* @return the component specific logger
60+
*/
61+
LogAccessor logger();
62+
63+
/**
64+
* Lifecycle state of this factory.
65+
*/
66+
enum State {
67+
68+
/** Component initially created. */
69+
CREATED,
70+
/** Component in the process of being started. */
71+
STARTING,
72+
/** Component has been started. */
73+
STARTED,
74+
/** Component in the process of being stopped. */
75+
STOPPING,
76+
/** Component has been stopped. */
77+
STOPPED,
78+
/** Component has been destroyed. */
79+
DESTROYED;
80+
81+
}
82+
83+
@Override
84+
default boolean isRunning() {
85+
return State.STARTED.equals(currentState().get());
86+
}
87+
88+
@Override
89+
default void start() {
90+
State current = currentState().getAndUpdate(state -> isCreatedOrStopped(state) ? State.STARTING : state);
91+
if (isCreatedOrStopped(current)) {
92+
logger().debug(() -> "Starting...");
93+
doStart();
94+
currentState().set(State.STARTED);
95+
logger().debug(() -> "Started");
96+
}
97+
}
98+
99+
private static boolean isCreatedOrStopped(@Nullable State state) {
100+
return State.CREATED.equals(state) || State.STOPPED.equals(state);
101+
}
102+
103+
/**
104+
* Callback invoked during startup - default implementation does nothing.
105+
*/
106+
default void doStart() {
107+
}
108+
109+
@Override
110+
default void stop() {
111+
State current = currentState().getAndUpdate(state -> isCreatedOrStarted(state) ? State.STOPPING : state);
112+
if (isCreatedOrStarted(current)) {
113+
logger().debug(() -> "Stopping...");
114+
doStop();
115+
currentState().set(State.STOPPED);
116+
logger().debug(() -> "Stopped");
117+
}
118+
}
119+
120+
private static boolean isCreatedOrStarted(@Nullable State state) {
121+
return State.CREATED.equals(state) || State.STARTED.equals(state);
122+
}
123+
124+
/**
125+
* Callback invoked during stop - default implementation does nothing.
126+
*/
127+
default void doStop() {
128+
}
129+
130+
@Override
131+
default void destroy() {
132+
logger().debug(() -> "Destroying...");
133+
stop();
134+
currentState().set(State.DESTROYED);
135+
logger().debug(() -> "Destroyed");
136+
}
137+
138+
}

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarSenderFactoryTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@
2020
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2121
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
2222
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.clearInvocations;
2324
import static org.mockito.Mockito.inOrder;
2425
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.spy;
27+
import static org.mockito.Mockito.times;
28+
import static org.mockito.Mockito.verify;
2529

2630
import java.util.Arrays;
2731
import java.util.Collections;
@@ -210,4 +214,25 @@ private ReactivePulsarSenderFactory<String> newSenderFactoryWithCustomizers(
210214

211215
}
212216

217+
@Nested
218+
class RestartFactoryTests {
219+
220+
@Test
221+
void restartLifecycle() throws Exception {
222+
var cache = spy(AdaptedReactivePulsarClientFactory.createCache());
223+
var senderFactory = (DefaultReactivePulsarSenderFactory<String>) newSenderFactoryWithCache(cache);
224+
senderFactory.start();
225+
senderFactory.createSender(schema, "topic1");
226+
senderFactory.stop();
227+
senderFactory.stop();
228+
verify(cache, times(1)).close();
229+
clearInvocations(cache);
230+
senderFactory.start();
231+
senderFactory.createSender(schema, "topic2");
232+
senderFactory.stop();
233+
verify(cache, times(1)).close();
234+
}
235+
236+
}
237+
213238
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Objects;
2424
import java.util.Set;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.atomic.AtomicReference;
2627
import java.util.function.Consumer;
2728

2829
import org.apache.pulsar.client.api.MessageId;
@@ -58,12 +59,17 @@
5859
* @author Alexander Preuß
5960
* @author Christophe Bornet
6061
*/
61-
public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactory<T> implements DisposableBean {
62+
public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactory<T>
63+
implements RestartableComponentSupport {
64+
65+
private static final int LIFECYCLE_PHASE = (Integer.MIN_VALUE / 2) - 100;
6266

6367
private final LogAccessor logger = new LogAccessor(this.getClass());
6468

6569
private final CacheProvider<ProducerCacheKey<T>, Producer<T>> producerCache;
6670

71+
private final AtomicReference<State> currentState = RestartableComponentSupport.initialState();
72+
6773
/**
6874
* Construct a caching producer factory with the specified values for the cache
6975
* configuration.
@@ -85,7 +91,7 @@ public CachingPulsarProducerFactory(PulsarClient pulsarClient, @Nullable String
8591
(key, producer, cause) -> {
8692
this.logger.debug(() -> "Producer %s evicted from cache due to %s"
8793
.formatted(ProducerUtils.formatProducer(producer), cause));
88-
closeProducer(producer);
94+
closeProducer(producer, true);
8995
});
9096
}
9197

@@ -113,22 +119,51 @@ private Producer<T> createCacheableProducer(Schema<T> schema, String topic,
113119
}
114120
}
115121

122+
/**
123+
* Return the phase that this lifecycle object is supposed to run in.
124+
* <p>
125+
* Because this object depends on the restartable client, it uses a phase slightly
126+
* larger than the one used by the restartable client. This ensures that it starts
127+
* after and stops before the restartable client.
128+
* @return the phase to execute in (just after the restartable client)
129+
* @see PulsarClientProxy#getPhase()
130+
*/
131+
@Override
132+
public int getPhase() {
133+
return LIFECYCLE_PHASE;
134+
}
135+
116136
@Override
117-
public void destroy() {
118-
this.producerCache.invalidateAll((key, producer) -> closeProducer(producer));
137+
public AtomicReference<State> currentState() {
138+
return this.currentState;
119139
}
120140

121-
private void closeProducer(Producer<T> producer) {
141+
@Override
142+
public LogAccessor logger() {
143+
return this.logger;
144+
}
145+
146+
@Override
147+
public void doStop() {
148+
this.producerCache.invalidateAll((key, producer) -> closeProducer(producer, false));
149+
}
150+
151+
private void closeProducer(Producer<T> producer, boolean async) {
122152
Producer<T> actualProducer = null;
123153
if (producer instanceof ProducerWithCloseCallback<T> wrappedProducer) {
124154
actualProducer = wrappedProducer.getActualProducer();
125155
}
126156
if (actualProducer == null) {
127-
this.logger.warn(() -> "Unable to get actual producer for %s - will skip closing it"
157+
this.logger.trace(() -> "Unable to get actual producer for %s - will skip closing it"
128158
.formatted(ProducerUtils.formatProducer(producer)));
129159
return;
130160
}
131-
ProducerUtils.closeProducerAsync(actualProducer, this.logger);
161+
if (async) {
162+
ProducerUtils.closeProducerAsync(actualProducer, this.logger);
163+
}
164+
else {
165+
ProducerUtils.closeProducer(actualProducer, this.logger, Duration.ofSeconds(15L));
166+
}
132167
}
133168

134169
/**

0 commit comments

Comments
 (0)