Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v1.1.2] Allow publishing with TTL specified #19

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 1.1.2

- Allow sending messages with a TTL via `PublishWithOptions` by adding a new `TTL` property in `PublishingOptions` ([PR](https://github.com/KardinalAI/gorabbit/pull/19)).

# 1.1.1

- Minor fix for correct usage of the `ConnectionName` parameter, and the possibility to declare it via environment variables ([PR](https://github.com/KardinalAI/gorabbit/pull/18)).
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,13 @@ type foo struct {
err := client.Publish("events_exchange", "event.foo.bar.created", foo{Action: "bar"})
```

Optionally, you can set the message's `Priority` and `DeliveryMode` via the `PublishWithOptions` method.
Optionally, you can set the message's `Priority`, `DeliveryMode` and `Expiration` via the `PublishWithOptions` method.
matt-ben marked this conversation as resolved.
Show resolved Hide resolved

```go
options := gorabbit.SendOptions().
SetPriority(gorabbit.PriorityMedium).
SetDeliveryMode(gorabbit.Persistent)
SetDeliveryMode(gorabbit.Persistent).
SetTTL(5*time.Second)

err := client.PublishWithOptions("events_exchange", "event.foo.bar.created", "foo string", options)
```
Expand Down
7 changes: 6 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Must have or Nice to have features.
## Must Haves

- [ ] Unregister consumer via a `UnregisterConsumer`
- [ ] Send messages with a TTL
- [x] ~~Send messages with a TTL~~
- [ ] Send messages with a definable header

## Nice to have
Expand All @@ -17,3 +17,8 @@ Must have or Nice to have features.
- [ ] Review the `publishingCache` in `channel.go`
- [ ] Review the logger
- [ ] Review the linter and redefine some rules if they are too strict

## To Fix

- [ ] Concurrent consumption throwing errors when `PrefetchSize` is set
- [ ] Consumer wildcard validator does not match all possibilities
1 change: 1 addition & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ func (c *amqpChannel) publish(exchange string, routingKey string, payload []byte
if options != nil {
publishing.Priority = options.priority()
publishing.DeliveryMode = options.mode()
publishing.Expiration = options.ttl()
}

// If the channel is not ready, we cannot publish, but we send the message to cache if the keepAlive flag is set to true.
Expand Down
18 changes: 18 additions & 0 deletions model.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gorabbit

import (
"strconv"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

Expand Down Expand Up @@ -58,6 +61,7 @@ type BindingConfig struct {
type PublishingOptions struct {
MessagePriority *MessagePriority
DeliveryMode *DeliveryMode
TTL *time.Duration
}

func SendOptions() *PublishingOptions {
Expand All @@ -80,6 +84,14 @@ func (m *PublishingOptions) mode() uint8 {
return m.DeliveryMode.Uint8()
}

func (m *PublishingOptions) ttl() string {
if m.TTL == nil {
return ""
}

return strconv.FormatInt(m.TTL.Milliseconds(), 10)
}

func (m *PublishingOptions) SetPriority(priority MessagePriority) *PublishingOptions {
m.MessagePriority = &priority

Expand All @@ -92,6 +104,12 @@ func (m *PublishingOptions) SetMode(mode DeliveryMode) *PublishingOptions {
return m
}

func (m *PublishingOptions) SetTTL(ttl time.Duration) *PublishingOptions {
m.TTL = &ttl

return m
}

type consumptionHealth map[string]bool

func (s consumptionHealth) IsHealthy() bool {
Expand Down
Loading