Skip to content

Commit

Permalink
Initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-ai committed Jun 14, 2023
1 parent 447c3be commit ecdf9e5
Show file tree
Hide file tree
Showing 50 changed files with 3,936 additions and 158 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/java_coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ jobs:
java-version: "11"
distribution: "adopt"
- name: Generate coverage report
run: mvn test --file ./java/pom.xml
working-directory: ./java
run: mvn test --file ./pom.xml
- name: Test summary
uses: test-summary/action@v1
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,39 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.time.Duration;
import java.util.Optional;

/**
* Session credentials used in service authentications.
*/
public class SessionCredentials {
private static final Duration EXPIRATION_BUFFER_TIME = Duration.ofSeconds(1);
private final String accessKey;
private final String accessSecret;
private final String securityToken;
private final long expiredTimestampMillis;

public SessionCredentials(String accessKey, String accessSecret, String securityToken,
long expiredTimestampMillis) {
this.accessKey = checkNotNull(accessKey, "accessKey should not be null");
this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null");
this.securityToken = checkNotNull(securityToken, "securityToken should not be null");
this.expiredTimestampMillis = expiredTimestampMillis;
}

public SessionCredentials(String accessKey, String accessSecret, String securityToken) {
this.accessKey = checkNotNull(accessKey, "accessKey should not be null");
this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null");
this.securityToken = checkNotNull(securityToken, "securityToken should not be null");
this.expiredTimestampMillis = Long.MAX_VALUE;
}

public SessionCredentials(String accessKey, String accessSecret) {
this.accessKey = checkNotNull(accessKey, "accessKey should not be null");
this.accessSecret = checkNotNull(accessSecret, "accessSecret should not be null");
this.securityToken = null;
this.expiredTimestampMillis = Long.MAX_VALUE;
}

public String getAccessKey() {
Expand All @@ -52,4 +65,8 @@ public String getAccessSecret() {
public Optional<String> tryGetSecurityToken() {
return null == securityToken ? Optional.empty() : Optional.of(securityToken);
}

public boolean expiredSoon() {
return System.currentTimeMillis() + EXPIRATION_BUFFER_TIME.toMillis() > expiredTimestampMillis;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.consumer;

import java.util.Set;
import org.apache.rocketmq.client.apis.message.MessageQueue;

public interface TopicMessageQueueChangeListener {
/**
* This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is
* expanded or shrunk.
*
* @param topic the topic to listen.
* @param messageQueues latest message queues of the topic.
*/
void onChanged(String topic, Set<MessageQueue> messageQueues);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.client.apis.message;

public interface MessageQueue {
/**
* Topic of the current message queue.
*/
String getTopic();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public interface ProducerBuilder {
* ArrayList<String> topicList = new ArrayList<>();
* topicList.add("topicA");
* topicList.add("topicB");
* producerBuilder.setTopics(topicList);
* producerBuilder.setTopics(topicList.toArray(new String[0]));
* }</pre>
*
* @param topics topics to send/prepare.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.rocketmq.client.java.exception;

import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.Status;
import org.apache.rocketmq.client.apis.ClientException;
Expand Down Expand Up @@ -61,6 +63,11 @@ public static void check(Status status, RpcFuture<?, ?> future) throws ClientExc
case CLIENT_ID_REQUIRED:
case ILLEGAL_POLLING_TIME:
throw new BadRequestException(codeNumber, requestId, statusMessage);
case ILLEGAL_OFFSET:
if (future.getRequest() instanceof PullMessageRequest) {
return;
}
// fall through on purpose.
case UNAUTHORIZED:
throw new UnauthorizedException(codeNumber, requestId, statusMessage);
case PAYMENT_REQUIRED:
Expand All @@ -71,11 +78,19 @@ public static void check(Status status, RpcFuture<?, ?> future) throws ClientExc
if (future.getRequest() instanceof ReceiveMessageRequest) {
return;
}
if (future.getRequest() instanceof PullMessageRequest) {
return;
}
// fall through on purpose.
case NOT_FOUND:
case TOPIC_NOT_FOUND:
case CONSUMER_GROUP_NOT_FOUND:
throw new NotFoundException(codeNumber, requestId, statusMessage);
case OFFSET_NOT_FOUND:
if (future.getRequest() instanceof GetOffsetRequest) {
return;
}
// fall through on purpose.
case PAYLOAD_TOO_LARGE:
case MESSAGE_BODY_TOO_LARGE:
throw new PayloadTooLargeException(codeNumber, requestId, statusMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public enum MessageHookPoints {
* The hook point of message reception.
*/
RECEIVE,
/**
* The hook point of message pulling.
*/
PULL,
/**
* The hook point of message consumption.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ public void onFailure(Throwable t) {
public void doStats() {
}

private ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) {
protected ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) {
final ListenableFuture<TopicRouteData> future0 = fetchTopicRoute0(topic);
final ListenableFuture<TopicRouteData> future = Futures.transformAsync(future0,
topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData), MoreExecutors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import com.google.common.util.concurrent.AbstractIdleService;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
Expand Down Expand Up @@ -147,6 +155,18 @@ public abstract RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(Endp
ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(Endpoints endpoints,
ForwardMessageToDeadLetterQueueRequest request, Duration duration);

public abstract RpcFuture<PullMessageRequest, List<PullMessageResponse>> pullMessage(Endpoints endpoints,
PullMessageRequest request, Duration duration);

public abstract RpcFuture<UpdateOffsetRequest, UpdateOffsetResponse> updateOffset(Endpoints endpoints,
UpdateOffsetRequest request, Duration duration);

public abstract RpcFuture<GetOffsetRequest, GetOffsetResponse> getOffset(Endpoints endpoints,
GetOffsetRequest request, Duration duration);

public abstract RpcFuture<QueryOffsetRequest, QueryOffsetResponse> queryOffset(Endpoints endpoints,
QueryOffsetRequest request, Duration duration);

/**
* Submit transaction resolution asynchronously, the method ensures no throwable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@
import apache.rocketmq.v2.EndTransactionResponse;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueRequest;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.GetOffsetRequest;
import apache.rocketmq.v2.GetOffsetResponse;
import apache.rocketmq.v2.HeartbeatRequest;
import apache.rocketmq.v2.HeartbeatResponse;
import apache.rocketmq.v2.NotifyClientTerminationRequest;
import apache.rocketmq.v2.NotifyClientTerminationResponse;
import apache.rocketmq.v2.PullMessageRequest;
import apache.rocketmq.v2.PullMessageResponse;
import apache.rocketmq.v2.QueryAssignmentRequest;
import apache.rocketmq.v2.QueryAssignmentResponse;
import apache.rocketmq.v2.QueryOffsetRequest;
import apache.rocketmq.v2.QueryOffsetResponse;
import apache.rocketmq.v2.QueryRouteRequest;
import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import apache.rocketmq.v2.UpdateOffsetRequest;
import apache.rocketmq.v2.UpdateOffsetResponse;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
Expand Down Expand Up @@ -283,8 +291,7 @@ public RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(Endpoints end

@Override
public RpcFuture<ChangeInvisibleDurationRequest, ChangeInvisibleDurationResponse>
changeInvisibleDuration(Endpoints endpoints, ChangeInvisibleDurationRequest request,
Duration duration) {
changeInvisibleDuration(Endpoints endpoints, ChangeInvisibleDurationRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
Expand Down Expand Up @@ -313,6 +320,66 @@ public RpcFuture<AckMessageRequest, AckMessageResponse> ackMessage(Endpoints end
}
}

@Override
public RpcFuture<PullMessageRequest, List<PullMessageResponse>> pullMessage(Endpoints endpoints,
PullMessageRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<List<PullMessageResponse>> future =
rpcClient.pullMessage(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public RpcFuture<UpdateOffsetRequest, UpdateOffsetResponse> updateOffset(Endpoints endpoints,
UpdateOffsetRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<UpdateOffsetResponse> future =
rpcClient.updateOffset(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public RpcFuture<GetOffsetRequest, GetOffsetResponse> getOffset(Endpoints endpoints, GetOffsetRequest request,
Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<GetOffsetResponse> future =
rpcClient.getOffset(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public RpcFuture<QueryOffsetRequest, QueryOffsetResponse> queryOffset(Endpoints endpoints,
QueryOffsetRequest request, Duration duration) {
try {
final Metadata metadata = client.sign();
final Context context = new Context(endpoints, metadata);
final RpcClient rpcClient = getRpcClient(endpoints);
final ListenableFuture<QueryOffsetResponse> future =
rpcClient.queryOffset(metadata, request, asyncWorker, duration);
return new RpcFuture<>(context, request, future);
} catch (Throwable t) {
return new RpcFuture<>(t);
}
}

@Override
public RpcFuture<EndTransactionRequest, EndTransactionResponse> endTransaction(Endpoints endpoints,
EndTransactionRequest request, Duration duration) {
Expand Down Expand Up @@ -395,9 +462,9 @@ protected void startUp() {
() -> {
try {
log.info("Start to log statistics, clientVersion={}, clientWrapperVersion={}, "
+ "clientEndpoints={}, os description=[{}], java description=[{}], clientId={}",
+ "clientEndpoints={}, os description=[{}], java environment=[{}], clientId={}",
MetadataUtils.getVersion(), MetadataUtils.getWrapperVersion(), client.getEndpoints(),
Utilities.getOsDescription(), Utilities.getJavaDescription(), clientId);
Utilities.getOsDescription(), Utilities.getJavaEnvironmentSummary(), clientId);
client.doStats();
} catch (Throwable t) {
log.error("Exception raised during statistics logging, clientId={}", clientId, t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
public enum ClientType {
PRODUCER,
PUSH_CONSUMER,
SIMPLE_CONSUMER;
SIMPLE_CONSUMER,
PULL_CONSUMER;

public apache.rocketmq.v2.ClientType toProtobuf() {
if (PRODUCER.equals(this)) {
Expand All @@ -32,6 +33,9 @@ public apache.rocketmq.v2.ClientType toProtobuf() {
if (SIMPLE_CONSUMER.equals(this)) {
return apache.rocketmq.v2.ClientType.SIMPLE_CONSUMER;
}
if (PULL_CONSUMER.equals(this)) {
return apache.rocketmq.v2.ClientType.PULL_CONSUMER;
}
return apache.rocketmq.v2.ClientType.CLIENT_TYPE_UNSPECIFIED;
}
}
Loading

0 comments on commit ecdf9e5

Please sign in to comment.