Skip to content

Commit

Permalink
[INLONG-11674][Sort] Pulsar Source supports InlongMsg metadata (#11691)
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng authored Jan 22, 2025
1 parent 8cf5ed7 commit 5fd2248
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,12 @@ public enum MetaField {
/**
* Inlong data time for audit.
*/
AUDIT_DATA_TIME;
AUDIT_DATA_TIME,

/**
* Inlong properties in InlongMsg.
*/
INLONG_PROPERTIES;

public static MetaField forName(String name) {
for (MetaField metaField : values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public abstract class ExtractNode implements Node {

public static final String INLONG_MSG_AUDIT_TIME = "value.data-time";

public static final String INLONG_MSG_PROPERTIES = "value.inlong-msg-properties";

public static final String CONSUME_AUDIT_TIME = "consume_time";

@JsonProperty("id")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ public String getMetadataKey(MetaField metaField) {
metadataKey = CONSUME_AUDIT_TIME;
}
break;
case INLONG_PROPERTIES:
metadataKey = INLONG_MSG_PROPERTIES;
break;
default:
throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s",
this.getClass().getSimpleName(), metaField));
Expand All @@ -209,7 +212,7 @@ public boolean isVirtual(MetaField metaField) {

@Override
public Set<MetaField> supportedMetaFields() {
return EnumSet.of(MetaField.AUDIT_DATA_TIME);
return EnumSet.of(MetaField.AUDIT_DATA_TIME, MetaField.INLONG_PROPERTIES);
}

@Override
Expand Down

0 comments on commit 5fd2248

Please sign in to comment.