本章主要内容
ChannelHandler
API和ChannelPipeline
API- 检测资源泄漏
- 异常处理
在上一章中你学习了ByteBuf
——Netty的数据容器。当我们在本章中探讨Netty的数据流以及处理组件时,我们将基于已经学过的东西,并且你将开始看到框架的重要元素都结合到了一起。
你已经知道,可以在ChannelPipeline
中将ChannelHandler
链接在一起以组织处理逻辑。我们将会研究涉及这些类的各种用例,以及一个重要的关系——ChannelHandlerContext
。
理解所有这些组件之间的交互对于通过Netty构建模块化的、可重用的实现至关重要。
6.1 ChannelHandler家族
在我们开始详细地学习ChannelHandler
之前,我们将在Netty的组件模型的这部分基础上花上一些时间。
6.1.1 Channel的生命周期
Interface Channel
定义了一组和ChannelInboundHandler
API密切相关的简单但功能强大的状态模型,表6-1列出了Channel
的这4个状态。
表6-1 Channel
的生命周期状态
状 态
描 述
ChannelUnregistered
Channel
已经被创建,但还未注册到EventLoop
ChannelRegistered
Channel
已经被注册到了EventLoop
ChannelActive
Channel
处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
ChannelInactive
Channel
没有连接到远程节点
Channel
的正常生命周期如图6-1所示。当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline
中的ChannelHandler
,其可以随后对它们做出响应。
图6-1 Channel
的状态模型
6.1.2 ChannelHandler的生命周期
表6-2中列出了interface ChannelHandler
定义的生命周期操作,在ChannelHandler
被添加到ChannelPipeline
中或者被从ChannelPipeline
中移除时会调用这些操作。这些方法中的每一个都接受一个ChannelHandlerContext
参数。
表6-2 ChannelHandler
的生命周期方法
类 型
描 述
handlerAdded
当把ChannelHandler
添加到ChannelPipeline
中时被调用
handlerRemoved
当从ChannelPipeline
中移除ChannelHandler
时被调用
exceptionCaught
当处理过程中在ChannelPipeline
中有错误产生时被调用
Netty定义了下面两个重要的ChannelHandler
子接口:
ChannelInboundHandler
——处理入站数据以及各种状态变化;ChannelOutboundHandler
——处理出站数据并且允许拦截所有的操作。
在接下来的章节中,我们将详细地讨论这些子接口。
6.1.3 ChannelInboundHandler接口
表6-3列出了interface ChannelInboundHandler
的生命周期方法。这些方法将会在数据被接收时或者与其对应的Channel
状态发生改变时被调用。正如我们前面所提到的,这些方法和Channel
的生命周期密切相关。
表6-3 ChannelInboundHandler
的方法
类 型
描 述
channelRegistered
当Channel
已经注册到它的EventLoop
并且能够处理I/O时被调用
channelUnregistered
当Channel
从它的EventLoop
注销并且无法处理任何I/O时被调用
channelActive
当Channel
处于活动状态时被调用;Channel
已经连接/绑定并且已经就绪
channelInactive
当Channel
离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete
当Channel
上的一个读操作完成时被调用[1]
channelRead
当从Channel
读取数据时被调用
ChannelWritability - Changed
当Channel
的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError
)或者可以在Channel
变为再次可写时恢复写入。可以通过调用Channel
的isWritable
方法来检测Channel
的可写性。与可写性相关的阈值可以通过Channel.config. setWriteHighWaterMark
和Channel.config.setWriteLowWater- Mark
方法来设置
userEventTriggered
当ChannelnboundHandler.fireUserEventTriggered
方法被调用时被调用,因为一个POJO被传经了ChannelPipeline
当某个ChannelInboundHandler
的实现重写channelRead
方法时,它将负责显式地释放与池化的ByteBuf
实例相关的内存。Netty为此提供了一个实用方法ReferenceCount-Util.release
,如代码清单6-1所示。
代码清单6-1 释放消息资源
@Sharablepublic class DiscardHandler extends ChannelInboundHandlerAdapter { ← -- 扩展了Channel-InboundHandler-Adapter @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ← -- 丢弃已接收的消息 ReferenceCountUtil.release(msg); }}
Netty将使用WARN
级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用Simple- ChannelInboundHandler
。代码清单6-2是代码清单6-1的一个变体,说明了这一点。
代码清单6-2 使用SimpleChannelInboundHandler
@Sharablepublic class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { ← -- 扩展了SimpleChannelInboundHandler @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) { // No need to do anything special ← -- 不需要任何显式的资源释放 }}
由于SimpleChannelInboundHandler
会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。
6.1.6节为引用处理提供了更加详细的讨论。
6.1.4 ChannelOutboundHandler接口
出站操作和数据将由ChannelOutboundHandler
处理。它的方法将被Channel
、Channel- Pipeline
以及ChannelHandlerContext
调用。
ChannelOutboundHandler
的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。
表6-4显示了所有由ChannelOutboundHandler
本身所定义的方法(忽略了那些从Channel- Handler
继承的方法)。
表6-4 ChannelOutboundHandler
的方法
类 型
描 述
bind(ChannelHandlerContext,
SocketAddress,ChannelPromise)
当请求将Channel
绑定到本地地址时被调用
connect(ChannelHandlerContext,
SocketAddress,SocketAddress,ChannelPromise)
当请求将Channel
连接到远程节点时被调用
disconnect(ChannelHandlerContext,
ChannelPromise)
当请求将Channel
从远程节点断开时被调用
close(ChannelHandlerContext,ChannelPromise)
当请求关闭Channel
时被调用
deregister(ChannelHandlerContext,
ChannelPromise)
当请求将Channel
从它的EventLoop
注销时被调用
read(ChannelHandlerContext)
当请求从Channel
读取更多的数据时被调用
flush(ChannelHandlerContext)
当请求通过Channel
将入队数据冲刷到远程节点时被调用
write(ChannelHandlerContext,Object,
ChannelPromise)
当请求通过Channel
将数据写到远程节点时被调用
ChannelPromise与ChannelFuture
ChannelOutboundHandler
中的大部分方法都需要一个ChannelPromise
参数,以便在操作完成时得到通知。ChannelPromise
是ChannelFuture
的一个子类,其定义了一些可写的方法,如setSuccess
和setFailure
,从而使ChannelFuture
不可变[2]。
接下来我们将看一看那些简化了编写ChannelHandler
的任务的类。
6.1.5 ChannelHandler适配器
你可以使用ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter
类作为自己的ChannelHandler
的起始点。这两个适配器分别提供了ChannelInboundHandler
和ChannelOutboundHandler
的基本实现。通过扩展抽象类ChannelHandlerAdapter
,它们获得了它们共同的超接口ChannelHandler
的方法。生成的类的层次结构如图6-2所示。
图6-2 ChannelHandlerAdapter
类的层次结构
ChannelHandlerAdapter
还提供了实用方法isSharable
。如果其对应的实现被标注为Sharable
,那么这个方法将返回true
,表示它可以被添加到多个ChannelPipeline
中(如在2.3.1节中所讨论过的一样)。
在ChannelInboundHandlerAdapter
和ChannelOutboundHandlerAdapter
中所提供的方法体调用了其相关联的ChannelHandlerContext
上的等效方法,从而将事件转发到了ChannelPipeline
中的下一个ChannelHandler
中。
你要想在自己的ChannelHandler
中使用这些适配器类,只需要简单地扩展它们,并且重写那些你想要自定义的方法。
6.1.6 资源管理
每当通过调用ChannelInboundHandler.channelRead
或者ChannelOutbound- Handler.write
方法来处理数据时,你都需要确保没有任何的资源泄漏。你可能还记得在前面的章节中所提到的,Netty使用引用计数来处理池化的ByteBuf
。所以在完全使用完某个ByteBuf
后,调整其引用计数是很重要的。
为了帮助你诊断潜在的(资源泄漏)问题,Netty提供了class ResourceLeakDetector
[3],它将对你应用程序的缓冲区分配做大约1%的采样来检测内存泄露。相关的开销是非常小的。
如果检测到了内存泄露,将会产生类似于下面的日志消息:
LEAK: ByteBuf.release was not called before it's garbage-collected. Enableadvanced leak reporting to find out where the leak occurred. To enableadvanced leak reporting, specify the JVM option'-Dio.netty.leakDetectionLevel=ADVANCED' or callResourceLeakDetector.setLevel.
Netty目前定义了4种泄漏检测级别,如表6-5所示。
表6-5 泄漏检测级别
级 别
描 述
DISABLED
禁用泄漏检测。只有在详尽的测试之后才应设置为这个值
SIMPLE
使用1%的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分的情况
ADVANCED
使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置
PARANOID
类似于ADVANCED
,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大的影响,应该只在调试阶段使用
泄露检测级别可以通过将下面的Java系统属性设置为表中的一个值来定义:
java -Dio.netty.leakDetectionLevel=ADVANCED
如果带着该JVM选项重新启动你的应用程序,你将看到自己的应用程序最近被泄漏的缓冲区被访问的位置。下面是一个典型的由单元测试产生的泄漏报告:
Running io.netty.handler.codec.xml.XmlFrameDecoderTest15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release was not called before it's garbage-collected.Recent access records: 1#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString( AdvancedLeakAwareByteBuf.java:697)io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml( XmlFrameDecoderTest.java:157)io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages( XmlFrameDecoderTest.java:133)...
实现ChannelInboundHandler.channelRead
和ChannelOutboundHandler.write
方法时,应该如何使用这个诊断工具来防止泄露呢?让我们看看你的channelRead
操作直接消费入站消息的情况;也就是说,它不会通过调用ChannelHandlerContext.fireChannelRead
方法将入站消息转发给下一个ChannelInboundHandler
。代码清单6-3展示了如何释放消息。
代码清单6-3 消费并释放入站消息
@Sharablepublic class DiscardInboundHandler extends ChannelInboundHandlerAdapter { ← -- 扩展了ChannelInboundandlerAdapter @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); ← -- 通过调用ReferenceCountUtil.release方法释放资源 }}
消费入站消息的简单方式 由于消费入站数据是一项常规任务,所以Netty提供了一个特殊的被称为
SimpleChannelInboundHandler
的ChannelInboundHandler
实现。这个实现会在消息被channelRead0
方法消费之后自动释放消息。
在出站方向这边,如果你处理了write
操作并丢弃了一个消息,那么你也应该负责释放它。代码清单6-4展示了一个丢弃所有的写入数据的实现。
代码清单6-4 丢弃并释放出站消息
@Sharablepublic class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { ← -- 扩展了ChannelOutboundHandlerAdapter @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ReferenceCountUtil.release(msg); ← -- 通过使用R eferenceCountUtil.realse(...)方法释放资源 promise.setSuccess; ← -- 通知ChannelPromise数据已经被处理了 }}
重要的是,不仅要释放资源,还要通知ChannelPromise
。否则可能会出现Channel-FutureListener
收不到某个消息已经被处理了的通知的情况。
总之,如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline
中的下一个ChannelOutboundHandler
,那么用户就有责任调用ReferenceCountUtil.release
。如果消息到达了实际的传输层,那么当它被写入时或者Channel
关闭时,都将被自动释放。
6.2 ChannelPipeline接口
如果你认为ChannelPipeline
是一个拦截流经Channel
的入站和出站事件的Channel-Handler
实例链,那么就很容易看出这些ChannelHandler
之间的交互是如何组成一个应用程序数据和事件处理逻辑的核心的。
每一个新创建的Channel
都将会被分配一个新的ChannelPipeline
。这项关联是永久性的;Channel
既不能附加另外一个ChannelPipeline
,也不能分离其当前的。在Netty组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
根据事件的起源,事件将会被ChannelInboundHandler
或者ChannelOutboundHandler
处理。随后,通过调用ChannelHandlerContext
实现,它将被转发给同一超类型的下一个ChannelHandler
。
ChannelHandlerContext
ChannelHandlerContext
使得ChannelHandler
能够和它的ChannelPipeline
以及其他的ChannelHandler
交互。ChannelHandler
可以通知其所属的ChannelPipeline
中的下一个ChannelHandler
,甚至可以动态修改它所属的ChannelPipeline[4]
。
ChannelHandlerContext
具有丰富的用于处理事件和执行I/O操作的API。6.3节将提供有关ChannelHandlerContext
的更多内容。
图6-3展示了一个典型的同时具有入站和出站ChannelHandler
的ChannelPipeline
的布局,并且印证了我们之前的关于ChannelPipeline
主要由一系列的ChannelHandler
所组成的说法。ChannelPipeline
还提供了通过ChannelPipeline
本身传播事件的方法。如果一个入站事件被触发,它将被从ChannelPipeline
的头部开始一直被传播到Channel Pipeline
的尾端。在图6-3中,一个出站I/O事件将从ChannelPipeline
的最右边开始,然后向左传播。
图6-3 ChannelPipeline
和它的ChannelHandler
ChannelPipeline相对论
你可能会说,从事件途经
ChannelPipeline
的角度来看,ChannelPipeline
的头部和尾端取决于该事件是入站的还是出站的。然而Netty总是将ChannelPipeline
的入站口(图6-3中的左侧)作为头部,而将出站口(该图的右侧)作为尾端。当你完成了通过调用
ChannelPipeline.add*
方法将入站处理器(ChannelInboundHandler
)和出站处理器(ChannelOutboundHandler
)混合添加到ChannelPipeline
之后,每一个ChannelHandler
从头部到尾端的顺序位置正如同我们方才所定义它们的一样。因此,如果你将图6-3中的处理器(ChannelHandler
)从左到右进行编号,那么第一个被入站事件看到的ChannelHandler
将是1,而第一个被出站事件看到的ChannelHandler
将是5。
在ChannelPipeline
传播事件时,它会测试ChannelPipeline
中的下一个Channel- Handler
的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline
将跳过该ChannelHandler
并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(当然,ChannelHandler
也可以同时实现ChannelInboundHandler
接口和ChannelOutbound- Handler
接口。)
6.2.1 修改ChannelPipeline
ChannelHandler
可以通过添加、删除或者替换其他的ChannelHandler
来实时地修改ChannelPipeline
的布局。(它也可以将它自己从ChannelPipeline
中移除。)这是Channel- Handler
最重要的能力之一,所以我们将仔细地来看看它是如何做到的。表6-6列出了相关的方法。
表6-6 ChannelPipeline
上的相关方法,由ChannelHandler
用来修改ChannelPipeline
的布局
名 称
描 述
addFirst
addBefore
addAfter
addLast
将一个ChannelHandler
添加到ChannelPipeline
中
remove
将一个ChannelHandler
从ChannelPipeline
中移除
replace
将ChannelPipeline
中的一个ChannelHandler
替换为另一个Channel- Handler
代码清单6-5展示了这些方法的使用。
代码清单6-5 修改ChannelPipeline
ChannelPipeline pipeline = ..;FirstHandler firstHandler = new FirstHandler; ← -- 创建一个FirstHandler 的实例pipeline.addLast("handler1", firstHandler); ← -- 将该实例作为"handler1" 添加到ChannelPipeline 中pipeline.addFirst("handler2", new SecondHandler); ← -- 将一个SecondHandler的实例作为"handler2"添加到ChannelPipeline的第一个槽中。这意味着它将被放置在已有的"handler1"之前 pipeline.addLast("handler3", new ThirdHandler); ← -- 将一个ThirdHandler 的实例作为"handler3"添加到ChannelPipeline 的最后一个槽中 ...pipeline.remove("handler3"); ← -- 通过名称移除"handler3" pipeline.remove(firstHandler); ← -- 通过引 用移除FirstHandler(它是唯一的,所以不需要它的名称) pipeline.replace("handler2", "handler4", new ForthHandler); ← -- 将SecondHandler("handler2")替换为FourthHandler:"handler4"
稍后,你将看到,重组ChannelHandler
的这种能力使我们可以用它来轻松地实现极其灵活的逻辑。
ChannelHandler的执行和阻塞
通常
ChannelPipeline
中的每一个ChannelHandler
都是通过它的EventLoop
(I/O线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O处理产生负面的影响。但有时可能需要与那些使用阻塞API的遗留代码进行交互。对于这种情况,
ChannelPipeline
有一些接受一个EventExecutorGroup
的add
方法。如果一个事件被传递给一个自定义的EventExecutor- Group
,它将被包含在这个EventExecutorGroup
中的某个EventExecutor
所处理,从而被从该Channel
本身的EventLoop
中移除。对于这种用例,Netty提供了一个叫DefaultEventExecutor- Group
的默认实现。
除了这些操作,还有别的通过类型或者名称来访问ChannelHandler
的方法。这些方法都列在了表6-7中。
表6-7 ChannelPipeline
的用于访问ChannelHandler
的操作
名 称
描 述
get
通过类型或者名称返回ChannelHandler
context
返回和ChannelHandler
绑定的ChannelHandlerContext
names
返回ChannelPipeline
中所有ChannelHandler
的名称
6.2.2 触发事件
ChannelPipeline
的API公开了用于调用入站和出站操作的附加方法。表6-8列出了入站操作,用于通知ChannelInboundHandler
在ChannelPipeline
中所发生的事件。
表6-8 ChannelPipeline
的入站操作
方 法 名 称
描 述
fireChannelRegistered
调用ChannelPipeline
中下一个ChannelInboundHandler
的channelRegistered(ChannelHandlerContext)
方法
fireChannelUnregistered
调用ChannelPipeline
中下一个ChannelInboundHandler
的channelUnregistered(ChannelHandlerContext)
方法
fireChannelActive
调用ChannelPipeline
中下一个ChannelInboundHandler
的channelActive(ChannelHandlerContext)
方法
fireChannelInactive
调用ChannelPipeline
中下一个ChannelInboundHandler
的channelInactive(ChannelHandlerContext)
方法
fireExceptionCaught
调用ChannelPipeline
中下一个ChannelInboundHandler
的exceptionCaught(ChannelHandlerContext, Throwable)
方法
fireUserEventTriggered
调用ChannelPipeline
中下一个ChannelInboundHandler
的userEventTriggered(ChannelHandlerContext, Object)
方法
fireChannelRead
调用ChannelPipeline
中下一个ChannelInboundHandler
的channelRead(ChannelHandlerContext, Object msg)
方法
fireChannelReadComplete
调用ChannelPipeline
中下一个ChannelInboundHandler
的channelReadComplete(ChannelHandlerContext)
方法
fireChannelWritability - Changed
调用ChannelPipeline
中下一个ChannelInboundHandler
的channelWritabilityChanged(ChannelHandlerContext)方
法
在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。表6-9列出了Channel- Pipeline
API的出站操作。
表6-9 ChannelPipeline
的出站操作
方 法 名 称
描 述
bind
将Channel
绑定到一个本地地址,这将调用ChannelPipeline
中的下一个ChannelOutboundHandler
的bind(ChannelHandlerContext, Socket- Address, ChannelPromise)
方法
connect
将Channel
连接到一个远程地址,这将调用ChannelPipeline
中的下一个ChannelOutboundHandler
的connect(ChannelHandlerContext, Socket- Address, ChannelPromise)
方法
disconnect
将Channel
断开连接。这将调用ChannelPipeline
中的下一个ChannelOutbound- Handler
的disconnect(ChannelHandlerContext, Channel Promise)
方法
close
将Channel
关闭。这将调用ChannelPipeline
中的下一个ChannelOutbound- Handler
的close(ChannelHandlerContext, ChannelPromise)
方法
deregister
将Channel
从它先前所分配的EventExecutor
(即EventLoop
)中注销。这将调用ChannelPipeline
中的下一个ChannelOutboundHandler
的deregister (ChannelHandlerContext, ChannelPromise)
方法
flush
冲刷Channel
所有挂起的写入。这将调用ChannelPipeline
中的下一个Channel- OutboundHandler
的flush(ChannelHandlerContext)
方法
write
将消息写入Channel
。这将调用ChannelPipeline
中的下一个Channel- OutboundHandler
的write(ChannelHandlerContext, Object msg, Channel- Promise)
方法。注意:这并不会将消息写入底层的Socket
,而只会将它放入队列中。要将它写入Socket
,需要调用flush
或者writeAndFlush
方法
writeAndFlush
这是一个先调用write
方法再接着调用flush
方法的便利方法
read
请求从Channel
中读取更多的数据。这将调用ChannelPipeline
中的下一个ChannelOutboundHandler
的read(ChannelHandlerContext)
方法
总结一下:
ChannelPipeline
保存了与Channel
相关联的ChannelHandler
;ChannelPipeline
可以根据需要,通过添加或者删除ChannelHandler
来动态地修改;ChannelPipeline
有着丰富的API用以被调用,以响应入站和出站事件。
6.3 ChannelHandlerContext接口
ChannelHandlerContext
代表了ChannelHandler
和ChannelPipeline
之间的关联,每当有ChannelHandler
添加到ChannelPipeline
中时,都会创建ChannelHandler- Context
。ChannelHandlerContext
的主要功能是管理它所关联的ChannelHandler
和在同一个ChannelPipeline
中的其他ChannelHandler
之间的交互。
ChannelHandlerContext
有很多的方法,其中一些方法也存在于Channel
和Channel- Pipeline
本身上,但是有一点重要的不同。如果调用Channel
或者ChannelPipeline
上的这些方法,它们将沿着整个ChannelPipeline
进行传播。而调用位于ChannelHandlerContext
上的相同方法,则将从当前所关联的ChannelHandler
开始,并且只会传播给位于该ChannelPipeline
中的下一个能够处理该事件的ChannelHandler
。
表6-10对ChannelHandlerContext
API进行了总结。
表6-10 ChannelHandlerContext
的API
方 法 名 称
描 述
alloc
返回和这个实例相关联的Channel
所配置的ByteBufAllocator
bind
绑定到给定的SocketAddress
,并返回ChannelFuture
channel
返回绑定到这个实例的Channel
close
关闭Channel
,并返回ChannelFuture
connect
连接给定的SocketAddress
,并返回ChannelFuture
deregister
从之前分配的EventExecutor
注销,并返回ChannelFuture
disconnect
从远程节点断开,并返回ChannelFuture
executor
返回调度事件的EventExecutor
fireChannelActive
触发对下一个ChannelInboundHandler
上的channelActive
方法(已连接)的调用
fireChannelInactive
触发对下一个ChannelInboundHandler
上的channelInactive
方法(已关闭)的调用
fireChannelRead
触发对下一个ChannelInboundHandler
上的channelRead
方法(已接收的消息)的调用
fireChannelReadComplete
触发对下一个ChannelInboundHandler
上的channelReadComplete
方法的调用
fireChannelRegistered
触发对下一个ChannelInboundHandler
上的fireChannelRegistered
方法的调用
fireChannelUnregistered
触发对下一个ChannelInboundHandler
上的fireChannelUnregistered
方法的调用
fireChannelWritabilityChanged
触发对下一个ChannelInboundHandler
上的fireChannelWritabilityChanged
方法的调用
fireExceptionCaught
触发对下一个ChannelInboundHandler
上的fireExceptionCaught(Throwable)
方法的调用
fireUserEventTriggered
触发对下一个ChannelInboundHandler
上的fireUserEventTriggered(Object evt)
方法的调用
handler
返回绑定到这个实例的ChannelHandler
isRemoved
如果所关联的ChannelHandler
已经被从ChannelPipeline中
移除则返回true
name
返回这个实例的唯一名称
pipeline
返回这个实例所关联的ChannelPipeline
read
将数据从Channel
读取到第一个入站缓冲区;如果读取成功则触发[5]一个channelRead
事件,并(在最后一个消息被读取完成后)通知ChannelInboundHandler的channelReadComplete (ChannelHandlerContext)方法
write
通过这个实例写入消息并经过ChannelPipeline
writeAndFlush
通过这个实例写入并冲刷消息并经过ChannelPipeline
当使用ChannelHandlerContext
的API的时候,请牢记以下两点:
ChannelHandlerContext
和ChannelHandler
之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;- 如同我们在本节开头所解释的一样,相对于其他类的同名方法,
ChannelHandlerContext
的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。
6.3.1 使用ChannelHandlerContext
在这一节中我们将讨论ChannelHandlerContext
的用法,以及存在于ChannelHandler- Context
、Channel
和ChannelPipeline
上的方法的行为。图6-4展示了它们之间的关系。
图6-4 Channel
、ChannelPipeline
、ChannelHandler
以及ChannelHandlerContext
之间的关系
在代码清单6-6中,将通过ChannelHandlerContext获取到Channel的引用。调用Channel上的write
方法将会导致写入事件从尾端到头部地流经ChannelPipeline
。
代码清单6-6 从ChannelHandlerContext
访问Channel
ChannelHandlerContext ctx = ..; Channel channel = ctx.channel; ← -- 获取到与ChannelHandlerContext相关联的Channel 的引用channel.write(Unpooled.copiedBuffer("Netty in Action", ← -- 通过Channel 写入缓冲区 CharsetUtil.UTF_8));
代码清单6-7展示了一个类似的例子,但是这一次是写入ChannelPipeline
。我们再次看到,(到ChannelPipline
的)引用是通过ChannelHandlerContext
获取的。
代码清单6-7 通过ChannelHandlerContext
访问ChannelPipeline
ChannelHandlerContext ctx = ..;ChannelPipeline pipeline = ctx.pipeline; ← -- 获取到与ChannelHandlerContext相关联的ChannelPipeline 的引用pipeline.write(Unpooled.copiedBuffer("Netty in Action", ← -- 通过ChannelPipeline写入缓冲区 CharsetUtil.UTF_8));
如同在图6-5中所能够看到的一样,代码清单6-6和代码清单6-7中的事件流是一样的。重要的是要注意到,虽然被调用的Channel或ChannelPipeline
上的write
方法将一直传播事件通过整个ChannelPipeline
,但是在ChannelHandler
的级别上,事件从一个ChannelHandler
到下一个ChannelHandler
的移动是由ChannelHandlerContext
上的调用完成的。
图6-5 通过Channel
或者ChannelPipeline
进行的事件传播
为什么会想要从ChannelPipeline
中的某个特定点开始传播事件呢?
- 为了减少将事件传经对它不感兴趣的
ChannelHandler
所带来的开销。 - 为了避免将事件传经那些可能会对它感兴趣的
ChannelHandler
。
要想调用从某个特定的ChannelHandler
开始的处理过程,必须获取到在(Channel- Pipeline
)该ChannelHandler
之前的ChannelHandler
所关联的ChannelHandler- Context
。这个ChannelHandlerContext
将调用和它所关联的ChannelHandler
之后的ChannelHandler
。
代码清单6-8和图6-6说明了这种用法。
代码清单6-8 调用ChannelHandlerContext
的write
方法
ChannelHandlerContext ctx = ..; ← -- 获取到ChannelHandlerContext的引用ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8)); ← -- write方法将把缓冲区数据发送到下一个ChannelHandler
如图6-6所示,消息将从下一个ChannelHandler
开始流经ChannelPipeline
,绕过了所有前面的ChannelHandler
。
图6-6 通过ChannelHandlerContext
触发的操作的事件流
我们刚才所描述的用例是常见的,对于调用特定的ChannelHandler
实例上的操作尤其有用。
6.3.2 ChannelHandler和ChannelHandlerContext的高级用法
正如我们在代码清单6-6中所看到的,你可以通过调用ChannelHandlerContext
上的pipeline
方法来获得被封闭的ChannelPipeline
的引用。这使得运行时得以操作ChannelPipeline
的ChannelHandler
,我们可以利用这一点来实现一些复杂的设计。例如,你可以通过将ChannelHandler
添加到ChannelPipeline
中来实现动态的协议切换。
另一种高级的用法是缓存到ChannelHandlerContext
的引用以供稍后使用,这可能会发生在任何的ChannelHandler
方法之外,甚至来自于不同的线程。代码清单6-9展示了用这种模式来触发事件。
代码清单6-9 缓存到ChannelHandlerContext
的引用
public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) { this.ctx = ctx; ← -- 存储到ChannelHandlerContext的引用以供稍后使用 } public void send(String msg) { ← -- 使用之前存储的到ChannelHandlerContext的引用来发送消息 ctx.writeAndFlush(msg); }}
因为一个ChannelHandler
可以从属于多个ChannelPipeline
,所以它也可以绑定到多个ChannelHandlerContext
实例。对于这种用法指在多个ChannelPipeline
中共享同一个ChannelHandler
,对应的ChannelHandler
必须要使用@Sharable
注解标注;否则,试图将它添加到多个ChannelPipeline
时将会触发异常。显而易见,为了安全地被用于多个并发的Channel
(即连接),这样的ChannelHandler
必须是线程安全的。
代码清单6-10展示了这种模式的一个正确实现。
代码清单6-10 可共享的ChannelHandler
@Sharable public class SharableHandler extends ChannelInboundHandlerAdapter { ← -- 使用注解@Sharable标注 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("Channel read message: " + msg); ctx.fireChannelRead(msg); ← -- 记录方法调用,并转发给下一个ChannelHandler }}
前面的ChannelHandler
实现符合所有的将其加入到多个ChannelPipeline
的需求,即它使用了注解@Sharable
标注,并且也不持有任何的状态。相反,代码清单6-11中的实现将会导致问题。
代码清单6-11 @Sharable
的错误用法
@Sharable ← -- 使用注解@Sharable标注public class UnsharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { count++; ← -- 将count 字段的值加1 System.out.println("channelRead(...) called the " + count + " time"); ← -- 记录方法调用,并转发给下一个ChannelHandler ctx.fireChannelRead(msg); }}
这段代码的问题在于它拥有状态[6],即用于跟踪方法调用次数的实例变量count
。将这个类的一个实例添加到ChannelPipeline
将极有可能在它被多个并发的Channel
访问时导致问题。(当然,这个简单的问题可以通过使channelRead
方法变为同步方法来修正。)
总之,只应该在确定了你的ChannelHandler
是线程安全的时才使用@Sharable
注解。
为何要共享同一个
ChannelHandler
在多个ChannelPipeline
中安装同一个ChannelHandler
的一个常见的原因是用于收集跨越多个Channel
的统计信息。
我们对于ChannelHandlerContext
和它与其他的框架组件之间的关系的讨论到此就结束了。接下来我们将看看异常处理。
6.4 异常处理
异常处理是任何真实应用程序的重要组成部分,它也可以通过多种方式来实现。因此,Netty提供了几种方式用于处理入站或者出站处理过程中所抛出的异常。这一节将帮助你了解如何设计最适合你需要的方式。
6.4.1 处理入站异常
如果在处理入站事件的过程中有异常被抛出,那么它将从它在ChannelInboundHandler
里被触发的那一点开始流经ChannelPipeline
。要想处理这种类型的入站异常,你需要在你的ChannelInboundHandler
实现中重写下面的方法。
public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause) throws Exception
代码清单6-12展示了一个简单的示例,其关闭了Channel
并打印了异常的栈跟踪信息。
代码清单6-12 基本的入站异常处理
public class InboundExceptionHandler extends ChannelInboundHandlerAdapter { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace; ctx.close; }}
因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻辑的ChannelInboundHandler
通常位于ChannelPipeline
的最后。这确保了所有的入站异常都总是会被处理,无论它们可能会发生在ChannelPipeline
中的什么位置。
你应该如何响应异常,可能很大程度上取决于你的应用程序。你可能想要关闭Channel
(和连接),也可能会尝试进行恢复。如果你不实现任何处理入站异常的逻辑(或者没有消费该异常),那么Netty将会记录该异常没有被处理的事实[7]。
总结一下:
ChannelHandler.exceptionCaught
的默认实现是简单地将当前异常转发给ChannelPipeline
中的下一个ChannelHandler
;- 如果异常到达了
ChannelPipeline
的尾端,它将会被记录为未被处理; - 要想定义自定义的处理逻辑,你需要重写
exceptionCaught
方法。然后你需要决定是否需要将该异常传播出去。
6.4.2 处理出站异常
用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。
- 每个出站操作都将返回一个
ChannelFuture
。注册到ChannelFuture
的Channel- FutureListener
将在操作完成时被通知该操作是成功了还是出错了。 - 几乎所有的
ChannelOutboundHandler
上的方法都会传入一个ChannelPromise
的实例。作为ChannelFuture
的子类,ChannelPromise
也可以被分配用于异步通知的监听器。但是,ChannelPromise
还具有提供立即通知的可写方法:
ChannelPromise setSuccess; ChannelPromise setFailure(Throwable cause);
添加ChannelFutureListener
只需要调用ChannelFuture
实例上的addListener(ChannelFutureListener)
方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是,调用出站操作(如write
方法)所返回的ChannelFuture
上的addListener
方法。
代码清单6-13使用了这种方式来添加ChannelFutureListener
,它将打印栈跟踪信息并且随后关闭Channel
。
代码清单6-13 添加ChannelFutureListener
到ChannelFuture
ChannelFuture future = channel.write(someMessage);future.addListener(new ChannelFutureListener { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess) { f.cause.printStackTrace; f.channel.close; } }});
第二种方式是将ChannelFutureListener
添加到即将作为参数传递给ChannelOut- boundHandler
的方法的ChannelPromise
。代码清单6-14中所展示的代码和代码清单6-13中所展示的具有相同的效果。
代码清单6-14 添加ChannelFutureListener
到ChannelPromise
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { promise.addListener(new ChannelFutureListener { @Override public void operationComplete(ChannelFuture f) { if (!f.isSuccess) { f.cause.printStackTrace; f.channel.close; } } }); }}
ChannelPromise的可写方法
通过调用
ChannelPromise
上的setSuccess
和setFailure
方法,可以使一个操作的状态在ChannelHandler
的方法返回给其调用者时便即刻被感知到。
为何选择一种方式而不是另一种呢?对于细致的异常处理,你可能会发现,在调用出站操作时添加ChannelFutureListener
更合适,如代码清单6-13所示。而对于一般的异常处理,你可能会发现,代码清单6-14所示的自定义的ChannelOutboundHandler
实现的方式更加的简单。
如果你的ChannelOutboundHandler
本身抛出了异常会发生什么呢?在这种情况下,Netty本身会通知任何已经注册到对应ChannelPromise
的监听器。
6.5 小结
在本章中我们仔细地研究了Netty的数据处理组件——ChannelHandler
。我们讨论了ChannelHandler
是如何链接在一起,以及它们是如何作为ChannelInboundHandler
和ChannelOutboundHandler与ChannelPipeline
进行交互的。
下一章将介绍Netty的EventLoop
和并发模型,这对于理解Netty是如何实现异步的、事件驱动的网络编程模型来说至关重要。
[1] 当所有可读的字节都已经从Channel
中读取之后,将会调用该回调方法;所以,可能在channelRead- Complete
被调用之前看到多次调用channelRead(...)
。——译者注
[2] 这里借鉴的是Scala的Promise和Future的设计,当一个Promise被完成之后,其对应的Future的值便不能再进行任何修改了。——译者注
[3] 其利用了JDK提供的PhantomReference
类来实现这一点。——译者注
[4] 这里指修改ChannelPipeline
中的ChannelHandler
的编排。——译者注
[5] 通过配合ChannelConfig.setAutoRead(boolean autoRead)
方法,可以实现反应式系统的特性之一回压(back-pressure)。——译者注
[6] 主要的问题在于,对于其所持有的状态的修改并不是线程安全的,比如也可以通过使用AtomicInteger
来规避这个问题。——译者注
[7] 即Netty将会通过Warning级别的日志记录该异常到达了ChannelPipeline
的尾端,但没有被处理,并尝试释放该异常。——译者注