Skip to content

Commit 4fb85b4

Browse files
authored
Change stream results to indicate cancellation (#498)
Resolves #490
1 parent acd635e commit 4fb85b4

File tree

3 files changed

+188
-128
lines changed

3 files changed

+188
-128
lines changed

design/mvp/CanonicalABI.md

+44-30
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ values *into* the buffer's memory. Buffers are represented by the following 3
354354
abstract Python classes:
355355
```python
356356
class Buffer:
357-
MAX_LENGTH = 2**30 - 1
357+
MAX_LENGTH = 2**28 - 1
358358
t: ValType
359359
remain: Callable[[], int]
360360

@@ -1056,7 +1056,7 @@ stream.)
10561056
```python
10571057
RevokeBuffer = Callable[[], None]
10581058
OnPartialCopy = Callable[[RevokeBuffer], None]
1059-
OnCopyDone = Callable[[], None]
1059+
OnCopyDone = Callable[[Literal['completed','cancelled']], None]
10601060

10611061
class ReadableStream:
10621062
t: ValType
@@ -1069,7 +1069,8 @@ The key operation is `read` which works as follows:
10691069
* `read` is non-blocking, returning `'blocked'` if it would have blocked.
10701070
* The `On*` callbacks are only called *after* `read` returns `'blocked'`.
10711071
* `OnCopyDone` is called to indicate that the caller has regained ownership of
1072-
the buffer.
1072+
the buffer and whether this was due to the read/write completing or
1073+
being cancelled.
10731074
* `OnPartialCopy` is called to indicate a partial write has been made to the
10741075
buffer, but there may be further writes made in the future, so the caller
10751076
has *not* regained ownership of the buffer.
@@ -1119,21 +1120,21 @@ If set, the `pending_*` fields record the `Buffer` and `On*` callbacks of a
11191120
`read`. Closing the readable or writable end of a stream or cancelling a `read`
11201121
or `write` notifies any pending `read` or `write` via its `OnCopyDone`
11211122
callback, which lets the other side know that ownership of the `Buffer` has
1122-
been returned:
1123+
been returned and why:
11231124
```python
1124-
def reset_and_notify_pending(self):
1125+
def reset_and_notify_pending(self, why):
11251126
pending_on_copy_done = self.pending_on_copy_done
11261127
self.reset_pending()
1127-
pending_on_copy_done()
1128+
pending_on_copy_done(why)
11281129

11291130
def cancel(self):
1130-
self.reset_and_notify_pending()
1131+
self.reset_and_notify_pending('cancelled')
11311132

11321133
def close(self):
11331134
if not self.closed_:
11341135
self.closed_ = True
11351136
if self.pending_buffer:
1136-
self.reset_and_notify_pending()
1137+
self.reset_and_notify_pending('completed')
11371138

11381139
def closed(self):
11391140
return self.closed_
@@ -1179,7 +1180,7 @@ but in the opposite direction. Both are implemented by a single underlying
11791180
if self.pending_buffer.remain() > 0:
11801181
self.pending_on_partial_copy(self.reset_pending)
11811182
else:
1182-
self.reset_and_notify_pending()
1183+
self.reset_and_notify_pending('completed')
11831184
return 'done'
11841185
```
11851186
Currently, there is a trap when both the `read` and `write` come from the same
@@ -1243,10 +1244,10 @@ and closing once a value has been read-from or written-to the given buffer:
12431244
class FutureEnd(StreamEnd):
12441245
def close_after_copy(self, copy_op, inst, buffer, on_copy_done):
12451246
assert(buffer.remain() == 1)
1246-
def on_copy_done_wrapper():
1247+
def on_copy_done_wrapper(why):
12471248
if buffer.remain() == 0:
12481249
self.stream.close()
1249-
on_copy_done()
1250+
on_copy_done(why)
12501251
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
12511252
if ret == 'done' and buffer.remain() == 0:
12521253
self.stream.close()
@@ -3521,7 +3522,8 @@ multiple partial copies before having to context-switch back.
35213522
```python
35223523
if opts.sync:
35233524
final_revoke_buffer = None
3524-
def on_partial_copy(revoke_buffer):
3525+
def on_partial_copy(revoke_buffer, why = 'completed'):
3526+
assert(why == 'completed')
35253527
nonlocal final_revoke_buffer
35263528
final_revoke_buffer = revoke_buffer
35273529
if not async_copy.done():
@@ -3532,6 +3534,8 @@ multiple partial copies before having to context-switch back.
35323534
await task.wait_on(async_copy, sync = True)
35333535
final_revoke_buffer()
35343536
```
3537+
(When non-cooperative threads are added, the assertion that synchronous copies
3538+
can only be `completed`, and not `cancelled`, will no longer hold.)
35353539

35363540
In the asynchronous case, the `on_*` callbacks set a pending event on the
35373541
`Waitable` which will be delivered to core wasm when core wasm calls
@@ -3542,36 +3546,46 @@ allowing multiple partial copies to complete in the interim, reducing overall
35423546
context-switching overhead.
35433547
```python
35443548
else:
3545-
def copy_event(revoke_buffer):
3549+
def copy_event(why, revoke_buffer):
35463550
revoke_buffer()
35473551
e.copying = False
3548-
return (event_code, i, pack_copy_result(task, buffer, e))
3552+
return (event_code, i, pack_copy_result(task, e, buffer, why))
35493553
def on_partial_copy(revoke_buffer):
3550-
e.set_event(partial(copy_event, revoke_buffer))
3551-
def on_copy_done():
3552-
e.set_event(partial(copy_event, revoke_buffer = lambda:()))
3554+
e.set_event(partial(copy_event, 'completed', revoke_buffer))
3555+
def on_copy_done(why):
3556+
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
35533557
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) != 'done':
35543558
e.copying = True
35553559
return [BLOCKED]
3556-
return [pack_copy_result(task, buffer, e)]
3560+
return [pack_copy_result(task, e, buffer, 'completed')]
35573561
```
35583562
However the copy completes, the results are reported to the caller via
35593563
`pack_copy_result`:
35603564
```python
3561-
BLOCKED = 0xffff_ffff
3562-
CLOSED = 0x8000_0000
3565+
BLOCKED = 0xffff_ffff
3566+
COMPLETED = 0x0
3567+
CLOSED = 0x1
3568+
CANCELLED = 0x2
35633569

3564-
def pack_copy_result(task, buffer, e):
3565-
if buffer.progress or not e.stream.closed():
3566-
assert(buffer.progress <= Buffer.MAX_LENGTH < BLOCKED)
3567-
assert(not (buffer.progress & CLOSED))
3568-
return buffer.progress
3570+
def pack_copy_result(task, e, buffer, why):
3571+
if e.stream.closed():
3572+
result = CLOSED
3573+
elif why == 'cancelled':
3574+
result = CANCELLED
35693575
else:
3570-
return CLOSED
3571-
```
3572-
The order of tests here indicates that, if some progress was made and then the
3573-
stream was closed, only the progress is reported and the `CLOSED` status is
3574-
left to be discovered next time.
3576+
assert(why == 'completed')
3577+
assert(not isinstance(e, FutureEnd))
3578+
result = COMPLETED
3579+
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
3580+
packed = result | (buffer.progress << 4)
3581+
assert(packed != BLOCKED)
3582+
return packed
3583+
```
3584+
The `result` indicates whether the stream was closed by the other end, the
3585+
copy was cancelled by this end (via `{stream,future}.cancel-{read,write}`) or,
3586+
otherwise, completed successfully. In all cases, any number of elements (from
3587+
`0` to `n`) may have *first* been copied into or out of the buffer passed to
3588+
the `read` or `write` and so this number is packed into the `i32` result.
35753589

35763590

35773591
### 🔀 `canon {stream,future}.cancel-{read,write}`

design/mvp/canonical-abi/definitions.py

+35-26
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ def __init__(self, impl, dtor = None, dtor_sync = True, dtor_callback = None):
303303
#### Buffer State
304304

305305
class Buffer:
306-
MAX_LENGTH = 2**30 - 1
306+
MAX_LENGTH = 2**28 - 1
307307
t: ValType
308308
remain: Callable[[], int]
309309

@@ -639,7 +639,7 @@ def drop(self):
639639

640640
RevokeBuffer = Callable[[], None]
641641
OnPartialCopy = Callable[[RevokeBuffer], None]
642-
OnCopyDone = Callable[[], None]
642+
OnCopyDone = Callable[[Literal['completed','cancelled']], None]
643643

644644
class ReadableStream:
645645
t: ValType
@@ -666,19 +666,19 @@ def reset_pending(self):
666666
self.pending_on_partial_copy = None
667667
self.pending_on_copy_done = None
668668

669-
def reset_and_notify_pending(self):
669+
def reset_and_notify_pending(self, why):
670670
pending_on_copy_done = self.pending_on_copy_done
671671
self.reset_pending()
672-
pending_on_copy_done()
672+
pending_on_copy_done(why)
673673

674674
def cancel(self):
675-
self.reset_and_notify_pending()
675+
self.reset_and_notify_pending('cancelled')
676676

677677
def close(self):
678678
if not self.closed_:
679679
self.closed_ = True
680680
if self.pending_buffer:
681-
self.reset_and_notify_pending()
681+
self.reset_and_notify_pending('completed')
682682

683683
def closed(self):
684684
return self.closed_
@@ -706,7 +706,7 @@ def copy(self, inst, buffer, on_partial_copy, on_copy_done, src, dst):
706706
if self.pending_buffer.remain() > 0:
707707
self.pending_on_partial_copy(self.reset_pending)
708708
else:
709-
self.reset_and_notify_pending()
709+
self.reset_and_notify_pending('completed')
710710
return 'done'
711711

712712
class StreamEnd(Waitable):
@@ -736,10 +736,10 @@ def copy(self, inst, src, on_partial_copy, on_copy_done):
736736
class FutureEnd(StreamEnd):
737737
def close_after_copy(self, copy_op, inst, buffer, on_copy_done):
738738
assert(buffer.remain() == 1)
739-
def on_copy_done_wrapper():
739+
def on_copy_done_wrapper(why):
740740
if buffer.remain() == 0:
741741
self.stream.close()
742-
on_copy_done()
742+
on_copy_done(why)
743743
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
744744
if ret == 'done' and buffer.remain() == 0:
745745
self.stream.close()
@@ -2022,7 +2022,8 @@ async def copy(EndT, BufferT, event_code, t, opts, task, i, ptr, n):
20222022
buffer = BufferT(t, cx, ptr, n)
20232023
if opts.sync:
20242024
final_revoke_buffer = None
2025-
def on_partial_copy(revoke_buffer):
2025+
def on_partial_copy(revoke_buffer, why = 'completed'):
2026+
assert(why == 'completed')
20262027
nonlocal final_revoke_buffer
20272028
final_revoke_buffer = revoke_buffer
20282029
if not async_copy.done():
@@ -2033,29 +2034,37 @@ def on_partial_copy(revoke_buffer):
20332034
await task.wait_on(async_copy, sync = True)
20342035
final_revoke_buffer()
20352036
else:
2036-
def copy_event(revoke_buffer):
2037+
def copy_event(why, revoke_buffer):
20372038
revoke_buffer()
20382039
e.copying = False
2039-
return (event_code, i, pack_copy_result(task, buffer, e))
2040+
return (event_code, i, pack_copy_result(task, e, buffer, why))
20402041
def on_partial_copy(revoke_buffer):
2041-
e.set_event(partial(copy_event, revoke_buffer))
2042-
def on_copy_done():
2043-
e.set_event(partial(copy_event, revoke_buffer = lambda:()))
2042+
e.set_event(partial(copy_event, 'completed', revoke_buffer))
2043+
def on_copy_done(why):
2044+
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
20442045
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) != 'done':
20452046
e.copying = True
20462047
return [BLOCKED]
2047-
return [pack_copy_result(task, buffer, e)]
2048-
2049-
BLOCKED = 0xffff_ffff
2050-
CLOSED = 0x8000_0000
2051-
2052-
def pack_copy_result(task, buffer, e):
2053-
if buffer.progress or not e.stream.closed():
2054-
assert(buffer.progress <= Buffer.MAX_LENGTH < BLOCKED)
2055-
assert(not (buffer.progress & CLOSED))
2056-
return buffer.progress
2048+
return [pack_copy_result(task, e, buffer, 'completed')]
2049+
2050+
BLOCKED = 0xffff_ffff
2051+
COMPLETED = 0x0
2052+
CLOSED = 0x1
2053+
CANCELLED = 0x2
2054+
2055+
def pack_copy_result(task, e, buffer, why):
2056+
if e.stream.closed():
2057+
result = CLOSED
2058+
elif why == 'cancelled':
2059+
result = CANCELLED
20572060
else:
2058-
return CLOSED
2061+
assert(why == 'completed')
2062+
assert(not isinstance(e, FutureEnd))
2063+
result = COMPLETED
2064+
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
2065+
packed = result | (buffer.progress << 4)
2066+
assert(packed != BLOCKED)
2067+
return packed
20592068

20602069
### 🔀 `canon {stream,future}.cancel-{read,write}`
20612070

0 commit comments

Comments
 (0)