diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..7c521e4 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,3 @@ +# 1.1.0 + +- Allow setting a connection name ([PR](https://github.com/KardinalAI/gorabbit/pull/8)) diff --git a/client.go b/client.go index 2bbc810..8a0baef 100644 --- a/client.go +++ b/client.go @@ -156,6 +156,7 @@ func newClientFromOptions(options *ClientOptions) MQTTClient { client.connectionManager = newConnectionManager( client.ctx, dialURL, + options.ConnectionName, options.KeepAlive, options.RetryDelay, options.MaxRetry, diff --git a/client_options.go b/client_options.go index 4d6d6ef..7b67544 100644 --- a/client_options.go +++ b/client_options.go @@ -26,6 +26,9 @@ type ClientOptions struct { // UseTLS defines whether we use amqp or amqps protocol. UseTLS bool + // ConnectionName is the client connection name passed on to the RabbitMQ server. + ConnectionName string + // KeepAlive will determine whether the re-connection and retry mechanisms should be triggered. KeepAlive bool @@ -142,6 +145,13 @@ func (c *ClientOptions) SetUseTLS(use bool) *ClientOptions { return c } +// SetConnectionName will assign the ConnectionName. +func (c *ClientOptions) SetConnectionName(connectionName string) *ClientOptions { + c.ConnectionName = connectionName + + return c +} + // SetKeepAlive will assign the KeepAlive status. func (c *ClientOptions) SetKeepAlive(keepAlive bool) *ClientOptions { c.KeepAlive = keepAlive diff --git a/connection.go b/connection.go index 429abd0..626d45c 100644 --- a/connection.go +++ b/connection.go @@ -19,6 +19,9 @@ type amqpConnection struct { // uri represents the connection string to the RabbitMQ server. uri string + // connectionName is the client connection name passed on to the RabbitMQ server. + connectionName string + // keepAlive is the flag that will define whether active guards and re-connections are enabled or not. keepAlive bool @@ -50,16 +53,18 @@ type amqpConnection struct { // newConsumerConnection initializes a new consumer amqpConnection with given arguments. // - ctx is the parent context. // - uri is the connection string. +// - connectionName is the connection name. // - keepAlive will keep the connection alive if true. // - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true. // - logger is the parent logger. -func newConsumerConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, logger logger) *amqpConnection { - return newConnection(ctx, uri, keepAlive, retryDelay, logger, connectionTypeConsumer) +func newConsumerConnection(ctx context.Context, uri, connectionName string, keepAlive bool, retryDelay time.Duration, logger logger) *amqpConnection { + return newConnection(ctx, uri, connectionName, keepAlive, retryDelay, logger, connectionTypeConsumer) } // newPublishingConnection initializes a new publisher amqpConnection with given arguments. // - ctx is the parent context. // - uri is the connection string. +// - connectionName is the connection name. // - keepAlive will keep the connection alive if true. // - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true. // - maxRetry defines the publishing max retry header. @@ -69,6 +74,7 @@ func newConsumerConnection(ctx context.Context, uri string, keepAlive bool, retr func newPublishingConnection( ctx context.Context, uri string, + connectionName string, keepAlive bool, retryDelay time.Duration, maxRetry uint, @@ -76,7 +82,7 @@ func newPublishingConnection( publishingCacheTTL time.Duration, logger logger, ) *amqpConnection { - conn := newConnection(ctx, uri, keepAlive, retryDelay, logger, connectionTypePublisher) + conn := newConnection(ctx, uri, connectionName, keepAlive, retryDelay, logger, connectionTypePublisher) conn.maxRetry = maxRetry conn.publishingCacheSize = publishingCacheSize @@ -88,16 +94,26 @@ func newPublishingConnection( // newConnection initializes a new amqpConnection with given arguments. // - ctx is the parent context. // - uri is the connection string. +// - connectionName is the connection name. // - keepAlive will keep the connection alive if true. // - retryDelay defines the delay between each re-connection, if the keepAlive flag is set to true. // - logger is the parent logger. -func newConnection(ctx context.Context, uri string, keepAlive bool, retryDelay time.Duration, logger logger, connectionType connectionType) *amqpConnection { +func newConnection( + ctx context.Context, + uri string, + connectionName string, + keepAlive bool, + retryDelay time.Duration, + logger logger, + connectionType connectionType, +) *amqpConnection { conn := &amqpConnection{ - ctx: ctx, - uri: uri, - keepAlive: keepAlive, - retryDelay: retryDelay, - channels: make(amqpChannels, 0), + ctx: ctx, + uri: uri, + connectionName: connectionName, + keepAlive: keepAlive, + retryDelay: retryDelay, + channels: make(amqpChannels, 0), logger: inheritLogger(logger, map[string]interface{}{ "context": "connection", "type": connectionType, @@ -127,8 +143,17 @@ func (a *amqpConnection) open() error { a.logger.Debug("Connecting to RabbitMQ server", logField{Key: "uri", Value: a.uriForLog()}) + props := amqp.NewConnectionProperties() + if a.connectionName != "" { + props.SetClientConnectionName(a.connectionName) + } + // We request a connection from the RabbitMQ server. - conn, err := amqp.Dial(a.uri) + conn, err := amqp.DialConfig(a.uri, amqp.Config{ + Heartbeat: defaultHeartbeat, + Locale: defaultLocale, + Properties: props, + }) if err != nil { a.logger.Error(err, "Connection failed") diff --git a/connection_manager.go b/connection_manager.go index be7f783..bd71413 100644 --- a/connection_manager.go +++ b/connection_manager.go @@ -18,6 +18,7 @@ type connectionManager struct { func newConnectionManager( ctx context.Context, uri string, + connectionName string, keepAlive bool, retryDelay time.Duration, maxRetry uint, @@ -26,8 +27,8 @@ func newConnectionManager( logger logger, ) *connectionManager { c := &connectionManager{ - consumerConnection: newConsumerConnection(ctx, uri, keepAlive, retryDelay, logger), - publisherConnection: newPublishingConnection(ctx, uri, keepAlive, retryDelay, maxRetry, publishingCacheSize, publishingCacheTTL, logger), + consumerConnection: newConsumerConnection(ctx, uri, connectionName, keepAlive, retryDelay, logger), + publisherConnection: newPublishingConnection(ctx, uri, connectionName, keepAlive, retryDelay, maxRetry, publishingCacheSize, publishingCacheTTL, logger), } return c diff --git a/constants.go b/constants.go index 691a863..95861cd 100644 --- a/constants.go +++ b/constants.go @@ -30,6 +30,12 @@ const ( defaultMode = Release ) +// Default values for the amqp Config. +const ( + defaultHeartbeat = 10 * time.Second + defaultLocale = "en_US" +) + const ( xDeathCountHeader = "x-death-count" )