1
- package com .alibabacloud .mse .demo ;
1
+ package com .alibabacloud .mse .demo . a . mq ;
2
2
3
- import com .alibabacloud .mse .demo .service .MqConsumer ;
4
3
import lombok .RequiredArgsConstructor ;
5
4
import lombok .extern .slf4j .Slf4j ;
6
5
import org .apache .rocketmq .client .consumer .DefaultMQPushConsumer ;
7
6
import org .apache .rocketmq .client .exception .MQClientException ;
8
7
import org .apache .rocketmq .common .consumer .ConsumeFromWhere ;
8
+ import org .springframework .beans .factory .annotation .Autowired ;
9
9
import org .springframework .beans .factory .annotation .Value ;
10
+ import org .springframework .cloud .commons .util .InetUtils ;
10
11
import org .springframework .context .annotation .Bean ;
11
12
import org .springframework .context .annotation .Configuration ;
13
+ import org .springframework .web .client .RestTemplate ;
12
14
13
15
@ Slf4j
14
16
@ Configuration
@@ -24,7 +26,14 @@ public class RocketMqConfiguration {
24
26
@ Value ("${rocketmq.consumer.topic}" )
25
27
private String topic ;
26
28
27
- private final MqConsumer mqConsumer ;
29
+ @ Autowired
30
+ private RestTemplate restTemplate ;
31
+
32
+ @ Autowired
33
+ private InetUtils inetUtils ;
34
+
35
+ @ Autowired
36
+ private String serviceTag ;
28
37
29
38
static {
30
39
System .setProperty ("rocketmq.client.log.loadconfig" , "false" );
@@ -37,6 +46,12 @@ public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
37
46
consumer .setNamesrvAddr (nameSrvAddr );
38
47
consumer .subscribe (topic , "*" );
39
48
consumer .setConsumeFromWhere (ConsumeFromWhere .CONSUME_FROM_FIRST_OFFSET );
49
+
50
+ MqConsumer mqConsumer = new MqConsumer (
51
+ restTemplate ,
52
+ inetUtils ,
53
+ serviceTag
54
+ );
40
55
consumer .registerMessageListener (mqConsumer );
41
56
log .info ("完成启动rocketMq的consumer,subscribe:{}" , topic );
42
57
return consumer ;
0 commit comments