十一章 DecoderEncoder核心组件

Netty从底层Java通道读取ByteBuf二进制数据,传入Netty通道的流水线,随后开始入站处理。在入站处理过程中,需要将ByteBuf二进制类型解码成Java POJO对象。这个解码过程可以通过NettyDecoder(解码器)去完成。

在出站处理过程中,业务处理后的结果(出站数据)需要从某个Java POJO对象编码为最终的ByteBuf二进制数据,然后通过底层Java通道发送到对端。在编码过程中,需要用到NettyEncoder(编码器)去完成数据的编码工作。

本章专门为大家解读Netty非常核心的组件:编码器和解码器。

11.1 Decoder原理与实战

什么是Netty的解码器呢?

首先,它是一个InBound入站处理器,负责处理入站数据

其次,它能将上一个Inbound入站处理器传过来的输入(Input)数据进行解码或者格式转换,然后发送到下一个Inbound入站处理器。

一个标准的解码器的职责为:将输入类型为ByteBuf的数据进行解码,输出一个一个的Java POJO对象。Netty内置了ByteToMessageDecoder解码器。

Netty中的解码器都是Inbound入站处理器类型,都直接或者间接地实现了入站处理的超级接口ChannelInboundHandler

11.1.1 ByteToMessageDecoder解码器处理流程

ByteToMessageDecoder是一个非常重要的解码器基类,是一个抽象类,实现了解码处理的基础逻辑和流程。ByteToMessageDecoder继承自ChannelInboundHandlerAdapter适配器,是一个入站处理器,用于完成从ByteBufJava POJO对象的解码功能。

ByteToMessageDecoder解码的流程大致如图11-1所示。首先,它将上一个传过来的输入到ByteBuf中的数据进行解码,解码出一个List<Object>对象列表;然后,迭代List<Object>列表,逐个将Java POJO对象传入下一个Inbound入站处理器。

11-1 ByteToMessageDecoder解码的流程

ByteToMessageDecoder是抽象类,不能以实例化方式创建对象。也就是说,直接通过ByteToMessageDecoder类并不能完成ByteBuf字节码到具体Java类型的解码,还得依赖于它的具体实现。

ByteToMessageDecoder的解码方法为decode(),是一个抽象方法。也就是说,对于decode()方法的具体解码过程,ByteToMessageDecoder没有具体的实现,如何将ByteBuf中的字节数据变成什么样的Object实例(包含多少个Object实例)需要子类去完成。所以,作为解码器的父类,ByteToMessageDecoder仅仅提供了一个整体框架:它会调用子类的decode()方法,完成具体的二进制字节解码,然后会获取子类解码之后的Object结果,放入自己内部的结果列表List<Object>中,最终父类负责将List<Object>中的元素一个一个地传递给下一个。从这个角度来说,ByteToMessageDecoder在设计上使用了模板模式(Template Pattern)。

ByteToMessageDecoder的子类要做的是将从入站ByteBuf解码出来的所有Object实例加入父类的List<Object>列表中。

实现一个解码器,首先要继承ByteToMessageDecoder抽象类,然后实现其基类的decode()抽象方法。总体来说,流程大致如下:

1)继承ByteToMessageDecoder抽象类。

2)实现基类的decode()抽象方法,将ByteBuf到目标POJO的解码逻辑写入此方法,以将ByteBuf中的二进制数据解码成一个一个的Java POJO对象。

3)解码完成后,需要将解码后的Java POJO对象放入decode()方法的List<Object>实参中,此实参是父类所传入的解码结果收集容器。

余下的工作都由父类ByteToMessageDecoder自动完成。在流水线的处理过程中,父类在执行完子类的解码后,会将List<Object>收集到的结果一个一个地传递到下一个Inbound入站处理器。

11.1.2 自定义Byte2IntegerDecoder整数解码器

下面是一个小小的ByteToMessageDecoder子类的实战案例:整数解码器。其功能是将ByteBuf中的字节解码成整数类型。

Byte2IntegerDecoder整数解码器的代码很简单,具体如下:

package cn.edu.bbc.computer.decoder;

//

public class Byte2IntegerDecoder extends ByteToMessageDecoder {

    @Override

    public void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) {

        while (in.readableBytes() >= 4) {

            int i = in.readInt();

            Logger.info("解码出一个整数: " + i);

            out.add(i);

        }

    }

}

上面实战案例程序中,decode()方法的逻辑大致如下:

首先,Byte2IntegerDecoder解码器继承自ByteToMessageDecoder

其次,在decode()方法中,通过ByteBufreadInt()实例方法从输入缓冲区读取整数,其作用是将二进制数据解码成一个一个的整数。

再次,将解码后的整数增加到decode()方法的List<Object>列表参数中。

最后,decode()方法不断地循环解码,并且不断地添加到List<Object>结果容器中。

前面反复讲到,decode()方法处理完成后,基类会继续后面的传递处理:将List<Object>结果列表中所得到的整数一个一个地传递到下一个Inbound入站处理器。

至此,一个简单的解码器就完成了。

如何使用这个自定义的Byte2IntegerDecoder解码器呢?首先,需要将其加入通道流水线中;其次,由于解码器的功能仅仅是完成ByteBuf的解码,不做其他业务处理,所以还需要编写一个业务处理器,用于在读取解码后的Java POJO对象之后完成具体的业务处理。

这里编写一个简单的配套处理器IntegerProcessHandler,用于处理Byte2IntegerDecoder解码之后的整数。其功能是:读取上一个的入站数据,把它转换成整数,并且输出到Console(控制台)上。配套处理器的代码如下:

package cn.edu.bbc.computer.decoder;

//

public class IntegerProcessHandler extends ChannelInboundHandlerAdapter {

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

        Integer integer = (Integer) msg;

        Logger.info("打印出一个整数: " + integer);

    }

}

至此,已经编写了解码处理器Byte2IntegerDecoder和配套处理器IntegerProcessHandler这两个自己的入站处理器:一个负责解码,一个负责模拟处理解码结果。

最终如何测试这两个入站处理器呢?使用EmbeddedChannel(嵌入式通道)编写一个测试用例,代码如下:

package cn.edu.bbc.computer.decoder;

//

public class Byte2IntegerDecoderTester {

    /**

     * 整数解码器的使用实例

     */

    @Test

    public void testByteToIntegerDecoder() {

        ChannelInitializer i= new ChannelInitializer<EmbeddedChannel>(){

            protected void initChannel(EmbeddedChannel ch) {

               ch.pipeline().addLast(new Byte2IntegerDecoder());

               ch.pipeline().addLast(new IntegerProcessHandler());

            }

        };

        EmbeddedChannel channel = new EmbeddedChannel(i);

        for (int j = 0; j < 100; j++) {

            ByteBuf buf = Unpooled.buffer();

            buf.writeInt(j);

            channel.writeInbound(buf);

        }

               //

    }

}

在测试用例中,新建了一个EmbeddedChannel实例,将Byte2IntegerDecoderIntegerProcessHandler加入通道的流水线上。

这里请注意先后次序:Byte2IntegerDecoder解码器在前,IntegerProcessHandler处理器在后。为什么呢?因为入站处理的次序为从前到后。

