|
1 | 1 | package com.typesafe.netty.http.pipelining;
|
2 | 2 |
|
3 |
| -import org.jboss.netty.channel.*; |
4 |
| -import org.jboss.netty.handler.codec.http.HttpRequest; |
| 3 | + |
| 4 | +import io.netty.channel.*; |
| 5 | +import io.netty.handler.codec.http.*; |
5 | 6 |
|
6 | 7 | import java.util.*;
|
| 8 | +import java.util.concurrent.atomic.AtomicInteger; |
7 | 9 |
|
8 | 10 | /**
|
9 | 11 | * Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their
|
10 |
| - * corresponding requests. NOTE: A side effect of using this handler is that upstream HttpRequest objects will |
11 |
| - * cause the original message event to be effectively transformed into an OrderedUpstreamMessageEvent. Conversely |
12 |
| - * OrderedDownstreamChannelEvent objects are expected to be received for the correlating response objects. |
| 12 | + * corresponding requests. |
| 13 | + * |
| 14 | + * Each incoming request is assigned a sequence number, which is provided in the wrapping {@link SequencedHttpRequest}. |
| 15 | + * When a handler writes a response and a response body, it must wrap each of those messages in a |
| 16 | + * {@link SequencedOutboundMessage} with the corresponding sequence from that request. It must send at least one |
| 17 | + * message in response to the request, and the last, and only the last message it sends must have the last flag set. |
| 18 | + * |
| 19 | + * If messages are sent after the last message is sent, these messages may end up being buffered until the sequence |
| 20 | + * number overflows and cycles back. If messages for a given sequence number are not sent sequentially, similar |
| 21 | + * behaviour could also occur. |
| 22 | + * |
| 23 | + * There is no limit to the amount of messages that this handler will buffer for a particular sequence number. It is |
| 24 | + * the responsibility of the handler sending the outbound messages to handle back pressure via promises associated |
| 25 | + * with each write event - if this is done, the buffering will be inherently bounded by back pressure. |
13 | 26 | *
|
14 |
| - * @author Christopher Hunt |
| 27 | + * This handler does however put a bound on the maximum number of in flight requests that it will handle, configured by |
| 28 | + * inFlightRequestsLowWatermark and inFlightRequestsHighWatermark. When the high watermark is exceeded, the handler will |
| 29 | + * push back on the client. When the low watermark is reached, the handler will start reading again. |
15 | 30 | */
|
16 |
| -public class HttpPipeliningHandler extends SimpleChannelHandler { |
| 31 | +public class HttpPipeliningHandler extends ChannelDuplexHandler { |
17 | 32 |
|
18 |
| - public static final int INITIAL_EVENTS_HELD = 3; |
19 |
| - public static final int MAX_EVENTS_HELD = 10000; |
| 33 | + /** |
| 34 | + * The sequence of received HTTP requests. |
| 35 | + */ |
| 36 | + volatile private int receiveSequence = 0; |
20 | 37 |
|
21 |
| - private final int maxEventsHeld; |
| 38 | + /** |
| 39 | + * The currently sending sequence of HTTP requests. |
| 40 | + */ |
| 41 | + volatile private int currentlySendingSequence = 1; |
22 | 42 |
|
23 |
| - private int sequence; |
24 |
| - private int nextRequiredSequence; |
25 |
| - private int nextRequiredSubsequence; |
| 43 | + /** |
| 44 | + * Whether the high watermark has been exceeded. |
| 45 | + */ |
| 46 | + volatile private boolean highWatermarkExceeded = false; |
26 | 47 |
|
27 |
| - private final Queue<OrderedDownstreamChannelEvent> holdingQueue; |
| 48 | + /** |
| 49 | + * The number of requests in flight |
| 50 | + */ |
| 51 | + private final AtomicInteger inFlight = new AtomicInteger(); |
28 | 52 |
|
29 |
| - public HttpPipeliningHandler() { |
30 |
| - this(MAX_EVENTS_HELD); |
| 53 | + /** |
| 54 | + * A write message with a promise of when it's written. |
| 55 | + */ |
| 56 | + private static class WriteMessage { |
| 57 | + /** |
| 58 | + * The written message. |
| 59 | + */ |
| 60 | + final SequencedOutboundMessage message; |
| 61 | + |
| 62 | + /** |
| 63 | + * The future that is redeemed once the message is written. |
| 64 | + */ |
| 65 | + final ChannelPromise promise; |
| 66 | + |
| 67 | + public WriteMessage(SequencedOutboundMessage message, ChannelPromise promise) { |
| 68 | + this.message = message; |
| 69 | + this.promise = promise; |
| 70 | + } |
31 | 71 | }
|
32 | 72 |
|
33 | 73 | /**
|
34 |
| - * @param maxEventsHeld the maximum number of channel events that will be retained prior to aborting the channel |
35 |
| - * connection. This is required as events cannot queue up indefintely; we would run out of |
36 |
| - * memory if this was the case. |
| 74 | + * The buffered events, by sequence |
37 | 75 | */
|
38 |
| - public HttpPipeliningHandler(final int maxEventsHeld) { |
39 |
| - this.maxEventsHeld = maxEventsHeld; |
40 |
| - |
41 |
| - holdingQueue = new PriorityQueue<OrderedDownstreamChannelEvent>(INITIAL_EVENTS_HELD, new Comparator<OrderedDownstreamChannelEvent>() { |
42 |
| - @Override |
43 |
| - public int compare(OrderedDownstreamChannelEvent o1, OrderedDownstreamChannelEvent o2) { |
44 |
| - final int delta = o1.getOrderedUpstreamMessageEvent().getSequence() - o2.getOrderedUpstreamMessageEvent().getSequence(); |
45 |
| - if (delta == 0) { |
46 |
| - return o1.getSubsequence() - o2.getSubsequence(); |
47 |
| - } else { |
48 |
| - return delta; |
49 |
| - } |
50 |
| - } |
51 |
| - }); |
| 76 | + private final Map<Integer, List<WriteMessage>> bufferedEvents = new HashMap<>(); |
| 77 | + |
| 78 | + private final int inFlightRequestsLowWatermark; |
| 79 | + private final int inFlightRequestsHighWatermark; |
| 80 | + |
| 81 | + /** |
| 82 | + * Create the pipelining handler with low and high watermarks of 2 and 4. |
| 83 | + */ |
| 84 | + public HttpPipeliningHandler() { |
| 85 | + this(2, 4); |
52 | 86 | }
|
53 | 87 |
|
54 |
| - public int getMaxEventsHeld() { |
55 |
| - return maxEventsHeld; |
| 88 | + /** |
| 89 | + * Create the pipelining handler. |
| 90 | + * |
| 91 | + * @param inFlightRequestsLowWatermark The soft limit for the number of in flight requests. If this limit is exceeded, |
| 92 | + * the handler will start responding with 503 errors. Must be at least 1. |
| 93 | + * @param inFlightRequestsHighWatermark The hard limit for the number of in flight requests. If this limit is exceeded, |
| 94 | + * the handler will close the connection. Must be at least |
| 95 | + * {@code inFlightRequestsLowWatermark}. |
| 96 | + */ |
| 97 | + public HttpPipeliningHandler(int inFlightRequestsLowWatermark, int inFlightRequestsHighWatermark) { |
| 98 | + if (inFlightRequestsLowWatermark < 0) { |
| 99 | + throw new IllegalArgumentException("inFlightRequestsLowWatermark must be an least 0, was " + inFlightRequestsLowWatermark); |
| 100 | + } |
| 101 | + if (inFlightRequestsHighWatermark <= inFlightRequestsLowWatermark) { |
| 102 | + throw new IllegalArgumentException("inFlightRequestsHighWatermark must be greater than inFlightRequestsLowWatermark, but was " + inFlightRequestsHighWatermark); |
| 103 | + } |
| 104 | + this.inFlightRequestsLowWatermark = inFlightRequestsLowWatermark; |
| 105 | + this.inFlightRequestsHighWatermark = inFlightRequestsHighWatermark; |
56 | 106 | }
|
57 | 107 |
|
58 | 108 | @Override
|
59 |
| - public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) { |
60 |
| - final Object msg = e.getMessage(); |
| 109 | + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
| 110 | + // Only forward read complete if we haven't exceeded the high watermark |
| 111 | + if (!highWatermarkExceeded) { |
| 112 | + ctx.fireChannelReadComplete(); |
| 113 | + } } |
| 114 | + |
| 115 | + @Override |
| 116 | + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
61 | 117 | if (msg instanceof HttpRequest) {
|
62 |
| - ctx.sendUpstream(new OrderedUpstreamMessageEvent(sequence++, e.getChannel(), msg, e.getRemoteAddress())); |
| 118 | + receiveSequence++; |
| 119 | + |
| 120 | + if (inFlight.incrementAndGet() > inFlightRequestsHighWatermark) { |
| 121 | + highWatermarkExceeded = true; |
| 122 | + } |
| 123 | + HttpRequest request = (HttpRequest) msg; |
| 124 | + ctx.fireChannelRead(new SequencedHttpRequest(receiveSequence, request)); |
| 125 | + |
63 | 126 | } else {
|
64 |
| - ctx.sendUpstream(e); |
| 127 | + ctx.fireChannelRead(msg); |
65 | 128 | }
|
66 | 129 | }
|
67 | 130 |
|
68 | 131 | @Override
|
69 |
| - public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) |
70 |
| - throws Exception { |
71 |
| - if (e instanceof OrderedDownstreamChannelEvent) { |
72 |
| - |
73 |
| - boolean channelShouldClose = false; |
74 |
| - |
75 |
| - synchronized (holdingQueue) { |
76 |
| - if (holdingQueue.size() < maxEventsHeld) { |
77 |
| - |
78 |
| - final OrderedDownstreamChannelEvent currentEvent = (OrderedDownstreamChannelEvent) e; |
79 |
| - holdingQueue.add(currentEvent); |
80 |
| - |
81 |
| - while (!holdingQueue.isEmpty()) { |
82 |
| - final OrderedDownstreamChannelEvent nextEvent = holdingQueue.peek(); |
83 |
| - if (nextEvent.getOrderedUpstreamMessageEvent().getSequence() != nextRequiredSequence | |
84 |
| - nextEvent.getSubsequence() != nextRequiredSubsequence) { |
85 |
| - break; |
86 |
| - } |
87 |
| - holdingQueue.remove(); |
88 |
| - ctx.sendDownstream(nextEvent.getChannelEvent()); |
89 |
| - if (nextEvent.isLast()) { |
90 |
| - ++nextRequiredSequence; |
91 |
| - nextRequiredSubsequence = 0; |
92 |
| - } else { |
93 |
| - ++nextRequiredSubsequence; |
94 |
| - } |
95 |
| - } |
| 132 | + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { |
| 133 | + if (msg instanceof SequencedOutboundMessage) { |
| 134 | + SequencedOutboundMessage sequenced = (SequencedOutboundMessage) msg; |
| 135 | + |
| 136 | + synchronizedWrite(ctx, sequenced, promise); |
| 137 | + } else { |
| 138 | + ctx.write(msg, promise); |
| 139 | + } |
| 140 | + } |
| 141 | + |
| 142 | + private void progressToNextSendingSequence(ChannelHandlerContext ctx) { |
| 143 | + currentlySendingSequence++; |
| 144 | + |
| 145 | + int inFlight = this.inFlight.decrementAndGet(); |
| 146 | + // If we're now at the low water mark, issue a read for the next request |
| 147 | + if (highWatermarkExceeded && inFlight == inFlightRequestsLowWatermark) { |
| 148 | + ctx.read(); |
| 149 | + highWatermarkExceeded = false; |
| 150 | + } |
| 151 | + } |
| 152 | + |
| 153 | + /** |
| 154 | + * Write the next sequences, if buffered. |
| 155 | + */ |
| 156 | + private void flushNextSequences(ChannelHandlerContext ctx) { |
| 157 | + synchronized (bufferedEvents) { |
| 158 | + |
| 159 | + progressToNextSendingSequence(ctx); |
96 | 160 |
|
| 161 | + List<WriteMessage> toFlush = bufferedEvents.get(currentlySendingSequence); |
| 162 | + |
| 163 | + // Loop while we still have a sequence to flush |
| 164 | + while (toFlush != null) { |
| 165 | + |
| 166 | + bufferedEvents.remove(currentlySendingSequence); |
| 167 | + |
| 168 | + WriteMessage lastWritten = null; |
| 169 | + |
| 170 | + // Flush each event |
| 171 | + for (WriteMessage message: toFlush) { |
| 172 | + ctx.write(message.message.getMessage(), message.promise); |
| 173 | + lastWritten = message; |
| 174 | + } |
| 175 | + |
| 176 | + // If the last message that we wrote was the last message for that sequence, |
| 177 | + // then increment the sequence and maybe get the next sequence from the buffer. |
| 178 | + if (lastWritten != null && lastWritten.message.isLast()) { |
| 179 | + progressToNextSendingSequence(ctx); |
| 180 | + toFlush = bufferedEvents.get(currentlySendingSequence); |
97 | 181 | } else {
|
98 |
| - channelShouldClose = true; |
| 182 | + toFlush = null; |
99 | 183 | }
|
100 | 184 | }
|
| 185 | + } |
| 186 | + } |
101 | 187 |
|
102 |
| - if (channelShouldClose) { |
103 |
| - Channels.close(e.getChannel()); |
| 188 | + /** |
| 189 | + * Perform a write, synchronized on the buffer. |
| 190 | + */ |
| 191 | + private void synchronizedWrite(ChannelHandlerContext ctx, SequencedOutboundMessage sequenced, ChannelPromise promise) { |
| 192 | + synchronized (bufferedEvents) { |
| 193 | + if (sequenced.getSequence() == currentlySendingSequence) { |
| 194 | + ctx.write(sequenced.getMessage(), promise); |
| 195 | + if (sequenced.isLast()) { |
| 196 | + flushNextSequences(ctx); |
| 197 | + } |
| 198 | + } else { |
| 199 | + List<WriteMessage> sequenceBuffer = bufferedEvents.get(sequenced.getSequence()); |
| 200 | + if (sequenceBuffer == null) { |
| 201 | + sequenceBuffer = new ArrayList<>(); |
| 202 | + bufferedEvents.put(sequenced.getSequence(), sequenceBuffer); |
| 203 | + } |
| 204 | + sequenceBuffer.add(new WriteMessage(sequenced, promise)); |
104 | 205 | }
|
105 |
| - } else { |
106 |
| - super.handleDownstream(ctx, e); |
107 | 206 | }
|
108 | 207 | }
|
109 |
| - |
110 | 208 | }
|
0 commit comments