Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pubsub implementation for easily publishing and subscribing from your resolvers #11

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 128 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ Currently supports:

# Installation instructions

For instaling graphql-ws, just run this command in your shell
For installing graphql-ws, just run this command in your shell

```bash
pip install graphql-ws
```

## Examples
## Subscription Server

### aiohttp

Expand Down Expand Up @@ -63,90 +63,167 @@ async def subscriptions(request, ws):

app.run(host="0.0.0.0", port=8000)
```

And then, plug into a subscribable schema:
### Gevent
For setting up, just plug into your Gevent server.

```python
import asyncio
import graphene
subscription_server = GeventSubscriptionServer(schema)
app.app_protocol = lambda environ_path_info: 'graphql-ws'

@sockets.route('/subscriptions')
def echo_socket(ws):
subscription_server.handle(ws)
return []
```
### Django (with channels)

class Query(graphene.ObjectType):
base = graphene.String()
First `pip install channels` and add it to your django apps

Then add the following to your settings.py

class Subscription(graphene.ObjectType):
count_seconds = graphene.Float(up_to=graphene.Int())
```python
CHANNELS_WS_PROTOCOLS = ["graphql-ws", ]
CHANNEL_LAYERS = {
"default": {
"BACKEND": "asgiref.inmemory.ChannelLayer",
"ROUTING": "django_subscriptions.urls.channel_routing",
},

async def resolve_count_seconds(root, info, up_to):
for i in range(up_to):
yield i
await asyncio.sleep(1.)
yield up_to
}
```

Add the channel routes to your Django server.

```python
from channels.routing import route_class
from graphql_ws.django_channels import GraphQLSubscriptionConsumer

schema = graphene.Schema(query=Query, subscription=Subscription)
channel_routing = [
route_class(GraphQLSubscriptionConsumer, path=r"^/subscriptions"),
]
```

You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/aiohttp
## Publish-Subscribe
Included are several publish-subscribe (pubsub) classes for hooking
up your mutations to your subscriptions. When a client makes a
subscription, the pubsub can be used to map from one subscription name
to one or more channel names to subscribe to the right channels.
The subscription query will be re-run every time something is
published to one of these channels. Using these classes, a
subscription is just the result of a mutation.

### Gevent
### Asyncio

For setting up, just plug into your Gevent server.
There are two pubsub classes for asyncio, one that is in-memory and the other
that utilizes Redis (for production), via the [aredis](https://github.com/NoneGG/aredis) libary, which
is a asynchronous port of the excellent [redis-py](https://github.com/andymccurdy/redis-py) library.

The schema for asyncio would look something like this below:

```python
subscription_server = GeventSubscriptionServer(schema)
app.app_protocol = lambda environ_path_info: 'graphql-ws'
import asyncio
import graphene

@sockets.route('/subscriptions')
def echo_socket(ws):
subscription_server.handle(ws)
return []
from graphql_ws.pubsub import AsyncioPubsub

# create a new pubsub object; this class is in-memory and does
# not utilze Redis
pubsub = AsyncioPubsub()


class MutationExample(graphene.Mutation):
class Arguments:
input_text = graphene.String()

output_text = graphene.String()

async def mutate(self, info, input_text):
# publish to the pubsub object before returning mutation
await pubsub.publish('BASE', input_text)
return MutationExample(output_text=input_text)


class Mutations(graphene.ObjectType):
mutation_example = MutationExample.Field()


class Subscription(graphene.ObjectType):
mutation_example = graphene.String()

async def resolve_mutation_example(root, info):
try:
# pubsub subscribe_to_channel method returns
# subscription id and an asyncio.Queue
sub_id, q = pubsub.subscribe_to_channel('BASE')
while True:
payload = await q.get()
yield payload
except asyncio.CancelledError:
# unsubscribe subscription id from channel
# when coroutine is cancelled
pubsub.unsubscribe('BASE', sub_id)

schema = graphene.Schema(mutation=Mutations,
subscription=Subscription)
```

And then, plug into a subscribable schema:
You can see a full asyncio example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/aiohttp

### Gevent

There are two pubsub classes for Gevent as well, one that is
in-memory and the other that utilizes Redis (for production), via
[redis-py](https://github.com/andymccurdy/redis-py).

Finally, plug into a subscribable schema:

```python
import graphene

from graphql_ws.pubsub import GeventRxRedisPubsub
from rx import Observable

# create a new pubsub object; in the case you'll need to
# be running a redis-server instance in a separate process
pubsub = GeventRxRedisPubsub()

class Query(graphene.ObjectType):
base = graphene.String()

class MutationExample(graphene.Mutation):
class Arguments:
input_text = graphene.String()

class Subscription(graphene.ObjectType):
count_seconds = graphene.Float(up_to=graphene.Int())
output_text = graphene.String()

async def resolve_count_seconds(root, info, up_to=5):
return Observable.interval(1000)\
.map(lambda i: "{0}".format(i))\
.take_while(lambda i: int(i) <= up_to)
def mutate(self, info, input_text):
# publish to the pubsub before returning mutation
pubsub.publish('BASE', input_text)
return MutationExample(output_text=input_text)


schema = graphene.Schema(query=Query, subscription=Subscription)
```
class Mutations(graphene.ObjectType):
mutation_example = MutationExample.Field()

You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/flask_gevent

class Subscription(graphene.ObjectType):
mutation_example = graphene.String()

### Django Channels
def resolve_mutation_example(root, info):
# pubsub subscribe_to_channel method returns an observable
# when observable is disposed of, the subscription will
# be cleaned up and unsubscribed from
return pubsub.subscribe_to_channel('BASE')\
.map(lambda i: "{0}".format(i))


First `pip install channels` and it to your django apps
schema = graphene.Schema(mutation=Mutations,
subscription=Subscription)
```

