Skip to content

Commit

Permalink
feat: exposed pubnub subscription methods as sync and async versions
Browse files Browse the repository at this point in the history
  • Loading branch information
rado0x54 committed Feb 17, 2021
1 parent 2e08fad commit 950405d
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 24 deletions.
33 changes: 29 additions & 4 deletions pysnoo/pubnub.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from .models import ActivityState, SessionLevel
from .const import SNOO_PUBNUB_PUBLISH_KEY, SNOO_PUBNUB_SUBSCRIBE_KEY

_LOGGER = logging.getLogger(__name__)


class SnooSubscribeListener(SubscribeCallback):
"""Snoo Subscription Listener Class"""
Expand All @@ -24,10 +26,12 @@ def status(self, pubnub, status):
"""PubNub Status Callback Implementation"""
if utils.is_subscribed_event(status) and not self.connected_event.is_set():
self.connected_event.set()
self.disconnected_event.clear()
elif utils.is_unsubscribed_event(status) and not self.disconnected_event.is_set():
self.disconnected_event.set()
self.connected_event.clear()
elif status.is_error():
logging.error('Error in Snoo PubNub Listener of Category: %s', status.category)
_LOGGER.error('Error in Snoo PubNub Listener of Category: %s', status.category)

def message(self, pubnub, message):
"""PubNub Message Callback Implementation"""
Expand All @@ -36,6 +40,10 @@ def message(self, pubnub, message):
def presence(self, pubnub, presence):
"""PubNub Presence Callback Implementation"""

def is_connected(self):
"""Returns true if the listener is currently connected to an active subscription"""
return self.connected_event.is_set()

