java nio的一點整理(二)

上一遍說了nio的 channel 和buffer 進行讀寫,這一遍整理一下 nio實現非阻塞式sooket的通訊。java

先看看傳統的io 和socket實現tcp的通訊。安全

服務端代碼。服務器

package com.cn.socket;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Created by ZC-16-012 on 2018/11/1.
 * socket 服務端
 */
public class MyServer {
    //將arrayList包裝成線程安全的list
    public static List<Socket> socketList= Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(30000);

        while (true) {
            //阻塞式,若是客戶端一直沒有鏈接該服務端,該方法將一直阻塞下去
            Socket s = ss.accept();
            socketList.add(s);
            //每當客戶端鏈接成功後啓動一個線程爲該客戶端服務
            new Thread(new ServerThread(s), "服務端").start();
        }
    }
}
package com.cn.socket;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

/**
 * Created by ZC-16-012 on 2018/11/1.
 * 服務端的線程,負責讀客戶端的數據
 */
public class ServerThread implements Runnable {

    private Socket s =null;

    private BufferedReader br;

    public ServerThread(Socket s) {
        this.s = s;
        try {
            //初始化該線程的socket的輸入流,讀客戶端的數據
            br= new BufferedReader(new InputStreamReader(s.getInputStream()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        String content = null;
        while ((content = readFromClient()) != null) {
              for (Socket s: MyServer.socketList){
                  try {
                      //將socket中讀到的數據再輸出給客戶端
                      PrintStream ps= new PrintStream(s.getOutputStream());
                      System.out.println("服務端讀到的客戶端數據="+content);
                      ps.println(content);
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
        }
    }

    private String readFromClient(){
        try {
            return br.readLine();
        } catch (IOException e) {
            //若是捕獲到異常,說客戶端的socket已經關閉
            MyServer.socketList.remove(s);
            e.printStackTrace();
        }
        return null;
    }
}

客戶端代碼socket

package com.cn.socket.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

/**
 * Created by ZC-16-012 on 2018/11/1.
 */
public class MyClient {

    public static void main(String[] args) throws IOException {
        Socket s= new Socket("127.0.0.1",30000);

        new Thread(new ClientThread(s),"客戶端").start();

        //獲取該socket輸出流
        PrintStream ps= new PrintStream(s.getOutputStream());
        String line=null;
        BufferedReader br= new BufferedReader(new InputStreamReader(System.in));
        while ((line=br.readLine())!=null){
            ps.println(line);
        }
    }
}
package com.cn.socket.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

/**
 * Created by ZC-16-012 on 2018/11/1.
 *
 * 客戶端的線程,負責讀取服務端輸出的數據,也就是輸入流
 */
public class ClientThread implements Runnable {
    Socket s;

    BufferedReader br;

    public ClientThread(Socket s) throws IOException {
        this.s = s;
        br= new BufferedReader(new InputStreamReader(s.getInputStream()));
    }

    @Override
    public void run() {
         String content= null;
        try {
            while ((content=br.readLine())!=null){
                System.out.print(content);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

傳統的阻塞式socket主要用的對象是serverSocket和 socket對象,其中如上代碼所示,serverSocket.accpet()方法是阻塞式的,假如沒有客戶端鏈接,該方法一直阻塞在這裏。tcp

而後看一下非阻塞式的socket通訊ide

服務端代碼:this

package com.cn.socket.nio;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;

/**
 * Created by ZC-16-012 on 2018/11/1.
 */
public class NioServer {

    private  Selector selector = null;

    private Charset charset= Charset.forName("UTF-8");

    /**
    * 服務器上的全部channel(包括ServerSocketChannel和SocketChannel)都須要向selector註冊,
     * selector負責監視這些socket的io狀態,當其中任意一個或者多個channel具備可用的IO操做時,
     * 該selector的select()方法會返回大於0的整數。當selector上全部的channel沒有須要處理的io時,
     * 則該selector的select()方法會阻塞
    * */
    public void init() throws IOException {
        //開啓一個selector註冊中心
        selector = Selector.open();
        //開啓一個serverSocketChannel,相似於傳統socket中的serverSocket
        ServerSocketChannel server = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 30000);
        server.bind(address);
        //設置socketServer以非阻塞式方式,默認是阻塞模式
        server.configureBlocking(false);
        //將socketChannel註冊到selector中,ServerSocketChannel只支持OP_ACCEPT操做
        server.register(selector, SelectionKey.OP_ACCEPT);
        //select()監控全部註冊的channel,當他們中間有須要處理的io時,將對應的SelectionKey加入被選擇的selectedKey集合中
        //並返回該chanel的數量
        while (selector.select() > 0) {
            //selectedKeys()獲取全部被選擇的channel,SelectionKey表明全部selectableChannel和selector註冊關係
            for (SelectionKey sk : selector.selectedKeys()) {
                selector.selectedKeys().remove(sk);
                //判斷該key是否有對應的客戶端鏈接
                if (sk.isAcceptable()) {
                    //調用accept()接收鏈接,產生服務端的SocketChannel
                    SocketChannel sc = server.accept();
                    sc.configureBlocking(false);
                    //註冊到selector監控中心
                    sc.register(selector, SelectionKey.OP_READ);
                    //將sk對應的channel設置成準備接收其餘請求
                    sk.interestOps(SelectionKey.OP_ACCEPT);
                }
                //判斷服務端是否有數據可讀,也就是sk對應的channel
                if (sk.isReadable()) {
                    SocketChannel sc = (SocketChannel) sk.channel();
                    ByteBuffer buff = ByteBuffer.allocate(1024);
                    String content = "";
                    try {
                        while ((sc.read(buff)) > 0) {
                            buff.flip();
                            //將子節解碼成字符
                            content += charset.decode(buff);
                            System.out.println("服務端讀取數據:" + content);
                            //將sk對應的channel設置成準備下一讀取
                            sk.interestOps(SelectionKey.OP_READ);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                        //若是讀取的出現異常,說明該channel對應的client出現問題
                        sk.cancel();
                        if (sk.channel() != null) {
                            sk.channel().close();
                        }

                    }


                    if (content.length() > 0) {
                        for (SelectionKey key : selector.keys()) {
                            Channel channel = key.channel();
                            if (channel instanceof SocketChannel) {
                                SocketChannel socketChannel = (SocketChannel) channel;
                                socketChannel.write(charset.encode(content));
                            }
                        }
                    }
                }

            }
        }

    }

    public static void main(String[] args){
        try {
            new NioServer().init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客戶端代碼:編碼

package com.cn.socket.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

/**
 * Created by ZC-16-012 on 2018/11/5.
 * nio 客戶端
 */
public class NioClient {

    private Selector selector = null;

    private Charset charset = Charset.forName("UTF-8");

    private SocketChannel sc = null;


    public void init() throws IOException {
        selector = Selector.open();
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 30000);
        sc = SocketChannel.open(address);
        //將該socket以非阻塞式方式工做
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_READ);
        //開啓客戶端讀取數據線程
        new ClientThread().start();
        //鍵盤輸入,客戶端寫的數據
        Scanner scan = new Scanner(System.in);
        while (scan.hasNextLine()) {
            String content = scan.nextLine();
            //編碼 寫出去
            sc.write(charset.encode(content));
        }
    }

    public static void main(String[] args){
        try {
            new NioClient().init();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    private class ClientThread extends Thread{
        @Override
        public void run() {
            try {
                while (selector.select()>0){
                    for (SelectionKey sk:selector.selectedKeys()){
                       //刪除正在處理的SelectionKey
                        selector.selectedKeys().remove(sk);
                        //若是sk對應的channel有可讀的數據
                        if (sk.isReadable()){
                           SocketChannel sc= (SocketChannel) sk.channel();
                            ByteBuffer buff= ByteBuffer.allocate(1024);
                            String content="";
                            while (sc.read(buff)>0){
                                sc.read(buff);
                                buff.flip();
                                content+=charset.decode(buff);
                            }
                            System.out.println("聊天信息:" + content);
                            //爲下一次讀取作準備
                            sk.interestOps(SelectionKey.OP_READ);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

這裏用到selector註冊中心以及channel的概念。利用nio的 charset 編碼解碼,buffer讀取數據。.net

相關文章
相關標籤/搜索