Skip to content

Commit

Permalink
add mq
Browse files Browse the repository at this point in the history
  • Loading branch information
lymallor committed Mar 3, 2022
1 parent 51177d8 commit adae640
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,51 @@ NewClient("amqp://user:pass@localhost").GetProducer().Producer(

}

```

### 🚀🚀 Concurrent Publisher

one connect more channel publisher,Increase throughput in production

```go
url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", "admin", "123456", "10.1.2.7", 5672, "")
conn := connections.NewConnect().Open(url)
p := NewProducer(channels.NewChannel(conn.Connection))
job := make(chan string, 15)
//10 worker
for i := 0; i < 10; i++ {
go func(job <-chan string) {
exchangeName := "go-test"
routeKey := "go-test"
// new 10 mq channel
c := channels.NewChannel(conn.Connection)
for body := range job {
p.Producer(
msg.NewMessage(
msg.WithOptionsChannel(c),
msg.WithOptionsBody(body),
),
WithOptionsProducer(&ProducerOpt{
Exchange: exchangeName,
ExchangeType: lib.Topic,
RouteKey: routeKey,
Mandatory: true,
ResendNum: 2,
}),
WithOptionsProducerCallBack(&CallBack{Fnc: func(ret msg.Ret) {
log.Printf("call back %+v", ret)
}}),
)
}

}(job)
}

for i := 0; i < 15; i++ {
job <- fmt.Sprintf("this is chan %d", i)
}
close(job)
<-time.After(30 * time.Second)
```


Expand Down
45 changes: 45 additions & 0 deletions producers/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ysk229/go-rabbitmq/msg"
"log"
"testing"
"time"
)

func TestProducer(t *testing.T) {
Expand Down Expand Up @@ -49,3 +50,47 @@ func TestProducer(t *testing.T) {
}
//select {}
}

func TestProducerChannel(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lshortfile)
//new client mq
url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s", "admin", "123456", "10.1.2.7", 5672, "")
conn := connections.NewConnect().Open(url)
p := NewProducer(channels.NewChannel(conn.Connection))
job := make(chan string, 15)
//10 worker
for i := 0; i < 10; i++ {
go func(job <-chan string) {
exchangeName := "go-test"
routeKey := "go-test"
// new 10 mq channel
c := channels.NewChannel(conn.Connection)
for body := range job {
p.Producer(
msg.NewMessage(
msg.WithOptionsChannel(c),
msg.WithOptionsBody(body),
),
WithOptionsProducer(&ProducerOpt{
Exchange: exchangeName,
ExchangeType: lib.Topic,
RouteKey: routeKey,
Mandatory: true,
ResendNum: 2,
}),
WithOptionsProducerCallBack(&CallBack{Fnc: func(ret msg.Ret) {
log.Printf("call back %+v", ret)

}}),
)
}

}(job)
}

for i := 0; i < 15; i++ {
job <- fmt.Sprintf("this is chan %d", i)
}
close(job)
<-time.After(30 * time.Second)
}

0 comments on commit adae640

Please sign in to comment.