-
Notifications
You must be signed in to change notification settings - Fork 115
/
Copy pathtest_heartbeat.py
81 lines (59 loc) · 2.59 KB
/
test_heartbeat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import logging
import asyncio
import pytest
import pubnub as pn
from pubnub.pubnub_asyncio import PubNubAsyncio, SubscribeListener
from tests import helper
from tests.helper import pnconf_env_copy
# Turn on DEBUG-level output for the 'pubnub' logger name through the SDK's
# stream-logger helper, so request traffic shows up in the test output.
pn.set_stream_logger('pubnub', logging.DEBUG)
# Configuration for the "messenger" client: the one whose heartbeat the test
# deliberately breaks.
# NOTE(review): pnconf_env_copy presumably returns a fresh, independent copy of
# the environment-based test configuration -- confirm in tests.helper.
messenger_config = pnconf_env_copy()
# Presence timeout of 8; the test below sleeps for the same value before
# stopping the heartbeat timer (units presumably seconds -- confirm in SDK docs).
messenger_config.set_presence_timeout(8)
messenger_config.enable_subscribe = True
# gen_channel is reused here to produce the uuid value; presumably it yields a
# unique name carrying the given prefix -- confirm in tests.helper.
messenger_config.uuid = helper.gen_channel("messenger")
# Configuration for the "listener" client that observes presence events.
# No presence timeout is set on it here.
listener_config = pnconf_env_copy()
listener_config.enable_subscribe = True
listener_config.uuid = helper.gen_channel("listener")
@pytest.mark.asyncio
async def test_timeout_event_on_broken_heartbeat(event_loop):
    """Check that a 'timeout' presence event follows a stopped heartbeat.

    Scenario:
      1. The listener client subscribes to the channel with presence and
         receives its own 'join' event.
      2. The messenger client subscribes to the same channel; the listener
         receives the messenger's 'join' event.
      3. After sleeping 8 (the value passed to ``set_presence_timeout`` for
         the messenger configuration), the messenger's heartbeat timer is
         stopped.
      4. The listener is expected to receive a 'timeout' presence event that
         carries the messenger's uuid.

    Both clients are stopped in ``finally`` blocks so that a failed assertion
    or a raised await does not leave them running on ``event_loop``.

    Args:
        event_loop: event loop fixture handed to both clients as
            ``custom_event_loop``.
    """
    ch = helper.gen_channel("heartbeat-test")
    pubnub = PubNubAsyncio(messenger_config, custom_event_loop=event_loop)
    pubnub_listener = PubNubAsyncio(listener_config, custom_event_loop=event_loop)

    try:
        # Assign new uuids for this run on top of the module-level ones.
        pubnub.config.uuid = helper.gen_channel("messenger")
        pubnub_listener.config.uuid = helper.gen_channel("listener")

        # - connect to :ch-pnpres
        callback_presence = SubscribeListener()
        pubnub_listener.add_listener(callback_presence)
        pubnub_listener.subscribe().channels(ch).with_presence().execute()
        await callback_presence.wait_for_connect()

        # The first presence event is the listener's own join.
        envelope = await callback_presence.wait_for_presence_on(ch)
        assert ch == envelope.channel
        assert 'join' == envelope.event
        assert pubnub_listener.uuid == envelope.uuid

        # - connect to :ch
        callback_messages = SubscribeListener()
        pubnub.add_listener(callback_messages)
        pubnub.subscribe().channels(ch).execute()

        # Wait for the messenger's connect and its join event together; only
        # the presence result is inspected afterwards.
        useless_connect_future = asyncio.ensure_future(callback_messages.wait_for_connect())
        presence_future = asyncio.ensure_future(callback_presence.wait_for_presence_on(ch))

        # - assert join event
        await asyncio.wait([useless_connect_future, presence_future])

        prs_envelope = presence_future.result()
        assert ch == prs_envelope.channel
        assert 'join' == prs_envelope.event
        assert pubnub.uuid == prs_envelope.uuid

        # wait for one heartbeat call
        await asyncio.sleep(8)

        # - break messenger heartbeat loop
        pubnub._subscription_manager._stop_heartbeat_timer()

        # - assert for timeout
        envelope = await callback_presence.wait_for_presence_on(ch)
        assert ch == envelope.channel
        assert 'timeout' == envelope.event
        assert pubnub.uuid == envelope.uuid

        pubnub.unsubscribe().channels(ch).execute()
        await callback_messages.wait_for_disconnect()

        # - disconnect from :ch-pnpres
        pubnub_listener.unsubscribe().channels(ch).execute()
        await callback_presence.wait_for_disconnect()
    finally:
        # Always stop both clients; the nested finally makes sure the listener
        # is stopped even when stopping the messenger raises.
        try:
            await pubnub.stop()
        finally:
            await pubnub_listener.stop()