-
Notifications
You must be signed in to change notification settings - Fork 115
/
Copy pathtest_channel_groups.py
168 lines (120 loc) · 5.93 KB
/
test_channel_groups.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import asyncio
import pytest
from pubnub.models.consumer.channel_group import PNChannelGroupsAddChannelResult, PNChannelGroupsListResult, \
PNChannelGroupsRemoveChannelResult, PNChannelGroupsRemoveGroupResult
from pubnub.pubnub_asyncio import PubNubAsyncio
from tests.helper import pnconf, pnconf_copy, pnconf_pam_env_copy
from tests.integrational.vcr_asyncio_sleeper import get_sleeper
from tests.integrational.vcr_helper import pn_vcr
@get_sleeper('tests/integrational/fixtures/asyncio/groups/add_remove_single_channel.yaml')
@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/groups/add_remove_single_channel.yaml',
                     filter_query_parameters=['uuid', 'pnsdk', 'l_cg', 'l_pub'])
@pytest.mark.asyncio
async def test_add_remove_single_channel(event_loop, sleeper=asyncio.sleep):
    """Add one channel to a channel group, list it, remove it, and list again.

    The test publishes a message to the channel first, then checks that the
    group lists exactly that channel after the add and no channels after the
    remove.

    Args:
        event_loop: Event loop handed to ``PubNubAsyncio`` as
            ``custom_event_loop``.
        sleeper: Awaitable sleep function; defaults to ``asyncio.sleep``.
            NOTE(review): presumably swapped by ``get_sleeper`` when a
            cassette is replayed -- confirm against the helper.
    """
    pubnub = PubNubAsyncio(pnconf_copy(), custom_event_loop=event_loop)

    # try/finally guarantees the client is stopped even when an assertion
    # fails part-way through the scenario.
    try:
        pubnub.config.uuid = 'test-channel-group-asyncio-uuid1'

        ch = "test-channel-groups-asyncio-ch"
        gr = "test-channel-groups-asyncio-cg"

        await pubnub.publish().channel(ch).message("hey").future()

        # add
        env = await pubnub.add_channel_to_channel_group() \
            .channels(ch).channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsAddChannelResult)

        await sleeper(1)

        # list
        env = await pubnub.list_channels_in_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsListResult)
        assert len(env.result.channels) == 1
        assert env.result.channels[0] == ch

        # remove
        env = await pubnub.remove_channel_from_channel_group() \
            .channels(ch).channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsRemoveChannelResult)

        await sleeper(1)

        # change uuid so that vcr can distinguish the two list requests
        pubnub.config.uuid = 'test-channel-group-asyncio-uuid2'

        # list
        env = await pubnub.list_channels_in_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsListResult)
        assert len(env.result.channels) == 0
    finally:
        await pubnub.stop()
@get_sleeper('tests/integrational/fixtures/asyncio/groups/add_remove_multiple_channels.yaml')
@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/groups/add_remove_multiple_channels.yaml',
                     filter_query_parameters=['uuid', 'pnsdk', 'l_cg', 'l_pub'])
@pytest.mark.asyncio
async def test_add_remove_multiple_channels(event_loop, sleeper=asyncio.sleep):
    """Add two channels to a channel group at once, list, remove both, list again.

    The test checks that both channels are listed after the add (order is not
    asserted) and that the group lists no channels after the remove.

    Args:
        event_loop: Event loop handed to ``PubNubAsyncio`` as
            ``custom_event_loop``.
        sleeper: Awaitable sleep function; defaults to ``asyncio.sleep``.
            NOTE(review): presumably swapped by ``get_sleeper`` when a
            cassette is replayed -- confirm against the helper.
    """
    pubnub = PubNubAsyncio(pnconf, custom_event_loop=event_loop)

    # try/finally guarantees the client is stopped even when an assertion
    # fails part-way through the scenario.
    try:
        ch1 = "channel-groups-tornado-ch1"
        ch2 = "channel-groups-tornado-ch2"
        gr = "channel-groups-tornado-cg"

        # add
        env = await pubnub.add_channel_to_channel_group() \
            .channels([ch1, ch2]).channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsAddChannelResult)

        await sleeper(1)

        # list
        env = await pubnub.list_channels_in_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsListResult)
        assert len(env.result.channels) == 2
        assert ch1 in env.result.channels
        assert ch2 in env.result.channels

        # remove
        env = await pubnub.remove_channel_from_channel_group() \
            .channels([ch1, ch2]).channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsRemoveChannelResult)

        await sleeper(1)

        # list
        env = await pubnub.list_channels_in_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsListResult)
        assert len(env.result.channels) == 0
    finally:
        await pubnub.stop()