async def wait_for_connect(self):
"""Async utility function that waits for subscription connect."""
if not self.connected_event.is_set():
Expand Down Expand Up @@ -63,6 +71,8 @@ def __init__(self,
self._controlcommand_channel = 'ControlCommand.{}'.format(serial_number)
self._pubnub = PubNubAsyncio(self.config, custom_event_loop=custom_event_loop)
self._listener = SnooSubscribeListener(self._activy_state_callback)
# Add listener
self._pubnub.add_listener(self._listener)
self._external_listeners: List[Callable[[ActivityState], None]] = []

@staticmethod
Expand Down Expand Up @@ -95,20 +105,35 @@ def _activy_state_callback(self, state: ActivityState):
for update_callback in self._external_listeners:
update_callback(state)

async def subscribe(self):
def subscribe(self):
"""Subscribe to Snoo Activity Channel"""
self._pubnub.add_listener(self._listener)
if self._listener.is_connected():
_LOGGER.warning('Trying to subscribe PubNub instance that is already subscribed to %s',
self._activiy_channel)
return

self._pubnub.subscribe().channels([
self._activiy_channel
]).execute()

async def subscribe_and_await_connect(self):
"""Subscribe to Snoo Activity Channel and await connect"""
self.subscribe()
await self._listener.wait_for_connect()

async def unsubscribe(self):
def unsubscribe(self):
"""Unsubscribe to Snoo Activity Channel"""
if not self._listener.is_connected():
_LOGGER.warning('Trying to unsubscribe PubNub instance that is NOT subscribed to %s', self._activiy_channel)
return

self._pubnub.unsubscribe().channels(
self._activiy_channel
).execute()

async def unsubscribe_and_await_disconnect(self):
"""Unsubscribe to Snoo Activity Channel and await disconnect"""
self.unsubscribe()
await self._listener.wait_for_disconnect()

async def history(self, count=1):
Expand Down
4 changes: 2 additions & 2 deletions scripts/snoo
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ async def monitor(snoo: Snoo, args):
for activity_state in await pubnub.history():
as_callback(activity_state)

await pubnub.subscribe()
await pubnub.subscribe_and_await_connect()

try:
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
pass
finally:
await pubnub.unsubscribe()
await pubnub.unsubscribe_and_await_disconnect()
await pubnub.stop()


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""PySnoo setup script."""
from setuptools import setup

_VERSION = '0.1.1'
_VERSION = '0.1.2'


def readme():
Expand Down
50 changes: 33 additions & 17 deletions tests/test_snoo_pubnub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import json

from pubnub.enums import PNOperationType, PNStatusCategory
from pubnub.callbacks import SubscribeCallback
from pubnub.models.consumer.common import PNStatus
from pubnub.models.consumer.pubsub import PNMessageResult

Expand Down Expand Up @@ -90,33 +89,39 @@ async def test_publish_goto_state_with_hold(self, mocked_request):
self.assertEqual(options.query_string,
f'auth=ACCESS_TOKEN&pnsdk=PubNub-Python-Asyncio%2F{self.pubnub._pubnub.SDK_VERSION}&uuid=UUID')

@patch('pubnub.pubnub_core.PubNubCore.add_listener')
@patch('pubnub.managers.SubscriptionManager.adapt_subscribe_builder')
async def test_subscribe(self, mocked_subscribe_builder, mocked_add_listener):
async def test_subscribe_and_await_connect(self, mocked_subscribe_builder):
"""Test subscribe"""
# Setup

def add_listener_side_effect(listener: SubscribeCallback):
# Call Connect Status.
pn_status = PNStatus()
pn_status.category = PNStatusCategory.PNConnectedCategory
# Call after 1s: listener.status(self.pubnub._pubnub, pn_status)
self.loop.call_later(1, listener.status, self.pubnub._pubnub, pn_status) # pylint: disable=protected-access

mocked_add_listener.side_effect = add_listener_side_effect
# pylint: disable=protected-access
# Call Connect Status.
pn_status = PNStatus()
pn_status.category = PNStatusCategory.PNConnectedCategory
# Call after 1s: listener.status(self.pubnub._pubnub, pn_status)
self.loop.call_later(1, self.pubnub._listener.status,
self.pubnub._pubnub, pn_status)

await self.pubnub.subscribe()
await self.pubnub.subscribe_and_await_connect()

mocked_add_listener.assert_called_once()
mocked_subscribe_builder.assert_called_once()
subscribe_operation = mocked_subscribe_builder.mock_calls[0][1][0]
self.assertEqual(subscribe_operation.channels, ['ActivityState.SERIAL_NUMBER'])
self.assertEqual(subscribe_operation.channel_groups, [])
self.assertEqual(subscribe_operation.presence_enabled, False)
self.assertEqual(subscribe_operation.timetoken, 0)

@patch('pubnub.managers.SubscriptionManager.adapt_subscribe_builder')
def test_prevent_multiple_subscription(self, mocked_subscribe_builder):
"""Test prevent multiple subscriptions"""
# pylint: disable=protected-access
# Set Listener as connected
self.pubnub._listener.connected_event.set()

self.pubnub.subscribe()

mocked_subscribe_builder.assert_not_called()

@patch('pubnub.managers.SubscriptionManager.adapt_unsubscribe_builder')
async def test_unsubscribe(self, mocked_unsubscribe_builder):
async def test_unsubscribe_and_await_disconnect(self, mocked_unsubscribe_builder):
"""Test unsubscribe"""
# pylint: disable=protected-access
# Call Connect Status.
Expand All @@ -125,14 +130,25 @@ async def test_unsubscribe(self, mocked_unsubscribe_builder):
pn_status.operation = PNOperationType.PNUnsubscribeOperation
# Call after 1s: listener.status(self.pubnub._pubnub, pn_status)
self.loop.call_later(1, self.pubnub._listener.status, self.pubnub._pubnub, pn_status)
# Listener is connected:
self.pubnub._listener.connected_event.set()

await self.pubnub.unsubscribe()
await self.pubnub.unsubscribe_and_await_disconnect()

mocked_unsubscribe_builder.assert_called_once()
unsubscribe_operation = mocked_unsubscribe_builder.mock_calls[0][1][0]
self.assertEqual(unsubscribe_operation.channels, ['ActivityState.SERIAL_NUMBER'])
self.assertEqual(unsubscribe_operation.channel_groups, [])

@patch('pubnub.managers.SubscriptionManager.adapt_unsubscribe_builder')
def test_prevent_multiple_unsubscription(self, mocked_unsubscribe_builder):
"""Test prevent multiple unsubscriptions"""

# Listener is disconnected (initial state)
self.pubnub.unsubscribe()

mocked_unsubscribe_builder.assert_not_called()

@patch('pubnub.pubnub_asyncio.PubNubAsyncio.request_future')
async def test_history(self, mocked_request):
"""Test history"""
Expand Down

0 comments on commit 950405d

Please sign in to comment.