Skip to content

Commit

Permalink
adding eventhub driver
Browse files Browse the repository at this point in the history
Signed-off-by: Jaydip Gabani <[email protected]>
  • Loading branch information
JaydipGabani committed Jul 29, 2024
1 parent bd96c52 commit f5581d4
Show file tree
Hide file tree
Showing 160 changed files with 26,023 additions and 71 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ MANAGER_IMAGE_PATCH := "apiVersion: apps/v1\
\n - --default-create-vap-for-templates=${GENERATE_VAP}\
\n - --default-create-vap-binding-for-constraints=${GENERATE_VAPBINDING}\
\n - --experimental-enable-k8s-native-validation\
\n - --enable-pub-sub=${ENABLE_PUBSUB}\
\n - --audit-connection=${AUDIT_CONNECTION}\
\n - --audit-channel=${AUDIT_CHANNEL}\
\n"

# Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set)
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ require (
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/monitoring v1.20.1 // indirect
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.1 // indirect
github.com/Azure/go-amqp v1.0.5 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.20.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.44.0 // indirect
github.com/Microsoft/hcsshim v0.11.4 // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ cloud.google.com/go/trace v1.10.11 h1:+Y1emOgcyGy6OdJ2KQbT4t2oecPp49GtJn8j3GM1pW
cloud.google.com/go/trace v1.10.11/go.mod h1:fUr5L3wSXerNfT0f1bBg08W4axS2VbHGgYcfH4KuTXU=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0 h1:rTfKOCZGy5ViVrlA74ZPE99a+SgoEE2K/yg3RyW9dFA=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0/go.mod h1:4OG6tQ9EOP/MT0NMjDlRzWoVFxfu9rN9B2X+tlSVktg=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.1 h1:0f6XnzroY1yCQQwxGf/n/2xlaBF02Qhof2as99dGNsY=
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.1/go.mod h1:vMGz6NOUGJ9h5ONl2kkyaqq5E0g7s4CHNSrXN5fl8UY=
github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.20.0 h1:tk85AYGwOf6VNtoOQi8w/kVDi2vmPxp3/OU2FsUpdcA=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.20.0/go.mod h1:Xx0VKh7GJ4si3rmElbh19Mejxz68ibWg/J30ZOMrqzU=
Expand Down
8 changes: 1 addition & 7 deletions pkg/controller/pubsub/pubsub_config_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pubsub

import (
"context"
"encoding/json"
"flag"
"fmt"

Expand Down Expand Up @@ -118,13 +117,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
if _, ok := cfg.Data["provider"]; !ok {
return reconcile.Result{}, fmt.Errorf(fmt.Sprintf("missing provider field in configmap %s, unable to configure respective pubsub", request.NamespacedName))
}
var config interface{}
err = json.Unmarshal([]byte(cfg.Data["config"]), &config)
if err != nil {
return reconcile.Result{}, err
}

err = r.system.UpsertConnection(ctx, config, request.Name, cfg.Data["provider"])
err = r.system.UpsertConnection(ctx, cfg.Data["config"], request.Name, cfg.Data["provider"])
if err != nil {
return reconcile.Result{}, err
}
Expand Down
38 changes: 15 additions & 23 deletions pkg/pubsub/dapr/dapr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@ import (
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection"
)

type ClientConfig struct {
// Name of the component to be used for pub sub messaging
Component string `json:"component"`
}

// Dapr represents driver for interacting with pub sub using dapr.
type Dapr struct {
// Array of clients to talk to different endpoints
client daprClient.Client

// Name of the pubsub component
pubSubComponent string
Component string `json:"component"`
}

const (
Expand All @@ -33,7 +28,7 @@ func (r *Dapr) Publish(_ context.Context, data interface{}, topic string) error
return fmt.Errorf("error marshaling data: %w", err)
}

err = r.client.PublishEvent(context.Background(), r.pubSubComponent, topic, jsonData)
err = r.client.PublishEvent(context.Background(), r.Component, topic, jsonData)
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}
Expand All @@ -46,38 +41,35 @@ func (r *Dapr) CloseConnection() error {
}

func (r *Dapr) UpdateConnection(_ context.Context, config interface{}) error {
var cfg ClientConfig
m, ok := config.(map[string]interface{})
dClient := &Dapr{}
cfg, ok := config.(string)
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}
cfg.Component, ok = m["component"].(string)
if !ok {
return fmt.Errorf("failed to get value of component")
err := json.Unmarshal([]byte(cfg), &dClient)
if err != nil {
return err
}
r.pubSubComponent = cfg.Component
r.Component = dClient.Component
return nil
}

// Returns a new client for dapr.
func NewConnection(_ context.Context, config interface{}) (connection.Connection, error) {
var cfg ClientConfig
m, ok := config.(map[string]interface{})
dClient := &Dapr{}
cfg, ok := config.(string)
if !ok {
return nil, fmt.Errorf("invalid type assertion, config is not in expected format")
}
cfg.Component, ok = m["component"].(string)
if !ok {
return nil, fmt.Errorf("failed to get value of component")
err := json.Unmarshal([]byte(cfg), &dClient)
if err != nil {
return nil, err
}

tmp, err := daprClient.NewClient()
dClient.client, err = daprClient.NewClient()
if err != nil {
return nil, err
}

return &Dapr{
client: tmp,
pubSubComponent: cfg.Component,
}, nil
return dClient, nil
}
2 changes: 1 addition & 1 deletion pkg/pubsub/dapr/dapr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestDapr_UpdateConnection(t *testing.T) {
assert.True(t, ok)
tmp, ok := r.(*Dapr)
assert.True(t, ok)
assert.Equal(t, cmp, tmp.pubSubComponent)
assert.Equal(t, cmp, tmp.Component)
}
})
}
Expand Down
34 changes: 15 additions & 19 deletions pkg/pubsub/dapr/fake_dapr_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func FakeConnection() (connection.Connection, func()) {
c, f := getTestClient(ctx)
return &Dapr{
client: c,
pubSubComponent: "test",
Component: "test",
}, f
}