为了测试入站处理器,需要确保通道能接收到ByteBuf入站数据。这里调用writeInbound()方法,模拟入站数据的写入,向EmbeddedChannel写入100ByteBuf入站缓冲区,每一次写入仅仅包含一个整数。模拟入站数据会被流水线上的两个入站处理器所接收和处理。接着,这些入站的二进制字节被解码成一个一个的整数,逐个输出到控制台上。运行测试用例,部分输出结果如下:

//省略部分输出

[main|Byte2IntegerDecoder:decode]:解码出一个整数: 0

[main|IntegerProcessHandler:channelRead]:打印出一个整数: 0

[main|Byte2IntegerDecoder:decode]:解码出一个整数: 1

[main|IntegerProcessHandler:channelRead]:打印出一个整数: 1

[main|Byte2IntegerDecoder:decode]:解码出一个整数: 2

[main|IntegerProcessHandler:channelRead]:打印出一个整数: 2

[main|Byte2IntegerDecoder:decode]:解码出一个整数: 3

[main|IntegerProcessHandler:channelRead]:打印出一个整数: 3

通过这个实例,大家对ByteToMessageDecoder基类以及如何动手去实现一个解码器应该比较了解了,甚至还可以仿照这个例子实现除了整数之外的Java基本数据类型(ShortCharLongFloatDouble等)的解码器。

最后说明一下:ByteToMessageDecoder传递给下一个的是解码之后的Java POJO对象,不是ByteBuf缓冲区。那么问题来了,ByteBuf缓冲区并没有发送到流水线的TailContext(尾部处理器),将由谁负责释放引用计数呢?其实,基类ByteToMessageDecoder会完成ByteBuf释放工作,它会调用ReferenceCountUtil.release(in)方法将之前的ByteBuf缓冲区的引用计数减1

这个ByteBuf先被释放了,如果在后面还需要用到,怎么办?可以在子类的decode()方法中调用一次ReferenceCountUtil.retain(in)来增加一次引用计数,不过在使用完成后要及时将增加的这次计数减去。

11.1.3 ReplayingDecoder解码器

使用上面的Byte2IntegerDecoder整数解码器会面临一个问题:需要对ByteBuf的长度进行检查,有足够的字节才能进行整数的读取。这种长度的判断是否可以由Netty来帮忙完成呢?答案是可以的,可以使用NettyReplayingDecoder类省去长度的判断。

ReplayingDecoder类是ByteToMessageDecoder的子类,作用是:

  1. 在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节。
  2. ByteBuf中有足够的字节,则会正常读取;反之,则会停止解码。

使用ReplayingDecoder基类改写上一个整数解码器,可以不进行长度检测。创建一个新的整数解码器,类名为Byte2IntegerReplayDecoder,代码如下:

package cn.edu.bbc.computer.decoder;

//

public class Byte2IntegerReplayDecoder extends ReplayingDecoder {

    @Override

    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {

        int i = in.readInt();

        Logger.info("解码出一个整数: " + i);

        out.add(i);

    }

}

通过这个示例程序,我们可以看到:继承ReplayingDecoder实现一个解码器,就不用编写长度判断的代码。ReplayingDecoder进行长度判断的原理很简单:内部定义一个新的二进制缓冲区类(类名为ReplayingDecoderBuffer),又对ByteBuf缓冲区进行装饰。该装饰器的特点是,在缓冲区真正读数据之前先进行长度的判断:如果长度合格,就读取数据;否则,抛出ReplayErrorReplayingDecoder捕获到ReplayError后会留着数据,等待下一次IO事件到来时再读取。

简单来讲,ReplayingDecoder对输入的ByteBuf进行了偷梁换柱,在将外部传入的ByteBuf缓冲区传给子类之前,换成了自己装饰过的ReplayingDecoderBuffer缓冲区。也就是说,在示例程序中,Byte2IntegerReplayDecoder中的decode()方法所得到的实参in的直接类型并不是原始的ByteBuf类型,而是ReplayingDecoderBuffer类型。

ReplayingDecoderBuffer类型首先是一个内部类,其次是继承了ByteBuf类型,包装了ByteBuf类型的大部分读取方法。ReplayingDecoderBufferByteBuf类型的读取方法做了什么样的功能增强呢?主要是进行二进制数据长度的判断,如果长度不足,就抛出异常。这个异常会反过来被ReplayingDecoder基类所捕获,将解码工作停掉。

实质上,ReplayingDecoder的作用远远不止于进行长度判断,它更重要的作用是用于分包传输的应用场景。

11.1.4 整数的分包解码器的实战案例

前面讲到,底层通信协议是分包传输的,一份数据可能分几个数据包到达对端。发送端出去的包在传输过程中会进行多次拆分和组装。接收端收到的包和发送端所发送的包不是一模一样的(见图11-2):在发送端发出4个字符串,Netty或者NIO接收端可能只接收到了3ByteBuf数据缓冲。

Java OIO流式传输中,程序若读不到完整的信息则会一直阻塞,而不会继续执行,也就不会出现图11-2所示的问题了。在JavaNIO(具有非阻塞性)中,保证一次性读取到完整的数据则成了一个大问题。

11-2 通道接收到的ByteBuf数据包和发送端发送的数据包不完全一致

那么,Netty通过什么样的解码器对图11-2中接收端的3ByteBuf缓冲数据进行解码,而后得到和发送端一模一样的4个字符串呢?理论上可以使用ReplayingDecoder来解决。在进行数据解析时,如果发现当前ByteBuf中所有可读的数据不够,那么ReplayingDecoder会一直等待,直到可读数据是足够的。这一切都是在ReplayingDecoder内部,通过与缓冲区装饰器ReplayingDecoderBuffer相互配合完成的。所以,图11-2展示的字符串错乱问题完全可以通过继承ReplayingDecoder基类实现自己的解码器来解决。

11-2中的问题是字符串传输过程中出现的,并且实现字符串的解码和纠正相对比较复杂。为了好懂,这里先介绍一个简单点的例子——整数序列解码,并且将它们两两一组进行相加,重点是,解码过程中需要保持发送时的次序。

要完成上述例子,需要用到ReplayingDecoder的一个很重要的属性——state成员属性。该成员属性的作用是保存当前解码器在解码过程中所处的阶段。在Netty源代码中,该属性的定义如下:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {

    //省略不相关的代码

    //缓冲区装饰器

    private final ReplayingDecoderByteBuf replayable = new ReplayingDecoderByteBuf();

   //重要的成员属性,表示解码过程中所处的阶段,类型为泛型,默认为Object

    private S state;

    //默认的构造器,state值为空,没有用到该属性

    protected ReplayingDecoder() {

        this((Object)null);

    }

    //重载的构造器

    protected ReplayingDecoder(S initialState) {

        //初始化内部的ByteBuf缓冲区装饰器类

       this.replayable = new ReplayingDecoderByteBuf();

        //读指针检查点,默认为-1

        this.checkpoint = -1;

        //状态state的默认值为null

        this.state = initialState;

    }

    //省略不相关的方法

}

在上一小节定义的整数解码实例中,使用的是默认的无参数构造器,该构造器初始化state成员的值为null,也就是没有用到state属性。本小节将用到state成员属性。为什么呢?因为整数序列的解码工作不可能一次完成,要完成两个整数的提取并相加需要解码两次,每一次解码只能解码出一个整数,只有在第二个整数提取之后才能求和,整个解码工作才算完成。这里存在两个阶段,具体的阶段需要使用state来记录。

