Skip to content

Commit

Permalink
Nicer syntax for parallel mapping.
Browse files Browse the repository at this point in the history
  • Loading branch information
NfNitLoop committed Aug 3, 2023
1 parent cc65750 commit 05c7ec4
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 37 deletions.
112 changes: 92 additions & 20 deletions mod.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/**
*
* Better Iterators
* ================
*
Expand Down Expand Up @@ -84,7 +83,7 @@
* You can convert a `Lazy` to a `LazyAsync`:
*
* ```ts
* import { lazy, range } from "./mod.ts"
* import { lazy } from "./mod.ts"
* let urls = [
* "https://www.example.com/foo",
* "https://www.example.com/bar"
Expand All @@ -103,9 +102,24 @@
* Here, {@link LazyAsync#map} does the least surprising thing and does not
* introduce parallelism implicitly. You've still got serial lazy iteration.
*
* If you DO want parallelism, the {@link LazyAsync#mapPar} and
* {@link LazyAsync#mapParUnordered} methods let you explicitly opt in at your
* chosen level of parallelism.
* If you DO want parallelism, you can explicitly opt in like this:
*
* ```ts
* import { lazy } from "./mod.ts"
* let urls = [
* "https://www.example.com/foo",
* "https://www.example.com/bar"
* ]
* let lazySizes = lazy(urls)
* .map({
* parallel: 5,
* async mapper(url) {
* let response = await fetch(url)
* return await response.text()
* }
* })
* // etc.
* ```
*
* Functional Idioms
* -----------------
Expand Down Expand Up @@ -142,12 +156,13 @@ export function lazy<T>(iter: Iterable<T>|AsyncIterable<T>): Lazy<T>|LazyAsync<T
* (Note: `LazyAsync` implements some methods that are not shared.)
*
* You shouldn't need to interact with these classes or this interface
* directly, though. You can convert to the appropriate one with {@link lazy}.
* directly, though. You can use {@link lazy} to automatically instantiate the
* correct one.
*
* Operations on lazy iterators consume the underlying iterator. You should not
* use them again.
*/
interface LazyShared<T> {
export interface LazyShared<T> {

/**
* Apply `transform` to each element.
Expand All @@ -156,6 +171,11 @@ interface LazyShared<T> {
*/
map<Out>(transform: Transform<T, Out>): LazyShared<Out>

/**
* Asynchronously transform elements in parallel.
*/
map<Out>(options: ParallelMapOptions<T, Out>): LazyAsync<Out>

/** Keeps only items for which `f` is `true`. */
filter(f: Filter<T>): LazyShared<T>
/** Overload to support type narrowing. */
Expand Down Expand Up @@ -280,6 +300,34 @@ interface LazyShared<T> {
loop(): LazyShared<T>
}

/**
* Options for the parallel form of `map()`.
*
* Passing this object to `map()` (instead of a plain transform function)
* indicates that you want to map values asynchronously and in parallel.
* `map()` recognizes this form by the presence of the `parallel` property.
*
* `T` is the type of the input elements; `Out` is the type that the
* promise returned by `mapper` resolves to.
*/
export interface ParallelMapOptions<T, Out> {
/**
* The maximum number of map functions to run in parallel.
* This gives you bounded parallelism so that you don't overwhelm
* whatever resource you're mapping with. (ex: fetch, exec, write, etc.)
*
* Use a positive number: with `ordered: false`, a value `<= 0` throws
* `Error("max must be > 0")`.
*/
parallel: number

/**
* The async mapping function to run in parallel.
* It is called with one input element and must return a promise of the
* mapped value.
*/
mapper: (t: T) => Promise<Out>

/**
* Should we maintain the input ordering in the output?
*
* This is the least-surprising behavior, but it comes at the cost of
* potential head-of-line blocking. Set this to `false` to give up the
* ordering guarantee and avoid that blocking.
*
* Default: true (omitting this property keeps the output ordered).
*/
ordered?: boolean
}

export class Lazy<T> implements Iterable<T>, LazyShared<T> {

/** Prefer to use the {@link lazy} function. */
Expand Down Expand Up @@ -312,7 +360,16 @@ export class Lazy<T> implements Iterable<T>, LazyShared<T> {
*
* Works like {@link Array#map}.
*/
map<Out>(transform: Transform<T, Out>): Lazy<Out> {
map<Out>(transform: Transform<T, Out>): Lazy<Out>;
/**
 * Parallel form: converts this iterator to its async counterpart and lets
 * that perform the parallel mapping described by `options`.
 */
map<Out>(options: ParallelMapOptions<T,Out>): LazyAsync<Out>;
map<Out>(arg: Transform<T,Out>|ParallelMapOptions<T,Out>): Lazy<Out>|LazyAsync<Out> {
    // A plain transform has no `parallel` key: keep the mapping synchronous.
    if (!("parallel" in arg)) {
        return this.#simpleMap(arg)
    }

    // Otherwise `arg` is an options object; parallel mapping lives on the
    // async variant, so hand the options over to it unchanged.
    return this.toAsync().map(arg)
}

#simpleMap<Out>(transform: Transform<T, Out>): Lazy<Out> {
let inner = this.#inner
let transformIter = function*() {
for (let item of inner) {
Expand Down Expand Up @@ -647,7 +704,21 @@ export class LazyAsync<T> implements AsyncIterable<T>, LazyShared<T> {
*
* Works like {@link Array#map}.
*/
map<Out>(transform: Transform<T, Awaitable<Out>>): LazyAsync<Out> {
map<Out>(transform: Transform<T, Awaitable<Out>>): LazyAsync<Out>;
/**
 * Parallel form: runs up to `options.parallel` calls of `options.mapper`
 * at once. Output order matches input order unless `options.ordered` is
 * explicitly `false`.
 */
map<Out>(options: ParallelMapOptions<T, Out>): LazyAsync<Out>;
map<Out>(arg: Transform<T, Awaitable<Out>>|ParallelMapOptions<T,Out>): LazyAsync<Out> {
    // An options object is recognized by its `parallel` key; anything else
    // is treated as a plain transform function and mapped serially.
    if ("parallel" in arg) {
        const { ordered, parallel, mapper } = arg
        // `ordered` defaults to true. Strict comparison ensures that only an
        // explicit boolean `false` opts out of ordering (loose `==` would
        // also match values such as `0` or `""` coming from untyped callers).
        if (ordered === false) {
            return this.#mapParUnordered(parallel, mapper)
        }
        return this.#mapPar(parallel, mapper)
    }

    return this.#simpleMap(arg)
}

#simpleMap<Out>(transform: Transform<T, Awaitable<Out>>): LazyAsync<Out> {
let inner = this.#inner
let gen = async function*() {
for await (let item of inner) {
Expand All @@ -658,17 +729,14 @@ export class LazyAsync<T> implements AsyncIterable<T>, LazyShared<T> {
}

/**
*
* Like {@link map}, but performs up to `max` operations in parallel.
*
* Note: This function will ensure that the order of outputs is the same
* as the order of the inputs they were mapped from. This can introduce
* head-of-line blocking, which can slow performance. If you don't need this,
* use {@link mapParUnordered} instead.
*
* @deprecated Use {@link map} with {@link ParallelMapOptions} instead.
*/
mapPar<Out>(max: number, transform: Transform<T, Promise<Out>>): LazyAsync<Out> {
    // Deprecated public entry point kept for backward compatibility; all of
    // the work is done by the private #mapPar helper, so no local state is
    // needed here.
    return this.#mapPar(max, transform)
}

#mapPar<Out>(max: number, transform: Transform<T, Promise<Out>>): LazyAsync<Out> {
let inner = this.#inner
let gen = async function*() {

let pending = new Queue<Promise<Out>>()
Expand All @@ -688,10 +756,14 @@ export class LazyAsync<T> implements AsyncIterable<T>, LazyShared<T> {
}

/**
* A version of {@link mapPar} that does *not* enforce ordering, so doesn't
* suffer from head-of-line blocking.
* @deprecated Use {@link map} with {@link ParallelMapOptions}
* and `ordered: false`
*/
mapParUnordered<Out>(max: number, transform: Transform<T, Promise<Out>>): LazyAsync<Out> {
    // Thin public wrapper: forwards both arguments untouched to the private
    // #mapParUnordered implementation and returns whatever it produces.
    const unorderedResults = this.#mapParUnordered(max, transform)
    return unorderedResults
}

#mapParUnordered<Out>(max: number, transform: Transform<T, Promise<Out>>): LazyAsync<Out> {
if (max <= 0) { throw new Error("max must be > 0")}
let inner = this.#inner
let gen = async function*() {
Expand Down
4 changes: 2 additions & 2 deletions tests/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ export { assert, assertEquals, assertIsError, assertThrows } from "https://deno.
export { delay } from "https://deno.land/[email protected]/async/delay.ts";


import { lazy, Lazy, LazyAsync } from "../mod.ts";
import { lazy, LazyShared } from "../mod.ts";

export class ParallelTracker {
count = 0
Expand Down Expand Up @@ -46,7 +46,7 @@ export class Timer {
}


export async function testBoth<T>(t: Deno.TestContext, data: Iterable<T> | (() => Iterable<T>), innerTest: (iter: Lazy<T>|LazyAsync<T>) => Promise<unknown>) {
export async function testBoth<T>(t: Deno.TestContext, data: Iterable<T> | (() => Iterable<T>), innerTest: (iter: LazyShared<T>) => Promise<unknown>) {
let input: () => Iterable<T>
if (Symbol.iterator in data) {
const inputValues = [...data]
Expand Down
42 changes: 27 additions & 15 deletions tests/main_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ Deno.test(async function asyncViaLazyAsync() {
Deno.test(async function asyncLazyParallel() {
let zoomie = range({to: 100})
.toAsync()
.mapPar(1000, async (input) => {
await delay(50)
return input*input
.map({
parallel: 1000,
async mapper(input) {
await delay(50)
return input*input
},
})

let timer = new Timer()
Expand All @@ -117,12 +120,15 @@ Deno.test(async function asyncLazyMaxParallelism() {
let tracker = new ParallelTracker()
let zoomie = range({to: 100})
.toAsync()
.mapPar(maxParallel, async (input) => {
tracker.start()
// trying to control for parallelism for async tasks:
await delay(5)
tracker.end()
return input*input
.map({
parallel: maxParallel,
async mapper(input) {
tracker.start()
// trying to control for parallelism for async tasks:
await delay(5)
tracker.end()
return input*input
},
})

// No iteration yet:
Expand All @@ -148,12 +154,16 @@ Deno.test(async function unorderedParallelism() {
let tracker = new ParallelTracker()
let zoomie = range({to: 10})
.toAsync()
.mapParUnordered(maxParallel, async (input) => {
tracker.start()
// trying to control for parallelism for async tasks:
await delay(Math.random() * 50)
tracker.end()
return input*input
.map({
parallel: maxParallel,
ordered: false,
async mapper (input) {
tracker.start()
// trying to control for parallelism for async tasks:
await delay(Math.random() * 50)
tracker.end()
return input*input
},
})

// No iteration yet:
Expand Down Expand Up @@ -209,6 +219,8 @@ Deno.test(function generatorState() {
Deno.test(async function lazyConsumed(t: Deno.TestContext) {
// deno-lint-ignore require-await
await testBoth(t, range({to: 10}), async (iter) => {


// should work OK:
iter.map(it => it * it)

Expand Down

0 comments on commit 05c7ec4

Please sign in to comment.