Skip to content

Commit

Permalink
Refactor unsubscribe method
Browse files Browse the repository at this point in the history
Signed-off-by: 0x6f736f646f <[email protected]>
  • Loading branch information
rodneyosodo committed May 26, 2022
1 parent 03bc4a4 commit 511261f
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions pkg/messaging/rabbitmq/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,18 @@ func (ps *pubsub) Subscribe(id, topic string, handler messaging.MessageHandler)
}
go ps.handle(msgs, handler)
s[id] = subscription{
cancel: handler.Cancel,
cancel: func() error {
if err := ps.ch.Cancel(id, false); err != nil {
return err
}
return handler.Cancel()
},
}

return nil
}

func (ps *pubsub) Unsubscribe(id, topic string) error {
defer ps.ch.Cancel(id, false)
if id == "" {
return ErrEmptyID
}
Expand All @@ -134,14 +138,14 @@ func (ps *pubsub) Unsubscribe(id, topic string) error {
if !ok {
return ErrNotSubscribed
}
if err := ps.ch.QueueUnbind(topic, topic, exchangeName, nil); err != nil {
return err
}
if current.cancel != nil {
if err := current.cancel(); err != nil {
return err
}
}
if err := ps.ch.QueueUnbind(topic, topic, exchangeName, nil); err != nil {
return err
}

delete(s, id)
if len(s) == 0 {
Expand Down

0 comments on commit 511261f

Please sign in to comment.