Commit

Merge remote-tracking branch 'origin' into julient/batch-datastream
jto committed Jan 23, 2025
2 parents f6b8779 + dc7426f commit 10c480c
Showing 24 changed files with 443 additions and 92 deletions.
8 changes: 8 additions & 0 deletions .test-infra/tools/flaky_test_detection.py
@@ -72,6 +72,13 @@ def get_workflow_issues(issues):
    return workflows


def ensure_labels_exist(repo, labels):
    existing_labels = {label.name for label in repo.get_labels()}
    for label in labels:
        if label not in existing_labels:
            repo.create_label(name=label, color="ededed")


def create_github_issue(repo, alert):
    github_workflow_failing_runs_url = f"https://github.com/{GIT_ORG}/beam/actions/{alert.workflow_filename}?query=is%3Afailure+branch%3Amaster"
    title = f"The {alert.workflow_name} job is flaky"
@@ -92,6 +99,7 @@ def create_github_issue(repo, alert):
    if READ_ONLY == "true":
        print("READ_ONLY is true, not creating issue")
    else:
+        ensure_labels_exist(repo, labels)
        repo.create_issue(title=title, body=body, labels=labels)


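
A note on the change above: ensure_labels_exist makes label creation explicit, so create_issue never has to rely on how the issues API treats labels the repository does not define yet. A minimal Java sketch of the same get-or-create pattern; the LabelStore interface is hypothetical, standing in for the PyGithub repo object:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the PyGithub repo object used by the script.
interface LabelStore {
  List<String> listLabelNames();

  void createLabel(String name, String color);
}

final class Labels {
  // Query the existing names once, then create only what is missing,
  // mirroring ensure_labels_exist above.
  static void ensureLabelsExist(LabelStore repo, List<String> wanted) {
    Set<String> existing = new HashSet<>(repo.listLabelNames());
    for (String label : wanted) {
      if (!existing.contains(label)) {
        repo.createLabel(label, "ededed"); // same default color as the script
      }
    }
  }
}
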
2 changes: 1 addition & 1 deletion runners/google-cloud-dataflow-java/build.gradle
@@ -840,7 +840,7 @@ def buildAndPushDistrolessDockerJavaContainer = tasks.register("buildAndPushDist
}
dependsOn ":sdks:java:container:distroless:${javaVer}:docker"
def defaultDockerImageName = containerImageName(
name: "${project.docker_image_default_repo_prefix}${javaVer}_sdk",
name: "${project.docker_image_default_repo_prefix}${javaVer}_sdk_distroess",
root: "apache",
tag: project.sdk_version)
doLast {
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.beam.runners.dataflow.worker.counters.Counter.AtomicCounterValue;
import org.apache.beam.runners.dataflow.worker.counters.Counter.CounterUpdateExtractor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -380,15 +381,22 @@ public <UpdateT> UpdateT extractUpdate(
}

/** Implements a {@link Counter} for tracking the sum of long values. */
-  public static class LongSumCounterValue extends LongCounterValue {
+  public static class LongSumCounterValue extends BaseCounterValue<Long, Long> {
+    private final LongAdder aggregate = new LongAdder();
+
+    @Override
+    public Long getAggregate() {
+      return aggregate.sum();
+    }

    @Override
    public void addValue(Long value) {
-      aggregate.addAndGet(value);
+      aggregate.add(value);
    }

    @Override
    public Long getAndReset() {
-      return aggregate.getAndSet(0);
+      return aggregate.sumThenReset();
    }

    @Override
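
For context on the hunk above: LongSumCounterValue moves from an AtomicLong-style accumulator to java.util.concurrent.LongAdder, which stripes updates across cells and typically scales better when many worker threads bump the same counter. A minimal standalone sketch (not Beam code) of the three operations the new implementation relies on:

import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {
  public static void main(String[] args) {
    LongAdder aggregate = new LongAdder();

    aggregate.add(4); // addValue: no CAS retry loop on a single hot cell
    aggregate.add(38);

    System.out.println(aggregate.sum());          // getAggregate -> 42
    System.out.println(aggregate.sumThenReset()); // getAndReset  -> 42, then 0
    System.out.println(aggregate.sum());          // -> 0
  }
}

The trade-off is that sum() and sumThenReset() are not atomic snapshots: an add racing with a reset may land in either the returned total or the next one, which is acceptable for monotonically accumulated progress counters like this one.
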
@@ -30,6 +30,7 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class BoundedQueueExecutor {

private final ThreadPoolExecutor executor;
private final long maximumBytesOutstanding;

@@ -54,17 +55,17 @@ public class BoundedQueueExecutor {
private long totalTimeMaxActiveThreadsUsed;

public BoundedQueueExecutor(
-      int maximumPoolSize,
+      int initialMaximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int maximumElementsOutstanding,
long maximumBytesOutstanding,
ThreadFactory threadFactory) {
-    this.maximumPoolSize = maximumPoolSize;
+    this.maximumPoolSize = initialMaximumPoolSize;
executor =
new ThreadPoolExecutor(
-            maximumPoolSize,
-            maximumPoolSize,
+            initialMaximumPoolSize,
+            initialMaximumPoolSize,
keepAliveTime,
unit,
new LinkedBlockingQueue<>(),
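
The rename to initialMaximumPoolSize above signals that the pool size is no longer fixed after construction; the tests later in this commit call setMaximumPoolSize at runtime. ThreadPoolExecutor supports this natively. A minimal sketch, independent of Beam's wrapper, of resizing a core==max pool safely:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ResizableExecutorDemo {
  public static void main(String[] args) {
    // Core == max, as in BoundedQueueExecutor, so the pool holds a fixed
    // number of threads until someone resizes it.
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

    // Grow: raise the maximum first so max >= core holds at every step.
    executor.setMaximumPoolSize(4);
    executor.setCorePoolSize(4);

    // Shrink: lower the core first, for the same reason.
    executor.setCorePoolSize(1);
    executor.setMaximumPoolSize(1);

    executor.shutdown();
  }
}
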
@@ -309,7 +309,7 @@ private void reportProgress() {
LOG.info("Cancelling workitem execution: {}", workString(), e);
worker.abort();
} catch (Throwable e) {
LOG.warn("Error reporting workitem progress update to Dataflow service: ", e);
LOG.error("Error reporting workitem progress update to Dataflow service: ", e);
}
}

@@ -50,6 +50,7 @@
// released (2.11.0)
@SuppressWarnings("unused")
public class BoundedQueueExecutorTest {

private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final int DEFAULT_MAX_THREADS = 2;
private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
@@ -247,7 +248,8 @@ public void testRecordTotalTimeMaxActiveThreadsUsed() throws Exception {
}

@Test
-  public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated() throws Exception {
+  public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsIncreased()
+      throws Exception {
CountDownLatch processStart1 = new CountDownLatch(1);
CountDownLatch processStart2 = new CountDownLatch(1);
CountDownLatch processStart3 = new CountDownLatch(1);
@@ -287,6 +289,58 @@ public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeUpdated()
executor.shutdown();
}

@Test
public void testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced()
throws Exception {
CountDownLatch processStart1 = new CountDownLatch(1);
CountDownLatch processStop1 = new CountDownLatch(1);
CountDownLatch processStart2 = new CountDownLatch(1);
CountDownLatch processStop2 = new CountDownLatch(1);
CountDownLatch processStart3 = new CountDownLatch(1);
CountDownLatch processStop3 = new CountDownLatch(1);
Runnable m1 = createSleepProcessWorkFn(processStart1, processStop1);
Runnable m2 = createSleepProcessWorkFn(processStart2, processStop2);
Runnable m3 = createSleepProcessWorkFn(processStart3, processStop3);

// Initial state.
assertEquals(0, executor.activeCount());
assertEquals(2, executor.getMaximumPoolSize());

// m1 is accepted.
executor.execute(m1, 1);
processStart1.await();
assertEquals(1, executor.activeCount());
assertEquals(2, executor.getMaximumPoolSize());
assertEquals(0L, executor.allThreadsActiveTime());

processStop1.countDown();
while (executor.activeCount() != 0) {
// Waiting for all threads to be ended.
Thread.sleep(200);
}

// Reduce max pool size to 1
executor.setMaximumPoolSize(1, 105);

assertEquals(0, executor.activeCount());
executor.execute(m2, 1);
processStart2.await();
Thread.sleep(100);
assertEquals(1, executor.activeCount());
assertEquals(1, executor.getMaximumPoolSize());
processStop2.countDown();

while (executor.activeCount() != 0) {
// Waiting for all threads to be ended.
Thread.sleep(200);
}

// allThreadsActiveTime() should be recorded
// since when the second task was running it reached the new max pool size.
assertThat(executor.allThreadsActiveTime(), greaterThan(0L));
executor.shutdown();
}

@Test
public void testRenderSummaryHtml() {
String expectedSummaryHtml =
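
A note on how these tests synchronize: each work item is gated by a pair of CountDownLatches, announcing that it started and then blocking until the test releases it, so assertions run while the pool is in a known state. A minimal sketch of the pattern, assuming a helper shaped like the test's createSleepProcessWorkFn:

import java.util.concurrent.CountDownLatch;

public class LatchGateDemo {
  // A work item that signals it is running, then blocks until released,
  // the same shape as createSleepProcessWorkFn in the test above.
  static Runnable gatedWork(CountDownLatch started, CountDownLatch stop) {
    return () -> {
      started.countDown();
      try {
        stop.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
  }

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch stop = new CountDownLatch(1);
    Thread worker = new Thread(gatedWork(started, stop));
    worker.start();

    started.await();  // the worker is now definitely running
    stop.countDown(); // release it
    worker.join();
  }
}
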
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -17,7 +17,6 @@ package graphx

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
@@ -35,6 +34,7 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/options/resource"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
)
@@ -125,7 +125,7 @@ func CreateEnvironment(ctx context.Context, urn string, extractEnvironmentConfig
case URNEnvProcess:
config := extractEnvironmentConfig(ctx)
payload := &pipepb.ProcessPayload{}
-		if err := json.Unmarshal([]byte(config), payload); err != nil {
+		if err := protojson.Unmarshal([]byte(config), payload); err != nil {
return nil, fmt.Errorf("unable to json unmarshal --environment_config: %w", err)
}
serializedPayload = protox.MustEncode(payload)
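
The switch above matters because proto-generated types need a proto-aware JSON parser: Go's generic encoding/json does not apply the proto3 JSON mapping (field-name conventions, enums as strings, well-known types), so protojson is the correct way to parse --environment_config into a ProcessPayload. For comparison, a minimal sketch of the Java ecosystem's equivalent, JsonFormat from protobuf-java-util, exercised here on the well-known Struct type:

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.util.JsonFormat;

public class ProtoJsonDemo {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // Parse JSON into a proto message following proto3 JSON mapping rules.
    Struct.Builder builder = Struct.newBuilder();
    JsonFormat.parser().merge("{\"command\": \"/opt/worker/boot\"}", builder);

    // Round-trip back to canonical proto JSON.
    System.out.println(JsonFormat.printer().print(builder.build()));
  }
}
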
2 changes: 0 additions & 2 deletions sdks/java/container/distroless/common.gradle
@@ -67,5 +67,3 @@ docker {
load project.useBuildx() && !pushContainers
push pushContainers
}

-dockerPrepare.dependsOn ":sdks:java:container:java${imageJavaVersion}:docker"
@@ -86,7 +86,7 @@ public class AvroGenericRecordToStorageApiProto {
.put(Schema.Type.STRING, Object::toString)
.put(Schema.Type.BOOLEAN, Function.identity())
.put(Schema.Type.ENUM, o -> o.toString())
-          .put(Schema.Type.BYTES, o -> ByteString.copyFrom(((ByteBuffer) o).duplicate()))
+          .put(Schema.Type.BYTES, AvroGenericRecordToStorageApiProto::convertBytes)
.build();

// A map of supported logical types to their encoding functions.
@@ -145,6 +145,16 @@ static ByteString convertDecimal(LogicalType logicalType, Object value) {
return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
}

static ByteString convertBytes(Object value) {
if (value instanceof byte[]) {
// for backward compatibility
// this is not accepted by the avro spec, but users may have abused it
return ByteString.copyFrom((byte[]) value);
} else {
return ByteString.copyFrom(((ByteBuffer) value).duplicate());
}
}

/**
* Given an Avro Schema, returns a protocol-buffer TableSchema that can be used to write data
* through BigQuery Storage API.
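
The reason convertBytes copies from duplicate() above: ByteString.copyFrom(ByteBuffer) consumes the buffer, reading the remaining bytes and advancing its position, while duplicate() shares the same content behind an independent cursor. The byte[] branch tolerates records that put raw arrays into Avro bytes fields. A small standalone demonstration:

import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;

public class ByteBufferCopyDemo {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});

    // duplicate() leaves the caller's position/limit untouched.
    ByteString safeCopy = ByteString.copyFrom(buf.duplicate());
    System.out.println(safeCopy.size());  // 4
    System.out.println(buf.remaining());  // still 4

    // Without duplicate(), copyFrom drains the source buffer.
    ByteString drainingCopy = ByteString.copyFrom(buf);
    System.out.println(drainingCopy.size()); // 4
    System.out.println(buf.remaining());     // 0
  }
}
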
@@ -368,8 +368,13 @@ public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
.setName(source.readStream.getName())
.setFraction((float) fraction)
.build();

-    SplitReadStreamResponse splitResponse = storageClient.splitReadStream(splitRequest);
+    SplitReadStreamResponse splitResponse;
+    try {
+      splitResponse = storageClient.splitReadStream(splitRequest);
+    } catch (Exception e) {
+      LOG.warn("Skip split read stream due to failed request: ", e);
+      return null;
+    }
if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) {
// No more splits are possible!
impossibleSplitPointCalls.inc();
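
Returning null above is not an error path: under Beam's dynamic work rebalancing contract, a null from splitAtFraction means "the split did not happen, keep reading the current source", so a failed splitReadStream RPC now degrades to declining the split instead of failing the reader. A minimal sketch of the guard, with a hypothetical SplitCall standing in for the Storage API request:

import java.util.logging.Logger;

public final class GuardedSplit {
  private static final Logger LOG = Logger.getLogger(GuardedSplit.class.getName());

  // Hypothetical stand-in for the splitReadStream RPC.
  interface SplitCall<T> {
    T split() throws Exception;
  }

  // Null follows the splitAtFraction contract: "no split happened, continue
  // with the current source", so a failure is treated as a declined split.
  static <T> T trySplit(SplitCall<T> call) {
    try {
      return call.split();
    } catch (Exception e) {
      LOG.warning("Skip split read stream due to failed request: " + e);
      return null;
    }
  }
}
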
@@ -77,6 +77,7 @@ enum TestEnum {
SchemaBuilder.record("TestRecord")
.fields()
.optionalBytes("bytesValue")
.optionalBytes("byteBufferValue")
.requiredInt("intValue")
.optionalLong("longValue")
.optionalFloat("floatValue")
@@ -138,64 +139,71 @@ enum TestEnum {
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("intvalue")
.setName("bytebuffervalue")
.setNumber(2)
.setType(Type.TYPE_BYTES)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("intvalue")
.setNumber(3)
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("longvalue")
-                .setNumber(3)
+                .setNumber(4)
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("floatvalue")
-                .setNumber(4)
+                .setNumber(5)
.setType(Type.TYPE_DOUBLE)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("doublevalue")
-                .setNumber(5)
+                .setNumber(6)
.setType(Type.TYPE_DOUBLE)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("stringvalue")
-                .setNumber(6)
+                .setNumber(7)
.setType(Type.TYPE_STRING)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("booleanvalue")
-                .setNumber(7)
+                .setNumber(8)
.setType(Type.TYPE_BOOL)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("arrayvalue")
-                .setNumber(8)
+                .setNumber(9)
.setType(Type.TYPE_STRING)
.setLabel(Label.LABEL_REPEATED)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("enumvalue")
-                .setNumber(9)
+                .setNumber(10)
.setType(Type.TYPE_STRING)
.setLabel(Label.LABEL_OPTIONAL)
.build())
.addField(
FieldDescriptorProto.newBuilder()
.setName("fixedvalue")
-                .setNumber(10)
+                .setNumber(11)
.setType(Type.TYPE_BYTES)
.setLabel(Label.LABEL_REQUIRED)
.build())
@@ -309,7 +317,8 @@ enum TestEnum {
Instant now = Instant.now();
baseRecord =
new GenericRecordBuilder(BASE_SCHEMA)
.set("bytesValue", ByteBuffer.wrap(BYTES))
.set("bytesValue", BYTES)
.set("byteBufferValue", ByteBuffer.wrap(BYTES))
.set("intValue", (int) 3)
.set("longValue", (long) 4)
.set("floatValue", (float) 3.14)
@@ -346,6 +355,7 @@ enum TestEnum {
baseProtoExpectedFields =
ImmutableMap.<String, Object>builder()
.put("bytesvalue", ByteString.copyFrom(BYTES))
.put("bytebuffervalue", ByteString.copyFrom(BYTES))
.put("intvalue", (long) 3)
.put("longvalue", (long) 4)
.put("floatvalue", (double) 3.14)
@@ -24,11 +24,15 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

@AutoValue
public abstract class IcebergCatalogConfig implements Serializable {
+  private transient @MonotonicNonNull Catalog cachedCatalog;

@Pure
@Nullable
public abstract String getCatalogName();
@@ -47,6 +51,9 @@ public static Builder builder() {
}

public org.apache.iceberg.catalog.Catalog catalog() {
+    if (cachedCatalog != null) {
+      return cachedCatalog;
+    }
String catalogName = getCatalogName();
if (catalogName == null) {
catalogName = "apache-beam-" + ReleaseInfo.getReleaseInfo().getVersion();
@@ -63,7 +70,8 @@ public org.apache.iceberg.catalog.Catalog catalog() {
for (Map.Entry<String, String> prop : confProps.entrySet()) {
config.set(prop.getKey(), prop.getValue());
}
-    return CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+    cachedCatalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, config);
+    return cachedCatalog;
}

@AutoValue.Builder
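
The cache above uses a standard trick for Serializable configs: the expensive object lives in a transient field, so it is skipped during serialization and lazily rebuilt in whichever JVM deserializes the config. A minimal sketch of the pattern; CatalogLike is a hypothetical stand-in for org.apache.iceberg.catalog.Catalog, and, like the Beam code, the getter is deliberately unsynchronized because rebuilding twice is merely wasteful, not incorrect:

import java.io.Serializable;

public class CachingConfig implements Serializable {
  // Hypothetical stand-in for the expensive-to-build catalog object.
  public interface CatalogLike {}

  private final String name;

  // transient: dropped on serialization, rebuilt lazily on first use afterwards.
  private transient CatalogLike cachedCatalog;

  public CachingConfig(String name) {
    this.name = name;
  }

  public CatalogLike catalog() {
    if (cachedCatalog != null) {
      return cachedCatalog; // cheap path after the first call in this JVM
    }
    // Worst case, two racing threads both build; last write wins and both
    // results are valid, so no locking is needed here.
    cachedCatalog = buildExpensively(name);
    return cachedCatalog;
  }

  private static CatalogLike buildExpensively(String name) {
    // Placeholder for CatalogUtil.buildIcebergCatalog(name, props, hadoopConf).
    return new CatalogLike() {};
  }
}
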