diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java index 26d07b3d8b2e..a34eb2c4ab0a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java @@ -38,7 +38,6 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort; import org.apache.beam.model.pipeline.v1.Endpoints; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns; import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings; @@ -95,7 +94,7 @@ public BeamFnDataReadRunner createRunnerForPTransform(Context context) context.getPTransformId(), context.getPTransform(), context.getProcessBundleInstructionIdSupplier(), - context.getCoders(), + context.getComponents(), context.getBeamFnStateClient(), context::addBundleProgressReporter, consumer); @@ -127,7 +126,7 @@ public BeamFnDataReadRunner createRunnerForPTransform(Context context) String pTransformId, RunnerApi.PTransform grpcReadNode, Supplier processBundleInstructionIdSupplier, - Map coders, + RunnerApi.Components components, BeamFnStateClient beamFnStateClient, Consumer addBundleProgressReporter, FnDataReceiver> consumer) @@ -138,13 +137,12 @@ public BeamFnDataReadRunner createRunnerForPTransform(Context context) this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier; this.consumer = consumer; - RehydratedComponents components = - RehydratedComponents.forComponents(Components.newBuilder().putAllCoders(coders).build()); + RehydratedComponents rehydratedComponents = RehydratedComponents.forComponents(components); this.coder = (Coder>) CoderTranslation.fromProto( - coders.get(port.getCoderId()), - components, + components.getCodersMap().get(port.getCoderId()), + rehydratedComponents, new StateBackedIterableTranslationContext() { @Override public Supplier> getCache() { diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java index ade5c32a8c14..e3474fb1e113 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java @@ -27,7 +27,6 @@ import org.apache.beam.fn.harness.state.StateBackedIterable.StateBackedIterableTranslationContext; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort; -import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite; import org.apache.beam.sdk.util.WindowedValue; @@ -64,13 +63,11 @@ static class Factory implements PTransformRunnerFactory> coder = (Coder>) CoderTranslation.fromProto( - context.getCoders().get(port.getCoderId()), + context.getComponents().getCodersMap().get(port.getCoderId()), components, new StateBackedIterableTranslationContext() { @Override diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java index 984aa30e7fb5..e0fefafbd1bd 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/CombineRunners.java @@ -137,15 +137,14 @@ public PrecombineRunner createRunnerForPTransform(Context throws IOException { // Get objects needed to create the runner. RehydratedComponents rehydratedComponents = - RehydratedComponents.forComponents( - RunnerApi.Components.newBuilder() - .putAllCoders(context.getCoders()) - .putAllWindowingStrategies(context.getWindowingStrategies()) - .build()); + RehydratedComponents.forComponents(context.getComponents()); String mainInputTag = Iterables.getOnlyElement(context.getPTransform().getInputsMap().keySet()); RunnerApi.PCollection mainInput = - context.getPCollections().get(context.getPTransform().getInputsOrThrow(mainInputTag)); + context + .getComponents() + .getPcollectionsMap() + .get(context.getPTransform().getInputsOrThrow(mainInputTag)); // Input coder may sometimes be WindowedValueCoder depending on runner, instead of the // expected KvCoder. diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index c39722c90d89..b11dbd84d90c 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -180,9 +180,7 @@ static class Factory> cacheTokens, Supplier> bundleCache, Cache processWideCache, - Map pCollections, - Map coders, - Map windowingStrategies, + RunnerApi.Components components, Consumer addStartFunction, Consumer addFinishFunction, Consumer addResetFunction, @@ -375,13 +371,7 @@ static class Factory maybeWindowedValueInputCoder = rehydratedComponents.getCoder(mainInput.getCoderId()); // TODO: Stop passing windowed value coders within PCollections. if (maybeWindowedValueInputCoder instanceof WindowedValue.WindowedValueCoder) { @@ -426,7 +417,8 @@ static class Factory entry : pTransform.getOutputsMap().entrySet()) { TupleTag outputTag = new TupleTag<>(entry.getKey()); - RunnerApi.PCollection outputPCollection = pCollections.get(entry.getValue()); + RunnerApi.PCollection outputPCollection = + components.getPcollectionsMap().get(entry.getValue()); Coder outputCoder = rehydratedComponents.getCoder(outputPCollection.getCoderId()); if (outputCoder instanceof WindowedValueCoder) { outputCoder = ((WindowedValueCoder) outputCoder).getValueCoder(); @@ -443,7 +435,7 @@ static class Factory getProcessWideCache(); - /** An immutable mapping from PCollection id to PCollection definition. */ - Map getPCollections(); - - /** An immutable mapping from coder id to coder definition. */ - Map getCoders(); - - /** An immutable mapping from windowing strategy id to windowing strategy definition. */ - Map getWindowingStrategies(); - /** An immutable set of runner capability urns. */ Set getRunnerCapabilities(); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 300796ac6f12..e6f87d58baec 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -66,11 +66,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse; import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor; import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.model.pipeline.v1.RunnerApi.Coder; -import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.StandardRunnerProtocols; -import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy; import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns; import org.apache.beam.runners.core.metrics.ShortIdMap; import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver; @@ -236,6 +233,7 @@ private void createRunnerAndConsumersForPTransformRecursively( Supplier> cacheTokens, Supplier> bundleCache, ProcessBundleDescriptor processBundleDescriptor, + RunnerApi.Components components, SetMultimap pCollectionIdsToConsumingPTransforms, PCollectionConsumerRegistry pCollectionConsumerRegistry, Set processedPTransformIds, @@ -268,6 +266,7 @@ private void createRunnerAndConsumersForPTransformRecursively( cacheTokens, bundleCache, processBundleDescriptor, + components, pCollectionIdsToConsumingPTransforms, pCollectionConsumerRegistry, processedPTransformIds, @@ -358,18 +357,8 @@ public Supplier> getCacheTokensSupplier() { } @Override - public Map getPCollections() { - return processBundleDescriptor.getPcollectionsMap(); - } - - @Override - public Map getCoders() { - return processBundleDescriptor.getCodersMap(); - } - - @Override - public Map getWindowingStrategies() { - return processBundleDescriptor.getWindowingStrategiesMap(); + public RunnerApi.Components getComponents() { + return components; } @Override @@ -867,6 +856,13 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { continue; } + RunnerApi.Components components = + RunnerApi.Components.newBuilder() + .putAllCoders(bundleDescriptor.getCodersMap()) + .putAllPcollections(bundleDescriptor.getPcollectionsMap()) + .putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap()) + .build(); + createRunnerAndConsumersForPTransformRecursively( beamFnStateClient, beamFnDataClient, @@ -876,6 +872,7 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { bundleProcessor::getCacheTokens, bundleProcessor::getBundleCache, bundleDescriptor, + components, pCollectionIdsToConsumingPTransforms, pCollectionConsumerRegistry, processedPTransformIds, diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java index 3d81564fee31..6c0e42de39d3 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/AssignWindowsRunnerTest.java @@ -192,8 +192,11 @@ public Coder windowCoder() { .build() .toByteString())) .build()) - .pCollections(Collections.singletonMap("input", pCollection)) - .coders(Collections.singletonMap("coder-id", coder)) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(Collections.singletonMap("input", pCollection)) + .putAllCoders(Collections.singletonMap("coder-id", coder)) + .build()) .build(); Collection> outputs = new ArrayList<>(); context.addPCollectionConsumer("output", outputs::add); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java index 55342f9362b2..adc1bd0eae18 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java @@ -149,12 +149,17 @@ public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(INPUT_TRANSFORM_ID, pTransform) .processBundleInstructionId(DEFAULT_BUNDLE_ID) - .pCollections( - ImmutableMap.of( - localOutputId, - RunnerApi.PCollection.newBuilder().setCoderId(ELEMENT_CODER_SPEC_ID).build())) - .coders(COMPONENTS.getCodersMap()) - .windowingStrategies(COMPONENTS.getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections( + ImmutableMap.of( + localOutputId, + RunnerApi.PCollection.newBuilder() + .setCoderId(ELEMENT_CODER_SPEC_ID) + .build())) + .putAllCoders(COMPONENTS.getCodersMap()) + .putAllWindowingStrategies(COMPONENTS.getWindowingStrategiesMap()) + .build()) .build(); context.addPCollectionConsumer(localOutputId, outputValues::add); @@ -187,12 +192,17 @@ public void testReuseForMultipleBundles() throws Exception { INPUT_TRANSFORM_ID, RemoteGrpcPortRead.readFromPort(PORT_SPEC, localOutputId).toPTransform()) .processBundleInstructionIdSupplier(bundleId::get) - .pCollections( - ImmutableMap.of( - localOutputId, - RunnerApi.PCollection.newBuilder().setCoderId(ELEMENT_CODER_SPEC_ID).build())) - .coders(COMPONENTS.getCodersMap()) - .windowingStrategies(COMPONENTS.getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections( + ImmutableMap.of( + localOutputId, + RunnerApi.PCollection.newBuilder() + .setCoderId(ELEMENT_CODER_SPEC_ID) + .build())) + .putAllCoders(COMPONENTS.getCodersMap()) + .putAllWindowingStrategies(COMPONENTS.getWindowingStrategiesMap()) + .build()) .build(); context.addPCollectionConsumer(localOutputId, outputValues::add); @@ -659,12 +669,17 @@ private static BeamFnDataReadRunner createReadRunner( PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(pTransformId, pTransform) .processBundleInstructionId(DEFAULT_BUNDLE_ID) - .pCollections( - ImmutableMap.of( - localOutputId, - RunnerApi.PCollection.newBuilder().setCoderId(ELEMENT_CODER_SPEC_ID).build())) - .coders(COMPONENTS.getCodersMap()) - .windowingStrategies(COMPONENTS.getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections( + ImmutableMap.of( + localOutputId, + RunnerApi.PCollection.newBuilder() + .setCoderId(ELEMENT_CODER_SPEC_ID) + .build())) + .putAllCoders(COMPONENTS.getCodersMap()) + .putAllWindowingStrategies(COMPONENTS.getWindowingStrategiesMap()) + .build()) .build(); context.addPCollectionConsumer(localOutputId, consumer); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java index 071ac037883b..7835516f1b5b 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java @@ -153,12 +153,15 @@ public void testReuseForMultipleBundles() throws Exception { .beamFnDataClient(mockBeamFnDataClient) .processBundleInstructionIdSupplier(bundleId::get) .outboundAggregators(aggregators) - .pCollections( - ImmutableMap.of( - localInputId, - RunnerApi.PCollection.newBuilder().setCoderId(ELEM_CODER_ID).build())) - .coders(COMPONENTS.getCodersMap()) - .windowingStrategies(COMPONENTS.getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections( + ImmutableMap.of( + localInputId, + RunnerApi.PCollection.newBuilder().setCoderId(ELEM_CODER_ID).build())) + .putAllCoders(COMPONENTS.getCodersMap()) + .putAllWindowingStrategies(COMPONENTS.getWindowingStrategiesMap()) + .build()) .build(); new BeamFnDataWriteRunner.Factory().createRunnerForPTransform(context); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java index 1bdc3d899718..d441045261b8 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CombineRunnersTest.java @@ -120,9 +120,12 @@ public void createPipeline() throws Exception { public void testPrecombine() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform) - .pCollections(pProto.getComponents().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponents().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .build()) .build(); // Add a consumer and output target to check output values. Deque>> mainOutputValues = new ArrayDeque<>(); @@ -201,8 +204,11 @@ public void testMergeAccumulators() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform) - .pCollections(pCollectionMap) - .coders(coderMap) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pCollectionMap) + .putAllCoders(coderMap) + .build()) .build(); // Add a consumer and output target to check output values. Deque>> mainOutputValues = new ArrayDeque<>(); @@ -262,8 +268,11 @@ public void testExtractOutputs() throws Exception { .build()); PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform) - .pCollections(pCollectionMap) - .coders(coderMap) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pCollectionMap) + .putAllCoders(coderMap) + .build()) .build(); // Add a consumer and output target to check output values. Deque>> mainOutputValues = new ArrayDeque<>(); @@ -306,8 +315,11 @@ public void testExtractOutputs() throws Exception { public void testConvertToAccumulators() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform) - .pCollections(pProto.getComponents().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponents().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .build()) .build(); // Add a consumer and output target to check output values. Deque>> mainOutputValues = new ArrayDeque<>(); @@ -373,8 +385,11 @@ public void testCombineGroupedValues() throws Exception { .build()); PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_COMBINE_ID, pTransform) - .pCollections(pCollectionMap) - .coders(coderMap) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pCollectionMap) + .putAllCoders(coderMap) + .build()) .build(); // Add a consumer and output target to check output values. Deque>> mainOutputValues = new ArrayDeque<>(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java index 828d146f0b99..bbb86cab959a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FlattenRunnerTest.java @@ -90,8 +90,11 @@ public void testCreatingAndProcessingDoFlatten() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(pTransformId, pTransform) .processBundleInstructionId("57") - .pCollections(pCollectionMap) - .coders(Collections.singletonMap("coder-id", coder)) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pCollectionMap) + .putAllCoders(Collections.singletonMap("coder-id", coder)) + .build()) .build(); List> mainOutputValues = new ArrayList<>(); context.addPCollectionConsumer( @@ -150,8 +153,11 @@ public void testFlattenWithDuplicateInputCollectionProducesMultipleOutputs() thr PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(pTransformId, pTransform) .processBundleInstructionId("57") - .pCollections(Collections.singletonMap("inputATarget", pCollection)) - .coders(Collections.singletonMap("coder-id", coder)) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(Collections.singletonMap("inputATarget", pCollection)) + .putAllCoders(Collections.singletonMap("coder-id", coder)) + .build()) .build(); List> mainOutputValues = new ArrayList<>(); context.addPCollectionConsumer( diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java index f4d555dabcc1..8f4cfa1acf68 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java @@ -268,9 +268,12 @@ public void testUsingUserState() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponents().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List> mainOutputValues = new ArrayList<>(); context.addPCollectionConsumer( @@ -428,9 +431,13 @@ public void testProcessElementWithSideInputsAndOutputs() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponents().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List> mainOutputValues = new ArrayList<>(); List> additionalOutputValues = new ArrayList<>(); @@ -531,9 +538,13 @@ public void testProcessElementWithNonWindowObservingOptimization() throws Except PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .processBundleInstructionId("57") - .pCollections(pProto.getComponents().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List> mainOutputValues = new ArrayList<>(); List> additionalOutputValues = new ArrayList<>(); @@ -670,9 +681,13 @@ public void testSideInputIsAccessibleForDownstreamCallers() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponents().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List>> mainOutputValues = new ArrayList<>(); context.addPCollectionConsumer( @@ -772,9 +787,13 @@ public void testUsingMetrics() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponents().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List>> mainOutputValues = new ArrayList<>(); context.addPCollectionConsumer( @@ -942,9 +961,13 @@ public void testTimers() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeStateClient) .processBundleInstructionId("57L") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .outboundAggregators( ImmutableMap.of(ApiServiceDescriptor.getDefaultInstance(), aggregator)) .timerApiServiceDescriptor(ApiServiceDescriptor.getDefaultInstance()) @@ -1678,9 +1701,13 @@ public void testProcessElementForSizedElementAndRestriction() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .splitListener(splitListener) .build(); List> mainOutputValues = new ArrayList<>(); @@ -1968,9 +1995,13 @@ public void testProcessElementForSizedElementAndRestrictionSplitBeforeTryClaim() PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .splitListener(splitListener) .build(); List> mainOutputValues = new ArrayList<>(); @@ -2178,9 +2209,13 @@ public void testProcessElementForSizedElementAndRestrictionNoTryClaim() throws E PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .splitListener(splitListener) .build(); List> mainOutputValues = new ArrayList<>(); @@ -2372,9 +2407,12 @@ public void testProcessElementForWindowedSizedElementAndRestriction() throws Exc PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .build()) .splitListener(splitListener) .build(); List> mainOutputValues = new ArrayList<>(); @@ -2790,9 +2828,12 @@ public void testProcessElementForPairWithRestriction() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .build()) .build(); List>> mainOutputValues = new ArrayList<>(); context.addPCollectionConsumer(outputPCollectionId, ((List) mainOutputValues)::add); @@ -2875,9 +2916,12 @@ public void testProcessElementForWindowedPairWithRestriction() throws Exception PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .build()) .build(); List>> mainOutputValues = new ArrayList<>(); context.addPCollectionConsumer(outputPCollectionId, ((List) mainOutputValues)::add); @@ -2986,9 +3030,12 @@ public void testProcessElementForWindowedPairWithRestrictionWithNonWindowObservi PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .build()) .build(); List>> mainOutputValues = new ArrayList<>(); context.addPCollectionConsumer(outputPCollectionId, ((List) mainOutputValues)::add); @@ -3081,9 +3128,12 @@ public void testProcessElementForSplitAndSizeRestriction() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .build()) .build(); List, Double>>> mainOutputValues = new ArrayList<>(); Coder coder = @@ -3180,9 +3230,12 @@ public void testProcessElementForWindowedSplitAndSizeRestriction() throws Except PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .build()) .build(); List, Double>>> mainOutputValues = new ArrayList<>(); Coder coder = @@ -3321,9 +3374,12 @@ public void testProcessElementForWindowedSplitAndSizeRestriction() throws Except PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .build()) .build(); List, Double>>> mainOutputValues = new ArrayList<>(); Coder coder = @@ -3516,9 +3572,16 @@ public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObser PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + // .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) + // .coders(pProto.getComponents().getCodersMap()) + // .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) .build(); List, Double>>> mainOutputValues = new ArrayList<>(); Coder coder = @@ -3666,9 +3729,13 @@ public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWithoutOb PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List, Double>>> mainOutputValues = new ArrayList<>(); Coder coder = @@ -3722,9 +3789,13 @@ public void testProcessElementForTruncateAndSizeRestriction() throws Exception { PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List, Double>>> mainOutputValues = new ArrayList<>(); Coder coder = @@ -3820,9 +3891,13 @@ public void testProcessElementForWindowedTruncateAndSizeRestriction() throws Exc PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .beamFnStateClient(fakeClient) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List, Double>>> mainOutputValues = new ArrayList<>(); Coder coder = @@ -3938,9 +4013,13 @@ public void testProcessElementForWindowedTruncateAndSizeRestriction() throws Exc PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List, Double>>> mainOutputValues = new ArrayList<>(); Coder coder = @@ -4066,9 +4145,13 @@ public void testDoFnSkewNotAllowed() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); List> mainOutputValues = new ArrayList<>(); Coder coder = StringUtf8Coder.of(); @@ -4125,9 +4208,13 @@ public void testDoFnSkewAllowed() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(TEST_TRANSFORM_ID, pTransform) .processBundleInstructionId("57") - .pCollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) - .coders(pProto.getComponents().getCodersMap()) - .windowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(pProto.getComponents().getCodersMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(pProto.getComponents().getWindowingStrategiesMap()) + .putAllPcollections(pProto.getComponentsOrBuilder().getPcollectionsMap()) + .build()) .build(); Coder coder = StringUtf8Coder.of(); context.addPCollectionConsumer( diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java index cf750bfd7d6a..18b15e7785b5 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/MapFnRunnersTest.java @@ -71,8 +71,11 @@ public void testValueOnlyMapping() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(EXPECTED_ID, EXPECTED_PTRANSFORM) .processBundleInstructionId("57") - .pCollections(Collections.singletonMap("inputPC", INPUT_PCOLLECTION)) - .coders(Collections.singletonMap("coder-id", valueCoder)) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(Collections.singletonMap("inputPC", INPUT_PCOLLECTION)) + .putAllCoders(Collections.singletonMap("coder-id", valueCoder)) + .build()) .build(); List> outputConsumer = new ArrayList<>(); context.addPCollectionConsumer("outputPC", outputConsumer::add); @@ -97,8 +100,11 @@ public void testFullWindowedValueMapping() throws Exception { PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(EXPECTED_ID, EXPECTED_PTRANSFORM) .processBundleInstructionId("57") - .pCollections(Collections.singletonMap("inputPC", INPUT_PCOLLECTION)) - .coders(Collections.singletonMap("coder-id", valueCoder)) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(Collections.singletonMap("inputPC", INPUT_PCOLLECTION)) + .putAllCoders(Collections.singletonMap("coder-id", valueCoder)) + .build()) .build(); List> outputConsumer = new ArrayList<>(); context.addPCollectionConsumer("outputPC", outputConsumer::add); @@ -122,8 +128,11 @@ public void testFullWindowedValueMappingWithCompressedWindow() throws Exception PTransformRunnerFactoryTestContext context = PTransformRunnerFactoryTestContext.builder(EXPECTED_ID, EXPECTED_PTRANSFORM) .processBundleInstructionId("57") - .pCollections(Collections.singletonMap("inputPC", INPUT_PCOLLECTION)) - .coders(Collections.singletonMap("coder-id", valueCoder)) + .components( + RunnerApi.Components.newBuilder() + .putAllPcollections(Collections.singletonMap("inputPC", INPUT_PCOLLECTION)) + .putAllCoders(Collections.singletonMap("coder-id", valueCoder)) + .build()) .build(); List> outputConsumer = new ArrayList<>(); context.addPCollectionConsumer("outputPC", outputConsumer::add); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PTransformRunnerFactoryTestContext.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PTransformRunnerFactoryTestContext.java index acfd3bb70202..bd7b7726e692 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PTransformRunnerFactoryTestContext.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/PTransformRunnerFactoryTestContext.java @@ -114,9 +114,13 @@ public CompletableFuture handle(StateRequest.Builder requestBuild .cacheTokensSupplier(() -> Collections.emptyList()) .bundleCacheSupplier(() -> Caches.noop()) .processWideCache(Caches.noop()) - .pCollections(Collections.emptyMap()) // expected to be immutable - .coders(Collections.emptyMap()) // expected to be immutable - .windowingStrategies(Collections.emptyMap()) // expected to be immutable + .components( + RunnerApi.Components.newBuilder() + .putAllCoders(Collections.emptyMap()) + .putAllEnvironments(Collections.emptyMap()) + .putAllWindowingStrategies(Collections.emptyMap()) + .putAllPcollections(Collections.emptyMap()) + .build()) .pCollectionConsumers(new HashMap<>()) .startBundleFunctions(new ArrayList<>()) .finishBundleFunctions(new ArrayList<>()) @@ -175,11 +179,13 @@ default Builder processBundleInstructionId(String value) { return processBundleInstructionIdSupplier(() -> value); } - Builder pCollections(Map value); + // Builder pCollections(Map value); - Builder coders(Map value); + Builder components(RunnerApi.Components value); - Builder windowingStrategies(Map value); + // Builder coders(Map value); + + // Builder windowingStrategies(Map value); Builder runnerCapabilities(Set value);