-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-48344][SQL] Enhance SQL Script Execution: Replace NOOP with COLLECT for Result DataFrames #49372
base: master
Are you sure you want to change the base?
[SPARK-48344][SQL] Enhance SQL Script Execution: Replace NOOP with COLLECT for Result DataFrames #49372
Conversation
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecutionContext.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/scripting/SqlScriptingExecution.scala
Show resolved
Hide resolved
|
||
/** | ||
* Returns the next result statement from the script. | ||
* Multiple consecutive calls without calling `hasNext()` would return the same result statement. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This violates the Java Iterator contract. Can we make sure it gets the next result?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because of the nature of contract between the caller and SqlScriptingExecution API which is to execute returned statement before proceeding with iteration, we can't advance further before first executing the returned statement.
Advancing in the next
method immediately after returning the current statement would break the contract.
Advancing to the next result statement or end of script has to be done after current result statement is executed. For that reason next
can't be the one advancing the iteration. Here is the example of the code that would not work correctly:
override def hasNext(): Boolean = current.isDefined
override def next(): DataFrame = {
val tmp = current
current = getNextResult
return tmp
}
The idea is to enforce correct usage of SqlScriptingExecution API through the future PR review process, and keep the behavior of hasNext
and next
well documented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we should not use Java iterator here but ask the caller to invoke getNextResult
directly. The contract is caller must execute the returned DataFrame before getting the next one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still have to know when we have reached the end of script and there are multiple solutions to this problem.
I have two proposals:
- Remove
extends Iterator[DataFrame]
fromSqlScriptingExecution
and continue to usehasNext
andnext
as we use them now. That way we don't have to respect Java iterator contract, and we still have user friendly way to iterate through script. - Remove everything related to iterator and execute the script in a way similar to this:
var result = Option[Seq[Row]] = None
var currentDF: Option[DataFrame] = sse.getNextResult()
while (currentDF.isDefined) {
result = Some(currentDF.collect().toSeq)
currentDF = sse.getNextResult()
}
return result...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea proposal 2 is what I was proposing.
What changes were proposed in this pull request?
This pull request proposes replacing the noop operation with collect for all result DataFrames on the caller side of the SQL Script execution process.
This is the 4th PR in the series of introducing SQL Scripting Execution into Spark.
Why are the changes needed?
The proposed change is necessary to maintain a critical invariant during SQL Script execution: when
SqlScriptingExecution
returns the next available result statement, it must be executed before proceeding with iteration.Implementation details
SQL Script execution is based on iterating over interpreted statements and executing them as they are encountered. For certain statement types (result statements), execution is delegated to the caller side (
SparkSession
). To achieve this, the iteration process is divided into two stages:getTreeIterator
method.SqlScriptingExecution
serves as a second-level iterator, iterating over all statements and executing those that are not result statements. Result statements are returned to the caller for execution on the caller side. The caller must adhere to the contract of executing the returned statement before continuing iteration.Due to the nature of this contract between the caller and the
SqlScriptingExecution
API, the implementation of thehasNext
method is not idempotent. We will enforce correct usage ofSqlScriptingExecution
API through the future PR review process.In this approach we collect every DataFrame to eliminate concerns about which one needs to be returned last. This strategy will also be utilized when we introduce multiple-results API functionality.
Impact and Considerations
This change enhances the robustness of our SQL Script execution process and lays the groundwork for future improvements, including the implementation of a multiple-results API. Reviewers should pay particular attention to the handling of DataFrame collection and the maintenance of execution order integrity.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests.
Was this patch authored or co-authored using generative AI tooling?
No