-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathbatch_send.rs
45 lines (38 loc) · 1.24 KB
/
batch_send.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
use rabbitmq_stream_client::{
types::{ByteCapacity, Message},
Environment,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let environment = Environment::builder()
.host("localhost")
.port(5552)
.build()
.await?;
println!("creating batch_send stream");
let _ = environment.delete_stream("batch_send").await;
let messages_to_batch = 100;
let iterations = 10000;
environment
.stream_creator()
.max_length(ByteCapacity::GB(2))
.create("batch_send")
.await?;
let producer = environment.producer().build("batch_send").await?;
for _ in 0..iterations {
println!("accumulating messages in buffer");
let mut messages = Vec::with_capacity(messages_to_batch);
for i in 0..messages_to_batch {
let msg = Message::builder().body(format!("message{}", i)).build();
messages.push(msg);
}
println!("sending in batch mode");
producer
.batch_send(messages, |confirmation_status| async move {
println!("Message confirmed with status {:?}", confirmation_status);
})
.await?;
}
producer.close().await?;
Ok(())
}