Skip to content

Commit

Permalink
Upgrade OTEL version to 1.2
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Jan 27, 2025
1 parent 9c61e03 commit 0810b20
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
Expand Down Expand Up @@ -191,21 +190,6 @@ public static Map<String, Object> getResourceAttributes(final Resource resource)
.collect(Collectors.toMap(i -> PREFIX_AND_RESOURCE_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue())));
}

/**
* Extracts the name and version of the used instrumentation library used
*
* @return A map, containing information about the instrumentation library
*/
public static Map<String, Object> getInstrumentationLibraryAttributes(final InstrumentationLibrary instrumentationLibrary) {
final Map<String, Object> instrumentationAttr = new HashMap<>();
if (!instrumentationLibrary.getName().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_LIBRARY_NAME, instrumentationLibrary.getName());
}
if (!instrumentationLibrary.getVersion().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_LIBRARY_VERSION, instrumentationLibrary.getVersion());
}
return instrumentationAttr;
}

/**
* Extracts the name and version of the used instrumentation scope used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,16 @@
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.logs.v1.LogRecord;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.proto.metrics.v1.InstrumentationLibraryMetrics;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;
import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.ScopeSpans;
import io.opentelemetry.proto.trace.v1.Status;
Expand Down Expand Up @@ -92,6 +89,7 @@ public class OTelProtoCodec {
static final String INSTRUMENTATION_LIBRARY_VERSION = "instrumentationLibrary.version";
static final String STATUS_CODE = "status.code";
static final String STATUS_MESSAGE = "status.message";
static final String ATTRIBUTES_KEY = "attributes";


/**
Expand Down Expand Up @@ -198,17 +196,6 @@ protected Collection<OpenTelemetryLog> parseResourceLogs(ResourceLogs rs, final
final Map<String, Object> resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource());
final String schemaUrl = rs.getSchemaUrl();

Stream<OpenTelemetryLog> mappedInstrumentationLibraryLogs = rs.getInstrumentationLibraryLogsList()
.stream()
.map(ils ->
processLogsList(ils.getLogRecordsList(),
serviceName,
OTelProtoCodec.getInstrumentationLibraryAttributes(ils.getInstrumentationLibrary()),
resourceAttributes,
schemaUrl,
timeReceived))
.flatMap(Collection::stream);

Stream<OpenTelemetryLog> mappedScopeListLogs = rs.getScopeLogsList()
.stream()
.map(sls ->
Expand All @@ -220,7 +207,7 @@ protected Collection<OpenTelemetryLog> parseResourceLogs(ResourceLogs rs, final
timeReceived))
.flatMap(Collection::stream);

return Stream.concat(mappedInstrumentationLibraryLogs, mappedScopeListLogs).collect(Collectors.toList());
return mappedScopeListLogs.collect(Collectors.toList());
}

protected Map<String, ResourceSpans> splitResourceSpansByTraceId(final ResourceSpans resourceSpans) {
Expand All @@ -239,22 +226,6 @@ protected Map<String, ResourceSpans> splitResourceSpansByTraceId(final ResourceS
}
}

if (resourceSpans.getInstrumentationLibrarySpansList().size() > 0) {
for (Map.Entry<String, List<InstrumentationLibrarySpans>> entry: splitInstrumentationLibrarySpansByTraceId(resourceSpans.getInstrumentationLibrarySpansList()).entrySet()) {
ResourceSpans.Builder resourceSpansBuilder;
String traceId = entry.getKey();
if (resultBuilderMap.containsKey(traceId)) {
resourceSpansBuilder = resultBuilderMap.get(traceId);
} else {
resourceSpansBuilder = ResourceSpans.newBuilder();
if (hasResource) {
resourceSpansBuilder.setResource(resource);
}
resultBuilderMap.put(traceId, resourceSpansBuilder);
}
resourceSpansBuilder.addAllInstrumentationLibrarySpans(entry.getValue());
}
}
for (Map.Entry<String, ResourceSpans.Builder> entry: resultBuilderMap.entrySet()) {
result.put(entry.getKey(), entry.getValue().build());
}
Expand All @@ -270,10 +241,6 @@ protected List<Span> parseResourceSpans(final ResourceSpans resourceSpans, final
return parseScopeSpans(resourceSpans.getScopeSpansList(), serviceName, resourceAttributes, timeReceived);
}

if (resourceSpans.getInstrumentationLibrarySpansList().size() > 0) {
return parseInstrumentationLibrarySpans(resourceSpans.getInstrumentationLibrarySpansList(), serviceName, resourceAttributes, timeReceived);
}

LOG.debug("No spans found to parse from ResourceSpans object: {}", resourceSpans);
return Collections.emptyList();
}
Expand Down Expand Up @@ -306,39 +273,6 @@ private Map<String, List<ScopeSpans>> splitScopeSpansByTraceId(final List<ScopeS
return result;
}

private List<Span> parseInstrumentationLibrarySpans(final List<InstrumentationLibrarySpans> instrumentationLibrarySpansList,
final String serviceName, final Map<String, Object> resourceAttributes,
final Instant timeReceived) {
return instrumentationLibrarySpansList.stream()
.map(instrumentationLibrarySpans -> parseSpans(instrumentationLibrarySpans.getSpansList(),
instrumentationLibrarySpans.getInstrumentationLibrary(), this::getInstrumentationLibraryAttributes,
serviceName, resourceAttributes, timeReceived))
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

private Map<String, List<InstrumentationLibrarySpans>> splitInstrumentationLibrarySpansByTraceId(final List<InstrumentationLibrarySpans> instrumentationLibrarySpansList) {
Map<String, List<InstrumentationLibrarySpans>> result = new HashMap<>();
for (InstrumentationLibrarySpans is: instrumentationLibrarySpansList) {
final boolean hasInstrumentationLibrary = is.hasInstrumentationLibrary();
final io.opentelemetry.proto.common.v1.InstrumentationLibrary instrumentationLibrary = is.getInstrumentationLibrary();
for (Map.Entry<String, List<io.opentelemetry.proto.trace.v1.Span>> entry: splitSpansByTraceId(is.getSpansList()).entrySet()) {
String traceId = entry.getKey();
InstrumentationLibrarySpans.Builder ilSpansBuilder = InstrumentationLibrarySpans.newBuilder().setSchemaUrl(is.getSchemaUrl()).addAllSpans(entry.getValue());
if (hasInstrumentationLibrary) {
ilSpansBuilder.setInstrumentationLibrary(instrumentationLibrary);
}

if (!result.containsKey(traceId)) {
result.put(traceId, new ArrayList<>());
}
result.get(traceId).add(ilSpansBuilder.build());
}
}
return result;
}


private Map<String, List<io.opentelemetry.proto.trace.v1.Span>> splitSpansByTraceId(final List<io.opentelemetry.proto.trace.v1.Span> spans) {
Map<String, List<io.opentelemetry.proto.trace.v1.Span>> result = new HashMap<>();
for (io.opentelemetry.proto.trace.v1.Span span: spans) {
Expand Down Expand Up @@ -547,17 +481,6 @@ protected TraceGroupFields getTraceGroupFields(final io.opentelemetry.proto.trac
return traceGroupFieldsBuilder.build();
}

protected Map<String, Object> getInstrumentationLibraryAttributes(final InstrumentationLibrary instrumentationLibrary) {
final Map<String, Object> instrumentationAttr = new HashMap<>();
if (!instrumentationLibrary.getName().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_SCOPE_NAME, instrumentationLibrary.getName());
}
if (!instrumentationLibrary.getVersion().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_SCOPE_VERSION, instrumentationLibrary.getVersion());
}
return instrumentationAttr;
}

protected Map<String, Object> getSpanStatusAttributes(final Status status) {
final Map<String, Object> statusAttr = new HashMap<>();
statusAttr.put(STATUS_CODE, status.getCodeValue());
Expand Down Expand Up @@ -600,11 +523,6 @@ public Collection<Record<? extends Metric>> parseExportMetricsServiceRequest(
final Map<String, Object> resourceAttributes = OTelProtoCodec.getResourceAttributes(rs.getResource());
final String serviceName = OTelProtoCodec.getServiceName(rs.getResource()).orElse(null);

for (InstrumentationLibraryMetrics is : rs.getInstrumentationLibraryMetricsList()) {
final Map<String, Object> ils = OTelProtoCodec.getInstrumentationLibraryAttributes(is.getInstrumentationLibrary());
recordsOut.addAll(processMetricsList(is.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, timeReceived, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes));
}

for (ScopeMetrics sm : rs.getScopeMetricsList()) {
final Map<String, Object> ils = OTelProtoCodec.getInstrumentationScopeAttributes(sm.getScope());
recordsOut.addAll(processMetricsList(sm.getMetricsList(), serviceName, ils, resourceAttributes, schemaUrl, droppedCounter, exponentialHistogramMaxAllowedScale, timeReceived, calculateHistogramBuckets, calculateExponentialHistogramBuckets, flattenAttributes));
Expand Down Expand Up @@ -1171,23 +1089,6 @@ public static Map<String, Object> getResourceAttributes(final Resource resource)
.collect(Collectors.toMap(i -> PREFIX_AND_RESOURCE_ATTRIBUTES_REPLACE_DOT_WITH_AT.apply(i.getKey()), i -> convertAnyValue(i.getValue())));
}

/**
* Extracts the name and version of the used instrumentation library used
*
* @param instrumentationLibrary the instrumentation library
* @return A map, containing information about the instrumentation library
*/
public static Map<String, Object> getInstrumentationLibraryAttributes(final InstrumentationLibrary instrumentationLibrary) {
final Map<String, Object> instrumentationAttr = new HashMap<>();
if (!instrumentationLibrary.getName().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_LIBRARY_NAME, instrumentationLibrary.getName());
}
if (!instrumentationLibrary.getVersion().isEmpty()) {
instrumentationAttr.put(INSTRUMENTATION_LIBRARY_VERSION, instrumentationLibrary.getVersion());
}
return instrumentationAttr;
}

