From de58dc908ac03d78a92b1dc13643a486a63007f4 Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 21 Jan 2018 22:12:34 -0600 Subject: [PATCH 01/20] Add pubsub classes for asyncio and rx --- graphql_ws/pubsub.py | 45 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 graphql_ws/pubsub.py diff --git a/graphql_ws/pubsub.py b/graphql_ws/pubsub.py new file mode 100644 index 0000000..65416c3 --- /dev/null +++ b/graphql_ws/pubsub.py @@ -0,0 +1,45 @@ +from asyncio import Queue +from rx.subjects import Subject + + +class AsyncioPubsub(object): + + def __init__(self): + self.subscriptions = {} + self.sub_id = 0 + + async def publish(self, channel, payload): + if channel in self.subscriptions: + for q in self.subscriptions[channel].values(): + await q.put(payload) + + def subscribe_to_channel(self, channel): + self.sub_id += 1 + q = Queue() + if channel in self.subscriptions: + self.subscriptions[channel][self.sub_id] = q + else: + self.subscriptions[channel] = {self.sub_id: q} + return self.sub_id, q + + def unsubscribe(self, channel, sub_id): + if sub_id in self.subscriptions.get(channel, {}): + del self.subscriptions[channel][sub_id] + + +class RxPubsub(object): + + def __init__(self): + self.subscriptions = {} + + def publish(self, channel, payload): + if channel in self.subscriptions: + self.subscriptions[channel].on_next(payload) + + def subscribe_to_channel(self, channel): + if channel in self.subscriptions: + return self.subscriptions[channel] + else: + subject = Subject() + self.subscriptions[channel] = subject + return subject From eb1530225d42e56373438728af2bcc3c1abc0591 Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 21 Jan 2018 22:13:42 -0600 Subject: [PATCH 02/20] Modify schema examples to include pubsub classes --- examples/flask_gevent/schema.py | 15 ++++++++++++++- examples/websockets_lib/schema.py | 17 ++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/examples/flask_gevent/schema.py b/examples/flask_gevent/schema.py index 6e6298c..432cd86 100644 --- a/examples/flask_gevent/schema.py +++ b/examples/flask_gevent/schema.py @@ -2,9 +2,17 @@ import graphene from rx import Observable +from graphql_ws.pubsub import RxPubsub + +p = RxPubsub() + class Query(graphene.ObjectType): - base = graphene.String() + base = graphene.String(value=graphene.String()) + + def resolve_base(root, info, value='Hello World!'): + p.publish('BASE', value) + return value class RandomType(graphene.ObjectType): @@ -18,6 +26,11 @@ class Subscription(graphene.ObjectType): random_int = graphene.Field(RandomType) + base_sub = graphene.String() + + def resolve_base_sub(root, info): + return p.subscribe_to_channel('BASE') + def resolve_count_seconds(root, info, up_to=5): return Observable.interval(1000)\ .map(lambda i: "{0}".format(i))\ diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index 3c23d00..37a54ea 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -2,9 +2,17 @@ import asyncio import graphene +from graphql_ws.pubsub import AsyncioPubsub + +p = AsyncioPubsub() + class Query(graphene.ObjectType): - base = graphene.String() + base = graphene.String(value=graphene.String()) + + async def resolve_base(root, info, value='Hello World!'): + await p.publish('BASE', value) + return value class RandomType(graphene.ObjectType): @@ -15,6 +23,13 @@ class RandomType(graphene.ObjectType): class Subscription(graphene.ObjectType): count_seconds = graphene.Float(up_to=graphene.Int()) random_int = graphene.Field(RandomType) + base_sub = graphene.String() + + async def resolve_base_sub(root, info): + sub_id, q = p.subscribe('BASE') + while True: + payload = await q.get() + yield payload async def resolve_count_seconds(root, info, up_to=5): for i in range(up_to): From dbdf0af44c05226537de701b56ecc1a4b3c84334 Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 21 Jan 2018 22:14:28 -0600 Subject: [PATCH 03/20] Fix graphiql version number for template code --- examples/flask_gevent/template.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/flask_gevent/template.py b/examples/flask_gevent/template.py index e7e0d6a..ef905a8 100644 --- a/examples/flask_gevent/template.py +++ b/examples/flask_gevent/template.py @@ -117,9 +117,8 @@ def render_graphiql(): ''').substitute( - GRAPHIQL_VERSION='0.11.7', + GRAPHIQL_VERSION='0.10.2', SUBSCRIPTIONS_TRANSPORT_VERSION='0.7.0', subscriptionsEndpoint='ws://localhost:5000/subscriptions', - # subscriptionsEndpoint='ws://localhost:5000/', endpointURL='/graphql', ) From 7174b74fbf82557dbb4ea691cf342900eb5d6f24 Mon Sep 17 00:00:00 2001 From: hballard Date: Wed, 24 Jan 2018 23:53:22 -0600 Subject: [PATCH 04/20] Add redis pubsub that returns observable --- examples/flask_gevent/schema.py | 15 +++++++++--- graphql_ws/pubsub.py | 43 +++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/examples/flask_gevent/schema.py b/examples/flask_gevent/schema.py index 432cd86..7f47c83 100644 --- a/examples/flask_gevent/schema.py +++ b/examples/flask_gevent/schema.py @@ -2,9 +2,11 @@ import graphene from rx import Observable -from graphql_ws.pubsub import RxPubsub +# from graphql_ws.pubsub import RxPubsub +from graphql_ws.pubsub import GeventRedisPubsub -p = RxPubsub() +# p = RxPubsub() +p = GeventRedisPubsub() class Query(graphene.ObjectType): @@ -29,7 +31,14 @@ class Subscription(graphene.ObjectType): base_sub = graphene.String() def resolve_base_sub(root, info): - return p.subscribe_to_channel('BASE') + # subscribe_to_channel method returns an observable + sub_id, observable = p.subscribe_to_channel('BASE') + return observable.map(lambda i: "{0}".format(i)) + + # def resolve_base_sub(root, info): + # # subscribe_to_channel method returns an observable + # return p.subscribe_to_channel('BASE')\ + # .map(lambda i: "{0}".format(i)) def resolve_count_seconds(root, info, up_to=5): return Observable.interval(1000)\ diff --git a/graphql_ws/pubsub.py b/graphql_ws/pubsub.py index 65416c3..15d12c4 100644 --- a/graphql_ws/pubsub.py +++ b/graphql_ws/pubsub.py @@ -1,4 +1,10 @@ +import gevent +import pickle +import redis + from asyncio import Queue +from rx import Observable +from rx.concurrency import GEventScheduler from rx.subjects import Subject @@ -43,3 +49,40 @@ def subscribe_to_channel(self, channel): subject = Subject() self.subscriptions[channel] = subject return subject + + +class GeventRedisPubsub(object): + + def __init__(self, host='localhost', port=6379, *args, **kwargs): + redis.connection.socket = gevent.socket + self.redis = redis.StrictRedis(host, port, *args, **kwargs) + self.subscriptions = {} + self.sub_id = 0 + + def publish(self, channel, payload): + self.redis.publish(channel, pickle.dumps(payload)) + + def subscribe_to_channel(self, channel): + self.sub_id += 1 + + self.pubsub = self.redis.pubsub() + self.pubsub.subscribe(channel) + + self.subscriptions[self.sub_id] = self.pubsub + + def wait_and_get_messages(observer): + while True: + message = self.pubsub.get_message( + ignore_subscribe_messages=True) + if message: + observer.on_next(pickle.loads(message['data'])) + gevent.sleep(.001) + + return self.sub_id, Observable.create(wait_and_get_messages)\ + .subscribe_on(GEventScheduler()) + + def unsubscribe(self, channel, sub_id): + if sub_id in self.subscriptions: + self.subscriptions[sub_id].unsubscribe(channel) + self.subscriptions[sub_id].close() + del self.subscriptions[sub_id] From 427216b79f62ca6ee6bdc4e456ff82e2b0b5b7ba Mon Sep 17 00:00:00 2001 From: hballard Date: Fri, 26 Jan 2018 20:48:39 -0600 Subject: [PATCH 05/20] Add pubsub class using rx and redis --- examples/flask_gevent/schema.py | 9 ++------ graphql_ws/pubsub.py | 38 ++++++++++++++++----------------- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/examples/flask_gevent/schema.py b/examples/flask_gevent/schema.py index 7f47c83..4d12294 100644 --- a/examples/flask_gevent/schema.py +++ b/examples/flask_gevent/schema.py @@ -32,13 +32,8 @@ class Subscription(graphene.ObjectType): def resolve_base_sub(root, info): # subscribe_to_channel method returns an observable - sub_id, observable = p.subscribe_to_channel('BASE') - return observable.map(lambda i: "{0}".format(i)) - - # def resolve_base_sub(root, info): - # # subscribe_to_channel method returns an observable - # return p.subscribe_to_channel('BASE')\ - # .map(lambda i: "{0}".format(i)) + return p.subscribe_to_channel('BASE')\ + .map(lambda i: "{0}".format(i)) def resolve_count_seconds(root, info, up_to=5): return Observable.interval(1000)\ diff --git a/graphql_ws/pubsub.py b/graphql_ws/pubsub.py index 15d12c4..065729a 100644 --- a/graphql_ws/pubsub.py +++ b/graphql_ws/pubsub.py @@ -57,32 +57,30 @@ def __init__(self, host='localhost', port=6379, *args, **kwargs): redis.connection.socket = gevent.socket self.redis = redis.StrictRedis(host, port, *args, **kwargs) self.subscriptions = {} - self.sub_id = 0 def publish(self, channel, payload): self.redis.publish(channel, pickle.dumps(payload)) def subscribe_to_channel(self, channel): - self.sub_id += 1 - - self.pubsub = self.redis.pubsub() - self.pubsub.subscribe(channel) + if channel in self.subscriptions: + return self.subscriptions[channel] + else: + pubsub = self.redis.pubsub() + pubsub.subscribe(channel) - self.subscriptions[self.sub_id] = self.pubsub + def wait_and_get_messages(observer): + while True: + message = pubsub.get_message( + ignore_subscribe_messages=True) + if message: + observer.on_next(pickle.loads(message['data'])) + gevent.sleep(.001) - def wait_and_get_messages(observer): - while True: - message = self.pubsub.get_message( - ignore_subscribe_messages=True) - if message: - observer.on_next(pickle.loads(message['data'])) - gevent.sleep(.001) + observable = Observable.create(wait_and_get_messages)\ + .subscribe_on(GEventScheduler())\ + .publish()\ + .auto_connect() - return self.sub_id, Observable.create(wait_and_get_messages)\ - .subscribe_on(GEventScheduler()) + self.subscriptions[channel] = observable - def unsubscribe(self, channel, sub_id): - if sub_id in self.subscriptions: - self.subscriptions[sub_id].unsubscribe(channel) - self.subscriptions[sub_id].close() - del self.subscriptions[sub_id] + return observable From 1d79e37cb3b1713cfd4237d52f8af2c7a9e1e80e Mon Sep 17 00:00:00 2001 From: hballard Date: Sat, 27 Jan 2018 22:46:27 -0600 Subject: [PATCH 06/20] Modify GeventRedisPubsub class --- graphql_ws/pubsub.py | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/graphql_ws/pubsub.py b/graphql_ws/pubsub.py index 065729a..21d95b6 100644 --- a/graphql_ws/pubsub.py +++ b/graphql_ws/pubsub.py @@ -1,10 +1,9 @@ -import gevent import pickle + +import gevent import redis from asyncio import Queue -from rx import Observable -from rx.concurrency import GEventScheduler from rx.subjects import Subject @@ -56,7 +55,9 @@ class GeventRedisPubsub(object): def __init__(self, host='localhost', port=6379, *args, **kwargs): redis.connection.socket = gevent.socket self.redis = redis.StrictRedis(host, port, *args, **kwargs) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) self.subscriptions = {} + self.greenlet = None def publish(self, channel, payload): self.redis.publish(channel, pickle.dumps(payload)) @@ -65,22 +66,22 @@ def subscribe_to_channel(self, channel): if channel in self.subscriptions: return self.subscriptions[channel] else: - pubsub = self.redis.pubsub() - pubsub.subscribe(channel) - - def wait_and_get_messages(observer): - while True: - message = pubsub.get_message( - ignore_subscribe_messages=True) - if message: - observer.on_next(pickle.loads(message['data'])) - gevent.sleep(.001) - - observable = Observable.create(wait_and_get_messages)\ - .subscribe_on(GEventScheduler())\ - .publish()\ - .auto_connect() - - self.subscriptions[channel] = observable + self.pubsub.subscribe(channel) + subject = Subject() + self.subscriptions[channel] = subject + if not self.greenlet: + self.greenlet = gevent.spawn(self._wait_and_get_messages) + return subject - return observable + def _wait_and_get_messages(self): + while True: + msg = self.pubsub.get_message() + if msg: + if isinstance(msg['channel'], bytes): + channel = msg['channel'].decode() + else: + channel = msg['channel'] + if channel in self.subscriptions: + self.subscriptions[channel].on_next(pickle.loads( + msg['data'])) + gevent.sleep(.001) From 2d28b5b853c7236bd358225453e648de4aa34ea6 Mon Sep 17 00:00:00 2001 From: hballard Date: Sun, 28 Jan 2018 01:01:25 -0600 Subject: [PATCH 07/20] Add AsyncioRedisPubsub class --- graphql_ws/pubsub.py | 56 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/graphql_ws/pubsub.py b/graphql_ws/pubsub.py index 21d95b6..6b5bfe0 100644 --- a/graphql_ws/pubsub.py +++ b/graphql_ws/pubsub.py @@ -1,9 +1,10 @@ +import asyncio import pickle +import aredis import gevent import redis -from asyncio import Queue from rx.subjects import Subject @@ -20,7 +21,7 @@ async def publish(self, channel, payload): def subscribe_to_channel(self, channel): self.sub_id += 1 - q = Queue() + q = asyncio.Queue() if channel in self.subscriptions: self.subscriptions[channel][self.sub_id] = q else: @@ -30,6 +31,8 @@ def subscribe_to_channel(self, channel): def unsubscribe(self, channel, sub_id): if sub_id in self.subscriptions.get(channel, {}): del self.subscriptions[channel][sub_id] + if not self.subscriptions[channel]: + del self.subscriptions[channel] class RxPubsub(object): @@ -49,6 +52,52 @@ def subscribe_to_channel(self, channel): self.subscriptions[channel] = subject return subject + def unsubscribe(self, channel, disposable): + pass + + +class AsyncioRedisPubsub(object): + + def __init__(self, host='localhost', port=6379, *args, **kwargs): + self.redis = aredis.StrictRedis(host, port, *args, **kwargs) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) + self.subscriptions = {} + self.sub_id = 0 + self.future = None + + async def publish(self, channel, payload): + await self.redis.publish(channel, pickle.dumps(payload)) + + async def subscribe_to_channel(self, channel): + self.sub_id += 1 + q = asyncio.Queue() + if channel in self.subscriptions: + self.subscriptions[channel][self.sub_id] = q + else: + await self.pubsub.subscribe(channel) + self.subscriptions[channel] = {self.sub_id: q} + if not self.future: + self.future = asyncio.ensure_future( + self._wait_and_get_messages()) + return self.sub_id, q + + def unsubscribe(self, channel, sub_id): + if sub_id in self.subscriptions.get(channel, {}): + del self.subscriptions[channel][sub_id] + if not self.subscriptions[channel]: + self.pubsub.unsubscribe(channel) + del self.subscriptions[channel] + + async def _wait_and_get_messages(self): + while True: + msg = await self.pubsub.get_message() + if msg: + channel = msg['channel'].decode() + if channel in self.subscriptions: + for q in self.subscriptions[channel].values(): + await q.put(pickle.loads(msg['data'])) + await asyncio.sleep(.001) + class GeventRedisPubsub(object): @@ -73,6 +122,9 @@ def subscribe_to_channel(self, channel): self.greenlet = gevent.spawn(self._wait_and_get_messages) return subject + def unsubscribe(self, channel, disposable): + pass + def _wait_and_get_messages(self): while True: msg = self.pubsub.get_message() From b6ded509ce25b09ee901c09354c2c3a025528ae1 Mon Sep 17 00:00:00 2001 From: hballard Date: Wed, 21 Feb 2018 22:48:18 -0600 Subject: [PATCH 08/20] Add wrapper class for cleaning up Subject cancel() --- examples/flask_gevent/schema.py | 8 +-- examples/websockets_lib/app.py | 17 ++++- examples/websockets_lib/schema.py | 18 ++++-- graphql_ws/pubsub.py | 100 +++++++++++++++++++----------- 4 files changed, 97 insertions(+), 46 deletions(-) diff --git a/examples/flask_gevent/schema.py b/examples/flask_gevent/schema.py index 4d12294..0b700f6 100644 --- a/examples/flask_gevent/schema.py +++ b/examples/flask_gevent/schema.py @@ -2,11 +2,11 @@ import graphene from rx import Observable -# from graphql_ws.pubsub import RxPubsub -from graphql_ws.pubsub import GeventRedisPubsub +from graphql_ws.pubsub import GeventRxPubsub +# from graphql_ws.pubsub import GeventRxRedisPubsub -# p = RxPubsub() -p = GeventRedisPubsub() +p = GeventRxPubsub() +# p = GeventRxRedisPubsub() class Query(graphene.ObjectType): diff --git a/examples/websockets_lib/app.py b/examples/websockets_lib/app.py index 0de6988..33b333c 100644 --- a/examples/websockets_lib/app.py +++ b/examples/websockets_lib/app.py @@ -1,3 +1,5 @@ +import asyncio + from graphql_ws.websockets_lib import WsLibSubscriptionServer from graphql.execution.executors.asyncio import AsyncioExecutor from sanic import Sanic, response @@ -9,12 +11,25 @@ @app.listener('before_server_start') -def init_graphql(app, loop): +async def init_graphql(app, loop): app.add_route(GraphQLView.as_view(schema=schema, executor=AsyncioExecutor(loop=loop)), '/graphql') +@app.listener('before_server_stop') +async def cleanup_subscription_tasks(app, loop): + # clean up tasks created by subscriptions and pubsub + def shutdown_exception_handler(loop, context): + if "exception" not in context or not isinstance( + context["exception"], asyncio.CancelledError): + loop.default_exception_handler(context) + loop.set_exception_handler(shutdown_exception_handler) + pending = asyncio.Task.all_tasks(loop=loop) + future = asyncio.gather(*pending, loop=loop, return_exceptions=True) + future.cancel() + + @app.route('/graphiql') async def graphiql_view(request): return response.html(render_graphiql()) diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index 37a54ea..33e6caa 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -2,9 +2,11 @@ import asyncio import graphene -from graphql_ws.pubsub import AsyncioPubsub +# from graphql_ws.pubsub import AsyncioPubsub +from graphql_ws.pubsub import AsyncioRedisPubsub -p = AsyncioPubsub() +# p = AsyncioPubsub() +p = AsyncioRedisPubsub() class Query(graphene.ObjectType): @@ -26,10 +28,14 @@ class Subscription(graphene.ObjectType): base_sub = graphene.String() async def resolve_base_sub(root, info): - sub_id, q = p.subscribe('BASE') - while True: - payload = await q.get() - yield payload + try: + sub_id, q = await p.subscribe_to_channel('BASE') + # sub_id, q = p.subscribe_to_channel('BASE') + while True: + payload = await q.get() + yield payload + except asyncio.CancelledError: + print('Caught SIGINT') async def resolve_count_seconds(root, info, up_to=5): for i in range(up_to): diff --git a/graphql_ws/pubsub.py b/graphql_ws/pubsub.py index 6b5bfe0..967d8b2 100644 --- a/graphql_ws/pubsub.py +++ b/graphql_ws/pubsub.py @@ -6,9 +6,10 @@ import redis from rx.subjects import Subject +from rx import config -class AsyncioPubsub(object): +class AsyncioPubsub: def __init__(self): self.subscriptions = {} @@ -31,39 +32,18 @@ def subscribe_to_channel(self, channel): def unsubscribe(self, channel, sub_id): if sub_id in self.subscriptions.get(channel, {}): del self.subscriptions[channel][sub_id] - if not self.subscriptions[channel]: - del self.subscriptions[channel] + if not self.subscriptions[channel]: + del self.subscriptions[channel] -class RxPubsub(object): - - def __init__(self): - self.subscriptions = {} - - def publish(self, channel, payload): - if channel in self.subscriptions: - self.subscriptions[channel].on_next(payload) - - def subscribe_to_channel(self, channel): - if channel in self.subscriptions: - return self.subscriptions[channel] - else: - subject = Subject() - self.subscriptions[channel] = subject - return subject - - def unsubscribe(self, channel, disposable): - pass - - -class AsyncioRedisPubsub(object): +class AsyncioRedisPubsub: def __init__(self, host='localhost', port=6379, *args, **kwargs): self.redis = aredis.StrictRedis(host, port, *args, **kwargs) self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) self.subscriptions = {} self.sub_id = 0 - self.future = None + self.task = None async def publish(self, channel, payload): await self.redis.publish(channel, pickle.dumps(payload)) @@ -76,17 +56,19 @@ async def subscribe_to_channel(self, channel): else: await self.pubsub.subscribe(channel) self.subscriptions[channel] = {self.sub_id: q} - if not self.future: - self.future = asyncio.ensure_future( + if not self.task: + self.task = asyncio.ensure_future( self._wait_and_get_messages()) return self.sub_id, q - def unsubscribe(self, channel, sub_id): + async def unsubscribe(self, channel, sub_id): if sub_id in self.subscriptions.get(channel, {}): del self.subscriptions[channel][sub_id] - if not self.subscriptions[channel]: - self.pubsub.unsubscribe(channel) - del self.subscriptions[channel] + if not self.subscriptions[channel]: + await self.pubsub.unsubscribe(channel) + del self.subscriptions[channel] + if not self.subscriptions: + self.task.cancel() async def _wait_and_get_messages(self): while True: @@ -99,7 +81,49 @@ async def _wait_and_get_messages(self): await asyncio.sleep(.001) -class GeventRedisPubsub(object): +class ObserversWrapper(object): + def __init__(self, pubsub, channel): + self.pubsub = pubsub + self.channel = channel + self.observers = [] + + self.lock = config["concurrency"].RLock() + + def __getattr__(self, attr): + return getattr(self.observers, attr) + + def remove(self, observer): + with self.lock: + self.observers.remove(observer) + if not self.observers: + self.pubsub.unsubscribe(self.channel) + + +class GeventRxPubsub(object): + + def __init__(self): + self.subscriptions = {} + + def publish(self, channel, payload): + if channel in self.subscriptions: + self.subscriptions[channel].on_next(payload) + + def subscribe_to_channel(self, channel): + if channel in self.subscriptions: + return self.subscriptions[channel] + else: + subject = Subject() + # monkeypatch Subject to cleanup pubsub on subscription cancel() + subject.observers = ObserversWrapper(self, channel) + self.subscriptions[channel] = subject + return subject + + def unsubscribe(self, channel): + if channel in self.subscriptions: + del self.subscriptions[channel] + + +class GeventRxRedisPubsub(object): def __init__(self, host='localhost', port=6379, *args, **kwargs): redis.connection.socket = gevent.socket @@ -117,13 +141,19 @@ def subscribe_to_channel(self, channel): else: self.pubsub.subscribe(channel) subject = Subject() + # monkeypatch Subject to cleanup pubsub on subscription cancel() + subject.observers = ObserversWrapper(self, channel) self.subscriptions[channel] = subject if not self.greenlet: self.greenlet = gevent.spawn(self._wait_and_get_messages) return subject - def unsubscribe(self, channel, disposable): - pass + def unsubscribe(self, channel): + if channel in self.subscriptions: + self.pubsub.unsubscribe(channel) + del self.subscriptions[channel] + if not self.subscriptions: + self.greenlet.kill() def _wait_and_get_messages(self): while True: From 32e139f5af5320945d8a0185dd535fcfc9b48c39 Mon Sep 17 00:00:00 2001 From: hballard Date: Thu, 22 Feb 2018 23:39:37 -0600 Subject: [PATCH 09/20] Fix bug in Subject wrapper class code --- examples/flask_gevent/app.py | 3 ++- examples/flask_gevent/schema.py | 11 ++++++----- examples/websockets_lib/schema.py | 18 +++++++++++------- graphql_ws/pubsub.py | 15 ++++++++++----- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/examples/flask_gevent/app.py b/examples/flask_gevent/app.py index dbb0cca..4d822a4 100644 --- a/examples/flask_gevent/app.py +++ b/examples/flask_gevent/app.py @@ -20,7 +20,8 @@ def graphql_view(): app.add_url_rule( - '/graphql', view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=False)) + '/graphql', view_func=GraphQLView.as_view('graphql', schema=schema, + graphiql=False)) subscription_server = GeventSubscriptionServer(schema) app.app_protocol = lambda environ_path_info: 'graphql-ws' diff --git a/examples/flask_gevent/schema.py b/examples/flask_gevent/schema.py index 0b700f6..389511b 100644 --- a/examples/flask_gevent/schema.py +++ b/examples/flask_gevent/schema.py @@ -2,11 +2,11 @@ import graphene from rx import Observable -from graphql_ws.pubsub import GeventRxPubsub -# from graphql_ws.pubsub import GeventRxRedisPubsub +# from graphql_ws.pubsub import GeventRxPubsub +from graphql_ws.pubsub import GeventRxRedisPubsub -p = GeventRxPubsub() -# p = GeventRxRedisPubsub() +# p = GeventRxPubsub() +p = GeventRxRedisPubsub() class Query(graphene.ObjectType): @@ -41,7 +41,8 @@ def resolve_count_seconds(root, info, up_to=5): .take_while(lambda i: int(i) <= up_to) def resolve_random_int(root, info): - return Observable.interval(1000).map(lambda i: RandomType(seconds=i, random_int=random.randint(0, 500))) + return Observable.interval(1000).map( + lambda i: RandomType(seconds=i, random_int=random.randint(0, 500))) schema = graphene.Schema(query=Query, subscription=Subscription) diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index 33e6caa..3ad99a8 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -2,11 +2,15 @@ import asyncio import graphene -# from graphql_ws.pubsub import AsyncioPubsub -from graphql_ws.pubsub import AsyncioRedisPubsub +from graphql_ws.pubsub import AsyncioPubsub -# p = AsyncioPubsub() -p = AsyncioRedisPubsub() +p = AsyncioPubsub() + +# TODO: +# - Add Mutation Type +# - Breakup into package and two modules +# - Add explanation on how to use pubsub to readme +# - Modify author code to peform proper cleanup during cancel() class Query(graphene.ObjectType): @@ -29,13 +33,13 @@ class Subscription(graphene.ObjectType): async def resolve_base_sub(root, info): try: - sub_id, q = await p.subscribe_to_channel('BASE') - # sub_id, q = p.subscribe_to_channel('BASE') + # sub_id, q = await p.subscribe_to_channel('BASE') + sub_id, q = p.subscribe_to_channel('BASE') while True: payload = await q.get() yield payload except asyncio.CancelledError: - print('Caught SIGINT') + p.unsubscribe('BASE', sub_id) async def resolve_count_seconds(root, info, up_to=5): for i in range(up_to): diff --git a/graphql_ws/pubsub.py b/graphql_ws/pubsub.py index 967d8b2..4748fc9 100644 --- a/graphql_ws/pubsub.py +++ b/graphql_ws/pubsub.py @@ -81,7 +81,7 @@ async def _wait_and_get_messages(self): await asyncio.sleep(.001) -class ObserversWrapper(object): +class SubjectObserversWrapper(object): def __init__(self, pubsub, channel): self.pubsub = pubsub self.channel = channel @@ -89,6 +89,9 @@ def __init__(self, pubsub, channel): self.lock = config["concurrency"].RLock() + def __getitem__(self, key): + return self.observers[key] + def __getattr__(self, attr): return getattr(self.observers, attr) @@ -113,8 +116,9 @@ def subscribe_to_channel(self, channel): return self.subscriptions[channel] else: subject = Subject() - # monkeypatch Subject to cleanup pubsub on subscription cancel() - subject.observers = ObserversWrapper(self, channel) + # monkeypatch Subject to unsubscribe pubsub on observable + # subscription.cancel() + subject.observers = SubjectObserversWrapper(self, channel) self.subscriptions[channel] = subject return subject @@ -141,8 +145,9 @@ def subscribe_to_channel(self, channel): else: self.pubsub.subscribe(channel) subject = Subject() - # monkeypatch Subject to cleanup pubsub on subscription cancel() - subject.observers = ObserversWrapper(self, channel) + # monkeypatch Subject to unsubscribe pubsub on observable + # subscription.cancel() + subject.observers = SubjectObserversWrapper(self, channel) self.subscriptions[channel] = subject if not self.greenlet: self.greenlet = gevent.spawn(self._wait_and_get_messages) From 331c04059cb7b7fc5a7c4e2cb4985a318f675e99 Mon Sep 17 00:00:00 2001 From: hballard Date: Fri, 23 Feb 2018 21:03:33 -0600 Subject: [PATCH 10/20] Add mutation using pubsub class to example apps --- examples/aiohttp/schema.py | 36 +++++++++++++++++++++++++- examples/flask_gevent/schema.py | 42 +++++++++++++++++++------------ examples/websockets_lib/schema.py | 34 +++++++++++++++++-------- 3 files changed, 85 insertions(+), 27 deletions(-) diff --git a/examples/aiohttp/schema.py b/examples/aiohttp/schema.py index 3c23d00..c0b8adc 100644 --- a/examples/aiohttp/schema.py +++ b/examples/aiohttp/schema.py @@ -2,10 +2,32 @@ import asyncio import graphene +from graphql_ws.pubsub import AsyncioPubsub + +p = AsyncioPubsub() + class Query(graphene.ObjectType): base = graphene.String() + async def resolve_base(root, info): + return 'Hello World!' + + +class MutationExample(graphene.Mutation): + class Arguments: + input_text = graphene.String() + + output_text = graphene.String() + + async def mutate(self, info, input_text): + await p.publish('BASE', input_text) + return MutationExample(output_text=input_text) + + +class Mutations(graphene.ObjectType): + mutation_example = MutationExample.Field() + class RandomType(graphene.ObjectType): seconds = graphene.Int() @@ -15,6 +37,17 @@ class RandomType(graphene.ObjectType): class Subscription(graphene.ObjectType): count_seconds = graphene.Float(up_to=graphene.Int()) random_int = graphene.Field(RandomType) + mutation_example = graphene.String() + + async def resolve_mutation_example(root, info): + try: + # sub_id, q = await p.subscribe_to_channel('BASE') + sub_id, q = p.subscribe_to_channel('BASE') + while True: + payload = await q.get() + yield payload + except asyncio.CancelledError: + p.unsubscribe('BASE', sub_id) async def resolve_count_seconds(root, info, up_to=5): for i in range(up_to): @@ -31,4 +64,5 @@ async def resolve_random_int(root, info): i += 1 -schema = graphene.Schema(query=Query, subscription=Subscription) +schema = graphene.Schema(query=Query, mutation=Mutations, + subscription=Subscription) diff --git a/examples/flask_gevent/schema.py b/examples/flask_gevent/schema.py index 389511b..242ae07 100644 --- a/examples/flask_gevent/schema.py +++ b/examples/flask_gevent/schema.py @@ -1,20 +1,32 @@ -import random import graphene -from rx import Observable +import random -# from graphql_ws.pubsub import GeventRxPubsub -from graphql_ws.pubsub import GeventRxRedisPubsub +from graphql_ws.pubsub import GeventRxPubsub +from rx import Observable -# p = GeventRxPubsub() -p = GeventRxRedisPubsub() +p = GeventRxPubsub() class Query(graphene.ObjectType): - base = graphene.String(value=graphene.String()) + base = graphene.String() + + def resolve_base(root, info): + return 'Hello World!' + + +class MutationExample(graphene.Mutation): + class Arguments: + input_text = graphene.String() - def resolve_base(root, info, value='Hello World!'): - p.publish('BASE', value) - return value + output_text = graphene.String() + + def mutate(self, info, input_text): + p.publish('BASE', input_text) + return MutationExample(output_text=input_text) + + +class Mutations(graphene.ObjectType): + mutation_example = MutationExample.Field() class RandomType(graphene.ObjectType): @@ -23,14 +35,11 @@ class RandomType(graphene.ObjectType): class Subscription(graphene.ObjectType): - count_seconds = graphene.Int(up_to=graphene.Int()) - random_int = graphene.Field(RandomType) + mutation_example = graphene.String() - base_sub = graphene.String() - - def resolve_base_sub(root, info): + def resolve_mutation_example(root, info): # subscribe_to_channel method returns an observable return p.subscribe_to_channel('BASE')\ .map(lambda i: "{0}".format(i)) @@ -45,4 +54,5 @@ def resolve_random_int(root, info): lambda i: RandomType(seconds=i, random_int=random.randint(0, 500))) -schema = graphene.Schema(query=Query, subscription=Subscription) +schema = graphene.Schema(query=Query, mutation=Mutations, + subscription=Subscription) diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index 3ad99a8..6033792 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -7,18 +7,31 @@ p = AsyncioPubsub() # TODO: -# - Add Mutation Type -# - Breakup into package and two modules -# - Add explanation on how to use pubsub to readme +# - Breakup pubsub module into package and two modules +# - Add explanation on how to use pubsub # - Modify author code to peform proper cleanup during cancel() class Query(graphene.ObjectType): - base = graphene.String(value=graphene.String()) + base = graphene.String() - async def resolve_base(root, info, value='Hello World!'): - await p.publish('BASE', value) - return value + async def resolve_base(root, info): + return 'Hello World!' + + +class MutationExample(graphene.Mutation): + class Arguments: + input_text = graphene.String() + + output_text = graphene.String() + + async def mutate(self, info, input_text): + await p.publish('BASE', input_text) + return MutationExample(output_text=input_text) + + +class Mutations(graphene.ObjectType): + mutation_example = MutationExample.Field() class RandomType(graphene.ObjectType): @@ -29,9 +42,9 @@ class RandomType(graphene.ObjectType): class Subscription(graphene.ObjectType): count_seconds = graphene.Float(up_to=graphene.Int()) random_int = graphene.Field(RandomType) - base_sub = graphene.String() + mutation_example = graphene.String() - async def resolve_base_sub(root, info): + async def resolve_mutation_example(root, info): try: # sub_id, q = await p.subscribe_to_channel('BASE') sub_id, q = p.subscribe_to_channel('BASE') @@ -56,4 +69,5 @@ async def resolve_random_int(root, info): i += 1 -schema = graphene.Schema(query=Query, subscription=Subscription) +schema = graphene.Schema(query=Query, mutation=Mutations, + subscription=Subscription) From 1f4bd213b03cf93a581e6a113ad425bee8a84529 Mon Sep 17 00:00:00 2001 From: hballard Date: Fri, 23 Feb 2018 21:23:39 -0600 Subject: [PATCH 11/20] Broke pubsub module into a package and two modules --- examples/websockets_lib/schema.py | 1 - graphql_ws/pubsub/__init__.py | 2 + graphql_ws/pubsub/asyncio.py | 76 +++++++++++++++++++ .../gevent_observable.py} | 74 ------------------ 4 files changed, 78 insertions(+), 75 deletions(-) create mode 100644 graphql_ws/pubsub/__init__.py create mode 100644 graphql_ws/pubsub/asyncio.py rename graphql_ws/{pubsub.py => pubsub/gevent_observable.py} (55%) diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index 6033792..9a8d570 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -7,7 +7,6 @@ p = AsyncioPubsub() # TODO: -# - Breakup pubsub module into package and two modules # - Add explanation on how to use pubsub # - Modify author code to peform proper cleanup during cancel() diff --git a/graphql_ws/pubsub/__init__.py b/graphql_ws/pubsub/__init__.py new file mode 100644 index 0000000..442d2db --- /dev/null +++ b/graphql_ws/pubsub/__init__.py @@ -0,0 +1,2 @@ +from .asyncio import AsyncioPubsub, AsyncioRedisPubsub +from .gevent_observable import GeventRxPubsub, GeventRxRedisPubsub diff --git a/graphql_ws/pubsub/asyncio.py b/graphql_ws/pubsub/asyncio.py new file mode 100644 index 0000000..408580b --- /dev/null +++ b/graphql_ws/pubsub/asyncio.py @@ -0,0 +1,76 @@ +import asyncio +import pickle + +import aredis + + +class AsyncioPubsub: + + def __init__(self): + self.subscriptions = {} + self.sub_id = 0 + + async def publish(self, channel, payload): + if channel in self.subscriptions: + for q in self.subscriptions[channel].values(): + await q.put(payload) + + def subscribe_to_channel(self, channel): + self.sub_id += 1 + q = asyncio.Queue() + if channel in self.subscriptions: + self.subscriptions[channel][self.sub_id] = q + else: + self.subscriptions[channel] = {self.sub_id: q} + return self.sub_id, q + + def unsubscribe(self, channel, sub_id): + if sub_id in self.subscriptions.get(channel, {}): + del self.subscriptions[channel][sub_id] + if not self.subscriptions[channel]: + del self.subscriptions[channel] + + +class AsyncioRedisPubsub: + + def __init__(self, host='localhost', port=6379, *args, **kwargs): + self.redis = aredis.StrictRedis(host, port, *args, **kwargs) + self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) + self.subscriptions = {} + self.sub_id = 0 + self.task = None + + async def publish(self, channel, payload): + await self.redis.publish(channel, pickle.dumps(payload)) + + async def subscribe_to_channel(self, channel): + self.sub_id += 1 + q = asyncio.Queue() + if channel in self.subscriptions: + self.subscriptions[channel][self.sub_id] = q + else: + await self.pubsub.subscribe(channel) + self.subscriptions[channel] = {self.sub_id: q} + if not self.task: + self.task = asyncio.ensure_future( + self._wait_and_get_messages()) + return self.sub_id, q + + async def unsubscribe(self, channel, sub_id): + if sub_id in self.subscriptions.get(channel, {}): + del self.subscriptions[channel][sub_id] + if not self.subscriptions[channel]: + await self.pubsub.unsubscribe(channel) + del self.subscriptions[channel] + if not self.subscriptions: + self.task.cancel() + + async def _wait_and_get_messages(self): + while True: + msg = await self.pubsub.get_message() + if msg: + channel = msg['channel'].decode() + if channel in self.subscriptions: + for q in self.subscriptions[channel].values(): + await q.put(pickle.loads(msg['data'])) + await asyncio.sleep(.001) diff --git a/graphql_ws/pubsub.py b/graphql_ws/pubsub/gevent_observable.py similarity index 55% rename from graphql_ws/pubsub.py rename to graphql_ws/pubsub/gevent_observable.py index 4748fc9..750bbab 100644 --- a/graphql_ws/pubsub.py +++ b/graphql_ws/pubsub/gevent_observable.py @@ -1,7 +1,5 @@ -import asyncio import pickle -import aredis import gevent import redis @@ -9,78 +7,6 @@ from rx import config -class AsyncioPubsub: - - def __init__(self): - self.subscriptions = {} - self.sub_id = 0 - - async def publish(self, channel, payload): - if channel in self.subscriptions: - for q in self.subscriptions[channel].values(): - await q.put(payload) - - def subscribe_to_channel(self, channel): - self.sub_id += 1 - q = asyncio.Queue() - if channel in self.subscriptions: - self.subscriptions[channel][self.sub_id] = q - else: - self.subscriptions[channel] = {self.sub_id: q} - return self.sub_id, q - - def unsubscribe(self, channel, sub_id): - if sub_id in self.subscriptions.get(channel, {}): - del self.subscriptions[channel][sub_id] - if not self.subscriptions[channel]: - del self.subscriptions[channel] - - -class AsyncioRedisPubsub: - - def __init__(self, host='localhost', port=6379, *args, **kwargs): - self.redis = aredis.StrictRedis(host, port, *args, **kwargs) - self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True) - self.subscriptions = {} - self.sub_id = 0 - self.task = None - - async def publish(self, channel, payload): - await self.redis.publish(channel, pickle.dumps(payload)) - - async def subscribe_to_channel(self, channel): - self.sub_id += 1 - q = asyncio.Queue() - if channel in self.subscriptions: - self.subscriptions[channel][self.sub_id] = q - else: - await self.pubsub.subscribe(channel) - self.subscriptions[channel] = {self.sub_id: q} - if not self.task: - self.task = asyncio.ensure_future( - self._wait_and_get_messages()) - return self.sub_id, q - - async def unsubscribe(self, channel, sub_id): - if sub_id in self.subscriptions.get(channel, {}): - del self.subscriptions[channel][sub_id] - if not self.subscriptions[channel]: - await self.pubsub.unsubscribe(channel) - del self.subscriptions[channel] - if not self.subscriptions: - self.task.cancel() - - async def _wait_and_get_messages(self): - while True: - msg = await self.pubsub.get_message() - if msg: - channel = msg['channel'].decode() - if channel in self.subscriptions: - for q in self.subscriptions[channel].values(): - await q.put(pickle.loads(msg['data'])) - await asyncio.sleep(.001) - - class SubjectObserversWrapper(object): def __init__(self, pubsub, channel): self.pubsub = pubsub From c378ecbdf6a77aacd340566938f14ac57a406b40 Mon Sep 17 00:00:00 2001 From: hballard Date: Fri, 23 Feb 2018 23:09:31 -0600 Subject: [PATCH 12/20] Modify readme and setup.py to reflect changes --- README.md | 130 ++++++++++++++++++++++++++++++++++++++++-------------- setup.py | 4 ++ 2 files changed, 100 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 338bb9b..d5c9655 100644 --- a/README.md +++ b/README.md @@ -9,13 +9,13 @@ Currently supports: # Installation instructions -For instaling graphql-ws, just run this command in your shell +For installing graphql-ws, just run this command in your shell ```bash pip install graphql-ws ``` -## Examples +## Subscription Server ### aiohttp @@ -63,68 +63,130 @@ async def subscriptions(request, ws): app.run(host="0.0.0.0", port=8000) ``` +### Gevent +For setting up, just plug into your Gevent server. + +```python +subscription_server = GeventSubscriptionServer(schema) +app.app_protocol = lambda environ_path_info: 'graphql-ws' + +@sockets.route('/subscriptions') +def echo_socket(ws): + subscription_server.handle(ws) + return [] +``` + +## Publish-Subscribe +Includes several publish-subscribe (pubsub) classes for hooking +up your mutations to your subscriptions. When a client makes a +subscription, the pubsub can be used to map from one subscription name +to one or more channel names to subscribe to the right channels. +The subscription query will be re-run every time something is +published to one of these channels. Using these classes, a +subscription is just the result of a mutation. There are two +pubsub classes for asyncio, one that is in-memory and the other +that utilizes Redis (for production), via the [aredis](https://github.com/NoneGG/aredis) libary, which +is a asynchronous port of the excellent [redis-py](https://github.com/andymccurdy/redis-py) library. -And then, plug into a subscribable schema: +The schema for asyncio would look something like this below: ```python import asyncio import graphene +from graphql_ws.pubsub import AsyncioPubsub -class Query(graphene.ObjectType): - base = graphene.String() +# create a new pubsub object; this class is in-memory and does +# not utilze Redis +p = AsyncioPubsub() -class Subscription(graphene.ObjectType): - count_seconds = graphene.Float(up_to=graphene.Int()) +class MutationExample(graphene.Mutation): + class Arguments: + input_text = graphene.String() - async def resolve_count_seconds(root, info, up_to): - for i in range(up_to): - yield i - await asyncio.sleep(1.) - yield up_to + output_text = graphene.String() + async def mutate(self, info, input_text): + # publish to the pubsub object before returning mutation + await p.publish('BASE', input_text) + return MutationExample(output_text=input_text) -schema = graphene.Schema(query=Query, subscription=Subscription) -``` -You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/aiohttp +class Mutations(graphene.ObjectType): + mutation_example = MutationExample.Field() -### Gevent -For setting up, just plug into your Gevent server. +class Subscription(graphene.ObjectType): + mutation_example = graphene.String() + + async def resolve_mutation_example(root, info): + try: + # pubsub subscribe_to_channel method returns + # subscription id and an asyncio.Queue + sub_id, q = p.subscribe_to_channel('BASE') + while True: + payload = await q.get() + yield payload + except asyncio.CancelledError: + # unsubscribe subscription id from channel + # when coroutine is cancelled + p.unsubscribe('BASE', sub_id) + +schema = graphene.Schema(mutation=Mutations, + subscription=Subscription) +``` -```python -subscription_server = GeventSubscriptionServer(schema) -app.app_protocol = lambda environ_path_info: 'graphql-ws' +You can see a full asyncio example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/aiohttp -@sockets.route('/subscriptions') -def echo_socket(ws): - subscription_server.handle(ws) - return [] -``` +### Gevent + +There are two pubsub classes for Gevent as well, one that is +in-memory and the other that utilizes Redis (for production), via +[redis-py](https://github.com/andymccurdy/redis-py). -And then, plug into a subscribable schema: +Finally, plug into a subscribable schema: ```python import graphene + +from graphql_ws.pubsub import GeventRxRedisPubsub from rx import Observable +# create a new pubsub object; in the case you'll need to +# be running a redis-server instance in a separate process +p = GeventRxRedisPubsub() -class Query(graphene.ObjectType): - base = graphene.String() + +class MutationExample(graphene.Mutation): + class Arguments: + input_text = graphene.String() + + output_text = graphene.String() + + def mutate(self, info, input_text): + # publish to the pubsub before returning mutation + p.publish('BASE', input_text) + return MutationExample(output_text=input_text) + + +class Mutations(graphene.ObjectType): + mutation_example = MutationExample.Field() class Subscription(graphene.ObjectType): - count_seconds = graphene.Float(up_to=graphene.Int()) + mutation_example = graphene.String() - async def resolve_count_seconds(root, info, up_to=5): - return Observable.interval(1000)\ - .map(lambda i: "{0}".format(i))\ - .take_while(lambda i: int(i) <= up_to) + def resolve_mutation_example(root, info): + # pubsub subscribe_to_channel method returns an observable + # when observable is cancelled, the subscription will + # be cleaned up and unsubscribed from + return p.subscribe_to_channel('BASE')\ + .map(lambda i: "{0}".format(i)) -schema = graphene.Schema(query=Query, subscription=Subscription) +schema = graphene.Schema(mutation=Mutations, + subscription=Subscription) ``` You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/flask_gevent diff --git a/setup.py b/setup.py index 99844fc..e1b932d 100644 --- a/setup.py +++ b/setup.py @@ -15,6 +15,10 @@ requirements = [ 'graphql-core>=2.0<3', + 'aredis', + 'gevent', + 'redis', + 'rx' # TODO: put package requirements here ] From 19d37ae55a69133ab5d20c31ca88ba2839a504be Mon Sep 17 00:00:00 2001 From: hballard Date: Fri, 23 Feb 2018 23:48:16 -0600 Subject: [PATCH 13/20] Rebase off of upstream master --- README.md | 57 ++++++++++++++++--------------- examples/websockets_lib/schema.py | 4 --- 2 files changed, 30 insertions(+), 31 deletions(-) diff --git a/README.md b/README.md index d5c9655..9ec10f0 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,33 @@ def echo_socket(ws): subscription_server.handle(ws) return [] ``` +### Django (with channels) + +First `pip install channels` and add it to your django apps + +Then add the following to your settings.py + +```python + CHANNELS_WS_PROTOCOLS = ["graphql-ws", ] + CHANNEL_LAYERS = { + "default": { + "BACKEND": "asgiref.inmemory.ChannelLayer", + "ROUTING": "django_subscriptions.urls.channel_routing", + }, + + } +``` + +Add the channel routes to your Django server. + +```python +from channels.routing import route_class +from graphql_ws.django_channels import GraphQLSubscriptionConsumer + +channel_routing = [ + route_class(GraphQLSubscriptionConsumer, path=r"^/subscriptions"), +] +``` ## Publish-Subscribe Includes several publish-subscribe (pubsub) classes for hooking @@ -90,6 +117,8 @@ is a asynchronous port of the excellent [redis-py](https://github.com/andymccurd The schema for asyncio would look something like this below: +### Asyncio + ```python import asyncio import graphene @@ -192,23 +221,8 @@ schema = graphene.Schema(mutation=Mutations, You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/flask_gevent -### Django Channels - - -First `pip install channels` and it to your django apps - -Then add the following to your settings.py - -```python - CHANNELS_WS_PROTOCOLS = ["graphql-ws", ] - CHANNEL_LAYERS = { - "default": { - "BACKEND": "asgiref.inmemory.ChannelLayer", - "ROUTING": "django_subscriptions.urls.channel_routing", - }, +### Django (with channels) - } -``` Setup your graphql schema @@ -254,14 +268,3 @@ GRAPHENE = { 'SCHEMA': 'path.to.schema' } ``` - -and finally add the channel routes - -```python -from channels.routing import route_class -from graphql_ws.django_channels import GraphQLSubscriptionConsumer - -channel_routing = [ - route_class(GraphQLSubscriptionConsumer, path=r"^/subscriptions"), -] -``` \ No newline at end of file diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index 9a8d570..c0b8adc 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -6,10 +6,6 @@ p = AsyncioPubsub() -# TODO: -# - Add explanation on how to use pubsub -# - Modify author code to peform proper cleanup during cancel() - class Query(graphene.ObjectType): base = graphene.String() From 5a5a6bf2c91fa7f4c8110ac74b99d772a34d7997 Mon Sep 17 00:00:00 2001 From: hballard Date: Sat, 24 Feb 2018 16:21:33 -0600 Subject: [PATCH 14/20] Add registration of disposable in gevent.py --- graphql_ws/gevent.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/graphql_ws/gevent.py b/graphql_ws/gevent.py index 92a65ce..a261d57 100644 --- a/graphql_ws/gevent.py +++ b/graphql_ws/gevent.py @@ -81,13 +81,14 @@ def on_start(self, connection_context, op_id, params): connection_context.request_context, params) assert isinstance( execution_result, Observable), "A subscription must return an observable" - execution_result.subscribe(SubscriptionObserver( + disposable = execution_result.subscribe(SubscriptionObserver( connection_context, op_id, self.send_execution_result, self.send_error, self.on_close )) + connection_context.register_operation(op_id, disposable) except Exception as e: self.send_error(connection_context, op_id, str(e)) @@ -97,7 +98,8 @@ def on_stop(self, connection_context, op_id): class SubscriptionObserver(Observer): - def __init__(self, connection_context, op_id, send_execution_result, send_error, on_close): + def __init__(self, connection_context, op_id, send_execution_result, + send_error, on_close): self.connection_context = connection_context self.op_id = op_id self.send_execution_result = send_execution_result From 257eada00234a36347de5c12b607702b3ab53766 Mon Sep 17 00:00:00 2001 From: hballard Date: Sat, 24 Feb 2018 17:53:09 -0600 Subject: [PATCH 15/20] Change try/finally on async generator cleanup --- examples/aiohttp/schema.py | 3 +-- examples/websockets_lib/schema.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/aiohttp/schema.py b/examples/aiohttp/schema.py index c0b8adc..22e4ecb 100644 --- a/examples/aiohttp/schema.py +++ b/examples/aiohttp/schema.py @@ -41,12 +41,11 @@ class Subscription(graphene.ObjectType): async def resolve_mutation_example(root, info): try: - # sub_id, q = await p.subscribe_to_channel('BASE') sub_id, q = p.subscribe_to_channel('BASE') while True: payload = await q.get() yield payload - except asyncio.CancelledError: + finally: p.unsubscribe('BASE', sub_id) async def resolve_count_seconds(root, info, up_to=5): diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index c0b8adc..22e4ecb 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -41,12 +41,11 @@ class Subscription(graphene.ObjectType): async def resolve_mutation_example(root, info): try: - # sub_id, q = await p.subscribe_to_channel('BASE') sub_id, q = p.subscribe_to_channel('BASE') while True: payload = await q.get() yield payload - except asyncio.CancelledError: + finally: p.unsubscribe('BASE', sub_id) async def resolve_count_seconds(root, info, up_to=5): From e734d92d026e19681a704a8184afeaefc951f358 Mon Sep 17 00:00:00 2001 From: hballard Date: Sat, 24 Feb 2018 18:00:19 -0600 Subject: [PATCH 16/20] Delete unused imports in aiohttp & websockets_lib --- graphql_ws/aiohttp.py | 2 +- graphql_ws/websockets_lib.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/graphql_ws/aiohttp.py b/graphql_ws/aiohttp.py index 4af5720..e17bf74 100644 --- a/graphql_ws/aiohttp.py +++ b/graphql_ws/aiohttp.py @@ -1,4 +1,4 @@ -from inspect import isawaitable, isasyncgen +from inspect import isawaitable from asyncio import ensure_future from aiohttp import WSMsgType diff --git a/graphql_ws/websockets_lib.py b/graphql_ws/websockets_lib.py index f41a1bb..20fc03b 100644 --- a/graphql_ws/websockets_lib.py +++ b/graphql_ws/websockets_lib.py @@ -1,4 +1,4 @@ -from inspect import isawaitable, isasyncgen +from inspect import isawaitable from asyncio import ensure_future from websockets import ConnectionClosed From 0088479ac8f4f298d6de9eb29dca33e071d7597b Mon Sep 17 00:00:00 2001 From: hballard Date: Sat, 24 Feb 2018 19:29:56 -0600 Subject: [PATCH 17/20] Rename pubsub variable in examples and comments --- README.md | 18 +++++++++--------- examples/aiohttp/schema.py | 8 ++++---- examples/flask_gevent/schema.py | 6 +++--- examples/websockets_lib/schema.py | 8 ++++---- graphql_ws/pubsub/gevent_observable.py | 4 ++-- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 9ec10f0..afadd35 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ channel_routing = [ ``` ## Publish-Subscribe -Includes several publish-subscribe (pubsub) classes for hooking +Included are several publish-subscribe (pubsub) classes for hooking up your mutations to your subscriptions. When a client makes a subscription, the pubsub can be used to map from one subscription name to one or more channel names to subscribe to the right channels. @@ -127,7 +127,7 @@ from graphql_ws.pubsub import AsyncioPubsub # create a new pubsub object; this class is in-memory and does # not utilze Redis -p = AsyncioPubsub() +pubsub = AsyncioPubsub() class MutationExample(graphene.Mutation): @@ -138,7 +138,7 @@ class MutationExample(graphene.Mutation): async def mutate(self, info, input_text): # publish to the pubsub object before returning mutation - await p.publish('BASE', input_text) + await pubsub.publish('BASE', input_text) return MutationExample(output_text=input_text) @@ -153,14 +153,14 @@ class Subscription(graphene.ObjectType): try: # pubsub subscribe_to_channel method returns # subscription id and an asyncio.Queue - sub_id, q = p.subscribe_to_channel('BASE') + sub_id, q = pubsub.subscribe_to_channel('BASE') while True: payload = await q.get() yield payload except asyncio.CancelledError: # unsubscribe subscription id from channel # when coroutine is cancelled - p.unsubscribe('BASE', sub_id) + pubsub.unsubscribe('BASE', sub_id) schema = graphene.Schema(mutation=Mutations, subscription=Subscription) @@ -184,7 +184,7 @@ from rx import Observable # create a new pubsub object; in the case you'll need to # be running a redis-server instance in a separate process -p = GeventRxRedisPubsub() +pubsub = GeventRxRedisPubsub() class MutationExample(graphene.Mutation): @@ -195,7 +195,7 @@ class MutationExample(graphene.Mutation): def mutate(self, info, input_text): # publish to the pubsub before returning mutation - p.publish('BASE', input_text) + pubsub.publish('BASE', input_text) return MutationExample(output_text=input_text) @@ -208,9 +208,9 @@ class Subscription(graphene.ObjectType): def resolve_mutation_example(root, info): # pubsub subscribe_to_channel method returns an observable - # when observable is cancelled, the subscription will + # when observable is disposed of, the subscription will # be cleaned up and unsubscribed from - return p.subscribe_to_channel('BASE')\ + return pubsub.subscribe_to_channel('BASE')\ .map(lambda i: "{0}".format(i)) diff --git a/examples/aiohttp/schema.py b/examples/aiohttp/schema.py index 22e4ecb..351d0bd 100644 --- a/examples/aiohttp/schema.py +++ b/examples/aiohttp/schema.py @@ -4,7 +4,7 @@ from graphql_ws.pubsub import AsyncioPubsub -p = AsyncioPubsub() +pubsub = AsyncioPubsub() class Query(graphene.ObjectType): @@ -21,7 +21,7 @@ class Arguments: output_text = graphene.String() async def mutate(self, info, input_text): - await p.publish('BASE', input_text) + await pubsub.publish('BASE', input_text) return MutationExample(output_text=input_text) @@ -41,12 +41,12 @@ class Subscription(graphene.ObjectType): async def resolve_mutation_example(root, info): try: - sub_id, q = p.subscribe_to_channel('BASE') + sub_id, q = pubsub.subscribe_to_channel('BASE') while True: payload = await q.get() yield payload finally: - p.unsubscribe('BASE', sub_id) + pubsub.unsubscribe('BASE', sub_id) async def resolve_count_seconds(root, info, up_to=5): for i in range(up_to): diff --git a/examples/flask_gevent/schema.py b/examples/flask_gevent/schema.py index 242ae07..669d270 100644 --- a/examples/flask_gevent/schema.py +++ b/examples/flask_gevent/schema.py @@ -4,7 +4,7 @@ from graphql_ws.pubsub import GeventRxPubsub from rx import Observable -p = GeventRxPubsub() +pubsub = GeventRxPubsub() class Query(graphene.ObjectType): @@ -21,7 +21,7 @@ class Arguments: output_text = graphene.String() def mutate(self, info, input_text): - p.publish('BASE', input_text) + pubsub.publish('BASE', input_text) return MutationExample(output_text=input_text) @@ -41,7 +41,7 @@ class Subscription(graphene.ObjectType): def resolve_mutation_example(root, info): # subscribe_to_channel method returns an observable - return p.subscribe_to_channel('BASE')\ + return pubsub.subscribe_to_channel('BASE')\ .map(lambda i: "{0}".format(i)) def resolve_count_seconds(root, info, up_to=5): diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index 22e4ecb..351d0bd 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -4,7 +4,7 @@ from graphql_ws.pubsub import AsyncioPubsub -p = AsyncioPubsub() +pubsub = AsyncioPubsub() class Query(graphene.ObjectType): @@ -21,7 +21,7 @@ class Arguments: output_text = graphene.String() async def mutate(self, info, input_text): - await p.publish('BASE', input_text) + await pubsub.publish('BASE', input_text) return MutationExample(output_text=input_text) @@ -41,12 +41,12 @@ class Subscription(graphene.ObjectType): async def resolve_mutation_example(root, info): try: - sub_id, q = p.subscribe_to_channel('BASE') + sub_id, q = pubsub.subscribe_to_channel('BASE') while True: payload = await q.get() yield payload finally: - p.unsubscribe('BASE', sub_id) + pubsub.unsubscribe('BASE', sub_id) async def resolve_count_seconds(root, info, up_to=5): for i in range(up_to): diff --git a/graphql_ws/pubsub/gevent_observable.py b/graphql_ws/pubsub/gevent_observable.py index 750bbab..543bc28 100644 --- a/graphql_ws/pubsub/gevent_observable.py +++ b/graphql_ws/pubsub/gevent_observable.py @@ -43,7 +43,7 @@ def subscribe_to_channel(self, channel): else: subject = Subject() # monkeypatch Subject to unsubscribe pubsub on observable - # subscription.cancel() + # subscription.dispose() subject.observers = SubjectObserversWrapper(self, channel) self.subscriptions[channel] = subject return subject @@ -72,7 +72,7 @@ def subscribe_to_channel(self, channel): self.pubsub.subscribe(channel) subject = Subject() # monkeypatch Subject to unsubscribe pubsub on observable - # subscription.cancel() + # subscription.dispose() subject.observers = SubjectObserversWrapper(self, channel) self.subscriptions[channel] = subject if not self.greenlet: From 581107d50c010c446cdea9effe21362a308687ee Mon Sep 17 00:00:00 2001 From: hballard Date: Tue, 6 Mar 2018 21:20:38 -0600 Subject: [PATCH 18/20] Fix python 2 support --- README.md | 4 ++-- examples/websockets_lib/schema.py | 2 +- graphql_ws/pubsub/__init__.py | 6 +++++- setup.py | 6 +++++- 4 files changed, 13 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index afadd35..d40edb5 100644 --- a/README.md +++ b/README.md @@ -243,8 +243,8 @@ class Subscription(graphene.ObjectType): def resolve_count_seconds( - root, - info, + root, + info, up_to=5 ): return Observable.interval(1000)\ diff --git a/examples/websockets_lib/schema.py b/examples/websockets_lib/schema.py index 351d0bd..1a5f591 100644 --- a/examples/websockets_lib/schema.py +++ b/examples/websockets_lib/schema.py @@ -20,7 +20,7 @@ class Arguments: output_text = graphene.String() - async def mutate(self, info, input_text): + async def mutate(root, info, input_text): await pubsub.publish('BASE', input_text) return MutationExample(output_text=input_text) diff --git a/graphql_ws/pubsub/__init__.py b/graphql_ws/pubsub/__init__.py index 442d2db..8ec73ba 100644 --- a/graphql_ws/pubsub/__init__.py +++ b/graphql_ws/pubsub/__init__.py @@ -1,2 +1,6 @@ -from .asyncio import AsyncioPubsub, AsyncioRedisPubsub +import sys + from .gevent_observable import GeventRxPubsub, GeventRxRedisPubsub + +if sys.version_info[0] > 2: + from .asyncio import AsyncioPubsub, AsyncioRedisPubsub diff --git a/setup.py b/setup.py index e1b932d..a6e455e 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,8 @@ """The setup script.""" +import sys + from setuptools import setup, find_packages with open('README.rst') as readme_file: @@ -15,13 +17,15 @@ requirements = [ 'graphql-core>=2.0<3', - 'aredis', 'gevent', 'redis', 'rx' # TODO: put package requirements here ] +if sys.version_info[0] > 2: + requirements.append(aredis) + setup_requirements = [ 'pytest-runner', # TODO(graphql-python): put setup requirements (distutils extensions, From 0fa5362abbea606fadbe97c2cf869b484d1055e6 Mon Sep 17 00:00:00 2001 From: hballard Date: Tue, 6 Mar 2018 21:30:31 -0600 Subject: [PATCH 19/20] Fix minor error setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a6e455e..67bbdc9 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ ] if sys.version_info[0] > 2: - requirements.append(aredis) + requirements.append('aredis') setup_requirements = [ 'pytest-runner', From 18fa5780d5bfc3756e87f2ae76b17b637ff6f3c7 Mon Sep 17 00:00:00 2001 From: hballard Date: Tue, 6 Mar 2018 22:13:44 -0600 Subject: [PATCH 20/20] Minor change to README --- README.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index d40edb5..43d108c 100644 --- a/README.md +++ b/README.md @@ -110,15 +110,16 @@ subscription, the pubsub can be used to map from one subscription name to one or more channel names to subscribe to the right channels. The subscription query will be re-run every time something is published to one of these channels. Using these classes, a -subscription is just the result of a mutation. There are two -pubsub classes for asyncio, one that is in-memory and the other +subscription is just the result of a mutation. + +### Asyncio + +There are two pubsub classes for asyncio, one that is in-memory and the other that utilizes Redis (for production), via the [aredis](https://github.com/NoneGG/aredis) libary, which is a asynchronous port of the excellent [redis-py](https://github.com/andymccurdy/redis-py) library. The schema for asyncio would look something like this below: -### Asyncio - ```python import asyncio import graphene