章 Java NIO核心详解

高性能的Java通信绝对离不开Java NIO组件,现在主流的技术框架或中间件服务器都使用了Java NIO组件,譬如TomcatJettyNetty。学习和掌握Java NIO组件已经不是一项加分技能,而是一项必备技能。

不管是面试还是实际开发,作为Java工程师,都必须掌握NIO的原理和开发实战技能。

8.1 Java NIO简介

1.4版本之前,Java IO类库是阻塞IO;从1.4版本开始,引进了新的异步IO库,被称为Java New IO类库,简称为Java NIONew IO类库的目标就是要让Java支持非阻塞IO,基于此,更多的人喜欢称Java NIO为非阻塞IONon-Blocking IO),称“老的”阻塞式Java IOOIOOld IO)。总体上说,NIO弥补了原来面向流的OIO同步阻塞的不足,为标准Java代码提供了高速、面向缓冲区的IO

Java NIO类库包含以下三个核心组件:

  1. Channel(通道)
  2. Buffer(缓冲区)
  3. Selector(选择器)

理解了第2章的四种IO模型,大家一眼就能识别出来Java NIO属于第三种模型——IO多路复用模型。只不过,Java NIO组件提供了统一的API,为大家屏蔽了底层的操作系统的差异。

在后面的章节中,我们会对以上三个Java NIO的核心组件展开详细介绍。先来看看JavaNIOOIO的简单对比。

8.1.1 NIOOIO的对比

Java中,NIOOIO的区别主要体现在三个方面:

1OIO是面向流(Stream Oriented)的,NIO是面向缓冲区(Buffer Oriented)的。

在一般的OIO操作中,面向字节流或字符流的IO操作总是以流式的方式顺序地从一个流(Stream)中读取一个或多个字节,因此,我们不能随意改变读取指针的位置。在NIO操作中则不同,NIO中引入了ChannelBuffer的概念。面向缓冲区的读取和写入只需要从通道读取数据到缓冲区中,或将数据从缓冲区写入通道中。NIO不像OIO那样是顺序操作,它可以随意读取Buffer中任意位置的数据。

2OIO的操作是阻塞的,而NIO的操作是非阻塞的。

OIO操作都是阻塞的。例如,我们调用一个read方法读取一个文件的内容,调用read的线程就会被阻塞,直到read操作完成。

NIO模式中,当我们调用read方法时,如果此时有数据,则read读取数据并返回;如果此时没有数据,则read也会直接返回,而不会阻塞当前线程。

NIO的非阻塞是如何做到的呢?其实在上一章已经揭晓答案,即NIO使用了通道和通道的多路复用技术。

3OIO没有选择器(Selector)的概念,而NIO有选择器的概念。

NIO的实现是基于底层选择器的系统调用的,所以NIO需要底层操作系统提供支持;而OIO不需要用到选择器。

8.1.2 通道

OIO中,同一个网络连接会关联到两个流:一个是输入流(Input Stream),另一个是输出流(Output Stream)。Java应用程序通过这两个流不断地进行输入和输出的操作。

NIO中,一个网络连接使用一个通道表示,所有NIOIO操作都是通过连接通道完成的。一个通道类似于OIO中两个流的结合体,既可以从通道读取数据,也可以向通道写入数据。

8.1.3 选择器

首先回顾一下前面介绍的基础知识——IO多路复用指的是一个进程/线程可以同时监视多个文件描述符(含socket连接),一旦其中的一个或者多个文件描述符可读或者可写,该监听进程/线程就能够进行IO就绪事件的查询。

Java应用层面,如何实现对多个文件描述符的监视呢?需要用到一个非常重要的Java NIO组件——选择器。选择器可以理解为一个IO事件的监听与查询器。通过选择器,一个线程可以查询多个通道的IO事件的就绪状态。

从编程实现维度来说,IO多路复用编程的第一步是把通道注册到选择器中,第二步是通过选择器所提供的事件查询(select)方法来查询这些注册的通道是否有已经就绪的IO事件(例如可读、可写、网络连接完成等)。

由于一个选择器只需要一个线程进行监控,因此我们可以很简单地使用一个线程,通过选择器去管理多个连接通道。

OIO相比,NIO使用选择器的最大优势是系统开销小。系统不必为每一个网络连接(文件描述符)创建进程/线程,从而大大减少了系统的开销。总之,一个线程负责多个连接通道的IO处理是非常高效的,这种高效来自Java的选择器组件Selector及其底层的操作系统IO多路复用技术的支持。

8.1.4 缓冲区

应用程序与通道的交互主要是进行数据的读取和写入。为了完成NIO的非阻塞读写操作,NIO为大家准备了第三个重要的组件——Buffer。所谓通道的读取,就是将数据从通道读取到缓冲区中;所谓通道的写入,就是将数据从缓冲区写入通道中。缓冲区的使用是面向流进行读写操作的OIO所没有的,也是NIO非阻塞的重要前提和基础之一。

接下来笔者从缓冲区开始为大家详细介绍NIO的三大核心组件。

8.2 详解NIO Buffer类及其属性

NIOBuffer本质上是一个内存块,既可以写入数据,也可以从中读取数据。Java NIO中代表缓冲区的Buffer类是一个抽象类,位于java.nio包中。

NIOBuffer内部是一个内存块(数组),与普通的内存块(Java数组)不同的是:NIO Buffer对象提供了一组比较有效的方法,用来进行写入和读取的交替访问。

说明

Buffer类是一个非线程安全类。

8.2.1 Buffer

Buffer类是一个抽象类,对应于Java的主要数据类型。在NIO中,有8种缓冲区类,分别是ByteBufferCharBufferDoubleBufferFloatBufferIntBufferLongBufferShortBufferMappedByteBuffer。前7Buffer类型覆盖了能在IO中传输的所有Java基本数据类型,第8种类型是一种专门用于内存映射的ByteBuffer类型。不同的Buffer子类可以操作的数据类型能够通过名称进行判断,比如IntBuffer只能操作Integer类型的对象。

实际上,使用最多的是ByteBuffer(二进制字节缓冲区)类型,后面的章节会看到它的具体使用。

8.2.2 Buffer类的重要属性

Buffer的子类会拥有一块内存,作为数据的读写缓冲区,但是读写缓冲区并没有定义在Buffer基类中,而是定义在具体的子类中。例如,ByteBuffer子类就拥有一个byte[]类型的数组成员final byte[] hb,可以作为自己的读写缓冲区,数组的元素类型与Buffer子类的操作类型相对应。

说明

在本书的上一个版本中,这里的内容为:Buffer内部有一个byte[]类型的数组作为数据的读写缓冲区。乍看上去没有什么错误,实际上那个结论是错误的。具体原因是作为读写缓冲区的数组,并没有定义在Buffer类中,而是定义在各具体子类中。感谢社群小伙伴@炬,是他发现了这个比较隐蔽的编写错误。

为了记录读写的状态和位置,Buffer类额外提供了一些重要的属性,其中有三个重要的成员属性:capacity(容量)、position(读写位置)和limit(读写的限制)。接下来对这三个成员属性进行比较详细的介绍。

1. capacity属性

Buffer类的capacity属性表示内部容量的大小。一旦写入的对象数量超过了capacity,缓冲区就满了,不能再写入了。

Buffer类的capacity属性一旦初始化,就不能再改变。原因是什么呢?Buffer类的对象在初始化时会按照capacity分配内部数组的内存,在数组内存分配好之后,它的大小就不能改变了。

前面讲到,Buffer类是一个抽象类,Java不能直接用来新建对象。在具体使用的时候,必须使用Buffer的某个子类,例如DoubleBuffer子类,该子类能写入的数据类型是double,如果在创建实例时其capacity100,那么我们最多可以写入100double类型的数据。

说明

capacity并不是指内部的内存块byte[]数组的字节数量,而是指能写入的数据对象的最大限制数量。

2. position属性

Buffer类的position属性表示当前的位置。position属性的值与缓冲区的读写模式有关。在不同的模式下,position属性值的含义是不同的,在缓冲区进行读写的模式改变时,position值会进行相应的调整。

在写模式下,position值的变化规则如下:

1)在刚进入写模式时,position值为0,表示当前的写入位置为从头开始。

2)每当一个数据写到缓冲区之后,position会向后移动到下一个可写的位置。

3)初始的position值为0,最大可写值为limit-1。当position值达到limit时,缓冲区就已经无空间可写了。

在读模式下,position值的变化规则如下:

1)当缓冲区刚开始进入读模式时,position会被重置为0

2)当从缓冲区读取时,也是从position位置开始读。读取数据后,position向前移动到下一个可读的位置。

3)在读模式下,limit表示可读数据的上限。position的最大值为最大可读上限limit,当position达到limit时表明缓冲区已经无数据可读。

Buffer的读写模式具体如何切换呢?当新建了一个缓冲区实例时,缓冲区处于写模式,这时是可以写数据的。在数据写入完成后,如果要从缓冲区读取数据,就要进行模式的切换,可以调用flip()方法将缓冲区变成读模式,flip为翻转的意思。

在从写模式到读模式的翻转过程中,positionlimit属性值会进行调整,具体的规则是:

1limit属性被设置成写模式时的position值,表示可以读取的最大数据位置。

2position由原来的写入位置变成新的可读位置,也就是0,表示可以从头开始读。

8. limit属性

Buffer类的limit属性表示可以写入或者读取的数据最大上限,其属性值的具体含义也与缓冲区的读写模式有关。在不同的模式下,limit值的含义是不同的,具体分为以下两种情况:

1)在写模式下,limit属性值的含义为可以写入的数据最大上限。在刚进入写模式时,limit的值会被设置成缓冲区的capacity值,表示可以一直将缓冲区的容量写满。

2)在读模式下,limit值的含义为最多能从缓冲区读取多少数据。

一般来说,在进行缓冲区操作时是先写入再读取的。当缓冲区写入完成后,就可以开始从Buffer读取数据,调用flip()方法(翻转),这时limit的值也会进行调整。具体如何调整呢?将写模式下的position值设置成读模式下的limit值,也就是说,将之前写入的最大数量作为可以读取数据的上限值。

Buffer在翻转时的属性值调整主要涉及positionlimit两个属性,但是这种调整比较微妙,不是太好理解,下面举一个简单的例子:

首先,创建缓冲区。新创建的缓冲区处于写模式,其position值为0limit值为最大容量capacity

然后,向缓冲区写数据。每写入一个数据,position向后面移动一个位置,也就是position的值加1。这里假定写入了5个数,当写入完成后,position的值为5

最后,使用flip方法将缓冲区切换到读模式。limit的值会先被设置成写模式时的position值,所以新的limit值是5,表示可以读取数据的最大上限是5。之后调整position值,新的position会被重置为0,表示可以从0开始读。

缓冲区切换到读模式后就可以从缓冲区读取数据了,一直到缓冲区的数据读取完毕。

除了以上capacitypositionlimit三个重要的成员属性之外,Buffer还有一个比较重要的标记属性:mark(标记)属性。该属性的大致作用为:在缓冲区操作过程当中,可以将当前的position值临时存入mark属性中;需要的时候,再从mark中取出暂存的标记值,恢复到position属性中,重新从position位置开始处理。

下面用表8-1总结一下Buffer类的四个重要属性。

8-1 Buffer类的四个重要属性说明

8.3 详解NIO Buffer类的重要方法

本节将详细介绍Buffer类的几个常用方法,包含Buffer实例的创建、写入、读取、重复读、标记和重置等。

8.8.1 allocate()

在使用Buffer实例之前,我们首先需要获取Buffer子类的实例对象,并且分配内存空间。需要获取一个Buffer实例对象时,并不是使用子类的构造器来创建,而是调用子类的allocate()方法。

下面的程序片段演示如何获取一个整型的Buffer实例对象:

package cn.edu.bbc.computer.bufferDemo;

import cn.edu.bbc.computer.util.Logger;

import java.nio.IntBuffer;

 

public class UseBuffer

{

        //一个整型的Buffer静态变量

        static IntBuffer intBuffer = null;

        public static void allocateTest()

        {

                //创建一个intBuffer实例对象

                intBuffer = IntBuffer.allocate(20);

                Logger.debug("------------after allocate------------------");

                Logger.debug("position=" + intBuffer.position());

                Logger.debug("limit=" + intBuffer.limit());

                Logger.debug("capacity=" + intBuffer.capacity());

        }

        //省略其他代码

}

本例中,IntBuffer是具体的Buffer子类,通过调用IntBuffer.allocate(20)创建了一个intBuffer实例对象,并且分配了20×4字节的内存空间。运行程序之后,通过程序的输出结果,我们可以查看一个新建缓冲区实例对象的主要属性值,如下所示:

allocatTest |>  ------------after allocate------------------

allocatTest |>  position=0

allocatTest |>  limit=20

allocatTest |>  capacity=20

从上面的运行结果可以看出:一个缓冲区在新建后处于写模式,position属性(代表写入位置)的值为0,缓冲区的capacity值是初始化时allocate方法的参数值(这里是20),而limit最大可写上限值也为allocate方法的初始化参数值。

8.8.2 put()

在调用allocate()方法分配内存、返回了实例对象后,缓冲区实例对象处于写模式,可以写入对象,如果要把对象写入缓冲区,就需要调用put()方法。put()方法很简单,只有一个参数,即需要写入的对象,只不过要求写入的数据类型与缓冲区的类型保持一致。

接着前面的例子向刚刚创建的intBuffer缓存实例对象写入5个整数,代码如下:

package cn.edu.bbc.computer.bufferDemo;

//省略import

public class UseBuffer

{

    //一个整型的Buffer静态变量

    static IntBuffer intBuffer = null;

    //省略了创建缓冲区的代码,具体查看前面小节的内容和随书源码

         public static void putTest()

    {

        for (int i = 0; i < 5; i++)

        {

            //写入一个整数到缓冲区

            intBuffer.put(i);

        }

                

        //输出缓冲区的主要属性值

        Logger.debug("------------after putTest------------------");

        Logger.debug("position=" + intBuffer.position());

        Logger.debug("limit=" + intBuffer.limit());

        Logger.debug("capacity=" + intBuffer.capacity());

    }

    //省略其他代码

}

写入5个元素后,同样输出缓冲区的主要属性值,输出的结果如下:

putTest |>  ------------after putTest------------------

putTest |>  position=5

putTest |>  limit=20

putTest |>  capacity=20

从结果可以看到,写入了5个元素之后,缓冲区的position属性值变成了5,所以指向了第6个(从0开始的)可以进行写入的元素位置。limit最大可写上限、capacity最大容量两个属性的值都没有发生变化。

8.8.3 flip()

向缓冲区写入数据之后,是否可以直接从缓冲区读取数据呢?不能!这时缓冲区还处于写模式,如果需要读取数据,要将缓冲区转换成读模式。flip()翻转方法是Buffer类提供的一个模式转变的重要方法,作用是将写模式翻转成读模式。

接着前面的例子演示一下flip()方法的使用:

package cn.edu.bbc.computer.bufferDemo;

//省略import

public class UseBuffer

{

        //一个整型的Buffer静态变量

        static IntBuffer intBuffer = null;

        //省略了缓冲区的创建、写入数据的代码,具体查看前面小节的内容和随书源码

        public static void flipTest()

        {

                        //翻转缓冲区,从写模式翻转成读模式

                        intBuffer.flip();

                        //输出缓冲区的主要属性值

                        Logger.info("------------after flip ------------------");

                        Logger.info("position=" + intBuffer.position());

                        Logger.info("limit=" + intBuffer.limit());

                                Logger.info("capacity=" + intBuffer.capacity());

                }

    //省略其他代码

}

在调用flip()方法进行缓冲区的模式翻转之后,通过程序的输出内容可以看到缓冲区的属性有了奇妙的变化,具体如下:

flipTest |>  ------------after flipTest ------------------

flipTest |>  position=0

flipTest |>  limit=5

flipTest |>  capacity=20

调用flip()方法后,新模式下可读上限limit的值变成了之前写模式下的position属性值,也就是5;而新的读模式下的position值简单粗暴地变成了0,表示从头开始读取。

flip()方法从写入到读取转换的规则,再一次详细介绍如下:

首先,设置可读上限limit的属性值。将写模式下的缓冲区中内容的最后写入位置position值作为读模式下的limit上限值。

其次,把读的起始位置position的值设为0,表示从头开始读。

最后,清除之前的mark标记,因为mark保存的是写模式下的临时位置,发生模式翻转后,如果继续使用旧的mark标记,就会造成位置混乱。

上面三步其实可以查看Buffer.flip()方法的源代码,具体如下:

public final Buffer flip() {

    limit = position;   //设置可读上限limit,设置为写模式下的position

    position = 0;       //把读的起始位置position的值设为0,表示从头开始读

    mark = UNSET_MARK;  //清除之前的mark标记

    return this;

}

新的问题来了:在读取完成后,如何再一次将缓冲区切换成写模式呢?答案是:可以调用Buffer.clear()清空或者Buffer.compact()压缩方法,它们可以将缓冲区转换为写模式。总体的Buffer模式转换大致如图8-1所示。

8-1 缓冲区读写模式的转换

8.8.4 get()

调用flip()方法将缓冲区切换成读模式之后,就可以开始从缓冲区读取数据了。读取数据的方法很简单,可以调用get()方法每次从position的位置读取一个数据,并且进行相应的缓冲区属性的调整。

接着前面调用flip()方法的实例,演示一下缓冲区的读取操作,代码如下:

package cn.edu.bbc.computer.bufferDemo;

//省略import

public class UseBuffer

{

        //一个整型的Buffer静态变量

        static IntBuffer intBuffer = null;

 

