如何构建高效多协议Android通讯框架即线程池和NIO技术应用讲解



如何构建高效多协议Android通讯框架以及线程池和NIO技术应用图片讲解。当前如何高效的请求得到网络数据成为大多数应用所面临的瓶颈问题。一些应用程序中可能会使用多种协议,比如IM通讯、视频流类型的应用会牺牲数据的完整性来更高效的获取数据,在这种类型移动开发应用中,也许就需要同时支持TCP、UDP以及HTTP协议。现在我们就试着基于安卓多线程技术ThreadPoolExecutor与NIO非阻塞式编程构建这样一个框架,高效的获取网络数据并优化支持多种协议的并发请求。

基本设计思路
基于ThreadPoolExecutor线程池来管理多个NIO线程的请求的,首先应该有个全局的ThreadPoolExecutor变量,使用单例模式来实现:
public static synchronized ThreadPoolExecutor setThreadPoolNum(int aThreadPoolMinNum,
int aThreadPoolMaxNum,long keepAliveTime)
{
    if(threadPool == null)
    {
        threadPool =
             new ThreadPoolExecutor(aThreadPoolMinNum,aThreadPoolMaxNum,
keepAliveTime,TimeUnit.SECONDS,new ArrayBlockingQueue(3),
new ThreadPoolExecutor.DiscardOldestPolicy());
    }
  
    return threadPool;
}
每个NIO请求被设计为单独的线程,每个网络通讯的连接都可以附属在NIO线程上,基本的设计思路如图1所示:

基于线程池和NIO技术构建高效的多协议Android通讯框架(转)