具体来说,完成两个整数的提取并求和的过程可以从业务上分成两个阶段。使用state属性来保存目前所处的阶段:如果是第一个阶段,则仅仅提取第一个整数,完成后进入第二个阶段;如果是第二个阶段,则不仅要提取第二个整数,提取后还需要计算相加的结果,并将相加的和作为解码结果输出。只有两个阶段全部完成才表示一次解码工作完成。

下面先基于ReplayingDecoder基础解码器编写一个整数相加的解码器:解码两个整数,并把这两个数据之和作为解码的结果。代码如下:

package cn.edu.bbc.computer.decoder;

//省略import

public class IntegerAddDecoder extends ReplayingDecoder<IntegerAddDecoder.PHASE>

{

                //自定义的状态枚举值,代表两个阶段

                enum PHASE

                {

                  PHASE_1,//第一个阶段,仅仅提取第一个整数,完成后进入第二个阶段

                  PHASE_2 //第二个阶段,提取第二个整数后,还需要计算相加的结果并输出

                }

                private int first;

                private int second;

                public IntegerAddDecoder()

                {

                        //在构造函数中,初始化父类的state属性为PHASE_1,表示第一个阶段

                        super(PHASE.PHASE_1);

                }

                @Override

                protected void decode(ChannelHandlerContext ctx,ByteBuf in,     List<Object> out)

throws Exception{

                        switch (state()) //判断当前的状态

                        {

                            //第一个阶段,仅仅提取第一个整数,完成后进入第二个阶段

                             case PHASE_1:

                                        //从装饰器ByteBuf 中读取数据

                                        first = in.readInt();

                                        //第一步解析成功,进入第二步,设置“state”为第二个阶段

                                      checkpoint(PHASE.PHASE_2);

                                        break;

 

                             //提取第二个整数后还需要计算相加的结果

                             //并将和作为解码的结果输出

                             case PHASE_2:

                                        second = in.readInt();

                                        Integer sum = first + second;

                                        out.add(sum);

                                        //进入下一轮解码的第一步,设置“state”为第一个阶段

                                        checkpoint(PHASE.PHASE_1);

                                        break;

                             default:

                                        break;

                             }

                }

}

IntegerAddDecoder类继承了ReplayingDecoder<IntegerAddDecoder.PHASE>,其后面的泛型实参为IntegerAddDecoder.PHASE自定义的状态类型,是一个枚举类型,用来作为泛型变量state的实际类型。该枚举值有两个常量:

1PHASE_1:表示第一个阶段,读取第一个整数。

2PHASE_2:表示第二个阶段,读取第二个整数,然后求和。

父类的成员变量state的值可能为PHASE_1或者PHASE_2,代表当前的阶段。state值需要在构造函数中进行初始化,在这里的子类构造函数中调用super(Status.PHASE_1)state初始化为第一个阶段。

IntegerAddDecoder类中,每一次decode()方法中的解码都有两个阶段:

1)第一个阶段,解码出前一个整数。

2)第二个阶段,解码出后一个整数,然后求和。

每一个阶段一完成就通过checkpoint(PHASE)方法(类似于state属性的setter()方法)把当前的state状态设置为新的PHASE枚举值。checkpoint()方法有两个作用:

1)设置state属性的值,更新一下当前的状态。

2)设置“读指针检查点”。

什么是ReplayingDecoder读指针呢?就是ReplayingDecoder提取二进制数据的ByteBuf缓冲区的readerIndex读指针检查点ReplayingDecoder类的一个重要成员,用于暂存内部ReplayingDecoderBuffer装饰器缓冲区的readerIndex,有点类似于mark。当读数据时,一旦缓冲区可读的二进制数据不够,缓冲区装饰器ReplayingDecoderBuffer在抛出ReplayError异常之前就会把readerIndex的值还原到之前通过checkpoint()方法设置的读指针检查点。在ReplayingDecoder下一次重新读取时,将会从读指针检查点开始读取。

回到IntegerAddDecoderdecode()方法,该方法的逻辑大致如下:

1)判断当前解码器的state阶段是Status.PHASE_1还是Status.PHASE_2,根据对应的阶段进行读取处理。

2)每一次读取完成之后要切换阶段并保持当前“读指针检查点”,以便于在可读数据不足之后进行读指针恢复。

通过上面的分析可以看出,IntegerAddDecoder与前面自定义的整数解码器不同,该解码器是有状态的,不能在不同的通道之间进行简单的共享。更进一步说,ReplayingDecoder类型及其所有的子类都需要保存状态信息,都不适合在不同的通道之间进行简单的共享。

至此,IntegerAddDecoder基本介绍完了。那么,如何使用IntegerAddDecoder解码器呢?具体的测试用例和前面Byte2IntegerDecoder的大致相同,由于篇幅的限制,这里不再赘述。大家可以在源代码包中执行对应的Byte2IntegerReplayDecoderTester测试用例。

11.1.5 字符串的分包解码器的实战案例

通过前面的整数分包传输,大家应该对ReplayingDecoder的分阶段解码有了完整的了解。现在来看一下字符串的分包传输。在原理上,字符串分包解码和整数分包解码是一样的,所不同的是:整数的长度是固定的,目前在Java中是4字节;字符串的长度是不固定的,是可变的。

如何获取字符串的长度信息呢?这是一个小小的难题,和程序所使用的具体传输协议是强相关的。一般来说,在Netty中进行字符串的传输可以采用普通的Head-Content内容传输协议。该协议的规则很简单:

1)在协议的Head部分放置字符串的字节长度,可以用一个整数类型来描述。

2)在协议的Content部分,放置字符串的字节数组。

在实际的传输过程中,一个Head-Content内容包在发送端会被编码成一个ByteBuf内容发送包,当到达接收端后可能被分成很多ByteBuf接收包。对于这些参差不齐的接收包,如何解码成最初的ByteBuf内容发送包来获得Head-Content内容呢?采用ReplayingDecoder解码器即可解决。

下面就是基于ReplayingDecoder实现自定义的字符串分包解码器的示例程序:

package cn.edu.bbc.computer.decoder;

//

public class StringReplayDecoder extends ReplayingDecoder<StringReplayDecoder.PHASE>{

    enum PHASE

    {

        PHASE_1, //第一个阶段:解码出字符串的长度

        PHASE_2  //第二个阶段:按照第一个阶段的字符串长度解码出字符串的内容

    }

 

    private int length;

    private byte[] inBytes;

    public StringReplayDecoder()

    {

        //在构造函数中,需要初始化父类的state属性为PHASE_1阶段

        super(PHASE.PHASE_1);

    }

    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in,List<Object> out) throws Exception

    {

        switch (state())

        {

            case PHASE_1:

                //第一步,从装饰器ByteBuf中读取字符串的长度

                length = in.readInt();

                inBytes = new byte[length];

                //进入第二步,读取内容

                //并设置“读指针检查点”为当前的readerIndex位置

                checkpoint(PHASE.PHASE_2);

                break;

            case PHASE_2:

                //第二步,从装饰器ByteBuf 中读取字符串的内容数组

                in.readBytes(inBytes, 0, length);

                out.add(new String(inBytes, "UTF-8"));

                //第二步解析成功,进入下一个字符串的解析

                //并设置“读指针检查点”为当前的readerIndex位置

                checkpoint(PHASE.PHASE_1);

                break;

            default:

                break;

        }

    }

}

