Skip to content

feat(mq): 添加 RocketMQ消息队列功能 #283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
786 changes: 786 additions & 0 deletions demo-mq-rocketmq/README.md

Large diffs are not rendered by default.

109 changes: 68 additions & 41 deletions demo-mq-rocketmq/pom.xml
Original file line number Diff line number Diff line change
@@ -1,48 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<modelVersion>4.0.0</modelVersion>

<artifactId>demo-mq-rocketmq</artifactId>
<artifactId>demo-mq-rocketmq</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>demo-mq-rocketmq</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>com.xkcoding</groupId>
<artifactId>spring-boot-demo</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>demo-mq-rocketmq</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>com.xkcoding</groupId>
<artifactId>spring-boot-demo</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<finalName>demo-mq-rocketmq</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>

<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version> <!-- 兼容JDK8~17的稳定版本 -->
<scope>provided</scope>
</dependency>


<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
</dependencies>

<build>
<finalName>demo-mq-rocketmq</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.xkcoding.mq.rocketmq.constants;

public interface RocketMQConstant {
/**
* TOPIC
*/
String TOPIC_TEST = "TEST-TOPIC";

/**
* 消费者组
*/
String CONSUMER_GROUP = "demo-consumer-group";

/**
* 消息类型
*/
interface MessageType {
/**
* 普通消息
*/
String NORMAL = "NORMAL";

/**
* 定时消息
*/
String DELAY = "DELAY";

/**
* 顺序消息
*/
String ORDER = "ORDER";

/**
* 事务消息
*/
String TRANSACTION = "TRANSACTION";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.xkcoding.mq.rocketmq.consumer;

import com.xkcoding.mq.rocketmq.constants.RocketMQConstant;
import com.xkcoding.mq.rocketmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* <p>
* 延时消息消费者
* </p>
*
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMQConstant.TOPIC_TEST,
consumerGroup = RocketMQConstant.CONSUMER_GROUP + "_delay",
selectorType = SelectorType.TAG,
selectorExpression = "delay"
)
// 延时消息消费
public class DelayMessageConsumer implements RocketMQListener<MessageStruct> {
/**
* 消费消息
* @param message
*/
@Override
public void onMessage(MessageStruct message) {
try {
log.info("延时消息消费者收到消息:messageId={}, messageContent={}, messageType={}, 接收时间={}",
message.getMessageId(),
message.getMessageContent(),
message.getMessageType(),
System.currentTimeMillis());
// 模拟业务处理
Thread.sleep(100);
log.info("延时消息处理完成:messageId={}", message.getMessageId());
} catch (Exception e) {
log.error("延时消息处理异常:messageId={}", message.getMessageId(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.xkcoding.mq.rocketmq.consumer;

import com.xkcoding.mq.rocketmq.constants.RocketMQConstant;
import com.xkcoding.mq.rocketmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
/**
* <p>
* 普通消息消费者
* </p>
*
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMQConstant.TOPIC_TEST,
consumerGroup = RocketMQConstant.CONSUMER_GROUP + "_normal",
selectorType = SelectorType.TAG,
selectorExpression = "normal",
consumeMode = ConsumeMode.CONCURRENTLY,
messageModel = MessageModel.CLUSTERING,
maxReconsumeTimes = 3
)

public class
MessageConsumer implements RocketMQListener<MessageStruct> {

@PostConstruct
public void init() {
log.info("普通消息消费者初始化完成:topic={}, consumerGroup={}, tag={}, messageModel={}",
RocketMQConstant.TOPIC_TEST,
RocketMQConstant.CONSUMER_GROUP + "_normal",
"normal",
MessageModel.CLUSTERING);
}

@Override
public void onMessage(MessageStruct message) {
long startTime = System.currentTimeMillis();
log.info("普通消息消费者开始处理消息:messageId={}, messageContent={}, messageType={}, startTime={}",
message.getMessageId(),
message.getMessageContent(),
message.getMessageType(),
startTime);

try {
// 模拟业务处理
Thread.sleep(100);

long endTime = System.currentTimeMillis();
log.info("普通消息处理完成:messageId={}, 耗时={}ms",
message.getMessageId(),
(endTime - startTime));
} catch (Exception e) {
log.error("普通消息处理异常:messageId={}, error={}",
message.getMessageId(),
e.getMessage(),
e);
// 抛出异常以触发重试机制
throw new RuntimeException("消息处理失败:" + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.xkcoding.mq.rocketmq.consumer;

import com.xkcoding.mq.rocketmq.constants.RocketMQConstant;
import com.xkcoding.mq.rocketmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMQConstant.TOPIC_TEST,
consumerGroup = RocketMQConstant.CONSUMER_GROUP + "_order",
selectorType = SelectorType.TAG,
selectorExpression = "order",
consumeMode = ConsumeMode.ORDERLY
)
public class OrderMessageConsumer implements RocketMQListener<MessageStruct> {
private final AtomicInteger counter = new AtomicInteger(0);

@Override
public void onMessage(MessageStruct message) {
try {
int count = counter.incrementAndGet();
log.info("顺序消息消费者收到消息:messageId={}, messageContent={}, messageType={}, 消息序号={}",
message.getMessageId(),
message.getMessageContent(),
message.getMessageType(),
count);
// 模拟业务处理
Thread.sleep(100);
log.info("顺序消息处理完成:messageId={}, 消息序号={}", message.getMessageId(), count);
} catch (Exception e) {
log.error("顺序消息处理异常:messageId={}", message.getMessageId(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.xkcoding.mq.rocketmq.consumer;

import com.xkcoding.mq.rocketmq.constants.RocketMQConstant;
import com.xkcoding.mq.rocketmq.message.MessageStruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* <p>
* 事务消息消费者
* </p>
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMQConstant.TOPIC_TEST,
consumerGroup = RocketMQConstant.CONSUMER_GROUP + "_transaction",
selectorType = SelectorType.TAG,
selectorExpression = "transaction"
)
// 消费者组
public class TransactionMessageConsumer implements RocketMQListener<MessageStruct> {
@Override
public void onMessage(MessageStruct message) {
try {
log.info("事务消息消费者收到消息:messageId={}, messageContent={}, messageType={}",
message.getMessageId(),
message.getMessageContent(),
message.getMessageType());
// 模拟业务处理
Thread.sleep(100);
log.info("事务消息处理完成:messageId={}", message.getMessageId());
} catch (Exception e) {
log.error("事务消息处理异常:messageId={}", message.getMessageId(), e);
}
}
}
Loading