Socket編程基礎

一:socket又稱「套接字」,至關於「排插」,一一對應,一個客戶端對應一個服務端,原始的socket通訊爲,每當一個客戶端socket新接入,服務端serverSocket就得就得新建一個線程。缺點:客戶端一多就容易搞崩服務端。(爲傳統的BIO編程)java

例子1:編程

服務端代碼:數組

package bhz.bio;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

// 服務端
public class Server {

    final static int PROT = 8765;
    
    public static void main(String[] args) {
        
        ServerSocket server = null;
        try {
            server = new ServerSocket(PROT);
            System.out.println(" server start .. ");
            //進行阻塞
            Socket socket = server.accept();
            //新建一個線程執行客戶端的任務:每來一個Socket就新建一個線程,一多就容易將服務器撐爆。
            new Thread(new ServerHandler(socket)).start();
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(server != null){
                try {
                    server.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            server = null;
        }
                        
    }    
    
}

服務端接收請求後進行run()響應緩存

package bhz.bio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

// 服務端接收請求響應後的具體操做
public class ServerHandler implements Runnable{

    private Socket socket ;
    
    public ServerHandler(Socket socket){
        this.socket = socket;
    }
    
    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            // 接收客戶端數據
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            // 向客戶端響應數據
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while(true){
                // 讀取客戶端數據
                body = in.readLine();
                if(body == null) break;
                System.out.println("Server :" + body);
                out.println("服務器端回送響的應數據.");
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(in != null){
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            socket = null;
        }
        
        
    }

}

客戶端請求:服務器

package bhz.bio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class Client {

    final static String ADDRESS = "127.0.0.1";
    final static int PORT = 8765;
    
    public static void main(String[] args) {
        
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        
        try {
            // 建立Socket鏈接
            socket = new Socket(ADDRESS, PORT);
            // 接收服務端請求數據
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            // 向服務端發送響應數據
            out = new PrintWriter(socket.getOutputStream(), true);
            
            //向服務器端發送數據
            out.println("接收到客戶端的請求數據...");
            out.println("接收到客戶端的請求數據1111...");
            // 讀取服務端響應的數據
            String response = in.readLine();
            System.out.println("Client: " + response);
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(in != null){
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            socket = null;
        }
    }
}

二:爲擺脫服務端每接收一次客戶端請求就得建立一個線程的問題,能夠自定義線程池,限制線程、隊列的大小。(爲傳統的BIO編程)異步

自定義線程池:socket

package bhz.bio2;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// 服務端自定的線程池
public class HandlerExecutorPool {

    private ExecutorService executor;
    // 構造方法,定義參數值
    public HandlerExecutorPool(int maxPoolSize, int queueSize){
        this.executor = new ThreadPoolExecutor(
                Runtime.getRuntime().availableProcessors(),
                maxPoolSize, 
                120L, 
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
    }
    
    public void execute(Runnable task){
        this.executor.execute(task);
    }
        
}

 

服務端使用線程池:ide

package bhz.bio2;

import java.io.BufferedReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {

    final static int PORT = 8765;

    public static void main(String[] args) {
        ServerSocket server = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            server = new ServerSocket(PORT);
            System.out.println("server start");
            Socket socket = null;
            // 自定義一個線程池
            HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
            while(true){
                // 阻塞,接收客戶端請求
                socket = server.accept();
                // 將客戶端請求放在線程池中,進行服務端響應
                executorPool.execute(new ServerHandler(socket));
            }
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(in != null){
                try {
                    in.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
            if(server != null){
                try {
                    server.close();
                } catch (Exception e3) {
                    e3.printStackTrace();
                }
            }
            server = null;                
        }

    }
        
}

服務端執行請求響應操做:this

package bhz.bio2;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class ServerHandler implements Runnable {

    private Socket socket;
    public ServerHandler (Socket socket){
        this.socket = socket;
    }
    
    // 執行線程操做
    @Override
    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            // 接收客戶端請求數據
            in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
            // 發送服務端響應數據
            out = new PrintWriter(this.socket.getOutputStream(), true);
            String body = null;
            while(true){
                body = in.readLine();
                if(body == null) break;
                System.out.println("Server:" + body);
                out.println("Server response");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if(in != null){
                try {
                    in.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (Exception e3) {
                    e3.printStackTrace();
                }
            }
            socket = null;            
        }
        
        
    }

}

客戶端請求:spa

package bhz.bio2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class Client {
    
    final static String ADDRESS = "127.0.0.1";
    final static int PORT =8765;
    
    public static void main(String[] args) {
        Socket socket = null;
        BufferedReader in = null;
        PrintWriter out = null;
        try {
            // 建立Socket鏈接
            socket = new Socket(ADDRESS, PORT);
            // 接收服務端返回數據
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            // 發送客戶端響應數據
            out = new PrintWriter(socket.getOutputStream(), true);
            
            out.println("Client request");
            
            String response = in.readLine();
            System.out.println("Client:" + response);
            
            
        }  catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            if(in != null){
                try {
                    in.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
            if(out != null){
                try {
                    out.close();
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
            if(socket != null){
                try {
                    socket.close();
                } catch (Exception e3) {
                    e3.printStackTrace();
                }
            }
            socket = null;                
        }
        
        
        
    }

}

三:NIO的Socket通訊:同步非阻塞

1.NIO三個重要概念:Buffer(緩衝區)、Channel(通道)、選擇器(Selector)。

Buffer(緩衝區):用於通訊傳輸時的緩衝

Channel(通道):客戶端和服務端都有一個通道,分別爲:socketChannel和ServerSocketChannel,這兩個類型管道都得註冊到選擇器(Seletoer)上,由選擇器進行統一的通訊管理。

選擇器(Selector):對客戶端和服務端的通訊管道進行統一管理

2.Buffer的基本概念操做:

package bhz.nio.test;

import java.nio.IntBuffer;

public class TestBuffer {
    
    public static void main(String[] args) {
        
        // 1 基本操做
        
        //建立指定長度的緩衝區
        IntBuffer buf = IntBuffer.allocate(10);
        buf.put(13);// position位置:0 - > 1   緩衝區加入數據
        buf.put(21);// position位置:1 - > 2
        buf.put(35);// position位置:2 - > 3
        //把位置復位爲0,也就是position位置:3 - > 0
        buf.flip();
        System.out.println("使用flip復位:" + buf);
        System.out.println("容量爲: " + buf.capacity());    //容量一旦初始化後不容許改變(warp方法包裹數組除外)
        System.out.println("限制爲: " + buf.limit());        //因爲只裝載了三個元素,因此可讀取或者操做的元素爲3 則limit=3
        
        
        System.out.println("獲取下標爲1的元素:" + buf.get(1));
        System.out.println("get(index)方法,position位置不改變:" + buf);
        buf.put(1, 4);
        System.out.println("put(index, change)方法,position位置不變:" + buf);;
        
        for (int i = 0; i < buf.limit(); i++) {
            //調用get方法會使其緩衝區位置(position)向後遞增一位
            System.out.print(buf.get() + "\t");
        }
        System.out.println("buf對象遍歷以後爲: " + buf);
        
        
        // 2 wrap方法使用
        /**
        //  wrap方法會包裹一個數組: 通常這種用法不會先初始化緩存對象的長度,由於沒有意義,最後還會被wrap所包裹的數組覆蓋掉。 
        //  而且wrap方法修改緩衝區對象的時候,數組自己也會跟着發生變化。                     
        int[] arr = new int[]{1,2,5};
        IntBuffer buf1 = IntBuffer.wrap(arr);
        System.out.println(buf1);
        
        IntBuffer buf2 = IntBuffer.wrap(arr, 0 , 2);
        //這樣使用表示容量爲數組arr的長度,可是可操做的元素只有實際進入緩存區的元素長度
        System.out.println(buf2);
        */
        
        
        // 3 其餘方法
        /**
        IntBuffer buf1 = IntBuffer.allocate(10);
        int[] arr = new int[]{1,2,5};
        buf1.put(arr);
        System.out.println(buf1);
        //一種複製方法
        IntBuffer buf3 = buf1.duplicate();
        System.out.println(buf3);
        
        //設置buf1的位置屬性
        //buf1.position(0);
        buf1.flip();
        System.out.println(buf1);
        
        System.out.println("可讀數據爲:" + buf1.remaining());
        
        int[] arr2 = new int[buf1.remaining()];
        //將緩衝區數據放入arr2數組中去
        buf1.get(arr2);
        for(int i : arr2){
            System.out.print(Integer.toString(i) + ",");
        }
        */
        
    }
}

2.使用管道Channel和選擇器Seletor的服務端(接收客戶端請求):

package bhz.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Server implements Runnable{
    //1 多路複用器(管理全部的通道),channel會註冊到這個seletor對象上
    private Selector seletor;
    //2 創建緩衝區
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    //3 
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    // 構造方法,初始化參數
    public Server(int port){
        try {
            //1 打開路複用器
            this.seletor = Selector.open();
            //2 打開服務器通道
            ServerSocketChannel ssc = ServerSocketChannel.open();
            //3 設置服務器通道爲非阻塞模式,爲true的話和傳統的阻塞socket沒區別
            ssc.configureBlocking(false);
            //4 綁定地址
            ssc.bind(new InetSocketAddress(port));
            //5 把服務器通道註冊到多路複用器上,而且監聽阻塞事件
            ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
            
            System.out.println("Server start, port :" + port);
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    // 線程開啓後執行
    @Override
    public void run() {
        while(true){
            try {
                //1 必需要讓多路複用器開始監聽
                this.seletor.select();
                //2 返回多路複用器已經選擇的結果集,客戶端chanal和服務端的serverChanal都會註冊到seletor上(註冊的值爲Key),此處拿到的keys包含了客戶端的和服務端的key
                Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                //3 對結果集進行遍歷
                while(keys.hasNext()){
                    //4 獲取一個選擇的元素
                    SelectionKey key = keys.next();
                    //5 直接從容器中移除就能夠了
                    keys.remove();
                    //6 若是是有效的
                    if(key.isValid()){
                        //7 若是爲阻塞狀態,說明key爲服務端的serverSocketChanal。此處NIO爲接收客戶端請求,同步不阻塞
                        if(key.isAcceptable()){
                            this.accept(key);
                        }
                        //8 若是爲可讀狀態
                        if(key.isReadable()){
                            // 讀取每個客戶端的SocketChannle發送過來的數據
                            this.read(key);
                        }
                        //9 寫數據
                        if(key.isWritable()){
                            //this.write(key); //ssc
                        }
                    }
                    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    private void write(SelectionKey key){
        //ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
        //ssc.register(this.seletor, SelectionKey.OP_WRITE);
    }
    
    // 接收讀取客戶端數據
    private void read(SelectionKey key) {
        try {
            //1 清空緩衝區舊的數據
            this.readBuf.clear();
            //2 獲取以前註冊的socket通道對象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 讀取數據
            int count = sc.read(this.readBuf);
            //4 若是沒有數據
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有數據則進行讀取 讀取以前須要進行復位方法(把position 和limit進行復位)
            this.readBuf.flip();
            //6 根據緩衝區的數據長度建立相應大小的byte數組,接收緩衝區的數據
            byte[] bytes = new byte[this.readBuf.remaining()];
            //7 接收緩衝區數據
            this.readBuf.get(bytes);
            //8 打印客戶端傳送過來的結果數據
            String body = new String(bytes).trim();
            System.out.println("Server : " + body);
            
            // 9..能夠寫回給客戶端數據 
            
        } catch (IOException e) {
            e.printStackTrace();
        }
        
    }
    
    // 接收客戶端請求
    private void accept(SelectionKey key) {
        try {
            //1 獲取服務通道
            ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
            //2 執行阻塞方法,拿到客戶端管道,客戶端管道也要註冊到selector選擇器上
            SocketChannel sc = ssc.accept();
            //3 設置阻塞模式:非阻塞
            sc.configureBlocking(false);
            //4 註冊到多路複用器上,並設置讀取標識
            sc.register(this.seletor, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        // 開啓服務,執行線程run()方法
        new Thread(new Server(8765)).start();;
    }
    
    
}

3.客戶端發送請求:

package bhz.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class Client {

    //須要一個Selector 
    public static void main(String[] args) {
        
        //建立鏈接的地址
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);
        
        //聲明鏈接通道
        SocketChannel sc = null;
        
        //創建緩衝區
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        try {
            //打開通道
            sc = SocketChannel.open();
            //進行鏈接
            sc.connect(address);
            // 鏈接成功
            while(true){
                //定義一個字節數組,而後使用系統錄入功能:
                byte[] bytes = new byte[1024];
                System.in.read(bytes);
                
                //把數據放到緩衝區中
                buf.put(bytes);
                //對緩衝區進行復位
                buf.flip();
                //寫出數據
                sc.write(buf);
                //清空緩衝區數據
                buf.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if(sc != null){
                try {
                    sc.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        
    }
    
}

四:AIO的Socket通訊:AIO是在NIO的基礎上,新增了異步通道,在管道前面使用了Asynchronous的,服務端和客戶端管道類分別爲:AsynchronousServerSocketChannel和AsynchronousSocketChannel。

AIO通訊的服務端爲:先建立線程池,線程組使用線程池並負責客戶端管道的接入,服務端開啓線程組。

1.服務端ServerCompletionHandler類的建立(服務端接收請求後的具體執行內容):

package bhz.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;

public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
    
    // 鏈接成功的操做方法,每個客戶端鏈接進來都進行此操做
    @Override
    public void completed(AsynchronousSocketChannel asc, Server attachment) {
        //當有下一個客戶端接入的時候 直接調用Server的accept方法,這樣反覆執行下去,保證多個客戶端均可以阻塞
        attachment.assc.accept(attachment, this);
        read(asc);
    }
    
    // 讀取客戶端管道數據
    private void read(final AsynchronousSocketChannel asc) {
        //讀取數據
        ByteBuffer buf = ByteBuffer.allocate(1024);
        // 異步,不堵塞,執行不成功也直接往下走
        asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer resultSize, ByteBuffer attachment) {
                //進行讀取以後,重置標識位
                attachment.flip();
                //得到讀取的字節數
                System.out.println("Server -> " + "收到客戶端的數據長度爲:" + resultSize);
                //獲取讀取的數據
                String resultData = new String(attachment.array()).trim();
                System.out.println("Server -> " + "收到客戶端的數據信息爲:" + resultData);
                // 發送服務端響應數據
                String response = "服務器響應, 收到了客戶端發來的數據: " + resultData;
                write(asc, response);
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }
    
    // 服務端寫響應數據
    private void write(AsynchronousSocketChannel asc, String response) {
        try {
            // 設置緩衝區大小
            ByteBuffer buf = ByteBuffer.allocate(1024);
            // 向客戶端發送響應數據
            buf.put(response.getBytes());
            buf.flip();
            asc.write(buf).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    
    // 鏈接失敗的操做方法
    @Override
    public void failed(Throwable exc, Server attachment) {
        exc.printStackTrace();
    }

}

2.服務端:

package bhz.aio;

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {
    //線程池
    private ExecutorService executorService;
    //線程組
    private AsynchronousChannelGroup threadGroup;
    //服務器通道
    public AsynchronousServerSocketChannel assc;
    
    public Server(int port){
        try {
            //建立一個緩存池
            executorService = Executors.newCachedThreadPool();
            //建立線程組:異步,負責接收client的接入
            threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            //建立服務器通道
            assc = AsynchronousServerSocketChannel.open(threadGroup);
            //進行綁定接收端口
            assc.bind(new InetSocketAddress(port));
            
            System.out.println("server start , port : " + port);
            //進行阻塞:一直等客戶端的請求,this指當前對象。new ServerCompletionHandler()爲進行通訊的具體操做內容
            assc.accept(this, new ServerCompletionHandler());
            //一直阻塞 不讓服務器中止
            Thread.sleep(Integer.MAX_VALUE);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        Server server = new Server(8765);
    }
    
}

3.客戶端:

package bhz.aio;

import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;

public class Client implements Runnable{

    private AsynchronousSocketChannel asc ;
    
    public Client() throws Exception {
        asc = AsynchronousSocketChannel.open();
    }
    
    // 鏈接服務端地址
    public void connect(){
        asc.connect(new InetSocketAddress("127.0.0.1", 8765));
    }
    
    // 向服務端寫數據
    public void write(String request){
        try {
            asc.write(ByteBuffer.wrap(request.getBytes())).get();
            read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    //接收服務端數據
    private void read() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            asc.read(buf).get();
            buf.flip();
            byte[] respByte = new byte[buf.remaining()];
            buf.get(respByte);
            System.out.println(new String(respByte,"utf-8").trim());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    
    @Override
    public void run() {
        while(true){
            
        }
    }
    
    public static void main(String[] args) throws Exception {
        Client c1 = new Client();
        c1.connect();
        
        Client c2 = new Client();
        c2.connect();
        
        Client c3 = new Client();
        c3.connect();
        
        new Thread(c1, "c1").start();
        new Thread(c2, "c2").start();
        new Thread(c3, "c3").start();
        
        Thread.sleep(1000);
        
        c1.write("c1 aaa");
        c2.write("c2 bbbb");
        c3.write("c3 ccccc");
    }
    
}

客戶端發送請求,接收服務端的數據:

服務器響應, 收到了客戶端發來的數據: c1 aaa
服務器響應, 收到了客戶端發來的數據: c2 bbbb
服務器響應, 收到了客戶端發來的數據: c3 ccccc

五:思考:BIO和NIO,NIO和AIO的區別?(這三者如今開發都不會用了)

相關文章
相關標籤/搜索