Then add the following to your settings.py
You can see a full example here: https://github.com/graphql-python/graphql-ws/tree/master/examples/flask_gevent

```python
CHANNELS_WS_PROTOCOLS = ["graphql-ws", ]
CHANNEL_LAYERS = {
"default": {
"BACKEND": "asgiref.inmemory.ChannelLayer",
"ROUTING": "django_subscriptions.urls.channel_routing",
},

}
```
### Django (with channels)


Setup your graphql schema

Expand All @@ -167,8 +244,8 @@ class Subscription(graphene.ObjectType):


def resolve_count_seconds(
root,
info,
root,
info,
up_to=5
):
return Observable.interval(1000)\
Expand All @@ -192,14 +269,3 @@ GRAPHENE = {
'SCHEMA': 'path.to.schema'
}
```

and finally add the channel routes

```python
from channels.routing import route_class
from graphql_ws.django_channels import GraphQLSubscriptionConsumer

channel_routing = [
route_class(GraphQLSubscriptionConsumer, path=r"^/subscriptions"),
]
```
35 changes: 34 additions & 1 deletion examples/aiohttp/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,32 @@
import asyncio
import graphene

from graphql_ws.pubsub import AsyncioPubsub

pubsub = AsyncioPubsub()


class Query(graphene.ObjectType):
base = graphene.String()

async def resolve_base(root, info):
return 'Hello World!'


class MutationExample(graphene.Mutation):
class Arguments:
input_text = graphene.String()

output_text = graphene.String()

async def mutate(self, info, input_text):
await pubsub.publish('BASE', input_text)
return MutationExample(output_text=input_text)


class Mutations(graphene.ObjectType):
mutation_example = MutationExample.Field()


class RandomType(graphene.ObjectType):
seconds = graphene.Int()
Expand All @@ -15,6 +37,16 @@ class RandomType(graphene.ObjectType):
class Subscription(graphene.ObjectType):
count_seconds = graphene.Float(up_to=graphene.Int())
random_int = graphene.Field(RandomType)
mutation_example = graphene.String()

async def resolve_mutation_example(root, info):
try:
sub_id, q = pubsub.subscribe_to_channel('BASE')
while True:
payload = await q.get()
yield payload
finally:
pubsub.unsubscribe('BASE', sub_id)

async def resolve_count_seconds(root, info, up_to=5):
for i in range(up_to):
Expand All @@ -31,4 +63,5 @@ async def resolve_random_int(root, info):
i += 1


schema = graphene.Schema(query=Query, subscription=Subscription)
schema = graphene.Schema(query=Query, mutation=Mutations,
subscription=Subscription)
3 changes: 2 additions & 1 deletion examples/flask_gevent/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ def graphql_view():


app.add_url_rule(
'/graphql', view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=False))
'/graphql', view_func=GraphQLView.as_view('graphql', schema=schema,
graphiql=False))

subscription_server = GeventSubscriptionServer(schema)
app.app_protocol = lambda environ_path_info: 'graphql-ws'
Expand Down
38 changes: 33 additions & 5 deletions examples/flask_gevent/schema.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,58 @@
import random
import graphene
import random

from graphql_ws.pubsub import GeventRxPubsub
from rx import Observable

pubsub = GeventRxPubsub()


class Query(graphene.ObjectType):
base = graphene.String()

def resolve_base(root, info):
return 'Hello World!'


class MutationExample(graphene.Mutation):
class Arguments:
input_text = graphene.String()

output_text = graphene.String()

def mutate(self, info, input_text):
pubsub.publish('BASE', input_text)
return MutationExample(output_text=input_text)


class Mutations(graphene.ObjectType):
mutation_example = MutationExample.Field()


class RandomType(graphene.ObjectType):
seconds = graphene.Int()
random_int = graphene.Int()


class Subscription(graphene.ObjectType):

count_seconds = graphene.Int(up_to=graphene.Int())

random_int = graphene.Field(RandomType)
mutation_example = graphene.String()

def resolve_mutation_example(root, info):
# subscribe_to_channel method returns an observable
return pubsub.subscribe_to_channel('BASE')\
.map(lambda i: "{0}".format(i))

def resolve_count_seconds(root, info, up_to=5):
return Observable.interval(1000)\
.map(lambda i: "{0}".format(i))\
.take_while(lambda i: int(i) <= up_to)

def resolve_random_int(root, info):
return Observable.interval(1000).map(lambda i: RandomType(seconds=i, random_int=random.randint(0, 500)))
return Observable.interval(1000).map(
lambda i: RandomType(seconds=i, random_int=random.randint(0, 500)))


schema = graphene.Schema(query=Query, subscription=Subscription)
schema = graphene.Schema(query=Query, mutation=Mutations,
subscription=Subscription)
3 changes: 1 addition & 2 deletions examples/flask_gevent/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,8 @@ def render_graphiql():
</script>
</body>
</html>''').substitute(
GRAPHIQL_VERSION='0.11.7',
GRAPHIQL_VERSION='0.10.2',
SUBSCRIPTIONS_TRANSPORT_VERSION='0.7.0',
subscriptionsEndpoint='ws://localhost:5000/subscriptions',
# subscriptionsEndpoint='ws://localhost:5000/',
endpointURL='/graphql',
)
Loading