        //省略了缓冲区的创建、写入、翻转的代码,具体查看前面小节的内容和随书源码

        

        public static void getTest()

        {

                //先读2个数据

                for (int i = 0; i< 2; i++)

                {

                        int j = intBuffer.get();

                        Logger.info("j = " + j);

                }

 

                //输出缓冲区的主要属性值

                Logger.info("---------after get 2 int --------------");

                Logger.info("position=" + intBuffer.position());

                Logger.info("limit=" + intBuffer.limit());

                Logger.info("capacity=" + intBuffer.capacity());

                //再读3个数据

                for (int i = 0; i< 3; i++)

                {

                        int j = intBuffer.get();

                        Logger.info("j = " + j);

                }

                //输出缓冲区的主要属性值

                Logger.info("---------after get 3 int ---------------");

                Logger.info("position=" + intBuffer.position());

                Logger.info("limit=" + intBuffer.limit());

                Logger.info("capacity=" + intBuffer.capacity());

                }

                //

        }

 

    //省略其他代码

}

以上代码调用get方法从缓冲实例中先读取2个元素,再读取3个元素,运行后输出的结果如下

getTest |>  ------------after get 2 int ------------------

getTest |>  position=2

getTest |>  limit=5

getTest |>  capacity=20

getTest |>  ------------after get 3 int ------------------

getTest |>  position=5

getTest |>  limit=5

getTest |>  capacity=20

从程序的输出结果可以看到,读取操作会改变可读位置position的属性值,而可读上限limit值并不会改变。在position值和limit值相等时,表示所有数据读取完成,position指向了一个没有数据的元素位置,已经不能再读了,此时再读就会抛出BufferUnderflowException异常。

这里强调一下,在读完之后是否可以立即对缓冲区进行数据写入呢?答案是不能。现在还处于读模式,我们必须调用Buffer.clear()Buffer.compact()方法,即清空或者压缩缓冲区,将缓冲区切换成写模式,让其重新可写。

此外还有一个问题:缓冲区是不是可以重复读呢?答案是可以的,既可以通过倒带方法rewind()去完成,也可以通过mark()reset()两个方法组合实现。

8.8.5 rewind()

已经读完的数据,如果需要再读一遍,可以调用rewind()方法。rewind()也叫倒带,就像播放磁带一样倒回去,再重新播放。

接着前面的示例代码,继续rewind方法使用的演示,示例代码如下:

package cn.edu.bbc.computer.bufferDemo;

//省略import

public class UseBuffer

{

        //一个整型的Buffer静态变量

        static IntBuffer intBuffer = null;

        //省略了缓冲区的写入和读取等代码,具体查看前面小节的内容和随书源码

        public static void rewindTest() {

                //倒带

                intBuffer.rewind();

                //输出缓冲区属性

                Logger.info("------------after rewind ------------------");

                Logger.info("position=" + intBuffer.position());

                Logger.info("limit=" + intBuffer.limit());

                Logger.info("capacity=" + intBuffer.capacity());

        }

 

      //省略其他代码

}

这个范例程序的执行结果如下:

rewindTest |>  ------------after rewind ------------------

rewindTest |>  position=0

rewindTest |>  limit=5

rewindTest |>  capacity=20

rewind ()方法主要是调整了缓冲区的position属性与mark属性,具体的调整规则如下:

1position重置为0,所以可以重读缓冲区中的所有数据。

2limit保持不变,数据量还是一样的,仍然表示能从缓冲区中读取的元素数量。

3mark被清理,表示之前的临时位置不能再用了。

JDK中可以查阅Buffer.rewind()方法的源代码,具体如下:

public final Buffer rewind() {

        position = 0; //重置为0,所以可以重读缓冲区中的所有数据

        mark = -1;    //mark被清理,表示之前的临时位置不能再用了

        return this;

}

通过源代码,我们可以看到rewind()方法与flip()方法很相似,区别在于:倒带方法rewind()不会影响limit属性值;而翻转方法flip()会重设limit属性值。

rewind()倒带之后,就可以再一次读取,重复读取的示例代码如下:

package cn.edu.bbc.computer.bufferDemo;

//省略import

public class UseBuffer

{

        //一个整型的Buffer静态变量

        static IntBuffer intBuffer = null;

        //省略了缓冲区的写入和读取、倒带等代码,具体查看前面小节的内容和随书源码

 

        public static void reRead() {

                for (int i = 0; i< 5; i++) {

                        if (i == 2) {

                                //临时保存,标记一下第3个位置

                                intBuffer.mark();

                        }

                        //读取元素

                        int j = intBuffer.get();

                        Logger.info("j = " + j);

                }

                //输出缓冲区的属性值

                Logger.info("------------after reRead------------------");

                Logger.info("position=" + intBuffer.position());

                Logger.info("limit=" + intBuffer.limit());

                Logger.info("capacity=" + intBuffer.capacity());

        }

        

   //省略其他代码

}

这段代码与前面的读取示例代码基本相同,只是增加了一个mark调用。大家可以通过随书源码工程执行以上代码并观察输出结果,具体的输出与前面的类似,这里不再赘述。

8.8.6 mark()reset()

mark()reset()两个方法是配套使用的:Buffer.mark()方法将当前position的值保存起来放在mark属性中,让mark属性记住这个临时位置;然后可以调用Buffer.reset()方法将mark的值恢复到position中。

说明

Buffer.mark()Buffer.reset()两个方法都涉及mark属性的使用。mark()方法与mark属性的名字虽然相同,但是一个是Buffer类的成员方法,一个是Buffer类的成员属性,不能混淆。

例如,在前面重复读取的示例代码中,在读到第三个元素(i2时)时,可以调用mark()方法,把当前位置position的值保存到mark属性中,这时mark属性的值为2

接下来可以调用reset()方法将mark属性的值恢复到position中,这样就可以从位置2(第三个元素)开始重复读取了。

接着前面重复读取的代码,进行mark()方法和reset()方法的示例演示,代码如下:

package cn.edu.bbc.computer.bufferDemo;

//省略import

public class UseBuffer

{

        //一个整型的Buffer静态变量

        static IntBuffer intBuffer = null;

        //省略了缓冲区的倒带、重复读取等代码,具体查看前面小节的内容和随书源码

 

        //演示前提:

        //在前面的reRead()演示方法中,已经通过mark()方法暂存了position

        

        public static void afterReset() {

                Logger.info("------------after reset------------------");

                //把前面保存在mark中的值恢复到position

                intBuffer.reset();

                //输出缓冲区的属性值

                Logger.info("position=" + intBuffer.position());

                Logger.info("limit=" + intBuffer.limit());

                Logger.info("capacity=" + intBuffer.capacity());

                //读取并且输出元素

                for (int i =2; i< 5; i++) {

                                int j = intBuffer.get();

                                Logger.info("j = " + j);

                }

        }

    //省略其他代码

}

在上面的代码中,首先调用reset()mark中的值恢复到position中,因此读取的位置position就是2,表示可以再次开始从第三个元素开始读取数据。上面的程序代码的输出结果是:

afterReset |>  ------------after reset------------------

afterReset |>  position=2

afterReset |>  limit=5

afterReset |>  capacity=20

afterReset |>  j = 2

afterReset |>  j = 3

afterReset |>  j = 4

调用reset()方法之后,position的值为2,此时去读取缓冲区,输出后面的三个元素234

8.8.7 clear()

在读模式下,调用clear()方法将缓冲区切换为写模式。此方法的作用是:

1)将position清零。

2limit设置为capacity最大容量值,可以一直写入,直到缓冲区写满。

接着上面的实例演示一下clear()方法的使用,大致的代码如下:

package cn.edu.bbc.computer.bufferDemo;

//省略import

public class UseBuffer

{

     //一个整型的Buffer静态变量

     static IntBuffer intBuffer = null;

     //省略了缓冲区的创建、写入、读取等代码,具体查看前面小节的内容和随书源码

        

    public static void clearDemo() {

        Logger.info("------------after clear------------------");

        //清空缓冲区,进入写模式

        intBuffer.clear();

        //输出缓冲区的属性值

        Logger.info("position=" + intBuffer.position());

        Logger.info("limit=" + intBuffer.limit());

        Logger.info("capacity=" + intBuffer.capacity());

    }

     //省略其他代码

}

这个示例程序运行之后,结果如下:

main |>清空

clearDemo |>  ------------after clear------------------

clearDemo |>  position=0

clearDemo |>  limit=20

clearDemo |>  capacity=20

在缓冲区处于读模式时,调用clear(),缓冲区会被切换成写模式。调用clear()之后,我们可以看到清空了position(写入的起始位置)的值,其值被设置为0,并且limit值(写入的上限)为最大容量。

8.8.8 使用Buffer类的基本步骤

总体来说,使用Java NIO Buffer类的基本步骤如下:

1)使用创建子类实例对象的allocate()方法创建一个Buffer类的实例对象。

2)调用put()方法将数据写入缓冲区中。

