Multiple semaphores render scheduler unresponsive and crash entire cluster #8999

Open
carusyte opened this issue Feb 1, 2025 · 0 comments
Describe the issue:
When multiple semaphores are passed as arguments to tasks, the CPU utilization of the scheduler process can keep growing because of an unidentified "leak", even if none of the semaphores are used inside the tasks. The scheduler eventually becomes unresponsive and the entire cluster fails as a consequence.

Minimal Complete Verifiable Example:

import os
os.environ["PYTHONUNBUFFERED"] = "1"

import time
import random
from datetime import datetime

from dask.distributed import (
    Client,
    LocalCluster,
    get_worker,
    wait,
    Semaphore,
)

# NOTE: adjust the number of workers as needed. More workers make the crash happen sooner.
n_workers = 16

def dummy_task(i, locks):
    print(
        f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")} worker#{get_worker().name} on tier2 task #{i}'
    )

    duration = random.uniform(3, 8)
    time.sleep(duration)

    return None


def main():
    cluster = LocalCluster(
        n_workers=n_workers,
        threads_per_worker=1,
        processes=True,
    )
    client = Client(cluster)

    locks = {}
    # NOTE: adjust the number of semaphores or max_leases as needed. Higher values make the crash happen sooner.
    for i in range(10):
        locks[f"lock{i}"] = Semaphore(max_leases=10, name=f"dummy_semaphore{i}")

    futures = []
    i = 0
    while True:
        futures.append(
            client.submit(
                dummy_task,
                i,
                locks,
            )
        )
        if len(futures) > n_workers * 2:
            _, undone = wait(futures, return_when="FIRST_COMPLETED")
            futures = list(undone)
        i += 1

if __name__ == "__main__":
    main()
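
To put numbers on the slowdown while the reproducer runs, one option is to time a cheap scheduler round trip at intervals. The following is only a minimal sketch and not part of the original report: `probe_latency` and `is_degrading` are hypothetical helper names, and using `client.scheduler_info` as the round trip is an assumption about what makes a representative probe.

```python
import time
from typing import Callable, List


def probe_latency(round_trip: Callable[[], object], samples: int = 5) -> List[float]:
    """Time `samples` calls of `round_trip` and return the durations in seconds."""
    durations = []
    for _ in range(samples):
        start = time.perf_counter()
        round_trip()  # e.g. client.scheduler_info (assumed to be a cheap call)
        durations.append(time.perf_counter() - start)
    return durations


def is_degrading(durations: List[float], factor: float = 2.0) -> bool:
    """Return True if the later half is, on average, `factor` times slower than the earlier half."""
    half = len(durations) // 2
    if half == 0:
        return False
    early = sum(durations[:half]) / half
    late = sum(durations[half:]) / (len(durations) - half)
    return late > factor * early
```

Inside the `while True` loop of `main()`, something like `history.extend(probe_latency(client.scheduler_info, samples=1))` every few iterations, followed by `is_degrading(history)`, would show whether round trips keep getting slower. Note that the probe itself blocks once the scheduler stops responding.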

Anything else we need to know?:
In my real-world workload, I consistently end up hitting one of the following errors:

Futures cancelled for reason: scheduler-connection-lost. 
Message: Client lost the connection to the scheduler. Please check your connection and re-run your work. 

or

distributed.core - INFO - Event loop was unresponsive in Nanny for 4.41s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
distributed.core - INFO - Event loop was unresponsive in Scheduler for 14.70s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.
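
To track how these stalls grow over time, the warnings can be extracted from captured log output. This is a rough sketch that assumes the message format matches the two lines quoted above; `parse_unresponsive` is a hypothetical helper name, not part of `distributed`.

```python
import re
from typing import Iterable, List, Tuple

# Matches e.g. "Event loop was unresponsive in Scheduler for 14.70s."
# The pattern is derived from the log lines quoted in this report.
UNRESPONSIVE = re.compile(
    r"Event loop was unresponsive in (?P<component>\w+) for (?P<seconds>\d+(?:\.\d+)?)s"
)


def parse_unresponsive(lines: Iterable[str]) -> List[Tuple[str, float]]:
    """Return (component, stall duration in seconds) for each matching log line."""
    stalls = []
    for line in lines:
        match = UNRESPONSIVE.search(line)
        if match:
            stalls.append((match.group("component"), float(match.group("seconds"))))
    return stalls
```

Filtering the result for the `Scheduler` component and plotting the durations against line order would show whether the stalls keep lengthening as more tasks are submitted.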

A py-spy flamegraph also shows that the scheduler process takes longer and longer to run create_task and add_future:

[Image: py-spy flamegraph of the scheduler process]

I found some possibly related issues and discussions, but I'm not sure whether they involve semaphores or locks.

Environment:

  • Dask version: 2025.1.0
  • Python version: 3.12.8 (same issue in 3.11)
  • Operating System: Ubuntu 22.04
  • Install method (conda, pip, source): conda