From 7317bfba0fe6278df6fc05f8dbb74d18d2bbd18d Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Tue, 5 Sep 2023 21:07:50 -0700 Subject: [PATCH 1/5] Add necessary capabilities for reactors to create other reactors and make connections 0. Added _addChild and _addSibling necessary for mutation APIs 1. Solved issues related to hierarchy and ownership. --- src/core/reactor.ts | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 806bf2af7..0f200f21d 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -153,6 +153,11 @@ export abstract class Reactor extends Component { */ private readonly _keyChain = new Map(); + // This is the keychain for creation, i.e. if Reactor R's mutation created reactor B, + // then R is B's creator, even if they are siblings. R should have access to B, + // at least semantically......? + private readonly _creatorKeyChain = new Map(); + /** * This graph has in it all the dependencies implied by this container's * ports, reactions, and connections. @@ -387,6 +392,9 @@ export abstract class Reactor extends Component { return owner._getKey(component, this._keyChain.get(owner)); } } + return component + .getContainer() + ._getKey(component, this._creatorKeyChain.get(component.getContainer())); } /** @@ -1555,6 +1563,36 @@ export abstract class Reactor extends Component { toString(): string { return this._getFullyQualifiedName(); } + + protected _addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + const newReactor = new constructor(this, ...args); + return newReactor; + } + + protected _addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + if (this._getContainer() == null) { + throw new Error( + `Reactor ${this} does not have a parent. Sibling is not well-defined.` + ); + } + if (this._getContainer() === this) { + throw new Error( + `Reactor ${this} is self-contained. Adding sibling creates logical issue.` + ); + } + const newReactor = this._getContainer()._addChild( + constructor, + ...args + ); + this._creatorKeyChain.set(newReactor, newReactor._key); + return newReactor; + } } /* From 5b53bf78ad5529413cceacca941fe9be33196432 Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Tue, 5 Sep 2023 21:26:24 -0700 Subject: [PATCH 2/5] Mutation Sandbox API --- src/core/reactor.ts | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/core/reactor.ts b/src/core/reactor.ts index 0f200f21d..8fa8844b6 100644 --- a/src/core/reactor.ts +++ b/src/core/reactor.ts @@ -475,6 +475,20 @@ export abstract class Reactor extends Component { public delete(reactor: Reactor): void { reactor._delete(); } + + public addChild( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addChild(constructor, ...args); + } + + public addSibling( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ): R { + return this.reactor._addSibling(constructor, ...args); + } }; /** @@ -1586,10 +1600,7 @@ export abstract class Reactor extends Component { `Reactor ${this} is self-contained. Adding sibling creates logical issue.` ); } - const newReactor = this._getContainer()._addChild( - constructor, - ...args - ); + const newReactor = this._getContainer()._addChild(constructor, ...args); this._creatorKeyChain.set(newReactor, newReactor._key); return newReactor; } @@ -1833,6 +1844,16 @@ export interface MutationSandbox extends ReactionSandbox { getReactor: () => Reactor; // Container + addChild: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + + addSibling: ( + constructor: new (container: Reactor, ...args: G) => R, + ...args: G + ) => R; + // FIXME: // forkJoin(constructor: new () => Reactor, ): void; } From 09ee75245d8c4ec1178c718a0674c839ccf1b0fe Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Wed, 6 Sep 2023 18:13:54 -0700 Subject: [PATCH 3/5] Quicksort Reactor implementation with the ability to modify --- src/benchmark/quicksort.ts | 242 +++++++++++++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 src/benchmark/quicksort.ts diff --git a/src/benchmark/quicksort.ts b/src/benchmark/quicksort.ts new file mode 100644 index 000000000..279a634f5 --- /dev/null +++ b/src/benchmark/quicksort.ts @@ -0,0 +1,242 @@ +import { + type WritablePort, + Parameter, + InPort, + OutPort, + State, + Action, + Reactor, + App, + TimeValue, + Origin, + Log, + PrecedenceGraph +} from "../core/internal"; + +// This is the thrshold for the quicksort algorithm, feeding sorters below this number will use Array.prototype.sort() +// toSorted() is more ideal but I think there is some compatibility issue. +const T = 8; + +const arr = [ + 578, 530, 482, 105, 400, 787, 563, 613, 483, 888, 439, 928, 857, 404, 949, + 736, 68, 761, 951, 432, 799, 212, 108, 937, 562, 616, 436, 358, 221, 315, 423, + 539, 215, 795, 409, 227, 715, 847, 66, 242, 168, 637, 572, 468, 116, 668, 213, + 859, 880, 291, 609, 502, 486, 710, 662, 172, 991, 631, 120, 905, 751, 293, + 411, 503, 901, 53, 774, 145, 831, 140, 592, 184, 228, 111, 907, 640, 553, 519, + 579, 389, 735, 545, 975, 255, 83, 449, 673, 427, 369, 854, 86, 33, 885, 940, + 904, 764, 834, 250, 183, 191 +]; + +Log.global.level = Log.levels.INFO; + +// Change the boolean value to toggle between the "matryoshka" version or "flat" version. +const useHierarchy = true; + +class QuickSorter extends Reactor { + parentReadPort: InPort; + parentWritePort: OutPort; + leftWritePort: OutPort; + rightWritePort: OutPort; + leftReadPort: InPort; + rightReadPort: InPort; + + resultArr: State; + numFragments: State; + + leftReactor: Reactor | undefined; + rightReactor: Reactor | undefined; + + constructor(parent: Reactor, name = "root") { + super(parent, name); + this.parentReadPort = new InPort(this); + this.parentWritePort = new OutPort(this); + this.leftWritePort = new OutPort(this); + this.rightWritePort = new OutPort(this); + this.leftReadPort = new InPort(this); + this.rightReadPort = new InPort(this); + this.resultArr = new State([]); + this.numFragments = new State(0); + + // When the parent sends a message, we send it to children. + this.addMutation( + [this.parentReadPort], + [ + this.parentReadPort, + this.writable(this.parentWritePort), + this.leftWritePort, + this.rightWritePort, + this.leftReadPort, + this.rightReadPort, + this.resultArr, + this.numFragments + ], + function ( + this, + parentReadPort, + parentWritePort, + leftWritePort, + rightWritePort, + leftread, + rightread, + resultArr, + numFragments + ) { + const hierarchyImplementation = ( + useHierarchy + ? this.getReactor()._uncheckedAddChild + : this.getReactor()._uncheckedAddSibling + ).bind(this.getReactor()); + + const fullarr = parentReadPort.get(); + if (fullarr == null) { + throw Error("Received null from port"); + } + if (fullarr.length < T) { + const sorted = [...fullarr].sort((a, b) => a - b); + parentWritePort.set(sorted); + return; + } + const pivot = fullarr[0]; + const leftToSort = fullarr.filter((val) => val < pivot); + const righttoSort = fullarr.filter((val) => val > pivot); + const pivots = fullarr.filter((val) => val === pivot); + + resultArr.set(pivots); + numFragments.set(numFragments.get() + 1); + + console.log( + `I received a request! ${fullarr}! Pivot is ${pivot}, so I divided it into ${leftToSort} and ${righttoSort}` + ); + + // First, create 2 new reactors + const leftReactor = hierarchyImplementation( + QuickSorter, + `${this.getReactor()._name}/l` + ); + const rightReactor = hierarchyImplementation( + QuickSorter, + `${this.getReactor()._name}/r` + ); + + // Connect ports accoringly + this.connect(leftWritePort, leftReactor.parentReadPort); + this.connect(rightWritePort, rightReactor.parentReadPort); + + this.connect(leftReactor.parentWritePort, leftread); + this.connect(rightReactor.parentWritePort, rightread); + + this.getReactor().writable(leftWritePort).set(leftToSort); + this.getReactor().writable(rightWritePort).set(righttoSort); + } + ); + + this.addReaction( + [this.leftReadPort], + [ + this.leftReadPort, + this.resultArr, + this.numFragments, + this.writable(this.parentWritePort) + ], + function (this, leftreadport, resultArr, numFragments, parentWrite) { + const leftResult = leftreadport.get(); + const myResult = resultArr.get(); + if (leftResult == null) { + throw Error("Left return null"); + } + if (myResult.length === 0) { + throw Error( + "Result length is 0, but should contain at least the pivots." + ); + } + + console.log(`I received a result from my left! ${leftResult}!`); + resultArr.set([...leftResult, ...myResult]); + + numFragments.set(numFragments.get() + 1); + if (numFragments.get() === 3) { + parentWrite.set(resultArr.get()); + } + } + ); + + this.addReaction( + [this.rightReadPort], + [ + this.rightReadPort, + this.resultArr, + this.numFragments, + this.writable(this.parentWritePort) + ], + function (this, rightreadport, resultArr, numFragments, parentWrite) { + const rightResult = rightreadport.get(); + const myResult = resultArr.get(); + if (rightResult == null) { + throw Error("Right return null"); + } + if (myResult.length === 0) { + throw Error( + "Result length is 0, but should contain at least the pivots." + ); + } + + console.log(`I received a result from my right! ${rightResult}!`); + resultArr.set([...myResult, ...rightResult]); + + numFragments.set(numFragments.get() + 1); + if (numFragments.get() === 3) { + parentWrite.set(resultArr.get()); + } + } + ); + } +} + +class Supplier extends Reactor { + rootWritePort: OutPort; + rootReadPort: InPort; + + constructor(parent: Reactor, arr: number[], name = "Innocent Supplier") { + super(parent, name); + this.rootWritePort = new OutPort(this); + this.rootReadPort = new InPort(this); + this.addReaction( + [this.startup], + [this.writable(this.rootWritePort)], + function (this, rootwrite) { + rootwrite.set(arr); + } + ); + + this.addReaction( + [this.rootReadPort], + [this.rootReadPort], + function (this, rootReadPort) { + console.log(`I received final result: ${rootReadPort.get() ?? "null"}`); + } + ); + } +} + +class Arbiter extends App { + rootSorter: QuickSorter; + supplier: Supplier; + + constructor( + name: string, + timeout: TimeValue | undefined = undefined, + keepAlive = false, + fast = false, + success?: () => void, + fail?: () => void + ) { + super(timeout, keepAlive, fast, success, fail, name); + this.rootSorter = new QuickSorter(this, "root"); + this.supplier = new Supplier(this, arr); + this._connect(this.supplier.rootWritePort, this.rootSorter.parentReadPort); + this._connect(this.rootSorter.parentWritePort, this.supplier.rootReadPort); + } +} + +const arb = new Arbiter("arbiter"); +arb._start(); From c21b29b2d2a3d5646d880e55e3cb0ad4ba8c4172 Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Wed, 6 Sep 2023 18:16:53 -0700 Subject: [PATCH 4/5] Implemented quicksort with the ability to change topology structure --- src/benchmark/quicksort.ts | 35 +++++++++++------------------------ 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/src/benchmark/quicksort.ts b/src/benchmark/quicksort.ts index 279a634f5..ca21e2309 100644 --- a/src/benchmark/quicksort.ts +++ b/src/benchmark/quicksort.ts @@ -1,16 +1,11 @@ import { - type WritablePort, - Parameter, InPort, OutPort, State, - Action, Reactor, App, - TimeValue, - Origin, - Log, - PrecedenceGraph + type TimeValue, + Log } from "../core/internal"; // This is the thrshold for the quicksort algorithm, feeding sorters below this number will use Array.prototype.sort() @@ -46,8 +41,8 @@ class QuickSorter extends Reactor { leftReactor: Reactor | undefined; rightReactor: Reactor | undefined; - constructor(parent: Reactor, name = "root") { - super(parent, name); + constructor(parent: Reactor) { + super(parent); this.parentReadPort = new InPort(this); this.parentWritePort = new OutPort(this); this.leftWritePort = new OutPort(this); @@ -82,10 +77,8 @@ class QuickSorter extends Reactor { numFragments ) { const hierarchyImplementation = ( - useHierarchy - ? this.getReactor()._uncheckedAddChild - : this.getReactor()._uncheckedAddSibling - ).bind(this.getReactor()); + useHierarchy ? this.addChild : this.addSibling + ).bind(this); const fullarr = parentReadPort.get(); if (fullarr == null) { @@ -109,14 +102,8 @@ class QuickSorter extends Reactor { ); // First, create 2 new reactors - const leftReactor = hierarchyImplementation( - QuickSorter, - `${this.getReactor()._name}/l` - ); - const rightReactor = hierarchyImplementation( - QuickSorter, - `${this.getReactor()._name}/r` - ); + const leftReactor = hierarchyImplementation(QuickSorter); + const rightReactor = hierarchyImplementation(QuickSorter); // Connect ports accoringly this.connect(leftWritePort, leftReactor.parentReadPort); @@ -197,7 +184,7 @@ class Supplier extends Reactor { rootReadPort: InPort; constructor(parent: Reactor, arr: number[], name = "Innocent Supplier") { - super(parent, name); + super(parent); this.rootWritePort = new OutPort(this); this.rootReadPort = new InPort(this); this.addReaction( @@ -230,8 +217,8 @@ class Arbiter extends App { success?: () => void, fail?: () => void ) { - super(timeout, keepAlive, fast, success, fail, name); - this.rootSorter = new QuickSorter(this, "root"); + super(timeout, keepAlive, fast, success, fail); + this.rootSorter = new QuickSorter(this); this.supplier = new Supplier(this, arr); this._connect(this.supplier.rootWritePort, this.rootSorter.parentReadPort); this._connect(this.rootSorter.parentWritePort, this.supplier.rootReadPort); From 10761ebfda466809899ab60f83ec19f198c990f1 Mon Sep 17 00:00:00 2001 From: Nadeshiko Kagamihara Date: Wed, 6 Sep 2023 18:21:52 -0700 Subject: [PATCH 5/5] Store children in a state, and destroy finished reactors --- src/benchmark/quicksort.ts | 62 ++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 10 deletions(-) diff --git a/src/benchmark/quicksort.ts b/src/benchmark/quicksort.ts index ca21e2309..0d041bebf 100644 --- a/src/benchmark/quicksort.ts +++ b/src/benchmark/quicksort.ts @@ -38,8 +38,8 @@ class QuickSorter extends Reactor { resultArr: State; numFragments: State; - leftReactor: Reactor | undefined; - rightReactor: Reactor | undefined; + leftReactor: State; + rightReactor: State; constructor(parent: Reactor) { super(parent); @@ -52,6 +52,9 @@ class QuickSorter extends Reactor { this.resultArr = new State([]); this.numFragments = new State(0); + this.leftReactor = new State(undefined); + this.rightReactor = new State(undefined); + // When the parent sends a message, we send it to children. this.addMutation( [this.parentReadPort], @@ -63,7 +66,9 @@ class QuickSorter extends Reactor { this.leftReadPort, this.rightReadPort, this.resultArr, - this.numFragments + this.numFragments, + this.leftReactor, + this.rightReactor ], function ( this, @@ -74,7 +79,9 @@ class QuickSorter extends Reactor { leftread, rightread, resultArr, - numFragments + numFragments, + stateLeftReactor, // This is really cursed, but s_ is to indicate that this is a state + stateRightReactor ) { const hierarchyImplementation = ( useHierarchy ? this.addChild : this.addSibling @@ -105,6 +112,9 @@ class QuickSorter extends Reactor { const leftReactor = hierarchyImplementation(QuickSorter); const rightReactor = hierarchyImplementation(QuickSorter); + stateLeftReactor.set(leftReactor); + stateRightReactor.set(rightReactor); + // Connect ports accoringly this.connect(leftWritePort, leftReactor.parentReadPort); this.connect(rightWritePort, rightReactor.parentReadPort); @@ -117,17 +127,26 @@ class QuickSorter extends Reactor { } ); - this.addReaction( + this.addMutation( [this.leftReadPort], [ this.leftReadPort, this.resultArr, this.numFragments, - this.writable(this.parentWritePort) + this.writable(this.parentWritePort), + this.leftReactor ], - function (this, leftreadport, resultArr, numFragments, parentWrite) { + function ( + this, + leftreadport, + resultArr, + numFragments, + parentWrite, + stateLeftReactor + ) { const leftResult = leftreadport.get(); const myResult = resultArr.get(); + const leftReactor = stateLeftReactor.get(); if (leftResult == null) { throw Error("Left return null"); } @@ -136,10 +155,16 @@ class QuickSorter extends Reactor { "Result length is 0, but should contain at least the pivots." ); } + if (leftReactor == null) { + throw Error("Right reactor is null."); + } console.log(`I received a result from my left! ${leftResult}!`); resultArr.set([...leftResult, ...myResult]); + this.delete(leftReactor); + stateLeftReactor.set(undefined); + numFragments.set(numFragments.get() + 1); if (numFragments.get() === 3) { parentWrite.set(resultArr.get()); @@ -147,17 +172,26 @@ class QuickSorter extends Reactor { } ); - this.addReaction( + this.addMutation( [this.rightReadPort], [ this.rightReadPort, this.resultArr, this.numFragments, - this.writable(this.parentWritePort) + this.writable(this.parentWritePort), + this.rightReactor ], - function (this, rightreadport, resultArr, numFragments, parentWrite) { + function ( + this, + rightreadport, + resultArr, + numFragments, + parentWrite, + stateRightReactor + ) { const rightResult = rightreadport.get(); const myResult = resultArr.get(); + const rightReactor = stateRightReactor.get(); if (rightResult == null) { throw Error("Right return null"); } @@ -166,10 +200,18 @@ class QuickSorter extends Reactor { "Result length is 0, but should contain at least the pivots." ); } + if (rightReactor == null) { + throw Error("Right reactor is null."); + } console.log(`I received a result from my right! ${rightResult}!`); resultArr.set([...myResult, ...rightResult]); + // Destroy right reactor and the connection + + this.delete(rightReactor); + stateRightReactor.set(undefined); + numFragments.set(numFragments.get() + 1); if (numFragments.get() === 3) { parentWrite.set(resultArr.get());