3)写入完成后,在开始读取数据前调用Buffer.flip()方法,将缓冲区转换为读模式。

4)调用get()方法,可以从缓冲区中读取数据。

5)读取完成后,调用Buffer.clear()方法或Buffer.compact()方法,将缓冲区转换为写模式,可以继续写入。

 

8.4 详解NIO Channel

前面提到,Java NIO中一个socket连接使用一个Channel来表示。从更广泛的层面来说,一个通道可以表示一个底层的文件描述符,例如硬件设备、文件、网络连接等。然而,远不止如此,Java NIO的通道可以更加细化。例如,不同的网络传输协议类型,在Java中都有不同的NIO Channel实现。

这里不对Java NIO的全部通道类型进行过多的描述,仅着重介绍其中最为重要的四种Channel实现:FileChannelSocketChannelServerSocketChannelDatagramChannel

对于以上四种通道,说明如下:

1FileChannel:文件通道,用于文件的数据读写。

2SocketChannel:套接字通道,用于套接字TCP连接的数据读写。

3ServerSocketChannel:服务器套接字通道(或服务器监听通道),允许我们监听TCP连接请求,为每个监听到的请求创建一个SocketChannel通道。

4DatagramChannel:数据报通道,用于UDP的数据读写。

这四种通道涵盖了文件IOTCP网络、UDP IO三类基础IO读写操作。下面从通道的获取、读取、写入、关闭这四个重要的操作入手,对它们进行简单的介绍。

8.4.1 FileChannel

FileChannel(文件通道)是专门操作文件的通道。通过FileChannel,既可以从一个文件中读取数据,也可以将数据写入文件中。特别申明一下,FileChannel为阻塞模式,不能设置为非阻塞模式。

下面分别介绍FileChannel的获取、读取、写入、关闭这四个操作。

1. 获取FileChannel

可以通过文件的输入流、输出流获取FileChannel,示例如下:

//创建一个文件输入流

FileInputStream fis = new FileInputStream(srcFile);

//获取文件流的通道

FileChannel inChannel = fis.getChannel();

//创建一个文件输出流

FileOutputStream fos = new FileOutputStream(destFile);

//获取文件流的通道

FileChannel outchannel = fos.getChannel();

也可以通过RandomAccessFile(文件随机访问)类来获取FileChannel实例,代码如下:

//创建RandomAccessFile随机访问对象

RandomAccessFile rFile = new RandomAccessFile("filename.txt""rw");

//获取文件流的通道(可读可写)

FileChannel channel = rFile.getChannel();

2. 读取FileChannel

在大部分应用场景中,从通道读取数据都会调用通道的int read(ByteBuffer buf)方法,它把从通道读取的数据写入ByteBuffer缓冲区,并且返回读取的数据量。

RandomAccessFileaFile = new RandomAccessFile(fileName, "rw");

//获取通道(可读可写)

FileChannel channel=aFile.getChannel();

//获取一个字节缓冲区

ByteBuffer buf = ByteBuffer.allocate(CAPACITY);

int length = -1;

//调用通道的read()方法,读取数据并写入字节类型的缓冲区

while ((length = channel.read(buf)) != -1) {

//省略buf中的数据处理

}

说明

以上代码中channel.read(buf)读取通道的数据时,对于通道来说是读模式,对于ByteBuffer缓冲区来说是写入数据,这时ByteBuffer缓冲区处于写模式。

8. 写入FileChannel

把数据写入通道,在大部分应用场景中都会调用通道的write(ByteBuffer)方法,此方法的参数是一个ByteBuffer缓冲区实例,是待写数据的来源。

write(ByteBuffer)方法的作用是从ByteBuffer缓冲区中读取数据,然后写入通道自身,而返回值是写入成功的字节数。

//如果buf处于写模式(如刚写完数据),需要翻转buf,使其变成读模式

buf.flip();

int outlength = 0;

//调用write()方法,将buf的数据写入通道

while ((outlength = outchannel.write(buf)) != 0) {

        System.out.println("写入的字节数:" + outlength);

}

在以上的outchannel.write(buf)调用中,对于入参buf实例来说,需要从其中读取数据写入outchannel通道中,所以入参buf必须处于读模式,不能处于写模式。

4. 关闭通道

当通道使用完成后,必须将其关闭。关闭非常简单,调用close()方法即可。

//关闭通道

channel.close();

5. 强制刷新到磁盘

在将缓冲区写入通道时,出于性能的原因,操作系统不可能每次都实时地将写入数据落地(或刷新)到磁盘,完成最终的数据保存。

在将缓冲区数据写入通道时,要保证数据能写入磁盘,可以在写入后调用一下FileChannelforce()方法。

//强制刷新到磁盘

channel.force(true);

8.4.2 使用FileChannel完成文件复制的实战案例

下面是一个简单的实战案例:使用FileChannel复制文件。具体的功能是使用FileChannel将原文件复制一份,把原文件中的数据都复制到目标文件中。完整代码如下:

package cn.edu.bbc.computer.iodemo.fileDemos;

//省略import,具体请参见源代码工程

public class FileNIOCopyDemo {

    public static void main(String[] args) {

        //演示复制资源文件

        nioCopyResouceFile();

    }

    /**

     * 复制两个资源目录下的文件

     */

    public static void nioCopyResouceFile() {

       //

        String sourcePath = NioDemoConfig.FILE_RESOURCE_SRC_PATH;

        String srcPath = IOUtil.getResourcePath(sourcePath);

        Logger.info("srcPath=" + srcPath);

 

        //目标

        String destPath = NioDemoConfig.FILE_RESOURCE_DEST_PATH;

        String destDecodePath = IOUtil.builderResourcePath(destPath);

        Logger.info("destDecodePath=" + destDecodePath);

 

//复制文件

        nioCopyFile(srcDecodePath, destDecodePath);

    }

    /**

     * NIO方式复制文件

     * @param srcPath 源路径

     * @param destPath目标路径

     */

