Netty使用方法介绍



Netty使用方法介绍。

Netty 是一个异步的、事件驱动的网络编程框架,可以快速开发出可维护的、高性能、高扩展能力的协议服务及其客户端应用。
Netty 是一个基于NIO的客户端、服务器端编程框架,使用Netty可以确保你快速简单的开发出一个网络应用。例如实现了某种协议的客户端、服务端应用。Netty简化了网络应用的编程开发过程,例如:TCP和UDP的socket服务开发。
“快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP、SMTP、HTTP、各种二进制、文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

 

jar包依赖:

 

  1. <dependency>
  2.     <groupId>org.jboss.netty</groupId>
  3.     <artifactId>netty</artifactId>
  4.     <version>3.2.0.ALPHA4</version>
  5. </dependency>
<dependency>
  	<groupId>org.jboss.netty</groupId>
	<artifactId>netty</artifactId>
	<version>3.2.0.ALPHA4</version>
</dependency>

 

 

服务端:

 

  1. public void testServer() {
  2.        ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
  3.                                                                   Executors.newCachedThreadPool());
  4.        ServerBootstrap bootstrap = new ServerBootstrap(factory);
  5.        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  6.            public ChannelPipeline getPipeline() throws Exception {
  7.                return Channels.pipeline(new ServerHandler(), new TimeEncoder());
  8.            }
  9.        });
  10.        bootstrap.setOption(“child.tcpNoDelay”, true);
  11.        bootstrap.setOption(“child.keepAlive”, true);
  12.        bootstrap.bind(new InetSocketAddress(“localhost”, 9999));
  13.    }
 public void testServer() {
        ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
                                                                   Executors.newCachedThreadPool());
        ServerBootstrap bootstrap = new ServerBootstrap(factory);
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new ServerHandler(), new TimeEncoder());
            }
        });
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);

        bootstrap.bind(new InetSocketAddress("localhost", 9999));
    }

1)ChannelFactory 是一个创建和管理Channel通道及其相关资源的工厂接口,它处理所有的I/O请求并产生相应的I/O ChannelEvent通道事件。Netty 提供了多种 ChannelFactory 实现。这里我们需要实现一个服务端的例子,因此我们使用NioServerSocketChannelFactory实现。另一件需要注意的是这个工厂自己不负责创建I/O线程。你应当在其构造器中指定该工厂使用的线程池,这样做的好处是你获得了更高的控制力来管理你的应用环境中使用的线程。
2)ServerBootstrap 是一个设置服务的帮助类。你甚至可以在这个服务中直接设置一个Channel通道。然而请注意,这是一个繁琐的过程,大多数情况下并不需要这样做。
3)这里,我们将ServerHandler、TimeEncoder处理器添加至默认的ChannelPipeline通道。任何时候当服务器接收到一个新的连接,一个新的ChannelPipeline管道对象将被创建,并且所有在这里添加的ChannelHandler对象将被添加至这个新的 ChannelPipeline管道对象。这很像是一种浅拷贝操作(a shallow-copy operation);所有的Channel通道以及其对应的ChannelPipeline实例将分享相同的ServerHandler、TimeEncoder实例。
4)你也可以设置我们在这里指定的这个通道实现的配置参数。我们正在写的是一个TCP/IP服务,因此我们运行设定一些socket选项,例如 tcpNoDelay和keepAlive。请注意我们在配置选项里添加的”child.”前缀。这意味着这个配置项仅适用于我们接收到的通道实例,而不 是ServerSocketChannel实例。因此,你可以这样给一个ServerSocketChannel设定参数:
bootstrap.setOption(“reuseAddress”, true);
5)最后要做的是绑定这个服务使用的端口并且启动这个服务。这里,我们绑定本机所有网卡上的9999端口。当然,你也可以对不同的绑定地址多次调用绑定操作。

 

 

两个ChannelHandler处理类
ServerHandler.java

 

  1. public class ServerHandler extends SimpleChannelHandler {
  2.     @Override
  3.     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
  4.         Person person = new Person(“Tom哥”, 29, 111111111);
  5.         ChannelFuture future = e.getChannel().write(person);
  6.         future.addListener(ChannelFutureListener.CLOSE);
  7.     }
  8. }
public class ServerHandler extends SimpleChannelHandler {

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Person person = new Person("Tom哥", 29, 111111111);
        ChannelFuture future = e.getChannel().write(person);
        future.addListener(ChannelFutureListener.CLOSE);
    }
}

TimeEncoder.java

 


 

  1. public class TimeEncoder extends SimpleChannelHandler {
  2.     private final ChannelBuffer buffer = dynamicBuffer();
  3.     @Override
  4.     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
  5.         Person person = (Person) e.getMessage();
  6.         buffer.writeInt(person.getName().getBytes(“GBK”).length);
  7.         buffer.writeBytes(person.getName().getBytes(“GBK”));
  8.         buffer.writeInt(person.getAge());
  9.         buffer.writeDouble(person.getSalary());
  10.         Channels.write(ctx, e.getFuture(), buffer);
  11.     }
  12. }
public class TimeEncoder extends SimpleChannelHandler {

    private final ChannelBuffer buffer = dynamicBuffer();

    @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        Person person = (Person) e.getMessage();
        buffer.writeInt(person.getName().getBytes("GBK").length);
        buffer.writeBytes(person.getName().getBytes("GBK"));
        buffer.writeInt(person.getAge());
        buffer.writeDouble(person.getSalary());
        Channels.write(ctx, e.getFuture(), buffer);
    }
}

