Skip to content

Commit

Permalink
Add the method of graceful exit of consumption and the function of co…
Browse files Browse the repository at this point in the history
…nsumer retrying 3 times by default
  • Loading branch information
lymallor committed Jul 14, 2022
1 parent 1de66db commit 9970285
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 22 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ mq.GetConsumer().Consumer(
),
consumers.WithOptionsConsumerCallBack(
&consumers.CallBack{
Fnc: func(delivery consumers.Delivery) {
Fnc: func(delivery consumers.Delivery) error {
log.Printf("%+v", delivery)
return nil
},
},
),
Expand Down Expand Up @@ -97,7 +98,7 @@ NewClient("amqp://user:pass@localhost").GetProducer().Producer(

### 🚀🚀 Concurrent Consumer

one connect more channel Consumer,Increase throughput in production
one connect more channel Consumer,Increase throughput in Consumer

```go
url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", "admin", "123456", "127.0.0.1", 5672, "")
Expand All @@ -117,14 +118,15 @@ one connect more channel Consumer,Increase throughput in production
&ConsumerOpt{QueueName: q, RoutingKey: routeKey, Exchange: exchangeName, ExchangeType: lib.Topic},
),
WithOptionsConsumerCallBack(
&CallBack{Fnc: func(delivery Delivery) {
&CallBack{Fnc: func(delivery Delivery) error{
time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
_ = delivery.Ack(false)
} else {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
Expand Down
3 changes: 2 additions & 1 deletion _examples/concurrent/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ func main() {
&consumers.ConsumerOpt{QueueName: *queue, RoutingKey: *bindingKey, Exchange: *exchange, ExchangeType: lib.Topic},
),
consumers.WithOptionsConsumerCallBack(
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) {
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) error {
time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
_ = delivery.Ack(false)
} else {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
Expand Down
3 changes: 2 additions & 1 deletion _examples/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ func main() {
&consumers.ConsumerOpt{QueueName: *queue, RoutingKey: *bindingKey, Exchange: *exchange, ExchangeType: lib.Topic},
),
consumers.WithOptionsConsumerCallBack(
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) {
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) error {
time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
_ = delivery.Ack(false)
} else {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
Expand Down
3 changes: 2 additions & 1 deletion _examples/log/consumer/receive_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ func main() {
&consumers.ConsumerOpt{QueueName: "", RoutingKey: "", Exchange: "logs", ExchangeType: lib.Fanout},
),
consumers.WithOptionsConsumerCallBack(
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) {
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) error {
time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
_ = delivery.Ack(false)
} else {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
Expand Down
3 changes: 2 additions & 1 deletion _examples/log/consumer/receive_logs_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ func main() {
&consumers.ConsumerOpt{QueueName: "", RoutingKey: os.Args[1:][0], Exchange: "logs_direct", ExchangeType: lib.Direct},
),
consumers.WithOptionsConsumerCallBack(
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) {
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) error {
time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
_ = delivery.Ack(false)
} else {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
Expand Down
3 changes: 2 additions & 1 deletion _examples/log/consumer/receive_logs_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ func main() {
&consumers.ConsumerOpt{QueueName: "", RoutingKey: os.Args[1:][0], Exchange: "logs_topic", ExchangeType: lib.Topic},
),
consumers.WithOptionsConsumerCallBack(
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) {
&consumers.CallBack{Fnc: func(delivery consumers.Delivery) error {
time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
_ = delivery.Ack(false)
} else {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
Expand Down
80 changes: 70 additions & 10 deletions consumers/consumer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consumers

import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/ysk229/go-rabbitmq/bindings"
"github.com/ysk229/go-rabbitmq/channels"
Expand All @@ -9,6 +10,10 @@ import (
"github.com/ysk229/go-rabbitmq/options"
"github.com/ysk229/go-rabbitmq/queues"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
)

Expand All @@ -33,14 +38,15 @@ type ConsumerOpt struct {
NoWait bool
Args lib.Table

ResendDelay time.Duration //消息发送失败后,多久秒重发,默认是3s
ResendNum int //消息重发次数
ReReceiveDelay time.Duration //After the message consumption failure, how long seconds to resend,default is 3s
ReReceiveNum int //Number of message reconsumption,default is 3 num ,-1 to disable the retry mechanism
GracefulDelay time.Duration //default is 10s

}

// CallBack call back consumer
type CallBack struct {
Fnc func(Delivery)
Fnc func(Delivery) error
}

// Delivery consumer result data
Expand All @@ -54,22 +60,30 @@ type ConsumerOption func(*Consumer)
// Consumer consumer
type Consumer struct {
*channels.Channel
opt *ConsumerOpt
cb *CallBack
opt *ConsumerOpt
cb *CallBack
wg *sync.WaitGroup
close bool
}

// NewConsumer New Consumer
func NewConsumer(ch *channels.Channel) *Consumer {
c := &Consumer{Channel: ch}
c := &Consumer{Channel: ch, wg: &sync.WaitGroup{}, opt: &ConsumerOpt{}}
return c
}

// WithOptionsConsumer With Options Consumer
func WithOptionsConsumer(opt *ConsumerOpt) ConsumerOption {
return func(c *Consumer) {
c.opt = opt
if c.opt.ResendDelay == 0 {
c.opt.ResendDelay = 3
if c.opt.ReReceiveDelay == 0 {
c.opt.ReReceiveDelay = 3
}
if c.opt.ReReceiveNum == 0 {
c.opt.ReReceiveNum = 3
}
if c.opt.GracefulDelay == 0 {
c.opt.GracefulDelay = 10
}
c.init()
}
Expand All @@ -87,6 +101,7 @@ func (c *Consumer) Consumer(ch *channels.Channel, opts ...ConsumerOption) {
for _, opt := range opts {
opt(c)
}
c.wg.Add(1)
c.subscribe(ch)
}

Expand All @@ -107,6 +122,7 @@ func (c *Consumer) subscribe(ch *channels.Channel) {
return
}
//
defer c.wg.Done()
log.Printf("subscribed...")
for d := range deliveries {
log.Printf(
Expand All @@ -115,11 +131,30 @@ func (c *Consumer) subscribe(ch *channels.Channel) {
d.DeliveryTag,
d.Body,
)
if c.close {
log.Printf("handle: deliveries channel closed")
// No new messages are processed after shutdown, and the message queue is notified to re-deliver the currently received messages
_ = d.Nack(true, true)
break
}

if c.cb != nil {
c.cb.Fnc(Delivery{d})
err = c.cb.Fnc(Delivery{d})
if err != nil && c.opt.ReReceiveNum > 1 {
for i := 1; i < c.opt.ReReceiveNum; i++ {
err := c.cb.Fnc(Delivery{d})
if err == nil {
break
}
if i == c.opt.ReReceiveNum-1 {
_ = d.Nack(false, true)
}
time.Sleep(c.opt.ReReceiveDelay)
}
}
}
}
log.Printf("handle: deliveries channel closed")

}

func (c *Consumer) init() {
Expand All @@ -140,3 +175,28 @@ func (c *Consumer) init() {
log.Println(err)
}
}

func (c *Consumer) GracefulShutdown() {
if c.opt.GracefulDelay == 0 {
c.opt.GracefulDelay = 10
}
sc := make(chan os.Signal, 1)
signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
s := <-sc
log.Println("Got signal:", s)
c.close = true
done := make(chan struct{})
go func() {
c.wg.Wait()
done <- struct{}{}
}()
ctx, cancel := context.WithTimeout(context.Background(), c.opt.GracefulDelay*time.Second)
defer cancel()
select {
case <-ctx.Done():
_ = c.GetChannel().Close()
log.Fatalf("RabbitMQ consumer did not shut down after %d seconds \n", c.opt.GracefulDelay)
case <-done:
_ = c.GetChannel().Close()
}
}
81 changes: 79 additions & 2 deletions consumers/consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consumers

import (
"errors"
"fmt"
"github.com/ysk229/go-rabbitmq/channels"
"github.com/ysk229/go-rabbitmq/connections"
Expand Down Expand Up @@ -28,7 +29,7 @@ func TestConsumer(t *testing.T) {
&ConsumerOpt{QueueName: q, RoutingKey: routeKey, Exchange: exchangeName, ExchangeType: lib.Topic},
),
WithOptionsConsumerCallBack(
&CallBack{Fnc: func(delivery Delivery) {
&CallBack{Fnc: func(delivery Delivery) error {

time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
Expand All @@ -37,6 +38,7 @@ func TestConsumer(t *testing.T) {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
Expand All @@ -46,6 +48,80 @@ func TestConsumer(t *testing.T) {
log.Printf("running for %s", "10s")
time.Sleep(10 * time.Second)
}
func TestRetryConsumer(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lshortfile)
//new client mq
url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", "admin", "123456", "127.0.0.1", 5672, "")
conn := connections.NewConnect().Open(url)
//new mq channel
channelClient := channels.NewChannel(conn.Connection)
exchangeName := "go-test"
routeKey := "go-test"
q := "go-test"
//i := 0
go func() {
NewConsumer(channelClient).Consumer(
channelClient,
WithOptionsConsumer(
&ConsumerOpt{QueueName: q, RoutingKey: routeKey, Exchange: exchangeName, ExchangeType: lib.Topic},
),
WithOptionsConsumerCallBack(
&CallBack{Fnc: func(delivery Delivery) error {

time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
_ = delivery.Ack(false)
} else {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return errors.New("error is nil then you can't retry, retry three times by default")
},
},
),
)
}()

log.Printf("running for %s", "10s")
time.Sleep(10 * time.Second)
}
func TestGracefulShutdownConsumer(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lshortfile)
//new client mq
url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", "admin", "123456", "127.0.0.1", 5672, "")
conn := connections.NewConnect().Open(url)
//new mq channel
channelClient := channels.NewChannel(conn.Connection)
exchangeName := "go-test"
routeKey := "go-test"
q := "go-test"
c := NewConsumer(channelClient)
//Graceful exit
go func() {
c.Consumer(
channelClient,
WithOptionsConsumer(
&ConsumerOpt{QueueName: q, RoutingKey: routeKey, Exchange: exchangeName, ExchangeType: lib.Topic},
),
WithOptionsConsumerCallBack(
&CallBack{Fnc: func(delivery Delivery) error {

time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
_ = delivery.Ack(false)
} else {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
)
}()

c.GracefulShutdown()
}
func TestConcurrentConsumer(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lshortfile)
//new client mq
Expand All @@ -66,7 +142,7 @@ func TestConcurrentConsumer(t *testing.T) {
&ConsumerOpt{QueueName: q, RoutingKey: routeKey, Exchange: exchangeName, ExchangeType: lib.Topic},
),
WithOptionsConsumerCallBack(
&CallBack{Fnc: func(delivery Delivery) {
&CallBack{Fnc: func(delivery Delivery) error {

time.Sleep(3 * time.Second)
if delivery.DeliveryTag == 1 {
Expand All @@ -75,6 +151,7 @@ func TestConcurrentConsumer(t *testing.T) {
_ = delivery.Nack(false, false)
}
log.Printf("%+v", delivery)
return nil
},
},
),
Expand Down
Loading

0 comments on commit 9970285

Please sign in to comment.