Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] network package lost if enable haProxyProtocolEnabled #21684

Merged
merged 9 commits into from
Dec 8, 2023

Conversation

poorbarcode
Copy link
Contributor

Fixes #21557

Motivation

There is a network package loss issue after enabling haProxyProtocolEnabled, which leads the error Checksum failed on the broker and Adjusted frame length exceeds, you can reproduce the issue by the test testSlowNetwork. See: https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java#L43-L44

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ProtocolDetectionResult<HAProxyProtocolVersion> result =
                HAProxyMessageDecoder.detectProtocol((ByteBuf) msg);
        if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
            return; // package lost here
        }

        if (result.state() == ProtocolDetectionState.DETECTED) {
            ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder());
            ctx.pipeline().remove(this);
        }
    }
    super.channelRead(ctx, msg);
}

Error logs:

2023-11-08T04:45:06,474-0800 [pulsar-io-4-47] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.139.120.143:45029] Got exception io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 5253120: 336070198 - discarded
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:507)
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:493)
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.exceededFrameLength(LengthFieldBasedFrameDecoder.java:377)
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:423)
	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:333)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	at org.apache.pulsar.common.protocol.OptionalProxyProtocolDecoder.channelRead(OptionalProxyProtocolDecoder.java:52)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1373)
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1247)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1287)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

Modifications

Fix the bug.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Dec 7, 2023
@poorbarcode poorbarcode self-assigned this Dec 7, 2023
@poorbarcode poorbarcode added this to the 3.2.0 milestone Dec 7, 2023
@poorbarcode poorbarcode added release/2.10.6 release/3.0.3 release/2.11.4 release/3.1.3 category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost type/bug The PR fixed a bug or issue reported a bug labels Dec 7, 2023
Copy link
Member

@coderzc coderzc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

ctx.pipeline().addAfter(NAME, null, new HAProxyMessageDecoder());
ctx.pipeline().remove(this);
}
super.channelRead(ctx, buf);
Copy link
Member

@coderzc coderzc Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is better to remove the OptionalProxyProtocolDecoder from the pipeline if result.state() == ProtocolDetectionState.INVALID. Otherwise, if buffer.readableBytes() < 12 then the channelRead will block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise, if buffer.readableBytes() < 12 then the channelRead will block.

Since we can get the result ProtocolDetectionState.INVALID, the value of buffer.readableBytes() must larger or equals 12

Copy link
Member

@coderzc coderzc Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we get the result ProtocolDetectionState.INVALID, it proves the user is not using HAProxy, we should remove the OptionalProxyProtocolDecoder from the pipeline and don't need to continue to accumulate bytes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we get the result ProtocolDetectionState.INVALID, it proves the user is not using HAProxy, we should remove the OptionalProxyProtocolDecoder from the pipeline and don't need to continue to accumulate bytes.

The current implementation is most stable when it is uncertain how many times HaProxy Prototl Notation will be received during the lifetime of a connection (such as repeated send due to an error).

The probability of receiving one package that is less than 12 bytes is extremely low, so it doesn't affect performance.

Otherwise, if buffer.readableBytes() < 12 then the channelRead will block.

PulsarCommand at least has 12 bytes: [frame length][cmd length][base cmd], it will not cause a stuck.

Copy link
Member

@coderzc coderzc Dec 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PulsarCommand at least has 12 bytes: [frame length][cmd length][base cmd], it will not cause a stuck.

What if the last byte read was less than 12 due to network unpacking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with you, fixed

@codecov-commenter
Copy link

Codecov Report

Merging #21684 (4e2eb6c) into master (1919a0e) will increase coverage by 0.11%.
Report is 2 commits behind head on master.
The diff coverage is 75.00%.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #21684      +/-   ##
============================================
+ Coverage     73.29%   73.41%   +0.11%     
+ Complexity    32769    32764       -5     
============================================
  Files          1893     1893              
  Lines        140745   140766      +21     
  Branches      15503    15506       +3     
============================================
+ Hits         103166   103344     +178     
+ Misses        29498    29317     -181     
- Partials       8081     8105      +24     
Flag Coverage Δ
inttests 24.10% <0.00%> (?)
systests 24.75% <0.00%> (+0.12%) ⬆️
unittests 72.69% <75.00%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
.../common/protocol/OptionalProxyProtocolDecoder.java 79.16% <75.00%> (+6.43%) ⬆️

