章 Netty核心原理与基础实战

Netty是一个Java NIO客户端/服务器框架,是一个为了快速开发可维护的高性能、高可扩展的网络服务器和客户端程序而提供的异步事件驱动基础框架和工具。基于Netty,可以快速轻松地开发网络服务器和客户端的应用程序。与直接使用Java NIO相比,Netty给大家造出了一个非常优美的轮子,它可以大大简化网络编程流程。例如,Netty极大地简化了TCPUDP套接字和HTTP Web服务程序的开发。

Netty的目标之一是使通信开发可以做到快速和轻松。使用Netty,除了能快速和轻松地开发TCP/UDP等自定义协议的通信程序之外,还可以做到快速和轻松地开发应用层协议的通信程序,如FTPSMTPHTTP以及其他的传统应用层协议。

Netty的目标之二是要做到高性能、高可扩展性。基于JavaNIONetty设计了一套优秀的、高性能的Reactor模式实现,并且基于NettyReactor模式实现中的Channel(通道)、Handler(处理器)等基础类库能进行快速扩展,以支持不同协议通信、完成不同业务处理的大量应用类。

10.1 第一个Netty实战案例DiscardServer

在开始介绍Netty核心原理之前,首先为大家介绍一个非常简单的入门实战案例,这是一个丢弃服务器(DiscardServer)的简单通信案例,其作用类似于学习Java基础编程时的“Hello World”程序。

在开始编写实战案例之前,需要准备Netty的版本,并且配置好开发环境。

10.1.1 创建第一个Netty项目

首先我们需要创建项目(或者模块),这里取名为NettyDemos。第一个Netty的实战案例DiscardServer就在这个项目中进行实战开发。DiscardServer功能很简单:读取客户端的输入数据,直接丢弃,不给客户端任何回复。

在使用Netty前,需要考虑一下JDK的版本,Netty官方建议使用JDK 1.6以上,本书使用的是JDK 1.8。然后是Netty自己的版本,建议使用Netty 4.0以上的版本,本书使用的Netty版本是4.1.6

使用maven导入Netty的依赖坐标到工程(或项目),Netty的依赖坐标如下:

<dependency>

    <groupId>io.netty</groupId>

    <artifactId>netty-all</artifactId>

    <version>4.1.6.Final</version>

</dependency>

说明

Netty版本在不断升级,但是4.0以上的版本使用比较广泛。Netty曾经升级到10.0,不过出现了一些问题,版本又回退了。另外,很多的大数据开源框架使用的还是3.0版本的Netty。从学习的角度来看,关键是学习其核心原理和编程技巧。理解原理之后,在实际开发过程中,根据具体的版本,看看其源码或者API手册即可。

准备好项目工程之后,就可以正式开始编写第一个Netty程序了。

10.1.2 第一个Netty服务端程序

创建一个服务端类NettyDiscardServer,用以实现消息的Discard(丢弃)功能,源代码如下:

package cn.edu.bbc.computer;

public class NettyDiscardServer {
    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();

    public NettyDiscardServer(int port) {
        this.serverPort = port;
    }

    public void start() {
        //创建reactor 线程组
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 设置reactor 线程组
            b.group(bossLoopGroup, workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioServerSocketChannel.class);
            //3 设置监听端口
            b.localAddress(serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(new NettyDiscardHandler());
                }
            });
            // 6 开始绑定server
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = b.bind().sync();
            Logger.info(" 服务器启动成功,监听端口: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 优雅关闭EventLoopGroup
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws InterruptedException {
        int port = NettyDemoConfig.SOCKET_SERVER_PORT;
        new NettyDiscardServer(port).start();
    }
}

如果是第一次看Netty应用程序的代码,那么上面的代码应用是晦涩难懂的,因为代码中涉及很多Netty专用组件。不过不要紧,因为Netty是基于Reactor模式实现的。通过前面的章节学习,大家已经非常深入地了解了Reactor模式,所以现在只需要顺藤摸瓜理清楚NettyReactor模式对应的组件,Netty的核心组件结构就相对简单了。

首先要说的是Reactor模式中的Reactor组件。前面讲到,反应器组件的作用是进行IO事件的查询和分发。Netty中对应的反应器组件有多种,不同应用通信场景用到的反应器组件各不相同。一般来说,对应于多线程的Java NIO通信的应用场景,Netty对应的反应器组件为NioEventLoopGroup

在上面的例子中,使用了两个NioEventLoopGroup反应器组件实例:第一个负责服务器通道新连接的IO事件的监听,可以形象地理解为包工头角色;第二个主要负责传输通道的IO事件的处理和数据传输,可以形象地理解为工人角色。

其次要说的是Reactor模式中的Handler(处理器)角色组件。Handler的作用是对应到IO事件,完成IO事件的业务处理。Handler需要为业务做专门开发,下一小节将对上面的NettyDiscardHandler自定义处理器进行介绍。

再次,在上面的例子中还用到了Netty的服务引导类ServerBootstrap。服务引导类是一个组装和集成器,职责是将不同的Netty组件组装在一起。此外,ServerBootstrap能够按照应用场景的需要为组件设置好基础性的参数,最后帮助快速实现Netty服务器的监听和启动。服务引导类ServerBootstrap也是本章重点之一,后面将对其进行详细的介绍。

10.1.3 业务处理器NettyDiscardHandler

Reactor模式中,所有的业务处理都在Handler中完成,业务处理一般需要自己编写,这里编写一个新类:NettyDiscardHandler。这里的业务处理很简单:把收到的任何内容直接丢弃,也不会回复任何消息。

NettyDiscardHandler的代码如下:

package cn.edu.bbc.computer;

//

NettyDiscardHandler extends ChannelInboundHandlerAdapter {

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) {

        ByteBuf in = (ByteBuf) msg;

        try {

            Logger.info("收到消息,丢弃如下:");

            while (in.isReadable()) {

                System.out.print((char) in.readByte());

            }

            System.out.println();//换行

        } finally {

            ReferenceCountUtil.release(msg);

        }

    }

}

NettyHandler需要处理多种IO事件(如读就绪、写就绪),对应于不同的IO事件,Netty提供了一些基础方法。这些方法都已经提前封装好,应用程序直接继承或者实现即可。比如说,对于处理入站的IO事件,其对应的接口为ChannelInboundHandler,并且Netty提供了ChannelInboundHandlerAdapter适配器作为入站处理器的默认实现。

说明

这里将引入一组新的概念:入站和出站。简单理解,入站指的是输入,出站指的是输出。后面也会有详细介绍。Netty中的出/入站与Java NIO中的出/入站有些微妙的不同,Netty的出站可以理解为从Handler传递到Channel的操作,比如说write写通道、read读通道数据;Netty的入站可以理解为从Channel传递到Handler的操作,比如说Channel数据过来之后,会触发HandlerchannelRead()入站处理方法。

如果要实现自己的入站处理器,可以简单地继承ChannelInboundHandlerAdapter入站处理器适配器,再写入自己的入站处理的业务逻辑。也就是说,重写通道读取方法channelRead()即可。

在上面例子中的channelRead()方法将Netty缓冲区ByteBuf的输入数据打印到服务端控制台后,直接丢弃不管了,而且不给客户端任何回复。

NettyByteBuf缓冲区组件(后面会单独对其进行详细的介绍)可以对应到前面介绍的Java NIO类库的数据缓冲区Buffer组件。只不过相对而言,NettyByteBuf缓冲区性能更好,使用也更加方便。

10.1.4 运行NettyDiscardServer

在上面的例子中出现了Netty中的各种组件:服务器引导类、缓冲区、反应器、业务处理器、Future异步回调、数据传输通道等。这些Netty组件都是需要掌握的,也都是我们在后面需要进行专项学习的。

说明

Future异步回调或者同步阻塞是高并发开发频繁使用到的技术,所以有关Future异步回调或者同步阻塞的原理和知识是非常重要的,具体请参阅《Java高并发核心编程 卷2:多线程、锁、JMMJUC、高并发设计模式》的相关内容。

如果看不懂以上NettyDiscardServer程序,没有关系。此程序的目的只是为大家展示一下Netty开发中会涉及什么内容,给大家留一个初步的印象。接下来,大家可以启动NettyDiscardServer服务器来体验一下Netty程序的运行。

在源代码工程中找到消息丢弃服务器类NettyDiscardServer,启动它的main()方法,就启动了这个服务器应用。

如果想看到最终的消息丢弃执行效果,不能仅仅启动服务器,还需要启动客户端,需要从客户端向服务器发送消息。这里的客户端只要能通过TCP与服务器建立Socket连接即可,不一定是使用Netty编写的客户端程序,可以是Java OIO或者NIO客户端。因此,直接使用前面章节中的EchoClient程序作为客户端程序即可,因为所使用的TCP通信端口是一致的。

在源代码工程中,我们可以找到发送消息到服务器的客户端类:EchoClient。通过启动它的main()方法,就可以启动这个客户端程序。然后在客户端的标准化输入窗口不断输入要发送的消息,发送到服务器即可,在服务端可以看到所打印的丢弃了的消息。

虽然EchoClient客户端是使用Java NIO编写的,而NettyDiscardServer服务端是使用Netty编写的,但是不影响它们之间的相互通信。不仅仅是因为底层Netty框架也是使用Java NIO开发的,更加核心的原因是都使用了TCP通信协议。

10.2 解密Netty中的Reactor模式

在前面的章节中,已经反复说明:设计模式是Java代码或者程序的重要组织方式,如果不了解设计模式,学习和阅读Java程序代码往往找不到头绪,上下求索而不得其法。故而,在学习Netty组件之前,我们必须了解Netty中的Reactor模式是如何实现的。

这里,先回顾一下Java NIOIO事件的处理流程和Reactor模式的基础内容。

10.2.1 回顾Reactor模式中IO事件的处理流程

一个IO事件从操作系统底层产生后,在Reactor模式中的处理流程如图10-1所示。

10-1 Java Reactor模式中IO事件的处理流程

Reactor模式中IO事件的处理流程大致分为4步,具体如下:

1步:通道注册。IO事件源于通道(Channel),IO是和通道(对应于底层连接而言)强相关的。一个IO事件一定属于某个通道。如果要查询通道的事件,首先就要将通道注册到选择器。

2步:查询事件。在Reactor模式中,一个线程会负责一个反应器(或者SubReactor子反应器),不断地轮询,查询选择器中的IO事件(选择键)。

3步:事件分发。如果查询到IO事件,则分发给与IO事件有绑定关系的Handler业务处理器。

4步:完成真正的IO操作和业务处理,这一步由Handler业务处理器负责。

以上4步就是整个Reactor模式的IO处理器流程。其中,第1步和第2步其实是Java NIO的功能,Reactor模式仅仅是利用了Java NIO的优势而已。

说明

Reactor模式的IO事件处理流程比较重要,是学习Netty的基础性和铺垫性知识。如果这里看不懂,就先回到前面有关Reactor模式详细介绍的部分内容,回头再学习一下Reactor模式原理。

10.2.2 Netty中的Channel

Channel组件是Netty中非常重要的组件,为什么首先要说的是Channel组件呢?原因是:Reactor模式和通道紧密相关,反应器的查询和分发的IO事件都来自Channel组件。

Netty中不直接使用Java NIOChannel组件,对Channel组件进行了自己的封装。Netty实现了一系列的Channel组件,为了支持多种通信协议,换句话说,对于每一种通信连接协议,Netty都实现了自己的通道。除了JavaNIONetty还提供了Java面向流的OIO处理通道。

总结起来,对应到不同的协议,Netty实现了对应的通道,每一种协议基本上都有NIOOIO两个版本。

对应于不同的协议,Netty中常见的通道类型如下:

NioSocketChannel:异步非阻塞TCP Socket传输通道。

NioServerSocketChannel:异步非阻塞TCP Socket服务端监听通道。

NioDatagramChannel:异步非阻塞的UDP传输通道。

NioSctpChannel:异步非阻塞Sctp传输通道。

NioSctpServerChannel:异步非阻塞Sctp服务端监听通道。

OioSocketChannel:同步阻塞式TCP Socket传输通道。

OioServerSocketChannel:同步阻塞式TCP Socket服务端监听通道。

OioDatagramChannel:同步阻塞式UDP传输通道。

OioSctpChannel:同步阻塞式Sctp传输通道。

OioSctpServerChannel:同步阻塞式Sctp服务端监听通道。

一般来说,服务端编程用到最多的通信协议还是TCP,对应的Netty传输通道类型为NioSocketChannel类、Netty服务器监听通道类型为NioServerSocketChannel。不论是哪种通道类型,在主要的API和使用方式上和NioSocketChannel类基本都是相同的,更多是底层的传输协议不同,而Netty帮大家极大地屏蔽了传输差异。如果没有特殊情况,本书的很多案例都将以NioSocketChannel通道为主。

NettyNioSocketChannel内部封装了一个Java NIOSelectableChannel成员,通过对该内部的Java NIO通道的封装,对NettyNioSocketChannel通道上的所有IO操作最终都会落地到Java NIOSelectableChannel底层通道。NioSocketChannel的继承关系图如图10-2所示。

10-2 NioSocketChannel的继承关系图

10.2.3 Netty中的Reactor

Reactor模式中,一个反应器(或者SubReactor子反应器)会由一个事件处理线程负责事件查询和分发。该线程不断进行轮询,通过Selector选择器不断查询注册过的IO事件(选择键)。如果查询到IO事件,就分发给Handler业务处理器。

首先为大家介绍一下Netty中的反应器组件。Netty中的反应器组件有多个实现类,这些实现类与其通道类型相互匹配。对应于NioSocketChannel通道,Netty的反应器类为NioEventLoopNIO事件轮询)。

NioEventLoop类有两个重要的成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。NioEventLoop的继承关系和主要成员属性如图10-3所示。

10-3 NioEventLoop的继承关系和主要成员属性

通过这个关系图可以看出:NioEventLoop和前面章节讲的反应器实现在思路上是一致的:一个NioEventLoop拥有一个线程,负责一个Java NIO选择器的IO事件轮询。

Netty中,EventLoop反应器和Channel的关系是什么呢?理论上来说,一个EventLoop反应器和NettyChannel通道是一对多的关系:一个反应器可以注册成千上万的通道,如图10-4所示。

10-4 EventLoop反应器和Channel的关系

10.2.4 Netty中的Handler

在前面的章节介绍Java NIOIO事件类型时讲到,可供选择器监控的通道IO事件类型包括以下4种:

  • 可读:SelectionKey.OP_READ
  • 可写:SelectionKey.OP_WRITE
  • 连接:SelectionKey.OP_CONNECT
  • 接受:SelectionKey.OP_ACCEPT

Netty中,EventLoop反应器内部有一个线程负责Java NIO选择器的事件的轮询,然后进行对应的事件分发。事件分发(Dispatch)的目标就是NettyHandler(含用户定义的业务处理器)。

NettyHandler分为两大类:第一类是ChannelInboundHandler入站处理器;第二类是ChannelOutboundHandler出站处理器,二者都继承了ChannelHandler处理器接口。有关Handler的接口与继承关系如图10-5所示。

10-5 Netty中的Handler的接口与继承关系

Netty入站处理的流程是什么呢?以底层的Java NIO中的OP_READ输入事件为例:在通道中发生了OP_READ事件后,会被EventLoop查询到,然后分发给ChannelInboundHandler入站处理器,调用对应的入站处理的read()方法。在ChannelInboundHandler入站处理器内部的read()方法具体实现中,可以从通道中读取数据。

Netty中的入站处理触发的方向为从通道触发,ChannelInboundHandler入站处理器负责接收(或者执行)。Netty中的入站处理不仅仅是OP_READ输入事件的处理,还包括从通道底层触发,由Netty通过层层传递,调用ChannelInboundHandler入站处理器进行的其他某个处理。

Netty中的出站处理指的是从ChannelOutboundHandler出站处理器到通道的某次IO操作。例如,在应用程序完成业务处理后,可以通过ChannelOutboundHandler出站处理器将处理的结果写入底层通道。最常用的一个方法就是write()方法,即把数据写入通道。

Netty中的出站处理不仅仅包括Java NIOOP_WRITE可写事件,还包括Netty自身从处理器到通道方向的其他操作。OP_WRITE可写事件是Java NIO的概念,和Netty的出站处理在概念上不是一个维度,Netty的出站处理是应用层维度的。

无论是入站还是出站,Netty都提供了各自的默认适配器实现:ChannelInboundHandler的默认实现为ChannelInboundHandlerAdapter(入站处理适配器)。ChannelOutboundHandler的默认实现为ChannelOutBoundHandlerAdapter(出站处理适配器)。这两个默认的通道处理适配器分别实现了基本的入站操作和出站操作功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理适配器即可。

10.2.5 Netty中的Pipeline

在介绍NettyPipeline事件处理流水线之前,先梳理一下NettyReactor模式实现中各个组件之间的关系:

1)反应器(或者SubReactor子反应器)和通道之间是一对多的关系:一个反应器可以查询很多个通道的IO事件。

2)通道和Handler处理器实例之间是多对多的关系:一个通道的IO事件可以被多个Handler实例处理;一个Handler处理器实例也能绑定到很多通道,处理多个通道的IO事件。