Expand All @@ -340,7 +340,7 @@ type FakeDapr struct {
client daprClient.Client

// Name of the pubsub component
pubSubComponent string
Component string `json:"component"`

// closing function
f func()
Expand All @@ -356,36 +356,32 @@ func (r *FakeDapr) CloseConnection() error {
}

func (r *FakeDapr) UpdateConnection(_ context.Context, config interface{}) error {
var cfg ClientConfig
m, ok := config.(map[string]interface{})
fClient := &FakeDapr{}
cfg, ok := config.(string)
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}
cfg.Component, ok = m["component"].(string)
if !ok {
return fmt.Errorf("failed to get value of component")
err := json.Unmarshal([]byte(cfg), &fClient)
if err != nil {
return err
}
r.pubSubComponent = cfg.Component
r.Component = fClient.Component
return nil
}

// Returns a fake client for dapr.
func FakeNewConnection(ctx context.Context, config interface{}) (connection.Connection, error) {
var cfg ClientConfig
m, ok := config.(map[string]interface{})
fClient := &FakeDapr{}
cfg, ok := config.(string)
if !ok {
return nil, fmt.Errorf("invalid type assertion, config is not in expected format")
}
cfg.Component, ok = m["component"].(string)
if !ok {
return nil, fmt.Errorf("failed to get value of component")
err := json.Unmarshal([]byte(cfg), &fClient)
if err != nil {
return nil, err
}

c, f := getTestClient(ctx)
fClient.client, fClient.f = getTestClient(ctx)

return &FakeDapr{
client: c,
pubSubComponent: cfg.Component,
f: f,
}, nil
return fClient, nil
}
91 changes: 91 additions & 0 deletions pkg/pubsub/eventhub/eventhub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package eventhub

import (
"context"
"fmt"
"encoding/json"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection"
)


// Dapr represents driver for interacting with pub sub using dapr.
type EventHub struct {
// Array of clients to talk to different endpoints
producerClient *azeventhubs.ProducerClient

// Name of the pubsub component
ConnectionString string `json:"connectionString"`
EventHubName string `json:"eventHubName"`
}

const (
Name = "eventhub"
)

func (r *EventHub) Publish(ctx context.Context, data interface{}, topic string) error {
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling data: %w", err)
}

newBatchOptions := &azeventhubs.EventDataBatchOptions{}

batch, err := r.producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
err = batch.AddEventData(&azeventhubs.EventData{
Body: jsonData,
}, nil)
if err != nil {
return fmt.Errorf("error adding event data to batch: %w", err)
}

err = r.producerClient.SendEventDataBatch(ctx, batch, nil)
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}

return nil
}

