From 368acab0db3bb077d4b00b61e317d5328f3861e2 Mon Sep 17 00:00:00 2001
From: eemhu <125959687+eemhu@users.noreply.github.com>
Date: Mon, 17 Feb 2025 16:20:51 +0200
Subject: [PATCH] Issue #75: Integrate new akv_01 with metadata wrapper objects
(#76)
* add isStub() checks to DefaultPlugin; fix tests to use ParsedEventFactory with UnparsedEventImpl and metadata wrapper objects; fix ParsedEventWithException to match the new interface; update nlf_01 to 2.3.0; update akv_01 to 6.3.0
* initialize RealHostname in ctor; change time variable to enqueuedTime
* run RealHostname in ctor
---
pom.xml | 4 +-
.../teragrep/aer_02/plugin/DefaultPlugin.java | 75 ++++++++++++-------
.../plugin/ParsedEventWithException.java | 33 +++++---
.../aer_02/EventDataConsumerTest.java | 21 +++++-
.../aer_02/EventDataConsumerTlsTest.java | 11 ++-
.../plugin/DefaultPluginFactoryTest.java | 19 ++++-
.../aer_02/plugin/DefaultPluginTest.java | 14 +++-
7 files changed, 132 insertions(+), 45 deletions(-)
diff --git a/pom.xml b/pom.xml
index 159c250..67ccec2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,12 +51,12 @@
com.teragrep
nlf_01
- 2.2.0
+ 2.3.0
com.teragrep
akv_01
- 6.0.0
+ 6.3.0
diff --git a/src/main/java/com/teragrep/aer_02/plugin/DefaultPlugin.java b/src/main/java/com/teragrep/aer_02/plugin/DefaultPlugin.java
index 3550303..4a3bea5 100644
--- a/src/main/java/com/teragrep/aer_02/plugin/DefaultPlugin.java
+++ b/src/main/java/com/teragrep/aer_02/plugin/DefaultPlugin.java
@@ -49,6 +49,7 @@
import com.teragrep.akv_01.event.ParsedEvent;
import com.teragrep.akv_01.plugin.Plugin;
import com.teragrep.nlf_01.PropertiesJson;
+import com.teragrep.nlf_01.util.RealHostname;
import com.teragrep.rlo_14.Facility;
import com.teragrep.rlo_14.SDElement;
import com.teragrep.rlo_14.Severity;
@@ -57,7 +58,6 @@
import java.time.Instant;
import java.time.ZonedDateTime;
-import java.time.format.DateTimeParseException;
import java.util.*;
public final class DefaultPlugin implements Plugin {
@@ -67,7 +67,11 @@ public final class DefaultPlugin implements Plugin {
private final String syslogAppname;
public DefaultPlugin(final String realHostname, final String syslogHostname, final String syslogAppname) {
- this.realHostname = realHostname;
+ this(new RealHostname(realHostname), syslogHostname, syslogAppname);
+ }
+
+ public DefaultPlugin(final RealHostname realHostname, final String syslogHostname, final String syslogAppname) {
+ this.realHostname = realHostname.hostname();
this.syslogHostname = syslogHostname;
this.syslogAppname = syslogAppname;
}
@@ -75,43 +79,59 @@ public DefaultPlugin(final String realHostname, final String syslogHostname, fin
@Override
public List syslogMessage(final ParsedEvent parsedEvent) {
final List syslogMessages = new ArrayList<>();
- ZonedDateTime time;
- boolean timeSet;
- try {
- time = parsedEvent.enqueuedTime().zonedDateTime();
- timeSet = true;
+ ZonedDateTime enqueuedTime;
+ if (!parsedEvent.enqueuedTimeUtc().isStub()) {
+ enqueuedTime = parsedEvent.enqueuedTimeUtc().zonedDateTime();
}
- catch (DateTimeParseException ignored) {
- time = ZonedDateTime.now();
- timeSet = false;
+ else {
+ enqueuedTime = ZonedDateTime.now();
+ }
+
+ String fullyQualifiedNamespace = "";
+ String eventHubName = "";
+ String partitionId = "";
+ String consumerGroup = "";
+ if (!parsedEvent.partitionCtx().isStub()) {
+ fullyQualifiedNamespace = String
+ .valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("FullyQualifiedNamespace", ""));
+ eventHubName = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("EventHubName", ""));
+ partitionId = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("PartitionId", ""));
+ consumerGroup = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("ConsumerGroup", ""));
}
+ final SDElement sdPartition = new SDElement("aer_02_partition@48577")
+ .addSDParam("fully_qualified_namespace", fullyQualifiedNamespace)
+ .addSDParam("eventhub_name", eventHubName)
+ .addSDParam("partition_id", partitionId)
+ .addSDParam("consumer_group", consumerGroup);
+
final SDElement sdId = new SDElement("event_id@48577")
.addSDParam("uuid", UUID.randomUUID().toString())
.addSDParam("hostname", realHostname)
.addSDParam("unixtime", Instant.now().toString())
.addSDParam("id_source", "aer_02");
- final SDElement sdPartition = new SDElement("aer_02_partition@48577")
- .addSDParam(
- "fully_qualified_namespace",
- String.valueOf(parsedEvent.partitionContext().getOrDefault("FullyQualifiedNamespace", ""))
- )
- .addSDParam(
- "eventhub_name", String.valueOf(parsedEvent.partitionContext().getOrDefault("EventHubName", ""))
- )
- .addSDParam("partition_id", String.valueOf(parsedEvent.partitionContext().getOrDefault("PartitionId", ""))).addSDParam("consumer_group", String.valueOf(parsedEvent.partitionContext().getOrDefault("ConsumerGroup", "")));
+ String partitionKey = "";
+ if (!parsedEvent.systemProperties().isStub()) {
+ partitionKey = String.valueOf(parsedEvent.systemProperties().asMap().getOrDefault("PartitionKey", ""));
+ }
- final String partitionKey = String.valueOf(parsedEvent.systemProperties().getOrDefault("PartitionKey", ""));
+ String offset = "";
+ if (!parsedEvent.offset().isStub()) {
+ offset = parsedEvent.offset().value();
+ }
final SDElement sdEvent = new SDElement("aer_02_event@48577")
- .addSDParam("offset", parsedEvent.offset() == null ? "" : parsedEvent.offset())
- .addSDParam("enqueued_time", timeSet ? time.toInstant().toString() : "")
- .addSDParam("partition_key", partitionKey == null ? "" : partitionKey)
+ .addSDParam("offset", offset)
+ .addSDParam(
+ "enqueued_time",
+ parsedEvent.enqueuedTimeUtc().isStub() ? "" : enqueuedTime.toInstant().toString()
+ )
+ .addSDParam("partition_key", partitionKey)
.addSDParam("properties", new PropertiesJson(parsedEvent.properties()).toJsonObject().toString());
final SDElement sdComponentInfo = new SDElement("aer_02@48577")
- .addSDParam("timestamp_source", timeSet ? "timeEnqueued" : "generated");
+ .addSDParam("timestamp_source", parsedEvent.enqueuedTimeUtc().isStub() ? "generated" : "timeEnqueued");
List records = new ArrayList<>();
if (parsedEvent.isJsonStructure()) {
@@ -127,9 +147,14 @@ public List syslogMessage(final ParsedEvent parsedEvent) {
records.add(parsedEvent.asString());
}
+ String seqNum = "";
+ if (!parsedEvent.systemProperties().isStub()) {
+ seqNum = String.valueOf(parsedEvent.systemProperties().asMap().getOrDefault("SequenceNumber", "0"));
+ }
+
for (final String record : records) {
syslogMessages
- .add(new SyslogMessage().withSeverity(Severity.INFORMATIONAL).withFacility(Facility.LOCAL0).withTimestamp(timeSet ? time.toInstant().toEpochMilli() : Instant.now().toEpochMilli()).withHostname(syslogHostname).withAppName(syslogAppname).withSDElement(sdId).withSDElement(sdPartition).withSDElement(sdEvent).withSDElement(sdComponentInfo).withMsgId(String.valueOf(parsedEvent.systemProperties().getOrDefault("SequenceNumber", "0"))).withMsg(record));
+ .add(new SyslogMessage().withSeverity(Severity.INFORMATIONAL).withFacility(Facility.LOCAL0).withTimestamp(!parsedEvent.enqueuedTimeUtc().isStub() ? enqueuedTime.toInstant().toEpochMilli() : Instant.now().toEpochMilli()).withHostname(syslogHostname).withAppName(syslogAppname).withSDElement(sdId).withSDElement(sdPartition).withSDElement(sdEvent).withSDElement(sdComponentInfo).withMsgId(seqNum).withMsg(record));
}
return syslogMessages;
diff --git a/src/main/java/com/teragrep/aer_02/plugin/ParsedEventWithException.java b/src/main/java/com/teragrep/aer_02/plugin/ParsedEventWithException.java
index efd8ae4..c9380fd 100644
--- a/src/main/java/com/teragrep/aer_02/plugin/ParsedEventWithException.java
+++ b/src/main/java/com/teragrep/aer_02/plugin/ParsedEventWithException.java
@@ -46,7 +46,12 @@
package com.teragrep.aer_02.plugin;
import com.teragrep.akv_01.event.ParsedEvent;
-import com.teragrep.akv_01.time.EnqueuedTime;
+import com.teragrep.akv_01.event.metadata.offset.EventOffset;
+import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContext;
+import com.teragrep.akv_01.event.metadata.properties.EventProperties;
+import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemProperties;
+import com.teragrep.akv_01.event.metadata.time.EnqueuedTime;
import jakarta.json.JsonStructure;
import java.util.HashMap;
@@ -83,29 +88,37 @@ public String resourceId() {
}
@Override
- public Map partitionContext() {
- return parsedEvent.partitionContext();
+ public EventPartitionContext partitionCtx() {
+ return parsedEvent.partitionCtx();
}
@Override
- public Map properties() {
- final Map props = new HashMap<>(parsedEvent.properties());
+ public String payload() {
+ return parsedEvent.payload();
+ }
+
+ @Override
+ public EventProperties properties() {
+ final Map props = new HashMap<>();
+ if (!parsedEvent.properties().isStub()) {
+ props.putAll(parsedEvent.properties().asMap());
+ }
props.put("aer-02-exception", exception);
- return props;
+ return new EventPropertiesImpl(props);
}
@Override
- public Map systemProperties() {
+ public EventSystemProperties systemProperties() {
return parsedEvent.systemProperties();
}
@Override
- public EnqueuedTime enqueuedTime() {
- return parsedEvent.enqueuedTime();
+ public EnqueuedTime enqueuedTimeUtc() {
+ return parsedEvent.enqueuedTimeUtc();
}
@Override
- public String offset() {
+ public EventOffset offset() {
return parsedEvent.offset();
}
}
diff --git a/src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java b/src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java
index 66e9fc7..af9f290 100644
--- a/src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java
+++ b/src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java
@@ -50,8 +50,14 @@
import com.teragrep.aer_02.fakes.OutputFake;
import com.teragrep.aer_02.plugin.DefaultPluginFactory;
import com.teragrep.aer_02.plugin.WrappedPluginFactoryWithConfig;
-import com.teragrep.akv_01.event.EventImpl;
import com.teragrep.akv_01.event.ParsedEvent;
+import com.teragrep.akv_01.event.ParsedEventFactory;
+import com.teragrep.akv_01.event.UnparsedEventImpl;
+import com.teragrep.akv_01.event.metadata.offset.EventOffsetImpl;
+import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContextImpl;
+import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.time.EnqueuedTimeImpl;
import com.teragrep.akv_01.plugin.PluginFactoryConfigImpl;
import com.teragrep.nlf_01.NLFPluginFactory;
import jakarta.json.Json;
@@ -109,7 +115,18 @@ public void testLatencyMetric() {
}
String enqueuedTime = LocalDateTime.now(ZoneId.of("Z")).minusSeconds(10).toString();
parsedEvents
- .add(new EventImpl("event", new HashMap<>(partitionContext), props, systemProps, enqueuedTime, String.valueOf(i)).parsedEvent());
+ .add(
+ new ParsedEventFactory(
+ new UnparsedEventImpl(
+ "event",
+ new EventPartitionContextImpl(new HashMap<>(partitionContext)),
+ new EventPropertiesImpl(props),
+ new EventSystemPropertiesImpl(systemProps),
+ new EnqueuedTimeImpl(enqueuedTime),
+ new EventOffsetImpl(String.valueOf(i))
+ )
+ ).parsedEvent()
+ );
}
eventDataConsumer.accept(parsedEvents);
diff --git a/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java b/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java
index 4db7acd..6967d9b 100644
--- a/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java
+++ b/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java
@@ -53,8 +53,14 @@
import com.teragrep.aer_02.fakes.SystemPropsFake;
import com.teragrep.aer_02.plugin.DefaultPluginFactory;
import com.teragrep.aer_02.plugin.WrappedPluginFactoryWithConfig;
-import com.teragrep.akv_01.event.EventImpl;
import com.teragrep.akv_01.event.ParsedEvent;
+import com.teragrep.akv_01.event.ParsedEventFactory;
+import com.teragrep.akv_01.event.UnparsedEventImpl;
+import com.teragrep.akv_01.event.metadata.offset.EventOffsetImpl;
+import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContextImpl;
+import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.time.EnqueuedTimeImpl;
import com.teragrep.akv_01.plugin.PluginFactoryConfigImpl;
import com.teragrep.net_01.channel.socket.TLSFactory;
import com.teragrep.net_01.eventloop.EventLoop;
@@ -220,9 +226,10 @@ public boolean isStub() {
List enqueuedArray = Arrays.asList("2010-01-01T00:00:00", "2010-01-02T00:00:00", "2010-01-03T00:00:00");
List parsedEvents = new ArrayList<>();
+
for (int i = 0; i < eventDatas.size(); i++) {
parsedEvents
- .add(new EventImpl(eventDatas.get(i), pcf.asMap(), propsArray[i], sysPropsArray[i], enqueuedArray.get(i), offsets.get(i)).parsedEvent());
+ .add(new ParsedEventFactory(new UnparsedEventImpl(eventDatas.get(i), new EventPartitionContextImpl(pcf.asMap()), new EventPropertiesImpl(propsArray[i]), new EventSystemPropertiesImpl(sysPropsArray[i]), new EnqueuedTimeImpl(enqueuedArray.get(i)), new EventOffsetImpl(offsets.get(i)))).parsedEvent());
}
edc.accept(parsedEvents);
diff --git a/src/test/java/com/teragrep/aer_02/plugin/DefaultPluginFactoryTest.java b/src/test/java/com/teragrep/aer_02/plugin/DefaultPluginFactoryTest.java
index 9ea6239..cbfc156 100644
--- a/src/test/java/com/teragrep/aer_02/plugin/DefaultPluginFactoryTest.java
+++ b/src/test/java/com/teragrep/aer_02/plugin/DefaultPluginFactoryTest.java
@@ -46,7 +46,13 @@
package com.teragrep.aer_02.plugin;
import com.teragrep.aer_02.SyslogBridge;
-import com.teragrep.akv_01.event.EventImpl;
+import com.teragrep.akv_01.event.ParsedEventFactory;
+import com.teragrep.akv_01.event.UnparsedEventImpl;
+import com.teragrep.akv_01.event.metadata.offset.EventOffsetImpl;
+import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContextImpl;
+import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.time.EnqueuedTimeImpl;
import com.teragrep.akv_01.plugin.Plugin;
import com.teragrep.akv_01.plugin.PluginFactory;
import com.teragrep.akv_01.plugin.PluginFactoryInitialization;
@@ -71,7 +77,16 @@ void testPluginFactoryCreateObject() {
.assertDoesNotThrow(
() -> plugin
.syslogMessage(
- new EventImpl("msg", new HashMap<>(), new HashMap<>(), new HashMap<>(), now, "").parsedEvent()
+ new ParsedEventFactory(
+ new UnparsedEventImpl(
+ "msg",
+ new EventPartitionContextImpl(new HashMap<>()),
+ new EventPropertiesImpl(new HashMap<>()),
+ new EventSystemPropertiesImpl(new HashMap<>()),
+ new EnqueuedTimeImpl(now),
+ new EventOffsetImpl("")
+ )
+ ).parsedEvent()
)
);
Assertions.assertEquals(DefaultPlugin.class, plugin.getClass());
diff --git a/src/test/java/com/teragrep/aer_02/plugin/DefaultPluginTest.java b/src/test/java/com/teragrep/aer_02/plugin/DefaultPluginTest.java
index 056c187..59be714 100644
--- a/src/test/java/com/teragrep/aer_02/plugin/DefaultPluginTest.java
+++ b/src/test/java/com/teragrep/aer_02/plugin/DefaultPluginTest.java
@@ -45,7 +45,13 @@
*/
package com.teragrep.aer_02.plugin;
-import com.teragrep.akv_01.event.EventImpl;
+import com.teragrep.akv_01.event.ParsedEventFactory;
+import com.teragrep.akv_01.event.UnparsedEventImpl;
+import com.teragrep.akv_01.event.metadata.offset.EventOffsetImpl;
+import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContextImpl;
+import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemPropertiesImpl;
+import com.teragrep.akv_01.event.metadata.time.EnqueuedTimeImpl;
import com.teragrep.rlo_14.SyslogMessage;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.jupiter.api.Assertions;
@@ -72,7 +78,11 @@ void testDefaultPluginCreateSyslogMessage() {
final DefaultPlugin defaultPlugin = new DefaultPlugin("realHostname", "syslogHostname", "syslogAppname");
final List msg = defaultPlugin
- .syslogMessage(new EventImpl("event", partitionContext, props, systemProps, enqueuedTime, "0").parsedEvent());
+ .syslogMessage(
+ new ParsedEventFactory(
+ new UnparsedEventImpl("event", new EventPartitionContextImpl(partitionContext), new EventPropertiesImpl(props), new EventSystemPropertiesImpl(systemProps), new EnqueuedTimeImpl(enqueuedTime), new EventOffsetImpl("0"))
+ ).parsedEvent()
+ );
Assertions.assertEquals("event", msg.get(0).getMsg());
// aer_02@48577; aer_02_event@48577; aer_02_partition@48577; event_id@48577