Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
frikky committed Apr 30, 2024
2 parents d35d605 + 20a5e79 commit 899e218
Show file tree
Hide file tree
Showing 4 changed files with 721 additions and 20 deletions.
244 changes: 244 additions & 0 deletions db-connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7696,6 +7696,185 @@ func GetSchedule(ctx context.Context, schedulename string) (*ScheduleOld, error)
return curUser, nil
}

func GetHooks(ctx context.Context, OrgId string) ([]Hook, error) {
hooks := []Hook{}
nameKey := "hooks"
OrgId = strings.ToLower(OrgId)

//FIXME: Implement caching

if project.DbType == "opensearch" {
var buf bytes.Buffer
query := map[string]interface{}{
"from": 0,
"size": 1000,
"query": map[string]interface{}{
"match": map[string]interface{}{
"org_id": OrgId,
},
},
}

if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Printf("[WARNING] Error encoding find user query: %s", err)
return []Hook{}, err
}

res, err := project.Es.Search(
project.Es.Search.WithContext(ctx),
project.Es.Search.WithIndex(strings.ToLower(GetESIndexPrefix(nameKey))),
project.Es.Search.WithBody(&buf),
project.Es.Search.WithTrackTotalHits(true),
)
if err != nil {
log.Printf("[ERROR] Error getting response from Opensearch (get hooks): %s", err)
return []Hook{}, err
}

defer res.Body.Close()
if res.StatusCode == 404 {
return []Hook{}, nil
}

defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
log.Printf("[WARNING] Error parsing the response body: %s", err)
return []Hook{}, nil
} else {
// Print the response status and error information.
log.Printf("[%s] %s: %s",
res.Status(),
e["error"].(map[string]interface{})["type"],
e["error"].(map[string]interface{})["reason"],
)
}
}

if res.StatusCode != 200 && res.StatusCode != 201 {
return []Hook{}, fmt.Errorf("Bad statuscode: %d", res.StatusCode)
}

respBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return []Hook{}, err
}
wrapper := AllHooksWrapper{}
err = json.Unmarshal(respBody, &wrapper)

if err != nil {
return []Hook{}, err
}

for _, hit := range wrapper.Hits.Hits {
hook := hit.Source
hooks = append(hooks, hook)
}
return hooks, err

} else {
q := datastore.NewQuery(nameKey).Filter("org_id = ", OrgId).Limit(1000)

_, err := project.Dbclient.GetAll(ctx, q, &hooks)
if err != nil && len(hooks) == 0 {
return hooks, err
}
}

return hooks, nil
}

func GetPipelines(ctx context.Context, OrgId string) ([]Pipeline, error) {
pipelines := []Pipeline{}
nameKey := "pipelines"
OrgId = strings.ToLower(OrgId)

//FIXME: Implement caching

if project.DbType == "opensearch" {
var buf bytes.Buffer
query := map[string]interface{}{
"from": 0,
"size": 1000,
"query": map[string]interface{}{
"match": map[string]interface{}{
"org_id": OrgId,
},
},
}

if err := json.NewEncoder(&buf).Encode(query); err != nil {
log.Printf("[WARNING] Error encoding find user query: %s", err)
return []Pipeline{}, err
}

res, err := project.Es.Search(
project.Es.Search.WithContext(ctx),
project.Es.Search.WithIndex(strings.ToLower(GetESIndexPrefix(nameKey))),
project.Es.Search.WithBody(&buf),
project.Es.Search.WithTrackTotalHits(true),
)
if err != nil {
log.Printf("[ERROR] Error getting response from Opensearch (get pipelines): %s", err)
return []Pipeline{}, err
}

defer res.Body.Close()
if res.StatusCode == 404 {
return []Pipeline{}, nil
}

defer res.Body.Close()
if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
log.Printf("[WARNING] Error parsing the response body: %s", err)
return []Pipeline{}, nil
} else {
// Print the response status and error information.
log.Printf("[%s] %s: %s",
res.Status(),
e["error"].(map[string]interface{})["type"],
e["error"].(map[string]interface{})["reason"],
)
}
}

if res.StatusCode != 200 && res.StatusCode != 201 {
return []Pipeline{}, fmt.Errorf("bad statuscode: %d", res.StatusCode)

}

respBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return []Pipeline{}, err
}
wrapper := AllPipelinesWrapper{}
err = json.Unmarshal(respBody, &wrapper)

if err != nil {
return []Pipeline{}, err
}

for _, hit := range wrapper.Hits.Hits {
pipeline := hit.Source
pipelines = append(pipelines, pipeline)
}
return pipelines, err

} else {
// q := datastore.NewQuery(nameKey).Filter("org_id = ", OrgId).Limit(1000)

// _, err := project.Dbclient.GetAll(ctx, q, &pipelines)
// if err != nil && len(pipelines) == 0 {
// return pipelines, err
// }
}

return pipelines, nil
}

