From 2a5be6cd3fe370248ce2d21b568bf5ba62a36430 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 2 Nov 2017 07:55:15 +0100 Subject: [PATCH] Recycle AddRequest/AddResponse objects This change was originally afd0ecb6 & 75bf0fa1 on the yahoo-4.3 branch Author: Matteo Merli Reviewers: Enrico Olivelli , Jia Zhai , Sijie Guo This closes #665 from ivankelly/yahoo-bp-2 --- .../bookkeeper/client/LedgerHandleAdv.java | 6 +- .../apache/bookkeeper/proto/AuthHandler.java | 2 +- .../bookkeeper/proto/BookieProtoEncoding.java | 54 ++++--- .../bookkeeper/proto/BookieProtocol.java | 139 ++++++++++++------ .../proto/PerChannelBookieClient.java | 11 +- .../bookkeeper/proto/ResponseBuilder.java | 4 +- .../bookkeeper/proto/WriteEntryProcessor.java | 10 +- 7 files changed, 148 insertions(+), 78 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index 28324b1b416..0ad9a9d3519 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -175,7 +175,6 @@ protected void doAsyncAddEntry(final PendingAddOp op) { throttler.acquire(); } - final long currentLength; boolean wasClosed = false; synchronized (this) { // synchronized on this to ensure that @@ -183,12 +182,11 @@ protected void doAsyncAddEntry(final PendingAddOp op) { // updating lastAddPushed if (metadata.isClosed()) { wasClosed = true; - currentLength = 0; } else { - currentLength = addToLength(op.payload.readableBytes()); + long currentLength = addToLength(op.payload.readableBytes()); + op.setLedgerLength(currentLength); pendingAddOps.add(op); } - op.setLedgerLength(currentLength); } if (wasClosed) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index 409fe4b4a6b..07807858d1e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -103,7 +103,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception BookieProtocol.Request req = (BookieProtocol.Request) msg; if (req.getOpCode() == BookieProtocol.ADDENTRY) { ctx.channel().writeAndFlush( - new BookieProtocol.AddResponse( + BookieProtocol.AddResponse.create( req.getProtocolVersion(), BookieProtocol.EUA, req.getLedgerId(), req.getEntryId())); } else if (req.getOpCode() == BookieProtocol.READENTRY) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 0fece29c521..df7f3b27160 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -102,7 +102,9 @@ public Object encode(Object msg, ByteBufAllocator allocator) ByteBuf buf = allocator.buffer(totalHeaderSize); buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags())); buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); - return DoubleByteBuf.get(buf, ar.getData()); + ByteBuf data = ar.getData(); + ar.recycle(); + return DoubleByteBuf.get(buf, data); } else if (r instanceof BookieProtocol.ReadRequest) { int totalHeaderSize = 4 // for request type + 8 // for ledgerId @@ -155,7 +157,9 @@ public Object decode(ByteBuf packet) // Read ledger and entry id without advancing the reader index ledgerId = packet.getLong(packet.readerIndex()); entryId = packet.getLong(packet.readerIndex() + 8); - return new BookieProtocol.AddRequest(version, ledgerId, entryId, flags, masterKey, packet.retain()); + return BookieProtocol.AddRequest.create( + version, ledgerId, entryId, flags, + masterKey, packet.retain()); } case BookieProtocol.READENTRY: @@ -223,29 +227,33 @@ public Object encode(Object msg, ByteBufAllocator allocator) buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0)); ServerStats.getInstance().incrementPacketsSent(); - if (msg instanceof BookieProtocol.ReadResponse) { - buf.writeInt(r.getErrorCode()); - buf.writeLong(r.getLedgerId()); - buf.writeLong(r.getEntryId()); + try { + if (msg instanceof BookieProtocol.ReadResponse) { + buf.writeInt(r.getErrorCode()); + buf.writeLong(r.getLedgerId()); + buf.writeLong(r.getEntryId()); + + BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse) r; + if (rr.hasData()) { + return DoubleByteBuf.get(buf, rr.getData()); + } else { + return buf; + } + } else if (msg instanceof BookieProtocol.AddResponse) { + buf.writeInt(r.getErrorCode()); + buf.writeLong(r.getLedgerId()); + buf.writeLong(r.getEntryId()); - BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse)r; - if (rr.hasData()) { - return DoubleByteBuf.get(buf, rr.getData()); - } else { return buf; + } else if (msg instanceof BookieProtocol.AuthResponse) { + BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage(); + return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray())); + } else { + LOG.error("Cannot encode unknown response type {}", msg.getClass().getName()); + return msg; } - } else if (msg instanceof BookieProtocol.AddResponse) { - buf.writeInt(r.getErrorCode()); - buf.writeLong(r.getLedgerId()); - buf.writeLong(r.getEntryId()); - - return buf; - } else if (msg instanceof BookieProtocol.AuthResponse) { - BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse)r).getAuthMessage(); - return DoubleByteBuf.get(buf, Unpooled.wrappedBuffer(am.toByteArray())); - } else { - LOG.error("Cannot encode unknown response type {}", msg.getClass().getName()); - return msg; + } finally { + r.recycle(); } } @Override @@ -263,7 +271,7 @@ public Object decode(ByteBuf buffer) rc = buffer.readInt(); ledgerId = buffer.readLong(); entryId = buffer.readLong(); - return new BookieProtocol.AddResponse(version, rc, ledgerId, entryId); + return BookieProtocol.AddResponse.create(version, rc, ledgerId, entryId); case BookieProtocol.READENTRY: rc = buffer.readInt(); ledgerId = buffer.readLong(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 6fc91e51eef..2c7a82835db 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -23,6 +23,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; +import io.netty.util.ReferenceCounted; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; @@ -40,7 +43,7 @@ public interface BookieProtocol { public static final byte LOWEST_COMPAT_PROTOCOL_VERSION = 0; /** - * Current version of the protocol, which client will use. + * Current version of the protocol, which client will use. */ public static final byte CURRENT_PROTOCOL_VERSION = 2; @@ -62,19 +65,19 @@ public interface BookieProtocol { */ public static final int MASTER_KEY_LENGTH = 20; - /** + /** * The first int of a packet is the header. * It contains the version, opCode and flags. * The initial versions of BK didn't have this structure - * and just had an int representing the opCode as the - * first int. This handles that case also. + * and just had an int representing the opCode as the + * first int. This handles that case also. */ final static class PacketHeader { public static int toInt(byte version, byte opCode, short flags) { if (version == 0) { return (int)opCode; } else { - return ((version & 0xFF) << 24) + return ((version & 0xFF) << 24) | ((opCode & 0xFF) << 16) | (flags & 0xFFFF); } @@ -177,20 +180,14 @@ public static short getFlags(int packetHeader) { public static final short FLAG_RECOVERY_ADD = 0x0002; static class Request { - - final byte protocolVersion; - final byte opCode; - final long ledgerId; - final long entryId; - final short flags; - final byte[] masterKey; - - protected Request(byte protocolVersion, byte opCode, long ledgerId, - long entryId, short flags) { - this(protocolVersion, opCode, ledgerId, entryId, flags, null); - } - - protected Request(byte protocolVersion, byte opCode, long ledgerId, + byte protocolVersion; + byte opCode; + long ledgerId; + long entryId; + short flags; + byte[] masterKey; + + protected void init(byte protocolVersion, byte opCode, long ledgerId, long entryId, short flags, byte[] masterKey) { this.protocolVersion = protocolVersion; this.opCode = opCode; @@ -233,15 +230,25 @@ byte[] getMasterKey() { public String toString() { return String.format("Op(%d)[Ledger:%d,Entry:%d]", opCode, ledgerId, entryId); } + + public void recycle() {} } static class AddRequest extends Request { - final ByteBuf data; - - public AddRequest(byte protocolVersion, long ledgerId, long entryId, - short flags, byte[] masterKey, ByteBuf data) { - super(protocolVersion, ADDENTRY, ledgerId, entryId, flags, masterKey); - this.data = data.retain(); + ByteBuf data; + + static AddRequest create(byte protocolVersion, long ledgerId, + long entryId, short flags, byte[] masterKey, + ByteBuf data) { + AddRequest add = RECYCLER.get(); + add.protocolVersion = protocolVersion; + add.opCode = ADDENTRY; + add.ledgerId = ledgerId; + add.entryId = entryId; + add.flags = flags; + add.masterKey = masterKey; + add.data = data.retain(); + return add; } ByteBuf getData() { @@ -255,16 +262,36 @@ boolean isRecoveryAdd() { void release() { data.release(); } + + private final Handle recyclerHandle; + private AddRequest(Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler RECYCLER = new Recycler() { + protected AddRequest newObject(Handle handle) { + return new AddRequest(handle); + } + }; + + @Override + public void recycle() { + ledgerId = -1; + entryId = -1; + masterKey = null; + data = null; + recyclerHandle.recycle(this); + } } static class ReadRequest extends Request { ReadRequest(byte protocolVersion, long ledgerId, long entryId, short flags) { - super(protocolVersion, READENTRY, ledgerId, entryId, flags); + init(protocolVersion, READENTRY, ledgerId, entryId, flags, null); } ReadRequest(byte protocolVersion, long ledgerId, long entryId, short flags, byte[] masterKey) { - super(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey); + init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey); } boolean isFencingRequest() { @@ -276,7 +303,7 @@ static class AuthRequest extends Request { final AuthMessage authMessage; AuthRequest(byte protocolVersion, AuthMessage authMessage) { - super(protocolVersion, AUTH, -1, -1, FLAG_NONE, null); + init(protocolVersion, AUTH, -1, -1, FLAG_NONE, null); this.authMessage = authMessage; } @@ -285,14 +312,14 @@ AuthMessage getAuthMessage() { } } - static class Response { - final byte protocolVersion; - final byte opCode; - final int errorCode; - final long ledgerId; - final long entryId; + static abstract class Response { + byte protocolVersion; + byte opCode; + int errorCode; + long ledgerId; + long entryId; - protected Response(byte protocolVersion, byte opCode, + protected void init(byte protocolVersion, byte opCode, int errorCode, long ledgerId, long entryId) { this.protocolVersion = protocolVersion; this.opCode = opCode; @@ -326,18 +353,20 @@ public String toString() { return String.format("Op(%d)[Ledger:%d,Entry:%d,errorCode=%d]", opCode, ledgerId, entryId, errorCode); } + + abstract void recycle(); } static class ReadResponse extends Response { final ByteBuf data; ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { - super(protocolVersion, READENTRY, errorCode, ledgerId, entryId); + init(protocolVersion, READENTRY, errorCode, ledgerId, entryId); this.data = Unpooled.EMPTY_BUFFER; } ReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, ByteBuf data) { - super(protocolVersion, READENTRY, errorCode, ledgerId, entryId); + init(protocolVersion, READENTRY, errorCode, ledgerId, entryId); this.data = data; } @@ -348,18 +377,41 @@ boolean hasData() { ByteBuf getData() { return data; } + + void recycle() { + } } static class AddResponse extends Response { - AddResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId) { - super(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId); + static AddResponse create(byte protocolVersion, int errorCode, long ledgerId, long entryId) { + AddResponse response = RECYCLER.get(); + response.init(protocolVersion, ADDENTRY, errorCode, ledgerId, entryId); + return response; + } + + private final Handle recyclerHandle; + private AddResponse(Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + private static final Recycler RECYCLER = new Recycler() { + protected AddResponse newObject(Handle handle) { + return new AddResponse(handle); + } + }; + + public void recycle() { + recyclerHandle.recycle(this); } } - + static class ErrorResponse extends Response { ErrorResponse(byte protocolVersion, byte opCode, int errorCode, long ledgerId, long entryId) { - super(protocolVersion, opCode, errorCode, ledgerId, entryId); + init(protocolVersion, opCode, errorCode, ledgerId, entryId); + } + + void recycle() { } } @@ -367,13 +419,16 @@ static class AuthResponse extends Response { final AuthMessage authMessage; AuthResponse(byte protocolVersion, AuthMessage authMessage) { - super(protocolVersion, AUTH, EOK, -1, -1); + init(protocolVersion, AUTH, EOK, -1, -1); this.authMessage = authMessage; } AuthMessage getAuthMessage() { return authMessage; } + + void recycle() { + } } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index f74a4c96ba7..24164c8110e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -2,6 +2,7 @@ * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance @@ -532,11 +533,13 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf CompletionKey completionKey = null; if (useV2WireProtocol) { completionKey = acquireV2Key(ledgerId, entryId, OperationType.ADD_ENTRY); - request = new BookieProtocol.AddRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, + request = BookieProtocol.AddRequest.create( + BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId, (short) options, masterKey, toSend); } else { final long txnId = getTxnId(); completionKey = new V3CompletionKey(txnId, OperationType.ADD_ENTRY); + // Build the request and calculate the total size to be included in the packet. BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() .setVersion(ProtocolVersion.VERSION_THREE) @@ -854,6 +857,7 @@ private static String requestToString(Object request) { return request.toString(); } } + void errorOut(final CompletionKey key) { if (LOG.isDebugEnabled()) { LOG.debug("Removing completion key: {}", key); @@ -1010,6 +1014,7 @@ private void readV2Response(final BookieProtocol.Response response) { public void safeRun() { completionValue.handleV2Response(ledgerId, entryId, status, response); + response.recycle(); } }); } @@ -1734,7 +1739,6 @@ public void run(Timeout timeout) throws Exception { public void release() {} } - /** * Note : Helper functions follow */ @@ -1814,7 +1818,8 @@ public boolean equals(Object object) { } V2CompletionKey that = (V2CompletionKey) object; return this.entryId == that.entryId - && this.ledgerId == that.ledgerId; + && this.ledgerId == that.ledgerId + && this.operationType == that.operationType; } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java index c0be1624714..342acd5df31 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ResponseBuilder.java @@ -25,7 +25,7 @@ class ResponseBuilder { static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol.Request r) { if (r.getOpCode() == BookieProtocol.ADDENTRY) { - return new BookieProtocol.AddResponse(r.getProtocolVersion(), errorCode, + return BookieProtocol.AddResponse.create(r.getProtocolVersion(), errorCode, r.getLedgerId(), r.getEntryId()); } else { assert(r.getOpCode() == BookieProtocol.READENTRY); @@ -35,7 +35,7 @@ static BookieProtocol.Response buildErrorResponse(int errorCode, BookieProtocol. } static BookieProtocol.Response buildAddResponse(BookieProtocol.Request r) { - return new BookieProtocol.AddResponse(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(), + return BookieProtocol.AddResponse.create(r.getProtocolVersion(), BookieProtocol.EOK, r.getLedgerId(), r.getEntryId()); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java index c4b28406b02..416a478e927 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessor.java @@ -17,6 +17,7 @@ */ package org.apache.bookkeeper.proto; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -70,11 +71,12 @@ protected void processPacket() { startTimeNanos = MathUtils.nowInNano(); int rc = BookieProtocol.EOK; + ByteBuf addData = add.getData(); try { if (add.isRecoveryAdd()) { - requestProcessor.bookie.recoveryAddEntry(add.getData(), this, channel, add.getMasterKey()); + requestProcessor.bookie.recoveryAddEntry(addData, this, channel, add.getMasterKey()); } else { - requestProcessor.bookie.addEntry(add.getData(), this, channel, add.getMasterKey()); + requestProcessor.bookie.addEntry(addData, this, channel, add.getMasterKey()); } } catch (IOException e) { LOG.error("Error writing " + add, e); @@ -86,7 +88,7 @@ protected void processPacket() { LOG.error("Unauthorized access to ledger " + add.getLedgerId(), e); rc = BookieProtocol.EUA; } finally { - add.release(); + addData.release(); } if (rc != BookieProtocol.EOK) { @@ -95,6 +97,7 @@ protected void processPacket() { sendResponse(rc, ResponseBuilder.buildErrorResponse(rc, add), requestProcessor.addRequestStats); + add.recycle(); } } @@ -111,6 +114,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, sendResponse(rc, ResponseBuilder.buildAddResponse(request), requestProcessor.addRequestStats); + request.recycle(); recycle(); }