Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Ws Connection Issues + Connectivity Updates + Notification Keys #62

Merged
merged 31 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9753235
fix: Env mismatches + workflow stage name
kevinszuchet Feb 10, 2025
6dee0ec
chore: add logs
kevinszuchet Feb 10, 2025
b931341
chore: Add more logs to rpc calls
kevinszuchet Feb 10, 2025
43f76ef
chore: More logs
kevinszuchet Feb 10, 2025
d1214fa
chore: More and more logs
kevinszuchet Feb 10, 2025
6458ed2
refactor: Back to initial implementation of handling updates
kevinszuchet Feb 10, 2025
ef65fe2
refactor: Try to avoid deadlocks and improve subscription cleanups
kevinszuchet Feb 10, 2025
98d18c1
refactor: Remove races
kevinszuchet Feb 10, 2025
1a0b054
chore: New log
kevinszuchet Feb 10, 2025
2b7e6c3
refactor: Release pg client in the finally statement
kevinszuchet Feb 11, 2025
663e775
refactor: Remove set immediate for notifications
kevinszuchet Feb 11, 2025
f143be9
feat: Avoid overloading the server with the hearbeats
kevinszuchet Feb 11, 2025
5af8b7f
feat: Proper generator cleanup
kevinszuchet Feb 11, 2025
0165251
chore: Change peers endpoint
kevinszuchet Feb 11, 2025
8c69cab
chore: More logs
kevinszuchet Feb 11, 2025
9c572b7
chore: more data on the closing because no auth chain
kevinszuchet Feb 11, 2025
5b9fe9e
chore: Log active connections
kevinszuchet Feb 11, 2025
0ea2af2
chore: More logs
kevinszuchet Feb 11, 2025
03a7e29
refactor: Avoid crashing when transport is not ready
kevinszuchet Feb 13, 2025
88fd5ca
fix: IN clause was not working on the getOnlineFriends query
kevinszuchet Feb 13, 2025
f912f15
chore: More logs for connectivity
kevinszuchet Feb 13, 2025
fc16142
fix: wrong log data
kevinszuchet Feb 13, 2025
afb2a35
fix: Back to hearbeat notifying connected peers
kevinszuchet Feb 13, 2025
63a35b7
test: Fix tests
kevinszuchet Feb 13, 2025
8f88749
chore: Improve logs
kevinszuchet Feb 13, 2025
e0279ae
refactor: Defer sending notification
kevinszuchet Feb 13, 2025
ca844f5
chore: Remove logs from services
kevinszuchet Feb 13, 2025
a8c694b
feat: Notify connectivity when user accepts request (#63)
kevinszuchet Feb 14, 2025
1457981
Merge branch 'main' into chore/add-logs-to-rpc
kevinszuchet Feb 14, 2025
21e60e4
test: Improve get online friends test
kevinszuchet Feb 14, 2025
486cf0d
chore: Improve README flows
kevinszuchet Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.default
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
HTTP_SERVER_PORT=3000
HTTP_SERVER_HOST=0.0.0.0

RPC_SERVER_PORT=8085

# reset metrics at 00:00UTC
WKC_METRICS_RESET_AT_NIGHT=false

Expand Down
178 changes: 86 additions & 92 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,54 +35,36 @@ A microservice that handles social interactions (friendships) for Decentraland,
This service follows the Well Known Components pattern, where each component is a self-contained unit with a clear interface. The main components are:

- **Database (PostgreSQL)**: Stores friendship relationships and actions
- **Cache (Redis)**: Handles temporary data and real-time status
- **RPC Server**: Manages client-server RPC communication
- **Cache (Redis)**: Handles temporary information, real-time status, and frequently accessed data
- **RPC Server**: Manages client-server RPC communication following the [Protocol definition](https://github.com/decentraland/protocol/blob/main/proto/decentraland/social_service/v2/social_service_v2.proto)
- **PubSub**: Handles real-time updates
- **Archipelago Stats**: Integrates with Decentraland's peer discovery system
- **Peer Tracking**: Monitors online status of users through the NATS messaging system
- **Catalyst Client**: Fetches profiles from the Catalyst Lambdas API
- **Peers Synchronization**: Synchronizes peers with the Archipelago Stats service and store them in Redis

### Database Design

```plantuml
@startuml
!define table(x) class x << (T,#FFAAAA) >>
!define primary_key(x) <u>x</u>
!define foreign_key(x) #x#
hide methods
hide stereotypes

table(friendships) {
primary_key(id): uuid
address_requester: varchar
address_requested: varchar
is_active: boolean
created_at: timestamp
updated_at: timestamp
--
indexes
..
hash(address_requester)
hash(address_requested)
btree(LOWER(address_requester))
btree(LOWER(address_requested))
}

table(friendship_actions) {
primary_key(id): uuid
foreign_key(friendship_id): uuid
action: varchar
acting_user: varchar
metadata: jsonb
timestamp: timestamp
--
indexes
..
btree(friendship_id)
}

friendships ||--|{ friendship_actions
@enduml
```
erDiagram
FRIENDSHIPS {
uuid id PK
varchar address_requester
varchar address_requested
boolean is_active
timestamp created_at
timestamp updated_at
}
FRIENDSHIP_ACTIONS {
uuid id PK
uuid friendship_id FK
varchar action
varchar acting_user
jsonb metadata
timestamp timestamp
}

FRIENDSHIPS ||--o{ FRIENDSHIP_ACTIONS : "has"
```

The database schema supports:
Expand All @@ -98,64 +80,71 @@ See migrations for details: [migrations](./src/migrations)

```mermaid
sequenceDiagram
participant Client
participant WebSocket
participant RPC Server
participant Redis
participant NATS
participant DB

Note over Client,DB: Connection Setup
Client->>WebSocket: WS Handshake
activate WebSocket
WebSocket-->>Client: Connection Established
Client->>WebSocket: Auth Message
WebSocket->>RPC Server: Attach Transport
activate RPC Server

Note over RPC Server,NATS: Subscriptions Setup
RPC Server->>Redis: Subscribe to updates channels
activate Redis
Note over Redis: friendship.updates
Note over Redis: friend.status.updates
RPC Server->>NATS: Subscribe to peer events
activate NATS
Note over NATS: peer.*.connected
Note over NATS: peer.*.disconnected
Note over NATS: peer.*.heartbeat

Note over Client,DB: Friendship Request Flow
Client->>RPC Server: Friend Request
RPC Server->>DB: Create Friendship Record
DB-->>RPC Server: Friendship Created
RPC Server->>DB: Record Friendship Action
RPC Server->>Redis: Publish Friendship Update
RPC Server-->>Client: Request Confirmation

Note over Client,DB: Friendship Updates Flow
Redis-->>RPC Server: Friendship Update Event
participant Client
participant WebSocket
participant RPC Server
participant Redis
participant NATS
participant DB

Note over Client,DB: Connection Setup
Client->>WebSocket: WS Handshake
activate WebSocket
WebSocket-->>Client: Connection Established
Client->>WebSocket: Auth Message
WebSocket->>RPC Server: Attach Transport
activate RPC Server

Note over RPC Server,NATS: Subscriptions Setup
RPC Server->>Redis: Subscribe to updates channels
activate Redis
Note over Redis: friendship.updates
Note over Redis: friend.status.updates
RPC Server->>NATS: Subscribe to peer events
activate NATS
Note over NATS: peer.*.connected
Note over NATS: peer.*.disconnected
Note over NATS: peer.*.heartbeat

Note over Client,DB: Friendship Requests
Client->>RPC Server: Friend Request
RPC Server->>DB: Create Friendship Record
DB-->>RPC Server: Friendship Created
RPC Server->>DB: Record Friendship Action
RPC Server->>Redis: Publish Friendship Update
RPC Server-->>Client: Request Confirmation

loop Friendship Status Updates
Redis-->>RPC Server: Friendship Update
RPC Server-->>Client: Stream Friendship Updates
Note over RPC Server: (accept/cancel/reject/delete)
end

Note over Client,DB: Friends Lifecycle
NATS-->>Redis: Publish Peer Connection Update Event
Redis-->>RPC Server: Broadcast Friend Status Update Event
Note over Client,DB: Friends Connectivity Status
loop connectivity updates
NATS-->>Redis: Publish Peer Connection/Disconnection Update
Redis-->>RPC Server: Broadcast Friend Connectivity Status Update
RPC Server->>Redis: Get Cached Peers
Redis-->>RPC Server: Cached Peers
RPC Server->>DB: Query Online Friends
DB-->>RPC Server: Online Friends
RPC Server-->>Client: Stream Friend Status Updates
RPC Server-->>Client: Stream Friend Connectivity Status to Connected Friends
Note over RPC Server: (online/offline)

Note over Client,DB: Cleanup
Client->>WebSocket: Connection Close
WebSocket->>RPC Server: Detach Transport
RPC Server->>Redis: Unsubscribe
RPC Server->>NATS: Unsubscribe
deactivate WebSocket
deactivate RPC Server
deactivate Redis
deactivate NATS
end
loop friendship accepted
Redis-->>RPC Server: Friendship Accepted
RPC Server-->>Client: Stream Friend Connectivity Status Update to both friends
end

Note over Client,DB: Cleanup
Client->>WebSocket: Connection Close
WebSocket->>RPC Server: Detach Transport
RPC Server->>Redis: Unsubscribe
RPC Server->>NATS: Unsubscribe
deactivate WebSocket
deactivate RPC Server
deactivate Redis
deactivate NATS
```

## 🚀 Getting Started
Expand Down Expand Up @@ -191,16 +180,21 @@ yarn migrate up
5. Run the service:

```bash
yarn start
yarn dev
```

### Environment Variables

Key environment variables needed:

- `REDIS_HOST`: URL of the Redis instance
- `RPC_SERVER_PORT`: Port of the RPC server
- `PG_COMPONENT_PSQL_CONNECTION_STRING`: URL of the PostgreSQL instance
- `ARCHIPELAGO_STATS_URL`: URL of the Archipelago Stats service
- `NATS_URL`: URL of the NATS instance
- `CATALYST_LAMBDAS_URL_LOADBALANCER`: URL of the Catalyst Lambdas Load Balancer
- `PEER_SYNC_INTERVAL_MS`: Interval for peer synchronization
- `PEERS_SYNC_CACHE_TTL_MS`: Cache TTL for peer synchronization

See `.env.default` for all available options.

Expand Down
2 changes: 1 addition & 1 deletion src/adapters/archipelago-stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export async function createArchipelagoStatsComponent({
return {
async getPeers() {
try {
const response = await fetcher.fetch(`${url}/comms/peers`)
const response = await fetcher.fetch(`${url}/peers`)

if (!response.ok) {
throw new Error(`Error fetching peers: ${response.statusText}`)
Expand Down
8 changes: 5 additions & 3 deletions src/adapters/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
END as address
FROM friendships
WHERE (
(LOWER(address_requester) = ${normalizedUserAddress} AND LOWER(address_requested) IN (${normalizedOnlinePotentialFriends.join(',')}))
(LOWER(address_requester) = ${normalizedUserAddress} AND LOWER(address_requested) = ANY(${normalizedOnlinePotentialFriends}))
OR
(LOWER(address_requested) = ${normalizedUserAddress} AND LOWER(address_requester) IN (${normalizedOnlinePotentialFriends.join(',')}))
(LOWER(address_requested) = ${normalizedUserAddress} AND LOWER(address_requester) = ANY(${normalizedOnlinePotentialFriends}))
)
AND is_active = true`

Expand All @@ -351,15 +351,17 @@ export function createDBComponent(components: Pick<AppComponents, 'pg' | 'logs'>
const pool = pg.getPool()
const client = await pool.connect()
await client.query('BEGIN')

try {
const res = await cb(client)
await client.query('COMMIT')
return res
} catch (error: any) {
logger.error(`Error executing transaction: ${error.message}`)
await client.query('ROLLBACK')
client.release()
throw error
} finally {
client.release()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/adapters/peer-tracking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export type PeerStatusHandler = {
}

export const PEER_STATUS_HANDLERS: PeerStatusHandler[] = [
{ event: 'connect', pattern: 'peer.*.connect', status: ConnectivityStatus.ONLINE },
{ event: 'connect', pattern: 'peer.*.connect', status: ConnectivityStatus.OFFLINE },
kevinszuchet marked this conversation as resolved.
Show resolved Hide resolved
{ event: 'disconnect', pattern: 'peer.*.disconnect', status: ConnectivityStatus.OFFLINE },
{ event: 'heartbeat', pattern: 'peer.*.heartbeat', status: ConnectivityStatus.ONLINE }
]
Expand Down
23 changes: 20 additions & 3 deletions src/adapters/rpc-server/rpc-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import { getSentFriendshipRequestsService } from './services/get-sent-friendship
import { getFriendshipStatusService } from './services/get-friendship-status'
import { subscribeToFriendConnectivityUpdatesService } from './services/subscribe-to-friend-connectivity-updates'
import { FRIEND_STATUS_UPDATES_CHANNEL, FRIENDSHIP_UPDATES_CHANNEL } from '../pubsub'
import { friendshipUpdateHandler, friendConnectivityUpdateHandler } from '../../logic/updates'
import {
friendshipUpdateHandler,
friendConnectivityUpdateHandler,
friendshipAcceptedUpdateHandler
} from '../../logic/updates'

export async function createRpcServerComponent({
logs,
Expand Down Expand Up @@ -74,15 +78,26 @@ export async function createRpcServerComponent({
})

await pubsub.subscribeToChannel(FRIENDSHIP_UPDATES_CHANNEL, friendshipUpdateHandler(subscribersContext, logger))
await pubsub.subscribeToChannel(
FRIENDSHIP_UPDATES_CHANNEL,
friendshipAcceptedUpdateHandler(subscribersContext, logger)
)
await pubsub.subscribeToChannel(
FRIEND_STATUS_UPDATES_CHANNEL,
friendConnectivityUpdateHandler(subscribersContext, logger, db)
)
},
attachUser({ transport, address }) {
logger.debug('[DEBUGGING CONNECTION] Attaching user to RPC', {
address,
transportConnected: String(transport.isConnected)
})

transport.on('close', () => {
logger.debug('[DEBUGGING CONNECTION] Transport closed, removing subscriber', {
address
})
subscribersContext.removeSubscriber(address)
logger.debug('User disconnected and removed from subscribers', { address })
})

const eventEmitter = subscribersContext.getOrAddSubscriber(address)
Expand All @@ -93,8 +108,10 @@ export async function createRpcServerComponent({
})
},
detachUser(address) {
logger.debug('[DEBUGGING CONNECTION] Detaching user from RPC', {
address
})
subscribersContext.removeSubscriber(address)
logger.debug('Detached user and cleaned up subscribers', { address })
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export function subscribeToFriendConnectivityUpdatesService({
const logger = logs.getLogger('subscribe-to-friend-connectivity-updates-service')

return async function* (_request: Empty, context: RpcServerContext): AsyncGenerator<FriendConnectivityUpdate> {
let cleanup: (() => void) | undefined

try {
const onlinePeers = await archipelagoStats.getPeersFromCache()
const onlineFriends = await db.getOnlineFriends(context.address, onlinePeers)
Expand All @@ -26,7 +28,7 @@ export function subscribeToFriendConnectivityUpdatesService({

yield* parsedProfiles

yield* handleSubscriptionUpdates({
cleanup = yield* handleSubscriptionUpdates({
rpcContext: context,
eventName: 'friendConnectivityUpdate',
components: {
Expand All @@ -39,8 +41,11 @@ export function subscribeToFriendConnectivityUpdatesService({
parser: parseEmittedUpdateToFriendConnectivityUpdate
})
} catch (error: any) {
logger.error('Error in friend updates subscription:', error)
logger.error('Error in friend connectivity updates subscription:', error)
throw error
} finally {
logger.info('Cleaning up friend connectivity updates subscription')
cleanup?.()
}
}
}
Loading
Loading