[FEAT] add driver/executor pod in Spark #3016
base: master
2ff8b9a to af03383
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
af03383 to 7793398
Code Review Agent Run #3c7587
Actionable Suggestions - 2
Additional Suggestions - 1
driver_pod=self.driver_pod,
executor_pod=self.executor_pod,
Consider adding driver_pod and executor_pod to the with_overrides method to ensure consistent pod configuration overrides.
Code suggestion
Check the AI-generated fix before applying
@@ -56,6 +56,8 @@ def with_overrides(
         new_spark_conf: Optional[Dict[str, str]] = None,
         new_hadoop_conf: Optional[Dict[str, str]] = None,
         new_databricks_conf: Optional[Dict[str, Dict]] = None,
+        new_driver_pod: Optional[K8sPod] = None,
+        new_executor_pod: Optional[K8sPod] = None,
     ) -> "SparkJob":
         if not new_spark_conf:
             new_spark_conf = self.spark_conf
@@ -66,6 +68,12 @@ def with_overrides(
         if not new_databricks_conf:
             new_databricks_conf = self.databricks_conf
+        if not new_driver_pod:
+            new_driver_pod = self.driver_pod
+
+        if not new_executor_pod:
+            new_executor_pod = self.executor_pod
+
         return SparkJob(
             spark_type=self.spark_type,
             application_file=self.application_file,
@@ -74,6 +82,8 @@ def with_overrides(
             hadoop_conf=new_hadoop_conf,
             databricks_conf=new_databricks_conf,
             databricks_instance=self.databricks_instance,
+            driver_pod=new_driver_pod,
+            executor_pod=new_executor_pod,
             executor_path=self.executor_path,
         )
Code Review Run #3c7587
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
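For readers following the with_overrides suggestion above, the fallback pattern it proposes can be sketched in isolation. This is a minimal stand-in, not flytekit's SparkJob: `SparkJobSketch` and `PodSketch` are hypothetical names, and only the "keep the current value unless an override is supplied" logic mirrors the suggested diff. The sketch checks `is None` instead of `if not ...`; that is a deliberate deviation so an explicitly passed empty value is not silently discarded.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class PodSketch:
    """Hypothetical stand-in for a pod definition (not flytekit's K8sPod)."""
    primary_container_name: str
    labels: Dict[str, str] = field(default_factory=dict)


@dataclass(frozen=True)
class SparkJobSketch:
    """Hypothetical stand-in for SparkJob with only the fields needed here."""
    spark_conf: Dict[str, str]
    driver_pod: Optional[PodSketch] = None
    executor_pod: Optional[PodSketch] = None

    def with_overrides(
        self,
        new_spark_conf: Optional[Dict[str, str]] = None,
        new_driver_pod: Optional[PodSketch] = None,
        new_executor_pod: Optional[PodSketch] = None,
    ) -> "SparkJobSketch":
        # Each field falls back to the current value when no override is given.
        return SparkJobSketch(
            spark_conf=self.spark_conf if new_spark_conf is None else new_spark_conf,
            driver_pod=self.driver_pod if new_driver_pod is None else new_driver_pod,
            executor_pod=self.executor_pod if new_executor_pod is None else new_executor_pod,
        )


job = SparkJobSketch(spark_conf={"spark.executor.instances": "2"}, driver_pod=PodSketch("driver"))
updated = job.with_overrides(new_executor_pod=PodSketch("executor"))
```

Here `updated` keeps the original driver pod and Spark configuration and only gains the executor pod, while `job` itself is left unchanged.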
@@ -176,6 +185,22 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
        return MessageToDict(job.to_flyte_idl())

    def to_k8s_pod(self, pod_template: PodTemplate | None, settings: SerializationSettings) -> K8sPod | None:
Consider adding type hints for the return value of _get_container() in the to_k8s_pod() method. The method appears to use this internal method but its return type is not clearly specified in the type hints.
Code suggestion
Check the AI-generated fix before applying
- def to_k8s_pod(self, pod_template: PodTemplate | None, settings: SerializationSettings) -> K8sPod | None:
+ def to_k8s_pod(self, pod_template: PodTemplate | None, settings: SerializationSettings) -> K8sPod | None:
+     from flytekit.models import task as _task_model
+     _get_container: Callable[..., _task_model.Container] = self._get_container
Code Review Run #3c7587
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
Take the container with the name set in the driver/executor podTemplate primary_container_name
Signed-off-by: machichima <[email protected]>
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #3016      +/-   ##
==========================================
- Coverage   78.19%   76.84%   -1.35%
==========================================
  Files         201      205       +4
  Lines       21274    21621     +347
  Branches     2733     2762      +29
==========================================
- Hits        16635    16615      -20
- Misses       3843     4242     +399
+ Partials      796      764      -32
☔ View full report in Codecov by Sentry.
Code Review Agent Run #f512d4
Actionable Suggestions - 0
Exclude those in the podTemplate of spark driver/executor pod
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
Code Review Agent Run #27c6ae
Actionable Suggestions - 2
if task_type != "spark":
    # for spark driver/executor, do not use the command and args from task podTemplate
    container.command = primary_container.command
    container.args = primary_container.args
Consider extracting the Spark-specific container command/args logic into a separate helper function to improve code organization and readability. The current nested if condition makes the code harder to follow.
Code suggestion
Check the AI-generated fix before applying
- if task_type != "spark":
-     # for spark driver/executor, do not use the command and args from task podTemplate
-     container.command = primary_container.command
-     container.args = primary_container.args
+ if _should_copy_container_command_args(task_type):
+     container.command = primary_container.command
+     container.args = primary_container.args
+
+ def _should_copy_container_command_args(task_type: str) -> bool:
+     # for spark driver/executor, do not use the command and args from task podTemplate
+     return task_type != "spark"
Code Review Run #27c6ae
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
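The rule in the reviewed snippet (copy command and args from the primary container unless the task type is "spark") can be illustrated with a small self-contained sketch. `ContainerSketch` and `copy_command_and_args` are hypothetical names used only for illustration; the real code operates on Kubernetes container objects that are not reproduced here.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ContainerSketch:
    """Hypothetical stand-in for a Kubernetes container object."""
    name: str
    command: List[str] = field(default_factory=list)
    args: List[str] = field(default_factory=list)


def copy_command_and_args(
    container: ContainerSketch,
    primary_container: ContainerSketch,
    task_type: str,
) -> ContainerSketch:
    # For Spark driver/executor pods, leave command and args untouched so the
    # values from the task podTemplate are not carried over.
    if task_type != "spark":
        container.command = list(primary_container.command)
        container.args = list(primary_container.args)
    return container


primary = ContainerSketch("primary", command=["pyflyte-execute"], args=["--inputs", "x"])
regular = copy_command_and_args(ContainerSketch("primary"), primary, "python-task")
spark = copy_command_and_args(ContainerSketch("primary"), primary, "spark")
```

With these inputs, `regular` receives the primary container's command and args, while `spark` keeps its own empty ones.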
    pod_spec=driver_pod_spec_dict_remove_None,  # type: ignore
)

target_executor_k8sPod = K8sPod(
    metadata=K8sObjectMetadata(
        labels={"lKeyA_e": "lValA", "lKeyB_e": "lValB"},
        annotations={"aKeyA_e": "aValA", "aKeyB_e": "aValB"},
    ),
    pod_spec=executor_pod_spec_dict_remove_None,  # type: ignore
Consider removing the # type: ignore comments and properly typing the pod_spec parameter to match the expected type.
Code suggestion
Check the AI-generated fix before applying
- pod_spec=driver_pod_spec_dict_remove_None, # type: ignore
+ pod_spec=V1PodSpec(**driver_pod_spec_dict_remove_None),
@@ -378,1 +378,1 @@
- pod_spec=executor_pod_spec_dict_remove_None, # type: ignore
+ pod_spec=V1PodSpec(**executor_pod_spec_dict_remove_None),
Code Review Run #27c6ae
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
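The test variables above are named `..._dict_remove_None`, which suggests the expected pod spec is a dictionary with None entries stripped. How the PR actually builds those dictionaries is not shown in this conversation; the following is only a sketch of one way to do it, with `remove_none` as a hypothetical helper name.

```python
from typing import Any


def remove_none(value: Any) -> Any:
    """Recursively drop None values from nested dicts and lists."""
    if isinstance(value, dict):
        return {k: remove_none(v) for k, v in value.items() if v is not None}
    if isinstance(value, list):
        return [remove_none(v) for v in value if v is not None]
    return value


# Illustrative input only; the keys are not taken from the PR's test data.
raw_spec = {
    "containers": [{"name": "primary", "image": None, "env": [{"name": "x", "value": None}]}],
    "tolerations": None,
}
cleaned_spec = remove_none(raw_spec)
```

Empty lists and empty strings are kept on purpose; only None is treated as "unset" in this sketch.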
Signed-off-by: machichima <[email protected]>
Signed-off-by: machichima <[email protected]>
Code Review Agent Run #41dd0b
Actionable Suggestions - 1
if SparkSession._instantiatedSession:
    SparkSession.builder.getOrCreate().stop()
    SparkSession._instantiatedSession = None
yield
pyspark.sql.SparkSession.builder.getOrCreate().stop()

if SparkSession._instantiatedSession:
    SparkSession.builder.getOrCreate().stop()
    SparkSession._instantiatedSession = None
Consider extracting the duplicate code block for stopping SparkSession into a helper function to improve maintainability and reduce duplication.
Code suggestion
Check the AI-generated fix before applying
- if SparkSession._instantiatedSession:
-     SparkSession.builder.getOrCreate().stop()
-     SparkSession._instantiatedSession = None
+ def _cleanup_spark_session():
+     if SparkSession._instantiatedSession:
+         SparkSession.builder.getOrCreate().stop()
+         SparkSession._instantiatedSession = None
+
+ _cleanup_spark_session()
  yield
- if SparkSession._instantiatedSession:
-     SparkSession.builder.getOrCreate().stop()
-     SparkSession._instantiatedSession = None
+ _cleanup_spark_session()
Code Review Run #41dd0b
Is this a valid issue, or was it incorrectly flagged by the Agent?
- it was incorrectly flagged
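The fixture discussed above stops any active SparkSession both before and after the test body. The shape of that pattern, independent of pyspark, is sketched below. `FakeSession` is a hypothetical stand-in whose `_instantiated` class attribute plays the role of `SparkSession._instantiatedSession`; the `try`/`finally` is an addition in this sketch so cleanup also runs when the body raises.

```python
from contextlib import contextmanager


class FakeSession:
    """Hypothetical stand-in for a singleton-style session object."""
    _instantiated = None
    stop_calls = 0

    @classmethod
    def get_or_create(cls) -> "FakeSession":
        if cls._instantiated is None:
            cls._instantiated = cls()
        return cls._instantiated

    def stop(self) -> None:
        type(self).stop_calls += 1
        type(self)._instantiated = None


def _cleanup_session() -> None:
    # Stop the session only if one is currently active.
    if FakeSession._instantiated is not None:
        FakeSession._instantiated.stop()


@contextmanager
def clean_session():
    _cleanup_session()  # start from a clean state
    try:
        yield
    finally:
        _cleanup_session()  # leave a clean state for the next test
```

A session created before or inside the `with clean_session():` block is stopped by the surrounding cleanup calls, so nothing leaks between tests.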
Tracking issue
Related to flyteorg/flyte#4105
Why are the changes needed?
This PR updates the flytekit-spark package so that the driver pod and executor pod can be configured separately using PodTemplate. It also allows setting a primary_container_name for the driver/executor pod that is separate from the one in the task podTemplate.
What changes were proposed in this pull request?
Add driver_pod and executor_pod fields of type PodTemplate to SparkJob.
How was this patch tested?
Added the unit test test_spark_driver_executor_podSpec.
Updated the @task for the hello_spark function in the my_spark example to set the driver_pod and executor_pod, then verified that the pods have the Tolerations and EnvVar set.
Related PRs
flyteorg/flyte#6085
Summary by Bito
Enhanced flytekit-spark package by implementing configurable driver and executor pod support through PodTemplate. Added driver_pod and executor_pod fields to SparkJob model with primary_only flag for pod spec serialization. The implementation includes type hint updates from K8sPod to PodTemplate, parameter order modifications, and improved SparkSession cleanup in tests. This enables granular control and customization of labels, annotations, containers, and tolerations for both driver and executor pods.
Unit tests added: True
Estimated effort to review (1-5, lower is better): 2