StringReplayDecoder类中,每一次字符串解码分为两个步骤:

第一步,解码出一个字符串的长度。

第二步,按照第一个阶段的字符串长度解码出字符串的内容。

decode()方法中,每个阶段完成后都会通过checkpoint(Status)方法把当前的状态设置为新的Status值。

为了处理StringReplayDecoder解码后的字符串,这里编写一个简单的辅助性质的业务处理器。其功能是读取上一个的入站数据,把它转换成字符串,并输出到控制台上。新业务处理器的名称为StringProcessHandler,具体代码如下:

package cn.edu.bbc.computer.decoder;

//

public class StringProcessHandler extends ChannelInboundHandlerAdapter {

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg){

        String s = (String) msg;

        Logger.info("打印出一个字符串: " + s);

    }

}

至此,已经编写了StringReplayDecoderStringProcessHandler两个入站处理器:一个负责字符串解码,一个负责字符串输出。如何使用这两个入站处理器呢?编写一个测试用例,代码如下:

package cn.edu.bbc.computer.decoder;

//

public class StringReplayDecoderTester {

   static String content= "软件构件技术原理-Java版随书代码!";

    @Test

    public void testStringReplayDecoder() {

       ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

            protected void initChannel(EmbeddedChannel ch) {

                ch.pipeline().addLast(new StringReplayDecoder());

                ch.pipeline().addLast(new StringProcessHandler());

            }

        };

        EmbeddedChannel channel = new EmbeddedChannel(i);

        //待发送字符串content的字节数组

        byte[] bytes =content.getBytes(Charset.forName("utf-8"));

        //循环发送100轮,每一轮可以理解为发送一个Head-Content报文

        for (int j = 0; j < 100; j++) {//发送100个包

            //每个包为随机1~3"软件构件技术原理-Java版随书代码!"

            int random = RandomUtil.randInMod(3);

            ByteBuf buf = Unpooled.buffer();

            //发送长度:字节数组长度*重复次数

            buf.writeInt(bytes.length * random);

            //重复拷贝content的字节数据到发送缓冲区

            for (int k = 0; k < random; k++) {

                buf.writeBytes(bytes);

            }

            //发送内容:发送buf缓冲区

            channel.writeInbound(buf);

        }

    }

}

在测试用例中,新建一个EmbeddedChannel实例,将StringReplayDecoderStringProcessHandler加入通道的流水线中。为了测试入站处理器,调用writeInbound()方法向EmbeddedChannel(嵌入式通道)写入100ByteBuf入站缓冲区,每个ByteBuf缓冲区包含一个字符串(为了进行区分,对content随机重复,最多3次)。

EmbeddedChannel接收到入站数据后,流水线上的两个入站处理器就能不断地处理这些入站数据:将接收到的二进制字节解码成一个一个的字符串,然后逐个输出到控制台上。

//部分输出省略

打印: 软件构件技术原理-Java版随书代码!

打印: 软件构件技术原理-Java版随书代码!

打印: 软件构件技术原理-Java版随书代码!软件构件技术原理-Java版随书代码!

打印: 软件构件技术原理-Java版随书代码!软件构件技术原理-Java版随书代码!

打印: 软件构件技术原理-Java版随书代码!

打印: 软件构件技术原理-Java版随书代码!软件构件技术原理-Java版随书代码!

通过ReplayingDecoder解码器,可以正确地解码分包后的ByteBuf数据包。但是,在实际开发中不建议继承这个类,原因如下:

1)不是所有的ByteBuf操作都被ReplayingDecoderBuffer装饰器类支持,可能有些ByteBuf方法在ReplayingDecoderdecode()方法中会抛出ReplayError异常。

2)在数据解码逻辑复杂的应用场景下,ReplayingDecoder在解码速度上相对较差。因为在ByteBuf长度不够时,ReplayingDecoder会捕获一个ReplayError异常,并会把ByteBuf中的读指针还原到之前的读指针检查点(checkpoint),然后结束这次解析操作,等待下一次IO读事件。在网络条件比较糟糕时,一个数据包的解析逻辑会被反复执行多次,此时解析过程是一个消耗CPU的操作,解码速度上相对较差。所以,ReplayingDecoder更多地应用于数据解析逻辑简单的场景。

在数据解析复杂的应用场景下,建议使用前文介绍的解码器ByteToMessageDecoder或者其子类(后文介绍)。这里继承ByteToMessageDecoder基类,实现一个定制的Head-Content协议字符串内容解码器,代码如下:

package cn.edu.bbc.computer.decoder;

//

public class StringIntegerHeaderDecoder extends ByteToMessageDecoder {

    @Override

    protected void decode(ChannelHandlerContext channelHandlerContext,ByteBuf buf, List<Object> out) {

        //可读字节小于4,消息头还没读满,返回

        if (buf.readableBytes() < 4) {

            return;

        }

        //消息头已经完整

        //在真正开始从缓冲区读取数据之前,调用markReaderIndex()设置mark

        buf.markReaderIndex();

        int length = buf.readInt();

        //从缓冲区读出消息头的大小,这会导致readIndex读指针变化

        //如果剩余长度不够消息体的大小,则需要重置读指针,下一次从相同的位置处理

        if (buf.readableBytes() < length) {

            //读指针重置到消息头的readIndex位置处

            buf.resetReaderIndex();

            return;

        }

        //读取数据,编码成字符串

        byte[] inBytes = new byte[length];

        buf.readBytes(inBytes, 0, length);

        out.add(new String(inBytes, "UTF-8"));

    }

}

在上面的示例程序中,在读取数据之前,需要调用buf.markReaderIndex()方法标记当前的位置指针,当可读内容不够(buf.readableBytes() < length)时,需要调用buf.resetReaderIndex()方法将readerIndex读指针恢复到标记位置。

表面上ByteToMessageDecoder基类是无状态的,不像ReplayingDecoder那样需要使用状态位来保存当前的读取阶段,实际上ByteToMessageDecoder也是有状态的。其内部有一个二进制字节累积器cumulation,用来保存没有解析完的二进制内容。所以,ByteToMessageDecoder及其子类都是有状态的,其实例不能在通道之间共享。在每次初始化通道的流水线时,都要重新创建一个ByteToMessageDecoder或者它的子类的实例。

11.1.6 MessageToMessageDecoder解码器

前面的解码器都是将ByteBuf缓冲区中的二进制数据解码成Java的普通POJO对象,那么是否存在一些解码器可以将一种POJO对象解码成另外一种POJO对象呢?答案是存在。与前面不同的是,解码器需要继承一个新的Netty解码器基类MessageToMessageDecoder<I>。在继承它的时候,需要明确的泛型实参<I>,用于指定入站消息的Java POJO类型。

为什么继承MessageToMessageDecoder<I>时需要指定入站数据的类型,而在前面继承ByteToMessageDecoder解码ByteBuf时不需要指定泛型实参呢?原因很简单:ByteToMessageDecoder的入站消息类型是十分明确的,就是二进制缓冲区ByteBuf类型;MessageToMessageDecoder<I>的入站消息类型是不明确的,可以是任何POJO类型,所以需要指定。

