Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Controller changes to support Serving as a builtin source #2357

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

BulkBeing
Copy link
Contributor

@BulkBeing BulkBeing commented Jan 23, 2025

fixes #2319

Tested the validation with below pipeline:

apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline
spec:
  vertices:
    - name: serving-in
      containerTemplate:
        imagePullPolicy: Never
      scale:
        min: 1
      source:
        serving:
          service: true
          msgIDHeaderKey: "X-Numaflow-Id"
          store:
            url: "redis://redis:6379"

    - name: compute-sum
      udf:
        container:
          # compute the sum
          image: quay.io/numaio/numaflow-go/reduce-sum:stable
        groupBy:
          window:
            fixed:
              length: 60s
          keyed: true
          storage:
            emptyDir: {}
      partitions: 2

    - name: serve-sink
      containerTemplate:
        imagePullPolicy: Never
      scale:
        min: 1
      sink:
        udsink:
          container:
            image: servesink:0.1
            imagePullPolicy: Never
            env:
              - name: NUMAFLOW_CALLBACK_URL_KEY
                value: "X-Numaflow-Callback-Url"
              - name: NUMAFLOW_MSG_ID_HEADER_KEY
                value: "X-Numaflow-Id"

  edges:
    - from: serving-in
      to: compute-sum
    - from: compute-sum
      to: serve-sink
➜  kubectl apply -f simple-pipeline-test.yaml
Error from server (BadRequest): error when creating "simple-pipeline-test.yaml": admission webhook "webhook.numaflow.numaproj.io" denied the request: pipeline has a Serving source "serving-in" and a reduce vertex "compute-sum". Reduce is not supported with Serving source

@yhl25
Copy link
Contributor

yhl25 commented Jan 27, 2025

Add validation checks to not support reduce when serving source is configured? Also enable callback when serving source is configured so that the users can avoid setting the callback annotation?

Copy link

codecov bot commented Jan 27, 2025

Codecov Report

Attention: Patch coverage is 77.77778% with 18 lines in your changes missing coverage. Please review.

Project coverage is 69.68%. Comparing base (084be89) to head (5e7089a).
Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
rust/serving/src/config.rs 56.41% 17 Missing ⚠️
pkg/apis/numaflow/v1alpha1/vertex_types.go 96.77% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2357      +/-   ##
==========================================
- Coverage   69.70%   69.68%   -0.02%     
==========================================
  Files         361      361              
  Lines       49934    49984      +50     
==========================================
+ Hits        34805    34830      +25     
- Misses      14053    14084      +31     
+ Partials     1076     1070       -6     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@BulkBeing BulkBeing marked this pull request as ready for review January 30, 2025 10:20
@vigith vigith requested a review from yhl25 January 30, 2025 16:06
Makefile Outdated Show resolved Hide resolved
for _, vtx := range req.PipelineSpec.Vertices {
if vtx.IsASource() && vtx.Source.Serving != nil {
commonEnvVars = append(commonEnvVars,
corev1.EnvVar{Name: EnvCallbackEnabled, Value: "true"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we changed the logic from getting the information from annotations.

Are we expecting it's always true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR removes the need for having an annotation to enable callbacks. Instead, if a serving source is used, the environment variable to enable callbacks will be set automatically. The EnvCallbackEnabled is checked to construct the callback http client, the callback only happens if the message contains the X-Numaflow-Callback-Url header.

pkg/apis/numaflow/v1alpha1/vertex_types.go Outdated Show resolved Hide resolved
pkg/apis/numaflow/v1alpha1/vertex_types.go Outdated Show resolved Hide resolved
@BulkBeing BulkBeing requested a review from whynowy January 31, 2025 05:18
)

// if auth is configured, set the auth token in the environment
if servingSource.Auth != nil && servingSource.Auth.Token != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should have been mounted as a volume to the container, there's no need to add env vars to it.

@@ -343,13 +349,61 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
}

if v.IsASource() && v.Spec.Source.Serving != nil {
servingContainer, err := v.getServingContainer(req)
servingSource := v.Spec.Source.Serving
servingSourceBytes, err := json.Marshal(servingSource)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's already an env var EnvVertexObject, no need to do this.

… source spec as env variable

Signed-off-by: Sreekanth <[email protected]>
}
if servingSource != nil {
for _, v := range pl.Spec.Vertices {
if v.UDF != nil && v.UDF.GroupBy != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a utility function IsReduceUDF()

@BulkBeing BulkBeing requested review from kohlisid and whynowy February 3, 2025 04:20
Copy link
Member

@whynowy whynowy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kohlisid - please review.

Comment on lines 366 to 367
// set the serving source stream name in the environment because the numa container will be reading from it
corev1.EnvVar{Name: EnvServingJetstreamStream, Value: req.ServingSourceStreamName},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need js stream anymore right?

Comment on lines 160 to 172
#[derive(Deserialize)]
struct Source {
serving: Serving,
}
#[derive(Deserialize)]
struct Spec {
source: Source,
}

#[derive(Deserialize)]
struct VertexObject {
spec: Spec,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use the generated models?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Controller changes to run serving as a builtin source
4 participants