Skip to content

Commit

Permalink
Interlink
Browse files Browse the repository at this point in the history
  • Loading branch information
SergioLangaritaBenitez committed Jan 9, 2024
1 parent f993a74 commit 1615930
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 24 deletions.
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func main() {
// Check if the cluster has available GPUs
cfg.CheckAvailableGPUs(kubeClientset)

cfg.CheckAvailableInterLink(kubeClientset)

// Create the ServerlessBackend
back := backends.MakeServerlessBackend(kubeClientset, kubeConfig, cfg)

Expand Down
58 changes: 47 additions & 11 deletions pkg/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package handlers

import (
"context"
"encoding/base64"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -46,7 +47,18 @@ var (
command = []string{"/bin/sh"}
)

// MakeJobHandler makes a handler to manage async invocations
const (
//
NodeSelectorKey = "kubernetes.io/hostname"

InterLinkNodeName = "vega-new-vk"
InterLinkDNSPolicy = "ClusterFirst"
InterLinkRestartPolicy = "OnFailure"
InterLinkTolerationKey = "virtual-node.interlink/no-schedule"
InterLinkTolerationOperator = "Exists"
)

// MakeJobHandler makes a han/home/slangarita/Escritorio/interlink-cluster/PodCern/PodCern.yamldler to manage async invocations
func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back types.ServerlessBackend, rm resourcemanager.ResourceManager) gin.HandlerFunc {
return func(c *gin.Context) {
service, err := back.ReadService(c.Param("serviceName"))
Expand All @@ -60,6 +72,13 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
return
}

// Get podSpec from the service
podSpec, err := service.ToPodSpec(cfg)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}

// Check auth token
authHeader := c.GetHeader("Authorization")
splitToken := strings.Split(authHeader, "Bearer ")
Expand All @@ -80,9 +99,32 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
return
}
// Make event envVar
event := v1.EnvVar{
Name: types.EventVariable,
Value: string(eventBytes),

event := v1.EnvVar{}
var args string
if cfg.InterLinkAvailable && service.EnableInterLink {
event = v1.EnvVar{
Name: types.EventVariable,
Value: base64.StdEncoding.EncodeToString([]byte(eventBytes)),
}
args = fmt.Sprintf("\" wget %s && chmod 0755 supervisor && echo \\$%s | base64 -d | ./supervisor \"", service.GetSupervisorURL(), types.EventVariable)
podSpec.NodeSelector = map[string]string{
NodeSelectorKey: InterLinkNodeName,
}
podSpec.DNSPolicy = InterLinkDNSPolicy
podSpec.RestartPolicy = InterLinkRestartPolicy
podSpec.Tolerations = []v1.Toleration{
{
Key: InterLinkTolerationKey,
Operator: InterLinkTolerationOperator,
},
}
} else {
event = v1.EnvVar{
Name: types.EventVariable,
Value: string(eventBytes),
}
args = fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath())
}

// Make JOB_UUID envVar
Expand All @@ -102,18 +144,12 @@ func MakeJobHandler(cfg *types.Config, kubeClientset *kubernetes.Clientset, back
},
}

// Get podSpec from the service
podSpec, err := service.ToPodSpec(cfg)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
return
}
// Add podSpec variables
podSpec.RestartPolicy = restartPolicy
for i, c := range podSpec.Containers {
if c.Name == types.ContainerName {
podSpec.Containers[i].Command = command
podSpec.Containers[i].Args = []string{"-c", fmt.Sprintf("echo $%s | %s", types.EventVariable, service.GetSupervisorPath())}
podSpec.Containers[i].Args = []string{"-c", args}
podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, event)
podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, jobUUIDVar)
podSpec.Containers[i].Env = append(podSpec.Containers[i].Env, resourceIDVar)
Expand Down
17 changes: 17 additions & 0 deletions pkg/types/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type Config struct {
// Parameter used to check if the cluster have GPUs
GPUAvailable bool `json:"gpu_available"`

// Parameter used to check if the cluster have vega nodes
InterLinkAvailable bool `json:"interLink_available"`

// Port used for the ClusterIP k8s service (default: 8080)
ServicePort int `json:"-"`

Expand Down Expand Up @@ -354,3 +357,17 @@ func (cfg *Config) CheckAvailableGPUs(kubeClientset kubernetes.Interface) {
}
}
}

func (cfg *Config) CheckAvailableInterLink(kubeClientset kubernetes.Interface) {
nodes, err := kubeClientset.CoreV1().Nodes().List(context.TODO(),
metav1.ListOptions{LabelSelector: "!node-role.kubernetes.io/control-plane,!node-role.kubernetes.io/master,type=virtual-kubelet"})
if err != nil {
log.Printf("Error getting list of nodes: %v\n", err)
}
if len(nodes.Items) > 0 {
cfg.InterLinkAvailable = true
} else {
cfg.InterLinkAvailable = false
}

}
43 changes: 30 additions & 13 deletions pkg/types/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ const (
// SupervisorName name of the FaaS Supervisor binary
SupervisorName = "supervisor"

//
SupervisorURL = "https://github.com/grycap/faas-supervisor/releases/download/1.5.8/supervisor"

// ServiceLabel label for deploying services in all backs
ServiceLabel = "oscar_service"

Expand Down Expand Up @@ -232,6 +235,8 @@ type Service struct {
// Clusters configuration for the OSCAR clusters that can be used as service's replicas
// Optional
Clusters map[string]Cluster `json:"clusters,omitempty"`

EnableInterLink bool `json:"enable_InterLink"`
}

// ToPodSpec returns a k8s podSpec from the Service
Expand All @@ -249,11 +254,6 @@ func (service *Service) ToPodSpec(cfg *Config) (*v1.PodSpec, error) {
Image: service.Image,
Env: ConvertEnvVars(service.Environment.Vars),
VolumeMounts: []v1.VolumeMount{
{
Name: VolumeName,
ReadOnly: true,
MountPath: VolumePath,
},
{
Name: ConfigVolumeName,
ReadOnly: true,
Expand All @@ -265,14 +265,6 @@ func (service *Service) ToPodSpec(cfg *Config) (*v1.PodSpec, error) {
},
},
Volumes: []v1.Volume{
{
Name: VolumeName,
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: PVCName,
},
},
},
{
Name: ConfigVolumeName,
VolumeSource: v1.VolumeSource{
Expand All @@ -285,7 +277,28 @@ func (service *Service) ToPodSpec(cfg *Config) (*v1.PodSpec, error) {
},
},
}
if cfg.InterLinkAvailable && service.EnableInterLink {
// Add specs of InterLink
podSpec.Containers[0].ImagePullPolicy = "Always"
} else {
// Add specs
volumeMount := v1.VolumeMount{
Name: VolumeName,
ReadOnly: true,
MountPath: VolumePath,
}
volume := v1.Volume{

Name: VolumeName,
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: PVCName,
},
},
}
podSpec.Containers[0].VolumeMounts = append(podSpec.Containers[0].VolumeMounts, volumeMount)
podSpec.Volumes = append(podSpec.Volumes, volume)
}
// Add the required environment variables for the watchdog
addWatchdogEnvVars(podSpec, cfg, service)

Expand Down Expand Up @@ -409,6 +422,10 @@ func (service *Service) GetSupervisorPath() string {
return fmt.Sprintf("%s/%s", VolumePath, SupervisorName)
}

func (service *Service) GetSupervisorURL() string {
return SupervisorURL
}

// HasReplicas checks if the service has replicas defined
func (service *Service) HasReplicas() bool {
return len(service.Replicas) > 0
Expand Down

0 comments on commit 1615930

Please sign in to comment.