Socket編程入門(基於Java實現)

認識Socket

socket,又稱套接字,是在不一樣的進程間進行網絡通信的一種協議、約定或者說是規範。java

對於socket編程,它更多的時候像是基於TCP/UDP等協議作的一層封裝或者說抽象,是一套系統所提供的用於進行網絡通訊相關編程的接口。linux

創建socket的基本流程

咱們以linux操做系統提供的基本api爲例,瞭解創建一個socket通訊的基本流程:編程

能夠看到本質上,socket是對tcp鏈接(固然也有多是udp等其餘鏈接)協議,在編程層面上的簡化和抽象。api

1.最基本的Socket示範

1.1 單向通訊

首先,咱們從只發送和接收一次消息的socket基礎代碼開始:數組

服務端:bash

package com.marklux.socket.base;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * The very basic socket server that only listen one single message.
 */

public class BaseSocketServer {

    private ServerSocket server;
    private Socket socket;
    private int port;
    private InputStream inputStream;
    private static final int MAX_BUFFER_SIZE = 1024;

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public BaseSocketServer(int port) {
        this.port = port;
    }

    public void runServerSingle() throws IOException {
        this.server = new ServerSocket(this.port);

        System.out.println("base socket server started.");
        // the code will block here till the request come.
        this.socket = server.accept();

        this.inputStream = this.socket.getInputStream();

        byte[] readBytes = new byte[MAX_BUFFER_SIZE];

        int msgLen;
        StringBuilder stringBuilder = new StringBuilder();

        while ((msgLen = inputStream.read(readBytes)) != -1) {
            stringBuilder.append(new String(readBytes,0,msgLen,"UTF-8"));
        }

        System.out.println("get message from client: " + stringBuilder);

        inputStream.close();
        socket.close();
        server.close();
    }

    public static void main(String[] args) {
        BaseSocketServer bs = new BaseSocketServer(9799);
        try {
            bs.runServerSingle();
        }catch (IOException e) {
            e.printStackTrace();
        }

    }
}

複製代碼

客戶端:服務器

package com.marklux.socket.base;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.Socket;

/**
 * The very basic socket client that only send one single message.
 */

public class BaseSocketClient {
    private String serverHost;
    private int serverPort;
    private Socket socket;
    private OutputStream outputStream;

    public BaseSocketClient(String host, int port) {
        this.serverHost = host;
        this.serverPort = port;
    }

    public void connetServer() throws IOException {
        this.socket = new Socket(this.serverHost, this.serverPort);
        this.outputStream = socket.getOutputStream();
        // why the output stream?
    }

