Skip to content

Commit

Permalink
Recycle AddRequest/AddResponse objects
Browse files Browse the repository at this point in the history
This change was originally afd0ecb & 75bf0fa on the yahoo-4.3 branch

Author: Matteo Merli <[email protected]>

Reviewers: Enrico Olivelli <[email protected]>, Jia Zhai <None>, Sijie Guo <[email protected]>

This closes apache#665 from ivankelly/yahoo-bp-2
  • Loading branch information
merlimat authored and Ivan Kelly committed Nov 2, 2017
1 parent c003170 commit 2a5be6c
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,18 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
throttler.acquire();
}

final long currentLength;
boolean wasClosed = false;
synchronized (this) {
// synchronized on this to ensure that
// the ledger isn't closed between checking and
// updating lastAddPushed
if (metadata.isClosed()) {
wasClosed = true;
currentLength = 0;
} else {
currentLength = addToLength(op.payload.readableBytes());
long currentLength = addToLength(op.payload.readableBytes());
op.setLedgerLength(currentLength);
pendingAddOps.add(op);
}
op.setLedgerLength(currentLength);
}

if (wasClosed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
BookieProtocol.Request req = (BookieProtocol.Request) msg;
if (req.getOpCode() == BookieProtocol.ADDENTRY) {
ctx.channel().writeAndFlush(
new BookieProtocol.AddResponse(
BookieProtocol.AddResponse.create(
req.getProtocolVersion(), BookieProtocol.EUA,
req.getLedgerId(), req.getEntryId()));
} else if (req.getOpCode() == BookieProtocol.READENTRY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ public Object encode(Object msg, ByteBufAllocator allocator)
ByteBuf buf = allocator.buffer(totalHeaderSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
return DoubleByteBuf.get(buf, ar.getData());
ByteBuf data = ar.getData();
ar.recycle();
return DoubleByteBuf.get(buf, data);
} else if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
Expand Down Expand Up @@ -155,7 +157,9 @@ public Object decode(ByteBuf packet)
// Read ledger and entry id without advancing the reader index
ledgerId = packet.getLong(packet.readerIndex());
entryId = packet.getLong(packet.readerIndex() + 8);
return new BookieProtocol.AddRequest(version, ledgerId, entryId, flags, masterKey, packet.retain());
return BookieProtocol.AddRequest.create(
version, ledgerId, entryId, flags,
masterKey, packet.retain());
}

case BookieProtocol.READENTRY:
Expand Down Expand Up @@ -223,29 +227,33 @@ public Object encode(Object msg, ByteBufAllocator allocator)
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0));

ServerStats.getInstance().incrementPacketsSent();
if (msg instanceof BookieProtocol.ReadResponse) {
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
try {
if (msg instanceof BookieProtocol.ReadResponse) {
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());

BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse) r;
if (rr.hasData()) {
return DoubleByteBuf.get(buf, rr.getData());
} else {
return buf;
}
} else if (msg instanceof BookieProtocol.AddResponse) {
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());

BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r;
if (rr.hasData()) {
return DoubleByteBuf.get(buf, rr.getData());
} else {
return buf;
} else if (msg instanceof BookieProtocol.AuthResponse) {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage();
return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray()));
} else {
LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
return msg;
}
} else if (msg instanceof BookieProtocol.AddResponse) {
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());

return buf;
} else if (msg instanceof BookieProtocol.AuthResponse) {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage();
return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray()));
} else {
LOG.error("Cannot encode unknown response type {}", msg.getClass().getName());
return msg;
} finally {
r.recycle();
}
}
@Override
Expand All @@ -263,7 +271,7 @@ public Object decode(ByteBuf buffer)
rc = buffer.readInt();
ledgerId = buffer.readLong();
entryId = buffer.readLong();
return new BookieProtocol.AddResponse(version, rc, ledgerId, entryId);
return BookieProtocol.AddResponse.create(version, rc, ledgerId, entryId);
case BookieProtocol.READENTRY:
rc = buffer.readInt();
ledgerId = buffer.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCounted;

import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;

