Skip to content

RocketMQ完善 #2133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions docs/high-performance/message-queue/rocketmq-questions.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,49 @@ tag:

其实很简单,我们需要处理的仅仅是将同一语义下的消息放入同一个队列(比如这里是同一个订单),那我们就可以使用 **Hash 取模法** 来保证同一个订单在同一个队列中就行了。

RocketMQ实现了两种队列选择算法,也可以自己实现

- 轮询算法

- 轮询算法就是向消息指定的topic所在队列中依次发送消息,保证消息均匀分布
- 是RocketMQ默认队列选择算法

- 最小投递延迟算法

- 每次消息投递的时候统计消息投递的延迟,选择队列时优先选择消息延时小的队列,导致消息分布不均匀,按照如下设置即可。

- ```java
producer.setSendLatencyFaultEnable(true);
```

- 继承MessageQueueSelector实现

- ```java
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//从mqs中选择一个队列,可以根据msg特点选择
return null;
}
}, new Object());
```

### 特殊情况处理

#### 发送异常

选择队列后会与Broker建立连接,通过网络请求将消息发送到Broker上,如果Broker挂了或者网络波动发送消息超时此时RocketMQ会进行重试。

重新选择其他Broker中的消息队列进行发送,默认重试两次,可以手动设置。

```java
producer.setRetryTimesWhenSendFailed(5);
```

#### 消息过大

消息超过4k时RocketMQ会将消息压缩后在发送到Broker上,减少网络资源的占用。

### 重复消费

emmm,就两个字—— **幂等** 。在编程中一个*幂等* 操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。比如说,这个时候我们有一个订单的处理积分的系统,每当来一个消息的时候它就负责为创建这个订单的用户的积分加上相应的数值。可是有一次,消息队列发送给订单系统 FrancisQ 的订单信息,其要求是给 FrancisQ 的积分加上 500。但是积分系统在收到 FrancisQ 的订单信息处理完成之后返回给消息队列处理成功的信息的时候出现了网络波动(当然还有很多种情况,比如 Broker 意外重启等等),这条回应没有发送成功。
Expand Down Expand Up @@ -354,6 +397,68 @@ emmm,就两个字—— **幂等** 。在编程中一个*幂等* 操作的特

这是官方文档的解释,我直接照搬过来就当科普了 😁😁😁。

## RocketMQ如何保证高性能读写

### 传统IO方式

![3](https://img1.imgtp.com/2023/08/15/9DQUZuL7.png)

传统的IO读写其实就是read + write的操作,整个过程会分为如下几步

- 用户调用read()方法,开始读取数据,此时发生一次上下文从用户态到内核态的切换,也就是图示的切换1
- 将磁盘数据通过DMA拷贝到内核缓存区
- 将内核缓存区的数据拷贝到用户缓冲区,这样用户,也就是我们写的代码就能拿到文件的数据
- read()方法返回,此时就会从内核态切换到用户态,也就是图示的切换2
- 当我们拿到数据之后,就可以调用write()方法,此时上下文会从用户态切换到内核态,即图示切换3
- CPU将用户缓冲区的数据拷贝到Socket缓冲区
- 将Socket缓冲区数据拷贝至网卡
- write()方法返回,上下文重新从内核态切换到用户态,即图示切换4

整个过程发生了4次上下文切换和4次数据的拷贝,这在高并发场景下肯定会严重影响读写性能故引入了零拷贝技术

### 零拷贝技术

#### mmap

mmap(memory map)是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。

简单地说就是内核缓冲区和应用缓冲区共享,从而减少了从读缓冲区到用户缓冲区的一次CPU拷贝。基于此上述架构图可变为:

![4](https://img1.imgtp.com/2023/08/15/CHmGd0II.png)

基于mmap IO读写其实就变成mmap + write的操作,也就是用mmap替代传统IO中的read操作。

当用户发起mmap调用的时候会发生上下文切换1,进行内存映射,然后数据被拷贝到内核缓冲区,mmap返回,发生上下文切换2;随后用户调用write,发生上下文切换3,将内核缓冲区的数据拷贝到Socket缓冲区,write返回,发生上下文切换4。

发生4次上下文切换和3次IO拷贝操作,在Java中的实现:

```java
FileChannel fileChannel = new RandomAccessFile("test.txt", "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileChannel.size());
```

#### sendfile

sendfile()跟mmap()一样,也会减少一次CPU拷贝,但是它同时也会减少两次上下文切换。

![5](https://img1.imgtp.com/2023/08/15/jqLgCEBY.png)



如图,用户在发起sendfile()调用时会发生切换1,之后数据通过DMA拷贝到内核缓冲区,之后再将内核缓冲区的数据CPU拷贝到Socket缓冲区,最后拷贝到网卡,sendfile()返回,发生切换2。发生了3次拷贝和两次切换。Java也提供了相应api:

```java
FileChannel channel = FileChannel.open(Paths.get("./test.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
//调用transferTo方法向目标数据传输
channel.transferTo(position, len, target);
```

在如上代码中,并没有文件的读写操作,而是直接将文件的数据传输到target目标缓冲区,也就是说,sendfile是无法知道文件的具体的数据的;但是mmap不一样,他是可以修改内核缓冲区的数据的。假设如果需要对文件的内容进行修改之后再传输,只有mmap可以满足。

通过上面的一些介绍,结论是基于零拷贝技术,可以减少CPU的拷贝次数和上下文切换次数,从而可以实现文件高效的读写操作。

RocketMQ内部主要是使用基于mmap实现的零拷贝(其实就是调用上述提到的api),用来读写文件,这也是RocketMQ为什么快的一个很重要原因。

## RocketMQ 的刷盘机制

上面我讲了那么多的 `RocketMQ` 的架构和设计原理,你有没有好奇
Expand Down