NIO socket 的簡單鏈接池

     在最近的項目中,需要寫一個 socket 與底層服務器通訊的模塊。在設計中,請求對象被封裝爲 xxxRequest,消息返回被封裝爲 xxxResponse。因爲 socket 的編程開發經驗少,一開始我使用了短鏈接的方式,每一個請求創建一個 socket 通訊,因爲每一個 socket 只進行一次讀寫,這大大浪費了系統資源。

      因而考慮使用長鏈接,系統公用一個 client socket 並對 send 操作進行加鎖,結果在處理併發的時候,各種慢,各種等待。沒有辦法,考慮使用鏈接池:預先建立多個 client socket 放入鏈接池,需要發送請求時從鏈接池獲取一個 socket,完成請求後再放回鏈接池中。下面是一個簡單的實現。

       

        // Server host, read from GlobalNames.industryIP.
        private  static String IP=GlobalNames.industryIP;
 // Server port, parsed from the string GlobalNames.industryPort.
 private  static int PORT =Integer.parseInt(GlobalNames.industryPort);
 
 // Number of pooled connections; starts at 10. Not final: getConnection() raises it
 // when it has to open an extra connection.
 private static  int CONNECTION_POOL_SIZE = 10;
 // Single pool instance, created by init(); null until then.
 private static NIOConnectionPool self = null;
 private Hashtable<Integer, SocketChannel> socketPool = null; // connection pool, keyed by slot index
 private boolean[] socketStatusArray = null; // per-slot connection state (true = in use, false = idle)
 // Selector assigned in allocateSocketChannel() and exposed through getSelector().
 private static Selector selector  = null;
 // Server address; assigned from IP and PORT in allocateSocketChannel().
 private static InetSocketAddress SERVER_ADDRESS = null;
 
 /**
  * 初始化鏈接池,最大TCP鏈接的數量爲10
  *
  * @throws IOException
  */android 自定義SeekBarPreference 實現
 public static synchronized void init() throws Exception {
  self = new NIOConnectionPool();
  self.socketPool = new Hashtable<Integer, SocketChannel>();
  self.socketStatusArray = new boolean[CONNECTION_POOL_SIZE];
  buildConnectionPool();
 }android

 /**
  * 創建鏈接池
  */
 public synchronized static void buildConnectionPool() throws Exception {
  if (self == null) {
   init();
  }
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   SocketChannel client = allocateSocketChannel();
   self.socketPool.put(new Integer(i), client);
   self.socketStatusArray[i] = false;
  }
 }編程

 /**
  * 從鏈接池中獲取一個空閒的Socket
  * 商帳追收
  * @return 獲取的TCP鏈接
  */
 public static SocketChannel getConnection() throws Exception {
  if (self == null)
   init();
  int i = 0;
  for (i = 0; i < CONNECTION_POOL_SIZE; i++) {
   if (!self.socketStatusArray[i]) {
    self.socketStatusArray[i] = true;
    break;
   }
  }
  if (i < CONNECTION_POOL_SIZE) {
   return self.socketPool.get(new Integer(i));
   
  } else {服務器

  //目前鏈接池無可用鏈接時只是簡單的新建一個鏈接
   SocketChannel newClient = allocateSocketChannel();
   CONNECTION_POOL_SIZE++;
   self.socketPool.put(CONNECTION_POOL_SIZE, newClient);
   return newClient;
  }
 }併發

 /**
  * 當得到的socket不可用時,從新得到一個空閒的socket。
  *
  * @param socket
  *            不可用的socket
  * @return 新獲得的socket
  * @throws Exception
  */
 public static SocketChannel rebuildConnection(SocketChannel socket)
   throws Exception {
  if (self == null) {
   init();
  }
  SocketChannel newSocket = null;
  try {
   for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
    if (self.socketPool.get(new Integer(i)) == socket) {
     newSocket = allocateSocketChannel();
     self.socketPool.put(new Integer(i), newSocket);
     self.socketStatusArray[i] = true;
    }
   }socket

  } catch (Exception e) {
   System.out.println("重建鏈接失敗!");
   throw new RuntimeException(e);
  }
  return newSocket;
 }ui


 /**
  * 將用完的socket放回池中,調整爲空閒狀態。此時鏈接並無斷開。
  *
  * @param socket
  *            使用完的socket
  * @throws Exception
  */
 public static void releaseConnection(SocketChannel socket) throws Exception {
  if (self == null) {
   init();
  }
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   if (self.socketPool.get(new Integer(i)) == socket) {
    self.socketStatusArray[i] = false;
    break;
   }
  }
 }設計

 /**
  * 斷開池中全部鏈接
  *
  * @throws Exception
  */
 public synchronized static void releaseAllConnection() throws Exception {
  if (self == null)
   init();htm

  // 關閉全部鏈接
  SocketChannel socket = null;
  for (int i = 0; i < CONNECTION_POOL_SIZE; i++) {
   socket = self.socketPool.get(new Integer(i));
   try {
    socket.close();
    self.socketStatusArray[i] = false;
   } catch (Exception e) {
    e.printStackTrace();
   }
  }
 }
   
 
 public static SocketChannel allocateSocketChannel(){
  
   SERVER_ADDRESS = new InetSocketAddress(  
                IP, PORT);  
  SocketChannel socketChannel = null;
     SocketChannel client = null;
     try{
     socketChannel = SocketChannel.open();  
        socketChannel.configureBlocking(false);  
        selector = Selector.open();  
        socketChannel.register(selector, SelectionKey.OP_CONNECT);  
        socketChannel.connect(SERVER_ADDRESS);
        Set<SelectionKey> selectionKeys;  
        Iterator<SelectionKey> iterator;  
        SelectionKey selectionKey;
        selector.select();  
        selectionKeys = selector.selectedKeys();  
        iterator = selectionKeys.iterator();  
        while (iterator.hasNext()) {  
            selectionKey = iterator.next();  
            if (selectionKey.isConnectable()) {  
                client = (SocketChannel) selectionKey.channel();  
                if (client.isConnectionPending()) {  
                    client.finishConnect();
                    client.register(selector, SelectionKey.OP_WRITE);  
                    break;
                }
            }
        }
 }catch(Exception e){
  e.printStackTrace();
 }
 return client;
  }對象

 public static Selector getSelector() {
  return selector;
 }

 

