diff --git a/functions/onprem/orborus/orborus.go b/functions/onprem/orborus/orborus.go index f9e28d44a..7109f8708 100755 --- a/functions/onprem/orborus/orborus.go +++ b/functions/onprem/orborus/orborus.go @@ -10,7 +10,7 @@ package main import ( "github.com/shuffle/shuffle-shared" - + "archive/zip" "bytes" "context" "encoding/json" @@ -29,6 +29,7 @@ import ( "strings" "sync" "time" + "path/filepath" //"os/signal" //"syscall" @@ -1643,6 +1644,13 @@ func main() { } toBeRemoved.Data = append(toBeRemoved.Data, incRequest) + } else if incRequest.Type == "CATEGORY_CHANGE" { + err := handleFileCategoryChange() + if err != nil { + log.Printf("[ERROR] Failed to download the file category: %s", err) + } + + toBeRemoved.Data = append(toBeRemoved.Data, incRequest) } else if incRequest.Type == "DOCKER_IMAGE_DOWNLOAD" { log.Printf("[INFO] Should delete -> download new image %#v", incRequest.ExecutionArgument) @@ -1654,7 +1662,11 @@ func main() { } toBeRemoved.Data = append(toBeRemoved.Data, incRequest) - } else { + } else if incRequest.Type == "CATEGORY_UPDATE" { + handleFileCategoryChange() + toBeRemoved.Data = append(toBeRemoved.Data, incRequest) + + }else { newrequests = append(newrequests, incRequest) } } @@ -2265,24 +2277,24 @@ func createPipeline(command, identifier string) (string, error) { log.Printf("[INFO] an existing pipeline found with ID: %s. it will be deleted", pipelineId) toBeDeleted = true } - if strings.Contains(command, "shuffler.io") { + // if strings.Contains(command, "shuffler.io") { - } else { - var scheme string - if strings.Contains(command, "http://") { - scheme = "http://" - } else if strings.Contains(command, "https://") { - scheme = "https://" - } + // } else { + // var scheme string + // if strings.Contains(command, "http://") { + // scheme = "http://" + // } else if strings.Contains(command, "https://") { + // scheme = "https://" + // } - startIndex := strings.Index(command, scheme) - if startIndex != -1 { - endIndex := startIndex + len(scheme) - endIndex += strings.Index(command[endIndex:], "/") + // startIndex := strings.Index(command, scheme) + // if startIndex != -1 { + // endIndex := startIndex + len(scheme) + // endIndex += strings.Index(command[endIndex:], "/") - command = command[:startIndex] + baseUrl + command[endIndex:] - } - } + // command = command[:startIndex] + baseUrl + command[endIndex:] + // } + // } requestBody := map[string]interface{}{ "definition": command, "name": identifier, @@ -2508,6 +2520,125 @@ func searchPipeline(identifier string) (string, error) { return "", errors.New("no existing pipeline found with name") } +func handleFileCategoryChange() error{ + apiEndpoint := "https://expert-acorn-v6vg4j4j5w7q2wg6g-5001.app.github.dev/api/v1/files/namespaces/hari" + apiKey := "23e57313-5f0f-4a20-bddd-a9059c980adf" + + req, err := http.NewRequest("GET", apiEndpoint, nil) + if err != nil { + return err + } + + req.Header.Add("Authorization", "Bearer "+apiKey) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return err + } + + out, err := os.Create("files.zip") + if err != nil { + return err + } + + defer out.Close() + defer os.Remove("files.zip") + + _, err = io.Copy(out, resp.Body) + if err != nil { + return err + } + + fmt.Println("ZIP file downloaded successfully.") + + err = extractZIP("files.zip", "unzipped_files") + if err != nil { + return err + } + + destPath := "/var/lib/tenzir/unzipped_files" + + err = copyToTenzir("unzipped_files", destPath) + if err != nil { + return err + } + + fmt.Println("Files copied to container successfully.") + return nil +} + +func extractZIP(zipFile, destDir string) error { + r, err := zip.OpenReader(zipFile) + if err != nil { + return err + } + defer r.Close() + + if err := os.MkdirAll(destDir, 0755); err != nil { + return err + } + + for _, f := range r.File { + err := extractFile(f, destDir) + if err != nil { + return err + } + } + + return nil +} + +func extractFile(f *zip.File, destDir string) error { + rc, err := f.Open() + if err != nil { + return err + } + defer rc.Close() + + path := filepath.Join(destDir, f.Name) + + out, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) + if err != nil { + return err + } + defer out.Close() + + _, err = io.Copy(out, rc) + return err +} + +func copyToTenzir(srcPath, destPath string) error { + containerName := "tenzir-node" + + // Check if the extracted_files directory exists in the container + checkCmd := exec.Command("docker", "exec", containerName, "test", "-d", destPath) + if err := checkCmd.Run(); err == nil { + rmCmd := exec.Command("docker", "exec", "-u", "root", containerName, "rm", "-rf", destPath) + if err := rmCmd.Run(); err != nil { + return fmt.Errorf("error removing existing directory in container: %v", err) + } + } + + // Copy the new directory to the container + cpCmd := exec.Command("docker", "cp", srcPath, fmt.Sprintf("%s:%s", containerName, destPath)) + var out bytes.Buffer + cpCmd.Stdout = &out + cpCmd.Stderr = &out + + err := cpCmd.Run() + if err != nil { + return fmt.Errorf("error copying files: %v, output: %s", err, out.String()) + } + + return nil +} + // func savePipelineData(pipelineId, identifier, status string) error { // url := fmt.Sprintf("%s/api/v1/triggers/pipeline/save", baseUrl)