Skip to content

Commit

Permalink
Fix ParAff Alt error behavior (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
natefaubion authored Aug 20, 2017
1 parent b79371c commit 8f560de
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 49 deletions.
120 changes: 71 additions & 49 deletions src/Control/Monad/Aff.js
Original file line number Diff line number Diff line change
Expand Up @@ -646,18 +646,20 @@ var Aff = function () {

switch (step.tag) {
case FORKED:
tmp = fibers[step._1];
kills[count++] = tmp.kill(error, function (result) {
return function () {
count--;
if (fail === null && util.isLeft(result)) {
fail = result;
}
if (count === 0) {
cb(fail || util.right(void 0))();
}
};
});
if (step._3 === EMPTY) {
tmp = fibers[step._1];
kills[count++] = tmp.kill(error, function (result) {
return function () {
count--;
if (fail === null && util.isLeft(result)) {
fail = result;
}
if (count === 0) {
cb(fail || util.right(void 0))();
}
};
});
}
// Terminal case.
if (head === null) {
break loop;
Expand Down Expand Up @@ -685,11 +687,15 @@ var Aff = function () {
}
}

// Run the cancelation effects. We alias `count` because it's mutable.
kid = 0;
tmp = count;
for (; kid < tmp; kid++) {
kills[kid] = kills[kid]();
if (count === 0) {
cb(fail || util.right(void 0))();
} else {
// Run the cancelation effects. We alias `count` because it's mutable.
kid = 0;
tmp = count;
for (; kid < tmp; kid++) {
kills[kid] = kills[kid]();
}
}

return kills;
Expand Down Expand Up @@ -753,48 +759,64 @@ var Aff = function () {
// the first error.
if (util.isLeft(lhs)) {
if (util.isLeft(rhs)) {
if (step === lhs) {
step = rhs;
if (fail === lhs) {
fail = rhs;
}
} else {
step = lhs;
fail = lhs;
}
step = null;
head._3 = fail;
} else if (util.isLeft(rhs)) {
step = rhs;
step = null;
fail = rhs;
head._3 = fail;
} else {
head._3 = util.right(util.fromRight(lhs)(util.fromRight(rhs)));
step = head._3;
step = util.right(util.fromRight(lhs)(util.fromRight(rhs)));
head._3 = step;
}
break;
case ALT:
lhs = head._1._3;
rhs = head._2._3;
head._3 = step;
tmp = true;
kid = killId++;
// Once a side has resolved, we need to cancel the side that is still
// pending before we can continue.
kills[kid] = kill(early, step === lhs ? head._2 : head._1, function (killResult) {
return function () {
delete kills[kid];
if (util.isLeft(killResult)) {
fail = killResult;
step = null;
}
if (tmp) {
tmp = false;
} else if (tail === null) {
join(fail || step, null, null);
} else {
join(fail || step, tail._1, tail._2);
}
};
});

if (tmp) {
tmp = false;
lhs = head._1._3;
rhs = head._2._3;
// We can only proceed if both have resolved or we have a success
if (lhs === EMPTY && util.isLeft(rhs) || rhs === EMPTY && util.isLeft(lhs)) {
return;
}
// If both sides resolve with an error, we should continue with the
// first error
if (lhs !== EMPTY && util.isLeft(lhs) && rhs !== EMPTY && util.isLeft(rhs)) {
fail = step === lhs ? rhs : lhs;
step = null;
head._3 = fail;
} else {
head._3 = step;
tmp = true;
kid = killId++;
// Once a side has resolved, we need to cancel the side that is still
// pending before we can continue.
kills[kid] = kill(early, step === lhs ? head._2 : head._1, function (killResult) {
return function () {
delete kills[kid];
if (util.isLeft(killResult)) {
fail = killResult;
step = null;
}
if (tmp) {
tmp = false;
} else if (tail === null) {
join(fail || step, null, null);
} else {
join(fail || step, tail._1, tail._2);
}
};
});

if (tmp) {
tmp = false;
return;
}
}
break;
}

Expand Down
9 changes: 9 additions & 0 deletions test/Test/Main.purs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ test_parallel_alt = assert "parallel/alt" do
r2 ← joinFiber f1
pure (r1 == "bar" && r2 == "bar")

test_parallel_alt_throw eff. TestAff eff Unit
test_parallel_alt_throw = assert "parallel/alt/throw" do
r1 ← sequential $
parallel (delay (Milliseconds 10.0) *> throwError (error "Nope."))
<|> parallel (delay (Milliseconds 11.0) $> "foo")
<|> parallel (delay (Milliseconds 12.0) $> "bar")
pure (r1 == "foo")

test_parallel_alt_sync eff. TestAff eff Unit
test_parallel_alt_sync = assert "parallel/alt/sync" do
ref ← newRef ""
Expand Down Expand Up @@ -569,6 +577,7 @@ main = do
test_parallel
test_kill_parallel
test_parallel_alt
test_parallel_alt_throw
test_parallel_alt_sync
test_parallel_mixed
test_kill_parallel_alt
Expand Down

0 comments on commit 8f560de

Please sign in to comment.