
ExecutorReservation during task scheduling #119

Open · mingmwang opened this issue Aug 8, 2022 · 2 comments
Labels: enhancement (New feature or request)

mingmwang (Contributor) commented on Aug 8, 2022:

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

```rust
/// Represents a task slot that is reserved (i.e. available for scheduling but not visible to the
/// rest of the system).
/// When tasks finish we want to preferentially assign new tasks from the same job, so the reservation
/// can already be assigned to a particular job ID. In that case, the scheduler will try to schedule
/// available tasks for that job to the reserved task slot.
#[derive(Clone, Debug)]
pub struct ExecutorReservation {
    pub executor_id: String,
    pub job_id: Option<String>,
}
```

Instead of making a reservation, cancelling it later, and depending on the global lock, can we leverage the compare-and-swap operations provided by sled/etcd?
Since we need to update the available task slots for each executor atomically, I think compare-and-swap operations can satisfy that need perfectly. And if we want to switch to an in-memory state store, we can leverage Rust's atomic integers to provide the same atomicity. Then the assignment logic can be simple.
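
For the in-memory case, here is a minimal sketch of what that could look like using Rust's standard atomics. The `ExecutorSlots` type and its methods are hypothetical, for illustration only, not existing Ballista code:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical per-executor slot counter for an in-memory state store.
pub struct ExecutorSlots {
    available: AtomicUsize,
}

impl ExecutorSlots {
    /// Atomically claim up to `want` slots via a compare-and-swap loop,
    /// returning how many slots were actually claimed.
    pub fn try_claim(&self, want: usize) -> usize {
        let mut claimed = 0;
        // `fetch_update` retries the CAS until it succeeds, so no lock is needed
        // even when multiple schedulers race to claim slots on the same executor.
        let _ = self
            .available
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                claimed = current.min(want);
                Some(current - claimed)
            });
        claimed
    }

    /// Return slots to the pool when tasks finish.
    pub fn release(&self, n: usize) {
        self.available.fetch_add(n, Ordering::SeqCst);
    }
}
```

A scheduler could then call `try_claim` per executor when assigning tasks and `release` when tasks complete, with no global lock in the hot path.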

And I think the statement "When tasks finish we want to preferentially assign new tasks from the same job" is not true.
In a system which has multiple SQL jobs running concurrently (multiple jobs competing for resources), we should support FAIR or FIFO scheduling. If we always prefer to assign new tasks from the same job, it could starve other jobs.


mingmwang added the enhancement label on Aug 8, 2022

mingmwang (Contributor, Author) commented:

@thinkharderdev

thinkharderdev (Contributor) commented:

> Since we need to update the available task slots for each executor atomically, I think compare-and-swap operations can satisfy that need perfectly.

This is a bit problematic with etcd since it doesn't have a CAS primitive. Instead, you have transactions which can be used to build a CAS operation, but it also doesn't have an increment/decrement primitive, so you would effectively have to do multiple transactions to do a CAS.
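
To make that concrete, here is a rough sketch of a CAS-style slot decrement built from a read followed by a conditional transaction, written against the `etcd-client` crate. The key layout under `/ballista/slots/` and the string encoding of the counter are assumptions for illustration:

```rust
use etcd_client::{Client, Compare, CompareOp, Txn, TxnOp};

/// Sketch: atomically decrement an executor's slot count in etcd.
/// Reads the current value, then commits a transaction that only succeeds
/// if the value is unchanged; retries on conflict.
async fn cas_decrement_slots(
    client: &mut Client,
    executor_id: &str,
) -> Result<bool, etcd_client::Error> {
    let key = format!("/ballista/slots/{executor_id}");
    loop {
        let resp = client.get(key.as_str(), None).await?;
        let Some(kv) = resp.kvs().first() else { return Ok(false) };
        let current: u64 = kv.value_str()?.parse().unwrap_or(0);
        if current == 0 {
            return Ok(false); // no slots available to claim
        }
        let txn = Txn::new()
            .when(vec![Compare::value(key.as_str(), CompareOp::Equal, kv.value())])
            .and_then(vec![TxnOp::put(key.as_str(), (current - 1).to_string(), None)]);
        if client.txn(txn).await?.succeeded() {
            return Ok(true); // CAS succeeded
        }
        // Another scheduler updated the key between our read and commit; retry.
    }
}
```

Each failed attempt costs another read-plus-transaction round trip, which is exactly the "multiple transactions" overhead described above.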

> And I think the statement "When tasks finish we want to preferentially assign new tasks from the same job" is not true.
> In a system which has multiple SQL jobs running concurrently (multiple jobs competing for resources), we should support FAIR or FIFO scheduling. If we always prefer to assign new tasks from the same job, it could starve other jobs.

I agree that what we really need is a priority assigned at the job level. The current approach is mostly a hack until we have that in place. There are two reasons for the current approach:

  1. Without some sort of priority queue for jobs, we want to avoid a "live lock" situation where the system is handling more queries than it has capacity for and no query completes because the scheduler is just randomly assigning tasks. Trying to schedule tasks from the same job at least ensures that a job, once started, will complete.
  2. Ideally we want to take data locality into account. If we can schedule tasks on the same executor where their input partitions were written, then we can avoid the network overhead of reading shuffles. The current approach doesn't do that exactly, but it makes it more probable, and I think ultimately we should be doing it explicitly.

Ultimately, though, I think the direction this all points in is to separate concerns a bit in the scheduler tier. The schedulers should elect a leader which is responsible for task scheduling (and can use in-memory data structures, as you mention), while the followers handle planning. That will add a lot of complexity to the implementation, but it should remove basically all of the need for locks on shared state.
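
As a sketch of that direction, the election itself could be delegated to etcd; something like the following, assuming the `etcd-client` crate (the election name, TTL, and surrounding structure are all illustrative, not a proposed implementation):

```rust
use etcd_client::Client;

/// Hypothetical sketch: schedulers campaign for leadership through etcd.
/// The winner runs the task-scheduling loop against in-memory state;
/// the others would keep serving planning requests while they wait.
async fn run_scheduler(scheduler_id: &str) -> Result<(), etcd_client::Error> {
    let mut client = Client::connect(["localhost:2379"], None).await?;
    // Leadership is tied to a lease; it must be kept alive (e.g. via
    // `lease_keep_alive`) or the leader is deposed once the TTL expires.
    let lease_id = client.lease_grant(10, None).await?.id();
    // Blocks until this scheduler wins the election.
    client
        .campaign("/ballista/scheduler-leader", scheduler_id, lease_id)
        .await?;
    // From here on, this instance is the single task-scheduling leader
    // and can assign tasks from in-memory state without distributed locks.
    Ok(())
}
```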
