Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial functionality for default function routing #3691

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The `Retry` filter supports the following parameters:
* `methods`: The HTTP methods that should be retried, represented by using `org.springframework.http.HttpMethod`.
* `series`: The series of status codes to be retried, represented by using `org.springframework.http.HttpStatus.Series`.
* `exceptions`: A list of thrown exceptions that should be retried.
* `cacheBody`: A flag to signal if the request body should be cached. If set to `true`, the `adaptCacheBody` filter must be used to send the cached body downstream.
//* `backoff`: The configured exponential backoff for the retries.
//Retries are performed after a backoff interval of `firstBackoff * (factor ^ n)`, where `n` is the iteration.
//If `maxBackoff` is configured, the maximum backoff applied is limited to `maxBackoff`.
Expand All @@ -20,8 +21,11 @@ The following defaults are configured for `Retry` filter, if enabled:
* `series`: 5XX series
* `methods`: GET method
* `exceptions`: `IOException`, `TimeoutException` and `RetryException`
* `cacheBody`: `false`
//* `backoff`: disabled

WARNING: Setting `cacheBody` to `true` causes the gateway to read the whole body into memory. This should be used with caution.

The following listing configures a Retry filter:

.application.yml
Expand All @@ -42,11 +46,14 @@ spring:
retries: 3
series: SERVER_ERROR
methods: GET,POST
cacheBody: true
- name: AdaptCachedBody
----

.GatewaySampleApplication.java
[source,java]
----
import static org.springframework.cloud.gateway.server.mvc.filter.FilterFunctions.adaptCachedBody;
import static org.springframework.cloud.gateway.server.mvc.filter.RetryFilterFunctions.retry;
import static org.springframework.cloud.gateway.server.mvc.handler.GatewayRouterFunctions.route;
import static org.springframework.cloud.gateway.server.mvc.handler.HandlerFunctions.http;
Expand All @@ -59,7 +66,8 @@ class RouteConfiguration {
public RouterFunction<ServerResponse> gatewayRouterFunctionsAddReqHeader() {
return route("add_request_parameter_route")
.route(host("*.retry.com"), http("https://example.org"))
.filter(retry(config -> config.setRetries(3).setSeries(Set.of(HttpStatus.Series.SERVER_ERROR)).setMethods(Set.of(HttpMethod.GET, HttpMethod.POST))))
.filter(retry(config -> config.setRetries(3).setSeries(Set.of(HttpStatus.Series.SERVER_ERROR)).setMethods(Set.of(HttpMethod.GET, HttpMethod.POST)).setCacheBody(true)))
.filter(adaptCachedBody())
.build();
}
}
Expand Down
2 changes: 1 addition & 1 deletion docs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"@antora/atlas-extension": "1.0.0-alpha.2",
"@antora/collector-extension": "1.0.1",
"@asciidoctor/tabs": "1.0.0-beta.6",
"@springio/antora-extensions": "1.14.2",
"@springio/antora-extensions": "1.14.4",
"@springio/asciidoctor-extensions": "1.0.0-alpha.14"
}
}
2 changes: 1 addition & 1 deletion spring-cloud-gateway-integration-tests/grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<properties>
<protoc.version>3.25.1</protoc.version>
<grpc.version>1.70.0</grpc.version>
<grpc.version>1.71.0</grpc.version>
</properties>

<parent>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,14 @@ public static ByteArrayInputStream cacheBody(ServerRequest request) {
}
}

public static ByteArrayInputStream getOrCacheBody(ServerRequest request) {
ByteArrayInputStream body = getAttribute(request, MvcUtils.CACHED_REQUEST_BODY_ATTR);
if (body != null) {
return body;
}
return cacheBody(request);
}