MessageToMessageDecoder同样使用了模板模式,也有一个decode()抽象方法,其具体解码的逻辑需要子类去实现。下面通过实现一个整数到字符串转换的解码器演示一下MessageToMessageDecoder的使用。代码很简单,如下所示:

package cn.edu.bbc.computer.decoder;

//

public class Integer2StringDecoder extends MessageToMessageDecoder<Integer> {

    @Override

    public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) {

        out.add(String.valueOf(msg));

    }

}

这里定义的Integer2StringDecoder新类继承了MessageToMessageDecoder基类。基类泛型实参为Integer,表明子类解码器的入站数据类型为Integer。在decode()方法中,将整数转成字符串,再加入一个List输出容器(由父类在调用时传递过来的)中即可。在子类decode()方法处理完成后,父类会将List容器中的所有元素进行迭代,逐个发送给下一个Inbound入站处理器。

Integer2StringDecoder的使用与前面的解码器一样,其具体的测试用例和前面的StringReplayDecoder实例的也大致相同,由于篇幅的限制,这里不再赘述。大家可以在源代码包中查看,其测试用例的具体类名为Integer2StringDecoderTester

11.2 常用的内置Decoder

Netty提供了不少开箱即用的Decoder(解码器),能够满足很多编解码应用场景的需求。下面将几个比较基础的解码器梳理一下。

1)固定长度数据包解码器——FixedLengthFrameDecoder

适用场景:每个接收到的数据包的长度都是固定的,例如100字节。在这种场景下,把FixedLengthFrameDecoder解码器加到流水线中,它就会把入站ByteBuf数据包拆分成一个个长度为100的数据包,然后发往下一个channelHandler入站处理器。

说明

这里所指的数据包在Netty中是一个ByteBuf实例。

2)行分割数据包解码器——LineBasedFrameDecoder

适用场景:每个ByteBuf数据包使用换行符(或者回车换行符)作为边界分隔符。在这种场景下,把LineBasedFrameDecoder解码器加到流水线中,Netty就会使用换行分隔符把ByteBuf数据包分割成一个一个完整的应用层ByteBuf数据包再发送到下一个。

3)自定义分隔符数据包解码器——DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoderLineBasedFrameDecoder按照行分割的通用版本,不同之处在于这个解码器更加灵活,可以自定义分隔符,而不是局限于换行符。如果使用这个解码器,那么所接收到的数据包末尾必须带上对应的分隔符。

4)自定义长度数据包解码器——LengthFieldBasedFrameDecoder

这是一种基于灵活长度的解码器,在ByteBuf数据包中加了一个长度字段,保存了原始数据包的长度,解码时会按照原始数据包长度进行提取。此解码器在所有开箱即用解码器中是最为复杂的一种,后面会重点介绍。

11.2.1 LineBasedFrameDecoder解码器

在前面字符串分包解码器中,内容是按照Head-Content协议进行传输的。如果不使用Head-Content协议,而是在发送端通过换行符("\n"或者"\r\n")来分割每一次发送的字符串,接收端是否可以正确地解析呢?答案是肯定的。

Netty中,提供了一个开箱即用、使用换行符分割字符串的解码器——LineBasedFrameDecoder,它是一个最为基础的Netty内置解码器。这个解码器的工作原理很简单,依次遍历ByteBuf数据包中的可读字节,判断在二进制字节流中是否存在换行符"\n"或者"\r\n"的字节码。如果有,就以此位置为结束位置,把从可读索引到结束位置之间的字节作为解码成功后的ByteBuf数据包。

LineBasedFrameDecoder支持配置一个最大长度值,表示解码出来的ByteBuf能包含的最大字节数。如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常。

下面演示一下LineBasedFrameDecoder的使用,代码如下:

package cn.edu.bbc.computer.decoder;

//

public class NettyOpenBoxDecoder {

    static String spliter = "\r\n";

    static String content = "软件构件技术原理-Java版随书代码!";

    @Test

    public void testLineBasedFrameDecoder() {

          ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

                protected void initChannel(EmbeddedChannel ch) {

                   ch.pipeline().addLast(new LineBasedFrameDecoder(1024));

                   ch.pipeline().addLast(new StringDecoder());

                   ch.pipeline().addLast(new StringProcessHandler());

                }

            };

            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 0; j < 100; j++) {  //发送100个包

                //每个包为随机1~3"软件构件技术原理-Java版随书代码!"

                int random = RandomUtil.randInMod(3);

                ByteBuf buf = Unpooled.buffer();

                for (int k = 0; k < random; k++) {

                    buf.writeBytes(content.getBytes("UTF-8"));

                }

                  //发送"\r\n"回车换行符作为包结束符

                buf.writeBytes(spliter.getBytes("UTF-8"));

                channel.writeInbound(buf);

            }

         }

}

在这个示例程序中,向通道写入100个入站数据包,每个入站包都以"\r\n"回车换行符结束。模拟通道的LineBasedFrameDecoder解码器会将"\r\n"作为分隔符,分隔出一个一个的入站ByteBuf,然后发送给StringDecoder,将这些ByteBuf二进制数据转成字符串,再发送到StringProcessHandler业务处理器,由它负责将字符串展示出来。

至此,LineBasedFrameDecoder演示完毕,仅仅是Netty中非常简单的数据包解码器。

11.2.2 DelimiterBasedFrameDecoder解码器

DelimiterBasedFrameDecoder解码器不仅可以使用换行符,还可以使用其他特殊字符作为数据包的分隔符,例如制表符"\t"。其构造方法如下:

public DelimiterBasedFrameDecoder(

        int maxFrameLength,     //解码的数据包的最大长度

        Boolean stripDelimiter, //解码后的数据包是否去掉分隔符,一般选择是

        ByteBuf delimiter)      //分隔符

{

        //省略构造器的源代码

}

DelimiterBasedFrameDecoder解码器的使用方法与LineBasedFrameDecoder是一样的,只是在构造参数上有一点点不同。下面是一个实战案例。

 

package cn.edu.bbc.computer.decoder;

//

public class NettyOpenBoxDecoder {

    static String spliter2 = "\t";

    static String content = "软件构件技术原理-Java版随书代码!";

    /**

     * LengthFieldBasedFrameDecoder使用实例

     */

    @Test

    public void testDelimiterBasedFrameDecoder() {

        try {

            final ByteBuf delimiter = Unpooled.copiedBuffer(spliter2.getBytes("UTF-8"));

            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

                protected void initChannel(EmbeddedChannel ch) {

                    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, true, delimiter));

                    ch.pipeline().addLast(new StringDecoder());

                    ch.pipeline().addLast(new StringProcessHandler());

                }

            };

            //省略与前一个实例相同的重复代码

        }

    }

}

以上实例中,通过DelimiterBasedFrameDecoder构造了一个以制表符作为分隔符的字符串分包器。向模拟通道发送字符串的代码,由于与前一小节的发送代码基本相同,这里省略。需要注意的是,发送一个包后,要发送一个制表符作为结束。

11.2.3 LengthFieldBasedFrameDecoder解码器