问题是:通道和Handler处理器实例之间的绑定关系,Netty是如何组织的呢?

Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线)。它像一条管道,将绑定到一个通道的多个Handler处理器实例串联在一起,形成一条流水线。ChannelPipeline的默认实现实际上被设计成一个双向链表。所有的Handler处理器实例被包装成双向链表的节点,被加入到ChannelPipeline中。

说明

一个Netty通道拥有一个ChannelPipeline类型的成员属性,该属性的名称叫作pipeline

以入站处理为例,每一个来自通道的IO事件都会进入一次ChannelPipeline。在进入第一个Handler处理器后,这个IO事件将按照既定的从前往后次序,在流水线上不断地向后流动,流向下一个Handler处理器。

在向后流动的过程中,会出现3种情况:

1)如果后面还有其他Handler入站处理器,那么IO事件可以交给下一个Handler处理器向后流动。

2)如果后面没有其他的入站处理器,就意味着这个IO事件在此次流水线中的处理结束了。

3)如果在中间需要终止流动,可以选择不将IO事件交给下一个Handler处理器,流水线的执行也被终止了。

Netty的通道流水线与普通的流水线不同,Netty的流水线不是单向的,而是双向的,而普通的流水线基本都是单向的。Netty是这样规定的:入站处理器的执行次序是从前到后,出站处理器的执行次序是从后到前。总之,IO事件在流水线上的执行次序与IO事件的类型是有关系的,如图10-6所示。

10-6 流水线上入站处理器和出站处理器的执行次序

除了流动的方向与IO操作类型有关之外,流动过程中所经过的处理器类型也是与IO操作的类型有关的。入站的IO操作只能从Inbound入站处理器类型的Handler流过;出站的IO操作只能从Outbound出站处理器类型的Handler流过。

至此,在了解完流水线之后,大家应该对Netty中的通道、EventLoop反应器、处理器,以及三者之间的协作关系,有了一个清晰的认知和了解,基本可以动手开发简单的Netty程序了。为了方便开发者,Netty提供了一系列辅助类,用于把上面的三个组件快速组装起来完成一个Netty应用,这个系列的类叫作引导类。服务端的引导类叫作ServerBootstrap类,客户端的引导类叫作Bootstrap类。

接下来,为大家详细介绍一下这些能提升开发效率的Bootstrap

10.3 详解Bootstrap

Bootstrap类是Netty提供的一个便利的工厂类,可以通过它来完成Netty的客户端或服务端的Netty组件的组装,以及Netty程序的初始化和启动执行。Netty的官方解释是,完全可以不用这个Bootstrap类,可以一点点去手动创建通道、完成各种设置和启动注册到EventLoop反应器,然后开始事件的轮询和处理,但是这个过程会非常麻烦。通常情况下,使用这个便利的Bootstrap工具类的效率会更高。

Netty中有两个引导类,分别用于服务器和客户端,如图10-7所示。

10-7 Netty中的两个引导类

这两个引导类仅是使用的地方不同,它们大致的配置和使用方法都是相同的。下面以ServerBootstrap类作为重点介绍对象。

在介绍ServerBootstrap的服务器启动流程之前,首先介绍一下涉及的两个基础概念:父子通道、EventLoopGroup(事件轮询线程组)。

10.3.1 父子通道

Netty中,每一个NioSocketChannel通道所封装的都是Java NIO通道,再往下就对应到了操作系统底层的socket文件描述符。理论上来说,操作系统底层的socket文件描述符分为两类:

  1. 连接监听类型。连接监听类型的socket描述符处于服务端,负责接收客户端的套接字连接;在服务端,一个“连接监听类型”的socket描述符可以接受(Accept)成千上万的传输类的socket文件描述符。
  2. 数据传输类型。数据传输类的socket描述符负责传输数据。同一条TCPSocket传输链路在服务器和客户端都分别会有一个与之相对应的数据传输类型的socket文件描述符。

Netty中,异步非阻塞的服务端监听通道NioServerSocketChannel所封装的Linux底层的文件描述符是连接监听类型socket描述符;异步非阻塞的传输通道NioSocketChannel所封装的Linux的文件描述符是数据传输类型socket描述符。

Netty中,将有接收关系的监听通道和传输通道叫作父子通道。其中,负责服务器连接监听和接收的监听通道(如NioServerSocketChannel)也叫父通道(Parent Channel),对应于每一个接收到的传输类通道(如NioSocketChannel)也叫子通道(Child Channel)。

10.3.2 EventLoopGroup

在前面介绍Reactor模式的具体实现时,分为单线程实现版本和多线程实现版本。Netty中的Reactor模式实现不是单线程版本的,而是多线程版本的。

实际上,在Netty中一个EventLoop相当于一个子反应器(SubReactor),一个NioEventLoop子反应器拥有了一个事件轮询线程,同时拥有一个Java NIO选择器。

Netty是如何完成多线程版本的Reactor模式实现的呢?答案是使用EventLoopGroup(事件轮询组)。多个EventLoop线程放在一起,可以组成一个EventLoopGroup。反过来说,EventLoopGroup就是一个多线程版本的反应器,其中的单个EventLoop线程对应于一个子反应器(SubReactor)。

Netty的程序开发不会直接使用单个EventLoop(事件轮询器),而是使用EventLoopGroupEventLoopGroup的构造函数有一个参数,用于指定内部的线程数。在构造器初始化时,会按照传入的线程数量在内部构造多个线程和多个EventLoop子反应器(一个线程对应一个EventLoop子反应器),进行多线程的IO事件查询和分发。

如果使用EventLoopGroup的无参数构造函数,没有传入线程数量或者传入的数量为0,那么EventLoopGroup内部的线程数量到底是多少呢?默认的EventLoopGroup内部线程数量为最大可用的CPU处理器数量的2倍。假设电脑使用的是4核的CPU,那么在内部会启动8EventLoop线程,相当于8个子反应器实例。

从前文可知,为了及时接收新连接,在服务端,一般有两个独立的反应器,一个负责新连接的监听和接收,另一个负责IO事件轮询和分发,并且两个反应器相互隔离。对应到Netty服务器程序中,则需要设置两个EventLoopGroup,一个组负责新连接的监听和接受,另外一个组负责IO传输事件的轮询与分发,两个轮询组的职责具体如下:

1)负责新连接的监听和接收的EventLoopGroup中的反应器完成查询通道的新连接IO事件查询。这些反应器有点像负责招工的包工头,因此,该轮询组可以形象地称为包工头Boss)轮询组。

2)负责IO事件轮询和分发的反应器完成查询所有子通道的IO事件,并且执行对应的Handler处理器完成IO处理——例如数据的输入和输出(有点儿像搬砖),这个轮询组可以形象地称为工人Worker)轮询组。

NettyEventLoopGroupEventLoop之间、EventLoopChannel之间的关系如图10-8所示。

10-8 Netty中的Reactor模式示意图

至此,介绍完了两个重要的基础概念:父子通道与EventLoopGroup。有了这些基础知识作为铺垫,接下来可以正式介绍ServerBootstrap的启动流程了。

10.3.3 Bootstrap启动流程

Bootstrap的启动流程也就是Netty组件的组装、配置,以及Netty服务器或者客户端的启动流程。在本节中对启动流程进行了梳理,大致分成8个步骤。本书仅仅演示的是服务端引导类的使用,用到的引导类为ServerBootstrap。正式使用前,首先创建一个服务端的引导类实例。

//创建一个服务端的引导类

ServerBootstrap b = new ServerBootstrap();

接下来,结合前面的NettyDiscardServer服务器的程序代码,给大家详细介绍一下Bootstrap启动流程中精彩的8个步骤。

1步:创建反应器轮询组,并设置到ServerBootstrap引导类实例,大致的代码如下:

//创建反应器轮询组

//boss轮询组

EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);

//worker轮询组

EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

//

//step1:为引导类实例设置反应器轮询组

b.group(bossLoopGroup, workerLoopGroup);

在设置反应器轮询组之前,创建了两个NioEventLoopGroup,一个负责处理连接监听IO事件,名为bossLoopGroup;另一个负责数据传输事件和处理,名为workerLoopGroup。在两个轮询组创建完成后,就可以配置给引导类实例,它一次性地给引导类配置了两大轮询组。

如果不需要分开监听新连接事件和输出事件,就不一定非得配置两个轮询组,可以仅配置一个EventLoopGroup反应器轮询组。具体的配置方法是调用b.group(workerGroup)。在这种模式下,新连接监听IO事件和数据传输IO事件可能被挤在了同一个线程中处理。这样会带来一个风险:新连接的接收被更加耗时的数据传输或者业务处理所阻塞。所以,在服务端,建议设置成两个轮询组的工作模式。

2步:设置通道的IO类型。Netty不止支持Java NIO,也支持阻塞式的OIO。下面配置的是Java NIO类型的通道类型:

//step2:设置传输通道的类型为NIO类型

b.channel(NioServerSocketChannel.class);

如果确实指定BootstrapIO模型为BIO类型,可以配置为OioServerSocketChannel.class类。NIO的优势巨大,因此通常不会在Netty中使用BIO

3步:设置监听端口,代码大致如下:

//step3:设置监听端口

b.localAddress(new InetSocketAddress(port));

这是最为简单的一步操作,主要是设置服务器的监听地址。

4步:设置传输通道的配置选项,代码大致如下:

//step4:设置通道的参数

b.option(ChannelOption.SO_KEEPALIVE, true);

b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

这里调用了Bootstrapoption()选项设置方法。对于服务器的Bootstrap而言,这个方法的作用是:给父通道(Parent Channel)设置一些与传输协议相关的选项。如果要给子通道(Child Channel)设置一些通道选项,则需要调用childOption()设置方法。

可以设置哪些通道选项(ChannelOption)呢?在上面的代码中,设置了一个底层TCP相关的选项ChannelOption.SO_KEEPALIVE。该选项表示是否开启TCP底层心跳机制,true为开启,false为关闭。其他的通道设置选项,参见下一小节。

5步:装配子通道的Pipeline。每一个通道都用一条ChannelPipeline流水线,它的内部有一个双向的链表。装配流水线的方式是:将业务处理器ChannelHandler实例包装之后加入双向链表中。

如何装配Pipeline流水线呢?装配子通道的Handler流水线调用引导类的childHandler()方法,该方法需要传入一个ChannelInitializer通道初始化类的实例作为参数。每当父通道成功接收到一个连接并创建成功一个子通道后,就会初始化子通道,此时这里配置的ChannelInitializer实例就会被调用。

ChannelInitializer通道初始化类的实例中,有一个initChannel初始化方法,在子通道创建后会被执行,向子通道流水线增加业务处理器。

装配子通道的Pipeline流水线的大致代码如下:

//step5:装配子通道流水线

b.childHandler(new ChannelInitializer<SocketChannel>() {

    //有连接到达时会创建一个通道的子通道,并初始化

    protected void initChannel(SocketChannel ch) {

         //这里可以管理子通道中的Handler业务处理器

         //向子通道流水线添加一个Handler业务处理器

         ch.pipeline().addLast(new NettyDiscardHandler());

    }

});

为什么仅装配子通道的流水线,而不需要装配父通道的流水线呢?原因是:父通道(NioServerSocketChannel)的内部业务处理是固定的:接收新连接后,创建子通道,然后初始化子通道,所以不需要特别的配置,由Netty自行进行装配。如果需要完成特殊的父通道业务处理,可以类似地调用ServerBootstraphandler(ChannelHandler handler)方法,为父通道设置ChannelInitializer初始化器。

在装配流水线时需要注意的是,ChannelInitializer处理器有一个泛型参数SocketChannel,它代表需要初始化的通道类型,这个类型需要和前面的引导类中设置的传输通道类型一一对应起来。

6步:开始绑定服务器新连接的监听端口,代码大致如下:

//step6:开始绑定端口,通过调用sync()同步方法阻塞直到绑定成功

ChannelFuture channelFuture = b.bind().sync();

Logger.info(" 服务器启动成功,监听端口: " +

channelFuture.channel().localAddress());

这个也很简单。b.bind()方法的功能是返回一个端口绑定Netty的异步任务channelFuture。在这里,并没有给channelFuture异步任务增加回调监听器,而是阻塞channelFuture异步任务,直到端口绑定任务执行完成。

Netty中,所有的IO操作都是异步执行的,这就意味着任何一个IO操作都会立即返回,返回时异步任务还没有真正执行。什么时候执行完成呢?Netty中的IO操作都会返回异步任务实例(如channelFuture实例)。通过该异步任务实例,既可以实现同步阻塞一直到channelFuture异步任务执行完成,也可以通过为其增加事件监听器的方式注册异步回调逻辑,以获得Netty中的IO操作的真正结果。上面所使用的是同步阻塞一直到channelFuture异步任务执行完成的处理方式。

至此,服务器正式启动。

说明

Future异步回调或者同步阻塞,涉及高并发的核心模式——异步回调模式,是高并发开发非常重要的基础性知识,具体请参阅《Java高并发核心编程 卷2:多线程、锁、JMMJUC、高并发设计模式》的相关内容。

7步:自我阻塞,直到监听通道关闭,代码大致如下:

//step7:自我阻塞,直到通道关闭的异步任务结束

ChannelFuture closeFuture = channelFuture.channel().closeFuture();

closeFuture.sync();

如果要阻塞当前线程直到通道关闭,可以调用通道的closeFuture()方法,以获取通道关闭的异步任务。当通道被关闭时,closeFuture实例的sync()方法会返回。

8步:关闭EventLoopGroup,代码大致如下:

//step8:释放掉所有资源,包括创建的反应器线程

workerLoopGroup.shutdownGracefully();

bossLoopGroup.shutdownGracefully();

关闭反应器轮询组,同时会关闭内部的子反应器线程,也会关闭内部的选择器、内部的轮询线程以及负责查询的所有子通道。在子通道关闭后,会释放掉底层的资源,如Socket文件描述符等。

10.3.4 ChannelOption

无论是对于NioServerSocketChannel父通道类型还是对于NioSocketChannel子通道类型,都可以设置一系列的ChannelOption(通道选项)。ChannelOption类中定义了一系列选项,下面介绍一些常见的选项。

1. SO_RCVBUFSO_SNDBUF

这两个为TCP传输选项,每个TCP socket(套接字)在内核中都有一个发送缓冲区和一个接收缓冲区,这两个选项就是用来设置TCP连接的两个缓冲区大小的。TCP的全双工工作模式以及TCP的滑动窗口对两个独立的缓冲区都有依赖。

2. TCP_NODELAY

此为TCP传输选项,如果设置为true就表示立即发送数据。TCP_NODELAY用于开启或关闭Nagle算法。如果要求高实时性,有数据发送时就马上发送,就将该选项设置为true(关闭Nagle算法);如果要减少发送次数、减少网络交互,就设置为false(开启Nagle算法),等累积一定大小的数据后再发送。关于TCP_NODELAY的值,Netty默认为true,而操作系统默认为false

Nagle算法将小的碎片数据连接成更大的报文(或数据包)来最小化所发送报文的数量,如果需要发送一些较小的报文,则需要禁用该算法。

Netty默认禁用Nagle算法,报文会立即发送出去,从而最小化报文传输的延时。

说明

TCP_NODELAY的值设置为true表示关闭延迟,设置为false表示开启延迟。其值与是否开启Nagle算法是相反的。

3. SO_KEEPALIVE

此为TCP传输选项,表示是否开启TCP的心跳机制。true为连接保持心跳,默认值为false。启用该功能时,TCP会主动探测空闲连接的有效性。需要注意的是:默认的心跳间隔是7200秒,即2小时。Netty默认关闭该功能。

4. SO_REUSEADDR

此为TCP传输选项,为true时表示地址复用,默认值为false。有四种情况需要用到这个参数设置:

  1. 当有一个地址和端口相同的连接socket1处于TIME_WAIT状态时,而又希望启动一个新的连接socket2要占用该地址和端口。
  2. 有多块网卡或用IP Alias技术的机器在同一端口启动多个进程,但每个进程绑定的本地IP地址不能相同。
  3. 同一进程绑定相同的端口到多个socket(套接字)上,但每个socket绑定的IP地址不同。
  4. 完全相同的地址和端口的重复绑定,但这只用于UDP的多播,不用于TCP

说明

Socket连接状态(如TIME_WAIT)和连接建立时三次握手以及断开时四次挥手有关,请参阅本书后面有关TCP协议原理的内容。

10. SO_LINGER

此为TCP传输选项,可以用来控制socket.close()方法被调用后的行为,包括延迟关闭时间。如果此选项设置为-1,就表示socket.close()方法在调用后立即返回,但操作系统底层会将发送缓冲区的数据全部发送到对端;如果此选项设置为0,就表示socket.close()方法在调用后会立即返回,但是操作系统会放弃发送缓冲区数据,直接向对端发送RST包,对端将收到复位错误;如果此选项设置为非0整数值,就表示调用socket.close()方法的线程被阻塞,直到延迟时间到来,发送缓冲区中的数据发送完毕,若超时,则对端会收到复位错误。

SO_LINGER的默认值为-1,表示禁用该功能。

6. SO_BACKLOG

此为TCP传输选项,表示服务端接收连接的队列长度,如果队列已满,客户端连接将被拒绝。服务端在处理客户端新连接请求时(三次握手)是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端到来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,队列的大小通过SO_BACKLOG指定。

具体来说,服务端对完成第二次握手的连接放在一个队列(暂时称A队列),如果进一步完成第三次握手,再把连接从A队列移动到新队列(暂时称B队列),接下来应用程序会通过调用accept()方法取出握手成功的连接,而系统则会将该连接从B队列移除。AB队列的长度之和是SO_BACKLOG指定的值,当AB队列的长度之和大于SO_BACKLOG值时,新连接将会被TCP内核拒绝。所以,如果SO_BACKLOG过小,accept速度可能会跟不上,AB队列全满,导致新客户端无法连接。

说明

SO_BACKLOG对程序支持的连接数并无影响,影响的只是还没有被accept取出的连接数,也就是三次握手的排队连接数。

如果连接建立频繁,服务器处理新连接较慢,那么可以适当调大这个参数。

7. SO_BROADCAST

此为TCP传输选项,表示设置为广播模式。

10.4 详解Channel

本节首先为大家介绍一下Channel(通道)的主要成员和方法,然后为大家介绍一下Netty所提供的一个专门的单元测试通道——EmbeddedChannel(嵌入式通道)。

10.4.1 Channel的主要成员和方法

通道是Netty的核心概念之一,代表网络连接,由它负责同对端进行网络通信,既可以写入数据到对端,也可以从对端读取数据。

Netty通道的抽象类AbstractChannel的构造函数如下:

protected AbstractChannel(Channel parent) {

        this.parent = parent; //父通道

        id = newId();

        unsafe = newUnsafe(); //新建一个底层的NIO 通道,完成实际的IO操作

        pipeline = newChannelPipeline(); //新建一条通道流水线

}

AbstractChannel内部有一个pipeline属性,表示处理器的流水线。Netty在对通道进行初始化的时候,将pipeline属性初始化为DefaultChannelPipeline的实例。以上代码表明每个通道都拥有一条ChannelPipeline处理器流水线。

AbstractChannel内部有一个parent父通道属性,保持通道的父通道。对于连接监听通道(如NioServerSocketChannel)来说,其parent属性的值为null;对于传输通道(如NioSocketChannel)来说,其parent属性的值为接收到该连接的监听通道。

几乎所有的Netty通道实现类都继承了AbstractChannel抽象类,都拥有上面的parentpipeline两个属性成员。

接下来,介绍一下通道接口中所定义的几个重要方法。

1ChannelFuture connect(SocketAddress address)

此方法的作用为连接远程服务器。方法的参数为远程服务器的地址,调用后会立即返回,其返回值为执行连接操作的异步任务ChannelFuture。此方法在客户端的传输通道使用。

2ChannelFuture bind(SocketAddress address)

此方法的作用为绑定监听地址,开始监听新的客户端连接。此方法在服务器的新连接监听和接收通道时调用。

3ChannelFuture close()

此方法的作用为关闭通道连接,返回连接关闭的ChannelFuture异步任务。如果需要在连接正式关闭后执行其他操作,则需要为异步任务设置回调方法;或者调用ChannelFuture异步任务的sync()方法来阻塞当前线程,一直等到通道关闭的异步任务执行完毕。

4Channel read()

此方法的作用为读取通道数据,并且启动入站处理。具体来说,从内部的Java NIO Channel通道读取数据,然后启动内部的Pipeline流水线,开启数据读取的入站处理。此方法的返回通道自身用于链式调用。

5ChannelFuture writeObject o

此方法的作用为启程出站流水处理,把处理后的最终数据写到底层通道(如Java NIO通道)。此方法的返回值为出站处理的异步处理任务。

6Channel flush()

此方法的作用为将缓冲区中的数据立即写出到对端。调用前面的write()出站处理时,并不能将数据直接写出到对端,write操作的作用在大部分情况下仅仅是写入操作系统的缓冲区,操作系统会根据缓冲区的情况决定什么时候把数据写到对端。执行flush()方法会立即将缓冲区的数据写到对端。

上面的6种方法仅仅是比较常见的通道方法。在Channel接口中以及各种通道的实现类中还定义了大量的通道操作方法。在一般的日常开发中,如果需要用到,请直接查阅Netty API文档或者Netty源代码。

10.4.2 EmbeddedChannel

Netty的实际开发中,底层通信传输的基础工作Netty已经替大家完成。实际上,更多的工作是设计和开发ChannelHandler业务处理器。处理器开发完成后,需要投入单元测试。一般单元测试的大致流程是:先将Handler业务处理器加入到通道的Pipeline流水线中,接下来先后启动Netty服务器、客户端程序,相互发送消息,测试业务处理器的效果。这些复杂的工序存在一个问题:如果每开发一个业务处理器都进行服务器和客户端的重复启动,那么整个的过程是非常烦琐和浪费时间的。如何解决这种徒劳、低效的重复工作呢?Netty提供了一个专用通道,即EmbeddedChannel(嵌入式通道)。

EmbeddedChannel仅仅是模拟入站与出站的操作,底层不进行实际传输,不需要启动Netty服务器和客户端。除了不进行传输之外,EmbeddedChannel的其他事件机制和处理流程和真正的传输通道是一模一样的。因此,使用EmbeddedChannel,开发人员可以在单元测试用例中方便、快速地进行ChannelHandler业务处理器的单元测试。

为了模拟数据的发送和接收,EmbeddedChannel提供了一组专门的方法,具体如表10-1所示。

10-1 EmbeddedChannel单元测试的辅助方法

最为重要的两个方法为writeInbound()writeOutbound()方法。

1writeInbound()

它的使用场景是测试入站处理器。在测试入站处理器时(例如测试一个解码器),需要读取入站(Inbound)数据。可以调用writeInbound()方法,向EmbeddedChannel写入一个入站数据(如二进制ByteBuf数据包),模拟底层的入站包,从而被入站处理器处理到,达到测试的目的。

2writeOutbound()

它的使用场景是测试出站处理器。在测试出站处理器时(例如测试一个编码器),需要有出站(Outbound)的数据进入流水线。可以调用writeOutbound()方法,向模拟通道写入一个出站数据(如二进制ByteBuf数据包),该包将进入处理器流水线,被待测试的出站处理器所处理。

总之,EmbeddedChannel类既拥有通道的通用接口和方法,又增加了一些单元测试的辅助方法,在开发时是非常有用的。有关它的具体用法,后面还会结合其他Netty组件的实例反复提到。

10.5 详解Handler

Reactor经典模型中,反应器查询到IO事件后会分发到Handler业务处理器,由Handler完成IO操作和业务处理。

整个IO处理操作环节大致包括从通道读数据包、数据包解码、业务处理、目标数据编码、把数据包写到通道,然后由通道发送到对端,如图10-9所示。

10-9 整个的IO处理操作环节

整个的IO处理操作环节的前后两个环节(包括从通道读数据包和由通道发送到对端),由Netty的底层负责完成,不需要用户程序负责。

用户程序主要涉及的Handler环节为数据包解码、业务处理、目标数据编码、把数据包写到通道中。

前面已经介绍过,从应用程序开发人员的角度来看有入站和出站两种类型操作。

  1. 入站处理触发的方向为自底向上,从Netty的内部(如通道)到ChannelInboundHandler入站处理器。
  2. 出站处理触发的方向为自顶向下,从ChannelOutboundHandler出站处理器到Netty的内部(如通道)。

按照这种触发方向来区分,IO处理操作环节前面的数据包解码、业务处理两个环节属于入站处理器的工作;后面目标数据编码、把数据包写到通道中两个环节属于出站处理器的工作。

10.10.1 ChannelInboundHandler入站处理器

当对端数据入站到Netty通道时,Netty将触发ChannelInboundHandler入站处理器所对应的入站API,进行入站操作处理。ChannelInboundHandler的主要操作如图10-10所示。

10-10 ChannelInboundHandler的主要操作

对于ChannelInboundHandler的核心方法,大致介绍如下:

1. channelRegistered()

当通道注册完成后,Netty会调用fireChannelRegistered()方法,触发通道注册事件,而在通道流水线注册过的入站处理器的channelRegistered()回调方法会被调用。

2. channelActive()

当通道激活完成后,Netty会调用fireChannelActive()方法,触发通道激活事件,而在通道流水线注册过的入站处理器的channelActive()回调方法会被调用。

3. channelRead()

当通道缓冲区可读时,Netty会调用fireChannelRead()方法,触发通道可读事件,而在通道流水线注册过的入站处理器的channelRead()回调方法会被调用,以便完成入站数据的读取和处理。

4. channelReadComplete()

当通道缓冲区读完时,Netty会调用fireChannelReadComplete()方法,触发通道缓冲区读完事件,而在通道流水线注册过的入站处理器的channelReadComplete()回调方法会被调用。

10. channelInactive()

当连接被断开或者不可用时,Netty会调用fireChannelInactive()方法,触发连接不可用事件,而在通道流水线注册过的入站处理器的channelInactive()回调方法会被调用。

6. exceptionCaught()

当通道处理过程发生异常时,Netty会调用fireExceptionCaught()方法,触发异常捕获事件,而在通道流水线注册过的入站处理器的exceptionCaught()方法会被调用。注意,这个方法是在ChannelHandler中定义的方法,入站处理器、出站处理器接口都继承了该方法。

上面介绍的并不是ChannelInboundHandler的全部方法,仅仅介绍了其中几种比较重要的方法。在Netty中,入站处理器的默认实现为ChannelInboundHandlerAdapter,在实际开发中只需要继承ChannelInboundHandlerAdapter默认实现,重写自己需要的回调方法即可。

10.10.2 ChannelOutboundHandler出站处理器

当业务处理完成后,需要操作Java NIO底层通道时,通过一系列的ChannelOutboundHandler出站处理器完成Netty通道到底层通道的操作,比如建立底层连接、断开底层连接、写入底层Java NIO通道等。ChannelOutboundHandler接口定义了大部分的出站操作,如图10-11所示。

10-11 ChannelOutboundHandler的主要操作

再强调一下,Netty出站处理的方向是通过上层Netty通道去操作底层Java IO通道,主要出站(Outbound)的操作如下:

  1. bind()

监听地址(IP+端口)绑定:完成底层Java IO通道的IP地址绑定。如果使用TCP传输协议,这个方法用于服务端。

  1. connect()

连接服务端:完成底层Java IO通道的服务端的连接操作。如果使用TCP传输协议,那么这个方法将用于客户端。

  1. write()

写数据到底层:完成Netty通道向底层Java IO通道的数据写入操作。此方法仅仅是触发一下操作,并不是完成实际的数据写入操作。

  1. flush()

将底层缓存区的数据腾空,立即写出到对端。

  1. read ()

从底层读数据:完成Netty通道从Java IO通道的数据读取。

  1. disConnect()

断开服务器连接:断开底层Java IO通道的socket连接。如果使用TCP传输协议,此方法主要用于客户端。

  1. close()

主动关闭通道:关闭底层的通道,例如服务端的新连接监听通道。

上面介绍的并不是ChannelOutboundHandler的全部方法,仅仅介绍了其中几个比较重要的方法。在Netty中,它的默认实现为ChannelOutboundHandlerAdapter。在实际开发中,只需要继承ChannelOutboundHandlerAdapter默认实现,重写自己需要的方法即可。

10.10.3 ChannelInitializer通道初始化处理器

在前面已经讲到,ChannelHandler业务处理器的关系是:一条Netty的通道拥有一条Handler业务处理器流水线,负责装配自己的Handler业务处理器。装配Handler的工作发生在通道开始工作之前。现在的问题是:如果向流水线中装配业务处理器呢?这就得借助通道的初始化处理器——ChannelInitializer

首先回顾一下NettyDiscardServer丢弃服务端的代码,在给接收到的新连接装配Handler业务处理器时,调用childHandler()方法设置了一个ChannelInitializer实例:

//step5:装配子通道流水线

b.childHandler(new ChannelInitializer<SocketChannel>() {

    //有连接到达时会创建一个通道的子通道,并初始化

    protected void initChannel(SocketChannel ch) {

         //这里可以管理子通道中的Handler业务处理器

         //向子通道流水线添加一个Handler业务处理器

         ch.pipeline().addLast(new NettyDiscardHandler());

    }

});

上面的ChannelInitializer也是通道初始化器,属于入站处理器的类型。在示例代码中,使用了ChannelInitializerinitChannel()方法。initChannel()方法是ChannelInitializer定义的一个抽象方法,这个抽象方法需要开发人员自己实现。

在通道初始化时,会调用提前注册的初始化处理器的initChannel()方法。比如,在父通道接收到新连接并且要初始化其子通道时,会调用初始化器的initChannel()方法,并且会将新接收的通道作为参数,传递给此方法。

一般来说,initChannel()方法的大致业务代码是:拿到新连接通道作为实际参数,往它的流水线中装配Handler业务处理器。

10.10.4 ChannelInboundHandler的生命周期的实战案例

为了弄清Handler业务处理器的各个方法的执行顺序和生命周期,这里定义一个简单的入站Handler处理器——InHandlerDemo。这个类继承于ChannelInboundHandlerAdapter适配器,实现了基类的大部分入站处理方法,并在每一个方法的实现代码中都加上必要的输出信息,以便于观察方法是否被执行到。

InHandlerDemo的代码如下:

package cn.edu.bbc.computer.handler;

//

public class InHandlerDemo extends ChannelInboundHandlerAdapter {

    @Override

    public void handlerAdded(ChannelHandlerContext ctx){

        Logger.info("被调用:handlerAdded()");

        super.handlerAdded(ctx);

    }

    @Override

    public void channelRegistered(ChannelHandlerContext ctx){

        Logger.info("被调用:channelRegistered()");

        super.channelRegistered(ctx);

    }

    @Override

    public void channelActive(ChannelHandlerContext ctx){

        Logger.info("被调用:channelActive()");

        super.channelActive(ctx);

    }

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

        Logger.info("被调用:channelRead()");

        super.channelRead(ctx, msg);

    }

    @Override

    public void channelReadComplete(ChannelHandlerContext ctx){

        Logger.info("被调用:channelReadComplete()");

        super.channelReadComplete(ctx);

    }

    @Override

    public void channelInactive(ChannelHandlerContext ctx){

        Logger.info("被调用:channelInactive()");

        super.channelInactive(ctx);

    }

    @Override

    public void channelUnregistered(ChannelHandlerContext ctx){

        Logger.info("被调用: channelUnregistered()");

        super.channelUnregistered(ctx);

    }

    @Override

    public void handlerRemoved(ChannelHandlerContext ctx){

        Logger.info("被调用:handlerRemoved()");

        super.handlerRemoved(ctx);

    }

}

为了演示这个入站处理器,需要编写一个单元测试代码:将上面的Inhandler入站处理器加入一个EmbeddedChannel嵌入式通道的流水线中。接着,通过writeInbound()方法写入ByteBuf数据包。InHandlerDemo作为一个入站处理器,会处理到流水线上的入站报文。单元测试的代码如下:

package cn.edu.bbc.computer.handler;

//省略import

public class InHandlerDemoTester {

    @Test

    public void testInHandlerLifeCircle() {

        final InHandler DemoinHandler = new InHandlerDemo();

        //初始化处理器

        ChannelInitializer i =

new ChannelInitializer<EmbeddedChannel>()

{

            protected void initChannel(EmbeddedChannel ch) {

                ch.pipeline().addLast(inHandler);

            }

        };

        //创建嵌入式通道

        EmbeddedChannel channel = new EmbeddedChannel(i);

        ByteBuf buf = Unpooled.buffer();

        buf.writeInt(1);

        //模拟入站,向嵌入式通道写一个入站数据包

        channel.writeInbound(buf);

        channel.flush();

        //模拟入站,再写一个入站数据包

        channel.writeInbound(buf);

        channel.flush();

        //通道关闭

        channel.close();

       //

    }

}

运行上面的测试用例,主要的输出结果具体如下:

[main|handlerAdded]:被调用:handlerAdded()

[main|channelRegistered]:被调用:channelRegistered()

[main|channelActive]:被调用:channelActive()

[main|channelRead]:被调用:channelRead()

[main|channelReadComplete]:被调用:channelReadComplete()

[main|channelRead]:被调用:channelRead()

[main|channelReadComplete]:被调用:channelReadComplete()

[main|channelInactive]:被调用:channelInactive()

[main|channelUnregistered]:被调用: channelUnregistered()

[main|handlerRemoved]:被调用:handlerRemoved()

在讲解上面的方法之前,首先对处理器的方法进行分类:是生命周期方法还是数据入站回调方法。上面的几个方法中,channelReadchannelReadComplete是入站处理方法;而其他的6个方法是入站处理器的周期方法。从输出的结果可以看到,ChannelHandler中回调方法的执行顺序为:

handlerAdded()channelRegistered()channelActive()→数据传输的入站回调→channelInactive()channelUnregistered()handlerRemoved()

其中,数据传输的入站回调过程为:

channelRead()channelReadComplete()

读数据的入站回调过程会根据入站数据的数量被重复调用,每一次有ByteBuf数据包入站都会调用到。

除了两个入站回调方法外,其余的6个方法都和ChannelHandler的生命周期有关,具体的介绍如下:

1handlerAdded():当业务处理器被加入到流水线后,此方法将被回调。也就是在完成ch.pipeline().addLast(handler)语句之后会回调handlerAdded()

2channelRegistered():当通道成功绑定一个NioEventLoop反应器后,此方法将被回调。

3channelActive():当通道激活成功后,此方法将被回调。通道激活成功指的是所有的业务处理器添加、注册的异步任务完成,并且与NioEventLoop反应器绑定的异步任务完成。

4channelInactive():当通道的底层连接已经不是ESTABLISH状态或者底层连接已经关闭时,会首先回调所有业务处理器的channelInactive()方法。

5channelUnregistered():通道和NioEventLoop反应器解除绑定,移除掉对这条通道的事件处理之后,回调所有业务处理器的channelUnregistered ()方法。

6handlerRemoved()Netty会移除掉通道上所有的业务处理器,并且回调所有业务处理器的handlerRemoved()方法。

在上面的6个生命周期方法中,前面3个在通道创建和绑定时被先后回调,后面3个在通道关闭时会先后被回调。

除了生命周期的回调,还有数据传输的入站回调方法。对于Inhandler入站处理器,有两个很重要的回调方法:

1channelRead():有数据包入站,通道可读。流水线会启动入站处理流程,从前向后,入站处理器的channelRead()方法会被依次回调到。

2channelReadComplete():流水线完成入站处理后,会从前向后依次回调每个入站处理器的channelReadComplete()方法,表示数据读取完毕。

至此,大家对ChannelInboundHandler的生命周期和入站业务处理应该有了一个非常清楚的了解。

上面的入站处理器实战案例InHandlerDemo演示的是入站处理器的工作流程。对于出站处理器ChannelOutboundHandler的生命周期以及回调的顺序,与入站处理器的顺序是大致相同的。不同的是,出站处理器的业务处理方法略微不同。在随书源代码工程中,有一个关于出站处理器的实战案例——OutHandlerDemo。它的代码、包名和上面的类似,大家可以自己去运行和学习,这里不再赘述。

10.6 详解Pipeline

前面讲到,一条Netty通道需要很多业务处理器来处理业务。每条通道内部都有一条流水线(Pipeline)将Handler装配起来。Netty的业务处理器流水线ChannelPipeline是基于责任链设计模式(Chain of Responsibility)来设计的,内部是一个双向链表结构,能够支持动态地添加和删除业务处理器。

10.6.1 Pipeline入站处理流程

为了完整地演示Pipeline入站处理流程,将新建三个极为简单的入站处理器:SimpleInHandlerASimpleInHandlerBSimpleInHandlerC。在ChannelInitializer处理器的initChannel方法中,把它们加入到流水线中。添加的顺序为A→B→C。实战的代码如下:

package cn.edu.bbc.computer.pipeline;

//

public class InPipeline {

  //内部类:第一个入站处理器

  static class SimpleInHandlerA extends

 ChannelInboundHandlerAdapter {

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

            Logger.info("入站处理器 A: 被回调 ");

            super.channelRead(ctx, msg);

    }

 }

 //内部类:第二个入站处理器

 static class SimpleInHandlerB extends

ChannelInboundHandlerAdapter {

   @Override

   public void channelRead(ChannelHandlerContext ctx, Object msg){

            Logger.info("入站处理器 B: 被回调 ");

            super.channelRead(ctx, msg);

  }

}

//内部类:第三个入站处理器

static class SimpleInHandlerC extends

ChannelInboundHandlerAdapter {

 @Override

 public void channelRead(ChannelHandlerContext ctx, Object msg){

            Logger.info("入站处理器 C: 被回调 ");

            super.channelRead(ctx, msg);

 }

}

@Test

public void testPipelineInBound() {

        ChannelInitializer i =

new ChannelInitializer<EmbeddedChannel>() {

            protected void initChannel(EmbeddedChannel ch) {

               ch.pipeline().addLast(new SimpleInHandlerA());

               ch.pipeline().addLast(new SimpleInHandlerB());

               ch.pipeline().addLast(new SimpleInHandlerC());

            }

        };

        EmbeddedChannel channel = new EmbeddedChannel(i);

        ByteBuf buf = Unpooled.buffer();

        buf.writeInt(1);

        //向通道写一个入站报文(数据包)

        channel.writeInbound(buf);

        //省略不相关代码

  }

}

在以上三个内部入站处理器的channelRead()方法中,我们打印当前Handler业务处理器的信息,然后调用父类的channelRead()方法,而父类的channelRead()方法的主要作用是把当前入站处理器中处理完毕的结果传递到下一个入站处理器。只是在示例程序中传递的对象都是同一个数据(也就是程序中的msg实例)。

运行实战案例的代码,输出的结果如下:

[main|InPipeline$SimpleInHandlerA:channelRead]:入站处理器 A: 被回调

[main|InPipeline$SimpleInHandlerB:channelRead]:入站处理器 B: 被回调

[main|InPipeline$SimpleInHandlerC:channelRead]:入站处理器 C: 被回调

我们可以看到,入站处理器的流动次序是从前到后,如图10-12所示。

10-12 入站处理器的执行次序

疑问:在入站处理器的channelRead()方法中,如果不调用父类的channelRead()方法,结果会如何呢?大家可以自行尝试。

10.6.2 Pipeline出站处理流程

为了完整地演示Pipeline出站处理流程,将新建三个极为简单的出站处理器:SimpleOutHandlerASimpleOutHandlerBSimpleOutHandlerC。在ChannelInitializer处理器的initChannel()方法中,把它们加入到流水线中,添加的顺序为A→B→C。实战案例的代码如下:

package cn.edu.bbc.computer.pipeline;

//

public class OutPipeline {

    //内部类:第一个出站处理器

    public class SimpleOutHandlerA extends ChannelOutboundHandlerAdapter {

        @Override

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){

            Logger.info("出站处理器 A: 被回调" );

            super.write(ctx, msg, promise);

        }

    }

 

    //内部类:第二个出站处理器

    public class SimpleOutHandlerB extends ChannelOutboundHandlerAdapter {

        @Override

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){

            Logger.info("出站处理器 B: 被回调" );

            super.write(ctx, msg, promise);

        }

    }

    //内部类:第三个出站处理器

    public class SimpleOutHandlerC extends ChannelOutboundHandlerAdapter {

        @Override

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){

            Logger.info("出站处理器 C: 被回调" );

            super.write(ctx, msg, promise);

        }

    }

    @Test

    public void testPipelineOutBound() {

        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

            protected void initChannel(EmbeddedChannel ch) {

                ch.pipeline().addLast(new SimpleOutHandlerA());

                ch.pipeline().addLast(new SimpleOutHandlerB());

                ch.pipeline().addLast(new SimpleOutHandlerC());

            }

        };

        EmbeddedChannel channel = new EmbeddedChannel(i);

        ByteBuf buf = Unpooled.buffer();

        buf.writeInt(1);

        //向通道写入一个出站报文(或数据包)

        channel.writeOutbound(buf);

        //省略不相关代码

    }

}

在以上出站处理器的write()方法中,打印当前Handler业务处理器的信息,然后调用父类的write()方法,而这里父类的write()方法会将出站数据通过通道流水线发送到下一个出站处理器。运行上面的实战案例程序,控制台的输出如下:

[main|OutPipeline$SimpleOutHandlerC:write]:出站处理器 C: 被回调

[main|OutPipeline$SimpleOutHandlerB:write]:出站处理器 B: 被回调

[main|OutPipeline$SimpleOutHandlerA:write]:出站处理器 A: 被回调

在代码中,通过pipeline. addLast()方法添加OutBoundHandler出站处理器的顺序为A→B→C。从结果可以看出,出站流水处理次序为从后向前(C→B→A),最后加入的出站处理器反而执行在最前面如图10-13所示。这一点和Inbound入站处理的次序是恰好相反的。

10-13 出站处理器的执行次序

疑问:在出站处理器的write()方法中,如果不调用父类的write()方法,结果会如何呢?大家可以自行尝试和体验。

10.6.3 ChannelHandlerContext

Netty的设计中Handler是无状态的,不保存和Channel有关的信息。Handler的目标是将自己的处理逻辑做得很通用,可以给不同的Channel使用。与Handler不同的是,Pipeline是有状态的,保存了Channel的关系。于是,HandlerPipeline之间需要一个中间角色将它们联系起来。这个中间角色是谁呢?ChannelHandlerContext(通道处理器上下文)!

不管我们定义的是哪种类型的业务处理器,最终它们都是以双向链表的方式保存在流水线中。这里流水线的节点类型并不是前面的业务处理器基类,而是其包装类型ChannelHandlerContext类。当业务处理器被添加到流水线中时会为其专门创建一个ChannelHandlerContext实例,主要封装了ChannelHandler(通道处理器)和ChannelPipeline(通道流水线)之间的关联关系。所以,流水线ChannelPipeline中的双向链接实质是一个由ChannelHandlerContext组成的双向链表。作为Context的成员,无状态的Handler关联在ChannelHandlerContext中。

ChannelPipeline流水线的示意图大致如图10-14所示。

10-14 ChannelPipeline流水线的示意图

ChannelHandlerContext中包含了许多方法,主要可以分为两类:第一类是获取上下文所关联的Netty组件实例,如所关联的通道、所关联的流水线、上下文内部Handler业务处理器实例等;第二类是入站和出站处理方法。

ChannelChannelPipelineChannelHandlerContext三个类中,都存在同样的出站和入站处理方法,这些出现在不同的类中的相同方法,功能有何不同呢?

如果通过ChannelChannelPipeline的实例来调用这些出站和入站处理方法,它们就会在整条流水线中传播。如果是通过ChannelHandlerContext调用出站和入站处理方法,就只会从当前的节点开始往同类型的下一站处理器传播,而不是在整条流水线从头至尾进行完整的传播。

总结一下ChannelHandlerChannelHandlerContext三者的关系:Channel拥有一条ChannelPipeline,每一个流水线节点为一个ChannelHandlerContext上下文对象,每一个上下文中包裹了一个ChannelHandler。在ChannelHandler的入站/出站处理方法中,Netty会传递一个Context实例作为实际参数。处理器中的回调代码可以通过Context实参,在业务处理过程中去获取ChannelPipeline实例或者Channel实例。

10.6.4 HeadContext与TailContext

通道流水线在没有加入任何处理器之前装配了两个默认的处理器上下文:一个头部上下文HeadContext,一个尾部上下文TailContextpipeline的创建、初始化除了保存一些必要的属性外,核心就在于创建了HeadContext头节点和TailContext尾节点。

每个流水线中双向链表结构从一开始就存在了HeadContextTailContext两个节点,后面添加的处理器上下文节点都添加在HeadContext实例和TailContext实例之间。在添加了一些必要的解码器、业务处理器、编码器之后,一条流水线的结构大致如图10-15所示。

10-15 一条流水线的结构大致示意图

流水线尾部的TailContext不仅仅是一个上下文类,还是一个入站处理器类,实现了所有入站处理回调方法,这些回调实现的主要工作基本上都是有关收尾处理的,如释放缓冲区对象、完成异常处理等。

TailContext是流水线默认实现类DefaultChannelPipeline的一个内部类,代码大致如下:

 //流水线默认实现类(来自Netty4.1.49版本)

public class DefaultChannelPipeline implements ChannelPipeline {

  

  //内部类:尾部处理器和尾部上下文是同一个类

  final class TailContext extends AbstractChannelHandlerContext

                                 implements ChannelInboundHandler {

    //入站处理方法:读取通道

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) {

          //释放缓冲区

    }

  //省略TailContext 其他的入站处理方法

  }

}

流水线头部的HeadContextTailContext复杂得多,既是一个出站处理器,也是一个入站处理器,还保存了一个unsafe(完成实际通道传输的类)实例,也就是HeadContext还需要负责最终的通道传输工作。

HeadContext也是流水线默认实现类DefaultChannelPipeline的一个内部类,代码大致如下:

 //流水线默认实现类(来自Netty4.1.49版本)

public class DefaultChannelPipeline implements ChannelPipeline {

  

//内部类:头部处理器和头部上下文是同一个类

//并且头部处理器既是出站处理器也是入站处理器

final class HeadContext extends AbstractChannelHandlerContext

                implements ChannelOutboundHandler, ChannelInboundHandler {

 

     //传输操作类实例:完成通道最终的输入、输出等操作

     //此类专供Netty内部使用,应用程序不能使用,所以取名unsafe

     private final Unsafe unsafe;

 

     //入站处理举例:入站(从ChannelHandler)读操作

     @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) {

         ctx.fireChannelRead(msg);

    }

 

     //出站处理举例:出站(从HandlerChannel)读取传输数据

 @Override

         public void read(ChannelHandlerContext ctx) {

                unsafe.beginRead();

         }

 

       //出站处理举例:出站(从HandlerChannel)写操作

@Override

        public void write(ChannelHandlerContext ctx,

Object msg, ChannelPromise promise) {

                unsafe.write(msg, promise);

        }

//省略HeadContext其他的处理方法

}

  

}

10.6.5 Pipeline入站和出站的双向链接操作

在理解了HeadContextTailContext两个重要的节点之后,再来梳理一下Pipeline的出站和入站处理流程中的双向链接操作。下面摘取流水线的一个入站(读)操作和一个出站(写)操作,源码大致如下:

final class DefaultChannelPipeline implements ChannelPipeline {

      final AbstractChannelHandlerContext head; //HeadContext

      final AbstractChannelHandlerContext tail; //TailContext

    //出站:启动流水线的出站写

    @Override

    public ChannelFuture write(Object msg) {

        return tail.write(msg); //从后往前传递

    }

 

   //入站:启动流水线的入站读

    @Override

    public ChannelPipeline fireChannelRead(Object msg) {

        head.fireChannelRead(msg); //从头往后传递

        return this;

    }

}

完整的出站和入站处理流转过程都是通过调用流水线实例的相应出/入站方法开启的。先看看入站处理的流转过程,以流水线的入站读的启动过程为例,从以上源码可以看出,流水线的入站流程是从fireXXX()方法开始的(XXX表示具体入站操作,入站读的操作为ChannelRead)。在fireChannelRead的源码中,从流水线的头节点Head开始,将入站的msg数据沿着流水线上的入站处理器逐个向后传递,如图10-16所示。

10-16 流水线的入站处理流程大致示意图

如果所有的入站处理过程都没有截断流水线的处理,则该入站数据msg(如ByteBuffer缓冲区)将一直传递到流水线的末尾,也就是TailContext处理器。

从源码可以看出,流水线的出站流程是从流水线的尾部节点Tail开始的,将出站的msg数据沿着流水线上的出站处理器逐个向前传递,如图10-17所示。

10-17 流水线的出站处理流程大致示意图

出站msg数据在经过所有出站处理器之后,将一直传递到流水线的头部,也就是HeadContext处理器,并且通过unsafe传输实例将二进制数据写入底层传输通道,完成整个传输处理过程。

出站和入站被流水线启动之后,其传播的中间过程具体如何呢?这里需要了解一下流水线链表的节点实现,其默认的实现类为AbstractChannelHandlerContext抽象类,此类也是HeadContextTailContext的父类。pipeline内部的双向链表的指针维护以及节点前驱和后继的计算方法都在这个类中实现。AbstractChannelHandlerContext的核心成员如下:

abstract class AbstractChannelHandlerContext

implements ChannelHandlerContext {

        //双向链表的指针:指向后继

        volatile AbstractChannelHandlerContext next;

     //双向链表的指针:指向前驱

        volatile AbstractChannelHandlerContext prev;

 

        private final boolean inbound;          //标志:是否为入站节点

        private final boolean outbound;         //标志:是否为出站节点

        private final AbstractChannel channel; //上下文节点所关联的通道

        private final DefaultChannelPipeline pipeline;  //所属流水线

        private final String name;  //上下文节点名称,可以在加入流水线时指定

        //节点的执行线程,如果没有特别设置,则为通道的IO线程

final EventExecutor executor;

//

}

AbstractChannelHandlerContext的成员属性不止这些,以上成员仅仅是与Pipeline入站和出站的双向链接操作有关的核心成员属性。

Pipeline如何通过上下文实例进行出入站的传播呢?

首先介绍入站操作的传播。以入站读ChannelRead操作为例,下面是fireChannelRead()方法(传播入站读)的源码:

abstract class AbstractChannelHandlerContext

implements ChannelHandlerContext {

//

        @Override

        public ChannelHandlerContext fireChannelRead(final Object msg) {

                if (msg == null) {

                        throw new NullPointerException("msg");

                }

                //在双向链表中向后查找,找到下一个入站节点(同类的后继)

                final AbstractChannelHandlerContext next =findContextInbound();

                

                EventExecutor executor = next.executor();//获取后继的处理线程

                if (executor.inEventLoop()) {

                        //如果当前线程为后继的处理线程

                        //执行后继上下文所包装的处理器

                        next.invokeChannelRead(msg);

                } else {

                        //如果当前处理线程不是后继的处理线程,则提交到后继处理线程去排队

                        //保障该节点的处理器被设置的线程调用,避免发生线程安全问题

                        executor.execute(new OneTimeTask() {

                                @Override

                                public void run() {

                                        //提交到后继处理线程

                                        next.invokeChannelRead(msg);

                                }

                        });

                }

                return this;

        }

}