Expand All @@ -40,7 +43,7 @@ public interface BookieProtocol {
public static final byte LOWEST_COMPAT_PROTOCOL_VERSION = 0;

/**
* Current version of the protocol, which client will use.
* Current version of the protocol, which client will use.
*/
public static final byte CURRENT_PROTOCOL_VERSION = 2;

Expand All @@ -62,19 +65,19 @@ public interface BookieProtocol {
*/
public static final int MASTER_KEY_LENGTH = 20;

/**
/**
* The first int of a packet is the header.
* It contains the version, opCode and flags.
* The initial versions of BK didn't have this structure
* and just had an int representing the opCode as the
* first int. This handles that case also.
* and just had an int representing the opCode as the
* first int. This handles that case also.
*/
final static class PacketHeader {
public static int toInt(byte version, byte opCode, short flags) {
if (version == 0) {
return (int)opCode;
} else {
return ((version & 0xFF) << 24)
return ((version & 0xFF) << 24)
| ((opCode & 0xFF) << 16)
| (flags & 0xFFFF);
}
Expand Down Expand Up @@ -177,20 +180,14 @@ public static short getFlags(int packetHeader) {
public static final short FLAG_RECOVERY_ADD = 0x0002;

static class Request {

final byte protocolVersion;
final byte opCode;
final long ledgerId;
final long entryId;
final short flags;
final byte[] masterKey;

protected Request(byte protocolVersion, byte opCode, long ledgerId,
long entryId, short flags) {
this(protocolVersion, opCode, ledgerId, entryId, flags, null);
}

protected Request(byte protocolVersion, byte opCode, long ledgerId,
byte protocolVersion;
byte opCode;
long ledgerId;
long entryId;
short flags;
byte[] masterKey;

protected void init(byte protocolVersion, byte opCode, long ledgerId,
long entryId, short flags, byte[] masterKey) {
this.protocolVersion = protocolVersion;
this.opCode = opCode;
Expand Down Expand Up @@ -233,15 +230,25 @@ byte[] getMasterKey() {
public String toString() {
return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode, ledgerId, entryId);
}

public void recycle() {}
}

static class AddRequest extends Request {
final ByteBuf data;

public AddRequest(byte protocolVersion, long ledgerId, long entryId,
short flags, byte[] masterKey, ByteBuf data) {
super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, masterKey);
this.data = data.retain();
ByteBuf data;

static AddRequest create(byte protocolVersion, long ledgerId,
long entryId, short flags, byte[] masterKey,
ByteBuf data) {
AddRequest add = RECYCLER.get();
add.protocolVersion = protocolVersion;
add.opCode = ADDENTRY;
add.ledgerId = ledgerId;
add.entryId = entryId;
add.flags = flags;
add.masterKey = masterKey;
add.data = data.retain();
return add;
}

ByteBuf getData() {
Expand All @@ -255,16 +262,36 @@ boolean isRecoveryAdd() {
void release() {
data.release();
}

private final Handle<AddRequest> recyclerHandle;
private AddRequest(Handle<AddRequest> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<AddRequest> RECYCLER = new Recycler<AddRequest>() {
protected AddRequest newObject(Handle<AddRequest> handle) {
return new AddRequest(handle);
}
};

@Override
public void recycle() {
ledgerId = -1;
entryId = -1;
masterKey = null;
data = null;
recyclerHandle.recycle(this);
}
}

static class ReadRequest extends Request {
ReadRequest(byte protocolVersion, long ledgerId, long entryId, short flags) {
super(protocolVersion, READENTRY, ledgerId, entryId, flags);
init(protocolVersion, READENTRY, ledgerId, entryId, flags, null);
}

ReadRequest(byte protocolVersion, long ledgerId, long entryId,
short flags, byte[] masterKey) {
super(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey);
init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey);
}

boolean isFencingRequest() {
Expand All @@ -276,7 +303,7 @@ static class AuthRequest extends Request {
final AuthMessage authMessage;

AuthRequest(byte protocolVersion, AuthMessage authMessage) {
super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
init(protocolVersion, AUTH, -1, -1, FLAG_NONE, null);
this.authMessage = authMessage;
}

Expand All @@ -285,14 +312,14 @@ AuthMessage getAuthMessage() {
}
}

static class Response {
final byte protocolVersion;
final byte opCode;
final int errorCode;
final long ledgerId;
final long entryId;
static abstract class Response {
byte protocolVersion;
byte opCode;
int errorCode;
long ledgerId;
long entryId;

protected Response(byte protocolVersion, byte opCode,
protected void init(byte protocolVersion, byte opCode,
int errorCode, long ledgerId, long entryId) {
this.protocolVersion = protocolVersion;
this.opCode = opCode;
Expand Down Expand Up @@ -326,18 +353,20 @@ public String toString() {
return String.format("Op(%d)[Ledger:%d,Entry:%d,errorCode=%d]",
opCode, ledgerId, entryId, errorCode);
}

abstract void recycle();
}

static class ReadResponse extends Response {
final ByteBuf data;

ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
init(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
this.data = Unpooled.EMPTY_BUFFER;
}

ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) {
super(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
init(protocolVersion, READENTRY, errorCode, ledgerId, entryId);
this.data = data;
}

Expand All @@ -348,32 +377,58 @@ boolean hasData() {
ByteBuf getData() {
return data;
}

void recycle() {
}
}

static class AddResponse extends Response {
AddResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
static AddResponse create(byte protocolVersion, int errorCode, long ledgerId, long entryId) {
AddResponse response = RECYCLER.get();
response.init(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId);
return response;
}

private final Handle<AddResponse> recyclerHandle;
private AddResponse(Handle<AddResponse> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<AddResponse> RECYCLER = new Recycler<AddResponse>() {
protected AddResponse newObject(Handle<AddResponse> handle) {
return new AddResponse(handle);
}
};

public void recycle() {
recyclerHandle.recycle(this);
}
}

static class ErrorResponse extends Response {
ErrorResponse(byte protocolVersion, byte opCode, int errorCode,
long ledgerId, long entryId) {
super(protocolVersion, opCode, errorCode, ledgerId, entryId);
init(protocolVersion, opCode, errorCode, ledgerId, entryId);
}

void recycle() {
}
}

static class AuthResponse extends Response {
final AuthMessage authMessage;

AuthResponse(byte protocolVersion, AuthMessage authMessage) {
super(protocolVersion, AUTH, EOK, -1, -1);
init(protocolVersion, AUTH, EOK, -1, -1);
this.authMessage = authMessage;
}

AuthMessage getAuthMessage() {
return authMessage;
}

void recycle() {
}
}

}
Loading

0 comments on commit 2a5be6c

Please sign in to comment.