diff --git a/README.md b/README.md index 95a803c..9922cf2 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,51 @@ NewClient("amqp://user:pass@localhost").GetProducer().Producer( } +``` + +### 🚀🚀 Concurrent Publisher +one connect more channel publisher,Increase throughput in production + +```go + url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", "admin", "123456", "10.1.2.7", 5672, "") + conn := connections.NewConnect().Open(url) + p := NewProducer(channels.NewChannel(conn.Connection)) + job := make(chan string, 15) + //10 worker + for i := 0; i < 10; i++ { + go func(job <-chan string) { + exchangeName := "go-test" + routeKey := "go-test" + // new 10 mq channel + c := channels.NewChannel(conn.Connection) + for body := range job { + p.Producer( + msg.NewMessage( + msg.WithOptionsChannel(c), + msg.WithOptionsBody(body), + ), + WithOptionsProducer(&ProducerOpt{ + Exchange: exchangeName, + ExchangeType: lib.Topic, + RouteKey: routeKey, + Mandatory: true, + ResendNum: 2, + }), + WithOptionsProducerCallBack(&CallBack{Fnc: func(ret msg.Ret) { + log.Printf("call back %+v", ret) + }}), + ) + } + + }(job) + } + + for i := 0; i < 15; i++ { + job <- fmt.Sprintf("this is chan %d", i) + } + close(job) + <-time.After(30 * time.Second) ``` diff --git a/producers/producer_test.go b/producers/producer_test.go index 752ff83..8291143 100644 --- a/producers/producer_test.go +++ b/producers/producer_test.go @@ -8,6 +8,7 @@ import ( "github.com/ysk229/go-rabbitmq/msg" "log" "testing" + "time" ) func TestProducer(t *testing.T) { @@ -49,3 +50,47 @@ func TestProducer(t *testing.T) { } //select {} } + +func TestProducerChannel(t *testing.T) { + log.SetFlags(log.LstdFlags | log.Lshortfile) + //new client mq + url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", "admin", "123456", "10.1.2.7", 5672, "") + conn := connections.NewConnect().Open(url) + p := NewProducer(channels.NewChannel(conn.Connection)) + job := make(chan string, 15) + //10 worker + for i := 0; i < 10; i++ { + go func(job <-chan string) { + exchangeName := "go-test" + routeKey := "go-test" + // new 10 mq channel + c := channels.NewChannel(conn.Connection) + for body := range job { + p.Producer( + msg.NewMessage( + msg.WithOptionsChannel(c), + msg.WithOptionsBody(body), + ), + WithOptionsProducer(&ProducerOpt{ + Exchange: exchangeName, + ExchangeType: lib.Topic, + RouteKey: routeKey, + Mandatory: true, + ResendNum: 2, + }), + WithOptionsProducerCallBack(&CallBack{Fnc: func(ret msg.Ret) { + log.Printf("call back %+v", ret) + + }}), + ) + } + + }(job) + } + + for i := 0; i < 15; i++ { + job <- fmt.Sprintf("this is chan %d", i) + } + close(job) + <-time.After(30 * time.Second) +}