Skip to content

Commit

Permalink
add mq
Browse files Browse the repository at this point in the history
  • Loading branch information
lymallor committed Mar 2, 2022
1 parent 72fa42b commit 1cea283
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 28 deletions.
15 changes: 7 additions & 8 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ func NewClient(url string) *Client {
func (c *Client) ReConnection(url string) {
for {
if c.GetConnect().IsClosed() || c.GetChannel().IsClosed() {
select {
case <-c.ReConnectionChan():
log.Print("reconnecting ...")
c.Connect = connections.NewConnect().Open(url)
c.Channel = channels.NewChannel(c.Connection)
c.ReChannels()
log.Println("Connect is closed ", c.GetConnect().IsClosed(), ",Channel is closed ", c.GetChannel().IsClosed())
}
<-c.ReConnectionChan()
log.Print("reconnecting ...")
c.Connect = connections.NewConnect().Open(url)
c.Channel = channels.NewChannel(c.Connection)
c.ReChannels()
log.Println("Connect is closed ", c.GetConnect().IsClosed(), ",Channel is closed ", c.GetChannel().IsClosed())

}
}
}
Expand Down
1 change: 0 additions & 1 deletion connections/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ type Connect struct {
ConnNotifyClose chan *amqp.Error
//通道异常接收
ChNotifyClose chan *amqp.Error
isConnected bool
}

// NewConnect new connect
Expand Down
36 changes: 17 additions & 19 deletions producers/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,25 @@ LOOKUP:
break LOOKUP
}
p.publish(m)
select {
case cf := <-ch.GetConfirmChan():
if !cf.Ack {
log.Println("exchange error ", body)
exchangeNum++
if exchangeNum == p.opt.ResendNum+1 {
log.Println("exchange fail data", body)
cf := <-ch.GetConfirmChan()
if !cf.Ack {
log.Println("exchange error ", body)
exchangeNum++
if exchangeNum == p.opt.ResendNum+1 {
log.Println("exchange fail data", body)
Success = false
}
} else {
select {
case data := <-ch.GetReturnChan():
log.Println("queue error data", body, ",ReplyCode ", data.ReplyCode, ",retry num", queueNum, ",return data ", string(data.Body))
queueNum++
if queueNum == p.opt.ResendNum+1 {
log.Println("queue fail data", body)
Success = false
}
} else {
select {
case data := <-ch.GetReturnChan():
log.Println("queue error data", body, ",ReplyCode ", data.ReplyCode, ",retry num", queueNum, ",return data ", string(data.Body))
queueNum++
if queueNum == p.opt.ResendNum+1 {
log.Println("queue fail data", body)
Success = false
}
default:
break LOOKUP
}
default:
break LOOKUP
}
}

Expand Down

0 comments on commit 1cea283

Please sign in to comment.