Skip to content

Commit

Permalink
Issue #75: Integrate new akv_01 with metadata wrapper objects (#76)
Browse files Browse the repository at this point in the history
* add isStub() checks to DefaultPlugin; fix tests to use ParsedEventFactory with UnparsedEventImpl and metadata wrapper objects; fix ParsedEventWithException to match the new interface; update nlf_01 to 2.3.0; update akv_01 to 6.3.0

* initialize RealHostname in ctor; change time variable to enqueuedTime

* run RealHostname in ctor
  • Loading branch information
eemhu authored Feb 17, 2025
1 parent 1cdb9b0 commit 368acab
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 45 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>nlf_01</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>akv_01</artifactId>
<version>6.0.0</version>
<version>6.3.0</version>
</dependency>
<!-- Azure functions -->
<dependency>
Expand Down
75 changes: 50 additions & 25 deletions src/main/java/com/teragrep/aer_02/plugin/DefaultPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.teragrep.akv_01.event.ParsedEvent;
import com.teragrep.akv_01.plugin.Plugin;
import com.teragrep.nlf_01.PropertiesJson;
import com.teragrep.nlf_01.util.RealHostname;
import com.teragrep.rlo_14.Facility;
import com.teragrep.rlo_14.SDElement;
import com.teragrep.rlo_14.Severity;
Expand All @@ -57,7 +58,6 @@

import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.format.DateTimeParseException;
import java.util.*;

