From 8782155ae097278cd9a650bd4a6bda063e83888e Mon Sep 17 00:00:00 2001
From: Krishna Kondaka
Date: Mon, 27 Nov 2023 04:35:59 +0000
Subject: [PATCH] Rebased to latest

Signed-off-by: Krishna Kondaka
---
 .../kafka/buffer/KafkaBufferOTelIT.java       | 397 +++++++++++++++---
 1 file changed, 344 insertions(+), 53 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java
index 32c4ef6cd2..11ce71482e 100644
--- a/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java
+++ b/data-prepper-plugins/kafka-plugins/src/integrationTest/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferOTelIT.java
@@ -5,6 +5,8 @@
 package org.opensearch.dataprepper.plugins.kafka.buffer;
 
+import com.google.protobuf.ByteString;
+import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -24,6 +26,8 @@
 import org.opensearch.dataprepper.model.metric.JacksonGauge;
 import org.opensearch.dataprepper.model.metric.JacksonSum;
 import org.opensearch.dataprepper.model.metric.JacksonHistogram;
+import org.opensearch.dataprepper.model.log.OpenTelemetryLog;
+import org.opensearch.dataprepper.model.trace.Span;
 import org.opensearch.dataprepper.plugins.kafka.configuration.EncryptionConfig;
 import org.opensearch.dataprepper.model.buffer.DelegatingBuffer;
 import org.opensearch.dataprepper.model.buffer.Buffer;
@@ -31,36 +35,55 @@
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 
 import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
+import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
+import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
+import io.opentelemetry.proto.common.v1.AnyValue;
+import io.opentelemetry.proto.common.v1.KeyValue;
+import io.opentelemetry.proto.logs.v1.LogRecord;
+import io.opentelemetry.proto.logs.v1.ResourceLogs;
+import io.opentelemetry.proto.logs.v1.ScopeLogs;
+import io.opentelemetry.proto.resource.v1.Resource;
+import io.opentelemetry.proto.trace.v1.ScopeSpans;
+import io.opentelemetry.proto.trace.v1.ResourceSpans;
+import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans;
+import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
+import io.opentelemetry.proto.common.v1.InstrumentationScope;
+import io.opentelemetry.proto.metrics.v1.Gauge;
+import io.opentelemetry.proto.metrics.v1.Sum;
+import io.opentelemetry.proto.metrics.v1.Histogram;
+import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
+import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
+import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
+import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
+import java.util.Random;
 import java.util.UUID;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.BufferedReader;
 
 import org.opensearch.dataprepper.plugins.otel.codec.OTelMetricDecoder;
-import org.opensearch.dataprepper.plugins.source.otelmetrics.OTelMetricsGrpcService;
-import com.google.protobuf.util.JsonFormat;
+import org.opensearch.dataprepper.plugins.otel.codec.OTelTraceDecoder;
+import org.opensearch.dataprepper.plugins.otel.codec.OTelLogsDecoder;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
 public class KafkaBufferOTelIT {
-    private static final String TEST_REQUEST_MULTIPLE_METRICS_FILE = "test-request-multiple-metrics.json";
     private static final Logger LOG = LoggerFactory.getLogger(KafkaBufferIT.class);
+    private static final long TIME_DELTA = 3600L;
     @Mock
     private PluginSetting pluginSetting;
     @Mock
@@ -76,7 +99,36 @@ public class KafkaBufferOTelIT {
     private PluginMetrics pluginMetrics;
     private String bootstrapServersCommaDelimited;
-    private OTelMetricsGrpcService oTelMetricsGrpcService;
+    private Random random;
+    private String logsRegion;
+    private String logsServiceName;
+    private int logsSeverityValue;
+    private String logsSeverityText;
+    private String logsBodyValue;
+    private long currentUnixTimeNano;
+    private Instant currentTime;
+    private int logsDroppedAttributesCount;
+    private String logsSchemaUrl;
+
+    private String TraceId;
+    private String SpanId;
+
+    private String traceServiceName;
+    private int traceDroppedAttributesCount;
+    private String TraceId2;
+    private String SpanId2;
+    private String scopeSpanName;
+    private String scopeName;
+    private String scopeVersion;
+    private String ilSpanName;
+    private String ilName;
+    private String ilVersion;
+
+    private int metricsCountValue;
+    private int metricsSumValue;
+    private List<Long> histogramBucketCounts;
+    private List<Double> histogramExplicitBounds;
+
     class KafkaDelegatingBuffer extends DelegatingBuffer {
         KafkaDelegatingBuffer(Buffer<Record<Event>> buffer) {
             super(buffer);
@@ -85,6 +137,20 @@ class KafkaDelegatingBuffer extends DelegatingBuffer {
 
     @BeforeEach
     void setUp() {
+        random = new Random();
+        currentTime = Instant.now();
+        currentUnixTimeNano = ((long)currentTime.getEpochSecond() * 1000_000_000L) + currentTime.getNano();
+
+        TraceId = RandomStringUtils.randomNumeric(20);
+        SpanId = RandomStringUtils.randomNumeric(10);
+        TraceId2 = RandomStringUtils.randomNumeric(20);
+        SpanId2 = RandomStringUtils.randomNumeric(10);
+        scopeSpanName = RandomStringUtils.randomAlphabetic(10);
+        scopeName = RandomStringUtils.randomAlphabetic(10);
+        ilSpanName = RandomStringUtils.randomAlphabetic(10);
+        ilName = RandomStringUtils.randomAlphabetic(10);
+        scopeVersion = RandomStringUtils.randomNumeric(2);
+        ilVersion = RandomStringUtils.randomNumeric(2);
         pluginMetrics = PluginMetrics.fromNames(UUID.randomUUID().toString(), UUID.randomUUID().toString());
         when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString());
@@ -114,64 +180,289 @@ void setUp() {
         when(kafkaBufferConfig.getEncryptionConfig()).thenReturn(encryptionConfig);
     }
 
-    private String getFileAsJsonString(String requestJsonFileName) throws IOException {
-        final StringBuilder jsonBuilder = new StringBuilder();
-        try (final InputStream inputStream = Objects.requireNonNull(
-                KafkaBufferOTelIT.class.getClassLoader().getResourceAsStream(requestJsonFileName))) {
-            final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
-            bufferedReader.lines().forEach(jsonBuilder::append);
+    private ExportMetricsServiceRequest createExportMetricsServiceRequest() {
+        final Resource resource = Resource.newBuilder()
+                .addAttributes(KeyValue.newBuilder()
+                        .setKey("attribute1")
+                        .setValue(AnyValue.newBuilder().setStringValue("attribute1-value").build())
+                ).build();
+
+        metricsCountValue = random.nextInt(1000);
+        metricsSumValue = random.nextInt(10000);
+        Long tmpSum = 0L;
+        histogramExplicitBounds = List.of(10.0, 20.0, 30.0, 40.0);
+        histogramBucketCounts = new ArrayList<>();
+        for (int i = 0; i < histogramExplicitBounds.size(); i++) {
+            Long value = (long) random.nextInt((metricsSumValue / 6) + 1);
+            tmpSum += value;
+            histogramBucketCounts.add(value);
         }
-        return jsonBuilder.toString();
+        histogramBucketCounts.add(metricsSumValue - tmpSum);
+        ScopeMetrics scopeMetrics = ScopeMetrics.newBuilder()
+                .addMetrics(io.opentelemetry.proto.metrics.v1.Metric.newBuilder()
+                        .setName("counter")
+                        .setUnit("1")
+                        .setGauge(Gauge.newBuilder()
+                                .addDataPoints(NumberDataPoint.newBuilder()
+                                        .addAttributes(KeyValue.newBuilder()
+                                                .setKey("gauge-1")
+                                                .setValue(AnyValue.newBuilder()
+                                                        .setStringValue("gauge-value-1")
+                                                        .build()))
+                                        .setStartTimeUnixNano(currentUnixTimeNano)
+                                        .setAsInt(metricsCountValue)
+                                        .build())
+                                .build())
+                        .build())
+                .addMetrics(io.opentelemetry.proto.metrics.v1.Metric.newBuilder()
+                        .setName("sum")
+                        .setUnit("1")
+                        .setSum(Sum.newBuilder()
+                                .addDataPoints(NumberDataPoint.newBuilder()
+                                        .addAttributes(KeyValue.newBuilder()
+                                                .setKey("sum-1")
+                                                .setValue(AnyValue.newBuilder()
+                                                        .setStringValue("sum-value-1")
+                                                        .build()))
+                                        .setStartTimeUnixNano(currentUnixTimeNano)
+                                        .setAsInt(metricsSumValue)
+                                        .build())
+                                .setAggregationTemporality(io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE)
+                                .build())
+                        .build())
+                .addMetrics(io.opentelemetry.proto.metrics.v1.Metric.newBuilder()
+                        .setName("histogram")
+                        .setUnit("1")
+                        .setHistogram(Histogram.newBuilder()
+                                .addDataPoints(HistogramDataPoint.newBuilder()
+                                        .addAttributes(KeyValue.newBuilder()
+                                                .setKey("histogram-1")
+                                                .setValue(AnyValue.newBuilder()
+                                                        .setStringValue("histogram-value-1")
+                                                        .build()))
+                                        .setStartTimeUnixNano(currentUnixTimeNano)
+                                        .setCount(metricsCountValue)
+                                        .setSum(metricsSumValue)
+                                        .addAllBucketCounts(histogramBucketCounts)
+                                        .addAllExplicitBounds(histogramExplicitBounds)
+                                        .build())
+                                .setAggregationTemporality(io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE)
+                                .build())
+                        .build())
+                .build();
+
+        ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
+                .setResource(resource)
+                .addScopeMetrics(scopeMetrics)
+                .build();
+
+        return ExportMetricsServiceRequest.newBuilder()
+                .addResourceMetrics(resourceMetrics)
+                .build();
     }
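+    // NOTE: the histogram data above is generated so the bucket counts sum
+    // exactly to metricsSumValue: four randomly-sized buckets plus one final
+    // remainder bucket (histogramBucketCounts.add(metricsSumValue - tmpSum)).
+    // validateMetric() below relies on this invariant when it compares the
+    // decoded bucket counts and explicit bounds against the generated lists.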
equalTo("1")); + assertThat(metric.getName(), equalTo("counter")); + JacksonGauge gauge = (JacksonGauge)metric; + assertThat(gauge.getValue(), equalTo((double)metricsCountValue)); + } else if (metric.getKind().equals(Metric.KIND.SUM.toString())) { + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("sum")); + JacksonSum sum = (JacksonSum)metric; + assertThat(sum.getValue(), equalTo((double)metricsSumValue)); + } else { // Histogram + assertThat(metric.getUnit(), equalTo("1")); + assertThat(metric.getName(), equalTo("histogram")); + JacksonHistogram histogram = (JacksonHistogram)metric; + assertThat(histogram.getSum(), equalTo((double)metricsSumValue)); + assertThat(histogram.getCount(), equalTo((long)metricsCountValue)); + assertThat(histogram.getExemplars(), equalTo(Collections.emptyList())); + assertThat(histogram.getExplicitBoundsList(), equalTo(histogramExplicitBounds)); + assertThat(histogram.getExplicitBoundsCount(), equalTo(histogramExplicitBounds.size())); + assertThat(histogram.getBucketCountsList(), equalTo(histogramBucketCounts)); + assertThat(histogram.getBucketCount(), equalTo(histogramBucketCounts.size())); + assertThat(histogram.getAggregationTemporality(), equalTo("AGGREGATION_TEMPORALITY_CUMULATIVE")); + } } @Test void test_otel_metrics_with_kafka_buffer() throws Exception { KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, new OTelMetricDecoder(), null, null); buffer = new KafkaDelegatingBuffer(kafkaBuffer); - oTelMetricsGrpcService = new OTelMetricsGrpcService(10000, - buffer, - pluginMetrics); - - final ExportMetricsServiceRequest request = buildExportMetricsServiceRequestFromJsonFile(TEST_REQUEST_MULTIPLE_METRICS_FILE); - oTelMetricsGrpcService.rawExport(request); + final ExportMetricsServiceRequest request = createExportMetricsServiceRequest(); + buffer.writeBytes(request.toByteArray(), null, 10_000); Map.Entry>, CheckpointState> readResult = kafkaBuffer.read(10_000); assertThat(readResult, notNullValue()); assertThat(readResult.getKey(), notNullValue()); assertThat(readResult.getKey().size(), equalTo(3)); for (Record record : readResult.getKey()) { - Event event = record.getData(); - JacksonMetric metric = (JacksonMetric) (JacksonEvent)(Event)record.getData(); - if (metric.getKind().equals(Metric.KIND.GAUGE.toString())) { - assertThat(metric.getUnit(), equalTo("1")); - assertThat(metric.getName(), equalTo("counter-int")); - JacksonGauge gauge = (JacksonGauge)metric; - assertThat(gauge.getValue(), equalTo(123.0)); - } else if (metric.getKind().equals(Metric.KIND.SUM.toString())) { - assertThat(metric.getUnit(), equalTo("1")); - assertThat(metric.getName(), equalTo("sum-int")); - JacksonSum sum = (JacksonSum)metric; - assertThat(sum.getValue(), equalTo(456.0)); - } else if (metric.getKind().equals(Metric.KIND.HISTOGRAM.toString())) { - assertThat(metric.getUnit(), equalTo("1")); - assertThat(metric.getName(), equalTo("histogram-int")); - JacksonHistogram histogram = (JacksonHistogram)metric; - assertThat(histogram.getSum(), equalTo(100.0)); - assertThat(histogram.getCount(), equalTo(30L)); - assertThat(histogram.getExemplars(), equalTo(Collections.emptyList())); - assertThat(histogram.getExplicitBoundsList(), equalTo(List.of(1.0, 2.0, 3.0, 4.0))); - assertThat(histogram.getExplicitBoundsCount(), equalTo(4)); - assertThat(histogram.getBucketCountsList(), equalTo(List.of(3L, 5L, 15L, 6L, 1L))); - assertThat(histogram.getBucketCount(), equalTo(5)); - 
+
+    private ExportLogsServiceRequest createExportLogsRequest() {
+        logsServiceName = RandomStringUtils.randomAlphabetic(8);
+
+        final Resource resource = Resource.newBuilder()
+                .addAttributes(KeyValue.newBuilder()
+                        .setKey("service.name")
+                        .setValue(AnyValue.newBuilder().setStringValue(logsServiceName).build())
+                ).build();
+
+        logsSeverityValue = random.nextInt(100000);
+        logsSeverityText = RandomStringUtils.randomAlphabetic(8);
+        logsSchemaUrl = RandomStringUtils.randomAlphabetic(9);
+        logsBodyValue = RandomStringUtils.randomAlphabetic(10);
+        logsDroppedAttributesCount = random.nextInt(1000);
+        logsRegion = RandomStringUtils.randomNumeric(6);
+        final ResourceLogs resourceLogs = ResourceLogs.newBuilder()
+                .addScopeLogs(ScopeLogs.newBuilder()
+                        .addLogRecords(LogRecord.newBuilder()
+                                .setTimeUnixNano(currentUnixTimeNano)
+                                .setObservedTimeUnixNano(currentUnixTimeNano + TIME_DELTA * 1000_000_000)
+                                .setSeverityNumberValue(logsSeverityValue)
+                                .setSeverityText(logsSeverityText)
+                                .setBody(AnyValue.newBuilder().setStringValue(logsBodyValue).build())
+                                .setDroppedAttributesCount(logsDroppedAttributesCount)
+                                .setTraceId(ByteString.copyFrom(TraceId.getBytes()))
+                                .setSpanId(ByteString.copyFrom(SpanId.getBytes()))
+                                .addAttributes(KeyValue.newBuilder()
+                                        .setKey("statement.region")
+                                        .setValue(AnyValue.newBuilder().setStringValue(logsRegion).build()).build())
+                                .build()))
+                .setResource(resource)
+                .setSchemaUrl(logsSchemaUrl)
+                .build();
+
+        return ExportLogsServiceRequest.newBuilder()
+                .addResourceLogs(resourceLogs)
+                .build();
+    }
+
+    private void validateLog(OpenTelemetryLog logRecord) throws Exception {
+        assertThat(logRecord.getServiceName(), is(logsServiceName));
+        assertThat(logRecord.getTime(), is(currentTime.toString()));
+        assertThat(logRecord.getObservedTime(), is((currentTime.plusSeconds(TIME_DELTA)).toString()));
+        assertThat(logRecord.getBody(), is(logsBodyValue));
+        assertThat(logRecord.getDroppedAttributesCount(), is(logsDroppedAttributesCount));
+        assertThat(logRecord.getSchemaUrl(), is(logsSchemaUrl));
+        assertThat(logRecord.getSeverityNumber(), is(logsSeverityValue));
+        assertThat(logRecord.getSeverityText(), is(logsSeverityText));
+
+        assertThat(new String(Hex.decodeHex(logRecord.getTraceId())), is(ByteString.copyFrom(TraceId.getBytes()).toStringUtf8()));
+        assertThat(new String(Hex.decodeHex(logRecord.getSpanId())), is(ByteString.copyFrom(SpanId.getBytes()).toStringUtf8()));
+        Map<String, Object> mergedAttributes = logRecord.getAttributes();
+        assertThat(mergedAttributes.keySet().size(), is(2));
+        assertThat(mergedAttributes.get("log.attributes.statement@region"), is(logsRegion));
+        assertThat(mergedAttributes.get("resource.attributes.service@name"), is(logsServiceName));
+    }
+
+    @Test
+    void test_otel_logs_with_kafka_buffer() throws Exception {
+        KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, new OTelLogsDecoder(), null, null);
+        buffer = new KafkaDelegatingBuffer(kafkaBuffer);
+        final ExportLogsServiceRequest request = createExportLogsRequest();
+        buffer.writeBytes(request.toByteArray(), null, 10_000);
+        Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = kafkaBuffer.read(10_000);
+        assertThat(readResult, notNullValue());
+        assertThat(readResult.getKey(), notNullValue());
+        assertThat(readResult.getKey().size(), equalTo(1));
+        for (Record<Event> record : readResult.getKey()) {
+            validateLog((OpenTelemetryLog)record.getData());
+        }
+    }
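+    // The decoded events carry trace and span ids as hex-encoded strings,
+    // which is why validateLog() above (and validateSpan() below) run the ids
+    // through Hex.decodeHex() before comparing them with the raw bytes that
+    // were sent in the export request.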
+
+    private ExportTraceServiceRequest createExportTraceRequest() {
+        traceServiceName = RandomStringUtils.randomAlphabetic(8);
+        traceDroppedAttributesCount = random.nextInt(1000);
+
+        final Resource resource = Resource.newBuilder()
+                .addAttributes(KeyValue.newBuilder()
+                        .setKey("service.name")
+                        .setValue(AnyValue.newBuilder().setStringValue(traceServiceName).build())
+                )
+                .setDroppedAttributesCount(traceDroppedAttributesCount)
+                .build();
+
+        final ScopeSpans scopeSpans = ScopeSpans.newBuilder()
+                .setScope(InstrumentationScope.newBuilder()
+                        .setName(scopeName)
+                        .setVersion(scopeVersion)
+                        .build())
+                .addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder()
+                        .setTraceId(ByteString.copyFrom(TraceId.getBytes()))
+                        .setSpanId(ByteString.copyFrom(SpanId.getBytes()))
+                        .setKind(io.opentelemetry.proto.trace.v1.Span.SpanKind.SPAN_KIND_INTERNAL)
+                        .setStartTimeUnixNano(currentUnixTimeNano)
+                        .setName(scopeSpanName)
+                        .setEndTimeUnixNano(currentUnixTimeNano + TIME_DELTA * 1000_000_000)
+                        .build())
+                .build();
+
+        final InstrumentationLibrarySpans ilSpans = InstrumentationLibrarySpans.newBuilder()
+                .setInstrumentationLibrary(InstrumentationLibrary.newBuilder()
+                        .setName(ilName)
+                        .setVersion(ilVersion)
+                        .build())
+                .addSpans(io.opentelemetry.proto.trace.v1.Span.newBuilder()
+                        .setTraceId(ByteString.copyFrom(TraceId2.getBytes()))
+                        .setSpanId(ByteString.copyFrom(SpanId2.getBytes()))
+                        .setKind(io.opentelemetry.proto.trace.v1.Span.SpanKind.SPAN_KIND_INTERNAL)
+                        .setName(ilSpanName)
+                        .setStartTimeUnixNano(currentUnixTimeNano)
+                        .setEndTimeUnixNano(currentUnixTimeNano + TIME_DELTA * 1000_000_000)
+                        .build())
+                .build();
+        ResourceSpans resourceSpans = ResourceSpans.newBuilder()
+                .setResource(resource)
+                .addScopeSpans(scopeSpans)
+                .addInstrumentationLibrarySpans(ilSpans)
+                .build();
+
+        return ExportTraceServiceRequest.newBuilder()
+                .addResourceSpans(resourceSpans)
+                .build();
+    }
+
+    private void validateSpan(Span span) throws Exception {
+        assertThat(new String(Hex.decodeHex(span.get("traceId", String.class))), is(ByteString.copyFrom(TraceId.getBytes()).toStringUtf8()));
+        assertThat(new String(Hex.decodeHex(span.get("spanId", String.class))), is(ByteString.copyFrom(SpanId.getBytes()).toStringUtf8()));
+        assertThat(span.get("droppedLinksCount", Integer.class), is(0));
+        assertThat(span.get("droppedAttributesCount", Integer.class), is(0));
+        assertThat(span.get("kind", String.class), is("SPAN_KIND_INTERNAL"));
+        assertThat(span.get("name", String.class), is(scopeSpanName));
+        Map<String, Object> attributes = span.get("attributes", Map.class);
+
+        assertThat(span.get("traceGroup", String.class), is(scopeSpanName));
+        assertThat(attributes.get("instrumentationScope.name"), is(scopeName));
+        assertThat(attributes.get("resource.attributes.service@name"), is(traceServiceName));
+        assertThat(span.get("startTime", String.class), is(currentTime.toString()));
+        assertThat(span.get("endTime", String.class), is((currentTime.plusSeconds(TIME_DELTA)).toString()));
+        assertThat(span.get("durationInNanos", Long.class), is(TIME_DELTA * 1000_000_000L));
+        assertThat(span.get("parentSpanId", String.class), is(""));
+    }
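+    // validateSpan() checks only the span built from ScopeSpans: it expects
+    // durationInNanos to equal endTimeUnixNano - startTimeUnixNano, i.e.
+    // TIME_DELTA seconds expressed as nanoseconds (TIME_DELTA * 1000_000_000L).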
+
+    @Test
+    void test_otel_traces_with_kafka_buffer() throws Exception {
+        KafkaBuffer kafkaBuffer = new KafkaBuffer(pluginSetting, kafkaBufferConfig, pluginFactory, acknowledgementSetManager, new OTelTraceDecoder(), null, null);
+        buffer = new KafkaDelegatingBuffer(kafkaBuffer);
+        final ExportTraceServiceRequest request = createExportTraceRequest();
+        buffer.writeBytes(request.toByteArray(), null, 10_000);
+        Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = kafkaBuffer.read(10_000);
+        assertThat(readResult, notNullValue());
+        assertThat(readResult.getKey(), notNullValue());
+        assertThat(readResult.getKey().size(), equalTo(1));
+        for (Record<Event> record : readResult.getKey()) {
+            validateSpan((Span)record.getData());
+        }
+    }