public static String expand(ServerRequest request, String template) {
Assert.notNull(request, "request may not be null");
Assert.notNull(template, "template may not be null");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2013-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.server.mvc.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.function.RouterFunction;
import org.springframework.web.servlet.function.ServerResponse;

import static org.springframework.cloud.gateway.server.mvc.handler.GatewayRouterFunctions.route;
import static org.springframework.cloud.gateway.server.mvc.handler.HandlerFunctions.fn;

@Configuration
@ConditionalOnClass(name = "org.springframework.cloud.function.context.FunctionCatalog")
@ConditionalOnProperty(name = "spring.cloud.gateway.function.enabled", havingValue = "true", matchIfMissing = true)
public class DefaultFunctionConfiguration {

@Bean
RouterFunction<ServerResponse> gatewayToFunctionRouter() {
// @formatter:off
return route("functionroute")
.POST("/{path}/{name}", fn("{path}/{name}"))
.POST("/{path}", fn("{path}"))
.GET("/{path}/{name}", fn("{path}/{name}"))
.GET("/{path}", fn("{path}"))
.build();
// @formatter:on
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.function.Consumer;

import org.springframework.cloud.gateway.server.mvc.common.Configurable;
import org.springframework.cloud.gateway.server.mvc.common.MvcUtils;
import org.springframework.cloud.gateway.server.mvc.common.Shortcut;
import org.springframework.core.NestedRuntimeException;
import org.springframework.http.HttpMethod;
Expand Down Expand Up @@ -71,6 +72,9 @@ public static HandlerFilterFunction<ServerResponse, ServerResponse> retry(RetryC
.setPolicies(Arrays.asList(simpleRetryPolicy, new HttpRetryPolicy(config)).toArray(new RetryPolicy[0]));
RetryTemplate retryTemplate = retryTemplateBuilder.customPolicy(compositeRetryPolicy).build();
return (request, next) -> retryTemplate.execute(context -> {
if (config.isCacheBody()) {
MvcUtils.getOrCacheBody(request);
}
ServerResponse serverResponse = next.handle(request);

if (isRetryableStatusCode(serverResponse.statusCode(), config)
Expand Down Expand Up @@ -121,6 +125,8 @@ public static class RetryConfig {

private Set<HttpMethod> methods = new HashSet<>(List.of(HttpMethod.GET));

private boolean cacheBody = false;

// TODO: individual statuses
// TODO: backoff
// TODO: support more Spring Retry policies
Expand Down Expand Up @@ -176,6 +182,15 @@ public RetryConfig addMethods(HttpMethod... methods) {
return this;
}

public boolean isCacheBody() {
return cacheBody;
}

public RetryConfig setCacheBody(boolean cacheBody) {
this.cacheBody = cacheBody;
return this;
}

}

private static class RetryException extends NestedRuntimeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.gateway.server.mvc.handler;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

Expand Down Expand Up @@ -50,9 +51,16 @@ private FunctionHandlerRequestProcessingHelper() {

}

@SuppressWarnings({ "rawtypes", "unchecked" })
static ServerResponse processRequest(ServerRequest request, FunctionInvocationWrapper function, Object argument,
boolean eventStream, List<String> ignoredHeaders, List<String> requestOnlyHeaders) {
return processRequest(request, function, argument, eventStream, ignoredHeaders, requestOnlyHeaders, null);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
static ServerResponse processRequest(ServerRequest request, FunctionInvocationWrapper function, Object argument,
boolean eventStream, List<String> ignoredHeaders, List<String> requestOnlyHeaders,
Map<String, String> additionalHeaders) {

if (argument == null) {
argument = "";
}
Expand All @@ -70,42 +78,29 @@ static ServerResponse processRequest(ServerRequest request, FunctionInvocationWr
builder = builder.setHeader(FunctionHandlerHeaderUtils.HTTP_REQUEST_PARAM,
request.params().toSingleValueMap());
}

if (!CollectionUtils.isEmpty(additionalHeaders)) {
builder.copyHeaders(additionalHeaders);
}
inputMessage = builder.copyHeaders(headers.toSingleValueMap()).build();

if (function.isRoutingFunction()) {
function.setSkipOutputConversion(true);
}

if (logger.isDebugEnabled()) {
logger.debug("Sending request to " + function + " with argument: " + inputMessage);
}
Object result = function.apply(inputMessage);
if (function.isConsumer()) {
/*
* if (result instanceof Publisher) { Mono.from((Publisher)
* result).subscribe(); }
*/
return HttpMethod.DELETE.equals(request.method()) ? ServerResponse.ok().build()
: ServerResponse.accepted()
.headers(h -> h.addAll(sanitize(headers, ignoredHeaders, requestOnlyHeaders)))
.build();
// Mono.empty() :
// Mono.just(ResponseEntity.accepted().headers(FunctionHandlerHeaderUtils.sanitize(headers,
// ignoredHeaders, requestOnlyHeaders)).build());
}

BodyBuilder responseOkBuilder = ServerResponse.ok()
.headers(h -> h.addAll(sanitize(headers, ignoredHeaders, requestOnlyHeaders)));

// FIXME: Mono/Flux
/*
* Publisher pResult; if (result instanceof Publisher) { pResult = (Publisher)
* result; if (eventStream) { return Flux.from(pResult); }
*
* if (pResult instanceof Flux) { pResult = ((Flux) pResult).onErrorContinue((e,
* v) -> { logger.error("Failed to process value: " + v, (Throwable) e);
* }).collectList(); } pResult = Mono.from(pResult); } else { pResult =
* Mono.just(result); }
*/

// return Mono.from(pResult).map(v -> {
if (result instanceof Iterable i) {
List aggregatedResult = (List) StreamSupport.stream(i.spliterator(), false).map(m -> {
return m instanceof Message ? processMessage(responseOkBuilder, (Message<?>) m, ignoredHeaders) : m;
Expand All @@ -118,7 +113,6 @@ else if (result instanceof Message message) {
else {
return responseOkBuilder.body(result);
}
// });
}

private static Object processMessage(BodyBuilder responseOkBuilder, Message<?> message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.servlet.ServletException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.gateway.server.mvc.GatewayMvcClassPathWarningAutoConfiguration;
import org.springframework.cloud.gateway.server.mvc.common.MvcUtils;
import org.springframework.cloud.gateway.server.mvc.config.RouteProperties;
import org.springframework.cloud.stream.function.StreamOperations;
Expand All @@ -43,6 +50,8 @@

public abstract class HandlerFunctions {

private static final Log log = LogFactory.getLog(GatewayMvcClassPathWarningAutoConfiguration.class);

private HandlerFunctions() {

}
Expand All @@ -56,13 +65,44 @@ public static HandlerFunction<ServerResponse> fn(RouteProperties routeProperties
public static HandlerFunction<ServerResponse> fn(String functionName) {
Assert.hasText(functionName, "'functionName' must not be empty");
return request -> {
String expandedFunctionName = MvcUtils.expand(request, functionName);
FunctionCatalog functionCatalog = MvcUtils.getApplicationContext(request).getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup(expandedFunctionName,
request.headers().accept().stream().map(MimeType::toString).toArray(String[]::new));
String expandedFunctionName = MvcUtils.expand(request, functionName);
FunctionInvocationWrapper function;
Object body = null;
if (expandedFunctionName.contains("/")) {
String[] functionBodySplit = expandedFunctionName.split("/");
function = functionCatalog.lookup(functionBodySplit[0],
request.headers().accept().stream().map(MimeType::toString).toArray(String[]::new));
if (function != null && function.isSupplier()) {
log.warn("Supplier must not have any arguments. Supplier: '" + function.getFunctionDefinition()
+ "' has '" + functionBodySplit[1] + "' as an argument which is ignored.");
}
body = functionBodySplit[1];
}
else {
function = functionCatalog.lookup(expandedFunctionName,
request.headers().accept().stream().map(MimeType::toString).toArray(String[]::new));
}

/*
* If function can not be found in the current runtime, we will default to
* RoutingFunction which has additional logic to determine the function to
* invoke.
*/
Map<String, String> additionalRequestHeaders = new HashMap<>();
if (function == null) {
additionalRequestHeaders.put(FunctionProperties.FUNCTION_DEFINITION, expandedFunctionName);

function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME,
request.headers().accept().stream().map(MimeType::toString).toArray(String[]::new));
}

if (function != null) {
Object body = function.isSupplier() ? null : request.body(function.getRawInputType());
return processRequest(request, function, body, false, Collections.emptyList(), Collections.emptyList());
if (body == null) {
body = function.isSupplier() ? null : request.body(function.getRawInputType());
}
return processRequest(request, function, body, false, Collections.emptyList(), Collections.emptyList(),
additionalRequestHeaders);
}
return ServerResponse.notFound().build();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ private static class HostPatternPredicate implements RequestPredicate, ChangePat
@Override
public boolean test(ServerRequest request) {
String host = request.headers().firstHeader(HttpHeaders.HOST);
if (host == null) {
host = "";
}
PathContainer pathContainer = PathContainer.parsePath(host, PathContainer.Options.MESSAGE_ROUTE);
PathPattern.PathMatchInfo info = this.pattern.matchAndExtract(pathContainer);
traceMatch("Pattern", this.pattern.getPatternString(), host, info != null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
org.springframework.cloud.gateway.server.mvc.GatewayServerMvcAutoConfiguration
org.springframework.cloud.gateway.server.mvc.GatewayMvcClassPathWarningAutoConfiguration
org.springframework.cloud.gateway.server.mvc.handler.GatewayMultipartAutoConfiguration
org.springframework.boot.autoconfigure.web.client.RestClientAutoConfiguration
org.springframework.boot.autoconfigure.web.client.RestClientAutoConfiguration
org.springframework.cloud.gateway.server.mvc.config.DefaultFunctionConfiguration
Loading
Loading