In this tutorial, we will use the Messaging Topology Operator to:
- create a queue
- create a user
- assign permissions to the user
- publish messages to and consume messages from the created queue
Your Kubernetes cluster needs to have:
- Cluster Operator and Messaging Topology Operator installed
- a RabbitmqCluster deployed
You can create a queue by creating a custom resource of type queues.rabbitmq.com. For example:
apiVersion: rabbitmq.com/v1beta1
kind: Queue
metadata:
  name: tutorial
  namespace: REPLACEME # same namespace as the deployed RabbitmqCluster
spec:
  name: tutorial # this will be the name of the queue
  rabbitmqClusterReference:
    name: # name of the RabbitmqCluster
This will create a classic queue named 'tutorial' in the default vhost '/'. To check that the queue was created successfully, open a shell in your RabbitmqCluster pod:
kubectl exec -it NAME-OF-THE-RMQ-POD -- /bin/bash
and run the following command to see if queue 'tutorial' is listed:
rabbitmqctl list_queues
You can create a RabbitMQ user with a predefined username and password. The credentials are passed to the Operator through a Kubernetes Secret object:
apiVersion: v1
kind: Secret
metadata:
  name: user-secret
  namespace: REPLACEME # same namespace as the deployed RabbitmqCluster
type: Opaque
stringData:
  username: test
  password: test
Then, let's create a RabbitMQ user by creating a custom resource of type users.rabbitmq.com:
apiVersion: rabbitmq.com/v1beta1
kind: User
metadata:
  name: tutorial-user
  namespace: REPLACEME # same namespace as the deployed RabbitmqCluster
spec:
  importCredentialsSecret:
    name: user-secret
  rabbitmqClusterReference:
    name: # name of the RabbitmqCluster
This will create a RabbitMQ user with username 'test' and password 'test' (as specified in the Kubernetes secret).
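Kubernetes stores values supplied under stringData in base64-encoded form under the Secret's data field, so inspecting user-secret later shows encoded values rather than 'test'. The sketch below illustrates that encoding with Go's standard library; the helper names encodeSecretValue and decodeSecretValue are hypothetical and exist only for this illustration.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"log"
)

// encodeSecretValue returns the base64 form in which a Secret value
// appears under the "data" field (hypothetical helper).
func encodeSecretValue(plain string) string {
	return base64.StdEncoding.EncodeToString([]byte(plain))
}

// decodeSecretValue reverses encodeSecretValue (hypothetical helper).
func decodeSecretValue(encoded string) (string, error) {
	raw, err := base64.StdEncoding.DecodeString(encoded)
	if err != nil {
		return "", err
	}
	return string(raw), nil
}

func main() {
	encoded := encodeSecretValue("test")
	fmt.Println(encoded) // dGVzdA==

	decoded, err := decodeSecretValue(encoded)
	if err != nil {
		log.Fatalf("failed to decode secret value: %s", err)
	}
	fmt.Println(decoded) // test
}
```

Base64 is an encoding, not encryption, so anyone who can read the Secret can recover the credentials.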
Before we can use the user 'test' to publish and consume messages, we need to grant it permissions.
This can be achieved by creating a custom resource of type permissions.rabbitmq.com:
apiVersion: rabbitmq.com/v1beta1
kind: Permission
metadata:
  name: tutorial-user-permission
  namespace: REPLACEME # same namespace as the deployed RabbitmqCluster
spec:
  vhost: "/"
  user: "test" # name of the created user
  permissions:
    write: ".*"
    configure: ".*"
    read: ".*"
  rabbitmqClusterReference:
    name: # name of the RabbitmqCluster
This is the equivalent of running rabbitmqctl set_permissions -p "/" "test" ".*" ".*" ".*".
To check that user 'test' was created and configured with the right permissions, open a shell in your RabbitmqCluster pod and run:
rabbitmqctl list_permissions
This command lists all users that have access to the default vhost '/'. You should see user 'test' listed here with read, write, and configure permissions all set to '.*'.
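Each permission value is a regular expression matched against resource names (queues and exchanges), which is why '.*' grants access to everything in the vhost. The following is a rough illustration of how different patterns behave, using Go's regexp package; RabbitMQ evaluates patterns with its own regex engine, so treat this as an approximation that holds for simple patterns. The helper name allowed and the sample resource names other than 'tutorial' are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
)

// allowed reports whether a permission pattern matches a resource name.
// Hypothetical helper: it approximates the broker's check with Go's regexp.
func allowed(pattern, resource string) bool {
	return regexp.MustCompile(pattern).MatchString(resource)
}

func main() {
	patterns := []string{
		".*",         // matches every resource name
		"^tutorial$", // matches only the resource named "tutorial"
		"^$",         // matches only the empty string, so no named resource
	}
	resources := []string{"tutorial", "tutorial-2", "orders"}

	for _, p := range patterns {
		for _, r := range resources {
			fmt.Printf("pattern %q, resource %q: allowed=%v\n", p, r, allowed(p, r))
		}
	}
}
```

A narrower pattern such as "^tutorial$" could replace ".*" in the Permission resource if the user should only reach the 'tutorial' queue, though the permissions a client needs also depend on which exchanges it uses.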
We now have everything we need to be able to publish and consume messages. The following Golang example uses the created user 'test' to publish and consume messages from queue 'tutorial'. The example uses the RabbitMQ Golang client.
Before you can run the example below, you need to have Go installed in your development environment. To install it, you can follow the official Go installation guide.
package main

import (
	"fmt"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	rmqHostname := "" // external IP or hostname of the deployed RabbitmqCluster
	// connect with username 'test' and password 'test'
	conn, err := amqp.Dial(fmt.Sprintf("amqp://test:test@%s:5672/", rmqHostname))
	if err != nil {
		log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("%s: %s", "Failed to open a channel", err)
	}
	defer ch.Close()

	// publish a message
	msg := "Topology Operator Tutorial" // message body
	if err = ch.Publish(
		"",         // default exchange
		"tutorial", // name of the queue as routing key
		false,      // mandatory
		false,      // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
		}); err != nil {
		log.Fatalf("%s: %s", "Failed to publish a message", err)
	}
	log.Printf("Message published: %s", msg)

	// consume and print messages; this loop runs until the channel or connection is closed
	msgs, err := ch.Consume(
		"tutorial", // name of the queue
		"",         // consumer tag (empty: the library generates one)
		true,       // auto-ack
		false,      // exclusive
		false,      // no-local
		false,      // no-wait
		nil,        // args
	)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to register a consumer", err)
	}
	for m := range msgs {
		// auto-ack is enabled, so deliveries must not be acknowledged manually
		log.Printf("Received message: %s", m.Body)
	}
}
To run the code snippet, save it in a file with a .go extension (for example main.go), then run the program from the command line by referring to its filename:
go run main.go
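The example program builds its connection URL with fmt.Sprintf, which works for the credentials 'test' and 'test' but would produce a malformed URL if a password contained characters such as '@' or '/'. A minimal sketch of a safer way to assemble the URL with Go's net/url package follows; the function name amqpURL, the host 'localhost', and the password 'p@ss/word' are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
)

// amqpURL builds an AMQP connection URL for port 5672 with the same "/" path
// as the tutorial's URL, percent-encoding the credentials where required
// (hypothetical helper).
func amqpURL(username, password, host string) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(username, password),
		Host:   net.JoinHostPort(host, "5672"),
		Path:   "/",
	}
	return u.String()
}

func main() {
	// Same shape as the URL used in the tutorial program.
	fmt.Println(amqpURL("test", "test", "localhost"))
	// prints: amqp://test:test@localhost:5672/

	// Reserved characters in the password are percent-encoded.
	fmt.Println(amqpURL("test", "p@ss/word", "localhost"))
	// prints: amqp://test:p%40ss%2Fword@localhost:5672/
}
```

The resulting string can be passed to amqp.Dial in place of the fmt.Sprintf call.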
For more information, see the Messaging Topology Operator documentation, API reference, and more examples.