From 9efc8c7cb5dd1edc3ca2be7968d5243ccd643ea6 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 12 Sep 2025 16:34:49 +0800 Subject: [PATCH 1/3] replace ConcurrentBitSetRecyclable with BitSet --- .../pulsar/client/impl/ConsumerImpl.java | 11 ++---- ...sistentAcknowledgmentsGroupingTracker.java | 39 +++++++++---------- .../pulsar/common/protocol/Commands.java | 21 ++-------- 3 files changed, 26 insertions(+), 45 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 6d5305136b162..2db5fa55a05fc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -136,7 +136,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.SafeCollectionUtils; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -3184,8 +3183,7 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me } else { if (Commands.peerSupportsMultiMessageAcknowledgment( getClientCnx().getRemoteEndpointProtocolVersion())) { - List> entriesToAck = - new ArrayList<>(chunkMsgIds.length); + List> entriesToAck = new ArrayList<>(chunkMsgIds.length); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null && chunkMsgIds.length > 1) { entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); @@ -3222,7 +3220,7 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me } private ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, - List> entries, + List> entries, long requestID) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() @@ -3241,7 +3239,7 @@ protected BaseCommand initialValue() throws Exception { } }; - private static BaseCommand newMultiMessageAckCommon(List> entries) { + private static BaseCommand newMultiMessageAckCommon(List> entries) { BaseCommand cmd = LOCAL_BASE_COMMAND.get() .clear() .setType(BaseCommand.Type.ACK); @@ -3250,7 +3248,7 @@ private static BaseCommand newMultiMessageAckCommon(List pendingIndividualAcks; + // The value is the cloned bit set returned by `MessageIdAdv#getAckSet()` of a batch message's message id, when + // accessing the value, you must guarantee the thread safety. @VisibleForTesting - final ConcurrentSkipListMap pendingIndividualBatchIndexAcks; + final ConcurrentSkipListMap pendingIndividualBatchIndexAcks; private final ScheduledFuture scheduledTask; private final boolean batchIndexAckEnabled; @@ -133,7 +134,7 @@ public boolean isDuplicate(MessageId messageId) { return true; } if (messageIdAdv.getBatchIndex() >= 0) { - ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.get(key); + BitSet bitSet = pendingIndividualBatchIndexAcks.get(key); return bitSet != null && !bitSet.get(messageIdAdv.getBatchIndex()); } return false; @@ -327,26 +328,23 @@ private CompletableFuture doCumulativeAck(MessageIdAdv messageId, Map doIndividualBatchAckAsync(MessageIdAdv msgId) { - ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( + BitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( MessageIdAdvUtils.discardBatch(msgId), __ -> { - final BitSet ackSet = msgId.getAckSet(); - final ConcurrentBitSetRecyclable value; + final var ackSet = msgId.getAckSet(); + final var size = msgId.getBatchSize(); if (ackSet != null) { synchronized (ackSet) { - if (!ackSet.isEmpty()) { - value = ConcurrentBitSetRecyclable.create(ackSet); - } else { - value = ConcurrentBitSetRecyclable.create(); - value.set(0, msgId.getBatchSize()); - } + return (BitSet) ackSet.clone(); } } else { - value = ConcurrentBitSetRecyclable.create(); - value.set(0, msgId.getBatchSize()); + final var newAckSet = new BitSet(size); + newAckSet.set(0, size); + return newAckSet; } - return value; }); - bitSet.clear(msgId.getBatchIndex()); + synchronized (bitSet) { + bitSet.clear(msgId.getBatchIndex()); + } return CompletableFuture.completedFuture(null); } @@ -445,7 +443,7 @@ private void flushAsync(ClientCnx cnx) { } // Flush all individual acks - List> entriesToAck = + List> entriesToAck = new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size()); if (!pendingIndividualAcks.isEmpty()) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { @@ -487,8 +485,7 @@ private void flushAsync(ClientCnx cnx) { } while (true) { - Map.Entry entry = - pendingIndividualBatchIndexAcks.pollFirstEntry(); + Map.Entry entry = pendingIndividualBatchIndexAcks.pollFirstEntry(); if (entry == null) { // The entry has been removed in a different thread break; @@ -539,7 +536,7 @@ private CompletableFuture newImmediateAckAndFlush(long consumerId, Message // cumulative ack chunk by the last messageId if (chunkMsgIds != null && ackType != AckType.Cumulative) { if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { - List> entriesToAck = new ArrayList<>(chunkMsgIds.length); + List> entriesToAck = new ArrayList<>(chunkMsgIds.length); for (MessageIdImpl cMsgId : chunkMsgIds) { if (cMsgId != null && chunkMsgIds.length > 1) { entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); @@ -568,7 +565,7 @@ private CompletableFuture newMessageAckCommandAndWrite( long entryId, BitSetRecyclable ackSet, AckType ackType, Map properties, boolean flush, TimedCompletableFuture timedCompletableFuture, - List> entriesToAck) { + List> entriesToAck) { if (consumer.isAckReceiptEnabled()) { final long requestId = consumer.getClient().newRequestId(); final ByteBuf cmd; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index cab4dc8bcab0e..5d159168bc34d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -32,6 +32,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; +import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.Map; @@ -47,7 +48,6 @@ import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.Range; -import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.AuthMethod; @@ -111,7 +111,6 @@ import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.collections.BitSetRecyclable; -import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; @UtilityClass @Slf4j @@ -1033,25 +1032,14 @@ public static ByteBuf newLookupErrorResponse(ServerError error, String errorMsg, return serializeWithSize(newLookupErrorResponseCommand(error, errorMsg, requestId)); } - public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, - List> entries) { - BaseCommand cmd = newMultiMessageAckCommon(entries); - cmd.getAck() - .setConsumerId(consumerId) - .setAckType(AckType.Individual) - .setTxnidLeastBits(txnID.getLeastSigBits()) - .setTxnidMostBits(txnID.getMostSigBits()); - return serializeWithSize(cmd); - } - - private static BaseCommand newMultiMessageAckCommon(List> entries) { + private static BaseCommand newMultiMessageAckCommon(List> entries) { BaseCommand cmd = localCmd(Type.ACK); CommandAck ack = cmd.setAck(); int entriesCount = entries.size(); for (int i = 0; i < entriesCount; i++) { long ledgerId = entries.get(i).getLeft(); long entryId = entries.get(i).getMiddle(); - ConcurrentBitSetRecyclable bitSet = entries.get(i).getRight(); + BitSet bitSet = entries.get(i).getRight(); MessageIdData msgId = ack.addMessageId() .setLedgerId(ledgerId) .setEntryId(entryId); @@ -1060,7 +1048,6 @@ private static BaseCommand newMultiMessageAckCommon(List> entries, + List> entries, long requestId) { BaseCommand cmd = newMultiMessageAckCommon(entries); cmd.getAck() From 2abc7d360222ee9ac10338f481bef158e834bee1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 12 Sep 2025 20:02:53 +0800 Subject: [PATCH 2/3] Remove ConcurrentBitSet and the recyclable object --- ...sistentAcknowledgmentsGroupingTracker.java | 6 +- .../util/collections/ConcurrentBitSet.java | 468 ------------------ .../ConcurrentBitSetRecyclable.java | 60 --- .../ConcurrentBitSetRecyclableTest.java | 60 --- 4 files changed, 2 insertions(+), 592 deletions(-) delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java delete mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index cc226c39c17d7..5a9ea4016029c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -331,12 +331,10 @@ CompletableFuture doIndividualBatchAckAsync(MessageIdAdv msgId) { BitSet bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( MessageIdAdvUtils.discardBatch(msgId), __ -> { final var ackSet = msgId.getAckSet(); - final var size = msgId.getBatchSize(); if (ackSet != null) { - synchronized (ackSet) { - return (BitSet) ackSet.clone(); - } + return ackSet; } else { + final var size = msgId.getBatchSize(); final var newAckSet = new BitSet(size); newAckSet.set(0, size); return newAckSet; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java deleted file mode 100644 index a37628cb300b8..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java +++ /dev/null @@ -1,468 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.BitSet; -import java.util.concurrent.locks.StampedLock; -import java.util.stream.IntStream; - -/** - * A {@code BitSet} that is protected by a {@code StampedLock} to provide thread-safe access. - * The {@link #length()} method is not thread safe and is not overridden because StampedLock is not reentrant. - * Use the {@link #safeLength()} method to get the length of the bit set in a thread-safe manner. - */ -public class ConcurrentBitSet extends BitSet { - - private static final long serialVersionUID = 1L; - private final StampedLock rwLock = new StampedLock(); - - public ConcurrentBitSet() { - super(); - } - - /** - * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range - * {@code 0} through {@code nbits-1}. All bits are initially {@code false}. - * - * @param nbits the initial size of the bit set - * @throws NegativeArraySizeException if the specified initial size is negative - */ - public ConcurrentBitSet(int nbits) { - super(nbits); - } - - @Override - public boolean get(int bitIndex) { - long stamp = rwLock.tryOptimisticRead(); - boolean isSet = super.get(bitIndex); - if (!rwLock.validate(stamp)) { - stamp = rwLock.readLock(); - try { - isSet = super.get(bitIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return isSet; - } - - @Override - public void set(int bitIndex) { - long stamp = rwLock.writeLock(); - try { - super.set(bitIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void clear(int bitIndex) { - long stamp = rwLock.writeLock(); - try { - super.clear(bitIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void set(int fromIndex, int toIndex) { - long stamp = rwLock.writeLock(); - try { - super.set(fromIndex, toIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void clear(int fromIndex, int toIndex) { - long stamp = rwLock.writeLock(); - try { - super.clear(fromIndex, toIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void clear() { - long stamp = rwLock.writeLock(); - try { - super.clear(); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public int nextSetBit(int fromIndex) { - long stamp = rwLock.tryOptimisticRead(); - int nextSetBit = super.nextSetBit(fromIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - nextSetBit = super.nextSetBit(fromIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return nextSetBit; - } - - @Override - public int nextClearBit(int fromIndex) { - long stamp = rwLock.tryOptimisticRead(); - int nextClearBit = super.nextClearBit(fromIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - nextClearBit = super.nextClearBit(fromIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return nextClearBit; - } - - @Override - public int previousSetBit(int fromIndex) { - long stamp = rwLock.tryOptimisticRead(); - int previousSetBit = super.previousSetBit(fromIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - previousSetBit = super.previousSetBit(fromIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return previousSetBit; - } - - @Override - public int previousClearBit(int fromIndex) { - long stamp = rwLock.tryOptimisticRead(); - int previousClearBit = super.previousClearBit(fromIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - previousClearBit = super.previousClearBit(fromIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return previousClearBit; - } - - @Override - public boolean isEmpty() { - long stamp = rwLock.tryOptimisticRead(); - boolean isEmpty = super.isEmpty(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - isEmpty = super.isEmpty(); - } finally { - rwLock.unlockRead(stamp); - } - } - return isEmpty; - } - - @Override - public int cardinality() { - long stamp = rwLock.tryOptimisticRead(); - int cardinality = super.cardinality(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - cardinality = super.cardinality(); - } finally { - rwLock.unlockRead(stamp); - } - } - return cardinality; - } - - @Override - public int size() { - long stamp = rwLock.tryOptimisticRead(); - int size = super.size(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - size = super.size(); - } finally { - rwLock.unlockRead(stamp); - } - } - return size; - } - - @Override - public byte[] toByteArray() { - long stamp = rwLock.tryOptimisticRead(); - byte[] byteArray = super.toByteArray(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - byteArray = super.toByteArray(); - } finally { - rwLock.unlockRead(stamp); - } - } - return byteArray; - } - - @Override - public long[] toLongArray() { - long stamp = rwLock.tryOptimisticRead(); - long[] longArray = super.toLongArray(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - longArray = super.toLongArray(); - } finally { - rwLock.unlockRead(stamp); - } - } - return longArray; - } - - @Override - public void flip(int bitIndex) { - long stamp = rwLock.writeLock(); - try { - super.flip(bitIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void flip(int fromIndex, int toIndex) { - long stamp = rwLock.writeLock(); - try { - super.flip(fromIndex, toIndex); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void set(int bitIndex, boolean value) { - long stamp = rwLock.writeLock(); - try { - super.set(bitIndex, value); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void set(int fromIndex, int toIndex, boolean value) { - long stamp = rwLock.writeLock(); - try { - super.set(fromIndex, toIndex, value); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public BitSet get(int fromIndex, int toIndex) { - long stamp = rwLock.tryOptimisticRead(); - BitSet bitSet = super.get(fromIndex, toIndex); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - bitSet = super.get(fromIndex, toIndex); - } finally { - rwLock.unlockRead(stamp); - } - } - return bitSet; - } - - /** - * Thread-safe version of {@code length()}. - * StampedLock is not reentrant and that's why the length() method is not overridden. Overriding length() method - * would require to use a reentrant lock which would be less performant. - * - * @return length of the bit set - */ - public int safeLength() { - long stamp = rwLock.tryOptimisticRead(); - int length = super.length(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - length = super.length(); - } finally { - rwLock.unlockRead(stamp); - } - } - return length; - } - - @Override - public boolean intersects(BitSet set) { - long stamp = rwLock.writeLock(); - try { - return super.intersects(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void and(BitSet set) { - long stamp = rwLock.writeLock(); - try { - super.and(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void or(BitSet set) { - long stamp = rwLock.writeLock(); - try { - super.or(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void xor(BitSet set) { - long stamp = rwLock.writeLock(); - try { - super.xor(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - @Override - public void andNot(BitSet set) { - long stamp = rwLock.writeLock(); - try { - super.andNot(set); - } finally { - rwLock.unlockWrite(stamp); - } - } - - /** - * Returns the clone of the internal wrapped {@code BitSet}. - * This won't be a clone of the {@code ConcurrentBitSet} object. - * - * @return a clone of the internal wrapped {@code BitSet} - */ - @Override - public Object clone() { - long stamp = rwLock.tryOptimisticRead(); - BitSet clonedBitSet = (BitSet) super.clone(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - clonedBitSet = (BitSet) super.clone(); - } finally { - rwLock.unlockRead(stamp); - } - } - return clonedBitSet; - } - - @Override - public String toString() { - long stamp = rwLock.tryOptimisticRead(); - String str = super.toString(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - str = super.toString(); - } finally { - rwLock.unlockRead(stamp); - } - } - return str; - } - - /** - * This operation is not supported on {@code ConcurrentBitSet}. - */ - @Override - public IntStream stream() { - throw new UnsupportedOperationException("stream is not supported"); - } - - public boolean equals(final Object o) { - if (o == this) { - return true; - } - if (!(o instanceof ConcurrentBitSet)) { - return false; - } - long stamp = rwLock.tryOptimisticRead(); - boolean isEqual = super.equals(o); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - isEqual = super.equals(o); - } finally { - rwLock.unlockRead(stamp); - } - } - return isEqual; - } - - public int hashCode() { - long stamp = rwLock.tryOptimisticRead(); - int hashCode = super.hashCode(); - if (!rwLock.validate(stamp)) { - // Fallback to read lock - stamp = rwLock.readLock(); - try { - hashCode = super.hashCode(); - } finally { - rwLock.unlockRead(stamp); - } - } - return hashCode; - } -} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java deleted file mode 100644 index 0ba409b2d7d17..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclable.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; -import lombok.EqualsAndHashCode; - -import java.util.BitSet; - -/** - * Safe multithreaded version of {@code BitSet} and leverage netty recycler. - */ -@EqualsAndHashCode(callSuper = true) -public class ConcurrentBitSetRecyclable extends ConcurrentBitSet { - - private final Handle recyclerHandle; - - private static final Recycler RECYCLER = new Recycler() { - protected ConcurrentBitSetRecyclable newObject(Handle recyclerHandle) { - return new ConcurrentBitSetRecyclable(recyclerHandle); - } - }; - - private ConcurrentBitSetRecyclable(Handle recyclerHandle) { - super(); - this.recyclerHandle = recyclerHandle; - } - - public static ConcurrentBitSetRecyclable create() { - return RECYCLER.get(); - } - - public static ConcurrentBitSetRecyclable create(BitSet bitSet) { - ConcurrentBitSetRecyclable recyclable = RECYCLER.get(); - recyclable.or(bitSet); - return recyclable; - } - - public void recycle() { - this.clear(); - recyclerHandle.recycle(this); - } -} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java deleted file mode 100644 index b2cbfdcc0cd18..0000000000000 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentBitSetRecyclableTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util.collections; - -import java.util.BitSet; -import org.testng.Assert; -import org.testng.annotations.Test; - -public class ConcurrentBitSetRecyclableTest { - - @Test(priority = 0) - public void testRecycle() { - ConcurrentBitSetRecyclable bitset1 = ConcurrentBitSetRecyclable.create(); - bitset1.set(3); - bitset1.recycle(); - ConcurrentBitSetRecyclable bitset2 = ConcurrentBitSetRecyclable.create(); - ConcurrentBitSetRecyclable bitset3 = ConcurrentBitSetRecyclable.create(); - Assert.assertSame(bitset2, bitset1); - Assert.assertFalse(bitset2.get(3)); - Assert.assertNotSame(bitset3, bitset1); - } - - @Test(priority = 1) - public void testGenerateByBitSet() { - BitSet bitSet = new BitSet(); - ConcurrentBitSetRecyclable bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); - Assert.assertEquals(bitSet.toByteArray(), bitSetRecyclable.toByteArray()); - - bitSet.set(0, 10); - bitSetRecyclable.recycle(); - bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); - Assert.assertEquals(bitSet.toByteArray(), bitSetRecyclable.toByteArray()); - - bitSet.clear(5); - bitSetRecyclable.recycle(); - bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); - Assert.assertEquals(bitSet.toByteArray(), bitSetRecyclable.toByteArray()); - - bitSet.clear(); - bitSetRecyclable.recycle(); - bitSetRecyclable = ConcurrentBitSetRecyclable.create(bitSet); - Assert.assertEquals(bitSet.toByteArray(), bitSetRecyclable.toByteArray()); - } -} From b545c988271c0ac5ed2d1210ebfa87b32e4ed8eb Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 12 Sep 2025 20:34:13 +0800 Subject: [PATCH 3/3] Remove BitSetRecyclable use cases from client --- .../pulsar/client/impl/ConsumerBase.java | 5 +- .../pulsar/client/impl/ConsumerImpl.java | 50 ++++++++---------- .../impl/MessagePayloadContextImpl.java | 16 ++---- ...sistentAcknowledgmentsGroupingTracker.java | 51 ++++++++----------- .../pulsar/common/protocol/Commands.java | 7 ++- .../common/util/SafeCollectionUtils.java | 39 -------------- 6 files changed, 52 insertions(+), 116 deletions(-) delete mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 27e2216e58949..1ac31adb385b6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -25,6 +25,7 @@ import io.netty.util.Timeout; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.Map; @@ -68,8 +69,8 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1309,7 +1310,7 @@ protected boolean isValidConsumerEpoch(MessageImpl message) { return true; } - protected boolean isSingleMessageAcked(BitSetRecyclable ackBitSet, int batchIndex) { + protected boolean isSingleMessageAcked(@Nullable BitSet ackBitSet, int batchIndex) { return ackBitSet != null && !ackBitSet.get(batchIndex); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2db5fa55a05fc..d706b8962944b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -134,9 +134,8 @@ import org.apache.pulsar.common.util.CompletableFutureCancellationHandler; import org.apache.pulsar.common.util.ExceptionHandler; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.SafeCollectionUtils; -import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1287,7 +1286,7 @@ protected MessageImpl newSingleMessage(final int index, final MessageIdImpl messageId, final Schema schema, final boolean containMetadata, - final BitSetRecyclable ackBitSet, + @Nullable final BitSet ackBitSet, final BitSet ackSetInMessageId, final int redeliveryCount, final long consumerEpoch, @@ -1395,7 +1394,7 @@ protected void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMe final MessageIdImpl messageId, final Schema schema, final int redeliveryCount, - final List ackSet, + final long[] ackSet, long consumerEpoch) { final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf); final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get( @@ -1425,12 +1424,14 @@ protected void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMe } void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, ClientCnx cnx) { - List ackSet = Collections.emptyList(); + final long[] ackSet; if (cmdMessage.getAckSetsCount() > 0) { - ackSet = new ArrayList<>(cmdMessage.getAckSetsCount()); + ackSet = new long[cmdMessage.getAckSetsCount()]; for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) { - ackSet.add(cmdMessage.getAckSetAt(i)); + ackSet[i] = cmdMessage.getAckSetAt(i); } + } else { + ackSet = new long[0]; } int redeliveryCount = cmdMessage.getRedeliveryCount(); MessageIdData messageId = cmdMessage.getMessageId(); @@ -1766,7 +1767,7 @@ private void interceptAndComplete(final Message message, final CompletableFut } void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata, - int redeliveryCount, List ackSet, ByteBuf uncompressedPayload, + int redeliveryCount, long[] ackSet, ByteBuf uncompressedPayload, MessageIdData messageId, ClientCnx cnx, long consumerEpoch, boolean isEncrypted) { int batchSize = msgMetadata.getNumMessagesInBatch(); @@ -1780,10 +1781,7 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, } BitSet ackSetInMessageId = BatchMessageIdImpl.newAckSet(batchSize); - BitSetRecyclable ackBitSet = null; - if (ackSet != null && ackSet.size() > 0) { - ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)); - } + BitSet ackBitSet = ackSet.length > 0 ? BitSet.valueOf(ackSet) : null; SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); int skippedMessages = 0; @@ -1817,9 +1815,6 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, } executeNotifyCallback(message); } - if (ackBitSet != null) { - ackBitSet.recycle(); - } } catch (IllegalStateException e) { log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e); discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError); @@ -2675,11 +2670,10 @@ public CompletableFuture seekAsync(MessageId messageId) { } else { final long[] ackSetArr; if (MessageIdAdvUtils.isBatch(msgId)) { - final BitSetRecyclable ackSet = BitSetRecyclable.create(); + final BitSet ackSet = new BitSet(); ackSet.set(0, msgId.getBatchSize()); ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0)); ackSetArr = ackSet.toLongArray(); - ackSet.recycle(); } else { ackSetArr = new long[0]; } @@ -3162,18 +3156,17 @@ private CompletableFuture doTransactionAcknowledgeForResponse(MessageId me final long entryId = messageIdAdv.getEntryId(); final List cmdList; if (MessageIdAdvUtils.isBatch(messageIdAdv)) { - BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create(); - bitSetRecyclable.set(0, messageIdAdv.getBatchSize()); + BitSet bitSet = new BitSet(); + bitSet.set(0, messageIdAdv.getBatchSize()); if (ackType == AckType.Cumulative) { MessageIdAdvUtils.acknowledge(messageIdAdv, false); - bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1); + bitSet.clear(0, messageIdAdv.getBatchIndex() + 1); } else { - bitSetRecyclable.clear(messageIdAdv.getBatchIndex()); + bitSet.clear(messageIdAdv.getBatchIndex()); } - cmdList = Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, + cmdList = Collections.singletonList(Commands.newAck(consumerId, ledgerId, entryId, bitSet, ackType, validationError, properties, txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, messageIdAdv.getBatchSize())); - bitSetRecyclable.recycle(); } else { MessageIdImpl[] chunkMsgIds = this.unAckedChunkedMessageIdSequenceMap.remove(messageIdAdv); // cumulative ack chunk by the last messageId @@ -3273,19 +3266,18 @@ private CompletableFuture doTransactionAcknowledgeForResponse(List private ConsumerImpl consumer; private int redeliveryCount; private BitSet ackSetInMessageId; - private BitSetRecyclable ackBitSet; + private BitSet ackBitSet; private long consumerEpoch; private MessagePayloadContextImpl(final Recycler.Handle handle) { @@ -64,7 +61,7 @@ public static MessagePayloadContextImpl get(final BrokerEntryMetadata brokerEntr @NonNull final MessageIdImpl messageId, @NonNull final ConsumerImpl consumer, final int redeliveryCount, - final List ackSet, + final long[] ackSet, final long consumerEpoch) { final MessagePayloadContextImpl context = RECYCLER.get(); context.consumerEpoch = consumerEpoch; @@ -75,9 +72,7 @@ public static MessagePayloadContextImpl get(final BrokerEntryMetadata brokerEntr context.consumer = consumer; context.redeliveryCount = redeliveryCount; context.ackSetInMessageId = BatchMessageIdImpl.newAckSet(context.getNumMessages()); - context.ackBitSet = (ackSet != null && ackSet.size() > 0) - ? BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet)) - : null; + context.ackBitSet = (ackSet.length > 0 ? BitSet.valueOf(ackSet) : null); return context; } @@ -90,10 +85,7 @@ public void recycle() { redeliveryCount = 0; consumerEpoch = DEFAULT_CONSUMER_EPOCH; ackSetInMessageId = null; - if (ackBitSet != null) { - ackBitSet.recycle(); - ackBitSet = null; - } + ackBitSet = null; recyclerHandle.recycle(this); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java index 5a9ea4016029c..de69550c98148 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java @@ -50,7 +50,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.FutureUtil; -import org.apache.pulsar.common.util.collections.BitSetRecyclable; import org.jspecify.annotations.Nullable; /** @@ -304,7 +303,7 @@ private CompletableFuture doIndividualBatchAck(MessageIdAdv batchMessageId } private CompletableFuture doCumulativeAck(MessageIdAdv messageId, Map properties, - BitSetRecyclable bitSet) { + BitSet bitSet) { consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId)); if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an @@ -346,7 +345,7 @@ CompletableFuture doIndividualBatchAckAsync(MessageIdAdv msgId) { return CompletableFuture.completedFuture(null); } - private void doCumulativeAckAsync(MessageIdAdv msgId, BitSetRecyclable bitSet) { + private void doCumulativeAckAsync(MessageIdAdv msgId, BitSet bitSet) { // Handle concurrent updates from different threads lastCumulativeAck.update(msgId, bitSet); } @@ -357,7 +356,7 @@ private CompletableFuture doCumulativeBatchIndexAck(MessageIdAdv batchMess return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(), batchMessageId.getBatchSize(), AckType.Cumulative, properties); } else { - BitSetRecyclable bitSet = BitSetRecyclable.create(); + BitSet bitSet = new BitSet(batchMessageId.getBatchSize()); bitSet.set(0, batchMessageId.getBatchSize()); bitSet.clear(0, batchMessageId.getBatchIndex() + 1); return doCumulativeAck(batchMessageId, null, bitSet); @@ -365,7 +364,7 @@ private CompletableFuture doCumulativeBatchIndexAck(MessageIdAdv batchMess } private CompletableFuture doImmediateAck(MessageIdAdv msgId, AckType ackType, Map properties, - BitSetRecyclable bitSet) { + BitSet bitSet) { ClientCnx cnx = consumer.getClientCnx(); if (cnx == null) { @@ -383,14 +382,14 @@ private CompletableFuture doImmediateBatchIndexAck(MessageIdAdv msgId, int return FutureUtil.failedFuture(new PulsarClientException .ConnectException("Consumer connect fail! consumer state:" + consumer.getState())); } - BitSetRecyclable bitSet; + BitSet bitSet; BitSet ackSetFromMsgId = msgId.getAckSet(); if (ackSetFromMsgId != null) { synchronized (ackSetFromMsgId) { - bitSet = BitSetRecyclable.valueOf(ackSetFromMsgId.toLongArray()); + bitSet = BitSet.valueOf(ackSetFromMsgId.toLongArray()); } } else { - bitSet = BitSetRecyclable.create(); + bitSet = new BitSet(batchSize); bitSet.set(0, batchSize); } if (ackType == AckType.Cumulative) { @@ -399,10 +398,8 @@ private CompletableFuture doImmediateBatchIndexAck(MessageIdAdv msgId, int bitSet.clear(batchIndex); } - CompletableFuture completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId, + return newMessageAckCommandAndWrite(cnx, consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, properties, true, null, null); - bitSet.recycle(); - return completableFuture; } /** @@ -434,7 +431,7 @@ private void flushAsync(ClientCnx cnx) { shouldFlush = true; final MessageIdAdv messageId = lastCumulativeAckToFlush.getMessageId(); newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(), - lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, + lastCumulativeAckToFlush.getBitSet(), AckType.Cumulative, Collections.emptyMap(), false, (TimedCompletableFuture) this.currentCumulativeAckFuture, null); this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId); @@ -527,7 +524,7 @@ public void close() { } private CompletableFuture newImmediateAckAndFlush(long consumerId, MessageIdAdv msgId, - BitSetRecyclable bitSet, AckType ackType, + BitSet bitSet, AckType ackType, Map map, ClientCnx cnx) { MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId); final CompletableFuture completableFuture; @@ -560,7 +557,7 @@ private CompletableFuture newImmediateAckAndFlush(long consumerId, Message private CompletableFuture newMessageAckCommandAndWrite( ClientCnx cnx, long consumerId, long ledgerId, - long entryId, BitSetRecyclable ackSet, AckType ackType, + long entryId, BitSet ackSet, AckType ackType, Map properties, boolean flush, TimedCompletableFuture timedCompletableFuture, List> entriesToAck) { @@ -641,15 +638,12 @@ protected LastCumulativeAck initialValue() { public static final MessageIdAdv DEFAULT_MESSAGE_ID = (MessageIdAdv) MessageId.earliest; private volatile MessageIdAdv messageId = DEFAULT_MESSAGE_ID; - private BitSetRecyclable bitSetRecyclable = null; + private BitSet bitSet = null; private boolean flushRequired = false; - public synchronized void update(final MessageIdAdv messageId, final BitSetRecyclable bitSetRecyclable) { + public synchronized void update(final MessageIdAdv messageId, final BitSet bitSet) { if (compareTo(messageId) < 0) { - if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) { - this.bitSetRecyclable.recycle(); - } - set(messageId, bitSetRecyclable); + set(messageId, bitSet); flushRequired = true; } } @@ -657,8 +651,8 @@ public synchronized void update(final MessageIdAdv messageId, final BitSetRecycl public synchronized LastCumulativeAck flush() { if (flushRequired) { final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get(); - if (bitSetRecyclable != null) { - localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray())); + if (bitSet != null) { + localLastCumulativeAck.set(messageId, BitSet.valueOf(bitSet.toLongArray())); } else { localLastCumulativeAck.set(this.messageId, null); } @@ -671,11 +665,8 @@ public synchronized LastCumulativeAck flush() { } public synchronized void reset() { - if (bitSetRecyclable != null) { - bitSetRecyclable.recycle(); - } messageId = DEFAULT_MESSAGE_ID; - bitSetRecyclable = null; + bitSet = null; flushRequired = false; } @@ -694,16 +685,16 @@ public synchronized int compareTo(MessageIdAdv messageId) { ); } - private synchronized void set(final MessageIdAdv messageId, final BitSetRecyclable bitSetRecyclable) { + private synchronized void set(final MessageIdAdv messageId, final BitSet bitSet) { this.messageId = messageId; - this.bitSetRecyclable = bitSetRecyclable; + this.bitSet = bitSet; } @Override public String toString() { String s = messageId.toString(); - if (bitSetRecyclable != null) { - s += " (bit set: " + bitSetRecyclable + ")"; + if (bitSet != null) { + s += " (bit set: " + bitSet + ")"; } return s; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 5d159168bc34d..c1aeaf99c01c2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -110,7 +110,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.common.util.collections.BitSetRecyclable; @UtilityClass @Slf4j @@ -1067,13 +1066,13 @@ public static ByteBuf newMultiMessageAck(long consumerId, return serializeWithSize(cmd); } - public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType, + public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSet ackSet, AckType ackType, ValidationError validationError, Map properties, long requestId) { return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, -1L, -1L, requestId, -1); } - public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType, + public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSet ackSet, AckType ackType, ValidationError validationError, Map properties, long txnIdLeastBits, long txnIdMostBits, long requestId, int batchSize) { BaseCommand cmd = localCmd(Type.ACK); @@ -1133,7 +1132,7 @@ private static ByteBuf newAck(ValidationError validationError, Map } - public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, AckType ackType, + public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSet ackSet, AckType ackType, ValidationError validationError, Map properties, long txnIdLeastBits, long txnIdMostBits, long requestId) { return newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java deleted file mode 100644 index 10301539b1b31..0000000000000 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SafeCollectionUtils.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.common.util; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; - -/** - * Safe collection utils. - */ -public class SafeCollectionUtils { - - public static List longArrayToList(long[] array) { - return array == null || array.length == 0 ? Collections.emptyList() - : Arrays.stream(array).boxed().collect(Collectors.toList()); - } - - public static long[] longListToArray(List list) { - return list == null || list.size() == 0 ? new long[0] : list.stream().mapToLong(l->l).toArray(); - } -}