NIO通讯线程的设计
NIO通讯线程被设计为管理多个不同类型的网络连接,并负责维护每个连接所对应的网络数据处理接口,这些网络数据处理接口包括数据发送接口processWrite()、数据接收接口processRead()以及错误处理接口processError()等。
在每个NIO通讯线程被线程池对象启动之后,首先检查NIO端口是否有连接注册的数据到来,如果有数据到来,则提交给相应的连接进行处理,代码如下:
try {
if(selector != null)
     n = selector.select(3000);
// 如果要shutdown,关闭selector退出
if (shutdown) {
     selector.close();
     break;                 
 }
} catch (IOException e) {
   dispatchErrorToAll(e);
          
// 如果select返回大于0,处理事件
if(n > 0) {
for (Iterator i = selector.selectedKeys().iterator(); i.hasNext();) {
    // 得到下一个Key
    SelectionKey sk = i.next();
    i.remove();
    // 检查其是否还有效
    if(!sk.isValid())
        continue;

     // 处理
     INIOHandler handler = (INIOHandler)sk.attachment();
     try {
           if(sk.isConnectable())
           {
               handler.processConnect(sk);
           }
           else if (sk.isReadable())
           {
               handler.processRead(sk);
           }
           } catch (IOException e) {
               handler.processError(e);
           } catch (RuntimeException e) {
            }
       }
            n = 0;
        }
        checkNewConnection();
        notifySend();             
代码首先使用Selector类的selectedKyes()方法获取到所有有事件发生的连接通道,然后遍历这些通道,并使用attachment()方法获取到注册到通道上的处理对象NIOHandler,之后调用每个连接通道上注册的NIOHandler对象的方法进行网络数据的处理。

网络连接的设计

从NIO通讯线程的设计中知道,每个网络连接都有相应的连接通道以及连接通道的数据处理器,那么就可以抽象出来这些连接通道以及数据处理器的接口:
public interface IConnection {


public void add(OutPacket out);

public void clearSendQueue();

public void start();

public String getId();

public void dispose();

public InetSocketAddress getRemoteAddress();

public SelectableChannel channel();

public INIOHandler getNIOHandler();

public boolean isEmpty();

public void receive() throws IOException;

public void send() throws IOException;

public void send(ByteBuffer buffer);
 
public boolean isConnected();
}
在通道数据处理器中,每个连接通道都应该有基本的数据发送、数据接收以及错误处理接口,可以抽象出数据处理器的基本接口如下:
public interface INIOHandler {
   public void processConnect(SelectionKey sk) throws IOException;

   public void processRead(SelectionKey sk) throws IOException;
 
   public void processWrite() throws IOException;
 
   public void processError(Exception e);
}
那么每个网络连接都应该继承上述两个接口,比如TCP连接需要有SocketChannel通道以及相应的NIOHandler数据处理接口,UDP连接需要有DatagramChannel通道以及相应的NIOHandler数据处理接口,下面是TCPConnection的代码:
public class TCPConnection extends ConnectionImp{

private final SocketChannel channel;

private boolean remoteClosed;

public TCPConnection(String id, InetSocketAddress address) throws IOException {
   super(id);
   channel = SocketChannel.open();
   channel.configureBlocking(false);
    this.remoteAddress = address;
    remoteClosed = false;
}

public void start() {
    try {
           channel.connect(remoteAddress);
    } catch(UnknownHostException e) {
          processError(new Exception(“Unknown Host”));
    } catch(UnresolvedAddressException e) {
          processError(new Exception(“Unable to resolve server address”));
    } catch (IOException e) {
           processError(e);
    }
   }
 
   public SelectableChannel channel() {
    return channel;
}

public void receive() throws IOException {
    if(remoteClosed)
        return;
    //接收数据      
   int oldPos = receiveBuf.position();
    for (int r = channel.read(receiveBuf); r > 0; r = channel.read(receiveBuf))
        ;
  
    byte[] tempBuffer = new byte[1024];
    receiveBuf.get(tempBuffer, 0, receiveBuf.position());
    Log.e(“receive”, ” = “+new String(tempBuffer,”UTF-8″));
  
    // 得到当前位置
    int pos = receiveBuf.position();     
    receiveBuf.flip();
    // 检查是否读了0字节,这种情况一般表示远程已经关闭了这个连接       
    if(oldPos == pos) {
        remoteClosed = true;
        return;
    }
   InPacket packet = new InPacket(receiveBuf);
   inQueue.add(packet);
 
   adjustBuffer(pos);
}

private void adjustBuffer(int pos) {
   // 如果0不等于当前pos,说明至少分析了一个包
   if(receiveBuf.position() > 0) {
       receiveBuf.compact();
       receiveBuf.limit(receiveBuf.capacity());  
   } else {
       receiveBuf.limit(receiveBuf.capacity());  
       receiveBuf.position(pos);
   }
}

public void send() throws IOException {
    while (!isEmpty()) {
        sendBuf.clear();
        OutPacket packet = remove();
        channel.write(ByteBuffer.wrap(packet.getBody()));
        // 添加到重发队列
        packet.setTimeout(System.currentTimeMillis() + EnginConst.QQ_TIMEOUT_SEND);
        Log.e(“debug”,”have sended packet – ” + packet.toString());
        }
    }

   public void send(OutPacket packet) {
    try {
        sendBuf.clear();
        channel.write(ByteBuffer.wrap(packet.getBody()));
           Log.d(“debug”,”have sended packet – ” + packet.toString());            
       } catch (Exception e) {
       }
   }
 
   public void send(ByteBuffer buffer) {
       try {
           channel.write(buffer);
       } catch (IOException e) {
       }
   }
 
   public void dispose() {
       try {
        channel.close();
    } catch(IOException e) {
    }
   }
 
   public boolean isConnected() {
       return channel != null && channel.isConnected();
   }

   public void processConnect(SelectionKey sk) throws IOException {
       //完成SocketChannel的连接
       channel.finishConnect();
       while(!channel.isConnected()) {
           try {
               Thread.sleep(300);
           } catch (InterruptedException e) {
           }          
           channel.finishConnect();
       }
     sk.interestOps(SelectionKey.OP_READ);
     Log.e(“debug”,”hava connected to server”);
   }

   public void processRead(SelectionKey sk) throws IOException {
       receive();
   }
   public void processWrite() throws IOException {
       if(isConnected())
           send();
   }
}

测试代码
public void onCreate(Bundle savedInstanceState) {
       super.onCreate(savedInstanceState);
       setContentView(R.layout.main);
       AsyncRequest request = new AsyncRequest(new InetSocketAddress(“10.0.2.2″,8042));
       try {
         request.setConNumPerNIOThread(5);
         AsyncRequest.setThreadPoolNum(2, 5, 2);
         request.setUDP(false);
         request.setBody(“hello world”.getBytes());
         request.startAsyn(0);
      
        if(!request.isCurrRequestFull())
        {
             request.setBody(“My name is dongfengsun”.getBytes());
             request.startAsyn(1);
        }
    } catch (Exception e) {
        Log.e(“onCreate”, ” = “+e.getMessage());
        e.printStackTrace();
    }
   }
测试代码中设定每个NIO通讯线程的连接上限为5,如果每个NIO通讯线程还有空余的连接数,那么可以继续在当前的NIO通讯线程创建网络连接,如果满了,则可以重新初始化一个NIO通讯线程并加入到线程池中

作者 孙东风 2011-1-20