Skip to content

Commit

Permalink
added a function to handle file category change
Browse files Browse the repository at this point in the history
  • Loading branch information
satti-hari-krishna-reddy committed Jun 10, 2024
1 parent f2d99c6 commit e51c11a
Showing 1 changed file with 148 additions and 17 deletions.
165 changes: 148 additions & 17 deletions functions/onprem/orborus/orborus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ package main

import (
"github.com/shuffle/shuffle-shared"

"archive/zip"
"bytes"
"context"
"encoding/json"
Expand All @@ -29,6 +29,7 @@ import (
"strings"
"sync"
"time"
"path/filepath"

//"os/signal"
//"syscall"
Expand Down Expand Up @@ -1643,6 +1644,13 @@ func main() {
}

toBeRemoved.Data = append(toBeRemoved.Data, incRequest)
} else if incRequest.Type == "CATEGORY_CHANGE" {
err := handleFileCategoryChange()
if err != nil {
log.Printf("[ERROR] Failed to download the file category: %s", err)
}

toBeRemoved.Data = append(toBeRemoved.Data, incRequest)
} else if incRequest.Type == "DOCKER_IMAGE_DOWNLOAD" {
log.Printf("[INFO] Should delete -> download new image %#v", incRequest.ExecutionArgument)

Expand All @@ -1654,7 +1662,11 @@ func main() {

}
toBeRemoved.Data = append(toBeRemoved.Data, incRequest)
} else {
} else if incRequest.Type == "CATEGORY_UPDATE" {
handleFileCategoryChange()
toBeRemoved.Data = append(toBeRemoved.Data, incRequest)

}else {
newrequests = append(newrequests, incRequest)
}
}
Expand Down Expand Up @@ -2265,24 +2277,24 @@ func createPipeline(command, identifier string) (string, error) {
log.Printf("[INFO] an existing pipeline found with ID: %s. it will be deleted", pipelineId)
toBeDeleted = true
}
if strings.Contains(command, "shuffler.io") {
// if strings.Contains(command, "shuffler.io") {

} else {
var scheme string
if strings.Contains(command, "http://") {
scheme = "http://"
} else if strings.Contains(command, "https://") {
scheme = "https://"
}
// } else {
// var scheme string
// if strings.Contains(command, "http://") {
// scheme = "http://"
// } else if strings.Contains(command, "https://") {
// scheme = "https://"
// }

startIndex := strings.Index(command, scheme)
if startIndex != -1 {
endIndex := startIndex + len(scheme)
endIndex += strings.Index(command[endIndex:], "/")
// startIndex := strings.Index(command, scheme)
// if startIndex != -1 {
// endIndex := startIndex + len(scheme)
// endIndex += strings.Index(command[endIndex:], "/")

command = command[:startIndex] + baseUrl + command[endIndex:]
}
}
// command = command[:startIndex] + baseUrl + command[endIndex:]
// }
// }
requestBody := map[string]interface{}{
"definition": command,
"name": identifier,
Expand Down Expand Up @@ -2508,6 +2520,125 @@ func searchPipeline(identifier string) (string, error) {
return "", errors.New("no existing pipeline found with name")
}

func handleFileCategoryChange() error{
apiEndpoint := "https://expert-acorn-v6vg4j4j5w7q2wg6g-5001.app.github.dev/api/v1/files/namespaces/hari"
apiKey := "23e57313-5f0f-4a20-bddd-a9059c980adf"

req, err := http.NewRequest("GET", apiEndpoint, nil)
if err != nil {
return err
}

req.Header.Add("Authorization", "Bearer "+apiKey)

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return err
}

out, err := os.Create("files.zip")
if err != nil {
return err
}

defer out.Close()
defer os.Remove("files.zip")

_, err = io.Copy(out, resp.Body)
if err != nil {
return err
}

fmt.Println("ZIP file downloaded successfully.")

err = extractZIP("files.zip", "unzipped_files")
if err != nil {
return err
}

destPath := "/var/lib/tenzir/unzipped_files"

err = copyToTenzir("unzipped_files", destPath)
if err != nil {
return err
}

fmt.Println("Files copied to container successfully.")
return nil
}

func extractZIP(zipFile, destDir string) error {
r, err := zip.OpenReader(zipFile)
if err != nil {
return err
}
defer r.Close()

if err := os.MkdirAll(destDir, 0755); err != nil {
return err
}

for _, f := range r.File {
err := extractFile(f, destDir)
if err != nil {
return err
}
}

return nil
}

func extractFile(f *zip.File, destDir string) error {
rc, err := f.Open()
if err != nil {
return err
}
defer rc.Close()

path := filepath.Join(destDir, f.Name)

out, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
}
defer out.Close()

_, err = io.Copy(out, rc)
return err
}

func copyToTenzir(srcPath, destPath string) error {
containerName := "tenzir-node"

// Check if the extracted_files directory exists in the container
checkCmd := exec.Command("docker", "exec", containerName, "test", "-d", destPath)
if err := checkCmd.Run(); err == nil {
rmCmd := exec.Command("docker", "exec", "-u", "root", containerName, "rm", "-rf", destPath)
if err := rmCmd.Run(); err != nil {
return fmt.Errorf("error removing existing directory in container: %v", err)
}
}

// Copy the new directory to the container
cpCmd := exec.Command("docker", "cp", srcPath, fmt.Sprintf("%s:%s", containerName, destPath))
var out bytes.Buffer
cpCmd.Stdout = &out
cpCmd.Stderr = &out

err := cpCmd.Run()
if err != nil {
return fmt.Errorf("error copying files: %v, output: %s", err, out.String())
}

return nil
}

// func savePipelineData(pipelineId, identifier, status string) error {

// url := fmt.Sprintf("%s/api/v1/triggers/pipeline/save", baseUrl)
Expand Down

0 comments on commit e51c11a

Please sign in to comment.