Skip to content

Commit

Permalink
GH-422 Improvements in cloud event samples
Browse files Browse the repository at this point in the history
Added initial README
Polished tests
  • Loading branch information
olegz committed Nov 11, 2020
1 parent 2a88b52 commit a6eb833
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ else if (this.skipInputConversion) {
else if (input instanceof Message) {
convertedInput = this.convertInputMessageIfNecessary((Message) input, type);
if (convertedInput == null) { // give ConversionService a chance
convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload());
convertedInput = this.convertNonMessageInputIfNecessary(type, ((Message) input).getPayload(), false);
}
if (convertedInput != null && !FunctionTypeUtils.isMultipleArgumentType(this.inputType)) {
convertedInput = !convertedInput.equals(input)
Expand All @@ -818,7 +818,7 @@ else if (input instanceof Message) {
}
}
else {
convertedInput = this.convertNonMessageInputIfNecessary(type, input);
convertedInput = this.convertNonMessageInputIfNecessary(type, input, JsonMapper.isJsonString(input));
if (convertedInput != null && logger.isDebugEnabled()) {
logger.debug("Converted input: " + input + " to: " + convertedInput);
}
Expand All @@ -827,6 +827,7 @@ else if (input instanceof Message) {
if (this.isWrapConvertedInputInMessage(convertedInput)) {
convertedInput = MessageBuilder.withPayload(convertedInput).build();
}
Assert.notNull(convertedInput, "Failed to convert input: " + input + " to " + type);
return convertedInput;
}

Expand Down Expand Up @@ -897,13 +898,13 @@ private boolean containsRetainMessageSignalInHeaders(Message message) {
/*
*
*/
private Object convertNonMessageInputIfNecessary(Type inputType, Object input) {
private Object convertNonMessageInputIfNecessary(Type inputType, Object input, boolean maybeJson) {
Object convertedInput = null;
Class<?> rawInputType = this.isTypePublisher(inputType) || this.isInputTypeMessage()
? FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(inputType))
: this.getRawClassFor(inputType);

if (JsonMapper.isJsonString(input) && !Message.class.isAssignableFrom(rawInputType)) {
if (maybeJson && !Message.class.isAssignableFrom(rawInputType)) {
if (FunctionTypeUtils.isMessage(inputType)) {
inputType = FunctionTypeUtils.getGenericType(inputType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.function.context.config;

import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Map;

import org.springframework.cloud.function.json.JsonMapper;
Expand Down Expand Up @@ -50,6 +51,9 @@ protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @
return super.convertFromInternal(message, targetClass, conversionHint);
}
else {
if (targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
return message.getPayload();
}
Type convertToType = conversionHint == null ? targetClass : (Type) conversionHint;
String jsonString = (String) message.getPayload();
Map<String, Object> mapEvent = this.mapper.fromJson(jsonString, Map.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
## Cloud Events with Spring samples

### Introduction
The current example uses spring-cloud-function framework as its core which allows users to only worry about functional aspects of
their requirement while taking care-off non-functional aspects. For more information on Spring Cloud Function please visit
our https://spring.io/projects/spring-cloud-function[project page].
The example provides dependency and instructions to demonstrate several distinct invocation models:

- Direct function invocation
- Function as a REST endpoint
- Function as message handler (e.g., Kafka, RabbitMQ etc)
- Function invocation via RSocket

The POM file defines all the necessary dependency in a segregated way, so you can choose the one you're interested in.

#### Direct function invocation

#### Function as a REST endpoint

Given that SCF allows function to be exposed as REST endpoints, you can post cloud event to any of the
functions by using function name as path (e.g., localhost:8080/<function_name>)

Here is an example of curl command posting a cloud event in binary-mode:

[source, text]
----
curl -w'\n' localhost:8080/asPOJO \
-H "ce-Specversion: 1.0" \
-H "ce-Type: com.example.springevent" \
-H "ce-Source: spring.io/spring-event" \
-H "Content-Type: application/json" \
-H "ce-Id: 0001" \
-d '{"releaseDate":"24-03-2004", "releaseName":"Spring Framework", "version":"1.0"}'
----

And here is an example of curl command posting a cloud event in structured-mode:

[source, text]
----
curl -w'\n' localhost:8080/asString \
-H "ce-Specversion: 1.0" \
-H "ce-Type: com.example.springevent" \
-H "ce-Source: spring.io/spring-event" \
-H "Content-Type: application/cloudevents+json" \
-H "ce-Id: 0001" \
-d '{
"specversion" : "1.0",
"type" : "org.springframework",
"source" : "https://spring.io/",
"id" : "A234-1234-1234",
"datacontenttype" : "application/json",
"data" : {
"version" : "1.0",
"releaseName" : "Spring Framework",
"releaseDate" : "24-03-2004"
}
}'
----

#### Function as message handler (e.g., Kafka, RabbitMQ etc)

Streaming support for Kafka and Rabbit is provided via Spring Cloud Stream framework (link). In fact we're only mentioning Kafka and Rabbit here as an example.
Streaming support is automatically provided for any existing binders (e.g., Solace, GCP, AWS etc) (link)
Binders are components of SCSt responsible to bind user code (e.g., function) to broker destinations so execution is triggered
by messages on broker destination and results of execution are sent to broker destinations. Binders also provide support consumer
groups and partitioning for both Kafka and RabbitMQ messaging systems.


#### Function invocation via RSocket

TBD
22 changes: 21 additions & 1 deletion spring-cloud-function-samples/function-sample-cloudevent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,42 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!-- REST - only needed if you intend to invoke via HTTP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-web</artifactId>
<version>3.1.0-SNAPSHOT</version>
</dependency>
<!-- end REST -->

<!-- RSocket - only needed if you intend to invoke via RSocket -->
<!-- <dependency> -->
<!-- <groupId>org.springframework.cloud</groupId> -->
<!-- <artifactId>spring-cloud-function-rsocket</artifactId> -->
<!-- <version>3.1.0-SNAPSHOT</version> -->
<!-- </dependency> -->
<!-- end RSocket -->

<!-- RabbitMQ - only needed if you intend to invoke via RabbitMQ -->
<!-- <dependency> -->
<!-- <groupId>org.springframework.cloud</groupId> -->
<!-- <artifactId>spring-cloud-stream-binder-rabbit</artifactId> -->
<!-- <version>3.1.0-SNAPSHOT</version> -->
<!-- </dependency> -->
<!-- end RabbitMQ -->

<!-- Kafka - only needed if you intend to invoke via RabbitMQ -->
<!-- <dependency> -->
<!-- <groupId>org.springframework.cloud</groupId> -->
<!-- <artifactId>spring-cloud-stream-binder-kafka</artifactId> -->
<!-- <version>3.1.0-SNAPSHOT</version> -->
<!-- </dependency> -->
<!-- end Kafka -->

<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,59 +43,38 @@ public static void main(String[] args) throws Exception {
SpringApplication.run(CloudeventDemoApplication.class, args);
}

/*
* curl -w'\n' localhost:8080/asStringMessage \
* -H "Ce-Specversion: 1.0" \
* -H "Ce-Type: com.example.springevent" \
* -H "Ce-Source: spring.io/spring-event" \
* -H "Content-Type: application/json" \
* -H "Ce-Id: 0001" \
* -d '{"releaseDate":"2004-03-24", "releaseName":"Spring Framework", "version":"1.0"}'
*/
@Bean
public Function<Message<String>, String> asStringMessage() {
return v -> v.getPayload().toString();
return v -> {
System.out.println("Received Cloud Event with raw data: " + v);
return v.getPayload();
};
}

/*
* curl -w'\n' localhost:8080/asString \
* -H "Ce-Specversion: 1.0" \
* -H "Ce-Type: com.example.springevent" \
* -H "Ce-Source: spring.io/spring-event" \
* -H "Content-Type: application/json" \
* -H "Ce-Id: 0001" \
* -d '{"releaseDate":"2004-03-24", "releaseName":"Spring Framework", "version":"1.0"}'
*/

@Bean
public Function<String, String> asString() {
return v -> v;
return v -> {
System.out.println("Received raw Cloud Event data: " + v);
return v;
};
}

/*
* curl -w'\n' localhost:8080/asPOJOMessage \
* -H "Ce-Specversion: 1.0" \
* -H "Ce-Type: com.example.springevent" \
* -H "Ce-Source: spring.io/spring-event" \
* -H "Content-Type: application/json" \
* -H "Ce-Id: 0001" \
* -d '{"releaseDate":"2004-03-24", "releaseName":"Spring Framework", "version":"1.0"}'
*/

@Bean
public Function<Message<SpringReleaseEvent>, String> asPOJOMessage() {
return v -> v.getPayload().toString();
return v -> {
System.out.println("Received Cloud Event with POJO data: " + v);
return v.getPayload().toString();
};
}

/*
* curl -w'\n' localhost:8080/asPOJO \
* -H "Ce-Specversion: 1.0" \
* -H "Ce-Type: com.example.springevent" \
* -H "Ce-Source: spring.io/spring-event" \
* -H "Content-Type: application/json" \
* -H "Ce-Id: 0001" \
* -d '{"releaseDate":"2004-03-24", "releaseName":"Spring Framework", "version":"1.0"}'
*/

@Bean
public Function<SpringReleaseEvent, String> asPOJO() {
return v -> v.toString();
return v -> {
System.out.println("Received POJO Cloud Event data: " + v);
return v.toString();
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* @author Oleg Zhurakousky
*
*/
public class CloudeventDemoApplicationTests {
public class CloudeventDemoApplicationRESTTests {

private TestRestTemplate testRestTemplate = new TestRestTemplate();

Expand Down Expand Up @@ -167,6 +167,37 @@ public void testAsStracturalFormatToPOJO() throws Exception {
assertThat(response.getBody()).isEqualTo("releaseDate:24-03-2004; releaseName:Spring Framework; version:1.0");
}

@Test
public void testAsStracturalFormatToString() throws Exception {
SpringApplication.run(CloudeventDemoApplication.class);

String payload = "{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"org.springframework\",\n" +
" \"source\" : \"https://spring.io/\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"datacontenttype\" : \"application/json\",\n" +
" \"data\" : {\n" +
" \"version\" : \"1.0\",\n" +
" \"releaseName\" : \"Spring Framework\",\n" +
" \"releaseDate\" : \"24-03-2004\"\n" +
" }\n" +
"}";

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.valueOf("application/cloudevents+json;charset=utf-8"));

RequestEntity<String> re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asStringMessage"));
ResponseEntity<String> response = testRestTemplate.exchange(re, String.class);

assertThat(response.getBody()).isEqualTo(payload);

re = new RequestEntity<>(payload, headers, HttpMethod.POST, this.constructURI("/asString"));
response = testRestTemplate.exchange(re, String.class);

assertThat(response.getBody()).isEqualTo(payload);
}


@Configuration
public static class FooBarConverterConfiguration {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.spring.cloudevent;

/**
*
* @author Oleg Zhurakousky
*
*/
public class CloudeventDemoApplicationStreamTests {


}

0 comments on commit a6eb833

Please sign in to comment.