上一遍說了nio的 channel 和buffer 進行讀寫,這一遍整理一下 nio實現非阻塞式sooket的通訊。java
先看看傳統的io 和socket實現tcp的通訊。安全
服務端代碼。服務器
package com.cn.socket; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * Created by ZC-16-012 on 2018/11/1. * socket 服務端 */ public class MyServer { //將arrayList包裝成線程安全的list public static List<Socket> socketList= Collections.synchronizedList(new ArrayList<>()); public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(30000); while (true) { //阻塞式,若是客戶端一直沒有鏈接該服務端,該方法將一直阻塞下去 Socket s = ss.accept(); socketList.add(s); //每當客戶端鏈接成功後啓動一個線程爲該客戶端服務 new Thread(new ServerThread(s), "服務端").start(); } } }
package com.cn.socket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.Socket; /** * Created by ZC-16-012 on 2018/11/1. * 服務端的線程,負責讀客戶端的數據 */ public class ServerThread implements Runnable { private Socket s =null; private BufferedReader br; public ServerThread(Socket s) { this.s = s; try { //初始化該線程的socket的輸入流,讀客戶端的數據 br= new BufferedReader(new InputStreamReader(s.getInputStream())); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { String content = null; while ((content = readFromClient()) != null) { for (Socket s: MyServer.socketList){ try { //將socket中讀到的數據再輸出給客戶端 PrintStream ps= new PrintStream(s.getOutputStream()); System.out.println("服務端讀到的客戶端數據="+content); ps.println(content); } catch (IOException e) { e.printStackTrace(); } } } } private String readFromClient(){ try { return br.readLine(); } catch (IOException e) { //若是捕獲到異常,說客戶端的socket已經關閉 MyServer.socketList.remove(s); e.printStackTrace(); } return null; } }
客戶端代碼socket
package com.cn.socket.client; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintStream; import java.net.Socket; /** * Created by ZC-16-012 on 2018/11/1. */ public class MyClient { public static void main(String[] args) throws IOException { Socket s= new Socket("127.0.0.1",30000); new Thread(new ClientThread(s),"客戶端").start(); //獲取該socket輸出流 PrintStream ps= new PrintStream(s.getOutputStream()); String line=null; BufferedReader br= new BufferedReader(new InputStreamReader(System.in)); while ((line=br.readLine())!=null){ ps.println(line); } } }
package com.cn.socket.client; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; /** * Created by ZC-16-012 on 2018/11/1. * * 客戶端的線程,負責讀取服務端輸出的數據,也就是輸入流 */ public class ClientThread implements Runnable { Socket s; BufferedReader br; public ClientThread(Socket s) throws IOException { this.s = s; br= new BufferedReader(new InputStreamReader(s.getInputStream())); } @Override public void run() { String content= null; try { while ((content=br.readLine())!=null){ System.out.print(content); } } catch (IOException e) { e.printStackTrace(); } } }
傳統的阻塞式socket主要用的對象是serverSocket和 socket對象,其中如上代碼所示,serverSocket.accpet()方法是阻塞式的,假如沒有客戶端鏈接,該方法一直阻塞在這裏。tcp
而後看一下非阻塞式的socket通訊ide
服務端代碼:this
package com.cn.socket.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; /** * Created by ZC-16-012 on 2018/11/1. */ public class NioServer { private Selector selector = null; private Charset charset= Charset.forName("UTF-8"); /** * 服務器上的全部channel(包括ServerSocketChannel和SocketChannel)都須要向selector註冊, * selector負責監視這些socket的io狀態,當其中任意一個或者多個channel具備可用的IO操做時, * 該selector的select()方法會返回大於0的整數。當selector上全部的channel沒有須要處理的io時, * 則該selector的select()方法會阻塞 * */ public void init() throws IOException { //開啓一個selector註冊中心 selector = Selector.open(); //開啓一個serverSocketChannel,相似於傳統socket中的serverSocket ServerSocketChannel server = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 30000); server.bind(address); //設置socketServer以非阻塞式方式,默認是阻塞模式 server.configureBlocking(false); //將socketChannel註冊到selector中,ServerSocketChannel只支持OP_ACCEPT操做 server.register(selector, SelectionKey.OP_ACCEPT); //select()監控全部註冊的channel,當他們中間有須要處理的io時,將對應的SelectionKey加入被選擇的selectedKey集合中 //並返回該chanel的數量 while (selector.select() > 0) { //selectedKeys()獲取全部被選擇的channel,SelectionKey表明全部selectableChannel和selector註冊關係 for (SelectionKey sk : selector.selectedKeys()) { selector.selectedKeys().remove(sk); //判斷該key是否有對應的客戶端鏈接 if (sk.isAcceptable()) { //調用accept()接收鏈接,產生服務端的SocketChannel SocketChannel sc = server.accept(); sc.configureBlocking(false); //註冊到selector監控中心 sc.register(selector, SelectionKey.OP_READ); //將sk對應的channel設置成準備接收其餘請求 sk.interestOps(SelectionKey.OP_ACCEPT); } //判斷服務端是否有數據可讀,也就是sk對應的channel if (sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = ""; try { while ((sc.read(buff)) > 0) { buff.flip(); //將子節解碼成字符 content += charset.decode(buff); System.out.println("服務端讀取數據:" + content); //將sk對應的channel設置成準備下一讀取 sk.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { e.printStackTrace(); //若是讀取的出現異常,說明該channel對應的client出現問題 sk.cancel(); if (sk.channel() != null) { sk.channel().close(); } } if (content.length() > 0) { for (SelectionKey key : selector.keys()) { Channel channel = key.channel(); if (channel instanceof SocketChannel) { SocketChannel socketChannel = (SocketChannel) channel; socketChannel.write(charset.encode(content)); } } } } } } } public static void main(String[] args){ try { new NioServer().init(); } catch (IOException e) { e.printStackTrace(); } } }
客戶端代碼:編碼
package com.cn.socket.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Scanner; /** * Created by ZC-16-012 on 2018/11/5. * nio 客戶端 */ public class NioClient { private Selector selector = null; private Charset charset = Charset.forName("UTF-8"); private SocketChannel sc = null; public void init() throws IOException { selector = Selector.open(); InetSocketAddress address = new InetSocketAddress("127.0.0.1", 30000); sc = SocketChannel.open(address); //將該socket以非阻塞式方式工做 sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); //開啓客戶端讀取數據線程 new ClientThread().start(); //鍵盤輸入,客戶端寫的數據 Scanner scan = new Scanner(System.in); while (scan.hasNextLine()) { String content = scan.nextLine(); //編碼 寫出去 sc.write(charset.encode(content)); } } public static void main(String[] args){ try { new NioClient().init(); } catch (IOException e) { e.printStackTrace(); } } private class ClientThread extends Thread{ @Override public void run() { try { while (selector.select()>0){ for (SelectionKey sk:selector.selectedKeys()){ //刪除正在處理的SelectionKey selector.selectedKeys().remove(sk); //若是sk對應的channel有可讀的數據 if (sk.isReadable()){ SocketChannel sc= (SocketChannel) sk.channel(); ByteBuffer buff= ByteBuffer.allocate(1024); String content=""; while (sc.read(buff)>0){ sc.read(buff); buff.flip(); content+=charset.decode(buff); } System.out.println("聊天信息:" + content); //爲下一次讀取作準備 sk.interestOps(SelectionKey.OP_READ); } } } } catch (IOException e) { e.printStackTrace(); } } } }
這裏用到selector註冊中心以及channel的概念。利用nio的 charset 編碼解碼,buffer讀取數據。.net