Skip to content

Commit

Permalink
concurrency fix (#28)
Browse files Browse the repository at this point in the history
  • Loading branch information
tlaziuk authored Feb 19, 2018
1 parent 604ce1d commit 10b955d
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 16 deletions.
6 changes: 2 additions & 4 deletions delay.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
/**
* delay the execution by given amount of time
* resolve the promise after given amount of time
*
* @param delay time in milliseconds
* @param val optional value to return
*/
export default <T = void>(delay: number, val?: T | PromiseLike<T>) => new Promise<T>(
(resolve) => {
setTimeout(() => {
resolve(val);
}, delay);
setTimeout(resolve, delay, val);
},
);
142 changes: 140 additions & 2 deletions index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe(ASAP.name, () => {
const asap = new ASAP();
expect(asap.c).to.be.equal(1);
});
it("should concurrency be working", () => {
it("should assigning and getting concurrency be working", () => {
const asap = new ASAP();
expect(() => { asap.c = 1; }).to.not.throw();
expect(() => { asap.c = 0; }).to.not.throw();
Expand Down Expand Up @@ -156,7 +156,7 @@ describe(ASAP.name, () => {
expect(spyFn3.calledBefore(spyFn5)).to.be.equal(true, "task 3 should run before task 5");
expect(spyFn4.calledBefore(spyFn5)).to.be.equal(true, "task 4 should run before task 5");
});
it("should the queue run slow tasks with unmatching concurrency to the tasks number", async () => {
it("should the queue run slow tasks", async () => {
const asap = new ASAP();
asap.c = 2;
await Promise.all([
Expand Down Expand Up @@ -283,4 +283,142 @@ describe(ASAP.name, () => {
await delay(20);
expect(spyFn2.callCount).to.be.equal(0, "task 2 called");
}).timeout(100);
it("should the queue be consumed properly", async () => {
const asap = new ASAP(false);
const fn = () => delay(5);
const spyFn1 = spy(fn);
const spyFn2 = spy(fn);
const spyFn3 = spy(fn);
const spyFn4 = spy(fn);
const prom = Promise.all([
asap.q(spyFn1),
asap.q(spyFn2),
asap.q(spyFn3),
asap.q(spyFn4),
]);
asap.c = 1;
await prom;
expect(spyFn2.calledAfter(spyFn1)).to.be.equal(true);
expect(spyFn3.calledAfter(spyFn2)).to.be.equal(true);
expect(spyFn4.calledAfter(spyFn3)).to.be.equal(true);
}).timeout(100);
it("should the queue be consumed properly - concurrency of 2", async () => {
const asap = new ASAP(false);
const spyFn1 = spy(() => delay(40));
const spyFn2 = spy(() => delay(35));
const spyFn3 = spy(() => delay(30));
const spyFn4 = spy(() => delay(25));
const spyFn5 = spy(() => delay(20));
const spyFn6 = spy(() => delay(15));
const spyFn7 = spy(() => delay(10));
const spyFn8 = spy(() => delay(5));
const spyFn9 = spy(() => delay(5));
/**
* 1=======4====5===89
* 2======3=====6==7=
*/
const prom = Promise.all([
asap.q(spyFn1).then(() => {
expect(spyFn2.callCount).to.be.equal(1, "#2 not called before #1 finished");
expect(spyFn3.callCount).to.be.equal(1, "#3 not called before #1 finished");
expect(spyFn4.callCount).to.be.equal(0, "#4 called before #1 finished");
}),
asap.q(spyFn2).then(() => {
expect(spyFn1.callCount).to.be.equal(1, "#1 not called before #2 finished");
expect(spyFn3.callCount).to.be.equal(0, "#3 called before #2 finished");
expect(spyFn4.callCount).to.be.equal(0, "#4 called before #2 finished");
}),
asap.q(spyFn3).then(() => {
expect(spyFn2.callCount).to.be.equal(1, "#2 not called before #3 finished");
expect(spyFn4.callCount).to.be.equal(1, "#4 not called before #3 finished");
}),
asap.q(spyFn4).then(() => {
expect(spyFn3.callCount).to.be.equal(1, "#3 not called before #4 finished");
}),
asap.q(spyFn5).then(() => {
expect(spyFn7.callCount).to.be.equal(1, "#7 not called before #5 finished");
expect(spyFn8.callCount).to.be.equal(0, "#8 called before #5 finished");
}),
asap.q(spyFn6).then(() => {
expect(spyFn5.callCount).to.be.equal(1, "#5 not called before #6 finished");
expect(spyFn7.callCount).to.be.equal(0, "#7 called before #6 finished");
expect(spyFn8.callCount).to.be.equal(0, "#5 called before #6 finished");
}),
asap.q(spyFn7).then(() => {
expect(spyFn8.callCount).to.be.equal(1, "#8 not called before #7 finished");
}),
asap.q(spyFn8).then(() => {
expect(spyFn7.callCount).to.be.equal(1, "#7 not called before #8 finished");
}),
asap.q(spyFn9),
]);
asap.c = 2;
await prom;
}).timeout(500);
it("should the queue be consumed properly - concurrency of 3", async () => {
const asap = new ASAP(false);
const spyFn1 = spy(() => delay(40));
const spyFn2 = spy(() => delay(5));
const spyFn3 = spy(() => delay(30));
const spyFn4 = spy(() => delay(15));
const spyFn5 = spy(() => delay(20));
const spyFn6 = spy(() => delay(25));
const spyFn7 = spy(() => delay(10));
const spyFn8 = spy(() => delay(35));
const spyFn9 = spy(() => delay(5));
/**
* 1=======7=9
* 24==5===8======
* 3=====6====
*/
const prom = Promise.all([
asap.q(spyFn1).then(() => {
expect(spyFn2.callCount).to.be.equal(1, "#2 not called before #1 finished");
expect(spyFn3.callCount).to.be.equal(1, "#3 not called before #1 finished");
expect(spyFn4.callCount).to.be.equal(1, "#4 not called before #1 finished");
expect(spyFn5.callCount).to.be.equal(1, "#5 not called before #1 finished");
expect(spyFn6.callCount).to.be.equal(1, "#6 not called before #1 finished");
expect(spyFn7.callCount).to.be.equal(0, "#7 called before #1 finished");
expect(spyFn8.callCount).to.be.equal(0, "#8 called before #1 finished");
expect(spyFn9.callCount).to.be.equal(0, "#9 called before #1 finished");
}),
asap.q(spyFn2).then(() => {
expect(spyFn1.callCount).to.be.equal(1, "#1 not called before #2 finished");
expect(spyFn2.callCount).to.be.equal(1, "#2 not called before #2 finished");
expect(spyFn3.callCount).to.be.equal(1, "#3 not called before #2 finished");
expect(spyFn4.callCount).to.be.equal(0, "#4 called before #2 finished");
}),
asap.q(spyFn3).then(() => {
expect(spyFn5.callCount).to.be.equal(1, "#5 not called before #3 finished");
expect(spyFn8.callCount).to.be.equal(0, "#8 called before #3 finished");
}),
asap.q(spyFn4).then(() => {
expect(spyFn1.callCount).to.be.equal(1, "#1 not called before #4 finished");
expect(spyFn2.callCount).to.be.equal(1, "#2 not called before #4 finished");
expect(spyFn3.callCount).to.be.equal(1, "#3 not called before #4 finished");
expect(spyFn5.callCount).to.be.equal(0, "#5 called before #4 finished");
expect(spyFn6.callCount).to.be.equal(0, "#6 called before #4 finished");
expect(spyFn7.callCount).to.be.equal(0, "#7 called before #4 finished");
}),
asap.q(spyFn5).then(() => {
expect(spyFn6.callCount).to.be.equal(1, "#6 not called before #5 finished");
expect(spyFn9.callCount).to.be.equal(0, "#9 called before #5 finished");
}),
asap.q(spyFn6).then(() => {
expect(spyFn7.callCount).to.be.equal(1, "#7 not called before #6 finished");
expect(spyFn8.callCount).to.be.equal(1, "#8 not called before #6 finished");
expect(spyFn9.callCount).to.be.equal(1, "#9 not called before #6 finished");
}),
asap.q(spyFn7).then(() => {
expect(spyFn8.callCount).to.be.equal(1, "#8 not called before #7 finished");
expect(spyFn9.callCount).to.be.equal(0, "#9 called before #7 finished");
}),
asap.q(spyFn8).then(() => {
expect(spyFn9.callCount).to.be.equal(1, "#9 not called before #8 finished");
}),
asap.q(spyFn9),
]);
asap.c = 3;
await prom;
}).timeout(500);
});
5 changes: 3 additions & 2 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ function ASAP(this: any, c: boolean | number = 1): any {
* process the queue
*/
const process = (): void => {
if (pending.size < concurrency) {
const { size } = pending;
if (size < concurrency) {
heap.filter(
// filter the heap to get only not completed nor pending (running) tasks
([v]) => !complete.has(v) && !pending.has(v),
Expand All @@ -73,7 +74,7 @@ function ASAP(this: any, c: boolean | number = 1): any {
([, a], [, b]) => a - b,
).slice(
0,
concurrency, // slice the array to the size of concurrency value
concurrency - size, // slice the array to the size of left concurrency value
).forEach(([v]) => {
// mark the promise function as pending
pending.add(v);
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"access": "public",
"name": "asap-es",
"version": "1.3.0",
"version": "1.3.1",
"description": "a queue runner with priorities, concurrency and promises",
"main": "index",
"private": false,
Expand Down
17 changes: 10 additions & 7 deletions timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,23 @@ import { task } from ".";
* @param reason timeout rejection reason
*/
export default <T = any>(
timeout: number,
timeout: number | PromiseLike<number>,
fn: task<T> | PromiseLike<task<T>>,
reason?: string,
): (() => Promise<T>) => () => Promise.resolve(fn).then(
reason?: string | PromiseLike<string>,
): (() => Promise<T>) => () => Promise.all([timeout, fn, reason as string | undefined]).then(
// run the logic when task will be ready
(taskFn) => new Promise<T>(
([timeoutResolved, taskResolved, reasonResolved]) => new Promise<T>(
(resolve, reject) => {
Promise.resolve(
// task is ready, yet it may return a promise
taskFn(),
).then(resolve, reject);
taskResolved(),
).then(
resolve,
reject,
);
setTimeout(() => {
// reject when the timeout is reached
reject(new Error(reason));
reject(new Error(reasonResolved));
}, timeout);
},
),
Expand Down

0 comments on commit 10b955d

Please sign in to comment.