Skip to content

Commit 1fe5a77

Browse files
birklandemetsger
authored andcommitted
The jetty producer has fundamental flaws and is incapable of streaming
http requests to endpoints; it relies on an easily-exceeded 2MB buffer. The http4 component supports streaming out of the box, but Exchanges that originate from http requests automatically close streams when their lifecycle ends. This means that certain Camel constructs that terminate exchanges (e.g. recipientList, enrich, etc) must be avoided. This is an undocumented gotcha in Camel. Resolves fcrepo4-labs#76 Squashed commit of the following: commit f2c7250bb1545be7aeb171d504340d2af7795482 Author: Aaron Birkland <[email protected]> Date: Sat Nov 12 23:28:40 2016 -0500 Refactor intercept routes for streaming Http-based exchanges in streaming mode automatically close their connections after theexchange is done. As the "loop" and "enhance" nodes result in the termination of exchanges, the ssubsequent closing of the stream results in "attempt to read from closed stream" exceptions commit be18a35d271d6fe2d7640c57eb168257bb1cc53d Author: Aaron Birkland <[email protected]> Date: Thu Nov 10 19:24:16 2016 -0500 Complete conversion to http4 commit c665825763bd060681e3fd5222d22d6daab74a9b Author: Aaron Birkland <[email protected]> Date: Thu Nov 10 16:34:51 2016 -0500 checkstyle commit 5023908 Author: Elliot Metsger <[email protected]> Date: Wed Nov 9 11:36:26 2016 -0500 Work around limitations with buffering in the jetty component by using http4 instead. This problem manifests itself when API-X proxies the retrieval of large (>2MiB) resources from Fedora. (fcrepo4-labs#77) - Use http4 component instead of jetty in the 'execute-intercept' route - Includes IT demonstrating the issue whe the jetty component is used
1 parent 809db12 commit 1fe5a77

File tree

12 files changed

+508
-55
lines changed

12 files changed

+508
-55
lines changed

fcrepo-api-x-integration/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,13 @@
287287
<scope>test</scope>
288288
</dependency>
289289

290+
<dependency>
291+
<groupId>commons-io</groupId>
292+
<artifactId>commons-io</artifactId>
293+
<version>2.5</version>
294+
<scope>test</scope>
295+
</dependency>
296+
290297
<!-- Fedora -->
291298
<dependency>
292299
<groupId>org.fcrepo</groupId>

fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/InterceptingModalityIT.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -144,14 +144,19 @@ public void incomingInterceptHeaderResponseTest() throws Exception {
144144
onServiceRequest(ex -> {
145145
if (MODALITY_INTERCEPT_INCOMING.equals(ex.getIn().getHeader(HTTP_HEADER_MODALITY))) {
146146
ex.getOut().setHeader("foo", "bar");
147+
ex.getOut().setBody(ex.getIn().getBody());
148+
} else {
149+
ex.setOut(ex.getIn());
147150
}
148151
});
149152

150153
final URI object = postFromTestResource("objects/object_InterceptingServiceIT.ttl",
151154
objectContainer_intercept);
152155

153-
final FcrepoResponse response = client.get(object).accept("application/n-triples").perform();
154-
assertTrue(IOUtils.toString(response.getBody(), "UTF-8").contains(TEST_OBJECT_TYPE));
156+
try (final FcrepoResponse response = client.get(object).accept("application/n-triples").perform()) {
157+
final String body = IOUtils.toString(response.getBody(), "UTF-8");
158+
assertTrue(body.contains(TEST_OBJECT_TYPE));
159+
}
155160
}
156161

