Skip to content

Commit 521acc7

Browse files
authored
GH-414: Avoid I/O in synchronized blocks (#457)
See #414 Use ReentrantLocks to avoid unnecessary virtual thread pinning.
1 parent 9ebdf17 commit 521acc7

File tree

8 files changed

+101
-45
lines changed

8 files changed

+101
-45
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/listener/DefaultReactivePulsarMessageListenerContainer.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.Objects;
2222
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.locks.ReentrantLock;
2324

2425
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
2526
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
@@ -49,7 +50,7 @@ public non-sealed class DefaultReactivePulsarMessageListenerContainer<T>
4950

5051
private boolean autoStartup = true;
5152

52-
private final Object lifecycleMonitor = new Object();
53+
private final ReentrantLock lifecycleLock = new ReentrantLock();
5354

5455
private final AtomicBoolean running = new AtomicBoolean(false);
5556

@@ -106,22 +107,30 @@ public void setConsumerCustomizer(ReactiveMessageConsumerBuilderCustomizer<T> co
106107

107108
@Override
108109
public final void start() {
109-
synchronized (this.lifecycleMonitor) {
110+
this.lifecycleLock.lock();
111+
try {
110112
if (!isRunning()) {
111113
Objects.requireNonNull(this.pulsarContainerProperties.getMessageHandler(),
112114
"A ReactivePulsarMessageHandler must be provided");
113115
doStart();
114116
}
115117
}
118+
finally {
119+
this.lifecycleLock.unlock();
120+
}
116121
}
117122

118123
@Override
119124
public void stop() {
120-
synchronized (this.lifecycleMonitor) {
125+
this.lifecycleLock.lock();
126+
try {
121127
if (isRunning()) {
122128
doStop();
123129
}
124130
}
131+
finally {
132+
this.lifecycleLock.unlock();
133+
}
125134
}
126135

127136
private void doStart() {

spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericListenerEndpointRegistry.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Set;
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.concurrent.locks.ReentrantLock;
2728

2829
import org.springframework.beans.BeansException;
2930
import org.springframework.beans.factory.BeanInitializationException;
@@ -57,6 +58,7 @@
5758
* @param <E> listener endpoint type.
5859
* @author Soby Chacko
5960
* @author Christophe Bornet
61+
* @author Chris Bono
6062
*/
6163
public class GenericListenerEndpointRegistry<C extends MessageListenerContainer, E extends ListenerEndpoint<C>>
6264
implements PulsarListenerContainerRegistry, DisposableBean, SmartLifecycle, ApplicationContextAware,
@@ -66,6 +68,8 @@ public class GenericListenerEndpointRegistry<C extends MessageListenerContainer,
6668

6769
private final Map<String, C> listenerContainers = new ConcurrentHashMap<>();
6870

71+
private final ReentrantLock containersLock = new ReentrantLock();
72+
6973
private ConfigurableApplicationContext applicationContext;
7074

7175
private int phase = C.DEFAULT_PHASE;
@@ -118,18 +122,18 @@ public void registerListenerContainer(E endpoint, ListenerContainerFactory<? ext
118122
boolean startImmediately) {
119123
Assert.notNull(endpoint, "Endpoint must not be null");
120124
Assert.notNull(factory, "Factory must not be null");
121-
122-
String subscriptionName = endpoint.getSubscriptionName();
123125
String id = endpoint.getId();
124-
125-
Assert.hasText(subscriptionName, "Endpoint id must not be empty");
126-
127-
synchronized (this.listenerContainers) {
126+
Assert.hasText(id, "Endpoint id must not be empty");
127+
this.containersLock.lock();
128+
try {
128129
Assert.state(!this.listenerContainers.containsKey(id),
129-
"Another endpoint is already registered with id '" + subscriptionName + "'");
130+
"Another endpoint is already registered with id '" + id + "'");
130131
C container = createListenerContainer(endpoint, factory);
131132
this.listenerContainers.put(id, container);
132133
}
134+
finally {
135+
this.containersLock.unlock();
136+
}
133137
}
134138

135139
protected C createListenerContainer(E endpoint, ListenerContainerFactory<? extends C, E> factory) {
@@ -146,10 +150,7 @@ protected C createListenerContainer(E endpoint, ListenerContainerFactory<? exten
146150
}
147151

148152
int containerPhase = listenerContainer.getPhase();
149-
if (listenerContainer.isAutoStartup() && containerPhase != C.DEFAULT_PHASE) { // a
150-
// custom
151-
// phase
152-
// value
153+
if (listenerContainer.isAutoStartup() && containerPhase != C.DEFAULT_PHASE) {
153154
if (this.phase != C.DEFAULT_PHASE && this.phase != containerPhase) {
154155
throw new IllegalStateException("Encountered phase mismatch between container "
155156
+ "factory definitions: " + this.phase + " vs " + containerPhase);

spring-pulsar/src/main/java/org/springframework/pulsar/config/GenericReaderEndpointRegistry.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Set;
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.concurrent.locks.ReentrantLock;
2728

2829
import org.springframework.beans.BeansException;
2930
import org.springframework.beans.factory.BeanInitializationException;
@@ -56,6 +57,7 @@
5657
* @param <C> container type
5758
* @param <E> endpoint type
5859
* @author Soby Chacko
60+
* @author Chris Bono
5961
*/
6062
public class GenericReaderEndpointRegistry<C extends PulsarMessageReaderContainer, E extends PulsarReaderEndpoint<C>>
6163
implements PulsarReaderContainerRegistry, DisposableBean, SmartLifecycle, ApplicationContextAware,
@@ -65,6 +67,8 @@ public class GenericReaderEndpointRegistry<C extends PulsarMessageReaderContaine
6567

6668
private final Map<String, C> readerContainers = new ConcurrentHashMap<>();
6769

70+
private final ReentrantLock containersLock = new ReentrantLock();
71+
6872
private ConfigurableApplicationContext applicationContext;
6973

7074
private int phase = C.DEFAULT_PHASE;
@@ -123,12 +127,16 @@ public void registerReaderContainer(E endpoint, ReaderContainerFactory<? extends
123127

124128
Assert.hasText(subscriptionName, "Endpoint id must not be empty");
125129

126-
synchronized (this.readerContainers) {
130+
this.containersLock.lock();
131+
try {
127132
Assert.state(!this.readerContainers.containsKey(id),
128133
"Another endpoint is already registered with id '" + subscriptionName + "'");
129134
C container = createReaderContainer(endpoint, factory);
130135
this.readerContainers.put(id, container);
131136
}
137+
finally {
138+
this.containersLock.unlock();
139+
}
132140
}
133141

134142
protected C createReaderContainer(E endpoint, ReaderContainerFactory<? extends C, E> factory) {
@@ -145,10 +153,7 @@ protected C createReaderContainer(E endpoint, ReaderContainerFactory<? extends C
145153
}
146154

147155
int containerPhase = readerContainer.getPhase();
148-
if (readerContainer.isAutoStartup() && containerPhase != C.DEFAULT_PHASE) { // a
149-
// custom
150-
// phase
151-
// value
156+
if (readerContainer.isAutoStartup() && containerPhase != C.DEFAULT_PHASE) {
152157
if (this.phase != C.DEFAULT_PHASE && this.phase != containerPhase) {
153158
throw new IllegalStateException("Encountered phase mismatch between container "
154159
+ "factory definitions: " + this.phase + " vs " + containerPhase);

spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarListenerEndpointRegistrar.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.List;
21+
import java.util.concurrent.locks.ReentrantLock;
2122

2223
import org.springframework.beans.factory.BeanFactory;
2324
import org.springframework.beans.factory.BeanFactoryAware;
@@ -39,6 +40,8 @@ public class PulsarListenerEndpointRegistrar implements BeanFactoryAware, Initia
3940

4041
private final List<PulsarListenerEndpointDescriptor> endpointDescriptors = new ArrayList<>();
4142

43+
private final ReentrantLock endpointDescriptorsLock = new ReentrantLock();
44+
4245
private GenericListenerEndpointRegistry endpointRegistry;
4346

4447
private ListenerContainerFactory<?, ?> containerFactory;
@@ -81,13 +84,17 @@ public void afterPropertiesSet() {
8184
}
8285

8386
protected void registerAllEndpoints() {
84-
synchronized (this.endpointDescriptors) {
87+
this.endpointDescriptorsLock.lock();
88+
try {
8589
for (PulsarListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
8690
ListenerContainerFactory<?, ?> factory = resolveContainerFactory(descriptor);
8791
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, factory);
8892
}
8993
this.startImmediately = true; // trigger immediate startup
9094
}
95+
finally {
96+
this.endpointDescriptorsLock.unlock();
97+
}
9198
}
9299

93100
private ListenerContainerFactory<?, ?> resolveContainerFactory(PulsarListenerEndpointDescriptor descriptor) {
@@ -115,7 +122,8 @@ public void registerEndpoint(ListenerEndpoint endpoint, @Nullable ListenerContai
115122
// Factory may be null, we defer the resolution right before actually creating the
116123
// container
117124
PulsarListenerEndpointDescriptor descriptor = new PulsarListenerEndpointDescriptor(endpoint, factory);
118-
synchronized (this.endpointDescriptors) {
125+
this.endpointDescriptorsLock.lock();
126+
try {
119127
if (this.startImmediately) { // Register and start immediately
120128
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
121129
resolveContainerFactory(descriptor), true);
@@ -124,6 +132,9 @@ public void registerEndpoint(ListenerEndpoint endpoint, @Nullable ListenerContai
124132
this.endpointDescriptors.add(descriptor);
125133
}
126134
}
135+
finally {
136+
this.endpointDescriptorsLock.unlock();
137+
}
127138
}
128139

129140
private static final class PulsarListenerEndpointDescriptor {

spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarReaderEndpointRegistrar.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.List;
21+
import java.util.concurrent.locks.ReentrantLock;
2122

2223
import org.springframework.beans.factory.BeanFactory;
2324
import org.springframework.beans.factory.BeanFactoryAware;
@@ -38,6 +39,8 @@ public class PulsarReaderEndpointRegistrar implements BeanFactoryAware, Initiali
3839

3940
private final List<PulsarReaderEndpointDescriptor> endpointDescriptors = new ArrayList<>();
4041

42+
private final ReentrantLock endpointDescriptorsLock = new ReentrantLock();
43+
4144
private GenericReaderEndpointRegistry endpointRegistry;
4245

4346
private ReaderContainerFactory<?, ?> containerFactory;
@@ -80,13 +83,17 @@ public void afterPropertiesSet() {
8083
}
8184

8285
protected void registerAllEndpoints() {
83-
synchronized (this.endpointDescriptors) {
86+
this.endpointDescriptorsLock.lock();
87+
try {
8488
for (PulsarReaderEndpointDescriptor descriptor : this.endpointDescriptors) {
8589
ReaderContainerFactory<?, ?> factory = resolveContainerFactory(descriptor);
8690
this.endpointRegistry.registerReaderContainer(descriptor.endpoint, factory);
8791
}
8892
this.startImmediately = true; // trigger immediate startup
8993
}
94+
finally {
95+
this.endpointDescriptorsLock.unlock();
96+
}
9097
}
9198

9299
private ReaderContainerFactory<?, ?> resolveContainerFactory(PulsarReaderEndpointDescriptor descriptor) {
@@ -114,7 +121,8 @@ public void registerEndpoint(PulsarReaderEndpoint endpoint, @Nullable ReaderCont
114121
// Factory may be null, we defer the resolution right before actually creating the
115122
// container
116123
PulsarReaderEndpointDescriptor descriptor = new PulsarReaderEndpointDescriptor(endpoint, factory);
117-
synchronized (this.endpointDescriptors) {
124+
this.endpointDescriptorsLock.lock();
125+
try {
118126
if (this.startImmediately) { // Register and start immediately
119127
this.endpointRegistry.registerReaderContainer(descriptor.endpoint, resolveContainerFactory(descriptor),
120128
true);
@@ -123,6 +131,9 @@ public void registerEndpoint(PulsarReaderEndpoint endpoint, @Nullable ReaderCont
123131
this.endpointDescriptors.add(descriptor);
124132
}
125133
}
134+
finally {
135+
this.endpointDescriptorsLock.unlock();
136+
}
126137
}
127138

128139
private static final class PulsarReaderEndpointDescriptor {

spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java

+23-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.pulsar.listener;
1818

19+
import java.util.concurrent.locks.ReentrantLock;
20+
1921
import org.apache.pulsar.client.api.DeadLetterPolicy;
2022
import org.apache.pulsar.client.api.RedeliveryBackoff;
2123

@@ -38,7 +40,7 @@ public non-sealed abstract class AbstractPulsarMessageListenerContainer<T> exten
3840

3941
private final PulsarContainerProperties pulsarContainerProperties;
4042

41-
protected final Object lifecycleMonitor = new Object();
43+
protected final ReentrantLock lifecycleLock = new ReentrantLock();
4244

4345
private volatile boolean paused;
4446

@@ -93,22 +95,30 @@ public void setAutoStartup(boolean autoStartup) {
9395

9496
@Override
9597
public final void start() {
96-
synchronized (this.lifecycleMonitor) {
98+
this.lifecycleLock.lock();
99+
try {
97100
if (!isRunning()) {
98101
Assert.state(this.pulsarContainerProperties.getMessageListener() instanceof PulsarRecordMessageListener,
99102
() -> "A " + PulsarRecordMessageListener.class.getName() + " implementation must be provided");
100103
doStart();
101104
}
102105
}
106+
finally {
107+
this.lifecycleLock.unlock();
108+
}
103109
}
104110

105111
@Override
106112
public void stop() {
107-
synchronized (this.lifecycleMonitor) {
113+
this.lifecycleLock.lock();
114+
try {
108115
if (isRunning()) {
109116
doStop();
110117
}
111118
}
119+
finally {
120+
this.lifecycleLock.unlock();
121+
}
112122
}
113123

114124
@Override
@@ -159,16 +169,24 @@ public ConsumerBuilderCustomizer<T> getConsumerBuilderCustomizer() {
159169

160170
@Override
161171
public void pause() {
162-
synchronized (this.lifecycleMonitor) {
172+
this.lifecycleLock.lock();
173+
try {
163174
doPause();
164175
}
176+
finally {
177+
this.lifecycleLock.unlock();
178+
}
165179
}
166180

167181
@Override
168182
public void resume() {
169-
synchronized (this.lifecycleMonitor) {
183+
this.lifecycleLock.lock();
184+
try {
170185
doResume();
171186
}
187+
finally {
188+
this.lifecycleLock.unlock();
189+
}
172190
}
173191

174192
protected boolean isPaused() {

0 commit comments

Comments
 (0)