Skip to content
This repository was archived by the owner on Dec 12, 2022. It is now read-only.

Commit 0272b1c

Browse files
authored
Merge pull request #36 from netty/rc-thread-safety
Allow slices to obtain ownership when parent is closed
2 parents 374b052 + de305bd commit 0272b1c

13 files changed

+373
-392
lines changed

src/main/java/io/netty/buffer/api/CleanerPooledDrop.java

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public void drop(Buffer buf) {
4545
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
4646
if (c != null) {
4747
c.clean();
48+
delegate.drop(buf);
4849
}
4950
}
5051

@@ -57,24 +58,43 @@ public void attach(Buffer buf) {
5758
c.clean();
5859
}
5960

60-
var pool = this.pool;
6161
var mem = manager.unwrapRecoverableMemory(buf);
62-
var delegate = this.delegate;
63-
WeakReference<Buffer> ref = new WeakReference<>(buf);
62+
WeakReference<CleanerPooledDrop> ref = new WeakReference<>(this);
6463
AtomicBoolean gate = new AtomicBoolean(true);
65-
cleanable = new GatedCleanable(gate, CLEANER.register(this, () -> {
64+
cleanable = new GatedCleanable(gate, CLEANER.register(this, new CleanAction(pool, mem, ref, gate)));
65+
}
66+
67+
@Override
68+
public String toString() {
69+
return "CleanerPooledDrop(" + delegate + ')';
70+
}
71+
72+
private static final class CleanAction implements Runnable {
73+
private final SizeClassedMemoryPool pool;
74+
private final Object mem;
75+
private final WeakReference<CleanerPooledDrop> ref;
76+
private final AtomicBoolean gate;
77+
78+
private CleanAction(SizeClassedMemoryPool pool, Object mem, WeakReference<CleanerPooledDrop> ref,
79+
AtomicBoolean gate) {
80+
this.pool = pool;
81+
this.mem = mem;
82+
this.ref = ref;
83+
this.gate = gate;
84+
}
85+
86+
@Override
87+
public void run() {
6688
if (gate.getAndSet(false)) {
67-
Buffer b = ref.get();
68-
if (b == null) {
89+
var monitored = ref.get();
90+
if (monitored == null) {
6991
pool.recoverMemory(mem);
70-
} else {
71-
delegate.drop(b);
7292
}
7393
}
74-
}));
94+
}
7595
}
7696

