From 8f16d838bafcb7d3d4566cc446753e8e9582b8a8 Mon Sep 17 00:00:00 2001 From: Zhouxiang Zhan Date: Tue, 16 Apr 2024 14:20:16 +0800 Subject: [PATCH] [ISSUE #726] Add test module to provide integration test for java client (#727) * Add GrpcServerIntegrationTest * Add AttemptIdIntegrationTest * fix jdk17 SelfSignedCertificate --- java/pom.xml | 15 ++ java/test/pom.xml | 71 ++++++ .../test/client/AttemptIdIntegrationTest.java | 224 ++++++++++++++++++ .../rocketmq/test/helper/ResponseWriter.java | 71 ++++++ .../server/GrpcServerIntegrationTest.java | 61 +++++ .../rocketmq/test/server/MockServer.java | 35 +++ 6 files changed, 477 insertions(+) create mode 100644 java/test/pom.xml create mode 100644 java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java create mode 100644 java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java create mode 100644 java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java create mode 100644 java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java diff --git a/java/pom.xml b/java/pom.xml index 779a74d4..f081c19a 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -31,6 +31,7 @@ client-apis client client-shade + test @@ -62,6 +63,7 @@ 2.6.0 3.10.0 4.1.0 + 1.77 3.8.0 @@ -166,6 +168,12 @@ grpc-stub ${grpc.version} + + io.grpc + grpc-testing + test + ${grpc.version} + com.google.guava guava @@ -247,6 +255,13 @@ ${awaitility.version} test + + + org.bouncycastle + bcpkix-jdk18on + ${bcpkix.version} + test + diff --git a/java/test/pom.xml b/java/test/pom.xml new file mode 100644 index 00000000..55f643e3 --- /dev/null +++ b/java/test/pom.xml @@ -0,0 +1,71 @@ + + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-client-java-parent + 5.0.7-SNAPSHOT + + + test + + + 8 + UTF-8 + + + + + ${project.groupId} + rocketmq-client-java-noshade + + + io.grpc + grpc-testing + test + + + junit + junit + test + + + org.assertj + assertj-core + test + + + org.mockito + mockito-core + test + + + org.awaitility + awaitility + test + + + org.bouncycastle + bcpkix-jdk18on + test + + + + \ No newline at end of file diff --git a/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java b/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java new file mode 100644 index 00000000..b03cf70d --- /dev/null +++ b/java/test/src/test/java/org/apache/rocketmq/test/client/AttemptIdIntegrationTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.client; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import apache.rocketmq.v2.Address; +import apache.rocketmq.v2.Assignment; +import apache.rocketmq.v2.Broker; +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.Endpoints; +import apache.rocketmq.v2.ExponentialBackoff; +import apache.rocketmq.v2.HeartbeatRequest; +import apache.rocketmq.v2.HeartbeatResponse; +import apache.rocketmq.v2.MessageQueue; +import apache.rocketmq.v2.MessageType; +import apache.rocketmq.v2.Permission; +import apache.rocketmq.v2.QueryAssignmentRequest; +import apache.rocketmq.v2.QueryAssignmentResponse; +import apache.rocketmq.v2.QueryRouteRequest; +import apache.rocketmq.v2.QueryRouteResponse; +import apache.rocketmq.v2.ReceiveMessageRequest; +import apache.rocketmq.v2.ReceiveMessageResponse; +import apache.rocketmq.v2.Resource; +import apache.rocketmq.v2.Status; +import apache.rocketmq.v2.TelemetryCommand; +import com.google.protobuf.Duration; +import io.grpc.stub.StreamObserver; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientServiceProvider; +import org.apache.rocketmq.client.apis.SessionCredentialsProvider; +import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider; +import org.apache.rocketmq.client.apis.consumer.ConsumeResult; +import org.apache.rocketmq.client.apis.consumer.FilterExpression; +import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.apache.rocketmq.test.helper.ResponseWriter; +import org.apache.rocketmq.test.server.GrpcServerIntegrationTest; +import org.apache.rocketmq.test.server.MockServer; +import org.junit.Before; +import org.junit.Test; + +public class AttemptIdIntegrationTest extends GrpcServerIntegrationTest { + private final String topic = "topic"; + private final String broker = "broker"; + private final ResponseWriter responseWriter = ResponseWriter.getInstance(); + private final Status mockStatus = Status.newBuilder() + .setCode(Code.OK) + .setMessage("mock test") + .build(); + + private final List attemptIdList = new CopyOnWriteArrayList<>(); + private final AtomicBoolean serverDeadlineFlag = new AtomicBoolean(true); + + @Before + public void setUp() throws Exception { + MockServer serverImpl = new MockServer() { + @Override + public void queryRoute(QueryRouteRequest request, StreamObserver responseObserver) { + responseWriter.write(responseObserver, QueryRouteResponse.newBuilder() + .setStatus(mockStatus) + .addMessageQueues(MessageQueue.newBuilder() + .setTopic(Resource.newBuilder() + .setName(topic).build()) + .setId(0) + .setPermission(Permission.READ_WRITE) + .setBroker(Broker.newBuilder() + .setName(broker) + .setId(0) + .setEndpoints(Endpoints.newBuilder() + .addAddresses(Address.newBuilder() + .setHost("127.0.0.1") + .setPort(port) + .build()) + .build()) + .build()) + .addAcceptMessageTypes(MessageType.NORMAL) + .build()) + .build()); + } + + @Override + public void heartbeat(HeartbeatRequest request, StreamObserver responseObserver) { + responseWriter.write(responseObserver, HeartbeatResponse.newBuilder().setStatus(mockStatus) + .build()); + } + + @Override + public void queryAssignment(QueryAssignmentRequest request, + StreamObserver responseObserver) { + responseWriter.write(responseObserver, QueryAssignmentResponse.newBuilder().setStatus(mockStatus) + .addAssignments(Assignment.newBuilder() + .setMessageQueue(MessageQueue.newBuilder() + .setTopic(Resource.newBuilder() + .setName(topic).build()) + .setId(0) + .setPermission(Permission.READ_WRITE) + .setBroker(Broker.newBuilder() + .setName(broker) + .setId(0) + .setEndpoints(Endpoints.newBuilder() + .addAddresses(Address.newBuilder() + .setHost("127.0.0.1") + .setPort(port) + .build()) + .build()) + .build()) + .addAcceptMessageTypes(MessageType.NORMAL) + .build()) + .build()) + .build()); + } + + @Override + public void receiveMessage(ReceiveMessageRequest request, + StreamObserver responseObserver) { + // prevent too much request + if (attemptIdList.size() >= 3) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + attemptIdList.add(request.getAttemptId()); + if (serverDeadlineFlag.compareAndSet(true, false)) { + // timeout + } else { + responseObserver.onNext(ReceiveMessageResponse.newBuilder().setStatus(mockStatus).build()); + responseObserver.onCompleted(); + } + } + + @Override + public StreamObserver telemetry(StreamObserver responseObserver) { + return new StreamObserver() { + @Override + public void onNext(TelemetryCommand value) { + responseObserver.onNext(value.toBuilder().setStatus(mockStatus) + .setSettings(value.getSettings().toBuilder() + .setBackoffPolicy(value.getSettings().getBackoffPolicy().toBuilder() + .setMaxAttempts(16) + .setExponentialBackoff(ExponentialBackoff.newBuilder() + .setInitial(Duration.newBuilder() + .setSeconds(1).build()) + .setMax(Duration.newBuilder() + .setSeconds(10).build()) + .setMultiplier(1.5f) + .build()))).build()); + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + } + }; + + setUpServer(serverImpl, port); + serverImpl.setPort(port); + } + + @Test + public void test() throws Exception { + final ClientServiceProvider provider = ClientServiceProvider.loadService(); + String accessKey = "yourAccessKey"; + String secretKey = "yourSecretKey"; + SessionCredentialsProvider sessionCredentialsProvider = + new StaticSessionCredentialsProvider(accessKey, secretKey); + + String endpoints = "127.0.0.1" + ":" + port; + int timeout = 1000; + ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() + .setEndpoints(endpoints) + .setCredentialProvider(sessionCredentialsProvider) + .setRequestTimeout(java.time.Duration.of(timeout, ChronoUnit.MILLIS)) + .build(); + String tag = "yourMessageTagA"; + FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); + String consumerGroup = "yourConsumerGroup"; + PushConsumer pushConsumer = provider.newPushConsumerBuilder() + .setClientConfiguration(clientConfiguration) + .setConsumerGroup(consumerGroup) + .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) + .setMessageListener(messageView -> ConsumeResult.SUCCESS) + .build(); + try { + await().atMost(java.time.Duration.ofSeconds(5)).untilAsserted(() -> { + assertThat(attemptIdList.size()).isGreaterThanOrEqualTo(3); + assertThat(attemptIdList.get(0)).isEqualTo(attemptIdList.get(1)); + assertThat(attemptIdList.get(0)).isNotEqualTo(attemptIdList.get(2)); + }); + } finally { + pushConsumer.close(); + } + } +} diff --git a/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java b/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java new file mode 100644 index 00000000..17d2597b --- /dev/null +++ b/java/test/src/test/java/org/apache/rocketmq/test/helper/ResponseWriter.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.helper; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.ServerCallStreamObserver; +import io.grpc.stub.StreamObserver; + +public class ResponseWriter { + protected static final Object INSTANCE_CREATE_LOCK = new Object(); + protected static volatile ResponseWriter instance; + + public static ResponseWriter getInstance() { + if (instance == null) { + synchronized (INSTANCE_CREATE_LOCK) { + if (instance == null) { + instance = new ResponseWriter(); + } + } + } + return instance; + } + + public void write(StreamObserver observer, final T response) { + if (writeResponse(observer, response)) { + observer.onCompleted(); + } + } + + public boolean writeResponse(StreamObserver observer, final T response) { + if (null == response) { + return false; + } + if (isCancelled(observer)) { + return false; + } + try { + observer.onNext(response); + } catch (StatusRuntimeException statusRuntimeException) { + if (Status.CANCELLED.equals(statusRuntimeException.getStatus())) { + return false; + } + throw statusRuntimeException; + } + return true; + } + + public boolean isCancelled(StreamObserver observer) { + if (observer instanceof ServerCallStreamObserver) { + final ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver) observer; + return serverCallStreamObserver.isCancelled(); + } + return false; + } +} \ No newline at end of file diff --git a/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java b/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java new file mode 100644 index 00000000..1f08d27b --- /dev/null +++ b/java/test/src/test/java/org/apache/rocketmq/test/server/GrpcServerIntegrationTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.server; + +import apache.rocketmq.v2.MessagingServiceGrpc; +import io.grpc.Server; +import io.grpc.ServerInterceptor; +import io.grpc.ServerServiceDefinition; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.SelfSignedCertificate; +import io.grpc.testing.GrpcCleanupRule; +import java.io.IOException; +import java.security.cert.CertificateException; +import org.junit.Rule; + +public class GrpcServerIntegrationTest { + /** + * This rule manages automatic graceful shutdown for the registered servers and channels at the end of test. + */ + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + /** + * Let OS pick up an available port. + */ + protected int port = 0; + + + protected void setUpServer(MessagingServiceGrpc.MessagingServiceImplBase serverImpl, + int port, ServerInterceptor... interceptors) throws IOException, CertificateException { + SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); + ServerServiceDefinition serviceDefinition = serverImpl.bindService(); + NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port) + .directExecutor() + .addService(serviceDefinition) + .useTransportSecurity(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()); + for (ServerInterceptor interceptor : interceptors) { + serverBuilder = serverBuilder.intercept(interceptor); + } + Server server = serverBuilder.build() + .start(); + this.port = server.getPort(); + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(server); + } +} diff --git a/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java b/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java new file mode 100644 index 00000000..9fdd5fad --- /dev/null +++ b/java/test/src/test/java/org/apache/rocketmq/test/server/MockServer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.test.server; + +import apache.rocketmq.v2.MessagingServiceGrpc; + +public class MockServer extends MessagingServiceGrpc.MessagingServiceImplBase { + private Integer port; + + public MockServer() { + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } +}