首页 » Netty实战 » Netty实战全文在线阅读

《Netty实战》第6章 ChannelHandler和ChannelPipeline

关灯直达底部

本章主要内容

  • ChannelHandler API和ChannelPipeline API
  • 检测资源泄漏
  • 异常处理

在上一章中你学习了ByteBuf——Netty的数据容器。当我们在本章中探讨Netty的数据流以及处理组件时,我们将基于已经学过的东西,并且你将开始看到框架的重要元素都结合到了一起。

你已经知道,可以在ChannelPipeline中将ChannelHandler链接在一起以组织处理逻辑。我们将会研究涉及这些类的各种用例,以及一个重要的关系——ChannelHandlerContext

理解所有这些组件之间的交互对于通过Netty构建模块化的、可重用的实现至关重要。

6.1 ChannelHandler家族

在我们开始详细地学习ChannelHandler之前,我们将在Netty的组件模型的这部分基础上花上一些时间。

6.1.1 Channel的生命周期

Interface Channel定义了一组和ChannelInboundHandlerAPI密切相关的简单但功能强大的状态模型,表6-1列出了Channel的这4个状态。

表6-1 Channel的生命周期状态

状  态

描  述

ChannelUnregistered

Channel已经被创建,但还未注册到EventLoop

ChannelRegistered

Channel已经被注册到了EventLoop

ChannelActive

Channel处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了

ChannelInactive

Channel没有连接到远程节点

Channel的正常生命周期如图6-1所示。当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给ChannelPipeline中的ChannelHandler,其可以随后对它们做出响应。

图6-1 Channel的状态模型

6.1.2 ChannelHandler的生命周期

表6-2中列出了interface ChannelHandler定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被从ChannelPipeline中移除时会调用这些操作。这些方法中的每一个都接受一个ChannelHandlerContext参数。

表6-2 ChannelHandler的生命周期方法

类  型

描  述

handlerAdded

当把ChannelHandler添加到ChannelPipeline中时被调用

handlerRemoved

当从ChannelPipeline中移除ChannelHandler时被调用

exceptionCaught

当处理过程中在ChannelPipeline中有错误产生时被调用

Netty定义了下面两个重要的ChannelHandler子接口:

  • ChannelInboundHandler——处理入站数据以及各种状态变化;
  • ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作。

在接下来的章节中,我们将详细地讨论这些子接口。

6.1.3 ChannelInboundHandler接口

表6-3列出了interface ChannelInboundHandler的生命周期方法。这些方法将会在数据被接收时或者与其对应的Channel状态发生改变时被调用。正如我们前面所提到的,这些方法和Channel的生命周期密切相关。

表6-3 ChannelInboundHandler的方法

类  型

描  述

channelRegistered

Channel已经注册到它的EventLoop并且能够处理I/O时被调用

channelUnregistered

Channel从它的EventLoop注销并且无法处理任何I/O时被调用

channelActive

Channel处于活动状态时被调用;Channel已经连接/绑定并且已经就绪

channelInactive

Channel离开活动状态并且不再连接它的远程节点时被调用

channelReadComplete

Channel上的一个读操作完成时被调用[1]

channelRead

当从Channel读取数据时被调用

ChannelWritability - Changed

Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError)或者可以在Channel变为再次可写时恢复写入。可以通过调用ChannelisWritable方法来检测Channel的可写性。与可写性相关的阈值可以通过Channel.config. setWriteHighWaterMarkChannel.config.setWriteLowWater- Mark方法来设置

userEventTriggered

ChannelnboundHandler.fireUserEventTriggered方法被调用时被调用,因为一个POJO被传经了ChannelPipeline

当某个ChannelInboundHandler的实现重写channelRead方法时,它将负责显式地释放与池化的ByteBuf实例相关的内存。Netty为此提供了一个实用方法ReferenceCount-Util.release,如代码清单6-1所示。

代码清单6-1 释放消息资源

@Sharablepublic class DiscardHandler extends ChannelInboundHandlerAdapter {   ← --  扩展了Channel-InboundHandler-Adapter  @Override  public void channelRead(ChannelHandlerContext ctx, Object msg) {  ← --  丢弃已接收的消息    ReferenceCountUtil.release(msg);   }}  

Netty将使用WARN级别的日志消息记录未释放的资源,使得可以非常简单地在代码中发现违规的实例。但是以这种方式管理资源可能很繁琐。一个更加简单的方式是使用Simple- ChannelInboundHandler。代码清单6-2是代码清单6-1的一个变体,说明了这一点。

代码清单6-2 使用SimpleChannelInboundHandler

@Sharablepublic class SimpleDiscardHandler  extends SimpleChannelInboundHandler<Object> {  ← --   扩展了SimpleChannelInboundHandler  @Override  public void channelRead0(ChannelHandlerContext ctx,    Object msg) {    // No need to do anything special  ← --  不需要任何显式的资源释放  }}  

由于SimpleChannelInboundHandler会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将会失效。

6.1.6节为引用处理提供了更加详细的讨论。

6.1.4 ChannelOutboundHandler接口

出站操作和数据将由ChannelOutboundHandler处理。它的方法将被ChannelChannel- Pipeline以及ChannelHandlerContext调用。

ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。

表6-4显示了所有由ChannelOutboundHandler本身所定义的方法(忽略了那些从Channel- Handler继承的方法)。

表6-4 ChannelOutboundHandler的方法

类  型

描  述

bind(ChannelHandlerContext,
SocketAddress,ChannelPromise)

当请求将Channel绑定到本地地址时被调用

connect(ChannelHandlerContext,
SocketAddress,SocketAddress,ChannelPromise)

当请求将Channel连接到远程节点时被调用

disconnect(ChannelHandlerContext,
ChannelPromise)

当请求将Channel从远程节点断开时被调用

close(ChannelHandlerContext,ChannelPromise)

当请求关闭Channel时被调用

deregister(ChannelHandlerContext,
ChannelPromise)

当请求将Channel从它的EventLoop注销时被调用

read(ChannelHandlerContext)

当请求从Channel读取更多的数据时被调用

flush(ChannelHandlerContext)

当请求通过Channel将入队数据冲刷到远程节点时被调用

write(ChannelHandlerContext,Object,
ChannelPromise)

当请求通过Channel将数据写到远程节点时被调用

ChannelPromise与ChannelFuture ChannelOutboundHandler中的大部分方法都需要一个ChannelPromise参数,以便在操作完成时得到通知。ChannelPromiseChannelFuture的一个子类,其定义了一些可写的方法,如setSuccesssetFailure,从而使ChannelFuture不可变[2]。

接下来我们将看一看那些简化了编写ChannelHandler的任务的类。

6.1.5 ChannelHandler适配器

你可以使用ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter类作为自己的ChannelHandler的起始点。这两个适配器分别提供了ChannelInboundHandlerChannelOutboundHandler的基本实现。通过扩展抽象类ChannelHandlerAdapter,它们获得了它们共同的超接口ChannelHandler的方法。生成的类的层次结构如图6-2所示。

图6-2 ChannelHandlerAdapter类的层次结构

ChannelHandlerAdapter还提供了实用方法isSharable。如果其对应的实现被标注为Sharable,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline中(如在2.3.1节中所讨论过的一样)。

ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter中所提供的方法体调用了其相关联的ChannelHandlerContext上的等效方法,从而将事件转发到了ChannelPipeline中的下一个ChannelHandler中。

你要想在自己的ChannelHandler中使用这些适配器类,只需要简单地扩展它们,并且重写那些你想要自定义的方法。

6.1.6 资源管理

每当通过调用ChannelInboundHandler.channelRead或者ChannelOutbound- Handler.write方法来处理数据时,你都需要确保没有任何的资源泄漏。你可能还记得在前面的章节中所提到的,Netty使用引用计数来处理池化的ByteBuf。所以在完全使用完某个ByteBuf后,调整其引用计数是很重要的。

为了帮助你诊断潜在的(资源泄漏)问题,Netty提供了class ResourceLeakDetector[3],它将对你应用程序的缓冲区分配做大约1%的采样来检测内存泄露。相关的开销是非常小的。

如果检测到了内存泄露,将会产生类似于下面的日志消息:

LEAK: ByteBuf.release was not called before it's garbage-collected. Enableadvanced leak reporting to find out where the leak occurred. To enableadvanced leak reporting, specify the JVM option'-Dio.netty.leakDetectionLevel=ADVANCED' or callResourceLeakDetector.setLevel.  

Netty目前定义了4种泄漏检测级别,如表6-5所示。

表6-5 泄漏检测级别

级  别

描  述

DISABLED

禁用泄漏检测。只有在详尽的测试之后才应设置为这个值

SIMPLE

使用1%的默认采样率检测并报告任何发现的泄露。这是默认级别,适合绝大部分的情况

ADVANCED

使用默认的采样率,报告所发现的任何的泄露以及对应的消息被访问的位置

PARANOID

类似于ADVANCED,但是其将会对每次(对消息的)访问都进行采样。这对性能将会有很大的影响,应该只在调试阶段使用

泄露检测级别可以通过将下面的Java系统属性设置为表中的一个值来定义:

java -Dio.netty.leakDetectionLevel=ADVANCED  

如果带着该JVM选项重新启动你的应用程序,你将看到自己的应用程序最近被泄漏的缓冲区被访问的位置。下面是一个典型的由单元测试产生的泄漏报告:

Running io.netty.handler.codec.xml.XmlFrameDecoderTest15:03:36.886 [main] ERROR io.netty.util.ResourceLeakDetector - LEAK:   ByteBuf.release was not called before it's garbage-collected.Recent access records: 1#1: io.netty.buffer.AdvancedLeakAwareByteBuf.toString(  AdvancedLeakAwareByteBuf.java:697)io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithXml(  XmlFrameDecoderTest.java:157)io.netty.handler.codec.xml.XmlFrameDecoderTest.testDecodeWithTwoMessages(  XmlFrameDecoderTest.java:133)...  

实现ChannelInboundHandler.channelReadChannelOutboundHandler.write方法时,应该如何使用这个诊断工具来防止泄露呢?让我们看看你的channelRead操作直接消费入站消息的情况;也就是说,它不会通过调用ChannelHandlerContext.fireChannelRead方法将入站消息转发给下一个ChannelInboundHandler。代码清单6-3展示了如何释放消息。

代码清单6-3 消费并释放入站消息

@Sharablepublic class DiscardInboundHandler extends ChannelInboundHandlerAdapter {  ← --  扩展了ChannelInboundandlerAdapter   @Override   public void channelRead(ChannelHandlerContext ctx, Object msg) {     ReferenceCountUtil.release(msg);  ← -- 通过调用ReferenceCountUtil.release方法释放资源   }}  

消费入站消息的简单方式 由于消费入站数据是一项常规任务,所以Netty提供了一个特殊的被称为SimpleChannelInboundHandlerChannelInboundHandler实现。这个实现会在消息被channelRead0方法消费之后自动释放消息。

在出站方向这边,如果你处理了write操作并丢弃了一个消息,那么你也应该负责释放它。代码清单6-4展示了一个丢弃所有的写入数据的实现。

代码清单6-4 丢弃并释放出站消息

@Sharablepublic class DiscardOutboundHandler  extends ChannelOutboundHandlerAdapter {   ← --  扩展了ChannelOutboundHandlerAdapter  @Override  public void write(ChannelHandlerContext ctx,    Object msg, ChannelPromise promise) {    ReferenceCountUtil.release(msg);  ← -- 通过使用R eferenceCountUtil.realse(...)方法释放资源    promise.setSuccess;   ← -- 通知ChannelPromise数据已经被处理了  }}  

重要的是,不仅要释放资源,还要通知ChannelPromise。否则可能会出现Channel-FutureListener收不到某个消息已经被处理了的通知的情况。

总之,如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release。如果消息到达了实际的传输层,那么当它被写入时或者Channel关闭时,都将被自动释放。

6.2 ChannelPipeline接口

如果你认为ChannelPipeline是一个拦截流经Channel的入站和出站事件的Channel-Handler实例链,那么就很容易看出这些ChannelHandler之间的交互是如何组成一个应用程序数据和事件处理逻辑的核心的。

每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel既不能附加另外一个ChannelPipeline,也不能分离其当前的。在Netty组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。

根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHandler处理。随后,通过调用ChannelHandlerContext实现,它将被转发给同一超类型的下一个ChannelHandler

ChannelHandlerContext

ChannelHandlerContext使得ChannelHandler能够和它的ChannelPipeline以及其他的ChannelHandler交互。ChannelHandler可以通知其所属的ChannelPipeline中的下一个ChannelHandler,甚至可以动态修改它所属的ChannelPipeline[4]

ChannelHandlerContext具有丰富的用于处理事件和执行I/O操作的API。6.3节将提供有关ChannelHandlerContext的更多内容。

图6-3展示了一个典型的同时具有入站和出站ChannelHandlerChannelPipeline的布局,并且印证了我们之前的关于ChannelPipeline主要由一系列的ChannelHandler所组成的说法。ChannelPipeline还提供了通过ChannelPipeline本身传播事件的方法。如果一个入站事件被触发,它将被从ChannelPipeline的头部开始一直被传播到Channel Pipeline的尾端。在图6-3中,一个出站I/O事件将从ChannelPipeline的最右边开始,然后向左传播。

图6-3 ChannelPipeline和它的ChannelHandler

ChannelPipeline相对论

你可能会说,从事件途经ChannelPipeline的角度来看,ChannelPipeline的头部和尾端取决于该事件是入站的还是出站的。然而Netty总是将ChannelPipeline的入站口(图6-3中的左侧)作为头部,而将出站口(该图的右侧)作为尾端。

当你完成了通过调用ChannelPipeline.add*方法将入站处理器(ChannelInboundHandler)和出站处理器(ChannelOutboundHandler)混合添加到ChannelPipeline之后,每一个ChannelHandler从头部到尾端的顺序位置正如同我们方才所定义它们的一样。因此,如果你将图6-3中的处理器(ChannelHandler)从左到右进行编号,那么第一个被入站事件看到的ChannelHandler将是1,而第一个被出站事件看到的ChannelHandler将是5。

ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个Channel- Handler的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline将跳过该ChannelHandler并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(当然,ChannelHandler也可以同时实现ChannelInboundHandler接口和ChannelOutbound- Handler接口。)

6.2.1 修改ChannelPipeline

ChannelHandler可以通过添加、删除或者替换其他的ChannelHandler来实时地修改ChannelPipeline的布局。(它也可以将它自己从ChannelPipeline中移除。)这是Channel- Handler最重要的能力之一,所以我们将仔细地来看看它是如何做到的。表6-6列出了相关的方法。

表6-6 ChannelPipeline上的相关方法,由ChannelHandler用来修改ChannelPipeline的布局

名  称

描  述

addFirst
addBefore
addAfter
addLast

将一个ChannelHandler添加到ChannelPipeline

remove

将一个ChannelHandlerChannelPipeline中移除

replace

ChannelPipeline中的一个ChannelHandler替换为另一个Channel- Handler

代码清单6-5展示了这些方法的使用。

代码清单6-5 修改ChannelPipeline

ChannelPipeline pipeline = ..;FirstHandler firstHandler = new FirstHandler;   ← --  创建一个FirstHandler 的实例pipeline.addLast("handler1", firstHandler);  ← --  将该实例作为"handler1" 添加到ChannelPipeline 中pipeline.addFirst("handler2", new SecondHandler);  ← --  将一个SecondHandler的实例作为"handler2"添加到ChannelPipeline的第一个槽中。这意味着它将被放置在已有的"handler1"之前 pipeline.addLast("handler3", new ThirdHandler);  ← --  将一个ThirdHandler 的实例作为"handler3"添加到ChannelPipeline 的最后一个槽中 ...pipeline.remove("handler3");  ← --  通过名称移除"handler3"  pipeline.remove(firstHandler);  ← --  通过引 用移除FirstHandler(它是唯一的,所以不需要它的名称) pipeline.replace("handler2", "handler4", new ForthHandler);  ← --  将SecondHandler("handler2")替换为FourthHandler:"handler4"  

稍后,你将看到,重组ChannelHandler的这种能力使我们可以用它来轻松地实现极其灵活的逻辑。

ChannelHandler的执行和阻塞

通常ChannelPipeline中的每一个ChannelHandler都是通过它的EventLoop(I/O线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O处理产生负面的影响。

但有时可能需要与那些使用阻塞API的遗留代码进行交互。对于这种情况,ChannelPipeline有一些接受一个EventExecutorGroupadd方法。如果一个事件被传递给一个自定义的EventExecutor- Group,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,从而被从该Channel本身的EventLoop中移除。对于这种用例,Netty提供了一个叫DefaultEventExecutor- Group的默认实现。

除了这些操作,还有别的通过类型或者名称来访问ChannelHandler的方法。这些方法都列在了表6-7中。

表6-7 ChannelPipeline的用于访问ChannelHandler的操作

名  称

描  述

get

通过类型或者名称返回ChannelHandler

context

返回和ChannelHandler绑定的ChannelHandlerContext

names

返回ChannelPipeline中所有ChannelHandler的名称

6.2.2 触发事件

ChannelPipeline的API公开了用于调用入站和出站操作的附加方法。表6-8列出了入站操作,用于通知ChannelInboundHandlerChannelPipeline中所发生的事件。

表6-8 ChannelPipeline的入站操作

方 法 名 称

描  述

fireChannelRegistered

调用ChannelPipeline中下一个ChannelInboundHandlerchannelRegistered(ChannelHandlerContext)方法

fireChannelUnregistered

调用ChannelPipeline中下一个ChannelInboundHandlerchannelUnregistered(ChannelHandlerContext)方法

fireChannelActive

调用ChannelPipeline中下一个ChannelInboundHandlerchannelActive(ChannelHandlerContext)方法

fireChannelInactive

调用ChannelPipeline中下一个ChannelInboundHandlerchannelInactive(ChannelHandlerContext)方法

fireExceptionCaught

调用ChannelPipeline中下一个ChannelInboundHandlerexceptionCaught(ChannelHandlerContext, Throwable)方法

fireUserEventTriggered

调用ChannelPipeline中下一个ChannelInboundHandleruserEventTriggered(ChannelHandlerContext, Object)方法

fireChannelRead

调用ChannelPipeline中下一个ChannelInboundHandlerchannelRead(ChannelHandlerContext, Object msg)方法

fireChannelReadComplete

调用ChannelPipeline中下一个ChannelInboundHandlerchannelReadComplete(ChannelHandlerContext)方法

fireChannelWritability - Changed

调用ChannelPipeline中下一个ChannelInboundHandlerchannelWritabilityChanged(ChannelHandlerContext)方

在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。表6-9列出了Channel- Pipeline API的出站操作。

表6-9 ChannelPipeline的出站操作

方 法 名 称

描  述

bind

Channel绑定到一个本地地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandlerbind(ChannelHandlerContext, Socket- Address, ChannelPromise)方法

connect

Channel连接到一个远程地址,这将调用ChannelPipeline中的下一个ChannelOutboundHandlerconnect(ChannelHandlerContext, Socket- Address, ChannelPromise)方法

disconnect

Channel断开连接。这将调用ChannelPipeline中的下一个ChannelOutbound- Handlerdisconnect(ChannelHandlerContext, Channel Promise)方法

close

Channel关闭。这将调用ChannelPipeline中的下一个ChannelOutbound- Handlerclose(ChannelHandlerContext, ChannelPromise)方法

deregister

Channel从它先前所分配的EventExecutor(即EventLoop)中注销。这将调用ChannelPipeline中的下一个ChannelOutboundHandlerderegister (ChannelHandlerContext, ChannelPromise)方法

flush

冲刷Channel所有挂起的写入。这将调用ChannelPipeline中的下一个Channel- OutboundHandlerflush(ChannelHandlerContext)方法

write

将消息写入Channel。这将调用ChannelPipeline中的下一个Channel- OutboundHandlerwrite(ChannelHandlerContext, Object msg, Channel- Promise)方法。注意:这并不会将消息写入底层的Socket,而只会将它放入队列中。要将它写入Socket,需要调用flush或者writeAndFlush方法

writeAndFlush

这是一个先调用write方法再接着调用flush方法的便利方法

read

请求从Channel中读取更多的数据。这将调用ChannelPipeline中的下一个ChannelOutboundHandlerread(ChannelHandlerContext)方法

总结一下:

  • ChannelPipeline保存了与Channel相关联的ChannelHandler
  • ChannelPipeline可以根据需要,通过添加或者删除ChannelHandler来动态地修改;
  • ChannelPipeline有着丰富的API用以被调用,以响应入站和出站事件。

6.3 ChannelHandlerContext接口

ChannelHandlerContext代表了ChannelHandlerChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandler- ContextChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。

ChannelHandlerContext有很多的方法,其中一些方法也存在于ChannelChannel- Pipeline本身上,但是有一点重要的不同。如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个ChannelPipeline进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的ChannelHandler

表6-10对ChannelHandlerContext API进行了总结。

表6-10 ChannelHandlerContext的API

方 法 名 称

描  述

alloc

返回和这个实例相关联的Channel所配置的ByteBufAllocator

bind

绑定到给定的SocketAddress,并返回ChannelFuture

channel

返回绑定到这个实例的Channel

close

关闭Channel,并返回ChannelFuture

connect

连接给定的SocketAddress,并返回ChannelFuture

deregister

从之前分配的EventExecutor注销,并返回ChannelFuture

disconnect

从远程节点断开,并返回ChannelFuture

executor

返回调度事件的EventExecutor

fireChannelActive

触发对下一个ChannelInboundHandler上的channelActive方法(已连接)的调用

fireChannelInactive

触发对下一个ChannelInboundHandler上的channelInactive方法(已关闭)的调用

fireChannelRead

触发对下一个ChannelInboundHandler上的channelRead方法(已接收的消息)的调用

fireChannelReadComplete

触发对下一个ChannelInboundHandler上的channelReadComplete方法的调用

fireChannelRegistered

触发对下一个ChannelInboundHandler上的fireChannelRegistered方法的调用

fireChannelUnregistered

触发对下一个ChannelInboundHandler上的fireChannelUnregistered方法的调用

fireChannelWritabilityChanged

触发对下一个ChannelInboundHandler上的fireChannelWritabilityChanged方法的调用

fireExceptionCaught

触发对下一个ChannelInboundHandler上的fireExceptionCaught(Throwable)方法的调用

fireUserEventTriggered

触发对下一个ChannelInboundHandler上的fireUserEventTriggered(Object evt)方法的调用

handler

返回绑定到这个实例的ChannelHandler

isRemoved

如果所关联的ChannelHandler已经被从ChannelPipeline中移除则返回true

name

返回这个实例的唯一名称

pipeline

返回这个实例所关联的ChannelPipeline

read

将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发[5]一个channelRead事件,并(在最后一个消息被读取完成后)通知ChannelInboundHandler的channelReadComplete (ChannelHandlerContext)方法

write

通过这个实例写入消息并经过ChannelPipeline

writeAndFlush

通过这个实例写入并冲刷消息并经过ChannelPipeline

当使用ChannelHandlerContext的API的时候,请牢记以下两点:

  • ChannelHandlerContextChannelHandler之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
  • 如同我们在本节开头所解释的一样,相对于其他类的同名方法,ChannelHandlerContext的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。

6.3.1 使用ChannelHandlerContext

在这一节中我们将讨论ChannelHandlerContext的用法,以及存在于ChannelHandler- ContextChannelChannelPipeline上的方法的行为。图6-4展示了它们之间的关系。

图6-4 ChannelChannelPipelineChannelHandler以及ChannelHandlerContext之间的关系

在代码清单6-6中,将通过ChannelHandlerContext获取到Channel的引用。调用Channel上的write方法将会导致写入事件从尾端到头部地流经ChannelPipeline

代码清单6-6 从ChannelHandlerContext访问Channel

ChannelHandlerContext ctx = ..; Channel channel = ctx.channel;   ← --  获取到与ChannelHandlerContext相关联的Channel 的引用channel.write(Unpooled.copiedBuffer("Netty in Action",  ← --  通过Channel 写入缓冲区  CharsetUtil.UTF_8));  

代码清单6-7展示了一个类似的例子,但是这一次是写入ChannelPipeline。我们再次看到,(到ChannelPipline的)引用是通过ChannelHandlerContext获取的。

代码清单6-7 通过ChannelHandlerContext访问ChannelPipeline

ChannelHandlerContext ctx = ..;ChannelPipeline pipeline = ctx.pipeline;   ← --  获取到与ChannelHandlerContext相关联的ChannelPipeline 的引用pipeline.write(Unpooled.copiedBuffer("Netty in Action",   ← --  通过ChannelPipeline写入缓冲区  CharsetUtil.UTF_8));  

如同在图6-5中所能够看到的一样,代码清单6-6和代码清单6-7中的事件流是一样的。重要的是要注意到,虽然被调用的Channel或ChannelPipeline上的write方法将一直传播事件通过整个ChannelPipeline,但是在ChannelHandler的级别上,事件从一个ChannelHandler到下一个ChannelHandler的移动是由ChannelHandlerContext上的调用完成的。

图6-5 通过Channel或者ChannelPipeline进行的事件传播

为什么会想要从ChannelPipeline中的某个特定点开始传播事件呢?

  • 为了减少将事件传经对它不感兴趣的ChannelHandler所带来的开销。
  • 为了避免将事件传经那些可能会对它感兴趣的ChannelHandler

要想调用从某个特定的ChannelHandler开始的处理过程,必须获取到在(Channel- Pipeline)该ChannelHandler之前的ChannelHandler所关联的ChannelHandler- Context。这个ChannelHandlerContext将调用和它所关联的ChannelHandler之后的ChannelHandler

代码清单6-8和图6-6说明了这种用法。

代码清单6-8 调用ChannelHandlerContextwrite方法

ChannelHandlerContext ctx = ..;   ← --  获取到ChannelHandlerContext的引用ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));   ← --  write方法将把缓冲区数据发送到下一个ChannelHandler  

如图6-6所示,消息将从下一个ChannelHandler开始流经ChannelPipeline,绕过了所有前面的ChannelHandler

图6-6 通过ChannelHandlerContext触发的操作的事件流

我们刚才所描述的用例是常见的,对于调用特定的ChannelHandler实例上的操作尤其有用。

6.3.2 ChannelHandler和ChannelHandlerContext的高级用法

正如我们在代码清单6-6中所看到的,你可以通过调用ChannelHandlerContext上的pipeline方法来获得被封闭的ChannelPipeline的引用。这使得运行时得以操作ChannelPipelineChannelHandler,我们可以利用这一点来实现一些复杂的设计。例如,你可以通过将ChannelHandler添加到ChannelPipeline中来实现动态的协议切换。

另一种高级的用法是缓存到ChannelHandlerContext的引用以供稍后使用,这可能会发生在任何的ChannelHandler方法之外,甚至来自于不同的线程。代码清单6-9展示了用这种模式来触发事件。

代码清单6-9 缓存到ChannelHandlerContext的引用

public class WriteHandler extends ChannelHandlerAdapter {  private ChannelHandlerContext ctx;  @Override  public void handlerAdded(ChannelHandlerContext ctx) {    this.ctx = ctx;   ← --  存储到ChannelHandlerContext的引用以供稍后使用  }  public void send(String msg) {  ← --  使用之前存储的到ChannelHandlerContext的引用来发送消息    ctx.writeAndFlush(msg);  }}  

因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。对于这种用法指在多个ChannelPipeline中共享同一个ChannelHandler,对应的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。

代码清单6-10展示了这种模式的一个正确实现。

代码清单6-10 可共享的ChannelHandler

@Sharable public class SharableHandler extends ChannelInboundHandlerAdapter {  ← --  使用注解@Sharable标注  @Override  public void channelRead(ChannelHandlerContext ctx, Object msg) {    System.out.println("Channel read message: " + msg);    ctx.fireChannelRead(msg);   ← --  记录方法调用,并转发给下一个ChannelHandler  }}  

前面的ChannelHandler实现符合所有的将其加入到多个ChannelPipeline的需求,即它使用了注解@Sharable标注,并且也不持有任何的状态。相反,代码清单6-11中的实现将会导致问题。

代码清单6-11 @Sharable的错误用法

@Sharable  ← --  使用注解@Sharable标注public class UnsharableHandler extends ChannelInboundHandlerAdapter {  private int count;  @Override  public void channelRead(ChannelHandlerContext ctx, Object msg) {    count++;  ← --  将count 字段的值加1    System.out.println("channelRead(...) called the "      + count + " time");   ← --  记录方法调用,并转发给下一个ChannelHandler    ctx.fireChannelRead(msg);  }}  

这段代码的问题在于它拥有状态[6],即用于跟踪方法调用次数的实例变量count。将这个类的一个实例添加到ChannelPipeline将极有可能在它被多个并发的Channel访问时导致问题。(当然,这个简单的问题可以通过使channelRead方法变为同步方法来修正。)

总之,只应该在确定了你的ChannelHandler是线程安全的时才使用@Sharable注解。

为何要共享同一个ChannelHandler 在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的原因是用于收集跨越多个Channel的统计信息。

我们对于ChannelHandlerContext和它与其他的框架组件之间的关系的讨论到此就结束了。接下来我们将看看异常处理。

6.4 异常处理

异常处理是任何真实应用程序的重要组成部分,它也可以通过多种方式来实现。因此,Netty提供了几种方式用于处理入站或者出站处理过程中所抛出的异常。这一节将帮助你了解如何设计最适合你需要的方式。

6.4.1 处理入站异常

如果在处理入站事件的过程中有异常被抛出,那么它将从它在ChannelInboundHandler里被触发的那一点开始流经ChannelPipeline。要想处理这种类型的入站异常,你需要在你的ChannelInboundHandler实现中重写下面的方法。

public void exceptionCaught(  ChannelHandlerContext ctx, Throwable cause) throws Exception  

代码清单6-12展示了一个简单的示例,其关闭了Channel并打印了异常的栈跟踪信息。

代码清单6-12 基本的入站异常处理

public class InboundExceptionHandler extends ChannelInboundHandlerAdapter {  @Override  public void exceptionCaught(ChannelHandlerContext ctx,    Throwable cause) {    cause.printStackTrace;    ctx.close;  }}  

因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻辑的ChannelInboundHandler通常位于ChannelPipeline的最后。这确保了所有的入站异常都总是会被处理,无论它们可能会发生在ChannelPipeline中的什么位置。

你应该如何响应异常,可能很大程度上取决于你的应用程序。你可能想要关闭Channel(和连接),也可能会尝试进行恢复。如果你不实现任何处理入站异常的逻辑(或者没有消费该异常),那么Netty将会记录该异常没有被处理的事实[7]。

总结一下:

  • ChannelHandler.exceptionCaught的默认实现是简单地将当前异常转发给ChannelPipeline中的下一个ChannelHandler
  • 如果异常到达了ChannelPipeline的尾端,它将会被记录为未被处理;
  • 要想定义自定义的处理逻辑,你需要重写exceptionCaught方法。然后你需要决定是否需要将该异常传播出去。

6.4.2 处理出站异常

用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制。

  • 每个出站操作都将返回一个ChannelFuture。注册到ChannelFutureChannel- FutureListener将在操作完成时被通知该操作是成功了还是出错了。
  • 几乎所有的ChannelOutboundHandler上的方法都会传入一个ChannelPromise的实例。作为ChannelFuture的子类,ChannelPromise也可以被分配用于异步通知的监听器。但是,ChannelPromise还具有提供立即通知的可写方法:
  ChannelPromise setSuccess;  ChannelPromise setFailure(Throwable cause);  

添加ChannelFutureListener只需要调用ChannelFuture实例上的addListener(ChannelFutureListener)方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是,调用出站操作(如write方法)所返回的ChannelFuture上的addListener方法。

代码清单6-13使用了这种方式来添加ChannelFutureListener,它将打印栈跟踪信息并且随后关闭Channel

代码清单6-13 添加ChannelFutureListenerChannelFuture

ChannelFuture future = channel.write(someMessage);future.addListener(new ChannelFutureListener {  @Override  public void operationComplete(ChannelFuture f) {    if (!f.isSuccess) {      f.cause.printStackTrace;      f.channel.close;    }  }});  

第二种方式是将ChannelFutureListener添加到即将作为参数传递给ChannelOut- boundHandler的方法的ChannelPromise。代码清单6-14中所展示的代码和代码清单6-13中所展示的具有相同的效果。

代码清单6-14 添加ChannelFutureListenerChannelPromise

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {  @Override  public void write(ChannelHandlerContext ctx, Object msg,    ChannelPromise promise) {    promise.addListener(new ChannelFutureListener {      @Override      public void operationComplete(ChannelFuture f) {        if (!f.isSuccess) {          f.cause.printStackTrace;          f.channel.close;        }      }    });  }}  

ChannelPromise的可写方法

通过调用ChannelPromise上的setSuccesssetFailure方法,可以使一个操作的状态在ChannelHandler的方法返回给其调用者时便即刻被感知到。

为何选择一种方式而不是另一种呢?对于细致的异常处理,你可能会发现,在调用出站操作时添加ChannelFutureListener更合适,如代码清单6-13所示。而对于一般的异常处理,你可能会发现,代码清单6-14所示的自定义的ChannelOutboundHandler实现的方式更加的简单。

如果你的ChannelOutboundHandler本身抛出了异常会发生什么呢?在这种情况下,Netty本身会通知任何已经注册到对应ChannelPromise的监听器。

6.5 小结

在本章中我们仔细地研究了Netty的数据处理组件——ChannelHandler。我们讨论了ChannelHandler是如何链接在一起,以及它们是如何作为ChannelInboundHandlerChannelOutboundHandler与ChannelPipeline进行交互的。

下一章将介绍Netty的EventLoop和并发模型,这对于理解Netty是如何实现异步的、事件驱动的网络编程模型来说至关重要。


[1] 当所有可读的字节都已经从Channel中读取之后,将会调用该回调方法;所以,可能在channelRead- Complete被调用之前看到多次调用channelRead(...)。——译者注

[2] 这里借鉴的是Scala的Promise和Future的设计,当一个Promise被完成之后,其对应的Future的值便不能再进行任何修改了。——译者注

[3] 其利用了JDK提供的PhantomReference类来实现这一点。——译者注

[4] 这里指修改ChannelPipeline中的ChannelHandler的编排。——译者注

[5] 通过配合ChannelConfig.setAutoRead(boolean autoRead)方法,可以实现反应式系统的特性之一回压(back-pressure)。——译者注

[6] 主要的问题在于,对于其所持有的状态的修改并不是线程安全的,比如也可以通过使用AtomicInteger来规避这个问题。——译者注

[7] 即Netty将会通过Warning级别的日志记录该异常到达了ChannelPipeline的尾端,但没有被处理,并尝试释放该异常。——译者注