ChannelBuffer是Netty的一个基本数据结构,这个数据结构存储了一个字节序列。ChannelBuffer类似于NIO的ByteBuffer,但是前者却更加的灵活和易于使用。例如:Netty允许你创建一个由多个ChannelBuffer构建的复合ChannelBuffer类型,这样就可以减少不必要的内存拷贝次数。

 

 

客户端:

 

  1. public void testClient() {
  2.         // 创建客户端channel的辅助类,发起connection请求
  3.         ClientBootstrap bootstrap = new ClientBootstrap(
  4.                                                         new NioClientSocketChannelFactory(
  5.                                                                                           Executors.newCachedThreadPool(),
  6.                                                                                           Executors.newCachedThreadPool()));
  7.         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  8.             public ChannelPipeline getPipeline() throws Exception {
  9.                 ChannelPipeline pipeline = Channels.pipeline();
  10.                 pipeline.addLast(“decoder”, new TimeDecoder());
  11.                 pipeline.addLast(“handler”, new ClientHandler());
  12.                 return pipeline;
  13.             }
  14.         });
  15.         // 创建无连接传输channel的辅助类(UDP),包括client和server
  16.         ChannelFuture future = bootstrap.connect(new InetSocketAddress(“localhost”, 9999));
  17.         future.getChannel().getCloseFuture().awaitUninterruptibly();
  18.         bootstrap.releaseExternalResources();
  19.     }
public void testClient() {
        // 创建客户端channel的辅助类,发起connection请求
        ClientBootstrap bootstrap = new ClientBootstrap(
                                                        new NioClientSocketChannelFactory(
                                                                                          Executors.newCachedThreadPool(),
                                                                                          Executors.newCachedThreadPool()));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", new TimeDecoder());
                pipeline.addLast("handler", new ClientHandler());
                return pipeline;
            }
        });
        // 创建无连接传输channel的辅助类(UDP),包括client和server
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 9999));
        future.getChannel().getCloseFuture().awaitUninterruptibly();
        bootstrap.releaseExternalResources();
    }

 

1) 使用NioClientSocketChannelFactory而不是NioServerSocketChannelFactory来创建客户端的Channel通道对象。
2) 客户端的ClientBootstrap对应ServerBootstrap。
3) 请注意,这里不使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象。
4) 我们应当调用connect连接方法,而不是之前的bind绑定方法。
 

  1. public class TimeDecoder extends FrameDecoder {
  2.     private final ChannelBuffer buffer = dynamicBuffer();
  3.     @Override
  4.     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer channelBuffer) throws Exception {
  5.         if (channelBuffer.readableBytes() < 4) {
  6.             return null;
  7.         }
  8.         if (channelBuffer.readable()) {
  9.             // 读到,并写入buf
  10.             channelBuffer.readBytes(buffer, channelBuffer.readableBytes());
  11.         }
  12.         int namelength = buffer.readInt();
  13.         String name = new String(buffer.readBytes(namelength).array(), ”GBK”);
  14.         int age = buffer.readInt();
  15.         double salary = buffer.readDouble();
  16.         Person person = new Person(name, age, salary);
  17.         return person;
  18.     }
  19. }
public class TimeDecoder extends FrameDecoder {

    private final ChannelBuffer buffer = dynamicBuffer();

    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer channelBuffer) throws Exception {
        if (channelBuffer.readableBytes() < 4) {
            return null;
        }
        if (channelBuffer.readable()) {
            // 读到,并写入buf
            channelBuffer.readBytes(buffer, channelBuffer.readableBytes());
        }
        int namelength = buffer.readInt();
        String name = new String(buffer.readBytes(namelength).array(), "GBK");
        int age = buffer.readInt();
        double salary = buffer.readDouble();
        Person person = new Person(name, age, salary);
        return person;
    }
}

代码说明
1)当接收到新的数据后,FrameDecoder会调用decode方法,同时传入一个FrameDecoder内部持有的累积型buffer缓冲。
2)如果decode返回null值,这意味着还没有接收到足够的数据。当有足够数量的数据后FrameDecoder会再次调用decode方法。
3) 如果decode方法返回一个非空值,这意味着decode方法已经成功完成一条信息的解码。FrameDecoder将丢弃这个内部的累计型缓冲。请注意你不需要对多条消息进行解码,FrameDecoder将保持对decode方法的调用,直到decode方法返回非空对象。

对于例如TCP/IP这种基于流的传输协议实现,接收到的数据会被存储在socket的接受缓冲区内。不幸的是,这种基于流的传输缓冲区并不是一个包队列,而是一个字节队列。这意味着,即使你以两个数据包的形式发送了两条消息,操作系统却不会把它们看成是两条消息,而仅仅是一个批次的字节序列。因此,在这种情况下我们就无法保证收到的数据恰好就是远程节点所发送的数据。假设一个操作系统的TCP/IP堆栈收到了三个数据包:

 

+—–+—–+—–+
| ABC | DEF | GHI |
+—–+—–+—–+

由于这种流传输协议的普遍性质,在你的应用中有较高的可能会把这些数据读取为另外一种形式:

+—-+——-+—+—+
| AB | CDEFG | H | I |
+—-+——-+—+—+

因此对于数据的接收方,不管是服务端还是客户端,应当重构这些接收到的数据,让其变成一种可让你的应用逻辑易于理解的更有意义的数据结构。在上面所述的这个例子中,接收到的数据应当重构为下面的形式:

+—–+—–+—–+
| ABC | DEF | GHI |
+—–+—–+—–+
Netty有一个简单却不失强大的架构。这个架构由三部分组成——缓冲(buffer),通道(channel),事件模型(event model)——所有的高级特性都构建在这三个核心组件之上。一旦你理解了它们之间的工作原理,你便不难理解在本章简要提及的更多高级特性。