public final class DefaultPlugin implements Plugin {
Expand All @@ -67,51 +67,71 @@ public final class DefaultPlugin implements Plugin {
private final String syslogAppname;

public DefaultPlugin(final String realHostname, final String syslogHostname, final String syslogAppname) {
this.realHostname = realHostname;
this(new RealHostname(realHostname), syslogHostname, syslogAppname);
}

public DefaultPlugin(final RealHostname realHostname, final String syslogHostname, final String syslogAppname) {
this.realHostname = realHostname.hostname();
this.syslogHostname = syslogHostname;
this.syslogAppname = syslogAppname;
}

@Override
public List<SyslogMessage> syslogMessage(final ParsedEvent parsedEvent) {
final List<SyslogMessage> syslogMessages = new ArrayList<>();
ZonedDateTime time;
boolean timeSet;
try {
time = parsedEvent.enqueuedTime().zonedDateTime();
timeSet = true;
ZonedDateTime enqueuedTime;
if (!parsedEvent.enqueuedTimeUtc().isStub()) {
enqueuedTime = parsedEvent.enqueuedTimeUtc().zonedDateTime();
}
catch (DateTimeParseException ignored) {
time = ZonedDateTime.now();
timeSet = false;
else {
enqueuedTime = ZonedDateTime.now();
}

String fullyQualifiedNamespace = "";
String eventHubName = "";
String partitionId = "";
String consumerGroup = "";
if (!parsedEvent.partitionCtx().isStub()) {
fullyQualifiedNamespace = String
.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("FullyQualifiedNamespace", ""));
eventHubName = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("EventHubName", ""));
partitionId = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("PartitionId", ""));
consumerGroup = String.valueOf(parsedEvent.partitionCtx().asMap().getOrDefault("ConsumerGroup", ""));
}

final SDElement sdPartition = new SDElement("aer_02_partition@48577")
.addSDParam("fully_qualified_namespace", fullyQualifiedNamespace)
.addSDParam("eventhub_name", eventHubName)
.addSDParam("partition_id", partitionId)
.addSDParam("consumer_group", consumerGroup);

final SDElement sdId = new SDElement("event_id@48577")
.addSDParam("uuid", UUID.randomUUID().toString())
.addSDParam("hostname", realHostname)
.addSDParam("unixtime", Instant.now().toString())
.addSDParam("id_source", "aer_02");

final SDElement sdPartition = new SDElement("aer_02_partition@48577")
.addSDParam(
"fully_qualified_namespace",
String.valueOf(parsedEvent.partitionContext().getOrDefault("FullyQualifiedNamespace", ""))
)
.addSDParam(
"eventhub_name", String.valueOf(parsedEvent.partitionContext().getOrDefault("EventHubName", ""))
)
.addSDParam("partition_id", String.valueOf(parsedEvent.partitionContext().getOrDefault("PartitionId", ""))).addSDParam("consumer_group", String.valueOf(parsedEvent.partitionContext().getOrDefault("ConsumerGroup", "")));
String partitionKey = "";
if (!parsedEvent.systemProperties().isStub()) {
partitionKey = String.valueOf(parsedEvent.systemProperties().asMap().getOrDefault("PartitionKey", ""));
}

final String partitionKey = String.valueOf(parsedEvent.systemProperties().getOrDefault("PartitionKey", ""));
String offset = "";
if (!parsedEvent.offset().isStub()) {
offset = parsedEvent.offset().value();
}

final SDElement sdEvent = new SDElement("aer_02_event@48577")
.addSDParam("offset", parsedEvent.offset() == null ? "" : parsedEvent.offset())
.addSDParam("enqueued_time", timeSet ? time.toInstant().toString() : "")
.addSDParam("partition_key", partitionKey == null ? "" : partitionKey)
.addSDParam("offset", offset)
.addSDParam(
"enqueued_time",
parsedEvent.enqueuedTimeUtc().isStub() ? "" : enqueuedTime.toInstant().toString()
)
.addSDParam("partition_key", partitionKey)
.addSDParam("properties", new PropertiesJson(parsedEvent.properties()).toJsonObject().toString());

final SDElement sdComponentInfo = new SDElement("aer_02@48577")
.addSDParam("timestamp_source", timeSet ? "timeEnqueued" : "generated");
.addSDParam("timestamp_source", parsedEvent.enqueuedTimeUtc().isStub() ? "generated" : "timeEnqueued");

List<String> records = new ArrayList<>();
if (parsedEvent.isJsonStructure()) {
Expand All @@ -127,9 +147,14 @@ public List<SyslogMessage> syslogMessage(final ParsedEvent parsedEvent) {
records.add(parsedEvent.asString());
}

String seqNum = "";
if (!parsedEvent.systemProperties().isStub()) {
seqNum = String.valueOf(parsedEvent.systemProperties().asMap().getOrDefault("SequenceNumber", "0"));
}

for (final String record : records) {
syslogMessages
.add(new SyslogMessage().withSeverity(Severity.INFORMATIONAL).withFacility(Facility.LOCAL0).withTimestamp(timeSet ? time.toInstant().toEpochMilli() : Instant.now().toEpochMilli()).withHostname(syslogHostname).withAppName(syslogAppname).withSDElement(sdId).withSDElement(sdPartition).withSDElement(sdEvent).withSDElement(sdComponentInfo).withMsgId(String.valueOf(parsedEvent.systemProperties().getOrDefault("SequenceNumber", "0"))).withMsg(record));
.add(new SyslogMessage().withSeverity(Severity.INFORMATIONAL).withFacility(Facility.LOCAL0).withTimestamp(!parsedEvent.enqueuedTimeUtc().isStub() ? enqueuedTime.toInstant().toEpochMilli() : Instant.now().toEpochMilli()).withHostname(syslogHostname).withAppName(syslogAppname).withSDElement(sdId).withSDElement(sdPartition).withSDElement(sdEvent).withSDElement(sdComponentInfo).withMsgId(seqNum).withMsg(record));
}

return syslogMessages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@
package com.teragrep.aer_02.plugin;

import com.teragrep.akv_01.event.ParsedEvent;
import com.teragrep.akv_01.time.EnqueuedTime;
import com.teragrep.akv_01.event.metadata.offset.EventOffset;
import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContext;
import com.teragrep.akv_01.event.metadata.properties.EventProperties;
import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemProperties;
import com.teragrep.akv_01.event.metadata.time.EnqueuedTime;
import jakarta.json.JsonStructure;

import java.util.HashMap;
Expand Down Expand Up @@ -83,29 +88,37 @@ public String resourceId() {
}

@Override
public Map<String, Object> partitionContext() {
return parsedEvent.partitionContext();
public EventPartitionContext partitionCtx() {
return parsedEvent.partitionCtx();
}

@Override
public Map<String, Object> properties() {
final Map<String, Object> props = new HashMap<>(parsedEvent.properties());
public String payload() {
return parsedEvent.payload();
}

@Override
public EventProperties properties() {
final Map<String, Object> props = new HashMap<>();
if (!parsedEvent.properties().isStub()) {
props.putAll(parsedEvent.properties().asMap());
}
props.put("aer-02-exception", exception);
return props;
return new EventPropertiesImpl(props);
}

@Override
public Map<String, Object> systemProperties() {
public EventSystemProperties systemProperties() {
return parsedEvent.systemProperties();
}

@Override
public EnqueuedTime enqueuedTime() {
return parsedEvent.enqueuedTime();
public EnqueuedTime enqueuedTimeUtc() {
return parsedEvent.enqueuedTimeUtc();
}

@Override
public String offset() {
public EventOffset offset() {
return parsedEvent.offset();
}
}
21 changes: 19 additions & 2 deletions src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,14 @@
import com.teragrep.aer_02.fakes.OutputFake;
import com.teragrep.aer_02.plugin.DefaultPluginFactory;
import com.teragrep.aer_02.plugin.WrappedPluginFactoryWithConfig;
import com.teragrep.akv_01.event.EventImpl;
import com.teragrep.akv_01.event.ParsedEvent;
import com.teragrep.akv_01.event.ParsedEventFactory;
import com.teragrep.akv_01.event.UnparsedEventImpl;
import com.teragrep.akv_01.event.metadata.offset.EventOffsetImpl;
import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContextImpl;
import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemPropertiesImpl;
import com.teragrep.akv_01.event.metadata.time.EnqueuedTimeImpl;
import com.teragrep.akv_01.plugin.PluginFactoryConfigImpl;
import com.teragrep.nlf_01.NLFPluginFactory;
import jakarta.json.Json;
Expand Down Expand Up @@ -109,7 +115,18 @@ public void testLatencyMetric() {
}
String enqueuedTime = LocalDateTime.now(ZoneId.of("Z")).minusSeconds(10).toString();
parsedEvents
.add(new EventImpl("event", new HashMap<>(partitionContext), props, systemProps, enqueuedTime, String.valueOf(i)).parsedEvent());
.add(
new ParsedEventFactory(
new UnparsedEventImpl(
"event",
new EventPartitionContextImpl(new HashMap<>(partitionContext)),
new EventPropertiesImpl(props),
new EventSystemPropertiesImpl(systemProps),
new EnqueuedTimeImpl(enqueuedTime),
new EventOffsetImpl(String.valueOf(i))
)
).parsedEvent()
);
}
eventDataConsumer.accept(parsedEvents);

Expand Down
11 changes: 9 additions & 2 deletions src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,14 @@
import com.teragrep.aer_02.fakes.SystemPropsFake;
import com.teragrep.aer_02.plugin.DefaultPluginFactory;
import com.teragrep.aer_02.plugin.WrappedPluginFactoryWithConfig;
import com.teragrep.akv_01.event.EventImpl;
import com.teragrep.akv_01.event.ParsedEvent;
import com.teragrep.akv_01.event.ParsedEventFactory;
import com.teragrep.akv_01.event.UnparsedEventImpl;
import com.teragrep.akv_01.event.metadata.offset.EventOffsetImpl;
import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContextImpl;
import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemPropertiesImpl;
import com.teragrep.akv_01.event.metadata.time.EnqueuedTimeImpl;
import com.teragrep.akv_01.plugin.PluginFactoryConfigImpl;
import com.teragrep.net_01.channel.socket.TLSFactory;
import com.teragrep.net_01.eventloop.EventLoop;
Expand Down Expand Up @@ -220,9 +226,10 @@ public boolean isStub() {
List<String> enqueuedArray = Arrays.asList("2010-01-01T00:00:00", "2010-01-02T00:00:00", "2010-01-03T00:00:00");

List<ParsedEvent> parsedEvents = new ArrayList<>();

for (int i = 0; i < eventDatas.size(); i++) {
parsedEvents
.add(new EventImpl(eventDatas.get(i), pcf.asMap(), propsArray[i], sysPropsArray[i], enqueuedArray.get(i), offsets.get(i)).parsedEvent());
.add(new ParsedEventFactory(new UnparsedEventImpl(eventDatas.get(i), new EventPartitionContextImpl(pcf.asMap()), new EventPropertiesImpl(propsArray[i]), new EventSystemPropertiesImpl(sysPropsArray[i]), new EnqueuedTimeImpl(enqueuedArray.get(i)), new EventOffsetImpl(offsets.get(i)))).parsedEvent());
}

edc.accept(parsedEvents);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@
package com.teragrep.aer_02.plugin;

import com.teragrep.aer_02.SyslogBridge;
import com.teragrep.akv_01.event.EventImpl;
import com.teragrep.akv_01.event.ParsedEventFactory;
import com.teragrep.akv_01.event.UnparsedEventImpl;
import com.teragrep.akv_01.event.metadata.offset.EventOffsetImpl;
import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContextImpl;
import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemPropertiesImpl;
import com.teragrep.akv_01.event.metadata.time.EnqueuedTimeImpl;
import com.teragrep.akv_01.plugin.Plugin;
import com.teragrep.akv_01.plugin.PluginFactory;
import com.teragrep.akv_01.plugin.PluginFactoryInitialization;
Expand All @@ -71,7 +77,16 @@ void testPluginFactoryCreateObject() {
.assertDoesNotThrow(
() -> plugin
.syslogMessage(
new EventImpl("msg", new HashMap<>(), new HashMap<>(), new HashMap<>(), now, "").parsedEvent()
new ParsedEventFactory(
new UnparsedEventImpl(
"msg",
new EventPartitionContextImpl(new HashMap<>()),
new EventPropertiesImpl(new HashMap<>()),
new EventSystemPropertiesImpl(new HashMap<>()),
new EnqueuedTimeImpl(now),
new EventOffsetImpl("")
)
).parsedEvent()
)
);
Assertions.assertEquals(DefaultPlugin.class, plugin.getClass());
Expand Down
14 changes: 12 additions & 2 deletions src/test/java/com/teragrep/aer_02/plugin/DefaultPluginTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@
*/
package com.teragrep.aer_02.plugin;

import com.teragrep.akv_01.event.EventImpl;
import com.teragrep.akv_01.event.ParsedEventFactory;
import com.teragrep.akv_01.event.UnparsedEventImpl;
import com.teragrep.akv_01.event.metadata.offset.EventOffsetImpl;
import com.teragrep.akv_01.event.metadata.partitionContext.EventPartitionContextImpl;
import com.teragrep.akv_01.event.metadata.properties.EventPropertiesImpl;
import com.teragrep.akv_01.event.metadata.systemProperties.EventSystemPropertiesImpl;
import com.teragrep.akv_01.event.metadata.time.EnqueuedTimeImpl;
import com.teragrep.rlo_14.SyslogMessage;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.junit.jupiter.api.Assertions;
Expand All @@ -72,7 +78,11 @@ void testDefaultPluginCreateSyslogMessage() {

final DefaultPlugin defaultPlugin = new DefaultPlugin("realHostname", "syslogHostname", "syslogAppname");
final List<SyslogMessage> msg = defaultPlugin
.syslogMessage(new EventImpl("event", partitionContext, props, systemProps, enqueuedTime, "0").parsedEvent());
.syslogMessage(
new ParsedEventFactory(
new UnparsedEventImpl("event", new EventPartitionContextImpl(partitionContext), new EventPropertiesImpl(props), new EventSystemPropertiesImpl(systemProps), new EnqueuedTimeImpl(enqueuedTime), new EventOffsetImpl("0"))
).parsedEvent()
);

Assertions.assertEquals("event", msg.get(0).getMsg());
// aer_02@48577; aer_02_event@48577; aer_02_partition@48577; event_id@48577
Expand Down

0 comments on commit 368acab

Please sign in to comment.