-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathindex.coffee
55 lines (40 loc) · 1.82 KB
/
index.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# Module dependencies: AMQP client, UUID generator, WebSocket server class,
# plus the project's own config and client registry.
amqp = require 'amqp'
uuid = require 'node-uuid'
{Server: WebSocketServer} = require 'ws'
config = require './config'
clients = require './clients'

# Open the AMQP connection to the URL given in config; the configured exchange
# name is passed as `defaultExchangeName` in the second options argument.
connectionOptions = url: config.url
implementationOptions = defaultExchangeName: config.exchange.name
connection = amqp.createConnection connectionOptions, implementationOptions

# Create the WebSocket server from the options in config.webSocketServer.
wss = new WebSocketServer config.webSocketServer
# Once the AMQP connection is ready: open the configured queue and exchange,
# bind the queue to the exchange with the '#' routing pattern, and relay every
# delivered message to the WebSocket clients subscribed to its routing key
# (skipping the client that originated the message, identified by session_id).
connection.on 'ready', ->
  connection.queue config.queue.name, config.queue.options, (q) ->
    console.log 'Queue ' + q.name + ' is open'

    connection.exchange config.exchange.name, config.exchange.options, (exchange) ->
      console.log 'Exchange ' + exchange.name + ' is open'

      q.bind(config.exchange.name, '#')
      q.subscribe (msg, header, deliveryInfo) ->
        # JSON.parse throws on invalid input (and msg.data may be absent), so
        # a single bad message is reported and skipped instead of propagating
        # an exception out of the subscribe callback.
        try
          msg = JSON.parse(msg.data.toString())
        catch err
          console.error 'Dropping unparsable message: ' + err.message
          return

        clientMsg = msg?.data
        # Without a `data` payload there is nothing to forward, and assigning
        # routing_key below would throw.
        unless clientMsg?
          console.error 'Dropping message without a data payload'
          return
        clientMsg.routing_key = deliveryInfo.routingKey

        senderClient = clients.getBySessionId(msg.session_id)
        subscriptions = clients.getBySubscription(deliveryInfo.routingKey)
        # Serialize once; the same string is sent to every recipient.
        clientMsgStr = JSON.stringify(clientMsg)
        subscriptions.forEach (client) ->
          # Exclude the sender client.
          return if senderClient and client.id is senderClient.id
          # NOTE(review): depending on the ws version, send() may throw when the
          # socket is no longer open; isolate the failure so the remaining
          # subscribers still receive the message.
          try
            client.ws.send clientMsgStr
          catch sendErr
            console.error 'Failed to send to client ' + client.id + ': ' + sendErr.message
# Handle a new WebSocket connection: assign it a fresh UUID and process its
# JSON commands. 'auth' registers the client (id, socket, auth data) in the
# client registry; 'subscribe' / 'unsubscribe' add or remove a routing-key
# subscription for this client id. The client is removed when the socket closes.
wss.on 'connection', (ws) ->
  clientId = uuid.v4()

  ws.on 'message', (message) ->
    # Frames come from remote peers and may not be valid JSON; JSON.parse
    # throws on invalid input, so report and ignore the frame rather than let
    # the exception propagate out of this handler.
    try
      msg = JSON.parse(message)
    catch err
      console.error 'Ignoring malformed message from client ' + clientId + ': ' + err.message
      return

    # JSON.parse can yield null; `?.` keeps the command lookup from throwing.
    # Unknown commands fall through and are ignored, as before.
    switch msg?.cmd
      when 'auth'
        clients.add
          id: clientId
          ws: ws
          auth: msg.data
      when 'subscribe'
        clients.addSubscription(clientId, msg.routing_key)
      when 'unsubscribe'
        clients.removeSubscription(clientId, msg.routing_key)

  ws.on 'close', ->
    clients.removeById(clientId)