      public static void nioCopyFile(String srcPath, String destPath){

        File srcFile = new File(srcPath);

        File destFile = new File(destPath);

        try {

            //如果目标文件不存在,则新建

            if (!destFile.exists()) {

                destFile.createNewFile();

            }

           long startTime = System.currentTimeMillis();

        FileInputStream fis = null;

        FileOutputStream fos = null;

        FileChannel inChannel = null;  //输入通道

        FileChannel outchannel = null; //输出通道

        try {

            fis = new FileInputStream(srcFile);

            fos = new FileOutputStream(destFile);

            inChannel = fis.getChannel();

            outchannel = fos.getChannel();

            int length = -1;

            //新建buf,处于写模式

            ByteBufferbuf = ByteBuffer.allocate(1024);

            //从输入通道读取到buf

            while ((length = inChannel.read(buf)) != -1) {

               //buf第一次模式切换:翻转buf,从写模式变成读模式

                buf.flip();

                int outlength = 0;

                //buf写入输出的通道

                while ((outlength = outchannel.write(buf)) != 0) {

                    System.out.println("写入的字节数:" + outlength);

                }

                //buf第二次模式切换:清除buf,变成写模式

                buf.clear();

            }

            //强制刷新到磁盘

            outchannel.force(true);

        } finally {

             //关闭所有的可关闭对象

             IOUtil.closeQuietly(outchannel);

             IOUtil.closeQuietly(fos);

             IOUtil.closeQuietly(inChannel);

             IOUtil.closeQuietly(fis);

          }

        long endTime = System.currentTimeMillis();

        Logger.info("base复制毫秒数:" + (endTime - startTime));

    } catch (IOException e) {

              e.printStackTrace();

    }

}

除了FileChannel的通道操作外,还需要注意代码执行过程中隐藏的ByteBuffer的模式切换。新建的ByteBuffer在写模式时才可作为inChannel.read(ByteBuffer)方法的参数,inChannel.read()方法将从通道inChannel读到的数据写入ByteBuffer。然后,调用缓冲区的flip方法,将ByteBuffer从写模式切换成读模式才能作为outchannel.write(ByteBuffer)方法的参数,以便从ByteBuffer读取数据,最终写入outchannel(输出通道)。

完成一次复制之后,在进入下一次复制前还要进行一次缓冲区的模式切换。此时,需要通过clear方法将Buffer切换成写模式才能进入下一次的复制。所以,在示例代码中,每一轮外层的while循环都需要两次ByteBuffer模式切换:第一次模式切换时翻转buf,变成读模式;第二次模式切换时清除buf,变成写模式。

上面示例代码的主要目的在于演示文件通道以及字节缓冲区的使用。作为文件复制的程序来说,以上实战代码的效率不是最高的。更高效的文件复制可以调用文件通道的transferFrom()方法。具体的代码可以参见源代码工程中的FileNIOFastCopyDemo类,完整源文件的路径为cn.edu.bbc.computer.iodemo.fileDemos.FileNIOFastCopyDemo

请大家在随书源码工程中自行运行和学习以上代码,这里不再赘述。

8.4.3 SocketChannel

NIO中,涉及网络连接的通道有两个:一个是SocketChannel,负责连接的数据传输;另一个是ServerSocketChannel,负责连接的监听。其中,NIO中的SocketChannel传输通道与OIO中的Socket类对应,NIO中的ServerSocketChannel监听通道对应于OIO中的ServerSocket类。

ServerSocketChannel仅应用于服务端,而SocketChannel同时处于服务端和客户端。所以,对于一个连接,两端都有一个负责传输的SocketChannel

无论是ServerSocketChannel还是SocketChannel,都支持阻塞和非阻塞两种模式。如何进行模式的设置呢?调用configureBlocking()方法,具体如下:

1socketChannel.configureBlocking(false)设置为非阻塞模式。

2socketChannel.configureBlocking(true)设置为阻塞模式。

在阻塞模式下,SocketChannel的连接、读、写操作都是同步阻塞式的,在效率上与Java OIO面向流的阻塞式读写操作相同。因此,在这里不介绍阻塞模式下通道的具体操作。在非阻塞模式下,通道的操作是异步、高效的,这也是相对于传统OIO的优势所在。下面详细介绍在非阻塞模式下通道的获取、读写和关闭等操作。

1. 获取SocketChannel传输通道

在客户端,先通过SocketChannel静态方法open()获得一个套接字传输通道,然后将socket设置为非阻塞模式,最后通过connect()实例方法对服务器的IP和端口发起连接。

//获得一个套接字传输通道

SocketChannel socketChannel = SocketChannel.open();

//设置为非阻塞模式

socketChannel.configureBlocking(false);

//对服务器的IP和端口发起连接

socketChannel.connect(new InetSocketAddress("127.0.0.1"80));

在非阻塞情况下,与服务器的连接可能还没有真正建立,socketChannel.connect()方法就返回了,因此需要不断地自旋,检查当前是否连接到了主机:

while(! socketChannel.finishConnect() ){

    //不断地自旋、等待,或者做一些其他的事情

}

在服务端,如何获取与客户端对应的传输套接字呢?

在连接建立的事件到来时,服务端的ServerSocketChannel能成功地查询出这个新连接事件,并且通过调用服务端ServerSocketChannel监听套接字的accept()方法来获取新连接的套接字通道:

//新连接事件到来,首先通过事件获取服务器监听通道

ServerSocketChannel server = (ServerSocketChannel) key.channel();

//获取新连接的套接字通道

SocketChannel socketChannel = server.accept();

//设置为非阻塞模式

socketChannel.configureBlocking(false);

 

说明

NIO套接字通道主要用于非阻塞的传输场景。所以,基本上都需要调用通道的configureBlocking(false)方法,将通道从阻塞模式切换为非阻塞模式。

2. 读取SocketChannel传输通道

SocketChannel传输通道可读时,可以从SocketChannel读取数据,具体方法与前面的文件通道读取方法是相同的。调用read()方法,将数据读入缓冲区ByteBuffer

ByteBufferbuf = ByteBuffer.allocate(1024);

int bytesRead = socketChannel.read(buf);

在读取时,因为是异步的,所以我们必须检查read()的返回值,以便判断当前是否读取到了数据。read()方法的返回值是读取的字节数,如果是-1,那么表示读取到对方的输出结束标志,即对方已经输出结束,准备关闭连接。实际上,通过read()方法读数据本身是很简单的,比较困难的是在非阻塞模式下如何知道通道何时是可读的。这需要用到NIO的新组件——Selector通道选择器,稍后介绍它。

8. 写入SocketChannel传输通道

和前面把数据写入FileChannel一样,大部分应用场景都会调用通道的int write(ByteBufferbuf)方法。

//写入前需要读取缓冲区,要求ByteBuffer是读模式

buffer.flip();

socketChannel.write(buffer);

4. 关闭SocketChannel传输通道

在关闭SocketChannel传输通道前,如果传输通道用来写入数据,则建议调用一次shutdownOutput()终止输出方法,向对方发送一个输出的结束标志(-1)。然后调用socketChannel.close()方法,关闭套接字连接。

//调用终止输出方法,向对方发送一个输出的结束标志

socketChannel.shutdownOutput();

//关闭套接字连接

IOUtil.closeQuietly(socketChannel);

8.4.4 使用SocketChannel发送文件的实战案例

下面的实战案例是使用FileChannel读取本地文件内容,然后在客户端使用SocketChannel把文件信息和文件内容发送到服务器。客户端的完整代码如下:

package cn.edu.bbc.computer.iodemo.socketDemos;

//…

public class NioSendClient {

      private Charset charset = Charset.forName("UTF-8");

      /**

      * 向服务端传输文件

      */

    public void sendFile()

    {

        try

        {

            String sourcePath = NioDemoConfig.SOCKET_SEND_FILE;

            String srcPath = IOUtil.getResourcePath(sourcePath);

            Logger.debug("srcPath=" + srcPath);

 

            String destFile = NioDemoConfig.SOCKET_RECEIVE_FILE;

            Logger.debug("destFile=" + destFile);

 

            File file = new File(srcPath);

            if (!file.exists())

            {

                Logger.debug("文件不存在");

                return;

            }

            FileChannel fileChannel =new FileInputStream(file).getChannel();

 

            SocketChannel socketChannel =SocketChannel.open();

            socketChannel.socket().connect(new InetSocketAddress("127.0.0.1",18899));

            socketChannel.configureBlocking(false);

            Logger.debug("Client 成功连接服务端");

 

            while (!socketChannel.finishConnect())

            {

                //不断地自旋、等待,或者做一些其他的事情

            }

            //发送文件名称和长度

            ByteBuffer buffer =sengFileNameAndLength(destFile, file, socketChannel);

                                

            //发送文件内容

            int length =sendContent(file, fileChannel, socketChannel, buffer);

 

            if (length == -1)

            {

                IOUtil.closeQuietly(fileChannel);

                socketChannel.shutdownOutput();

                IOUtil.closeQuietly(socketChannel);

            }

            Logger.debug("======== 文件传输成功 ========");

        } catch (Exception e)

        {

            e.printStackTrace();

        }

    }

        

    //方法:发送文件内容

    public int sendContent(File file, FileChannel fileChannel,SocketChannel socketChannel,

                           ByteBuffer buffer) throws IOException

    {

        //发送文件内容

        Logger.debug("开始传输文件");

        int length = 0;

        long progress = 0;

        while ((length = fileChannel.read(buffer)) > 0)

        {

            buffer.flip();

            socketChannel.write(buffer);

            buffer.clear();

            progress += length;

          Logger.debug("| " + (100 * progress / file.length()) + "% |");

        }

        return length;

    }

        

    //方法:发送文件名称和长度

    public ByteBuffer sengFileNameAndLength(String destFile,File file,

                                        SocketChannel socketChannel) throws IOException

    {

        //发送文件名称

        ByteBuffer fileNameByteBuffer = charset.encode(destFile);

 

        ByteBuffer buffer =ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

        //发送文件名称长度

        int fileNameLen = fileNameByteBuffer.capacity();

        buffer.putInt(fileNameLen);

        buffer.flip();

        socketChannel.write(buffer);

        buffer.clear();

        Logger.info("Client 文件名称长度发送完成:", fileNameLen);

 

        //发送文件名称

        socketChannel.write(fileNameByteBuffer);

        Logger.info("Client 文件名称发送完成:", destFile);

        //发送文件长度

        buffer.putLong(file.length());

        buffer.flip();

        socketChannel.write(buffer);

        buffer.clear();

        Logger.info("Client 文件长度发送完成:", file.length());

        return buffer;

    }

}

以上代码中,文件发送过程是:首先发送文件名称(不带路径)和文件长度,然后发送文件内容。代码中的配置项(如服务器的IP、服务端口、待发送的源文件名称(带路径)、远程的目标文件名称等配置信息)都是从system.properties配置文件中读取的,通过自定义的NioDemoConfig配置类来完成配置。

在运行以上客户端的程序之前,需要先运行服务端的程序。服务端的类与客户端的源代码在同一个包下,类名为NioReceiveServer,具体参见源代码工程,我们稍后再详细介绍这个类。

8.4.5 DatagramChannel

Java中使用UDP传输数据比TCP更加简单。和socketTCP不同,UDP不是面向连接的协议。使用UDP时,只要知道服务器的IP和端口就可以直接向对方发送数据。在Java NIO中,使用DatagramChannels处理UDP的数据传输。

1. 获取DatagramChannel

获取数据报通道的方式很简单,调用DatagramChannel类的open()静态方法即可。然后调用configureBlocking(false)方法,设置成非阻塞模式。

//获取DatagramChannel

DatagramChannel channel = DatagramChannel.open();

//设置为非阻塞模式

datagramChannel.configureBlocking(false);

如果需要接收数据,还需要调用bind()方法绑定一个数据报的监听端口,具体如下:

//调用bind()方法绑定一个数据报的监听端口

channel.socket().bind(new InetSocketAddress(18080));

2. DatagramChannel读取数据

DatagramChannel通道可读时,可以从DatagramChannel读取数据。和前面的SocketChannel读取方式不同,这里不调用read()方法,而是调用receive(ByteBufferbuf)方法将数据从DatagramChannel读入,再写入ByteBuffer缓冲区中。

//创建缓冲区

ByteBuffer buf = ByteBuffer.allocate(1024);

//DatagramChannel读入,再写入ByteBuffer缓冲区

SocketAddress clientAddr= datagramChannel.receive(buf);

通道读取receive(ByteBufferbuf)方法虽然读取了数据到buf缓冲区,但是其返回值是SocketAddress类型,表示返回发送端的连接地址(包括IP和端口)。通过receive方法读取数据非常简单,但是在非阻塞模式下如何知道DatagramChannel通道何时是可读的呢?和SocketChannel一样,同样需要用到NIO的新组件——Selector通道选择器。

8. 写入DatagramChannel

DatagramChannel发送数据,和向SocketChannel通道发送数据的方法是不同的。这里不是调用write()方法,而是调用send()方法。示例代码如下:

//把缓冲区翻转为读模式

buffer.flip();

//调用send()方法,把数据发送到目标IP+端口

dChannel.send(buffer,  new InetSocketAddress("127.0.0.1",18899));

//清空缓冲区,切换到写模式

buffer.clear();

由于UDP是面向非连接的协议,因此在调用send()方法发送数据时需要指定接收方的地址(IP和端口)。

4. 关闭DatagramChannel

这个比较简单,直接调用close()方法即可关闭数据报通道。

//简单关闭即可

dChannel.close();

8.4.6 使用DatagramChannel发送数据的实战案例

下面是一个使用DatagramChannel发送数据的客户端示例程序,功能是获取用户的输入数据,通过DatagramChannel将数据发送到远程的服务器。客户端的完整程序代码如下:

package cn.edu.bbc.computer.iodemo.udpDemos;

//…

public class UDPClient {

