Skip to content

Commit

Permalink
adding diskwriter driver
Browse files Browse the repository at this point in the history
Signed-off-by: Jaydip Gabani <[email protected]>
  • Loading branch information
JaydipGabani committed Sep 5, 2024
1 parent 2294222 commit 8f58972
Show file tree
Hide file tree
Showing 7 changed files with 316 additions and 21 deletions.
34 changes: 32 additions & 2 deletions deploy/gatekeeper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4933,13 +4933,39 @@ spec:
spec:
automountServiceAccountToken: true
containers:
- name: go-sub
image: jaydipgabani/fake-reader:latest
imagePullPolicy: Always
resources:
limits:
memory: 512Mi
requests:
cpu: 100m
memory: 512Mi
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
readOnlyRootFilesystem: true
runAsGroup: 999
runAsNonRoot: true
runAsUser: 1000
seccompProfile:
type: RuntimeDefault
volumeMounts:
- mountPath: /tmp/violations
name: tmp-violations
- args:
- --operation=audit
- --operation=status
- --operation=mutation-status
- --logtostderr
- --disable-opa-builtin={http.send}
- --disable-cert-rotation
- --enable-pub-sub=true
- --audit-connection=audit
- --audit-channel=audit
command:
- /manager
env:
Expand All @@ -4961,7 +4987,7 @@ spec:
value: manager
- name: OTEL_RESOURCE_ATTRIBUTES
value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE),k8s.container.name=$(CONTAINER_NAME)
image: openpolicyagent/gatekeeper:v3.18.0-beta.0
image: jaydipgabani/gatekeeper-e2e:latest
imagePullPolicy: Always
livenessProbe:
httpGet:
Expand Down Expand Up @@ -5002,6 +5028,8 @@ spec:
readOnly: true
- mountPath: /tmp/audit
name: tmp-volume
- mountPath: /tmp/violations
name: tmp-violations
nodeSelector:
kubernetes.io/os: linux
priorityClassName: system-cluster-critical
Expand All @@ -5014,6 +5042,8 @@ spec:
secretName: gatekeeper-webhook-server-cert
- emptyDir: {}
name: tmp-volume
- emptyDir: {}
name: tmp-violations
---
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -5080,7 +5110,7 @@ spec:
value: manager
- name: OTEL_RESOURCE_ATTRIBUTES
value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE),k8s.container.name=$(CONTAINER_NAME)
image: openpolicyagent/gatekeeper:v3.18.0-beta.0
image: jaydipgabani/gatekeeper-e2e:latest
imagePullPolicy: Always
livenessProbe:
httpGet:
Expand Down
86 changes: 86 additions & 0 deletions pkg/pubsub/diskwriter/diskwriter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package diskwriter

import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"sync"
"syscall"

"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection"
)

type DiskWriter struct {
mu sync.Mutex
auditId string
Path string `json:"path,omitempty"`
}

const (
Name = "diskwriter"
)

func (r *DiskWriter) Publish(_ context.Context, data interface{}, topic string) error {
jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling data: %w", err)
}

path := path.Join(r.Path, "violations.txt")

file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}

defer file.Close()

// Acquire an exclusive lock on the file
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX); err != nil {
return fmt.Errorf("failed to lock file: %w", err)
}
defer syscall.Flock(int(file.Fd()), syscall.LOCK_UN)

r.mu.Lock()
defer r.mu.Unlock()

_, err = file.WriteString(string(jsonData) + "\n")
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}

return nil
}

func (r *DiskWriter) CloseConnection() error {
return nil
}

func (r *DiskWriter) UpdateConnection(_ context.Context, config interface{}) error {
// m, ok := config.(map[string]interface{})
// if !ok {
// return fmt.Errorf("invalid type assertion, config is not in expected format")
// }
// path, ok := m["path"].(string)
// if !ok {
// return fmt.Errorf("failed to get value of path")
// }
// r.Path = path
return nil
}

// Returns a new client for dapr.
func NewConnection(_ context.Context, config interface{}) (connection.Connection, error) {
var diskWriter DiskWriter
m, ok := config.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("invalid type assertion, config is not in expected format")
}
diskWriter.Path, ok = m["path"].(string)
if !ok {
return nil, fmt.Errorf("failed to get value of path")
}
return &diskWriter, nil
}
2 changes: 2 additions & 0 deletions pkg/pubsub/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (

"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/connection"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/dapr"
"github.com/open-policy-agent/gatekeeper/v3/pkg/pubsub/diskwriter"
)

var pubSubs = newPubSubSet(map[string]InitiateConnection{
dapr.Name: dapr.NewConnection,
diskwriter.Name: diskwriter.NewConnection,
},
)

Expand Down
35 changes: 35 additions & 0 deletions test/pubsub/fake-reader/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
ARG BUILDPLATFORM="linux/amd64"
ARG BUILDERIMAGE="golang:1.22-bookworm"
ARG BASEIMAGE="gcr.io/distroless/static-debian12:nonroot"

