Improve parallelism of mapParUnordered.
Previously, we would find all non-pending promises and yield all
of them. But if there was a pause while yielding them, time could
pass with (possibly many) fewer than `max` items in progress.

Now we only yield one "done" item at a time, and immediately
queue another to take its place.
Cody Casterline committed Apr 25, 2023
1 parent 86fae0f commit a4b0581
1 changed file: mod.ts (20 additions, 12 deletions)
@@ -604,33 +604,41 @@ export class LazyAsync<T> implements AsyncIterable<T>, LazyShared<T> {
      * suffer from head-of-line blocking.
      */
     mapParUnordered<Out>(max: number, transform: Transform<T, Promise<Out>>): LazyAsync<Out> {
         if (max <= 0) { throw new Error("max must be > 0")}
         let inner = this.#inner
         let gen = async function*() {
 
             let pending: StatefulPromise<Out>[] = []
+            let done: StatefulPromise<Out>[] = []
 
             // Get a list of (at least one) "done" task.
-            let getDone = async () => {
+            let repartition = async () => {
+                if (pending.length == 0 || done.length > 0) {
+                    // nothing to do.
+                    return
+                }
+
                 await Promise.race(pending)
-                let done = pending.filter(it => it.state != "pending")
-                pending = pending.filter(it => it.state === "pending")
-                return done
+                let values = lazy(pending)
+                    .partition(it => it.state === "pending")
+                pending = values.matches
+                done = values.others
             }
 
             for await (let item of inner) {
                 pending.push(stateful(transform(item)))
 
-                while (pending.length >= max) {
-                    let done = await getDone()
-                    for (let d of done) {
-                        yield await d
-                    }
+                while (pending.length + done.length >= max) {
+                    await repartition()
+                    yield done.pop()!
                 }
             }
-            while (pending.length > 0) {
-                let done = await getDone()
+            while (pending.length + done.length > 0) {
+                await repartition()
                 for (let d of done) {
-                    yield await d
+                    yield d
                 }
+                done = []
             }
         }
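To illustrate the effect of the change above, here is a minimal, self-contained sketch of the same "yield one finished item, then immediately start another" scheduling, followed by a small usage example. This is not the library's code: the names sketchMapParUnordered, Tracked, track, and fakeFetch are made up for this illustration, error handling is omitted, and the real implementation uses the module's own stateful() and lazy().partition() helpers as shown in the diff.

    // Simplified stand-in for the module's StatefulPromise: a promise plus a
    // synchronously readable "settled?" flag. (Hypothetical helper, not mod.ts code.)
    type Tracked<T> = { promise: Promise<T>, settled: boolean, value?: T }

    function track<T>(promise: Promise<T>): Tracked<T> {
        const t: Tracked<T> = { promise, settled: false }
        // This sketch assumes `promise` never rejects; error handling is omitted.
        promise.then(v => { t.settled = true; t.value = v })
        return t
    }

    async function* sketchMapParUnordered<In, Out>(
        source: Iterable<In> | AsyncIterable<In>,
        max: number,
        transform: (item: In) => Promise<Out>,
    ): AsyncGenerator<Out> {
        if (max <= 0) { throw new Error("max must be > 0") }
        let pending: Tracked<Out>[] = []
        let done: Tracked<Out>[] = []

        // Like `repartition()` in the diff: wait until at least one pending task
        // settles, then move every settled task from `pending` to `done`.
        const repartition = async () => {
            if (pending.length == 0 || done.length > 0) { return }
            await Promise.race(pending.map(p => p.promise))
            done = pending.filter(p => p.settled)
            pending = pending.filter(p => !p.settled)
        }

        for await (const item of source) {
            pending.push(track(transform(item)))
            // Yield only ONE finished result per iteration. As soon as the consumer
            // takes it, the outer `for await` starts the next task, so the pool stays
            // near `max` even if the consumer pauses between results.
            while (pending.length + done.length >= max) {
                await repartition()
                yield done.pop()!.value as Out
            }
        }
        // The source is exhausted: drain whatever is still in flight.
        while (pending.length + done.length > 0) {
            await repartition()
            for (const d of done) { yield d.value as Out }
            done = []
        }
    }

    // Usage: 10 slow "fetches", at most 3 in flight, with a deliberately slow consumer.
    const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms))
    let inFlight = 0

    async function fakeFetch(n: number): Promise<number> {
        inFlight++
        await sleep(50 + Math.random() * 50)
        inFlight--
        return n * n
    }

    for await (const result of sketchMapParUnordered([...Array(10).keys()], 3, fakeFetch)) {
        console.log(`got ${result}, currently in flight: ${inFlight}`)
        await sleep(100)  // slow consumer
    }

With the previous behavior, every finished result that had accumulated was yielded before any replacement work started, so a consumer that paused between results could leave the pool far below `max`; yielding one result at a time keeps the refill step as close as possible to each consumed result.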