Skip to content

Commit

Permalink
GH-422 GH-606 Add support for simplifying message headers to attribut…
Browse files Browse the repository at this point in the history
…e mapping

Added CloudEventAttributesProvider and default implementation
Added CloudEventMessageUtils
  • Loading branch information
olegz committed Nov 13, 2020
1 parent f999cdd commit 9c58e6d
Show file tree
Hide file tree
Showing 12 changed files with 438 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.function.cloudevent;

import java.util.HashMap;
import java.util.Map;


/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public class CloudEventAttributes extends HashMap<String, Object> {

/**
*
*/
private static final long serialVersionUID = 5393610770855366497L;



CloudEventAttributes(Map<String, Object> headers) {
super(headers);
}

@SuppressWarnings("unchecked")
public <A> A getId() {
return this.containsKey(CloudEventMessageUtils.CE_ID)
? (A) this.get(CloudEventMessageUtils.CE_ID)
: (A) this.get(CloudEventMessageUtils.ID);
}

@SuppressWarnings("unchecked")
public <A> A getSource() {
return this.containsKey(CloudEventMessageUtils.CE_SOURCE)
? (A) this.get(CloudEventMessageUtils.CE_SOURCE)
: (A) this.get(CloudEventMessageUtils.SOURCE);
}

@SuppressWarnings("unchecked")
public <A> A getSpecversion() {
return this.containsKey(CloudEventMessageUtils.CE_SPECVERSION)
? (A) this.get(CloudEventMessageUtils.CE_SPECVERSION)
: (A) this.get(CloudEventMessageUtils.SPECVERSION);
}

@SuppressWarnings("unchecked")
public <A> A getType() {
return this.containsKey(CloudEventMessageUtils.CE_TYPE)
? (A) this.get(CloudEventMessageUtils.CE_TYPE)
: (A) this.get(CloudEventMessageUtils.TYPE);
}

@SuppressWarnings("unchecked")
public <A> A getDataContentType() {
return this.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE)
? (A) this.get(CloudEventMessageUtils.CE_DATACONTENTTYPE)
: (A) this.get(CloudEventMessageUtils.DATACONTENTTYPE);
}

public void setDataContentType(String datacontenttype) {
this.put(CloudEventMessageUtils.CE_DATACONTENTTYPE, datacontenttype);
}

