diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java new file mode 100644 index 000000000..f4a70c6f9 --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumer.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.consumer; + +import java.io.Closeable; +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import org.apache.rocketmq.client.apis.ClientException; +import org.apache.rocketmq.client.apis.message.MessageQueue; +import org.apache.rocketmq.client.apis.message.MessageView; + +public interface PullConsumer extends Closeable { + /** + * Get the consumer group of the consumer. + */ + String getConsumerGroup(); + + /** + * @param topic the topic that needs to be monitored. + * @param listener the callback to detect the message queue changes. + */ + void registerMessageQueueChangeListenerByTopic(String topic, TopicMessageQueueChangeListener listener); + + /** + * Fetch message queues of the topic. + */ + Collection fetchMessageQueues(String topic) throws ClientException; + + /** + * Manually assign a list of message queues to this consumer. + * + *

This interface does not allow for incremental assignment and will replace the previous assignment (if + * previous assignment existed). + * + * @param messageQueues the list of message queues that are to be assigned to this consumer. + */ + void assign(Collection messageQueues); + + /** + * Fetch messages from assigned message queues specified by {@link #assign(Collection)}. + * + *

The messages polled from remote are across the message queue. + * + * @param timeout the maximum time to block. + * @return list of fetched messages. + */ + List poll(Duration timeout); + + /** + * Overrides the fetch offsets that the consumer will use on the next poll. If this method is invoked for the same + * message queue more than once, the latest offset will be used on the next {@link #poll(Duration)}. + * + * @param messageQueue the message queue to override the fetch offset. + * @param offset message offset. + */ + void seek(MessageQueue messageQueue, long offset); + + /** + * Suspending message pulling from the message queues. + * + * @param messageQueues message queues that need to be suspended. + */ + void pause(Collection messageQueues); + + /** + * Resuming message pulling from the message queues. + * + * @param messageQueues message queues that need to be resumed. + */ + void resume(Collection messageQueues); + + /** + * Look up the offsets for the given message queue by timestamp. The returned offset for each message queue is the + * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding message + * queue. + * + * @param messageQueue message queue that needs to be looked up. + * @param timestamp the timestamp for which to search. + * @return the offset of the message queue, or {@link Optional#empty()} if there is no message. + */ + Optional offsetForTimestamp(MessageQueue messageQueue, Long timestamp); + + /** + * Get the latest committed offset for the given message queue. + * + * @return the latest committed offset, or {@link Optional#empty()} if there was no prior commit. + */ + Optional committed(MessageQueue messageQueue); + + /** + * Commit offset manually. + */ + void commit() throws ClientException; + + /** + * Overrides the fetch offsets with the beginning offset that the consumer will use on the next poll. If this + * method is invoked for the same message queue more than once, the latest offset will be used on the next + * {@link #poll(Duration)}. + * + * @param messageQueue the message queue to seek. + */ + void seekToBegin(MessageQueue messageQueue) throws ClientException; + + /** + * Overrides the fetch offsets with the end offset that the consumer will use on the next poll. If this method is + * invoked for the same message queue more than once, the latest offset will be used on the next + * {@link #poll(Duration)}. + * + * @param messageQueue the message queue to seek. + */ + void seekToEnd(MessageQueue messageQueue) throws ClientException; + + /** + * Close the pull consumer and release all related resources. + */ + @Override + void close() throws IOException; +} diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumerBuilder.java new file mode 100644 index 000000000..79b8c4b19 --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PullConsumerBuilder.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.consumer; + +import java.time.Duration; +import org.apache.rocketmq.client.apis.ClientConfiguration; +import org.apache.rocketmq.client.apis.ClientException; + +public interface PullConsumerBuilder { + /** + * Set the client configuration for the consumer. + * + * @param clientConfiguration client's configuration. + * @return the consumer builder instance. + */ + PullConsumerBuilder setClientConfiguration(ClientConfiguration clientConfiguration); + + /** + * Set the load balancing group for the consumer. + * + * @param consumerGroup consumer load balancing group. + * @return the consumer builder instance. + */ + PullConsumerBuilder setConsumerGroup(String consumerGroup); + + /** + * Automate the consumer's offset commit. + * + * @return the consumer builder instance. + */ + PullConsumerBuilder enableAutoCommit(boolean enable); + + /** + * Set the consumer's offset commit interval if auto commit is enabled. + * + * @param duration offset commit interval + * @return the consumer builder instance. + */ + PullConsumerBuilder setAutoCommitInterval(Duration duration); + + /** + * Set the maximum number of messages cached locally. + * + * @param count message count. + * @return the consumer builder instance. + */ + PullConsumerBuilder setMaxCacheMessageCountEachQueue(int count); + + /** + * Set the maximum bytes of messages cached locally. + * + * @param bytes message size. + * @return the consumer builder instance. + */ + PullConsumerBuilder setMaxCacheMessageSizeInBytesEachQueue(int bytes); + + /** + * Finalize the build of {@link PullConsumer} and start. + * + *

This method will block until the pull consumer starts successfully. + * + *

Especially, if this method is invoked more than once, different pull consumer will be created and started. + * + * @return the pull consumer instance. + */ + PullConsumer build() throws ClientException; +} diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java index d123e7511..8212c6737 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumer.java @@ -93,7 +93,7 @@ public interface PushConsumer extends Closeable { * *

Nothing occurs if the specified topic does not exist in subscription expressions of the push consumer. * - * @param topic the topic to remove the subscription. + * @param topic the topic to remove from the subscription. * @return push consumer instance. */ PushConsumer unsubscribe(String topic) throws ClientException; diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java index e971d9457..53f6c637e 100644 --- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/PushConsumerBuilder.java @@ -86,7 +86,7 @@ public interface PushConsumerBuilder { * *

This method will block until the push consumer starts successfully. * - *

Especially, if this method is invoked more than once, different push consumers will be created and started. + *

Especially, if this method is invoked more than once, different push consumer will be created and started. * * @return the push consumer instance. */ diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/TopicMessageQueueChangeListener.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/TopicMessageQueueChangeListener.java new file mode 100644 index 000000000..5dbea0654 --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/consumer/TopicMessageQueueChangeListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.consumer; + +import java.util.Set; +import org.apache.rocketmq.client.apis.message.MessageQueue; + +public interface TopicMessageQueueChangeListener { + /** + * This method will be invoked in the condition of queue numbers changed, These scenarios occur when the topic is + * expanded or shrunk. + * + * @param topic the topic to listen. + * @param messageQueues latest message queues of the topic. + */ + void onChanged(String topic, Set messageQueues); +} diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageQueue.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageQueue.java new file mode 100644 index 000000000..429f590ff --- /dev/null +++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/message/MessageQueue.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.apis.message; + +public interface MessageQueue { + /** + * Topic of the current message queue. + */ + String getTopic(); + + /** + * Get the identifier of the current message queue. + */ + String getId(); +}