From 1d07fda0a4836d1eb3835f0d8db9fc445fe43ddb Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Mon, 1 Nov 2021 10:57:05 +0000 Subject: [PATCH 1/2] Wait for a function to become healthy in scale-up event Prior to this change, after scaling a function up and returning from the API call, the function might still not be ready to serve traffic. This resulted in HTTP errors some of the time, especially if the task had been deleted instead of just being paused. Pausing was instant, but during re-creation the function needs some time to start up. This change puts a health check into the hot path for the scale event. It is blocking, so scaling up will have some additional latency, but will return with a ready endpoint much more of the time than previously. This approach means that faasd doesn't have to run a set of exec or HTTP health checks continually, and use CPU for each of them, even when a function is idle. Tested with the nodeinfo function, by killing the task and then invoking the function. Prior to this change, the function would return an error code some of the time. 
Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- cmd/provider.go | 2 +- pkg/provider/handlers/scale.go | 120 ++++++++++++++++++++++++--------- 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/cmd/provider.go b/cmd/provider.go index 00051c84..7f408b57 100644 --- a/cmd/provider.go +++ b/cmd/provider.go @@ -98,7 +98,7 @@ func makeProviderCmd() *cobra.Command { DeployHandler: handlers.MakeDeployHandler(client, cni, baseUserSecretsPath, alwaysPull), FunctionReader: handlers.MakeReadHandler(client), ReplicaReader: handlers.MakeReplicaReaderHandler(client), - ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni), + ReplicaUpdater: handlers.MakeReplicaUpdateHandler(client, cni, invokeResolver), UpdateHandler: handlers.MakeUpdateHandler(client, cni, baseUserSecretsPath, alwaysPull), HealthHandler: func(w http.ResponseWriter, r *http.Request) {}, InfoHandler: handlers.MakeInfoHandler(Version, GitCommit), diff --git a/pkg/provider/handlers/scale.go b/pkg/provider/handlers/scale.go index f705dc2f..3f5e90f1 100644 --- a/pkg/provider/handlers/scale.go +++ b/pkg/provider/handlers/scale.go @@ -6,16 +6,20 @@ import ( "fmt" "io/ioutil" "log" + "net" "net/http" + "net/url" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/namespaces" gocni "github.com/containerd/go-cni" + "github.com/openfaas/faas-provider/proxy" "github.com/openfaas/faas-provider/types" ) -func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w http.ResponseWriter, r *http.Request) { +func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI, resolver proxy.BaseURLResolver) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { @@ -30,12 +34,9 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h log.Printf("[Scale] request: %s\n", string(body)) req := types.ScaleServiceRequest{} - err := json.Unmarshal(body, &req) - - if err != nil { + if err := 
json.Unmarshal(body, &req); err != nil { log.Printf("[Scale] error parsing input: %s\n", err) http.Error(w, err.Error(), http.StatusBadRequest) - return } @@ -55,18 +56,23 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h name := req.ServiceName - if _, err := GetFunction(client, name, namespace); err != nil { + fn, err := GetFunction(client, name, namespace) + if err != nil { msg := fmt.Sprintf("service %s not found", name) log.Printf("[Scale] %s\n", msg) http.Error(w, msg, http.StatusNotFound) return } - ctx := namespaces.WithNamespace(context.Background(), namespace) + healthPath := "/_/healthz" + if v := fn.annotations["com.openfaas.health.http.path"]; len(v) > 0 { + healthPath = v + } - ctr, ctrErr := client.LoadContainer(ctx, name) - if ctrErr != nil { - msg := fmt.Sprintf("cannot load service %s, error: %s", name, ctrErr) + ctx := namespaces.WithNamespace(context.Background(), namespace) + ctr, err := client.LoadContainer(ctx, name) + if err != nil { + msg := fmt.Sprintf("cannot load service %s, error: %s", name, err) log.Printf("[Scale] %s\n", msg) http.Error(w, msg, http.StatusNotFound) return @@ -75,16 +81,16 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h var taskExists bool var taskStatus *containerd.Status - task, taskErr := ctr.Task(ctx, nil) - if taskErr != nil { - msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, taskErr) + task, err := ctr.Task(ctx, nil) + if err != nil { + msg := fmt.Sprintf("cannot load task for service %s, error: %s", name, err) log.Printf("[Scale] %s\n", msg) taskExists = false } else { taskExists = true - status, statusErr := task.Status(ctx) - if statusErr != nil { - msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, statusErr) + status, err := task.Status(ctx) + if err != nil { + msg := fmt.Sprintf("cannot load task status for %s, error: %s", name, err) log.Printf("[Scale] %s\n", msg) http.Error(w, msg, 
http.StatusInternalServerError) return @@ -99,28 +105,31 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h if req.Replicas == 0 { // If a task is running, pause it if taskExists && taskStatus.Status == containerd.Running { - if pauseErr := task.Pause(ctx); pauseErr != nil { - wrappedPauseErr := fmt.Errorf("error pausing task %s, error: %s", name, pauseErr) - log.Printf("[Scale] %s\n", wrappedPauseErr.Error()) - http.Error(w, wrappedPauseErr.Error(), http.StatusNotFound) + if err := task.Pause(ctx); err != nil { + werr := fmt.Errorf("error pausing task %s, error: %s", name, err) + log.Printf("[Scale] %s\n", werr.Error()) + http.Error(w, werr.Error(), http.StatusNotFound) return } } + + // Otherwise, no action is required + return } if taskExists { if taskStatus != nil { if taskStatus.Status == containerd.Paused { - if resumeErr := task.Resume(ctx); resumeErr != nil { - log.Printf("[Scale] error resuming task %s, error: %s\n", name, resumeErr) - http.Error(w, resumeErr.Error(), http.StatusBadRequest) + if err := task.Resume(ctx); err != nil { + log.Printf("[Scale] error resuming task %s, error: %s\n", name, err) + http.Error(w, err.Error(), http.StatusBadRequest) return } } else if taskStatus.Status == containerd.Stopped { // Stopped tasks cannot be restarted, must be removed, and created again - if _, delErr := task.Delete(ctx); delErr != nil { - log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, delErr) - http.Error(w, delErr.Error(), http.StatusBadRequest) + if _, err := task.Delete(ctx); err != nil { + log.Printf("[Scale] error deleting stopped task %s, error: %s\n", name, err) + http.Error(w, err.Error(), http.StatusBadRequest) return } createNewTask = true @@ -131,12 +140,63 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni gocni.CNI) func(w h } if createNewTask { - deployErr := createTask(ctx, client, ctr, cni) - if deployErr != nil { - log.Printf("[Scale] error deploying %s, error: %s\n", 
name, deployErr) - http.Error(w, deployErr.Error(), http.StatusBadRequest) + err := createTask(ctx, client, ctr, cni) + if err != nil { + log.Printf("[Scale] error deploying %s, error: %s\n", name, err) + http.Error(w, err.Error(), http.StatusBadRequest) return } } + + if err := waitUntilHealthy(name, resolver, healthPath); err != nil { + log.Printf("[Scale] error waiting for function %s to become ready, error: %s\n", name, err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } +} + +func waitUntilHealthy(name string, resolver proxy.BaseURLResolver, healthPath string) error { + endpoint, err := resolver.Resolve(name) + if err != nil { + return err } + + host, port, _ := net.SplitHostPort(endpoint.Host) + u, err := url.Parse(fmt.Sprintf("http://%s:%s%s", host, port, healthPath)) + if err != nil { + return err + } + + // Try to hit the health endpoint and block until + // ready. + attempts := 100 + pause := time.Millisecond * 20 + for i := 0; i < attempts; i++ { + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return err + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + if res.Body != nil { + res.Body.Close() + } + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected health status: %d", res.StatusCode) + } + + if err == nil { + break + } + + time.Sleep(pause) + } + + return nil } From bbd3b4ff07826a12f0972539e445e60ef772f47e Mon Sep 17 00:00:00 2001 From: "Alex Ellis (OpenFaaS Ltd)" Date: Mon, 1 Nov 2021 11:04:38 +0000 Subject: [PATCH 2/2] Add comment to explain how method works Signed-off-by: Alex Ellis (OpenFaaS Ltd) --- pkg/provider/handlers/scale.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/provider/handlers/scale.go b/pkg/provider/handlers/scale.go index 3f5e90f1..c2d39193 100644 --- a/pkg/provider/handlers/scale.go +++ b/pkg/provider/handlers/scale.go @@ -156,6 +156,13 @@ func MakeReplicaUpdateHandler(client *containerd.Client, cni 
gocni.CNI, resolver } } +// waitUntilHealthy blocks until the healthPath returns an HTTP 200 for the +// IP address resolved for the given function. +// Maximum retries: 100 +// Delay between each attempt: 20ms +// A custom path can be set via an annotation in the function's spec: +// com.openfaas.health.http.path: /handlers/ready +// func waitUntilHealthy(name string, resolver proxy.BaseURLResolver, healthPath string) error { endpoint, err := resolver.Resolve(name) if err != nil {