Skip to content

Commit

Permalink
Recycle PendingAddOps
Browse files Browse the repository at this point in the history
Avoid creating a new PendingAddOp object for each entry added, thus
saving on garbage.

Originally commit 55ba472 on the yahoo-4.3 branch.

Author: Ivan Kelly <[email protected]>
Author: Matteo Merli <[email protected]>

Reviewers: Jia Zhai <None>, Sijie Guo <[email protected]>

This closes apache#664 from ivankelly/yahoo-bp-1
  • Loading branch information
ivankelly authored and Ivan Kelly committed Nov 2, 2017
1 parent 326c1ec commit 34f1b05
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -822,8 +822,8 @@ public void asyncAddEntry(final byte[] data, final int offset, final int length,

public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
data.retain();
PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx);
doAsyncAddEntry(op, data, cb, ctx);
PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
doAsyncAddEntry(op);
}

/**
Expand Down Expand Up @@ -864,30 +864,28 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse
*/
void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length,
final AddCallback cb, final Object ctx) {
PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx).enableRecoveryAdd();
doAsyncAddEntry(op, Unpooled.wrappedBuffer(data, offset, length), cb, ctx);
PendingAddOp op = PendingAddOp.create(this, Unpooled.wrappedBuffer(data, offset, length), cb, ctx)
.enableRecoveryAdd();
doAsyncAddEntry(op);
}

protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) {
protected void doAsyncAddEntry(final PendingAddOp op) {
if (throttler != null) {
throttler.acquire();
}

final long entryId;
final long currentLength;
boolean wasClosed = false;
synchronized(this) {
// synchronized on this to ensure that
// the ledger isn't closed between checking and
// updating lastAddPushed
if (metadata.isClosed()) {
wasClosed = true;
entryId = -1;
currentLength = 0;
} else {
entryId = ++lastAddPushed;
currentLength = addToLength(data.readableBytes());
long entryId = ++lastAddPushed;
long currentLedgerLength = addToLength(op.payload.readableBytes());
op.setEntryId(entryId);
op.setLedgerLength(currentLedgerLength);
pendingAddOps.add(op);
}
}
Expand All @@ -899,8 +897,8 @@ protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final
@Override
public void safeRun() {
LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
cb.addComplete(BKException.Code.LedgerClosedException,
LedgerHandle.this, INVALID_ENTRY_ID, ctx);
op.cb.addComplete(BKException.Code.LedgerClosedException,
LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
}

@Override
Expand All @@ -909,32 +907,17 @@ public String toString() {
}
});
} catch (RejectedExecutionException e) {
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, ctx);
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
}
return;
}

try {
bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
ByteBuf toSend = macManager.computeDigestAndPackageForSending(entryId, lastAddConfirmed,
currentLength, data);
try {
op.initiate(toSend, data.readableBytes());
} finally {
toSend.release();
}
}
@Override
public String toString() {
return String.format("AsyncAddEntry(lid=%d, eid=%d)", ledgerId, entryId);
}
});
bk.getMainWorkerPool().submitOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, ctx);
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandle.this, INVALID_ENTRY_ID, op.ctx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse

private void asyncAddEntry(final long entryId, ByteBuf data,
final AddCallback cb, final Object ctx) {
PendingAddOp op = new PendingAddOp(this, cb, ctx);
PendingAddOp op = PendingAddOp.create(this, data, cb, ctx);
op.setEntryId(entryId);

if ((entryId <= this.lastAddConfirmed) || pendingAddOps.contains(op)) {
LOG.error("Trying to re-add duplicate entryid:{}", entryId);
cb.addComplete(BKException.Code.DuplicateEntryIdException,
LedgerHandleAdv.this, entryId, ctx);
op.submitCallback(BKException.Code.DuplicateEntryIdException);
return;
}
doAsyncAddEntry(op, data, cb, ctx);
doAsyncAddEntry(op);
}

/**
Expand All @@ -170,7 +170,7 @@ private void asyncAddEntry(final long entryId, ByteBuf data,
* unaltered in the base class.
*/
@Override
protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final AddCallback cb, final Object ctx) {
protected void doAsyncAddEntry(final PendingAddOp op) {
if (throttler != null) {
throttler.acquire();
}
Expand All @@ -185,9 +185,10 @@ protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final
wasClosed = true;
currentLength = 0;
} else {
currentLength = addToLength(length);
currentLength = addToLength(op.payload.readableBytes());
pendingAddOps.add(op);
}
op.setLedgerLength(currentLength);
}

if (wasClosed) {
Expand All @@ -197,37 +198,26 @@ protected void doAsyncAddEntry(final PendingAddOp op, final ByteBuf data, final
@Override
public void safeRun() {
LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
cb.addComplete(BKException.Code.LedgerClosedException,
LedgerHandleAdv.this, op.getEntryId(), ctx);
op.cb.addComplete(BKException.Code.LedgerClosedException,
LedgerHandleAdv.this, op.getEntryId(), op.ctx);
}
@Override
public String toString() {
return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
}
});
} catch (RejectedExecutionException e) {
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandleAdv.this, op.getEntryId(), ctx);
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandleAdv.this, op.getEntryId(), op.ctx);
}
return;
}

try {
bk.getMainWorkerPool().submit(new SafeRunnable() {
@Override
public void safeRun() {
ByteBuf toSend = macManager.computeDigestAndPackageForSending(op.getEntryId(), lastAddConfirmed,
currentLength, data);
try {
op.initiate(toSend, toSend.readableBytes());
} finally {
toSend.release();
}
}
});
bk.getMainWorkerPool().submitOrdered(ledgerId, op);
} catch (RejectedExecutionException e) {
cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandleAdv.this, op.getEntryId(), ctx);
op.cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
LedgerHandleAdv.this, op.getEntryId(), op.ctx);
}
}

Expand Down
Loading

0 comments on commit 34f1b05

Please sign in to comment.