    public void send() throws IOException {

        //获取DatagramChannel

        DatagramChannel dChannel = DatagramChannel.open();

        //设置为非阻塞

        dChannel.configureBlocking(false);

        ByteBuffer buffer =

                                ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

        Scanner scanner = new Scanner(System.in);

        Print.tcfo("UDP客户端启动成功!");

        Print.tcfo("请输入发送内容:");

        while (scanner.hasNext()) {

            String next = scanner.next();

            buffer.put((Dateutil.getNow() + " >>" + next).getBytes());

            buffer.flip();

            //通过DatagramChannel发送数据

            dChannel.send(buffer,new InetSocketAddress("127.0.0.1",18899));

            buffer.clear();

        }

        //关闭DatagramChannel

        dChannel.close();

    }

    public static void main(String[] args) throws IOException {

        new UDPClient().send();

    }

}

从示例程序可以看出,在客户端使用DatagramChannel发送数据比在客户端使用SocketChannel发送数据要简单得多。

接下来看看在服务端应该如何使用DatagramChannel接收数据。

下面给出服务端通过DatagramChannel接收数据的程序代码。大家目前不一定看得懂,因为代码中用到了Selector

服务端是通过DatagramChannel绑定一个服务器地址(IP+端口),接收客户端发送过来的UDP数据报。服务端的完整代码如下:

package cn.edu.bbc.computer.iodemo.udpDemos;

//…

public class UDPServer {

    public void receive() throws IOException {

        //获取DatagramChannel

        DatagramChannel datagramChannel = DatagramChannel.open();

        //设置为非阻塞模式

        datagramChannel.configureBlocking(false);

        //绑定监听地址

        datagramChannel.bind(new InetSocketAddress("127.0.0.1",18899));

        Print.tcfo("UDP服务器启动成功!");

       //开启一个通道选择器

       Selector selector = Selector.open();

       //将通道注册到选择器

       datagramChannel.register(selector, SelectionKey.OP_READ);

       //通过选择器查询IO事件

       while (selector.select() > 0) {

           Iterator<SelectionKey> iterator =

                       selector.selectedKeys().iterator();

           ByteBuffer buffer =

              ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

                        

           //迭代IO事件

           while (iterator.hasNext()) {

               SelectionKeyselectionKey = iterator.next();

               //可读事件,有数据到来

               if (selectionKey.isReadable()) {

                    //读取DatagramChannel数据

                    SocketAddress client = datagramChannel.receive(buffer);

                    buffer.flip();

                    Print.tcfo(new String(buffer.array(), 0, buffer.limit()));

                    buffer.clear();

               }

           }

           iterator.remove();

       }

       //关闭选择器和通道

       selector.close();

       datagramChannel.close();

    }

    public static void main(String[] args) throws IOException {

        new UDPServer().receive();

    }

}

在服务端,首先调用了bind()方法绑定DatagramChannel的监听端口。当数据到来时调用了receive()方法,从DatagramChannel接收数据后写入ByteBuffer缓冲区中。

在服务端代码中,为了监控数据的到来,使用了Selector。什么是Selector?如何使用Selector呢?请看下一节。

 

8.5 详解NIO Selector

Java NIO的三大核心组件是Channel(通道)、Buffer(缓冲区)、Selector(选择器)。其中,通道和缓冲区的联系比较密切:数据总是从通道读到缓冲区内,或者从缓冲区写入通道中。

前面两个组件已经介绍完毕,下面迎来最后一个非常重要的角色——选择器。

8.5.1 选择器与注册

选择器是什么?选择器和通道的关系又是什么?

简单地说,选择器的使命是完成IO的多路复用,其主要工作是通道的注册、监听、事件查询。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系是监控和被监控的关系。

选择器提供了独特的API方法,能够选出(select)所监控的通道已经发生了哪些IO事件,包括读写就绪的IO操作事件。

NIO编程中,一般是一个单线程处理一个选择器,一个选择器可以监控很多通道。所以,通过选择器,一个单线程可以处理数百、数千、数万甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销。

通道和选择器之间的关联通过register(注册)的方式完成。调用通道的Channel.register(Selector selint ops)方法,可以将通道实例注册到一个选择器中。register方法有两个参数:第一个参数指定通道注册到的选择器实例;第二个参数指定选择器要监控的IO事件类型。

可供选择器监控的通道IO事件类型包括以下四种:

1)可读:SelectionKey.OP_READ

2)可写:SelectionKey.OP_WRITE

3)连接:SelectionKey.OP_CONNECT

4)接收:SelectionKey.OP_ACCEPT

以上事件类型常量定义在SelectionKey类中。如果选择器要监控通道的多种事件,可以用“按位或”运算符来实现。例如,同时监控可读和可写IO事件:

//监控通道的多种事件,用按位或运算符来实现

int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE ;

什么是IO事件?

这个概念容易混淆,这里特别说明一下。这里的IO事件不是对通道的IO操作,而是通道处于某个IO操作的就绪状态,表示通道具备执行某个IO操作的条件。例如,某个SocketChannel传输通道如果完成了和对端的三次握手过程,就会发生“连接就绪”(OP_CONNECT)事件;某个ServerSocketChannel服务器连接监听通道,在监听到一个新连接到来时,则会发生“接收就绪”(OP_ACCEPT)事件;一个SocketChannel通道有数据可读,就会发生“读就绪”(OP_READ)事件;一个SocketChannel通道等待数据写入,就会发生“写就绪”(OP_WRITE)事件。

说明

socket连接事件的核心原理和TCP连接的建立过程有关。关于TCP的核心原理和连接建立时的三次握手、四次挥手知识,请参阅本书后面有关TCP原理的内容。

8.5.2 SelectableChannel

并不是所有的通道都是可以被选择器监控或选择的。例如,FileChannel就不能被选择器复用。判断一个通道能否被选择器监控或选择有一个前提:判断它是否继承了抽象类SelectableChannel(可选择通道),如果是,就可以被选择,否则不能被选择。

简单地说,一个通道若能被选择,则必须继承SelectableChannel类。

SelectableChannel类是何方神圣呢?它提供了实现通道可选择性所需要的公共方法。Java NIO中所有网络连接socket通道都继承了SelectableChannel类,都是可选择的。FileChannel并没有继承SelectableChannel,因此不是可选择通道。

8.5.3 SelectionKey

通道和选择器的监控关系注册成功后就可以选择就绪事件,具体的选择工作可调用Selectorselect()方法来完成。通过select()方法,选择器可以不断地选择通道中所发生操作的就绪状态,返回注册过的那些感兴趣的IO事件。换句话说,一旦在通道中发生了某些IO事件(就绪状态达成),并且是在选择器中注册过的IO事件,就会被选择器选中,并放入SelectionKey(选择键)的集合中。

SelectionKey是什么呢?简单地说,SelectionKey就是那些被选择器选中的IO事件。前面讲到,一个IO事件发生(就绪状态达成)后,如果之前在选择器中注册过,就会被选择器选中,并放入SelectionKey中;如果之前没有注册过,那么即使发生了IO事件,也不会被选择器选中。SelectionKeyIO的关系可以简单地理解为SelectionKey就是被选中了的IO事件。

