Skip to content

Commit

Permalink
Adds support for spring-cloud-function and spring-cloud-stream handle…
Browse files Browse the repository at this point in the history
…rs (#3646)

* Add support for spring cloud function handler.

* Add support for spring cloud stream handler.

* Add support for path variable expansion in fn: and stream: URIs

* Adds support for config based fn/stream HandlerFunctions

* Starting support for webflux function support

* Adds fn:functionName support for webflux server

* Adds spring-cloud-starter-stream-rabbit to test scope

* Adds StreamRoutingFilter to webflux server
  • Loading branch information
spencergibb authored Jan 17, 2025
1 parent ab5e61d commit af200a5
Show file tree
Hide file tree
Showing 20 changed files with 1,512 additions and 3 deletions.
16 changes: 16 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
<junit-pioneer.version>2.3.0</junit-pioneer.version>
<spring-cloud-circuitbreaker.version>3.2.1-SNAPSHOT</spring-cloud-circuitbreaker.version>
<spring-cloud-commons.version>4.3.0-SNAPSHOT</spring-cloud-commons.version>
<spring-cloud-function.version>4.2.1-SNAPSHOT</spring-cloud-function.version>
<spring-cloud-stream.version>4.2.1-SNAPSHOT</spring-cloud-stream.version>
</properties>

<dependencyManagement>
Expand All @@ -75,6 +77,20 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-dependencies</artifactId>
<version>${spring-cloud-function.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-dependencies</artifactId>
<version>${spring-cloud-stream.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-test-support</artifactId>
Expand Down
30 changes: 30 additions & 0 deletions spring-cloud-gateway-server-mvc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-commons</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-loadbalancer</artifactId>
Expand Down Expand Up @@ -89,7 +99,22 @@
<artifactId>spring-boot-properties-migrator</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<scope>test</scope>
</dependency>
<!-- Third party test dependencies -->
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j_jdk17-caffeine</artifactId>
Expand All @@ -105,5 +130,10 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.server.mvc.handler;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.springframework.http.HttpHeaders;
import org.springframework.messaging.MessageHeaders;

/**
* @author Dave Syer
* @author Oleg Zhurakousky
*/
public final class FunctionHandlerHeaderUtils {

/**
* Message Header name which contains HTTP request parameters.
*/
public static final String HTTP_REQUEST_PARAM = "http_request_param";

private static HttpHeaders IGNORED = new HttpHeaders();

private static HttpHeaders REQUEST_ONLY = new HttpHeaders();

static {
IGNORED.add(MessageHeaders.ID, "");
IGNORED.add(HttpHeaders.CONTENT_LENGTH, "0");
// Headers that would typically be added by a downstream client
REQUEST_ONLY.add(HttpHeaders.ACCEPT, "");
REQUEST_ONLY.add(HttpHeaders.CONTENT_LENGTH, "");
REQUEST_ONLY.add(HttpHeaders.CONTENT_TYPE, "");
REQUEST_ONLY.add(HttpHeaders.HOST, "");
}

private FunctionHandlerHeaderUtils() {
throw new IllegalStateException("Can't instantiate a utility class");
}

public static HttpHeaders fromMessage(MessageHeaders headers, List<String> ignoredHeders) {
HttpHeaders result = new HttpHeaders();
for (String name : headers.keySet()) {
Object value = headers.get(name);
name = name.toLowerCase(Locale.ROOT);
if (!IGNORED.containsKey(name) && !ignoredHeders.contains(name)) {
Collection<?> values = multi(value);
for (Object object : values) {
result.set(name, object.toString());
}
}
}
return result;
}

@SuppressWarnings("unchecked")
public static HttpHeaders fromMessage(MessageHeaders headers) {
return fromMessage(headers, Collections.EMPTY_LIST);
}

public static HttpHeaders sanitize(HttpHeaders request, List<String> ignoredHeders,
List<String> requestOnlyHeaders) {
HttpHeaders result = new HttpHeaders();
for (String name : request.keySet()) {
List<String> value = request.get(name);
name = name.toLowerCase(Locale.ROOT);
if (!IGNORED.containsKey(name) && !REQUEST_ONLY.containsKey(name) && !ignoredHeders.contains(name)
&& !requestOnlyHeaders.contains(name)) {
result.put(name, value);
}
}
return result;
}

@SuppressWarnings("unchecked")
public static HttpHeaders sanitize(HttpHeaders request) {
return sanitize(request, Collections.EMPTY_LIST, Collections.EMPTY_LIST);
}

public static MessageHeaders fromHttp(HttpHeaders headers) {
Map<String, Object> map = new LinkedHashMap<>();
for (String name : headers.keySet()) {
Collection<?> values = multi(headers.get(name));
name = name.toLowerCase(Locale.ROOT);
Object value = values == null ? null : (values.size() == 1 ? values.iterator().next() : values);
if (name.toLowerCase(Locale.ROOT).equals(HttpHeaders.CONTENT_TYPE.toLowerCase(Locale.ROOT))) {
name = MessageHeaders.CONTENT_TYPE;
}
map.put(name, value);
}
return new MessageHeaders(map);
}

private static Collection<?> multi(Object value) {
return value instanceof Collection ? (Collection<?>) value : Arrays.asList(value);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.gateway.server.mvc.handler;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.CollectionUtils;
import org.springframework.web.servlet.function.ServerRequest;
import org.springframework.web.servlet.function.ServerResponse;
import org.springframework.web.servlet.function.ServerResponse.BodyBuilder;

import static org.springframework.cloud.gateway.server.mvc.handler.FunctionHandlerHeaderUtils.fromMessage;
import static org.springframework.cloud.gateway.server.mvc.handler.FunctionHandlerHeaderUtils.sanitize;

/**
* !INTERNAL USE ONLY!
*
* @author Oleg Zhurakousky
*
*/
final class FunctionHandlerRequestProcessingHelper {

private static Log logger = LogFactory.getLog(FunctionHandlerRequestProcessingHelper.class);

private FunctionHandlerRequestProcessingHelper() {

}

@SuppressWarnings({ "rawtypes", "unchecked" })
static ServerResponse processRequest(ServerRequest request, FunctionInvocationWrapper function, Object argument,
boolean eventStream, List<String> ignoredHeaders, List<String> requestOnlyHeaders) {
if (argument == null) {
argument = "";
}

if (function == null) {
return ServerResponse.notFound().build();
}

HttpHeaders headers = request.headers().asHttpHeaders();

Message<?> inputMessage = null;

MessageBuilder builder = MessageBuilder.withPayload(argument);
if (!CollectionUtils.isEmpty(request.params())) {
builder = builder.setHeader(FunctionHandlerHeaderUtils.HTTP_REQUEST_PARAM,
request.params().toSingleValueMap());
}
inputMessage = builder.copyHeaders(headers.toSingleValueMap()).build();

if (function.isRoutingFunction()) {
function.setSkipOutputConversion(true);
}

Object result = function.apply(inputMessage);
if (function.isConsumer()) {
/*
* if (result instanceof Publisher) { Mono.from((Publisher)
* result).subscribe(); }
*/
return HttpMethod.DELETE.equals(request.method()) ? ServerResponse.ok().build()
: ServerResponse.accepted()
.headers(h -> h.addAll(sanitize(headers, ignoredHeaders, requestOnlyHeaders)))
.build();
// Mono.empty() :
// Mono.just(ResponseEntity.accepted().headers(FunctionHandlerHeaderUtils.sanitize(headers,
// ignoredHeaders, requestOnlyHeaders)).build());
}

BodyBuilder responseOkBuilder = ServerResponse.ok()
.headers(h -> h.addAll(sanitize(headers, ignoredHeaders, requestOnlyHeaders)));

// FIXME: Mono/Flux
/*
* Publisher pResult; if (result instanceof Publisher) { pResult = (Publisher)
* result; if (eventStream) { return Flux.from(pResult); }
*
* if (pResult instanceof Flux) { pResult = ((Flux) pResult).onErrorContinue((e,
* v) -> { logger.error("Failed to process value: " + v, (Throwable) e);
* }).collectList(); } pResult = Mono.from(pResult); } else { pResult =
* Mono.just(result); }
*/

// return Mono.from(pResult).map(v -> {
if (result instanceof Iterable i) {
List aggregatedResult = (List) StreamSupport.stream(i.spliterator(), false).map(m -> {
return m instanceof Message ? processMessage(responseOkBuilder, (Message<?>) m, ignoredHeaders) : m;
}).collect(Collectors.toList());
return responseOkBuilder.header("content-type", "application/json").body(aggregatedResult);
}
else if (result instanceof Message message) {
return responseOkBuilder.body(processMessage(responseOkBuilder, message, ignoredHeaders));
}
else {
return responseOkBuilder.body(result);
}
// });
}

private static Object processMessage(BodyBuilder responseOkBuilder, Message<?> message,
List<String> ignoredHeaders) {
responseOkBuilder.headers(h -> h.addAll(fromMessage(message.getHeaders(), ignoredHeaders)));
return message.getPayload();
}

}
Loading

0 comments on commit af200a5

Please sign in to comment.