[SPARK-50600][CONNECT][SQL] Set analyzed on analysis failure #49383

Closed
QueryPlanningTracker.scala
@@ -94,6 +94,16 @@ object QueryPlanningTracker {
  * Callbacks after planning phase completion.
  */
 abstract class QueryPlanningTrackerCallback {
+  /**
+   * Called when a query fails analysis.
+   *
+   * @param tracker the tracker that triggered the callback.
+   * @param parsedPlan the plan prior to analysis,
+   *                   see [[org.apache.spark.sql.catalyst.analysis.Analyzer]].
+   */
+  def analysisFailed(tracker: QueryPlanningTracker, parsedPlan: LogicalPlan): Unit = {
+    // No-op by default for backward compatibility.
+  }
   /**
    * Called when query has been analyzed.
    *
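
To make the callback contract concrete, a minimal subscriber might look like the sketch below. LoggingTrackerCallback is hypothetical and not part of this PR; the analyzed and readyForExecution signatures are taken from the surrounding diff, and the callback type may only be reachable from Spark's own org.apache.spark.sql packages.

import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical subscriber covering both outcomes of analysis.
class LoggingTrackerCallback extends QueryPlanningTrackerCallback {
  // New in this PR: invoked with the parsed (pre-analysis) plan on failure.
  override def analysisFailed(tracker: QueryPlanningTracker, parsedPlan: LogicalPlan): Unit =
    println(s"analysis failed for plan:\n$parsedPlan")

  // Pre-existing abstract members of the callback.
  def analyzed(tracker: QueryPlanningTracker, analyzedPlan: LogicalPlan): Unit =
    println(s"analysis succeeded:\n$analyzedPlan")

  def readyForExecution(tracker: QueryPlanningTracker): Unit =
    println("query ready for execution")
}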
@@ -147,6 +157,17 @@ class QueryPlanningTracker(
     ret
   }
 
+  /**
+   * Set when the query has been parsed but failed analysis.
+   * Can be called multiple times upon plan change.
+   *
+   * @param parsedPlan the plan prior to analysis,
+   *                   see [[org.apache.spark.sql.catalyst.analysis.Analyzer]].
+   */
+  private[sql] def setAnalysisFailed(parsedPlan: LogicalPlan): Unit = {
+    trackerCallback.foreach(_.analysisFailed(this, parsedPlan))
+  }
+
   /**
    * Set when the query has been analysed.
    * Can be called multiple times upon plan change.
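
Wiring it up is short, as the suite below exercises with mocks. This sketch assumes the LoggingTrackerCallback above and placement inside an org.apache.spark.sql package, since setAnalysisFailed is private[sql]; OneRowRelation stands in for a real parsed plan.

import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

val tracker = new QueryPlanningTracker(Some(new LoggingTrackerCallback))
// Forwards the parsed plan: fires LoggingTrackerCallback.analysisFailed(tracker, plan).
tracker.setAnalysisFailed(OneRowRelation())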
QueryPlanningTrackerSuite.scala

@@ -95,7 +95,13 @@ class QueryPlanningTrackerSuite extends SparkFunSuite {
     val mockCallback = mock[QueryPlanningTrackerCallback]
     val mockPlan1 = mock[LogicalPlan]
     val mockPlan2 = mock[LogicalPlan]
+    val mockPlan3 = mock[LogicalPlan]
+    val mockPlan4 = mock[LogicalPlan]
     val t = new QueryPlanningTracker(Some(mockCallback))
+    t.setAnalysisFailed(mockPlan3)
+    verify(mockCallback, times(1)).analysisFailed(t, mockPlan3)
+    t.setAnalysisFailed(mockPlan4)
+    verify(mockCallback, times(1)).analysisFailed(t, mockPlan4)
     t.setAnalyzed(mockPlan1)
     verify(mockCallback, times(1)).analyzed(t, mockPlan1)
     t.setAnalyzed(mockPlan2)
ExecuteEventsManager.scala

@@ -145,13 +145,19 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
    *
    * @param analyzedPlan
    *   The analyzed plan generated by the Connect request plan. None when the request does not
-   *   generate a plan.
+   *   generate a Spark plan or analysis fails.
+   * @param parsedPlan
+   *   The parsed plan generated by the Connect request plan. None when the request does not
+   *   generate a plan or analysis does not fail.
    */
-  def postAnalyzed(analyzedPlan: Option[LogicalPlan] = None): Unit = {
+  def postAnalyzed(
+      analyzedPlan: Option[LogicalPlan] = None,
+      parsedPlan: Option[LogicalPlan] = None): Unit = {
     assertStatus(List(ExecuteStatus.Started, ExecuteStatus.Analyzed), ExecuteStatus.Analyzed)
     val event =
       SparkListenerConnectOperationAnalyzed(jobTag, operationId, clock.getTimeMillis())
     event.analyzedPlan = analyzedPlan
+    event.parsedPlan = parsedPlan
     listenerBus.post(event)
   }
 
@@ -251,6 +257,12 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
           postAnalyzed(Some(analyzedPlan))
         }
 
+        override def analysisFailed(
+            tracker: QueryPlanningTracker,
+            parsedPlan: LogicalPlan): Unit = {
+          postAnalyzed(parsedPlan = Some(parsedPlan))
+        }
+
         def readyForExecution(tracker: QueryPlanningTracker): Unit = postReadyForExecution()
       }))
   }
@@ -341,9 +353,15 @@ case class SparkListenerConnectOperationAnalyzed(
     extraTags: Map[String, String] = Map.empty)
     extends SparkListenerEvent {
 
+  /**
+   * Parsed Spark plan generated by the Connect request. None when the Connect request does not
+   * generate a Spark plan or analysis does not fail.
+   */
+  @JsonIgnore var parsedPlan: Option[LogicalPlan] = None
+
   /**
    * Analyzed Spark plan generated by the Connect request. None when the Connect request does not
-   * generate a Spark plan.
+   * generate a Spark plan or analysis fails.
    */
   @JsonIgnore var analyzedPlan: Option[LogicalPlan] = None
 }
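
On the consuming side, a SparkListener can now distinguish a successful analysis from a failed one. The sketch below is illustrative and not part of this PR; it assumes the event class remains importable from the Connect service package.

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.connect.service.SparkListenerConnectOperationAnalyzed

// Illustrative listener: after this PR, a failed analysis still posts the
// event, carrying the parsed plan instead of the analyzed one.
class ConnectAnalysisListener extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerConnectOperationAnalyzed =>
      e.analyzedPlan match {
        case Some(plan) => println(s"operation ${e.operationId} analyzed:\n$plan")
        case None =>
          e.parsedPlan.foreach(p => println(s"operation ${e.operationId} failed analysis:\n$p"))
      }
    case _ => // ignore other events
  }
}

Such a listener would be registered with spark.sparkContext.addSparkListener(new ConnectAnalysisListener).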
QueryExecution.scala

@@ -92,12 +92,18 @@ class QueryExecution(
   }
 
   private val lazyAnalyzed = LazyTry {
-    val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
-      // We can't clone `logical` here, which will reset the `_analyzed` flag.
-      sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
-    }
-    tracker.setAnalyzed(plan)
-    plan
+    try {
+      val plan = executePhase(QueryPlanningTracker.ANALYSIS) {
+        // We can't clone `logical` here, which will reset the `_analyzed` flag.
+        sparkSession.sessionState.analyzer.executeAndCheck(logical, tracker)
+      }
+      tracker.setAnalyzed(plan)
+      plan
+    } catch {
+      case NonFatal(e) =>
+        tracker.setAnalysisFailed(logical)
+        throw e
+    }
   }
 
   def analyzed: LogicalPlan = lazyAnalyzed.get
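
The change can be exercised end to end; the sketch below is essentially the new suite test further down, with the hypothetical LoggingTrackerCallback in place of a mock. It assumes an active SparkSession named spark and, because Dataset.ofRows is private[sql], placement inside an org.apache.spark.sql package.

import scala.util.Try

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.{QueryPlanningTracker, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

// A parsed but unresolvable plan: the referenced table does not exist.
val badPlan = UnresolvedRelation(TableIdentifier("no_such_table"))
val tracker = new QueryPlanningTracker(Some(new LoggingTrackerCallback))

// ofRows analyzes eagerly: analysis fails, setAnalysisFailed hands the parsed
// plan to the callback, and the AnalysisException is rethrown.
val result = Try(Dataset.ofRows(spark, badPlan, tracker))
assert(result.isFailure)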
QueryExecutionSuite.scala

@@ -18,11 +18,12 @@ package org.apache.spark.sql.execution
 
 import scala.collection.mutable
 import scala.io.Source
+import scala.util.Try
 
 import org.apache.spark.sql.{AnalysisException, Dataset, ExtendedExplainGenerator, FastOperator}
-import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback}
-import org.apache.spark.sql.catalyst.analysis.CurrentNamespace
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.{QueryPlanningTracker, QueryPlanningTrackerCallback, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{CurrentNamespace, UnresolvedFunction, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan, OneRowRelation, Project, ShowTables, SubqueryAlias}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
@@ -405,6 +406,21 @@ class QueryExecutionSuite extends SharedSparkSession {
     }
   }
 
+  test("SPARK-50600: Failed analysis should send analyzed event") {
+    val mockCallback = MockCallback()
+
+    def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))
+
+    val unresolvedUndefinedFunc = UnresolvedFunction("unknown", Seq.empty, isDistinct = false)
+    val plan = Project(Seq(Alias(unresolvedUndefinedFunc, "call1")()), table("table"))
+    val dataset = Try {
+      val df = Dataset.ofRows(spark, plan, new QueryPlanningTracker(Some(mockCallback)))
+      df.queryExecution.assertAnalyzed()
+    }
+    assert(dataset.failed.get.isInstanceOf[AnalysisException])
+    mockCallback.assertAnalyzed()
+  }
+
   case class MockCallbackEagerCommand(
       var trackerAnalyzed: QueryPlanningTracker = null,
       var trackerReadyForExecution: QueryPlanningTracker = null)
@@ -447,6 +463,15 @@ class QueryExecutionSuite extends SharedSparkSession {
       var trackerAnalyzed: QueryPlanningTracker = null,
       var trackerReadyForExecution: QueryPlanningTracker = null)
       extends QueryPlanningTrackerCallback {
+    override def analysisFailed(
+        trackerFromCallback: QueryPlanningTracker,
+        analyzedPlan: LogicalPlan): Unit = {
+      trackerAnalyzed = trackerFromCallback
+      assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS))
+      assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.OPTIMIZATION))
+      assert(!trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.PLANNING))
+      assert(analyzedPlan != null)
+    }
     def analyzed(trackerFromCallback: QueryPlanningTracker, plan: LogicalPlan): Unit = {
       trackerAnalyzed = trackerFromCallback
       assert(trackerAnalyzed.phases.keySet.contains(QueryPlanningTracker.ANALYSIS))