    public void sendSingle(String message) throws IOException {
        try {
            this.outputStream.write(message.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
        }
        this.outputStream.close();
        this.socket.close();
    }

    public static void main(String[] args) {
        BaseSocketClient bc = new BaseSocketClient("127.0.0.1",9799);
        try {
            bc.connetServer();
            bc.sendSingle("Hi from mark.");
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
}

複製代碼

先運行服務端,再運行客戶端,就能夠看到效果。網絡

  • 注意這裏的IO操做實現,咱們使用了一個大小爲MAX_BUFFER_SIZE的byte數組做爲緩衝區,而後從輸入流中取出字節放置到緩衝區,再從緩衝區中取出字節構建到字符串中去,這在輸入流文件很大時很是有用,事實上,後面要講到的NIO也是基於這種思路實現的。

1.2 雙向通訊

上面的例子只實現了一次單向的通訊,這顯然有點浪費通道。socket鏈接支持全雙工的雙向通訊(底層是tcp),下面的例子中,服務端在收到客戶端的消息後,將返回給客戶端一個回執。多線程

而且咱們使用了一些java.io包裝好的方法,來簡化整個通訊的流程(由於消息長度不大,再也不使用緩衝區)。併發

服務端:

public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();

        String message = new String(inputStream.readAllBytes(), "UTF-8");

        System.out.println("received message: " + message);

        this.socket.shutdownInput(); // 告訴客戶端接收已經完畢,以後只能發送

        // write the receipt.

        this.outputStream = this.socket.getOutputStream();
        String receipt = "We received your message: " + message;
        outputStream.write(receipt.getBytes("UTF-8"));

        this.outputStream.close();
        this.socket.close();
    }
複製代碼

客戶端:

public void sendMessage(String message) throws IOException {
        this.socket = new Socket(host,port);
        this.outputStream = socket.getOutputStream();
        this.outputStream.write(message.getBytes("UTF-8"));
        this.socket.shutdownOutput(); // 告訴服務器,全部的發送動做已經結束,以後只能接收
        this.inputStream = socket.getInputStream();
        String receipt = new String(inputStream.readAllBytes(), "UTF-8");
        System.out.println("got receipt: " + receipt);
        this.inputStream.close();
        this.socket.close();
    }
複製代碼
  • 注意這裏咱們在服務端接受到消息以及客戶端發送消息後,分別調用了shutdownInput()shutdownOutput()而不是直接close對應的stream,這是由於在關閉任何一個stream,都會直接致使socket的關閉,也就沒法進行後面回執的發送了。

  • 可是注意,調用shutdownInput()shutdownOutput()以後,對應的流也會被關閉,不能再次向socket發送/寫入了。

2. 發送更多的消息:結束的界定

剛纔的兩個例子中,每次打開流,都只能進行一次寫入/讀取操做,結束後對應流被關閉,就沒法再次寫入/讀取了。

在這種狀況下,若是要發送兩次消息,就不得不創建兩個socket,既耗資源又麻煩。其實咱們徹底能夠不關閉對應的流,只要分次寫入消息就能夠了。

可是這樣的話,咱們就必須面對另外一個問題:如何判斷一次消息發送的結束呢?

2.1 使用特殊符號

最簡單的辦法是使用一些特殊的符號來標記一次發送完成,服務端只要讀到對應的符號就能夠完成一次讀取,而後進行相關的處理操做。

下面的例子中咱們使用換行符\n來標記一次發送的結束,服務端每接收到一個消息,就打印一次,而且使用了Scanner來簡化操做:

服務端:

public void runServer() throws IOException {
        this.server = new ServerSocket(this.port);

        System.out.println("base socket server started.");

        this.socket = server.accept();
        // the code will block here till the request come.

        this.inputStream = this.socket.getInputStream();
        Scanner sc = new Scanner(this.inputStream);
        while (sc.hasNextLine()) {
            System.out.println("get info from client: " + sc.nextLine());
        } // 循環接收並輸出消息內容
        this.inputStream.close();
        socket.close();
    }
複製代碼

客戶端:

public void connetServer() throws IOException {
        this.socket = new Socket(this.serverHost, this.serverPort);
        this.outputStream = socket.getOutputStream();
    }

public void send(String message) throws IOException {
        String sendMsg = message + "\n"; // we mark \n as a end of line.
        try {
            this.outputStream.write(sendMsg.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
            System.out.println(e.getMessage());
        }
//        this.outputStream.close();
//        this.socket.shutdownOutput();
    }

 public static void main(String[] args) {
        CycleSocketClient cc = new CycleSocketClient("127.0.0.1", 9799);
        try {
            cc.connetServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNext()) {
                String line = sc.nextLine();
                cc.send(line);
            }
        }catch (IOException e) {
            e.printStackTrace();
        }
    }
複製代碼

運行後效果是,客戶端每輸入一行文字按下回車後,服務端就會打印出對應的消息讀取記錄。

2.2 根據長度界定

回到原點,咱們之因此很差定位消息何時結束,是由於咱們不可以肯定每次消息的長度。

那麼其實能夠先將消息的長度發送出去,當服務端知道消息的長度後,就可以完成一次消息的接收了。

總的來講,發送一次消息變成了兩個步驟

  1. 發送消息的長度
  2. 發送消息

最後的問題就是,「發送消息的長度」這一步驟所發送的字節量必須是固定的,不然咱們仍然會陷入僵局。

通常來講,咱們可使用固定的字節數來保存消息的長度,好比規定前2個字節就是消息的長度,不過這樣咱們可以傳送的消息最大長度也就被固定死了,以2個字節爲例,咱們發送的消息最大長度不超過2^16個字節即64K。

若是你瞭解一些字符的編碼,就會知道,其實咱們可使用變長的空間來儲存消息的長度,好比:

第一個字節首位爲0:即0XXXXXXX,表示長度就一個字節,最大128,表示128B
第一個字節首位爲110,那麼附帶後面一個字節表示長度:即110XXXXX 10XXXXXX,最大2048,表示2K
第一個字節首位爲1110,那麼附帶後面二個字節表示長度:即110XXXXX 10XXXXXX 10XXXXXX,最大131072,表示128K
依次類推
複製代碼

固然這樣實現起來會麻煩一些,所以下面的例子裏咱們仍然使用固定的兩個字節來記錄消息的長度。

服務端:

public void runServer() throws IOException {
        this.serverSocket = new ServerSocket(this.port);
        this.socket = serverSocket.accept();
        this.inputStream = socket.getInputStream();
        byte[] bytes;
        while (true) {
            // 先讀第一個字節
            int first = inputStream.read();
            if (first == -1) {
                // 若是是-1,說明輸入流已經被關閉了,也就不須要繼續監聽了
                this.socket.close();
                break;
            }
            // 讀取第二個字節
            int second = inputStream.read();

            int length = (first << 8) + second; // 用位運算將兩個字節拼起來成爲真正的長度

            bytes = new byte[length]; // 構建指定長度的字節大小來儲存消息便可

            inputStream.read(bytes);

            System.out.println("receive message: " + new String(bytes,"UTF-8"));
        }
    }
複製代碼

客戶端:

public void connetServer() throws IOException {
        this.socket = new Socket(host,port);
        this.outputStream = socket.getOutputStream();
    }

public void sendMessage(String message) throws IOException {
        // 首先要把message轉換成bytes以便處理
        byte[] bytes = message.getBytes("UTF-8");
        // 接下來傳輸兩個字節的長度,依然使用移位實現
        int length = bytes.length;
        this.outputStream.write(length >> 8); // write默認一次只傳輸一個字節
        this.outputStream.write(length);
        // 傳輸完長度後,再正式傳送消息
        this.outputStream.write(bytes);
    }

public static void main(String[] args) {
        LengthSocketClient lc = new LengthSocketClient("127.0.0.1",9799);
        try {
            lc.connetServer();
            Scanner sc = new Scanner(System.in);
            while (sc.hasNextLine()) {
                lc.sendMessage(sc.nextLine());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
複製代碼

3. 處理更多的鏈接:多線程

3.1 同時實現消息的發送與接收

在考慮服務端處理多鏈接以前,咱們先考慮使用多線程改造一下原有的一對一對話實例。

在原有的例子中,消息的接收方並不能主動地向對方發送消息,換句話說咱們並無實現真正的互相對話,這主要是由於消息的發送和接收這兩個動做並不能同時進行,所以咱們須要使用兩個線程,其中一個用於監聽鍵盤輸入並將其寫入socket,另外一個則負責監聽socket並將接受到的消息顯示。

出於簡單考慮,咱們直接讓主線程負責鍵盤監聽和消息發送,同時另外開啓一個線程用於拉取消息並顯示。

消息拉取線程 ListenThread.java

public class ListenThread implements Runnable {
    private Socket socket;
    private InputStream inputStream;

    public ListenThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() throws RuntimeException{
        try {
            this.inputStream = socket.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }

        while (true) {
            try {
                int first = this.inputStream.read();
                if (first == -1) {
                    // 輸入流已經被關閉,無需繼續讀取
                    throw new RuntimeException("disconnected.");
                }
                int second = this.inputStream.read();
                int msgLength = (first<<8) + second;
                byte[] readBuffer = new byte[msgLength];
                this.inputStream.read(readBuffer);

                System.out.println("message from [" + socket.getInetAddress() + "]: " + new String(readBuffer,"UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e.getMessage());
            }
        }
    }
}
複製代碼

主線程,啓動時由用戶選擇是做爲server仍是client:

public class ChatSocket {
    private String host;
    private int port;
    private Socket socket;
    private ServerSocket serverSocket;
    private OutputStream outputStream;

    // 以服務端形式啓動,建立會話
    public void runAsServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
        System.out.println("[log] server started at port " + port);
        // 等待客戶端的加入
        this.socket = serverSocket.accept();
        System.out.println("[log] successful connected with " + socket.getInetAddress());
        // 啓動監聽線程
        Thread listenThread = new Thread(new ListenThread(this.socket));
        listenThread.start();
        waitAndSend();
    }

    // 以客戶端形式啓動,加入會話
    public void runAsClient(String host, int port) throws IOException {
        this.socket = new Socket(host, port);
        System.out.println("[log] successful connected to server " + socket.getInetAddress());
        Thread listenThread = new Thread(new ListenThread(this.socket));
        listenThread.start();
        waitAndSend();
    }

    public void waitAndSend() throws IOException {
        this.outputStream = this.socket.getOutputStream();
        Scanner sc = new Scanner(System.in);
        while (sc.hasNextLine()) {
            this.sendMessage(sc.nextLine());
        }
    }

    public void sendMessage(String message) throws IOException {
        byte[] msgBytes = message.getBytes("UTF-8");
        int length = msgBytes.length;
        outputStream.write(length>>8);
        outputStream.write(length);
        outputStream.write(msgBytes);
    }

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        ChatSocket chatSocket = new ChatSocket();
        System.out.println("select connect type: 1 for server and 2 for client");
        int type = Integer.parseInt(scanner.nextLine().toString());
        if (type == 1) {
            System.out.print("input server port: ");
            int port = scanner.nextInt();
            try {
                chatSocket.runAsServer(port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }else if (type == 2) {
            System.out.print("input server host: ");
            String host = scanner.nextLine();
            System.out.print("input server port: ");
            int port = scanner.nextInt();
            try {
                chatSocket.runAsClient(host, port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
複製代碼

3.2 使用線程池優化服務端併發能力

做爲服務端,若是一次只跟一個客戶端創建socket鏈接,未免顯得太過浪費資源,所以咱們徹底可讓服務端和多個客戶端創建多個socket。

那麼既然要處理多個鏈接,就不得不面對併發問題了(固然,你也能夠寫循環輪流處理)。咱們可使用多線程來處理併發,不過線程的建立和銷燬都會消耗大量的資源和時間,因此最好一步到位,用一個線程池來實現。

下面給出一個示範性質的服務端代碼:

public class SocketServer {
  public static void main(String args[]) throws Exception {
    // 監聽指定的端口
    int port = 55533;
    ServerSocket server = new ServerSocket(port);
    // server將一直等待鏈接的到來
    System.out.println("server將一直等待鏈接的到來");

    //若是使用多線程,那就須要線程池,防止併發太高時建立過多線程耗盡資源
    ExecutorService threadPool = Executors.newFixedThreadPool(100);
    
    while (true) {
      Socket socket = server.accept();
      
      Runnable runnable=()->{
        try {
          // 創建好鏈接後,從socket中獲取輸入流,並創建緩衝區進行讀取
          InputStream inputStream = socket.getInputStream();
          byte[] bytes = new byte[1024];
          int len;
          StringBuilder sb = new StringBuilder();
          while ((len = inputStream.read(bytes)) != -1) {
            // 注意指定編碼格式,發送方和接收方必定要統一,建議使用UTF-8
            sb.append(new String(bytes, 0, len, "UTF-8"));
          }
          System.out.println("get message from client: " + sb);
          inputStream.close();
          socket.close();
        } catch (Exception e) {
          e.printStackTrace();
        }
      };
      threadPool.submit(runnable);
    }

  }
}
複製代碼

4. 鏈接保活

我想你不難發現一個問題,那就是當socket鏈接成功創建後,若是中途發生異常致使其中一方斷開鏈接,此時另外一方是沒法發現的,只有在再次嘗試發送/接收消息纔會由於拋出異常而退出。

簡單的說,就是咱們維持的socket鏈接,是一個長鏈接,但咱們沒有保證它的時效性,上一秒它可能仍是能夠用的,可是下一秒就不必定了。

4.1 使用心跳包

保證鏈接隨時可用的最多見方法就是定時發送心跳包,來檢測鏈接是否正常。這對於實時性要求很高的服務而言,仍是很是重要的(好比消息推送)。

大致的方案以下:

  1. 雙方約定好心跳包的格式,要可以區別於普通的消息。
  2. 客戶端每隔必定時間,就向服務端發送一個心跳包
  3. 服務端每接收到心跳包時,將其拋棄
  4. 若是客戶端的某個心跳包發送失敗,就能夠判斷鏈接已經斷開
  5. 若是對實時性要求很高,服務端也能夠定時檢查客戶端發送心跳包的頻率,若是超過必定時間沒有發送能夠認爲鏈接已經斷開

4.2 斷開時重連

使用心跳包必然會增長帶寬和性能的負擔,對於普通的應用咱們其實並無必要使用這種方案,若是消息發送時拋出了鏈接異常,直接嘗試從新鏈接就行了。

跟上面的方案對比,其實這個拋出異常的消息就充當了心跳包的角色。

總的來講,鏈接是否要保活,如何保活,須要根據具體的業務場景靈活地思考和定製。

相關文章
相關標籤/搜索