Skip to content

Commit 2f7c5ee

Browse files
authored
Merge pull request #114 from natefaubion/kill-fixes
Kill fixes
2 parents 8f560de + 49db6f6 commit 2f7c5ee

File tree

3 files changed

+94
-64
lines changed

3 files changed

+94
-64
lines changed

src/Control/Monad/Aff.js

+65-63
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ var Aff = function () {
4848

4949
// Various constructors used in interpretation
5050
var CONS = "Cons"; // Cons-list, for stacks
51-
var RECOVER = "Recover"; // Continue with error handler
5251
var RESUME = "Resume"; // Continue indiscriminately
5352
var RELEASE = "Release"; // Continue with bracket finalizers
53+
var FINALIZER = "Finalizer"; // A non-interruptible effect
5454
var FINALIZED = "Finalized"; // Marker for finalization
5555
var FORKED = "Forked"; // Reference to a forked fiber, with resumption stack
5656
var FIBER = "Fiber"; // Actual fiber reference
@@ -239,7 +239,9 @@ var Aff = function () {
239239
var bhead = null;
240240
var btail = null;
241241

242-
// Stack of attempts and finalizers for error recovery.
242+
// Stack of attempts and finalizers for error recovery. Every `Cons` is also
243+
// tagged with current `interrupt` state. We use this to track which items
244+
// should be ignored or evaluated as a result of a kill.
243245
var attempts = null;
244246

245247
// A special state is needed for Bracket, because it cannot be killed. When
@@ -335,30 +337,33 @@ var Aff = function () {
335337
return;
336338

337339
case THROW:
338-
bhead = null;
339-
btail = null;
340340
status = RETURN;
341341
fail = util.left(step._1);
342342
step = null;
343343
break;
344344

345-
// Enqueue the current stack of binds and continue
345+
// Enqueue the Catch so that we can call the error handler later on
346+
// in case of an exception.
346347
case CATCH:
347-
attempts = new Aff(CONS, new Aff(RECOVER, step._2, bhead, btail), attempts);
348+
if (bhead === null) {
349+
attempts = new Aff(CONS, step, attempts, interrupt);
350+
} else {
351+
attempts = new Aff(CONS, step, new Aff(CONS, new Aff(RESUME, bhead, btail), attempts, interrupt), interrupt);
352+
}
348353
bhead = null;
349354
btail = null;
350355
status = CONTINUE;
351356
step = step._1;
352357
break;
353358

354-
// When we evaluate a Bracket, we also enqueue the instruction so we
355-
// can fullfill it later once we return from the acquisition.
359+
// Enqueue the Bracket so that we can call the appropriate handlers
360+
// after resource acquisition.
356361
case BRACKET:
357362
bracketCount++;
358363
if (bhead === null) {
359-
attempts = new Aff(CONS, step, attempts);
364+
attempts = new Aff(CONS, step, attempts, interrupt);
360365
} else {
361-
attempts = new Aff(CONS, step, new Aff(CONS, new Aff(RESUME, bhead, btail), attempts));
366+
attempts = new Aff(CONS, step, new Aff(CONS, new Aff(RESUME, bhead, btail), attempts, interrupt), interrupt);
362367
}
363368
bhead = null;
364369
btail = null;
@@ -386,40 +391,42 @@ var Aff = function () {
386391
break;
387392

388393
case RETURN:
394+
bhead = null;
395+
btail = null;
389396
// If the current stack has returned, and we have no other stacks to
390397
// resume or finalizers to run, the fiber has halted and we can
391398
// invoke all join callbacks. Otherwise we need to resume.
392399
if (attempts === null) {
393400
status = COMPLETED;
394401
step = interrupt || fail || step;
395402
} else {
403+
// The interrupt status for the enqueued item.
404+
tmp = attempts._3;
396405
attempt = attempts._1;
397406
attempts = attempts._2;
398407

399408
switch (attempt.tag) {
400409
// We cannot recover from an interrupt. Otherwise we should
401410
// continue stepping, or run the exception handler if an exception
402411
// was raised.
403-
case RECOVER:
404-
if (interrupt) {
412+
case CATCH:
413+
// We should compare the interrupt status as well because we
414+
// only want it to apply if there has been an interrupt since
415+
// enqueuing the catch.
416+
if (interrupt && interrupt !== tmp) {
405417
status = RETURN;
406-
} else {
407-
bhead = attempt._2;
408-
btail = attempt._3;
409-
if (fail) {
410-
status = CONTINUE;
411-
step = attempt._1(util.fromLeft(fail));
412-
fail = null;
413-
} else {
414-
status = STEP_BIND;
415-
step = util.fromRight(step);
416-
}
418+
} else if (fail) {
419+
status = CONTINUE;
420+
step = attempt._2(util.fromLeft(fail));
421+
fail = null;
417422
}
418423
break;
419424

420425
// We cannot resume from an interrupt or exception.
421426
case RESUME:
422-
if (interrupt || fail) {
427+
// As with Catch, we only want to ignore in the case of an
428+
// interrupt since enqueing the item.
429+
if (interrupt && interrupt !== tmp || fail) {
423430
status = RETURN;
424431
} else {
425432
bhead = attempt._1;
@@ -437,42 +444,47 @@ var Aff = function () {
437444
bracketCount--;
438445
if (fail === null) {
439446
result = util.fromRight(step);
440-
attempts = new Aff(CONS, new Aff(RELEASE, attempt._2, result), attempts);
441-
if (interrupt === null || bracketCount > 0) {
447+
// We need to enqueue the Release with the same interrupt
448+
// status as the Bracket that is initiating it.
449+
attempts = new Aff(CONS, new Aff(RELEASE, attempt._2, result), attempts, tmp);
450+
// We should only coninue as long as the interrupt status has not changed or
451+
// we are currently within a non-interruptable finalizer.
452+
if (interrupt === tmp || bracketCount > 0) {
442453
status = CONTINUE;
443454
step = attempt._3(result);
444455
}
445456
}
446457
break;
447458

448459
// Enqueue the appropriate handler. We increase the bracket count
449-
// because it should be cancelled.
460+
// because it should not be cancelled.
450461
case RELEASE:
451462
bracketCount++;
452-
attempts = new Aff(CONS, new Aff(FINALIZED, step), attempts);
463+
attempts = new Aff(CONS, new Aff(FINALIZED, step), attempts, interrupt);
453464
status = CONTINUE;
454-
if (interrupt !== null) {
465+
// It has only been killed if the interrupt status has changed
466+
// since we enqueued the item.
467+
if (interrupt && interrupt !== tmp) {
455468
step = attempt._1.killed(util.fromLeft(interrupt))(attempt._2);
456-
} else if (fail !== null) {
469+
} else if (fail) {
457470
step = attempt._1.failed(util.fromLeft(fail))(attempt._2);
458471
} else {
459472
step = attempt._1.completed(util.fromRight(step))(attempt._2);
460473
}
461474
break;
462475

476+
case FINALIZER:
477+
bracketCount++;
478+
attempts = new Aff(CONS, new Aff(FINALIZED, step), attempts, interrupt);
479+
status = CONTINUE;
480+
step = attempt._1;
481+
break;
482+
463483
case FINALIZED:
464484
bracketCount--;
465485
status = RETURN;
466486
step = attempt._1;
467487
break;
468-
469-
// Otherwise we need to run a finalizer, which cannot be interrupted.
470-
// We insert a FINALIZED marker to know when we can release it.
471-
default:
472-
bracketCount++;
473-
attempts = new Aff(CONS, new Aff(FINALIZED, step), attempts);
474-
status = CONTINUE;
475-
step = attempt;
476488
}
477489
}
478490
break;
@@ -485,9 +497,15 @@ var Aff = function () {
485497
}
486498
}
487499
joins = null;
500+
// If we have an interrupt and a fail, then the thread threw while
501+
// running finalizers. This should always rethrow in a fresh stack.
502+
if (interrupt && fail) {
503+
setTimeout(function () {
504+
throw util.fromLeft(fail);
505+
}, 0);
488506
// If we have an unhandled exception, and no other fiber has joined
489507
// then we need to throw the exception in a fresh stack.
490-
if (util.isLeft(step) && rethrow) {
508+
} else if (util.isLeft(step) && rethrow) {
491509
setTimeout(function () {
492510
// Guard on reathrow because a completely synchronous fiber can
493511
// still have an observer which was added after-the-fact.
@@ -532,12 +550,8 @@ var Aff = function () {
532550

533551
var canceler = onComplete({
534552
rethrow: false,
535-
handler: function (result) {
536-
if (fail) {
537-
return cb(fail);
538-
} else {
539-
return cb(util.right(void 0));
540-
}
553+
handler: function (/* unused */) {
554+
return cb(util.right(void 0));
541555
}
542556
})();
543557

@@ -554,10 +568,8 @@ var Aff = function () {
554568
}
555569
if (bracketCount === 0) {
556570
if (status === PENDING) {
557-
attempts = new Aff(CONS, step(error), attempts);
571+
attempts = new Aff(CONS, new Aff(FINALIZER, step(error)), attempts, interrupt);
558572
}
559-
bhead = null;
560-
btail = null;
561573
status = RETURN;
562574
step = null;
563575
fail = null;
@@ -569,8 +581,6 @@ var Aff = function () {
569581
interrupt = util.left(error);
570582
}
571583
if (bracketCount === 0) {
572-
bhead = null;
573-
btail = null;
574584
status = RETURN;
575585
step = null;
576586
fail = null;
@@ -634,7 +644,6 @@ var Aff = function () {
634644
// cancellation fibers.
635645
function kill(error, par, cb) {
636646
var step = par;
637-
var fail = null;
638647
var head = null;
639648
var tail = null;
640649
var count = 0;
@@ -651,11 +660,8 @@ var Aff = function () {
651660
kills[count++] = tmp.kill(error, function (result) {
652661
return function () {
653662
count--;
654-
if (fail === null && util.isLeft(result)) {
655-
fail = result;
656-
}
657663
if (count === 0) {
658-
cb(fail || util.right(void 0))();
664+
cb(result)();
659665
}
660666
};
661667
});
@@ -688,7 +694,7 @@ var Aff = function () {
688694
}
689695

690696
if (count === 0) {
691-
cb(fail || util.right(void 0))();
697+
cb(util.right(void 0))();
692698
} else {
693699
// Run the cancelation effects. We alias `count` because it's mutable.
694700
kid = 0;
@@ -795,19 +801,15 @@ var Aff = function () {
795801
kid = killId++;
796802
// Once a side has resolved, we need to cancel the side that is still
797803
// pending before we can continue.
798-
kills[kid] = kill(early, step === lhs ? head._2 : head._1, function (killResult) {
804+
kills[kid] = kill(early, step === lhs ? head._2 : head._1, function (/* unused */) {
799805
return function () {
800806
delete kills[kid];
801-
if (util.isLeft(killResult)) {
802-
fail = killResult;
803-
step = null;
804-
}
805807
if (tmp) {
806808
tmp = false;
807809
} else if (tail === null) {
808-
join(fail || step, null, null);
810+
join(step, null, null);
809811
} else {
810-
join(fail || step, tail._1, tail._2);
812+
join(step, tail._1, tail._2);
811813
}
812814
};
813815
});

src/Control/Monad/Aff.purs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ module Control.Monad.Aff
1515
, liftEff'
1616
, supervise
1717
, attempt
18+
, apathize
1819
, delay
1920
, never
2021
, finally

test/Test/Main.purs

+28-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import Control.Monad.Eff.Exception (Error, EXCEPTION, throwException, error, mes
1313
import Control.Monad.Eff.Ref (REF, Ref)
1414
import Control.Monad.Eff.Ref as Ref
1515
import Control.Monad.Eff.Ref.Unsafe (unsafeRunRef)
16-
import Control.Monad.Error.Class (throwError)
16+
import Control.Monad.Error.Class (throwError, catchError)
1717
import Control.Parallel (parallel, sequential, parTraverse_)
1818
import Data.Array as Array
1919
import Data.Bifunctor (lmap)
@@ -367,6 +367,31 @@ test_kill_supervise = assert "kill/supervise" do
367367
delay (Milliseconds 20.0)
368368
eq "acquirefooacquirebarkillfookillbar" <$> readRef ref
369369

370+
test_kill_finalizer_catch eff. TestAff eff Unit
371+
test_kill_finalizer_catch = assert "kill/finalizer/catch" do
372+
ref ← newRef ""
373+
fiber ← forkAff $ bracket
374+
(delay (Milliseconds 10.0))
375+
(\_ → throwError (error "Finalizer") `catchError` \_ → writeRef ref "caught")
376+
(\_ → pure unit)
377+
killFiber (error "Nope") fiber
378+
eq "caught" <$> readRef ref
379+
380+
test_kill_finalizer_bracket eff. TestAff eff Unit
381+
test_kill_finalizer_bracket = assert "kill/finalizer/bracket" do
382+
ref ← newRef ""
383+
fiber ← forkAff $ bracket
384+
(delay (Milliseconds 10.0))
385+
(\_ → generalBracket (pure unit)
386+
{ killed: \_ _ → writeRef ref "killed"
387+
, failed: \_ _ → writeRef ref "failed"
388+
, completed: \_ _ → writeRef ref "completed"
389+
}
390+
(\_ → pure unit))
391+
(\_ → pure unit)
392+
killFiber (error "Nope") fiber
393+
eq "completed" <$> readRef ref
394+
370395
test_parallel eff. TestAff eff Unit
371396
test_parallel = assert "parallel" do
372397
ref ← newRef ""
@@ -574,6 +599,8 @@ main = do
574599
test_kill_bracket
575600
test_kill_bracket_nested
576601
test_kill_supervise
602+
test_kill_finalizer_catch
603+
test_kill_finalizer_bracket
577604
test_parallel
578605
test_kill_parallel
579606
test_parallel_alt

0 commit comments

Comments
 (0)