使用JAVA NIO簡單實現Socket Serverjava
package com.flyer.cn.javaIO; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Date; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class EchoServer { public static SelectorLoop connectionBell; public static SelectorLoop readBell; public boolean isReadBellRunning=false; private ExecutorService thdPool=Executors.newCachedThreadPool(); public static void main(String[] args) throws IOException { new EchoServer().startServer(); } // 啓動服務器 public void startServer() throws IOException { // 準備好一個鬧鐘.當有連接進來的時候響. connectionBell = new SelectorLoop(); // 準備好一個鬧裝,當有read事件進來的時候響. readBell = new SelectorLoop(); // 開啓一個server channel來監聽 ServerSocketChannel ssc = ServerSocketChannel.open(); // 開啓非阻塞模式 ssc.configureBlocking(false); ServerSocket socket = ssc.socket(); socket.bind(new InetSocketAddress("localhost",7878)); // 給鬧鐘規定好要監聽報告的事件,這個鬧鐘只監聽新鏈接事件. ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT); new Thread(connectionBell,"connectionBell").start(); } // Selector輪詢線程類 public class SelectorLoop implements Runnable { private Selector selector; private ByteBuffer temp = ByteBuffer.allocate(1024); public SelectorLoop() throws IOException { this.selector = Selector.open(); } public Selector getSelector() { return this.selector; } @Override public void run() { while(true) { try { // 阻塞,只有當至少一個註冊的事件發生的時候纔會繼續. this.selector.select(); Set<SelectionKey> selectKeys = this.selector.selectedKeys(); Iterator<SelectionKey> it = selectKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { // 這是一個connection accept事件, 而且這個事件是註冊在serversocketchannel上的. ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 接受一個鏈接. SocketChannel sc = ssc.accept(); // 對新的鏈接的channel註冊read事件. 使用readBell鬧鐘. sc.configureBlocking(false); sc.register(readBell.getSelector(), SelectionKey.OP_READ); System.out.println(" from client address:" + sc.getRemoteAddress()); // 若是讀取線程尚未啓動,那就啓動一個讀取線程. synchronized(EchoServer.this) { if (!EchoServer.this.isReadBellRunning) { EchoServer.this.isReadBellRunning = true; new Thread(readBell,"readBell").start(); } } } else if (key.isReadable()){ int IntLength=4; int ObjLength; //有效數據長度 int readObj;//從NIO信道中讀出的數據長度 ByteBuffer bbInt = ByteBuffer.allocate(4); //讀取INT頭信息的緩存池 ByteBuffer bbObj = ByteBuffer.allocate(1024); //讀取OBJ有效數據的緩存池 // 這是一個read事件,而且這個事件是註冊在socketchannel上的. SocketChannel channel = (SocketChannel) key.channel(); //讀出INT數據頭 channel.read(bbInt); //獲取INT頭中標示的有效數據長度信息並清空INT緩存池 ObjLength = bbInt.getInt(0); bbInt.clear(); //清空有效數據緩存池設置有效緩存池的大小 bbObj.clear(); bbObj.limit(ObjLength); //循環讀滿緩存池以保證數據完整性 readObj = channel.read(bbObj); while (readObj != ObjLength) { readObj += channel.read(bbObj); } // // // 寫數據到buffer // int count = sc.read(temp); // if (count < 0) { // // 客戶端已經斷開鏈接. // key.cancel(); // sc.close(); // return; // } // // 切換buffer到讀狀態,內部指針歸位. bbObj.flip(); String msg = Charset.forName("UTF-8").decode(bbObj).toString(); // System.out.println(ObjLength+":"+readObj+" "+new Date().toLocaleString()+"Server received ["+msg+"] from client address:" + channel.getRemoteAddress()); if(ObjLength!=readObj){ System.out.println(ObjLength+":"+readObj+" "+new Date().toLocaleString()+"Server received ["+msg+"] from client address:" + channel.getRemoteAddress()); } // 清空buffer temp.clear(); thdPool.submit(new Dispatch(channel,msg)); } } } catch (IOException e) { e.printStackTrace(); } } } public class Dispatch implements Runnable{ private SocketChannel sc; private String msg; public Dispatch(SocketChannel _sc,String _msg){ this.sc=_sc; this.msg=_msg; } public void run() { try{ // Thread.sleep(500); msg=msg+" "+new Date().toLocaleString(); // echo back. sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")))); } catch(Exception ex){ ex.printStackTrace(); } } } } }
客戶端緩存
package com.flyer.cn.javaIO; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Date; import java.util.Iterator; import java.util.Set; public class Client implements Runnable { // 空閒計數器,若是空閒超過10次,將檢測server是否中斷鏈接. private String clientName; private static int idleCounter = 0; private Selector selector; private SocketChannel socketChannel; private ByteBuffer temp = ByteBuffer.allocate(1024); public static void main(String[] args) throws IOException { for(int i=0;i<100;i++){ Client client= new Client("client"+i); new Thread(client).start(); //client.sendFirstMsg(); } } public Client(String name) { try{ this.clientName=name; // 一樣的,註冊鬧鐘. this.selector = Selector.open(); // 鏈接遠程server socketChannel = SocketChannel.open(); // 若是快速的創建了鏈接,返回true.若是沒有創建,則返回false,並在鏈接後出發Connect事件. Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 7878)); socketChannel.configureBlocking(false); SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ); if (isConnected) { this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName); } else { // 若是鏈接還在嘗試中,則註冊connect事件的監聽. connect成功之後會出發connect事件. key.interestOps(SelectionKey.OP_CONNECT); } } catch(Exception ex){ ex.printStackTrace(); } } public void sendFirstMsg(SocketChannel socketChannel,String msg) throws IOException { int IntLength=4; ByteBuffer bb = ByteBuffer.allocate(1024); //構造發送數據:整型數據頭+有效數據段 byte[] arr = msg.getBytes(Charset.forName("UTF-8")); final int ObjLength = arr.length; //獲取有效數據段長度 bb.clear(); bb.limit(IntLength + ObjLength); //調整緩存池大小 bb.putInt(ObjLength); bb.put(arr); bb.position(0); socketChannel.write(bb); } @Override public void run() { while (true) { try { // 阻塞,等待事件發生,或者1秒超時. num爲發生事件的數量. int num = this.selector.select(1000); if (num ==0) { idleCounter ++; if(idleCounter >10) { // 若是server斷開了鏈接,發送消息將失敗. try { this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName); } catch(ClosedChannelException e) { e.printStackTrace(); this.socketChannel.close(); return; } } continue; } else { idleCounter = 0; } Set<SelectionKey> keys = this.selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isConnectable()) { // socket connected SocketChannel sc = (SocketChannel)key.channel(); if (sc.isConnectionPending()) { sc.finishConnect(); } // send first message; this.sendFirstMsg(socketChannel,"Hello NIO.From "+this.clientName); } if (key.isReadable()) { // msg received. SocketChannel sc = (SocketChannel)key.channel(); this.temp = ByteBuffer.allocate(1024); int count = sc.read(temp); if (count<0) { sc.close(); continue; } // 切換buffer到讀狀態,內部指針歸位. temp.flip(); String msg = Charset.forName("UTF-8").decode(temp).toString(); System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress()+new Date().toLocaleString()); Thread.sleep(1000); sendFirstMsg(sc,"Hello NIO.From "+this.clientName); // echo back. // sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8")))); // 清空buffer temp.clear(); } } } catch (Exception e) { System.out.println("網絡鏈接異常"); } } } }