@SuppressWarnings("unchecked")
public <A> A getAtttribute(String name) {
return (A) this.get(name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2020-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.function.cloudevent;

import org.springframework.messaging.MessageHeaders;

/**
*
* @author Oleg Zhurakousky
* @since 3.1
*/
public interface CloudEventAtttributesProvider {

/**
* Will construct instance of {@link CloudEventAttributes} setting its required attributes.
*
* @param ce_id value for Cloud Event 'id' attribute
* @param ce_specversion value for Cloud Event 'specversion' attribute
* @param ce_source value for Cloud Event 'source' attribute
* @param ce_type value for Cloud Event 'type' attribute
* @return instance of {@link CloudEventAttributes}
*/
CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type);

/**
* Will construct instance of {@link CloudEventAttributes}
* Should default/generate cloud event ID and SPECVERSION.
*
* @param ce_source value for Cloud Event 'source' attribute
* @param ce_type value for Cloud Event 'type' attribute
* @return instance of {@link CloudEventAttributes}
*/
CloudEventAttributes get(String ce_source, String ce_type);


/**
* Will construct instance of {@link CloudEventAttributes} from {@link MessageHeaders}.
*
* Should copy Cloud Event related headers into an instance of {@link CloudEventAttributes}
* NOTE: Certain headers must not be copied.
*
* @param headers instance of {@link MessageHeaders}
* @return modifiable instance of {@link CloudEventAttributes}
*/
RequiredAttributeAccessor get(MessageHeaders headers);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class CloudEventDataContentTypeMessagePreProcessor implements Function<Me

private final ContentTypeResolver contentTypeResolver = new DefaultContentTypeResolver();

private final MimeType cloudEventContentType = MimeTypeUtils.parseMimeType("application/cloudevents");
private final MimeType cloudEventContentType = CloudEventMessageUtils.APPLICATION_CLOUDEVENTS;

private final CompositeMessageConverter messageConverter;

Expand All @@ -62,7 +62,7 @@ public CloudEventDataContentTypeMessagePreProcessor(CompositeMessageConverter me
@SuppressWarnings("unchecked")
@Override
public Message<?> apply(Message<?> inputMessage) {
if (CloudEventUtils.isBinary(inputMessage)) {
if (CloudEventMessageUtils.isBinary(inputMessage.getHeaders())) {
String dataContentType = this.getDataContentType(inputMessage.getHeaders());
Message<?> message = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, dataContentType)
Expand All @@ -78,7 +78,7 @@ else if (this.isStructured(inputMessage)) {
.parseMimeType(contentType.getType() + "/" + suffix);
Message<?> cloudEventMessage = MessageBuilder.fromMessage(inputMessage)
.setHeader(MessageHeaders.CONTENT_TYPE, cloudEventDeserializationContentType)
.setHeader(CloudEventUtils.CE_DATACONTENTTYPE, dataContentType).build();
.setHeader(CloudEventMessageUtils.CE_DATACONTENTTYPE, dataContentType).build();
Map<String, Object> structuredCloudEvent = (Map<String, Object>) this.messageConverter
.fromMessage(cloudEventMessage, Map.class);
Message<?> binaryCeMessage = this.buildCeMessageFromStructured(structuredCloudEvent);
Expand All @@ -90,27 +90,27 @@ else if (this.isStructured(inputMessage)) {
}

private Message<?> buildCeMessageFromStructured(Map<String, Object> structuredCloudEvent) {
MessageBuilder<?> builder = MessageBuilder.withPayload(structuredCloudEvent.get(CloudEventUtils.DATA));
structuredCloudEvent.remove(CloudEventUtils.DATA);
MessageBuilder<?> builder = MessageBuilder.withPayload(structuredCloudEvent.get(CloudEventMessageUtils.DATA));
structuredCloudEvent.remove(CloudEventMessageUtils.DATA);
builder.copyHeaders(structuredCloudEvent);
return builder.build();
}

private String getDataContentType(MessageHeaders headers) {
if (headers.containsKey(CloudEventUtils.DATACONTENTTYPE)) {
return (String) headers.get(CloudEventUtils.DATACONTENTTYPE);
if (headers.containsKey(CloudEventMessageUtils.DATACONTENTTYPE)) {
return (String) headers.get(CloudEventMessageUtils.DATACONTENTTYPE);
}
else if (headers.containsKey(CloudEventUtils.CE_DATACONTENTTYPE)) {
return (String) headers.get(CloudEventUtils.CE_DATACONTENTTYPE);
else if (headers.containsKey(CloudEventMessageUtils.CE_DATACONTENTTYPE)) {
return (String) headers.get(CloudEventMessageUtils.CE_DATACONTENTTYPE);
}
else if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
return headers.get(MessageHeaders.CONTENT_TYPE).toString();
}
return "application/json";
return MimeTypeUtils.APPLICATION_JSON_VALUE;
}

private boolean isStructured(Message<?> message) {
if (!CloudEventUtils.isBinary(message)) {
if (!CloudEventMessageUtils.isBinary(message.getHeaders())) {
Map<String, Object> headers = message.getHeaders();

if (headers.containsKey(MessageHeaders.CONTENT_TYPE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
public class CloudEventJsonMessageConverter extends JsonMessageConverter {

public CloudEventJsonMessageConverter(JsonMapper jsonMapper) {
super(jsonMapper, new MimeType("application", "cloudevents+json"));
super(jsonMapper, new MimeType(CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getType(),
CloudEventMessageUtils.APPLICATION_CLOUDEVENTS.getSubtype() + "+json"));
this.setStrictContentTypeMatch(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.Map;

import org.springframework.messaging.Message;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;

/**
* Miscellaneous utility methods to deal with Cloud Events - https://cloudevents.io/.
Expand All @@ -28,12 +30,22 @@
* @author Oleg Zhurakousky
* @since 3.1
*/
public final class CloudEventUtils {
public final class CloudEventMessageUtils {

private CloudEventUtils() {
private CloudEventMessageUtils() {

}

/**
* String value of 'application/cloudevents' mime type.
*/
public static String APPLICATION_CLOUDEVENTS_VALUE = "application/cloudevents";

/**
* {@link MimeType} instance representing 'application/cloudevents' mime type.
*/
public static MimeType APPLICATION_CLOUDEVENTS = MimeTypeUtils.parseMimeType(APPLICATION_CLOUDEVENTS_VALUE);

/**
* Prefix for attributes.
*/
Expand Down Expand Up @@ -132,16 +144,15 @@ private CloudEventUtils() {
/**
* Checks if {@link Message} represents cloud event in binary-mode.
*/
public static boolean isBinary(Message<?> message) {
Map<String, Object> headers = message.getHeaders();
return (headers.containsKey("id")
&& headers.containsKey("source")
&& headers.containsKey("specversion")
&& headers.containsKey("type"))
public static boolean isBinary(Map<String, Object> headers) {
return (headers.containsKey(ID)
&& headers.containsKey(SOURCE)
&& headers.containsKey(SPECVERSION)
&& headers.containsKey(TYPE))
||
(headers.containsKey("ce_id")
&& headers.containsKey("ce_source")
&& headers.containsKey("ce_specversion")
&& headers.containsKey("ce_type"));
(headers.containsKey(CE_ID)
&& headers.containsKey(CE_SOURCE)
&& headers.containsKey(CE_SPECVERSION)
&& headers.containsKey(CE_TYPE));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.function.cloudevent;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;

/**
*
* @author Oleg Zhurakousky
* @since 3.1
*
*/
public class DefaultCloudEventAttributesProvider implements CloudEventAtttributesProvider {
/*
* should i provide instance() method for convinience or should it be always injected into function
*/

@Override
public CloudEventAttributes get(String ce_id, String ce_specversion, String ce_source, String ce_type) {
Assert.hasText(ce_id, "'ce_id' must not be null or empty");
Assert.hasText(ce_specversion, "'ce_specversion' must not be null or empty");
Assert.hasText(ce_source, "'ce_source' must not be null or empty");
Assert.hasText(ce_type, "'ce_type' must not be null or empty");
Map<String, Object> requiredAttributes = new HashMap<>();
requiredAttributes.put(CloudEventMessageUtils.CE_ID, ce_id);
requiredAttributes.put(CloudEventMessageUtils.CE_SPECVERSION, ce_specversion);
requiredAttributes.put(CloudEventMessageUtils.CE_SOURCE, ce_source);
requiredAttributes.put(CloudEventMessageUtils.CE_TYPE, ce_type);
return new CloudEventAttributes(requiredAttributes);
}

@Override
public CloudEventAttributes get(String ce_source, String ce_type) {
return this.get(UUID.randomUUID().toString(), "1.0", ce_source, ce_type);
}

/**
* By default it will copy all the headers while exposing accessor to allow user to modify any of them.
*/
@Override
public RequiredAttributeAccessor get(MessageHeaders headers) {
return new RequiredAttributeAccessor(headers);
}

}
Loading

0 comments on commit 9c58e6d

Please sign in to comment.