Avoid handling stale long-running messages on scheduler #8991
Conversation
if worker not in self.workers:
    logger.debug(
        "Received long-running signal from unknown worker %s. Ignoring.", worker
    )
    return
This is mostly for good measure; I think the code should also work without it.
steal = self.extensions.get("stealing")
if steal is not None:
    steal.remove_key_from_stealable(ts)
I haven't tested the move of this code, but I'm certain that we should deal with staleness before taking any meaningful actions.
yes, absolutely
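For illustration, here is a rough, self-contained sketch of the ordering being discussed. The checks and the stealing call are taken from the diff above, but the function name, signature, and the scheduler parameter are simplified stand-ins, not the actual patch:

import logging

logger = logging.getLogger("distributed.scheduler")

def handle_long_running_sketch(scheduler, key, worker):
    # Simplified sketch: perform every staleness check before mutating state.
    # 1. Message from a worker the scheduler no longer knows about: stale.
    if worker not in scheduler.workers:
        logger.debug(
            "Received long-running signal from unknown worker %s. Ignoring.", worker
        )
        return
    # 2. Task is gone or not processing anywhere: stale/duplicate message.
    ts = scheduler.tasks.get(key)
    if ts is None or ts.processing_on is None:
        logger.debug("Received long-running signal from duplicate task. Ignoring.")
        return
    # 3. Task is processing, but on a different worker: the message is stale.
    if ts.processing_on.address != worker:
        return
    # 4. Only now take meaningful actions, e.g. drop the key from stealing.
    steal = scheduler.extensions.get("stealing")
    if steal is not None:
        steal.remove_key_from_stealable(ts)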
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
27 files ±0, 27 suites ±0, 11h 37m 24s ⏱️ +7m 42s
For more details on these failures, see this check.
Results for commit 88c42f7. ± Comparison against base commit 49f5e74.
ws = ts.processing_on
if ws is None:
    logger.debug("Received long-running signal from duplicate task. Ignoring.")
    return

if ws.address != worker:
Ideally, there would be a more reliable way to verify the request's integrity.
A chain like
processing -> long running -> released -> processing (without a long-running transition)
that happens on the same worker would still cause a stale event to be accepted as valid. However, I doubt this is a relevant scenario in practice.
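One hypothetical way to catch such a chain would be to tag every execution attempt with an identifier and have the worker echo it back. None of these names exist in the codebase; this is only a sketch of the idea:

def handle_long_running_with_attempt(scheduler, key, worker, attempt_id):
    # attempt_id / current_attempt_id are made-up names for illustration.
    ts = scheduler.tasks.get(key)
    if ts is None or ts.processing_on is None or ts.processing_on.address != worker:
        return  # already caught by the existing address-based checks
    if attempt_id != ts.current_attempt_id:
        # Same worker, but the task has been released and rescheduled since the
        # message was sent: processing -> long running -> released -> processing
        # would be detected here, unlike with the address comparison alone.
        return
    ...  # proceed as usual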
# Assert that the handler did not fail and no state was corrupted
logs = caplog.getvalue()
assert not logs
assert not wsB.task_prefix_count
I would prefer a test that does not rely on logging. Is this corruption detectable with validate? (If not, can it be made detectable?)
Good point, let me check.
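A minimal sketch of what a log-free assertion block could look like, assuming the scheduler fixture is named s (as is conventional in gen_cluster tests) and that validate_state would trip on the corrupted bookkeeping:

# Hypothetical replacement for the log-based assertions: rely on the
# scheduler's own invariant checks plus an explicit state assertion.
s.validate_state()                 # raises if internal bookkeeping is corrupted
assert not wsB.task_prefix_count   # worker b never accounted for the stale task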
# Submit task and wait until it executes on a
x = c.submit(
    f,
    block_secede,
    block_long_running,
    key="x",
    workers=[a.address],
)
await wait_for_state("x", "executing", a)

with captured_logger("distributed.scheduler", logging.ERROR) as caplog:
    with freeze_batched_send(a.batched_stream):
For review (and future maintainability), it might be helpful to briefly document, in a sentence or two, what the code below constructs and asserts.
key="x", | ||
workers=[a.address], | ||
) | ||
await wait_for_state("x", "executing", a) |
Since you're already dealing with so many events above, why not use an event for this as well? Is it important to interrupt as soon as the task is in this state, i.e., before it's executed on the TPE?
Works for me; no strong preference between polling and adding yet another event. I felt that this was a bit easier to read, but I guess YMMV.
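For comparison, a sketch of the event-based variant; block_started is a made-up name, and the rest mirrors the test above. Note the semantics differ slightly: the event only fires once the function body actually runs on the thread pool, whereas wait_for_state returns as soon as the worker marks the task as executing.

block_started = Event()

def f(block_started, block_secede, block_long_running):
    block_started.set()        # set as soon as f actually runs on the TPE
    block_secede.wait()
    distributed.secede()
    block_long_running.wait()

x = c.submit(
    f, block_started, block_secede, block_long_running,
    key="x", workers=[a.address],
)
await block_started.wait()     # instead of: await wait_for_state("x", "executing", a)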
def f(block_secede, block_long_running):
    block_secede.wait()
    distributed.secede()
Does this also trigger when using worker_client? The secede API is one I typically discourage using, mostly because its counterpart rejoin is quite broken.
I strongly suspect it does. The original workload where this popped up had many clients connected to the scheduler.
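For reference, a sketch of the same blocked task written with worker_client, which (with its default separate_thread=True) calls secede on entry and rejoin on exit, so the scheduler should see the same long-running signal:

from distributed import worker_client

def f(block_secede, block_long_running):
    block_secede.wait()
    with worker_client():          # secedes here, long-running signal is sent
        block_long_running.wait()
    # rejoin happens when the context manager exits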
LGTM. If we can rewrite the test to use validate (or extend validate), that'd be great, but it's not a blocker.
pre-commit run --all-files