
[SPARK-50768][CORE] Introduce TaskContext.createResourceUninterruptibly to avoid stream leak by task interruption #49413

Status: Closed (merged; 6 commits)

Conversation

@Ngone51 (Member) commented Jan 8, 2025:

What changes were proposed in this pull request?

This PR fixes a potential stream leak issue by introducing `TaskContext.createResourceUninterruptibly`.

When a task uses `TaskContext.createResourceUninterruptibly` to create a resource, the task is marked as uninterruptible. Thus, any interruption request arriving during the call to `TaskContext.createResourceUninterruptibly` is delayed until the creation finishes.

This PR introduces a new lock contention between `Task.kill` and `TaskContext.createResourceUninterruptibly`. But I think it is acceptable given that neither is on the hot path.

(I will submit a follow-up to apply `TaskContext.createResourceUninterruptibly` across the codebase if this PR is approved by the community.)
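
A minimal, self-contained sketch of the proposed semantics (the class and field names are illustrative and simplified; the real implementation lives in `TaskContextImpl` and is guarded by `TaskContext.synchronized`):

```scala
import java.io.Closeable

// Sketch only, not the actual Spark implementation: while the resource
// builder runs, the task is uninterruptible; a kill arriving meanwhile is
// recorded and delivered once the resource is fully constructed.
class UninterruptibleResourceContext {
  private var _interruptible: Boolean = true
  private var pendingInterruptRequest: Option[(Option[Thread], String)] = None

  // Killer side: interrupt immediately if allowed, otherwise record the
  // request for delivery after resource creation completes.
  def kill(taskThread: Thread, reason: String): Unit = synchronized {
    if (_interruptible) taskThread.interrupt()
    else pendingInterruptRequest = Some((Some(taskThread), reason))
  }

  // Task side: the builder is shielded from interruption; a deferred kill
  // is replayed once the resource exists and can be safely closed.
  def createResourceUninterruptibly[T <: Closeable](resourceBuilder: => T): T = {
    synchronized { _interruptible = false }
    try resourceBuilder
    finally synchronized {
      pendingInterruptRequest.foreach { case (thread, _) => thread.foreach(_.interrupt()) }
      pendingInterruptRequest = None
      _interruptible = true
    }
  }
}
```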

Why are the changes needed?

#48483 previously tried to fix the potential stream leak issue caused by task interruption. It mitigates the issue by using

```scala
import java.io.Closeable

// From the earlier fix: close the resource if its initialization fails,
// then rethrow the failure.
def tryInitializeResource[R <: Closeable, T](createResource: => R)(initialize: R => T): T = {
  val resource = createResource
  try {
    initialize(resource)
  } catch {
    case e: Throwable =>
      resource.close()
      throw e
  }
}
```

But this utility function has an issue: `resource.close()` can still leak open resources if `initialize(resource)` also created resources internally, especially when `initialize(resource)` is interrupted (see the `InterruptionSensitiveInputStream` example in the test).
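
To make the failure mode concrete, here is a hypothetical stream in the spirit of the test's `InterruptionSensitiveInputStream` (the code below is an illustration, not the actual test):

```scala
import java.io.{Closeable, InputStream}

// If the task is interrupted after the inner stream is opened but before
// initialize() returns, tryInitializeResource closes only the outer
// wrapper, and the inner stream leaks.
class LeakyWrappedInputStream(openInner: () => InputStream)
  extends InputStream with Closeable {

  private var inner: InputStream = _

  def initialize(): Unit = {
    inner = openInner() // a second resource is created *inside* initialize
    if (Thread.interrupted()) {
      // The exception propagates; the caller closes the wrapper, not `inner`.
      throw new InterruptedException("task killed during initialize()")
    }
  }

  override def read(): Int = inner.read()

  // The wrapper's close() knows nothing about the half-initialized `inner`.
  override def close(): Unit = ()
}
```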

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a unit test.

Was this patch authored or co-authored using generative AI tooling?

No.


```scala
// Leave some time for the task to be interrupted during the
// creation of `InterruptionSensitiveInputStream`.
Thread.sleep(5000)
```
Member: How important is this sleep within the task? Could it potentially make the test flaky?

Contributor: Good point.

@Ngone51 (author): The sleep is necessary to ensure the task is interrupted during `InterruptionSensitiveInputStream#initialize()` so that we can test the leaked stream. Increasing the sleep time should make flakiness less likely.
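
As an aside, a hypothetical way to avoid the timing guess entirely would be to synchronize the test explicitly, e.g. with a `CountDownLatch` (this is not what the PR does; all names below are illustrative):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object DeterministicKillSignal {
  // The task blocks inside initialize() until the test signals that the
  // kill request has actually been issued, instead of sleeping.
  val killIssued = new CountDownLatch(1)

  // Task side, inside initialize(): bounded wait for the kill to be in flight.
  def waitForKill(): Unit = {
    killIssued.await(30, TimeUnit.SECONDS)
  }

  // Test side, invoked right after requesting the task kill.
  def signalKillIssued(): Unit = killIssued.countDown()
}
```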

core/src/main/scala/org/apache/spark/TaskContext.scala (outdated review thread, resolved)
The github-actions bot added the BUILD label on Jan 13, 2025.
@xuanyuanking (Member) left a comment: LGTM

@Ngone51 (author) commented Jan 15, 2025:

Could someone help merge this PR if we're good to proceed? Or can I merge it myself with enough approvals?

@LuciferYang (Contributor): @Ngone51 Does branch-3.5 also need this bug fix?

@Ngone51 (author) commented Jan 15, 2025:

@LuciferYang Thanks. Yes, I think so. Do I need to send a separate PR?

@LuciferYang (Contributor):

Merged into master for Spark 4.0. Thanks @Ngone51 @xuanyuanking and @HyukjinKwon.

@Ngone51 Due to code conflicts, it cannot be directly merged into branch-3.5. If needed, please submit a separate PR. Thanks ~

@Ngone51 (author) commented Jan 15, 2025:

Sure, I'll submit a separate PR for branch-3.5.

@Ngone51 (author) commented Jan 15, 2025:

Hi @LuciferYang, I found we didn't backport #48483, which addresses the same issue, to branch-3.5. Generally speaking, this is not a critical bug fix but only covers a corner case. Not sure if we want to backport it or not.

@HyukjinKwon Any idea about the backport policy for this kind of fix? Is it required or optional?

@HyukjinKwon (Member): Please go ahead. I think it's fine.

@dongjoon-hyun (Member):

Please hold off on backporting for now, @Ngone51.

This could introduce flakiness into the master branch, and branch-3.5 can wait because we don't have an Apache Spark 3.5.5 release schedule yet.

[screenshot: CI status, 2025-01-14]

In addition, if you can, please confirm that the failures on the current master branch are unrelated to this PR.

@dongjoon-hyun (Member):

For the record, after this PR, surprisingly, three consecutive commits failed at the same PySpark pipeline.

[screenshot: failing PySpark pipeline, 2025-01-14]

```scala
override private[spark] def interruptible(): Boolean = taskContext.interruptible()

override private[spark] def pendingInterrupt(threadToInterrupt: Option[Thread], reason: String)
    : Unit = {
```
@Ngone51 (author): I'll fix it with a follow-up. Thanks!

@Ngone51 (author): Fixed at #49508

```scala
}

override private[spark] def createResourceUninterruptibly[T <: Closeable](resourceBuilder: => T)
    : T = {
```

@HyukjinKwon (Member): I'm taking a look at the failure now.

@dongjoon-hyun (Member): Thank you so much! I looked, but didn't find any clear clue.

@LuciferYang (Contributor) commented Jan 15, 2025:

Seems unrelated; I tested reverting the current commit, but pyspark.conf still failed:

[screenshot: pyspark.conf test failure]

@HyukjinKwon (Member): Found the cause. Working on the fix.

@HyukjinKwon (Member): No, this PR is fine.

@LuciferYang (Contributor): @HyukjinKwon Are you saying this PR caused the test to fail?

@HyukjinKwon (Member): No, no, this PR is fine. I will make a PR with a full description within 20 minutes.

@HyukjinKwon (Member): Here: #49500

```scala
// Should be protected by `TaskContext.synchronized`.
private var pendingInterruptRequest: Option[(Option[Thread], String)] = None

// Whether this task is able to be interrupted. Should be protected by `TaskContext.synchronized`.
private var _interruptible: Boolean = true
```
Contributor: Or maybe mark it as @transient?

Contributor: Are you referring to @volatile?

@Ngone51 (author):

Since we already protect it with `TaskContext.synchronized`, `@volatile` would be redundant. And if we used `@volatile` alone, it would not be thread-safe, since two threads could still modify `_interruptible` concurrently.
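
To illustrate why `@volatile` alone would not be thread-safe here, consider this hypothetical check-then-act race (illustrative code, not Spark's):

```scala
object VolatileRaceSketch {
  // @volatile guarantees visibility, but the check-then-act in kill() is
  // not atomic: `interruptible` can flip to false between the check and
  // the interrupt, so the interrupt may still land mid-creation.
  @volatile private var interruptible = true

  def kill(taskThread: Thread): Unit = {
    if (interruptible) {     // check...
      taskThread.interrupt() // ...act, possibly after the flag has changed
    }
  }

  def createResource[T](builder: => T): T = {
    interruptible = false
    try builder finally { interruptible = true }
  }
}
```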


```scala
override def interruptible(): Boolean = TaskContext.synchronized(_interruptible)

override def pendingInterrupt(threadToInterrupt: Option[Thread], reason: String): Unit = {
```
Contributor: This API looks weird if `threadToInterrupt` is None, as there is then nothing to interrupt.

@Ngone51 (author):

When `threadToInterrupt = None`, it still "interrupts" the task by invoking `TaskContext.markInterrupted()`. It just won't invoke `Thread.interrupt()` on the task thread.
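
A self-contained sketch of that behavior (the class and field here are hypothetical stand-ins for the task's kill-reason bookkeeping):

```scala
class InterruptDeliverySketch {
  // Stand-in for the task's kill reason, set by markInterrupted().
  @volatile var reasonIfKilled: Option[String] = None

  def deliver(threadToInterrupt: Option[Thread], reason: String): Unit = synchronized {
    reasonIfKilled = Some(reason)            // the task is always marked as killed
    threadToInterrupt.foreach(_.interrupt()) // Thread.interrupt() only when a thread is given
  }
}
```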

```scala
}

TaskContext.synchronized {
  interruptIfRequired()
```
Contributor: Is this a safeguard for the case where a caller mistakenly calls `pendingInterrupt` even though the task is interruptible?

@Ngone51 (author):

No. This is for the case where resource creation happens after the task has already been marked as interrupted. In that case, `pendingInterruptRequest` is None and `reasonIfKilled` is not None, and `killTaskIfInterrupted()` throws a `TaskKilledException` to stop the task thread.
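
A minimal sketch of that path (illustrative; `TaskKilledException` exists in Spark, but the helper below is hypothetical):

```scala
object KillCheckSketch {
  final class TaskKilledSketchException(reason: String) extends RuntimeException(reason)

  // If the task was marked as killed before resource creation started,
  // there is no pending request to replay; the recorded reason aborts
  // the task thread instead.
  def killTaskIfInterrupted(reasonIfKilled: Option[String]): Unit = {
    reasonIfKilled.foreach { reason =>
      throw new TaskKilledSketchException(reason)
    }
  }
}
```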

@Ngone51 (author) commented Jan 15, 2025:

FYI, I created a follow-up PR (#49508) to use `TaskContext.createResourceUninterruptibly()` where it applies.

dongjoon-hyun pushed a commit that referenced this pull request Jan 16, 2025
…ninterruptibly() to risky resource creations

### What changes were proposed in this pull request?

This is a follow-up PR for #49413. It applies `TaskContext.createResourceUninterruptibly()` to resource creations that risk leaking resources when a task is cancelled.

### Why are the changes needed?

Avoid resource leak.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

n/a

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #49508 from Ngone51/SPARK-50768-followup.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun pushed a commit that referenced this pull request Jan 16, 2025
…ninterruptibly() to risky resource creations

(Commit message identical to the entry above.)

Closes #49508 from Ngone51/SPARK-50768-followup.

Authored-by: Yi Wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 9b32334)
Signed-off-by: Dongjoon Hyun <[email protected]>
nartal1 added a commit to NVIDIA/spark-rapids that referenced this pull request Jan 17, 2025
…atabricks] (#11972)

This fixes #11971 and unblocks the scala_2.13 nightly build jobs.

MockTaskContext extends TaskContext in the spark-rapids code. Recently, additional definitions were added to Spark 4.0's TaskContext. In this PR, we add the missing methods to the MockTaskContext.scala file. We do not mark these methods as overrides so that the file compiles on all Spark versions (a hedged sketch follows this commit message).

In Spark 4.0, these were added in TaskContext.scala (PR: apache/spark#49413):

```
private[spark] def interruptible(): Boolean
private[spark] def pendingInterrupt(threadToInterrupt: Option[Thread], reason: String): Unit
private[spark] def createResourceUninterruptibly[T <: Closeable](resourceBuilder: => T): T
```

Signed-off-by: Niranjan Artal <[email protected]>
Co-authored-by: Gera Shegalov <[email protected]>
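
A hedged sketch of what such non-override additions could look like (the method bodies, class placement, and package are assumptions for illustration, not the actual spark-rapids code):

```scala
package org.apache.spark // assumed: private[spark] members must live under this package

import java.io.Closeable

// The methods carry no `override` keyword: on older Spark versions they are
// plain new members, while on Spark 4.0 they implement the newly added
// abstract members of TaskContext (implementing an abstract member does not
// require `override` in Scala). This class deliberately does not extend
// TaskContext so the sketch stays self-contained.
class MockTaskContextSketch {
  private[spark] def interruptible(): Boolean = false

  private[spark] def pendingInterrupt(threadToInterrupt: Option[Thread], reason: String): Unit = {}

  private[spark] def createResourceUninterruptibly[T <: Closeable](resourceBuilder: => T): T =
    resourceBuilder
}
```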