diff --git a/apis/connection/v1alpha1/connection_types.go b/apis/connection/v1alpha1/connection_types.go index 8ef9df0f59a..89c2cb25101 100644 --- a/apis/connection/v1alpha1/connection_types.go +++ b/apis/connection/v1alpha1/connection_types.go @@ -53,6 +53,7 @@ type Connection struct { Spec ConnectionSpec `json:"spec,omitempty"` Status ConnectionStatus `json:"status,omitempty"` } + // +kubebuilder:object:root=true // ConnectionList contains a list of Config. diff --git a/pkg/pubsub/diskwriter/diskwriter.go b/pkg/pubsub/diskwriter/diskwriter.go index 94e5429790c..7a385cd7a60 100644 --- a/pkg/pubsub/diskwriter/diskwriter.go +++ b/pkg/pubsub/diskwriter/diskwriter.go @@ -2,6 +2,8 @@ package diskwriter import ( "context" + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" "os" @@ -13,8 +15,11 @@ import ( ) type DiskWriter struct { - Path string `json:"path,omitempty"` - file *os.File + Path string `json:"path,omitempty"` + file *os.File + auditRuns []string + currentAuditRun string + auditRunCount int } const ( @@ -27,9 +32,20 @@ func (r *DiskWriter) Publish(_ context.Context, data interface{}, _ string) erro err := syscall.Flock(int(r.file.Fd()), syscall.LOCK_UN) r.file.Close() r.file = nil + r.auditRuns = append(r.auditRuns, r.currentAuditRun) + r.currentAuditRun = "" + if len(r.auditRuns) > 3 { + os.Remove(r.auditRuns[0]) + r.auditRuns = r.auditRuns[1:] + } + r.auditRunCount++ return err } + if r.currentAuditRun == "" { + r.currentAuditRun = generateRandomFileName() + ".txt" + } + jsonData, err := json.Marshal(data) if err != nil { return fmt.Errorf("error marshaling data: %w", err) @@ -37,7 +53,7 @@ func (r *DiskWriter) Publish(_ context.Context, data interface{}, _ string) erro if r.file == nil { // Open a new file and acquire a lock - filePath := path.Join(r.Path, "violations.txt") + filePath := path.Join(r.Path, r.currentAuditRun) file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) if err != nil { return fmt.Errorf("failed to open file: %w", err) @@ -59,7 +75,7 @@ func (r *DiskWriter) Publish(_ context.Context, data interface{}, _ string) erro } } - _, err = r.file.WriteString(string(jsonData) + "\n") + _, err = r.file.WriteString(fmt.Sprintf("Audit ID :%d", r.auditRunCount) + string(jsonData) + "\n") if err != nil { return fmt.Errorf("error publishing message to dapr: %w", err) } @@ -97,3 +113,12 @@ func NewConnection(_ context.Context, config interface{}) (connection.Connection } return &diskWriter, nil } + +func generateRandomFileName() string { + b := make([]byte, 16) + _, err := rand.Read(b) + if err != nil { + panic(err) + } + return hex.EncodeToString(b) +} diff --git a/test/pubsub/fake-reader/main.go b/test/pubsub/fake-reader/main.go index 8f1ac996658..c6b49b07096 100644 --- a/test/pubsub/fake-reader/main.go +++ b/test/pubsub/fake-reader/main.go @@ -2,10 +2,11 @@ package main import ( "bufio" + "fmt" "log" "os" + "path/filepath" "syscall" - // "fmt". "time" ) @@ -37,11 +38,19 @@ type PubsubMsg struct { // 1 - GK publish violations // 1 - policy read violations. func main() { - filePath := "/tmp/violations/violations.txt" + dirPath := "/tmp/violations/" for { + // Find the latest created file in dirPath + latestFile, files, err := getLatestFile(dirPath) + if err != nil { + log.Printf("Error finding latest file: %v\n", err) + time.Sleep(5 * time.Second) + continue + } + log.Printf("out of all files: %v, reading from the %s \n", files, latestFile) // Open the file in read-write mode - file, err := os.OpenFile(filePath, os.O_RDWR, 0o644) + file, err := os.OpenFile(latestFile, os.O_RDWR, 0o644) if err != nil { log.Printf("Error opening file: %v\n", err) time.Sleep(5 * time.Second) @@ -69,11 +78,6 @@ func main() { log.Printf("Processed line: %s\n", line) } - // Truncate the file to remove the processed content - if err := file.Truncate(0); err != nil { - log.Fatalf("Error truncating file: %v\n", err) - } - // Release the lock if err := syscall.Flock(int(file.Fd()), syscall.LOCK_UN); err != nil { log.Fatalf("Error unlocking file: %v\n", err) @@ -83,5 +87,35 @@ func main() { if err := file.Close(); err != nil { log.Fatalf("Error closing file: %v\n", err) } + time.Sleep(90 * time.Second) } } + +func getLatestFile(dirPath string) (string, []string, error) { + var latestFile string + var latestModTime time.Time + var files []string + + err := filepath.Walk(dirPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() && (latestFile == "" || info.ModTime().After(latestModTime)) { + latestFile = path + latestModTime = info.ModTime() + } + if !info.IsDir() { + files = append(files, path) + } + return nil + }) + if err != nil { + return "", files, err + } + + if latestFile == "" { + return "", files, fmt.Errorf("no files found in directory: %s", dirPath) + } + + return latestFile, files, nil +} diff --git a/test/pubsub/fake-writer/main.go b/test/pubsub/fake-writer/main.go index 4854d9b59e1..8968963d5bc 100644 --- a/test/pubsub/fake-writer/main.go +++ b/test/pubsub/fake-writer/main.go @@ -3,7 +3,6 @@ package main import ( "fmt" "os" - // "time". "syscall" )