Skip to content

Commit fdf9525

Browse files
committed
cache 1
1 parent 9048b61 commit fdf9525

File tree

8 files changed

+118
-14
lines changed

8 files changed

+118
-14
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@
9191
<artifactId>netty-all</artifactId>
9292
<version>4.1.109.Final</version>
9393
</dependency>
94+
<dependency>
95+
<groupId>org.reflections</groupId>
96+
<artifactId>reflections</artifactId>
97+
<version>0.10.2</version>
98+
</dependency>
99+
94100
</dependencies>
95101

96102
</project>

src/main/java/com/wei/seqMq/ConsumerInfo.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,11 @@ public class ConsumerInfo {
1717
*/
1818
private String consumerName;
1919

20+
public ConsumerInfo(String streamName, String groupName) {
21+
this.streamName = streamName;
22+
this.groupName = groupName;
23+
}
24+
25+
public ConsumerInfo() {
26+
}
2027
}

src/main/java/com/wei/seqMq/ListenerTask.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,21 @@ public void run() {
2727
public void listen() {
2828
log.info("start listen stream message...");
2929
while (canLoop()) {
30-
Map<StreamMessageId, Map<String, String>> streamMessageIdMapMap = readEventsFromStreamTtl(consumerInfo, Duration.ofSeconds(5));
31-
log.info("listen msg = {}", streamMessageIdMapMap);
32-
if (null == streamMessageIdMapMap) {
33-
continue;
34-
}
35-
for (StreamMessageId streamMessageId : streamMessageIdMapMap.keySet()) {
36-
Map<String, String> stringMap = streamMessageIdMapMap.get(streamMessageId);
37-
if (stringMap == null) {
30+
try {
31+
Map<StreamMessageId, Map<String, String>> streamMessageIdMapMap = readEventsFromStreamTtl(consumerInfo, Duration.ofSeconds(10));
32+
log.info("listen msg = {}", streamMessageIdMapMap);
33+
if (null == streamMessageIdMapMap) {
3834
continue;
3935
}
40-
handlerListen(consumerInfo, streamMessageId, stringMap);
36+
for (StreamMessageId streamMessageId : streamMessageIdMapMap.keySet()) {
37+
Map<String, String> stringMap = streamMessageIdMapMap.get(streamMessageId);
38+
if (stringMap == null) {
39+
continue;
40+
}
41+
handlerListen(consumerInfo, streamMessageId, stringMap);
42+
}
43+
} catch (Throwable e) {
44+
log.error("listen throw exception cause = {}", e.getMessage(), e.getCause());
4145
}
4246
}
4347
}

src/main/java/com/wei/seqMq/MyStreamListener.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,14 @@
33
import lombok.extern.slf4j.Slf4j;
44

55
@Slf4j
6+
@StreamConsumer(
7+
groupName = "SeqMessageQueue",
8+
streamName = "SeqGroup",
9+
autoAck = true
10+
)
611
class MyStreamListener implements StreamListener<SeqMessage> {
712

8-
boolean actAck = true;
13+
private boolean actAck = true;
914

1015
@Override
1116
public void onMessage(SeqMessage message) {
@@ -16,4 +21,8 @@ public void onMessage(SeqMessage message) {
1621
public boolean autoAck() {
1722
return actAck;
1823
}
24+
25+
public void setActAck(boolean actAck) {
26+
this.actAck = actAck;
27+
}
1928
}

src/main/java/com/wei/seqMq/SeqMessageQueue.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,16 @@ public static void main(String[] args) throws InterruptedException {
7575
}
7676
queue.processPending(streamKey, streamGroup);
7777

78-
MyStreamListener streamListener = new MyStreamListener();
7978

79+
// 创建容器实例
80+
StreamMessageListenerContainer container = StreamMessageListenerContainer.create();
81+
82+
// 自动扫描并注册监听器(指定扫描包路径)
83+
StreamConsumerRegistrar.register(container, "com.wei.seqMq");
8084
StreamMessageListenerContainer streamMessageListenerContainer = StreamMessageListenerContainer.create();
81-
streamMessageListenerContainer.receiveAutoAck(consumerInfo, streamListener, true);
85+
// MyStreamListener streamListener = new MyStreamListener();
86+
// streamListener.setActAck(true);
87+
// streamMessageListenerContainer.receiveAutoAck(consumerInfo, streamListener, true);
8288
streamMessageListenerContainer.start();
8389
Thread.sleep(60 * 1000);
8490
queue.shutdown();
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.wei.seqMq;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
8+
@Retention(RetentionPolicy.RUNTIME)
9+
@Target(ElementType.TYPE)
10+
public @interface StreamConsumer {
11+
String streamName(); // 流键
12+
13+
String groupName(); // 消费者组
14+
15+
boolean autoAck() default true; // 是否自动确认
16+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.wei.seqMq;
2+
3+
import org.reflections.Reflections;
4+
5+
import java.lang.reflect.Field;
6+
import java.lang.reflect.InvocationTargetException;
7+
import java.lang.reflect.Method;
8+
import java.util.Set;
9+
10+
public class StreamConsumerRegistrar {
11+
12+
public static void register(StreamMessageListenerContainer container, String... basePackages) {
13+
Reflections reflections = new Reflections(basePackages);
14+
Set<Class<?>> classes = reflections.getTypesAnnotatedWith(StreamConsumer.class);
15+
for (Class<?> clazz : classes) {
16+
if (StreamListener.class.isAssignableFrom(clazz)) {
17+
try {
18+
StreamConsumer annot = clazz.getAnnotation(StreamConsumer.class);
19+
StreamListener<?> listener = (StreamListener<?>) clazz.getDeclaredConstructor().newInstance();
20+
// 设置 autoAck 属性
21+
setAutoAckProperty(listener, annot.autoAck());
22+
// 构建 ConsumerInfo
23+
ConsumerInfo consumerInfo = new ConsumerInfo(annot.streamName(), annot.groupName());
24+
// 注册到容器
25+
if (annot.autoAck()) {
26+
container.receiveAutoAck(consumerInfo, listener);
27+
} else {
28+
container.receive(consumerInfo, listener, false);
29+
}
30+
} catch (InstantiationException | IllegalAccessException |
31+
NoSuchMethodException | InvocationTargetException e) {
32+
e.printStackTrace();
33+
}
34+
}
35+
}
36+
}
37+
38+
private static void setAutoAckProperty(StreamListener<?> listener, boolean autoAck) {
39+
try {
40+
// 尝试调用 setActAck 方法
41+
Method setMethod = listener.getClass().getMethod("setActAck", boolean.class);
42+
setMethod.invoke(listener, autoAck);
43+
} catch (NoSuchMethodException e) {
44+
// 方法不存在,尝试直接设置字段
45+
try {
46+
Field field = listener.getClass().getDeclaredField("actAck");
47+
field.setAccessible(true);
48+
field.setBoolean(listener, autoAck);
49+
} catch (NoSuchFieldException | IllegalAccessException ex) {
50+
throw new RuntimeException("Failed to set autoAck for listener: " + listener.getClass(), ex);
51+
}
52+
} catch (IllegalAccessException | InvocationTargetException e) {
53+
throw new RuntimeException("Error setting autoAck property", e);
54+
}
55+
}
56+
}

src/main/java/com/wei/seqMq/StreamMessageListenerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ default void receive(ConsumerInfo consumerInfo, StreamListener streamListener) {
1616
this.receive(consumerInfo, streamListener, false);
1717
}
1818

19-
default void receiveAutoAck(ConsumerInfo consumerInfo, StreamListener streamListener, boolean autoAck) {
20-
this.receive(consumerInfo, streamListener, autoAck);
19+
default void receiveAutoAck(ConsumerInfo consumerInfo, StreamListener streamListener) {
20+
this.receive(consumerInfo, streamListener, true);
2121
}
2222
}

0 commit comments

Comments
 (0)