Netty的开箱即用解码器中,最为复杂的是解码器为LengthFieldBasedFrameDecoder自定义长度数据包。它的难点在于参数比较多,也比较难以理解,但同时它又比较常用,因而下面对它进行重点介绍。

LengthFieldBasedFrameDecoder可以翻译为长度字段数据包解码器。传输内容中的Length(长度)字段的值是指存放在数据包中要传输内容的字节数。普通的基于Head-Content协议的内容传输尽量用内置的LengthFieldBasedFrameDecoder来解码。

一个简单的LengthFieldBasedFrameDecoder使用示例如下:

package cn.edu.bbc.computer.decoder;

//

public class NettyOpenBoxDecoder {

    public static final int VERSION = 100;

    static String content = "软件构件技术原理-Java版随书代码!";

   /**

     * LengthFieldBasedFrameDecoder使用示例 1

     */

    @Test

    public void testLengthFieldBasedFrameDecoder1() {

        try {

       final LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024,0,4,0,4);

            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

                protected void initChannel(EmbeddedChannel ch) {

                    ch.pipeline().addLast(spliter);

                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));

                    ch.pipeline().addLast(new StringProcessHandler());

                }

            };

            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 1; j <= 100; j++) {

                ByteBuf buf = Unpooled.buffer();

                String s = j + "次发送->"+content;

                byte[] bytes = s.getBytes("UTF-8");

                buf.writeInt(bytes.length );

                buf.writeBytes(bytes);

                channel.writeInbound(buf);

            }

            Thread.sleep(Integer.MAX_VALUE);

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (UnsupportedEncodingException e) {

            e.printStackTrace();

        }

    }

}

上面的示例程序中用到了一个LengthFieldBasedFrameDecoder构造器,具体如下:

 public LengthFieldBasedFrameDecoder(

        int maxFrameLength,       //发送的数据包的最大长度

        int lengthFieldOffset,    //长度字段偏移量

        int lengthFieldLength,    //长度字段本身占用的字节数

        int lengthAdjustment,     //长度字段的偏移量矫正

        int initialBytesToStrip)  //丢弃的起始字节数

{

   //

}

在前面的示例程序中涉及5个参数和值,分别解读如下:

1maxFrameLength:发送的数据包的最大长度。示例程序中该值为1024,表示一个数据包最多可发送1024字节。

2lengthFieldOffset:长度字段偏移量,指的是长度字段位于整个数据包内部字节数组中的下标索引值。

3lengthFieldLength:长度字段所占的字节数。如果长度字段是一个int整数,则为4;如果长度字段是一个short整数,则为2

4lengthAdjustment:长度的调整值。这个参数最为难懂。在传输协议比较复杂的情况下,例如协议包含了长度字段、协议版本号、魔数等,那么解码时就需要进行长度调整。长度调整值的计算公式为:内容字段偏移量-长度字段偏移量-长度字段的字节数。这个公式一看就比较复杂,下一小节会有详细的举例说明。

5initialBytesToStrip:丢弃的起始字节数。在有效数据字段Content前面,如果还有一些其他字段的字节,作为最终的解析结果可以丢弃。例如,在上面的示例程序中,前面有4字节的长度字段,它起辅助的作用,最终的结果中不需要这个长度,所以丢弃的字节数为4

在前面的示例程序中,自定义长度解码器的构造参数值如下:

LengthFieldBasedFrameDecoder spliter = new LengthFieldBasedFrameDecoder(1024,0,4,0,4);

1个参数maxFrameLength设置为1024,表示数据包的最大长度为1024字节。

2个参数lengthFieldOffset设置为0,表示长度字段的偏移量为0,也就是长度字段放在了最前面,处于数据包的起始位置。

3个参数lengthFieldLength设置为4,表示长度字段的长度为4字节,即内容长度的值占用数据包的4字节。

4个参数lengthAdjustment设置为0。长度调整值的计算公式为:内容字段偏移量-长度字段偏移量-长度字段的字节数,在上面示例程序中的实际值为4-0-4=0

5个参数initialBytesToStrip4,表示获取最终内容Content的字节数组时抛弃最前面的4字节的数据。

运行上面的示例程序,输出结果节选如下:

//

打印: 1次发送->软件构件技术原理-Java版随书代码!

打印: 2次发送->软件构件技术原理-Java版随书代码!

打印: 3次发送->软件构件技术原理-Java版随书代码!

打印: 4次发送->软件构件技术原理-Java版随书代码!

打印: 5次发送->软件构件技术原理-Java版随书代码!

打印: 6次发送->软件构件技术原理-Java版随书代码!

如果对这些传输没有直观的了解,下面对第一个传输的数据包给出一个简单的字节图(见图11-3):长度字段为4字节,内容字段为52字节,整个数据包为56字节。

11-3 Head-Content协议的示意图

11.2.4 多字段Head-Content协议数据包解析的实战案例

Head-Content协议是最为简单的内容传输协议。在实际使用过程中则没有那么简单,除了长度和内容,在数据包中还可能包含其他字段,例如协议版本号,如图11-4所示。

11-4 包含协议版本号的Head-Content协议的示意图

使用LengthFieldBasedFrameDecoder解码器解析以上带有版本号的Head-Content协议的数据包,该如何进行构造器参数的计算呢?

1个参数maxFrameLength可以为1024,表示数据包的最大长度为1024字节。

2个参数lengthFieldOffset0,表示长度字段处于数据包的起始位置。

3个参数lengthFieldLength的值为4,表示长度字段的长度为4字节。

4个参数lengthAdjustment2,长度调整值的计算方法为:内容字段偏移量-长度字段偏移量-长度字段的长度=6-0-4=2。换句话说,在这个例子中,lengthAdjustment就是夹在内容字段和长度字段中的部分——版本号的长度。

5个参数initialBytesToStrip6,表示获取最终内容的字节数组时抛弃最前面的6字节数据。换句话说,长度字段、版本字段的值被抛弃。

实战案例的代码如下:

package cn.edu.bbc.computer.decoder;

//

public class NettyOpenBoxDecoder {

    public static final int VERSION = 100;

    static String content = "软件构件技术原理-Java版随书代码!";

    /**

     * LengthFieldBasedFrameDecoder使用示例 2

     */

    @Test

    public void testLengthFieldBasedFrameDecoder2() {

        try {

            final LengthFieldBasedFrameDecoder spliter =

                    new LengthFieldBasedFrameDecoder(1024, 0, 4, 2, 6);

            ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

                protected void initChannel(EmbeddedChannel ch) {

                    ch.pipeline().addLast(spliter);

                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));

                    ch.pipeline().addLast(new StringProcessHandler());

                }

            };

            EmbeddedChannel channel = new EmbeddedChannel(i);

            for (int j = 1; j <= 100; j++) {

                ByteBuf buf = Unpooled.buffer();

                String s = j + "次发送->" + content;

                byte[] bytes = s.getBytes("UTF-8");

                buf.writeInt(bytes.length);

                buf.writeChar(VERSION);

                buf.writeBytes(bytes);

                channel.writeInbound(buf);

            }

            Thread.sleep(Integer.MAX_VALUE);

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (UnsupportedEncodingException e) {

            e.printStackTrace();

        }

    }

}

运行实战案例,大家可以发现运行的结果和前一个实例一样,表明参数设置是正确的,LengthFieldBasedFrameDecoder解码器可以正确地解析内容。