Pipeline的入站和出站的传播方向是相反的,入站是顺着双向链表向后传播,出站是顺着双向链表向前传播。所以,在fireChannelRead()方法中,调用findContextInbound()方法,找到下一个入站节点(后继的入站节点),该方法的源码如下:

//在双向链表中向后查找,找到下一个入站节点

private AbstractChannelHandlerContext findContextInbound() {

        AbstractChannelHandlerContext ctx = this;

        do {

                ctx = ctx.next;  //向后查找,一直到末尾或者找到入站类型节点为止

        } while (!ctx.inbound);

        return ctx;

}

fireChannelRead()方法中通过findContextInbound()方法找到下一棒入站Context之后,准备开始执行下一站所包装的处理器,只不过这里需要确保执行的线程是该Context实例的executor成员线程以保证线程安全。执行下一站的处理器的方法如下:

//执行下一棒入站Context所包装的处理器

private void invokeChannelRead(Object msg) {

        try {

                ((ChannelInboundHandler) handler()).channelRead(this, msg);

        } catch (Throwable t) {

                notifyHandlerException(t);

        }

}

以上为入站处理的传播过程。Pipeline的出站传播除了方向是相反的,其余的地方与入站传播大致相同,其查找一下出站处理的方法之源码如下:

//在双向链表中向前查找,找到前一个出站节点

private AbstractChannelHandlerContext findContextOutbound() {

                AbstractChannelHandlerContext ctx = this;

                do {

            //向前查找,直到头部或者找到一个出站Context为止

                   ctx = ctx.prev;

                } while (!ctx.outbound);

                return ctx;

}

10.6.6 截断流水线的入站处理传播过程

在入站/出站的过程中,如果由于业务条件不满足而需要截断流水线的处理,不让处理传播到下一站,那么该怎么办呢?

这里以channelRead入站读的处理流程为例,看看如何截断入站处理流程。这里采用的办法是在处理器的channelRead()方法中不再调用父处理器的channelRead()入站方法。代码如下:

package cn.edu.bbc.computer.pipeline;

//

public class InPipeline {

     //省略SimpleInHandlerASimpleInHandlerC

 

     //定义 SimpleInHandlerB2,替换掉SimpleInHandlerB

    static class SimpleInHandlerB2 extends

 ChannelInboundHandlerAdapter {

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

            Logger.info("入站处理器 B: 被回调 ");

            //不调用基类的channelRead,终止流水线的执行

            //super.channelRead(ctx, msg);

      }

   }

 

    @Test

    public void testPipelineCutting() {

        ChannelInitializer i =

 new ChannelInitializer<EmbeddedChannel>() {

            protected void initChannel(EmbeddedChannel ch) {

                ch.pipeline().addLast(new SimpleInHandlerA());

                ch.pipeline().addLast(new SimpleInHandlerB2());

                ch.pipeline().addLast(new SimpleInHandlerC());

            }

        };

        EmbeddedChannel channel = new EmbeddedChannel(i);

        ByteBuf buf = Unpooled.buffer();

        buf.writeInt(1);

        //向通道写一个入站报文(或数据包),启动入站处理器流程

        channel.writeInbound(buf);

          //

    }

}

以上代码同样定义了3个业务处理器,只是中间的业务处理器SimpleInHandlerB2没有调用父类的super.channelRead()方法。运行的结果如下:

[T:main|F:channelRead] |>入站处理器 A: 被回调

[T:main|F:channelRead] |>入站处理器 B: 被回调

从运行的结果可以看出,入站处理器C没有执行到,说明处理流水线被成功地截断了,如图10-18所示。

10-18 处理流水线的截断

在以上代码中,通过不调用基类的channelRead()方法截断流水线的执行。在channelRead()方法中,将入站处理结果发送到一站还有一种方法:调用Context上下文的ctx.fireChannelRead(msg)方法。如果要截断流水线的处理,显然不能调用ctx.fireChannelRead(msg)方法。

上面演示的是channelRead读操作入站流程的截断,仅仅是一个示例,如果要截断其他的入站处理的流水线操作(使用Xxx指代),也可以同样处理:

1)不调用supper.channelXxx(ChannelHandlerContext)

2)不调用ctx.fireChannelXxx()

大家在编写入站处理器的代码时一般会继承ChannelInboundHandlerAdapter适配器,而该适配器的默认入站实现主要是进行入站操作的流水线传播,并且是通过上下文Context实例完成的,大致的源码如下:

//入站处理适配器

public class ChannelInboundHandlerAdapter

        extends ChannelHandlerAdapter implements ChannelInboundHandler {

 

//入站方法举例:入站读

  @Override

  public void channelRead(ChannelHandlerContext ctx, Object msg){

        //通过上下文进行入站读操作的流水线传播

        ctx.fireChannelRead(msg);

  }

    //…其他的入站方法的源码类似,故省略

}

至此,入站处理传播流程的截断技巧和背后的原理介绍完了。

流水线的出站处理传播流程如何截断呢?结论是:出站处理流程只要开始执行,就不能被截断,强行截断的话Netty会抛出异常。如果业务条件不满足,可以不启动出站处理。大家可以运行示例工程中的testPipelineOutBoundCutting()测试方法,查看出站处理截断后抛出的异常,这里不再赘述。

10.6.7 在流水线上热插拔Handler

Netty中的处理器流水线是一个双向链表。在程序执行过程中,可以动态进行业务处理器的热插拔:动态地增加、删除流水线上的业务处理器。主要的Handler热拔插方法声明在ChannelPipeline接口中,具体如下:

package io.netty.channel;

//

public interface ChannelPipeline

                 extends Iterable<Entry<String, ChannelHandler>>

{

        //

        //在流水线头部增加一个业务处理器,名字由name指定

        ChannelPipeline addFirst(String name, ChannelHandler handler);

      //在流水线尾部增加一个业务处理器,名字由name指定

        ChannelPipeline addLast(String name, ChannelHandler handler);

      //baseName处理器的前面增加一个业务处理器,名字由name指定

        ChannelPipeline addBefore(String baseName, String name,ChannelHandler handler);      

    //baseName处理器的后面增加一个业务处理器,名字由name指定

        ChannelPipeline addAfter(String  baseName, String name,ChannelHandler handler);

      //删除一个业务处理器实例

        ChannelPipeline remove(ChannelHandler handler);

      //删除一个处理器实例

        ChannelHandler remove(String handler);

      //删除第一个业务处理器

        ChannelHandler removeFirst();

      //删除最后一个业务处理器

        ChannelHandler removeLast();

        //

}

如果需要动态地增加、删除流水线上的业务处理器,调用以上ChannelPipeline的某个方法即可。下面是一个简单的示例:调用流水线实例的remove(ChannelHandler)方法,从流水线动态地删除一个Handler

package cn.edu.bbc.computer.pipeline;

//

public class PipelineHotOperateTester {

  static class SimpleInHandlerA extends ChannelInboundHandlerAdapter {

    public void channelRead(ChannelHandlerContext ctx, Object msg){

            Logger.info("入站处理器 A: 被回调 ");

            super.channelRead(ctx, msg);

            //从流水线删除当前业务处理器

            ctx.pipeline().remove(this);

   }

 

 }

 //省略SimpleInHandlerBSimpleInHandlerC的定义

 

 //测试业务处理器的热拔插

  @Test

  public void testPipelineHotOperating() {

        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

            protected void initChannel(EmbeddedChannel ch) {

            ch.pipeline().addLast(new SimpleInHandlerA());

            ch.pipeline().addLast(new SimpleInHandlerB());

            ch.pipeline().addLast(new SimpleInHandlerC());

       }

    };

    EmbeddedChannel channel = new EmbeddedChannel(i);

    ByteBuf buf = Unpooled.buffer();

    buf.writeInt(1);

    //第一次向通道写入站报文(或数据包)

    channel.writeInbound(buf);

    //第二次向通道写入站报文(或数据包)

    channel.writeInbound(buf);

    //第三次向通道写入站报文(或数据包)

    channel.writeInbound(buf);

//省略其他代码

}

运行示例代码,结果节选如下:

[A|F:channelRead] |>入站处理器 A: 被回调

[B|F:channelRead] |>入站处理器 B: 被回调

[C|F:channelRead] |>入站处理器 C: 被回调

[B|F:channelRead] |>入站处理器 B: 被回调

[C|F:channelRead] |>入站处理器 C: 被回调

[B|F:channelRead] |>入站处理器 B: 被回调

[C|F:channelRead] |>入站处理器 C: 被回调

从运行结果中可以看出,在SimpleInHandlerA从流水线中删除后,在后面的入站流水处理中(第二次和第三次入站处理流程),SimpleInHandlerA已经不再被调用了。

这里为大家分析一下通道初始化处理器ChannelInitializer没有被重复调用的原因。通过翻看源码可以知道,在注册完成channelRegistered回调方法中调用ctx.pipeline().remove(this)将自己从流水线中删除了,所以该处理器仅仅被执行了一次。有关ChannelInitializer的源代码,节选如下:

package io.netty.channel;

//省略不相关代码

public abstract class ChannelInitializer extends

 ChannelInboundHandlerAdapter {

        //

        //通道初始化,抽象方法,需要子类实现

        protected abstract void initChannel(Channel var1) throws Exception;

       //回调方法:加入通道(注册完成)后触发

        public final void channelRegistered(ChannelHandlerContext ctx){

                //调用通道初始化实现

                this.initChannel(ctx.channel());

                //删除通道初始化处理器

                ctx.pipeline().remove(this);

                //发送注册消息到下一站

                ctx.fireChannelRegistered();

        }

    //

}

ChannelInitializer在完成了通道的初始化之后,为什么要将自己从流水线中删除呢?原因很简单,就是一条通道流水线只需要做一次装配工作。

10.7 详解ByteBuf

Netty提供了ByteBuf缓冲区组件来替代Java NIOByteBuffer缓冲区组件,以便更加快捷和高效地操纵内存缓冲区。

10.7.1 ByteBuf的优势

Java NIOByteBuffer相比,ByteBuf的优势如下:

  1. Pooling(池化),减少了内存拷贝和GC,提升了效率。
  2. 复合缓冲区类型,支持零拷贝。
  3. 不需要调用flip()方法去切换读/写模式。
  4. 可扩展性好。
  5. 可以自定义缓冲区类型。
  6. 读取和写入索引分开。
  7. 方法的链式调用。
  8. 可以进行引用计数,方便重复使用。

10.7.2 ByteBuf的组成部分

ByteBuf是一个字节容器,内部是一个字节数组。从逻辑上来分,字节容器内部可以分为四个部分,具体如图10-19所示。

10-19 ByteBuf的内部字节数组

第一部分是已用字节,表示已经使用完的废弃的无效字节;第二部分是可读字节,这部分数据是ByteBuf保存的有效数据,从ByteBuf中读取的数据都来自这一部分;第三部分是可写字节,写入ByteBuf的数据都会写到这一部分中;第四部分是可扩容字节,表示的是该ByteBuf最多还能扩容的大小。

10.7.3 ByteBuf的重要属性

ByteBuf通过三个整数类型的属性有效地区分可读数据和可写数据的索引,使得读写之间相互没有冲突。这三个属性定义在AbstractByteBuf抽象类中,分别是:

  1. readerIndex(读指针):指示读取的起始位置。每读取一个字节,readerIndex自动增加1。一旦readerIndexwriterIndex相等,则表示ByteBuf不可读了。
  2. writerIndex(写指针):指示写入的起始位置。每写一个字节,writerIndex自动增加1。一旦增加到writerIndexcapacity()容量相等,则表示ByteBuf不可写了。注意,capacity()是一个成员方法,不是一个成员属性,表示ByteBuf中可以写入的容量,而且它的值不一定是最大容量值。
  3. maxCapacity(最大容量):表示ByteBuf可以扩容的最大容量。当向ByteBuf写数据的时候,如果容量不足,可以进行扩容。扩容的最大限度由maxCapacity来设定,超过maxCapacity就会报错。

ByteBuf的这三个重要属性的含义如图10-20所示。

10-20 ByteBuf内部的三个重要属性的含义

10.7.4 ByteBuf的方法

ByteBuf的方法大致可以分为三组。

第一组:容量系列

  1. capacity():表示ByteBuf的容量,是废弃的字节数、可读字节数和可写字节数之和。
  2. maxCapacity():表示ByteBuf能够容纳的最大字节数。当向ByteBuf中写数据的时候,如果发现容量不足,则进行扩容,直至扩容到maxCapacity设定的上限。

第二组:写入系列

  1. isWritable():表示ByteBuf是否可写。如果capacity()容量大于writerIndex指针的位置,则表示可写,否则为不可写。注意:isWritable()返回false并不代表不能再往ByteBuf中写数据了。如果Netty发现往ByteBuf中写数据写不进去,就会自动扩容ByteBuf
  2. writableBytes():取得可写入的字节数,它的值等于容量capacity()减去writerIndex
  3. maxWritableBytes():取得最大的可写字节数,它的值等于最大容量maxCapacity减去writerIndex
  4. writeBytes(byte[] src):把入参src字节数组中的数据全部写到ByteBuf。这是最为常用的一个方法。
  5. writeTYPE(TYPE value):写入基础数据类型的数据。TYPE表示基础数据类型,这里包含了八种大基础数据类型:writeByte()writeBoolean()writeChar()writeShort()writeInt()writeLong()writeFloat()writeDouble()
  6. setTYPE(TYPE value):基础数据类型的设置,不改变writerIndex指针值。TYPE表示基础数据类型这里包含了八大基础数据类型的设置,即setByte()setBoolean()setChar()setShort()setInt()setLong()setFloat()setDouble()setTYPE系列与writeTYPE系列的不同点是setTYPE系列不改变写指针writerIndex的值,writeTYPE系列会改变写指针writerIndex的值。
  7. markWriterIndex()resetWriterIndex():前一个方法表示把当前的写指针writerIndex属性的值保存在markedWriterIndex标记属性中;后一个方法表示把之前保存的markedWriterIndex的值恢复到写指针writerIndex属性中。这两个方法都用到了标记属性markedWriterIndex,相当于一个写指针的暂存属性。

第三组:读取系列

  1. isReadable():返回ByteBuf是否可读。如果writerIndex指针的值大于readerIndex指针的值,则表示可读,否则为不可读。
  2. readableBytes():返回表示ByteBuf当前可读取的字节数,它的值等于writerIndex减去readerIndex
  3. readBytes(byte[] dst):将数据从ByteBuf读取到dst目标字节数组中,这里dst字节数组的大小通常等于readableBytes()可读字节数。这个方法也是最为常用的方法之一。
  4. readTYPE():读取基础数据类型。可以读取八大基础数据类型:readByte()readBoolean()readChar()readShort()readInt()readLong()readFloat()readDouble()
  5. getTYPE():读取基础数据类型,并且不改变readerIndex读指针的值,具体为getByte()getBoolean()getChar()getShort()getInt()getLong()getFloat()getDouble()getTYPE系列与readTYPE系列的不同点是getTYPE系列不会改变读指针readerIndex的值,readTYPE系列会改变读指针readerIndex的值。
  6. markReaderIndex()resetReaderIndex():前一种方法表示把当前的读指针readerIndex保存在markedReaderIndex属性中;后一种方法表示把保存在markedReaderIndex属性的值恢复到读指针readerIndex中。markedReaderIndex属性定义在AbstractByteBuf抽象基类中,是一个标记属性,相当于一个读指针的暂存属性。

10.7.5 ByteBuf基本使用的实战案例

ByteBuf的基本使用分为三部分:

1)分配一个ByteBuf实例。

2)向ByteBuf写数据。

3)从ByteBuf读数据。

这里使用默认的分配器分配了一个初始容量为9、最大限制为100个字节的缓冲区。关于ByteBuf实例的分配器,后面章节会详细介绍。

实战代码很简单,具体如下:

package cn.edu.bbc.computer.bytebuf;

//

public class WriteReadTest {

    @Test

    public void testWriteRead() {

        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);

        print("动作:分配ByteBuf(9, 100)", buffer);

        buffer.writeBytes(new byte[]{1, 2, 3, 4});

        print("动作:写入4个字节 (1,2,3,4)", buffer);

        Logger.info("start==========:get==========");

        getByteBuf(buffer);

        print("动作:取数据ByteBuf", buffer);

        Logger.info("start==========:read==========");

        readByteBuf(buffer);

        print("动作:读完ByteBuf", buffer);

    }

    //取字节

    private void readByteBuf(ByteBuf buffer) {

        while (buffer.isReadable()) {

            Logger.info("取一个字节:" + buffer.readByte());

        }

    }

    //读字节,不改变指针

    private void getByteBuf(ByteBuf buffer) {

        for (int i = 0; i<buffer.readableBytes(); i++) {

            Logger.info("读一个字节:" + buffer.getByte(i));

        }

    }

}

有关运行的结果,节选如下:

[main|:print]after =======动作:分配ByteBuf(9, 100)============

[main|:print]1.0 isReadable(): false

[main|:print]1.1 readerIndex(): 0

[main|:print]1.2 readableBytes(): 0

[main|:print]2.0 isWritable(): true

[main|:print]2.1 writerIndex(): 0

[main|:print]2.2 writableBytes(): 9

[main|:print]3.0 capacity(): 9

[main|:print]3.1 maxCapacity(): 100

[main|:print]3.2 maxWritableBytes(): 100

//

[main|:print]after ========动作:写入4个字节 (1,2,3,4)===========

[main|:print]1.0 isReadable(): true

[main|:print]1.1 readerIndex(): 0

[main|:print]1.2 readableBytes(): 4

[main|:print]2.0 isWritable(): true

[main|:print]2.1 writerIndex(): 4

[main|:print]2.2 writableBytes(): 5

[main|:print]3.0 capacity(): 9

[main|:print]3.1 maxCapacity(): 100

[main|:print]3.2 maxWritableBytes(): 96

//

[main|:print]after =========动作:取数据ByteBuf============

[main|:print]1.0 isReadable(): true

[main|:print]1.1 readerIndex(): 0

[main|:print]1.2 readableBytes(): 4

[main|:print]2.0 isWritable(): true

[main|:print]2.1 writerIndex(): 4

[main|:print]2.2 writableBytes(): 5

[main|:print]3.0 capacity(): 9

[main|:print]3.1 maxCapacity(): 100

[main|:print]3.2 maxWritableBytes(): 96

//

[main|:print]after =========动作:读完ByteBuf============

[main|:print]1.0 isReadable(): false

[main|:print]1.1 readerIndex(): 4

[main|:print]1.2 readableBytes(): 0

[main|:print]2.0 isWritable(): true

[main|:print]2.1 writerIndex(): 4

[main|:print]2.2 writableBytes(): 5

[main|:print]3.0 capacity(): 9

[main|:print]3.1 maxCapacity(): 100

[main|:print]3.2 maxWritableBytes(): 96

可以看到,使用get取数据是不会影响ByteBuf指针属性值的。由于篇幅原因,这里不仅省略了很多输出结果,还省略了print()方法的源代码,它的作用是打印ByteBuf的属性值。建议打开源代码工程,查看和运行本案例的代码。

10.7.6 ByteBuf的引用计数

JVM中使用计数器(一种GC算法)来标记对象是否不可达进而收回,Netty也使用了这种手段来对ByteBuf的引用进行计数。(注:GCGarbage Collection的缩写,即Java中的垃圾回收机制。)NettyByteBuf的内存回收工作是通过引用计数方式管理的。

Netty之所以采用计数器来追踪ByteBuf的生命周期,一是能对Pooled ByteBuf进行支持,二是能够尽快发现那些可以回收的ByteBuf(非Pooled),以便提升ByteBuf的分配和销毁的效率。

说明

什么是池化(Pooled)的ByteBuf缓冲区呢?从Netty 4版本开始,新增了ByteBuf的池化机制,即创建一个缓冲区对象池,将没有被引用的ByteBuf对象放入对象缓存池中,需要时重新从对象缓存池中取出,而不需要重新创建。

在通信程序的数据传输过程中,Buffer缓冲区实例会被频繁创建、使用、释放,从而频繁创建对象、内存分配、释放内存,这样会导致系统的开销大、性能低。如何提升性能、提高Buffer实例的使用率呢?池化ByteBuf是一种非常有效的方式。

ByteBuf引用计数的大致规则如下:在默认情况下,当创建完一个ByteBuf时,引用计数为1;每次调用retain()方法,引用计数加1;每次调用release()方法,引用计数减1;如果引用为0,再次访问这个ByteBuf对象,将会抛出异常;如果引用为0,表示这个ByteBuf没有哪个进程引用,它占用的内存需要回收。

在下面的例子中,多次调用了ByteBufretain()release()方法,运行后可以看效果:

package cn.edu.bbc.computer.bytebuf;

//

public class ReferenceTest {

    @Test

    public  voidtestRef()

    {

        ByteBuf buffer  =ByteBufAllocator.DEFAULT.buffer();

        Logger.info("after create:"+buffer.refCnt());

 

        buffer.retain();        //增加一次引用计数

        Logger.info("after retain:"+buffer.refCnt());

 

        buffer.release();       //减少一次引用计数

        Logger.info("after release:"+buffer.refCnt());

 

        buffer.release();       //减少一次引用计数

        Logger.info("after release:"+buffer.refCnt());

 

        //错误:refCnt: 0,不能再retain

        buffer.retain();        //增加一次引用计数

        Logger.info("after retain:"+buffer.refCnt());

    }

}

运行程序,结果如下:

[main|ReferenceTest.testRef] |>  after create:1

[main|ReferenceTest.testRef] |>  after retain:2

[main|ReferenceTest.testRef] |>  after release:1

[main|ReferenceTest.testRef] |>  after release:0

…(省略不相关的输出)

io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1

…(省略异常信息)

运行后我们会发现:最后一次retain()方法抛出了IllegalReferenceCountException异常。原因是:在此之前,缓冲区buffer的引用计数已经为0,不能再retain了。也就是说:在Netty中,引用计数为0的缓冲区不能再继续使用。

为了确保引用计数不会混乱,在Netty的业务处理器开发过程中应该坚持一个原则:retain()release()方法应该结对使用。对缓冲区调用了一次retain(),就应该调用一次release()。大致的参考代码如下:

public void handlMethodA(ByteBuf byteBuf) {

        byteBuf.retain();

        try {

                handlMethodB(byteBuf);

        } finally {

                byteBuf.release();

        }

}

如果retain()release()这两个方法一次都不调用呢?Netty在缓冲区使用完成后会调用一次release(),就是释放一次。例如,在Netty流水线上,中间所有的业务处理器处理完ByteBuf之后会直接传递给下一个,由最后一个Handler负责调用其release()方法来释放缓冲区的内存空间。

ByteBuf的引用计数已经为0时,Netty会进行ByteBuf的回收,分为以下两种场景:

1)如果属于池化的ByteBuf内存,回收方法是:放入可以重新分配的ByteBuf池,等待下一次分配。

2)如果属于未池化的ByteBuf缓冲区,需要细分为两种情况:如果是堆(Heap)结构缓冲,会被JVM的垃圾回收机制回收;如果是直接(Direct)内存类型,则会调用本地方法释放外部内存(unsafe.freeMemory)。

除了通过ByteBuf成员方法retain()release()管理引用计数之外,Netty还提供了一组用于增加和减少引用计数的通用静态方法:

1ReferenceCountUtil.retain(Object):增加一次缓冲区引用计数的静态方法,从而防止该缓冲区被释放。

2ReferenceCountUtil.release(Object):减少一次缓冲区引用计数的静态方法,如果引用计数为0,缓冲区将被释放。

10.7.7 ByteBuf的分配器

Netty通过ByteBufAllocator分配器来创建缓冲区和分配内存空间。Netty提供了两种分配器实现:PoolByteBufAllocatorUnpooledByteBufAllocator

PoolByteBufAllocator(池化的ByteBuf分配器)将ByteBuf实例放入池中,提高了性能,将内存碎片减少到最小;池化分配器采用了jemalloc高效内存分配的策略,该策略被好几种现代操作系统所采用。

UnpooledByteBufAllocator是普通的未池化ByteBuf分配器,没有把ByteBuf放入池中,每次被调用时,返回一个新的ByteBuf实例;使用完之后,通过Java的垃圾回收机制回收或者直接释放(对于直接内存而言)。

为了验证两者的性能,大家可以做一下对比试验:

1)使用UnpooledByteBufAllocator方式分配ByteBuf缓冲区,开启10000个长连接,每秒所有的连接发一条消息,再看看服务器的内存使用量情况。

实验的参考结果:在较短时间内,就可以看到程序占到10GB多的内存空间,随着系统的运行,内存空间会不断增长,直到整个系统内存被占满而导致内存溢出,最终宕机。

2)把UnpooledByteBufAllocator换成PooledByteBufAllocator,再进行试验,看看服务器的内存使用量情况。

实验的参考结果:内存使用量基本能维持在一个连接占用1MB左右的内存空间,内存使用量保持在10GB左右,经过长时间的运行测试,我们会发现内存使用量能维持在这个数量附近,系统不会因为内存被耗尽而崩溃。

Netty中,默认的分配器为ByteBufAllocator.DEFAULT。该默认的分配器可以通过系统参数(System Property)选项io.netty.allocator.type进行配置,配置时使用字符串值:"unpooled""pooled"

不同的Netty版本,对于分配器的默认使用策略是不一样的。在Netty 4.0版本中,默认的分配器为UnpooledByteBufAllocator(非池化内存分配器)。在Netty 4.1版本中,默认的分配器为PooledByteBufAllocator(池化内存分配器),初始化代码在ByteBufUtil类中的静态代码中,具体如下:

 public final class ByteBufUtil {

     

    static {

                //Android系统默认为unpooled,其他系统默认为pooled

          //除非通过系统属性io.netty.allocator.type 做专门配置

                String allocType = SystemPropertyUtil.get(

                        "io.netty.allocator.type",

                        PlatformDependent.isAndroid() ? "unpooled" : "pooled");

                ByteBufAllocator alloc;

                if ("unpooled".equals(allocType)) {

                        alloc = UnpooledByteBufAllocator.DEFAULT;

                        

                } else if ("pooled".equals(allocType)) {

                        alloc = PooledByteBufAllocator.DEFAULT;

                        

                } else {

                        alloc = PooledByteBufAllocator.DEFAULT;

                        

                }

                DEFAULT_ALLOCATOR = alloc;

                

        }

}

现在PooledByteBufAllocator已经广泛使用了一段时间,并且有了增强的缓冲区泄漏追踪机制。因此,也可以在Netty程序中设置引导类Bootstrap装配的时候将PooledByteBufAllocator设置为默认的分配器。

ServerBootstrap b = new ServerBootstrap()

//设置通道的参数

b.option(ChannelOption.SO_KEEPALIVE, true);

//设置父通道的缓冲区分配器

b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

//设置子通道的缓冲区分配器

b.childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT);

Netty的内存管理策略可以灵活调整,这是使用Netty所带来的又一个好处:只需一行简单的配置就能获得到池化缓冲区带来的好处。在底层,Netty为我们干了所有脏活、累活

使用缓冲区分配器创建ByteBuf的方法有多种,下面列出几种主要的:

package cn.edu.bbc.computer.bytebuf;

//

public class AllocatorTest {

    @Test

    public void showAlloc() {

        ByteBuf buffer = null;

 

     //方法1:通过默认分配器分配

     //初始容量为9、最大容量为100的缓冲区

        buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);

     //方法2:通过默认分配器分配

     //初始容量为256、最大容量为Integer.MAX_VALUE的缓冲区

        buffer = ByteBufAllocator.DEFAULT.buffer();

     //方法3:非池化分配器,分配Java的堆(Heap)结构内存缓冲区

        buffer = UnpooledByteBufAllocator.DEFAULT.heapBuffer();

     //方法4:池化分配器,分配由操作系统管理的直接内存缓冲区

        buffer = PooledByteBufAllocator.DEFAULT.directBuffer();

        //其他方法

    }

}

Netty中缓冲区分配的方法很多,可以根据实际需要进行选择。

10.7.8 ByteBuf缓冲区的类型

介绍完了分配器的类型,再来说一下缓冲区的类型(见表10-2)。根据内存的管理方不同,缓冲区分为堆缓冲区和直接缓冲区,也就是Heap ByteBufDirect ByteBuf。另外,为了方便缓冲区进行组合,还提供了一种组合缓存区。

10-2 ByteBuf缓冲区的类型

上面三种缓冲区都可以通过池化(Pooled)、非池化(Unpooled)两种分配器来创建和分配内存空间。

下面介绍一下Direct Memory(直接内存):

  1. Direct Memory不属于Java堆内存,所分配的内存其实是调用操作系统malloc()函数来获得的,由Netty的本地Native堆进行管理。
  2. Direct Memory容量可通过-XX:MaxDirectMemorySize来指定,如果不指定,则默认与Java堆的最大值(-Xmx指定)一样。注意:并不是强制要求,有的JVM默认Direct Memory-Xmx值无直接关系。
  3. Direct Memory的使用避免了Java堆和Native堆之间来回拷贝数据。在某些应用场景中提高了性能。
  4. 在需要频繁创建缓冲区的场合,由于创建和销毁Direct Buffer(直接缓冲区)的代价比较高昂,因此不宜使用Direct Buffer。也就是说,Direct Buffer尽量在池化分配器中分配和回收。如果能将Direct Buffer进行复用,在读写频繁的情况下就可以大幅度改善性能。
  5. Direct Buffer的读写比Heap Buffer快,但是它的创建和销毁比普通Heap Buffer慢。
  6. Java的垃圾回收机制回收Java堆时,Netty框架也会释放不再使用的Direct Buffer缓冲区,因为它的内存为堆外内存,所以清理的工作不会为Java虚拟机(JVM)带来压力。注意一下垃圾回收的应用场景:①垃圾回收仅在Java堆被填满,以至于无法为新的堆分配请求提供服务时发生;②在Java应用程序中调用System.gc()函数来释放内存。

10.7.9 两类ByteBuf使用的实战案例

首先对比介绍一下Heap ByteBufDirect ByteBuf两类缓冲区的使用,它们有以下几点不同:

  1. Heap ByteBuf通过调用分配器的buffer()方法来创建;Direct ByteBuf通过调用分配器的directBuffer()方法来创建。
  2. Heap ByteBuf缓冲区可以直接通过array()方法读取内部数组;Direct ByteBuf缓冲区不能读取内部数组。
  3. 可以调用hasArray()方法来判断是否为Heap ByteBuf类型的缓冲区;如果hasArray()返回值为true,则表示是堆缓冲,否则为直接内存缓冲区。
  4. Direct ByteBuf读取缓冲数据进行Java程序处理时,相对比较麻烦,需要通过getBytes/readBytes等方法先将数据拷贝到Java的堆内存,然后进行其他的计算。

在实战案例中对比Heap ByteBufDirect ByteBuf这两类缓冲区的使用,大致的代码如下:

package cn.edu.bbc.computer.bytebuf;

//

public class BufferTypeTest {

   final static Charset UTF_8 = Charset.forName("UTF-8");

    //堆缓冲区测试用例

    @Test

    public  void testHeapBuffer() {

        //取得堆内存

        ByteBuf heapBuf = ByteBufAllocator.DEFAULT.heapBuffer();

        heapBuf.writeBytes("软件构件技术原理-java版示例源码".getBytes(UTF_8));

        if (heapBuf.hasArray()) {

            //取得内部数组

            byte[] array = heapBuf.array();

            int offset = heapBuf.arrayOffset() +

heapBuf.readerIndex();

            int length = heapBuf.readableBytes();

            Logger.info(new String(array,offset,length, UTF_8));

        }

        heapBuf.release();

    }

 

    //直接缓冲区测试用例

    @Test

    public  void testDirectBuffer() {

       ByteBuf directBuf=  ByteBufAllocator.DEFAULT.directBuffer();

       directBuf.writeBytes("软件构件技术原理-java版示例源码".getBytes(UTF_8));

        if (!directBuf.hasArray()) {

            int length = directBuf.readableBytes();

            byte[] array = new byte[length];

            //把数据读取到堆内存array中,再进行Java处理

            directBuf.getBytes(directBuf.readerIndex(), array);

            Logger.info(new String(array, UTF_8));

        }

        directBuf.release();

    }

}

Direct ByteBufhasArray()会返回false;反过来,如果hasArray()返回false,不一定代表缓冲区一定就是Direct ByteBuf,也有可能是CompositeByteBufCompositeByteBuf缓冲区是Netty为了减少内存拷贝而提供的组合缓冲区,有关其具体的知识请查阅后面的Netty零拷贝章节。

为了快速创建ByteBufferNetty提供了一个非常方便的获取缓冲区的类——Unpooled,用它可以创建和使用非池化的缓冲区。Unpooled的使用也很容易,下面给出三个例子:

//创建堆缓冲区

ByteBuf heapBuf = Unpooled.buffer(8);

//创建直接缓冲区

ByteBuf directBuf = Unpooled.directBuffer(16);

//创建复合缓冲区

CompositeByteBuf compBuf = Unpooled.compositeBuffer();

Unpooled提供了很多方法,主要的方法大致如表10-3所示。

10-3 Unpooled提供的主要方法

除了在Netty开发中使用之外,Unpooled类的应用场景还包括不需要其他Netty组件(除了缓冲区之外)甚至无网络操作的场景,从而使得Java程序可以使用Netty的高性能、可扩展的缓冲区技术。Unpooled类可用于在Netty应用之外的其他程序中独立使用ByteBuf缓冲区。

在处理器的开发过程中(这个为Netty应用开发的主要工作),推荐大家通过调用Context.alloc()方法来获取通道的缓冲区分配器来创建ByteBuf。下面给出一个例子,演示如何通过Context上下文来获取ByteBuf

public class AllocatorTest

{

        

        //辅助的方法:输出ByteBuf是否为直接内存,以及内存分配器

        public static void printByteBuf(String action, ByteBuf b)

        {

                Logger.info(" ===========" + action + "============");

                //true表示缓冲区为Java堆内存(组合缓冲例外)

                //false表示缓冲区为操作系统管理的内存(组合缓冲例外)

                Logger.info("b.hasArray: " + b.hasArray());

                

                //输出内存分配器

                Logger.info("b.ByteBufAllocator: " + b.alloc());

        }

 

        //处理器类:演示使用Context来获取ByteBuf

        static class AllocDemoHandler extends ChannelInboundHandlerAdapter

        {

                @Override

                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

                {

                        printByteBuf("入站的ByteBuf", (ByteBuf) msg);

                        ByteBuf buf = ctx.alloc().buffer();

                        buf.writeInt(100);

                        //向模拟通道写入一个出站包,模拟数据出站,需要刷新通道才能获取到输出

                        ctx.channel().writeAndFlush(buf);

                }

        }

 

        //测试用例入口

        @Test

        public void testByteBufAlloc()

        {

                        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>()

                        {

                                protected void initChannel(EmbeddedChannel ch)

                                {

                                                ch.pipeline().addLast(new AllocDemoHandler());

                                }

                        };

                        EmbeddedChannel channel = new EmbeddedChannel(i);

                        //配置通道的缓冲区分配器,这里设置一个池化的分配器

                        channel.config().setAllocator(PooledByteBufAllocator.DEFAULT);

                        ByteBuf buf = Unpooled.buffer();

                        buf.writeInt(1);

                        //向模拟通道写入一个入站包,模拟数据入站

                        channel.writeInbound(buf);

                        //获取通道的出站包

                        ByteBuf outBuf = (ByteBuf) channel.readOutbound();

                        printByteBuf("出站的ByteBuf", (ByteBuf) outBuf);

                        //省略不相关代码

                }

        }

运行测试用例入口方法testByteBufAlloc(),输出大致如下:

[main]|>  ===========入站的ByteBuf============

[main]|>  b.hasArray: true

[main]|>  b.ByteBufAllocator: UnpooledByteBufAllocator(directByDefault: true)

[main]|>  ===========出站的ByteBuf============

[main]|>  b.hasArray: false

[main]|>  b.ByteBufAllocator: PooledByteBufAllocator(directByDefault: true)

以上代码的AllocDemoHandler处理器调用ctx.alloc().buffer()方法获取ByteBuf,有关ctx.alloc()方法的源码如下:

abstract class AbstractChannelHandlerContext{

        

        //获取通道的缓冲区分配器

        @Override

        public ByteBufAllocator alloc() {

                return channel().config().getAllocator();

        }

}

通过源码可以看出,ctx.alloc()方法所获取的分配器是通道的缓冲区分配器。该分配器可以通过Bootstrap引导类为通道进行配置,也可以直接通过channel.config().setAllocator()为通道设置一个缓冲区分配器。

10.7.10 ByteBuf的自动创建与自动释放

1. ByteBuf的自动创建

首先来看一个问题:在入站处理时,Netty是何时自动创建入站的ByteBuf缓冲区的呢?

查看Netty源代码,我们可以看到,NettyReactor线程会通过底层的Java NIO通道读数据。发生NIO读取的方法为AbstractNioByteChannel.NioByteUnsafe.read(),其代码如下:

public void read() {

        

        //channelconfig信息

        final ChannelConfig config = config();

        //获取通道的缓冲区分配器

        final ByteBufAllocator allocator = config.getAllocator();

        //channelpipeline

        final ChannelPipeline pipeline = pipeline();

        //缓冲区分配时的大小推测与计算组件

        final RecvByteBufAllocator.Handle allocHandle =unsafe().recvBufAllocHandle();

        //输入缓冲变量

        ByteBuf byteBuf = null;

        Throwable exception = null;

        try {

                

                do {

                        

                        //使用缓冲区分配器、大小计算组件一起

                        //由分配器按照计算好的大小分配的一个缓冲区

                      byteBuf = allocHandle.allocate(allocator);

                        

                        //读取数据到缓冲区

                        int localReadAmount = doReadBytes(byteBuf);

                        

                        //发送数据到流水线,进行入站处理

                        pipeline.fireChannelRead(byteBuf);

                        

                }while (++ messages < maxMessagesPerRead);

                

        } catch (Throwable t) {

                handleReadException(pipeline, byteBuf, t, close);

        }

        

}

分配缓冲区的时候,为什么要计算大小呢?从通道里读取数据时是不知道接收到数据的具体大小的,那么申请的缓冲区究竟要多大呢?首先,不能太大,太大了浪费;其次,也不能太小,太小了又不够,就需要进行缓冲区的扩容,会影响性能。所以,需要推测要申请的缓冲区大小。Netty设计了一个RecvByteBufAllocator大小推测接口和一系列的大小推测实现类,以帮助进行缓冲区大小的计算和推测。默认的缓冲区大小推测实现类为AdaptiveRecvByteBufAllocator,其特点是能够根据上一次接收数据的大小来自动调整下一次缓冲区创建时分配的空间大小,从而避免内存浪费。

再来看一个问题:在入站处理完成时,入站的ByteBuf是如何自动释放的呢?

方式一:TailContext自动释放

Netty默认会在ChannelPipline的最后添加一个TailContext(尾部上下文,也是一个入站处理器)。它实现了默认的入站处理方法,在这些方法中会帮助完成ByteBuf内存释放的工作,具体如图10-21所示。

10-21 TailContext帮助释放缓冲区

所以,只要最初的ByteBuf数据包一路向后传递,进入流水线的末端,TailContext(末尾处理器)就会自动释放掉入站的ByteBuf实例。其源码大致如下:

 //流水线实现类

public class DefaultChannelPipeline implements ChannelPipeline {

  //内部类:尾部处理器和尾部上下文是同一个类

  final class TailContext extends AbstractChannelHandlerContext

implements ChannelInboundHandler {

    //入站处理方法:读取通道

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) {

            onUnhandledInboundMessage(ctx, msg);

    }

 }

 //入站消息没有被处理,或者说来到了流水线末尾,释放缓冲区

 protected void onUnhandledInboundMessage(Object msg) {

    try {

        logger.debug();

        } finally {

           //释放缓冲区

            ReferenceCountUtil.release(msg);

       }

 }

}

说明

以上的TailContext源码来自Netty4.1.49版本,其他版本的源码可能会有微小的区别,比如说4.0.33版本的源码就有所不同。虽然代码不同,但是干的活都是类似的,就是需要进行缓冲区的释放。

如何让ByteBuf数据包通过流水线一路向后传递,到达末尾的TailContext呢?如果自定义的InboundHandler(入站处理器)继承自ChannelInboundHandlerAdapter适配器,那么可以在入站处理方法中调用基类的入站处理方法,演示代码如下:

public class DemoHandler extends ChannelInboundHandlerAdapter {

   /**

     * 出站处理方法

     * @param ctx 上下文

     * @param msg 入站数据包

     * @throws Exception 可能抛出的异常

     */

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

        ByteBuf byteBuf = (ByteBuf) msg;

        //省略ByteBuf的业务处理

        //调用父类的入站方法,默认的动作是将msg向下一站传递,一直到末端

        super.channelRead(ctx,msg);

       //方式二:手动释放ByteBuf

       //byteBuf.release();

    }

}

当然,如果没有调用父类的入站处理方法将ByteBuf缓存区向后传递,则需要手动进行释放。

如果Handler业务处理器需要截断流水线的处理流程,不将ByteBuf数据包送入流水线末端的TailContext入站处理器,并且也不愿意手动释放ByteBuf缓冲区实例,那么该怎么办呢?继承SimpleChannelInboundHandler,利用它的自动释放功能来完成。

方式二:SimpleChannelInboundHandler自动释放

以入站读数据为例,Handler业务处理器可以继承自SimpleChannelInboundHandler基类,此时必须将业务处理代码移动到重写的channelRead0(ctx, msg)方法中。

SimpleChannelInboundHandle类的入站处理方法(如channelRead等)会在调用完实际的channelRead0()方法后帮忙释放ByteBuf实例。如果想看看SimpleChannelInboundHandler是如何释放ByteBuf的,那么可以看看Netty源代码。截取的部分代码如下:

public abstract class SimpleChannelInboundHandler<I>

extends ChannelInboundHandlerAdapter

{

    //基类的入站方法

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

        boolean release = true;

        try {

               if (acceptInboundMessage(msg)) {

                    @SuppressWarnings("unchecked")

                    I imsg = (I) msg;

                    //调用实际的业务代码,必须由子类提供实现

                    channelRead0(ctx, imsg);

                } else {

                    release = false;

                    ctx.fireChannelRead(msg);

                  }

         } finally {

                if (autoRelease&& release) {

                    //释放ByteBuf

                    ReferenceCountUtil.release(msg);

                }

          }

     }

    

 }

NettySimpleChannelInboundHandler类的源代码中,执行完子类的channelRead0()业务处理后,在finally语句代码段中ByteBuf被释放了一次,如果ByteBuf计数器为零,就将被彻底释放掉。

2. 出站处理时的自动释放

出站缓冲区的自动释放方式是HeadContext自动释放。出站处理用到的ByteBuf缓冲区一般是要发送的消息,通常是由Handler业务处理器所申请分配的。例如,通过write()方法写入流水线时,调用ctx.writeAndFlush(ByteBuf msg),就会让ByteBuf缓冲区进入流水线的出站处理流程。在每一个出站Handler业务处理器中的处理完成后,数据包(或消息)会来到出站处理的最后一棒HeadContext,在完成数据输出到通道之后,ByteBuf会被释放一次,如果计数器为零,就将被彻底释放掉,如图10-22所示。

10-22 HeadContext头部处理器帮助释放ByteBuffer缓冲区

在出站处理的流水处理过程中,在最终进行写入刷新的时候,HeadContext要通过通道实现类自身实现的doWrite()方法将ByteBuf缓冲区的字节数据发送出去(比如拷贝到内部的Java NIO通道),发送完成后,doWrite()方法就会减少ByteBuf缓冲区的引用计数,代码大致如下:

public abstract class AbstractNioByteChannel

extends AbstractNioChannel {

    //执行二进制字节内容的写入,写入Java NIO通道

    @Override

    protected void doWrite(ChannelOutboundBuffer in) {

        int writeSpinCount = -1;

        boolean setOpWrite = false;

         //死循环:发送缓冲区的数据,直到缓冲区发送完毕

        for (;;) {

            Object msg = in.current();

            

            if (msg instanceof ByteBuf) {

                ByteBuf buf = (ByteBuf) msg;

                int readableBytes = buf.readableBytes();

                                

                  //发送完毕

                if (readableBytes == 0) {

                                

                  //remove()里边包含释放msg的引用减少代码

                  //具体为:ReferenceCountUtil.safeRelease(msg);

                  in.remove();

                  continue;

                }

                  

                  //发送缓冲区的字节数据到Java NIO通道

                int localFlushedAmount = doWriteBytes(buf);

                

            } else if (msg instanceof FileRegion) {

              

            } else {

                //Should not reach here.

                throw new Error();

            }

        }

     

    }

        

    //发送缓冲区的字节数据,将其拷贝到Java NIO通道

    @Override

   protected int doWriteBytes(ByteBuf buf) {

        final int expectedWrittenBytes = buf.readableBytes();

       //拷贝数据到Java NIO通道,相当于发送到Java NIO通道

        return buf.readBytes(javaChannel(), expectedWrittenBytes);

    }

}

}

总之,在Netty应用开发中,必须密切关注ByteBuf缓冲区的释放。如果释放不及时,就会造成Netty的内存泄漏(Memory Leak),最终导致内存耗尽。

10.7.11 ByteBuf浅层拷贝的高级使用方式

首先说明浅层拷贝是一种非常重要的操作,可以很大程度地避免内存拷贝。这一点对于大规模消息通信来说是非常重要的。ByteBuf的浅层拷贝分为两种:切片(slice)浅层拷贝和整体(duplicate)浅层拷贝。

1. 切片浅层拷贝

ByteBufslice()方法可以获取到一个ByteBuf的切片。一个ByteBuf可以进行多次切片浅层拷贝;多次切片后的ByteBuf对象可以共享一个存储区域。

Slice()方法有两个重载版本:

1public ByteBuf slice()

2public ByteBuf slice(int index, int length)

第一个是不带参数的slice()方法,在内部调用了带参数的重载版本,调用大致方式为:

public abstract class AbstractByteBuf extends ByteBuf {

                

                @Override

                public ByteBuf slice() {

                        return slice(readerIndex, readableBytes());

                }

}

也就是说,第一个无参数slice()方法的返回值是ByteBuf实例中可读部分的切片。带参数的slice(int index, int length)方法可以通过灵活地设置不同起始位置和长度来获取到ByteBuf不同区域的切片。

一个简单的slice的使用示例代码如下:

package cn.edu.bbc.computer.bytebuf;

//

public class SliceTest {

    @Test

    public  void testSlice() {

        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);

        print("动作:分配ByteBuf(9, 100)", buffer);

        buffer.writeBytes(new byte[]{1, 2, 3, 4});

        print("动作:写入4个字节 (1,2,3,4)", buffer);

        ByteBuf slice = buffer.slice();

        print("动作:切片 slice", slice);

    }

}

在上面的代码中,输出了源ByteBuf和调用slice()方法后的切片ByteBuf的三组属性值,运行结果如下:

//省略了ByteBuf刚分配后的属性值输出

[main|]after ===========动作:写入4个字节 (1,2,3,4)============

[main|]1.0 isReadable(): true

[main|]1.1 readerIndex(): 0

[main|]1.2 readableBytes(): 4

[main|]2.0 isWritable(): true

[main|]2.1 writerIndex(): 4

[main|]2.2 writableBytes(): 5

[main|]3.0 capacity(): 9

[main|]3.1 maxCapacity(): 100

[main|]3.2 maxWritableBytes(): 96

[main|]after ===========动作:切片 slice============

[main|]1.0 isReadable(): true

[main|]1.1 readerIndex(): 0

[main|]1.2 readableBytes(): 4

[main|]2.0 isWritable(): false

[main|]2.1 writerIndex(): 4

[main|]2.2 writableBytes(): 0

[main|]3.0 capacity(): 4

[main|]3.1 maxCapacity(): 4

[main|]3.2 maxWritableBytes(): 0

调用slice()方法后,返回的切片是一个新的ByteBuf对象,该对象的几个重要属性值大致如下:

  1. readerIndex(读指针)值为0
  2. writerIndex(写指针)值为源ByteBufreadableBytes()可读字节数。
  3. maxCapacity(最大容量)值为源ByteBufreadableBytes()可读字节数。

切片后的新ByteBuf有两个特点:

  1. 切片不可以写入,原因是:maxCapacitywriterIndex值相同。
  2. 切片和源ByteBuf的可读字节数相同,原因是:切片后的可读字节数为自己的属性writerIndex readerIndex,也就是源ByteBufreadableBytes() - 0

切片后的新ByteBuf和源ByteBuf的关联性如下:

  1. 切片不会拷贝源ByteBuf的底层数据,底层数组和源ByteBuf的底层数组是同一个。
  2. 切片不会改变源ByteBuf的引用计数。

从根本上说,slice()无参数方法所生成的切片就是源ByteBuf可读部分的浅层拷贝。

2. 整体浅层拷贝

slice切片不同,duplicate()方法返回的是源ByteBuf的整个对象的一个浅层拷贝,包括如下内容:

  1. Duplicate()的读写指针、最大容量值,与源ByteBuf的读写指针相同。
  2. duplicate()不会改变源ByteBuf的引用计数。
  3. duplicate()不会拷贝源ByteBuf的底层数据。

duplicate()slice()方法都是浅层拷贝。不同的是,slice()方法是切取一段的浅层拷贝,而duplicate()是整体的浅层拷贝。

3. 浅层拷贝的问题

浅层拷贝方法不会实际去拷贝数据,也不会改变ByteBuf的引用计数,会导致一个问题:在源ByteBuf调用release()方法之后,一旦引用计数为零,就变得不能访问了;在这种场景下,源ByteBuf的所有浅层拷贝实例也不能进行读写了;如果强行对浅层拷贝实例进行读写,则会报错。

因此,在调用浅层拷贝实例时,可以通过调用一次retain()方法来增加引用,表示它们对应的底层内存多了一次引用,引用计数为2。在浅层拷贝实例用完后,需要调用两次release()方法,将引用计数减1,这样就不会影响源ByteBuf的内存释放了。

 

10.8 Netty的零拷贝

大部分场景下,在Netty接收和发送ByteBuffer的过程中会使用直接内存进行Socket通道读写,使用JVM的堆内存进行业务处理,会涉及直接内存、堆内存之间的数据拷贝。内存的数据拷贝其实是效率非常低的,Netty提供了多种方法,以帮助应用程序减少内存的拷贝。

Netty的零拷贝(Zero-Copy)主要体现在五个方面:

1Netty提供CompositeByteBuf组合缓冲区类,可以将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝。

2Netty提供了ByteBuf的浅层拷贝操作(sliceduplicate),可以将ByteBuf分解为多个共享同一个存储区域的ByteBuf,避免内存的拷贝。

3)在使用Netty进行文件传输时,可以调用FileRegion包装的transferTo()方法直接将文件缓冲区的数据发送到目标通道,避免普通的循环读取文件数据和写入通道所导致的内存拷贝问题。

4)在将一个byte数组转换为一个ByteBuf对象的场景下,Netty提供了一系列的包装类,避免了转换过程中的内存拷贝。

