From 34f1b0579f2b146e234cf9b247fc83d9b829fb62 Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Tue, 31 Oct 2017 19:31:21 -0700 Subject: [PATCH] Recycle PendingAddOps Avoid creating a new PendingAddOp object for each entry added, thus saving on garbage. Originally commit 55ba4723 on the yahoo-4.3 branch. Author: Ivan Kelly Author: Matteo Merli Reviewers: Jia Zhai , Sijie Guo This closes #664 from ivankelly/yahoo-bp-1 --- .../bookkeeper/client/LedgerHandle.java | 49 +++---- .../bookkeeper/client/LedgerHandleAdv.java | 38 ++--- .../bookkeeper/client/PendingAddOp.java | 132 +++++++++++++++--- 3 files changed, 141 insertions(+), 78 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index c61c85be084..acb97e040c4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -822,8 +822,8 @@ public void asyncAddEntry(final byte[] data, final int offset, final int length, public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) { data.retain(); - PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx); - doAsyncAddEntry(op, data, cb, ctx); + PendingAddOp op = PendingAddOp.create(this, data, cb, ctx); + doAsyncAddEntry(op); } /** @@ -864,17 +864,16 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse */ void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb, final Object ctx) { - PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx).enableRecoveryAdd(); - doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx); + PendingAddOp op = PendingAddOp.create(this, Unpooled.wrappedBuffer(data, offset, length), cb, ctx) + .enableRecoveryAdd(); + doAsyncAddEntry(op); } - protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) { + protected void doAsyncAddEntry(final PendingAddOp op) { if (throttler != null) { throttler.acquire(); } - final long entryId; - final long currentLength; boolean wasClosed = false; synchronized(this) { // synchronized on this to ensure that @@ -882,12 +881,11 @@ protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final // updating lastAddPushed if (metadata.isClosed()) { wasClosed = true; - entryId = -1; - currentLength = 0; } else { - entryId = ++lastAddPushed; - currentLength = addToLength(data.readableBytes()); + long entryId = ++lastAddPushed; + long currentLedgerLength = addToLength(op.payload.readableBytes()); op.setEntryId(entryId); + op.setLedgerLength(currentLedgerLength); pendingAddOps.add(op); } } @@ -899,8 +897,8 @@ protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final @Override public void safeRun() { LOG.warn("Attempt to add to closed ledger: {}", ledgerId); - cb.addComplete(BKException.Code.LedgerClosedException, - LedgerHandle.this, INVALID_ENTRY_ID, ctx); + op.cb.addComplete(BKException.Code.LedgerClosedException, + LedgerHandle.this, INVALID_ENTRY_ID, op.ctx); } @Override @@ -909,32 +907,17 @@ public String toString() { } }); } catch (RejectedExecutionException e) { - cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), - LedgerHandle.this, INVALID_ENTRY_ID, ctx); + op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), + LedgerHandle.this, INVALID_ENTRY_ID, op.ctx); } return; } try { - bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() { - @Override - public void safeRun() { - ByteBuf toSend = macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed, - currentLength, data); - try { - op.initiate(toSend, data.readableBytes()); - } finally { - toSend.release(); - } - } - @Override - public String toString() { - return String.format("AsyncAddEntry(lid=%d, eid=%d)", ledgerId, entryId); - } - }); + bk.getMainWorkerPool().submitOrdered(ledgerId, op); } catch (RejectedExecutionException e) { - cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), - LedgerHandle.this, INVALID_ENTRY_ID, ctx); + op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), + LedgerHandle.this, INVALID_ENTRY_ID, op.ctx); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index 39f9932927c..28324b1b416 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -153,15 +153,15 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse private void asyncAddEntry(final long entryId, ByteBuf data, final AddCallback cb, final Object ctx) { - PendingAddOp op = new PendingAddOp(this, cb, ctx); + PendingAddOp op = PendingAddOp.create(this, data, cb, ctx); op.setEntryId(entryId); + if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) { LOG.error("Trying to re-add duplicate entryid:{}", entryId); - cb.addComplete(BKException.Code.DuplicateEntryIdException, - LedgerHandleAdv.this, entryId, ctx); + op.submitCallback(BKException.Code.DuplicateEntryIdException); return; } - doAsyncAddEntry(op, data, cb, ctx); + doAsyncAddEntry(op); } /** @@ -170,7 +170,7 @@ private void asyncAddEntry(final long entryId, ByteBuf data, * unaltered in the base class. */ @Override - protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) { + protected void doAsyncAddEntry(final PendingAddOp op) { if (throttler != null) { throttler.acquire(); } @@ -185,9 +185,10 @@ protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final wasClosed = true; currentLength = 0; } else { - currentLength = addToLength(length); + currentLength = addToLength(op.payload.readableBytes()); pendingAddOps.add(op); } + op.setLedgerLength(currentLength); } if (wasClosed) { @@ -197,8 +198,8 @@ protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final @Override public void safeRun() { LOG.warn("Attempt to add to closed ledger: {}", ledgerId); - cb.addComplete(BKException.Code.LedgerClosedException, - LedgerHandleAdv.this, op.getEntryId(), ctx); + op.cb.addComplete(BKException.Code.LedgerClosedException, + LedgerHandleAdv.this, op.getEntryId(), op.ctx); } @Override public String toString() { @@ -206,28 +207,17 @@ public String toString() { } }); } catch (RejectedExecutionException e) { - cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), - LedgerHandleAdv.this, op.getEntryId(), ctx); + op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), + LedgerHandleAdv.this, op.getEntryId(), op.ctx); } return; } try { - bk.getMainWorkerPool().submit(new SafeRunnable() { - @Override - public void safeRun() { - ByteBuf toSend = macManager.computeDigestAndPackageForSending(op.getEntryId(), lastAddConfirmed, - currentLength, data); - try { - op.initiate(toSend, toSend.readableBytes()); - } finally { - toSend.release(); - } - } - }); + bk.getMainWorkerPool().submitOrdered(ledgerId, op); } catch (RejectedExecutionException e) { - cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), - LedgerHandleAdv.this, op.getEntryId(), ctx); + op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), + LedgerHandleAdv.this, op.getEntryId(), op.ctx); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 00a36b3168d..f4d05b77ba9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -17,7 +17,10 @@ */ package org.apache.bookkeeper.client; +import static com.google.common.base.Preconditions.checkNotNull; import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.Timeout; import io.netty.util.TimerTask; @@ -48,9 +51,10 @@ * * */ -class PendingAddOp implements WriteCallback, TimerTask { +class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask { private final static Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); + ByteBuf payload; ByteBuf toSend; AddCallback cb; Object ctx; @@ -64,21 +68,38 @@ class PendingAddOp implements WriteCallback, TimerTask { boolean isRecoveryAdd = false; long requestTimeNanos; - final int timeoutSec; + int timeoutSec; Timeout timeout = null; OpStatsLogger addOpLogger; - boolean callbackTriggered = false; - - PendingAddOp(LedgerHandle lh, AddCallback cb, Object ctx) { - this.lh = lh; - this.cb = cb; - this.ctx = ctx; - this.entryId = LedgerHandle.INVALID_ENTRY_ID; - - this.ackSet = lh.distributionSchedule.getAckSet(); - this.addOpLogger = lh.bk.getAddOpLogger(); - this.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout(); + long currentLedgerLength; + int pendingWriteRequests; + boolean callbackTriggered; + boolean hasRun; + + static PendingAddOp create(LedgerHandle lh, ByteBuf payload, AddCallback cb, Object ctx) { + PendingAddOp op = RECYCLER.get(); + op.lh = lh; + op.isRecoveryAdd = false; + op.cb = cb; + op.ctx = ctx; + op.entryId = LedgerHandle.INVALID_ENTRY_ID; + op.currentLedgerLength = -1; + op.payload = payload; + op.entryLength = payload.readableBytes(); + + op.completed = false; + op.ackSet = lh.distributionSchedule.getAckSet(); + op.addOpLogger = lh.bk.getAddOpLogger(); + if (op.timeout != null) { + op.timeout.cancel(); + } + op.timeout = null; + op.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout(); + op.pendingWriteRequests = 0; + op.callbackTriggered = false; + op.hasRun = false; + return op; } /** @@ -94,6 +115,10 @@ void setEntryId(long entryId) { this.entryId = entryId; } + void setLedgerLength(long ledgerLength) { + this.currentLedgerLength = ledgerLength; + } + long getEntryId() { return this.entryId; } @@ -103,6 +128,7 @@ void sendWriteRequest(int bookieIndex) { lh.bk.getBookieClient().addEntry(lh.metadata.currentEnsemble.get(bookieIndex), lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex, flags); + ++pendingWriteRequests; } @Override @@ -182,21 +208,31 @@ void unsetSuccessAndSendWriteRequest(int bookieIndex) { sendWriteRequest(bookieIndex); } - void initiate(ByteBuf toSend, int entryLength) { + /** + * Initiate the add operation + */ + public void safeRun() { + hasRun = true; if (callbackTriggered) { - // this should only be true if the request was failed due to another request ahead in the pending queue, + // this should only be true if the request was failed due + // to another request ahead in the pending queue, // so we can just ignore this request + maybeRecycle(); return; } if (timeoutSec > -1) { - this.timeout = lh.bk.getBookieClient().scheduleTimeout(this, timeoutSec, TimeUnit.SECONDS); + this.timeout = lh.bk.getBookieClient().scheduleTimeout( + this, timeoutSec, TimeUnit.SECONDS); } + this.requestTimeNanos = MathUtils.nowInNano(); - this.toSend = toSend; - // Retain the buffer until all writes are complete - this.toSend.retain(); - this.entryLength = entryLength; + checkNotNull(lh); + checkNotNull(lh.macManager); + + this.toSend = lh.macManager.computeDigestAndPackageForSending( + entryId, lh.lastAddConfirmed, currentLedgerLength, + payload); // Iterate over set and trigger the sendWriteRequests DistributionSchedule.WriteSet writeSet @@ -213,6 +249,7 @@ void initiate(ByteBuf toSend, int entryLength) { @Override public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { int bookieIndex = (Integer) ctx; + --pendingWriteRequests; if (!lh.metadata.currentEnsemble.get(bookieIndex).equals(addr)) { // ensemble has already changed, failure of this addr is immaterial @@ -246,6 +283,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre sendAddSuccessCallbacks(); // I am already finished, ignore incoming responses. // otherwise, we might hit the following error handling logic, which might cause bad things. + maybeRecycle(); return; } @@ -292,7 +330,6 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre if (ackQuorum && !completed) { completed = true; - ackSet.recycle(); sendAddSuccessCallbacks(); } @@ -324,6 +361,8 @@ void submitCallback(final int rc) { } cb.addComplete(rc, lh, entryId, ctx); callbackTriggered = true; + + maybeRecycle(); } @Override @@ -348,4 +387,55 @@ public boolean equals(Object o) { return (this == o); } + private final Handle recyclerHandle; + private static final Recycler RECYCLER = new Recycler() { + protected PendingAddOp newObject(Recycler.Handle handle) { + return new PendingAddOp(handle); + } + }; + + private PendingAddOp(Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private void maybeRecycle() { + // The reference to PendingAddOp can be held in 3 places + // - LedgerHandle#pendingAddOp + // This reference is released when the callback is run + // - The executor + // Released after safeRun finishes + // - BookieClient + // Holds a reference from the point the addEntry requests are + // sent. + // The object can only be recycled after all references are + // released, otherwise we could end up recycling twice and all + // joy that goes along with that. + if (hasRun && callbackTriggered && pendingWriteRequests == 0) { + recycle(); + } + } + + private void recycle() { + entryId = LedgerHandle.INVALID_ENTRY_ID; + currentLedgerLength = -1; + payload = null; + toSend = null; + cb = null; + ctx = null; + ackSet.recycle(); + ackSet = null; + lh = null; + isRecoveryAdd = false; + addOpLogger = null; + completed = false; + pendingWriteRequests = 0; + callbackTriggered = false; + hasRun = false; + if (timeout != null) { + timeout.cancel(); + } + timeout = null; + + recyclerHandle.recycle(this); + } }