diff --git a/CHANGELOG.md b/CHANGELOG.md index 84f8df3..b055b06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# 1.1.2 + +- Allow sending messages with a TTL via `PublishWithOptions` by adding a new `TTL` property in `PublishingOptions` ([PR](https://github.com/KardinalAI/gorabbit/pull/19)). + # 1.1.1 - Minor fix for correct usage of the `ConnectionName` parameter, and the possibility to declare it via environment variables ([PR](https://github.com/KardinalAI/gorabbit/pull/18)). diff --git a/README.md b/README.md index 3303589..08d9afd 100644 --- a/README.md +++ b/README.md @@ -219,12 +219,13 @@ type foo struct { err := client.Publish("events_exchange", "event.foo.bar.created", foo{Action: "bar"}) ``` -Optionally, you can set the message's `Priority` and `DeliveryMode` via the `PublishWithOptions` method. +Optionally, you can set the message's `Priority`, `DeliveryMode` and `Expiration` via the `PublishWithOptions` method. ```go options := gorabbit.SendOptions(). SetPriority(gorabbit.PriorityMedium). - SetDeliveryMode(gorabbit.Persistent) + SetDeliveryMode(gorabbit.Persistent). + SetTTL(5*time.Second) err := client.PublishWithOptions("events_exchange", "event.foo.bar.created", "foo string", options) ``` diff --git a/TODO.md b/TODO.md index 55e29c9..095c6d5 100644 --- a/TODO.md +++ b/TODO.md @@ -5,7 +5,7 @@ Must have or Nice to have features. ## Must Haves - [ ] Unregister consumer via a `UnregisterConsumer` -- [ ] Send messages with a TTL +- [x] ~~Send messages with a TTL~~ - [ ] Send messages with a definable header ## Nice to have @@ -17,3 +17,8 @@ Must have or Nice to have features. - [ ] Review the `publishingCache` in `channel.go` - [ ] Review the logger - [ ] Review the linter and redefine some rules if they are too strict + +## To Fix + +- [ ] Concurrent consumption throwing errors when `PrefetchSize` is set +- [ ] Consumer wildcard validator does not match all possibilities \ No newline at end of file diff --git a/channel.go b/channel.go index b5732f9..b7b3a7a 100644 --- a/channel.go +++ b/channel.go @@ -579,6 +579,7 @@ func (c *amqpChannel) publish(exchange string, routingKey string, payload []byte if options != nil { publishing.Priority = options.priority() publishing.DeliveryMode = options.mode() + publishing.Expiration = options.ttl() } // If the channel is not ready, we cannot publish, but we send the message to cache if the keepAlive flag is set to true. diff --git a/model.go b/model.go index 041af6a..256d6f3 100644 --- a/model.go +++ b/model.go @@ -1,6 +1,9 @@ package gorabbit import ( + "strconv" + "time" + amqp "github.com/rabbitmq/amqp091-go" ) @@ -58,6 +61,7 @@ type BindingConfig struct { type PublishingOptions struct { MessagePriority *MessagePriority DeliveryMode *DeliveryMode + TTL *time.Duration } func SendOptions() *PublishingOptions { @@ -80,6 +84,14 @@ func (m *PublishingOptions) mode() uint8 { return m.DeliveryMode.Uint8() } +func (m *PublishingOptions) ttl() string { + if m.TTL == nil { + return "" + } + + return strconv.FormatInt(m.TTL.Milliseconds(), 10) +} + func (m *PublishingOptions) SetPriority(priority MessagePriority) *PublishingOptions { m.MessagePriority = &priority @@ -92,6 +104,12 @@ func (m *PublishingOptions) SetMode(mode DeliveryMode) *PublishingOptions { return m } +func (m *PublishingOptions) SetTTL(ttl time.Duration) *PublishingOptions { + m.TTL = &ttl + + return m +} + type consumptionHealth map[string]bool func (s consumptionHealth) IsHealthy() bool {