本文记录了笔者之前在使用 Netty 框架时遇到过的一个奇怪问题,当时的现象看起来像 ChannelPipeline 中某个 Handler 在正常工作一段时间后就“失效”了,经过排查问题最终得到了解决。
背景
内部某个长连接服务,基于 Netty 框架使用 MQTT 协议进行通讯,在某天遇到个问题,经过排查日志发现进程行为像是“READ”后“不走”下一个 ChannelInboundHandler
而是直接进入 READ COMPLETE:
排查过程
观察行为
Channel 初始化:
可以看到,正常情况下 LoggingHandler
打印完 READ 后会进入 MqttPublishMessageHandler
:
查看代码发现 MqttPublishMessageHandler
继承了 SimpleChannelInboundHandler<MqttPublishMessage>
,而其重写的 channelRead0
被一个大 try-catch 包围,异常会触发 exceptionCaught:
InboundExceptionHandler 中会统一打印 Inbound 方向的异常。
往前回溯日志,发现服务器像是 “突然”改变了工作行为 一样,在某个时间点后,新接收到的消息就都不流经 MqttPublishMessageHandler
了。
最后一条行为正常的消息:
第一条出现异常行为的消息:
“行为是否正常”以是否打印
PUBLISH[dispatch]
日志为依据。
查看 error 日志,没发现相关信息。
dump 内存
赶紧 dump 内存:
还没来得及分析内存,突然发现客户端行为恢复正常了,时间点正好是 dump 完内存后,继续查看日志发现 Server 与 Client 之前的两条 TCP 连接都断了:
是重连后才恢复正常的:
由于重连会导致 ChannelPipeline
重建,猜测是 ChannelHandlerContext
被 remove 了,于是使用 MAT 分析内存:
OQL 如下:
SELECT c.remote.holder.addr.holder.address FROM io.netty.channel.epoll.EpollSocketChannel c WHERE (c.remote.holder.addr.holder.address = 170191178)
注:170191178 是 10.36.233.74 IP 地址的 int 值。
猜想一:handler 被 remove 了?
查看其中一个 pipeline:
从 head
往后检查 handler
链,发现 MqttPublishMessageHandler
是在链上的:
会不会是这个 handlerState
有问题?
我们查看 handlerState == 2
的含义:
是正常的“已添加”:
那会不会是 executionMask
有问题?
查看 Netty 4.1 进行的 @Skip
优化,查看 MqttPublishMessageHandler
的 executionMask
属性:
值为 0b100000,没有问题。
有关通过掩码和
@Skip
优化 Pipeline 效率,可以查看 Netty 的 相关 issue,我只能说,性能,极致的性能!
至此,“Handler 被移除”的猜想已被否定。
猜想二:handler 没进去?
提出下一猜想:会不会是由于某些原因,MqttPublishMessageHandler
没有进入?
检查 InboundDiscardHandler
和 InboundExceptionHandler
:
确实存在 msg 不为 ByteBuf 的情况下会“吞信息”的可能(代码不健壮)。
猜想三:handler 带状态?
但是内部系统间只会有 MQTT PUBLISH 消息,什么情况下会出现对象不是 MqttPublishMessageHandler
所关心的泛型 MqttPublishMessage
的情况呢?我们把视野转向 MqttDecoder:
如图,内存中的 io.netty.handler.codec.mqtt.MqttDecoder
有两种 state,而 其中的“BAD_MESSAGE”看起来很可疑,会不会是这个状态改变了 encoder 的行为?
真相大白
通过检查 incoming references,发现两个 state 为“BAD_MESSAGE”的 MqttDecoder 就是位于与 10.36.233.74 建立的两条(存在问题的)连接所对应的 ChannelPipeline 上的 decoder:
图中内存地址为 0xcd9769a8 的 DefaultChannelPipeline 对象与上文对应。
查看源码:
一句话总结:只要 QoS 为 1,可变头的 message id 就不能为 0。
如果为 0,抛出的 DecoderException 会被最外层捕获,并构造一个 MqttMessage 对象放入解码输出集合中:
最重要的是,MqttDecoder 是有状态的!。
从前面的 ChannelInitializer 中可以看到,ChannelPipeline 使用的 decoder 是 new 的,而 encoder 是单例的,无状态(标注了
@Sharable
)。
具体就是,一旦 MqttDecoder 发现有解码失败的消息,它会开始丢弃所有后续消息直到断连:
根本原因
其实,如果 InboundDiscardHandler
正确打印所有丢弃的消息,或者 MqttPublishMessageHandler
中判断 decodeResult
,(或者我有这方面的经验)这个问题就不会排查这么久了。
最终处理方式如下:
- 添加完整的日志,并在收到预期之外的包时断开连接
- Handler 中添加 decoderResult 检查逻辑