77-
private static class GatedCleanable implements Cleanable {
97+
private static final class GatedCleanable implements Cleanable {
7898
private final AtomicBoolean gate;
7999
private final Cleanable cleanable;
80100

src/main/java/io/netty/buffer/api/CompositeBuffer.java

Lines changed: 42 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,23 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
3131
* non-composite copy of the buffer.
3232
*/
3333
private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8;
34-
private static final Drop<CompositeBuffer> COMPOSITE_DROP = buf -> {
35-
for (Buffer b : buf.bufs) {
36-
b.close();
34+
private static final Drop<CompositeBuffer> COMPOSITE_DROP = new Drop<CompositeBuffer>() {
35+
@Override
36+
public void drop(CompositeBuffer buf) {
37+
for (Buffer b : buf.bufs) {
38+
b.close();
39+
}
40+
buf.makeInaccessible();
41+
}
42+
43+
@Override
44+
public String toString() {
45+
return "COMPOSITE_DROP";
3746
}
38-
buf.makeInaccessible();
3947
};
4048

4149
private final BufferAllocator allocator;
4250
private final TornBufferAccessors tornBufAccessors;
43-
private final boolean isSendable;
4451
private Buffer[] bufs;
4552
private int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts.
4653
private int capacity;
@@ -52,7 +59,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
5259
private boolean readOnly;
5360

5461
CompositeBuffer(BufferAllocator allocator, Deref<Buffer>[] refs) {
55-
this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false);
62+
this(allocator, filterExternalBufs(refs), COMPOSITE_DROP, false);
5663
}
5764

5865
private static Buffer[] filterExternalBufs(Deref<Buffer>[] refs) {
@@ -106,11 +113,10 @@ private static Stream<Buffer> flattenBuffer(Buffer buf) {
106113
return Stream.of(buf);
107114
}
108115

109-
private CompositeBuffer(BufferAllocator allocator, boolean isSendable, Buffer[] bufs, Drop<CompositeBuffer> drop,
116+
private CompositeBuffer(BufferAllocator allocator, Buffer[] bufs, Drop<CompositeBuffer> drop,
110117
boolean acquireBufs) {
111118
super(drop);
112119
this.allocator = allocator;
113-
this.isSendable = isSendable;
114120
if (acquireBufs) {
115121
for (Buffer buf : bufs) {
116122
buf.acquire();
@@ -297,46 +303,31 @@ public Buffer slice(int offset, int length) {
297303
offset + ", and length was " + length + '.');
298304
}
299305
Buffer choice = (Buffer) chooseBuffer(offset, 0);
300-
Buffer[] slices = null;
301-
acquire(); // Increase reference count of the original composite buffer.
302-
Drop<CompositeBuffer> drop = obj -> {
303-
close(); // Decrement the reference count of the original composite buffer.
304-
COMPOSITE_DROP.drop(obj);
305-
};
306-
307-
try {
308-
if (length > 0) {
309-
slices = new Buffer[bufs.length];
310-
int off = subOffset;
311-
int cap = length;
312-
int i;
313-
for (i = searchOffsets(offset); cap > 0; i++) {
314-
var buf = bufs[i];
315-
int avail = buf.capacity() - off;
316-
slices[i] = buf.slice(off, Math.min(cap, avail));
317-
cap -= avail;
318-
off = 0;
319-
}
320-
slices = Arrays.copyOf(slices, i);
321-
} else {
322-
// Specialize for length == 0, since we must slice from at least one constituent buffer.
323-
slices = new Buffer[] { choice.slice(subOffset, 0) };
324-
}
325-
326-
return new CompositeBuffer(allocator, false, slices, drop, true);
327-
} catch (Throwable throwable) {
328-
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
329-
close();
330-
throw throwable;
331-
} finally {
332-
if (slices != null) {
333-
for (Buffer slice : slices) {
334-
if (slice != null) {
335-
slice.close(); // Ownership now transfers to the composite buffer.
336-
}
337-
}
338-
}
306+
Buffer[] slices;
307+
308+
if (length > 0) {
309+
slices = new Buffer[bufs.length];
310+
int off = subOffset;
311+
int cap = length;
312+
int i;
313+
for (i = searchOffsets(offset); cap > 0; i++) {
314+
var buf = bufs[i];
315+
int avail = buf.capacity() - off;
316+
slices[i] = buf.slice(off, Math.min(cap, avail));
317+
cap -= avail;
318+
off = 0;
319+
}
320+
slices = Arrays.copyOf(slices, i);
321+
} else {
322+
// Specialize for length == 0, since we must slice from at least one constituent buffer.
323+
slices = new Buffer[] { choice.slice(subOffset, 0) };
339324
}
325+
326+
// Use the constructor that skips filtering out empty buffers, and skips acquiring on the buffers.
327+
// This is important because 1) slice() already acquired the buffers, and 2) if this slice is empty
328+
// then we need to keep holding on to it to prevent this originating composite buffer from getting
329+
// ownership. If it did, its behaviour would be inconsistent with that of a non-composite buffer.
330+
return new CompositeBuffer(allocator, slices, COMPOSITE_DROP, false);
340331
}
341332

342333
@Override
@@ -749,7 +740,7 @@ public Buffer bifurcate() {
749740
}
750741
if (bufs.length == 0) {
751742
// Bifurcating a zero-length buffer is trivial.
752-
return new CompositeBuffer(allocator, true, bufs, unsafeGetDrop(), true).order(order);
743+
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
753744
}
754745

755746
int i = searchOffsets(woff);
@@ -761,7 +752,7 @@ public Buffer bifurcate() {
761752
}
762753
computeBufferOffsets();
763754
try {
764-
var compositeBuf = new CompositeBuffer(allocator, true, bifs, unsafeGetDrop(), true);
755+
var compositeBuf = new CompositeBuffer(allocator, bifs, unsafeGetDrop(), true);
765756
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
766757
return compositeBuf;
767758
} finally {
@@ -1164,7 +1155,7 @@ public CompositeBuffer transferOwnership(Drop<CompositeBuffer> drop) {
11641155
for (int i = 0; i < sends.length; i++) {
11651156
received[i] = sends[i].receive();
11661157
}
1167-
var composite = new CompositeBuffer(allocator, true, received, drop, true);
1158+
var composite = new CompositeBuffer(allocator, received, drop, true);
11681159
composite.readOnly = readOnly;
11691160
drop.attach(composite);
11701161
return composite;
@@ -1179,18 +1170,9 @@ void makeInaccessible() {
11791170
closed = true;
11801171
}
11811172

1182-
@Override
1183-
protected IllegalStateException notSendableException() {
1184-
if (!isSendable) {
1185-
return new IllegalStateException(
1186-
"Cannot send() this buffer. This buffer might be a slice of another buffer.");
1187-
}
1188-
return super.notSendableException();
1189-
}
1190-
11911173
@Override
11921174
public boolean isOwned() {
1193-
return isSendable && super.isOwned() && allConstituentsAreOwned();
1175+
return super.isOwned() && allConstituentsAreOwned();
11941176
}
11951177

11961178
private boolean allConstituentsAreOwned() {

src/main/java/io/netty/buffer/api/MemoryManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ static MemoryManager getNativeMemoryManager() {
3434
Buffer allocateShared(AllocatorControl allo, long size, Drop<Buffer> drop, Cleaner cleaner);
3535
Drop<Buffer> drop();
3636
Object unwrapRecoverableMemory(Buffer buf);
37+
int capacityOfRecoverableMemory(Object memory);
3738
Buffer recoverMemory(Object recoverableMemory, Drop<Buffer> drop);
3839
}

src/main/java/io/netty/buffer/api/RcSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ protected RcSupport(Drop<T> drop) {
3737
@Override
3838
public final I acquire() {
3939
if (acquires < 0) {
40-
throw attachTrace(new IllegalStateException("Resource is closed."));
40+
throw attachTrace(new IllegalStateException("This resource is closed: " + this + '.'));
4141
}
4242
if (acquires == Integer.MAX_VALUE) {
4343
throw new IllegalStateException("Cannot acquire more references; counter would overflow.");
@@ -98,7 +98,7 @@ protected <E extends Throwable> E attachTrace(E throwable) {
9898
*/
9999
protected IllegalStateException notSendableException() {
100100
return new IllegalStateException(
101-
"Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this + '.');
101+
"Cannot send() a reference counted object with " + countBorrows() + " borrows: " + this + '.');
102102
}
103103

104104
@Override

src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
2828
private static final VarHandle CLOSE = Statics.findVarHandle(
2929
lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
3030
private final MemoryManager manager;
31-
private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<Send<Buffer>>> pool;
31+
private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<Object>> pool;
3232
@SuppressWarnings("unused")
3333
private volatile boolean closed;
3434

@@ -41,9 +41,9 @@ protected SizeClassedMemoryPool(MemoryManager manager) {
4141
public Buffer allocate(int size) {
4242
BufferAllocator.checkSize(size);
4343
var sizeClassPool = getSizeClassPool(size);
44-
Send<Buffer> send = sizeClassPool.poll();
45-
if (send != null) {
46-
return send.receive()
44+
Object memory = sizeClassPool.poll();
45+
if (memory != null) {
46+
return recoverMemoryIntoBuffer(memory)
4747
.reset()
4848
.readOnly(false)
4949
.fill((byte) 0)
@@ -71,10 +71,10 @@ public void close() {
7171
if (CLOSE.compareAndSet(this, false, true)) {
7272
var capturedExceptions = new ArrayList<Exception>(4);
7373
pool.forEach((k, v) -> {
74-
Send<Buffer> send;
75-
while ((send = v.poll()) != null) {
74+
Object memory;
75+
while ((memory = v.poll()) != null) {
7676
try {
77-
send.receive().close();
77+
dispose(recoverMemoryIntoBuffer(memory));
7878
} catch (Exception e) {
7979
capturedExceptions.add(e);
8080
}
@@ -94,40 +94,47 @@ public void drop(Buffer buf) {
9494
dispose(buf);
9595
return;
9696
}
97-
var sizeClassPool = getSizeClassPool(buf.capacity());
98-
sizeClassPool.offer(buf.send());
97+
Object mem = manager.unwrapRecoverableMemory(buf);
98+
var sizeClassPool = getSizeClassPool(manager.capacityOfRecoverableMemory(mem));
99+
sizeClassPool.offer(mem);
99100
if (closed) {
100-
Send<Buffer> send;
101-
while ((send = sizeClassPool.poll()) != null) {
102-
send.receive().close();
101+
Object memory;
102+
while ((memory = sizeClassPool.poll()) != null) {
103+
dispose(recoverMemoryIntoBuffer(memory));
103104
}
104105
}
105106
}
106107

108+
@Override
109+
public String toString() {
110+
return "SizeClassedMemoryPool";
111+
}
112+
107113
@Override
108114
public Object allocateUntethered(Buffer originator, int size) {
109115
var sizeClassPool = getSizeClassPool(size);
110-
Send<Buffer> send = sizeClassPool.poll();
111-
Buffer untetheredBuf;
112-
if (send != null) {
113-
var transfer = (TransferSend<Buffer, Buffer>) send;
114-
var owned = transfer.unsafeUnwrapOwned();
115-
untetheredBuf = owned.transferOwnership(NO_OP_DROP);
116-
} else {
117-
untetheredBuf = createBuf(size, NO_OP_DROP);
116+
Object memory = sizeClassPool.poll();
117+
if (memory == null) {
118+
Buffer untetheredBuf = createBuf(size, NO_OP_DROP);
119+
memory = manager.unwrapRecoverableMemory(untetheredBuf);
118120
}
119-
return manager.unwrapRecoverableMemory(untetheredBuf);
121+
return memory;
120122
}
121123

122124
@Override
123125
public void recoverMemory(Object memory) {
126+
Buffer buf = recoverMemoryIntoBuffer(memory);
127+
buf.close();
128+
}
129+
130+
private Buffer recoverMemoryIntoBuffer(Object memory) {
124131
var drop = getDrop();
125132
var buf = manager.recoverMemory(memory, drop);
126133
drop.attach(buf);
127-
buf.close();
134+
return buf;
128135
}
129136

130-
private ConcurrentLinkedQueue<Send<Buffer>> getSizeClassPool(int size) {
137+
private ConcurrentLinkedQueue<Object> getSizeClassPool(int size) {
131138
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
132139
}
133140

src/main/java/io/netty/buffer/api/Statics.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,15 @@
2121

2222
interface Statics {
2323
Cleaner CLEANER = Cleaner.create();
24-
Drop<Buffer> NO_OP_DROP = buf -> {
24+
Drop<Buffer> NO_OP_DROP = new Drop<Buffer>() {
25+
@Override
26+
public void drop(Buffer obj) {
27+
}
28+
29+
@Override
30+
public String toString() {
31+
return "NO_OP_DROP";
32+
}
2533
};
2634

2735
static VarHandle findVarHandle(Lookup lookup, Class<?> recv, String name, Class<?> type) {

0 commit comments

Comments
 (0)