本文由
玉剛說寫做平臺
[1]提供寫做贊助java原做者:
水晶蝦餃
[2]git版權聲明:本文版權歸微信公衆號
玉剛說
全部,未經許可,不得以任何形式轉載github
本篇咱們先簡單瞭解一下 TCP/IP,而後經過實現一個 echo 服務器來學習 Java 的 Socket API。最後咱們聊聊偏高級一點點的 socket 長鏈接和協議設計。面試
首先咱們看 IP(Internet Protocol)協議。IP 協議提供了主機和主機間的通訊。shell
爲了完成不一樣主機的通訊,咱們須要某種方式來惟一標識一臺主機,這個標識,就是著名的IP地址。經過IP地址,IP 協議就可以幫咱們把一個數據包發送給對方。數據庫
前面咱們說過,IP 協議提供了主機和主機間的通訊。TCP 協議在 IP 協議提供的主機間通訊功能的基礎上,完成這兩個主機上進程對進程的通訊。編程
有了 IP,不一樣主機就可以交換數據。可是,計算機收到數據後,並不知道這個數據屬於哪一個進程(簡單講,進程就是一個正在運行的應用程序)。TCP 的做用就在於,讓咱們可以知道這個數據屬於哪一個進程,從而完成進程間的通訊。bash
爲了標識數據屬於哪一個進程,咱們給須要進行 TCP 通訊的進程分配一個惟一的數字來標識它。這個數字,就是咱們常說的端口號。服務器
TCP 的全稱是 Transmission Control Protocol,你們對它說得最多的,大概就是面向鏈接的特性了。之因此說它是有鏈接的,是說在進行通訊前,通訊雙方須要先通過一個三次握手的過程。三次握手完成後,鏈接便創建了。這時候咱們才能夠開始發送/接收數據。(與之相對的是 UDP,不須要通過握手,就能夠直接發送數據)。微信
下面咱們簡單瞭解一下三次握手的過程。
SYN
,假設此時 sequence number 爲 x
。這個 x
是由操做系統根據必定的規則生成的,不妨認爲它是一個隨機數。SYN
後,會向客戶端再發送一個 SYN
,此時服務器的 seq number = y
。與此同時,會 ACK x+1
,告訴客戶端「已經收到了 SYN
,能夠發送數據了」。SYN
後,回覆一個 ACK y+1
,這個 ACK
則是告訴服務器,SYN
已經收到,服務器能夠發送數據了。通過這 3 步,TCP 鏈接就創建了。這裏須要注意的有三點:
ACK
的時候,TCP 協議是容許咱們攜帶數據的。之因此作不到,是 API 的限制致使的。TCP/IP 相關的理論知識咱們就先了解到這裏。關於 TCP,還有諸如可靠性、流量控制、擁塞控制等很是有趣的特性,強烈推薦讀者看一看 Richard 的名著《TCP/IP 詳解 - 卷1》(注意,是第1版,不是第2版)。
下面咱們看一些偏實戰的東西。
Socket 是 TCP 層的封裝,經過 socket,咱們就能進行 TCP 通訊。
在 Java 的 SDK 中,socket 的共有兩個接口:用於監聽客戶鏈接的 ServerSocket
和用於通訊的 Socket
。使用 socket 的步驟以下:
ServerSocket
並監聽客戶鏈接Socket
鏈接服務端Socket
獲取輸入輸出流進行通訊下面,咱們經過實現一個簡單的 echo 服務來學習 socket 的使用。所謂的 echo 服務,就是客戶端向服務端寫入任意數據,服務器都將數據原封不動地寫回給客戶端。
1. 建立 ServerSocket 並監聽客戶鏈接
public class EchoServer {
private final ServerSocket mServerSocket;
public EchoServer(int port) throws IOException {
// 1. 建立一個 ServerSocket 並監聽端口 port
mServerSocket = new ServerSocket(port);
}
public void run() throws IOException {
// 2. 開始接受客戶鏈接
Socket client = mServerSocket.accept();
handleClient(client);
}
private void handleClient(Socket socket) {
// 3. 使用 socket 進行通訊 ...
}
public static void main(String[] argv) {
try {
EchoServer server = new EchoServer(9877);
server.run();
} catch (IOException e) {
e.printStackTrace();
}
}
}
複製代碼
2. 使用 Socket 鏈接服務端
public class EchoClient {
private final Socket mSocket;
public EchoClient(String host, int port) throws IOException {
// 建立 socket 並鏈接服務器
mSocket = new Socket(host, port);
}
public void run() {
// 和服務端進行通訊
}
public static void main(String[] argv) {
try {
// 因爲服務端運行在同一主機,這裏咱們使用 localhost
EchoClient client = new EchoClient("localhost", 9877);
client.run();
} catch (IOException e) {
e.printStackTrace();
}
}
}
複製代碼
3. 經過 socket.getInputStream()/getOutputStream() 獲取輸入/輸出流進行通訊
首先,咱們來實現服務端:
public class EchoServer {
// ...
private void handleClient(Socket socket) throws IOException {
InputStream in = socket.getInputStream();
OutputStream out = socket.getOutputStream();
byte[] buffer = new byte[1024];
int n;
while ((n = in.read(buffer)) > 0) {
out.write(buffer, 0, n);
}
}
}
複製代碼
能夠看到,服務端的實現其實很簡單,咱們不停地讀取輸入數據,而後寫回給客戶端。
下面咱們看看客戶端。
public class EchoClient {
// ...
public void run() throws IOException {
Thread readerThread = new Thread(this::readResponse);
readerThread.start();
OutputStream out = mSocket.getOutputStream();
byte[] buffer = new byte[1024];
int n;
while ((n = System.in.read(buffer)) > 0) {
out.write(buffer, 0, n);
}
}
private void readResponse() {
try {
InputStream in = mSocket.getInputStream();
byte[] buffer = new byte[1024];
int n;
while ((n = in.read(buffer)) > 0) {
System.out.write(buffer, 0, n);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
複製代碼
客戶端會稍微複雜一點點,在讀取用戶輸入的同時,咱們又想讀取服務器的響應。因此,這裏建立了一個線程來讀服務器的響應。
不熟悉 lambda 的讀者,能夠把 Thread readerThread = new Thread(this::readResponse)
換成下面這個代碼:
Thread readerThread = new Thread(new Runnable() {
@Override
public void run() {
readResponse();
}
});
複製代碼
打開兩個 terminal 分別執行以下命令:
$ javac EchoServer.java
$ java EchoServer
複製代碼
$ javac EchoClient.java
$ java EchoClient
hello Server
hello Server
foo
foo
複製代碼
在客戶端,咱們會看到,輸入的全部字符都打印了出來。
最後須要注意的有幾點:
readThread
。實際應用中,咱們能夠經過關閉 socket 來讓線程從阻塞讀中返回。推薦讀者閱讀《Java併發編程實戰》在進入這一節的主題前,讀者不妨先考慮一個問題:在上一節的實例中,咱們運行 echo 服務後,在客戶端鏈接成功時,一個有多少個 socket 存在?
答案是 3 個 socket。客戶端一個,服務端有兩個。跟這個問題的答案直接關聯的是本節的主題——Socket
和 ServerSocket
的區別是什麼。
眼尖的讀者,可能會注意到在上一節我是這樣描述他們的:
在 Java 的 SDK 中,socket 的共有兩個接口:用於監聽客戶鏈接的
ServerSocket
和用於通訊的Socket
。
注意,我只說 ServerSocket
是用於監聽客戶鏈接,而沒有說它也能夠用來通訊。下面咱們來詳細瞭解一下他們的區別。
注:如下描述使用的是 UNIX/Linux 系統的 API
首先,咱們建立 ServerSocket
後,內核會建立一個 socket。這個 socket 既能夠拿來監聽客戶鏈接,也能夠鏈接遠端的服務。因爲 ServerSocket
是用來監聽客戶鏈接的,緊接着它就會對內核建立的這個 socket 調用 listen
函數。這樣一來,這個 socket 就成了所謂的 listening socket,它開始監聽客戶的鏈接。
接下來,咱們的客戶端建立一個 Socket
,一樣的,內核也建立一個 socket 實例。內核建立的這個 socket 跟 ServerSocket
一開始建立的那個沒有什麼區別。不一樣的是,接下來 Socket
會對它執行 connect
,發起對服務端的鏈接。前面咱們說過,socket API 實際上是 TCP 層的封裝,因此 connect
後,內核會發送一個 SYN
給服務端。
如今,咱們切換角色到服務端。服務端的主機在收到這個 SYN
後,會建立一個新的 socket,這個新建立的 socket 跟客戶端繼續執行三次握手過程。
三次握手完成後,咱們執行的 serverSocket.accept()
會返回一個 Socket
實例,這個 socket 就是上一步內核自動幫咱們建立的。
因此說,在一個客戶端鏈接的狀況下,其實有 3 個 socket。
關於內核自動建立的這個 socket,還有一個頗有意思的地方。它的端口號跟 ServerSocket
是一毛同樣的。咦!!不是說,一個端口只能綁定一個 socket 嗎?其實這個說法並不夠準確。
前面我說的TCP 經過端口號來區分數據屬於哪一個進程的說法,在 socket 的實現裏須要改一改。Socket 並不只僅使用端口號來區別不一樣的 socket 實例,而是使用 <peer addr:peer port, local addr:local port>
這個四元組。
在上面的例子中,咱們的 ServerSocket
長這樣:<*:*, *:9877>
。意思是,能夠接受任何的客戶端,和本地任何 IP。
accept
返回的 Socket
則是這樣: <127.0.0.1:xxxx, 127.0.0.1:9877>
,其中xxxx
是客戶端的端口號。
若是數據是發送給一個已鏈接的 socket,內核會找到一個徹底匹配的實例,因此數據準確發送給了對端。
若是是客戶端要發起鏈接,這時候只有 <*:*, *:9877>
會匹配成功,因此 SYN
也準確發送給了監聽套接字。
Socket/ServerSocket
的區別咱們就講到這裏。若是讀者以爲不過癮,能夠參考《TCP/IP 詳解》卷一、卷2。
背景知識
Socket 長鏈接,指的是在客戶和服務端之間保持一個 socket 鏈接長時間不斷開。
比較熟悉 Socket
的讀者,可能知道有這樣一個 API:
socket.setKeepAlive(true);
複製代碼
嗯……keep alive,「保持活着」,這個應該就是讓 TCP 不斷開的意思。那麼,咱們要實現一個 socket 的長鏈接,只須要這一個調用便可。
遺憾的是,生活並不老是那麼美好。對於 4.4BSD 的實現來講,Socket 的這個 keep alive 選項若是打開而且兩個小時內沒有通訊,那麼底層會發一個心跳,看看對方是否是還活着。
注意,兩個小時纔會發一次。也就是說,在沒有實際數據通訊的時候,我把網線拔了,你的應用程序要通過兩個小時纔會知道。
在說明若是實現長鏈接前,咱們先來理一理咱們面臨的問題。假定如今有一對已經鏈接的 socket,在如下狀況發生時候,socket 將再也不可用:
FIN
,通知對方要關閉 TCP 鏈接。在這種狀況下,另外一端若是去讀 socket,將會讀到 EoF
(End of File)。因而咱們知道對方關閉了 socket。FIN
的,由於它已經跪了。此時對方沒法得知這一狀況。對方在嘗試讀取數據時,最後會返回 read time out。若是寫數據,則是 host unreachable 之類的錯誤。在上面的幾種情形中,有一個共同點就是,只要去讀、寫 socket,只要 socket 鏈接不正常,咱們就可以知道。基於這一點,要實現一個 socket 長鏈接,咱們須要作的就是不斷地給對方寫數據,而後讀取對方的數據,也就是所謂的心跳。只要心還在跳,socket 就是活的。寫數據的間隔,須要根據實際的應用需求來決定。
心跳包不是實際的業務數據,根據通訊協議的不一樣,須要作不一樣的處理。
比方說,咱們使用 JSON 進行通訊,那麼,咱們能夠加一個 type
字段,表面這個 JSON 是心跳仍是業務數據。
{
"type": 0, // 0 表示心跳
// ...
}
複製代碼
使用二進制協議的狀況相似。要求就是,咱們可以區別一個數據包是心跳仍是真實數據。這樣,咱們便實現了一個 socket 長鏈接。
實現示例
這一小節咱們一塊兒來實現一個帶長鏈接的 Android echo 客戶端。
首先了接口部分:
public final class LongLiveSocket {
/** * 錯誤回調 */
public interface ErrorCallback {
/** * 若是須要重連,返回 true */
boolean onError();
}
/** * 讀數據回調 */
public interface DataCallback {
void onData(byte[] data, int offset, int len);
}
/** * 寫數據回調 */
public interface WritingCallback {
void onSuccess();
void onFail(byte[] data, int offset, int len);
}
public LongLiveSocket(String host, int port, DataCallback dataCallback, ErrorCallback errorCallback) {
}
public void write(byte[] data, WritingCallback callback) {
}
public void write(byte[] data, int offset, int len, WritingCallback callback) {
}
public void close() {
}
}
複製代碼
咱們這個支持長鏈接的類就叫 LongLiveSocket
好了。若是在 socket 斷開後須要重連,只須要在對應的接口裏面返回 true 便可(在真實場景裏,咱們還須要讓客戶設置重連的等待時間,還有讀寫、鏈接的 timeout等。爲了簡單,這裏就直接不支持了。
另外須要注意的一點是,若是要作一個完整的庫,須要同時提供阻塞式和回調式API。一樣因爲篇幅緣由,這裏直接省掉了。
首先咱們看看 write()
方法:
public void write(byte[] data, int offset, int len, WritingCallback callback) {
mWriterHandler.post(() -> {
Socket socket = getSocket();
if (socket == null) {
// initSocket 失敗而客戶說不須要重連,但客戶又叫咱們給他發送數據
throw new IllegalStateException("Socket not initialized");
}
try {
OutputStream outputStream = socket.getOutputStream();
DataOutputStream out = new DataOutputStream(outputStream);
out.writeInt(len);
out.write(data, offset, len);
callback.onSuccess();
} catch (IOException e) {
Log.e(TAG, "write: ", e);
// 關閉 socket,避免資源泄露
closeSocket();
// 這裏咱們把發生失敗的數據返回給客戶端,這樣客戶能夠更方便地從新發送數據
callback.onFail(data, offset, len);
if (!closed() && mErrorCallback.onError()) {
// 重連
initSocket();
}
}
});
}
複製代碼
因爲咱們須要定時寫心跳,這裏使用一個 HandlerThread
來處理寫請求。通訊使用的協議,只是簡單地在用戶數據前加一個 len 字段,用於肯定消息的長度。
下面咱們看心跳的發送:
private final Runnable mHeartBeatTask = new Runnable() {
private byte[] mHeartBeat = new byte[0];
@Override
public void run() {
// 咱們使用長度爲 0 的數據做爲 heart beat
write(mHeartBeat, new WritingCallback() {
@Override
public void onSuccess() {
// 每隔 HEART_BEAT_INTERVAL_MILLIS 發送一次
mWriterHandler.postDelayed(mHeartBeatTask, HEART_BEAT_INTERVAL_MILLIS);
mUIHandler.postDelayed(mHeartBeatTimeoutTask, HEART_BEAT_TIMEOUT_MILLIS);
}
@Override
public void onFail(byte[] data, int offset, int len) {
// nop
// write() 方法會處理失敗
}
});
}
};
private final Runnable mHeartBeatTimeoutTask = () -> {
Log.e(TAG, "mHeartBeatTimeoutTask#run: heart beat timeout");
closeSocket();
};
複製代碼
發送心跳使用咱們上面實現的 write()
方法。在發送成功後,post delay 一個 timeout task,若是到期後還沒收到服務器的響應,咱們將認爲 socket 出現異常,這裏直接關閉 socket。最後是對心跳的處理:
int nbyte = in.readInt();
if (nbyte == 0) {
Log.i(TAG, "readResponse: heart beat received");
mUIHandler.removeCallbacks(mHeartBeatTimeoutTask);
}
複製代碼
因爲用戶數據的長度老是會大於 1,這裏咱們就使用 len == 0
的數據做爲心跳。收到心跳後,移除 mHeartBeatTimeoutTask
。
剩餘代碼跟咱們的主題沒有太大關係,讀者在這裏[3]能夠找到完整的代碼或者本身完成這個例子。
最後須要說明的是,若是想節省資源,在有客戶發送數據的時候能夠省略 heart beat。
咱們對讀出錯時候的處理,可能也存在一些爭議。讀出錯後,咱們只是關閉了 socket。socket 須要等到下一次寫動做發生時,纔會從新鏈接。實際應用中,若是這是一個問題,在讀出錯後能夠直接開始重連。這種狀況下,還須要一些額外的同步,避免重複建立 socket。heart beat timeout 的狀況相似。
若是僅僅是爲了使用是 socket,咱們大能夠不去理會協議的細節。之因此推薦你們去看一看《TCP/IP 詳解》,是由於它們有太多值得學習的地方。不少咱們工做中遇到的問題,均可以在這裏找到答案。
如下每個小節的標題都是一個小問題,建議讀者獨立思考一下,再繼續往下看。若是你發現你的答案比個人更好,請必定發送郵件到 ljtong64 AT gmail DOT com 告訴我。
有這麼一句流行的話:這個世界惟一不變的,就是變化。當咱們對協議版本進行升級的時候,正確識別不一樣版本的協議對軟件的兼容很是重要。那麼,咱們如何設計協議,纔可以爲未來的版本升級作準備呢?
答案能夠在 IP 協議找到。
IP 協議的第一個字段叫 version,目前使用的是 4 或 6,分別表示 IPv4 和 IPv6。因爲這個字段在協議的開頭,接收端收到數據後,只要根據第一個字段的值就可以判斷這個數據包是 IPv4 仍是 IPv6。
再強調一下,這個字段在兩個版本的IP協議都位於第一個字段,爲了作兼容處理,對應的這個字段必須位於同一位置。文本協議(如,JSON、HTML)的狀況相似。
舉個例子,咱們用微信發送一條消息。這條消息的長度是不肯定的,而且每條消息都有它的邊界。咱們如何來處理這個邊界呢?
仍是同樣,看看 IP。IP 的頭部有個 header length 和 data length 兩個字段。經過添加一個 len 域,咱們就可以把數據根據應用邏輯分開。
跟這個相對的,還有另外一個方案,那就是在數據的末尾放置終止符。比方說,想 C 語言的字符串那樣,咱們在每一個數據的末尾放一個 \0
做爲終止符,用以標識一條消息的尾部。這個方法帶來的問題是,用戶的數據也可能存在 \0
。此時,咱們就須要對用戶的數據進行轉義。比方說,把用戶數據的全部 \0
都變成 \0\0
。讀消息的過程總,若是遇到 \0\0
,那它就表明 \0
,若是隻有一個 \0
,那就是消息尾部。
使用 len 字段的好處是,咱們不須要對數據進行轉義。讀取數據的時候,只要根據 len 字段,一次性把數據都讀進來就好,效率會更高一些。
終止符的方案雖然要求咱們對數據進行掃描,可是若是咱們可能從任意地方開始讀取數據,就須要這個終止符來肯定哪裏纔是消息的開頭了。
固然,這兩個方法不是互斥的,能夠一塊兒使用。
如今咱們有一個需求,須要一次上傳多個文件到服務器,只有在全部文件都上傳成功的狀況下,纔算成功。咱們該如何來實現呢?
IP 在數據報過大的時候,會把一個數據報拆分紅多個,並設置一個 MF (more fragments)位,表示這個包只是被拆分後的數據的一部分。
好,咱們也學一學 IP。這裏,咱們能夠給每一個文件從 0 開始編號。上傳文件的同時,也攜帶這個編號,並額外附帶一個 MF 標誌。除了編號最大的文件,全部文件的 MF 標誌都置位。由於 MF 沒有置位的是最後一個文件,服務器就能夠根據這個得出總共有多少個文件。
另外一種不使用 MF 標誌的方法是,咱們在上傳文件前,就告訴服務器總共有多少個文件。
若是讀者對數據庫比較熟悉,學數據庫用事務來處理,也是能夠的。這裏就不展開討論了。
這裏講一個我曾經遇到過的面試題。如今有一個任務隊列,多個工做線程從中取出任務並執行,執行結果放到一個結果隊列中。先要求,放入結果隊列的時候,順序順序須要跟從工做隊列取出時的同樣(也就是說,先取出的任務,執行結果須要先放入結果隊列)。
咱們看看 TCP/IP 是怎麼處理的。IP 在發送數據的時候,不一樣數據報到達對端的時間是不肯定的,後面發送的數據有可能較先到達。TCP 爲了解決這個問題,給所發送數據的每一個字節都賦了一個序列號,經過這個序列號,TCP 就可以把數據按原順序從新組裝。
同樣,咱們也給每一個任務賦一個值,根據進入工做隊列的順序依次遞增。工做線程完成任務後,在將結果放入結果隊列前,先檢查要放入對象的寫一個序列號是否是跟本身的任務相同,若是不一樣,這個結果就不能放進去。此時,最簡單的作法是等待,知道下一個能夠放入隊列的結果是本身所執行的那一個。可是,這個線程就沒辦法繼續處理任務了。
更好的方法是,咱們維護多一個結果隊列的緩衝,這個緩衝裏面的數據按序列號從小到大排序。工做線程要將結果放入,有兩種可能:
若是測試代表,這個結果緩衝的數據很少,那麼使用普通的鏈表就能夠。若是數據比較多,可使用一個最小堆。
咱們說,TCP 提供了可靠的傳輸。這樣不就可以保證對方收到消息了嗎?
很遺憾,其實不能。在咱們往 socket 寫入的數據,只要對端的內核收到後,就會返回 ACK
,此時,socket 就認爲數據已經寫入成功。然而要注意的是,這裏只是對方所運行的系統的內核成功收到了數據,並不表示應用程序已經成功處理了數據。
解決辦法仍是同樣,咱們學 TCP
,添加一個應用層的 APP ACK
。應用接收到消息並處理成功後,發送一個 APP ACK
給對方。
有了 APP ACK
,咱們須要處理的另外一個問題是,若是對方真的沒有收到,須要怎麼作?
TCP 發送數據的時候,消息同樣可能丟失。TCP 發送數據後,若是長時間沒有收到對方的 ACK
,就假設數據已經丟失,並從新發送。
咱們也同樣,若是長時間沒有收到 APP ACK
,就假設數據丟失,從新發送一個。
附:
[2] jekton.github.io