在实际编程时,SelectionKey的功能是很强大的。通过SelectionKey,不仅可以获得通道的IO事件类型(比如SelectionKey.OP_READ),还可以获得发生IO事件所在的通道。另外,还可以获得选择器实例。

8.5.4 选择器使用流程

选择器的使用主要有以下三步:

1)获取选择器实例。选择器实例是通过调用静态工厂方法open()来获取的,具体如下:

//调用静态工厂方法open()来获取Selector实例

Selector selector = Selector.open();

Selector的类方法open()的内部是向选择器SPI发出请求,通过默认的SelectorProvider(选择器提供者)对象获取一个新的选择器实例。Java中的SPIService Provider Interface,服务提供者接口)是一种可以扩展的服务提供和发现机制。Java通过SPI的方式提供选择器的默认实现版本。也就是说,其他的服务提供者可以通过SPI的方式提供定制化版本的选择器的动态替换或者扩展。

2)将通道注册到选择器实例。要实现选择器管理通道,需要将通道注册到相应的选择器上,简单的示例代码如下:

//获取通道

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

//设置为非阻塞

serverSocketChannel.configureBlocking(false);

//绑定连接

serverSocketChannel.bind(new InetSocketAddress(1314));

//将通道注册到选择器上,并指定监听事件为接收连接

serverSocketChannel.register(selectorSelectionKey.OP_ACCEPT);

上面通过调用通道的register()方法将ServerSocketChannel注册到了一个选择器上。当然,在注册之前,需要准备好通道。

这里需要注意:注册到选择器的通道必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常。这意味着,FileChannel不能与选择器一起使用,因为FileChannel只有阻塞模式,不能切换到非阻塞模式;而socket相关的所有通道都可以。其次,一个通道并不一定支持所有的四种IO事件。例如,服务器监听通道ServerSocketChannel仅支持Accept(接收到新连接)IO事件,而传输通道SocketChannel则不同,它不支持Accept类型的IO事件。

如何判断通道支持哪些事件呢?可以在注册之前通过通道的validOps()方法来获取该通道支持的所有IO事件集合。

3)选出感兴趣的IO就绪事件(选择键集合)。通过Selectorselect()方法,选出已经注册的、已经就绪的IO事件,并且保存到SelectionKey集合中。SelectionKey集合保存在选择器实例内部,其元素为SelectionKey类型实例。调用选择器的selectedKeys()方法,可以取得选择键集合。

接下来,迭代集合的每一个选择键,根据具体IO事件类型执行对应的业务操作。大致的处理流程如下:

//轮询,选择感兴趣的IO就绪事件(选择键集合)

while (selector.select() > 0) {

        Set selectedKeys = selector.selectedKeys();

        Iterator keyIterator = selectedKeys.iterator();

        while(keyIterator.hasNext()) {

                SelectionKey key = keyIterator.next();

   //根据具体的IO事件类型执行对应的业务操作

                if(key.isAcceptable()) {

                  //IO事件:ServerSocketChannel服务器监听通道有新连接

                } else if (key.isConnectable()) {

                  //IO事件:传输通道连接成功

                } else if (key.isReadable()) {

                  //IO事件:传输通道可读

                } else if (key.isWritable()) {

                  //IO事件:传输通道可写

                }

                //处理完成后,移除选择键

                keyIterator.remove();

        }

}

处理完成后,需要将选择键从SelectionKey集合中移除,以防止下一次循环时被重复处理。SelectionKey集合不能添加元素,如果试图向SelectionKey中添加元素,则将抛出java.lang.UnsupportedOperationException异常。

用于选择就绪的IO事件的select()方法有多个重载的实现版本,具体如下:

1select():阻塞调用,直到至少有一个通道发生了注册的IO事件。

2select(long timeout):和select()一样,但最长阻塞时间为timeout指定的毫秒数。

3selectNow():非阻塞,不管有没有IO事件都会立刻返回。

select()方法的返回值是整数类型(int),表示发生了IO事件的数量,即从上一次select到这一次select之间有多少通道发生了IO事件,更加准确地说是发生了选择器感兴趣(注册过)的IO事件数。

8.5.5 使用NIO实现Discard服务器的实战案例

Discard服务器的功能很简单:仅读取客户端通道的输入数据,读取完成后直接关闭客户端通道,并且直接抛弃掉(Discard)读取到的数据。Discard服务器足够简单明了,作为第一个学习NIO的通信实例比较有参考价值。

下面的Discard服务器代码将选择器使用步骤进行了进一步细化:

package cn.edu.bbc.computer.iodemo.NioDiscard;

//…

public class NioDiscardServer {

    public static void startServer() throws IOException {

        //1.获取选择器

        Selector selector = Selector.open();

        //2.获取通道

        ServerSocketChannel serverSocketChannel =ServerSocketChannel.open();

        //8.设置为非阻塞

        serverSocketChannel.configureBlocking(false);

        //4.绑定连接

        serverSocketChannel.bind(newInetSocketAddress(18899));

        Logger.info("服务器启动成功");

        //5.将通道注册的接收新连接”IO事件注册到选择器上

        serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

        //6.轮询感兴趣的IO就绪事件(选择键集合)

        while (selector.select() > 0) {

            //7.获取选择键集合

            Iterator<SelectionKey> selectedKeys =selector.selectedKeys().iterator();

            while (selectedKeys.hasNext()) {

                //8.获取单个的选择键,并处理

                SelectionKey selectedKey = selectedKeys.next();

                //9.判断key是具体的什么事件

                if (selectedKey.isAcceptable()) {

                                

                    //10.若选择键的IO事件是连接就绪,就获取客户端连接

                    SocketChannel socketChannel =serverSocketChannel.accept();

                    //11.将新连接切换为非阻塞模式

                    socketChannel.configureBlocking(false);

                    //12.将新连接的通道的可读事件注册到选择器上

                    socketChannel.register(selector,SelectionKey.OP_READ);

                                        

                } else if (selectedKey.isReadable()) {

                                

                    //18.若选择键的IO事件是可读,则读取数据

                    SocketChannelsocketChannel =(SocketChannel) selectedKey.channel();

                                        

                    //14.读取数据,然后丢弃

                    ByteBufferbyteBuffer = ByteBuffer.allocate(1024);

                    int length = 0;

                    while ((length =socketChannel.read(byteBuffer)) >0)

                    {

                        byteBuffer.flip();

                 Logger.info(new String(byteBuffer.array(), 0, length));

                        byteBuffer.clear();

                    }

                    socketChannel.close();

                }

                //15.移除选择键

                selectedKeys.remove();

            }

        }

        //16.关闭连接

        serverSocketChannel.close();

    }

    public static void main(String[] args) throws IOException {

        startServer();

    }

}

实现DiscardServer共分为16步,其中第7~15步是循环执行的,不断查询,将感兴趣的IO事件选择到选择键集合中,然后通过selector.selectedKeys()获取该选择键集合,并且进行迭代处理。在事件处理过程中,对于新建立的socketChannel客户端传输通道,也要注册到同一个选择器上,这样就能使用同一个选择线程不断地对所有的注册通道进行选择键的查询。

DiscardServer程序中,涉及两次选择器注册:一次是注册serverChannel(服务器通道);另一次是注册接收到的socketChannel客户端传输通道。serverChannel所注册的是新连接的IO事件SelectionKey.OP_ACCEPTsocketChannel所注册的是可读IO事件SelectionKey.OP_READ

注册完成后如果有事件发生,则DiscardServer在对选择键进行处理时先判断类型,然后进行相应的处理:

1)如果是SelectionKey.OP_ACCEPT新连接事件类型,代表serverChannel接收到新的客户端连接,发生了新连接事件,则通过服务器通道的accept方法获取新的socketChannel传输通道,并且将新通道注册到选择器。

2)如果是SelectionKey.OP_READ可读事件类型,代表某个客户端通道有数据可读,则读取选择键中socketChannel传输通道的数据,进行业务处理,这里是直接丢弃数据。

客户端首先建立到服务器的连接,发送一些简单的数据,然后直接关闭连接。客户端的DiscardClient代码更加简单,代码如下:

package cn.edu.bbc.computer.iodemo.NioDiscard;

//…

public class NioDiscardClient {

    public static void startClient() throws IOException {

        InetSocketAddress address =new InetSocketAddress("127.0.0.1",18899);

        //1.获取通道

        SocketChannel socketChannel = SocketChannel.open(address);

        //2.切换成非阻塞模式

        socketChannel.configureBlocking(false);

        //不断地自旋、等待连接完成,或者做一些其他的事情

        while (!socketChannel.finishConnect()) {

        }

        Logger.info("客户端连接成功");

        //8.分配指定大小的缓冲区

        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        byteBuffer.put("hello world".getBytes());

        byteBuffer.flip();

        //发送到服务器

        socketChannel.write(byteBuffer);

        socketChannel.shutdownOutput();

        socketChannel.close();

    }

