Skip to content

Commit

Permalink
非自动提交 ,手工操作
Browse files Browse the repository at this point in the history
  • Loading branch information
makemyownlife committed Apr 16, 2024
1 parent 8bd32de commit 6f789b7
Showing 1 changed file with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package org.apache.rocketmq.client.consumer;

import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Test;

import java.util.List;
Expand Down Expand Up @@ -33,4 +36,42 @@ public void testAutoCommit() throws Exception {
}
}

@Test
public void testNoAutoCommit() throws MQClientException {
boolean running = true;
// 定义消费者组 mygroup
DefaultLitePullConsumer litePullConsumer = new
DefaultLitePullConsumer("mygroup");
// 设置名字服务地址
litePullConsumer.setNamesrvAddr("127.0.0.1:9876");
// 从最新的进度偏移量
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅主题 TopicTest
litePullConsumer.subscribe("TopicTest", "*");
// 自动提交消费偏移量的选项设置为 false
litePullConsumer.setAutoCommit(false);
litePullConsumer.start();
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
if (CollectionUtils.isNotEmpty(messageExts)) {
// 取出第一条消息数据
MessageExt first = messageExts.get(0);
// 打印消息内存
System.out.println(new String(first.getBody()));
MessageQueue messageQueue = new MessageQueue(first.getTopic(), first.getBrokerName(), first.getQueueId());
// 回滚到第一条消息的点位
litePullConsumer.seek(messageQueue, first.getQueueOffset());
}
Thread.sleep(5000);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
litePullConsumer.shutdown();
}


}

}

0 comments on commit 6f789b7

Please sign in to comment.