Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36648] Bump Flink version to Flink 2.0-preview1 #140

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,9 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [ 1.20.0 ]
jdk: [ '8, 11, 17, 21' ]
flink: [ 2.0-preview1 ]
jdk: [ '11, 17, 21' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
jdk_version: ${{ matrix.jdk }}
python_test:
strategy:
matrix:
flink: [ 1.20.0 ]
uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
4 changes: 2 additions & 2 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
strategy:
matrix:
flink_branches: [{
flink: 1.20-SNAPSHOT,
flink: 2.0-SNAPSHOT,
branch: main
}, {
flink: 1.19.1,
Expand All @@ -46,5 +46,5 @@ jobs:
with:
flink_version: ${{ matrix.flink_branches.flink }}
connector_branch: ${{ matrix.flink_branches.branch }}
jdk_version: ${{ matrix.flink_branches.jdk || '8, 11, 17, 21' }}
jdk_version: ${{ matrix.flink_branches.jdk || '11, 17, 21' }}
run_dependency_convergence: false

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.job.coordination;

import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.job.coordination.ClientCoordinationHandler;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.OperatorUidPathParameter;
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/** Message headers for the {@link ClientCoordinationHandler}. */
@Documentation.ExcludeFromDocumentation(
"This API is not exposed to the users, as coordinators are used only internally.")
public class ClientCoordinationHeaders
implements RuntimeMessageHeaders<
ClientCoordinationRequestBody,
ClientCoordinationResponseBody,
ClientCoordinationMessageParameters> {

public static final String URL =
"/jobs/:" + JobIDPathParameter.KEY + "/coordinators/:" + OperatorUidPathParameter.KEY;

private static final ClientCoordinationHeaders INSTANCE = new ClientCoordinationHeaders();

private ClientCoordinationHeaders() {}

@Override
public Class<ClientCoordinationRequestBody> getRequestClass() {
return ClientCoordinationRequestBody.class;
}

@Override
public Class<ClientCoordinationResponseBody> getResponseClass() {
return ClientCoordinationResponseBody.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public ClientCoordinationMessageParameters getUnresolvedMessageParameters() {
return new ClientCoordinationMessageParameters();
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.POST;
}

@Override
public String getTargetRestEndpointURL() {
return URL;
}

public static ClientCoordinationHeaders getInstance() {
return INSTANCE;
}

@Override
public String getDescription() {
return "Send a request to a specified coordinator of the specified job and get the response. "
+ "This API is for internal use only.";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.test.resources.ResourceTestUtils;

import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Arrays;

/** Kafka sink E2E test based on connector testing framework. */
@SuppressWarnings("unused")
@Disabled(
"FIXME: Skip this test temporarily until bumping a version that includes the fix of https://issues.apache.org/jira/browse/FLINK-36568")
public class KafkaSinkE2ECase extends SinkTestSuiteBase<String> {
private static final String KAFKA_HOSTNAME = "kafka";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.test.resources.ResourceTestUtils;

import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

Expand All @@ -39,6 +40,8 @@
import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC;

/** Kafka E2E test based on connector testing framework. */
@Disabled(
"FIXME: Skip this test temporarily until bumping a version that includes the fix of https://issues.apache.org/jira/browse/FLINK-36568")
public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
private static final String KAFKA_HOSTNAME = "kafka";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.jupiter.api.Disabled;
import org.junit.rules.Timeout;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
Expand All @@ -57,6 +58,8 @@
import static org.junit.Assert.assertThat;

/** End-to-end test for SQL client using Avro Confluent Registry format. */
@Disabled(
"FIXME: Skip this test temporarily until bumping a version that includes the fix of https://issues.apache.org/jira/browse/FLINK-36568")
public class SQLClientSchemaRegistryITCase {
public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
package org.apache.flink.tests.util.kafka;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.util.TestLoggerExtension;
Expand All @@ -42,6 +42,7 @@
import org.apache.kafka.common.serialization.VoidSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand All @@ -60,12 +61,15 @@
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.CheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH;
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
import static org.assertj.core.api.Assertions.assertThat;

/** smoke test for the kafka connectors. */
@ExtendWith({TestLoggerExtension.class})
@Testcontainers
@Disabled(
"FIXME: Skip this test temporarily until bumping a version that includes the fix of https://issues.apache.org/jira/browse/FLINK-36568")
class SmokeKafkaITCase {

private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
Expand Down Expand Up @@ -99,8 +103,14 @@ class SmokeKafkaITCase {
private static Configuration getConfiguration() {
// modify configuration to have enough slots
final Configuration flinkConfig = new Configuration();
flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
flinkConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
flinkConfig.set(TaskManagerOptions.NUM_TASK_SLOTS, 3);
flinkConfig.set(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
flinkConfig.set(
org.apache.flink.configuration.JobManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(1024));
flinkConfig.set(
org.apache.flink.configuration.TaskManagerOptions.TOTAL_PROCESS_MEMORY,
MemorySize.ofMebiBytes(1024));
// Workaround for FLINK-36454 ; default config is entirely overwritten
flinkConfig.setString(
"env.java.opts.all",
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.flink.streaming.kafka.test.base;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.ParameterTool;

import java.time.Duration;

/** The util class for kafka example. */
public class KafkaExampleUtil {
Expand All @@ -40,8 +43,14 @@ public static StreamExecutionEnvironment prepareExecutionEnv(ParameterTool param
+ "--group.id <some id>");
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
Configuration configuration = new Configuration();
configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 4);
configuration.set(
RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY,
Duration.ofMillis(10000));
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig()
.setGlobalJobParameters(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.streaming.kafka.test;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
Expand All @@ -27,6 +26,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
import org.apache.flink.util.ParameterTool;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
Method <org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.getTransactionCoordinatorId()> calls method <org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.getTransactionCoordinatorId()> in (FlinkKafkaProducer.java:1327)
Loading
Loading