    public static void main(String[] args) throws IOException {

        startClient();

    }

}

说明

如果需要执行整个Discard演示程序,首先要执行前面的NioDiscardServer服务端程序,然后才能执行本客户端程序。

通过Discard服务器的开发实战,大家应该对NIO Selector的使用流程了解得非常清楚了。下面来看一个稍微复杂一点的案例:在服务端接收文件和内容。

8.5.6 使用SocketChannel在服务端接收文件的实战案例

本示例演示文件的接收,是服务端的程序,和前面介绍的发送文件的SocketChannel客户端程序是相互配合使用的。由于在服务端需要用到选择器,因此直到完成了选择器的知识介绍之后才开始介绍NIO文件传输的socket服务端程序。服务端接收文件的示例代码如下:

package cn.edu.bbc.computer.iodemo.socketDemos;

//省略import

/**

 * 文件传输Server

 * Created by oliver

 */

public class NioReceiveServer

{

 

    //接收文件路径

    private static final String RECEIVE_PATH =NioDemoConfig.SOCKET_RECEIVE_PATH;

 

    private Charset charset = Charset.forName("UTF-8");

 

    /**

     * 服务端保存的客户端对象,对应一个客户端文件

     */

    static class Client

    {

        //文件名称

        String fileName;

        //长度

        long fileLength;

 

        //开始传输的时间

        long startTime;

 

        //客户端的地址

        InetSocketAddress remoteAddress;

 

        //输出的文件通道

        FileChannel outChannel;

 

        //接收长度

        long receiveLength;

 

        public boolean isFinished()

        {

            return receiveLength >= fileLength;

        }

    }

 

    private ByteBuffer buffer= ByteBuffer.allocate(NioDemoConfig.SERVER_BUFFER_SIZE);

 

    //使用Map保存每个客户端传输

    //OP_READ通道可读时,根据Channel找到对应的对象

    Map<SelectableChannel, Client> clientMap =new HashMap<SelectableChannel, Client>();

    public void startServer() throws IOException

    {

        //1.获取选择器

        Selector selector = Selector.open();

 

        //2.获取通道

        ServerSocketChannel serverChannel =ServerSocketChannel.open();

        ServerSocket serverSocket = serverChannel.socket();

 

        //8.设置为非阻塞

        serverChannel.configureBlocking(false);

        //4.绑定连接

        InetSocketAddress address= new InetSocketAddress(18899);

        serverSocket.bind(address);

        //5.将通道注册到选择器上,并且注册的IO事件为接收新连接

        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        Print.tcfo("serverChannel is listening…");

        //6.轮询感兴趣的I/O就绪事件(选择键集合)

        while (selector.select() > 0)

        {

            //7.获取选择键集合

            Iterator<SelectionKey> it = selector.selectedKeys().iterator();

            while (it.hasNext())

            {

                //8.获取单个的选择键,并处理

                SelectionKey key = it.next();

 

                //9.判断key是具体的什么事件,是否为新连接事件

                if (key.isAcceptable())

                {

                    //10.若接收的事件是新连接,则获取客户端新连接

                   ServerSocketChannel server = (ServerSocketChannel) key.channel();

                    SocketChannel socketChannel = server.accept();

                    if (socketChannel == null) continue;

                    //11.客户端新连接,切换为非阻塞模式

                    socketChannel.configureBlocking(false);

                    //12.将客户端新连接通道注册到Selector

                    SelectionKey selectionKey =socketChannel.register(selector, SelectionKey.OP_READ);

                    //余下为业务处理

                    Client client = new Client();

                    client.remoteAddress=(InetSocketAddress) socketChannel.getRemoteAddress();

                    clientMap.put(socketChannel, client);

              Logger.debug(socketChannel.getRemoteAddress() + "连接成功…");

 

                } else if (key.isReadable())

                {

                    processData(key);

                }

                //NIO的特点是只会累加,已选择键的集合不会删除

                //如果不删除,下一次又会被select()函数选中

                it.remove();

            }

        }

    }

 

    /**

     * 处理客户端传输过来的数据

     */

    private void processData(SelectionKey key) throws IOException

    {

        Client client = clientMap.get(key.channel());

 

        SocketChannel socketChannel = (SocketChannel) key.channel();

        int num = 0;

        try

        {

            buffer.clear();

            while ((num = socketChannel.read(buffer)) > 0)

            {

                buffer.flip();

                //客户端发送过来的,首先处理文件名

                if (null == client.fileName)

                {

 

                    if (buffer.capacity() < 4)

                    {

                        continue;

                    }

                    int fileNameLen = buffer.getInt();

                    byte[] fileNameBytes = new byte[fileNameLen];

                    buffer.get(fileNameBytes);

 

                    //文件名

                    String fileName = new String(fileNameBytes, charset);

 

                    File directory = new File(RECEIVE_PATH);

                    if (!directory.exists())

                    {

                        directory.mkdir();

                    }

                    Logger.info("NIO  传输目标dir", directory);

 

                    client.fileName = fileName;

                    String fullName = directory.getAbsolutePath() + File.separatorChar + fileName;

                    Logger.info("NIO  传输目标文件:", fullName);

 

                    File file = new File(fullName.trim());

 

                    if (!file.exists())

                    {

                        file.createNewFile();

                    }

                    FileChannel fileChannel = new FileOutputStream(file).getChannel();

                    client.outChannel = fileChannel;

 

                    if (buffer.capacity() < 8)

                    {

                        continue;

                    }

                    //文件长度

                    long fileLength = buffer.getLong();

                    client.fileLength = fileLength;

                    client.startTime = System.currentTimeMillis();

                    Logger.debug("NIO 传输开始:");

 

                    client.receiveLength += buffer.capacity();

                    if (buffer.capacity() > 0)

                    {

                        //写入文件

                        client.outChannel.write(buffer);

                    }

                    if (client.isFinished())

                    {

                        finished(key, client);

                    }

                    buffer.clear();

                }

                //客户端发送过来的,最后是文件内容

                else

                {

                    client.receiveLength += buffer.capacity();

                    //写入文件

                    client.outChannel.write(buffer);

                    if (client.isFinished())

                    {

                        finished(key, client);

                    }

                    buffer.clear();

                }

 

            }

            key.cancel();

        } catch (IOException e)

        {

            key.cancel();

            e.printStackTrace();

            return;

        }

        //调用close-1,到达末尾

        if (num == -1)

        {

            finished(key, client);

            buffer.clear();

        }

    }

 

    private void finished(SelectionKey key, Client client)

    {

        IOUtil.closeQuietly(client.outChannel);

        Logger.info("上传完毕");

        key.cancel();

        Logger.debug("文件接收成功,File Name" + client.fileName);

        Logger.debug(" Size" + IOUtil.getFormatFileSize(client.fileLength));

        long endTime = System.currentTimeMillis();

        Logger.debug("NIO IO 传输毫秒数:" + (endTime - client.startTime));

    }

 

 

    /**

     * 入口

     */

    public static void main(String[] args) throws Exception

    {

        NioReceiveServer server = new NioReceiveServer();

        server.startServer();

    }

}

客户端每次传输文件都会分为多次传输:首先传入文件名称,其次是文件大小,然后是文件内容。

对于每一个客户端socketChannel,创建一个客户端对象,用于保存客户端状态,分别保存文件名、文件大小和写入的目标文件通道outChannel

socketChannelClient对象之间是一对一的对应关系:建立连接时,在Map中以键-值对的形式保存Client实例,其中socketChannel作为键(Key),Client对象作为值(Value)。当socketChannel传输通道有数据可读时,通过选择键key.channel()方法取出IO事件所在的socketChannel通道,然后通过socketChannel通道从map中获取对应的Client对象。

接收到数据时,如果文件名为空,就先处理文件名称,并把文件名保存到Client对象,同时创建服务器上的目标文件;接下来读取到数据,说明接收到了文件大小,把文件大小保存到Client对象;接下来接收到数据,说明是文件内容,写入Client对象的outChannel文件通道中,直到数据读取完毕。

运行方式是先启动NioReceiveServer服务器程序,再启动前面介绍的客户端程序NioSendClient,完成文件的传输。

由于NIO传输是非阻塞、异步的,因此在传输过程中会出现“粘包”和“半包”问题。正因如此,无论是前面NIO文件传输实例还是Discard服务器程序,都会在传输过程中出现异常现象(偶现)。由于以上实例在生产过程中不会使用,仅仅是为了大家学习NIO的知识,所以没有为了解决“粘包”和“半包”问题而将代码编写得很复杂。

说明

在执行以上实例时,传输过程中会出现异常现象——部分内容传输出错。其实并不是程序问题,而是传输过程中发生了“粘包”和“半包”问题。后面的章节会专门介绍“粘包”和“半包”问题及其根本性的解决方案。

 


0 条 查看最新 评论

没有评论
暂时无法发表评论