Skip to content

Commit 8724d3a

Browse files
authored
Add ChunkedEncodedPublisher (#6262)
This publisher encodes binary streams using chunked transfer encoding. It serves the same purpose as ChunkedEncodedInputStream, but for async request bodies.
1 parent 01d9806 commit 8724d3a

File tree

3 files changed

+646
-0
lines changed

3 files changed

+646
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding;
17+
18+
import java.nio.ByteBuffer;
19+
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.function.Function;
24+
import java.util.stream.Collectors;
25+
import org.reactivestreams.Publisher;
26+
import org.reactivestreams.Subscriber;
27+
import software.amazon.awssdk.annotations.SdkInternalApi;
28+
import software.amazon.awssdk.utils.Pair;
29+
import software.amazon.awssdk.utils.async.AddingTrailingDataSubscriber;
30+
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
31+
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
32+
import software.amazon.awssdk.utils.internal.MappingSubscriber;
33+
34+
/**
35+
* An implementation of chunk-transfer encoding, but by wrapping a {@link Publisher} of {@link ByteBuffer}. This implementation
36+
* supports chunk-headers, chunk-extensions.
37+
* <p>
38+
* Per <a href="https://datatracker.ietf.org/doc/html/rfc7230#section-4.1">RFC-7230</a>, a chunk-transfer encoded message is
39+
* defined as:
40+
* <pre>
41+
* chunked-body = *chunk
42+
* last-chunk
43+
* trailer-part
44+
* CRLF
45+
* chunk = chunk-size [ chunk-ext ] CRLF
46+
* chunk-data CRLF
47+
* chunk-size = 1*HEXDIG
48+
* last-chunk = 1*("0") [ chunk-ext ] CRLF
49+
* chunk-data = 1*OCTET ; a sequence of chunk-size octets
50+
*
51+
* chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
52+
* chunk-ext-name = token
53+
* chunk-ext-val = token / quoted-string
54+
* </pre>
55+
*
56+
* @see ChunkedEncodedInputStream
57+
*/
58+
@SdkInternalApi
59+
public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
60+
private static final byte[] CRLF = {'\r', '\n'};
61+
private static final byte SEMICOLON = ';';
62+
private static final byte EQUALS = '=';
63+
64+
private final Publisher<ByteBuffer> wrapped;
65+
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
66+
private final int chunkSize;
67+
private ByteBuffer chunkBuffer;
68+
private final boolean addEmptyTrailingChunk;
69+
70+
public ChunkedEncodedPublisher(Builder b) {
71+
this.wrapped = b.publisher;
72+
this.chunkSize = b.chunkSize;
73+
this.extensions.addAll(b.extensions);
74+
this.addEmptyTrailingChunk = b.addEmptyTrailingChunk;
75+
}
76+
77+
@Override
78+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
79+
Publisher<Iterable<ByteBuffer>> chunked = chunk(wrapped);
80+
Publisher<Iterable<ByteBuffer>> trailingAdded = addTrailingChunks(chunked);
81+
Publisher<ByteBuffer> flattened = flatten(trailingAdded);
82+
Publisher<ByteBuffer> encoded = map(flattened, this::encodeChunk);
83+
84+
encoded.subscribe(subscriber);
85+
}
86+
87+
public static Builder builder() {
88+
return new Builder();
89+
}
90+
91+
private Iterable<Iterable<ByteBuffer>> getTrailingChunks() {
92+
List<ByteBuffer> trailing = new ArrayList<>();
93+
94+
if (chunkBuffer != null) {
95+
chunkBuffer.flip();
96+
if (chunkBuffer.hasRemaining()) {
97+
trailing.add(chunkBuffer);
98+
}
99+
}
100+
101+
if (addEmptyTrailingChunk) {
102+
trailing.add(ByteBuffer.allocate(0));
103+
}
104+
105+
return Collections.singletonList(trailing);
106+
}
107+
108+
private Publisher<Iterable<ByteBuffer>> chunk(Publisher<ByteBuffer> upstream) {
109+
return subscriber -> {
110+
upstream.subscribe(new ChunkingSubscriber(subscriber));
111+
};
112+
}
113+
114+
private Publisher<ByteBuffer> flatten(Publisher<Iterable<ByteBuffer>> upstream) {
115+
return subscriber -> upstream.subscribe(new FlatteningSubscriber<>(subscriber));
116+
}
117+
118+
public Publisher<Iterable<ByteBuffer>> addTrailingChunks(Publisher<Iterable<ByteBuffer>> upstream) {
119+
return subscriber -> {
120+
upstream.subscribe(new AddingTrailingDataSubscriber<>(subscriber, this::getTrailingChunks));
121+
};
122+
}
123+
124+
public Publisher<ByteBuffer> map(Publisher<ByteBuffer> upstream, Function<? super ByteBuffer, ? extends ByteBuffer> mapper) {
125+
return subscriber -> upstream.subscribe(MappingSubscriber.create(subscriber, mapper));
126+
}
127+
128+
// TODO: Trailing checksum
129+
private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
130+
int contentLen = byteBuffer.remaining();
131+
byte[] chunkSizeHex = Integer.toHexString(contentLen).getBytes(StandardCharsets.UTF_8);
132+
133+
List<Pair<byte[], byte[]>> chunkExtensions = this.extensions.stream()
134+
.map(e -> {
135+
ByteBuffer duplicate = byteBuffer.duplicate();
136+
return e.get(duplicate);
137+
}).collect(Collectors.toList());
138+
139+
int extensionsLength = calculateExtensionsLength(chunkExtensions);
140+
141+
int encodedLen = chunkSizeHex.length + extensionsLength + CRLF.length + contentLen + CRLF.length;
142+
143+
ByteBuffer encoded = ByteBuffer.allocate(encodedLen);
144+
encoded.put(chunkSizeHex);
145+
146+
chunkExtensions.forEach(p -> {
147+
encoded.put(SEMICOLON);
148+
encoded.put(p.left());
149+
if (p.right() != null && p.right().length > 0) {
150+
encoded.put(EQUALS);
151+
encoded.put(p.right());
152+
}
153+
});
154+
155+
encoded.put(CRLF);
156+
encoded.put(byteBuffer);
157+
encoded.put(CRLF);
158+
159+
encoded.flip();
160+
161+
return encoded;
162+
}
163+
164+
private int calculateExtensionsLength(List<Pair<byte[], byte[]>> chunkExtensions) {
165+
return chunkExtensions.stream()
166+
.mapToInt(p -> {
167+
int keyLen = p.left().length;
168+
byte[] value = p.right();
169+
if (value.length > 0) {
170+
return 1 + keyLen + 1 + value.length; // ';ext-key=ext-value'
171+
}
172+
// ';ext-key
173+
return 1 + keyLen;
174+
}).sum();
175+
}
176+
177+
private class ChunkingSubscriber extends DelegatingSubscriber<ByteBuffer, Iterable<ByteBuffer>> {
178+
protected ChunkingSubscriber(Subscriber<? super Iterable<ByteBuffer>> subscriber) {
179+
super(subscriber);
180+
}
181+
182+
@Override
183+
public void onNext(ByteBuffer byteBuffer) {
184+
if (chunkBuffer == null) {
185+
chunkBuffer = ByteBuffer.allocate(chunkSize);
186+
}
187+
188+
long totalBufferedBytes = (long) chunkBuffer.position() + byteBuffer.remaining();
189+
int nBufferedChunks = (int) (totalBufferedBytes / chunkSize);
190+
191+
List<ByteBuffer> chunks = new ArrayList<>(nBufferedChunks);
192+
193+
if (nBufferedChunks > 0) {
194+
for (int i = 0; i < nBufferedChunks; i++) {
195+
ByteBuffer slice = byteBuffer.slice();
196+
int maxBytesToCopy = Math.min(chunkBuffer.remaining(), slice.remaining());
197+
slice.limit(maxBytesToCopy);
198+
199+
chunkBuffer.put(slice);
200+
if (!chunkBuffer.hasRemaining()) {
201+
chunkBuffer.flip();
202+
chunks.add(chunkBuffer);
203+
chunkBuffer = ByteBuffer.allocate(chunkSize);
204+
}
205+
206+
byteBuffer.position(byteBuffer.position() + maxBytesToCopy);
207+
}
208+
209+
if (byteBuffer.hasRemaining()) {
210+
chunkBuffer.put(byteBuffer);
211+
}
212+
} else {
213+
chunkBuffer.put(byteBuffer);
214+
}
215+
216+
subscriber.onNext(chunks);
217+
}
218+
}
219+
220+
public static class Builder {
221+
private Publisher<ByteBuffer> publisher;
222+
private int chunkSize;
223+
private boolean addEmptyTrailingChunk;
224+
private final List<ChunkExtensionProvider> extensions = new ArrayList<>();
225+
226+
public Builder publisher(Publisher<ByteBuffer> publisher) {
227+
this.publisher = publisher;
228+
return this;
229+
}
230+
231+
public Builder chunkSize(int chunkSize) {
232+
this.chunkSize = chunkSize;
233+
return this;
234+
}
235+
236+
public Builder addEmptyTrailingChunk(boolean addEmptyTrailingChunk) {
237+
this.addEmptyTrailingChunk = addEmptyTrailingChunk;
238+
return this;
239+
}
240+
241+
public Builder addExtension(ChunkExtensionProvider extension) {
242+
this.extensions.add(extension);
243+
return this;
244+
}
245+
246+
public ChunkedEncodedPublisher build() {
247+
return new ChunkedEncodedPublisher(this);
248+
}
249+
}
250+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding;
17+
18+
import io.reactivex.Flowable;
19+
import java.nio.ByteBuffer;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import org.reactivestreams.Publisher;
23+
import org.reactivestreams.tck.PublisherVerification;
24+
import org.reactivestreams.tck.TestEnvironment;
25+
26+
public class ChunkedEncodedPublisherTckTest extends PublisherVerification<ByteBuffer> {
27+
private static final int INPUT_STREAM_ELEMENT_SIZE = 64;
28+
private static final int CHUNK_SIZE = 16 * 1024;
29+
30+
public ChunkedEncodedPublisherTckTest() {
31+
super(new TestEnvironment());
32+
}
33+
34+
@Override
35+
public Publisher<ByteBuffer> createPublisher(long l) {
36+
return createChunkedPublisher(l);
37+
}
38+
39+
@Override
40+
public Publisher<ByteBuffer> createFailedPublisher() {
41+
return null;
42+
}
43+
44+
@Override
45+
public long maxElementsFromPublisher() {
46+
return 512;
47+
}
48+
49+
private Publisher<ByteBuffer> createChunkedPublisher(long chunksToProduce) {
50+
// max of 8 MiB
51+
long totalSize = chunksToProduce * CHUNK_SIZE;
52+
53+
int totalElements = (int) (totalSize / INPUT_STREAM_ELEMENT_SIZE);
54+
55+
byte[] content = new byte[INPUT_STREAM_ELEMENT_SIZE];
56+
57+
List<ByteBuffer> elements = new ArrayList<>();
58+
for (int i = 0; i < totalElements; i++) {
59+
elements.add(ByteBuffer.wrap(content));
60+
}
61+
62+
Publisher<ByteBuffer> inputPublisher = Flowable.fromIterable(elements);
63+
64+
return ChunkedEncodedPublisher.builder()
65+
.chunkSize(CHUNK_SIZE)
66+
.publisher(inputPublisher)
67+
.addEmptyTrailingChunk(false)
68+
.build();
69+
}
70+
}

0 commit comments

Comments
 (0)