func (r *EventHub) CloseConnection() error {
return nil
}

func (r *EventHub) UpdateConnection(_ context.Context, config interface{}) error {
cfg, ok := config.(string)
if !ok {
return fmt.Errorf("invalid type assertion, config is not in expected format")
}

err := json.Unmarshal([]byte(cfg), &r)
if err != nil {
return err
}

r.producerClient, err = azeventhubs.NewProducerClientFromConnectionString(r.ConnectionString, r.EventHubName, nil)
if err != nil {
return err
}
return nil
}

// Returns a new client for dapr.
func NewConnection(_ context.Context, config interface{}) (connection.Connection, error) {
cfg, ok := config.(string)
if !ok {
return nil, fmt.Errorf("invalid type assertion, config is not in expected format")
}
client := &EventHub{}
err := json.Unmarshal([]byte(cfg), &client)
if err != nil {
return nil, err
}

client.producerClient, err = azeventhubs.NewProducerClientFromConnectionString(client.ConnectionString, client.EventHubName, nil)
if err != nil {
return nil, err
}

return client, nil
}
2 changes: 2 additions & 0 deletions pkg/pubsub/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (

"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/eventhub"
)

var pubSubs = newPubSubSet(map[string]InitiateConnection{
dapr.Name: dapr.NewConnection,
eventhub.Name: eventhub.NewConnection,
},
)

Expand Down
6 changes: 3 additions & 3 deletions pkg/pubsub/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,19 @@ func NewSystem() *System {
return &System{}
}

func (s *System) Publish(_ context.Context, connection string, topic string, msg interface{}) error {
func (s *System) Publish(ctx context.Context, connection string, topic string, msg interface{}) error {
s.mux.RLock()
defer s.mux.RUnlock()
if len(s.connections) > 0 {
if c, ok := s.connections[connection]; ok {
return c.Publish(context.Background(), msg, topic)
return c.Publish(ctx, msg, topic)
}
return fmt.Errorf("connection is not initialized, name: %s ", connection)
}
return fmt.Errorf("No connections are established")
}

func (s *System) UpsertConnection(ctx context.Context, config interface{}, name string, provider string) error {
func (s *System) UpsertConnection(ctx context.Context, config string, name string, provider string) error {
s.mux.Lock()
defer s.mux.Unlock()
// Check if the connection already exists.
Expand Down
36 changes: 18 additions & 18 deletions test/pubsub/publish-components.yaml
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
namespace: gatekeeper-system
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
secretKeyRef:
name: redis
key: redis-password
# ---
# apiVersion: dapr.io/v1alpha1
# kind: Component
# metadata:
# name: pubsub
# namespace: gatekeeper-system
# spec:
# type: pubsub.redis
# version: v1
# metadata:
# - name: redisHost
# value: redis-master.default.svc.cluster.local:6379
# - name: redisPassword
# secretKeyRef:
# name: redis
# key: redis-password
---
apiVersion: v1
kind: ConfigMap
metadata:
name: audit
name: audit-connection
namespace: gatekeeper-system
data:
provider: "dapr"
provider: "eventhub"
config: |
{
"component": "pubsub"
Expand Down
Loading

0 comments on commit f5581d4

Please sign in to comment.