func GetSessionNew(ctx context.Context, sessionId string) (User, error) {
cacheKey := fmt.Sprintf("session_%s", sessionId)
user := &User{}
Expand Down Expand Up @@ -7916,6 +8095,31 @@ func GetApikey(ctx context.Context, apikey string) (User, error) {
return users[0], nil
}

func savePipelineData(ctx context.Context, pipeline Pipeline) error {
// assuming IndexRequest can be used as an upsert operation
nameKey := "pipelines"

pipelineData, err := json.Marshal(pipeline)
if err != nil {
log.Printf("[WARNING] Failed marshalling in savePipelineData: %s", err)
return err
}
triggerId := strings.ToLower(pipeline.TriggerId)
if project.DbType == "opensearch" {
err = indexEs(ctx, nameKey, triggerId, pipelineData)
if err != nil {
return err
}
} else {
// key := datastore.NameKey(nameKey, pipelineId, nil)
// if _, err := project.Dbclient.Put(ctx, key, &pipeline); err != nil {
// log.Printf("[ERROR] failed to add pipeline: %s", err)
// return err
}

return nil
}

func GetHook(ctx context.Context, hookId string) (*Hook, error) {
nameKey := "hooks"
hookId = strings.ToLower(hookId)
Expand Down Expand Up @@ -8026,6 +8230,46 @@ func SetHook(ctx context.Context, hook Hook) error {
return nil
}

func GetPipeline(ctx context.Context, triggerId string) (*Pipeline, error) {
pipeline := &Pipeline{}
nameKey := "pipelines"

triggerId = strings.ToLower(triggerId)

if project.DbType == "opensearch" {

res, err := project.Es.Get(strings.ToLower(GetESIndexPrefix(nameKey)), triggerId)
if err != nil {
return &Pipeline{}, err
}

defer res.Body.Close()
if res.StatusCode == 404 {
return &Pipeline{}, errors.New("pipeline doesn't exist")
}

respBody, err := ioutil.ReadAll(res.Body)
if err != nil {
return &Pipeline{}, err
}

wrapped := PipelineWrapper{}
err = json.Unmarshal(respBody, &wrapped)
if err != nil {
return &Pipeline{}, err
}

pipeline = &wrapped.Source
} else {
// key := datastore.NameKey(nameKey, triggerId, nil)
// err := project.Dbclient.Get(ctx, key, pipeline)
// if err != nil {
// return &Pipeline{}, err
// }
}
return pipeline, nil
}

func GetNotification(ctx context.Context, id string) (*Notification, error) {
nameKey := "notifications"

Expand Down
49 changes: 34 additions & 15 deletions pipelines.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package shuffle

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"io/ioutil"
"encoding/json"

"github.com/satori/go.uuid"
uuid "github.com/satori/go.uuid"
)

// Pipeline is a sequence of stages that are executed in order.
Expand Down Expand Up @@ -75,7 +75,6 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
return
}


ctx := GetContext(request)
environments, err := GetEnvironments(ctx, user.ActiveOrg.Id)
if err != nil {
Expand All @@ -101,7 +100,7 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
}

availableCommands := []string{
"create",
"create", "delete", "start", "stop",
}

matchingCommand := ""
Expand All @@ -120,13 +119,12 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
}

// 1. Add to trigger list
/* TBD */

/* TBD */

// Look for PIPELINE_ command that exists in the queue already
parsedId := fmt.Sprintf("%s_%s", strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(pipeline.Environment, " ", "-"), "_", "-")), user.ActiveOrg.Id)

startCommand := strings.ToUpper(strings.Split(pipeline.Type, " ")[0])
//parsedId := fmt.Sprintf("%s_%s", strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(pipeline.Environment, " ", "-"), "_", "-")), user.ActiveOrg.Id)
parsedId := strings.ToLower(pipeline.Environment)
formattedType := fmt.Sprintf("PIPELINE_%s", startCommand)
existingQueue, err := GetWorkflowQueue(ctx, parsedId, 10)
for _, queue := range existingQueue.Data {
Expand All @@ -142,11 +140,33 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)

// 2. Send to environment queue
execRequest := ExecutionRequest{
Type: formattedType,
ExecutionId: uuid.NewV4().String(),
ExecutionSource: pipeline.Name,
Type: formattedType,
ExecutionId: uuid.NewV4().String(),
ExecutionSource: pipeline.TriggerId,
ExecutionArgument: pipeline.Command,
Priority: 11,
Priority: 11,
}

if startCommand == "CREATE" {

pipelineData := Pipeline{}
pipelineData.Name = pipeline.Name
pipelineData.Type = startCommand
pipelineData.Command = pipeline.Command
pipelineData.Environment = pipeline.Environment
pipelineData.WorkflowId = pipeline.WorkflowId
pipelineData.OrgId = user.ActiveOrg.Id
pipelineData.Status = "uninitialized"
pipelineData.TriggerId = pipeline.TriggerId

err = savePipelineData(ctx, pipelineData)
if err != nil {
log.Printf("[ERROR] Failed to save the pipeline with trigger id: %s into the db: %s", pipeline.TriggerId, err)
resp.WriteHeader(500)
resp.Write([]byte(`{"success": false}`))
return
}
log.Printf("[INFO] Successfully saved the pipeline info")
}

err = SetWorkflowQueue(ctx, execRequest, parsedId)
Expand All @@ -157,7 +177,6 @@ func HandleNewPipelineRegister(resp http.ResponseWriter, request *http.Request)
return
}

pipelineId := "test"
resp.WriteHeader(200)
resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Pipeline created", "id": "%s"}`, pipelineId)))
resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Pipeline will be created"}`)))
}
Loading

0 comments on commit 899e218

Please sign in to comment.