From ac4b92781f0ff16789a9d034562d9aae5859a272 Mon Sep 17 00:00:00 2001 From: hw Date: Sun, 10 Mar 2024 17:22:00 +0900 Subject: [PATCH] feat: async fork --- src/Lazy/fork.ts | 167 +++++++++++++++--- src/dataStructure/linkedList/linkedList.ts | 40 ++++- .../linkedList/linkedListNode.ts | 14 +- test/Lazy/fork.spec.ts | 141 ++++++++++++++- 4 files changed, 325 insertions(+), 37 deletions(-) diff --git a/src/Lazy/fork.ts b/src/Lazy/fork.ts index be81d504..609a3e1d 100644 --- a/src/Lazy/fork.ts +++ b/src/Lazy/fork.ts @@ -11,47 +11,160 @@ type ReturnForkType | AsyncIterable> = type Value = any; -const forkMap = new WeakMap< - Iterator, - LinkedList> ->(); +type ForkItem = { + queue: LinkedList>; + originNext: () => IteratorResult; +}; + +const forkMap = new WeakMap, ForkItem>(); function sync(iterable: Iterable) { const iterator = iterable[Symbol.iterator](); - let queue = forkMap.get(iterator) as LinkedList>; - if (!queue) { - queue = new LinkedList(); - forkMap.set(iterator, queue); - } - let current: LinkedListNode> | null = queue.getTail(); - let done = false; + const getNext = (forkItem: ForkItem) => { + let current: LinkedListNode> | null = + forkItem.queue.getLastNode(); - return { - [Symbol.iterator]() { - return iterator; - }, + const done = () => { + iterator.next = forkItem.originNext; + + return { + done: true, + value: undefined, + } as const; + }; - next() { - if (done) { - return { - done, - value: undefined, - }; + let isDone = false; + const next = () => { + if (isDone) { + return done(); } const item = current?.getNext(); - if (isNil(item)) { - const node = iterator.next(); - current = queue.insertLast(node); - done = node.done ?? true; + + if (isNil(item) || item === forkItem.queue.getTail()) { + const node = forkItem.originNext(); + + current = forkItem.queue.insertLast(node); + isDone = node.done ?? true; + if (isDone) { + return done(); + } return node; } current = item; return current.getValue(); + }; + + return next; + }; + + let forkItem = forkMap.get(iterator) as ForkItem; + + if (!forkItem) { + const originNext = iterator.next.bind(iterator); + forkItem = { + queue: new LinkedList(), + originNext: originNext, + }; + + iterator.next = getNext(forkItem); + forkMap.set(iterator, forkItem); + } + + const next = getNext(forkItem); + + return { + [Symbol.iterator]() { + return this; + }, + + next: next, + }; +} + +type ForkAsyncItem = { + queue: LinkedList>; + next: (...args: any) => Promise>; +}; + +const forkAsyncMap = new WeakMap, ForkAsyncItem>(); + +function async(iterable: AsyncIterable) { + const iterator = iterable[Symbol.asyncIterator](); + + const getNext = (forkItem: ForkAsyncItem) => { + let current: Promise> | null> = + Promise.resolve(forkItem.queue.getLastNode()); + + const done = () => { + iterator.next = forkItem.next; + + return { + done: true, + value: undefined, + } as const; + }; + + let isDone = false; + const next = async (_concurrent: any) => { + if (isDone) { + return done(); + } + + const itemCurrent = await current; + const item = itemCurrent?.getNext(); + + return new Promise((resolve, reject) => { + if (isNil(item) || item === forkItem.queue.getTail()) { + return forkItem + .next(_concurrent) + .then((node) => { + current = current.then(() => { + return forkItem.queue.insertLast(node); + }); + + isDone = node.done ?? true; + if (isDone) { + return resolve(done()); + } + + return resolve(node); + }) + .catch(reject); + } + + current = current.then(() => { + return item; + }); + + resolve(item.getValue()); + }); + }; + + return next; + }; + + let forkItem = forkAsyncMap.get(iterator) as ForkAsyncItem; + if (!forkItem) { + const originNext = iterator.next.bind(iterator); + forkItem = { + queue: new LinkedList(), + next: originNext, + }; + + iterator.next = getNext(forkItem) as any; + forkAsyncMap.set(iterator, forkItem); + } + + const next = getNext(forkItem); + return { + [Symbol.asyncIterator]() { + return this; }, + next: next, }; } @@ -101,11 +214,11 @@ function fork | AsyncIterable>( iterable: A, ): ReturnForkType { if (isIterable(iterable)) { - return sync(iterable) as ReturnForkType; + return sync(iterable) as any; } if (isAsyncIterable(iterable)) { - throw new TypeError("'fork' asyncIterable isn't supported not yet"); + return async(iterable) as any; } throw new TypeError("'iterable' must be type of Iterable or AsyncIterable"); diff --git a/src/dataStructure/linkedList/linkedList.ts b/src/dataStructure/linkedList/linkedList.ts index e371ff6c..59e96264 100644 --- a/src/dataStructure/linkedList/linkedList.ts +++ b/src/dataStructure/linkedList/linkedList.ts @@ -6,16 +6,29 @@ export class LinkedList { constructor() { this.head = new LinkedListNode(null as unknown as T); - this.tail = this.head; + this.tail = new LinkedListNode(null as unknown as T); + + this.head.setNextNode(this.tail); + this.tail.setPrevNode(this.head); } insertFirst(value: T): LinkedListNode { const node = new LinkedListNode(value); if (this.isEmpty()) { - this.tail = node; + this.tail.setPrevNode(node); this.head.setNextNode(node); + + node.setNextNode(this.tail); + node.setPrevNode(this.head); } else { - node.setNextNode(this.head.getNext()); + const firstNode = this.head.getNext(); + if (!firstNode) { + throw new TypeError("firstNode must be a LinkedListNode"); + } + + node.setPrevNode(this.head); + node.setNextNode(firstNode); + firstNode.setPrevNode(node); this.head.setNextNode(node); } @@ -28,8 +41,17 @@ export class LinkedList { } const node = new LinkedListNode(value); - this.tail?.setNextNode(node); - this.tail = node; + const lastNode = this.tail.getPrev(); + + if (!lastNode) { + throw new TypeError("lastNode must be a LinkedListNode"); + } + + node.setPrevNode(lastNode); + node.setNextNode(this.tail); + lastNode.setNextNode(node); + this.tail.setPrevNode(node); + return node; } @@ -45,13 +67,17 @@ export class LinkedList { return this.tail; } + getLastNode() { + return this.tail.getPrev(); + } + toArray() { const arr = []; let cur = this.head; - while (cur.hasNext()) { + while (cur.getNext() !== this.tail) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion cur = cur.getNext()!; - arr.push(cur); + arr.push(cur.getValue()); } return arr; diff --git a/src/dataStructure/linkedList/linkedListNode.ts b/src/dataStructure/linkedList/linkedListNode.ts index f479aa27..516cca4a 100644 --- a/src/dataStructure/linkedList/linkedListNode.ts +++ b/src/dataStructure/linkedList/linkedListNode.ts @@ -1,18 +1,26 @@ export class LinkedListNode { private value: T; private next: LinkedListNode | null; + private prev: LinkedListNode | null; constructor(value: T) { this.value = value; this.next = null; + this.prev = null; } - setNextNode(node: LinkedListNode | null) { + setNextNode(node: LinkedListNode) { this.next = node; return node; } + setPrevNode(node: LinkedListNode) { + this.prev = node; + + return node; + } + getValue() { return this.value; } @@ -21,6 +29,10 @@ export class LinkedListNode { return this.next; } + getPrev() { + return this.prev; + } + hasNext() { return this.next instanceof LinkedListNode; } diff --git a/test/Lazy/fork.spec.ts b/test/Lazy/fork.spec.ts index 0a7c2ae0..1060a327 100644 --- a/test/Lazy/fork.spec.ts +++ b/test/Lazy/fork.spec.ts @@ -1,4 +1,4 @@ -import { fork, map, pipe } from "../../src/index"; +import { fork, map, pipe, toAsync } from "../../src/index"; describe("fork", function () { describe("sync", function () { @@ -44,8 +44,19 @@ describe("fork", function () { const iter1 = fork(arr); const iter2 = fork(arr); + const iter3 = pipe( + fork(iter1), + map((a) => String(a)), + ); + + expect(arr.next()).toEqual({ value: 11, done: false }); + expect(arr.next()).toEqual({ value: 12, done: false }); + expect(arr.next()).toEqual({ value: 13, done: false }); + expect(arr.next()).toEqual({ value: undefined, done: true }); + expect(iter1.next()).toEqual({ value: 11, done: false }); expect(iter2.next()).toEqual({ value: 11, done: false }); + expect(iter3.next()).toEqual({ value: "11", done: false }); expect(iter1.next()).toEqual({ value: 12, done: false }); expect(iter1.next()).toEqual({ value: 13, done: false }); @@ -56,6 +67,24 @@ describe("fork", function () { expect(iter2.next()).toEqual({ value: undefined, done: true }); }); + it("forked iterator proceeds independently even if there is no data to process from the original.", function () { + const arr = pipe( + [1, 2, 3], + map((a) => a + 10), + ); + + const iter1 = fork(arr); + expect(arr.next()).toEqual({ value: 11, done: false }); + expect(arr.next()).toEqual({ value: 12, done: false }); + expect(arr.next()).toEqual({ value: 13, done: false }); + expect(arr.next()).toEqual({ value: undefined, done: true }); + + expect(iter1.next()).toEqual({ value: 11, done: false }); + expect(iter1.next()).toEqual({ value: 12, done: false }); + expect(iter1.next()).toEqual({ value: 13, done: false }); + expect(iter1.next()).toEqual({ value: undefined, done: true }); + }); + it("should be forked in the middle of iterable progress", function () { const arr = pipe( [1, 2, 3], @@ -65,15 +94,123 @@ describe("fork", function () { const iter1 = fork(arr); const iter2 = fork(iter1); + expect(arr.next()).toEqual({ value: 11, done: false }); expect(iter1.next()).toEqual({ value: 11, done: false }); expect(iter2.next()).toEqual({ value: 11, done: false }); - const iter3 = fork(iter1); + const iter3 = fork(arr); + const iter4 = fork(iter1); + expect(arr.next()).toEqual({ value: 12, done: false }); expect(iter1.next()).toEqual({ value: 12, done: false }); expect(iter1.next()).toEqual({ value: 13, done: false }); expect(iter1.next()).toEqual({ value: undefined, done: true }); expect(iter2.next()).toEqual({ value: 12, done: false }); expect(iter3.next()).toEqual({ value: 12, done: false }); + expect(iter4.next()).toEqual({ value: 12, done: false }); + }); + }); + + describe("async", function () { + it("should be forked iterable(number)", async function () { + const arr = toAsync([1, 2, 3]); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await arr.next()).toEqual({ value: 1, done: false }); + expect(await iter1.next()).toEqual({ value: 1, done: false }); + expect(await iter1.next()).toEqual({ value: 2, done: false }); + expect(await iter1.next()).toEqual({ value: 3, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: 1, done: false }); + expect(await iter2.next()).toEqual({ value: 2, done: false }); + expect(await iter2.next()).toEqual({ value: 3, done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked iterable(string)", async function () { + const arr = toAsync("abc"); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await iter1.next()).toEqual({ value: "a", done: false }); + expect(await iter1.next()).toEqual({ value: "b", done: false }); + expect(await iter1.next()).toEqual({ value: "c", done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: "a", done: false }); + expect(await iter2.next()).toEqual({ value: "b", done: false }); + expect(await iter2.next()).toEqual({ value: "c", done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be able to be used as a forked function in the pipeline", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(arr); + + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await arr.next()).toEqual({ value: 12, done: false }); + expect(await arr.next()).toEqual({ value: 13, done: false }); + expect(await arr.next()).toEqual({ value: undefined, done: true }); + + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter2.next()).toEqual({ value: 11, done: false }); + + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + + expect(await iter2.next()).toEqual({ value: 12, done: false }); + expect(await iter2.next()).toEqual({ value: 13, done: false }); + expect(await iter2.next()).toEqual({ value: undefined, done: true }); + }); + + it("forked iterator proceeds independently even if there is no data to process from the original", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await arr.next()).toEqual({ value: 12, done: false }); + expect(await arr.next()).toEqual({ value: 13, done: false }); + expect(await arr.next()).toEqual({ value: undefined, done: true }); + + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + }); + + it("should be forked in the middle of iterable progress", async function () { + const arr = pipe( + toAsync([1, 2, 3]), + map((a) => a + 10), + ); + + const iter1 = fork(arr); + const iter2 = fork(iter1); + + expect(await arr.next()).toEqual({ value: 11, done: false }); + expect(await iter1.next()).toEqual({ value: 11, done: false }); + expect(await iter2.next()).toEqual({ value: 11, done: false }); + + const iter3 = fork(arr); + const iter4 = fork(iter1); + expect(await iter1.next()).toEqual({ value: 12, done: false }); + expect(await iter1.next()).toEqual({ value: 13, done: false }); + expect(await iter1.next()).toEqual({ value: undefined, done: true }); + expect(await iter2.next()).toEqual({ value: 12, done: false }); + expect(await iter3.next()).toEqual({ value: 12, done: false }); + expect(await iter4.next()).toEqual({ value: 12, done: false }); }); }); });