Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First draft of implementing a new blocking queue #207

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions src/main/java/stormpot/internal/AllocationProcess.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import stormpot.Poolable;

import java.util.concurrent.LinkedTransferQueue;

import static stormpot.internal.AllocationProcessMode.DIRECT;
import static stormpot.internal.AllocationProcessMode.INLINE;
import static stormpot.internal.AllocationProcessMode.THREADED;
Expand All @@ -35,7 +33,7 @@ public static AllocationProcess threaded() {
return new AllocationProcess(THREADED) {
@Override
<T extends Poolable> AllocationController<T> buildAllocationController(
LinkedTransferQueue<BSlot<T>> live,
MpmcChunkedBlockingQueue<BSlot<T>> live,
RefillPile<T> disregardPile,
RefillPile<T> newAllocations,
PoolBuilderImpl<T> builder,
Expand All @@ -54,7 +52,7 @@ public static AllocationProcess inline() {
return new AllocationProcess(INLINE) {
@Override
<T extends Poolable> AllocationController<T> buildAllocationController(
LinkedTransferQueue<BSlot<T>> live,
MpmcChunkedBlockingQueue<BSlot<T>> live,
RefillPile<T> disregardPile,
RefillPile<T> newAllocations,
PoolBuilderImpl<T> builder,
Expand All @@ -73,7 +71,7 @@ public static AllocationProcess direct() {
return new AllocationProcess(DIRECT) {
@Override
<T extends Poolable> AllocationController<T> buildAllocationController(
LinkedTransferQueue<BSlot<T>> live,
MpmcChunkedBlockingQueue<BSlot<T>> live,
RefillPile<T> disregardPile,
RefillPile<T> newAllocations,
PoolBuilderImpl<T> builder,
Expand Down Expand Up @@ -103,7 +101,7 @@ public AllocationProcessMode getMode() {
}

abstract <T extends Poolable> AllocationController<T> buildAllocationController(
LinkedTransferQueue<BSlot<T>> live,
MpmcChunkedBlockingQueue<BSlot<T>> live,
RefillPile<T> disregardPile,
RefillPile<T> newAllocations,
PoolBuilderImpl<T> builder,
Expand Down
24 changes: 8 additions & 16 deletions src/main/java/stormpot/internal/BAllocThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
Expand All @@ -47,7 +45,7 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
*/
private static final long shutdownPauseNanos = MILLISECONDS.toNanos(10);

private final LinkedTransferQueue<BSlot<T>> live;
private final MpmcChunkedBlockingQueue<BSlot<T>> live;
private final RefillPile<T> disregardPile;
private final RefillPile<T> newAllocations;
private final BSlot<T> poisonPill;
Expand All @@ -56,11 +54,11 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
private final boolean backgroundExpirationEnabled;
private final PreciseLeakDetector leakDetector;
private final StackCompletion shutdownCompletion;
private final BlockingQueue<BSlot<T>> dead;
private final MpmcChunkedBlockingQueue<BSlot<T>> dead;
private final AtomicLong poisonedSlots;
private final long defaultDeadPollTimeout;
private final boolean optimizeForMemory;
private final LinkedTransferQueue<AllocatorSwitch<T>> switchRequests;
private final MpmcChunkedBlockingQueue<AllocatorSwitch<T>> switchRequests;

// Single reader: this. Many writers.
private volatile long targetSize;
Expand All @@ -78,7 +76,7 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
private long priorGenerationObjectsToReplace;

BAllocThread(
LinkedTransferQueue<BSlot<T>> live,
MpmcChunkedBlockingQueue<BSlot<T>> live,
RefillPile<T> disregardPile,
RefillPile<T> newAllocations,
PoolBuilderImpl<T> builder,
Expand All @@ -95,11 +93,11 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
this.leakDetector = builder.isPreciseLeakDetectionEnabled() ?
new PreciseLeakDetector() : null;
this.shutdownCompletion = new StackCompletion();
this.dead = new LinkedTransferQueue<>();
this.dead = new MpmcChunkedBlockingQueue<>();
this.poisonedSlots = new AtomicLong();
this.defaultDeadPollTimeout = builder.getBackgroundExpirationCheckDelay();
this.optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();
switchRequests = new LinkedTransferQueue<>();
switchRequests = new MpmcChunkedBlockingQueue<>();
this.size = 0;
this.didAnythingLastIteration = true; // start out busy
}
Expand Down Expand Up @@ -479,14 +477,8 @@ long allocatedSize() {
}

long inUse() {
long inUse = 0;
long liveSize = 0;
for (BSlot<T> slot: live) {
liveSize++;
if (slot.isClaimedOrThreadLocal()) {
inUse++;
}
}
long inUse = live.count(BSlot::isClaimedOrThreadLocal);
long liveSize = live.size();
return size - liveSize + inUse;
}
}
5 changes: 2 additions & 3 deletions src/main/java/stormpot/internal/BSlot.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.ref.Reference;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -53,7 +52,7 @@ public class BSlot<T extends Poolable> implements Slot, SlotInfo<T> {
@SuppressWarnings("FieldMayBeFinal")
private volatile int state;

final BlockingQueue<BSlot<T>> live;
final MpmcChunkedBlockingQueue<BSlot<T>> live;
final AtomicLong poisonedSlots;
long stamp;
long createdNanos;
Expand Down Expand Up @@ -81,7 +80,7 @@ public class BSlot<T extends Poolable> implements Slot, SlotInfo<T> {
* @param live The queue of live slots.
* @param poisonedSlots The counter of poisoned slots.
*/
public BSlot(BlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
public BSlot(MpmcChunkedBlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
// Volatile write in the constructor: This object must be safely published,
// so that we are sure that the volatile write happens-before other
// threads observe the pointer to this object.
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/stormpot/internal/BSlotPadded.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import stormpot.Poolable;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -42,7 +41,7 @@ public class BSlotPadded<T extends Poolable> extends BSlot<T> {
* @param live The queue of live slots.
* @param poisonedSlots The counter of poisoned slots.
*/
public BSlotPadded(BlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
public BSlotPadded(MpmcChunkedBlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
super(live, poisonedSlots);
}
}
5 changes: 2 additions & 3 deletions src/main/java/stormpot/internal/BlazePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Objects;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -71,7 +70,7 @@ public final class BlazePool<T extends Poolable> implements Pool<T>, ManagedPool
private static final Exception SHUTDOWN_POISON = new PoisonException("Stormpot Poison: Shutdown");
static final Exception EXPLICIT_EXPIRE_POISON = new PoisonException("Stormpot Poison: Expired");

private final LinkedTransferQueue<BSlot<T>> live;
private final MpmcChunkedBlockingQueue<BSlot<T>> live;
private final RefillPile<T> disregardPile;
private final RefillPile<T> newAllocations;
private final AllocationController<T> allocator;
Expand All @@ -96,7 +95,7 @@ public final class BlazePool<T extends Poolable> implements Pool<T>, ManagedPool
* @param factory The allocation process that builds the {@link AllocationController} used by this pool.
*/
public BlazePool(PoolBuilderImpl<T> builder, AllocationProcess factory) {
live = new LinkedTransferQueue<>();
live = new MpmcChunkedBlockingQueue<>();
disregardPile = new RefillPile<>(live);
newAllocations = new RefillPile<>(live);
optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();
Expand Down
15 changes: 4 additions & 11 deletions src/main/java/stormpot/internal/DirectAllocationController.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import stormpot.PoolBuilder;
import stormpot.Poolable;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -29,15 +28,15 @@
* @param <T> The concrete poolable type.
*/
public final class DirectAllocationController<T extends Poolable> extends AllocationController<T> {
private final LinkedTransferQueue<BSlot<T>> live;
private final MpmcChunkedBlockingQueue<BSlot<T>> live;
private final BSlot<T> poisonPill;
private final long size;
private final AtomicLong shutdownState;
private final AtomicLong poisonedSlots;
private final StackCompletion shutdownCompletion;

DirectAllocationController(
LinkedTransferQueue<BSlot<T>> live,
MpmcChunkedBlockingQueue<BSlot<T>> live,
RefillPile<T> disregardPile,
PoolBuilder<T> builder,
BSlot<T> poisonPill) {
Expand Down Expand Up @@ -141,14 +140,8 @@ public long allocatedSize() {

@Override
long inUse() {
long inUse = 0;
long liveSize = 0;
for (BSlot<T> slot: live) {
liveSize++;
if (slot.isClaimedOrThreadLocal()) {
inUse++;
}
}
long inUse = live.count(BSlot::isClaimedOrThreadLocal);
long liveSize = live.size();
return size - liveSize + inUse;
}
}
19 changes: 6 additions & 13 deletions src/main/java/stormpot/internal/InlineAllocationController.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -54,7 +53,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
}
}

private final LinkedTransferQueue<BSlot<T>> live;
private final MpmcChunkedBlockingQueue<BSlot<T>> live;
private final RefillPile<T> disregardPile;
private final RefillPile<T> newAllocations;
private final BSlot<T> poisonPill;
Expand All @@ -63,7 +62,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
private final PreciseLeakDetector leakDetector;
private final boolean optimizeForMemory;
private final StackCompletion shutdownCompletion;
private final LinkedTransferQueue<AllocatorSwitch<T>> switchRequests;
private final MpmcChunkedBlockingQueue<AllocatorSwitch<T>> switchRequests;

private volatile long targetSize;
@SuppressWarnings("unused") // Assigned via VarHandle.
Expand All @@ -79,7 +78,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
private long priorGenerationObjectsToReplace;

InlineAllocationController(
LinkedTransferQueue<BSlot<T>> live,
MpmcChunkedBlockingQueue<BSlot<T>> live,
RefillPile<T> disregardPile,
RefillPile<T> newAllocations,
PoolBuilderImpl<T> builder,
Expand All @@ -94,7 +93,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();
leakDetector = builder.isPreciseLeakDetectionEnabled() ?
new PreciseLeakDetector() : null;
switchRequests = new LinkedTransferQueue<>();
switchRequests = new MpmcChunkedBlockingQueue<>();
setTargetSize(builder.getSize());
shutdownCompletion = new StackCompletion(this::shutdownCompletion);
}
Expand Down Expand Up @@ -475,14 +474,8 @@ public long allocatedSize() {

@Override
long inUse() {
long inUse = 0;
long liveSize = 0;
for (BSlot<T> slot: live) {
liveSize++;
if (slot.isClaimedOrThreadLocal()) {
inUse++;
}
}
long inUse = live.count(BSlot::isClaimedOrThreadLocal);
long liveSize = live.size();
return size - liveSize + inUse;
}
}
Loading
Loading