@get_sleeper('tests/integrational/fixtures/asyncio/groups/add_channel_remove_group.yaml')
@pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/groups/add_channel_remove_group.yaml',
                     filter_query_parameters=['uuid', 'pnsdk', 'l_cg', 'l_pub'])
@pytest.mark.asyncio
async def test_add_channel_remove_group(event_loop, sleeper=asyncio.sleep):
    """Add a channel to a channel group, list it, remove the whole group, list again.

    The test checks that the group lists exactly the added channel, and that
    listing the group after ``remove_channel_group`` yields no channels.

    Args:
        event_loop: Event loop handed to ``PubNubAsyncio`` as
            ``custom_event_loop``.
        sleeper: Awaitable sleep function; defaults to ``asyncio.sleep``.
            NOTE(review): presumably swapped by ``get_sleeper`` when a
            cassette is replayed -- confirm against the helper.
    """
    pubnub = PubNubAsyncio(pnconf, custom_event_loop=event_loop)

    # try/finally guarantees the client is stopped even when an assertion
    # fails part-way through the scenario.
    try:
        ch = "channel-groups-tornado-ch"
        gr = "channel-groups-tornado-cg"

        # add
        env = await pubnub.add_channel_to_channel_group() \
            .channels(ch).channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsAddChannelResult)

        await sleeper(1)

        # list
        env = await pubnub.list_channels_in_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsListResult)
        assert len(env.result.channels) == 1
        assert env.result.channels[0] == ch

        # remove group
        env = await pubnub.remove_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsRemoveGroupResult)

        await sleeper(1)

        # list
        env = await pubnub.list_channels_in_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsListResult)
        assert len(env.result.channels) == 0
    finally:
        await pubnub.stop()
@pytest.mark.asyncio
async def test_super_call(event_loop):
    """Run every channel-group call with special characters in the identifiers.

    The channel and group names contain ``|`` and the auth key contains
    ``.``, ``|``, ``%`` and ``,``. Only the result type of each call is
    asserted; channel contents are not checked. No cassette decorator is
    applied to this test.

    Args:
        event_loop: Event loop handed to ``PubNubAsyncio`` as
            ``custom_event_loop``.
    """
    pubnub = PubNubAsyncio(pnconf_pam_env_copy(), custom_event_loop=event_loop)

    # try/finally guarantees the client is stopped even when an assertion
    # fails part-way through the scenario.
    try:
        ch = "channel-groups-torna|do-ch"
        gr = "channel-groups-torna|do-cg"
        pubnub.config.auth = "h.e|l%l,0"

        # add
        env = await pubnub.add_channel_to_channel_group() \
            .channels(ch).channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsAddChannelResult)

        # list
        env = await pubnub.list_channels_in_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsListResult)

        # remove channel
        env = await pubnub.remove_channel_from_channel_group().channel_group(gr).channels(ch).future()
        assert isinstance(env.result, PNChannelGroupsRemoveChannelResult)

        # remove group
        env = await pubnub.remove_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsRemoveGroupResult)

        # list
        env = await pubnub.list_channels_in_channel_group().channel_group(gr).future()
        assert isinstance(env.result, PNChannelGroupsListResult)
    finally:
        await pubnub.stop()