Skip to content

Commit

Permalink
maintaining last 3 audit runs on disk
Browse files Browse the repository at this point in the history
Signed-off-by: Jaydip Gabani <[email protected]>
  • Loading branch information
JaydipGabani committed Dec 31, 2024
1 parent 7250fba commit 03f0059
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 13 deletions.
1 change: 1 addition & 0 deletions apis/connection/v1alpha1/connection_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Connection struct {
Spec ConnectionSpec `json:"spec,omitempty"`
Status ConnectionStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// ConnectionList contains a list of Config.
Expand Down
33 changes: 29 additions & 4 deletions pkg/pubsub/diskwriter/diskwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package diskwriter

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"os"
Expand All @@ -13,8 +15,11 @@ import (
)

type DiskWriter struct {
Path string `json:"path,omitempty"`
file *os.File
Path string `json:"path,omitempty"`
file *os.File
auditRuns []string
currentAuditRun string
auditRunCount int
}

const (
Expand All @@ -27,17 +32,28 @@ func (r *DiskWriter) Publish(_ context.Context, data interface{}, _ string) erro
err := syscall.Flock(int(r.file.Fd()), syscall.LOCK_UN)
r.file.Close()
r.file = nil
r.auditRuns = append(r.auditRuns, r.currentAuditRun)
r.currentAuditRun = ""
if len(r.auditRuns) > 3 {
os.Remove(r.auditRuns[0])
r.auditRuns = r.auditRuns[1:]
}
r.auditRunCount++
return err
}

if r.currentAuditRun == "" {
r.currentAuditRun = generateRandomFileName() + ".txt"
}

jsonData, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("error marshaling data: %w", err)
}

if r.file == nil {
// Open a new file and acquire a lock
filePath := path.Join(r.Path, "violations.txt")
filePath := path.Join(r.Path, r.currentAuditRun)
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
Expand All @@ -59,7 +75,7 @@ func (r *DiskWriter) Publish(_ context.Context, data interface{}, _ string) erro
}
}

_, err = r.file.WriteString(string(jsonData) + "\n")
_, err = r.file.WriteString(fmt.Sprintf("Audit ID :%d", r.auditRunCount) + string(jsonData) + "\n")
if err != nil {
return fmt.Errorf("error publishing message to dapr: %w", err)
}
Expand Down Expand Up @@ -97,3 +113,12 @@ func NewConnection(_ context.Context, config interface{}) (connection.Connection
}
return &diskWriter, nil
}

func generateRandomFileName() string {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
panic(err)
}
return hex.EncodeToString(b)
}
50 changes: 42 additions & 8 deletions test/pubsub/fake-reader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package main

import (
"bufio"
"fmt"
"log"
"os"
"path/filepath"
"syscall"
// "fmt".
"time"
)

Expand Down Expand Up @@ -37,11 +38,19 @@ type PubsubMsg struct {
// 1 - GK publish violations
// 1 - policy read violations.
func main() {
filePath := "/tmp/violations/violations.txt"
dirPath := "/tmp/violations/"

for {
// Find the latest created file in dirPath
latestFile, files, err := getLatestFile(dirPath)
if err != nil {
log.Printf("Error finding latest file: %v\n", err)
time.Sleep(5 * time.Second)
continue
}
log.Printf("out of all files: %v, reading from the %s \n", files, latestFile)
// Open the file in read-write mode
file, err := os.OpenFile(filePath, os.O_RDWR, 0o644)
file, err := os.OpenFile(latestFile, os.O_RDWR, 0o644)
if err != nil {
log.Printf("Error opening file: %v\n", err)
time.Sleep(5 * time.Second)
Expand Down Expand Up @@ -69,11 +78,6 @@ func main() {
log.Printf("Processed line: %s\n", line)
}

// Truncate the file to remove the processed content
if err := file.Truncate(0); err != nil {
log.Fatalf("Error truncating file: %v\n", err)
}

// Release the lock
if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil {
log.Fatalf("Error unlocking file: %v\n", err)
Expand All @@ -83,5 +87,35 @@ func main() {
if err := file.Close(); err != nil {
log.Fatalf("Error closing file: %v\n", err)
}
time.Sleep(90 * time.Second)
}
}

func getLatestFile(dirPath string) (string, []string, error) {
var latestFile string
var latestModTime time.Time
var files []string

err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() && (latestFile == "" || info.ModTime().After(latestModTime)) {
latestFile = path
latestModTime = info.ModTime()
}
if !info.IsDir() {
files = append(files, path)
}
return nil
})
if err != nil {
return "", files, err
}

if latestFile == "" {
return "", files, fmt.Errorf("no files found in directory: %s", dirPath)
}

return latestFile, files, nil
}
1 change: 0 additions & 1 deletion test/pubsub/fake-writer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"fmt"
"os"
// "time".
"syscall"
)

Expand Down

0 comments on commit 03f0059

Please sign in to comment.