/**
* Extracts the name and version of the used instrumentation scope used
*
Expand All @@ -1202,6 +1103,9 @@ public static Map<String, Object> getInstrumentationScopeAttributes(final Instru
if (!instrumentationScope.getVersion().isEmpty()) {
instrumentationScopeAttr.put(INSTRUMENTATION_SCOPE_VERSION, instrumentationScope.getVersion());
}
if (!instrumentationScope.getAttributesList().isEmpty()) {
instrumentationScopeAttr.put(ATTRIBUTES_KEY, OTelProtoCodec.unpackKeyValueListLog(instrumentationScope.getAttributesList()));
}
return instrumentationScopeAttr;
}

Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencyResolutionManagement {
version('protobuf', '3.24.3')
library('protobuf-core', 'com.google.protobuf', 'protobuf-java').versionRef('protobuf')
library('protobuf-util', 'com.google.protobuf', 'protobuf-java-util').versionRef('protobuf')
version('opentelemetry', '0.16.0-alpha')
version('opentelemetry', '1.2.0-alpha')
library('opentelemetry-proto', 'io.opentelemetry.proto', 'opentelemetry-proto').versionRef('opentelemetry')
version('opensearchJava', '2.8.1')
library('opensearch-java', 'org.opensearch.client', 'opensearch-java').versionRef('opensearchJava')
Expand Down

0 comments on commit 0810b20

Please sign in to comment.