5)如果通道接收和发送ByteBuf都使用直接内存进行Socket读写,就不需要进行缓冲区的二次拷贝。如果使用JVM的堆内存进行Socket读写,那么JVM会先将堆内存Buffer拷贝一份到直接内存再写入Socket中,相比于使用直接内存,这种情况在发送过程中会多出一次缓冲区的内存拷贝。所以,在发送ByteBufferSocket时,尽量使用直接内存而不是JVM堆内存。

说明

Netty中的零拷贝和操作系统层面上的零拷贝是有区别的,不能混淆,我们所说的Netty零拷贝完全是基于Java层面或者说用户空间的,它更多的是偏向于应用中的数据操作优化,而不是系统层面的操作优化。

10.8.1 通过CompositeByteBuf实现零拷贝

CompositeByteBuf可以把需要合并的多个ByteBuf组合起来,对外提供统一的readIndexwriterIndexCompositeByteBuf只是在逻辑上是一个整体,在CompositeByteBuf内部,合并的多个ByteBuf都是单独存在的。CompositeByteBuf里面有一个Component数组,聚合的ByteBuf都放在Component数组里面,最小容量为16

在很多通信编程场景下,需要多个ByteBuf组成一个完整的消息。例如,HTTP协议传输时消息总是由Header(消息头)和Body(消息体)组成。如果传输的内容很长,就会分成多个消息包进行发送,消息中的Header就需要重用,而不是每次发送都创建新的Header缓冲区。这时可以使用CompositeByteBuf缓冲区进行ByteBuf组合,避免内存拷贝。

假设有一份协议数据,它由头部和消息体组成,而头部和消息体是分别存放在两个ByteBuf中的,为了方便后续处理,要将两个ByteBuf进行合并,具体如图10-23所示。

10-23 CompositeByteBuf实现合并ByteBuf

使用CompositeByteBuf合并多个ByteBuf,大致的代码如下:

ByteBuf headerBuf =

ByteBuf bodyBuf =

CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();

 cbuf.addComponents(headerBuf, bodyBuf);

不使用CompositeByteBuf,将headerbody合并为一个ByteBuf的代码大致如下:

ByteBuf headerBuf =

ByteBuf bodyBuf =

long length=headerBuf.readableBytes() + bodyBuf.readableBytes()

ByteBuf allBuf = Unpooled.buffer(length);

allBuf.writeBytes(headerBuf );//拷贝header数据

allBuf.writeBytes(body);//拷贝body数据

上述过程将headerbody都拷贝到了新的allBuf中,这增加了两次额外的数据拷贝操作。所以,使用CompositeByteBuf合并ByteBuf可以减少两次额外的数据拷贝操作。

下面是一段通过CompositeByteBuf来复用header的比较完整的演示代码:

package cn.edu.bbc.computer.bytebuf;

//

public class CompositeBufferTest {

    static Charset utf8 = Charset.forName("UTF-8");

    @Test

    public void byteBufComposite() {

         CompositeByteBuf cbuf = ByteBufAllocator.DEFAULT.compositeBuffer();

        //消息头

        ByteBuf headerBuf = Unpooled.copiedBuffer("软件构件技术原理:", utf8);

        //消息体1

        ByteBuf bodyBuf = Unpooled.copiedBuffer("高性能Netty", utf8);

        cbuf.addComponents(headerBuf, bodyBuf);

        sendMsg(cbuf);

        //refCnt0, retain

        headerBuf.retain();

        cbuf.release();

 

        cbuf = ByteBufAllocator.DEFAULT.compositeBuffer();

        //消息体2

        bodyBuf = Unpooled.copiedBuffer("高性能学习社群", utf8);

        cbuf.addComponents(headerBuf, bodyBuf);

        sendMsg(cbuf);

        cbuf.release();

    }

 

    private void sendMsg(CompositeByteBuf cbuf) {

        //处理整个消息

        for (ByteBuf b :cbuf) {

            int length = b.readableBytes();

            byte[] array = new byte[length];

            //CompositeByteBuf中的数据统一拷贝到数组中

            b.getBytes(b.readerIndex(), array);

            //处理一下数组中的数据

            System.out.print(new String(array, utf8));

        }

        System.out.println();

    }

}

在上面的程序中,调用CompositeByteBufaddComponents()方法向自身增加了ByteBuf对象实例。对于所添加的ByteBufHeap ByteBufDirect ByteBuf均可。

如果CompositeByteBuf内部只存在一个ByteBuf,则调用其hasArray()方法,返回的是这个唯一实例hasArray()方法的值;如果有多个ByteBuf,则其hasArray()方法会返回false

另外,调用CompositeByteBufnioBuffer()方法可以将CompositeByteBuf实例合并成一个新的NIO ByteBuffer缓冲区(注意:不是NettyByteBuf缓冲区)。演示代码如下:

package cn.edu.bbc.computer.bytebuf;

//

public class CompositeBufferTest {

    @Test

    public void intCompositeBufComposite() {

        CompositeByteBuf cbuf = Unpooled.compositeBuffer(3);

        cbuf.addComponent(Unpooled.wrappedBuffer(new byte[]{1, 2, 3}));

        cbuf.addComponent(Unpooled.wrappedBuffer(new byte[]{4}));

        cbuf.addComponent(Unpooled.wrappedBuffer(new byte[]{5, 6}));

         //合并成一个的Java NIO缓冲区

        ByteBuffer nioBuffer = cbuf.nioBuffer(0, 6);

        byte[] bytes = nioBuffer.array();

        System.out.print("bytes = ");

        for (byte b : bytes) {

            System.out.print(b);

        }

        cbuf.release();

    }

}

10.8.2 通过wrap操作实现零拷贝

Unpooled提供了一系列的wrap包装方法,可以帮助大家方便、快速地包装出CompositeByteBuf实例或者ByteBuf实例,而不用进行内存拷贝。

Unpooled包装CompositeByteBuf的操作使用起来更加方便。例如,上一小节的headerbody的组合可以调用Unpooled.wrappedBuffer()方法。大致的代码如下:

ByteBuf headerBuf =

ByteBuf bodyBuf =

ByteBuf allByteBuf = Unpooled.wrappedBuffer(headerBuf , bodyBuf );

Unpooled类提供了很多重载的wrappedBuffer()方法,将多个ByteBuf包装为CompositeByteBuf实例,从而实现零拷贝。这些重载方法大致如下:

public static ByteBuf wrappedBuffer(ByteBuffer buffer)

public static ByteBuf wrappedBuffer(ByteBuf buffer)

public static ByteBuf wrappedBuffer(ByteBufbuffers)

public static ByteBuf wrappedBuffer(ByteBufferbuffers)

除了通过Unpooled包装CompositeByteBuf之外,还可以将byte数组包装成ByteBuf。如果将一个byte数组转换为一个ByteBuf对象,大致的代码如下:

byte[] bytes =

ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);

通过调用Unpooled.wrappedBuffer()方法将bytes包装为一个UnpooledHeapByteBuf对象,在包装的过程中不会有拷贝操作,所得到的ByteBuf对象和bytes数组共用同一个存储空间,对bytes的修改也是对ByteBuf对象的修改。

如果不是调用Unpooled.wrappedBuffer()包装方法,那么传统的做法是将此byte数组的内容拷贝到ByteBuf中,大致的代码如下:

byte[] bytes =

ByteBuf byteBuf = Unpooled.buffer();

byteBuf.writeBytes(bytes);

显然,传统的转换方式是有额外的内存申请和拷贝操作的,既浪费了内存空间,又需要耗费内存拷贝的时间。相对而言,Unpooled提供的wrap操作既复用了空间,又节省了时间。

Unpooled提供了多个包装字节数组的重载方法,大致如下:

public static ByteBuf wrappedBuffer(byte[] array)

public static ByteBuf wrappedBuffer(byte[] array, int offset, int length)

public static ByteBuf wrappedBuffer(byte[]arrays)

Unpooled类还提供了一些其他的避免零拷贝的方法,具体可以参见其源码,这里不再赘述。

10.9 EchoServer的实战案例

前面实现过Java NIO版本的EchoServer,在学习了Netty的原理和基本使用后,这里为大家设计和实现一个Netty版本的EchoServer

10.9.1 NettyEchoServer

首先回顾一下NettyEchoServer的功能,很简单:服务端读取客户端输入的数据,然后将数据直接回显到Console控制台。此实战案例的目标是帮助大家掌握以下知识:

  1. 服务端ServerBootstrap的装配和使用。
  2. 服务端NettyEchoServerHandler入站处理器的channelRead入站处理方法的编写。
  3. NettyByteBuf缓冲区的读取、写入,以及ByteBuf引用计数的查看。

首先是服务端的ServerBootstrap装配和启动过程,代码如下:

package cn.edu.bbc.computer.echoServer;

//

public class NettyEchoServer {

        //

        public void runServer() {

                //创建反应器轮询组

                EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);

                EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

                //省略设置: 1 反应器轮询组/2 通道类型/4 通道选项等

                //5 装配子通道流水线

                b.childHandler(new ChannelInitializer<SocketChannel>() {

                        //有连接到达时会创建一个通道

                        protected void initChannel(SocketChannel ch) {

                                //管理子通道中的Handler

                                //向子通道流水线添加一个Handler

                                ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE);

                        }

                });

             //省略启动、等待、优雅关闭等

        }

//省略 main()方法

}

10.9.2 NettyEchoServerHandler

Netty EchoServerHandler入站处理器继承自ChannelInboundHandlerAdapter,实现了channelRead()入站读方法(在可读IO事件到来时将被流水线回调)。

回显服务器处理器的逻辑分为两步:

1)第一步,读取从对端输入的数据。channelRead()方法的msg参数的形参类型不是ByteBuf,而是Object,这是由流水线的上一站决定的。一般而言,入站处理的流程是:Netty读取底层的二进制数据,填充到msg时,msgByteBuf类型,然后经过流水线,传入第一个入站处理器;每一个节点处理完后,将自己的处理结果(类型不一定是ByteBuf)作为msg参数不断向后传递。因此,msg参数的形参类型只能是Object类型。第一个入站处理器的channelRead()方法的msg类型绝对是ByteBuf类型,因为它是Netty读取到的ByteBuf数据包。在本实例中,NettyEchoServerHandler就是第一个业务处理器,虽然msg的实参类型是Object,但是实际类型就是ByteBuf,所以可以强制转成ByteBuf类型。

另外,从Netty 4.1开始,ByteBuf的默认类型是Direct ByteBuf。注意,Java不能直接访问Direct ByteBuf内部的数据,必须通过调用getBytes()readBytes()等方法将数据读入Java数组中才能继续进行处理。

2)第二步,将数据写回客户端。这一步很简单,直接复用前面的msg实例即可。不过要注意,如果上一步调用的是readBytes()方法,那么这一步就不能直接将msg写回了,因为数据已经被readBytes()方法读完了。幸好,上一步调用的读数据方法是getBytes(),它不影响ByteBuf的数据指针,因此可以继续使用。这里除了调用ctx.writeAndFlush()方法把msg数据写回客户端之外,也可调用通道的ctx.channel().writeAndFlush()方法发送数据。这两种方法在这里的效果是一样的,因为这个流水线上没有任何出站处理器。

服务端的入站处理器NettyEchoServerHandler的代码如下:

package cn.edu.bbc.computer.echoServer;

//

@ChannelHandler.Sharable

public class NettyEchoServerHandler extends ChannelInboundHandlerAdapter {

    public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler();

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

        ByteBuf in = (ByteBuf) msg;

        Logger.info("msg type: " + (in.hasArray()?"堆内存":"直接内存"));

        int len = in.readableBytes();

        byte[] arr = new byte[len];

        in.getBytes(0, arr);

        Logger.info("server received: " + new String(arr, "UTF-8"));

 

        Logger.info("写回前,msg.refCnt:" + ((ByteBuf) msg).refCnt());

        //写回数据,异步任务

        ChannelFuture f = ctx.writeAndFlush(msg);

        f.addListener((ChannelFuturefutureListener) -> {

            Logger.info("写回后,msg.refCnt:" + ((ByteBuf) msg).refCnt());

        });

    }

}

NettyEchoServerHandler加了一个特殊的Netty注解:@ChannelHandler.Sharable。这个注解的作用是标注一个Handler实例可以被多个通道安全地共享(多个通道的流水线可以加入同一个Handler实例)。这种共享操作,Netty默认是不允许的。

很多应用场景都需要Handler实例能共享。例如,一个服务器处理十万以上的通道,如果每一个通道都新建很多重复的Handler实例,就会浪费很多宝贵的空间,降低了服务器的性能。所以,如果在Handler实例中没有与特定通道强相关的数据或者状态,建议设计成共享模式。

如果没有加@ChannelHandler.Sharable注解,试图将同一个Handler实例添加到多个ChannelPipeline时,Netty将会抛出异常。

如何判断一个Handler是否为@Sharable呢?ChannelHandlerAdapter提供了实用方法——isSharable()。如果其对应的实现加上了@Sharable注解,那么这个方法将返回true,表示它可以被添加到多个ChannelPipeline中。

NettyEchoServerHandler没有保存与任何通道连接相关的数据,也没有内部的其他数据需要保存。所以,该处理器不仅仅可以用来共享,而且不需要做任何同步控制。这里为它加上了@Sharable注解,表示可以共享。更进一步,这里还设计了一个通用的INSTANCE静态实例,所有的通道直接使用这个实例即可。

10.9.3 NettyEchoClient

客户端的实战案例可以帮助大家掌握以下知识:

  1. 客户端Bootstrap的装配和使用。
  2. 在客户端NettyEchoClientHandler入站处理器中接收回写的数据,并且释放内存。
  3. 有多种方式可以用于释放ByteBuf,包括自动释放和手动释放。

客户端Bootstrap的装配和使用代码如下:

package cn.edu.bbc.computer.echoServer;

//

public class NettyEchoClient {

 

    private int serverPort;

    private String serverIp;

    Bootstrap b = new Bootstrap();

 

    public NettyEchoClient(String ip, int port) {

        this.serverPort = port;

        this.serverIp = ip;

    }

 

    public void runClient() {

        //创建反应器轮询组

        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

 

        try {

           //1 设置反应器轮询组

           b.group(workerLoopGroup);

           //2 设置nio类型的通道

           b.channel(NioSocketChannel.class);

           //3 设置监听端口

            b.remoteAddress(serverIp, serverPort);

           //4 设置通道的参数

           b.option(ChannelOption.ALLOCATOR,

                              PooledByteBufAllocator.DEFAULT);

 

           //5 装配子通道流水线

           b.handler(new ChannelInitializer<SocketChannel>() {

               //有连接到达时会创建一个通道

               protected void initChannel(SocketChannel ch){

                   //管理子通道中的Handler

                   //向子通道流水线添加一个Handler

                   ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);

               }

           });

           ChannelFuture f = b.connect();

           f.addListener((ChannelFuturefutureListener) ->

           {

               if (futureListener.isSuccess()) {

                   Logger.info("EchoClient客户端连接成功!");

               } else {

                   Logger.info("EchoClient客户端连接失败!");

               }

           });

 

           //阻塞,直到连接成功

           f.sync();

           Channel channel = f.channel();

           Scanner scanner = new Scanner(System.in);

           Print.tcfo("请输入发送内容:");

           while (scanner.hasNext()) {

               //获取输入的内容

               String next = scanner.next();

               byte[] bytes = (Dateutil.getNow() + " >>"

                                 + next).getBytes("UTF-8");

               //发送ByteBuf

               ByteBuf buffer = channel.alloc().buffer();

               buffer.writeBytes(bytes);

               channel.writeAndFlush(buffer);

               Print.tcfo("请输入发送内容:");

           }

        } catch (Exception e) {

            e.printStackTrace();

        } finally {

            //优雅关闭EventLoopGroup

            //释放掉所有资源,包括创建的线程

            workerLoopGroup.shutdownGracefully();

        }

    }

    //省略 main()方法

}

在上面的代码中,客户端在成功连接到服务端后不断循环获取控制台的输入,通过与服务端之间的连接通道发送到服务器。

10.9.4 NettyEchoClientHandler

客户端接收服务器回显的数据包,显示在Console控制台上,所以客户端的处理器流水线不是空的,还需要装配一个回显处理器。该处理的功能很简单,代码如下:

package cn.edu.bbc.computer.echoServer;

//省略import

@ChannelHandler.Sharable

public class NettyEchoClientHandler extends

                               ChannelInboundHandlerAdapter {

    public static final NettyEchoClientHandler INSTANCE

                      = new NettyEchoClientHandler();

    //入站处理方法

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

        ByteBuf byteBuf = (ByteBuf) msg;

        int len = byteBuf.readableBytes();

        byte[] arr = new byte[len];

        byteBuf.getBytes(0, arr);

        Logger.info("client received: " + new String(arr, "UTF-8"));

 

        //释放ByteBuf的两种方法

        //方法一:手动释放ByteBuf

        byteBuf.release();

 

        //方法二:调用父类的入站方法,将msg向后传递

        //super.channelRead(ctx,msg);

    }

}

通过代码可以看到,从服务端发送过来的ByteBuf被手动方式强制释放了。当然,也可以使用前面介绍的自动释放方式来释放ByteBuf

 


0 条 查看最新 评论

没有评论
暂时无法发表评论