+name: CI
+on: [push]
+ test-library:
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ go-version: [ '1.19', '1.20', '1.21.x' ]
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup Go
+ uses: actions/setup-go@v4
+ with:
+ go-version: ${{ matrix.go-version }}
+ - name: Install dependencies
+ run: go get .
+ - name: Go formatting analysis
+ run: |
+ if [ -n "$(gofmt -l .)" ]; then
+ gofmt -d .
+ exit 1
+ fi
+ - name: Go code quality analysis
+ run: go vet ./...
+ - name: Go unit testing
+ run: |
+ go test -race $(go list ./... | grep -v /vendor/) -v -coverprofile=coverage.out
+ go tool cover -func=coverage.out
+ - name: Upload coverage results
+ uses: actions/upload-artifact@v3
+ with:
+ name: coverage
+ path: coverage.out
+ test-lint:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ - name: Setup Go
+ uses: actions/setup-go@v4
+ with:
+ go-version: ${{ matrix.go-version }}
+ - name: Install golangci-lint
+ run: |
+ curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.52.2
+ - name: Run golangci-lint
+ run: ./bin/golangci-lint run -v
+## Golden config for golangci-lint v1.52.2
+# This is the best config for golangci-lint based on my experience and opinion.
+# It is very strict, but not extremely strict.
+# Feel free to adopt and change it for your needs.
+ timeout: 1m # default 1m
+ errcheck:
+ disable-default-exclusions: false # disable built-in exclude list # default: false
+ check-type-assertions: true # default false
+ check-blank: false # default false
+ ignore: "fmt:.*" # default fmt:.*
+ exclude-functions: [] # see https://github.com/kisielk/errcheck#excluding-functions for details # default []
+ gosimple:
+ go: "1.20" # default 1.13
+ checks: [ "*" ] # https://staticcheck.io/docs/options#checks # default ["*"]
+ govet:
+ enable-all: true
+ disable:
+ - fieldalignment # too strict
+ settings:
+ shadow:
+ strict: true # default false
+ staticcheck:
+ go: "1.20" # default 1.13
+ checks:
+ - "*" # https://staticcheck.io/docs/options#checks # default ["*"]
+ - "-SA1019" # Removed DEPRACATED subgroup to be able to use "deprecated" tag only in structs.
+ structcheck:
+ exported-fields: false # default false
+ unused:
+ check-exported: false # default false # TODO: enable after fixing false positives
+ varcheck:
+ exported-fields: false # default false # TODO: enable after fixing false positives
+ bidichk:
+ # The following configurations check for all mentioned invisible unicode runes.
+ # All runes are enabled by default.
+ left-to-right-embedding: true # default true
+ right-to-left-embedding: true # default true
+ pop-directional-formatting: true # default true
+ left-to-right-override: true # default true
+ right-to-left-override: true # default true
+ left-to-right-isolate: true # default true
+ right-to-left-isolate: true # default true
+ first-strong-isolate: true # default true
+ pop-directional-isolate: true # default true
+ cyclop:
+ max-complexity: 30 # the maximal code complexity to report # default 10
+ package-average: 10.0 # the maximal average package complexity. If it's higher than 0.0 (float) the check is enabled # default 0.0
+ skip-tests: false # should ignore tests # default false
+ dupl:
+ threshold: 150 # default 150
+ errorlint:
+ # Check whether fmt.Errorf uses the %w verb for formatting errors. See the readme for caveats
+ errorf: true # default true
+ # Check for plain type assertions and type switches
+ asserts: true # default true
+ # Check for plain error comparisons
+ comparison: true # default true
+ exhaustive:
+ check-generated: false # indicates whether to check switch statements in generated Go source files # default false
+ default-signifies-exhaustive: false # if true, presence of "default" case in switch statements satisfies exhaustiveness, even if all enum members are not listed # default false
+ ignore-enum-members: "" # enum members matching the supplied regex do not have to be listed in switch statements to satisfy exhaustiveness # default ""
+ package-scope-only: false # consider enums only in package scopes, not in inner scopes # default false
+ # forbidigo:
+ # forbid: # forbid the following identifiers # default ^(fmt\.Print(|f|ln)|print|println)$
+ # - ^(fmt\.Print(|f|ln)|print|println)$
+ # exclude-godoc-examples: true # exclude godoc examples from forbidigo checks # default is true
+ funlen:
+ lines: 100 # default 60
+ statements: 50 # default 40
+ gocognit:
+ min-complexity: 20 # minimal code complexity to report, 30 by default (but we recommend 10-20)
+ goconst:
+ match-constant: true # look for existing constants matching the values # default true
+ min-len: 3 # minimal length of string constant # default 3
+ min-occurrences: 3 # minimum occurrences of constant string count to trigger issue # default 3
+ numbers: true # search also for duplicated numbers # default false
+ min: 3 # minimum value, only works with goconst.numbers # default 3
+ max: 3 # maximum value, only works with goconst.numbers # default 3
+ ignore-calls: true # ignore when constant is not used as function argument # default true
+ ignore-tests: false # ignore test files # default false
+ gocritic:
+ settings:
+ captLocal:
+ paramsOnly: false # whether to restrict checker to params only # default true
+ elseif:
+ skipBalanced: false # whether to skip balanced if-else pairs # default true
+ #hugeParam: # disabled by default
+ # sizeThreshold: 80 # size in bytes that makes the warning trigger # default 80
+ #nestingReduce: # disabled by default
+ # bodyWidth: 5 # min number of statements inside a branch to trigger a warning # default 5
+ #rangeExprCopy: # disabled by default
+ # sizeThreshold: 512 # size in bytes that makes the warning trigger # default 512
+ # skipTestFuncs: true # whether to check test functions # default true
+ #rangeValCopy: # disabled by default
+ # sizeThreshold: 128 # size in bytes that makes the warning trigger # default 128
+ # skipTestFuncs: true # whether to check test functions # default true
+ #ruleguard: # disabled by default
+ # rules: "" # path to a gorules file # default ""
+ #tooManyResultsChecker: # disabled by default
+ # maxResults: 5 # maximum number of results # default 5
+ #truncateCmp: # disabled by default
+ # skipArchDependent: true # whether to skip int/uint/uintptr types # default true
+ underef:
+ skipRecvDeref: false # whether to skip (*x).method() calls where x is a pointer receiver # default true
+ #unnamedResult: # disabled by default
+ # checkExported: false # whether to check exported functions # default false
+ gocyclo:
+ min-complexity: 30 # default 30
+ godot:
+ scope: declarations # comments to be checked: `declarations` (default), `toplevel`, or `all`
+ exclude: [] # list of regexps for excluding particular comment lines from check # default []
+ capital: false # check that each sentence starts with a capital letter # default false
+ period: true # check that each sentence ends with a period # default true
+ gomnd:
+ # List of enabled checks, see https://github.com/tommy-muehle/go-mnd/#checks for description.
+ checks: # default argument,case,condition,operation,return,assign
+ - argument
+ - case
+ - condition
+ - operation
+ - return
+ - assign
+ # List of numbers to exclude from analysis. The numbers should be written as string.
+ # Following values always ignored: "1", "1.0", "0" and "0.0"
+ ignored-numbers: [] # default []
+ # List of file patterns to exclude from analysis.
+ # Following values always ignored: `.+_test.go`
+ ignored-files: [] # default []
+ # List of function patterns to exclude from analysis.
+ # Following functions always ignored: `time.Date`
+ ignored-functions: ["strconv.ParseUint"] # default []
+ gomoddirectives:
+ replace-allow-list: [] # list of allowed `replace` directives # default []
+ replace-local: false # allow local `replace` directives # default false
+ exclude-forbidden: false # forbid the use of `exclude` directives # default false
+ retract-allow-no-explanation: false # allow to use `retract` directives without explanation # default false
+ gomodguard:
+ allowed:
+ modules: [] # default []
+ domains: [] # default []
+ blocked:
+ modules:
+ - github.com/golang/protobuf:
+ recommendations:
+ - google.golang.org/protobuf
+ reason: "see https://developers.google.com/protocol-buffers/docs/reference/go/faq#modules"
+ - github.com/satori/go.uuid:
+ recommendations:
+ - github.com/google/uuid
+ reason: "satori's package is not maintained"
+ - github.com/gofrs/uuid:
+ recommendations:
+ - github.com/google/uuid
+ reason: "see recommendation from dev-infra team: https://confluence.gtforge.com/x/gQI6Aw"
+ versions: [] # default []
+ local_replace_directives: true # default false
+ lll:
+ line-length: 120 # default 120
+ makezero:
+ always: false # default false
+ maligned:
+ suggest-new: true # default false
+ misspell:
+ locale: US
+ ignore-words:
+ - "" # default: ""
+ nakedret:
+ max-func-lines: 0 # default 30
+ nestif:
+ min-complexity: 4 # default 5
+ nilnil:
+ checked-types: # default [ptr, func, iface, map, chan]
+ - ptr
+ - func
+ - iface
+ - map
+ - chan
+ nolintlint:
+ allow-unused: false # default false
+ allow-leading-space: true # default true
+ allow-no-explanation: [funlen, gocognit, lll] # default []
+ require-explanation: true # default false
+ require-specific: true # default false
+ prealloc:
+ simple: false # default true
+ range-loops: true # default true
+ for-loops: false # default false
+ predeclared:
+ ignore: "" # comma-separated list of predeclared identifiers to not report on # default ""
+ q: false # include method names and field names (i.e., qualified names) in checks # default false
+ promlinter:
+ # Promlinter cannot infer all metrics name in static analysis.
+ # Enable strict mode will also include the errors caused by failing to parse the args.
+ strict: false # default false
+ # Please refer to https://github.com/yeya24/promlinter#usage for detailed usage.
+ # disabled-linters:
+ # - "Help"
+ # - "MetricUnits"
+ # - "Counter"
+ # - "HistogramSummaryReserved"
+ # - "MetricTypeInName"
+ # - "ReservedChars"
+ # - "CamelCase"
+ # - "lintUnitAbbreviations"
+ revive:
+ # default rules are ignored if any of following settings is defined
+ #max-open-files: 0 # maximum number of open files at the same time # defaults 0 - unlimited
+ #ignore-generated-header: false # when set to false, ignores files with "GENERATED" header, similar to golint # default false
+ #confidence: 0.3 # default failure confidence, this means that linting errors with less than X confidence will be ignored # default 0.8
+ #severity: "warning" # minimal rule severity to fail {"error", "warning"} # default "warning"
+ #enable-all-rules: false # default false
+ # There is a list of default rules, but it can be redefined, see https://github.com/mgechev/revive#available-rules
+ rules:
+ - name: "var-naming"
+ arguments: [["ID"], []]
+ # - name: "xxx"
+ # disabled: false
+ # arguments: []
+ # severity: "xxx"
+ # allows to redefine rule severity (without changing default rules list)
+ #directives:
+ # - name: "xxx"
+ # severity: "xxx"
+ rowserrcheck:
+ packages:
+ - database/sql
+ - github.com/jmoiron/sqlx
+ # stylecheck:
+ # go: "1.18" # default 1.13
+ # checks: [ "*" ] # https://staticcheck.io/docs/options#checks # default ["*"]
+ # dot-import-whitelist: [] # https://staticcheck.io/docs/options#dot_import_whitelist # default []
+ # initialisms: [ "ACL", "API", "ASCII", "CPU", "CSS", "DNS", "EOF", "GUID", "HTML", "HTTP", "HTTPS", "ID", "IP", "JSON", "QPS", "RAM", "RPC", "SLA", "SMTP", "SQL", "SSH", "TCP", "TLS", "TTL", "UDP", "UI", "GID", "UID", "UUID", "URI", "URL", "UTF8", "VM", "XML", "XMPP", "XSRF", "XSS" ] # https://staticcheck.io/docs/options#initialisms
+ # http-status-code-whitelist: [ "200", "400", "404", "500" ] # https://staticcheck.io/docs/options#http_status_code_whitelist
+ tenv:
+ all: true # check all functions in _test.go, not only test functions # default false
+ testpackage:
+ skip-regexp: (export|internal)_test\.go # default (export|internal)_test\.go
+ unparam:
+ check-exported: true # default false
+ wrapcheck:
+ ignoreSigs: [] # specifies substrings of signatures to ignore. Overrides default https://github.com/tomarrell/wrapcheck#configuration # default []
+ ignoreSigRegexps: [] # this is similar to the ignoreSigs, but gives slightly more flexibility # default []
+ ignorePackageGlobs: [] # see https://github.com/tomarrell/wrapcheck#configuration # default []
+ disable-all: true
+ enable:
+ ## enabled by default
+ # - deadcode
+ - errcheck
+ - gosimple
+ - govet
+ - ineffassign
+ - staticcheck
+ # - structcheck
+ - typecheck
+ - unused
+ # - varcheck
+ ## disabled by default
+ - asciicheck
+ - bidichk
+ - bodyclose
+ # - containedctx
+ - contextcheck
+ - cyclop
+ - decorder
+ - dogsled
+ - dupl
+ - durationcheck
+ - errchkjson
+ - errname
+ - errorlint
+ - exhaustive
+ - exportloopref
+ - funlen
+ # gochecknoglobals
+ - gochecknoinits
+ # - gocognit # NEED
+ - goconst
+ - gocritic
+ - gocyclo
+ - godot
+ # - godox
+ - goimports
+ - gomnd
+ - gomoddirectives
+ - gomodguard
+ - goprintffuncname
+ - gosec
+ - grouper
+ # - ifshort
+ - importas
+ # - lll # long long lines
+ - maintidx
+ - makezero
+ - nakedret
+ - nestif
+ - nilerr
+ - nilnil
+ - noctx
+ - nolintlint
+ - prealloc
+ - predeclared
+ - promlinter
+ - revive
+ - rowserrcheck
+ - sqlclosecheck
+ - tenv
+ - testpackage
+ - thelper
+ - tparallel
+ - unconvert
+ - unparam
+ - wastedassign
+ - whitespace
+ - wsl
+ ## disabled
+ #- depguard # replaced with gomodguard
+ #- exhaustivestruct # too strict - finds structs that have uninitialized fields # TODO: maybe enable for some packages?
+ #- forbidigo # Forbids identifiers
+ #- forcetypeassert # errcheck is used instead
+ #- gci # is not used - sorts imports
+ #- goerr113 # too strict - checks the errors handling expressions
+ #- gofumpt # replaced with goimports, gofumports is not available yet
+ #- goheader # is not used - checks that each file has the licence at the beginning
+ #- golint # deprecated - revive is used instead
+ #- interfacer # deprecated and has false positives
+ #- ireturn # good, but too strict - accept interfaces, return concrete types
+ #- maligned # deprecated
+ #- misspell # useless - correct commonly misspelled English words... quickly
+ #- nlreturn # too strict - requires a new line before return and branch statements
+ #- paralleltest # too many false positives
+ #- scopelint # deprecated
+ #- stylecheck # revive does the same
+ #- tagliatelle # is not used - checks the struct tags
+ #- wrapcheck # too strict - requires wrapping errors from external packages (even from the same repo) and interfaces
+ #- varnamelen # great idea, but too many false positives - checking length of variable's name matches its usage scope
+ uniq-by-line: false # default true
+ max-issues-per-linter: 0
+ max-same-issues: 0
+ exclude-rules:
+ - source: "^//\\s*go:generate\\s"
+ linters:
+ - lll
+ - source: "(noinspection|TODO)"
+ linters:
+ - godot
+ - source: "//noinspection"
+ linters:
+ - gocritic
+ - source: "^\\s+if _, ok := err\\.\\([^.]+\\.InternalError\\); ok {"
+ linters:
+ - errorlint
+ - path: "_test\\.go"
+ linters:
+ - bodyclose
+ - dupl
+ - funlen
+ - gochecknoinits
+ - goconst
+ - noctx
+ - wrapcheck
+ - wsl
+# Contributor Covenant Code of Conduct
+## Our Pledge
+We as members, contributors, and leaders pledge to make participation in our
+community a harassment-free experience for everyone, regardless of age, body
+size, visible or invisible disability, ethnicity, sex characteristics, gender
+identity and expression, level of experience, education, socio-economic status,
+nationality, personal appearance, race, religion, or sexual identity
+and orientation.
+We pledge to act and interact in ways that contribute to an open, welcoming,
+diverse, inclusive, and healthy community.
+## Our Standards
+Examples of behavior that contributes to a positive environment for our
+community include:
+* Demonstrating empathy and kindness toward other people
+* Being respectful of differing opinions, viewpoints, and experiences
+* Giving and gracefully accepting constructive feedback
+* Accepting responsibility and apologizing to those affected by our mistakes,
+ and learning from the experience
+* Focusing on what is best not just for us as individuals, but for the
+ overall community
+Examples of unacceptable behavior include:
+* The use of sexualized language or imagery, and sexual attention or
+ advances of any kind
+* Trolling, insulting or derogatory comments, and personal or political attacks
+* Public or private harassment
+* Publishing others' private information, such as a physical or email
+ address, without their explicit permission
+* Other conduct which could reasonably be considered inappropriate in a
+ professional setting
+## Enforcement Responsibilities
+Community leaders are responsible for clarifying and enforcing our standards of
+acceptable behavior and will take appropriate and fair corrective action in
+response to any behavior that they deem inappropriate, threatening, offensive,
+or harmful.
+Community leaders have the right and responsibility to remove, edit, or reject
+comments, commits, code, wiki edits, issues, and other contributions that are
+not aligned to this Code of Conduct, and will communicate reasons for moderation
+decisions when appropriate.
+## Scope
+This Code of Conduct applies within all community spaces, and also applies when
+an individual is officially representing the community in public spaces.
+Examples of representing our community include using an official email address,
+posting via an official social media account, or acting as an appointed
+representative at an online or offline event.
+## Enforcement
+Instances of abusive, harassing, or otherwise unacceptable behavior may be
+reported to the community leaders responsible for enforcement at
+All complaints will be reviewed and investigated promptly and fairly.
+All community leaders are obligated to respect the privacy and security of the
+reporter of any incident.
+## Enforcement Guidelines
+Community leaders will follow these Community Impact Guidelines in determining
+the consequences for any action they deem in violation of this Code of Conduct:
+### 1. Correction
+**Community Impact**: Use of inappropriate language or other behavior deemed
+unprofessional or unwelcome in the community.
+**Consequence**: A private, written warning from community leaders, providing
+clarity around the nature of the violation and an explanation of why the
+behavior was inappropriate. A public apology may be requested.
+### 2. Warning
+**Community Impact**: A violation through a single incident or series
+of actions.
+**Consequence**: A warning with consequences for continued behavior. No
+interaction with the people involved, including unsolicited interaction with
+those enforcing the Code of Conduct, for a specified period of time. This
+includes avoiding interactions in community spaces as well as external channels
+like social media. Violating these terms may lead to a temporary or
+permanent ban.
+### 3. Temporary Ban
+**Community Impact**: A serious violation of community standards, including
+sustained inappropriate behavior.
+**Consequence**: A temporary ban from any sort of interaction or public
+communication with the community for a specified period of time. No public or
+private interaction with the people involved, including unsolicited interaction
+with those enforcing the Code of Conduct, is allowed during this period.
+Violating these terms may lead to a permanent ban.
+### 4. Permanent Ban
+**Community Impact**: Demonstrating a pattern of violation of community
+standards, including sustained inappropriate behavior, harassment of an
+individual, or aggression toward or disparagement of classes of individuals.
+**Consequence**: A permanent ban from any sort of public interaction within
+the community.
+## Attribution
+This Code of Conduct is adapted from the [Contributor Covenant][homepage],
+version 2.0, available at
+Community Impact Guidelines were inspired by
+[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
+For answers to common questions about this code of conduct, see the FAQ at
+[https://www.contributor-covenant.org/faq][FAQ]. Translations are available
+at [https://www.contributor-covenant.org/translations][translations].
+[homepage]: https://www.contributor-covenant.org
+[v2.0]: https://www.contributor-covenant.org/version/2/0/code_of_conduct.html
+[Mozilla CoC]: https://github.com/mozilla/diversity
+[FAQ]: https://www.contributor-covenant.org/faq
+[translations]: https://www.contributor-covenant.org/translations
+MIT License
+Copyright (c) 2021-2023 KardinalAI
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+# Gorabbit
+Gorabbit is a wrapper that provides high level and robust RabbitMQ operations through a client or a manager.
+This wrapper depends on the official [Go RabbitMQ plugin](https://github.com/rabbitmq/amqp091-go).
+* [Installation](#installation)
+ * [Go Module](#go-module)
+ * [Environment Variables](#environment-variables)
+* [Always On Mechanism](#always-on-mechanism)
+* [Client](#client)
+ * [Initialization](#client-initialization)
+ * [Options](#client-options)
+ * [Default Options](#client-with-default-options)
+ * [Custom Options](#client-with-custom-options)
+ * [Builder](#client-options-using-the-builder)
+ * [Struct](#client-options-using-struct-initialization)
+ * [Disconnection](#client-disconnection)
+ * [Publishing](#publishing)
+ * [Consuming](#consuming)
+ * [Ready and Health Checks](#ready-and-health-checks)
+* [Manager](#manager)
+ * [Initialization](#manager-initialization)
+ * [Options](#manager-options)
+ * [Default Options](#manager-with-default-options)
+ * [Custom Options](#manager-with-custom-options)
+ * [Builder](#manager-options-using-the-builder)
+ * [Struct](#manager-options-using-struct-initialization)
+ * [Disconnection](#manager-disconnection)
+ * [Operations](#manager-operations)
+ * [Exchange Creation](#exchange-creation)
+ * [Queue Creation](#queue-creation)
+ * [Binding Creation](#binding-creation)
+ * [Message Count](#queue-messages-count)
+ * [Push Message](#push-message)
+ * [Pop Message](#pop-message)
+ * [Purge Queue](#purge-queue)
+ * [Delete Queue](#delete-queue)
+ * [Delete Exchange](#delete-exchange)
+ * [Setup From Definitions](#setup-from-schema-definition-file)
+## Installation
+### Go module
+go get github.com/KardinalAI/gorabbit/v1
+### Environment variables
+The client's and manager's `Mode` can also be set via an environment variable that will **override** the manually
+entered value.
+GORABBIT_MODE: debug # possible values: release or debug
+The client and manager can also be completely disabled via the following environment variable:
+GORABBIT_DISABLED: true # possible values: true, false, 1, or 0
+## Always-on mechanism
+Here is a visual representation of the always-on mechanism of a connection and channel when the `KeepAlive` flag is set
+to true.
+![Always on mechanism](assets/always-on-mechanism.png)
+## Client
+The gorabbit client offers 2 main functionalities:
+* Publishing
+* Consuming
+Additionally, the client also provides a ready check and a health check.
+### Client initialization
+A client can be initialized via the constructor `NewClient`. This constructor takes `ClientOptions` as an optional
+### Client options
+| Property | Description | Default Value |
+| Host | The hostname of the RabbitMQ server | |
+| Port | The port of the RabbitMQ server | 5672 |
+| Username | The plain authentication username | guest |
+| Password | The plain authentication password | guest |
+| Vhost | The specific vhost to use when connection to CloudAMQP | |
+| UseTLS | The flag that activates the use of TLS (amqps) | false |
+| KeepAlive | The flag that activates retry and re-connect mechanisms | true |
+| RetryDelay | The delay between each retry and re-connection | 3 seconds |
+| MaxRetry | The max number of message retry if it failed to process | 5 |
+| PublishingCacheTTL | The time to live for a failed publish when set in cache | 60 seconds |
+| PublishingCacheSize | The max number of failed publish to add into cache | 128 |
+| Mode | The mode defines whether logs are shown or not | Release |
+### Client with default options
+Passing `nil` options will trigger the client to use default values (host, port, credentials, etc...)
+via `DefaultClientOptions()`.
+client := gorabbit.NewClient(nil)
+You can also explicitly pass `DefaultClientOptions()` for a cleaner initialization.
+client := gorabbit.NewClient(gorabbit.DefaultClientOptions())
+Finally, passing a `NewClientOptions()` method also initializes default values if not overwritten.
+client := gorabbit.NewClient(gorabbit.NewClientOptions())
+### Client with options from environment variables
+You can instantiate a client from environment variables, without the need of manually specifying options in the code.
+client := gorabbit.NewClientFromEnv()
+Here are the following supported environment variables:
+* `RABBITMQ_HOST`: Defines the host,
+* `RABBITMQ_PORT`: Defines the port,
+* `RABBITMQ_USERNAME`: Defines the username,
+* `RABBITMQ_PASSWORD`: Defines the password,
+* `RABBITMQ_VHOST`: Defines the vhost,
+* `RABBITMQ_USE_TLS`: Defines whether to use TLS or no.
+**Note that environment variables are all optional, so missing keys will be replaced by their corresponding default.**
+### Client with custom options
+We can input custom values for a specific property, either via the built-in builder or via direct struct initialization.
+#### Client options using the builder
+`NewClientOptions()` and `DefaultClientOptions()` both return an instance of `*ClientOptions` that can act as a builder.
+options := gorabbit.NewClientOptions().
+ SetMode(gorabbit.Debug).
+ SetCredentials("root", "password").
+ SetRetryDelay(5 * time.Second)
+client := gorabbit.NewClient(options)
+> :information_source: There is a setter method for each property.
+#### Client options using struct initialization
+`ClientOptions` is an exported type, so it can be used directly.
+options := gorabbit.ClientOptions {
+ Host: "localhost",
+ Port: 5673,
+ Username: "root",
+ Password: "password",
+ ...
+client := gorabbit.NewClient(&options)
+> :warning: Direct initialization via the struct **does not use default values on missing properties**, so be sure to
+> fill in every property available.
+### Client disconnection
+When a client is initialized, to prevent a leak, always disconnect it when no longer needed.
+client := gorabbit.NewClient(gorabbit.DefaultClientOptions())
+defer client.Disconnect()
+### Publishing
+To send a message, the client offers two simple methods: `Publish` and `PublishWithOptions`. The required arguments for
+publishing are:
+* Exchange (which exchange the message should be sent to)
+* Routing Key
+* Payload (`interface{}`, the object will be marshalled internally)
+Example of sending a simple string
+err := client.Publish("events_exchange", "event.foo.bar.created", "foo string")
+Example of sending an object
+type foo struct {
+ Action string
+err := client.Publish("events_exchange", "event.foo.bar.created", foo{Action: "bar"})
+Optionally, you can set the message's `Priority` and `DeliveryMode` via the `PublishWithOptions` method.
+options := gorabbit.SendOptions().
+ SetPriority(gorabbit.PriorityMedium).
+ SetDeliveryMode(gorabbit.Persistent)
+err := client.PublishWithOptions("events_exchange", "event.foo.bar.created", "foo string", options)
+> :information_source: If the `KeepAlive` flag is set to true when initializing the client, failed publishing will be
+> cached once
+> and re-published as soon as the channel is back up.
+> ![publishing safeguard](assets/publishing-safeguard.png)
+### Consuming
+To consume messages, gorabbit offers a very simple asynchronous consumer method `Consume` that takes a `MessageConsumer`
+as argument. Error handling, acknowledgement, negative acknowledgement and rejection are all done internally by the
+err := client.RegisterConsumer(gorabbit.MessageConsumer{
+ Queue: "events_queue",
+ Name: "toto_consumer",
+ PrefetchSize: 0,
+ PrefetchCount: 10,
+ AutoAck: false,
+ ConcurrentProcess: false,
+ Handlers: gorabbit.MQTTMessageHandlers{
+ "event.foo.bar.created": func (payload []byte) error {
+ fmt.Println(string(payload))
+ return nil
+ },
+ },
+* Queue: The queue to consume messages from
+* Name: Unique identifier for the consumer
+* PrefetchSize: The maximum size of messages that can be processed at the same time
+* PrefetchCount: The maximum number of messages that can be processed at the same time
+* AutoAck: Automatic acknowledgement of messages upon reception
+* ConcurrentProcess: Asynchronous handling of deliveries
+* Handlers: A list of handlers for specified routes
+**NB:** [RabbitMQ Wildcards](https://www.cloudamqp.com/blog/rabbitmq-topic-exchange-explained.html) are also supported.
+If multiple routing keys have the same handler, a wildcard can be used, for example:
+`event.foo.bar.*` or `event.foo.#`.
+> :information_source: If the `KeepAlive` flag is set to true when initializing the client, consumers will
+> auto-reconnect after a connection loss.
+> This mechanism is indefinite and therefore, consuming from a non-existent queue will trigger an error repeatedly but
+> will not affect
+> other consumptions. This is because each consumer has its **own channel**.
+> ![consumer safeguard](assets/consumer-safeguard.png)
+### Ready and Health checks
+The client offers `IsReady()` and `IsHealthy()` checks that can be used for monitoring.
+**Ready:** Verifies that connections are opened and ready to launch new operations.
+**Healthy:** Verifies that both connections and channels are opened, ready and ongoing operations are working
+(Consumers are consuming).
+## Manager
+The gorabbit manager offers multiple management operations:
+* Exchange, queue and bindings creation
+* Exchange and queue deletion
+* Queue evaluation: Exists, number of messages
+* Queue operations: Pop message, push message, purge
+> :warning: A manager should only be used for either testing RabbitMQ functionalities or setting up a RabbitMQ server.
+> The manager does not provide robust mechanisms of retry and reconnection like the client.
+### Manager initialization
+A manager can be initialized via the constructor `NewManager`. This constructor takes `ManagerOptions` as an optional
+### Manager options
+| Property | Description | Default Value |
+| Host | The hostname of the RabbitMQ server | |
+| Port | The port of the RabbitMQ server | 5672 |
+| Username | The plain authentication username | guest |
+| Password | The plain authentication password | guest |
+| Vhost | The specific vhost to use when connection to CloudAMQP | |
+| UseTLS | The flag that activates the use of TLS (amqps) | false |
+| Mode | The mode defines whether logs are shown or not | Release |
+### Manager with default options
+Passing `nil` options will trigger the manager to use default values (host, port, credentials, etc...)
+via `DefaultManagerOptions()`.
+manager := gorabbit.NewManager(nil)
+You can also explicitly pass `DefaultManagerOptions()` for a cleaner initialization.
+manager := gorabbit.NewManager(gorabbit.DefaultManagerOptions())
+Finally, passing a `NewManagerOptions()` method also initializes default values if not overwritten.
+manager := gorabbit.NewManager(gorabbit.NewManagerOptions())
+### Manager with options from environment variables
+You can instantiate a manager from environment variables, without the need of manually specifying options in the code.
+manager := gorabbit.NewManagerFromEnv()
+Here are the following supported environment variables:
+* `RABBITMQ_HOST`: Defines the host,
+* `RABBITMQ_PORT`: Defines the port,
+* `RABBITMQ_USERNAME`: Defines the username,
+* `RABBITMQ_PASSWORD`: Defines the password,
+* `RABBITMQ_VHOST`: Defines the vhost,
+* `RABBITMQ_USE_TLS`: Defines whether to use TLS or no.
+**Note that environment variables are all optional, so missing keys will be replaced by their corresponding default.**
+### Manager with custom options
+We can input custom values for a specific property, either via the built-in builder or via direct struct initialization.
+#### Manager options using the builder
+`NewManagerOptions()` and `DefaultManagerOptions()` both return an instance of `*ManagerOptions` that can act as a
+options := gorabbit.NewManagerOptions().
+ SetMode(gorabbit.Debug).
+ SetCredentials("root", "password")
+manager := gorabbit.NewManager(options)
+> :information_source: There is a setter method for each property.
+#### Manager options using struct initialization
+`ManagerOptions` is an exported type, so it can be used directly.
+options := gorabbit.ManagerOptions {
+ Host: "localhost",
+ Port: 5673,
+ Username: "root",
+ Password: "password",
+ Mode: gorabbit.Debug,
+manager := gorabbit.NewManager(&options)
+> :warning: Direct initialization via the struct **does not use default values on missing properties**, so be sure to
+> fill in every property available.
+### Manager disconnection
+When a manager is initialized, to prevent a leak, always disconnect it when no longer needed.
+manager := gorabbit.NewManager(gorabbit.DefaultManagerOptions())
+defer manager.Disconnect()
+### Manager operations
+The manager offers all necessary operations to manager a RabbitMQ server.
+#### Exchange creation
+Creates an exchange with optional arguments.
+err := manager.CreateExchange(gorabbit.ExchangeConfig{
+ Name: "events_exchange",
+ Type: gorabbit.ExchangeTypeTopic,
+ Persisted: false,
+ Args: nil,
+#### Queue creation
+Creates a queue with optional arguments and bindings if declared.
+err := manager.CreateQueue(gorabbit.QueueConfig{
+ Name: "events_queue",
+ Durable: false,
+ Exclusive: false,
+ Args: nil,
+ Bindings: &[]gorabbit.BindingConfig{
+ {
+ RoutingKey: "event.foo.bar.created",
+ Exchange: "events_exchange",
+ },
+ },
+#### Binding creation
+Binds a queue to an exchange via a given routing key.
+err := manager.BindExchangeToQueueViaRoutingKey("events_exchange", "events_queue", "event.foo.bar.created")
+#### Queue messages count
+Returns the number of messages in a queue, or an error if the queue does not exist. This method can also evaluate the
+existence of a queue.
+messageCount, err := manager.GetNumberOfMessages("events_queue")
+#### Push message
+Pushes a single message to a given exchange.
+err := manager.PushMessageToExchange("events_exchange", "event.foo.bar.created", "single_message_payload")
+#### Pop message
+Retrieves a single message from a given queue and auto acknowledges it if `autoAck` is set to true.
+message, err := manager.PopMessageFromQueue("events_queue", true)
+#### Purge queue
+Deletes all messages from a given queue.
+err := manager.PurgeQueue("events_queue")
+#### Delete queue
+Deletes a given queue.
+err := manager.DeleteQueue("events_queue")
+#### Delete exchange
+Deletes a given exchange.
+err := manager.DeleteExchange("events_exchange")
+#### Setup from schema definition file
+You can setup exchanges, queues and bindings automatically by referencing a
+[RabbitMQ Schema Definition](assets/definitions.example.json) JSON file.
+err := manager.SetupFromDefinitions("/path/to/definitions.json")
+> :warning: The standard RabbitMQ definitions file contains configurations for
+> `users`, `vhosts` and `permissions`. Those configurations are not taken into consideration
+> in the `SetupFromDefinitions` method.
+## Launch Local RabbitMQ Server
+To run a local rabbitMQ server quickly with a docker container, simply run the following command:
+docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
+It will launch a local RabbitMQ server mapped on port 5672, and the management dashboard will be mapped on
+port 15672 accessible on localhost:15672 with a username "guest" and password "guest".
+## License
+**Gorabbit** is licensed under the [MIT](LICENSE).
\ No newline at end of file
@@ -0,0 +1,56 @@
+ "exchanges": [
+ {
+ "name": "foo_exchange",
+ "vhost": "/",
+ "type": "topic",
+ "durable": true,
+ "auto_delete": false,
+ "internal": false,
+ "arguments": {}
+ },
+ {
+ "name": "bar_exchange",
+ "vhost": "/",
+ "type": "topic",
+ "durable": true,
+ "auto_delete": false,
+ "internal": false,
+ "arguments": {}
+ }
+ ],
+ "queues": [
+ {
+ "name": "foo",
+ "vhost": "/",
+ "durable": true,
+ "auto_delete": false,
+ "arguments": {}
+ },
+ {
+ "name": "bar",
+ "vhost": "/",
+ "durable": true,
+ "auto_delete": false,
+ "arguments": {}
+ }
+ ],
+ "bindings": [
+ {
+ "source": "foo_exchange",
+ "vhost": "/",
+ "destination": "foo",
+ "destination_type": "queue",
+ "routing_key": "event.foo.#",
+ "arguments": {}
+ },
+ {
+ "source": "bar_exchange",
+ "vhost": "/",
+ "destination": "bar",
+ "destination_type": "queue",
+ "routing_key": "event.bar.#",
+ "arguments": {}
+ }
+ ]
+package gorabbit
+import (
+ "context"
+ "fmt"
+ "time"
+ "github.com/google/uuid"
+ amqp "github.com/rabbitmq/amqp091-go"
+// amqpChannels is a simple wrapper of an amqpChannel slice.
+type amqpChannels []*amqpChannel
+// publishingChannel loops through all channels and returns the first available publisher channel if it exists.
+func (a amqpChannels) publishingChannel() *amqpChannel {
+ for _, channel := range a {
+ if channel != nil && channel.connectionType == connectionTypePublisher {
+ return channel
+ }
+ }
+ return nil
+// updateParentConnection updates every channel's parent connection.
+func (a amqpChannels) updateParentConnection(conn *amqp.Connection) {
+ for _, channel := range a {
+ channel.connection = conn
+ }
+// amqpChannel holds information about the management of the native amqp.Channel.
+type amqpChannel struct {
+ // ctx is the parent context and acts as a safeguard.
+ ctx context.Context
+ // connection is the native amqp.Connection.
+ connection *amqp.Connection
+ // channel is the native amqp.Channel.
+ channel *amqp.Channel
+ // keepAlive is the flag that will define whether active guards and re-connections are enabled or not.
+ keepAlive bool
+ // retryDelay defines the delay to wait before re-connecting if the channel was closed and the keepAlive flag is set to true.
+ retryDelay time.Duration
+ // consumer is the MessageConsumer that holds all necessary information for the consumption of messages.
+ consumer *MessageConsumer
+ // consumptionCtx holds the consumption context.
+ consumptionCtx context.Context
+ // consumptionCancel is the cancel function of the consumptionCtx.
+ consumptionCancel context.CancelFunc
+ // consumptionHealth manages the status of all active consumptions.
+ consumptionHealth consumptionHealth
+ // publishingCache manages the caching of unpublished messages due to a connection error.
+ publishingCache *ttlMap[string, mqttPublishing]
+ // maxRetry defines the retry header for each message.
+ maxRetry uint
+ // closed is an inner property that switches to true if the channel was explicitly closed.
+ closed bool
+ // logger logs events.
+ logger logger
+ // releaseLogger forces logs not matter the mode. It is used to log important things.
+ releaseLogger logger
+ // connectionType defines the connectionType.
+ connectionType connectionType
+// newConsumerChannel instantiates a new consumerChannel and amqpChannel for method inheritance.
+// - ctx is the parent context.
+// - connection is the parent amqp.Connection.
+// - keepAlive will keep the channel alive if true.
+// - retryDelay defines the delay between each retry, if the keepAlive flag is set to true.
+// - consumer is the MessageConsumer that will hold consumption information.
+// - maxRetry is the retry header for each message.
+// - logger is the parent logger.
+func newConsumerChannel(ctx context.Context, connection *amqp.Connection, keepAlive bool, retryDelay time.Duration, consumer *MessageConsumer, logger logger) *amqpChannel {
+ channel := &amqpChannel{
+ ctx: ctx,
+ connection: connection,
+ keepAlive: keepAlive,
+ retryDelay: retryDelay,
+ logger: inheritLogger(logger, map[string]interface{}{
+ "context": "channel",
+ "type": connectionTypeConsumer,
+ "consumer": consumer.Name,
+ "queue": consumer.Queue,
+ }),
+ releaseLogger: &stdLogger{
+ logger: newLogrus(),
+ identifier: libraryName,
+ logFields: map[string]interface{}{
+ "context": "channel",
+ "type": connectionTypeConsumer,
+ "consumer": consumer.Name,
+ "queue": consumer.Queue,
+ },
+ },
+ connectionType: connectionTypeConsumer,
+ consumptionHealth: make(consumptionHealth),
+ consumer: consumer,
+ }
+ // We open an initial channel.
+ err := channel.open()
+ // If the channel failed to open and the keepAlive flag is set to true, we want to retry until success.
+ if err != nil && keepAlive {
+ go channel.retry()
+ }
+ return channel
+// newPublishingChannel instantiates a new publishingChannel and amqpChannel for method inheritance.
+// - ctx is the parent context.
+// - connection is the parent amqp.Connection.
+// - keepAlive will keep the channel alive if true.
+// - retryDelay defines the delay between each retry, if the keepAlive flag is set to true.
+// - maxRetry defines the maximum number of times a message can be retried if its consumption failed.
+// - publishingCacheSize is the maximum cache size of failed publishing.
+// - publishingCacheTTL defines the time to live for each failed publishing that was put in cache.
+// - logger is the parent logger.
+func newPublishingChannel(ctx context.Context, connection *amqp.Connection, keepAlive bool, retryDelay time.Duration, maxRetry uint, publishingCacheSize uint64, publishingCacheTTL time.Duration, logger logger) *amqpChannel {
+ channel := &amqpChannel{
+ ctx: ctx,
+ connection: connection,
+ keepAlive: keepAlive,
+ retryDelay: retryDelay,
+ logger: inheritLogger(logger, map[string]interface{}{
+ "context": "channel",
+ "type": connectionTypePublisher,
+ }),
+ releaseLogger: &stdLogger{
+ logger: newLogrus(),
+ identifier: libraryName,
+ logFields: map[string]interface{}{
+ "context": "channel",
+ "type": connectionTypePublisher,
+ },
+ },
+ connectionType: connectionTypePublisher,
+ publishingCache: newTTLMap[string, mqttPublishing](publishingCacheSize, publishingCacheTTL),
+ maxRetry: maxRetry,
+ }
+ // We open an initial channel.
+ err := channel.open()
+ // If the channel failed to open and the keepAlive flag is set to true, we want to retry until success.
+ if err != nil && keepAlive {
+ go channel.retry()
+ }
+ return channel
+// open opens a new amqp.Channel from the parent connection.
+func (c *amqpChannel) open() error {
+ // If the channel is nil or closed we return an error.
+ if c.connection == nil || c.connection.IsClosed() {
+ err := errConnectionClosed
+ c.logger.Error(err, "Could not open channel")
+ return err
+ }
+ // We request a channel from the parent connection.
+ channel, err := c.connection.Channel()
+ if err != nil {
+ c.logger.Error(err, "Could not open channel")
+ return err
+ }
+ c.channel = channel
+ c.logger.Info("Channel opened")
+ c.onChannelOpened()
+ // If the keepAlive flag is set to true, we activate a new guard.
+ if c.keepAlive {
+ go c.guard()
+ }
+ return nil
+// reconnect will indefinitely call the open method until a connection is successfully established or the context is canceled.
+func (c *amqpChannel) retry() {
+ c.logger.Debug("Retry launched")
+ for {
+ select {
+ case <-c.ctx.Done():
+ c.logger.Debug("Retry stopped by the context")
+ // If the context was canceled, we break out of the method.
+ return
+ default:
+ // Wait for the retryDelay.
+ time.Sleep(c.retryDelay)
+ // If there is no channel or the current channel is closed, we open a new channel.
+ if !c.ready() {
+ err := c.open()
+ // If the operation succeeds, we break the loop.
+ if err == nil {
+ c.logger.Debug("Retry successful")
+ return
+ }
+ c.logger.Error(err, "Could not open new channel during retry")
+ } else {
+ // If the channel exists and is active, we break out.
+ return
+ }
+ }
+ }
+// guard is a channel safeguard that listens to channel close events and re-launches the channel.
+func (c *amqpChannel) guard() {
+ c.logger.Debug("Guard launched")
+ for {
+ select {
+ case <-c.ctx.Done():
+ c.logger.Debug("Guard stopped by the context")
+ // If the context was canceled, we break out of the method.
+ return
+ case err, ok := <-c.channel.NotifyClose(make(chan *amqp.Error)):
+ if !ok {
+ return
+ }
+ if err != nil {
+ c.logger.Warn("Channel lost", logField{Key: "reason", Value: err.Reason}, logField{Key: "code", Value: err.Code})
+ }
+ // If the channel was explicitly closed, we do not want to retry.
+ if c.closed {
+ return
+ }
+ c.onChannelClosed()
+ go c.retry()
+ return
+ }
+ }
+// close the channel only if it is ready.
+func (c *amqpChannel) close() error {
+ if c.ready() {
+ err := c.channel.Close()
+ if err != nil {
+ c.logger.Error(err, "Could not close channel")
+ return err
+ }
+ }
+ c.closed = true
+ return nil
+// ready returns true if the channel exists and is not closed.
+func (c *amqpChannel) ready() bool {
+ return c.channel != nil && !c.channel.IsClosed()
+// healthy returns true if the channel exists and is not closed.
+func (c *amqpChannel) healthy() bool {
+ if c.connectionType == connectionTypeConsumer {
+ return c.ready() && c.consumptionHealth.IsHealthy()
+ }
+ return c.ready()
+// onChannelOpened is called when a channel is successfully opened.
+func (c *amqpChannel) onChannelOpened() {
+ if c.connectionType == connectionTypeConsumer {
+ // We re-instantiate the consumptionContext and consumptionCancel.
+ c.consumptionCtx, c.consumptionCancel = context.WithCancel(c.ctx)
+ // This is just a safeguard.
+ if c.consumer != nil {
+ c.logger.Info("Launching consumer", logField{Key: "event", Value: "onChannelOpened"})
+ // If the consumer is present we want to start consuming.
+ go c.consume()
+ }
+ } else {
+ // If the publishing cache is empty, nothing to do here.
+ if c.publishingCache == nil || c.publishingCache.Len() == 0 {
+ return
+ }
+ c.logger.Info("Emptying publishing cache", logField{Key: "event", Value: "onChannelOpened"})
+ // For each cached unsuccessful message, we try publishing it again.
+ c.publishingCache.ForEach(func(key string, msg mqttPublishing) {
+ _ = c.channel.PublishWithContext(c.ctx, msg.Exchange, msg.RoutingKey, msg.Mandatory, msg.Immediate, msg.Msg)
+ c.publishingCache.Delete(key)
+ })
+ }
+// onChannelClosed is called when a channel is closed.
+func (c *amqpChannel) onChannelClosed() {
+ if c.connectionType == connectionTypeConsumer {
+ c.logger.Info("Canceling consumptions", logField{Key: "event", Value: "onChannelClosed"})
+ // We cancel the consumptionCtx.
+ c.consumptionCancel()
+ }
+// getID returns a unique identifier for the channel.
+func (c *amqpChannel) getID() string {
+ if c.consumer == nil {
+ return fmt.Sprintf("publisher_%s", uuid.NewString())
+ }
+ return fmt.Sprintf("%s_%s", c.consumer.Name, uuid.NewString())
+// consume handles the consumption mechanism.
+func (c *amqpChannel) consume() {
+ // TODO(Alex): Check if this can actually happen
+ // If the channel is not ready, we cannot consume.
+ if !c.ready() {
+ c.logger.Warn("Channel not ready, cannot launch consumer")
+ return
+ }
+ // TODO(Alex): Double check why setting a prefetch size greater than 0 causes an error
+ // Set the QOS, which defines how many messages can be processed at the same time.
+ err := c.channel.Qos(c.consumer.PrefetchCount, c.consumer.PrefetchSize, false)
+ if err != nil {
+ c.logger.Error(err, "Could not define QOS for consumer")
+ return
+ }
+ deliveries, err := c.channel.Consume(c.consumer.Queue, c.getID(), c.consumer.AutoAck, false, false, false, nil)
+ c.consumptionHealth.AddSubscription(c.consumer.Queue, err)
+ if err != nil {
+ c.logger.Error(err, "Could not consume messages")
+ // If the queue does not exist yet, we want to force a release log with a warning for better visibility.
+ if isErrorNotFound(err) {
+ c.releaseLogger.Warn("Queue does not exist", logField{Key: "queue", Value: c.consumer.Queue})
+ }
+ return
+ }
+ for {
+ select {
+ case <-c.consumptionCtx.Done():
+ return
+ case delivery := <-deliveries:
+ // When a queue is deleted midway, a delivery with no tag or ID is received.
+ if delivery.DeliveryTag == 0 && delivery.MessageId == "" {
+ c.logger.Warn("Queue has been deleted, stopping consumer")
+ return
+ }
+ // We copy the delivery for the concurrent process of it (otherwise we may process the wrong delivery
+ // if a new one is consumed while the previous is still being processed).
+ loopDelivery := delivery
+ if c.consumer.ConcurrentProcess {
+ // We process the message asynchronously if the concurrency is set to true.
+ go c.processDelivery(&loopDelivery)
+ } else {
+ // Otherwise, we process the message synchronously.
+ c.processDelivery(&loopDelivery)
+ }
+ }
+ }
+// processDelivery is the logic that defines what to do with a processed delivery and its error.
+func (c *amqpChannel) processDelivery(delivery *amqp.Delivery) {
+ handler := c.consumer.Handlers.FindFunc(delivery.RoutingKey)
+ // If the handler doesn't exist for the received delivery, we negative acknowledge it without requeue.
+ if handler == nil {
+ c.logger.Debug("No handler found", logField{Key: "routingKey", Value: delivery.RoutingKey})
+ // If the consumer is not set to auto acknowledge the delivery, we negative acknowledge it without requeue.
+ if !c.consumer.AutoAck {
+ _ = delivery.Nack(false, false)
+ }
+ return
+ }
+ err := handler(delivery.Body)
+ // If the consumer has the autoAck flag activated, we want to retry the delivery in case of an error.
+ if c.consumer.AutoAck {
+ if err != nil {
+ go c.retryDelivery(delivery, true)
+ }
+ return
+ }
+ // If there is no error, we can simply acknowledge the delivery.
+ if err == nil {
+ c.logger.Debug("Delivery successfully processed", logField{Key: "messageID", Value: delivery.MessageId})
+ _ = delivery.Ack(false)
+ return
+ }
+ // Otherwise we retry the delivery.
+ go c.retryDelivery(delivery, false)
+// retryDelivery processes a delivery retry based on its redelivery header.
+func (c *amqpChannel) retryDelivery(delivery *amqp.Delivery, alreadyAcknowledged bool) {
+ c.logger.Debug("Delivery retry launched")
+ for {
+ select {
+ case <-c.consumptionCtx.Done():
+ c.logger.Debug("Delivery retry stopped by the consumption context")
+ return
+ default:
+ // We wait for the retry delay before retrying a message.
+ time.Sleep(c.retryDelay)
+ // We first extract the xDeathCountHeader.
+ maxRetryHeader, exists := delivery.Headers[xDeathCountHeader]
+ // If the header doesn't exist.
+ if !exists {
+ c.logger.Debug("Delivery retry invalid")
+ // We negative acknowledge the delivery without requeue if the autoAck flag is set to false.
+ if !alreadyAcknowledged {
+ _ = delivery.Nack(false, false)
+ }
+ return
+ }
+ // We then cast the value as an int32.
+ retriesCount, ok := maxRetryHeader.(int32)
+ // If the casting fails,we negative acknowledge the delivery without requeue if the autoAck flag is set to false.
+ if !ok {
+ c.logger.Debug("Delivery retry invalid")
+ if !alreadyAcknowledged {
+ _ = delivery.Nack(false, false)
+ }
+ return
+ }
+ // If the retries count is still greater than 0, we re-publish the delivery with a decremented xDeathCountHeader.
+ if retriesCount > 0 {
+ c.logger.Debug("Retrying delivery", logField{Key: "retriesLeft", Value: retriesCount - 1})
+ // We first negative acknowledge the existing delivery to remove it from queue if the autoAck flag is set to false.
+ if !alreadyAcknowledged {
+ _ = delivery.Nack(false, false)
+ }
+ // We create a new publishing which is a copy of the old one but with a decremented xDeathCountHeader.
+ newPublishing := amqp.Publishing{
+ ContentType: "application/json",
+ Body: delivery.Body,
+ Type: delivery.RoutingKey,
+ Priority: delivery.Priority,
+ DeliveryMode: delivery.DeliveryMode,
+ MessageId: delivery.MessageId,
+ Timestamp: delivery.Timestamp,
+ Headers: map[string]interface{}{
+ xDeathCountHeader: int(retriesCount - 1),
+ },
+ }
+ // We work on a best-effort basis. We try to re-publish the delivery, but we do nothing if it fails.
+ _ = c.channel.PublishWithContext(c.ctx, delivery.Exchange, delivery.RoutingKey, false, false, newPublishing)
+ return
+ }
+ c.logger.Debug("Cannot retry delivery, max retries reached")
+ // Otherwise, we negative acknowledge the delivery without requeue if the autoAck flag is set to false.
+ if !alreadyAcknowledged {
+ _ = delivery.Nack(false, false)
+ }
+ return
+ }
+ }
+// publish will publish a message with the given configuration.
+func (c *amqpChannel) publish(exchange string, routingKey string, payload []byte, options *publishingOptions) error {
+ publishing := &amqp.Publishing{
+ ContentType: "application/json",
+ Body: payload,
+ Type: routingKey,
+ Priority: PriorityMedium.Uint8(),
+ DeliveryMode: Persistent.Uint8(),
+ MessageId: uuid.NewString(),
+ Timestamp: time.Now(),
+ Headers: map[string]interface{}{
+ xDeathCountHeader: int(c.maxRetry),
+ },
+ }
+ // If options are declared, we add the option.
+ if options != nil {
+ publishing.Priority = options.priority()
+ publishing.DeliveryMode = options.mode()
+ }
+ // If the channel is not ready, we cannot publish, but we send the message to cache if the keepAlive flag is set to true.
+ if !c.ready() {
+ err := errChannelClosed
+ if c.keepAlive {
+ c.logger.Error(err, "Could not publish message, sending to cache")
+ msg := mqttPublishing{
+ Exchange: exchange,
+ RoutingKey: routingKey,
+ Mandatory: false,
+ Immediate: false,
+ Msg: *publishing,
+ }
+ c.publishingCache.Put(msg.HashCode(), msg)
+ } else {
+ c.logger.Error(err, "Could not publish message")
+ }
+ return err
+ }
+ err := c.channel.PublishWithContext(c.ctx, exchange, routingKey, false, false, *publishing)
+ // If the message could not be sent we return an error without caching it.
+ if err != nil {
+ c.logger.Error(err, "Could not publish message")
+ // If the exchange does not exist yet, we want to force a release log with a warning for better visibility.
+ if isErrorNotFound(err) {
+ c.releaseLogger.Warn("The MQTT message was not sent, exchange does not exist", logField{Key: "exchange", Value: exchange}, logField{Key: "routingKey", Value: routingKey})
+ }
+ return err
+ }
+ c.logger.Debug("Message successfully sent", logField{Key: "messageID", Value: publishing.MessageId})
+ return nil
+package gorabbit
+import (
+ "context"
+ "fmt"
+ "os"
+// MQTTClient is a simple MQTT interface that offers basic client operations such as:
+// - Publishing
+// - Consuming
+// - Disconnecting
+// - Ready and health checks
+type MQTTClient interface {
+ // Disconnect launches the disconnection process.
+ // This operation disables to client permanently.
+ Disconnect() error
+ // Publish will send the desired payload through the selected channel.
+ // - exchange is the name of the exchange targeted for event publishing.
+ // - routingKey is the route that the exchange will use to forward the message.
+ // - payload is the object you want to send as a byte array.
+ // Returns an error if the connection to the RabbitMQ server is down.
+ Publish(exchange, routingKey string, payload interface{}) error
+ // PublishWithOptions will send the desired payload through the selected channel.
+ // - exchange is the name of the exchange targeted for event publishing.
+ // - routingKey is the route that the exchange will use to forward the message.
+ // - payload is the object you want to send as a byte array.
+ // Optionally you can add publishingOptions for extra customization.
+ // Returns an error if the connection to the RabbitMQ server is down.
+ PublishWithOptions(exchange, routingKey string, payload interface{}, options *publishingOptions) error
+ // RegisterConsumer will register a MessageConsumer for internal queue subscription and message processing.
+ // The MessageConsumer will hold a list of MQTTMessageHandlers to internalize message processing.
+ // Based on the return of error of each handler, the process of acknowledgment, rejection and retry of messages is
+ // fully handled internally.
+ // Furthermore, connection lost and channel errors are also internally handled by the connectionManager that will keep consumers
+ // alive if and when necessary.
+ RegisterConsumer(consumer MessageConsumer) error
+ // IsReady returns true if the client is fully operational and connected to the RabbitMQ.
+ IsReady() bool
+ // IsHealthy returns true if the client is ready (IsReady) and all channels are operating successfully.
+ IsHealthy() bool
+ // GetHost returns the host used to initialize the client.
+ GetHost() string
+ // GetPort returns the port used to initialize the client.
+ GetPort() uint
+ // GetUsername returns the username used to initialize the client.
+ GetUsername() string
+ // GetVhost returns the vhost used to initialize the client.
+ GetVhost() string
+ // IsDisabled returns whether the client is disabled or not.
+ IsDisabled() bool
+type mqttClient struct {
+ // Host is the RabbitMQ server host name.
+ Host string
+ // Port is the RabbitMQ server port number.
+ Port uint
+ // Username is the RabbitMQ server allowed username.
+ Username string
+ // Password is the RabbitMQ server allowed password.
+ Password string
+ // Vhost is used for CloudAMQP connections to set the specific vhost.
+ Vhost string
+ // logger defines the logger used, depending on the mode set.
+ logger logger
+ // disabled completely disables the client if true.
+ disabled bool
+ // connectionManager manages the connection and channel logic and high-level logic
+ // such as keep alive mechanism and health check.
+ connectionManager *connectionManager
+ // ctx holds the global context used for the client.
+ ctx context.Context
+ // cancel is the cancelFunc for the ctx.
+ cancel context.CancelFunc
+// NewClient will instantiate a new MQTTClient.
+// If options is set to nil, the DefaultClientOptions will be used.
+func NewClient(options *ClientOptions) MQTTClient {
+ // If no options is passed, we use the DefaultClientOptions.
+ if options == nil {
+ options = DefaultClientOptions()
+ }
+ return newClientFromOptions(options)
+// NewClientFromEnv will instantiate a new MQTTClient from environment variables.
+func NewClientFromEnv() MQTTClient {
+ options := NewClientOptionsFromEnv()
+ return newClientFromOptions(options)
+func newClientFromOptions(options *ClientOptions) MQTTClient {
+ client := &mqttClient{
+ Host: options.Host,
+ Port: options.Port,
+ Username: options.Username,
+ Password: options.Password,
+ Vhost: options.Vhost,
+ logger: &noLogger{},
+ }
+ // We check if the disabled flag is present, which will completely disable the MQTTClient.
+ if disabledOverride := os.Getenv("GORABBIT_DISABLED"); disabledOverride != "" {
+ switch disabledOverride {
+ case "1", "true":
+ client.disabled = true
+ return client
+ }
+ }
+ // We check if the mode was overwritten with the environment variable "GORABBIT_MODE".
+ if modeOverride := os.Getenv("GORABBIT_MODE"); isValidMode(modeOverride) {
+ // We override the mode only if it is valid
+ options.Mode = modeOverride
+ }
+ if options.Mode == Debug {
+ // If the mode is Debug, we want to actually log important events.
+ client.logger = newStdLogger()
+ }
+ client.ctx, client.cancel = context.WithCancel(context.Background())
+ protocol := defaultProtocol
+ if options.UseTLS {
+ protocol = securedProtocol
+ }
+ dialURL := fmt.Sprintf("%s://%s:%s@%s:%d/%s", protocol, client.Username, client.Password, client.Host, client.Port, client.Vhost)
+ client.connectionManager = newConnectionManager(
+ client.ctx,
+ dialURL,
+ options.KeepAlive,
+ options.RetryDelay,
+ options.MaxRetry,
+ options.PublishingCacheSize,
+ options.PublishingCacheTTL,
+ client.logger,
+ )
+ return client
+func (client *mqttClient) Publish(exchange string, routingKey string, payload interface{}) error {
+ return client.PublishWithOptions(exchange, routingKey, payload, nil)
+func (client *mqttClient) PublishWithOptions(exchange string, routingKey string, payload interface{}, options *publishingOptions) error {
+ // client is disabled, so we do nothing and return no error.
+ if client.disabled {
+ return nil
+ }
+ return client.connectionManager.publish(exchange, routingKey, payload, options)
+func (client *mqttClient) RegisterConsumer(consumer MessageConsumer) error {
+ // client is disabled, so we do nothing and return no error.
+ if client.disabled {
+ return nil
+ }
+ return client.connectionManager.registerConsumer(consumer)
+func (client *mqttClient) Disconnect() error {
+ // client is disabled, so we do nothing and return no error.
+ if client.disabled {
+ return nil
+ }
+ err := client.connectionManager.close()
+ if err != nil {
+ return err
+ }
+ // cancel the context to stop all reconnection goroutines.
+ client.cancel()
+ // disable the client to avoid trying to launch new operations.
+ client.disabled = true
+ return nil
+func (client *mqttClient) IsReady() bool {
+ // client is disabled, so we do nothing and return true.
+ if client.disabled {
+ return true
+ }
+ return client.connectionManager.isReady()
+func (client *mqttClient) IsHealthy() bool {
+ // client is disabled, so we do nothing and return true.
+ if client.disabled {
+ return true
+ }
+ return client.connectionManager.isHealthy()
+func (client *mqttClient) GetHost() string {
+ return client.Host
+func (client *mqttClient) GetPort() uint {
+ return client.Port
+func (client *mqttClient) GetUsername() string {
+ return client.Username
+func (client *mqttClient) GetVhost() string {
+ return client.Vhost
+func (client *mqttClient) IsDisabled() bool {
+ return client.disabled
+package gorabbit
+import (
+ "time"
+ "github.com/Netflix/go-env"
+// ClientOptions holds all necessary properties to launch a successful connection with an MQTTClient.
+type ClientOptions struct {
+ // Host is the RabbitMQ server host name.
+ Host string
+ // Port is the RabbitMQ server port number.
+ Port uint
+ // Username is the RabbitMQ server allowed username.
+ Username string
+ // Password is the RabbitMQ server allowed password.
+ Password string
+ // Vhost is used for CloudAMQP connections to set the specific vhost.
+ Vhost string
+ // UseTLS defines whether we use amqp or amqps protocol.
+ UseTLS bool
+ // KeepAlive will determine whether the re-connection and retry mechanisms should be triggered.
+ KeepAlive bool
+ // RetryDelay will define the delay for the re-connection and retry mechanism.
+ RetryDelay time.Duration
+ // MaxRetry will define the number of retries when an amqpMessage could not be processed.
+ MaxRetry uint
+ // PublishingCacheTTL defines the time to live for each publishing cache item.
+ PublishingCacheTTL time.Duration
+ // PublishingCacheSize defines the max length of the publishing cache.
+ PublishingCacheSize uint64
+ // Mode will specify whether logs are enabled or not.
+ Mode string
+// DefaultClientOptions will return a ClientOptions with default values.
+func DefaultClientOptions() *ClientOptions {
+ return &ClientOptions{
+ Host: defaultHost,
+ Port: defaultPort,
+ Username: defaultUsername,
+ Password: defaultPassword,
+ Vhost: defaultVhost,
+ UseTLS: defaultUseTLS,
+ KeepAlive: defaultKeepAlive,
+ RetryDelay: defaultRetryDelay,
+ MaxRetry: defaultMaxRetry,
+ PublishingCacheTTL: defaultPublishingCacheTTL,
+ PublishingCacheSize: defaultPublishingCacheSize,
+ Mode: defaultMode,
+ }
+// NewClientOptions is the exported builder for a ClientOptions and will offer setter methods for an easy construction.
+// Any non-assigned field will be set to default through DefaultClientOptions.
+func NewClientOptions() *ClientOptions {
+ return DefaultClientOptions()
+// NewClientOptionsFromEnv will generate a ClientOptions from environment variables. Empty values will be taken as default
+// through the DefaultClientOptions.
+func NewClientOptionsFromEnv() *ClientOptions {
+ defaultOpts := DefaultClientOptions()
+ fromEnv := new(RabbitMQEnvs)
+ _, err := env.UnmarshalFromEnviron(fromEnv)
+ if err != nil {
+ return defaultOpts
+ }
+ if fromEnv.Host != "" {
+ defaultOpts.Host = fromEnv.Host
+ }
+ if fromEnv.Port > 0 {
+ defaultOpts.Port = fromEnv.Port
+ }
+ if fromEnv.Username != "" {
+ defaultOpts.Username = fromEnv.Username
+ }
+ if fromEnv.Password != "" {
+ defaultOpts.Password = fromEnv.Password
+ }
+ if fromEnv.Vhost != "" {
+ defaultOpts.Vhost = fromEnv.Vhost
+ }
+ defaultOpts.UseTLS = fromEnv.UseTLS
+ return defaultOpts
+// SetHost will assign the Host.
+func (c *ClientOptions) SetHost(host string) *ClientOptions {
+ c.Host = host
+ return c
+// SetPort will assign the Port.
+func (c *ClientOptions) SetPort(port uint) *ClientOptions {
+ c.Port = port
+ return c
+// SetCredentials will assign the Username and Password.
+func (c *ClientOptions) SetCredentials(username, password string) *ClientOptions {
+ c.Username = username
+ c.Password = password
+ return c
+// SetVhost will assign the Vhost.
+func (c *ClientOptions) SetVhost(vhost string) *ClientOptions {
+ c.Vhost = vhost
+ return c
+// SetUseTLS will assign the UseTLS status.
+func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions {
+ c.UseTLS = use
+ return c
+// SetKeepAlive will assign the KeepAlive status.
+func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions {
+ c.KeepAlive = keepAlive
+ return c
+// SetRetryDelay will assign the retry delay.
+func (c *ClientOptions) SetRetryDelay(delay time.Duration) *ClientOptions {
+ c.RetryDelay = delay
+ return c
+// SetMaxRetry will assign the max retry count.
+func (c *ClientOptions) SetMaxRetry(retry uint) *ClientOptions {
+ c.MaxRetry = retry
+ return c
+// SetPublishingCacheTTL will assign the publishing cache item TTL.
+func (c *ClientOptions) SetPublishingCacheTTL(ttl time.Duration) *ClientOptions {
+ c.PublishingCacheTTL = ttl
+ return c
+// SetPublishingCacheSize will assign the publishing cache max length.
+func (c *ClientOptions) SetPublishingCacheSize(size uint64) *ClientOptions {
+ c.PublishingCacheSize = size
+ return c
+// SetMode will assign the Mode if valid.
+func (c *ClientOptions) SetMode(mode string) *ClientOptions {
+ if isValidMode(mode) {
+ c.Mode = mode
+ }
+ return c
+package gorabbit
+import (
+ "context"
+ "net/url"
+ "time"
+ amqp "github.com/rabbitmq/amqp091-go"
+// amqpConnection holds information about the management of the native amqp.Connection.
+type amqpConnection struct {
+ // ctx is the parent context and acts as a safeguard.
+ ctx context.Context
+ // connection is the native amqp.Connection.
+ connection *amqp.Connection
+ // uri represents the connection string to the RabbitMQ server.
+ uri string
+ // keepAlive is the flag that will define whether active guards and re-connections are enabled or not.
+ keepAlive bool
+ // retryDelay defines the delay to wait before re-connecting if we lose connection and the keepAlive flag is set to true.
+ retryDelay time.Duration
+ // closed is an inner property that switches to true if the connection was explicitly closed.
+ closed bool
+ // channels holds a list of active amqpChannel
+ channels amqpChannels
+ // maxRetry defines the number of retries when publishing a message.
+ maxRetry uint
+ // publishingCacheSize defines the maximum length of cached failed publishing.
+ publishingCacheSize uint64
+ // publishingCacheTTL defines the time to live for a cached failed publishing.
+ publishingCacheTTL time.Duration
+ // logger logs events.
+ logger logger
+ // connectionType defines the connectionType.
+ connectionType connectionType
+// newConsumerConnection initializes a new consumer amqpConnection with given arguments.
+// - ctx is the parent context.
+// - uri is the connection string.
+// - keepAlive will keep the connection alive if true.
+// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true.
+// - logger is the parent logger.
+func newConsumerConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, logger logger) *amqpConnection {
+ return newConnection(ctx, uri, keepAlive, retryDelay, logger, connectionTypeConsumer)
+// newPublishingConnection initializes a new publisher amqpConnection with given arguments.
+// - ctx is the parent context.
+// - uri is the connection string.
+// - keepAlive will keep the connection alive if true.
+// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true.
+// - maxRetry defines the publishing max retry header.
+// - publishingCacheSize defines the maximum length of failed publishing cache.
+// - publishingCacheTTL defines the time to live for failed publishing in cache.
+// - logger is the parent logger.
+func newPublishingConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, maxRetry uint, publishingCacheSize uint64, publishingCacheTTL time.Duration, logger logger) *amqpConnection {
+ conn := newConnection(ctx, uri, keepAlive, retryDelay, logger, connectionTypePublisher)
+ conn.maxRetry = maxRetry
+ conn.publishingCacheSize = publishingCacheSize
+ conn.publishingCacheTTL = publishingCacheTTL
+ return conn
+// newConnection initializes a new amqpConnection with given arguments.
+// - ctx is the parent context.
+// - uri is the connection string.
+// - keepAlive will keep the connection alive if true.
+// - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true.
+// - logger is the parent logger.
+func newConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, logger logger, connectionType connectionType) *amqpConnection {
+ conn := &amqpConnection{
+ ctx: ctx,
+ uri: uri,
+ keepAlive: keepAlive,
+ retryDelay: retryDelay,
+ channels: make(amqpChannels, 0),
+ logger: inheritLogger(logger, map[string]interface{}{
+ "context": "connection",
+ "type": connectionType,
+ }),
+ connectionType: connectionType,
+ }
+ conn.logger.Debug("Initializing new amqp connection", logField{Key: "uri", Value: conn.uriForLog()})
+ // We open an initial connection.
+ err := conn.open()
+ // If the connection failed and the keepAlive flag is set to true, we want to re-connect until success.
+ if err != nil && keepAlive {
+ go conn.reconnect()
+ }
+ return conn
+// open opens a new amqp.Connection with the help of a defined uri.
+func (a *amqpConnection) open() error {
+ // If the uri is empty, we return an error.
+ if a.uri == "" {
+ return errEmptyURI
+ }
+ a.logger.Debug("Connecting to RabbitMQ server", logField{Key: "uri", Value: a.uriForLog()})
+ // We request a connection from the RabbitMQ server.
+ conn, err := amqp.Dial(a.uri)
+ if err != nil {
+ a.logger.Error(err, "Connection failed")
+ return err
+ }
+ a.logger.Info("Connection successful", logField{Key: "uri", Value: a.uriForLog()})
+ a.connection = conn
+ a.channels.updateParentConnection(a.connection)
+ // If the keepAlive flag is set to true, we activate a new guard.
+ if a.keepAlive {
+ go a.guard()
+ }
+ return nil
+// reconnect will indefinitely call the open method until a connection is successfully established or the context is canceled.
+func (a *amqpConnection) reconnect() {
+ a.logger.Debug("Re-connection launched")
+ for {
+ select {
+ case <-a.ctx.Done():
+ a.logger.Debug("Re-connection stopped by the context")
+ // If the context was canceled, we break out of the method.
+ return
+ default:
+ // Wait for the retryDelay.
+ time.Sleep(a.retryDelay)
+ // If there is no connection or the current connection is closed, we open a new connection.
+ if !a.ready() {
+ err := a.open()
+ // If the operation succeeds, we break the loop.
+ if err == nil {
+ a.logger.Debug("Re-connection successful")
+ return
+ }
+ a.logger.Error(err, "Could not open new connection during re-connection")
+ } else {
+ // If the connection exists and is active, we break out.
+ return
+ }
+ }
+ }
+// guard is a connection safeguard that listens to connection close events and re-launches the connection.
+func (a *amqpConnection) guard() {
+ a.logger.Debug("Guard launched")
+ for {
+ select {
+ case <-a.ctx.Done():
+ a.logger.Debug("Guard stopped by the context")
+ // If the context was canceled, we break out of the method.
+ return
+ case err, ok := <-a.connection.NotifyClose(make(chan *amqp.Error)):
+ if !ok {
+ return
+ }
+ if err != nil {
+ a.logger.Warn("Connection lost", logField{Key: "reason", Value: err.Reason}, logField{Key: "code", Value: err.Code})
+ }
+ // If the connection was explicitly closed, we do not want to re-connect.
+ if a.closed {
+ return
+ }
+ go a.reconnect()
+ return
+ }
+ }
+// close the connection only if it is ready.
+func (a *amqpConnection) close() error {
+ if a.ready() {
+ for _, channel := range a.channels {
+ err := channel.close()
+ if err != nil {
+ return err
+ }
+ }
+ err := a.connection.Close()
+ if err != nil {
+ a.logger.Error(err, "Could not close connection")
+ return err
+ }
+ }
+ a.closed = true
+ a.logger.Info("Connection closed")
+ return nil
+// ready returns true if the connection exists and is not closed.
+func (a *amqpConnection) ready() bool {
+ return a.connection != nil && !a.connection.IsClosed()
+// healthy returns true if the connection exists, is not closed and all child channels are healthy.
+func (a *amqpConnection) healthy() bool {
+ // If the connection is not ready, return false.
+ if !a.ready() {
+ return false
+ }
+ // Verify that all connection channels are ready too.
+ for _, channel := range a.channels {
+ if !channel.healthy() {
+ return false
+ }
+ }
+ return true
+// registerConsumer opens a new consumerChannel and registers the MessageConsumer.
+func (a *amqpConnection) registerConsumer(consumer MessageConsumer) error {
+ for _, channel := range a.channels {
+ if channel.consumer != nil && channel.consumer.Queue == consumer.Queue {
+ err := errConsumerAlreadyExists
+ a.logger.Error(err, "Could not register consumer", logField{Key: "consumer", Value: consumer.Name})
+ return err
+ }
+ }
+ if err := consumer.Handlers.Validate(); err != nil {
+ return err
+ }
+ channel := newConsumerChannel(a.ctx, a.connection, a.keepAlive, a.retryDelay, &consumer, a.logger)
+ a.channels = append(a.channels, channel)
+ a.logger.Info("Consumer registered", logField{Key: "consumer", Value: consumer.Name})
+ return nil
+func (a *amqpConnection) publish(exchange, routingKey string, payload []byte, options *publishingOptions) error {
+ publishingChannel := a.channels.publishingChannel()
+ if publishingChannel == nil {
+ publishingChannel = newPublishingChannel(a.ctx, a.connection, a.keepAlive, a.retryDelay, a.maxRetry, a.publishingCacheSize, a.publishingCacheTTL, a.logger)
+ a.channels = append(a.channels, publishingChannel)
+ }
+ return publishingChannel.publish(exchange, routingKey, payload, options)
+// uriForLog returns the uri with the password hidden for security measures.
+func (a *amqpConnection) uriForLog() string {
+ if a.uri == "" {
+ return a.uri
+ }
+ parsedURL, err := url.Parse(a.uri)
+ if err != nil {
+ return ""
+ }
+ hiddenPassword := "xxxx"
+ if parsedURL.User != nil {
+ parsedURL.User = url.UserPassword(parsedURL.User.Username(), hiddenPassword)
+ }
+ return parsedURL.String()
+package gorabbit
+import (
+ "context"
+ "encoding/json"
+ "time"
+type connectionManager struct {
+ // consumerConnection holds the independent consuming connection.
+ consumerConnection *amqpConnection
+ // publisherConnection holds the independent publishing connection.
+ publisherConnection *amqpConnection
+// newConnectionManager instantiates a new connectionManager with given arguments.
+func newConnectionManager(
+ ctx context.Context,
+ uri string,
+ keepAlive bool,
+ retryDelay time.Duration,
+ maxRetry uint,
+ publishingCacheSize uint64,
+ publishingCacheTTL time.Duration,
+ logger logger,
+) *connectionManager {
+ c := &connectionManager{
+ consumerConnection: newConsumerConnection(ctx, uri, keepAlive, retryDelay, logger),
+ publisherConnection: newPublishingConnection(ctx, uri, keepAlive, retryDelay, maxRetry, publishingCacheSize, publishingCacheTTL, logger),
+ }
+ return c
+// close offers the basic connection and channel close() mechanism but with extra higher level checks.
+func (c *connectionManager) close() error {
+ if err := c.publisherConnection.close(); err != nil {
+ return err
+ }
+ return c.consumerConnection.close()
+// isReady returns true if both consumerConnection and publishingConnection are ready.
+func (c *connectionManager) isReady() bool {
+ if c.publisherConnection == nil || c.consumerConnection == nil {
+ return false
+ }
+ return c.publisherConnection.ready() && c.consumerConnection.ready()
+// isHealthy returns true if both consumerConnection and publishingConnection are healthy.
+func (c *connectionManager) isHealthy() bool {
+ if c.publisherConnection == nil || c.consumerConnection == nil {
+ return false
+ }
+ return c.publisherConnection.healthy() && c.consumerConnection.healthy()
+// registerConsumer registers a new MessageConsumer.
+func (c *connectionManager) registerConsumer(consumer MessageConsumer) error {
+ if c.consumerConnection == nil {
+ return errConsumerConnectionNotInitialized
+ }
+ return c.consumerConnection.registerConsumer(consumer)
+func (c *connectionManager) publish(exchange, routingKey string, payload interface{}, options *publishingOptions) error {
+ if c.publisherConnection == nil {
+ return errPublisherConnectionNotInitialized
+ }
+ payloadBytes, err := json.Marshal(payload)
+ if err != nil {
+ return err
+ }
+ return c.publisherConnection.publish(exchange, routingKey, payloadBytes, options)
diff --git a/constants.go b/constants.go
new file mode 100644
index 0000000..691a863
--- /dev/null
+++ b/constants.go
@@ -0,0 +1,110 @@
+package gorabbit
+import (
+ "errors"
+ "time"
+// Library name.
+const libraryName = "Gorabbit"
+// Connection protocols.
+const (
+ defaultProtocol = "amqp"
+ securedProtocol = "amqps"
+// Default values for the ClientOptions and ManagerOptions.
+const (
+ defaultHost = ""
+ defaultPort = 5672
+ defaultUsername = "guest"
+ defaultPassword = "guest"
+ defaultVhost = ""
+ defaultUseTLS = false
+ defaultKeepAlive = true
+ defaultRetryDelay = 3 * time.Second
+ defaultMaxRetry = 5
+ defaultPublishingCacheTTL = 60 * time.Second
+ defaultPublishingCacheSize = 128
+ defaultMode = Release
+const (
+ xDeathCountHeader = "x-death-count"
+// Connection Types.
+type connectionType string
+const (
+ connectionTypeConsumer connectionType = "consumer"
+ connectionTypePublisher connectionType = "publisher"
+// Exchange Types
+type ExchangeType string
+const (
+ ExchangeTypeTopic ExchangeType = "topic"
+ ExchangeTypeDirect ExchangeType = "direct"
+ ExchangeTypeFanout ExchangeType = "fanout"
+ ExchangeTypeHeaders ExchangeType = "headers"
+func (e ExchangeType) String() string {
+ return string(e)
+// Priority Levels.
+type MessagePriority uint8
+const (
+ PriorityLowest MessagePriority = 1
+ PriorityVeryLow MessagePriority = 2
+ PriorityLow MessagePriority = 3
+ PriorityMedium MessagePriority = 4
+ PriorityHigh MessagePriority = 5
+ PriorityHighest MessagePriority = 6
+func (m MessagePriority) Uint8() uint8 {
+ return uint8(m)
+// Delivery Modes.
+type DeliveryMode uint8
+const (
+ Transient DeliveryMode = 1
+ Persistent DeliveryMode = 2
+func (d DeliveryMode) Uint8() uint8 {
+ return uint8(d)
+// Logging Modes.
+const (
+ Release = "release"
+ Debug = "debug"
+func isValidMode(mode string) bool {
+ return mode == Release || mode == Debug
+// Errors.
+var (
+ errEmptyURI = errors.New("amqp uri is empty")
+ errChannelClosed = errors.New("channel is closed")
+ errConnectionClosed = errors.New("connection is closed")
+ errConsumerAlreadyExists = errors.New("consumer already exists")
+ errConsumerConnectionNotInitialized = errors.New("consumerConnection is not initialized")
+ errPublisherConnectionNotInitialized = errors.New("publisherConnection is not initialized")
+ errEmptyQueue = errors.New("queue is empty")
diff --git a/consumer.go b/consumer.go
new file mode 100644
index 0000000..d37c347
--- /dev/null
+++ b/consumer.go
@@ -0,0 +1,224 @@
+package gorabbit
+import (
+ "errors"
+ "fmt"
+ "strings"
+// MQTTMessageHandlers is a wrapper that holds a map[string]MQTTMessageHandlerFunc.
+type MQTTMessageHandlers map[string]MQTTMessageHandlerFunc
+// MQTTMessageHandlerFunc is the function that will be called when a delivery is received.
+type MQTTMessageHandlerFunc func(payload []byte) error
+// Validate verifies that all routing keys in the handlers are properly formatted and allowed.
+func (mh MQTTMessageHandlers) Validate() error {
+ for k := range mh {
+ // A routing key cannot be empty.
+ if len(k) == 0 {
+ return errors.New("a routing key cannot be empty")
+ }
+ // A routing key cannot be equal to the wildcard '#'.
+ if len(k) == 1 && k == "#" {
+ return errors.New("a routing key cannot be the wildcard '#'")
+ }
+ // A routing key cannot contain spaces.
+ if strings.Contains(k, " ") {
+ return errors.New("a routing key cannot contain spaces")
+ }
+ // If a routing key is not just made up of one word.
+ if strings.Contains(k, ".") {
+ // We need to make sure that we do not find an empty word or a '%' in the middle of the key.
+ split := strings.Split(k, ".")
+ for i, v := range split {
+ // We cannot have empty strings.
+ if v == "" {
+ return fmt.Errorf("the routing key '%s' is not properly formatted", k)
+ }
+ // The wildcard '#' is not allowed in the middle.
+ if v == "#" && i > 0 && i < len(split)-1 {
+ return fmt.Errorf("the wildcard '#' in the routing key '%s' is not allowed", k)
+ }
+ }
+ }
+ }
+ return nil
+// matchesPrefixWildcard verifies that everything that comes after the '#' wildcard matches.
+func (mh MQTTMessageHandlers) matchesPrefixWildcard(storedWords, words []string) bool {
+ // compareIndex starts after the wildcard in the storedWords array.
+ compareIndex := 1
+ // we initialize the wordIdx at -1.
+ wordIdx := -1
+ // Here we are searching for the first occurrence of the first word after the '#' wildcard
+ // of the storedWords in the words.
+ for i, w := range words {
+ if w == storedWords[compareIndex] {
+ // We can now start comparing at 'i'.
+ wordIdx = i
+ break
+ }
+ }
+ // If we did not find the first word, then surely the key does not match.
+ if wordIdx == -1 {
+ return false
+ }
+ // If the length of storedWords is not the same as the length of words after the wildcard,
+ // then surely the key does not match.
+ if len(storedWords)-compareIndex != len(words)-wordIdx {
+ return false
+ }
+ // Now we can compare, word by word if the routing keys matches.
+ for i := wordIdx; i < len(words); i++ {
+ // Be careful, if we find '*' then it should match no matter what.
+ if storedWords[compareIndex] != words[i] && storedWords[compareIndex] != "*" {
+ return false
+ }
+ // We move right in the storedWords.
+ compareIndex++
+ }
+ return true
+// matchesSuffixWildcard verifies that everything that comes before the '#' wildcard matches.
+func (mh MQTTMessageHandlers) matchesSuffixWildcard(storedWords, words []string) bool {
+ backCount := 2
+ // compareIndex starts before the wildcard in the storedWords array.
+ compareIndex := len(storedWords) - backCount
+ // we initialize the wordIdx at -1.
+ wordIdx := -1
+ // Here we are searching for the first occurrence of the first word before the '#' wildcard
+ // of the storedWords in the words.
+ for i, w := range words {
+ if w == storedWords[compareIndex] {
+ wordIdx = i
+ break
+ }
+ }
+ // If we did not find the first word, then surely the key does not match.
+ if wordIdx == -1 {
+ return false
+ }
+ // If the indexes are not the same then surely the key does not match.
+ if compareIndex != wordIdx {
+ return false
+ }
+ // Now we can compare, word by word, going backwards if the routing keys matches.
+ for i := wordIdx; i > -1; i-- {
+ // Be careful, if we find '*' then it should match no matter what.
+ if storedWords[compareIndex] != words[i] && storedWords[compareIndex] != "*" {
+ return false
+ }
+ // We move left in the storedWords.
+ compareIndex--
+ }
+ return true
+// matchesSuffixWildcard verifies that 2 keys match word by word.
+func (mh MQTTMessageHandlers) matchesKey(storedWords, words []string) bool {
+ // If the lengths are not the same then surely the key does not match.
+ if len(storedWords) != len(words) {
+ return false
+ }
+ // Now we can compare, word by word if the routing keys matches.
+ for i, word := range words {
+ // Be careful, if we find '*' then it should match no matter what.
+ if storedWords[i] != word && storedWords[i] != "*" {
+ return false
+ }
+ }
+ return true
+func (mh MQTTMessageHandlers) FindFunc(routingKey string) MQTTMessageHandlerFunc {
+ // We first check for a direct match
+ if fn, found := mh[routingKey]; found {
+ return fn
+ }
+ // Split the routing key into individual words.
+ words := strings.Split(routingKey, ".")
+ // Check if any of the registered keys match the routing key.
+ for key, fn := range mh {
+ // Split the registered key into individual words.
+ storedWords := strings.Split(key, ".")
+ //nolint: gocritic,nestif // We need this if-else block
+ if storedWords[0] == "#" {
+ if !mh.matchesPrefixWildcard(storedWords, words) {
+ continue
+ }
+ } else if storedWords[len(storedWords)-1] == "#" {
+ if !mh.matchesSuffixWildcard(storedWords, words) {
+ continue
+ }
+ } else {
+ if !mh.matchesKey(storedWords, words) {
+ continue
+ }
+ }
+ return fn
+ }
+ // No matching keys were found.
+ return nil
+// MessageConsumer holds all the information needed to consume messages.
+type MessageConsumer struct {
+ // Queue defines the queue from which we want to consume messages.
+ Queue string
+ // Name is a unique identifier of the consumer. Should be as explicit as possible.
+ Name string
+ // PrefetchSize defines the max size of messages that are allowed to be processed at the same time.
+ // This property is dropped if AutoAck is set to true.
+ PrefetchSize int
+ // PrefetchCount defines the max number of messages that are allowed to be processed at the same time.
+ // This property is dropped if AutoAck is set to true.
+ PrefetchCount int
+ // AutoAck defines whether a message is directly acknowledged or not when being consumed.
+ AutoAck bool
+ // ConcurrentProcess will make MQTTMessageHandlers run concurrently for faster consumption, if set to true.
+ ConcurrentProcess bool
+ // Handlers is the list of defined handlers.
+ Handlers MQTTMessageHandlers
+// HashCode returns a unique identifier for the defined consumer.
+func (c MessageConsumer) HashCode() string {
+ return fmt.Sprintf("%s-%s", c.Queue, c.Name)
diff --git a/consumer_test.go b/consumer_test.go
new file mode 100644
index 0000000..106bdff
--- /dev/null
+++ b/consumer_test.go
@@ -0,0 +1,160 @@
+package gorabbit_test
+import (
+ "errors"
+ "testing"
+ "github.com/stretchr/testify/assert"
+ "github.com/KardinalAI/gorabbit/v1"
+func TestMQTTMessageHandlers_Validate(t *testing.T) {
+ tests := []struct {
+ handlers gorabbit.MQTTMessageHandlers
+ expectedError error
+ }{
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ "event.user.#": func(payload []byte) error { return nil },
+ "event.email.*.generated": func(payload []byte) error { return nil },
+ "event.*.space.boom": func(payload []byte) error { return nil },
+ "*.toto.order.passed": func(payload []byte) error { return nil },
+ "#.toto": func(payload []byte) error { return nil },
+ },
+ expectedError: nil,
+ },
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ "": func(payload []byte) error { return nil },
+ },
+ expectedError: errors.New("a routing key cannot be empty"),
+ },
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ " ": func(payload []byte) error { return nil },
+ },
+ expectedError: errors.New("a routing key cannot contain spaces"),
+ },
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ "#": func(payload []byte) error { return nil },
+ },
+ expectedError: errors.New("a routing key cannot be the wildcard '#'"),
+ },
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ "toto.#.titi": func(payload []byte) error { return nil },
+ },
+ expectedError: errors.New("the wildcard '#' in the routing key 'toto.#.titi' is not allowed"),
+ },
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ "toto titi": func(payload []byte) error { return nil },
+ },
+ expectedError: errors.New("a routing key cannot contain spaces"),
+ },
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ "toto..titi": func(payload []byte) error { return nil },
+ },
+ expectedError: errors.New("the routing key 'toto..titi' is not properly formatted"),
+ },
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ ".toto.titi": func(payload []byte) error { return nil },
+ },
+ expectedError: errors.New("the routing key '.toto.titi' is not properly formatted"),
+ },
+ {
+ handlers: gorabbit.MQTTMessageHandlers{
+ "toto.titi.": func(payload []byte) error { return nil },
+ },
+ expectedError: errors.New("the routing key 'toto.titi.' is not properly formatted"),
+ },
+ }
+ for _, test := range tests {
+ err := test.handlers.Validate()
+ assert.Equal(t, test.expectedError, err)
+ }
+func TestMQTTMessageHandlers_FindFunc(t *testing.T) {
+ handlers := gorabbit.MQTTMessageHandlers{
+ "event.user.#": func(payload []byte) error { return nil },
+ "event.email.*.generated": func(payload []byte) error { return nil },
+ "event.*.space.boom": func(payload []byte) error { return nil },
+ "*.toto.order.passed": func(payload []byte) error { return nil },
+ "#.toto": func(payload []byte) error { return nil },
+ }
+ tests := []struct {
+ input string
+ shouldMatch bool
+ }{
+ {
+ input: "event.user.plan.generated",
+ shouldMatch: true,
+ },
+ {
+ input: "event.user.password.generated.before.awakening.the.titan",
+ shouldMatch: true,
+ },
+ {
+ input: "event.email.subject.generated",
+ shouldMatch: true,
+ },
+ {
+ input: "event.email.toto.generated",
+ shouldMatch: true,
+ },
+ {
+ input: "event.email.titi.generated",
+ shouldMatch: true,
+ },
+ {
+ input: "event.email.order.created",
+ shouldMatch: false,
+ },
+ {
+ input: "event.toto.space.boom",
+ shouldMatch: true,
+ },
+ {
+ input: "event.toto.space.not_boom",
+ shouldMatch: false,
+ },
+ {
+ input: "command.toto.order.passed",
+ shouldMatch: true,
+ },
+ {
+ input: "command.toto.order.passed.please",
+ shouldMatch: false,
+ },
+ {
+ input: "event.toto",
+ shouldMatch: true,
+ },
+ {
+ input: "event.space.space.toto",
+ shouldMatch: true,
+ },
+ {
+ input: "event.toto.space",
+ shouldMatch: false,
+ },
+ }
+ for _, test := range tests {
+ fn := handlers.FindFunc(test.input)
+ if test.shouldMatch {
+ assert.NotNil(t, fn)
+ } else {
+ assert.Nil(t, fn)
+ }
+ }
+module github.com/KardinalAI/gorabbit/v1
+go 1.20
+require (
+ github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d
+ github.com/google/uuid v1.4.0
+ github.com/rabbitmq/amqp091-go v1.9.0
+ github.com/sirupsen/logrus v1.9.3
+ github.com/stretchr/testify v1.8.4
+require (
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/pmezard/go-difflib v1.0.0 // indirect
+ golang.org/x/sys v0.15.0 // indirect
+ gopkg.in/yaml.v3 v3.0.1 // indirect
+github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d h1:wvStE9wLpws31NiWUx+38wny1msZ/tm+eL5xmm4Y7So=
+github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d/go.mod h1:9XMFaCeRyW7fC9XJOWQ+NdAv8VLG7ys7l3x4ozEGLUQ=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
+github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
+github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
+github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
+github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
+github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
+go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
+go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
+golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
+golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+package gorabbit
+import (
+ "os"
+ "github.com/sirupsen/logrus"
+type logField struct {
+ Key string
+ Value interface{}
+// logger is the interface that defines log methods.
+type logger interface {
+ Error(error, string, ...logField)
+ Warn(string, ...logField)
+ Info(string, ...logField)
+ Debug(string, ...logField)
+// stdLogger logs to stdout using logrus (https://github.com/sirupsen/logrus).
+type stdLogger struct {
+ logger *logrus.Logger
+ identifier string
+ logFields map[string]interface{}
+func newStdLogger() logger {
+ return &stdLogger{
+ logger: newLogrus(),
+ identifier: libraryName,
+ logFields: nil,
+ }
+func (l stdLogger) getExtraFields(fields []logField) map[string]interface{} {
+ extraFields := make(map[string]interface{})
+ for k, field := range l.logFields {
+ extraFields[k] = field
+ }
+ for _, extraField := range fields {
+ extraFields[extraField.Key] = extraField.Value
+ }
+ return extraFields
+func (l stdLogger) Error(err error, s string, fields ...logField) {
+ log := l.logger.WithField("library", l.identifier)
+ extraFields := l.getExtraFields(fields)
+ log.WithFields(extraFields).WithError(err).Error(s)
+func (l stdLogger) Warn(s string, fields ...logField) {
+ log := l.logger.WithField("library", l.identifier)
+ extraFields := l.getExtraFields(fields)
+ log.WithFields(extraFields).Warn(s)
+func (l stdLogger) Info(s string, fields ...logField) {
+ log := l.logger.WithField("library", l.identifier)
+ extraFields := l.getExtraFields(fields)
+ log.WithFields(extraFields).Info(s)
+func (l stdLogger) Debug(s string, fields ...logField) {
+ log := l.logger.WithField("library", l.identifier)
+ extraFields := l.getExtraFields(fields)
+ log.WithFields(extraFields).Debug(s)
+// noLogger does not log at all, this is the default.
+type noLogger struct{}
+func (l noLogger) Error(_ error, _ string, _ ...logField) {}
+func (l noLogger) Warn(_ string, _ ...logField) {}
+func (l noLogger) Info(_ string, _ ...logField) {}
+func (l noLogger) Debug(_ string, _ ...logField) {}
+func newLogrus() *logrus.Logger {
+ log := &logrus.Logger{
+ Out: os.Stdout,
+ Formatter: &logrus.JSONFormatter{
+ DisableTimestamp: true,
+ },
+ Level: logrus.DebugLevel,
+ }
+ logLevel := os.Getenv("LOG_LEVEL")
+ if logLevel != "" {
+ lvl, err := logrus.ParseLevel(logLevel)
+ if err == nil {
+ log.Level = lvl
+ }
+ }
+ return log
+func inheritLogger(parent logger, logFields map[string]interface{}) logger {
+ switch v := parent.(type) {
+ case *stdLogger:
+ return &stdLogger{
+ logger: v.logger,
+ identifier: libraryName,
+ logFields: logFields,
+ }
+ default:
+ return parent
+ }
+package gorabbit
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "os"
+ "time"
+ "github.com/google/uuid"
+ amqp "github.com/rabbitmq/amqp091-go"
+// MQTTManager is a simple MQTT interface that offers basic management operations such as:
+// - Creation of queue, exchange and bindings
+// - Deletion of queues and exchanges
+// - Purge of queues
+// - Queue evaluation (existence and number of messages)
+type MQTTManager interface {
+ // Disconnect launches the disconnection process.
+ // This operation disables to manager permanently.
+ Disconnect() error
+ // CreateQueue will create a new queue from QueueConfig.
+ CreateQueue(config QueueConfig) error
+ // CreateExchange will create a new exchange from ExchangeConfig.
+ CreateExchange(config ExchangeConfig) error
+ // BindExchangeToQueueViaRoutingKey will bind an exchange to a queue via a given routingKey.
+ // Returns an error if the connection to the RabbitMQ server is down or if the exchange or queue does not exist.
+ BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error
+ // GetNumberOfMessages retrieves the number of messages currently sitting in a given queue.
+ // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
+ GetNumberOfMessages(queue string) (int, error)
+ // PushMessageToExchange pushes a message to a given exchange with a given routing key.
+ // Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist.
+ PushMessageToExchange(exchange, routingKey string, payload interface{}) error
+ // PopMessageFromQueue retrieves the first message of a queue. The message can then be auto-acknowledged or not.
+ // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist or is empty.
+ PopMessageFromQueue(queue string, autoAck bool) (*amqp.Delivery, error)
+ // PurgeQueue will empty a queue of all its current messages.
+ // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
+ PurgeQueue(queue string) error
+ // DeleteQueue permanently deletes an existing queue.
+ // Returns an error if the connection to the RabbitMQ server is down or the queue does not exist.
+ DeleteQueue(queue string) error
+ // DeleteExchange permanently deletes an existing exchange.
+ // Returns an error if the connection to the RabbitMQ server is down or the exchange does not exist.
+ DeleteExchange(exchange string) error
+ // SetupFromDefinitions loads a definitions.json file and automatically sets up exchanges, queues and bindings.
+ SetupFromDefinitions(path string) error
+ // GetHost returns the host used to initialize the manager.
+ GetHost() string
+ // GetPort returns the port used to initialize the manager.
+ GetPort() uint
+ // GetUsername returns the username used to initialize the manager.
+ GetUsername() string
+ // GetVhost returns the vhost used to initialize the manager.
+ GetVhost() string
+ // IsDisabled returns whether the manager is disabled or not.
+ IsDisabled() bool
+type mqttManager struct {
+ // Host is the RabbitMQ server host name.
+ Host string
+ // Port is the RabbitMQ server port number.
+ Port uint
+ // Username is the RabbitMQ server allowed username.
+ Username string
+ // Password is the RabbitMQ server allowed password.
+ Password string
+ // Vhost is used for CloudAMQP connections to set the specific vhost.
+ Vhost string
+ // logger defines the logger used, depending on the mode set.
+ logger logger
+ // disabled completely disables the manager if true.
+ disabled bool
+ // connection holds the single connection to the RabbitMQ server.
+ connection *amqp.Connection
+ // channel holds the single channel from the connection.
+ channel *amqp.Channel
+// NewManager will instantiate a new MQTTManager.
+// If options is set to nil, the DefaultManagerOptions will be used.
+func NewManager(options *ManagerOptions) (MQTTManager, error) {
+ // If no options is passed, we use the DefaultManagerOptions.
+ if options == nil {
+ options = DefaultManagerOptions()
+ }
+ return newManagerFromOptions(options)
+// NewManagerFromEnv will instantiate a new MQTTManager from environment variables.
+func NewManagerFromEnv() (MQTTManager, error) {
+ options := NewManagerOptionsFromEnv()
+ return newManagerFromOptions(options)
+func newManagerFromOptions(options *ManagerOptions) (MQTTManager, error) {
+ manager := &mqttManager{
+ Host: options.Host,
+ Port: options.Port,
+ Username: options.Username,
+ Password: options.Password,
+ Vhost: options.Vhost,
+ logger: &noLogger{},
+ }
+ // We check if the disabled flag is present, which will completely disable the MQTTManager.
+ if disabledOverride := os.Getenv("GORABBIT_DISABLED"); disabledOverride != "" {
+ switch disabledOverride {
+ case "1", "true":
+ manager.disabled = true
+ return manager, nil
+ }
+ }
+ // We check if the mode was overwritten with the environment variable "GORABBIT_MODE".
+ if modeOverride := os.Getenv("GORABBIT_MODE"); isValidMode(modeOverride) {
+ // We override the mode only if it is valid
+ options.Mode = modeOverride
+ }
+ if options.Mode == Debug {
+ // If the mode is Debug, we want to actually log important events.
+ manager.logger = newStdLogger()
+ }
+ protocol := defaultProtocol
+ if options.UseTLS {
+ protocol = securedProtocol
+ }
+ dialURL := fmt.Sprintf("%s://%s:%s@%s:%d/%s", protocol, manager.Username, manager.Password, manager.Host, manager.Port, manager.Vhost)
+ var err error
+ manager.connection, err = amqp.Dial(dialURL)
+ if err != nil {
+ return manager, err
+ }
+ manager.channel, err = manager.connection.Channel()
+ if err != nil {
+ return manager, err
+ }
+ return manager, nil
+func (manager *mqttManager) Disconnect() error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // We close the manager's channel only if it is opened.
+ if manager.channel != nil && !manager.channel.IsClosed() {
+ err := manager.channel.Close()
+ if err != nil {
+ return err
+ }
+ }
+ // We close the manager's connection only if it is opened.
+ if manager.connection != nil && !manager.connection.IsClosed() {
+ return manager.connection.Close()
+ }
+ return nil
+func (manager *mqttManager) CreateQueue(config QueueConfig) error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return err
+ }
+ // We declare the queue via the channel.
+ _, err := manager.channel.QueueDeclare(
+ config.Name, // name
+ config.Durable, // durable
+ false, // delete when unused
+ config.Exclusive, // exclusive
+ false, // no-wait
+ config.Args,
+ )
+ if err != nil {
+ return err
+ }
+ // If bindings are also declared, we create the bindings too.
+ if config.Bindings != nil {
+ for _, binding := range config.Bindings {
+ err = manager.BindExchangeToQueueViaRoutingKey(binding.Exchange, config.Name, binding.RoutingKey)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+func (manager *mqttManager) CreateExchange(config ExchangeConfig) error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return err
+ }
+ // We declare the exchange via the channel.
+ return manager.channel.ExchangeDeclare(
+ config.Name, // name
+ config.Type.String(), // type
+ config.Persisted, // durable
+ !config.Persisted, // auto-deleted
+ false, // internal
+ false, // no-wait
+ config.Args, // arguments
+ )
+func (manager *mqttManager) BindExchangeToQueueViaRoutingKey(exchange, queue, routingKey string) error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return err
+ }
+ // We bind the queue to a given exchange and routing key via the channel.
+ return manager.channel.QueueBind(
+ queue,
+ routingKey,
+ exchange,
+ false,
+ nil,
+ )
+func (manager *mqttManager) GetNumberOfMessages(queue string) (int, error) {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return -1, nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return -1, err
+ }
+ // We passively declare the queue via the channel, this will return the existing queue or an error if it doesn't exist.
+ q, err := manager.channel.QueueDeclarePassive(
+ queue,
+ false,
+ false,
+ false,
+ false,
+ nil,
+ )
+ if err != nil {
+ return -1, err
+ }
+ return q.Messages, nil
+func (manager *mqttManager) PushMessageToExchange(exchange, routingKey string, payload interface{}) error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return err
+ }
+ // We convert the payload to a []byte.
+ payloadBytes, err := json.Marshal(payload)
+ if err != nil {
+ return err
+ }
+ // We build the amqp.Publishing object.
+ publishing := amqp.Publishing{
+ ContentType: "application/json",
+ Body: payloadBytes,
+ Type: routingKey,
+ Priority: PriorityMedium.Uint8(),
+ DeliveryMode: Transient.Uint8(),
+ MessageId: uuid.NewString(),
+ Timestamp: time.Now(),
+ }
+ // We push the message via the channel.
+ return manager.channel.PublishWithContext(context.TODO(), exchange, routingKey, false, false, publishing)
+func (manager *mqttManager) PopMessageFromQueue(queue string, autoAck bool) (*amqp.Delivery, error) {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ //nolint: nilnil // We must return
+ return nil, nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return nil, err
+ }
+ // We get the message via the channel.
+ m, ok, err := manager.channel.Get(queue, autoAck)
+ if err != nil {
+ return nil, err
+ }
+ // If the queue is empty.
+ if !ok {
+ return nil, errEmptyQueue
+ }
+ return &m, nil
+func (manager *mqttManager) PurgeQueue(queue string) error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return err
+ }
+ // We purge the queue via the channel.
+ _, err := manager.channel.QueuePurge(queue, false)
+ if err != nil {
+ return err
+ }
+ return nil
+func (manager *mqttManager) DeleteQueue(queue string) error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return err
+ }
+ // We delete the queue via the channel.
+ _, err := manager.channel.QueueDelete(queue, false, false, false)
+ if err != nil {
+ return err
+ }
+ return nil
+func (manager *mqttManager) DeleteExchange(exchange string) error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return err
+ }
+ // We delete the exchange via the channel.
+ return manager.channel.ExchangeDelete(exchange, false, false)
+func (manager *mqttManager) SetupFromDefinitions(path string) error {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return nil
+ }
+ // If the manager is not ready, we return its error.
+ if ready, err := manager.ready(); !ready {
+ return err
+ }
+ // We read the definitions.json file.
+ definitions, err := os.ReadFile(path)
+ if err != nil {
+ return err
+ }
+ def := new(SchemaDefinitions)
+ // We parse the definitions.json file into the corresponding struct.
+ err = json.Unmarshal(definitions, def)
+ if err != nil {
+ return err
+ }
+ for _, queue := range def.Queues {
+ // We create the queue.
+ err = manager.CreateQueue(QueueConfig{
+ Name: queue.Name,
+ Durable: queue.Durable,
+ Exclusive: false,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ for _, exchange := range def.Exchanges {
+ // We create the exchange.
+ err = manager.CreateExchange(ExchangeConfig{
+ Name: exchange.Name,
+ Type: ExchangeType(exchange.Type),
+ Persisted: exchange.Durable,
+ })
+ if err != nil {
+ return err
+ }
+ }
+ for _, binding := range def.Bindings {
+ // We bind the given exchange to the given queue via the given routing key.
+ err = manager.BindExchangeToQueueViaRoutingKey(binding.Source, binding.Destination, binding.RoutingKey)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+func (manager *mqttManager) checkChannel() error {
+ var err error
+ // If the connection is nil or closed, we must request a new channel.
+ if manager.channel == nil || manager.channel.IsClosed() {
+ manager.channel, err = manager.connection.Channel()
+ }
+ return err
+func (manager *mqttManager) ready() (bool, error) {
+ // Manager is disabled, so we do nothing and return no error.
+ if manager.disabled {
+ return true, nil
+ }
+ // If the connection is nil or closed, we return an error because the manager is not ready.
+ if manager.connection == nil || manager.connection.IsClosed() {
+ return false, errConnectionClosed
+ }
+ // We check the channel as it might have been closed, and we need to request a new one.
+ if err := manager.checkChannel(); err != nil {
+ return false, err
+ }
+ // If the channel is still nil or closed, we return an error because the manager is not ready.
+ if manager.channel == nil || manager.channel.IsClosed() {
+ return false, errChannelClosed
+ }
+ return true, nil
+func (manager *mqttManager) GetHost() string {
+ return manager.Host
+func (manager *mqttManager) GetPort() uint {
+ return manager.Port
+func (manager *mqttManager) GetUsername() string {
+ return manager.Username
+func (manager *mqttManager) GetVhost() string {
+ return manager.Vhost
+func (manager *mqttManager) IsDisabled() bool {
+ return manager.disabled
+package gorabbit
+import "github.com/Netflix/go-env"
+// ManagerOptions holds all necessary properties to launch a successful connection with an MQTTManager.
+type ManagerOptions struct {
+ // Host is the RabbitMQ server host name.
+ Host string
+ // Port is the RabbitMQ server port number.
+ Port uint
+ // Username is the RabbitMQ server allowed username.
+ Username string
+ // Password is the RabbitMQ server allowed password.
+ Password string
+ // Vhost is used for CloudAMQP connections to set the specific vhost.
+ Vhost string
+ // UseTLS defines whether we use amqp or amqps protocol.
+ UseTLS bool
+ // Mode will specify whether logs are enabled or not.
+ Mode string
+// DefaultManagerOptions will return a ManagerOptions with default values.
+func DefaultManagerOptions() *ManagerOptions {
+ return &ManagerOptions{
+ Host: defaultHost,
+ Port: defaultPort,
+ Username: defaultUsername,
+ Password: defaultPassword,
+ Vhost: defaultVhost,
+ UseTLS: defaultUseTLS,
+ Mode: defaultMode,
+ }
+// NewManagerOptions is the exported builder for a ManagerOptions and will offer setter methods for an easy construction.
+// Any non-assigned field will be set to default through DefaultManagerOptions.
+func NewManagerOptions() *ManagerOptions {
+ return DefaultManagerOptions()
+// NewManagerOptionsFromEnv will generate a ManagerOptions from environment variables. Empty values will be taken as default
+// through the DefaultManagerOptions.
+func NewManagerOptionsFromEnv() *ManagerOptions {
+ defaultOpts := DefaultManagerOptions()
+ fromEnv := new(RabbitMQEnvs)
+ _, err := env.UnmarshalFromEnviron(fromEnv)
+ if err != nil {
+ return defaultOpts
+ }
+ if fromEnv.Host != "" {
+ defaultOpts.Host = fromEnv.Host
+ }
+ if fromEnv.Port > 0 {
+ defaultOpts.Port = fromEnv.Port
+ }
+ if fromEnv.Username != "" {
+ defaultOpts.Username = fromEnv.Username
+ }
+ if fromEnv.Password != "" {
+ defaultOpts.Password = fromEnv.Password
+ }
+ if fromEnv.Vhost != "" {
+ defaultOpts.Vhost = fromEnv.Vhost
+ }
+ defaultOpts.UseTLS = fromEnv.UseTLS
+ return defaultOpts
+// SetHost will assign the host.
+func (m *ManagerOptions) SetHost(host string) *ManagerOptions {
+ m.Host = host
+ return m
+// SetPort will assign the port.
+func (m *ManagerOptions) SetPort(port uint) *ManagerOptions {
+ m.Port = port
+ return m
+// SetCredentials will assign the username and password.
+func (m *ManagerOptions) SetCredentials(username, password string) *ManagerOptions {
+ m.Username = username
+ m.Password = password
+ return m
+// SetVhost will assign the Vhost.
+func (m *ManagerOptions) SetVhost(vhost string) *ManagerOptions {
+ m.Vhost = vhost
+ return m
+// SetUseTLS will assign the UseTLS status.
+func (m *ManagerOptions) SetUseTLS(use bool) *ManagerOptions {
+ m.UseTLS = use
+ return m
+// SetMode will assign the mode if valid.
+func (m *ManagerOptions) SetMode(mode string) *ManagerOptions {
+ if isValidMode(mode) {
+ m.Mode = mode
+ }
+ return m
+package gorabbit
+import (
+ amqp "github.com/rabbitmq/amqp091-go"
+type SchemaDefinitions struct {
+ Exchanges []struct {
+ Name string `json:"name"`
+ Vhost string `json:"vhost"`
+ Type string `json:"type"`
+ Durable bool `json:"durable"`
+ AutoDelete bool `json:"auto_delete"`
+ Internal bool `json:"internal"`
+ Arguments struct {
+ } `json:"arguments"`
+ } `json:"exchanges"`
+ Queues []struct {
+ Name string `json:"name"`
+ Vhost string `json:"vhost"`
+ Durable bool `json:"durable"`
+ AutoDelete bool `json:"auto_delete"`
+ Arguments struct {
+ } `json:"arguments"`
+ } `json:"queues"`
+ Bindings []struct {
+ Source string `json:"source"`
+ Vhost string `json:"vhost"`
+ Destination string `json:"destination"`
+ DestinationType string `json:"destination_type"`
+ RoutingKey string `json:"routing_key"`
+ Arguments struct {
+ } `json:"arguments"`
+ } `json:"bindings"`
+type ExchangeConfig struct {
+ Name string `yaml:"name"`
+ Type ExchangeType `yaml:"type"`
+ Persisted bool `yaml:"persisted"`
+ Args map[string]interface{} `yaml:"args"`
+type QueueConfig struct {
+ Name string `yaml:"name"`
+ Durable bool `yaml:"durable"`
+ Exclusive bool `yaml:"exclusive"`
+ Args map[string]interface{} `yaml:"args"`
+ Bindings []BindingConfig `yaml:"bindings"`
+type BindingConfig struct {
+ RoutingKey string `yaml:"routing_key"`
+ Exchange string `yaml:"exchange"`
+type publishingOptions struct {
+ messagePriority *MessagePriority
+ deliveryMode *DeliveryMode
+func SendOptions() *publishingOptions {
+ return &publishingOptions{}
+func (m *publishingOptions) priority() uint8 {
+ if m.messagePriority == nil {
+ return PriorityMedium.Uint8()
+ }
+ return m.messagePriority.Uint8()
+func (m *publishingOptions) mode() uint8 {
+ if m.deliveryMode == nil {
+ return Persistent.Uint8()
+ }
+ return m.deliveryMode.Uint8()
+func (m *publishingOptions) SetPriority(priority MessagePriority) *publishingOptions {
+ m.messagePriority = &priority
+ return m
+func (m *publishingOptions) SetMode(mode DeliveryMode) *publishingOptions {
+ m.deliveryMode = &mode
+ return m
+type consumptionHealth map[string]bool
+func (s consumptionHealth) IsHealthy() bool {
+ for _, v := range s {
+ if !v {
+ return false
+ }
+ }
+ return true
+func (s consumptionHealth) AddSubscription(queue string, err error) {
+ s[queue] = err == nil
+type mqttPublishing struct {
+ Exchange string
+ RoutingKey string
+ Mandatory bool
+ Immediate bool
+ Msg amqp.Publishing
+func (m mqttPublishing) HashCode() string {
+ return m.Msg.MessageId
+type RabbitMQEnvs struct {
+ Host string `env:"RABBITMQ_HOST"`
+ Port uint `env:"RABBITMQ_PORT"`
+ Username string `env:"RABBITMQ_USERNAME"`
+ Password string `env:"RABBITMQ_PASSWORD"`
+ Vhost string `env:"RABBITMQ_VHOST"`
+ UseTLS bool `env:"RABBITMQ_USE_TLS"`
+package gorabbit
+import (
+ "sync"
+ "time"
+type ttlMapValue[V any] struct {
+ value V
+ createdAt time.Time
+type ttlMap[K comparable, V any] struct {
+ m map[K]ttlMapValue[V]
+ l sync.Mutex
+func newTTLMap[K comparable, V any](ln uint64, maxTTL time.Duration) *ttlMap[K, V] {
+ m := &ttlMap[K, V]{m: make(map[K]ttlMapValue[V], ln)}
+ go func() {
+ const tickFraction = 3
+ for now := range time.Tick(maxTTL / tickFraction) {
+ m.l.Lock()
+ for k := range m.m {
+ issueDate := m.m[k].createdAt
+ if now.Sub(issueDate) >= maxTTL {
+ delete(m.m, k)
+ }
+ }
+ m.l.Unlock()
+ }
+ }()
+ return m
+func (m *ttlMap[K, V]) Len() int {
+ return len(m.m)
+func (m *ttlMap[K, V]) Put(k K, v V) {
+ m.l.Lock()
+ defer m.l.Unlock()
+ if _, ok := m.m[k]; !ok {
+ m.m[k] = ttlMapValue[V]{value: v, createdAt: time.Now()}
+ }
+func (m *ttlMap[K, V]) Get(k K) (V, bool) {
+ m.l.Lock()
+ defer m.l.Unlock()
+ v, found := m.m[k]
+ innerVal := v.value
+ return innerVal, found
+func (m *ttlMap[K, V]) ForEach(process func(k K, v V)) {
+ for key, value := range m.m {
+ innerVal := value.value
+ process(key, innerVal)
+ }
+func (m *ttlMap[K, V]) Delete(k K) {
+ m.l.Lock()
+ defer m.l.Unlock()
+ delete(m.m, k)
+package gorabbit
+import (
+ "errors"
+ amqp "github.com/rabbitmq/amqp091-go"
+// Error Utils.
+const (
+ codeNotFound = 404
+// isErrorNotFound checks if the error returned by a connection or channel has the 404 code.
+func isErrorNotFound(err error) bool {
+ var amqpError *amqp.Error
+ errors.As(err, &amqpError)
+ if amqpError == nil {
+ return false
+ }
+ return amqpError.Code == codeNotFound