使用鏈接池進行通訊:

 /*緩衝區大小*/       private static int BLOCK = 8*4096;        /*發送數據緩衝區*/       private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);            /*接受數據緩衝區*/      private static ByteBuffer protocalNum = ByteBuffer.allocate(4);      private static ByteBuffer functionNum = ByteBuffer.allocate(4);      private static ByteBuffer messageLen = ByteBuffer.allocate(4);      private static ByteBuffer receivebuffer = null;      private  SocketChannel client = null;      private Selector selector = null;           private boolean readable = true;      private boolean writable = true;                public NIOSocketBackUp() throws Exception{       client = NIOConnectionPool.getConnection();       selector = NIOConnectionPool.getSelector();      }           public String send(ServiceRequest request) throws Exception {                           Set<SelectionKey> selectionKeys;            Iterator<SelectionKey> iterator;            SelectionKey selectionKey;            int count=0;            boolean flag = true;          String receiveText="";             while (flag) {                selector.select();                //返回此選擇器的已選擇鍵集。                selectionKeys = selector.selectedKeys();                iterator = selectionKeys.iterator();                while (iterator.hasNext()) {                    selectionKey = iterator.next();                    if (selectionKey.isWritable() && (writable)) {                            sendbuffer.clear();                            sendbuffer.put(request.getProtocalNum());                          sendbuffer.put(request.getFunctionNum());                          sendbuffer.put(request.getMessageLen());                          sendbuffer.put(request.getXmlbytes());                          sendbuffer.flip();                            client.write(sendbuffer);                            client.register(selector, SelectionKey.OP_READ);                            writable = false;                  } else if 
(selectionKey.isReadable() && (readable) ) {                        protocalNum.clear();                      functionNum.clear();                      messageLen.clear();                                                                count=client.read(protocalNum);                        count=client.read(functionNum);                      count=client.read(messageLen);                      messageLen.rewind();                      int length = messageLen.asIntBuffer().get(0);                      receivebuffer = ByteBuffer.allocate(length-12);                      receivebuffer.clear();                                           //read方式居然不阻塞                      int total=0;                      while(total!=(length-12)){                        count=client.read(receivebuffer);                        total+=count;                      }                      client.register(selector, SelectionKey.OP_WRITE);                        receiveText = new String(receivebuffer.array(),"GBK");                      flag = false;                      readable = false;                      break;                  }              }            }              NIOConnectionPool.releaseConnection(client);          return receiveText.trim();      }  

相關文章
相關標籤/搜索