From 1a27f89a2f0217ea5bff3a8d1a8ab8250d9ef5f0 Mon Sep 17 00:00:00 2001 From: Yi Chen Date: Mon, 8 Jul 2024 11:01:18 +0800 Subject: [PATCH] Update Signed-off-by: Yi Chen --- .dockerignore | 18 +- .golangci.yaml | 4 +- Dockerfile | 27 +- Makefile | 39 ++- charts/spark-operator-chart/README.md | 8 +- .../templates/controller/deployment.yaml | 4 +- .../templates/controller/rbac.yaml | 51 +++- .../templates/spark/rbac.yaml | 11 +- .../templates/webhook/deployment.yaml | 9 + .../templates/webhook/secret.yaml | 2 +- charts/spark-operator-chart/values.yaml | 12 +- cmd/operator/controller/root.go | 2 +- cmd/operator/controller/start.go | 48 ++-- cmd/operator/root.go | 4 +- cmd/operator/version.go | 3 - cmd/operator/version/root.go | 40 +++ cmd/operator/webhook/root.go | 2 +- cmd/operator/webhook/start.go | 35 ++- codecov.yml => codecov.yaml | 0 entrypoint.sh | 13 +- examples/spark-pi-dynamic-allocation.yaml | 4 +- examples/spark-pi-prometheus.yaml | 18 +- examples/spark-pi.yaml | 1 - go.mod | 2 +- .../controller.go | 2 +- .../event_filter.go | 14 +- .../event_handler.go | 8 +- .../scheduledsparkapplication/event_filter.go | 2 +- .../controller/sparkapplication/controller.go | 105 +++---- .../sparkapplication/controller_test.go | 4 - .../sparkapplication/event_filter.go | 15 +- .../sparkapplication/event_handler.go | 92 ++++-- .../controller/sparkapplication/metrics.go | 214 -------------- .../sparkapplication/metrics_test.go | 69 ----- .../controller/sparkapplication/submission.go | 69 ++--- .../controller/sparkapplication/web_ui.go | 36 +-- .../event_filter.go | 14 +- .../event_handler.go | 8 +- internal/metrics/metrcis.go | 2 +- internal/metrics/sparkapplication_metrics.go | 241 +++++++++++----- internal/metrics/sparkpod_metrics.go | 125 ++++++-- internal/scheduler/volcano/scheduler.go | 4 +- .../scheduledsparkapplication_defaulter.go | 2 +- .../scheduledsparkapplication_validator.go | 9 +- internal/webhook/sparkpod_defaulter.go | 32 ++- pkg/certificate/certificate_test.go | 2 + pkg/common/prometheus.go | 1 + pkg/common/spark.go | 240 ++++++++++++++++ pkg/common/spark_properties.go | 271 ------------------ pkg/util/capabilities.go | 25 +- pkg/util/metrics.go | 108 ------- pkg/util/metrics_test.go | 70 ----- pkg/util/sparkapplication.go | 4 + version.go | 90 ++++++ 54 files changed, 1108 insertions(+), 1127 deletions(-) delete mode 100644 cmd/operator/version.go create mode 100644 cmd/operator/version/root.go rename codecov.yml => codecov.yaml (100%) delete mode 100644 internal/controller/sparkapplication/metrics.go delete mode 100644 internal/controller/sparkapplication/metrics_test.go delete mode 100644 pkg/common/spark_properties.go delete mode 100644 pkg/util/metrics_test.go create mode 100644 version.go diff --git a/.dockerignore b/.dockerignore index 580768184..9b0bebd90 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,21 +1,31 @@ +.github/ .idea/ .vscode/ bin/ charts/ docs/ +config/ examples/ +hack/ manifest/ -sparkctl/sparkctl -sparkctl/sparkctl-linux-amd64 -sparkctl/sparkctl-darwin-amd64 +spark-docker/ +sparkctl/ test/ vendor/ +.dockerignore .DS_Store .gitignore .gitlab-ci.yaml +.golangci.yaml .pre-commit-config.yaml +ADOPTERS.md +CODE_OF_CONDUCT.md +codecov.ymal CONTRIBUTING.md +cover.out +Dockerfile +LICENSE OWNERS +PROJECT README.md -spark-operator test.sh diff --git a/.golangci.yaml b/.golangci.yaml index 2de9c4511..01f5a3e58 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -24,14 +24,14 @@ linters-settings: - default - prefix(github.com/kubeflow/spark-operator) depguard: - 
main: + Main: files: - $all - "!$test" listMode: Lax deny: reflect: Please don't use reflect package - test: + Test: files: - $test listMode: Lax diff --git a/Dockerfile b/Dockerfile index 61b2cc576..53b5c3d75 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,35 +16,26 @@ ARG SPARK_IMAGE=spark:3.5.0 -FROM golang:1.22-alpine as builder +FROM golang:1.22.0 as builder WORKDIR /workspace -# Copy the Go Modules manifests -COPY go.mod go.mod -COPY go.sum go.sum -# Cache deps before building and copying source so that we don't need to re-download as much -# and so that source changes don't invalidate our downloaded layer -RUN go mod download +COPY . . -# Copy the go source code -COPY api/ api/ -COPY cmd/ cmd/ -COPY internal/ internal/ -COPY pkg/ pkg/ - -# Build ARG TARGETARCH -RUN CGO_ENABLED=0 GOOS=linux GOARCH=${TARGETARCH} GO111MODULE=on go build -a -o /usr/bin/spark-operator cmd/main.go + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=${TARGETARCH} GO111MODULE=on make build-operator FROM ${SPARK_IMAGE} + USER root -COPY --from=builder /usr/bin/spark-operator /usr/bin/ -RUN apt-get update --allow-releaseinfo-change \ - && apt-get update \ + +RUN apt-get update \ && apt-get install -y tini \ && rm -rf /var/lib/apt/lists/* +COPY --from=builder /workspace/bin/spark-operator /usr/bin/spark-operator + COPY entrypoint.sh /usr/bin/ ENTRYPOINT ["/usr/bin/entrypoint.sh"] diff --git a/Makefile b/Makefile index d18942dea..3c8131d66 100644 --- a/Makefile +++ b/Makefile @@ -12,12 +12,23 @@ endif SHELL = /usr/bin/env bash -o pipefail .SHELLFLAGS = -ec +# Version information. +# OPERATOR_VERSION ?= $$(grep appVersion $(SPARK_OPERATOR_CHART_PATH)/Chart.yaml | awk '{print $$2}') +VERSION=v2.0.0 +BUILD_DATE = $(shell date -u +"%Y-%m-%dT%H:%M:%S%:z") +GIT_COMMIT = $(shell git rev-parse HEAD) +GIT_TAG = $(shell if [ -z "`git status --porcelain`" ]; then git describe --exact-match --tags HEAD 2>/dev/null; fi) +GIT_TREE_STATE = $(shell if [ -z "`git status --porcelain`" ]; then echo "clean" ; else echo "dirty"; fi) +GIT_SHA = $(shell git rev-parse --short HEAD || echo "HEAD") +GIT_VERSION = ${VERSION}-${GIT_SHA} + +REPO=github.com/kubeflow/spark-operator SPARK_OPERATOR_GOPATH=/go/src/github.com/kubeflow/spark-operator +SPARK_OPERATOR_CHART_PATH=charts/spark-operator-chart +OPERATOR_VERSION ?= $$(grep appVersion $(SPARK_OPERATOR_CHART_PATH)/Chart.yaml | awk '{print $$2}') DEP_VERSION:=`grep DEP_VERSION= Dockerfile | awk -F\" '{print $$2}'` BUILDER=`grep "FROM golang:" Dockerfile | awk '{print $$2}'` UNAME:=`uname | tr '[:upper:]' '[:lower:]'` -REPO=github.com/kubeflow/spark-operator -SPARK_OPERATOR_CHART_PATH=charts/spark-operator-chart # CONTAINER_TOOL defines the container tool to be used for building images. # Be aware that the target commands are only tested with Docker which is @@ -25,7 +36,6 @@ SPARK_OPERATOR_CHART_PATH=charts/spark-operator-chart # tools. (i.e. podman) CONTAINER_TOOL ?= docker -OPERATOR_VERSION ?= $$(grep appVersion $(SPARK_OPERATOR_CHART_PATH)/Chart.yaml | awk '{print $$2}') # Image URL to use all building/pushing image targets IMAGE_REPOSITORY ?= docker.io/kubeflow/spark-operator IMAGE_TAG ?= $(OPERATOR_VERSION) @@ -93,13 +103,14 @@ go-vet: ## Run go vet against code. .PHONY: lint lint: golangci-lint ## Run golangci-lint linter + @echo "Running golangci-lint run..." $(GOLANGCI_LINT) run .PHONY: lint-fix lint-fix: golangci-lint ## Run golangci-lint linter and perform fixes + @echo "Running golangci-lint run --fix..." $(GOLANGCI_LINT) run --fix - .PHONY: unit-test unit-test: envtest ## Run unit tests. 
@echo "Running unit tests..." @@ -114,9 +125,16 @@ e2e-test: envtest kind-create-cluster ## Run the e2e tests against a Kind k8s in ##@ Build +override LDFLAGS += \ + -X ${REPO}.version=${VERSION} \ + -X ${REPO}.buildDate=${BUILD_DATE} \ + -X ${REPO}.gitCommit=${GIT_COMMIT} \ + -X ${REPO}.gitTreeState=${GIT_TREE_STATE} \ + -extldflags "-static" + .PHONY: build-operator build-operator: ## Build Spark operator - go build -o bin/spark-operator cmd/main.go + go build -o bin/spark-operator -ldflags '${LDFLAGS}' cmd/main.go .PHONY: build-sparkctl build-sparkctl: ## Build sparkctl binary @@ -171,16 +189,13 @@ docker-push: ## Push docker image with the operator. # - have enabled BuildKit. More info: https://docs.docker.com/develop/develop-images/build_enhancements/ # - be able to push the image to your registry (i.e. if you do not set a valid value via IMG=> then the export will fail) # To adequately provide solutions that are compatible with multiple platforms, you should consider using this option. -PLATFORMS ?= linux/arm64,linux/amd64 +PLATFORMS ?= linux/amd64,linux/arm64 .PHONY: docker-buildx docker-buildx: ## Build and push docker image for the operator for cross-platform support - # copy existing Dockerfile and insert --platform=${BUILDPLATFORM} into Dockerfile.cross, and preserve the original Dockerfile - sed -e '1 s/\(^FROM\)/FROM --platform=\$$\{BUILDPLATFORM\}/; t' -e ' 1,// s//FROM --platform=\$$\{BUILDPLATFORM\}/' Dockerfile > Dockerfile.cross - $(CONTAINER_TOOL) buildx create --name spark-operator-builder $(CONTAINER_TOOL) buildx use spark-operator-builder - - $(CONTAINER_TOOL) buildx build --push --platform=$(PLATFORMS) --tag ${IMAGE_REPOSITORY}:${IMAGE_TAG} -f Dockerfile.cross . + - $(CONTAINER_TOOL) buildx build --push --platform=$(PLATFORMS) --tag ${IMAGE_REPOSITORY}:${IMAGE_TAG} -f Dockerfile . - $(CONTAINER_TOOL) buildx rm spark-operator-builder - rm Dockerfile.cross ##@ Helm @@ -222,11 +237,11 @@ kind-delete-custer: kind ## Delete the created kind cluster. rm -f $(KIND_KUBE_CONFIG) .PHONY: install -install: manifests kustomize ## Install CRDs into the K8s cluster specified in ~/.kube/config. +install-crd: manifests kustomize ## Install CRDs into the K8s cluster specified in ~/.kube/config. $(KUSTOMIZE) build config/crd | $(KUBECTL) apply -f - .PHONY: uninstall -uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion. +uninstall-crd: manifests kustomize ## Uninstall CRDs from the K8s cluster specified in ~/.kube/config. Call with ignore-not-found=true to ignore resource not found errors during deletion. $(KUSTOMIZE) build config/crd | $(KUBECTL) delete --ignore-not-found=$(ignore-not-found) -f - .PHONY: deploy diff --git a/charts/spark-operator-chart/README.md b/charts/spark-operator-chart/README.md index ca7fbdfbb..b7cccf9c3 100644 --- a/charts/spark-operator-chart/README.md +++ b/charts/spark-operator-chart/README.md @@ -83,7 +83,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | batchScheduler.enable | bool | `false` | Enable batch scheduler for spark jobs scheduling. If enabled, users can specify batch scheduler name in spark application | | commonLabels | object | `{}` | Common labels to add to the resources | | controller.ingressUrlFormat | string | `""` | Ingress URL format. Requires the UI service to be enabled by setting `controller.uiService.enable` to true. 
| -| controller.logLevel | int | `1` | Set higher levels for more verbose logging | +| controller.logLevel | string | `"info"` | Configure the verbosity of logging, can be one of `debug`, `info`, `error` | | controller.rbac.annotations | object | `{}` | Optional annotations for the controller RBAC resources | | controller.rbac.create | bool | `true` | Specifies whether to create RBAC resources for the controller | | controller.replicaCount | int | `1` | Number of replicas of controller, leader election will be enabled if this is greater than 1 | @@ -111,9 +111,9 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | priorityClassName | string | `""` | A priority class to be used for running spark-operator pod. | | prometheus.metrics.enable | bool | `true` | Specifies whether to enable prometheus metrics scraping | | prometheus.metrics.endpoint | string | `"/metrics"` | Metrics serving endpoint | -| prometheus.metrics.port | int | `10254` | Metrics port | +| prometheus.metrics.port | int | `8080` | Metrics port | | prometheus.metrics.portName | string | `"metrics"` | Metrics port name | -| prometheus.metrics.prefix | string | `""` | Metric prefix, will be added to all exported metrics | +| prometheus.metrics.prefix | string | `""` | Metrics prefix, will be added to all exported metrics | | prometheus.podMonitor.create | bool | `false` | Specifies whether to create pod monitor. Note that prometheus metrics should be enabled as well. | | prometheus.podMonitor.jobLabel | string | `"spark-operator-podmonitor"` | The label to use to retrieve the job name from | | prometheus.podMonitor.labels | object | `{}` | Pod monitor labels | @@ -133,7 +133,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | volumes | list | `[]` | | | webhook.enable | bool | `true` | Specifies whether to enable webhook server | | webhook.failurePolicy | string | `"Fail"` | Specifies how unrecognized errors are handled, allowed values are `Ignore` or `Fail`. | -| webhook.logLevel | int | `1` | Set higher levels for more verbose logging | +| webhook.logLevel | string | `"info"` | Configure the verbosity of logging, can be one of `debug`, `info`, `error` | | webhook.port | int | `9443` | Specifies webhook port | | webhook.portName | string | `"webhook"` | Specifies webhook service port name | | webhook.rbac.annotations | object | `{}` | Optional annotations for the webhook RBAC resources | diff --git a/charts/spark-operator-chart/templates/controller/deployment.yaml b/charts/spark-operator-chart/templates/controller/deployment.yaml index 9976f0ccb..742d420cb 100644 --- a/charts/spark-operator-chart/templates/controller/deployment.yaml +++ b/charts/spark-operator-chart/templates/controller/deployment.yaml @@ -69,10 +69,10 @@ spec: - --enable-batch-scheduler={{ .Values.batchScheduler.enable }} {{- if .Values.prometheus.metrics.enable }} - --enable-metrics=true - - --metrics-labels=app_type - - --metrics-port={{ .Values.prometheus.metrics.port }} + - --metrics-bind-address=:{{ .Values.prometheus.metrics.port }} - --metrics-endpoint={{ .Values.prometheus.metrics.endpoint }} - --metrics-prefix={{ .Values.prometheus.metrics.prefix }} + - --metrics-labels=app_type {{- end }} - --leader-election=true - --leader-election-lock-name={{ include "spark-operator.controller.leaderElectionName" . 
}} diff --git a/charts/spark-operator-chart/templates/controller/rbac.yaml b/charts/spark-operator-chart/templates/controller/rbac.yaml index 91ddfd315..fb355a452 100644 --- a/charts/spark-operator-chart/templates/controller/rbac.yaml +++ b/charts/spark-operator-chart/templates/controller/rbac.yaml @@ -30,28 +30,45 @@ rules: - "" resources: - pods - - persistentvolumeclaims verbs: - - "*" + - get + - list + - watch + - create + - update + - patch + - delete + - deletecollection - apiGroups: - "" resources: - - services - configmaps verbs: - - create - get + - create + - update + - patch - delete +- apiGroups: + - "" + resources: + - services + verbs: + - get + - list + - watch + - create - update - patch + - delete - apiGroups: - extensions - networking.k8s.io resources: - ingresses verbs: - - create - get + - create - delete - apiGroups: - "" @@ -85,13 +102,31 @@ rules: - sparkoperator.k8s.io resources: - sparkapplications - - sparkapplications/status - - sparkapplications/finalizers - scheduledsparkapplications + verbs: + - get + - list + - watch + - create + - update + - patch + - delete +- apiGroups: + - sparkoperator.k8s.io + resources: + - sparkapplications/status - scheduledsparkapplications/status + verbs: + - get + - update + - patch +- apiGroups: + - sparkoperator.k8s.io + resources: + - sparkapplications/finalizers - scheduledsparkapplications/finalizers verbs: - - "*" + - update {{- if .Values.batchScheduler.enable }} {{/* required for the `volcano` batch scheduler */}} - apiGroups: diff --git a/charts/spark-operator-chart/templates/spark/rbac.yaml b/charts/spark-operator-chart/templates/spark/rbac.yaml index eb372b8a8..945f1bea9 100644 --- a/charts/spark-operator-chart/templates/spark/rbac.yaml +++ b/charts/spark-operator-chart/templates/spark/rbac.yaml @@ -35,11 +35,18 @@ rules: - "" resources: - pods - - services - configmaps - persistentvolumeclaims + - services verbs: - - "*" + - get + - list + - watch + - create + - update + - patch + - delete + - deletecollection --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/charts/spark-operator-chart/templates/webhook/deployment.yaml b/charts/spark-operator-chart/templates/webhook/deployment.yaml index ca39cbc17..2cfc13009 100644 --- a/charts/spark-operator-chart/templates/webhook/deployment.yaml +++ b/charts/spark-operator-chart/templates/webhook/deployment.yaml @@ -57,6 +57,15 @@ spec: - --webhook-svc-name={{ include "spark-operator.webhook.serviceName" . }} - --webhook-svc-namespace={{ .Release.Namespace }} - --webhook-port={{ .Values.webhook.port }} + - --mutating-webhook-name={{ include "spark-operator.webhook.name" . }} + - --validating-webhook-name={{ include "spark-operator.webhook.name" . }} + {{- if .Values.prometheus.metrics.enable }} + - --enable-metrics=true + - --metrics-bind-address=:{{ .Values.prometheus.metrics.port }} + - --metrics-endpoint={{ .Values.prometheus.metrics.endpoint }} + - --metrics-prefix={{ .Values.prometheus.metrics.prefix }} + - --metrics-labels=app_type + {{- end }} - --leader-election=true - --leader-election-lock-name={{ include "spark-operator.webhook.leaderElectionName" . 
}} - --leader-election-lock-namespace={{ .Release.Namespace }} diff --git a/charts/spark-operator-chart/templates/webhook/secret.yaml b/charts/spark-operator-chart/templates/webhook/secret.yaml index 773f80025..1d6fa9751 100644 --- a/charts/spark-operator-chart/templates/webhook/secret.yaml +++ b/charts/spark-operator-chart/templates/webhook/secret.yaml @@ -22,7 +22,7 @@ metadata: labels: {{- include "spark-operator.webhook.labels" . | nindent 4 }} annotations: - helm.sh/hook: pre-install + helm.sh/hook: pre-install,pre-upgrade,pre-rollback helm.sh/hook-delete-policy: before-hook-creation data: ca-key.pem: "" diff --git a/charts/spark-operator-chart/values.yaml b/charts/spark-operator-chart/values.yaml index 3d1c91225..573afe16b 100644 --- a/charts/spark-operator-chart/values.yaml +++ b/charts/spark-operator-chart/values.yaml @@ -42,8 +42,8 @@ controller: # -- Number of replicas of controller, leader election will be enabled if this is greater than 1 replicaCount: 1 - # -- Set higher levels for more verbose logging - logLevel: 1 + # -- Configure the verbosity of logging, can be one of `debug`, `info`, `error` + logLevel: info uiService: # -- Enable UI service creation for Spark application @@ -81,8 +81,8 @@ webhook: # -- Number of replicas of webhook server replicaCount: 1 - # -- Set higher levels for more verbose logging - logLevel: 1 + # -- Configure the verbosity of logging, can be one of `debug`, `info`, `error` + logLevel: info # -- Specifies webhook port port: 9443 @@ -134,12 +134,12 @@ prometheus: # -- Specifies whether to enable prometheus metrics scraping enable: true # -- Metrics port - port: 10254 + port: 8080 # -- Metrics port name portName: metrics # -- Metrics serving endpoint endpoint: /metrics - # -- Metric prefix, will be added to all exported metrics + # -- Metrics prefix, will be added to all exported metrics prefix: "" # Prometheus pod monitor for controller pods diff --git a/cmd/operator/controller/root.go b/cmd/operator/controller/root.go index 8318a3223..eeaa8edcd 100644 --- a/cmd/operator/controller/root.go +++ b/cmd/operator/controller/root.go @@ -21,7 +21,7 @@ import ( ) func NewCommand() *cobra.Command { - var command = &cobra.Command{ + command := &cobra.Command{ Use: "controller", Short: "Spark operator controller", RunE: func(cmd *cobra.Command, _ []string) error { diff --git a/cmd/operator/controller/start.go b/cmd/operator/controller/start.go index 7ded5acd6..19596a496 100644 --- a/cmd/operator/controller/start.go +++ b/cmd/operator/controller/start.go @@ -48,10 +48,12 @@ import ( // Register volcano scheduler _ "github.com/kubeflow/spark-operator/internal/scheduler/volcano" + sparkoperator "github.com/kubeflow/spark-operator" "github.com/kubeflow/spark-operator/api/v1beta1" "github.com/kubeflow/spark-operator/api/v1beta2" "github.com/kubeflow/spark-operator/internal/controller/scheduledsparkapplication" "github.com/kubeflow/spark-operator/internal/controller/sparkapplication" + "github.com/kubeflow/spark-operator/internal/metrics" "github.com/kubeflow/spark-operator/pkg/common" // +kubebuilder:scaffold:imports ) @@ -87,7 +89,6 @@ var ( // Metrics enableMetrics bool metricsBindAddress string - metricsPort string metricsEndpoint string metricsPrefix string metricsLabels []string @@ -112,10 +113,11 @@ func NewStartCommand() *cobra.Command { var command = &cobra.Command{ Use: "start", Short: "Start controller and webhook", - PreRun: func(cmd *cobra.Command, args []string) { + PreRun: func(_ *cobra.Command, args []string) { development = 
viper.GetBool("development") }, - Run: func(cmd *cobra.Command, args []string) { + Run: func(_ *cobra.Command, args []string) { + sparkoperator.PrintVersion(false) start() }, } @@ -137,11 +139,9 @@ func NewStartCommand() *cobra.Command { command.Flags().DurationVar(&leaderElectionRenewDeadline, "leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.") command.Flags().DurationVar(&leaderElectionRetryPeriod, "leader-election-retry-period", 4*time.Second, "Leader election retry period.") + command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Enable metrics.") command.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", "0", "The address the metric endpoint binds to. "+ "Use the port :8080. If not set, it will be 0 in order to disable the metrics server") - - command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Whether to enable the metrics endpoint.") - command.Flags().StringVar(&metricsPort, "metrics-port", "10254", "Port for the metrics endpoint.") command.Flags().StringVar(&metricsEndpoint, "metrics-endpoint", "/metrics", "Metrics endpoint.") command.Flags().StringVar(&metricsPrefix, "metrics-prefix", "", "Prefix for the metrics.") command.Flags().StringSliceVar(&metricsLabels, "metrics-labels", []string{}, "Labels to be added to the metrics.") @@ -203,25 +203,12 @@ func start() { os.Exit(1) } - var metrics *sparkapplication.Metrics - // if enableMetrics { - // metrics = sparkapplication.NewMetrics(&util.MetricConfig{ - // MetricsEndpoint: metricsEndpoint, - // MetricsPort: metricsPort, - // MetricsPrefix: metricsPrefix, - // MetricsLabels: metricsLabels, - // MetricsJobStartLatencyBuckets: metricsJobStartLatencyBuckets, - // }) - // metrics.RegisterMetrics() - // } - // Setup controller for SparkApplication. 
if err = sparkapplication.NewReconciler( mgr, mgr.GetScheme(), mgr.GetClient(), mgr.GetEventRecorderFor("spark-application-controller"), - metrics, nil, newSparkApplicationReconcilerOptions(), ).SetupWithManager(mgr, newControllerOptions()); err != nil { @@ -313,12 +300,15 @@ func newCacheOptions() cache.Options { Scheme: scheme, DefaultNamespaces: defaultNamespaces, ByObject: map[client.Object]cache.ByObject{ - &v1beta2.SparkApplication{}: {}, &corev1.Pod{}: { Label: labels.SelectorFromSet(labels.Set{ common.LabelLaunchedBySparkOperator: "true", }), }, + &corev1.ConfigMap{}: {}, + &corev1.PersistentVolumeClaim{}: {}, + &corev1.Service{}: {}, + &v1beta2.SparkApplication{}: {}, }, } @@ -335,11 +325,21 @@ func newControllerOptions() controller.Options { } func newSparkApplicationReconcilerOptions() sparkapplication.Options { + var sparkApplicationMetrics *metrics.SparkApplicationMetrics + var sparkExecutorMetrics *metrics.SparkExecutorMetrics + if enableMetrics { + sparkApplicationMetrics = metrics.NewSparkApplicationMetrics(metricsPrefix, metricsLabels, metricsJobStartLatencyBuckets) + sparkApplicationMetrics.Register() + sparkExecutorMetrics = metrics.NewSparkExecutorMetrics(metricsPrefix, metricsLabels) + sparkExecutorMetrics.Register() + } options := sparkapplication.Options{ - Namespaces: namespaces, - EnableUIService: enableUIService, - IngressClassName: ingressClassName, - IngressURLFormat: ingressURLFormat, + Namespaces: namespaces, + EnableUIService: enableUIService, + IngressClassName: ingressClassName, + IngressURLFormat: ingressURLFormat, + SparkApplicationMetrics: sparkApplicationMetrics, + SparkExecutorMetrics: sparkExecutorMetrics, } return options } diff --git a/cmd/operator/root.go b/cmd/operator/root.go index b1fa25846..2ddaa900d 100644 --- a/cmd/operator/root.go +++ b/cmd/operator/root.go @@ -20,11 +20,12 @@ import ( "github.com/spf13/cobra" "github.com/kubeflow/spark-operator/cmd/operator/controller" + "github.com/kubeflow/spark-operator/cmd/operator/version" "github.com/kubeflow/spark-operator/cmd/operator/webhook" ) func NewCommand() *cobra.Command { - var command = &cobra.Command{ + command := &cobra.Command{ Use: "spark-operator", Short: "Spark operator", RunE: func(cmd *cobra.Command, _ []string) error { @@ -33,5 +34,6 @@ func NewCommand() *cobra.Command { } command.AddCommand(controller.NewCommand()) command.AddCommand(webhook.NewCommand()) + command.AddCommand(version.NewCommand()) return command } diff --git a/cmd/operator/version.go b/cmd/operator/version.go deleted file mode 100644 index 931dc5bfe..000000000 --- a/cmd/operator/version.go +++ /dev/null @@ -1,3 +0,0 @@ -package operator - -// TODO diff --git a/cmd/operator/version/root.go b/cmd/operator/version/root.go new file mode 100644 index 000000000..95f844c09 --- /dev/null +++ b/cmd/operator/version/root.go @@ -0,0 +1,40 @@ +/* +Copyright 2024 The Kubeflow authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package version + +import ( + "github.com/spf13/cobra" + + sparkoperator "github.com/kubeflow/spark-operator" +) + +var ( + short bool +) + +func NewCommand() *cobra.Command { + command := &cobra.Command{ + Use: "version", + Short: "version", + RunE: func(cmd *cobra.Command, args []string) error { + sparkoperator.PrintVersion(short) + return nil + }, + } + command.Flags().BoolVar(&short, "short", false, "Print just the version string.") + return command +} diff --git a/cmd/operator/webhook/root.go b/cmd/operator/webhook/root.go index 95b7b9ed3..47609ea49 100644 --- a/cmd/operator/webhook/root.go +++ b/cmd/operator/webhook/root.go @@ -21,7 +21,7 @@ import ( ) func NewCommand() *cobra.Command { - var command = &cobra.Command{ + command := &cobra.Command{ Use: "webhook", Short: "Spark operator webhook", RunE: func(cmd *cobra.Command, _ []string) error { diff --git a/cmd/operator/webhook/start.go b/cmd/operator/webhook/start.go index 83e6e2b6d..9a77f7939 100644 --- a/cmd/operator/webhook/start.go +++ b/cmd/operator/webhook/start.go @@ -30,8 +30,10 @@ import ( "github.com/spf13/viper" "go.uber.org/zap" "go.uber.org/zap/zapcore" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -45,6 +47,7 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook" + sparkoperator "github.com/kubeflow/spark-operator" "github.com/kubeflow/spark-operator/api/v1beta1" "github.com/kubeflow/spark-operator/api/v1beta2" "github.com/kubeflow/spark-operator/internal/controller/mutatingwebhookconfiguration" @@ -91,13 +94,11 @@ var ( leaderElectionRetryPeriod time.Duration // Metrics - enableMetrics bool - metricsBindAddress string - metricsPort string - metricsEndpoint string - metricsPrefix string - metricsLabels []string - metricsJobStartLatencyBuckets []float64 + enableMetrics bool + metricsBindAddress string + metricsEndpoint string + metricsPrefix string + metricsLabels []string healthProbeBindAddress string secureMetrics bool @@ -118,10 +119,11 @@ func NewStartCommand() *cobra.Command { var command = &cobra.Command{ Use: "start", Short: "Start controller and webhook", - PreRun: func(cmd *cobra.Command, args []string) { + PreRun: func(_ *cobra.Command, args []string) { development = viper.GetBool("development") }, Run: func(cmd *cobra.Command, args []string) { + sparkoperator.PrintVersion(false) start() }, } @@ -152,15 +154,12 @@ func NewStartCommand() *cobra.Command { command.Flags().DurationVar(&leaderElectionRenewDeadline, "leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.") command.Flags().DurationVar(&leaderElectionRetryPeriod, "leader-election-retry-period", 4*time.Second, "Leader election retry period.") + command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Enable metrics.") command.Flags().StringVar(&metricsBindAddress, "metrics-bind-address", "0", "The address the metric endpoint binds to. "+ "Use the port :8080. 
If not set, it will be 0 in order to disable the metrics server") - - command.Flags().BoolVar(&enableMetrics, "enable-metrics", false, "Whether to enable the metrics endpoint.") - command.Flags().StringVar(&metricsPort, "metrics-port", "10254", "Port for the metrics endpoint.") command.Flags().StringVar(&metricsEndpoint, "metrics-endpoint", "/metrics", "Metrics endpoint.") command.Flags().StringVar(&metricsPrefix, "metrics-prefix", "", "Prefix for the metrics.") command.Flags().StringSliceVar(&metricsLabels, "metrics-labels", []string{}, "Labels to be added to the metrics.") - command.Flags().Float64SliceVar(&metricsJobStartLatencyBuckets, "metrics-job-start-latency-buckets", []float64{30, 60, 90, 120, 150, 180, 210, 240, 270, 300}, "Buckets for the job start latency histogram.") command.Flags().StringVar(&healthProbeBindAddress, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") command.Flags().BoolVar(&secureMetrics, "secure-metrics", false, "If set the metrics endpoint is served securely") @@ -281,7 +280,7 @@ func start() { if err := ctrl.NewWebhookManagedBy(mgr). For(&v1beta2.SparkApplication{}). - WithDefaulter(webhook.NewScheduledSparkApplicationDefaulter()). + WithDefaulter(webhook.NewSparkApplicationDefaulter()). WithValidator(webhook.NewSparkApplicationValidator()). Complete(); err != nil { logger.Error(err, "Failed to create mutating webhook for Spark application") @@ -384,6 +383,16 @@ func newCacheOptions() cache.Options { }, &v1beta2.SparkApplication{}: {}, &v1beta2.ScheduledSparkApplication{}: {}, + &admissionregistrationv1.MutatingWebhookConfiguration{}: { + Field: fields.SelectorFromSet(fields.Set{ + "metadata.name": mutatingWebhookName, + }), + }, + &admissionregistrationv1.ValidatingWebhookConfiguration{}: { + Field: fields.SelectorFromSet(fields.Set{ + "metadata.name": validatingWebhookName, + }), + }, }, } diff --git a/codecov.yml b/codecov.yaml similarity index 100% rename from codecov.yml rename to codecov.yaml diff --git a/entrypoint.sh b/entrypoint.sh index f3c83ebad..0ca873012 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -4,21 +4,18 @@ set -ex # Check whether there is a passwd entry for the container UID -myuid=$(id -u) -mygid=$(id -g) +uid=$(id -u) +gid=$(id -g) + # turn off -e for getent because it will return error code in anonymous uid case set +e -uidentry=$(getent passwd $myuid) +uidentry=$(getent passwd $uid) set -e -echo $myuid -echo $mygid -echo $uidentry - # If there is no passwd entry for the container UID, attempt to create one if [[ -z "$uidentry" ]] ; then if [[ -w /etc/passwd ]] ; then - echo "$myuid:x:$myuid:$mygid:anonymous uid:$SPARK_HOME:/bin/false" >> /etc/passwd + echo "$uid:x:$uid:$gid:anonymous uid:$SPARK_HOME:/bin/false" >> /etc/passwd else echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID" fi diff --git a/examples/spark-pi-dynamic-allocation.yaml b/examples/spark-pi-dynamic-allocation.yaml index 714a8e8ae..800313914 100644 --- a/examples/spark-pi-dynamic-allocation.yaml +++ b/examples/spark-pi-dynamic-allocation.yaml @@ -34,7 +34,7 @@ spec: cores: 1 coreLimit: 1200m memory: 512m - serviceAccount: spark + serviceAccount: spark-operator-spark executor: labels: version: 3.5.0 @@ -44,6 +44,6 @@ spec: memory: 512m dynamicAllocation: enabled: true - initialExecutors: 3 + initialExecutors: 2 maxExecutors: 5 minExecutors: 1 diff --git a/examples/spark-pi-prometheus.yaml b/examples/spark-pi-prometheus.yaml index b47de1db6..29a447061 100644 --- a/examples/spark-pi-prometheus.yaml +++ 
b/examples/spark-pi-prometheus.yaml @@ -14,7 +14,7 @@ # limitations under the License. # -apiVersion: "sparkoperator.k8s.io/v1beta2" +apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pi @@ -22,31 +22,31 @@ metadata: spec: type: Scala mode: cluster - image: "gcr.io/spark-operator/spark:v3.1.1-gcs-prometheus" + image: gcr.io/spark-operator/spark:v3.1.1-gcs-prometheus imagePullPolicy: Always mainClass: org.apache.spark.examples.SparkPi - mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar" + mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar arguments: - - "100000" - sparkVersion: "3.1.1" + - "100000" + sparkVersion: 3.1.1 restartPolicy: type: Never driver: cores: 1 - coreLimit: "1200m" - memory: "512m" + coreLimit: 1200m + memory: 512m labels: version: 3.1.1 serviceAccount: spark-operator-spark executor: cores: 1 instances: 1 - memory: "512m" + memory: 512m labels: version: 3.1.1 monitoring: exposeDriverMetrics: true exposeExecutorMetrics: true prometheus: - jmxExporterJar: "/prometheus/jmx_prometheus_javaagent-0.11.0.jar" + jmxExporterJar: /prometheus/jmx_prometheus_javaagent-0.11.0.jar port: 8090 diff --git a/examples/spark-pi.yaml b/examples/spark-pi.yaml index 595310181..efbca6118 100644 --- a/examples/spark-pi.yaml +++ b/examples/spark-pi.yaml @@ -34,4 +34,3 @@ spec: labels: version: 3.5.0 instances: 1 - diff --git a/go.mod b/go.mod index 658e02667..49935ad71 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/gomega v1.30.0 github.com/prometheus/client_golang v1.19.0 - github.com/prometheus/client_model v0.6.1 github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cobra v1.8.0 github.com/spf13/viper v1.19.0 @@ -152,6 +151,7 @@ require ( github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.52.2 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect diff --git a/internal/controller/mutatingwebhookconfiguration/controller.go b/internal/controller/mutatingwebhookconfiguration/controller.go index 71260a91f..a16908cbb 100644 --- a/internal/controller/mutatingwebhookconfiguration/controller.go +++ b/internal/controller/mutatingwebhookconfiguration/controller.go @@ -71,7 +71,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, options controller.Optio func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger.Info("Updating CA bundle of MutatingWebhookConfiguration", "name", req.Name, "namespace", req.Namespace) if err := r.updateMutatingWebhookConfiguration(ctx, req.NamespacedName); err != nil { - return ctrl.Result{Requeue: true}, nil + return ctrl.Result{Requeue: true}, err } return ctrl.Result{}, nil } diff --git a/internal/controller/mutatingwebhookconfiguration/event_filter.go b/internal/controller/mutatingwebhookconfiguration/event_filter.go index 08aaa3041..64131300b 100644 --- a/internal/controller/mutatingwebhookconfiguration/event_filter.go +++ b/internal/controller/mutatingwebhookconfiguration/event_filter.go @@ -36,21 +36,21 @@ func NewEventFilter(name string) *EventFilter { var _ predicate.Predicate = &EventFilter{} // Create implements predicate.Predicate. 
-func (m *EventFilter) Create(event.CreateEvent) bool { - return true +func (f *EventFilter) Create(e event.CreateEvent) bool { + return e.Object.GetName() == f.name } // Update implements predicate.Predicate. -func (m *EventFilter) Update(event.UpdateEvent) bool { - return true +func (f *EventFilter) Update(e event.UpdateEvent) bool { + return e.ObjectOld.GetName() == f.name } // Delete implements predicate.Predicate. -func (m *EventFilter) Delete(event.DeleteEvent) bool { +func (f *EventFilter) Delete(event.DeleteEvent) bool { return false } // Generic implements predicate.Predicate. -func (m *EventFilter) Generic(event.GenericEvent) bool { - return true +func (f *EventFilter) Generic(event.GenericEvent) bool { + return false } diff --git a/internal/controller/mutatingwebhookconfiguration/event_handler.go b/internal/controller/mutatingwebhookconfiguration/event_handler.go index 6aa60c2b2..da3772ccd 100644 --- a/internal/controller/mutatingwebhookconfiguration/event_handler.go +++ b/internal/controller/mutatingwebhookconfiguration/event_handler.go @@ -43,7 +43,7 @@ func (h *EventHandler) Create(ctx context.Context, event event.CreateEvent, queu if !ok { return } - logger.V(1).Info("MutatingWebhookConfiguration created", "name", mwc.Name, "namespace", mwc.Namespace) + logger.Info("MutatingWebhookConfiguration created", "name", mwc.Name, "namespace", mwc.Namespace) key := types.NamespacedName{ Namespace: mwc.Namespace, Name: mwc.Name, @@ -65,7 +65,7 @@ func (h *EventHandler) Update(ctx context.Context, event event.UpdateEvent, queu return } - logger.V(1).Info("MutatingWebhookConfiguration updated", "name", newWebhook.Name, "namespace", newWebhook.Namespace) + logger.Info("MutatingWebhookConfiguration updated", "name", newWebhook.Name, "namespace", newWebhook.Namespace) key := types.NamespacedName{ Namespace: newWebhook.Namespace, Name: newWebhook.Name, @@ -79,7 +79,7 @@ func (h *EventHandler) Delete(ctx context.Context, event event.DeleteEvent, queu if !ok { return } - logger.V(1).Info("MutatingWebhookConfiguration deleted", "name", mwc.Name, "namespace", mwc.Namespace) + logger.Info("MutatingWebhookConfiguration deleted", "name", mwc.Name, "namespace", mwc.Namespace) key := types.NamespacedName{ Namespace: mwc.Namespace, Name: mwc.Name, @@ -93,7 +93,7 @@ func (h *EventHandler) Generic(ctx context.Context, event event.GenericEvent, qu if !ok { return } - logger.V(1).Info("MutatingWebhookConfiguration generic event", "name", mwc.Name, "namespace", mwc.Namespace) + logger.Info("MutatingWebhookConfiguration generic event", "name", mwc.Name, "namespace", mwc.Namespace) key := types.NamespacedName{ Namespace: mwc.Namespace, Name: mwc.Name, diff --git a/internal/controller/scheduledsparkapplication/event_filter.go b/internal/controller/scheduledsparkapplication/event_filter.go index 7387488a5..bd1ddefb2 100644 --- a/internal/controller/scheduledsparkapplication/event_filter.go +++ b/internal/controller/scheduledsparkapplication/event_filter.go @@ -62,7 +62,7 @@ func (f *EventFilter) Update(e event.UpdateEvent) bool { } // Delete implements predicate.Predicate. 
-func (f *EventFilter) Delete(e event.DeleteEvent) bool { +func (f *EventFilter) Delete(_ event.DeleteEvent) bool { return false } diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 05e7d6acf..5347d3340 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kubeflow/spark-operator/api/v1beta2" + "github.com/kubeflow/spark-operator/internal/metrics" "github.com/kubeflow/spark-operator/internal/scheduler" "github.com/kubeflow/spark-operator/pkg/common" "github.com/kubeflow/spark-operator/pkg/util" @@ -49,11 +50,15 @@ var ( logger = log.Log.WithName("") ) +// Options defines the options of the controller. type Options struct { Namespaces []string EnableUIService bool IngressClassName string IngressURLFormat string + + SparkApplicationMetrics *metrics.SparkApplicationMetrics + SparkExecutorMetrics *metrics.SparkExecutorMetrics } // Reconciler reconciles a SparkApplication object. @@ -63,18 +68,18 @@ type Reconciler struct { client client.Client recorder record.EventRecorder options Options - metrics *Metrics registry *scheduler.Registry } +// Reconciler implements reconcile.Reconciler. var _ reconcile.Reconciler = &Reconciler{} +// NewReconciler creates a new Reconciler instance. func NewReconciler( manager ctrl.Manager, scheme *runtime.Scheme, client client.Client, recorder record.EventRecorder, - metrics *Metrics, registry *scheduler.Registry, options Options, ) *Reconciler { @@ -83,12 +88,20 @@ func NewReconciler( scheme: scheme, client: client, recorder: recorder, - metrics: metrics, registry: registry, options: options, } } +// +kubebuilder:rbac:groups=,resources=pods,verbs=get;list;watch;create;update;patch;delete;deletecollection +// +kubebuilder:rbac:groups=,resources=configmaps,verbs=get;list;create;update;patch;delete +// +kubebuilder:rbac:groups=,resources=services,verbs=get;create;delete +// +kubebuilder:rbac:groups=,resources=nodes,verbs=get +// +kubebuilder:rbac:groups=,resources=events,verbs=create;update;patch +// +kubebuilder:rbac:groups=,resources=resourcequotas,verbs=get;list;watch +// +kubebuilder:rbac:groups=extensions,resources=ingresses,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=get;list;watch;create;update;patch;delete +// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=get // +kubebuilder:rbac:groups=sparkoperator.k8s.io,resources=sparkapplications,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=sparkoperator.k8s.io,resources=sparkapplications/status,verbs=get;update;patch // +kubebuilder:rbac:groups=sparkoperator.k8s.io,resources=sparkapplications/finalizers,verbs=update @@ -150,9 +163,14 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error { return ctrl.NewControllerManagedBy(mgr). Named("spark-application-controller"). + Watches( + &corev1.Pod{}, + NewSparkPodEventHandler(mgr.GetClient(), r.options.SparkExecutorMetrics), + builder.WithPredicates(newSparkPodEventFilter(r.options.Namespaces)), + ). 
Watches( &v1beta2.SparkApplication{}, - NewSparkApplicationEventHandler(nil), + NewSparkApplicationEventHandler(r.options.SparkApplicationMetrics), builder.WithPredicates( NewSparkApplicationEventFilter( mgr.GetClient(), @@ -161,11 +179,6 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager, options controller.Optio ), ), ). - Watches( - &corev1.Pod{}, - NewSparkPodEventHandler(mgr.GetCache()), - builder.WithPredicates(newSparkPodEventFilter(r.options.Namespaces)), - ). WithOptions(options). Complete(r) } @@ -187,7 +200,7 @@ func (r *Reconciler) finalizeSparkApplication(ctx context.Context, req ctrl.Requ } app := old.DeepCopy() - if err := r.deleteSparkResources(app); err != nil { + if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "Failed to delete resources associated with the SparkApplication", "name", app.Name, "namespace", app.Namespace) return err } @@ -198,9 +211,6 @@ func (r *Reconciler) finalizeSparkApplication(ctx context.Context, req ctrl.Requ return err } - if r.metrics != nil { - r.metrics.exportMetricsOnDeletion(app) - } return nil }, ); retryErr != nil { @@ -298,10 +308,10 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte app := old.DeepCopy() if util.ShouldRetry(app) { if isNextRetryDue(app) { - if r.validateSparkResourceDeletion(app) { + if r.validateSparkResourceDeletion(ctx, app) { _ = r.submitSparkApplication(app) } else { - if err := r.deleteSparkResources(app); err != nil { + if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) } return err @@ -361,7 +371,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context, } app := old.DeepCopy() logger.Info("Pending rerun SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) - if r.validateSparkResourceDeletion(app) { + if r.validateSparkResourceDeletion(ctx, app) { logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) r.recordSparkApplicationEvent(app) r.resetSparkApplicationStatus(app) @@ -393,7 +403,7 @@ func (r *Reconciler) reconcileInvalidatingSparkApplication(ctx context.Context, } app := old.DeepCopy() // Invalidate the current run and enqueue the SparkApplication for re-execution. 
- if err := r.deleteSparkResources(app); err != nil { + if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "Failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace) } else { r.resetSparkApplicationStatus(app) @@ -423,7 +433,7 @@ func (r *Reconciler) reconcileSucceedingSparkApplication(ctx context.Context, re } app := old.DeepCopy() if util.ShouldRetry(app) { - if err := r.deleteSparkResources(app); err != nil { + if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete spark resources", "name", app.Name, "namespace", app.Namespace) return err } @@ -456,7 +466,7 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c app := old.DeepCopy() if util.ShouldRetry(app) { if isNextRetryDue(app) { - if err := r.deleteSparkResources(app); err != nil { + if err := r.deleteSparkResources(ctx, app); err != nil { logger.Error(err, "failed to delete spark resources", "name", app.Name, "namespace", app.Namespace) return err } @@ -601,7 +611,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) if util.PrometheusMonitoringEnabled(app) { - logger.V(1).Info("Configure Prometheus monitoring for SparkApplication", "name", app.Name, "namespace", app.Namespace) + logger.Info("Configure Prometheus monitoring for SparkApplication", "name", app.Name, "namespace", app.Namespace) if err := configPrometheusMonitoring(app, r.client); err != nil { return fmt.Errorf("failed to configure Prometheus monitoring: %v", err) } @@ -609,27 +619,24 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error // Use batch scheduler to perform scheduling task before submitting (before build command arguments). if needScheduling, scheduler := r.shouldDoBatchScheduling(app); needScheduling { - logger.V(1).Info("Do batch scheduling for SparkApplication", "name", app.Name, "namespace", app.Namespace) + logger.Info("Do batch scheduling for SparkApplication", "name", app.Name, "namespace", app.Namespace) if err := scheduler.Schedule(app); err != nil { return fmt.Errorf("failed to process batch scheduler: %v", err) } } - driverInfo := v1beta2.DriverInfo{} - if r.options.EnableUIService { - logger.Info("Creating web UI service for SparkApplication", "name", app.Name, "namespace", app.Namespace) service, err := r.createWebUIService(app) if err != nil { return fmt.Errorf("failed to create web UI service") } - driverInfo.WebUIServiceName = service.serviceName - driverInfo.WebUIPort = service.servicePort - driverInfo.WebUIAddress = fmt.Sprintf("%s:%d", service.serviceIP, app.Status.DriverInfo.WebUIPort) + app.Status.DriverInfo.WebUIServiceName = service.serviceName + app.Status.DriverInfo.WebUIPort = service.servicePort + app.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", service.serviceIP, app.Status.DriverInfo.WebUIPort) + logger.Info("Created web UI service for SparkApplication", "name", app.Name, "namespace", app.Namespace) // Create UI Ingress if ingress-format is set. if r.options.IngressURLFormat != "" { - logger.Info("Creating web UI ingress for SparkApplication", "name", app.Name, "namespace", app.Namespace) // We are going to want to use an ingress url. 
ingressURL, err := getDriverIngressURL(r.options.IngressURLFormat, app.Name, app.Namespace) if err != nil { @@ -647,8 +654,9 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error if err != nil { return fmt.Errorf("failed to create web UI service") } - driverInfo.WebUIIngressAddress = ingress.ingressURL.String() - driverInfo.WebUIIngressName = ingress.ingressName + app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL.String() + app.Status.DriverInfo.WebUIIngressName = ingress.ingressName + logger.Info("Created web UI ingress for SparkApplication", "name", app.Name, "namespace", app.Namespace) } } @@ -674,7 +682,7 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error } driverPodName := util.GetDriverPodName(app) - driverInfo.PodName = driverPodName + app.Status.DriverInfo.PodName = driverPodName app.Status.SubmissionID = uuid.New().String() sparkSubmitArgs, err := buildSparkSubmitArgs(app) if err != nil { @@ -698,7 +706,6 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error app.Status.AppState = v1beta2.ApplicationState{ State: v1beta2.ApplicationStateSubmitted, } - app.Status.DriverInfo = driverInfo app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1 app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1 app.Status.LastSubmissionAttemptTime = metav1.Now() @@ -899,23 +906,23 @@ func (r *Reconciler) updateSparkApplicationStatus(ctx context.Context, app *v1be } // Delete the resources associated with the spark application. -func (r *Reconciler) deleteSparkResources(app *v1beta2.SparkApplication) error { - if err := r.deleteDriverPod(app); err != nil { +func (r *Reconciler) deleteSparkResources(ctx context.Context, app *v1beta2.SparkApplication) error { + if err := r.deleteDriverPod(ctx, app); err != nil { return err } - if err := r.deleteWebUIService(app); err != nil { + if err := r.deleteWebUIService(ctx, app); err != nil { return err } - if err := r.deleteWebUIIngress(app); err != nil { + if err := r.deleteWebUIIngress(ctx, app); err != nil { return err } return nil } -func (r *Reconciler) deleteDriverPod(app *v1beta2.SparkApplication) error { +func (r *Reconciler) deleteDriverPod(ctx context.Context, app *v1beta2.SparkApplication) error { podName := app.Status.DriverInfo.PodName // Derive the driver pod name in case the driver pod name was not recorded in the status, // which could happen if the status update right after submission failed. 
@@ -925,7 +932,7 @@ func (r *Reconciler) deleteDriverPod(app *v1beta2.SparkApplication) error { logger.Info("Deleting driver pod", "name", podName, "namespace", app.Namespace) if err := r.client.Delete( - context.TODO(), + ctx, &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, @@ -939,14 +946,14 @@ func (r *Reconciler) deleteDriverPod(app *v1beta2.SparkApplication) error { return nil } -func (r *Reconciler) deleteWebUIService(app *v1beta2.SparkApplication) error { +func (r *Reconciler) deleteWebUIService(ctx context.Context, app *v1beta2.SparkApplication) error { svcName := app.Status.DriverInfo.WebUIServiceName if svcName == "" { return nil } logger.Info("Deleting Spark web UI service", "name", svcName, "namespace", app.Namespace) if err := r.client.Delete( - context.TODO(), + ctx, &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: svcName, @@ -962,16 +969,16 @@ func (r *Reconciler) deleteWebUIService(app *v1beta2.SparkApplication) error { return nil } -func (r *Reconciler) deleteWebUIIngress(app *v1beta2.SparkApplication) error { +func (r *Reconciler) deleteWebUIIngress(ctx context.Context, app *v1beta2.SparkApplication) error { ingressName := app.Status.DriverInfo.WebUIIngressName if ingressName == "" { return nil } if util.IngressCapabilities.Has("networking.k8s.io/v1") { - logger.V(1).Info("Deleting Spark web UI ingress", "name", ingressName, "namespace", app.Namespace) + logger.Info("Deleting Spark web UI ingress", "name", ingressName, "namespace", app.Namespace) if err := r.client.Delete( - context.TODO(), + ctx, &networkingv1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: ingressName, @@ -1008,28 +1015,30 @@ func (r *Reconciler) deleteWebUIIngress(app *v1beta2.SparkApplication) error { } // Validate that any Spark resources (driver/Service/Ingress) created for the application have been deleted. -func (r *Reconciler) validateSparkResourceDeletion(app *v1beta2.SparkApplication) bool { +func (r *Reconciler) validateSparkResourceDeletion(ctx context.Context, app *v1beta2.SparkApplication) bool { + // Validate whether driver pod has been deleted. driverPodName := app.Status.DriverInfo.PodName // Derive the driver pod name in case the driver pod name was not recorded in the status, // which could happen if the status update right after submission failed. if driverPodName == "" { driverPodName = util.GetDriverPodName(app) } - - if err := r.client.Get(context.TODO(), types.NamespacedName{Name: driverPodName, Namespace: app.Namespace}, &corev1.Pod{}); err == nil || !errors.IsNotFound(err) { + if err := r.client.Get(ctx, types.NamespacedName{Name: driverPodName, Namespace: app.Namespace}, &corev1.Pod{}); err == nil || !errors.IsNotFound(err) { return false } + // Validate whether Spark web UI service has been deleted. sparkUIServiceName := app.Status.DriverInfo.WebUIServiceName if sparkUIServiceName != "" { - if err := r.client.Get(context.TODO(), types.NamespacedName{Name: sparkUIServiceName, Namespace: app.Namespace}, &corev1.Service{}); err == nil || !errors.IsNotFound(err) { + if err := r.client.Get(ctx, types.NamespacedName{Name: sparkUIServiceName, Namespace: app.Namespace}, &corev1.Service{}); err == nil || !errors.IsNotFound(err) { return false } } + // Validate whether Spark web UI ingress has been deleted. 
sparkUIIngressName := app.Status.DriverInfo.WebUIIngressName if sparkUIIngressName != "" { - if err := r.client.Get(context.TODO(), types.NamespacedName{Name: sparkUIIngressName, Namespace: app.Namespace}, &networkingv1.Ingress{}); err == nil || !errors.IsNotFound(err) { + if err := r.client.Get(ctx, types.NamespacedName{Name: sparkUIIngressName, Namespace: app.Namespace}, &networkingv1.Ingress{}); err == nil || !errors.IsNotFound(err) { return false } } diff --git a/internal/controller/sparkapplication/controller_test.go b/internal/controller/sparkapplication/controller_test.go index 45f1833cc..07e3b0606 100644 --- a/internal/controller/sparkapplication/controller_test.go +++ b/internal/controller/sparkapplication/controller_test.go @@ -117,7 +117,6 @@ var _ = Describe("SparkApplication Controller", func() { k8sClient, nil, nil, - nil, sparkapplication.Options{Namespaces: []string{appNamespace}}, ) result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) @@ -174,7 +173,6 @@ var _ = Describe("SparkApplication Controller", func() { k8sClient, nil, nil, - nil, sparkapplication.Options{Namespaces: []string{appNamespace}}, ) result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) @@ -226,7 +224,6 @@ var _ = Describe("SparkApplication Controller", func() { k8sClient, nil, nil, - nil, sparkapplication.Options{Namespaces: []string{appNamespace}}, ) result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) @@ -283,7 +280,6 @@ var _ = Describe("SparkApplication Controller", func() { k8sClient, nil, nil, - nil, sparkapplication.Options{Namespaces: []string{appNamespace}}, ) result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) diff --git a/internal/controller/sparkapplication/event_filter.go b/internal/controller/sparkapplication/event_filter.go index 3e166d721..045ee6869 100644 --- a/internal/controller/sparkapplication/event_filter.go +++ b/internal/controller/sparkapplication/event_filter.go @@ -62,12 +62,21 @@ func (f *sparkPodEventFilter) Create(e event.CreateEvent) bool { // Update implements predicate.Predicate. func (f *sparkPodEventFilter) Update(e event.UpdateEvent) bool { - pod, ok := e.ObjectNew.(*corev1.Pod) + oldPod, ok := e.ObjectOld.(*corev1.Pod) if !ok { return false } - return f.filter(pod) + newPod, ok := e.ObjectNew.(*corev1.Pod) + if !ok { + return false + } + + if newPod.Status.Phase == oldPod.Status.Phase { + return false + } + + return f.filter(newPod) } // Delete implements predicate.Predicate. @@ -158,7 +167,7 @@ func (f *EventFilter) Update(e event.UpdateEvent) bool { if !equality.Semantic.DeepEqual(oldApp.Spec, newApp.Spec) { // Force-set the application status to Invalidating which handles clean-up and application re-run. 
newApp.Status.AppState.State = v1beta2.ApplicationStateInvalidating - logger.V(1).Info("Updating SparkApplication status", "name", newApp.Name, "namespace", newApp.Namespace, " oldState", oldApp.Status.AppState.State, "newState", newApp.Status.AppState.State) + logger.Info("Updating SparkApplication status", "name", newApp.Name, "namespace", newApp.Namespace, " oldState", oldApp.Status.AppState.State, "newState", newApp.Status.AppState.State) if err := f.client.Status().Update(context.TODO(), newApp); err != nil { logger.Error(err, "Failed to update application status", "application", newApp.Name) f.recorder.Eventf( diff --git a/internal/controller/sparkapplication/event_handler.go b/internal/controller/sparkapplication/event_handler.go index fd58cdbc5..134e7f0f0 100644 --- a/internal/controller/sparkapplication/event_handler.go +++ b/internal/controller/sparkapplication/event_handler.go @@ -23,7 +23,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -35,48 +35,83 @@ import ( // SparkPodEventHandler watches Spark pods and update the SparkApplication objects accordingly. type SparkPodEventHandler struct { - cache cache.Cache + client client.Client + metrics *metrics.SparkExecutorMetrics } +// SparkPodEventHandler implements handler.EventHandler. var _ handler.EventHandler = &SparkPodEventHandler{} // NewSparkPodEventHandler creates a new sparkPodEventHandler instance. -func NewSparkPodEventHandler(cache cache.Cache) *SparkPodEventHandler { +func NewSparkPodEventHandler(client client.Client, metrics *metrics.SparkExecutorMetrics) *SparkPodEventHandler { handler := &SparkPodEventHandler{ - cache: cache, + client: client, + metrics: metrics, } return handler } // Create implements handler.EventHandler. func (h *SparkPodEventHandler) Create(ctx context.Context, event event.CreateEvent, queue workqueue.RateLimitingInterface) { - pod := event.Object.(*corev1.Pod) + pod, ok := event.Object.(*corev1.Pod) + if !ok { + return + } logger.V(1).Info("Spark pod created", "name", pod.Name, "namespace", pod.Namespace, "phase", pod.Status.Phase) h.enqueueSparkAppForUpdate(ctx, pod, queue) + + if h.metrics != nil && util.IsExecutorPod(pod) { + h.metrics.HandleSparkExecutorCreate(pod) + } } // Update implements handler.EventHandler. func (h *SparkPodEventHandler) Update(ctx context.Context, event event.UpdateEvent, queue workqueue.RateLimitingInterface) { - oldPod := event.ObjectOld.(*corev1.Pod) - newPod := event.ObjectNew.(*corev1.Pod) - if newPod.ResourceVersion == oldPod.ResourceVersion { + oldPod, ok := event.ObjectOld.(*corev1.Pod) + if !ok { return } - logger.V(1).Info("Spark pod updated", "name", newPod.Name, "namespace", newPod.Namespace, "oldPhase", oldPod.Status.Phase, "newPhase", newPod.Status.Phase) + + newPod, ok := event.ObjectNew.(*corev1.Pod) + if !ok { + return + } + + if newPod.Status.Phase == oldPod.Status.Phase { + return + } + + logger.Info("Spark pod updated", "name", newPod.Name, "namespace", newPod.Namespace, "oldPhase", oldPod.Status.Phase, "newPhase", newPod.Status.Phase) h.enqueueSparkAppForUpdate(ctx, newPod, queue) + + if h.metrics != nil && util.IsExecutorPod(oldPod) && util.IsExecutorPod(newPod) { + h.metrics.HandleSparkExecutorUpdate(oldPod, newPod) + } } // Delete implements handler.EventHandler. 
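Illustrative wiring only, since the operator's controller setup code is not part of this excerpt: a handler like the one above is typically attached to a pod watch so that driver and executor pod events enqueue the owning SparkApplication, while the phase-change filters keep no-op updates out of the workqueue. Only NewSparkPodEventHandler's signature comes from this patch; the builder calls below are a sketch under that assumption.

func setupSparkApplicationController(mgr ctrl.Manager, r *Reconciler, executorMetrics *metrics.SparkExecutorMetrics) error {
	// Pod events are translated into SparkApplication reconcile requests.
	podEventHandler := NewSparkPodEventHandler(mgr.GetClient(), executorMetrics)
	return ctrl.NewControllerManagedBy(mgr).
		For(&v1beta2.SparkApplication{}).
		Watches(&corev1.Pod{}, podEventHandler). // a phase-change predicate can be added via builder.WithPredicates
		Complete(r)
}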
func (h *SparkPodEventHandler) Delete(ctx context.Context, event event.DeleteEvent, queue workqueue.RateLimitingInterface) { - pod := event.Object.(*corev1.Pod) - logger.V(1).Info("Spark pod deleted", "name", pod.Name, "namespace", pod.Namespace, "phase", pod.Status.Phase) + pod, ok := event.Object.(*corev1.Pod) + if !ok { + return + } + + logger.Info("Spark pod deleted", "name", pod.Name, "namespace", pod.Namespace, "phase", pod.Status.Phase) h.enqueueSparkAppForUpdate(ctx, pod, queue) + + if h.metrics != nil && util.IsExecutorPod(pod) { + h.metrics.HandleSparkExecutorDelete(pod) + } } // Generic implements handler.EventHandler. func (h *SparkPodEventHandler) Generic(ctx context.Context, event event.GenericEvent, queue workqueue.RateLimitingInterface) { - pod := event.Object.(*corev1.Pod) - logger.V(1).Info("Spark pod generic event ", "name", pod.Name, "namespace", pod.Namespace, "phase", pod.Status.Phase) + pod, ok := event.Object.(*corev1.Pod) + if !ok { + return + } + + logger.Info("Spark pod generic event ", "name", pod.Name, "namespace", pod.Namespace, "phase", pod.Status.Phase) h.enqueueSparkAppForUpdate(ctx, pod, queue) } @@ -91,10 +126,9 @@ func (h *SparkPodEventHandler) enqueueSparkAppForUpdate(ctx context.Context, pod Name: name, } - app := v1beta2.SparkApplication{} - + app := &v1beta2.SparkApplication{} if submissionID, ok := pod.Labels[common.LabelSubmissionID]; ok { - if err := h.cache.Get(ctx, key, &app); err != nil { + if err := h.client.Get(ctx, key, app); err != nil { return } if app.Status.SubmissionID != submissionID { @@ -102,6 +136,11 @@ func (h *SparkPodEventHandler) enqueueSparkAppForUpdate(ctx context.Context, pod } } + // Do not enqueue SparkApplication in invalidating state when driver pod get deleted. + if util.GetApplicationState(app) == v1beta2.ApplicationStateInvalidating { + return + } + queue.AddRateLimited(ctrl.Request{NamespacedName: key}) } @@ -126,8 +165,12 @@ func (h *EventHandler) Create(ctx context.Context, event event.CreateEvent, queu return } - logger.V(1).Info("SparkApplication created", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + logger.Info("SparkApplication created", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) queue.AddRateLimited(ctrl.Request{NamespacedName: types.NamespacedName{Name: app.Name, Namespace: app.Namespace}}) + + if h.metrics != nil { + h.metrics.HandleSparkApplicationCreate(app) + } } // Update implements handler.EventHandler. @@ -142,9 +185,12 @@ func (h *EventHandler) Update(ctx context.Context, event event.UpdateEvent, queu return } - logger.V(1).Info("SparkApplication updated", "name", oldApp.Name, "namespace", oldApp.Namespace, "oldState", oldApp.Status.AppState.State, "newState", newApp.Status.AppState.State) - + logger.Info("SparkApplication updated", "name", oldApp.Name, "namespace", oldApp.Namespace, "oldState", oldApp.Status.AppState.State, "newState", newApp.Status.AppState.State) queue.AddRateLimited(ctrl.Request{NamespacedName: types.NamespacedName{Name: newApp.Name, Namespace: newApp.Namespace}}) + + if h.metrics != nil { + h.metrics.HandleSparkApplicationUpdate(oldApp, newApp) + } } // Delete implements handler.EventHandler. 
@@ -154,8 +200,12 @@ func (h *EventHandler) Delete(ctx context.Context, event event.DeleteEvent, queu return } - logger.V(1).Info("SparkApplication deleted", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + logger.Info("SparkApplication deleted", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) queue.AddRateLimited(ctrl.Request{NamespacedName: types.NamespacedName{Name: app.Name, Namespace: app.Namespace}}) + + if h.metrics != nil { + h.metrics.HandleSparkApplicationDelete(app) + } } // Generic implements handler.EventHandler. @@ -165,6 +215,6 @@ func (h *EventHandler) Generic(ctx context.Context, event event.GenericEvent, qu return } - logger.V(1).Info("SparkApplication generic event", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + logger.Info("SparkApplication generic event", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) queue.AddRateLimited(ctrl.Request{NamespacedName: types.NamespacedName{Name: app.Name, Namespace: app.Namespace}}) } diff --git a/internal/controller/sparkapplication/metrics.go b/internal/controller/sparkapplication/metrics.go deleted file mode 100644 index caac22ddf..000000000 --- a/internal/controller/sparkapplication/metrics.go +++ /dev/null @@ -1,214 +0,0 @@ -/* -Copyright 2018 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package sparkapplication - -import ( - "time" - - "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus" - - "github.com/kubeflow/spark-operator/api/v1beta2" - "github.com/kubeflow/spark-operator/pkg/util" -) - -type Metrics struct { - labels []string - prefix string - - sparkAppCount *prometheus.CounterVec - sparkAppSubmitCount *prometheus.CounterVec - sparkAppFailedSubmissionCount *prometheus.CounterVec - sparkAppSuccessCount *prometheus.CounterVec - sparkAppFailureCount *prometheus.CounterVec - - sparkAppExecutorSuccessCount *prometheus.CounterVec - sparkAppExecutorFailureCount *prometheus.CounterVec - - sparkAppSuccessExecutionTime *prometheus.SummaryVec - sparkAppFailureExecutionTime *prometheus.SummaryVec - - sparkAppRunningCount *util.PositiveGauge - sparkAppExecutorRunningCount *util.PositiveGauge - - sparkAppStartLatency *prometheus.SummaryVec - sparkAppStartLatencyHistogram *prometheus.HistogramVec -} - -func (m *Metrics) exportMetricsOnDeletion(app *v1beta2.SparkApplication) { - metricLabels := m.fetchMetricLabels(app) - state := app.Status.AppState.State - if state == v1beta2.ApplicationStateRunning { - logger.V(1).Info("Decreasing spark application running count", "name", app.Name, "namespace", app.Namespace, "state") - m.sparkAppRunningCount.Dec(metricLabels) - } - for executorName, executorState := range app.Status.ExecutorState { - if executorState == v1beta2.ExecutorStateRunning { - logger.V(1).Info("Decreasing executor pod running count", "name", executorName, "namespace", app.Namespace) - m.sparkAppExecutorRunningCount.Dec(metricLabels) - } - } -} - -func (m *Metrics) exportMetricsOnUpdate(oldApp, newApp *v1beta2.SparkApplication) { - metricLabels := m.fetchMetricLabels(newApp) - - oldState := oldApp.Status.AppState.State - newState := newApp.Status.AppState.State - if newState != oldState { - if oldState == v1beta2.ApplicationStateNew { - if m, err := m.sparkAppCount.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Inc() - } - } - - switch newState { - case v1beta2.ApplicationStateSubmitted: - if m, err := m.sparkAppSubmitCount.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Inc() - } - case v1beta2.ApplicationStateRunning: - m.sparkAppRunningCount.Inc(metricLabels) - m.exportJobStartLatencyMetrics(newApp, metricLabels) - case v1beta2.ApplicationStateSucceeding: - if !newApp.Status.LastSubmissionAttemptTime.Time.IsZero() && !newApp.Status.TerminationTime.Time.IsZero() { - d := newApp.Status.TerminationTime.Time.Sub(newApp.Status.LastSubmissionAttemptTime.Time) - if m, err := m.sparkAppSuccessExecutionTime.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Observe(float64(d / time.Microsecond)) - } - } - m.sparkAppRunningCount.Dec(metricLabels) - if m, err := m.sparkAppSuccessCount.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Inc() - } - case v1beta2.ApplicationStateFailing: - if !newApp.Status.LastSubmissionAttemptTime.Time.IsZero() && !newApp.Status.TerminationTime.Time.IsZero() { - d := newApp.Status.TerminationTime.Time.Sub(newApp.Status.LastSubmissionAttemptTime.Time) - if m, err := m.sparkAppFailureExecutionTime.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Observe(float64(d / time.Microsecond)) - } - } - 
m.sparkAppRunningCount.Dec(metricLabels) - if m, err := m.sparkAppFailureCount.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Inc() - } - case v1beta2.ApplicationStateFailedSubmission: - if m, err := m.sparkAppFailedSubmissionCount.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Inc() - } - } - } - - // In the event that state transitions happened too quickly and the spark app skipped the RUNNING state, the job - // start latency should still be captured. - // Note: There is an edge case that a Submitted state can go directly to a Failing state if the driver pod is - // deleted. This is very unlikely if not being done intentionally, so we choose not to handle it. - if newState != oldState { - if (newState == v1beta2.ApplicationStateFailing || newState == v1beta2.ApplicationStateSucceeding) && oldState == v1beta2.ApplicationStateSubmitted { - // TODO: remove this log once we've gathered some data in prod fleets. - glog.V(2).Infof("Calculating job start latency metrics for edge case transition from %v to %v in app %v in namespace %v.", oldState, newState, newApp.Name, newApp.Namespace) - m.exportJobStartLatencyMetrics(newApp, metricLabels) - } - } - - oldExecutorStates := oldApp.Status.ExecutorState - // Potential Executor status updates - for executor, newExecState := range newApp.Status.ExecutorState { - switch newExecState { - case v1beta2.ExecutorStateRunning: - if oldExecutorStates[executor] != newExecState { - glog.V(2).Infof("Exporting Metrics for Executor %s. OldState: %v NewState: %v", executor, - oldExecutorStates[executor], newExecState) - m.sparkAppExecutorRunningCount.Inc(metricLabels) - } - case v1beta2.ExecutorStateCompleted: - if oldExecutorStates[executor] != newExecState { - glog.V(2).Infof("Exporting Metrics for Executor %s. OldState: %v NewState: %v", executor, - oldExecutorStates[executor], newExecState) - m.sparkAppExecutorRunningCount.Dec(metricLabels) - if m, err := m.sparkAppExecutorSuccessCount.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Inc() - } - } - case v1beta2.ExecutorStateFailed: - if oldExecutorStates[executor] != newExecState { - glog.V(2).Infof("Exporting Metrics for Executor %s. OldState: %v NewState: %v", executor, - oldExecutorStates[executor], newExecState) - m.sparkAppExecutorRunningCount.Dec(metricLabels) - if m, err := m.sparkAppExecutorFailureCount.GetMetricWith(metricLabels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Inc() - } - } - } - } -} - -func (m *Metrics) exportJobStartLatencyMetrics(app *v1beta2.SparkApplication, labels map[string]string) { - // Expose the job start latency related metrics of an SparkApp only once when it runs for the first time - if app.Status.ExecutionAttempts == 1 { - latency := time.Since(app.CreationTimestamp.Time) - if m, err := m.sparkAppStartLatency.GetMetricWith(labels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Observe(float64(latency / time.Microsecond)) - } - if m, err := m.sparkAppStartLatencyHistogram.GetMetricWith(labels); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Observe(float64(latency / time.Second)) - } - } -} - -func (m *Metrics) fetchMetricLabels(app *v1beta2.SparkApplication) map[string]string { - // Convert app labels into ones that can be used as metric labels. 
- validLabels := make(map[string]string) - for key, val := range app.Labels { - newKey := util.CreateValidMetricNameLabel("", key) - validLabels[newKey] = val - } - - metricLabels := make(map[string]string) - for _, label := range m.labels { - if value, ok := validLabels[label]; ok { - metricLabels[label] = value - } else if label == "namespace" { // If the "namespace" label is in the metrics config, use it. - metricLabels[label] = app.Namespace - } else { - metricLabels[label] = "Unknown" - } - } - return metricLabels -} diff --git a/internal/controller/sparkapplication/metrics_test.go b/internal/controller/sparkapplication/metrics_test.go deleted file mode 100644 index 37bf484b7..000000000 --- a/internal/controller/sparkapplication/metrics_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2018 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sparkapplication - -import ( - "testing" -) - -func TestSparkAppMetrics(t *testing.T) { - // http.DefaultServeMux = new(http.ServeMux) - // // Test with label containing "-". Expect them to be converted to "_". - // metricsConfig := &util.MetricConfig{ - // MetricsPrefix: "", - // MetricsLabels: []string{"app-id", "namespace"}, - // MetricsJobStartLatencyBuckets: []float64{30, 60, 90, 120}, - // } - // metrics := NewMetrics(metricsConfig) - // app1 := map[string]string{"app_id": "test1", "namespace": "default"} - - // var wg sync.WaitGroup - // wg.Add(1) - // go func() { - // for i := 0; i < 10; i++ { - // metrics.sparkAppCount.With(app1).Inc() - // metrics.sparkAppSubmitCount.With(app1).Inc() - // metrics.sparkAppRunningCount.Inc(app1) - // metrics.sparkAppSuccessCount.With(app1).Inc() - // metrics.sparkAppFailureCount.With(app1).Inc() - // metrics.sparkAppFailedSubmissionCount.With(app1).Inc() - // metrics.sparkAppSuccessExecutionTime.With(app1).Observe(float64(100 * i)) - // metrics.sparkAppFailureExecutionTime.With(app1).Observe(float64(500 * i)) - // metrics.sparkAppStartLatency.With(app1).Observe(float64(10 * i)) - // metrics.sparkAppStartLatencyHistogram.With(app1).Observe(float64(10 * i)) - // metrics.sparkAppExecutorRunningCount.Inc(app1) - // metrics.sparkAppExecutorSuccessCount.With(app1).Inc() - // metrics.sparkAppExecutorFailureCount.With(app1).Inc() - // } - // for i := 0; i < 5; i++ { - // metrics.sparkAppRunningCount.Dec(app1) - // metrics.sparkAppExecutorRunningCount.Dec(app1) - // } - // wg.Done() - // }() - - // wg.Wait() - // assert.InEpsilon(t, float64(10), util.FetchCounterValue(metrics.sparkAppCount, app1), common.Epsilon) - // assert.InEpsilon(t, float64(10), util.FetchCounterValue(metrics.sparkAppSubmitCount, app1), common.Epsilon) - // assert.InEpsilon(t, float64(5), metrics.sparkAppRunningCount.Value(app1), common.Epsilon) - // assert.InEpsilon(t, float64(10), util.FetchCounterValue(metrics.sparkAppSuccessCount, app1), common.Epsilon) - // assert.InEpsilon(t, float64(10), util.FetchCounterValue(metrics.sparkAppFailureCount, app1), common.Epsilon) - // assert.InEpsilon(t, float64(10), 
util.FetchCounterValue(metrics.sparkAppFailedSubmissionCount, app1), common.Epsilon) - // assert.InEpsilon(t, float64(5), metrics.sparkAppExecutorRunningCount.Value(app1), common.Epsilon) - // assert.InEpsilon(t, float64(10), util.FetchCounterValue(metrics.sparkAppExecutorFailureCount, app1), common.Epsilon) - // assert.InEpsilon(t, float64(10), util.FetchCounterValue(metrics.sparkAppExecutorSuccessCount, app1), common.Epsilon) -} diff --git a/internal/controller/sparkapplication/submission.go b/internal/controller/sparkapplication/submission.go index de35e4a55..5d40ea16f 100644 --- a/internal/controller/sparkapplication/submission.go +++ b/internal/controller/sparkapplication/submission.go @@ -68,17 +68,17 @@ func runSparkSubmit(submission *submission) (bool, error) { return true, nil } +// buildSparkSubmitArgs builds the arguments for spark-submit. func buildSparkSubmitArgs(app *v1beta2.SparkApplication) ([]string, error) { optionFuncs := []sparkSubmitOptionFunc{ masterOption, - mainClassOption, deployModeOption, - proxyUserOption, + mainClassOption, nameOption, + dependenciesOption, namespaceOption, imageOption, pythonVersionOption, - dependenciesOption, memoryOverheadFactorOption, submissionWaitAppCompletionOption, sparkConfOption, @@ -94,6 +94,7 @@ func buildSparkSubmitArgs(app *v1beta2.SparkApplication) ([]string, error) { executorEnvOption, nodeSelectorOption, dynamicAllocationOption, + proxyUserOption, mainApplicationFileOption, applicationOption, } @@ -112,27 +113,16 @@ func buildSparkSubmitArgs(app *v1beta2.SparkApplication) ([]string, error) { type sparkSubmitOptionFunc func(*v1beta2.SparkApplication) ([]string, error) -func mainClassOption(app *v1beta2.SparkApplication) ([]string, error) { - if app.Spec.MainClass == nil { - return nil, nil - } - options := []string{ - "--class", - *app.Spec.MainClass, - } - return options, nil -} - func masterOption(_ *v1beta2.SparkApplication) ([]string, error) { masterURL, err := util.GetMasterURL() if err != nil { return nil, fmt.Errorf("failed to get master URL: %v", err) } - options := []string{ + args := []string{ "--master", masterURL, } - return options, nil + return args, nil } func deployModeOption(app *v1beta2.SparkApplication) ([]string, error) { @@ -143,29 +133,26 @@ func deployModeOption(app *v1beta2.SparkApplication) ([]string, error) { return args, nil } -func proxyUserOption(app *v1beta2.SparkApplication) ([]string, error) { - if app.Spec.ProxyUser == nil || *app.Spec.ProxyUser == "" { +func mainClassOption(app *v1beta2.SparkApplication) ([]string, error) { + if app.Spec.MainClass == nil { return nil, nil } args := []string{ - "--proxy-user", - *app.Spec.ProxyUser, + "--class", + *app.Spec.MainClass, } return args, nil } -func namespaceOption(app *v1beta2.SparkApplication) ([]string, error) { - args := []string{ - "--conf", - fmt.Sprintf("%s=%s", common.SparkKubernetesNamespace, app.Namespace), - } +func nameOption(app *v1beta2.SparkApplication) ([]string, error) { + args := []string{"--name", app.Name} return args, nil } -func nameOption(app *v1beta2.SparkApplication) ([]string, error) { +func namespaceOption(app *v1beta2.SparkApplication) ([]string, error) { args := []string{ "--conf", - fmt.Sprintf("%s=%s", common.SparkAppName, app.Name), + fmt.Sprintf("%s=%s", common.SparkKubernetesNamespace, app.Namespace), } return args, nil } @@ -184,22 +171,27 @@ func dependenciesOption(app *v1beta2.SparkApplication) ([]string, error) { if len(app.Spec.Deps.Jars) > 0 { args = append(args, "--jars", strings.Join(app.Spec.Deps.Jars, ",")) 
} - if len(app.Spec.Deps.Files) > 0 { - args = append(args, "--files", strings.Join(app.Spec.Deps.Files, ",")) - } - if len(app.Spec.Deps.PyFiles) > 0 { - args = append(args, "--py-files", strings.Join(app.Spec.Deps.PyFiles, ",")) - } + if len(app.Spec.Deps.Packages) > 0 { args = append(args, "--packages", strings.Join(app.Spec.Deps.Packages, ",")) } + if len(app.Spec.Deps.ExcludePackages) > 0 { args = append(args, "--exclude-packages", strings.Join(app.Spec.Deps.ExcludePackages, ",")) } + if len(app.Spec.Deps.Repositories) > 0 { args = append(args, "--repositories", strings.Join(app.Spec.Deps.Repositories, ",")) } + if len(app.Spec.Deps.PyFiles) > 0 { + args = append(args, "--py-files", strings.Join(app.Spec.Deps.PyFiles, ",")) + } + + if len(app.Spec.Deps.Files) > 0 { + args = append(args, "--files", strings.Join(app.Spec.Deps.Files, ",")) + } + return args, nil } @@ -790,6 +782,17 @@ func dynamicAllocationOption(app *v1beta2.SparkApplication) ([]string, error) { return args, nil } +func proxyUserOption(app *v1beta2.SparkApplication) ([]string, error) { + if app.Spec.ProxyUser == nil || *app.Spec.ProxyUser == "" { + return nil, nil + } + args := []string{ + "--proxy-user", + *app.Spec.ProxyUser, + } + return args, nil +} + func mainApplicationFileOption(app *v1beta2.SparkApplication) ([]string, error) { if app.Spec.MainApplicationFile == nil { return nil, nil diff --git a/internal/controller/sparkapplication/web_ui.go b/internal/controller/sparkapplication/web_ui.go index d2cc1c425..284b4b6f4 100644 --- a/internal/controller/sparkapplication/web_ui.go +++ b/internal/controller/sparkapplication/web_ui.go @@ -54,19 +54,15 @@ func (r *Reconciler) createWebUIIngress(app *v1beta2.SparkApplication, service S return r.createDriverIngressLegacy(app, service, ingressName, ingressURL) } -// getWebUITargetPort attempts to get the Spark web UI port from configuration property spark.ui.port -// in Spec.SparkConf if it is present, otherwise the default port is returned. -// Note that we don't attempt to get the port from Spec.SparkConfigMap. -func getWebUITargetPort(app *v1beta2.SparkApplication) (int32, error) { - portStr, ok := app.Spec.SparkConf[common.SparkUIPortKey] - if !ok { - return common.DefaultSparkWebUIPort, nil +func getWebUIServicePortName(app *v1beta2.SparkApplication) string { + if app.Spec.SparkUIOptions == nil { + return common.DefaultSparkWebUIPortName } - port, err := strconv.Atoi(portStr) - if err != nil { - return common.DefaultSparkWebUIPort, nil + portName := app.Spec.SparkUIOptions.ServicePortName + if portName != nil { + return *portName } - return int32(port), nil + return common.DefaultSparkWebUIPortName } func getWebUIServicePort(app *v1beta2.SparkApplication) (int32, error) { @@ -80,13 +76,17 @@ func getWebUIServicePort(app *v1beta2.SparkApplication) (int32, error) { return common.DefaultSparkWebUIPort, nil } -func getWebUIServicePortName(app *v1beta2.SparkApplication) string { - if app.Spec.SparkUIOptions == nil { - return common.DefaultSparkWebUIPortName +// getWebUITargetPort attempts to get the Spark web UI port from configuration property spark.ui.port +// in Spec.SparkConf if it is present, otherwise the default port is returned. +// Note that we don't attempt to get the port from Spec.SparkConfigMap. 
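Back in the submission.go hunks above, the reordered optionFuncs list keeps every spark-submit flag behind a small sparkSubmitOptionFunc. The loop that flattens that list into the final argument vector is elided from the hunk, so the sketch below is an assumption about its shape rather than the patch's actual body:

func collectSparkSubmitArgs(app *v1beta2.SparkApplication, optionFuncs []sparkSubmitOptionFunc) ([]string, error) {
	var args []string
	for _, f := range optionFuncs {
		opts, err := f(app)
		if err != nil {
			return nil, err
		}
		// Option funcs that do not apply return a nil slice, which append simply skips.
		args = append(args, opts...)
	}
	return args, nil
}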
+func getWebUITargetPort(app *v1beta2.SparkApplication) (int32, error) { + portStr, ok := app.Spec.SparkConf[common.SparkUIPortKey] + if !ok { + return common.DefaultSparkWebUIPort, nil } - portName := app.Spec.SparkUIOptions.ServicePortName - if portName != nil { - return *portName + port, err := strconv.Atoi(portStr) + if err != nil { + return common.DefaultSparkWebUIPort, nil } - return common.DefaultSparkWebUIPortName + return int32(port), nil } diff --git a/internal/controller/validatingwebhookconfiguration/event_filter.go b/internal/controller/validatingwebhookconfiguration/event_filter.go index 7a5b9c71f..d78076e4b 100644 --- a/internal/controller/validatingwebhookconfiguration/event_filter.go +++ b/internal/controller/validatingwebhookconfiguration/event_filter.go @@ -36,21 +36,21 @@ func NewEventFilter(name string) *EventFilter { var _ predicate.Predicate = &EventFilter{} // Create implements predicate.Predicate. -func (v *EventFilter) Create(event.CreateEvent) bool { - return true +func (f *EventFilter) Create(e event.CreateEvent) bool { + return e.Object.GetName() == f.name } // Update implements predicate.Predicate. -func (v *EventFilter) Update(event.UpdateEvent) bool { - return true +func (f *EventFilter) Update(e event.UpdateEvent) bool { + return e.ObjectOld.GetName() == f.name } // Delete implements predicate.Predicate. -func (v *EventFilter) Delete(event.DeleteEvent) bool { +func (f *EventFilter) Delete(event.DeleteEvent) bool { return false } // Generic implements predicate.Predicate. -func (v *EventFilter) Generic(event.GenericEvent) bool { - return true +func (f *EventFilter) Generic(event.GenericEvent) bool { + return false } diff --git a/internal/controller/validatingwebhookconfiguration/event_handler.go b/internal/controller/validatingwebhookconfiguration/event_handler.go index a47b765a9..065fd0ec3 100644 --- a/internal/controller/validatingwebhookconfiguration/event_handler.go +++ b/internal/controller/validatingwebhookconfiguration/event_handler.go @@ -43,7 +43,7 @@ func (h *EventHandler) Create(ctx context.Context, event event.CreateEvent, queu if !ok { return } - logger.V(1).Info("ValidatingWebhookConfiguration created", "name", vwc.Name, "namespace", vwc.Namespace) + logger.Info("ValidatingWebhookConfiguration created", "name", vwc.Name, "namespace", vwc.Namespace) key := types.NamespacedName{ Namespace: vwc.Namespace, Name: vwc.Name, @@ -65,7 +65,7 @@ func (h *EventHandler) Update(ctx context.Context, event event.UpdateEvent, queu return } - logger.V(1).Info("ValidatingWebhookConfiguration updated", "name", newWebhook.Name, "namespace", newWebhook.Namespace) + logger.Info("ValidatingWebhookConfiguration updated", "name", newWebhook.Name, "namespace", newWebhook.Namespace) key := types.NamespacedName{ Namespace: newWebhook.Namespace, Name: newWebhook.Name, @@ -79,7 +79,7 @@ func (h *EventHandler) Delete(ctx context.Context, event event.DeleteEvent, queu if !ok { return } - logger.V(1).Info("ValidatingWebhookConfiguration deleted", "name", vwc.Name, "namespace", vwc.Namespace) + logger.Info("ValidatingWebhookConfiguration deleted", "name", vwc.Name, "namespace", vwc.Namespace) key := types.NamespacedName{ Namespace: vwc.Namespace, Name: vwc.Name, @@ -93,7 +93,7 @@ func (h *EventHandler) Generic(ctx context.Context, event event.GenericEvent, qu if !ok { return } - logger.V(1).Info("ValidatingWebhookConfiguration generic event", "name", vwc.Name, "namespace", vwc.Namespace) + logger.Info("ValidatingWebhookConfiguration generic event", "name", vwc.Name, 
"namespace", vwc.Namespace) key := types.NamespacedName{ Namespace: vwc.Namespace, Name: vwc.Name, diff --git a/internal/metrics/metrcis.go b/internal/metrics/metrcis.go index 92543e971..825010755 100644 --- a/internal/metrics/metrcis.go +++ b/internal/metrics/metrcis.go @@ -19,5 +19,5 @@ package metrics import "sigs.k8s.io/controller-runtime/pkg/log" var ( - logger = log.Log.WithName("metrics") + logger = log.Log.WithName("") ) diff --git a/internal/metrics/sparkapplication_metrics.go b/internal/metrics/sparkapplication_metrics.go index bcffdcf6a..4aa1fc1dd 100644 --- a/internal/metrics/sparkapplication_metrics.go +++ b/internal/metrics/sparkapplication_metrics.go @@ -26,97 +26,98 @@ import ( ) type SparkApplicationMetrics struct { - Prefix string - Labels []string - JobStartLatencyBuckets []float64 - - Count *prometheus.CounterVec - SubmitCount *prometheus.CounterVec - FailedSubmissionCount *prometheus.CounterVec - RunningCount *prometheus.GaugeVec - SuccessCount *prometheus.CounterVec - FailureCount *prometheus.CounterVec - - SuccessExecutionTime *prometheus.SummaryVec - FailureExecutionTime *prometheus.SummaryVec - - StartLatency *prometheus.SummaryVec - StartLatencyHistogram *prometheus.HistogramVec + prefix string + labels []string + jobStartLatencyBuckets []float64 + + count *prometheus.CounterVec + submitCount *prometheus.CounterVec + failedSubmissionCount *prometheus.CounterVec + runningCount *prometheus.GaugeVec + successCount *prometheus.CounterVec + failureCount *prometheus.CounterVec + + successExecutionTime *prometheus.SummaryVec + failureExecutionTime *prometheus.SummaryVec + + startLatency *prometheus.SummaryVec + startLatencyHistogram *prometheus.HistogramVec } func NewSparkApplicationMetrics(prefix string, labels []string, jobStartLatencyBuckets []float64) *SparkApplicationMetrics { validLabels := make([]string, 0, len(labels)) - for i, label := range labels { - validLabels[i] = util.CreateValidMetricNameLabel("", label) + for _, label := range labels { + validLabel := util.CreateValidMetricNameLabel("", label) + validLabels = append(validLabels, validLabel) } return &SparkApplicationMetrics{ - Prefix: prefix, - Labels: validLabels, - JobStartLatencyBuckets: jobStartLatencyBuckets, + prefix: prefix, + labels: validLabels, + jobStartLatencyBuckets: jobStartLatencyBuckets, - Count: prometheus.NewCounterVec( + count: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationCount), Help: "Total number of SparkApplication", }, validLabels, ), - SubmitCount: prometheus.NewCounterVec( + submitCount: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationSubmitCount), Help: "Total number of submitted SparkApplication", }, validLabels, ), - FailedSubmissionCount: prometheus.NewCounterVec( + failedSubmissionCount: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationFailedSubmissionCount), Help: "Total number of failed SparkApplication submission", }, validLabels, ), - RunningCount: prometheus.NewGaugeVec( + runningCount: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationRunningCount), Help: "Total number of running SparkApplication", }, validLabels, ), - SuccessCount: prometheus.NewCounterVec( + successCount: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: util.CreateValidMetricNameLabel(prefix, 
common.MetricSparkApplicationSuccessCount), Help: "Total number of successful SparkApplication", }, validLabels, ), - FailureCount: prometheus.NewCounterVec( + failureCount: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationFailureCount), Help: "Total number of failed SparkApplication", }, validLabels, ), - SuccessExecutionTime: prometheus.NewSummaryVec( + successExecutionTime: prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationSuccessExecutionTimeSeconds), }, validLabels, ), - FailureExecutionTime: prometheus.NewSummaryVec( + failureExecutionTime: prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationFailureExecutionTimeSeconds), }, validLabels, ), - StartLatency: prometheus.NewSummaryVec( + startLatency: prometheus.NewSummaryVec( prometheus.SummaryOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationStartLatencySeconds), Help: "Spark App Start Latency via the Operator", }, validLabels, ), - StartLatencyHistogram: prometheus.NewHistogramVec( + startLatencyHistogram: prometheus.NewHistogramVec( prometheus.HistogramOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationStartLatencyHistogram), Help: "Spark App Start Latency counts in buckets via the Operator", @@ -128,87 +129,195 @@ func NewSparkApplicationMetrics(prefix string, labels []string, jobStartLatencyB } func (m *SparkApplicationMetrics) Register() { - if err := metrics.Registry.Register(m.Count); err != nil { + if err := metrics.Registry.Register(m.count); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationCount) } - if err := metrics.Registry.Register(m.SubmitCount); err != nil { + if err := metrics.Registry.Register(m.submitCount); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationSubmitCount) } - if err := metrics.Registry.Register(m.FailedSubmissionCount); err != nil { + if err := metrics.Registry.Register(m.failedSubmissionCount); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationFailedSubmissionCount) } - if err := metrics.Registry.Register(m.RunningCount); err != nil { + if err := metrics.Registry.Register(m.runningCount); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationRunningCount) } - if err := metrics.Registry.Register(m.SuccessCount); err != nil { + if err := metrics.Registry.Register(m.successCount); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationSuccessCount) } - if err := metrics.Registry.Register(m.FailureCount); err != nil { + if err := metrics.Registry.Register(m.failureCount); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationFailureCount) } - if err := metrics.Registry.Register(m.SuccessExecutionTime); err != nil { + if err := metrics.Registry.Register(m.successExecutionTime); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationSuccessExecutionTimeSeconds) } - if err := metrics.Registry.Register(m.FailureExecutionTime); err != nil { + if err := metrics.Registry.Register(m.failureExecutionTime); 
err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationFailureExecutionTimeSeconds) } - if err := metrics.Registry.Register(m.StartLatency); err != nil { + if err := metrics.Registry.Register(m.startLatency); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationStartLatencySeconds) } - if err := metrics.Registry.Register(m.StartLatencyHistogram); err != nil { + if err := metrics.Registry.Register(m.startLatencyHistogram); err != nil { logger.Error(err, "Failed to register spark application metric", "name", common.MetricSparkApplicationStartLatencyHistogram) } } func (m *SparkApplicationMetrics) HandleSparkApplicationCreate(app *v1beta2.SparkApplication) { - labels := m.getMetricLabels(app) - if counter, err := m.Count.GetMetricWith(labels); err != nil { - logger.Error(err, "Failed to export spark application metric", "name", common.MetricSparkApplicationCount) - } else { - counter.Inc() + state := util.GetApplicationState(app) + + switch state { + case v1beta2.ApplicationStateNew: + m.incCount(app) + case v1beta2.ApplicationStateSubmitted: + m.incSubmitCount(app) + case v1beta2.ApplicationStateFailedSubmission: + m.incFailedSubmissionCount(app) + case v1beta2.ApplicationStateRunning: + m.incRunningCount(app) + case v1beta2.ApplicationStateFailed: + m.incFailureCount(app) + case v1beta2.ApplicationStateCompleted: + m.incSuccessCount(app) } } func (m *SparkApplicationMetrics) HandleSparkApplicationUpdate(oldApp *v1beta2.SparkApplication, newApp *v1beta2.SparkApplication) { - // labels := m.getMetricLabels(newApp) - oldState := oldApp.Status.AppState.State - newState := newApp.Status.AppState.State + oldState := util.GetApplicationState(oldApp) + newState := util.GetApplicationState(newApp) if newState == oldState { return } + switch oldState { + case v1beta2.ApplicationStateRunning: + m.decRunningCount(oldApp) + } + switch newState { + case v1beta2.ApplicationStateNew: + m.incCount(newApp) case v1beta2.ApplicationStateSubmitted: - if oldState == v1beta2.ApplicationStateNew { - - } + m.incSubmitCount(newApp) case v1beta2.ApplicationStateFailedSubmission: - if oldState == v1beta2.ApplicationStateNew { - } + m.incFailedSubmissionCount(newApp) case v1beta2.ApplicationStateRunning: + m.incRunningCount(newApp) case v1beta2.ApplicationStateFailed: + m.incFailureCount(newApp) case v1beta2.ApplicationStateCompleted: - case v1beta2.ApplicationStateUnknown: + m.incSuccessCount(newApp) } } func (m *SparkApplicationMetrics) HandleSparkApplicationDelete(app *v1beta2.SparkApplication) { - labels := m.getMetricLabels(app) - state := app.Status.AppState.State + state := util.GetApplicationState(app) switch state { case v1beta2.ApplicationStateRunning: - if counter, err := m.RunningCount.GetMetricWith(labels); err != nil { - logger.Error(err, "Failed to export spark application metric", "name", common.MetricSparkApplicationCount) - } else { - counter.Dec() - } + m.decRunningCount(app) + } +} + +func (m *SparkApplicationMetrics) incCount(app *v1beta2.SparkApplication) { + labels := m.getMetricLabels(app) + counter, err := m.count.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get spark application metric", "name", common.MetricSparkApplicationCount, "labels", labels) + return + } + + logger.V(1).Info("Increasing SparkApplication count", "name", app.Name, "namespace", app.Namespace) + counter.Inc() +} + +func (m *SparkApplicationMetrics) incSubmitCount(app 
*v1beta2.SparkApplication) { + labels := m.getMetricLabels(app) + counter, err := m.submitCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get spark application metric", "name", common.MetricSparkApplicationSubmitCount, "labels", labels) + return + } + + logger.V(1).Info("Increasing SparkApplication submit count", "name", app.Name, "namespace", app.Namespace) + counter.Inc() +} + +func (m *SparkApplicationMetrics) incFailedSubmissionCount(app *v1beta2.SparkApplication) { + labels := m.getMetricLabels(app) + counter, err := m.failedSubmissionCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get spark application metric", "name", common.MetricSparkApplicationFailedSubmissionCount, "labels", labels) + return } - // logger.V(1).Info("Decreasing spark application running count", "name", app.Name, "namespace", app.Namespace, "state") + logger.V(1).Info("Increasing SparkApplication failed submission count", "name", app.Name, "namespace", app.Namespace) + counter.Inc() } -func (m *SparkApplicationMetrics) getMetricLabels(_ *v1beta2.SparkApplication) map[string]string { - labels := make(map[string]string) - return labels +func (m *SparkApplicationMetrics) incRunningCount(app *v1beta2.SparkApplication) { + labels := m.getMetricLabels(app) + gauge, err := m.runningCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get spark application metric", "name", common.MetricSparkApplicationRunningCount, "labels", labels) + return + } + + logger.V(1).Info("Increasing SparkApplication running count", "name", app.Name, "namespace", app.Namespace) + gauge.Inc() +} + +func (m *SparkApplicationMetrics) decRunningCount(app *v1beta2.SparkApplication) { + labels := m.getMetricLabels(app) + gauge, err := m.runningCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get spark application metric", "name", common.MetricSparkApplicationRunningCount, "labels", labels) + return + } + + logger.V(1).Info("Decreasing SparkApplication running count", "name", app.Name, "namespace", app.Namespace) + gauge.Dec() +} + +func (m *SparkApplicationMetrics) incSuccessCount(app *v1beta2.SparkApplication) { + labels := m.getMetricLabels(app) + counter, err := m.successCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get spark application metric", "name", common.MetricSparkApplicationSuccessCount, "labels", labels) + return + } + + logger.V(1).Info("Increasing SparkApplication success count", "name", app.Name, "namespace", app.Namespace) + counter.Inc() +} + +func (m *SparkApplicationMetrics) incFailureCount(app *v1beta2.SparkApplication) { + labels := m.getMetricLabels(app) + counter, err := m.failureCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get spark application metric", "name", common.MetricSparkApplicationFailureCount, "labels", labels) + return + } + + logger.V(1).Info("Increasing SparkApplication failure count", "name", app.Name, "namespace", app.Namespace) + counter.Inc() +} + +func (m *SparkApplicationMetrics) getMetricLabels(app *v1beta2.SparkApplication) map[string]string { + // Convert spark application validLabels to valid metric validLabels. 
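A small usage note for the label handling above and in getMetricLabels below, using made-up inputs rather than the operator's defaults and assuming CreateValidMetricNameLabel only sanitises dashes when the prefix is empty: configured metric labels are sanitised once at construction, application labels are sanitised per event, the special "namespace" label falls back to the object's namespace, and anything unmatched becomes "Unknown".

// Hypothetical configuration: empty prefix, two metric labels, example buckets.
m := NewSparkApplicationMetrics("", []string{"app-type", "namespace"}, []float64{30, 60, 90, 120})
m.Register()
// An application labelled app-type=batch in namespace "spark-jobs" is counted with
// {"app_type": "batch", "namespace": "spark-jobs"}; without the app-type label the
// value degrades to "Unknown".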
+ validLabels := make(map[string]string) + for key, val := range app.Labels { + newKey := util.CreateValidMetricNameLabel(m.prefix, key) + validLabels[newKey] = val + } + + metricLabels := make(map[string]string) + for _, label := range m.labels { + if _, ok := validLabels[label]; ok { + metricLabels[label] = validLabels[label] + } else if label == "namespace" { + metricLabels[label] = app.Namespace + } else { + metricLabels[label] = "Unknown" + } + } + return metricLabels } diff --git a/internal/metrics/sparkpod_metrics.go b/internal/metrics/sparkpod_metrics.go index 29e03ba0e..4e3c1803d 100644 --- a/internal/metrics/sparkpod_metrics.go +++ b/internal/metrics/sparkpod_metrics.go @@ -19,6 +19,7 @@ package metrics import ( "github.com/prometheus/client_golang/prometheus" corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/metrics" "github.com/kubeflow/spark-operator/api/v1beta2" "github.com/kubeflow/spark-operator/pkg/common" @@ -26,18 +27,25 @@ import ( ) type SparkExecutorMetrics struct { + prefix string + labels []string + runningCount *prometheus.GaugeVec successCount *prometheus.CounterVec failureCount *prometheus.CounterVec } func NewSparkExecutorMetrics(prefix string, labels []string) *SparkExecutorMetrics { - validLabels := make([]string, len(labels)) - for i, label := range labels { - validLabels[i] = util.CreateValidMetricNameLabel("", label) + validLabels := make([]string, 0, len(labels)) + for _, label := range labels { + validLabel := util.CreateValidMetricNameLabel("", label) + validLabels = append(validLabels, validLabel) } return &SparkExecutorMetrics{ + prefix: prefix, + labels: labels, + runningCount: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkExecutorRunningCount), @@ -63,19 +71,23 @@ func NewSparkExecutorMetrics(prefix string, labels []string) *SparkExecutorMetri } func (m *SparkExecutorMetrics) Register() { - if err := prometheus.Register(m.runningCount); err != nil { + if err := metrics.Registry.Register(m.runningCount); err != nil { logger.Error(err, "Failed to register spark executor metric", "name", common.MetricSparkExecutorRunningCount) } - if err := prometheus.Register(m.successCount); err != nil { + if err := metrics.Registry.Register(m.successCount); err != nil { logger.Error(err, "Failed to register spark executor metric", "name", common.MetricSparkExecutorSuccessCount) } - if err := prometheus.Register(m.failureCount); err != nil { + if err := metrics.Registry.Register(m.failureCount); err != nil { logger.Error(err, "Failed to register spark executor metric", "name", common.MetricSparkExecutorFailureCount) } } func (m *SparkExecutorMetrics) HandleSparkExecutorCreate(pod *corev1.Pod) { - // app := util.GetAppName(pod) + state := util.GetExecutorState(pod) + switch state { + case v1beta2.ExecutorStateRunning: + m.incRunningCount(pod) + } } func (m *SparkExecutorMetrics) HandleSparkExecutorUpdate(oldPod, newPod *corev1.Pod) { @@ -85,40 +97,95 @@ func (m *SparkExecutorMetrics) HandleSparkExecutorUpdate(oldPod, newPod *corev1. 
return } - labels := m.getMetricLabels(newPod) - runningCount, err := m.runningCount.GetMetricWith(labels) - if err != nil { - logger.Error(err, "Failed to get metric", "name", common.MetricSparkExecutorRunningCount, "labels", labels) - return - } - switch oldState { - case v1beta2.ExecutorStatePending: case v1beta2.ExecutorStateRunning: - runningCount.Dec() - case v1beta2.ExecutorStateCompleted: - runningCount.Dec() - case v1beta2.ExecutorStateFailed: - runningCount.Dec() - case v1beta2.ExecutorStateUnknown: + m.decRunningCount(oldPod) } switch newState { - case v1beta2.ExecutorStatePending: case v1beta2.ExecutorStateRunning: - runningCount.Inc() + m.incRunningCount(newPod) case v1beta2.ExecutorStateCompleted: - runningCount.Dec() + m.incSuccessCount(newPod) case v1beta2.ExecutorStateFailed: - runningCount.Dec() - case v1beta2.ExecutorStateUnknown: + m.incFailureCount(newPod) } } func (m *SparkExecutorMetrics) HandleSparkExecutorDelete(pod *corev1.Pod) { + state := util.GetExecutorState(pod) + + switch state { + case v1beta2.ExecutorStateRunning: + m.decRunningCount(pod) + } +} + +func (m *SparkExecutorMetrics) incRunningCount(pod *corev1.Pod) { + labels := m.getMetricLabels(pod) + runningCount, err := m.runningCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get metric", "name", common.MetricSparkExecutorRunningCount, "labels", labels) + return + } + + logger.V(1).Info("Increasing Spark executor running count", "name", pod.Name, "namespace", pod.Namespace) + runningCount.Inc() } -func (m *SparkExecutorMetrics) getMetricLabels(_ *corev1.Pod) map[string]string { - labels := make(map[string]string) - return labels +func (m *SparkExecutorMetrics) decRunningCount(pod *corev1.Pod) { + labels := m.getMetricLabels(pod) + runningCount, err := m.runningCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get metric", "name", common.MetricSparkExecutorRunningCount, "labels", labels) + return + } + + logger.V(1).Info("Decreasing Spark executor running count", "name", pod.Name, "namespace", pod.Namespace) + runningCount.Dec() +} + +func (m *SparkExecutorMetrics) incSuccessCount(pod *corev1.Pod) { + labels := m.getMetricLabels(pod) + successCount, err := m.successCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get metric", "name", common.MetricSparkExecutorSuccessCount, "labels", labels) + return + } + + logger.V(1).Info("Increasing Spark executor success count", "name", pod.Name, "namespace", pod.Namespace) + successCount.Inc() +} + +func (m *SparkExecutorMetrics) incFailureCount(pod *corev1.Pod) { + labels := m.getMetricLabels(pod) + failureCount, err := m.failureCount.GetMetricWith(labels) + if err != nil { + logger.Error(err, "Failed to get metric", "name", common.MetricSparkExecutorFailureCount, "labels", labels) + return + } + + logger.V(1).Info("Increasing Spark executor running count", "name", pod.Name, "namespace", pod.Namespace) + failureCount.Inc() +} + +func (m *SparkExecutorMetrics) getMetricLabels(pod *corev1.Pod) map[string]string { + // Convert pod metricLabels to valid metric metricLabels. 
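Both metric types now register against controller-runtime's shared registry instead of the default Prometheus registry, so they are exported by the manager's metrics endpoint without a separate HTTP server. A brief sketch of the intended usage; the prefix, label list, and bind address are illustrative assumptions:

executorMetrics := NewSparkExecutorMetrics("spark_operator", []string{"namespace"})
executorMetrics.Register()
// A manager built with ctrl.Options{Metrics: metricsserver.Options{BindAddress: ":8080"}}
// then serves these series on /metrics alongside the built-in controller-runtime metrics.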
+ validLabels := make(map[string]string) + for key, val := range pod.Labels { + newKey := util.CreateValidMetricNameLabel("", key) + validLabels[newKey] = val + } + + metricLabels := make(map[string]string) + for _, label := range m.labels { + if _, ok := validLabels[label]; ok { + metricLabels[label] = validLabels[label] + } else if label == "namespace" { + metricLabels[label] = pod.Namespace + } else { + metricLabels[label] = "Unknown" + } + } + return metricLabels } diff --git a/internal/scheduler/volcano/scheduler.go b/internal/scheduler/volcano/scheduler.go index c19069b80..a4f489571 100644 --- a/internal/scheduler/volcano/scheduler.go +++ b/internal/scheduler/volcano/scheduler.go @@ -35,7 +35,7 @@ import ( ) func init() { - scheduler.GetRegistry().Register(common.VolcanoSchedulerName, Factory) + _ = scheduler.GetRegistry().Register(common.VolcanoSchedulerName, Factory) } // Scheduler is a batch scheduler that uses Volcano to schedule Spark applications. @@ -92,7 +92,7 @@ func (s *Scheduler) Name() string { } // ShouldSchedule implements batchscheduler.Interface. -func (s *Scheduler) ShouldSchedule(app *v1beta2.SparkApplication) bool { +func (s *Scheduler) ShouldSchedule(_ *v1beta2.SparkApplication) bool { // There is no additional requirement for volcano scheduler return true } diff --git a/internal/webhook/scheduledsparkapplication_defaulter.go b/internal/webhook/scheduledsparkapplication_defaulter.go index 27d250eff..afdf4304e 100644 --- a/internal/webhook/scheduledsparkapplication_defaulter.go +++ b/internal/webhook/scheduledsparkapplication_defaulter.go @@ -40,7 +40,7 @@ var _ admission.CustomDefaulter = &ScheduledSparkApplicationDefaulter{} // Default implements admission.CustomDefaulter. func (d *ScheduledSparkApplicationDefaulter) Default(ctx context.Context, obj runtime.Object) error { - app, ok := obj.(*v1beta2.SparkApplication) + app, ok := obj.(*v1beta2.ScheduledSparkApplication) if !ok { return nil } diff --git a/internal/webhook/scheduledsparkapplication_validator.go b/internal/webhook/scheduledsparkapplication_validator.go index 154d97b6b..7fcef1a4f 100644 --- a/internal/webhook/scheduledsparkapplication_validator.go +++ b/internal/webhook/scheduledsparkapplication_validator.go @@ -41,7 +41,7 @@ var _ admission.CustomValidator = &ScheduledSparkApplicationValidator{} // ValidateCreate implements admission.CustomValidator. func (v *ScheduledSparkApplicationValidator) ValidateCreate(ctx context.Context, obj runtime.Object) (warnings admission.Warnings, err error) { - app, ok := obj.(*v1beta2.SparkApplication) + app, ok := obj.(*v1beta2.ScheduledSparkApplication) if !ok { return nil, nil } @@ -54,7 +54,7 @@ func (v *ScheduledSparkApplicationValidator) ValidateCreate(ctx context.Context, // ValidateUpdate implements admission.CustomValidator. func (v *ScheduledSparkApplicationValidator) ValidateUpdate(ctx context.Context, oldObj runtime.Object, newObj runtime.Object) (warnings admission.Warnings, err error) { - newApp, ok := newObj.(*v1beta2.SparkApplication) + newApp, ok := newObj.(*v1beta2.ScheduledSparkApplication) if !ok { return nil, nil } @@ -67,7 +67,7 @@ func (v *ScheduledSparkApplicationValidator) ValidateUpdate(ctx context.Context, // ValidateDelete implements admission.CustomValidator. 
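The type assertions above now target *v1beta2.ScheduledSparkApplication, matching the resource these admission hooks serve. For orientation, a hedged sketch of how a CustomDefaulter/CustomValidator pair is conventionally registered with the manager; the actual registration lives elsewhere in the repository, and the zero-value literals are placeholders:

if err := ctrl.NewWebhookManagedBy(mgr).
	For(&v1beta2.ScheduledSparkApplication{}).
	WithDefaulter(&ScheduledSparkApplicationDefaulter{}).
	WithValidator(&ScheduledSparkApplicationValidator{}).
	Complete(); err != nil {
	return err
}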
func (v *ScheduledSparkApplicationValidator) ValidateDelete(ctx context.Context, obj runtime.Object) (warnings admission.Warnings, err error) { - app, ok := obj.(*v1beta2.SparkApplication) + app, ok := obj.(*v1beta2.ScheduledSparkApplication) if !ok { return nil, nil } @@ -75,6 +75,7 @@ func (v *ScheduledSparkApplicationValidator) ValidateDelete(ctx context.Context, return nil, nil } -func (v *ScheduledSparkApplicationValidator) validate(_ *v1beta2.SparkApplication) error { +func (v *ScheduledSparkApplicationValidator) validate(_ *v1beta2.ScheduledSparkApplication) error { + // TODO: implement validate logic return nil } diff --git a/internal/webhook/sparkpod_defaulter.go b/internal/webhook/sparkpod_defaulter.go index 696746f6e..65beafe82 100644 --- a/internal/webhook/sparkpod_defaulter.go +++ b/internal/webhook/sparkpod_defaulter.go @@ -247,9 +247,19 @@ func addSparkConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) error { if app.Spec.SparkConfigMap == nil { return nil } - addConfigMapVolume(pod, *app.Spec.SparkConfigMap, common.SparkConfigMapVolumeName) - addConfigMapVolumeMount(pod, common.SparkConfigMapVolumeName, common.DefaultSparkConfDir) - addEnvironmentVariable(pod, common.EnvSparkConfDir, common.DefaultSparkConfDir) + + if err := addConfigMapVolume(pod, *app.Spec.SparkConfigMap, common.SparkConfigMapVolumeName); err != nil { + return err + } + + if err := addConfigMapVolumeMount(pod, common.SparkConfigMapVolumeName, common.DefaultSparkConfDir); err != nil { + return err + } + + if err := addEnvironmentVariable(pod, common.EnvSparkConfDir, common.DefaultSparkConfDir); err != nil { + return err + } + return nil } @@ -257,9 +267,19 @@ func addHadoopConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) error { if app.Spec.HadoopConfigMap == nil { return nil } - addConfigMapVolume(pod, *app.Spec.HadoopConfigMap, common.HadoopConfigMapVolumeName) - addConfigMapVolumeMount(pod, common.HadoopConfigMapVolumeName, common.DefaultHadoopConfDir) - addEnvironmentVariable(pod, common.EnvHadoopConfDir, common.DefaultHadoopConfDir) + + if err := addConfigMapVolume(pod, *app.Spec.HadoopConfigMap, common.HadoopConfigMapVolumeName); err != nil { + return err + } + + if err := addConfigMapVolumeMount(pod, common.HadoopConfigMapVolumeName, common.DefaultHadoopConfDir); err != nil { + return err + } + + if err := addEnvironmentVariable(pod, common.EnvHadoopConfDir, common.DefaultHadoopConfDir); err != nil { + return err + } + return nil } diff --git a/pkg/certificate/certificate_test.go b/pkg/certificate/certificate_test.go index c86941700..b09d196e8 100644 --- a/pkg/certificate/certificate_test.go +++ b/pkg/certificate/certificate_test.go @@ -41,6 +41,8 @@ var _ = Describe("Certificate Provider", func() { By("Creating a new cert provider") cp = certificate.NewProvider(k8sClient, secretName, secretNamespace) Expect(cp).NotTo(BeNil()) + + By("Generating new certificates") Expect(cp.Generate()).To(Succeed()) }) diff --git a/pkg/common/prometheus.go b/pkg/common/prometheus.go index 8b08b2fe7..2e141f327 100644 --- a/pkg/common/prometheus.go +++ b/pkg/common/prometheus.go @@ -19,6 +19,7 @@ package common const ( // PrometheusConfigMapNameSuffix is the name prefix of the Prometheus ConfigMap. PrometheusConfigMapNameSuffix = "prom-conf" + // PrometheusConfigMapMountPath is the mount path of the Prometheus ConfigMap. 
PrometheusConfigMapMountPath = "/etc/metrics/conf" ) diff --git a/pkg/common/spark.go b/pkg/common/spark.go index 13439fd69..c0ec41a46 100644 --- a/pkg/common/spark.go +++ b/pkg/common/spark.go @@ -16,6 +16,246 @@ limitations under the License. package common +// Spark properties. +const ( + // SparkAppName is the configuration property for application name. + SparkAppName = "spark.app.name" + + SparkDriverCores = "spark.driver.cores" + + SparkDriverMemory = "spark.driver.memory" + + SparkDriverMemoryOverhead = "spark.driver.memoryOverhead" + + SparkExecutorInstances = "spark.executor.instances" + + SparkExecutorCores = "spark.executor.cores" + + SparkExecutorMemory = "spark.executor.memory" + + SparkExecutorMemoryOverhead = "spark.executor.memoryOverhead" + + SparkUIProxyBase = "spark.ui.proxyBase" + + SparkUIProxyRedirectURI = "spark.ui.proxyRedirectUri" +) + +// Spark on Kubernetes properties. +const ( + + // SparkKubernetesDriverMaster is the Spark configuration key for specifying the Kubernetes master the driver use + // to manage executor pods and other Kubernetes resources. + SparkKubernetesDriverMaster = "spark.kubernetes.driver.master" + + // SparkKubernetesNamespace is the configuration property for application namespace. + SparkKubernetesNamespace = "spark.kubernetes.namespace" + + // SparkKubernetesContainerImage is the configuration property for specifying the unified container image. + SparkKubernetesContainerImage = "spark.kubernetes.container.image" + + // SparkKubernetesContainerImagePullPolicy is the configuration property for specifying the container image pull policy. + SparkKubernetesContainerImagePullPolicy = "spark.kubernetes.container.image.pullPolicy" + + // SparkKubernetesContainerImagePullSecrets is the configuration property for specifying the comma-separated list of image-pull + // secrets. + SparkKubernetesContainerImagePullSecrets = "spark.kubernetes.container.image.pullSecrets" + + SparkKubernetesAllocationBatchSize = "spark.kubernetes.allocation.batch.size" + + SparkKubernetesAllocationBatchDelay = "spark.kubernetes.allocation.batch.delay" + + // SparkKubernetesAuthenticateDriverServiceAccountName is the Spark configuration key for specifying name of the Kubernetes service + // account used by the driver pod. + SparkKubernetesAuthenticateDriverServiceAccountName = "spark.kubernetes.authenticate.driver.serviceAccountName" + + // account used by the executor pod. + SparkKubernetesAuthenticateExecutorServiceAccountName = "spark.kubernetes.authenticate.executor.serviceAccountName" + + // SparkKubernetesDriverLabelPrefix is the Spark configuration key prefix for labels on the driver Pod. + SparkKubernetesDriverLabelPrefix = "spark.kubernetes.driver.label." + SparkKubernetesDriverLabelTemplate = "spark.kubernetes.driver.label.%s" + + // SparkKubernetesDriverAnnotationPrefix is the Spark configuration key prefix for annotations on the driver Pod. + SparkKubernetesDriverAnnotationPrefix = "spark.kubernetes.driver.annotation." + SparkKubernetesDriverAnnotationTemplate = "spark.kubernetes.driver.annotation.%s" + + // SparkKubernetesDriverServiceLabelPrefix is the key prefix of annotations to be added to the driver service. + SparkKubernetesDriverServiceLabelPrefix = "spark.kubernetes.driver.service.label." + SparkKubernetesDriverServiceLabelTemplate = "spark.kubernetes.driver.service.label.%s" + + // SparkKubernetesDriverServiceAnnotationPrefix is the key prefix of annotations to be added to the driver service. 
+ SparkKubernetesDriverServiceAnnotationPrefix = "spark.kubernetes.driver.service.annotation."
+ SparkKubernetesDriverServiceAnnotationTemplate = "spark.kubernetes.driver.service.annotation.%s"
+
+ // SparkKubernetesExecutorLabelPrefix is the Spark configuration key prefix for labels on the executor Pods.
+ SparkKubernetesExecutorLabelPrefix = "spark.kubernetes.executor.label."
+ SparkKubernetesExecutorLabelTemplate = "spark.kubernetes.executor.label.%s"
+
+ // SparkKubernetesExecutorAnnotationPrefix is the Spark configuration key prefix for annotations on the executor Pods.
+ SparkKubernetesExecutorAnnotationPrefix = "spark.kubernetes.executor.annotation."
+ SparkKubernetesExecutorAnnotationTemplate = "spark.kubernetes.executor.annotation.%s"
+
+ // SparkKubernetesDriverPodName is the Spark configuration key for driver pod name.
+ SparkKubernetesDriverPodName = "spark.kubernetes.driver.pod.name"
+
+ SparkKubernetesExecutorPodNamePrefix = "spark.kubernetes.executor.podNamePrefix"
+
+ // SparkKubernetesDriverRequestCores is the configuration property for specifying the physical CPU request for the driver.
+ SparkKubernetesDriverRequestCores = "spark.kubernetes.driver.request.cores"
+
+ // SparkKubernetesDriverLimitCores is the configuration property for specifying the hard CPU limit for the driver pod.
+ SparkKubernetesDriverLimitCores = "spark.kubernetes.driver.limit.cores"
+
+ // SparkKubernetesExecutorRequestCores is the configuration property for specifying the physical CPU request for executors.
+ SparkKubernetesExecutorRequestCores = "spark.kubernetes.executor.request.cores"
+
+ // SparkKubernetesExecutorLimitCores is the configuration property for specifying the hard CPU limit for the executor pods.
+ SparkKubernetesExecutorLimitCores = "spark.kubernetes.executor.limit.cores"
+
+ // SparkKubernetesNodeSelectorTemplate is the configuration property key template for specifying node selectors for the pods.
+ SparkKubernetesNodeSelectorTemplate = "spark.kubernetes.node.selector.%s"
+
+ SparkKubernetesDriverNodeSelectorTemplate = "spark.kubernetes.driver.node.selector.%s"
+
+ SparkKubernetesExecutorNodeSelectorTemplate = "spark.kubernetes.executor.node.selector.%s"
+
+ // SparkKubernetesDriverEnvPrefix is the Spark configuration prefix for setting environment variables
+ // into the driver.
+ SparkKubernetesDriverEnvPrefix = "spark.kubernetes.driverEnv."
+ SparkKubernetesDriverEnvTemplate = "spark.kubernetes.driverEnv.%s"
+
+ // SparkKubernetesDriverSecretsPrefix is the configuration property prefix for specifying secrets to be mounted into the
+ // driver.
+ SparkKubernetesDriverSecretsPrefix = "spark.kubernetes.driver.secrets."
+ SparkKubernetesDriverSecretsTemplate = "spark.kubernetes.driver.secrets.%s"
+
+ // SparkKubernetesExecutorSecretsPrefix is the configuration property prefix for specifying secrets to be mounted into the
+ // executors.
+ SparkKubernetesExecutorSecretsPrefix = "spark.kubernetes.executor.secrets."
+ SparkKubernetesExecutorSecretsTemplate = "spark.kubernetes.executor.secrets.%s"
+
+ // SparkKubernetesDriverSecretKeyRefPrefix is the configuration property prefix for specifying environment variables
+ // from SecretKeyRefs for the driver.
+ SparkKubernetesDriverSecretKeyRefPrefix = "spark.kubernetes.driver.secretKeyRef."
+ SparkKubernetesDriverSecretKeyRefTemplate = "spark.kubernetes.driver.secretKeyRef.%s"
+
+ // SparkKubernetesExecutorSecretKeyRefPrefix is the configuration property prefix for specifying environment variables
+ // from SecretKeyRefs for the executors.
+ SparkKubernetesExecutorSecretKeyRefPrefix = "spark.kubernetes.executor.secretKeyRef."
+ SparkKubernetesExecutorSecretKeyRefTemplate = "spark.kubernetes.executor.secretKeyRef.%s"
+
+ // SparkKubernetesDriverContainerImage is the configuration property for specifying a custom driver container image.
+ SparkKubernetesDriverContainerImage = "spark.kubernetes.driver.container.image"
+
+ // SparkKubernetesExecutorContainerImage is the configuration property for specifying a custom executor container image.
+ SparkKubernetesExecutorContainerImage = "spark.kubernetes.executor.container.image"
+
+ // SparkKubernetesDriverVolumesPrefix is the Spark volumes configuration for mounting a volume into the driver pod.
+ SparkKubernetesDriverVolumesPrefix = "spark.kubernetes.driver.volumes."
+ SparkKubernetesDriverVolumesMountPathTemplate = "spark.kubernetes.driver.volumes.%s.%s.mount.path"
+ SparkKubernetesDriverVolumesMountSubPathTemplate = "spark.kubernetes.driver.volumes.%s.%s.mount.subPath"
+ SparkKubernetesDriverVolumesMountReadOnlyTemplate = "spark.kubernetes.driver.volumes.%s.%s.mount.readOnly"
+ SparkKubernetesDriverVolumesOptionsTemplate = "spark.kubernetes.driver.volumes.%s.%s.options.%s"
+
+ // SparkKubernetesExecutorVolumesPrefix is the Spark volumes configuration for mounting a volume into the executor pods.
+ SparkKubernetesExecutorVolumesPrefix = "spark.kubernetes.executor.volumes."
+ SparkKubernetesExecutorVolumesMountPathTemplate = "spark.kubernetes.executor.volumes.%s.%s.mount.path"
+ SparkKubernetesExecutorVolumesMountSubPathTemplate = "spark.kubernetes.executor.volumes.%s.%s.mount.subPath"
+ SparkKubernetesExecutorVolumesMountReadOnlyTemplate = "spark.kubernetes.executor.volumes.%s.%s.mount.readOnly"
+ SparkKubernetesExecutorVolumesOptionsTemplate = "spark.kubernetes.executor.volumes.%s.%s.options.%s"
+
+ // SparkKubernetesMemoryOverheadFactor is the Spark configuration key for specifying memory overhead factor used for Non-JVM memory.
+ SparkKubernetesMemoryOverheadFactor = "spark.kubernetes.memoryOverheadFactor"
+
+ // SparkKubernetesPysparkPythonVersion is the Spark configuration key for specifying the Python version used.
+ SparkKubernetesPysparkPythonVersion = "spark.kubernetes.pyspark.pythonVersion"
+
+ SparkKubernetesDriverPodTemplateFile = "spark.kubernetes.driver.podTemplateFile"
+
+ SparkKubernetesDriverPodTemplateContainerName = "spark.kubernetes.driver.podTemplateContainerName"
+
+ SparkKubernetesExecutorPodTemplateFile = "spark.kubernetes.executor.podTemplateFile"
+
+ SparkKubernetesExecutorPodTemplateContainerName = "spark.kubernetes.executor.podTemplateContainerName"
+
+ SparkKubernetesDriverSchedulerName = "spark.kubernetes.driver.schedulerName"
+
+ SparkKubernetesExecutorSchedulerName = "spark.kubernetes.executor.schedulerName"
+
+ // SparkExecutorEnvVarConfigKeyPrefix is the Spark configuration prefix for setting environment variables
+ // into the executor.
+ SparkExecutorEnvVarConfigKeyPrefix = "spark.executorEnv."
+
+ // SparkKubernetesInitContainerImage is the Spark configuration key for specifying a custom init-container image.
+ SparkKubernetesInitContainerImage = "spark.kubernetes.initContainer.image"
+
+ // SparkKubernetesMountDependenciesJarsDownloadDir is the Spark configuration key for specifying the download path in the driver and
+ // executors for remote jars.
+ SparkKubernetesMountDependenciesJarsDownloadDir = "spark.kubernetes.mountDependencies.jarsDownloadDir" + + // SparkKubernetesMountDependenciesFilesDownloadDir is the Spark configuration key for specifying the download path in the driver and + // executors for remote files. + SparkKubernetesMountDependenciesFilesDownloadDir = "spark.kubernetes.mountDependencies.filesDownloadDir" + + // SparkKubernetesMountDependenciesTimeout is the Spark configuration key for specifying the timeout in seconds of downloading + // remote dependencies. + SparkKubernetesMountDependenciesTimeout = "spark.kubernetes.mountDependencies.timeout" + + // SparkKubernetesMountDependenciesMaxSimultaneousDownloads is the Spark configuration key for specifying the maximum number of remote + // dependencies to download. + SparkKubernetesMountDependenciesMaxSimultaneousDownloads = "spark.kubernetes.mountDependencies.maxSimultaneousDownloads" + + // SparkKubernetesSubmissionWaitAppCompletion is the Spark configuration key for specifying whether to wait for application to complete. + SparkKubernetesSubmissionWaitAppCompletion = "spark.kubernetes.submission.waitAppCompletion" + + // SparkDriverExtraJavaOptions is the Spark configuration key for a string of extra JVM options to pass to driver. + SparkDriverExtraJavaOptions = "spark.driver.extraJavaOptions" + + // SparkExecutorExtraJavaOptions is the Spark configuration key for a string of extra JVM options to pass to executors. + SparkExecutorExtraJavaOptions = "spark.executor.extraJavaOptions" + + // SparkKubernetesExecutorDeleteOnTermination is the Spark configuration for specifying whether executor pods should be deleted in case of failure or normal termination. + SparkKubernetesExecutorDeleteOnTermination = "spark.kubernetes.executor.deleteOnTermination" +) + +// Dynamic allocation properties. +// Ref: https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation +const ( + // SparkDynamicAllocationEnabled is the Spark configuration key for specifying if dynamic + // allocation is enabled or not. + SparkDynamicAllocationEnabled = "spark.dynamicAllocation.enabled" + + SparkDynamicAllocationExecutorIdleTimeout = "spark.dynamicAllocation.executorIdleTimeout" + + SparkDynamicAllocationCachedExecutorIdleTimeout = "spark.dynamicAllocation.cachedExecutorIdleTimeout" + + // SparkDynamicAllocationInitialExecutors is the Spark configuration key for specifying + // the initial number of executors to request if dynamic allocation is enabled. + SparkDynamicAllocationInitialExecutors = "spark.dynamicAllocation.initialExecutors" + + // SparkDynamicAllocationMaxExecutors is the Spark configuration key for specifying the + // upper bound of the number of executors to request if dynamic allocation is enabled. + SparkDynamicAllocationMaxExecutors = "spark.dynamicAllocation.maxExecutors" + + // SparkDynamicAllocationMinExecutors is the Spark configuration key for specifying the + // lower bound of the number of executors to request if dynamic allocation is enabled. 
+ SparkDynamicAllocationMinExecutors = "spark.dynamicAllocation.minExecutors" + + SparkDynamicAllocationExecutorAllocationRatio = "spark.dynamicAllocation.executorAllocationRatio" + + SparkDynamicAllocationSchedulerBacklogTimeout = "spark.dynamicAllocation.schedulerBacklogTimeout" + + SparkDynamicAllocationSustainedSchedulerBacklogTimeout = "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout" + + // SparkDynamicAllocationShuffleTrackingEnabled is the Spark configuration key for + // specifying if shuffle data tracking is enabled. + SparkDynamicAllocationShuffleTrackingEnabled = "spark.dynamicAllocation.shuffleTracking.enabled" + + // SparkDynamicAllocationShuffleTrackingTimeout is the Spark configuration key for specifying + // the shuffle tracking timeout in milliseconds if shuffle tracking is enabled. + SparkDynamicAllocationShuffleTrackingTimeout = "spark.dynamicAllocation.shuffleTracking.timeout" +) + const ( // SparkRoleDriver is the value of the spark-role label for the driver. SparkRoleDriver = "driver" diff --git a/pkg/common/spark_properties.go b/pkg/common/spark_properties.go deleted file mode 100644 index 4ac2743ce..000000000 --- a/pkg/common/spark_properties.go +++ /dev/null @@ -1,271 +0,0 @@ -/* -Copyright 2024 The Kubeflow authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package common - -// Spark properties. -const ( - // SparkAppName is the configuration property for application name. - SparkAppName = "spark.app.name" - - SparkDriverCores = "spark.driver.cores" - - SparkDriverMemory = "spark.driver.memory" - - SparkDriverMemoryOverhead = "spark.driver.memoryOverhead" - - SparkExecutorInstances = "spark.executor.instances" - - SparkExecutorCores = "spark.executor.cores" - - SparkExecutorMemory = "spark.executor.memory" - - SparkExecutorMemoryOverhead = "spark.executor.memoryOverhead" - - SparkUIProxyBase = "spark.ui.proxyBase" - - SparkUIProxyRedirectURI = "spark.ui.proxyRedirectUri" -) - -// Spark on Kubernetes properties. -const ( - - // SparkKubernetesDriverMaster is the Spark configuration key for specifying the Kubernetes master the driver use - // to manage executor pods and other Kubernetes resources. - SparkKubernetesDriverMaster = "spark.kubernetes.driver.master" - - // SparkKubernetesNamespace is the configuration property for application namespace. - SparkKubernetesNamespace = "spark.kubernetes.namespace" - - // SparkKubernetesContainerImage is the configuration property for specifying the unified container image. - SparkKubernetesContainerImage = "spark.kubernetes.container.image" - - // SparkKubernetesContainerImagePullPolicy is the configuration property for specifying the container image pull policy. - SparkKubernetesContainerImagePullPolicy = "spark.kubernetes.container.image.pullPolicy" - - // SparkKubernetesContainerImagePullSecrets is the configuration property for specifying the comma-separated list of image-pull - // secrets. 
- SparkKubernetesContainerImagePullSecrets = "spark.kubernetes.container.image.pullSecrets" - - SparkKubernetesAllocationBatchSize = "spark.kubernetes.allocation.batch.size" - - SparkKubernetesAllocationBatchDelay = "spark.kubernetes.allocation.batch.delay" - - // SparkKubernetesAuthenticateDriverServiceAccountName is the Spark configuration key for specifying name of the Kubernetes service - // account used by the driver pod. - SparkKubernetesAuthenticateDriverServiceAccountName = "spark.kubernetes.authenticate.driver.serviceAccountName" - - // account used by the executor pod. - SparkKubernetesAuthenticateExecutorServiceAccountName = "spark.kubernetes.authenticate.executor.serviceAccountName" - - // SparkKubernetesDriverLabelPrefix is the Spark configuration key prefix for labels on the driver Pod. - SparkKubernetesDriverLabelPrefix = "spark.kubernetes.driver.label." - SparkKubernetesDriverLabelTemplate = "spark.kubernetes.driver.label.%s" - - // SparkKubernetesDriverAnnotationPrefix is the Spark configuration key prefix for annotations on the driver Pod. - SparkKubernetesDriverAnnotationPrefix = "spark.kubernetes.driver.annotation." - SparkKubernetesDriverAnnotationTemplate = "spark.kubernetes.driver.annotation.%s" - - // SparkKubernetesDriverServiceLabelPrefix is the key prefix of annotations to be added to the driver service. - SparkKubernetesDriverServiceLabelPrefix = "spark.kubernetes.driver.service.label." - SparkKubernetesDriverServiceLabelTemplate = "spark.kubernetes.driver.service.label.%s" - - // SparkKubernetesDriverServiceAnnotationPrefix is the key prefix of annotations to be added to the driver service. - SparkKubernetesDriverServiceAnnotationPrefix = "spark.kubernetes.driver.service.annotation." - SparkKubernetesDriverServiceAnnotationTemplate = "spark.kubernetes.driver.service.annotation.%s" - - // SparkKubernetesExecutorLabelPrefix is the Spark configuration key prefix for labels on the executor Pods. - SparkKubernetesExecutorLabelPrefix = "spark.kubernetes.executor.label." - SparkKubernetesExecutorLabelTemplate = "spark.kubernetes.executor.label.%s" - - // SparkKubernetesExecutorAnnotationPrefix is the Spark configuration key prefix for annotations on the executor Pods. - SparkKubernetesExecutorAnnotationPrefix = "spark.kubernetes.executor.annotation." - SparkKubernetesExecutorAnnotationTemplate = "spark.kubernetes.executor.annotation.%s" - - // SparkKubernetesDriverPodName is the Spark configuration key for driver pod name. - SparkKubernetesDriverPodName = "spark.kubernetes.driver.pod.name" - - SparkKubernetesExecutorPodNamePrefix = "spark.kubernetes.executor.podNamePrefix" - - // SparkKubernetesDriverRequestCores is the configuration property for specifying the physical CPU request for the driver. - SparkKubernetesDriverRequestCores = "spark.kubernetes.driver.request.cores" - - // SparkKubernetesDriverLimitCores is the configuration property for specifying the hard CPU limit for the driver pod. - SparkKubernetesDriverLimitCores = "spark.kubernetes.driver.limit.cores" - - // SparkKubernetesExecutorRequestCores is the configuration property for specifying the physical CPU request for executors. - SparkKubernetesExecutorRequestCores = "spark.kubernetes.executor.request.cores" - - // SparkKubernetesExecutorLimitCores is the configuration property for specifying the hard CPU limit for the executor pods. 
- SparkKubernetesExecutorLimitCores = "spark.kubernetes.executor.limit.cores" - - // SparkKubernetesNodeSelectorPrefix is the configuration property prefix for specifying node selector for the pods. - SparkKubernetesNodeSelectorPrefix = "spark.kubernetes.node.selector." - SparkKubernetesNodeSelectorTemplate = "spark.kubernetes.node.selector.%s" - - SparkKubernetesDriverNodeSelectorPrefix = "spark.kubernetes.driver.node.selector." - SparkKubernetesDriverNodeSelectorTemplate = "spark.kubernetes.driver.node.selector.%s" - - SparkKubernetesExecutorNodeSelectorPrefix = "spark.kubernetes.executor.node.selector." - SparkKubernetesExecutorNodeSelectorTemplate = "spark.kubernetes.executor.node.selector.%s" - - // SparkKubernetesDriverEnvPrefix is the Spark configuration prefix for setting environment variables - // into the driver. - SparkKubernetesDriverEnvPrefix = "spark.kubernetes.driverEnv." - SparkKubernetesDriverEnvTemplate = "spark.kubernetes.driverEnv.%s" - - // SparkKubernetesDriverSecretsPrefix is the configuration property prefix for specifying secrets to be mounted into the - // driver. - SparkKubernetesDriverSecretsPrefix = "spark.kubernetes.driver.secrets." - SparkKubernetesDriverSecretsTemplate = "spark.kubernetes.driver.secrets.%s" - - // SparkKubernetesExecutorSecretsPrefix is the configuration property prefix for specifying secrets to be mounted into the - // executors. - SparkKubernetesExecutorSecretsPrefix = "spark.kubernetes.executor.secrets." - SparkKubernetesExecutorSecretsTemplate = "spark.kubernetes.executor.secrets.%s" - - // SparkKubernetesDriverSecretKeyRefPrefix is the configuration property prefix for specifying environment variables - // from SecretKeyRefs for the driver. - SparkKubernetesDriverSecretKeyRefPrefix = "spark.kubernetes.driver.secretKeyRef." - SparkKubernetesDriverSecretKeyRefTemplate = "spark.kubernetes.driver.secretKeyRef.%s" - - // SparkKubernetesExecutorSecretKeyRefPrefix is the configuration property prefix for specifying environment variables - // from SecretKeyRefs for the executors. - SparkKubernetesExecutorSecretKeyRefPrefix = "spark.kubernetes.executor.secretKeyRef." - SparkKubernetesExecutorSecretKeyRefTemplate = "spark.kubernetes.executor.secretKeyRef.%s" - - // SparkKubernetesDriverContainerImage is the configuration property for specifying a custom driver container image. - SparkKubernetesDriverContainerImage = "spark.kubernetes.driver.container.image" - - // SparkKubernetesExecutorContainerImage is the configuration property for specifying a custom executor container image. - SparkKubernetesExecutorContainerImage = "spark.kubernetes.executor.container.image" - - // SparkKubernetesDriverVolumesPrefix is the Spark volumes configuration for mounting a volume into the driver pod. - SparkKubernetesDriverVolumesPrefix = "spark.kubernetes.driver.volumes." - SparkKubernetesDriverVolumesMountPathTemplate = "spark.kubernetes.driver.volumes.%s.%s.mount.path" - SparkKubernetesDriverVolumesMountSubPathTemplate = "spark.kubernetes.driver.volumes.%s.%s.mount.subPath" - SparkKubernetesDriverVolumesMountReadOnlyTemplate = "spark.kubernetes.driver.volumes.%s.%s.mount.readOnly" - SparkKubernetesDriverVolumesOptionsTemplate = "spark.kubernetes.driver.volumes.%s.%s.options.%s" - - // SparkKubernetesExecutorVolumesPrefix is the Spark volumes configuration for mounting a volume into the driver pod. - SparkKubernetesExecutorVolumesPrefix = "spark.kubernetes.executor.volumes." 
- SparkKubernetesExecutorVolumesMountPathTemplate = "spark.kubernetes.executor.volumes.%s.%s.mount.path" - SparkKubernetesExecutorVolumesMountSubPathTemplate = "spark.kubernetes.executor.volumes.%s.%s.mount.subPath" - SparkKubernetesExecutorVolumesMountReadOnlyTemplate = "spark.kubernetes.executor.volumes.%s.%s.mount.readOnly" - SparkKubernetesExecutorVolumesOptionsTemplate = "spark.kubernetes.executor.volumes.%s.%s.options.%s" - - // SparkKubernetesMemoryOverheadFactor is the Spark configuration key for specifying memory overhead factor used for Non-JVM memory. - SparkKubernetesMemoryOverheadFactor = "spark.kubernetes.memoryOverheadFactor" - - // SparkKubernetesPysparkPythonVersion is the Spark configuration key for specifying python version used. - SparkKubernetesPysparkPythonVersion = "spark.kubernetes.pyspark.pythonVersion" - - SparkKubernetesDriverPodTemplateFile = "spark.kubernetes.driver.podTemplateFile" - - SparkKubernetesDriverPodTemplateContainerName = "spark.kubernetes.driver.podTemplateContainerName" - - SparkKubernetesExecutorPodTemplateFile = "spark.kubernetes.executor.podTemplateFile" - - SparkKubernetesExecutorPodTemplateContainerName = "spark.kubernetes.executor.podTemplateContainerName" - - SparkKubernetesDriverSchedulerName = "spark.kubernetes.driver.schedulerName" - - SparkKubernetesExecutorSchedulerName = "spark.kubernetes.executor.schedulerName" - - // SparkExecutorEnvVarConfigKeyPrefix is the Spark configuration prefix for setting environment variables - // into the executor. - SparkExecutorEnvVarConfigKeyPrefix = "spark.executorEnv." - - // SparkKubernetesInitContainerImage is the Spark configuration key for specifying a custom init-container image. - SparkKubernetesInitContainerImage = "spark.kubernetes.initContainer.image" - - // SparkKubernetesMountDependenciesJarsDownloadDir is the Spark configuration key for specifying the download path in the driver and - // executors for remote jars. - SparkKubernetesMountDependenciesJarsDownloadDir = "spark.kubernetes.mountDependencies.jarsDownloadDir" - - // SparkKubernetesMountDependenciesFilesDownloadDir is the Spark configuration key for specifying the download path in the driver and - // executors for remote files. - SparkKubernetesMountDependenciesFilesDownloadDir = "spark.kubernetes.mountDependencies.filesDownloadDir" - - // SparkKubernetesMountDependenciesTimeout is the Spark configuration key for specifying the timeout in seconds of downloading - // remote dependencies. - SparkKubernetesMountDependenciesTimeout = "spark.kubernetes.mountDependencies.timeout" - - // SparkKubernetesMountDependenciesMaxSimultaneousDownloads is the Spark configuration key for specifying the maximum number of remote - // dependencies to download. - SparkKubernetesMountDependenciesMaxSimultaneousDownloads = "spark.kubernetes.mountDependencies.maxSimultaneousDownloads" - - // SparkKubernetesSubmissionWaitAppCompletion is the Spark configuration key for specifying whether to wait for application to complete. - SparkKubernetesSubmissionWaitAppCompletion = "spark.kubernetes.submission.waitAppCompletion" - - // SparkDriverExtraJavaOptions is the Spark configuration key for a string of extra JVM options to pass to driver. - SparkDriverExtraJavaOptions = "spark.driver.extraJavaOptions" - - // SparkExecutorExtraJavaOptions is the Spark configuration key for a string of extra JVM options to pass to executors. 
- SparkExecutorExtraJavaOptions = "spark.executor.extraJavaOptions" - - // SparkKubernetesExecutorDeleteOnTermination is the Spark configuration for specifying whether executor pods should be deleted in case of failure or normal termination. - SparkKubernetesExecutorDeleteOnTermination = "spark.kubernetes.executor.deleteOnTermination" -) - -// Dynamic allocation properties. -// Ref: https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation -const ( - // SparkDynamicAllocationEnabled is the Spark configuration key for specifying if dynamic - // allocation is enabled or not. - // @since 1.2.0 - SparkDynamicAllocationEnabled = "spark.dynamicAllocation.enabled" - - // @since 1.4.0 - SparkDynamicAllocationExecutorIdleTimeout = "spark.dynamicAllocation.executorIdleTimeout" - - // @since 1.4.0 - SparkDynamicAllocationCachedExecutorIdleTimeout = "spark.dynamicAllocation.cachedExecutorIdleTimeout" - - // SparkDynamicAllocationInitialExecutors is the Spark configuration key for specifying - // the initial number of executors to request if dynamic allocation is enabled. - // @since 1.3.0 - SparkDynamicAllocationInitialExecutors = "spark.dynamicAllocation.initialExecutors" - - // SparkDynamicAllocationMaxExecutors is the Spark configuration key for specifying the - // upper bound of the number of executors to request if dynamic allocation is enabled. - // @since 1.2.0 - SparkDynamicAllocationMaxExecutors = "spark.dynamicAllocation.maxExecutors" - - // SparkDynamicAllocationMinExecutors is the Spark configuration key for specifying the - // lower bound of the number of executors to request if dynamic allocation is enabled. - // @since 1.2.0 - SparkDynamicAllocationMinExecutors = "spark.dynamicAllocation.minExecutors" - - // @since 2.4.0 - SparkDynamicAllocationExecutorAllocationRatio = "spark.dynamicAllocation.executorAllocationRatio" - - // @since 1.2.0 - SparkDynamicAllocationSchedulerBacklogTimeout = "spark.dynamicAllocation.schedulerBacklogTimeout" - - // @since 1.2.0 - SparkDynamicAllocationSustainedSchedulerBacklogTimeout = "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout" - - // SparkDynamicAllocationShuffleTrackingEnabled is the Spark configuration key for - // specifying if shuffle data tracking is enabled. - // @since 3.0.0 - SparkDynamicAllocationShuffleTrackingEnabled = "spark.dynamicAllocation.shuffleTracking.enabled" - - // SparkDynamicAllocationShuffleTrackingTimeout is the Spark configuration key for specifying - // the shuffle tracking timeout in milliseconds if shuffle tracking is enabled. - // @since 3.0.0 - SparkDynamicAllocationShuffleTrackingTimeout = "spark.dynamicAllocation.shuffleTracking.timeout" -) diff --git a/pkg/util/capabilities.go b/pkg/util/capabilities.go index 5040da6e9..068bdcb4d 100644 --- a/pkg/util/capabilities.go +++ b/pkg/util/capabilities.go @@ -38,6 +38,19 @@ func (c Capabilities) String() string { return strings.Join(keys, ", ") } +var ( + IngressCapabilities Capabilities +) + +func InitializeIngressCapabilities(client kubernetes.Interface) (err error) { + if IngressCapabilities != nil { + return + } + + IngressCapabilities, err = getPreferredAvailableAPIs(client, "Ingress") + return +} + // getPreferredAvailableAPIs queries the cluster for the preferred resources information and returns a Capabilities // instance containing those api groups that support the specified kind. 
// @@ -70,15 +83,3 @@ func getPreferredAvailableAPIs(client kubernetes.Interface, kind string) (Capabi return caps, nil } - -var ( - IngressCapabilities Capabilities -) - -func InitializeIngressCapabilities(client kubernetes.Interface) (err error) { - if IngressCapabilities != nil { - return - } - IngressCapabilities, err = getPreferredAvailableAPIs(client, "Ingress") - return -} diff --git a/pkg/util/metrics.go b/pkg/util/metrics.go index d54e5e606..fe87508d9 100644 --- a/pkg/util/metrics.go +++ b/pkg/util/metrics.go @@ -18,117 +18,9 @@ package util import ( "strings" - "sync" - - "github.com/golang/glog" - "github.com/prometheus/client_golang/prometheus" - prometheusmodel "github.com/prometheus/client_model/go" ) func CreateValidMetricNameLabel(prefix, name string) string { // "-" is not a valid character for prometheus metric names or labels. return strings.Replace(prefix+name, "-", "_", -1) } - -// Best effort metric registration with Prometheus. -func RegisterMetric(metric prometheus.Collector) { - if err := prometheus.Register(metric); err != nil { - // Ignore AlreadyRegisteredError. - if _, ok := err.(prometheus.AlreadyRegisteredError); ok { - return - } - logger.Error(err, "Failed to register metric") - } -} - -// MetricConfig is a container of configuration properties for the collection and exporting of -// application metrics to Prometheus. -type MetricConfig struct { - MetricsEndpoint string - MetricsPort string - MetricsPrefix string - MetricsLabels []string - MetricsJobStartLatencyBuckets []float64 -} - -// A variant of Prometheus Gauge that only holds non-negative values. -type PositiveGauge struct { - mux sync.RWMutex - name string - gaugeMetric *prometheus.GaugeVec -} - -func NewPositiveGauge(name string, description string, labels []string) *PositiveGauge { - validLabels := make([]string, len(labels)) - for i, label := range labels { - validLabels[i] = CreateValidMetricNameLabel("", label) - } - - gauge := prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Name: name, - Help: description, - }, - validLabels, - ) - - return &PositiveGauge{ - gaugeMetric: gauge, - name: name, - } -} - -func fetchGaugeValue(m *prometheus.GaugeVec, labels map[string]string) float64 { - // Hack to get the current value of the metric to support PositiveGauge - pb := &prometheusmodel.Metric{} - - m.With(labels).Write(pb) - return pb.GetGauge().GetValue() -} - -func (p *PositiveGauge) Register() { - RegisterMetric(p.gaugeMetric) -} - -func (p *PositiveGauge) Value(labelMap map[string]string) float64 { - p.mux.RLock() - defer p.mux.RUnlock() - return fetchGaugeValue(p.gaugeMetric, labelMap) -} - -// Increment the Metric for the labels specified -func (p *PositiveGauge) Inc(labelMap map[string]string) { - p.mux.Lock() - defer p.mux.Unlock() - - if m, err := p.gaugeMetric.GetMetricWith(labelMap); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - glog.V(2).Infof("Incrementing %s with labels %s", p.name, labelMap) - m.Inc() - } -} - -// Decrement the metric only if its positive for the labels specified -func (p *PositiveGauge) Dec(labelMap map[string]string) { - p.mux.Lock() - defer p.mux.Unlock() - - // Decrement only if positive - val := fetchGaugeValue(p.gaugeMetric, labelMap) - if val > 0 { - glog.V(2).Infof("Decrementing %s with labels %s metricVal to %v", p.name, labelMap, val-1) - if m, err := p.gaugeMetric.GetMetricWith(labelMap); err != nil { - glog.Errorf("Error while exporting metrics: %v", err) - } else { - m.Dec() - } - } -} - -func FetchCounterValue(m 
*prometheus.CounterVec, labels map[string]string) float64 { - pb := &prometheusmodel.Metric{} - m.With(labels).Write(pb) - - return pb.GetCounter().GetValue() -} diff --git a/pkg/util/metrics_test.go b/pkg/util/metrics_test.go deleted file mode 100644 index cd175a0d3..000000000 --- a/pkg/util/metrics_test.go +++ /dev/null @@ -1,70 +0,0 @@ -/* -Copyright 2018 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "sync" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/kubeflow/spark-operator/pkg/common" -) - -func TestPositiveGauge_EmptyLabels(t *testing.T) { - gauge := NewPositiveGauge("testGauge", "test-description", []string{}) - emptyMap := map[string]string{} - gauge.Dec(emptyMap) - assert.Equal(t, float64(0), fetchGaugeValue(gauge.gaugeMetric, emptyMap), common.Epsilon) - - gauge.Inc(emptyMap) - assert.InEpsilon(t, float64(1), fetchGaugeValue(gauge.gaugeMetric, emptyMap), common.Epsilon) - gauge.Dec(map[string]string{}) - assert.Equal(t, float64(0), fetchGaugeValue(gauge.gaugeMetric, emptyMap), common.Epsilon) -} - -func TestPositiveGauge_WithLabels(t *testing.T) { - gauge := NewPositiveGauge("testGauge1", "test-description-1", []string{"app_id"}) - app1 := map[string]string{"app_id": "test1"} - app2 := map[string]string{"app_id": "test2"} - - var wg sync.WaitGroup - wg.Add(2) - go func() { - for i := 0; i < 10; i++ { - gauge.Inc(app1) - } - for i := 0; i < 5; i++ { - gauge.Dec(app1) - } - wg.Done() - }() - go func() { - for i := 0; i < 5; i++ { - gauge.Inc(app2) - } - for i := 0; i < 10; i++ { - gauge.Dec(app2) - } - wg.Done() - }() - - wg.Wait() - assert.InEpsilon(t, float64(5), fetchGaugeValue(gauge.gaugeMetric, app1), common.Epsilon) - // Always Positive Gauge. - assert.Equal(t, float64(0), fetchGaugeValue(gauge.gaugeMetric, app2), common.Epsilon) -} diff --git a/pkg/util/sparkapplication.go b/pkg/util/sparkapplication.go index b56dc0dee..9e1ac832f 100644 --- a/pkg/util/sparkapplication.go +++ b/pkg/util/sparkapplication.go @@ -48,6 +48,10 @@ func GetDriverPodName(app *v1beta2.SparkApplication) string { return fmt.Sprintf("%s-driver", app.Name) } +func GetApplicationState(app *v1beta2.SparkApplication) v1beta2.ApplicationStateType { + return app.Status.AppState.State +} + // IsExpired returns whether the given SparkApplication is expired. func IsExpired(app *v1beta2.SparkApplication) bool { // The application has no TTL defined and will never expire. diff --git a/version.go b/version.go new file mode 100644 index 000000000..e08232a51 --- /dev/null +++ b/version.go @@ -0,0 +1,90 @@ +/* +Copyright 2024 The Kubeflow authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package sparkoperator
+
+import (
+ "fmt"
+ "runtime"
+)
+
+type VersionInfo struct {
+ Version string
+ BuildDate string
+ GitCommit string
+ GitTag string
+ GitTreeState string
+ GoVersion string
+ Compiler string
+ Platform string
+}
+
+var (
+ version = "0.0.0" // value from VERSION file
+ buildDate = "1970-01-01T00:00:00Z" // output from `date -u +'%Y-%m-%dT%H:%M:%SZ'`
+ gitCommit = "" // output from `git rev-parse HEAD`
+ gitTag = "" // output from `git describe --exact-match --tags HEAD` (if clean tree state)
+ gitTreeState = "" // determined from `git status --porcelain`. either 'clean' or 'dirty'
+)
+
+func getVersion() VersionInfo {
+ var versionStr string
+ if gitCommit != "" && gitTag != "" && gitTreeState == "clean" {
+ // if we have a clean tree state and the current commit is tagged,
+ // this is an official release.
+ versionStr = gitTag
+ } else {
+ // otherwise formulate a version string based on as much metadata
+ // information as we have available.
+ versionStr = version
+ if len(gitCommit) >= 7 {
+ versionStr += "+" + gitCommit[0:7]
+ if gitTreeState != "clean" {
+ versionStr += ".dirty"
+ }
+ } else {
+ versionStr += "+unknown"
+ }
+ }
+ return VersionInfo{
+ Version: versionStr,
+ BuildDate: buildDate,
+ GitCommit: gitCommit,
+ GitTag: gitTag,
+ GitTreeState: gitTreeState,
+ GoVersion: runtime.Version(),
+ Compiler: runtime.Compiler,
+ Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH),
+ }
+}
+
+// PrintVersion prints the version information. If short is true, only the version string is printed.
+func PrintVersion(short bool) {
+ v := getVersion()
+ fmt.Printf("Spark Operator Version: %s\n", v.Version)
+ if short {
+ return
+ }
+ fmt.Printf("Build Date: %s\n", v.BuildDate)
+ fmt.Printf("Git Commit ID: %s\n", v.GitCommit)
+ if v.GitTag != "" {
+ fmt.Printf("Git Tag: %s\n", v.GitTag)
+ }
+ fmt.Printf("Git Tree State: %s\n", v.GitTreeState)
+ fmt.Printf("Go Version: %s\n", v.GoVersion)
+ fmt.Printf("Compiler: %s\n", v.Compiler)
+ fmt.Printf("Platform: %s\n", v.Platform)
+}
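
As a minimal usage sketch for the new version package above (assuming the module import path github.com/kubeflow/spark-operator; the -X targets and build flags shown in the comment are illustrative, not the exact Makefile wiring), the package-level variables are meant to be stamped at link time and surfaced through PrintVersion:

package main

import (
	sparkoperator "github.com/kubeflow/spark-operator"
)

// Hypothetical build invocation that stamps the version metadata; the real
// flags live in the repository's build tooling and may differ:
//
//	go build -ldflags "-X github.com/kubeflow/spark-operator.version=v2.0.0 \
//	  -X github.com/kubeflow/spark-operator.gitCommit=$(git rev-parse HEAD) \
//	  -X github.com/kubeflow/spark-operator.gitTreeState=clean \
//	  -X github.com/kubeflow/spark-operator.buildDate=$(date -u +'%Y-%m-%dT%H:%M:%SZ')" ./...
func main() {
	// Prints the full report (version, build date, commit, tree state, Go
	// version, compiler, platform); PrintVersion(true) prints only the
	// "Spark Operator Version" line.
	sparkoperator.PrintVersion(false)
}

When nothing is stamped, getVersion falls back to "0.0.0+unknown", which makes unstamped local builds easy to spot.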