Skip to content

Commit

Permalink
This is to implement queue args for RabbitMQ shovels
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsFronius committed Jan 11, 2025
1 parent 62f01be commit 7b12c34
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 10 deletions.
6 changes: 6 additions & 0 deletions api/v1beta1/shovel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ type ShovelSpec struct {
SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"`
DestinationAddForwardHeaders bool `json:"destAddForwardHeaders,omitempty"`
DestinationAddTimestampHeader bool `json:"destAddTimestampHeader,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
SourceQueueArgs *runtime.RawExtension `json:"srcQueueArgs,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
DestinationQueueArgs *runtime.RawExtension `json:"destQueueArgs,omitempty"`

// +kubebuilder:validation:Enum=amqp091;amqp10
DestinationProtocol string `json:"destProtocol,omitempty"`
Expand Down
28 changes: 18 additions & 10 deletions api/v1beta1/shovel_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,22 @@ var _ = Describe("Shovel spec", func() {
DestinationProtocol: "amqp091",
DestinationPublishProperties: &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 1}`)},
DestinationQueue: "a-queue",
PrefetchCount: 10,
ReconnectDelay: 10,
SourceAddress: "myQueue",
SourceDeleteAfter: "never",
SourceExchange: "an-exchange",
SourceExchangeKey: "a-key",
SourcePrefetchCount: 10,
SourceProtocol: "amqp091",
SourceQueue: "a-queue",
SourceConsumerArgs: &runtime.RawExtension{Raw: []byte(`{"arg": "arg-value"}`)},
DestinationQueueArgs: &runtime.RawExtension{
Raw: []byte(`{"x-queue-type": "quorum"}`),
},
PrefetchCount: 10,
ReconnectDelay: 10,
SourceAddress: "myQueue",
SourceDeleteAfter: "never",
SourceExchange: "an-exchange",
SourceExchangeKey: "a-key",
SourcePrefetchCount: 10,
SourceProtocol: "amqp091",
SourceQueue: "a-queue",
SourceQueueArgs: &runtime.RawExtension{
Raw: []byte(`{"x-queue-type": "quorum"}`),
},
SourceConsumerArgs: &runtime.RawExtension{Raw: []byte(`{"arg": "arg-value"}`)},
}}
Expect(k8sClient.Create(ctx, &shovel)).To(Succeed())
fetched := &Shovel{}
Expand All @@ -107,6 +113,7 @@ var _ = Describe("Shovel spec", func() {
Expect(fetched.Spec.DestinationProperties.Raw).To(Equal([]byte(`{"key":"a-property"}`)))
Expect(fetched.Spec.DestinationMessageAnnotations.Raw).To(Equal([]byte(`{"key":"a-property"}`)))
Expect(fetched.Spec.DestinationQueue).To(Equal("a-queue"))
Expect(fetched.Spec.DestinationQueueArgs.Raw).To(Equal([]byte(`{"x-queue-type":"quorum"}`)))
Expect(fetched.Spec.PrefetchCount).To(Equal(10))
Expect(fetched.Spec.ReconnectDelay).To(Equal(10))

Expand All @@ -117,6 +124,7 @@ var _ = Describe("Shovel spec", func() {
Expect(fetched.Spec.SourcePrefetchCount).To(Equal(10))
Expect(fetched.Spec.SourceProtocol).To(Equal("amqp091"))
Expect(fetched.Spec.SourceQueue).To(Equal("a-queue"))
Expect(fetched.Spec.SourceQueueArgs.Raw).To(Equal([]byte(`{"x-queue-type":"quorum"}`)))
Expect(fetched.Spec.SourceConsumerArgs.Raw).To(Equal([]byte(`{"arg":"arg-value"}`)))
})

Expand Down
10 changes: 10 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/crd/bases/rabbitmq.com_shovels.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ spec:
destQueue:
description: amqp091 configuration
type: string
destQueueArgs:
type: object
x-kubernetes-preserve-unknown-fields: true
name:
description: Required property; cannot be updated
type: string
Expand Down Expand Up @@ -155,6 +158,9 @@ spec:
srcQueue:
description: amqp091 configuration
type: string
srcQueueArgs:
type: object
x-kubernetes-preserve-unknown-fields: true
uriSecret:
description: |-
Secret contains the AMQP URI(s) to configure Shovel destination and source.
Expand Down
2 changes: 2 additions & 0 deletions docs/api/rabbitmq.com.ref.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,8 @@ Required property.
| *`srcPrefetchCount`* __integer__ |
| *`destAddForwardHeaders`* __boolean__ |
| *`destAddTimestampHeader`* __boolean__ |
| *`srcQueueArgs`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#rawextension-runtime-pkg[$$RawExtension$$]__ |
| *`destQueueArgs`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#rawextension-runtime-pkg[$$RawExtension$$]__ |
| *`destProtocol`* __string__ |
| *`destQueue`* __string__ | amqp091 configuration
| *`destExchange`* __string__ | amqp091 configuration
Expand Down
14 changes: 14 additions & 0 deletions internal/shovel_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabb
return nil, fmt.Errorf("failed to unmarshall destination message annotations: %v", err)
}
}
srcQueueArgs := make(map[string]interface{})
if s.Spec.SourceQueueArgs != nil {
if err := json.Unmarshal(s.Spec.SourceQueueArgs.Raw, &srcQueueArgs); err != nil {
return nil, fmt.Errorf("failed to unmarshall source queue args: %v", err)
}
}
destQueueArgs := make(map[string]interface{})
if s.Spec.DestinationQueueArgs != nil {
if err := json.Unmarshal(s.Spec.DestinationQueueArgs.Raw, &destQueueArgs); err != nil {
return nil, fmt.Errorf("failed to unmarshall destination queue args: %v", err)
}
}

return &rabbithole.ShovelDefinition{
SourceURI: strings.Split(srcUri, ","),
Expand All @@ -56,6 +68,7 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabb
DestinationProtocol: s.Spec.DestinationProtocol,
DestinationPublishProperties: destPubProperties,
DestinationQueue: s.Spec.DestinationQueue,
DestinationQueueArgs: destQueueArgs,
DestinationMessageAnnotations: destMsgAnnotations,
PrefetchCount: s.Spec.PrefetchCount,
ReconnectDelay: s.Spec.ReconnectDelay,
Expand All @@ -66,6 +79,7 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabb
SourcePrefetchCount: s.Spec.SourcePrefetchCount,
SourceProtocol: s.Spec.SourceProtocol,
SourceQueue: s.Spec.SourceQueue,
SourceQueueArgs: srcQueueArgs,
SourceConsumerArgs: srcConArgs,
}, nil
}

0 comments on commit 7b12c34

Please sign in to comment.