1
1
# -*- coding: utf-8 -*-
2
2
3
+ import json
3
4
import time
4
5
5
6
import click
6
7
import schedule
8
+ import pika
7
9
8
10
from eridu .logger import logger
9
11
from eridu .config import FILTER_TAGS , SECONDS_BETWEEN_REQUESTS
12
+ from eridu .config import QUEUE_HOST , QUEUE_PORT , EXCHANGE , QUESTIONS_ROUTING_KEY , ANSWERS_ROUTING_KEY
10
13
from eridu .core import get_post_ids , split_post_ids , get_questions , get_answers , filter_posts_by_tag
11
14
12
15
@click .command ()
@@ -27,26 +30,39 @@ def run(params):
27
30
page = params .get ('page' )
28
31
logger .info ('Getting posts for page {}.' .format (page ))
29
32
33
+ connection = pika .BlockingConnection (pika .ConnectionParameters (
34
+ host = QUEUE_HOST ,
35
+ port = QUEUE_PORT )
36
+ )
37
+ channel = connection .channel ()
38
+
39
+ channel .exchange_declare (exchange = EXCHANGE ,
40
+ type = 'topic' )
41
+
30
42
post_ids = get_post_ids (page )
31
43
ids = split_post_ids (post_ids ['items' ])
32
44
33
45
questions = get_questions (ids ['question_ids' ])
34
46
questions = filter_posts_by_tag (questions ['items' ], tags )
35
- for question in questions :
36
- try :
37
- print (question )
38
- except UnicodeEncodeError :
39
- pass
40
47
41
- print ('\n \n ' )
48
+ routing_key = QUESTIONS_ROUTING_KEY
49
+ for question in questions :
50
+ message = json .dumps (question )
51
+ channel .basic_publish (exchange = EXCHANGE ,
52
+ routing_key = routing_key ,
53
+ body = message )
42
54
43
55
answers = get_answers (ids ['answer_ids' ])
44
56
answers = filter_posts_by_tag (answers ['items' ], tags )
57
+
58
+ routing_key = ANSWERS_ROUTING_KEY
45
59
for answer in answers :
46
- try :
47
- print (answer )
48
- except UnicodeEncodeError :
49
- pass
60
+ message = json .dumps (answer )
61
+ channel .basic_publish (exchange = EXCHANGE ,
62
+ routing_key = routing_key ,
63
+ body = message )
64
+
65
+ connection .close ()
50
66
51
67
params ['page' ] += 1
52
68
0 commit comments