Skip to content

Commit

Permalink
Revert "Drop support DataflowMetrics and MetricsToCounterUpdateConver…
Browse files Browse the repository at this point in the history
…ter until Dataflow java client support is ready"

This reverts commit a2cb708.
  • Loading branch information
rohitsinha54 committed Feb 1, 2025
1 parent 5e031ab commit 1c7fe06
Show file tree
Hide file tree
Showing 8 changed files with 241 additions and 7 deletions.
1 change: 1 addition & 0 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ dependencies {
// io-kafka is only used in PTransform override so it is optional
provided project(":sdks:java:io:kafka")
implementation project(":sdks:java:io:google-cloud-platform")
implementation project(":runners:core-java")
implementation library.java.avro
implementation library.java.bigdataoss_util
implementation library.java.commons_codec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.metrics.BoundedTrieData;
import org.apache.beam.sdk.metrics.BoundedTrieResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
Expand All @@ -55,7 +57,11 @@
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
})
class DataflowMetrics extends MetricResults {

private static final Logger LOG = LoggerFactory.getLogger(DataflowMetrics.class);
// TODO (rosinha): Remove this once bounded_trie is available in metrics proto Dataflow
// java client.
public static final String BOUNDED_TRIE = "bounded_trie";
/**
* Client for the Dataflow service. This can be used to query the service for information about
* the job.
Expand Down Expand Up @@ -104,13 +110,13 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
ImmutableList<MetricResult<DistributionResult>> distributions = ImmutableList.of();
ImmutableList<MetricResult<GaugeResult>> gauges = ImmutableList.of();
ImmutableList<MetricResult<StringSetResult>> stringSets = ImmutableList.of();
ImmutableList<MetricResult<BoundedTrieResult>> boudedTries = ImmutableList.of();
ImmutableList<MetricResult<BoundedTrieResult>> boundedTries = ImmutableList.of();
JobMetrics jobMetrics;
try {
jobMetrics = getJobMetrics();
} catch (IOException e) {
LOG.warn("Unable to query job metrics.\n");
return MetricQueryResults.create(counters, distributions, gauges, stringSets, boudedTries);
return MetricQueryResults.create(counters, distributions, gauges, stringSets, boundedTries);
}
metricUpdates = firstNonNull(jobMetrics.getMetrics(), Collections.emptyList());
return populateMetricQueryResults(metricUpdates, filter);
Expand All @@ -134,13 +140,15 @@ private static class DataflowMetricResultExtractor {
private final ImmutableList.Builder<MetricResult<DistributionResult>> distributionResults;
private final ImmutableList.Builder<MetricResult<GaugeResult>> gaugeResults;
private final ImmutableList.Builder<MetricResult<StringSetResult>> stringSetResults;
private final ImmutableList.Builder<MetricResult<BoundedTrieResult>> boundedTrieResults;
private final boolean isStreamingJob;

DataflowMetricResultExtractor(boolean isStreamingJob) {
counterResults = ImmutableList.builder();
distributionResults = ImmutableList.builder();
gaugeResults = ImmutableList.builder();
stringSetResults = ImmutableList.builder();
boundedTrieResults = ImmutableList.builder();
/* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
* In Dataflow batch jobs, only COMMITTED metrics are available, but
* we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
Expand Down Expand Up @@ -169,6 +177,11 @@ public void addMetricResult(
// stringset metric
StringSetResult value = getStringSetValue(committed);
stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
} else if (committed.get(BOUNDED_TRIE) != null && attempted.get(BOUNDED_TRIE) != null) {
// TODO (rosinha): This is dummy code. Once Dataflow MetricUpdate
// google client api is updated. Update this.
BoundedTrieResult value = getBoundedTrieValue(committed);
boundedTrieResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
} else {
// This is exceptionally unexpected. We expect matching user metrics to only have the
// value types provided by the Metrics API.
Expand Down Expand Up @@ -196,6 +209,15 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet())));
}

private BoundedTrieResult getBoundedTrieValue(MetricUpdate metricUpdate) {
if (metricUpdate.get(BOUNDED_TRIE) == null) {
return BoundedTrieResult.empty();
}
BoundedTrie bTrie = (BoundedTrie) metricUpdate.get(BOUNDED_TRIE);
BoundedTrieData trieData = BoundedTrieData.fromProto(bTrie);
return BoundedTrieResult.create(trieData.extractResult().getResult());
}

private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
if (metricUpdate.getDistribution() == null) {
return DistributionResult.IDENTITY_ELEMENT;
Expand All @@ -220,9 +242,13 @@ public Iterable<MetricResult<GaugeResult>> getGaugeResults() {
return gaugeResults.build();
}

public Iterable<MetricResult<StringSetResult>> geStringSetResults() {
public Iterable<MetricResult<StringSetResult>> getStringSetResults() {
return stringSetResults.build();
}

public Iterable<MetricResult<BoundedTrieResult>> getBoundedTrieResults() {
return boundedTrieResults.build();
}
}

private static class DataflowMetricQueryResultsFactory {
Expand Down Expand Up @@ -388,8 +414,8 @@ public MetricQueryResults build() {
extractor.getCounterResults(),
extractor.getDistributionResults(),
extractor.getGaugeResults(),
extractor.geStringSetResults(),
ImmutableList.of());
extractor.getStringSetResults(),
extractor.getBoundedTrieResults());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Set;
import org.apache.beam.runners.core.metrics.BoundedTrieData;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.metrics.BoundedTrieResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
Expand Down Expand Up @@ -196,6 +198,13 @@ private MetricUpdate makeStringSetMetricUpdate(
return setStructuredName(update, name, namespace, step, tentative);
}

private MetricUpdate makeBoundedTrieMetricUpdate(
String name, String namespace, String step, BoundedTrieData data, boolean tentative) {
MetricUpdate update = new MetricUpdate();
update.set(DataflowMetrics.BOUNDED_TRIE, data.toProto());
return setStructuredName(update, name, namespace, step, tentative);
}

@Test
public void testSingleCounterUpdates() throws IOException {
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
Expand Down Expand Up @@ -286,6 +295,64 @@ public void testSingleStringSetUpdates() throws IOException {
StringSetResult.create(ImmutableSet.of("ab", "cd")))));
}

@Test
public void testSingleBoundedTrieUpdates() throws IOException {
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
when(myStep.getFullName()).thenReturn("myStepName");
BiMap<AppliedPTransform<?, ?, ?>, String> transformStepNames = HashBiMap.create();
transformStepNames.put(myStep, "s2");

JobMetrics jobMetrics = new JobMetrics();
DataflowPipelineJob job = mock(DataflowPipelineJob.class);
DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
when(options.isStreaming()).thenReturn(false);
when(job.getDataflowOptions()).thenReturn(options);
when(job.getState()).thenReturn(State.RUNNING);
when(job.getJobId()).thenReturn(JOB_ID);
when(job.getTransformStepNames()).thenReturn(transformStepNames);

// The parser relies on the fact that one tentative and one committed metric update exist in
// the job metrics results.
MetricUpdate mu1 =
makeBoundedTrieMetricUpdate(
"counterName",
"counterNamespace",
"s2",
new BoundedTrieData(ImmutableList.of("ab", "cd")),
false);
MetricUpdate mu1Tentative =
makeBoundedTrieMetricUpdate(
"counterName",
"counterNamespace",
"s2",
new BoundedTrieData(ImmutableList.of("ab", "cd")),
true);
jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative));
DataflowClient dataflowClient = mock(DataflowClient.class);
when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);

DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
MetricQueryResults result = dataflowMetrics.allMetrics();
assertThat(
result.getBoundedTries(),
containsInAnyOrder(
attemptedMetricsResult(
"counterNamespace",
"counterName",
"myStepName",
BoundedTrieResult.create(
ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false)))))));
assertThat(
result.getBoundedTries(),
containsInAnyOrder(
committedMetricsResult(
"counterNamespace",
"counterName",
"myStepName",
BoundedTrieResult.create(
ImmutableSet.of(ImmutableList.of("ab", "cd", String.valueOf(false)))))));
}

@Test
public void testIgnoreDistributionButGetCounterUpdates() throws IOException {
AppliedPTransform<?, ?, ?> myStep = mock(AppliedPTransform.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,12 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
.transform(
update ->
MetricsToCounterUpdateConverter.fromStringSet(
update.getKey(), true, update.getUpdate())));
update.getKey(), true, update.getUpdate())),
FluentIterable.from(updates.boundedTrieUpdates())
.transform(
update ->
MetricsToCounterUpdateConverter.fromBoundedTrie(
update.getKey(), update.getUpdate())));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.services.dataflow.model.IntegerGauge;
import com.google.api.services.dataflow.model.StringList;
import java.util.ArrayList;
import org.apache.beam.runners.core.metrics.BoundedTrieData;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.MetricKey;
Expand Down Expand Up @@ -111,6 +112,16 @@ public static CounterUpdate fromStringSet(
.setStringList(stringList);
}

public static CounterUpdate fromBoundedTrie(MetricKey key, BoundedTrieData boundedTrieData) {
// BoundedTrie uses SET kind metric aggregation which tracks unique strings.
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET);
// TODO (rosinha): Once the CounterUpdate API is updated in dataflow client update this.
return new CounterUpdate()
.setStructuredNameAndMetadata(name)
.setCumulative(false)
.set("bounded_trie", boundedTrieData.toProto());
}

public static CounterUpdate fromDistribution(
MetricKey key, boolean isCumulative, DistributionData update) {
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.DISTRIBUTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ public Iterable<CounterUpdate> extractUpdates() {
return counterUpdates()
.append(distributionUpdates())
.append(gaugeUpdates())
.append(stringSetUpdates());
.append(stringSetUpdates())
.append(boundedTrieUpdates());
}

private FluentIterable<CounterUpdate> counterUpdates() {
Expand Down Expand Up @@ -277,6 +278,20 @@ private FluentIterable<CounterUpdate> stringSetUpdates() {
.filter(Predicates.notNull());
}

private FluentIterable<CounterUpdate> boundedTrieUpdates() {
return FluentIterable.from(boundedTries.entries())
.transform(
new Function<Entry<MetricName, BoundedTrieCell>, CounterUpdate>() {
@Override
public @Nullable CounterUpdate apply(
@Nonnull Map.Entry<MetricName, BoundedTrieCell> entry) {
return MetricsToCounterUpdateConverter.fromBoundedTrie(
MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative());
}
})
.filter(Predicates.notNull());
}

private FluentIterable<CounterUpdate> distributionUpdates() {
return FluentIterable.from(distributions.entries())
.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.google.api.services.dataflow.model.DistributionUpdate;
import com.google.api.services.dataflow.model.StringList;
import java.util.Arrays;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.runners.core.metrics.BoundedTrieData;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
Expand All @@ -40,13 +42,15 @@
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.sdk.metrics.BoundedTrie;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -193,6 +197,42 @@ public void extractMetricUpdatesStringSet() {
assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
}

@Test
public void extractMetricUpdatesBoundedTrie() {
BatchModeExecutionContext executionContext =
BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage");
DataflowOperationContext operationContext =
executionContext.createOperationContext(NameContextsForTests.nameContextForTest());

BoundedTrie boundedTrie =
operationContext
.metricsContainer()
.getBoundedTrie(MetricName.named("namespace", "some-bounded-trie"));
boundedTrie.add("ab");
boundedTrie.add("cd");

BoundedTrieData trieData = new BoundedTrieData();
trieData.add(ImmutableList.of("ab"));
trieData.add(ImmutableList.of("cd"));
MetricsApi.BoundedTrie expectedTrie = trieData.toProto();

final CounterUpdate expected =
new CounterUpdate()
.setStructuredNameAndMetadata(
new CounterStructuredNameAndMetadata()
.setName(
new CounterStructuredName()
.setOrigin("USER")
.setOriginNamespace("namespace")
.setName("some-bounded-trie")
.setOriginalStepName("originalName"))
.setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
.setCumulative(false)
.set("bounded_trie", expectedTrie);

assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
}

@Test
public void extractMsecCounters() {
BatchModeExecutionContext executionContext =
Expand Down
Loading

0 comments on commit 1c7fe06

Please sign in to comment.