From 20f53e1acfb4ca6f7c28fb8634f10c06e18a551a Mon Sep 17 00:00:00 2001 From: Aaron Ai Date: Fri, 13 Jan 2023 10:26:32 +0800 Subject: [PATCH] Add pull API --- .../client/apis/consumer/PullConsumer.java | 63 ++++++++++++++ .../apis/consumer/PullConsumerBuilder.java | 82 +++++++++++++++++++ .../apis/consumer/PushConsumerBuilder.java | 2 +- .../TopicMessageQueueChangeListener.java | 32 ++++++++ .../client/apis/message/MessageQueue.java | 24 ++++++ 5 files changed, 202 insertions(+), 1 deletion(-) create mode 100644 java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java create mode 100644 java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumerBuilder.java create mode 100644 java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/TopicMessageQueueChangeListener.java create mode 100644 java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageQueue.java diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java new file mode 100644 index 000000000..774f19f1d --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.consumer; + +import java.io.Closeable; +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.message.MessageQueue; +import org.apache.rocketmq.client.apis.message.MessageView; + +public interface PullConsumer extends Closeable { + String getConsumerGroup(); + + void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener); + + Collection fetchMessageQueues(String topic) throws ClientException; + + PullConsumer subscribe(String topic, FilterExpression filterExpression) throws ClientException; + + PullConsumer unsubscribe(String topic); + + void assign(Collection messageQueues); + + List poll(Duration timeout); + + void seek(MessageQueue messageQueue, long offset); + + void pause(Collection messageQueues); + + void resume(Collection messageQueues); + + Optional offsetForTimestamp(MessageQueue messageQueue, Long timestamp); + + Optional committed(MessageQueue messageQueue); + + void commit(); + + void seekToBegin(MessageQueue messageQueue) throws ClientException; + + void seekToEnd(MessageQueue messageQueue) throws ClientException; + + @Override + void close() throws IOException; +} diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumerBuilder.java new file mode 100644 index 000000000..79b8c4b19 --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumerBuilder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.consumer; + +import java.time.Duration; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; + +public interface PullConsumerBuilder { + /** + * Set the client configuration for the consumer. + * + * @param clientConfiguration client's configuration. + * @return the consumer builder instance. + */ + PullConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration); + + /** + * Set the load balancing group for the consumer. + * + * @param consumerGroup consumer load balancing group. + * @return the consumer builder instance. + */ + PullConsumerBuilder setConsumerGroup(String consumerGroup); + + /** + * Automate the consumer's offset commit. + * + * @return the consumer builder instance. + */ + PullConsumerBuilder enableAutoCommit(boolean enable); + + /** + * Set the consumer's offset commit interval if auto commit is enabled. + * + * @param duration offset commit interval + * @return the consumer builder instance. + */ + PullConsumerBuilder setAutoCommitInterval(Duration duration); + + /** + * Set the maximum number of messages cached locally. + * + * @param count message count. + * @return the consumer builder instance. + */ + PullConsumerBuilder setMaxCacheMessageCountEachQueue(int count); + + /** + * Set the maximum bytes of messages cached locally. + * + * @param bytes message size. + * @return the consumer builder instance. + */ + PullConsumerBuilder setMaxCacheMessageSizeInBytesEachQueue(int bytes); + + /** + * Finalize the build of {@link PullConsumer} and start. + * + *

This method will block until the pull consumer starts successfully. + * + *

Especially, if this method is invoked more than once, different pull consumer will be created and started. + * + * @return the pull consumer instance. + */ + PullConsumer build() throws ClientException; +} diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java index e971d9457..53f6c637e 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java @@ -86,7 +86,7 @@ public interface PushConsumerBuilder { * *

This method will block until the push consumer starts successfully. * - *

Especially, if this method is invoked more than once, different push consumers will be created and started. + *

Especially, if this method is invoked more than once, different push consumer will be created and started. * * @return the push consumer instance. */ diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/TopicMessageQueueChangeListener.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/TopicMessageQueueChangeListener.java new file mode 100644 index 000000000..467206728 --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/TopicMessageQueueChangeListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.consumer; + +import java.util.Set; +import org.apache.rocketmq.client.apis.message.MessageQueue; + +public interface TopicMessageQueueChangeListener { + /** + * This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is + * expanded or shrunk. + * + * @param topic the topic to listen. + * @param messageQueues + */ + void onChanged(String topic, Set messageQueues); +} diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageQueue.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageQueue.java new file mode 100644 index 000000000..8d03f9467 --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageQueue.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.message; + +public interface MessageQueue { + String getTopic(); + + String getId(); +}