FROM --platform=$BUILDPLATFORM $BUILDERIMAGE as builder

ARG TARGETPLATFORM
ARG TARGETOS
ARG TARGETARCH
ARG TARGETVARIANT=""
ARG LDFLAGS

ENV GO111MODULE=on \
CGO_ENABLED=0 \
GOOS=${TARGETOS} \
GOARCH=${TARGETARCH} \
GOARM=${TARGETVARIANT}

WORKDIR /go/src/github.com/open-policy-agent/gatekeeper/test/pubsub/fake-reader

COPY . .

RUN go mod init && go mod tidy && go mod vendor

RUN go build -o main

FROM $BASEIMAGE

WORKDIR /

COPY --from=builder /go/src/github.com/open-policy-agent/gatekeeper/test/pubsub/fake-reader/main .

USER 65532:65532

ENTRYPOINT ["/main"]
80 changes: 80 additions & 0 deletions test/pubsub/fake-reader/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package main

import (
"bufio"
// "fmt"
"time"
"log"
"os"
"syscall"
)

type PubsubMsg struct {
ID string `json:"id,omitempty"`
Details interface{} `json:"details,omitempty"`
EventType string `json:"eventType,omitempty"`
Group string `json:"group,omitempty"`
Version string `json:"version,omitempty"`
Kind string `json:"kind,omitempty"`
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
Message string `json:"message,omitempty"`
EnforcementAction string `json:"enforcementAction,omitempty"`
ConstraintAnnotations map[string]string `json:"constraintAnnotations,omitempty"`
ResourceGroup string `json:"resourceGroup,omitempty"`
ResourceAPIVersion string `json:"resourceAPIVersion,omitempty"`
ResourceKind string `json:"resourceKind,omitempty"`
ResourceNamespace string `json:"resourceNamespace,omitempty"`
ResourceName string `json:"resourceName,omitempty"`
ResourceLabels map[string]string `json:"resourceLabels,omitempty"`
}

func main() {
filePath := "/tmp/violations/violations.txt"

for {
// Open the file in read-write mode
file, err := os.OpenFile(filePath, os.O_RDWR, 0644)
if err != nil {
log.Printf("Error opening file: %v\n", err)
time.Sleep(5 * time.Second)
continue
}

// Acquire an exclusive lock on the file
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX); err != nil {
log.Fatalf("Error locking file: %v\n", err)
}

// Read the file content
scanner := bufio.NewScanner(file)
var lines []string
for scanner.Scan() {
lines = append(lines, scanner.Text())
}

if err := scanner.Err(); err != nil {
log.Fatalf("Error reading file: %v\n", err)
}

// Process the read content
for _, line := range lines {
log.Printf("Processed line: %s\n", line)
}

// Truncate the file to remove the processed content
if err := file.Truncate(0); err != nil {
log.Fatalf("Error truncating file: %v\n", err)
}

// Release the lock
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil {
log.Fatalf("Error unlocking file: %v\n", err)
}

// Close the file
if err := file.Close(); err != nil {
log.Fatalf("Error closing file: %v\n", err)
}
}
}
62 changes: 62 additions & 0 deletions test/pubsub/fake-writer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package main

import (
"fmt"
"os"
// "time"
"syscall"
)

type PubsubMsg struct {
ID string `json:"id,omitempty"`
Details interface{} `json:"details,omitempty"`
EventType string `json:"eventType,omitempty"`
Group string `json:"group,omitempty"`
Version string `json:"version,omitempty"`
Kind string `json:"kind,omitempty"`
Name string `json:"name,omitempty"`
Namespace string `json:"namespace,omitempty"`
Message string `json:"message,omitempty"`
EnforcementAction string `json:"enforcementAction,omitempty"`
ConstraintAnnotations map[string]string `json:"constraintAnnotations,omitempty"`
ResourceGroup string `json:"resourceGroup,omitempty"`
ResourceAPIVersion string `json:"resourceAPIVersion,omitempty"`
ResourceKind string `json:"resourceKind,omitempty"`
ResourceNamespace string `json:"resourceNamespace,omitempty"`
ResourceName string `json:"resourceName,omitempty"`
ResourceLabels map[string]string `json:"resourceLabels,omitempty"`
}

func main() {
path := "/mount/d/go/src/github.com/open-policy-agent/gatekeeper/violations.txt"
msgId := 1

for {
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
fmt.Println("failed to open file: %w", err)
}

// Acquire an exclusive lock on the file
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_EX); err != nil {
fmt.Println("failed to lock file: %w", err)
}

_, err = file.WriteString(fmt.Sprintf("violation_msg_", msgId) + "\n")
if err != nil {
fmt.Println("error publishing message to dapr: %w", err)
}

// Release the lock
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil {
fmt.Println("Error unlocking file: %v\n", err)
}

// Close the file
if err := file.Close(); err != nil {
fmt.Println("Error closing file: %v\n", err)
}
fmt.Println("Published message: violation_msg_", msgId)
msgId++
}
}
Loading

0 comments on commit 8f58972

Please sign in to comment.