diff --git a/README.md b/README.md index 396457bf..bfd6f3c8 100644 --- a/README.md +++ b/README.md @@ -384,6 +384,7 @@ The following settings apply to both the `JsonRpcServer` and `JsonServiceExporte * `rethrowExceptions` - Boolean specifying whether or not the server should re-throw exceptions after sending them back to the client. * `backwardsComaptible` - Boolean specifying whether or not the server should allow for jsonrpc 1.0 calls. This only includes the omission of the jsonrpc property of the request object, it will not enable class hinting. * `errorResolver` - An implementation of the `ErrorResolver` interface that resolves exception thrown by services into meaningful responses to be sent to clients. Multiple `ErrorResolver`s can be configured using the `MultipleErrorResolver` implementation of this interface. + * `batchExecutorService` - A configured `ExecutorService` to use for parallel JSON-RPC batch processing. By default batch requests are handled sequentially. ### Server Method resolution Methods are resolved in the following way, each step immediately short circuits the diff --git a/build.gradle b/build.gradle index 94ce2fd0..7f313de0 100644 --- a/build.gradle +++ b/build.gradle @@ -18,8 +18,8 @@ description = 'This project aims to provide the facility to easily implement JSO version = '1.5.3-2' group = 'com.github.briandilley.jsonrpc4j' -sourceCompatibility = 1.7 -targetCompatibility = 1.7 +sourceCompatibility = 1.8 +targetCompatibility = 1.8 compileJava { options.encoding = 'UTF-8' diff --git a/src/main/java/com/googlecode/jsonrpc4j/JsonResponse.java b/src/main/java/com/googlecode/jsonrpc4j/JsonResponse.java new file mode 100644 index 00000000..efa24d31 --- /dev/null +++ b/src/main/java/com/googlecode/jsonrpc4j/JsonResponse.java @@ -0,0 +1,46 @@ +package com.googlecode.jsonrpc4j; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * Contains the JSON-RPC answer in {@code response} + * {@code exceptionToRethrow} contains exception, which should be thrown when property {@code rethrowExceptions} + * is active + */ +public class JsonResponse { + private JsonNode response; + private int code; + private RuntimeException exceptionToRethrow; + + public JsonResponse() { + } + + public JsonResponse(JsonNode response, int code) { + this.response = response; + this.code = code; + } + + public JsonNode getResponse() { + return response; + } + + public void setResponse(JsonNode response) { + this.response = response; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public RuntimeException getExceptionToRethrow() { + return exceptionToRethrow; + } + + public void setExceptionToRethrow(RuntimeException exceptionToRethrow) { + this.exceptionToRethrow = exceptionToRethrow; + } +} diff --git a/src/main/java/com/googlecode/jsonrpc4j/JsonRpcBasicServer.java b/src/main/java/com/googlecode/jsonrpc4j/JsonRpcBasicServer.java index 78f503fd..0d19a075 100644 --- a/src/main/java/com/googlecode/jsonrpc4j/JsonRpcBasicServer.java +++ b/src/main/java/com/googlecode/jsonrpc4j/JsonRpcBasicServer.java @@ -18,9 +18,13 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import static com.googlecode.jsonrpc4j.ErrorResolver.JsonError.ERROR_NOT_HANDLED; +import static com.googlecode.jsonrpc4j.ErrorResolver.JsonError.INTERNAL_ERROR; import static com.googlecode.jsonrpc4j.ReflectionUtil.findCandidateMethods; import static com.googlecode.jsonrpc4j.ReflectionUtil.getParameterTypes; import static com.googlecode.jsonrpc4j.Util.hasNonNullData; @@ -74,7 +78,9 @@ public class JsonRpcBasicServer { private ConvertedParameterTransformer convertedParameterTransformer = null; private boolean shouldLogInvocationErrors = true; private List interceptorList = new ArrayList<>(); - + private ExecutorService batchExecutorService = null; + private long parallelBatchProcessingTimeout; + /** * Creates the server with the given {@link ObjectMapper} delegating * all calls to the given {@code handler}. @@ -240,9 +246,16 @@ public int handleRequest(final InputStream input, final OutputStream output) thr for (JsonRpcInterceptor interceptor : interceptorList) { interceptor.preHandleJson(jsonNode); } - return handleJsonNodeRequest(jsonNode, output).code; - } catch (JsonParseException | JsonMappingException e) { - return writeAndFlushValueError(output, createResponseError(VERSION, NULL, JsonError.PARSE_ERROR)).code; + JsonResponse jsonResponse = handleJsonNodeRequest(jsonNode); + writeAndFlushValue(output, jsonResponse.getResponse()); + if (jsonResponse.getExceptionToRethrow() != null) { + throw jsonResponse.getExceptionToRethrow(); + } + return jsonResponse.getCode(); + } catch (JsonParseException | JsonMappingException e) { + JsonResponse responseError = createResponseError(VERSION, NULL, JsonError.PARSE_ERROR); + writeAndFlushValue(output, responseError.getResponse()); + return responseError.getCode(); } } @@ -263,77 +276,150 @@ protected Class[] getHandlerInterfaces(final String serviceName) { } /** - * Handles the given {@link JsonNode} and writes the responses to the given {@link OutputStream}. + * Handles the given {@link JsonNode} and creates {@link JsonResponse} * - * @param node the {@link JsonNode} - * @param output the {@link OutputStream} - * @return the error code, or {@code 0} if none - * @throws IOException on error + * @param node the {@link JsonNode} + * @return the {@link JsonResponse} instance */ - protected JsonError handleJsonNodeRequest(final JsonNode node, final OutputStream output) throws IOException { - if (node.isArray()) { - return handleArray(ArrayNode.class.cast(node), output); - } - if (node.isObject()) { - return handleObject(ObjectNode.class.cast(node), output); - } - return this.writeAndFlushValueError(output, this.createResponseError(VERSION, NULL, JsonError.INVALID_REQUEST)); - } - + protected JsonResponse handleJsonNodeRequest(final JsonNode node) { + if (node.isArray()) { + return handleArray((ArrayNode) node); + } + if (node.isObject()) { + return handleObject((ObjectNode) node); + } + return createResponseError(VERSION, NULL, JsonError.INVALID_REQUEST); + } + /** - * Handles the given {@link ArrayNode} and writes the - * responses to the given {@link OutputStream}. - * - * @param node the {@link JsonNode} - * @param output the {@link OutputStream} - * @return the error code, or {@code 0} if none - * @throws IOException on error + * Handles the given {@link ArrayNode} and creates {@link JsonResponse} + * if {@code batchExecutorService} is configured, then handles batch in parallel otherwise handles it sequentially + * + * @param node the {@link JsonNode} + * @return the {@link JsonResponse} instance */ - private JsonError handleArray(ArrayNode node, OutputStream output) throws IOException { - logger.debug("Handling {} requests", node.size()); - JsonError result = JsonError.OK; - output.write('['); - int errorCount = 0; - for (int i = 0; i < node.size(); i++) { - JsonError nodeResult = handleJsonNodeRequest(node.get(i), output); - if (isError(nodeResult)) { - result = JsonError.BULK_ERROR; - errorCount += 1; - } - if (i != node.size() - 1) { - output.write(','); - } - } - output.write(']'); - logger.debug("served {} requests, error {}, result {}", node.size(), errorCount, result); - // noinspection unchecked - return result; + private JsonResponse handleArray(ArrayNode node) { + logger.debug("Handling {} requests", node.size()); + + if (batchExecutorService != null) { + return getBatchResponseInParallel(node); + } else { + return getBatchResponseSequentially(node); + } } - - private boolean isError(JsonError result) { - return result.code != JsonError.OK.code; + + /** + * Handles the given {@link ArrayNode} sequentially and creates {@link JsonResponse} + * + * @param node the {@link JsonNode} + * @return the {@link JsonResponse} instance + */ + private JsonResponse getBatchResponseSequentially(ArrayNode node) { + JsonError result = JsonError.OK; + ArrayNode batchResult = mapper.createArrayNode(); + int errorCount = 0; + JsonResponse response = new JsonResponse(); + + for (int i = 0; i < node.size(); i++) { + JsonResponse nodeResult = handleJsonNodeRequest(node.get(i)); + handleRethrowException(response, nodeResult); + batchResult.add(nodeResult.getResponse()); + if (isError(nodeResult)) { + result = JsonError.BULK_ERROR; + errorCount += 1; + } + } + + logger.debug("served {} requests, error {}, result {}", node.size(), errorCount, result); + + response.setResponse(batchResult); + response.setCode(result.getCode()); + return response; + } + + /** + * Handles the given {@link ArrayNode} in parallel and creates {@link JsonResponse} + * + * @param node the {@link JsonNode} + * @return the {@link JsonResponse} instance + */ + private JsonResponse getBatchResponseInParallel(ArrayNode node) { + JsonError result = JsonError.OK; + ArrayNode batchResult = mapper.createArrayNode(); + int errorCount = 0; + JsonResponse response = new JsonResponse(); + + Map> responses = new HashMap<>(); + for (int i = 0; i < node.size(); i++) { + JsonNode jsonNode = node.get(i); + Object id = parseId(jsonNode.get(ID)); + Future responseFuture = batchExecutorService.submit(() -> handleJsonNodeRequest(jsonNode)); + responses.put(id, responseFuture); + } + + for (Map.Entry> responseFuture : responses.entrySet()) { + JsonResponse singleJsonResponse = getSingleJsonResponse(responseFuture); + handleRethrowException(response, singleJsonResponse); + batchResult.add(singleJsonResponse.getResponse()); + if (isError(singleJsonResponse)) { + result = JsonError.BULK_ERROR; + errorCount += 1; + } + } + + logger.debug("served {} requests, error {}, result {}", node.size(), errorCount, result); + + response.setResponse(batchResult); + response.setCode(result.getCode()); + return response; + } + + + private void handleRethrowException(JsonResponse response, JsonResponse singleJsonResponse) { + if (singleJsonResponse.getExceptionToRethrow() != null && response.getExceptionToRethrow() == null) { + response.setExceptionToRethrow(singleJsonResponse.getExceptionToRethrow()); + } + } + + /** + * Gets the {@link JsonResponse} from the {@link Future} + * + * @param responseFuture {@link Map.Entry} with an id of the JSON-RPC request as a key + * and {@link Future} with {@link JsonResponse} as a value + * @return the {@link JsonResponse} instance + */ + private JsonResponse getSingleJsonResponse(Map.Entry> responseFuture) { + JsonResponse response; + try { + response = responseFuture.getValue().get(parallelBatchProcessingTimeout, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + JsonError jsonError = new JsonError(INTERNAL_ERROR.code, t.getMessage(), t.getClass().getName()); + return createResponseError(VERSION, responseFuture.getKey(), jsonError); + } + return response; + } + + private boolean isError(JsonResponse result) { + return result.getCode() != JsonError.OK.code; } /** - * Handles the given {@link ObjectNode} and writes the - * responses to the given {@link OutputStream}. + * Handles the given {@link ObjectNode} and creates {@link JsonResponse} * * @param node the {@link JsonNode} - * @param output the {@link OutputStream} - * @return the error code, or {@code 0} if none - * @throws IOException on error + * @return the {@link JsonResponse} instance */ - private JsonError handleObject(final ObjectNode node, final OutputStream output) throws IOException { + private JsonResponse handleObject(final ObjectNode node) { logger.debug("Request: {}", node); if (!isValidRequest(node)) { - return writeAndFlushValueError(output, createResponseError(VERSION, NULL, JsonError.INVALID_REQUEST)); + return createResponseError(VERSION, NULL, JsonError.INVALID_REQUEST); } Object id = parseId(node.get(ID)); String jsonRpc = hasNonNullData(node, JSONRPC) ? node.get(JSONRPC).asText() : VERSION; if (!hasNonNullData(node, METHOD)) { - return writeAndFlushValueError(output, createResponseError(jsonRpc, id, JsonError.METHOD_NOT_FOUND)); + return createResponseError(jsonRpc, id, JsonError.METHOD_NOT_FOUND); } final String fullMethodName = node.get(METHOD).asText(); @@ -342,11 +428,11 @@ private JsonError handleObject(final ObjectNode node, final OutputStream output) Set methods = findCandidateMethods(getHandlerInterfaces(serviceName), partialMethodName); if (methods.isEmpty()) { - return writeAndFlushValueError(output, createResponseError(jsonRpc, id, JsonError.METHOD_NOT_FOUND)); + return createResponseError(jsonRpc, id, JsonError.METHOD_NOT_FOUND); } AMethodWithItsArgs methodArgs = findBestMethodByParamsNode(methods, node.get(PARAMS)); if (methodArgs == null) { - return writeAndFlushValueError(output, createResponseError(jsonRpc, id, JsonError.METHOD_PARAMS_INVALID)); + return createResponseError(jsonRpc, id, JsonError.METHOD_PARAMS_INVALID); } try (InvokeListenerHandler handler = new InvokeListenerHandler(methodArgs, invocationListener)) { try { @@ -366,20 +452,19 @@ private JsonError handleObject(final ObjectNode node, final OutputStream output) interceptor.postHandle(target, methodArgs.method, methodArgs.arguments, result); } if (!isNotificationRequest(id)) { - ObjectNode response = createResponseSuccess(jsonRpc, id, handler.result); - writeAndFlushValue(output, response); + return createResponseSuccess(jsonRpc, id, handler.result); } - return JsonError.OK; + return new JsonResponse(null, JsonError.OK.code); } catch (JsonParseException | JsonMappingException e) { throw e; // rethrow this, it will be handled as PARSE_ERROR later } catch (Throwable e) { handler.error = e; - return handleError(output, id, jsonRpc, methodArgs, e); + return handleError(id, jsonRpc, methodArgs, e); } } } - - private JsonError handleError(OutputStream output, Object id, String jsonRpc, AMethodWithItsArgs methodArgs, Throwable e) throws IOException { + + private JsonResponse handleError(Object id, String jsonRpc, AMethodWithItsArgs methodArgs, Throwable e) { Throwable unwrappedException = getException(e); if (shouldLogInvocationErrors) { @@ -387,21 +472,22 @@ private JsonError handleError(OutputStream output, Object id, String jsonRpc, AM } JsonError error = resolveError(methodArgs, unwrappedException); - writeAndFlushValueError(output, createResponseError(jsonRpc, id, error)); - if (rethrowExceptions) { - throw new RuntimeException(unwrappedException); - } - return error; + JsonResponse responseError = createResponseError(jsonRpc, id, error); + if (rethrowExceptions) { + responseError.setExceptionToRethrow(new RuntimeException(unwrappedException)); + } + return responseError; + } private Throwable getException(final Throwable thrown) { Throwable e = thrown; - while (InvocationTargetException.class.isInstance(e)) { + while (e instanceof InvocationTargetException) { // noinspection ThrowableResultOfMethodCallIgnored - e = InvocationTargetException.class.cast(e).getTargetException(); - while (UndeclaredThrowableException.class.isInstance(e)) { + e = ((InvocationTargetException) e).getTargetException(); + while (e instanceof UndeclaredThrowableException) { // noinspection ThrowableResultOfMethodCallIgnored - e = UndeclaredThrowableException.class.cast(e).getUndeclaredThrowable(); + e = ((UndeclaredThrowableException) e).getUndeclaredThrowable(); } } return e; @@ -545,7 +631,54 @@ private Object[] convertJsonToParameters(Method m, List params) throws } return convertedParams; } - + + /** + * Creates a response. + * + * @param jsonRpc the version string + * @param id the id of the request + * @param result the result object + * @param errorObject the error data (if any) + * @return the response object + */ + private JsonResponse createResponse(String jsonRpc, Object id, JsonNode result, JsonError errorObject) { + ObjectNode response = mapper.createObjectNode(); + response.put(JSONRPC, jsonRpc); + if (id instanceof Integer) { + response.put(ID, ((Integer) id).intValue()); + } else if (id instanceof Long) { + response.put(ID, ((Long) id).longValue()); + } else if (id instanceof Float) { + response.put(ID, ((Float) id).floatValue()); + } else if (id instanceof Double) { + response.put(ID, ((Double) id).doubleValue()); + } else if (id instanceof BigDecimal) { + response.put(ID, (BigDecimal) id); + } else { + response.put(ID, (String) id); + } + + int responseCode = JsonError.OK.code; + if (errorObject != null) { + ObjectNode error = mapper.createObjectNode(); + error.put(ERROR_CODE, errorObject.code); + error.put(ERROR_MESSAGE, errorObject.message); + if (errorObject.data != null) { + error.set(DATA, mapper.valueToTree(errorObject.data)); + } + responseCode = errorObject.getCode(); + response.set(ERROR, error); + } else { + response.set(RESULT, result); + } + + for (JsonRpcInterceptor interceptor : interceptorList) { + interceptor.postHandleJson(response); + } + + return new JsonResponse(response, responseCode); + } + /** * Convenience method for creating an error response. * @@ -554,32 +687,10 @@ private Object[] convertJsonToParameters(Method m, List params) throws * @param errorObject the error data (if any) * @return the error response */ - private ErrorObjectWithJsonError createResponseError(String jsonRpc, Object id, JsonError errorObject) { - ObjectNode response = mapper.createObjectNode(); - ObjectNode error = mapper.createObjectNode(); - error.put(ERROR_CODE, errorObject.code); - error.put(ERROR_MESSAGE, errorObject.message); - if (errorObject.data != null) { - error.set(DATA, mapper.valueToTree(errorObject.data)); - } - response.put(JSONRPC, jsonRpc); - if (Integer.class.isInstance(id)) { - response.put(ID, Integer.class.cast(id).intValue()); - } else if (Long.class.isInstance(id)) { - response.put(ID, Long.class.cast(id).longValue()); - } else if (Float.class.isInstance(id)) { - response.put(ID, Float.class.cast(id).floatValue()); - } else if (Double.class.isInstance(id)) { - response.put(ID, Double.class.cast(id).doubleValue()); - } else if (BigDecimal.class.isInstance(id)) { - response.put(ID, BigDecimal.class.cast(id)); - } else { - response.put(ID, String.class.cast(id)); - } - response.set(ERROR, error); - return new ErrorObjectWithJsonError(response, errorObject); - } - + private JsonResponse createResponseError(String jsonRpc, Object id, JsonError errorObject) { + return createResponse(jsonRpc, id, null, errorObject); + } + /** * Creates a success response. * @@ -588,26 +699,10 @@ private ErrorObjectWithJsonError createResponseError(String jsonRpc, Object id, * @param result the result object * @return the response object */ - private ObjectNode createResponseSuccess(String jsonRpc, Object id, JsonNode result) { - ObjectNode response = mapper.createObjectNode(); - response.put(JSONRPC, jsonRpc); - if (Integer.class.isInstance(id)) { - response.put(ID, Integer.class.cast(id).intValue()); - } else if (Long.class.isInstance(id)) { - response.put(ID, Long.class.cast(id).longValue()); - } else if (Float.class.isInstance(id)) { - response.put(ID, Float.class.cast(id).floatValue()); - } else if (Double.class.isInstance(id)) { - response.put(ID, Double.class.cast(id).doubleValue()); - } else if (BigDecimal.class.isInstance(id)) { - response.put(ID, BigDecimal.class.cast(id)); - } else { - response.put(ID, String.class.cast(id)); - } - response.set(RESULT, result); - return response; - } - + private JsonResponse createResponseSuccess(String jsonRpc, Object id, JsonNode result) { + return createResponse(jsonRpc, id, result, null); + } + /** * Finds the {@link Method} from the supplied {@link Set} that * best matches the rest of the arguments supplied and returns @@ -623,9 +718,9 @@ private AMethodWithItsArgs findBestMethodByParamsNode(Set methods, JsonN } AMethodWithItsArgs matchedMethod; if (paramsNode.isArray()) { - matchedMethod = findBestMethodUsingParamIndexes(methods, paramsNode.size(), ArrayNode.class.cast(paramsNode)); + matchedMethod = findBestMethodUsingParamIndexes(methods, paramsNode.size(), (ArrayNode) paramsNode); } else if (paramsNode.isObject()) { - matchedMethod = findBestMethodUsingParamNames(methods, collectFieldNames(paramsNode), ObjectNode.class.cast(paramsNode)); + matchedMethod = findBestMethodUsingParamNames(methods, collectFieldNames(paramsNode), (ObjectNode) paramsNode); } else { throw new IllegalArgumentException("Unknown params node type: " + paramsNode.toString()); } @@ -865,13 +960,13 @@ private JsonError writeAndFlushValueError(OutputStream output, ErrorObjectWithJs * @param value the value to write * @throws IOException on error */ - private void writeAndFlushValue(OutputStream output, ObjectNode value) throws IOException { + private void writeAndFlushValue(OutputStream output, JsonNode value) throws IOException { + if (value == null) { + return; + } logger.debug("Response: {}", value); - for (JsonRpcInterceptor interceptor : interceptorList) { - interceptor.postHandleJson(value); - } - mapper.writeValue(new NoCloseOutputStream(output), value); + mapper.writeValue(new NoCloseOutputStream(output), value); output.write('\n'); } @@ -1001,8 +1096,21 @@ public void setConvertedParameterTransformer(ConvertedParameterTransformer conve public void setShouldLogInvocationErrors(boolean shouldLogInvocationErrors) { this.shouldLogInvocationErrors = shouldLogInvocationErrors; } - - private static class ErrorObjectWithJsonError { + + /** + * Sets the configured {@link ExecutorService} to use it for parallel JSON-RPC batch processing + * + * @param batchExecutorService configured {@link ExecutorService} + */ + public void setBatchExecutorService(ExecutorService batchExecutorService) { + this.batchExecutorService = batchExecutorService; + } + + public void setParallelBatchProcessingTimeout(long parallelBatchProcessingTimeout) { + this.parallelBatchProcessingTimeout = parallelBatchProcessingTimeout; + } + + private static class ErrorObjectWithJsonError { private final ObjectNode node; private final JsonError error; @@ -1076,14 +1184,14 @@ private void collectArgumentsBasedOnName(Method method, Set paramNames, private void collectVarargsFromNode(JsonNode node) { if (node.isArray()) { - ArrayNode arrayNode = ArrayNode.class.cast(node); + ArrayNode arrayNode = (ArrayNode) node; for (int i = 0; i < node.size(); i++) { addArgument(arrayNode.get(i)); } } if (node.isObject()) { - ObjectNode objectNode = ObjectNode.class.cast(node); + ObjectNode objectNode = (ObjectNode) node; Iterator> items = objectNode.fields(); while (items.hasNext()) { Map.Entry item = items.next(); diff --git a/src/main/java/com/googlecode/jsonrpc4j/spring/AbstractJsonServiceExporter.java b/src/main/java/com/googlecode/jsonrpc4j/spring/AbstractJsonServiceExporter.java index 034dd9a5..ba145534 100644 --- a/src/main/java/com/googlecode/jsonrpc4j/spring/AbstractJsonServiceExporter.java +++ b/src/main/java/com/googlecode/jsonrpc4j/spring/AbstractJsonServiceExporter.java @@ -9,6 +9,7 @@ import org.springframework.remoting.support.RemoteExporter; import java.util.List; +import java.util.concurrent.ExecutorService; /** * {@link RemoteExporter} that exports services using Json @@ -33,6 +34,8 @@ abstract class AbstractJsonServiceExporter extends RemoteExporter implements Ini private ConvertedParameterTransformer convertedParameterTransformer = null; private String contentType = null; private List interceptorList; + private ExecutorService batchExecutorService = null; + private long parallelBatchProcessingTimeout; /** * {@inheritDoc} @@ -70,6 +73,8 @@ public void afterPropertiesSet() throws Exception { jsonRpcServer.setHttpStatusCodeProvider(httpStatusCodeProvider); jsonRpcServer.setConvertedParameterTransformer(convertedParameterTransformer); jsonRpcServer.setShouldLogInvocationErrors(shouldLogInvocationErrors); + jsonRpcServer.setBatchExecutorService(batchExecutorService); + jsonRpcServer.setParallelBatchProcessingTimeout(parallelBatchProcessingTimeout); if (contentType != null) { jsonRpcServer.setContentType(contentType); @@ -195,4 +200,18 @@ public void setContentType(String contentType) { public void setInterceptorList(List interceptorList) { this.interceptorList = interceptorList; } + + /** + * @param batchExecutorService the {@link ExecutorService} to set + */ + public void setBatchExecutorService(ExecutorService batchExecutorService) { + this.batchExecutorService = batchExecutorService; + } + + /** + * @param parallelBatchProcessingTimeout timeout used for parallel batch processing + */ + public void setParallelBatchProcessingTimeout(long parallelBatchProcessingTimeout) { + this.parallelBatchProcessingTimeout = parallelBatchProcessingTimeout; + } } diff --git a/src/main/java/com/googlecode/jsonrpc4j/spring/AutoJsonRpcServiceImplExporter.java b/src/main/java/com/googlecode/jsonrpc4j/spring/AutoJsonRpcServiceImplExporter.java index 3237fc62..80031f8f 100644 --- a/src/main/java/com/googlecode/jsonrpc4j/spring/AutoJsonRpcServiceImplExporter.java +++ b/src/main/java/com/googlecode/jsonrpc4j/spring/AutoJsonRpcServiceImplExporter.java @@ -14,6 +14,7 @@ import java.util.*; import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; import java.util.regex.Pattern; import static java.lang.String.format; @@ -40,11 +41,11 @@ */ @SuppressWarnings("unused") public class AutoJsonRpcServiceImplExporter implements BeanFactoryPostProcessor { - + private static final Logger logger = LoggerFactory.getLogger(AutoJsonRpcServiceImplExporter.class); - + private static final String PATH_PREFIX = "/"; - + private static final Pattern PATTERN_JSONRPC_PATH = Pattern.compile("^/?[A-Za-z0-9._~-]+(/[A-Za-z0-9._~-]+)*$"); private ObjectMapper objectMapper; @@ -60,6 +61,8 @@ public class AutoJsonRpcServiceImplExporter implements BeanFactoryPostProcessor private ConvertedParameterTransformer convertedParameterTransformer = null; private String contentType = null; private List interceptorList = null; + private ExecutorService batchExecutorService = null; + private long parallelBatchProcessingTimeout; /** * Finds the beans to expose. @@ -198,7 +201,11 @@ private void registerServiceProxy(DefaultListableBeanFactory defaultListableBean if (interceptorList != null) { builder.addPropertyValue("interceptorList", interceptorList); } - + + if (batchExecutorService != null) { + builder.addPropertyValue("batchExecutorService", batchExecutorService); + } + builder.addPropertyValue("backwardsCompatible", backwardsCompatible); builder.addPropertyValue("rethrowExceptions", rethrowExceptions); builder.addPropertyValue("allowExtraParams", allowExtraParams); @@ -318,4 +325,18 @@ public void setInterceptorList(List interceptorList) { } this.interceptorList = interceptorList; } + + /** + * @param batchExecutorService the {@link ExecutorService} to set + */ + public void setBatchExecutorService(ExecutorService batchExecutorService) { + this.batchExecutorService = batchExecutorService; + } + + /** + * @param parallelBatchProcessingTimeout timeout used for parallel batch processing + */ + public void setParallelBatchProcessingTimeout(long parallelBatchProcessingTimeout) { + this.parallelBatchProcessingTimeout = parallelBatchProcessingTimeout; + } } diff --git a/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerBatchTest.java b/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerBatchTest.java new file mode 100644 index 00000000..8cc4c241 --- /dev/null +++ b/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerBatchTest.java @@ -0,0 +1,81 @@ +package com.googlecode.jsonrpc4j.server; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.googlecode.jsonrpc4j.JsonRpcServer; +import org.easymock.EasyMock; +import org.easymock.Mock; +import org.easymock.MockType; +import org.junit.Test; +import org.springframework.mock.web.MockHttpServletRequest; +import org.springframework.mock.web.MockHttpServletResponse; +import org.springframework.util.StreamUtils; + +import javax.servlet.http.HttpServletResponse; +import java.io.InputStream; + +import static com.googlecode.jsonrpc4j.JsonRpcBasicServer.DATA; +import static com.googlecode.jsonrpc4j.JsonRpcBasicServer.ERROR; +import static com.googlecode.jsonrpc4j.JsonRpcBasicServer.ERROR_MESSAGE; +import static com.googlecode.jsonrpc4j.JsonRpcBasicServer.RESULT; +import static com.googlecode.jsonrpc4j.util.Util.decodeAnswer; +import static com.googlecode.jsonrpc4j.util.Util.getFromArrayWithId; +import static com.googlecode.jsonrpc4j.util.Util.messageWithListParams; +import static com.googlecode.jsonrpc4j.util.Util.multiMessageOfStream; +import static com.googlecode.jsonrpc4j.util.Util.toByteArrayOutputStream; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public abstract class JsonRpcServerBatchTest { + @Mock(type = MockType.NICE) + protected JsonRpcServerTest.ServiceInterface mockService; + protected JsonRpcServer jsonRpcServer; + + @Test + public void parallelBatchProcessingTest() throws Exception { + EasyMock.expect(mockService.testMethod("Parameter1")).andReturn("Result1"); + EasyMock.expect(mockService.testMethod("Parameter2")).andReturn("Result2"); + EasyMock.replay(mockService); + + InputStream inputStream = multiMessageOfStream( + messageWithListParams(1, "testMethod", "Parameter1"), + messageWithListParams(2, "testMethod", "Parameter2")); + + MockHttpServletRequest request = new MockHttpServletRequest("POST", "/test-post"); + MockHttpServletResponse response = new MockHttpServletResponse(); + + request.setContent(StreamUtils.copyToByteArray(inputStream)); + + jsonRpcServer.handle(request, response); + + assertEquals(HttpServletResponse.SC_OK, response.getStatus()); + JsonNode answer = decodeAnswer(toByteArrayOutputStream(response.getContentAsByteArray())); + assertTrue(answer instanceof ArrayNode); + assertEquals("Result1", getFromArrayWithId(answer, 1).get(RESULT).asText()); + assertEquals("Result2", getFromArrayWithId(answer, 2).get(RESULT).asText()); + } + + @Test + public void parallelBatchProcessingBulkErrorTest() throws Exception { + EasyMock.expect(mockService.testMethod("Parameter1")).andThrow(new RuntimeException("Error")); + EasyMock.expect(mockService.testMethod("Parameter2")).andReturn("Result2"); + EasyMock.replay(mockService); + + InputStream inputStream = multiMessageOfStream( + messageWithListParams(1, "testMethod", "Parameter1"), + messageWithListParams(2, "testMethod", "Parameter2")); + + MockHttpServletRequest request = new MockHttpServletRequest("POST", "/test-post"); + MockHttpServletResponse response = new MockHttpServletResponse(); + + request.setContent(StreamUtils.copyToByteArray(inputStream)); + + jsonRpcServer.handle(request, response); + + assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, response.getStatus()); + JsonNode answer = decodeAnswer(toByteArrayOutputStream(response.getContentAsByteArray())); + assertTrue(answer instanceof ArrayNode); + assertEquals("Error", getFromArrayWithId(answer, 1).get(ERROR).get(DATA).get(ERROR_MESSAGE).asText()); + assertEquals("Result2", getFromArrayWithId(answer, 2).get(RESULT).asText()); + } +} diff --git a/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerParallelBatchProcessingTest.java b/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerParallelBatchProcessingTest.java new file mode 100644 index 00000000..26bba97d --- /dev/null +++ b/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerParallelBatchProcessingTest.java @@ -0,0 +1,26 @@ +package com.googlecode.jsonrpc4j.server; + +import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.googlecode.jsonrpc4j.util.Util; +import org.easymock.EasyMockRunner; +import org.junit.Before; +import org.junit.runner.RunWith; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@RunWith(EasyMockRunner.class) +public class JsonRpcServerParallelBatchProcessingTest extends JsonRpcServerBatchTest { + + @Before + public void setup() { + jsonRpcServer = new JsonRpcServer(Util.mapper, mockService, JsonRpcServerTest.ServiceInterface.class); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, + 50, + 1000, + TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(50)); + jsonRpcServer.setBatchExecutorService(threadPoolExecutor); + } +} diff --git a/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerSequentialBatchProcessingTest.java b/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerSequentialBatchProcessingTest.java new file mode 100644 index 00000000..868fa8d4 --- /dev/null +++ b/src/test/java/com/googlecode/jsonrpc4j/server/JsonRpcServerSequentialBatchProcessingTest.java @@ -0,0 +1,16 @@ +package com.googlecode.jsonrpc4j.server; + +import com.googlecode.jsonrpc4j.JsonRpcServer; +import com.googlecode.jsonrpc4j.util.Util; +import org.easymock.EasyMockRunner; +import org.junit.Before; +import org.junit.runner.RunWith; + +@RunWith(EasyMockRunner.class) +public class JsonRpcServerSequentialBatchProcessingTest extends JsonRpcServerBatchTest { + + @Before + public void setup() { + jsonRpcServer = new JsonRpcServer(Util.mapper, mockService, JsonRpcServerTest.ServiceInterface.class); + } +}