diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java index e6d0bbc6cb0..e80d0064330 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/MetaField.java @@ -168,7 +168,12 @@ public enum MetaField { /** * Inlong data time for audit. */ - AUDIT_DATA_TIME; + AUDIT_DATA_TIME, + + /** + * Inlong properties in InlongMsg. + */ + INLONG_PROPERTIES; public static MetaField forName(String name) { for (MetaField metaField : values()) { diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java index fb0f9dcd805..c61b12ab32b 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java @@ -78,6 +78,8 @@ public abstract class ExtractNode implements Node { public static final String INLONG_MSG_AUDIT_TIME = "value.data-time"; + public static final String INLONG_MSG_PROPERTIES = "value.inlong-msg-properties"; + public static final String CONSUME_AUDIT_TIME = "consume_time"; @JsonProperty("id") diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index ab90ba7d19f..1887aca1459 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -195,6 +195,9 @@ public String getMetadataKey(MetaField metaField) { metadataKey = CONSUME_AUDIT_TIME; } break; + case INLONG_PROPERTIES: + metadataKey = INLONG_MSG_PROPERTIES; + break; default: throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s", this.getClass().getSimpleName(), metaField)); @@ -209,7 +212,7 @@ public boolean isVirtual(MetaField metaField) { @Override public Set supportedMetaFields() { - return EnumSet.of(MetaField.AUDIT_DATA_TIME); + return EnumSet.of(MetaField.AUDIT_DATA_TIME, MetaField.INLONG_PROPERTIES); } @Override