在前面的兩篇文章中,留下了一個問題,對於 TCP 或 UDP 的服務器,如何實現併發處理客戶端。java
最直觀的想法就是爲每一個到來的請求,建立一個單獨的線程來處理,可是這種方式未免太浪費資源了,那可使用線程池來管理線程,這樣能夠節約資源。以 TCP 服務器舉例。安全
首先須要定義一個須要提交到線程池中的任務。服務器
public class TCPRunnable implements Runnable {
private Socket mSocket;
public TCPRunnable(Socket socket) {
mSocket = socket;
}
@Override
public void run() {
try {
System.out.println("Handling client: " + mSocket.getRemoteSocketAddress());
InputStream in = mSocket.getInputStream();
OutputStream out = mSocket.getOutputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line;
System.out.println("Client said: ");
while ((line = br.readLine()) != null) {
System.out.println(line);
}
out.write("Welcome!".getBytes());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (mSocket != null) {
mSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
複製代碼
在構造函數中,須要傳入一個 Socket
實例,當任務提交到線程池後,Socket
的讀寫操做就在異步線程中執行。網絡
如今能夠改進下服務器端,只須要在獲取 Socket
實例後提交任務便可多線程
public class TCPServer1 {
public static void main(String[] args) {
ExecutorService mThreadPool = Executors.newCachedThreadPool();
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(8890);
while (true) {
Socket socket = serverSocket.accept();
mThreadPool.execute(new TCPRunnable(socket));
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (serverSocket != null) {
serverSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
複製代碼
使用線程池好像很完美,可是如今再思考一個老是,假如客戶端但願與服務器保持一個長鏈接,那麼很顯然線程池也限制了客戶端併發訪問的數量,由於核心線程就那麼幾個。 那麼可不能夠增大線程池中的核心線程數量呢? 能夠是能夠,可是要增大多少呢?面對數以百萬計的客戶端,你選擇不了!並且增大線程數量,只會帶來更大的線程開銷,包括線程調度以及上下文切換。 同時,咱們還要面對一個老是,那就是多線程臨界資源的訪問,咱們須要同步或者加鎖,這些隱藏的開銷是開發者沒法控制的。併發
Java NIO
的到來解決了這些問題,而且可讓服務器同時處理上千個客戶端,並且還能夠保持良好的性能。那麼本文就探討下 NIO
到底強在哪裏。異步
NIO
使用信道 (Channel
) 來發送和接收數據,而不使用傳統的流 (InputStream/OutputStream
)。socket
Channel
實例表明了打開一個實體的鏈接,這些實體包括硬件設備,文件,網絡套接字等等。 Channel
有個特點,在 Channel
上的操做,例如讀寫,都是線程安全的。ide
SelectableChannel
是一個抽象類,它實現了 Channel
接口,這個類比較特殊。函數
首先 SelectableChannel
能夠是阻塞或者非阻塞模式。若是是阻塞模式,在這個信道上的任何 I/O 操做都是阻塞的直到 I/O 完成。 而若是是非阻塞模式,任何在這個信道上的 I/O 都不會阻塞,可是傳輸的字節數可能比本來請求的字節數要少,甚至一個也沒有。
其次呢 SelectableChannel
能夠被 Selector
用來多路複用,不過首先須要調用 selectableChannel.configureBlocking(false)
調整爲非阻塞模式(nonblocking mode
),這一點很重要。而後進行註冊
SelectionKey register(Selector sel, int ops) SelectionKey register(Selector sel, int ops, Object att) 複製代碼
第一個參數表明要註冊的 Selector
實例。關於 Selector
後面再講。
第二個參數表明本通道感興趣的操做,這些都定義在 SelectionKey
類中,以下
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
複製代碼
對於 SocketChannel
,它感興趣的操做只有 OP_READ
, OP_WIRTE
和 OP_CONNECT
,然而它並不包括 OP_ACCEPT
。 而 ServerSocketChannel
能夠對這四個操做都感興趣。爲什麼?由於只有 ServerSocketChannel
有 accpet()
方法。
SocketChannel
和ServerSocketChannel
都是SelectableChannel
的子類。
第三個參數 Object att
是註冊時的附件,也就是能夠在註冊的時候帶點什麼東西過去。
register()
方法會返回一個 SelectionKey
實例。SelectionKey
至關於一個 Java Bean
,其實就是 register()
的三個參數的容器,它能夠返回和設置這些參數
Selector selector();
int interestOps();
Object attachment() 複製代碼
SocketChannel
表明套接字通道(socket channel
)。
SocketChannel
實例是經過它的靜態的方法 open()
建立的
public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}
public static SocketChannel open(SocketAddress remote) throws IOException {
// 1. ceate socket channel
SocketChannel sc = open();
try {
// 2. connect channel's socket, blocking until connected or error
sc.connect(remote);
} catch (Throwable x) {
try {
sc.close();
} catch (Throwable suppressed) {
x.addSuppressed(suppressed);
}
throw x;
}
assert sc.isConnected();
return sc;
}
複製代碼
open()
方法僅僅是建立一個 SocketChannel
對象,而 open(SocketAddress remote)
就更進一步,它還調用了 connect(addr)
來鏈接服務器。
SocketChannel
是 SelectableChannel
的子類,還記得前面 SelectableChannel
的特性嗎?若是不配置阻塞模式,那麼 SocketChannel
對象默認就是阻塞模式,那麼 open(SocketAddress remote)
方法其實就是阻塞式打開服務器鏈接。並且在 SocketChannel
上任何 I/O 操做都是阻塞式的。
那麼既然 SelectableChannel
能夠在非阻塞模式下的任何 I/O 操做都不阻塞,那麼咱們能夠先調用無參的 open()
方法,而後再配置爲非阻塞模式,再進行鏈接,而這個鏈接就是非阻塞式鏈接,僞代碼以下
// 建立 SocketChannel 實例
SocketChannel sc = SocketChannel.open();
// 調整爲非阻塞模式
sr.configureBlocking(false);
// 鏈接服務器
sr.connect(remoteAddr);
複製代碼
此時的 connect()
方法是非阻塞式的,咱們能夠經過 isConnectionPending()
方法來查詢是否還在鏈接中,若是還在鏈接中咱們能夠作點其它事,而不用像建立 Socket
同樣一塊兒阻塞走到鏈接創建,在這裏咱們能夠看到使用 NIO
的好處了。
若是 isConnectionPending()
返回了 false
,那就表明已經創建鏈接了,可是咱們還要調用 finishConnect()
來完成鏈接,這點須要注意。
public class NonBlockingTCPClient {
public static void main(String[] args) {
byte[] data = "hello".getBytes();
SocketChannel channel = null;
try {
// 1. open a socket channel
channel = SocketChannel.open();
// adjust to be nonblocking
channel.configureBlocking(false);
// 2. init connection to server and repeatedly poll with complete
// connect() and finishConnect() are nonblocking operation, both return immediately
if (!channel.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8899))) {
while (!channel.finishConnect()) {
System.out.print(".");
}
}
System.out.println("Connected to server...");
ByteBuffer writeBuffer = ByteBuffer.wrap(data);
ByteBuffer readBuffer = ByteBuffer.allocate(data.length);
int totalBytesReceived = 0;
int bytesReceived;
// 3. read and write bytes
while (totalBytesReceived < data.length) {
if (writeBuffer.hasRemaining()) {
channel.write(writeBuffer);
}
if ((bytesReceived = channel.read(readBuffer)) == -1) {
throw new SocketException("Connection closed prematurely");
}
totalBytesReceived += bytesReceived;
System.out.print(".");
}
System.out.println("Server said: " + new String(readBuffer.array()));
} catch (IOException e) {
e.printStackTrace();
} finally {
// 4 .close socket channel
try {
if (channel != null) {
channel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
複製代碼
第一步,建立 SocketChannel
實例,並配置爲非阻塞模式,只有在非阻塞模式下,任何在 SocketChannel
實例上的 I/O 操做纔是非阻塞的。這樣咱們的客戶端就是一個非阻塞式客戶端,也就能夠提高客戶端性能。
第二步,用 connect()
方法鏈接服務器,同時用 while
循環不斷檢測並徹底鏈接。 其實咱們能夠不用這樣盲等,這裏只是爲了演示鏈接的過程。 當你在須要立刻進行 I/O 操做前,必需要用 finishConnect()
完成鏈接過程。
第三步,用 ByteBuffer
讀寫字節,這裏咱們爲什麼和一個 while
循環不斷地讀寫呢? 還記得前面講 SelectableChannel
非阻塞時的特性嗎? 若是一個 SelectableChannel
爲非阻塞模式,它的 I/O 操做讀寫的字節數可能比實際的要少,甚至沒有。 因此咱們這裏用循環不斷的讀寫,保證讀寫完成。
官方對 SocketChannel.write() 有一段話是這樣說的: A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket's output buffer.
ServerSocketChannel
類表明服務器端套接字通道(server-socket channel
)。
ServerSocketChannel
和 SocktChannel
同樣,須要經過靜態方法 open()
來建立一個實例,建立後,還須要經過 bind()
方法來綁定到本地的 IP 地址和端口
ServerSocketChannel bind(SocketAddress local) ServerSocketChannel bind(SocketAddress local, int limitQueue) 複製代碼
參數 SocketAddress local
表明本地 IP 地址和端口號,參數 int limitQueue
限制了鏈接的數量。
Selector
是 SelectableChannel
的多路複用器,能夠用一個 Selector
管理多個 SelectableChannel
。例如,能夠用 Selector
在一個線程中管理多個 ServerSocketChannel
,那麼咱們就能夠在單線程中同時監聽多個端口的請求,這簡直是美不可言。 從這裏咱們也能夠看出使用 NIO
的好處。
Selector
實例也須要經過靜態方法 open()
建立。
前面說過,咱們須要調用 SelectableChannel
的 register()
來向 Selector
註冊,它會返回一個 SelctionKey
來表明此次註冊。
前面說過,能夠經過 Selector
管理多個 SelectableChannel
,它的 select()
方法能夠監測哪些信道已經準備好進行 I/O 操做了,返回值表明了這些 I/O 的數量。
int select() int select(long timeout) int selectNow() 複製代碼
當調用 select()
方法後,它會把表明已經準備好 I/O 操做的信道的 SelectionKey
保存在一個集合中,能夠經過 selectedKeys()
返回。
Set<SelectionKey> selectedKeys() 複製代碼
select()
的三個方法,從命名就能夠看出這幾個方法的不一樣之處,第一個方法是阻塞式調用,第三個方法設置了一個超時時間,第三個方法是當即返回。
若是調用 selcet()
方法會致使線程阻塞,甚至無限阻塞,wakeUp()
方法是喚醒那些調用 select()
方法而處於阻塞狀態的線程。
package com.ckt.sockettest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TCPChannelServer {
public static void main(String[] args) {
Selector selector = null;
try {
// 1. open a selector
selector = Selector.open();
// 2. listen for server socket channel
ServerSocketChannel ssc = ServerSocketChannel.open();
// must to be nonblocking mode before register
ssc.configureBlocking(false);
// bind server socket channel to port 8899
ssc.bind(new InetSocketAddress(8899));
// 3. register it with selector
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) { // run forever
// 4. select ready SelectionKey for I/O operation
if (selector.select(3000) == 0) {
continue;
}
// 5. get selected keys
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 6. handle selected key's interest operations
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// get socket channel from server socket channel
SocketChannel clientChannel = serverSocketChannel.accept();
// must to be nonblocking before register with selector
clientChannel.configureBlocking(false);
// register socket channel to selector with OP_READ
clientChannel.register(key.selector(), SelectionKey.OP_READ);
}
if (key.isReadable()) {
// read bytes from socket channel to byte buffer
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(10);
int readBytes = clientChannel.read(readBuffer);
if (readBytes == -1) {
System.out.println("closed.......");
clientChannel.close();
} else if (readBytes > 0) {
String s = new String(readBuffer.array());
System.out.println("Client said: " + s);
if (s.trim().equals("Hello")) {
// attachment is content used to write
key.interestOps(SelectionKey.OP_WRITE);
key.attach("Welcome!!!");
}
}
}
if (key.isValid() && key.isWritable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
// get content from attachment
String content = (String) key.attachment();
// write content to socket channel
clientChannel.write(ByteBuffer.wrap(content.getBytes()));
key.interestOps(SelectionKey.OP_READ);
}
// remove handled key from selected keys
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// close selector
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
複製代碼
第一步,建立 Selector
實例。
第二步,建立 ServerSocketChannel
實例,配置爲非阻塞模式,綁定本地端口。
第三步,把 ServerSocketChannel
實例 註冊到 Selector
實例中。
第四步,選擇一些準備好 I/O 操做的信道,這裏設置了3秒超時時間,也就是阻塞3秒。
第五步,獲取選中的 SelectionKey
的集合。
第六步,處理 SelectionKey
的感興趣的操做。註冊到 selector
中的 serverSocketChannel
只能是 isAcceptable()
,所以經過它的 accept()
方法,咱們能夠獲取到客戶端的請求 SocketChannel
實例,而後再把這個 socketChannel
註冊到 selector
中,設置爲可讀的操做。那麼下次遍歷 selectionKeys
的時候,就能夠處理那麼可讀的操做。
經過三篇文章,概要性的描述了 Java Socket
的輪廓。 然而我在實際的工做中並無接觸這方面內容,所以這三篇文章只是膚淺的入門,若是往後有機會深刻學習,再來改善這些文章內容。