... and 71 files with indirect coverage changes

if (result.state() == ProtocolDetectionState.NEEDS_MORE_DATA) {
// Accumulate data if need more data to detect the protocol.
if (cumulatedByteBuf == null) {
cumulatedByteBuf = new CompositeByteBuf(ctx.alloc(), false, MIN_BYTES_SIZE_TO_DETECT_PROTOCOL, buf);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PulsarByteBufAllocator have defined memory policies and OOM listener. and it's not recommended to use the constructor to build CompositeByteBuf in it's doc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PulsarByteBufAllocator have defined memory policies and OOM listener.

Ah, it is the same object. see: https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L488

and it's not recommended to use the constructor to build CompositeByteBuf in it's doc.

Since these two recommended methods ByteBufAllocator.compositeBuffer() and Unpooled.wrappedBuffer(ByteBuf...) do not support the param maxNumComponents, we create CompositeByteBuf by constructor is better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the same. Ok

@poorbarcode poorbarcode merged commit 6e18874 into apache:master Dec 8, 2023
poorbarcode added a commit that referenced this pull request Dec 13, 2023
…21684)

Fixes #21557

### Motivation

There is a network package loss issue after enabling `haProxyProtocolEnabled`, which leads the error `Checksum failed on the broker` and `Adjusted frame length exceeds`, you can reproduce the issue by the test `testSlowNetwork`.

### Modifications

Fix the bug.

(cherry picked from commit 6e18874)
poorbarcode added a commit that referenced this pull request Dec 13, 2023
…21684)

Fixes #21557

There is a network package loss issue after enabling `haProxyProtocolEnabled`, which leads the error `Checksum failed on the broker` and `Adjusted frame length exceeds`, you can reproduce the issue by the test `testSlowNetwork`.

Fix the bug.

(cherry picked from commit 6e18874)
poorbarcode added a commit that referenced this pull request Dec 13, 2023
…21684)

Fixes #21557

### Motivation

There is a network package loss issue after enabling `haProxyProtocolEnabled`, which leads the error `Checksum failed on the broker` and `Adjusted frame length exceeds`, you can reproduce the issue by the test `testSlowNetwork`.

### Modifications

Fix the bug.

(cherry picked from commit 6e18874)
poorbarcode added a commit that referenced this pull request Dec 13, 2023
…21684)

Fixes #21557

### Motivation

There is a network package loss issue after enabling `haProxyProtocolEnabled`, which leads the error `Checksum failed on the broker` and `Adjusted frame length exceeds`, you can reproduce the issue by the test `testSlowNetwork`.

### Modifications

Fix the bug.

(cherry picked from commit 6e18874)
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
…pache#21684)

Fixes apache#21557

### Motivation

There is a network package loss issue after enabling `haProxyProtocolEnabled`, which leads the error `Checksum failed on the broker` and `Adjusted frame length exceeds`, you can reproduce the issue by the test `testSlowNetwork`.

### Modifications

Fix the bug.

(cherry picked from commit 6e18874)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 20, 2023
…pache#21684)

Fixes apache#21557

### Motivation

There is a network package loss issue after enabling `haProxyProtocolEnabled`, which leads the error `Checksum failed on the broker` and `Adjusted frame length exceeds`, you can reproduce the issue by the test `testSlowNetwork`.

### Modifications

Fix the bug.

(cherry picked from commit 6e18874)
nodece pushed a commit to nodece/pulsar that referenced this pull request Feb 23, 2024
…pache#21684)

Fixes apache#21557

There is a network package loss issue after enabling `haProxyProtocolEnabled`, which leads the error `Checksum failed on the broker` and `Adjusted frame length exceeds`, you can reproduce the issue by the test `testSlowNetwork`.

Fix the bug.

(cherry picked from commit 6e18874)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost cherry-picked/branch-2.10 cherry-picked/branch-2.11 cherry-picked/branch-3.0 cherry-picked/branch-3.1 doc-not-needed Your PR changes do not impact docs release/2.10.6 release/2.11.3 release/3.0.3 release/3.1.3 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] Failed to verify checksum
6 participants