- 🌈 支持ACK机制
- 📦 支持异步通信
- 🛡 支持消费端线性扩展
- 🎨 支持消费者故障后由其他消费者认领
- 🌍 接口幂等性实现
- redis v5.0.0+
@SpringBootTest
public class ProducerTest {
@Autowired
Producer producer;
@Test
void produce() throws InterruptedException {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
producer.setClient(redisson);
Message msg = new Message();
Testbean test = new Testbean("test", 13);
msg.setTopic("t1");
try {
msg.setProperties(BeanMapUtils.toMap(test));
} catch (IntrospectionException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
producer.sendMessageAsync(msg);
while(true){
}
}
}
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
redisson = Redisson.create(config);
HiListener<Testbean> hiListener = new HiListener<>();
new PullConsumerClient.Builder()
.setRedissonClient(redisson)
.setService("service")
.build()
.<Testbean>subscribe("t1")
.registerListener(hiListener)
.registerListener(hiListener)
.<Testbean>subscribe("t2")
.registerListener(hiListener)
.start();
application.yml
af:
dolphinmq:
# 每次拉取数据的量
fetchMessageSize: 5
#检查consumer不活跃的门槛(单位秒)
pendingListIdleThreshold: 10
#每次拉取PendingList的大小
checkPendingListSize: 1000
#死信门槛(计次器次数)
deadLetterThreshold: 32
#认领门槛(单位毫秒)
claimThreshold: 3600
#是否从头开始订阅消息
isStartFromHead: true
#拉取信息的周期(单位秒)
pullHealthyMessagesPeriod: 1
#检查PendingList周期(单位秒)
checkPendingListsPeriod: 10
#超过了该长度stream前面部分会被持久化(非严格模式——MAXLEN~)
trimThreshold: 10000