 package com.typesafe.netty.http.pipelining;

-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.http.HttpRequest;
+
+import io.netty.channel.*;
+import io.netty.handler.codec.http.*;
+import io.netty.util.collection.IntObjectHashMap;
+import io.netty.util.collection.IntObjectMap;

 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;

 /**
  * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their
- * corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will
- * cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely
- * OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects.
+ * corresponding requests.
+ *
+ * Each incoming request is assigned a sequence number, which is provided in the wrapping {@link SequencedHttpRequest}.
+ * When a handler writes a response and a response body, it must wrap each of those messages in a
+ * {@link SequencedOutboundMessage} with the corresponding sequence from that request. It must send at least one
+ * message in response to the request, and the last message it sends, and only that message, must have the last flag
+ * set.
+ *
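+ * As a rough sketch of what a handler downstream of this one might look like (the handler name, the
+ * {@code getSequence()} accessor and the {@code SequencedOutboundMessage} constructor shown here are illustrative
+ * assumptions rather than part of this class's contract):
+ *
+ * <pre>{@code
+ * public class HelloWorldHandler extends ChannelInboundHandlerAdapter {
+ *     public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ *         if (msg instanceof SequencedHttpRequest) {
+ *             SequencedHttpRequest sequenced = (SequencedHttpRequest) msg;
+ *             FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ *             response.headers().set("Content-Length", "0");
+ *             // This full response is the one and only message sent for this request, so the last flag is set
+ *             ctx.writeAndFlush(new SequencedOutboundMessage(sequenced.getSequence(), response, true));
+ *         } else {
+ *             ctx.fireChannelRead(msg);
+ *         }
+ *     }
+ * }
+ * }</pre>
+ *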
+ * If messages are sent after the last message is sent, these messages may end up being buffered until the sequence
+ * number overflows and cycles back. If messages for a given sequence number are not sent sequentially, similar
+ * behaviour could also occur.
+ *
+ * There is no limit to the number of messages that this handler will buffer for a particular sequence number. It is
+ * the responsibility of the handler sending the outbound messages to handle back pressure via the promises associated
+ * with each write event; if this is done, the buffering will be inherently bounded by back pressure.
 *
- * @author Christopher Hunt
+ * This handler does, however, put a bound on the maximum number of in flight requests that it will handle, configured
+ * by inFlightRequestsLowWatermark and inFlightRequestsHighWatermark. When the high watermark is exceeded, the handler
+ * will push back on the client. When the low watermark is reached, the handler will start reading again. This back
+ * pressure mechanism only works if ChannelOption.AUTO_READ is false.
+ *
+ * This back pressure is implemented by blocking channelReadComplete events, so it assumes that the following handlers
+ * will not request reading unless they receive this event. Note that the handler does nothing to actually block
+ * incoming requests when the high watermark is reached; it only pushes back on the TCP connection. If there are more
+ * requests in the TCP buffers before this back pressure takes effect, including the current request that triggered the
+ * high watermark to be exceeded, these requests will still be sent to the following handlers.
+ *
+ * When the low watermark is reached after the high watermark has been exceeded, the handler will stop blocking
+ * channelReadComplete events, and will issue a read.
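+ *
+ * For example, a server using this handler with back pressure enabled might be bootstrapped along these lines (the
+ * watermark values and the application handler are illustrative only):
+ *
+ * <pre>{@code
+ * ServerBootstrap bootstrap = new ServerBootstrap()
+ *         .group(new NioEventLoopGroup())
+ *         .channel(NioServerSocketChannel.class)
+ *         // AUTO_READ must be off for the watermark based back pressure to have any effect
+ *         .childOption(ChannelOption.AUTO_READ, false)
+ *         .childHandler(new ChannelInitializer<SocketChannel>() {
+ *             protected void initChannel(SocketChannel ch) {
+ *                 ch.pipeline().addLast(new HttpServerCodec());
+ *                 ch.pipeline().addLast(new HttpPipeliningHandler(2, 4));
+ *                 ch.pipeline().addLast(new HelloWorldHandler());
+ *             }
+ *         });
+ * }</pre>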
 */
-public class HttpPipeliningHandler extends SimpleChannelHandler {
+public class HttpPipeliningHandler extends ChannelDuplexHandler {

-    public static final int INITIAL_EVENTS_HELD = 3;
-    public static final int MAX_EVENTS_HELD = 10000;
+    /**
+     * The sequence of received HTTP requests.
+     */
+    private volatile int receiveSequence = 0;

-    private final int maxEventsHeld;
+    /**
+     * The currently sending sequence of HTTP requests.
+     */
+    private volatile int currentlySendingSequence = 1;

-    private int sequence;
-    private int nextRequiredSequence;
-    private int nextRequiredSubsequence;
+    /**
+     * Whether the high watermark has been exceeded.
+     */
+    private volatile boolean highWatermarkExceeded = false;

-    private final Queue<OrderedDownstreamChannelEvent> holdingQueue;
+    /**
+     * The number of requests in flight.
+     */
+    private final AtomicInteger inFlight = new AtomicInteger();

+    /**
+     * A write message with a promise of when it's written.
+     */
+    private static class WriteMessage {
+        /**
+         * The written message.
+         */
+        final SequencedOutboundMessage message;
+
+        /**
+         * The promise that is redeemed once the message is written.
+         */
+        final ChannelPromise promise;
+
+        public WriteMessage(SequencedOutboundMessage message, ChannelPromise promise) {
+            this.message = message;
+            this.promise = promise;
+        }
+    }
+
+    /**
+     * The buffered events, by sequence.
+     */
+    private final IntObjectMap<List<WriteMessage>> bufferedEvents = new IntObjectHashMap<>();
+
+    private final int inFlightRequestsLowWatermark;
+    private final int inFlightRequestsHighWatermark;
+
+    /**
+     * Create the pipelining handler with low and high watermarks of 2 and 4.
+     */
     public HttpPipeliningHandler() {
-        this(MAX_EVENTS_HELD);
+        this(2, 4);
     }

     /**
-     * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel
-     *                      connection. This is required as events cannot queue up indefintely; we would run out of
-     *                      memory if this was the case.
+     * Create the pipelining handler.
+     *
+     * @param inFlightRequestsLowWatermark The low watermark for in flight requests; once the high watermark has been
+     *                                     exceeded, the handler will start reading again when in flight requests drop
+     *                                     to this value. Must be at least 0.
+     * @param inFlightRequestsHighWatermark The high watermark; once in flight requests exceed this, the handler will
+     *                                      stop reading, pushing back on the client. Must be greater than the low
+     *                                      watermark.
      */
-    public HttpPipeliningHandler(final int maxEventsHeld) {
-        this.maxEventsHeld = maxEventsHeld;
-
-        holdingQueue = new PriorityQueue<OrderedDownstreamChannelEvent>(INITIAL_EVENTS_HELD, new Comparator<OrderedDownstreamChannelEvent>() {
-            @Override
-            public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) {
-                final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence();
-                if (delta == 0) {
-                    return o1.getSubsequence() - o2.getSubsequence();
-                } else {
-                    return delta;
-                }
-            }
-        });
+    public HttpPipeliningHandler(int inFlightRequestsLowWatermark, int inFlightRequestsHighWatermark) {
+        if (inFlightRequestsLowWatermark < 0) {
+            throw new IllegalArgumentException("inFlightRequestsLowWatermark must be at least 0, was " + inFlightRequestsLowWatermark);
+        }
+        if (inFlightRequestsHighWatermark <= inFlightRequestsLowWatermark) {
+            throw new IllegalArgumentException("inFlightRequestsHighWatermark must be greater than inFlightRequestsLowWatermark, but was " + inFlightRequestsHighWatermark);
+        }
+        this.inFlightRequestsLowWatermark = inFlightRequestsLowWatermark;
+        this.inFlightRequestsHighWatermark = inFlightRequestsHighWatermark;
     }

-    public int getMaxEventsHeld() {
-        return maxEventsHeld;
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        // Only forward read complete if we haven't exceeded the high watermark
+        if (!highWatermarkExceeded) {
+            ctx.fireChannelReadComplete();
+        }
     }

     @Override
-    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) {
-        final Object msg = e.getMessage();
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         if (msg instanceof HttpRequest) {
-            ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress()));
+            receiveSequence++;
+
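+            // Track this request as in flight; once the high watermark is exceeded, channelReadComplete
+            // events are no longer forwarded, so the following handlers will stop requesting reads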
+            if (inFlight.incrementAndGet() > inFlightRequestsHighWatermark) {
+                highWatermarkExceeded = true;
+            }
+            HttpRequest request = (HttpRequest) msg;
+            ctx.fireChannelRead(new SequencedHttpRequest(receiveSequence, request));
+
         } else {
-            ctx.sendUpstream(e);
+            ctx.fireChannelRead(msg);
         }
     }

     @Override
-    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
-            throws Exception {
-        if (e instanceof OrderedDownstreamChannelEvent) {
-
-            boolean channelShouldClose = false;
-
-            synchronized (holdingQueue) {
-                if (holdingQueue.size() < maxEventsHeld) {
-
-                    final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e;
-                    holdingQueue.add(currentEvent);
-
-                    while (!holdingQueue.isEmpty()) {
-                        final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek();
-                        if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence ||
-                                nextEvent.getSubsequence() != nextRequiredSubsequence) {
-                            break;
-                        }
-                        holdingQueue.remove();
-                        ctx.sendDownstream(nextEvent.getChannelEvent());
-                        if (nextEvent.isLast()) {
-                            ++nextRequiredSequence;
-                            nextRequiredSubsequence = 0;
-                        } else {
-                            ++nextRequiredSubsequence;
-                        }
-                    }
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        if (msg instanceof SequencedOutboundMessage) {
+            SequencedOutboundMessage sequenced = (SequencedOutboundMessage) msg;
+
+            synchronizedWrite(ctx, sequenced, promise);
+        } else {
+            ctx.write(msg, promise);
+        }
+    }
+
+    private void progressToNextSendingSequence(ChannelHandlerContext ctx) {
+        currentlySendingSequence++;
+
+        int inFlight = this.inFlight.decrementAndGet();
+        // If we're now at the low water mark, issue a read for the next request
+        if (highWatermarkExceeded && inFlight == inFlightRequestsLowWatermark) {
+            ctx.read();
+            highWatermarkExceeded = false;
+        }
+    }
+
+    /**
+     * Write the next sequences, if buffered.
+     */
+    private void flushNextSequences(ChannelHandlerContext ctx) {
+        synchronized (bufferedEvents) {

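+            // The sequence that was being sent has received its last message, so advance to the next
+            // sequence and flush anything already buffered for it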
+            progressToNextSendingSequence(ctx);
+
+            List<WriteMessage> toFlush = bufferedEvents.get(currentlySendingSequence);
+
+            // Loop while we still have a sequence to flush
+            while (toFlush != null) {
+
+                bufferedEvents.remove(currentlySendingSequence);
+
+                WriteMessage lastWritten = null;
+
+                // Flush each event
+                for (WriteMessage message: toFlush) {
+                    ctx.write(message.message.getMessage(), message.promise);
+                    lastWritten = message;
+                }
+
+                // If the last message that we wrote was the last message for that sequence,
+                // then increment the sequence and maybe get the next sequence from the buffer.
+                if (lastWritten != null && lastWritten.message.isLast()) {
+                    progressToNextSendingSequence(ctx);
+                    toFlush = bufferedEvents.get(currentlySendingSequence);
                 } else {
-                    channelShouldClose = true;
+                    toFlush = null;
                 }
             }
+        }
+    }

-            if (channelShouldClose) {
-                Channels.close(e.getChannel());
+    /**
+     * Perform a write, synchronized on the buffer.
+     */
+    private void synchronizedWrite(ChannelHandlerContext ctx, SequencedOutboundMessage sequenced, ChannelPromise promise) {
+        synchronized (bufferedEvents) {
+            if (sequenced.getSequence() == currentlySendingSequence) {
+                ctx.write(sequenced.getMessage(), promise);
+                if (sequenced.isLast()) {
+                    flushNextSequences(ctx);
+                }
+            } else {
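+                // Not this message's turn yet; buffer it until its sequence becomes the current one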
+                List<WriteMessage> sequenceBuffer = bufferedEvents.get(sequenced.getSequence());
+                if (sequenceBuffer == null) {
+                    sequenceBuffer = new ArrayList<>();
+                    bufferedEvents.put(sequenced.getSequence(), sequenceBuffer);
+                }
+                sequenceBuffer.add(new WriteMessage(sequenced, promise));
             }
-        } else {
-            super.handleDownstream(ctx, e);
         }
     }
-
 }