|
2 | 2 |
|
3 | 3 | from tornado.websocket import websocket_connect
|
4 | 4 | from tornado.gen import coroutine
|
5 |
| -from tornado.ioloop import PeriodicCallback |
6 | 5 |
|
7 | 6 | from requests import Session
|
8 | 7 | from requests.compat import urljoin
|
|
18 | 17 | import time
|
19 | 18 |
|
20 | 19 | from models import User, session
|
21 |
| -from datetime import datetime |
22 | 20 |
|
23 | 21 |
|
24 | 22 | class Beam:
|
@@ -294,134 +292,107 @@ def read_chat(self, handler=None):
|
294 | 292 | if callable(handler):
|
295 | 293 | handler(response)
|
296 | 294 |
|
297 |
| - def connect_to_liveloading(self, channel_id, user_id): |
| 295 | + def connect_to_constellation(self, channel_id, user_id): |
298 | 296 | """Connect to Beam liveloading."""
|
299 | 297 |
|
300 |
| - self.liveloading_connection_information = { |
| 298 | + self.constellation_connection_information = { |
301 | 299 | "channel_id": channel_id,
|
302 | 300 | "user_id": user_id
|
303 | 301 | }
|
304 | 302 |
|
305 |
| - liveloading_websocket_connection = websocket_connect( |
306 |
| - "wss://realtime.beam.pro/socket.io/?EIO=3&transport=websocket") |
307 |
| - liveloading_websocket_connection.add_done_callback( |
| 303 | + constellation_websocket_connection = websocket_connect( |
| 304 | + "wss://constellation.beam.pro") |
| 305 | + constellation_websocket_connection.add_done_callback( |
308 | 306 | partial(self.subscribe_to_liveloading, channel_id, user_id))
|
309 | 307 |
|
310 | 308 | def subscribe_to_liveloading(self, channel_id, user_id, future):
|
311 |
| - """Subscribe to Beam liveloading.""" |
| 309 | + """Subscribe to Beam constellation.""" |
312 | 310 |
|
313 | 311 | if future.exception() is None:
|
314 |
| - self.liveloading_websocket = future.result() |
| 312 | + self.constellation_websocket = future.result() |
315 | 313 |
|
316 | 314 | self.logger.info(
|
317 |
| - "Successfully connected to liveloading websocket.") |
318 |
| - |
319 |
| - interfaces = ( |
320 |
| - "channel:{channel_id}:update", |
321 |
| - "channel:{channel_id}:followed", |
322 |
| - "channel:{channel_id}:subscribed", |
323 |
| - "channel:{channel_id}:resubscribed", |
324 |
| - "channel:{channel_id}:hosted", |
325 |
| - "user:{user_id}:update" |
326 |
| - ) |
327 |
| - self.subscribe_to_interfaces( |
328 |
| - *tuple( |
329 |
| - interface.format(channel_id=channel_id, user_id=user_id) |
330 |
| - for interface in interfaces |
331 |
| - ) |
332 |
| - ) |
| 315 | + "Successfully connected to constellation websocket.") |
| 316 | + |
| 317 | + interfaces = [ |
| 318 | + "channel:{channel}:update".format(channel=channel_id), |
| 319 | + "channel:{channel}:followed".format(channel=channel_id), |
| 320 | + "channel:{channel}:subscribed".format(channel=channel_id), |
| 321 | + "channel:{channel}:resubscribed".format(channel=channel_id), |
| 322 | + "channel:{channel}:hosted".format(channel=channel_id), |
| 323 | + "user:{user}:update".format(user=user_id) |
| 324 | + ] |
| 325 | + self.subscribe_to_interfaces(interfaces) |
333 | 326 |
|
334 | 327 | self.logger.info(
|
335 |
| - "Successfully subscribed to liveloading interfaces.") |
| 328 | + "Successfully subscribed to Constellation interfaces.") |
336 | 329 |
|
337 |
| - self.watch_liveloading() |
| 330 | + self.watch_constellation() |
338 | 331 | else:
|
339 | 332 | self.logger.warning(future.exception())
|
340 |
| - self.connect_to_liveloading(channel_id, user_id) |
341 |
| - |
342 |
| - def subscribe_to_interfaces(self, *interfaces): |
343 |
| - """Subscribe to a Beam liveloading interface.""" |
344 |
| - |
345 |
| - packet = [ |
346 |
| - "put", |
347 |
| - { |
348 |
| - "method": "put", |
349 |
| - "headers": {}, |
350 |
| - "data": { |
351 |
| - "slug": interfaces |
352 |
| - }, |
353 |
| - "url": "/api/v1/live" |
354 |
| - } |
355 |
| - ] |
356 |
| - self.liveloading_websocket.write_message('420' + dumps(packet)) |
357 |
| - |
358 |
| - def parse_liveloading_message(self, message): |
359 |
| - """Parse a message received from the Beam liveloading websocket.""" |
| 333 | + self.connect_to_constellation(channel_id, user_id) |
360 | 334 |
|
361 |
| - sections = re.match(r"(\d+)(.+)?$", message).groups() |
| 335 | + def subscribe_to_interfaces(self, interfaces: list): |
| 336 | + """Subscribe to a Beam constellation interface.""" |
362 | 337 |
|
363 |
| - return { |
364 |
| - "code": sections[0], |
365 |
| - "data": loads(sections[1]) if sections[1] is not None else None |
| 338 | + packet = { |
| 339 | + "type": "method", |
| 340 | + "method": "livesubscribe", |
| 341 | + "params": { |
| 342 | + "events": interfaces |
| 343 | + }, |
| 344 | + "id": 1 |
366 | 345 | }
|
| 346 | + self.constellation_websocket.write_message(dumps(packet)) |
| 347 | + |
| 348 | + def parse_constellation_message(self, packet): |
| 349 | + try: |
| 350 | + packet = loads(packet) |
| 351 | + except: |
| 352 | + return "" |
| 353 | + else: |
| 354 | + if "data" in packet and "payload" in packet["data"]: |
| 355 | + return packet["data"] |
| 356 | + else: |
| 357 | + return "" |
367 | 358 |
|
368 | 359 | @coroutine
|
369 |
| - def watch_liveloading(self, handler=None): |
| 360 | + def watch_constellation(self): |
370 | 361 | """Watch and handle packets from the Beam liveloading websocket."""
|
371 | 362 |
|
372 |
| - response = yield self.liveloading_websocket.read_message() |
| 363 | + response = yield self.constellation_websocket.read_message() |
373 | 364 | if response is None:
|
374 | 365 | raise ConnectionError
|
375 | 366 |
|
376 |
| - packet = self.parse_liveloading_message(response) |
377 |
| - |
378 |
| - PeriodicCallback( |
379 |
| - partial(self.liveloading_websocket.write_message, '2'), |
380 |
| - packet["data"]["pingInterval"] |
381 |
| - ).start() |
382 |
| - |
383 | 367 | while True:
|
384 |
| - message = yield self.liveloading_websocket.read_message() |
385 |
| - |
386 |
| - if message is None: |
387 |
| - self.logger.info("Connection to Liveloading lost.") |
388 |
| - self.logger.info("Attempting to reconnect.") |
389 |
| - |
390 |
| - return self.connect_to_liveloading( |
391 |
| - **self.liveloading_connection_information) |
392 |
| - |
393 |
| - self.logger.info("Attempting to reconnect.") |
394 |
| - self.watch_liveloading() |
395 |
| - |
396 |
| - packet = self.parse_liveloading_message(message) |
397 |
| - |
398 |
| - if packet.get("data") is not None: |
399 |
| - self.logger.debug("LIVE: {}".format(packet)) |
400 |
| - |
401 |
| - if isinstance(packet["data"], list): |
402 |
| - if isinstance(packet["data"][0], str): |
403 |
| - if packet["data"][1].get("following"): |
404 |
| - self.logger.info("- {} followed.".format( |
405 |
| - packet["data"][1]["user"]["username"])) |
406 |
| - |
407 |
| - user = session.query(User).filter_by( |
408 |
| - id=packet["data"][1]["user"]["id"]).first() |
409 |
| - if user and (datetime.now() - user.follow_date).days: |
410 |
| - self.send_message( |
411 |
| - "Thanks for the follow, @{}!".format( |
412 |
| - packet["data"][1]["user"]["username"])) |
413 |
| - user.follow_date = datetime.now() |
414 |
| - session.add(user) |
415 |
| - session.commit() |
416 |
| - elif packet["data"][1].get("subscribed"): |
417 |
| - self.logger.info("- {} subscribed.".format( |
418 |
| - packet["data"][1]["user"]["username"])) |
419 |
| - self.send_message( |
420 |
| - "Thanks for the subscription, @{}! <3".format( |
421 |
| - packet["data"][1]["user"]["username"])) |
422 |
| - elif packet["data"][1].get("hoster"): |
423 |
| - self.logger.info("- {} hosted the channel.".format( |
424 |
| - packet["data"][1]["hoster"]["token"])) |
| 368 | + message = yield self.constellation_websocket.read_message() |
| 369 | + message = self.parse_constellation_message(message) |
| 370 | + if message is None or message is "": |
| 371 | + pass |
| 372 | + else: |
| 373 | + self.logger.debug("LIVE: {}".format(message)) |
| 374 | + if "followed" in message["channel"]: |
| 375 | + if message["payload"]["following"]: |
425 | 376 | self.send_message(
|
426 |
| - "Thanks for hosting the channel, @{}!".format( |
427 |
| - packet["data"][1]["hoster"]["token"])) |
| 377 | + "Thanks for the follow, @{} !".format( |
| 378 | + message["payload"]["user"]["username"])) |
| 379 | + self.logger.info("- {} followed.".format( |
| 380 | + message["payload"]["user"]["username"])) |
| 381 | + elif "subscribed" in message["channel"]: |
| 382 | + self.send_message("Thanks for subscribing, @{} !".format( |
| 383 | + message["payload"]["user"]["username"] |
| 384 | + )) |
| 385 | + elif "resubscribed" in message["channel"]: |
| 386 | + self.send_message("Thanks for subscribing, @{} !".format( |
| 387 | + message["payload"]["user"]["username"] |
| 388 | + )) |
| 389 | + |
| 390 | + # if message is None: |
| 391 | + # self.logger.info("Connection to Constellation lost.") |
| 392 | + # self.logger.info("Attempting to reconnect.") |
| 393 | + |
| 394 | + # return self.connect_to_constellation( |
| 395 | + # **self.constellation_connection_information) |
| 396 | + |
| 397 | + # self.logger.info("Attempting to reconnect.") |
| 398 | + # self.watch_constellation() |
0 commit comments