Skip to content

Commit

Permalink
Merge pull request #673 from orbitjs/fix-653
Browse files Browse the repository at this point in the history
 Ensure that cancelled tasks in queues have promises rejected
  • Loading branch information
dgeb authored Jul 9, 2019
2 parents 4a9cd48 + 2293604 commit 9033e23
Show file tree
Hide file tree
Showing 4 changed files with 416 additions and 18 deletions.
17 changes: 16 additions & 1 deletion packages/@orbit/core/src/task-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export default class TaskProcessor {
}

/**
* Has `process` been invoked and settled?
* Has promise settled, either via `process` or `reject`?
*/
get settled(): boolean {
return this._settled;
Expand All @@ -83,4 +83,19 @@ export default class TaskProcessor {

return this.settle();
}

/**
* Reject the current promise with a specific error.
*
* @param e Error associated with rejection
*/
reject(e: Error): void {
if (this._settled) {
throw new Error(
'TaskProcessor#reject can not be invoked when processing has already settled.'
);
} else {
this._fail(e);
}
}
}
27 changes: 22 additions & 5 deletions packages/@orbit/core/src/task-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,17 @@ export default class TaskQueue implements Evented {
* If `autoProcess` is enabled, this will automatically trigger processing of
* the queue.
*/
skip(): Promise<void> {
skip(e?: Error): Promise<void> {
return this._reified
.then(() => {
this._cancel();
this._tasks.shift();
this._processors.shift();
let processor = this._processors.shift();
if (!processor.settled) {
processor.reject(
e || new Error('Processing cancelled via `TaskQueue#skip`')
);
}
return this._persist();
})
.then(() => this._settle());
Expand All @@ -226,11 +231,18 @@ export default class TaskQueue implements Evented {
/**
* Cancels the current task and completely clears the queue.
*/
clear(): Promise<void> {
clear(e?: Error): Promise<void> {
return this._reified
.then(() => {
this._cancel();
this._tasks = [];
for (let processor of this._processors) {
if (!processor.settled) {
processor.reject(
e || new Error('Processing cancelled via `TaskQueue#clear`')
);
}
}
this._processors = [];
return this._persist();
})
Expand All @@ -242,14 +254,19 @@ export default class TaskQueue implements Evented {
*
* Returns the canceled and removed task.
*/
shift(): Promise<Task> {
shift(e?: Error): Promise<Task> {
let task: Task;

return this._reified
.then(() => {
this._cancel();
task = this._tasks.shift();
this._processors.shift();
let processor = this._processors.shift();
if (!processor.settled) {
processor.reject(
e || new Error('Processing cancelled via `TaskQueue#shift`')
);
}
return this._persist();
})
.then(() => task);
Expand Down
102 changes: 102 additions & 0 deletions packages/@orbit/core/test/task-processor-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,106 @@ module('TaskProcessor', function() {
assert.ok(!processor.started, 'after reset, task has not started');
assert.ok(!processor.settled, 'after reset, task has not settled');
});

test('#reject rejects the processor promise and marks the processor state as settled', async function(assert) {
assert.expect(5);

const target: Performer = {
async perform(task: Task): Promise<string> {
return ':)';
}
};

let processor = new TaskProcessor(target, {
type: 'doSomething',
data: '1'
});

processor.reject(new Error(':('));

assert.ok(processor.settled, 'processor settled');
assert.ok(!processor.started, 'processor not started');

try {
await processor.settle();
} catch (e) {
assert.equal(e.message, ':(', 'fail - error matches expectation');
}

processor.reset();

assert.ok(!processor.started, 'after reset, task has not started');
assert.ok(!processor.settled, 'after reset, task has not settled');
});

test('#reject can reject processing that has started', async function(assert) {
assert.expect(5);

let processor: TaskProcessor;

const target: Performer = {
async perform(task: Task): Promise<string> {
// reject before processing can be completed successfully
processor.reject(new Error(':('));

return ':)';
}
};

processor = new TaskProcessor(target, {
type: 'doSomething',
data: '1'
});

try {
await processor.process();
} catch (e) {
assert.equal(e.message, ':(', 'fail - error matches expectation');
}

assert.ok(processor.started, 'processor started');
assert.ok(processor.settled, 'processor settled');

processor.reset();

assert.ok(!processor.started, 'after reset, task has not started');
assert.ok(!processor.settled, 'after reset, task has not settled');
});

test('#reject will fail when processing has already settled', async function(assert) {
assert.expect(5);

let processor: TaskProcessor;

const target: Performer = {
async perform(task: Task): Promise<string> {
return ':)';
}
};

processor = new TaskProcessor(target, {
type: 'doSomething',
data: '1'
});

await processor.process();

try {
processor.reject(new Error(':('));
} catch (e) {
assert.equal(
e.message,
'TaskProcessor#reject can not be invoked when processing has already settled.',
'error matches expectation'
);
}

assert.ok(processor.started, 'processor started');
assert.ok(processor.settled, 'processor settled');

processor.reset();

assert.ok(!processor.started, 'after reset, task has not started');
assert.ok(!processor.settled, 'after reset, task has not settled');
});
});
Loading

0 comments on commit 9033e23

Please sign in to comment.