157162
@Test
@@ -167,6 +172,9 @@ public void incomingRequestBodyTest() throws Exception {
167172
onServiceRequest(ex -> {
168173
if (MODALITY_INTERCEPT_INCOMING.equals(ex.getIn().getHeader(HTTP_HEADER_MODALITY))) {
169174
ex.getOut().setBody(String.format("<> a <%s> .", TYPE));
175+
ex.getOut().setHeaders(ex.getIn().getHeaders());
176+
} else {
177+
ex.setOut(ex.getIn());
170178
}
171179
});
172180

@@ -192,6 +200,7 @@ public void outgoingHeaderTest() throws Exception {
192200

193201
// Give our request a specific body
194202
onServiceRequest(ex -> {
203+
ex.setOut(ex.getIn());
195204
if (MODALITY_INTERCEPT_OUTGOING.equals(ex.getIn().getHeader(HTTP_HEADER_MODALITY))) {
196205
ex.getOut().setHeader(TEST_HEADER, TEST_HEADER_VALUE);
197206
}
@@ -240,6 +249,7 @@ public void outgoingErrorTest() throws Exception {
240249

241250
// Give our request a specific body
242251
onServiceRequest(ex -> {
252+
ex.setOut(ex.getIn());
243253
if (MODALITY_INTERCEPT_OUTGOING.equals(ex.getIn().getHeader(HTTP_HEADER_MODALITY))) {
244254
ex.getOut().setHeader(Exchange.HTTP_RESPONSE_CODE, 418);
245255
}
@@ -248,14 +258,10 @@ public void outgoingErrorTest() throws Exception {
248258
final URI object = postFromTestResource("objects/object_InterceptingServiceIT.ttl",
249259
objectContainer_intercept);
250260

251-
final FcrepoResponse response = client.get(object).accept("application/n-triples").perform();
252-
253-
// Response code should be Fedora's 200
254-
assertEquals(200, response.getStatusCode());
255-
256-
// Body should be returned as normal
257-
assertTrue(IOUtils.toString(response.getBody(), "UTF-8").contains(TEST_OBJECT_TYPE));
258-
261+
try {
262+
client.get(object).accept("application/n-triples").perform();
263+
} catch (final FcrepoOperationFailedException e) {
264+
assertEquals(418, e.getStatusCode());
265+
}
259266
}
260-
261267
}

fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/KarafIT.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import java.io.File;
3232
import java.io.FileInputStream;
33+
import java.io.InputStream;
3334
import java.net.URI;
3435
import java.util.ArrayList;
3536
import java.util.Arrays;
@@ -204,10 +205,19 @@ public default Option deployFile(String path) {
204205
* @return the resulting WebResource
205206
*/
206207
public default WebResource testResource(String path) {
208+
return testResource(path, "text/turtle");
209+
}
207210

211+
/**
212+
* Get a test resource from test-classes
213+
*
214+
* @param path the resource path relative to {@link #testResources}
215+
* @return the resulting WebResource
216+
*/
217+
public default WebResource testResource(String path, String contentType) {
208218
final File file = new File(testResources, path);
209219
try {
210-
return WebResource.of(new FileInputStream(file), "text/turtle", URI.create(FilenameUtils.getBaseName(
220+
return WebResource.of(new FileInputStream(file), contentType, URI.create(FilenameUtils.getBaseName(
211221
path)), null);
212222
} catch (final Exception e) {
213223
throw new RuntimeException(e);
@@ -225,6 +235,34 @@ public default URI postFromTestResource(final String filePath, final URI intoCon
225235
}
226236
}
227237

238+
public default URI postFromTestResource(final String filePath, final URI intoContainer, final String contentType)
239+
throws Exception {
240+
return postFromTestResource(filePath, intoContainer, contentType,
241+
String.format("%s_%s", testMethodName(), getBaseName(filePath)));
242+
}
243+
244+
public default URI postFromTestResource(final String filePath, final URI intoContainer,
245+
final String contentType, final String slug) throws Exception {
246+
try (final WebResource object = testResource(filePath, contentType);
247+
final FcrepoResponse response = client.post(intoContainer)
248+
.body(object.representation(), object.contentType())
249+
.slug(slug)
250+
.perform()) {
251+
return response.getLocation();
252+
}
253+
}
254+
255+
public default URI postFromStream(final InputStream in, final URI intoContainer, final String contentType,
256+
final String slug) throws Exception {
257+
try (final WebResource object = WebResource.of(in, contentType);
258+
final FcrepoResponse response = client.post(intoContainer)
259+
.body(object.representation(), object.contentType())
260+
.slug(slug)
261+
.perform()) {
262+
return response.getLocation();
263+
}
264+
}
265+
228266
public static <T> T attempt(final int times, final Callable<T> it) {
229267

230268
Exception caught = null;

fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/LoaderIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public List<Option> additionalKarafConfig() {
144144
public void setUp() {
145145

146146
onServiceRequest(ex -> {
147+
ex.setOut(ex.getIn());
147148
if ("OPTIONS".equals(ex.getIn().getHeader(Exchange.HTTP_METHOD))) {
148149
ex.getOut().setBody(optionsResponse.get());
149150
} else if ("GET".equals(ex.getIn().getHeader(Exchange.HTTP_METHOD))) {

fcrepo-api-x-integration/src/test/java/org/fcrepo/apix/integration/ServiceBasedTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,12 @@ public void configure() throws Exception {
135135
from("jetty:" + serviceEndpoint +
136136
"?matchOnUriPrefix=true")
137137
.process(ex -> {
138-
ex.getOut().copyFrom(responseFromService);
139138
requestToService.copyFrom(ex.getIn());
140139

141140
if (processFromTest != null) {
142141
processFromTest.process(ex);
142+
} else {
143+
ex.getOut().copyFrom(responseFromService);
143144
}
144145

145146
});

0 commit comments

Comments
 (0)