在網絡的初期,網民不多,服務器徹底無壓力,那時的技術也沒有如今先進,一般用一個線程來全程跟蹤處理一個請求。由於這樣最簡單。java
其實代碼實現你們都知道,就是服務器上有個ServerSocket在某個端口監聽,接收到客戶端的鏈接後,會建立一個Socket,並把它交給一個線程進行後續處理。git
線程主要從Socket讀取客戶端傳過來的數據,而後進行業務處理,並把結果再寫入Socket傳回客戶端。github
因爲網絡的緣由,Socket建立後並不必定能馬上從它上面讀取數據,可能須要等一段時間,此時線程也必須一直阻塞着。在向Socket寫入數據時,也可能會使線程阻塞。面試
客戶端:建立20個Socket並鏈接到服務器上,再建立20個線程,每一個線程負責一個Socket。redis
服務器端:接收到這20個鏈接,建立20個Socket,接着建立20個線程,每一個線程負責一個Socket。安全
爲了模擬服務器端的Socket在建立後不能立馬讀取數據,讓客戶端的20個線程分別休眠5-10之間的一個隨機秒數。bash
客戶端的20個線程會在第5秒到第10秒這段時間內陸陸續續的向服務器端發送數據,服務器端的20個線程也會陸陸續續接收到數據。服務器
/**
* @author lixinjie
* @since 2019-05-07
*/
public class BioServer {
static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
ServerSocket ss = new ServerSocket();
ss.bind(new InetSocketAddress("localhost", 8080));
while (true) {
Socket s = ss.accept();
processWithNewThread(s);
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void processWithNewThread(Socket s) {
Runnable run = () -> {
InetSocketAddress rsa = (InetSocketAddress)s.getRemoteSocketAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + counter.incrementAndGet());
try {
String result = readBytes(s.getInputStream());
System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.getAndDecrement());
s.close();
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(run).start();
}
static String readBytes(InputStream is) throws Exception {
long start = 0;
int total = 0;
int count = 0;
byte[] bytes = new byte[1024];
//開始讀數據的時間
long begin = System.currentTimeMillis();
while ((count = is.read(bytes)) > -1) {
if (start < 1) {
//第一次讀到數據的時間
start = System.currentTimeMillis();
}
total += count;
}
//讀完數據的時間
long end = System.currentTimeMillis();
return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
}
static String time() {
return sdf.format(new Date());
}
}
複製代碼
/**
* @author lixinjie
* @since 2019-05-07
*/
public class Client {
public static void main(String[] args) {
try {
for (int i = 0; i < 20; i++) {
Socket s = new Socket();
s.connect(new InetSocketAddress("localhost", 8080));
processWithNewThread(s, i);
}
} catch (IOException e) {
e.printStackTrace();
}
}
static void processWithNewThread(Socket s, int i) {
Runnable run = () -> {
try {
//睡眠隨機的5-10秒,模擬數據還沒有就緒
Thread.sleep((new Random().nextInt(6) + 5) * 1000);
//寫1M數據,爲了拉長服務器端讀數據的過程
s.getOutputStream().write(prepareBytes());
//睡眠1秒,讓服務器端把數據讀完
Thread.sleep(1000);
s.close();
} catch (Exception e) {
e.printStackTrace();
}
};
new Thread(run).start();
}
static byte[] prepareBytes() {
byte[] bytes = new byte[1024*1024*1];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = 1;
}
return bytes;
}
}
複製代碼
執行結果以下:網絡
時間->IP:Port->線程Id:當前線程數
15:11:52->127.0.0.1:55201->10:1
15:11:52->127.0.0.1:55203->12:2
15:11:52->127.0.0.1:55204->13:3
15:11:52->127.0.0.1:55207->16:4
15:11:52->127.0.0.1:55208->17:5
15:11:52->127.0.0.1:55202->11:6
15:11:52->127.0.0.1:55205->14:7
15:11:52->127.0.0.1:55206->15:8
15:11:52->127.0.0.1:55209->18:9
15:11:52->127.0.0.1:55210->19:10
15:11:52->127.0.0.1:55213->22:11
15:11:52->127.0.0.1:55214->23:12
15:11:52->127.0.0.1:55217->26:13
15:11:52->127.0.0.1:55211->20:14
15:11:52->127.0.0.1:55218->27:15
15:11:52->127.0.0.1:55212->21:16
15:11:52->127.0.0.1:55215->24:17
15:11:52->127.0.0.1:55216->25:18
15:11:52->127.0.0.1:55219->28:19
15:11:52->127.0.0.1:55220->29:20
時間->等待數據的時間,讀取數據的時間,總共讀取的字節數->線程Id:當前線程數
15:11:58->wait=5012ms,read=1022ms,total=1048576bs->17:20
15:11:58->wait=5021ms,read=1022ms,total=1048576bs->13:19
15:11:58->wait=5034ms,read=1008ms,total=1048576bs->11:18
15:11:58->wait=5046ms,read=1003ms,total=1048576bs->12:17
15:11:58->wait=5038ms,read=1005ms,total=1048576bs->23:16
15:11:58->wait=5037ms,read=1010ms,total=1048576bs->22:15
15:11:59->wait=6001ms,read=1017ms,total=1048576bs->15:14
15:11:59->wait=6016ms,read=1013ms,total=1048576bs->27:13
15:11:59->wait=6011ms,read=1018ms,total=1048576bs->24:12
15:12:00->wait=7005ms,read=1008ms,total=1048576bs->20:11
15:12:00->wait=6999ms,read=1020ms,total=1048576bs->14:10
15:12:00->wait=7019ms,read=1007ms,total=1048576bs->26:9
15:12:00->wait=7012ms,read=1015ms,total=1048576bs->21:8
15:12:00->wait=7023ms,read=1008ms,total=1048576bs->25:7
15:12:01->wait=7999ms,read=1011ms,total=1048576bs->18:6
15:12:02->wait=9026ms,read=1014ms,total=1048576bs->10:5
15:12:02->wait=9005ms,read=1031ms,total=1048576bs->19:4
15:12:03->wait=10007ms,read=1011ms,total=1048576bs->16:3
15:12:03->wait=10006ms,read=1017ms,total=1048576bs->29:2
15:12:03->wait=10010ms,read=1022ms,total=1048576bs->28:1
複製代碼
能夠看到服務器端確實爲每一個鏈接建立一個線程,共建立了20個線程。併發
客戶端進入休眠約5-10秒,模擬鏈接上數據不就緒,服務器端線程在等待,等待時間約5-10秒。
客戶端陸續結束休眠,往鏈接上寫入1M數據,服務器端開始讀取數據,整個讀取過程約1秒。 能夠看到,服務器端的工做線程會把時間花在**「等待數據」和「讀取數據」**這兩個過程上。
這有兩個很差的地方:
一是有不少客戶端同時發起請求的話,服務器端要建立不少的線程,可能會由於超過了上限而形成崩潰。
二是每一個線程的大部分時光中都是在阻塞着,無事可幹,形成極大的資源浪費。 開頭已經說了那個年代網民不多,因此,不可能會有大量請求同時過來。至於資源浪費就浪費吧,反正閒着也是閒着。
來個簡單的小例子:
飯店共有10張桌子,且配備了10位服務員。只要有客人來了,大堂經理就把客人帶到一張桌子,並安排一位服務員全程陪同。
即便客人暫時不須要服務,服務員也一直在旁邊站着。可能覺着是一種浪費,其實非也,這就是尊貴的VIP服務。 其實,VIP映射的是一對一的模型,主要體如今「專用」上或「私有」上。
真正的多路複用技術 多路複用技術本來指的是,在通訊方面,多種信號或數據(從宏觀上看)交織在一塊兒,使用同一條傳輸通道進行傳輸。
這樣作的目的,一方面能夠充分利用通道的傳輸能力,另外一方面天然是省時省力省錢啦。
其實這個概念很是的「生活化」,隨手就能夠舉個例子:
一條小水渠裏水在流,在一端往裏倒入大量乒乓球,在另外一端用網進行過濾,把乒乓球和水流分開。
這就是一個比較「土」的多路複用,首先在發射端把多種信號或數據進行「混合」,接着是在通道上進行傳輸,最後在接收端「分離」出本身須要的信號或數據。
相信你們都看出來了,這裏的重點其實就是處理好「混合」和「分離」,對於不一樣的信號或數據,有不一樣的處理方法。
好比之前的有線電視是模擬信號,即電磁波。一家通常只有一根信號線,但能夠同時接多個電視,每一個電視任意換臺,互不影響。
這是因爲不一樣頻率的波能夠混合和分離。(固然,可能不是十分準確,明白意思就好了。)
再好比城市的高鐵站通常都有數個站臺供高鐵(同時)停靠,但城市間的高鐵軌道單方向只有一條,如何保證那麼多趟高鐵安全運行呢?
很明顯是分時使用,每趟高鐵都有本身的時刻。多趟高鐵按不一樣的時刻出站至關於混合,按不一樣的時刻進站至關於分離。
總結一下,多路指的是多種不一樣的信號或數據或其它事物,複用指的是共用同一個物理鏈路或通道或載體。 可見,多路複用技術是一種一對多的模型,「多」的這一方複用了「一」的這一方。
其實,一對多的模型主要體如今「公用」上或「共享」上。
一對一服務是典型的有錢任性,雖然響應及時、服務周到,但不是每一個人都能享受的,畢竟仍是「屌絲」多嘛,那就來個共享服務吧。
因此實際當中更多的狀況是,客人坐下後,會給他一個菜單,讓他先看着,反正也不可能立馬點餐,服務員就去忙別的了。
可能不時的會有服務員從客人身旁通過,發現客人尚未點餐,就會主動去詢問如今須要點餐嗎?
若是須要,服務員就給你寫菜單,若是不須要,服務員就繼續往前走了。 這種狀況飯店總體運行的也很好,可是服務員人數少多了。如今服務10桌客人,4個服務員綽綽有餘。(這節省的可都是純利潤呀。)
由於10桌客人同時須要服務的狀況幾乎是不會發生的,絕大部分狀況都是錯開的。若是真有的話,那就等會好了,又不是120/119,人命關天的。
回到代碼裏,狀況與之很是類似,徹底能夠採用相同的理論去處理。
鏈接創建後,找個地方把它放到那裏,能夠暫時先無論它,反正此時也沒有數據可讀。
可是數據遲早會到來的,因此,要不時的去詢問每一個鏈接有數據沒有,有的話就讀取數據,沒有的話就繼續無論它。
其實這個模式在Java裏早就有了,就是Java NIO,這裏的大寫字母「N」是單詞「New」,即「新」的意思,主要是爲了和上面的「一對一」進行區分。
如今須要把Socket交互的過程再稍微細化一些。客戶端先請求鏈接,connect,服務器端而後接受鏈接,accept,而後客戶端再向鏈接寫入數據,write,接着服務器端從鏈接上讀出數據,read。
和打電話的場景同樣,主叫撥號,connect,被叫接聽,accept,主叫說話,speak,被叫聆聽,listen。主叫給被叫打電話,說明主叫找被叫有事,因此被叫關注的是接通電話,聽對方說。
客戶端主動向服務器端發起請求,說明客戶端找服務器端有事,因此服務器端關注的是接受請求,讀取對方傳來的數據。這裏把接受請求,讀取數據稱爲服務器端感興趣的操做。
在Java NIO中,接受請求的操做,用OP_ACCEPT表示,讀取數據的操做,用OP_READ表示。
我決定先過一遍飯店的場景,讓首次接觸Java NIO的同窗不那麼迷茫。就是把常規的場景進行了定向整理,稍微有點刻意,明白意思就好了。
因而跑腿服務員就有了一個任務,替大堂經理盯梢。終於來客人了,跑腿服務員趕忙告訴了大堂經理。
因而跑腿服務員就又多了一個任務,就是盯着這桌客人,不時來問問,若是須要服務的話,就叫點餐服務員過來服務。
事情就這樣一直循環的持續下去,一切,都挺好。角色明確,職責單一,配合很好。
大堂經理和點餐服務員是需求的提供者或實現者,跑腿服務員是需求的發現者,並識別出需求的種類,須要接待的交給大堂經理,須要點餐的交給點餐服務員。
代碼的寫法很是的固定,能夠配合着後面的解說來看,這樣就好理解了,以下:
/**
* @author lixinjie
* @since 2019-05-07
*/
public class NioServer {
static int clientCount = 0;
static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress("localhost", 8080));
while (true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel ssc1 = (ServerSocketChannel)key.channel();
SocketChannel sc = null;
while ((sc = ssc1.accept()) != null) {
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
InetSocketAddress rsa = (InetSocketAddress)sc.socket().getRemoteSocketAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
}
} else if (key.isReadable()) {
//先將「讀」從感興趣操做移出,待把數據從通道中讀完後,再把「讀」添加到感興趣操做中
//不然,該通道會一直被選出來
key.interestOps(key.interestOps() & (~ SelectionKey.OP_READ));
processWithNewThread((SocketChannel)key.channel(), key);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void processWithNewThread(SocketChannel sc, SelectionKey key) {
Runnable run = () -> {
counter.incrementAndGet();
try {
String result = readBytes(sc);
//把「讀」加進去
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
System.out.println(time() + "->" + result + "->" + Thread.currentThread().getId() + ":" + counter.get());
sc.close();
} catch (Exception e) {
e.printStackTrace();
}
counter.decrementAndGet();
};
new Thread(run).start();
}
static String readBytes(SocketChannel sc) throws Exception {
long start = 0;
int total = 0;
int count = 0;
ByteBuffer bb = ByteBuffer.allocate(1024);
//開始讀數據的時間
long begin = System.currentTimeMillis();
while ((count = sc.read(bb)) > -1) {
if (start < 1) {
//第一次讀到數據的時間
start = System.currentTimeMillis();
}
total += count;
bb.clear();
}
//讀完數據的時間
long end = System.currentTimeMillis();
return "wait=" + (start - begin) + "ms,read=" + (end - start) + "ms,total=" + total + "bs";
}
static String time() {
return sdf.format(new Date());
}
}
複製代碼
它的大體處理過程以下:
至關於設立一個跑腿服務員。
相等於聘請了一位大堂經理。
至關於大堂經理給跑腿服務員說,幫我盯着門外,有客人來了告訴我。
至關於跑腿服務員一遍又一遍的去詢問、去轉悠。
至關於跑腿服務員終於發現門外來客人了,客人是須要接待的。
至關於跑腿服務員把大堂經理叫來了,大堂經理開始着手接待。
至關於大堂經理把客人帶到座位上,給了客人菜單,並又把客人委託給跑腿服務員,說客人接下來確定是要點餐的,你不時的來問問。
至關於跑腿服務員繼續不時的詢問着、轉悠着。
至關於跑腿服務員終於發現了一桌客人有了需求,是須要點餐的。
至關於跑腿服務員叫來了點餐服務員,點餐服務員開始爲客人寫菜單。
至關於點餐服務員寫好菜單後,就走了,能夠再去爲其餘客人寫菜單。
至關於跑腿服務員繼續着重複的詢問、轉悠,不知道將來在何方。
相信你已經看出來了,大堂經理至關於服務器端套接字,跑腿服務員至關於選擇器,點餐服務員至關於Worker線程。 啓動服務器端代碼,使用同一個客戶端代碼,按相同的套路發20個請求,結果以下:
時間->IP:Port->主線程Id:當前鏈接數
16:34:39->127.0.0.1:56105->1:1
16:34:39->127.0.0.1:56106->1:2
16:34:39->127.0.0.1:56107->1:3
16:34:39->127.0.0.1:56108->1:4
16:34:39->127.0.0.1:56109->1:5
16:34:39->127.0.0.1:56110->1:6
16:34:39->127.0.0.1:56111->1:7
16:34:39->127.0.0.1:56112->1:8
16:34:39->127.0.0.1:56113->1:9
16:34:39->127.0.0.1:56114->1:10
16:34:39->127.0.0.1:56115->1:11
16:34:39->127.0.0.1:56116->1:12
16:34:39->127.0.0.1:56117->1:13
16:34:39->127.0.0.1:56118->1:14
16:34:39->127.0.0.1:56119->1:15
16:34:39->127.0.0.1:56120->1:16
16:34:39->127.0.0.1:56121->1:17
16:34:39->127.0.0.1:56122->1:18
16:34:39->127.0.0.1:56123->1:19
16:34:39->127.0.0.1:56124->1:20
時間->等待數據的時間,讀取數據的時間,總共讀取的字節數->線程Id:當前線程數
16:34:45->wait=1ms,read=1018ms,total=1048576bs->11:5
16:34:45->wait=0ms,read=1054ms,total=1048576bs->10:5
16:34:45->wait=0ms,read=1072ms,total=1048576bs->13:6
16:34:45->wait=0ms,read=1061ms,total=1048576bs->14:5
16:34:45->wait=0ms,read=1140ms,total=1048576bs->12:4
16:34:46->wait=0ms,read=1001ms,total=1048576bs->15:5
16:34:46->wait=0ms,read=1062ms,total=1048576bs->17:6
16:34:46->wait=0ms,read=1059ms,total=1048576bs->16:5
16:34:47->wait=0ms,read=1001ms,total=1048576bs->19:4
16:34:47->wait=0ms,read=1001ms,total=1048576bs->20:4
16:34:47->wait=0ms,read=1015ms,total=1048576bs->18:3
16:34:47->wait=0ms,read=1001ms,total=1048576bs->21:2
16:34:48->wait=0ms,read=1032ms,total=1048576bs->22:4
16:34:49->wait=0ms,read=1002ms,total=1048576bs->23:3
16:34:49->wait=0ms,read=1001ms,total=1048576bs->25:2
16:34:49->wait=0ms,read=1028ms,total=1048576bs->24:4
16:34:50->wait=0ms,read=1008ms,total=1048576bs->28:4
16:34:50->wait=0ms,read=1033ms,total=1048576bs->27:3
16:34:50->wait=1ms,read=1002ms,total=1048576bs->29:2
16:34:50->wait=0ms,read=1001ms,total=1048576bs->26:2
複製代碼
服務器端接受20個鏈接,建立20個通道,並把它們註冊到選擇器上,此時不須要額外線程。
當某個通道已經有數據時,纔會用一個線程來處理它,因此,線程「等待數據」的時間是0,「讀取數據」的時間仍是約1秒。
由於20個通道是陸陸續續有數據的,因此服務器端最多時是6個線程在同時運行的,換句話說,用包含6個線程的線程池就能夠了。 對比與結論:
處理一樣的20個請求,一個須要用20個線程,一個須要用6個線程,節省了70%線程數。
在本例中,兩種感興趣的操做共用一個選擇器,且選擇器運行在主線程裏,Worker線程是新的線程。
其實對於選擇器的個數、選擇器運行在哪一個線程裏、是否使用新的線程來處理請求都沒有要求,要根據實際狀況來定。
好比說redis,和處理請求相關的就一個線程,選擇器運行在裏面,處理請求的程序也運行在裏面,因此這個線程既是I/O線程,也是Worker線程。
固然,也可使用兩個選擇器,一個處理OP_ACCEPT,一個處理OP_READ,讓它們分別運行在兩個單獨的I/O線程裏。對於能快速完成的操做能夠直接在I/O線程裏作了,對於很是耗時的操做必定要使用Worker線程池來處理。
這種處理模式就是被稱爲的多路複用I/O,多路指的是多個Socket通道,複用指的是隻用一個線程來管理它們。
一對一的形式,一個桌子配一個服務員,一個Socket分配一個線程,響應速度最快,畢竟是VIP嘛,可是效率很低,服務員大部分時間都是在站着,線程大部分時間都是在等待。
多路複用的形式,全部桌子共用一個跑腿服務員,全部Socket共用一個選擇器線程,響應速度確定變慢了,畢竟是一對多嘛。可是效率提升了,點餐服務員在須要點餐時纔會過去,工做線程在數據就緒時纔會開始工做。
從VIP到多路複用,形式上確實有很大的不一樣,其本質是從一對一到一對多的轉變,其實就是犧牲了響應速度,換來了效率的提高,不過綜合性能仍是獲得了極大的改進。
就飯店而言,究竟幾張桌子配一個跑腿服務員,幾張桌子配一個點餐服務員,通過一段時間運行,必定會有一個最優解。
就程序而言,究竟須要幾個選擇器線程,幾個工做線程,通過評估測試後,也會有一個最優解。
一旦達到最優解後,就不可能再提高了,這一樣是由多路複用這種一對多的形式所限制的。就像一對一的形式限制同樣。
人們的追求是無止境的,如何對多路複用繼續提高呢?答案必定是具備顛覆性的,即拋棄多路複用,採用全新的形式。
還以飯店爲例,如何在最優解的狀況下,既要繼續減小服務員數量,還要使效率提高呢?可能有些朋友已經猜到了,那就是拋棄服務員服務客人這種模式,把飯店改爲自助餐廳。
在客人進門時,把餐具給他,並告訴他就餐時長、不許浪費等這些規則,而後就不用管了。客人本身選餐,本身吃完,本身走人,不用再等服務員了,所以也再也不須要服務員了。(收拾桌子的除外。)
這種模式對應到程序裏,其實就是AIO,在Java裏也早就有了。
代碼的寫法很是的固定,能夠配合着後面的解說來看,這樣就好理解了,以下:
/**
* @author lixinjie
* @since 2019-05-13
*/
public class AioServer {
static int clientCount = 0;
static AtomicInteger counter = new AtomicInteger(0);
static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
try {
AsynchronousServerSocketChannel assc = AsynchronousServerSocketChannel.open();
assc.bind(new InetSocketAddress("localhost", 8080));
//非阻塞方法,其實就是註冊了個回調,並且只能接受一個鏈接
assc.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel asc, Object attachment) {
//再次註冊,接受下一個鏈接
assc.accept(null, this);
try {
InetSocketAddress rsa = (InetSocketAddress)asc.getRemoteAddress();
System.out.println(time() + "->" + rsa.getHostName() + ":" + rsa.getPort() + "->" + Thread.currentThread().getId() + ":" + (++clientCount));
} catch (Exception e) {
}
readFromChannelAsync(asc);
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
//不讓主線程退出
synchronized (AioServer.class) {
AioServer.class.wait();
}
} catch (Exception e) {
e.printStackTrace();
}
}
static void readFromChannelAsync(AsynchronousSocketChannel asc) {
//會把數據讀入到該buffer以後,再觸發工做線程來執行回調
ByteBuffer bb = ByteBuffer.allocate(1024*1024*1 + 1);
long begin = System.currentTimeMillis();
//非阻塞方法,其實就是註冊了個回調,並且只能接受一次讀取
asc.read(bb, null, new CompletionHandler<Integer, Object>() {
//從該鏈接上一共讀到的字節數
int total = 0;
/**
* @param count 表示本次讀取到的字節數,-1表示數據已讀完
*/
@Override
public void completed(Integer count, Object attachment) {
counter.incrementAndGet();
if (count > -1) {
total += count;
}
int size = bb.position();
System.out.println(time() + "->count=" + count + ",total=" + total + "bs,buffer=" + size + "bs->" + Thread.currentThread().getId() + ":" + counter.get());
if (count > -1) {//數據尚未讀完
//再次註冊回調,接受下一次讀取
asc.read(bb, null, this);
} else {//數據已讀完
try {
asc.close();
} catch (Exception e) {
e.printStackTrace();
}
}
counter.decrementAndGet();
}
@Override
public void failed(Throwable exc, Object attachment) {
}
});
long end = System.currentTimeMillis();
System.out.println(time() + "->exe read req,use=" + (end -begin) + "ms" + "->" + Thread.currentThread().getId());
}
static String time() {
return sdf.format(new Date());
}
}
複製代碼
它的大體處理過程以下:
一、初始化一個AsynchronousServerSocketChannel對象,並開始監聽
二、經過accept方法註冊一個「完成處理器」的接受鏈接回調,即CompletionHandler,用於在接受到鏈接後的相關操做。
三、當客戶端鏈接過來後,由系統來接受,並建立好AsynchronousSocketChannel對象,而後觸發該回調,並把該對象傳進該回調,該回調會在Worker線程中執行。
四、在接受鏈接回調裏,再次使用accept方法註冊一次相同的完成處理器對象,用於讓系統接受下一個鏈接。就是這種註冊只能使用一次,因此要不停的連續註冊,人家就是這樣設計的。
五、在接受鏈接回調裏,使用AsynchronousSocketChannel對象的read方法註冊另外一個接受數據回調,用於在接受到數據後的相關操做。
六、當客戶端數據過來後,由系統接受,並放入指定好的ByteBuffer中,而後觸發該回調,並把本次接受到的數據字節數傳入該回調,該回調會在Worker線程中執行。
七、在接受數據回調裏,若是數據沒有接受完,須要再次使用read方法把同一個對象註冊一次,用於讓系統接受下一次數據。這和上面的套路是同樣的。
八、客戶端的數據多是分屢次傳到服務器端的,因此接受數據回調會被執行屢次,直到數據接受完爲止。屢次接受到的數據合起來纔是完整的數據,這個必定要處理好。
九、關於ByteBuffer,要麼足夠的大,可以裝得下完整的客戶端數據,這樣屢次接受的數據直接往裏追加便可。要麼每次把ByteBuffer中的數據移到別的地方存儲起來,而後清空ByteBuffer,用於讓系統往裏裝入下一次接受的數據。
注:若是出現ByteBuffer空間不足,則系統不會裝入數據,就會致使客戶端數據老是讀不完,極有可能進入死循環。
啓動服務器端代碼,使用同一個客戶端代碼,按相同的套路發20個請求,結果以下:
時間->IP:Port->回調線程Id:當前鏈接數
17:20:47->127.0.0.1:56454->15:1
時間->發起一個讀請求,耗時->回調線程Id
17:20:47->exe read req,use=3ms->15
17:20:47->127.0.0.1:56455->15:2
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56456->15:3
17:20:47->exe read req,use=0ms->15
17:20:47->127.0.0.1:56457->16:4
17:20:47->127.0.0.1:56458->15:5
17:20:47->exe read req,use=1ms->16
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56460->15:6
17:20:47->127.0.0.1:56459->17:7
17:20:47->exe read req,use=0ms->15
17:20:47->127.0.0.1:56462->15:8
17:20:47->127.0.0.1:56461->16:9
17:20:47->exe read req,use=1ms->15
17:20:47->exe read req,use=0ms->16
17:20:47->exe read req,use=0ms->17
17:20:47->127.0.0.1:56465->16:10
17:20:47->127.0.0.1:56463->18:11
17:20:47->exe read req,use=0ms->18
17:20:47->127.0.0.1:56466->15:12
17:20:47->exe read req,use=1ms->16
17:20:47->127.0.0.1:56464->17:13
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56467->18:14
17:20:47->exe read req,use=2ms->17
17:20:47->exe read req,use=1ms->18
17:20:47->127.0.0.1:56468->15:15
17:20:47->exe read req,use=1ms->15
17:20:47->127.0.0.1:56469->16:16
17:20:47->127.0.0.1:56470->18:17
17:20:47->exe read req,use=1ms->18
17:20:47->exe read req,use=1ms->16
17:20:47->127.0.0.1:56472->15:18
17:20:47->127.0.0.1:56473->19:19
17:20:47->exe read req,use=2ms->15
17:20:47->127.0.0.1:56471->17:20
17:20:47->exe read req,use=1ms->19
17:20:47->exe read req,use=1ms->17
時間->本次接受到的字節數,截至到目前接受到的字節總數,buffer中的字節總數->回調線程Id:當前線程數
17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=65536,total=65536bs,buffer=65536bs->14:1
17:20:52->count=230188,total=295724bs,buffer=295724bs->12:1
17:20:52->count=752852,total=1048576bs,buffer=1048576bs->14:3
17:20:52->count=131072,total=196608bs,buffer=196608bs->17:2
。。。。。。。。。。。。。。。。。。。。。。
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:57->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
17:20:58->count=-1,total=1048576bs,buffer=1048576bs->15:1
複製代碼
系統接受到鏈接後,在工做線程中執行了回調。而且在回調中執行了read方法,耗時是0,由於只是註冊了個接受數據的回調而已。
系統接受到數據後,把數據放入ByteBuffer,在工做線程中執行了回調。而且回調中能夠直接使用ByteBuffer中的數據。
接受數據的回調被執行了屢次,屢次接受到的數據加起來正好等於客戶端傳來的數據。
由於系統是接受到數據後才觸發的回調,因此服務器端最多時是3個線程在同時運行回調的,換句話說,線程池包含3個線程就能夠了。
對比與結論: 處理一樣的20個請求,一個須要用20個線程,一個須要用6個線程,一個須要3個線程,又節省了50%線程數。
注:不用特別較真這個比較結果,這裏只是爲了說明問題而已。哈哈。
第一種是阻塞IO,阻塞點有兩個,等待數據就緒的過程和讀取數據的過程。
第二種是阻塞IO,阻塞點有一個,讀取數據的過程。
第三種是非阻塞IO,沒有阻塞點,當工做線程啓動時,數據已經(被系統)準備好能夠直接用了。
可見,這是一個逐步消除阻塞點的過程。 再次來談談各類IO: 只有一個線程,接受一個鏈接,讀取數據,處理業務,寫回結果,再接受下一個鏈接,這是同步阻塞。這種用法幾乎沒有。
一個線程和一個線程池,線程接受到鏈接後,把它丟給線程池中的線程,再接受下一個鏈接,這是異步阻塞。對應示例一。
一個線程和一個線程池,線程運行selector,執行select操做,把就緒的鏈接拿出來丟給線程池中的線程,再執行下一次的select操做,就是多路複用,這是異步阻塞。對應示例二。
一個線程和一個線程池,線程註冊一個accept回調,系統幫咱們接受好鏈接後,才觸發回調在線程池中執行,執行時再註冊read回調,系統幫咱們接受好數據後,才觸發回調在線程池中執行,就是AIO,這是異步非阻塞。對應示例三。
redis也是多路複用,但它只有一個線程在執行select操做,處理就緒的鏈接,整個是串行化的,因此自然不存在併發問題。只能把它歸爲同步阻塞了。
BIO是阻塞IO,能夠是同步阻塞,也能夠是異步阻塞。AIO是異步IO,只有異步非阻塞這一種。所以沒有同步非阻塞這種說法,由於同步必定是阻塞的。
注:以上的說法是站在用戶程序/線程的立場上來講的。
建議把代碼下載下來,本身運行一下,體會體會: