Skip to content

Commit 587794a

Browse files
committed
a
1 parent 91f523c commit 587794a

9 files changed

+49
-221
lines changed

Diff for: src/main/java/com/hivemq/mqtt/services/PublishDistributorImpl.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.common.util.concurrent.FutureCallback;
2121
import com.google.common.util.concurrent.Futures;
2222
import com.google.common.util.concurrent.ListenableFuture;
23+
import com.google.common.util.concurrent.MoreExecutors;
2324
import com.google.common.util.concurrent.SettableFuture;
2425
import com.hivemq.configuration.service.MqttConfigurationService;
2526
import com.hivemq.extension.sdk.api.annotations.NotNull;
@@ -193,7 +194,7 @@ public PublishDistributorImpl(
193194

194195
final SettableFuture<PublishStatus> statusFuture = SettableFuture.create();
195196

196-
Futures.addCallback(future, new FutureCallback<Void>() {
197+
Futures.addCallback(future, new FutureCallback<>() {
197198
@Override
198199
public void onSuccess(final Void result) {
199200
statusFuture.set(DELIVERED);
@@ -203,7 +204,7 @@ public void onSuccess(final Void result) {
203204
public void onFailure(final Throwable t) {
204205
statusFuture.set(FAILED);
205206
}
206-
}, singleWriterService.callbackExecutor(client));
207+
}, MoreExecutors.directExecutor());
207208
return statusFuture;
208209
}
209210

Diff for: src/main/java/com/hivemq/mqtt/services/PublishPollServiceImpl.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public void onFailure(final Throwable t) {
195195
Exceptions.rethrowError("Exception in new messages handling", t);
196196
channel.disconnect();
197197
}
198-
}, singleWriterService.callbackExecutor(client));
198+
}, MoreExecutors.directExecutor());
199199
}
200200

201201
@Override
@@ -262,7 +262,7 @@ public void onSuccess(final ImmutableList<MessageWithID> messages) {
262262
public void onFailure(final Throwable t) {
263263
Exceptions.rethrowError("Exception in inflight messages handling", t);
264264
}
265-
}, singleWriterService.callbackExecutor(client));
265+
}, MoreExecutors.directExecutor());
266266
}
267267

268268
private @NotNull AtomicInteger inFlightMessageCount(final @NotNull Channel channel) {
@@ -391,7 +391,7 @@ public void onFailure(final @NotNull Throwable t) {
391391
"for shared subscription " +
392392
sharedSubscription, t);
393393
}
394-
}, singleWriterService.callbackExecutor(client));
394+
}, MoreExecutors.directExecutor());
395395
}
396396

397397
@Override

Diff for: src/main/java/com/hivemq/persistence/InMemoryProducerQueues.java

+12-43
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,6 @@
5555
*/
5656
public class InMemoryProducerQueues implements ProducerQueues {
5757

58-
private final int amountOfQueues;
59-
6058
private final int bucketsPerQueue;
6159

6260
private final @NotNull AtomicBoolean shutdown = new AtomicBoolean(false);
@@ -75,7 +73,6 @@ public class InMemoryProducerQueues implements ProducerQueues {
7573
public InMemoryProducerQueues(final int persistenceBucketCount, final int amountOfQueues) {
7674

7775
this.persistenceBucketCount = persistenceBucketCount;
78-
this.amountOfQueues = amountOfQueues;
7976
bucketsPerQueue = persistenceBucketCount / amountOfQueues;
8077
shutdownGracePeriod = InternalConfigurations.PERSISTENCE_SHUTDOWN_GRACE_PERIOD_MSEC.get();
8178

@@ -94,7 +91,8 @@ public InMemoryProducerQueues(final int persistenceBucketCount, final int amount
9491
}
9592

9693
@VisibleForTesting
97-
@NotNull ImmutableList<Integer> createBucketIndexes(final int queueIndex, final int bucketsPerQueue) {
94+
@NotNull
95+
ImmutableList<Integer> createBucketIndexes(final int queueIndex, final int bucketsPerQueue) {
9896
final ImmutableList.Builder<Integer> builder = ImmutableList.builder();
9997
for (int i = bucketsPerQueue * queueIndex; i < bucketsPerQueue * (queueIndex + 1); i++) {
10098
builder.add(i);
@@ -104,60 +102,32 @@ public InMemoryProducerQueues(final int persistenceBucketCount, final int amount
104102

105103
public <R> @NotNull ListenableFuture<R> submit(final @NotNull String key, final @NotNull Task<R> task) {
106104
//noinspection ConstantConditions (future is never null if the callbacks are null)
107-
return submitInternal(getBucket(key), task, null, null, false);
105+
return submitInternal(getBucket(key), task, false);
108106
}
109107

110108
public <R> @NotNull ListenableFuture<R> submit(final int bucketIndex, @NotNull final Task<R> task) {
111109
//noinspection ConstantConditions (futuer is never null if the callbacks are null)
112-
return submitInternal(bucketIndex, task, null, null, false);
113-
}
114-
115-
116-
public <R> @Nullable ListenableFuture<R> submit(
117-
final int bucketIndex,
118-
final @NotNull Task<R> task,
119-
@Nullable final SingleWriterService.SuccessCallback<R> successCallback,
120-
@Nullable final SingleWriterService.FailedCallback failedCallback) {
121-
122-
return submitInternal(bucketIndex, task, successCallback, failedCallback, false);
110+
return submitInternal(bucketIndex, task, false);
123111
}
124112

125-
126113
private <R> @Nullable ListenableFuture<R> submitInternal(
127114
final int bucketIndex,
128115
final @NotNull Task<R> task,
129-
@Nullable final SingleWriterService.SuccessCallback<R> successCallback,
130-
@Nullable final SingleWriterService.FailedCallback failedCallback,
131116
final boolean ignoreShutdown) {
132117
if (!ignoreShutdown && shutdown.get() && System.currentTimeMillis() - shutdownStartTime > shutdownGracePeriod) {
133118
return SettableFuture.create(); // Future will never return since we are shutting down.
134119
}
135120
final int queueIndex = bucketIndex / bucketsPerQueue;
136-
final SettableFuture<R> resultFuture;
137-
if (successCallback == null) {
138-
resultFuture = SettableFuture.create();
139-
} else {
140-
resultFuture = null;
141-
}
121+
final SettableFuture<R> resultFuture = SettableFuture.create();
142122

143123
final MpscUnboundedArrayQueue<Runnable> queue = queues[queueIndex];
144124
final AtomicInteger wip = wips[queueIndex];
145125
queue.offer(() -> {
146126
try {
147127
final R result = task.doTask(bucketIndex);
148-
if (resultFuture != null) {
149-
resultFuture.set(result);
150-
} else {
151-
successCallback.afterTask(result);
152-
}
128+
resultFuture.set(result);
153129
} catch (final Throwable e) {
154-
if (resultFuture != null) {
155-
resultFuture.setException(e);
156-
} else {
157-
if (failedCallback != null) {
158-
failedCallback.afterTask(e);
159-
}
160-
}
130+
resultFuture.setException(e);
161131
}
162132
});
163133

@@ -194,7 +164,8 @@ public InMemoryProducerQueues(final int persistenceBucketCount, final int amount
194164
* @return a list of listenableFutures of type R
195165
*/
196166
public @NotNull <R> List<ListenableFuture<R>> submitToAllBuckets(
197-
final @NotNull Task<R> task, final boolean parallel) {
167+
final @NotNull Task<R> task,
168+
final boolean parallel) {
198169
if (parallel) {
199170
return submitToAllBucketsParallel(task, false);
200171
} else {
@@ -214,11 +185,12 @@ public InMemoryProducerQueues(final int persistenceBucketCount, final int amount
214185
}
215186

216187
private @NotNull <R> List<ListenableFuture<R>> submitToAllBucketsParallel(
217-
final @NotNull Task<R> task, final boolean ignoreShutdown) {
188+
final @NotNull Task<R> task,
189+
final boolean ignoreShutdown) {
218190
final ImmutableList.Builder<ListenableFuture<R>> builder = ImmutableList.builder();
219191
for (int bucket = 0; bucket < persistenceBucketCount; bucket++) {
220192
//noinspection ConstantConditions (futuer is never null if the callbacks are null)
221-
builder.add(submitInternal(bucket, task, null, null, ignoreShutdown));
193+
builder.add(submitInternal(bucket, task, ignoreShutdown));
222194
}
223195
return builder.build();
224196
}
@@ -239,12 +211,10 @@ public InMemoryProducerQueues(final int persistenceBucketCount, final int amount
239211
return builder.build();
240212
}
241213

242-
243214
public int getBucket(final @NotNull String key) {
244215
return BucketUtils.getBucket(key, persistenceBucketCount);
245216
}
246217

247-
248218
@NotNull
249219
public ListenableFuture<Void> shutdown(final @Nullable Task<Void> finalTask) {
250220
if (shutdown.getAndSet(true)) {
@@ -288,5 +258,4 @@ public void onFailure(final @NotNull Throwable throwable) {
288258
}, executorService);
289259
return closeFuture;
290260
}
291-
292261
}

Diff for: src/main/java/com/hivemq/persistence/InMemorySingleWriter.java

+1-13
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
*/
3535
public class InMemorySingleWriter implements SingleWriterService {
3636

37-
private static final @NotNull Logger log = LoggerFactory.getLogger(SingleWriterServiceImpl.class);
37+
private static final @NotNull Logger log = LoggerFactory.getLogger(InMemorySingleWriter.class);
3838

3939
private static final int AMOUNT_OF_PRODUCERS = 5;
4040
private static final int RETAINED_MESSAGE_QUEUE_INDEX = 0;
@@ -45,7 +45,6 @@ public class InMemorySingleWriter implements SingleWriterService {
4545

4646
private final @NotNull InMemoryProducerQueues @NotNull [] producers =
4747
new InMemoryProducerQueues[AMOUNT_OF_PRODUCERS];
48-
private final @NotNull InMemoryProducerQueues callbackProducerQueue;
4948

5049
private final int persistenceBucketCount;
5150

@@ -54,15 +53,11 @@ public InMemorySingleWriter() {
5453

5554
persistenceBucketCount = InternalConfigurations.PERSISTENCE_BUCKET_COUNT.get();
5655
final int threadPoolSize = InternalConfigurations.SINGLE_WRITER_THREAD_POOL_SIZE.get();
57-
final int creditsPerExecution = InternalConfigurations.SINGLE_WRITER_CREDITS_PER_EXECUTION.get();
5856
final int amountOfQueues = validAmountOfQueues(threadPoolSize, persistenceBucketCount);
5957

6058
for (int i = 0; i < producers.length; i++) {
6159
producers[i] = new InMemoryProducerQueues(persistenceBucketCount, amountOfQueues);
6260
}
63-
callbackProducerQueue = new InMemoryProducerQueues(persistenceBucketCount, amountOfQueues);
64-
65-
6661
}
6762

6863
@VisibleForTesting
@@ -95,13 +90,6 @@ int validAmountOfQueues(final int processorCount, final int bucketCount) {
9590
return producers[ATTRIBUTE_STORE_QUEUE_INDEX];
9691
}
9792

98-
public @NotNull Executor callbackExecutor(final @NotNull String key) {
99-
return command -> callbackProducerQueue.submit(key, (bucketIndex) -> {
100-
command.run();
101-
return null; // this is fine, because Executors dont return anything. The return value will not be used.
102-
});
103-
}
104-
10593
public int getPersistenceBucketCount() {
10694
return persistenceBucketCount;
10795
}

Diff for: src/main/java/com/hivemq/persistence/ProducerQueues.java

+9-16
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,18 @@
2626
*/
2727
public interface ProducerQueues {
2828

29+
<R> @NotNull ListenableFuture<R> submit(@NotNull String key, @NotNull SingleWriterServiceImpl.Task<R> task);
2930

30-
<R> @NotNull ListenableFuture<R> submit(
31-
@NotNull final String key, @NotNull final SingleWriterServiceImpl.Task<R> task);
31+
<R> @NotNull ListenableFuture<R> submit(int bucketIndex, @NotNull SingleWriterServiceImpl.Task<R> task);
3232

33-
<R> @NotNull ListenableFuture<R> submit(final int bucketIndex, @NotNull final SingleWriterServiceImpl.Task<R> task);
33+
@NotNull
34+
<R> List<ListenableFuture<R>> submitToAllBucketsParallel(@NotNull SingleWriterService.Task<R> task);
3435

36+
@NotNull
37+
<R> List<ListenableFuture<R>> submitToAllBucketsSequential(@NotNull SingleWriterService.Task<R> task);
3538

36-
<R> @Nullable ListenableFuture<R> submit(
37-
final int bucketIndex,
38-
@NotNull final SingleWriterServiceImpl.Task<R> task,
39-
@Nullable final SingleWriterServiceImpl.SuccessCallback<R> successCallback,
40-
@Nullable final SingleWriterServiceImpl.FailedCallback failedCallback);
41-
42-
@NotNull <R> List<ListenableFuture<R>> submitToAllBucketsParallel(final @NotNull SingleWriterService.Task<R> task);
43-
44-
@NotNull <R> List<ListenableFuture<R>> submitToAllBucketsSequential(final @NotNull SingleWriterService.Task<R> task);
45-
46-
int getBucket(@NotNull final String key);
47-
48-
@NotNull ListenableFuture<Void> shutdown(final @Nullable SingleWriterServiceImpl.Task<Void> finalTask);
39+
int getBucket(@NotNull String key);
4940

41+
@NotNull
42+
ListenableFuture<Void> shutdown(@Nullable SingleWriterServiceImpl.Task<Void> finalTask);
5043
}

0 commit comments

Comments
 (0)