|
1 | 1 | package com.typesafe.netty.http.pipelining;
|
2 | 2 |
|
3 |
| -import org.jboss.netty.channel.*; |
4 |
| -import org.jboss.netty.handler.codec.http.HttpRequest; |
| 3 | + |
| 4 | +import io.netty.channel.*; |
| 5 | +import io.netty.handler.codec.http.*; |
| 6 | +import io.netty.util.collection.IntObjectHashMap; |
| 7 | +import io.netty.util.collection.IntObjectMap; |
5 | 8 |
|
6 | 9 | import java.util.*;
|
7 | 10 |
|
8 | 11 | /**
|
9 | 12 | * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their
|
10 |
| - * corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will |
11 |
| - * cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely |
12 |
| - * OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects. |
| 13 | + * corresponding requests. |
| 14 | + * |
| 15 | + * Each incoming request is assigned a sequence number, which is provided in the wrapping {@link SequencedHttpRequest}. |
| 16 | + * When a handler writes a response and a response body, it must wrap each of those messages in a |
| 17 | + * {@link SequencedOutboundMessage} with the corresponding sequence from that request. It must send at least one |
| 18 | + * message in response to the request, and the last, and only the last message it sends must have the last flag set. |
| 19 | + * |
| 20 | + * If messages are sent after the last message is sent, these messages may end up being buffered until the sequence |
| 21 | + * number overflows and cycles back. If messages for a given sequence number are not sent sequentially, similar |
| 22 | + * behaviour could also occur. |
| 23 | + * |
| 24 | + * There is no limit to the amount of messages that this handler will buffer for a particular sequence number. It is |
| 25 | + * the responsibility of the handler sending the outbound messages to handle back pressure via promises associated |
| 26 | + * with each write event - if this is done, the buffering will be inherently bounded by back pressure. |
13 | 27 | *
|
14 |
| - * @author Christopher Hunt |
| 28 | + * This handler does however put a bound on the maximum number of in flight requests that it will handle, configured by |
| 29 | + * inFlightRequestsLowWatermark and inFlightRequestsHighWatermark. When the high watermark is exceeded, the handler will |
| 30 | + * push back on the client. When the low watermark is reached, the handler will start reading again. This back pressure |
| 31 | + * mechanism only works if ChannelOptions.AUTO_READ is false. |
| 32 | + * |
| 33 | + * This back pressure is implemented by blocking channelReadComplete events, so assumes that the following handlers |
| 34 | + * will not request reading unless they receive this event. Note that the handler does nothing to actually block |
| 35 | + * incoming requests when the high watermark is reached, it only pushes back on the TCP connection. If there are more |
| 36 | + * requests in the TCP buffer before this back pressure takes effect, these requests will still be sent to the following |
| 37 | + * handlers. |
| 38 | + * |
| 39 | + * If channelReadComplete is invoked while the high watermark is reached, then when the low watermark is reached, this |
| 40 | + * will be fired again, to signal demand. |
15 | 41 | */
|
16 |
| -public class HttpPipeliningHandler extends SimpleChannelHandler { |
| 42 | +public class HttpPipeliningHandler extends ChannelDuplexHandler { |
| 43 | + |
| 44 | + /** |
| 45 | + * The sequence of received HTTP requests. |
| 46 | + */ |
| 47 | + private int receiveSequence = 0; |
| 48 | + |
| 49 | + /** |
| 50 | + * The currently sending sequence of HTTP requests. |
| 51 | + */ |
| 52 | + private int currentlySendingSequence = 1; |
17 | 53 |
|
18 |
| - public static final int INITIAL_EVENTS_HELD = 3; |
19 |
| - public static final int MAX_EVENTS_HELD = 10000; |
| 54 | + /** |
| 55 | + * Whether the high watermark has been exceeded. |
| 56 | + */ |
| 57 | + private boolean highWatermarkReached = false; |
| 58 | + |
| 59 | + /** |
| 60 | + * If the high watermark has been exceeded, whether a channel read complete has occurred. |
| 61 | + */ |
| 62 | + private boolean channelReadCompleteOccurred = false; |
20 | 63 |
|
21 |
| - private final int maxEventsHeld; |
| 64 | + /** |
| 65 | + * A write message with a promise of when it's written. |
| 66 | + */ |
| 67 | + private static class WriteMessage { |
| 68 | + /** |
| 69 | + * The written message. |
| 70 | + */ |
| 71 | + final SequencedOutboundMessage message; |
| 72 | + |
| 73 | + /** |
| 74 | + * The future that is redeemed once the message is written. |
| 75 | + */ |
| 76 | + final ChannelPromise promise; |
| 77 | + |
| 78 | + public WriteMessage(SequencedOutboundMessage message, ChannelPromise promise) { |
| 79 | + this.message = message; |
| 80 | + this.promise = promise; |
| 81 | + } |
| 82 | + } |
22 | 83 |
|
23 |
| - private int sequence; |
24 |
| - private int nextRequiredSequence; |
25 |
| - private int nextRequiredSubsequence; |
| 84 | + /** |
| 85 | + * The buffered events, by sequence |
| 86 | + */ |
| 87 | + private final IntObjectMap<List<WriteMessage>> bufferedEvents = new IntObjectHashMap<>(); |
26 | 88 |
|
27 |
| - private final Queue<OrderedDownstreamChannelEvent> holdingQueue; |
| 89 | + private final int inFlightRequestsLowWatermark; |
| 90 | + private final int inFlightRequestsHighWatermark; |
28 | 91 |
|
| 92 | + /** |
| 93 | + * Create the pipelining handler with low and high watermarks of 2 and 4. |
| 94 | + */ |
29 | 95 | public HttpPipeliningHandler() {
|
30 |
| - this(MAX_EVENTS_HELD); |
| 96 | + this(2, 4); |
31 | 97 | }
|
32 | 98 |
|
33 | 99 | /**
|
34 |
| - * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel |
35 |
| - * connection. This is required as events cannot queue up indefintely; we would run out of |
36 |
| - * memory if this was the case. |
| 100 | + * Create the pipelining handler. |
| 101 | + * |
| 102 | + * @param inFlightRequestsLowWatermark The low watermark for in flight requests, where, if the high watermark has |
| 103 | + * been exceeded, the handler will start reading again. Must be at least 0. |
| 104 | + * @param inFlightRequestsHighWatermark The high watermark, once in flight requests has exceeded this, the handler |
| 105 | + * will stop reading, pushing back on the client. Must be greater than the |
| 106 | + * low watermark. |
37 | 107 | */
|
38 |
| - public HttpPipeliningHandler(final int maxEventsHeld) { |
39 |
| - this.maxEventsHeld = maxEventsHeld; |
40 |
| - |
41 |
| - holdingQueue = new PriorityQueue<OrderedDownstreamChannelEvent>(INITIAL_EVENTS_HELD, new Comparator<OrderedDownstreamChannelEvent>() { |
42 |
| - @Override |
43 |
| - public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) { |
44 |
| - final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence(); |
45 |
| - if (delta == 0) { |
46 |
| - return o1.getSubsequence() - o2.getSubsequence(); |
47 |
| - } else { |
48 |
| - return delta; |
49 |
| - } |
50 |
| - } |
51 |
| - }); |
| 108 | + public HttpPipeliningHandler(int inFlightRequestsLowWatermark, int inFlightRequestsHighWatermark) { |
| 109 | + if (inFlightRequestsLowWatermark < 0) { |
| 110 | + throw new IllegalArgumentException("inFlightRequestsLowWatermark must be an least 0, was " + inFlightRequestsLowWatermark); |
| 111 | + } |
| 112 | + if (inFlightRequestsHighWatermark <= inFlightRequestsLowWatermark) { |
| 113 | + throw new IllegalArgumentException("inFlightRequestsHighWatermark must be greater than inFlightRequestsLowWatermark, but was " + inFlightRequestsHighWatermark); |
| 114 | + } |
| 115 | + this.inFlightRequestsLowWatermark = inFlightRequestsLowWatermark; |
| 116 | + this.inFlightRequestsHighWatermark = inFlightRequestsHighWatermark; |
52 | 117 | }
|
53 | 118 |
|
54 |
| - public int getMaxEventsHeld() { |
55 |
| - return maxEventsHeld; |
| 119 | + private int inFlight() { |
| 120 | + return receiveSequence - currentlySendingSequence + 1; |
56 | 121 | }
|
57 | 122 |
|
58 | 123 | @Override
|
59 |
| - public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) { |
60 |
| - final Object msg = e.getMessage(); |
61 |
| - if (msg instanceof HttpRequest) { |
62 |
| - ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress())); |
| 124 | + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
| 125 | + // Only forward read complete if we haven't exceeded the high watermark |
| 126 | + if (!highWatermarkReached) { |
| 127 | + ctx.fireChannelReadComplete(); |
63 | 128 | } else {
|
64 |
| - ctx.sendUpstream(e); |
| 129 | + // Store the fact that read complete has been requested. |
| 130 | + channelReadCompleteOccurred = true; |
| 131 | + } |
| 132 | + } |
| 133 | + |
| 134 | + @Override |
| 135 | + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| 136 | + Object toSend = msg; |
| 137 | + |
| 138 | + // Wrap message in sequenced if it needs to be |
| 139 | + if (msg instanceof HttpRequest) { |
| 140 | + receiveSequence++; |
| 141 | + HttpRequest request = (HttpRequest) msg; |
| 142 | + toSend = new SequencedHttpRequest(receiveSequence, request); |
| 143 | + } |
| 144 | + |
| 145 | + // If we've reached the end of an http request, and we're at or over the high watermark, |
| 146 | + // set it to reached. |
| 147 | + if (msg instanceof LastHttpContent && inFlight() >= inFlightRequestsHighWatermark) { |
| 148 | + highWatermarkReached = true; |
65 | 149 | }
|
| 150 | + |
| 151 | + ctx.fireChannelRead(toSend); |
66 | 152 | }
|
67 | 153 |
|
68 | 154 | @Override
|
69 |
| - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) |
70 |
| - throws Exception { |
71 |
| - if (e instanceof OrderedDownstreamChannelEvent) { |
72 |
| - |
73 |
| - boolean channelShouldClose = false; |
74 |
| - |
75 |
| - synchronized (holdingQueue) { |
76 |
| - if (holdingQueue.size() < maxEventsHeld) { |
77 |
| - |
78 |
| - final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e; |
79 |
| - holdingQueue.add(currentEvent); |
80 |
| - |
81 |
| - while (!holdingQueue.isEmpty()) { |
82 |
| - final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek(); |
83 |
| - if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence | |
84 |
| - nextEvent.getSubsequence() != nextRequiredSubsequence) { |
85 |
| - break; |
86 |
| - } |
87 |
| - holdingQueue.remove(); |
88 |
| - ctx.sendDownstream(nextEvent.getChannelEvent()); |
89 |
| - if (nextEvent.isLast()) { |
90 |
| - ++nextRequiredSequence; |
91 |
| - nextRequiredSubsequence = 0; |
92 |
| - } else { |
93 |
| - ++nextRequiredSubsequence; |
94 |
| - } |
95 |
| - } |
96 |
| - |
97 |
| - } else { |
98 |
| - channelShouldClose = true; |
99 |
| - } |
| 155 | + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { |
| 156 | + if (msg instanceof SequencedOutboundMessage) { |
| 157 | + SequencedOutboundMessage sequenced = (SequencedOutboundMessage) msg; |
| 158 | + |
| 159 | + writeInSequence(ctx, sequenced, promise); |
| 160 | + } else { |
| 161 | + ctx.write(msg, promise); |
| 162 | + } |
| 163 | + } |
| 164 | + |
| 165 | + private void progressToNextSendingSequence(ChannelHandlerContext ctx) { |
| 166 | + currentlySendingSequence++; |
| 167 | + |
| 168 | + int inFlight = this.inFlight(); |
| 169 | + // If we're now at the low water mark, set it to false |
| 170 | + if (highWatermarkReached && inFlight == inFlightRequestsLowWatermark) { |
| 171 | + highWatermarkReached = false; |
| 172 | + |
| 173 | + // Check if, while we were over the high watermark, a channel read had occurred |
| 174 | + // that we blocked |
| 175 | + if (channelReadCompleteOccurred) { |
| 176 | + // Send it on |
| 177 | + ctx.fireChannelReadComplete(); |
| 178 | + channelReadCompleteOccurred = false; |
| 179 | + } |
| 180 | + } |
| 181 | + } |
| 182 | + |
| 183 | + /** |
| 184 | + * Write the next sequences, if buffered. |
| 185 | + */ |
| 186 | + private void flushNextSequences(ChannelHandlerContext ctx) { |
| 187 | + progressToNextSendingSequence(ctx); |
| 188 | + |
| 189 | + List<WriteMessage> toFlush = bufferedEvents.get(currentlySendingSequence); |
| 190 | + |
| 191 | + // Loop while we still have a sequence to flush |
| 192 | + while (toFlush != null) { |
| 193 | + |
| 194 | + bufferedEvents.remove(currentlySendingSequence); |
| 195 | + |
| 196 | + WriteMessage lastWritten = null; |
| 197 | + |
| 198 | + // Flush each event |
| 199 | + for (WriteMessage message: toFlush) { |
| 200 | + ctx.write(message.message.getMessage(), message.promise); |
| 201 | + lastWritten = message; |
100 | 202 | }
|
101 | 203 |
|
102 |
| - if (channelShouldClose) { |
103 |
| - Channels.close(e.getChannel()); |
| 204 | + // If the last message that we wrote was the last message for that sequence, |
| 205 | + // then increment the sequence and maybe get the next sequence from the buffer. |
| 206 | + if (lastWritten != null && lastWritten.message.isLast()) { |
| 207 | + progressToNextSendingSequence(ctx); |
| 208 | + toFlush = bufferedEvents.get(currentlySendingSequence); |
| 209 | + } else { |
| 210 | + toFlush = null; |
104 | 211 | }
|
105 |
| - } else { |
106 |
| - super.handleDownstream(ctx, e); |
107 | 212 | }
|
108 | 213 | }
|
109 | 214 |
|
| 215 | + /** |
| 216 | + * Write the message in sequence. |
| 217 | + */ |
| 218 | + private void writeInSequence(ChannelHandlerContext ctx, SequencedOutboundMessage sequenced, ChannelPromise promise) { |
| 219 | + if (sequenced.getSequence() == currentlySendingSequence) { |
| 220 | + ctx.write(sequenced.getMessage(), promise); |
| 221 | + if (sequenced.isLast()) { |
| 222 | + flushNextSequences(ctx); |
| 223 | + } |
| 224 | + } else { |
| 225 | + List<WriteMessage> sequenceBuffer = bufferedEvents.get(sequenced.getSequence()); |
| 226 | + if (sequenceBuffer == null) { |
| 227 | + sequenceBuffer = new ArrayList<>(); |
| 228 | + bufferedEvents.put(sequenced.getSequence(), sequenceBuffer); |
| 229 | + } |
| 230 | + sequenceBuffer.add(new WriteMessage(sequenced, promise)); |
| 231 | + } |
| 232 | + } |
110 | 233 | }
|
0 commit comments