本身想了一下怎麼實現,就寫了,沒有深究是否合理.更多處理沒有寫下去,例如收件人不在線,應該保存在數據庫,等下一次鏈接的時候刷新map,再把數據發送過去,圖片發送也沒有作,也沒有用json格式java
socket很奇怪,我用客戶端鏈接上了服務器,沒有發送消息的狀況下,斷開電腦網絡,是不會出現問題,而後在把電腦網絡鏈接上,通信依然正常,正常斷開也不出問題,可是用idea直接按stop鍵,那麼服務端就會出問題了,讀取事件會一直爲true,形成死循環,消耗CPU,因此必需要判斷一下客戶端鏈接是否斷開了數據庫
只須要把客戶端代碼啓動幾個,修改一些userName以及收件人,就能夠測試,實現相似QQ微信即時通信,聊天功能json
服務端代碼服務器
package serversocketchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; /** * * @author ZhenWeiLai * */ public class ServerSocketChannelNonBlocking { private static ServerSocketChannel serverSocketChannel = null; private static Charset charset = Charset.forName("GBK");//設置編碼集,用於編碼,解碼 private static Selector selector = null; //保存客戶端的map private static final ConcurrentHashMap<String,SocketChannel> clientSockets = new ConcurrentHashMap<>(); static{ try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.socket().bind(new InetSocketAddress(8000)); serverSocketChannel.configureBlocking(false);//設置爲非阻塞 selector = Selector.open();//實例化一個選擇器 } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { service(); } private static void service(){ SocketChannel clientChannel = null; SelectionKey selectionKey = null; SocketChannel targetChannel = null; try { serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);//服務端監聽鏈接 while(true){ selector.select();//阻塞至有新的鏈接就開始處理 Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator(); while(selectionKeys.hasNext()){ selectionKey = selectionKeys.next(); if(selectionKey.isAcceptable()){//若是事件是鏈接事件 ServerSocketChannel serverChannel = (ServerSocketChannel)selectionKey.channel();//獲取事件綁定的channel clientChannel = serverChannel.accept();//鏈接獲取帶客戶端信息的socketChannel clientChannel.configureBlocking(false);//客戶設置爲非阻塞,由於非阻塞才支持選擇器.避免盲等浪費資源 ByteBuffer byteBuffer = ByteBuffer.allocate(1024);//做爲每個客戶端的附件緩衝器 /** * 只監聽讀事件,這裏千萬別監聽寫事件,由於只要鏈接有效,那麼寫事件會一直爲true,致使死循環,很耗資源 * 能夠跟serverSocket用同一個選擇器,由於綁定的channel不一樣 */ clientChannel.register(selector,SelectionKey.OP_READ,byteBuffer); }else if(selectionKey.isReadable()){//只要有客戶端寫入,那麼就能夠處理 //獲取客戶端附件,也就是寫入的數據 ByteBuffer byteBuffer = (ByteBuffer)selectionKey.attachment(); //從selectionKey獲取客戶端的channel SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); //把附件讀出,解碼爲字符串 String msg = read(socketChannel,byteBuffer); //這裏用了->分割收件人,->後面跟着的字符串是收件人 if(msg.indexOf("->")!=-1){ //內容 String content = msg.substring(0,msg.lastIndexOf("->")); //從map裏獲取收件人的socket targetChannel = clientSockets.get(msg.substring(msg.lastIndexOf("->")+2)); //實例化一個緩衝區,用來寫出到收件人的socketChannel ByteBuffer temp = ByteBuffer.allocate(1024); temp.put(charset.encode(content)); //寫出 handleWrite(targetChannel,temp); }else{ //若是內容沒有收件人,那麼視爲第一次鏈接,客戶端發過來的userName,做爲KEY存入MAP clientSockets.put(msg,socketChannel); } } selectionKeys.remove(); } } } catch (IOException e) { try { if(selectionKey!=null)selectionKey.cancel(); if(clientChannel!=null){ clientChannel.shutdownInput(); clientChannel.shutdownOutput(); clientChannel.close(); } if(targetChannel!=null){ targetChannel.shutdownInput(); targetChannel.shutdownOutput(); targetChannel.close(); } } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } e.printStackTrace(); } } private static String read(SocketChannel socketChannel,ByteBuffer byteBuffer){ //重置position limit爲寫入作準備 byteBuffer.clear(); try { int flag =socketChannel.read(byteBuffer); //判斷客戶端是否斷開鏈接 if(flag==-1){ //若是客戶端無端斷開,必定要關閉,不然讀取事件一直爲true形成死循環,很是耗資源 socketChannel.close(); } } catch (IOException e) { try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } e.printStackTrace(); } //position =0 limit等於有效下標,爲寫出作準備 byteBuffer.flip(); return charset.decode(byteBuffer).toString(); } //寫出 private static void handleWrite(SocketChannel socketChannel,ByteBuffer byteBuffer){ synchronized (byteBuffer) { byteBuffer.flip(); try { socketChannel.write(byteBuffer); } catch (IOException e) { try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } e.printStackTrace(); } } } }
客戶端代碼微信
package socketchannel; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; /** * Created by lzw on 17-2-28. */ public class SocketChannelNonBlockingClient { private static Charset charset = Charset.forName("GBK"); private static ByteBuffer receiveBuffer = ByteBuffer.allocate(10240); private static ByteBuffer sendBuffer = ByteBuffer.allocate(10240); private static SocketChannel socketChannel = null; private static Selector selector = null; private static String userName = "client1";//客戶端名 private static String targetName = "client2";//收件人名 public static void main(String[] args) { try { socketChannel = SocketChannel.open(); //鏈接到服務端 SocketAddress socketAddress = new InetSocketAddress("19.95.103.112",8000); selector = Selector.open();//實例化一個選擇器 socketChannel.configureBlocking(false);//設置爲非阻塞 //先監聽一個鏈接事件 socketChannel.register(selector,SelectionKey.OP_CONNECT); //鏈接 socketChannel.connect(socketAddress); //jdk 1.8的lambda表達式,用一個線程監控控制檯輸入 new Thread(()->{ try { receiveFromUser(); } catch (IOException e) { e.printStackTrace(); } }).start(); talk(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private static void talk(){ try { while(true){ selector.select();//阻塞直到鏈接事件 Iterator<SelectionKey> readyKeys = selector.selectedKeys().iterator(); while(readyKeys.hasNext()){ SelectionKey key =readyKeys.next(); if(key.isConnectable()){ //非阻塞的狀況下可能沒有鏈接完成,這裏調用finishConnect阻塞至鏈接完成 socketChannel.finishConnect(); //鏈接完成之後,先發送本身的userName以便保存在服務端的客戶端map裏面 synchronized (sendBuffer){ SocketChannel socketChannel1 = (SocketChannel)key.channel(); sendBuffer.clear(); sendBuffer.put(charset.encode(userName)); send(socketChannel1); socketChannel.register(selector,SelectionKey.OP_READ);//僅監聽一個讀取事件 } }else if(key.isReadable()){ //處理讀事件 receive(key); } readyKeys.remove(); } } } catch (ClosedChannelException e) { try { socketChannel.close(); } catch (IOException e1) { e1.printStackTrace(); } e.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 從控制檯獲取用戶輸入 * @throws IOException */ private static void receiveFromUser() throws IOException{ //阻塞直到控制檯有輸入 BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); for(String msg = br.readLine();msg!=null&&!msg.equals("bye");msg = br.readLine()){ //同步鎖避免線程競爭 synchronized (sendBuffer) { sendBuffer.clear(); //編碼 sendBuffer.put(charset.encode(msg)); //分割副 sendBuffer.put(charset.encode("->")); //目標名 sendBuffer.put(charset.encode(targetName)); send(socketChannel); } } } /** * 接收服務端的數據 * @param key */ private static void receive(SelectionKey key) throws IOException { //獲取服務端的channel SocketChannel channel = (SocketChannel) key.channel(); //爲寫入緩衝器作準備position=0,limit=capacity receiveBuffer.clear(); //從服務端的channel把數據讀入緩衝器 channel.read(receiveBuffer); //position=0,limit=有效下標最後一位 receiveBuffer.flip(); //解碼 String msg = charset.decode(receiveBuffer).toString(); //輸出到控制檯 System.out.println(msg); } /** * 發送到服務端 */ private static void send(SocketChannel sendChannel) throws IOException { if(sendBuffer.remaining()!=0){ synchronized (sendBuffer){ sendBuffer.flip(); sendChannel.write(sendBuffer); } } } }