Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Trio 0.12.x #3

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions examples/monitor.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/sh
python -m trio_monitor.monitor -a localhost -p 8899
## or:
# telnet localhost 8899
21 changes: 21 additions & 0 deletions examples/monitored.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import trio
from trio_monitor.monitor import Monitor

async def main():
m = Monitor()
trio.hazmat.add_instrument(m)
async with trio.open_nursery() as nursery:
nursery.start_soon(trio.serve_tcp, m.listen_on_stream, 8899)
print(
"This program is monitored: connect to localhost:8899..."
)
...

async def blip():
while True:
await trio.sleep(1)
nursery.start_soon(blip)

...

trio.run(main)
2 changes: 2 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
trio_mode = true
25 changes: 25 additions & 0 deletions trio_monitor/_tests/test_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import trio
from trio_monitor.monitor import Monitor
from trio.testing import open_stream_to_socket_listener


async def test_monitor_connect():
async with trio.open_nursery() as nursery:
m = Monitor()
listeners = await nursery.start(trio.serve_tcp, m.listen_on_stream, 0)

greeting = b" " * 6
cli = await open_stream_to_socket_listener(listeners[0])
with trio.move_on_after(1):
while greeting[-6:] != b"trio> ":
greeting += await cli.receive_some(4)
assert greeting[-6:] == b"trio> "

ps = b" " * 14
await cli.send_all(b"ps\r\n")
with trio.move_on_after(1):
while b"trio.serve_tcp" not in ps:
ps += await cli.receive_some(4)
assert b"trio.serve_tcp" in ps

nursery.cancel_scope.cancel()
28 changes: 23 additions & 5 deletions trio_monitor/monitor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
import inspect
import os
import random
Expand All @@ -9,11 +10,11 @@

from async_generator._impl import ANextIter

from trio import Queue, WouldBlock, BrokenStreamError
from trio._highlevel_serve_listeners import _run_handler
from ._version import __version__
from trio.abc import Instrument
from trio.hazmat import current_task, Task
import trio


# inspiration: https://github.com/python-trio/trio/blob/master/notes-to-self/print-task-tree.py
Expand All @@ -35,6 +36,23 @@ def walk_coro_stack(coro):
coro = coro.gi_yieldfrom


class Queue():
def __init__(self, capacity=math.inf):
self.send_ch, self.recv_ch = trio.open_memory_channel(capacity)

def __aiter__(self):
return self

def put_nowait(self, message):
self.send_ch.send_nowait(message)

async def __anext__(self):
msg = await self.recv_ch.receive()
if not msg:
raise StopAsyncIteration
return msg


class Monitor(Instrument):
"""Represents a monitor; a simple way of monitoring the health of your
Trio application using the Trio instrumentation API.
Expand All @@ -59,7 +77,7 @@ def __init__(self):
self._is_monitoring = False
# semi-arbitrary size, because otherwise we'll be dropping events
# no clue how to make this better, alas.
self._monitoring_queue = Queue(capacity=100)
self._monitoring_queue = Queue()

@staticmethod
def get_root_task() -> Task:
Expand Down Expand Up @@ -127,7 +145,7 @@ def _add_to_monitoring_queue(self, item):

try:
self._monitoring_queue.put_nowait(item)
except WouldBlock:
except trio.WouldBlock:
return

async def listen_on_stream(self, stream):
Expand Down Expand Up @@ -167,7 +185,7 @@ async def main_loop(self, stream):
finally:
self._is_monitoring = False
# empty out the queue
self._monitoring_queue = Queue(capacity=100)
self._monitoring_queue = Queue()

try:
fn = getattr(self, "command_{}".format(name))
Expand Down Expand Up @@ -242,7 +260,7 @@ async def do_monitor(self, stream):

try:
await stream.send_all(message.encode("ascii") + b'\n')
except BrokenStreamError: # client disconnected on us
except trio.BrokenResourceError: # client disconnected on us
return

# command definitions
Expand Down