Skip to content

Commit

Permalink
update docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
grdsdev committed Feb 18, 2025
1 parent 1ec8aab commit 026d3ab
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 31 deletions.
68 changes: 43 additions & 25 deletions realtime/_async/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@

class AsyncRealtimeChannel:
"""
`Channel` is an abstraction for a topic listener for an existing socket connection.
Each Channel has its own topic and a list of event-callbacks that responds to messages.
Should only be instantiated through `connection.RealtimeClient().channel(topic)`.
Channel is an abstraction for a topic subscription on an existing socket connection.
Each Channel has its own topic and a list of event-callbacks that respond to messages.
Should only be instantiated through `AsyncRealtimeClient.channel(topic)`.
"""

def __init__(
Expand Down Expand Up @@ -149,9 +149,12 @@ async def subscribe(
] = None,
) -> AsyncRealtimeChannel:
"""
Subscribe to the channel.
Subscribe to the channel. Can only be called once per channel instance.
:return: The Channel instance for method chaining.
:param callback: Optional callback function that receives subscription state updates
and any errors that occur during subscription
:return: The Channel instance for method chaining
:raises: Exception if called multiple times on the same channel instance
"""
if not self.socket.is_connected:
await self.socket.connect()
Expand Down Expand Up @@ -251,6 +254,10 @@ def on_join_push_timeout(*args):
return self

async def unsubscribe(self):
"""
Unsubscribe from the channel and leave the topic.
Sets channel state to LEAVING and cleans up timers and pushes.
"""
self.state = ChannelStates.LEAVING

self.rejoin_timer.reset()
Expand All @@ -271,6 +278,15 @@ def _close(*args):
async def push(
self, event: str, payload: Dict[str, Any], timeout: Optional[int] = None
) -> AsyncPush:
"""
Push a message to the channel.
:param event: The event name to push
:param payload: The payload to send
:param timeout: Optional timeout in milliseconds
:return: AsyncPush instance representing the push operation
:raises: Exception if called before subscribing to the channel
"""
if not self._joined_once:
raise Exception(
f"tried to push '{event}' to '{self.topic}' before joining. Use channel.subscribe() before pushing events"
Expand Down Expand Up @@ -352,9 +368,9 @@ def on_broadcast(
"""
Set up a listener for a specific broadcast event.
:param event: The name of the broadcast event to listen for.
:param callback: The callback function to execute when the event is received.
:return: The Channel instance for method chaining.
:param event: The name of the broadcast event to listen for
:param callback: Function called with the payload when a matching broadcast is received
:return: The Channel instance for method chaining
"""
return self._on(
"broadcast",
Expand All @@ -371,13 +387,14 @@ def on_postgres_changes(
filter: Optional[str] = None,
) -> AsyncRealtimeChannel:
"""
Set up a listener for a specific Postgres changes event.
:param event: The name of the Postgres changes event to listen for.
:param table: The table name for which changes should be monitored.
:param callback: The callback function to execute when the event is received.
:param schema: The database schema where the table exists. Default is 'public'.
:return: The Channel instance for method chaining.
Set up a listener for Postgres database changes.
:param event: The type of database event to listen for (INSERT, UPDATE, DELETE, or *)
:param callback: Function called with the payload when a matching change is detected
:param table: The table name to monitor. Defaults to "*" for all tables
:param schema: The database schema to monitor. Defaults to "public"
:param filter: Optional filter string to apply
:return: The Channel instance for method chaining
"""

binding_filter = {"event": event, "schema": schema, "table": table}
Expand All @@ -404,22 +421,24 @@ def on_system(
# Presence methods
async def track(self, user_status: Dict[str, Any]) -> None:
"""
Track a user's presence.
Track presence status for the current user.
:param user_status: User's presence status.
:return: None
:param user_status: Dictionary containing the user's presence information
"""
await self.send_presence("track", user_status)

async def untrack(self) -> None:
"""
Untrack a user's presence.
:return: None
Stop tracking presence for the current user.
"""
await self.send_presence("untrack", {})

def presence_state(self) -> RealtimePresenceState:
"""
Get the current state of presence on this channel.
:return: Dictionary mapping presence keys to lists of presence payloads
"""
return self.presence.state

def on_presence_sync(self, callback: Callable[[], None]) -> AsyncRealtimeChannel:
Expand Down Expand Up @@ -459,11 +478,10 @@ def on_presence_leave(
# Broadcast methods
async def send_broadcast(self, event: str, data: Any) -> None:
"""
Sends a broadcast message to the current channel.
Send a broadcast message through this channel.
:param event: The name of the broadcast event.
:param data: The data to be sent with the message.
:return: An asyncio.Future object representing the send operation.
:param event: The name of the broadcast event
:param data: The payload to broadcast
"""
await self.push(
ChannelEvents.broadcast,
Expand Down
15 changes: 9 additions & 6 deletions realtime/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ def __init__(
Initialize a RealtimeClient instance for WebSocket communication.
:param url: WebSocket URL of the Realtime server. Starts with `ws://` or `wss://`.
Also accepts default Supabase URL: `http://` or `https://`.
Also accepts default Supabase URL: `http://` or `https://`.
:param token: Authentication token for the WebSocket connection.
:param auto_reconnect: If True, automatically attempt to reconnect on disconnection. Defaults to False.
:param params: Optional parameters for the connection. Defaults to an empty dictionary.
:param auto_reconnect: If True, automatically attempt to reconnect on disconnection. Defaults to True.
:param params: Optional parameters for the connection. Defaults to None.
:param hb_interval: Interval (in seconds) for sending heartbeat messages to keep the connection alive. Defaults to 30.
:param max_retries: Maximum number of reconnection attempts. Defaults to 5.
:param initial_backoff: Initial backoff time (in seconds) for reconnection attempts. Defaults to 1.0.
:param websocket_factory: Optional factory function to create websocket connections (useful for testing).
:param timeout: Connection timeout in seconds. Defaults to DEFAULT_TIMEOUT.
"""
if not is_ws_url(url):
ValueError("url must be a valid WebSocket URL or HTTP URL string")
Expand Down Expand Up @@ -249,8 +249,11 @@ def channel(
self, topic: str, params: Optional[RealtimeChannelOptions] = None
) -> AsyncRealtimeChannel:
"""
:param topic: Initializes a channel and creates a two-way association with the socket
:return: Channel
Initialize a channel and create a two-way association with the socket.
:param topic: The topic to subscribe to
:param params: Optional channel parameters
:return: AsyncRealtimeChannel instance
"""
topic = f"realtime:{topic}"
chan = AsyncRealtimeChannel(self, topic, params)
Expand Down

0 comments on commit 026d3ab

Please sign in to comment.