将协议设计得再复杂一点:将2字节的协议版本放在最前面,在长度字段前面加上2字节的版本字段,在长度字段后面加上4字节的魔数,魔数用来对数据包做一些安全的认证。协议的数据包如图11-5所示。

11-5 包含协议版本号、魔数的Head-Content协议的示意图

使用LengthFieldBasedFrameDecoder解码器解码图11-5中的Head-Content协议,构造器的参数该如何计算呢?参数的设置大致如下:

1个参数maxFrameLength可以设置为1024,表示数据包的最大长度为1024字节。

2个参数lengthFieldOffset可以设置为2,表示长度字段处于版本号的后面。

3个参数lengthFieldLength可以设置为4,表示长度字段为4字节。

4个参数lengthAdjustment可以设置为4。长度调整值的计算方法为:内容字段偏移量-长度字段偏移量-长度字段的长度=10-2-4=4。在这个例子中,lengthAdjustment就是夹在内容字段和长度字段中的部分——魔数字段的长度。

5个参数initialBytesToStrip可以设置为10,表示获取最终Content内容的字节数组时抛弃最前面的10字节数据。换句话说,长度字段、版本字段、魔数字段的值被抛弃。

实战案例的代码如下:

package cn.edu.bbc.computer.decoder;

//

@Test

public void testLengthFieldBasedFrameDecoder3() {

    try {

        final LengthFieldBasedFrameDecoder spliter =

             new LengthFieldBasedFrameDecoder(1024, 2, 4, 4, 10);

       ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

            protected void initChannel(EmbeddedChannel ch) {

                ch.pipeline().addLast(spliter);

                ch.pipeline().addLast( new  StringDecoder(Charset.forName("UTF-8")));

                ch.pipeline().addLast(new StringProcessHandler());

            }

        };

        EmbeddedChannel channel = new EmbeddedChannel(i);

        for (int j = 1; j <= 100; j++) {

            ByteBuf buf = Unpooled.buffer();

            String s = j + "次发送->" + content;

            byte[] bytes = s.getBytes("UTF-8");

            buf.writeChar(VERSION);

            buf.writeInt(bytes.length);

            buf.writeInt(MAGICCODE);

            buf.writeBytes(bytes);

            channel.writeInbound(buf);

        }

        Thread.sleep(Integer.MAX_VALUE);

        } catch (InterruptedException e) {

            e.printStackTrace();

        } catch (UnsupportedEncodingException e) {

            e.printStackTrace();

        }

    }

}

运行实战案例,可以发现运行的结果和前一个实例一样。这说明参数设置是正确的,LengthFieldBasedFrameDecoder解码器可以正确地解析内容。

11.3 Encoder原理与实战

Netty的业务处理完成后,业务处理的结果往往是某个Java POJO对象需要编码成最终的ByteBuf二进制类型,通过流水线写入底层的Java通道,这就需要用到Encoder(编码器)。

Netty中,什么叫编码器?首先,编码器是一个Outbound出站处理器,负责处理出站数据;其次,编码器将上一个Outbound出站处理器传过来的输入(Input)数据进行编码或者格式转换,然后传递到下一个ChannelOutboundHandler出站处理器。

编码器与解码器相呼应,Netty中的编码器负责将出站的某种Java POJO对象编码成二进制ByteBuf,或者转换成另一种Java POJO对象。

编码器是ChannelOutboundHandler的具体实现类。一个编码器将出站对象编码之后,数据将被传递到下一个ChannelOutboundHandler出站处理器进行后面的出站处理。

由于最后只有ByteBuf才能写入通道中,因此可以肯定通道流水线上装配的第一个编码器一定是把数据编码成了ByteBuf类型。为什么编码成的最终ByteBuf类型数据包的编码器是在流水线的头部,而不是在流水线的尾部呢?原因很简单:出站处理的顺序是从后向前的。

11.3.1 MessageToByteEncoder编码器

MessageToByteEncoder是一个非常重要的编码器基类,位于Nettyio.netty.handler.codec包中。MessageToByteEncoder的功能是将一个Java POJO对象编码成一个ByteBuf数据包。它是一个抽象类,仅仅实现了编码的基础流程,在编码过程中通过调用encode()抽象方法来完成。它的encode()编码方法是一个抽象方法,没有具体的编码逻辑实现,实现encode()抽象方法的工作需要子类去完成。

如果要实现一个自己的编码器,则需要继承自MessageToByteEncoder基类,实现它的encode()抽象方法。作为演示,下面实现一个整数编码器。其功能是将Java整数编码成二进制ByteBuf数据包。这个示例程序的代码如下:

package cn.edu.bbc.computer.encoder;

//

public class Integer2ByteEncoder extends MessageToByteEncoder<Integer> {

    @Override

    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) {

        out.writeInt(msg);

        Logger.info("encoder Integer = " + msg);

    }

}

在继承MessageToByteEncoder时,需要带上泛型实参,具体为编码之前的Java POJO原类型(输入类型)。在这个示例程序中,编码之前的类型是Java Integer

上面的encode()方法实现很简单:将入站数据Integer类型对象msg写入Out实参(基类传入的ByteBuf实例)。编码完成后,基类MessageToByteEncoder会将输出的ByteBuf数据包发送到下一个。

编码器Integer2ByteEncoder已经完成,如何使用呢?这里编写了一个测试用例,代码如下:

package cn.edu.bbc.computer.encoder;

//

public class Integer2ByteEncoderTester {

        @Test

        public void testIntegerToByteDecoder() {

                ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

                                protected void initChannel(EmbeddedChannel ch) {

                                        ch.pipeline().addLast(new Integer2ByteEncoder());

                                }

                };

                EmbeddedChannel channel = new EmbeddedChannel(i);

                for (int j = 0; j < 100; j++) {

                        channel.write(j);  //向通道写入整数

                }

                channel.flush();

                //取得通道的出站数据包

                ByteBuf buf = (ByteBuf) channel.readOutbound();

                while (null != buf) {

                                System.out.println("o = " + buf.readInt());

                                buf = (ByteBuf) channel.readOutbound();

                }

  //

        }

}

在上面的实例中,首先将Integer2ByteEncoder加入嵌入式通道,然后调用write()方法向通道写入100个数字。写完之后,调用channel.readOutbound()方法从通道中读取模拟的出站数据包,并且不断地循环,将数据包中的数字打印出来。

此编码器的运行比较简单,运行的结果就不在书中给出了。建议读者参考源代码工程,自行设计和实现一个整数编码器,以便加深理解。

11.3.2 MessageToMessageEncoder编码器

上一小节的示例程序是将POJO对象编码成ByteBuf二进制对象,那么是否能够通过Netty的编码器将某种POJO对象编码成另外一种POJO对象呢?答案是肯定的。需要继承另外一个Netty的重要编码器——MessageToMessageEncoder编码器,并实现它的encode()抽象方法。在子类的encode()方法实现中,完成原POJO类型到目标POJO类型的转换逻辑。在encode()实现方法中,编码完成后,将解码后的目标对象加入encode()方法中的实参list输出容器即可。

下面是一个从字符串(String)到整数(Integer)的编码器,演示一下MessageToMessageEncoder的使用。此编码器的具体功能是将字符串中的所有数字提取出来,然后输出到下一个。代码很简单,具体如下:

package cn.edu.bbc.computer.encoder;

//

public class String2IntegerEncoder extends MessageToMessageEncoder<String> {

    @Override

    protected void encode(ChannelHandlerContext c, String s, List<Object> list){

        char[] array = s.toCharArray();

        for (char a : array) {

            //48 0的编码,57 9 的编码

            if (a >= 48 && a <= 57) {

                list.add(Integer.valueOf(a));

            }

        }

    }

}

这里定义的String2IntegerEncoder类继承了MessageToMessageEncoder基类,并且明确了入站的数据类型为String。在encode()方法中,将字符串中的数字(编码在4857之间)提取出来之后,放入list输出容器中,如果遇到数字之外的其他字符则直接略过。

在子类的encode()方法处理完成之后,基类会对这个list输出容器中的所有元素进行迭代,将列表的元素逐个发送给下一个。

编码器String2IntegerEncoder已经完成,下面编写一个测试用例,代码如下:

package cn.edu.bbc.computer.encoder;

//

public class String2IntegerEncoderTester {

    /**

     * 测试字符串到整数的编码器

     */

    @Test

    public void testStringToIntergerDecoder() {

       ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {

            protected void initChannel(EmbeddedChannel ch) {

                ch.pipeline().addLast(new Integer2ByteEncoder());

                ch.pipeline().addLast(new String2IntegerEncoder());

            }

        };

        EmbeddedChannel channel = new EmbeddedChannel(i);

        for (int j = 0; j < 100; j++) {

            String s = "i am " + j;

            channel.write(s); //向通道写入含有数字的字符串

        }

        channel.flush();

        ByteBuf buf = (ByteBuf) channel.readOutbound();

        while (null != buf) {

            System.out.println("o = " + buf.readInt()); //打印数字

            buf = (ByteBuf) channel.readOutbound();     //读取数字

        }

    }

}

测试用例中除了需要使用String2IntegerEncoder编码器外,还需要用到Integer2ByteEncoder编码器。String2IntegerEncoder仅仅是编码的第一棒,负责将字符串编码成整数;Integer2ByteEncoder是编码的第二棒,将整数进一步变成ByteBuf数据包后才能最终写入通道。由于出站处理的过程是从后向前的次序,因此Integer2ByteEncoder先加入流水线,String2IntegerEncoder后加入流水线。

此编码器的运行比较简单,运行结果就不在书中给出了。建议读者参考源代码工程,查看运行结果,以便加深理解。

11.4 解码器和编码器的结合

在实际的开发中,由于数据的入站和出站关系紧密,因此编码器和解码器的关系很紧密。编码和解码更是一种紧密的、相互配套的关系。在流水线处理时,数据的流动往往一进一出,进来时解码,出去时编码。所以,在同一个流水线上,加了某种编码逻辑,常常需要加上一个相对应的解码逻辑。

前面讲到编码器和解码器是分开实现的。例如,通过继承ByteToMessageDecoder基类或者其子类,完成ByteBuf数据包到POJO的解码工作;通过继承基类MessageToByteEncoder或者其子类,完成POJOByteBuf数据包的编码工作。总之,具有相反逻辑的编码器和解码器分开实现在两个不同的类中,导致的一个结果是相互配套的编码器和解码器在加入通道的流水线时常常需要分两次添加。

现在的问题是:具有相互配套逻辑的编码器和解码器能否放在同一个类中呢?答案是肯定的,这需要用到Netty的新类型——Codec(编解码器)。

11.4.1 ByteToMessageCodec编解码器

完成POJOByteBuf数据包的编解码器基类为ByteToMessageCodec<I>,它是一个抽象类。从功能上说,继承ByteToMessageCodec<I>就等同于继承了ByteToMessageDecoderMessageToByteEncoder这两个基类。

编解码器ByteToMessageCodec同时包含了编码encode()和解码decode()两个抽象方法,这两个方法都需要我们自己实现:

1)编码方法——encode(ChannelHandlerContext, I, ByteBuf)

2)解码方法——decode(ChannelHandlerContext, ByteBuf, List<Object>)

下面是一个整数到字节、字节到整数的编解码器,代码如下:

package cn.edu.bbc.computer.codec;

//

public class Byte2IntegerCodec extends ByteToMessageCodec<Integer> {

    @Override

    public void encode(ChannelHandlerContext ctx,

                                Integer msg, ByteBuf out) {

        out.writeInt(msg);

        System.out.println("write Integer = " + msg);

    }

    @Override

    public void decode(ChannelHandlerContext ctx,

                 ByteBuf in, List<Object> out) {

        if (in.readableBytes() >= 4) {

            int i = in.readInt();

            System.out.println("Decoder i= " + i);

            out.add(i);

        }

    }

}

这是编码器和解码器的结合,简单地通过继承的方式将前面编码器的encode()方法和解码器的decode()方法放在了同一个自定义类中,这样在逻辑上更加紧密。在使用时,加入流水线时也只需要加入一次。

从上面的示例程序可以看出,ByteToMessageCodec编解码器和前面的编码器与解码器分开来实现相比仅仅是少写了一个类,少加入了一次流水线,在技术、功能上和分开实现、添加到流水线没有任何区别。

对于POJO之间进行转换的编码和解码,NettyMessageToMessageEncoder编码器和MessageToMessageDecoder解码器进行了简单的整合,整合出一个新的编解码器基类——MessageToMessageCodec。这个基类同时包含了encode()decode()两个抽象方法,用于完成POJO-TO-POJO的双向转换。仅仅是使用形式变得简化了,在技术上并没有增加太多的难度,所以本书不再展开介绍。

11.4.2 CombinedChannelDuplexHandler组合器

前面的编码器和解码器相结合是通过继承完成的。继承的不足之处在于:将编码器和解码器的逻辑强制性地放在同一个类中,在只需要编码或者解码单边操作的流水线上,逻辑上不大合适。

编码器和解码器如果要结合起来,除了继承的方法之外,还可以通过组合的方式实现。与继承相比,组合会带来更大的灵活性:编码器和解码器可以捆绑使用,也可以单独使用。

如何把单独实现的编码器和解码器组合起来呢?

Netty提供了一个新的组合器——CombinedChannelDuplexHandler基类。其用法也很简单,下面通过示例程序来演示如何将前面的整数解码器IntegerFromByteDecoder和对应的整数编码器IntegerToByteEncoder组合起来。代码如下:

package cn.edu.bbc.computer.codec;

//

public class IntegerDuplexHandler extends CombinedChannelDuplexHandler<

        Byte2IntegerDecoder, Integer2ByteEncoder>

{

    public IntegerDuplexHandler() {

        super(new Byte2IntegerDecoder(), new Integer2ByteEncoder());

    }

}

只需要继承CombinedChannelDuplexHandler,而不需要像ByteToMessageCodec那样把编码逻辑和解码逻辑都挤在同一个类中,还是复用原来分开的编码器和解码器实现代码。

总之,使用CombinedChannelDuplexHandler可以保证有了相反逻辑关系的encoder编码器和decoder解码器既可以结合使用,又可以分开使用,十分方便。

 


0 条 查看最新 评论

没有评论
暂时无法发表评论