Java NIO 詳解(概念)
前言
我們在寫java程序的時候,爲了進行優化,把全部的精力用在了處理效率上,但是對IO的關注卻很少。這也可能是由以前java早期時JVM在解釋字節碼時速度慢,運行速率大大低於本地編譯代碼,因此以前往往忽視了IO的優化。
但是現在JVM在運行時優化已前進了一大步,現在的java應用程序更多的是受IO的束縛,也就是將時間花在等待數據傳輸上。現在有了NIO,就可以減少IO的等待時間,從而提升IO的效率。
NIO原理
NIO與IO的區別
首先來講一下傳統的IO和NIO的區別,傳統的IO又稱BIO,即阻塞式IO,NIO就是非阻塞IO了。還有一種AIO就是異步IO,這裏不加闡述了。
Java IO的各種流是阻塞的。這意味着,當一個線程調用read() 或 write()時,該線程被阻塞,直到有一些數據被讀取,或數據完全寫入。該線程在此期間不能再幹任何事情了。
Java NIO的非阻塞模式,使一個線程從某通道發送請求讀取數據,但是它僅能得到目前可用的數據,如果目前沒有數據可用時,就什麼都不會獲取。而不是保持線程阻塞,所以直至數據變的可以讀取之前,該線程可以繼續做其他的事情。 非阻塞寫也是如此。一個線程請求寫入一些數據到某通道,但不需要等待它完全寫入,這個線程同時可以去做別的事情。 線程通常將非阻塞IO的空閒時間用於在其它通道上執行IO操作,所以一個單獨的線程現在可以管理多個輸入和輸出通道(channel)。
緩衝區Buffer
如上圖所示,Buffer實際上也是分爲兩種,一種用於寫數據,一種用於讀取數據。
一個Buffer對象是固定數量的數據的容器。其作用是一個存儲器,或者分段運輸區,在這裏數據可被存儲並在之後用於檢索。儘管緩衝區作用於它們存儲的原始數據類型,但緩衝區十分傾向於處理字節。非字節緩衝區可以在後臺執行從字節或到字節的轉換,這取決於緩衝區是如何創建的。
緩衝區的工作與通道緊密聯繫。通道是 I/O 傳輸發生時通過的入口,而緩衝區是這些數據傳輸的來源或目標。對於離開緩衝區的傳輸,您想傳遞出去的數據被置於一個緩衝區,被傳送到通道。對於傳回緩衝區的傳輸,一個通道將數據放置在您所提供的緩衝區中。這種在協同對象(通常是您所寫的對象以及一到多個 Channel 對象)之間進行的緩衝區數據傳遞是高效數據處理的關鍵。
以下是一個新創建的ByteBuffer:
ByteBuffer
位置被設爲 1,而且容量和上界被設爲 8,剛好經過緩衝區能夠容納的最後一個字節。標記最初未定義。容量是固定的,但另外的三個屬性可以在使用緩衝區時改變。
其中的四個屬性的含義分別如下:
- 容量(Capacity):緩衝區能夠容納的數據元素的最大數量。這一個容量在緩衝區創建時被設定,並且永遠不能改變。
- 上界(Limit):緩衝區的第一個不能被讀或寫的元素。或者說,緩衝區中現存元素的計數。
- 位置(Position):下一個要被讀或寫的元素的索引。位置會自動由相應的 get( )和 put( )函數更新。
- 標記(Mark):下一個要被讀或寫的元素的索引。位置會自動由相應的 get( )和 put( )函數更新。
Buffer的常見方法如下所示:
- flip(): 寫模式轉換成讀模式
- rewind():將 position 重置爲 0 ,一般用於重複讀。
- clear() :清空 buffer ,準備再次被寫入 (position 變成 0 , limit 變成 capacity) 。
- compact(): 將未讀取的數據拷貝到 buffer 的頭部位。
- mark(): reset():mark 可以標記一個位置, reset 可以重置到該位置。
- Buffer 常見類型: ByteBuffer 、 MappedByteBuffer 、 CharBuffer 、 DoubleBuffer 、 FloatBuffer 、 IntBuffer 、 LongBuffer 、ShortBuffer 。
通道Channel
通道(Channel)是 java.nio 的第二個主要創新。它們既不是一個擴展也不是一項增強,而是全新、極好的 Java I/O 示例,提供與 I/O 服務的直接連接。Channel 用於在字節緩衝區和位於通道另一側的實體(通常是一個文件或套接字)之間有效地傳輸數據。
通道是一種途徑,藉助該途徑,可以用最小的總開銷來訪問操作系統本身的 I/O 服務。緩衝區則是通道內部用來發送和接收數據的端點。通道channel充當連接I/O服務的導管,入下圖所示
channel
通道特性
通道可以是單向或者雙向的。一個 channel 類可能實現定義read( )方法的 ReadableByteChannel 接口,而另一個 channel 類也許實現 WritableByteChannel 接口以提供 write( )方法。實現這兩種接口其中之一的類都是單向的,只能在一個方向上傳輸數據。如果一個類同時實現這兩個接口,那麼它是雙向的,可以雙向傳輸數據。
每一個 file 或 socket 通道都實現全部三個接口。從類定義的角度而言,這意味着全部 file 和 socket 通道對象都是雙向的。這對於 sockets 不是問題,因爲它們一直都是雙向的,不過對於 files 卻是個問題了。我們知道,一個文件可以在不同的時候以不同的權限打開。從 FileInputStream 對象的getChannel( )方法獲取的 FileChannel 對象是隻讀的,不過從接口聲明的角度來看卻是雙向的,因爲FileChannel 實現 ByteChannel 接口。在這樣一個通道上調用 write( )方法將拋出未經檢查的NonWritableChannelException 異常,因爲 FileInputStream 對象總是以 read-only 的權限打開文件。
通道會連接一個特定 I/O 服務且通道實例(channel instance)的性能受它所連接的 I/O 服務的特徵限制,記住這很重要。一個連接到只讀文件的 Channel 實例不能進行寫操作,即使該實例所屬的類可能有 write( )方法。基於此,程序員需要知道通道是如何打開的,避免試圖嘗試一個底層 I/O服務不允許的操作。
通道可以以阻塞(blocking)或非阻塞(nonblocking)模式運行。非阻塞模式的通道永遠不會讓調用的線程休眠。請求的操作要麼立即完成,要麼返回一個結果表明未進行任何操作。只有面向流的(stream-oriented)的通道,如 sockets 和 pipes 才能使用非阻塞模式。
選擇器Selector
選擇器提供選擇執行已經就緒的任務的能力,這使得多元I/O成爲可能,就緒選擇和多元執行使得單線程能夠有效率的同時管理多個I/O通道(channels),簡單言之就是selector充當一個監視者,您需要將之前創建的一個或多個可選擇的通道註冊到選擇器對象中。一個表示通道和選擇器的鍵將會被返回。選擇鍵會記住您關心的通道。它們也會追蹤對應的通道是否已經就緒當您調用一個選擇器對象的 select( )方法時,相關的鍵會被更新,用來檢查所有被註冊到該選擇器的通道。您可以獲取一個鍵的集合,從而找到當時已經就緒的通道。通過遍歷這些鍵,您可以選擇出每個從上次您調用 select( )開始直到現在,已經就緒的通道。
傳統的socket監控
傳統的監控多個 socket 的 Java 解決方案是爲每個 socket 創建一個線程並使得線程可以在 read( )調用中阻塞,直到數據可用。這事實上將每個被阻塞的線程當作了 socket 監控器,並將 Java 虛擬機的線程調度當作了通知機制。這兩者本來都不是爲了這種目的而設計的。程序員和 Java 虛擬機都爲管理所有這些線程的複雜性和性能損耗付出了代價,這在線程數量的增長時表現得更爲突出。
選擇器屬性
- 選擇器(Selector)
選擇器類管理着一個被註冊的通道集合的信息和它們的就緒狀態。通道是和選擇器一起被註冊的,並且使用選擇器來更新通道的就緒狀態。當這麼做的時候,可以選擇將被激發的線程掛起,直到有就緒的的通道。
- 可選擇通道(SelectableChannel)
SelectableChannel 可以被註冊到 Selector 對象上,同時可以指定對那個選擇器而言,那種操作是感興趣的。一個通道可以被註冊到多個選擇器上,但對每個選擇器而言只能被註冊一次。
- 選擇鍵(SelectionKey)
選擇鍵封裝了特定的通道與特定的選擇器的註冊關係。選擇鍵對象被SelectableChannel.register( ) 返回並提供一個表示這種註冊關係的標記。選擇鍵包含了兩個比特集(以整數的形式進行編碼),指示了該註冊關係所關心的通道操作,以及通道已經準備好的操作。
下圖體現了就緒選擇註冊和Selector的關係
Selector
一個單獨的通道對象可以被註冊到多個選擇器上。可以調用 isRegistered( )方法來檢查一個通道是否被註冊到任何一個選擇器上。這個方法沒有提供關於通道被註冊到哪個選擇器上的信息,而只能知道它至少被註冊到了一個選擇器上。此外,在一個鍵被取消之後,直到通道被註銷爲止,可能有時間上的延遲。這個方法只是一個提示,而不是確切的答案。
鍵對象
鍵對象表示了一種特定的註冊關係。當應該終結這種關係的時候,可以調用 SelectionKey對象的 cancel( )方法。可以通過調用 isValid( )方法來檢查它是否仍然表示一種有效的關係。當鍵被取消時,它將被放在相關的選擇器的已取消的鍵的集合裏。註冊不會立即被取消,但鍵會立即失效。當再次調用 select( )方法時(或者一個正在進行的 select()調用結束時),已取消的鍵的集合中的被取消的鍵將被清理掉,並且相應的註銷也將完成。通道會被註銷,而新的SelectionKey 將被返回。
SelectionKey 類定義了四個便於使用的布爾方法來爲您測試這些比特值:isReadable( ),isWritable( ),isConnectable( ), 和 isAcceptable( )。每一個方法都與使用特定掩碼來測試 readyOps( )方法的結果的效果相同。例如:
1
2
3
|
if
(key.isWritable( ))
等價於:
if
((key.readyOps( ) & SelectionKey.OP_WRITE) !=
0
)
|
這四個方法在任意一個 SelectionKey 對象上都能安全地調用。不能在一個通道上註冊一個它不支持的操作,這種操作也永遠不會出現在 ready 集合中。調用一個不支持的操作將總是返回 false,因爲這種操作在該通道上永遠不會準備好。
停止選擇過程
有三種方式可以喚醒在select()方法中睡眠的線程。
- 調用wakeup()
調用 Selector 對象的 wakeup( )方法將使得選擇器上的第一個還沒有返回的選擇操作立即返回。如果當前沒有在進行中的選擇,那麼下一次對 select( )方法的一種形式的調用將立即返回。後續的選擇操作將正常進行。在選擇操作之間多次調用 wakeup( )方法與調用它一次沒有什麼不同。有時這種延遲的喚醒行爲並不是您想要的。您可能只想喚醒一個睡眠中的線程,而使得後續的選擇繼續正常地進行。您可以通過在調用 wakeup( )方法後調用 selectNow( )方法來繞過這個問題。儘管如此,如果您將您的代碼構造爲合理地關注於返回值和執行選擇集合,那麼即使下一個 select( )方法的調用在沒有通道就緒時就立即返回,也應該不會有什麼不同。不管怎麼說,您應該爲可能發生的事件做好準備。
- 調用 close( )
如果選擇器的 close( )方法被調用,那麼任何一個在選擇操作中阻塞的線程都將被喚醒,就像wakeup( )方法被調用了一樣。與選擇器相關的通道將被註銷,而鍵將被取消。
- 調用 interrupt( )
如果睡眠中的線程的 interrupt( )方法被調用,它的返回狀態將被設置。如果被喚醒的線程之後將試圖在通道上執行 I/O 操作,通道將立即關閉,然後線程將捕捉到一個異常。這是由於在第三章中已經探討過的通道的中斷語義。使用 wakeup( )方法將會優雅地將一個在 select( )方法中睡眠的線程喚醒。如果您想讓一個睡眠的線程在直接中斷之後繼續執行,需要執行一些步驟來清理中斷狀態
簡單的NIO服務器
下面是一個簡單的NIO服務器的例子,使用select()來爲多個通道提供服務。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
import
java.net.InetSocketAddress;
import
java.net.ServerSocket;
import
java.nio.ByteBuffer;
import
java.nio.channels.SelectableChannel;
import
java.nio.channels.SelectionKey;
import
java.nio.channels.Selector;
import
java.nio.channels.ServerSocketChannel;
import
java.nio.channels.SocketChannel;
import
javax.swing.text.html.HTMLDocument.Iterator;
/**
* Simple echo-back server which listens for incoming stream connections and
* echoes back whatever it reads. A single Selector object is used to listen to
* the server socket (to accept new connections) and all the active socket
* channels.
* @author zale (zalezone.cn)
*/
public
class
SelectSockets {
public
static
int
PORT_NUMBER =
1234
;
public
static
void
main(String[] argv)
throws
Exception
{
new
SelectSockets().go(argv);
}
public
void
go(String[] argv)
throws
Exception
{
int
port = PORT_NUMBER;
if
(argv.length >
0
)
{
// 覆蓋默認的監聽端口
port = Integer.parseInt(argv[
0
]);
}
System.out.println(
"Listening on port "
+ port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 打開一個未綁定的serversocketchannel
ServerSocket serverSocket = serverChannel.socket();
// 得到一個ServerSocket去和它綁定
Selector selector = Selector.open();
// 創建一個Selector供下面使用
serverSocket.bind(
new
InetSocketAddress(port));
//設置server channel將會監聽的端口
serverChannel.configureBlocking(
false
);
//設置非阻塞模式
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//將ServerSocketChannel註冊到Selector
while
(
true
)
{
// This may block for a long time. Upon returning, the
// selected set contains keys of the ready channels.
int
n = selector.select();
if
(n ==
0
)
{
continue
;
// nothing to do
}
java.util.Iterator<SelectionKey> it = selector.selectedKeys().iterator();
// Get an iterator over the set of selected keys
//在被選擇的set中遍歷全部的key
while
(it.hasNext())
{
SelectionKey key = (SelectionKey) it.next();
// 判斷是否是一個連接到來
if
(key.isAcceptable())
{
ServerSocketChannel server =(ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
registerChannel(selector, channel,SelectionKey.OP_READ);
//註冊讀事件
sayHello(channel);
//對連接進行處理
}
//判斷這個channel上是否有數據要讀
if
(key.isReadable())
{
readDataFromSocket(key);
}
//從selected set中移除這個key,因爲它已經被處理過了
it.remove();
}
}
}
// ----------------------------------------------------------
/**
* Register the given channel with the given selector for the given
* operations of interest
*/
protected
void
registerChannel(Selector selector,SelectableChannel channel,
int
ops)
throws
Exception
{
if
(channel ==
null
)
{
return
;
// 可能會發生
}
// 設置通道爲非阻塞
channel.configureBlocking(
false
);
// 將通道註冊到選擇器上
channel.register(selector, ops);
}
// ----------------------------------------------------------
// Use the same byte buffer for all channels. A single thread is
// servicing all the channels, so no danger of concurrent acccess.
//對所有的通道使用相同的緩衝區。單線程爲所有的通道進行服務,所以併發訪問沒有風險
private
ByteBuffer buffer = ByteBuffer.allocateDirect(
1024
);
/**
* Sample data handler method for a channel with data ready to read.
* 對於一個準備讀入數據的通道的簡單的數據處理方法
* @param key
*
A SelectionKey object associated with a channel determined by
the selector to be ready for reading. If the channel returns
an EOF condition, it is closed here, which automatically
invalidates the associated key. The selector will then
de-register the channel on the next select call.
一個選擇器決定了和通道關聯的SelectionKey object是準備讀狀態。如果通道返回EOF,通道將被關閉。
並且會自動使相關的key失效,選擇器然後會在下一次的select call時取消掉通道的註冊
*/
protected
void
readDataFromSocket(SelectionKey key)
throws
Exception
{
SocketChannel socketChannel = (SocketChannel) key.channel();
int
count;
buffer.clear();
// 清空Buffer
// Loop while data is available; channel is nonblocking
//當可以讀到數據時一直循環,通道爲非阻塞
while
((count = socketChannel.read(buffer)) >
0
)
{
buffer.flip();
// 將緩衝區置爲可讀
// Send the data; don't assume it goes all at once
//發送數據,不要期望能一次將數據發送完
while
(buffer.hasRemaining())
{
socketChannel.write(buffer);
}
// WARNING: the above loop is evil. Because
// it's writing back to the same nonblocking
// channel it read the data from, this code can
// potentially spin in a busy loop. In real life
// you'd do something more useful than this.
//這裏的循環是無意義的,具體按實際情況而定
buffer.clear();
// Empty buffer
}
if
(count <
0
)
{
// Close channel on EOF, invalidates the key
//讀取結束後關閉通道,使key失效
socketChannel.close();
}
}
// ----------------------------------------------------------
/**
* Spew a greeting to the incoming client connection.
*
* @param channel
*
The newly connected SocketChannel to say hello to.
*/
private
void
sayHello(SocketChannel channel)
throws
Exception
{
buffer.clear();
buffer.put(
"Hi there!\r\n"
.getBytes());
buffer.flip();
channel.write(buffer);
}
}
|
原理解釋
上面這個例子實現了一個簡單的服務器,它創建了 ServerSocketChannel 和 Selector 對象,並將通道註冊到選擇器上。我們不在註冊的鍵中保存服務器 socket 的引用,因爲它永遠不會被註銷。這個無限循環在最上面先調用了 select( ),這可能會無限期地阻塞。當選擇結束時,就遍歷選擇鍵並檢查已經就緒的通道。
如果一個鍵指示與它相關的通道已經準備好執行一個 accecpt( )操作,我們就通過鍵獲取關聯的通道,並將它轉換爲 SeverSocketChannel 對象。我們都知道這麼做是安全的,因爲只有ServerSocketChannel 支持 OP_ACCEPT 操作。我們也知道我們的代碼只把對一個單一的ServerSocketChannel 對象的 OP_ACCEPT 操作進行了註冊。通過對服務器 socket 通道的引用,我 們調用了它 的 accept( )方法 ,來獲取剛到達 的 socket 的句 柄。返回的 對象的類型 是
SocketChannel,也是一個可選擇的通道類型。這時,與創建一個新線程來從新的連接中讀取數據不同,我們只是簡單地將 socket 同多註冊到選擇器上。我們通過傳入 OP_READ 標記,告訴選擇器我們關心新的 socket 通道什麼時候可以準備好讀取數據。
如果鍵指示通道還沒有準備好執行 accept( ),我們就檢查它是否準備好執行 read( )。任何一個這麼指示的 socket 通道一定是之前 ServerSocketChannel 創建的 SocketChannel 對象之一,並且被註冊爲只對讀操作感興趣。對於每個有數據需要讀取的 socket 通道,我們調用一個公共的方法來讀取並處理這個帶有數據的 socket。需要注意的是這個公共方法需要準備好以非阻塞的方式處理 socket 上的不完整的數據。它需要迅速地返回,以其他帶有後續輸入的通道能夠及時地得到處理。例 4-1 中只是簡單地對數據進行響應,將數據寫回 socket,傳回給發送者。
在循環的底部,我們通過調用 Iterator(迭代器)對象的 remove()方法,將鍵從已選擇的鍵的集合中移除。鍵可以直接從 selectKeys()返回的 Set 中移除,但同時需要用 Iterator 來檢查集合,您需要使用迭代器的 remove()方法來避免破壞迭代器內部的狀態。
併發性
選擇器對象是線程安全的,但它們包含的鍵集合不是。通過 keys( )和 selectKeys( )返回的鍵的集合是 Selector 對象內部的私有的 Set 對象集合的直接引用。這些集合可能在任意時間被改變。已註冊的鍵的集合是隻讀的。如果您試圖修改它,那麼您得到的獎品將是一個java.lang.UnsupportedOperationException,但是當您在觀察它們的時候,它們可能發生了改變的話,您仍然會遇到麻煩。Iterator 對象是快速失敗的(fail-fast):如果底層的 Set 被改變了,它們將會拋出 java.util.ConcurrentModificationException,因此如果您期望在多個線程間共享選擇器和/或鍵,請對此做好準備。您可以直接修改選擇鍵,但請注意您這麼做時可能會徹底破壞另一個線程的 Iterator。
如果在多個線程併發地訪問一個選擇器的鍵的集合的時候存在任何問題,您可以採取一些步驟來合理地同步訪問。在執行選擇操作時,選擇器在 Selector 對象上進行同步,然後是已註冊的鍵的集合,最後是已選擇的鍵的集合,按照這樣的順序。已取消的鍵的集合也在選擇過程的的第 1步和第 3 步之間保持同步(當與已取消的鍵的集合相關的通道被註銷時)。
在多線程的場景中,如果您需要對任何一個鍵的集合進行更改,不管是直接更改還是其他操作帶來的副作用,您都需要首先以相同的順序,在同一對象上進行同步。鎖的過程是非常重要的。如果競爭的線程沒有以相同的順序請求鎖,就將會有死鎖的潛在隱患。如果您可以確保否其他線程不會同時訪問選擇器,那麼就不必要進行同步了。
Selector 類的 close( )方法與 slect( )方法的同步方式是一樣的,因此也有一直阻塞的可能性。在選擇過程還在進行的過程中,所有對 close( )的調用都會被阻塞,直到選擇過程結束,或者執行選擇的線程進入睡眠。在後面的情況下,執行選擇的線程將會在執行關閉的線程獲得鎖時立即被喚醒,並關閉選擇器。
選擇過程的可擴展性
對於單CPU的系統用一個線程來爲多個通道提供服務可能是個好主意,但是對於多個CPU的系統來說就可能不能使其他CPU高效發揮作用。
一個比較好的優化策略是對所有的可選擇通道使用一個選擇器,並將對就緒通道的服務委託給其他線程。根據部署的條件,線程池的大小是可以調整的(或者它自己進行動態的調整)。
另外,有些通道要求比其他通道有更高的響應速度,可以通過使用兩個選擇器來解決:一個爲命令連接服務,另一個爲普通連接服務。與將所有準備好的通道放到同一個線程池的做法不同,通道可以根據功能由不同的工作線程來處理。它們可能可以是日誌線程池,命令/控制線程池,狀態請求線程池,等等。
服務線程池服務器示例
這個例子是上一個簡單服務器的一般性的選擇循環的擴展。它覆寫了 readDataFromSocket( )方法,並使用線程池來爲準備好數據用於讀取的通道提供服務。與在主線程中同步地讀取數據不同,這個版本的實現將 SelectionKey 對象傳遞給爲其服務的工作線程。
使用線程池來爲通道提供服務
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
|
import
java.io.IOException;
import
java.nio.ByteBuffer;
import
java.nio.channels.SelectionKey;
import
java.nio.channels.SocketChannel;
import
java.util.LinkedList;
import
java.util.List;
/**
* Specialization of the SelectSockets class which uses a thread pool to service
* channels. The thread pool is an ad-hoc implementation quicky lashed togther
* in a few hours for demonstration purposes. It's definitely not production
* quality.
*
*/
public
class
SelectSocketsThreadPool
extends
SelectSockets
{
private
static
final
int
MAX_THREADS =
5
;
private
ThreadPool pool =
new
ThreadPool(MAX_THREADS);
// -------------------------------------------------------------
public
static
void
main(String[] argv)
throws
Exception
{
new
SelectSocketsThreadPool().go(argv);
}
// -------------------------------------------------------------
/**
* Sample data handler method for a channel with data ready to read. This
* method is invoked from(被調用) the go( ) method in the parent class. This handler
* delegates(委託) to a worker thread in a thread pool to service the channel,
* then returns immediately.
*
* @param key
*
A SelectionKey object representing a channel determined by the
*
selector to be ready for reading. If the channel returns an
*
EOF condition, it is closed here, which automatically
*
invalidates the associated key. The selector will then
*
de-register the channel on the next select call.
*/
protected
void
readDataFromSocket(SelectionKey key)
throws
Exception
{
WorkerThread worker = pool.getWorker();
if
(worker ==
null
)
{
// No threads available. Do nothing. The selection
// loop will keep calling this method until a
// thread becomes available. This design could
// be improved.
return
;
}
// Invoking this wakes up the worker thread, then returns
worker.serviceChannel(key);
}
// ---------------------------------------------------------------
/**
* A very simple thread pool class. The pool size is set at construction
* time and remains fixed. Threads are cycled through a FIFO idle queue.
*/
private
class
ThreadPool
{
List idle =
new
LinkedList();
ThreadPool(
int
poolSize)
{
// Fill up the pool with worker threads
for
(
int
i =
0
; i < poolSize; i++)
{
WorkerThread thread =
new
WorkerThread(
this
);
// Set thread name for debugging. Start it.
thread.setName(
"Worker"
+ (i +
1
));
thread.start();
idle.add(thread);
}
}
/**
* Find an idle worker thread, if any. Could return null.
*/
WorkerThread getWorker()
{
WorkerThread worker =
null
;
synchronized
(idle)
{
if
(idle.size() >
0
)
{
worker = (WorkerThread) idle.remove(
0
);
}
}
return
(worker);
}
/**
* Called by the worker thread to return itself to the idle pool.
*/
void
returnWorker(WorkerThread worker)
{
synchronized
(idle)
{
idle.add(worker);
}
}
}
/**
* A worker thread class which can drain(排空) channels and echo-back(回顯) the input.
* Each instance is constructed with a reference(參考) to the owning thread pool
* object. When started, the thread loops forever waiting to be awakened to
* service the channel associated with a SelectionKey object. The worker is
* tasked by calling its serviceChannel( ) method with a SelectionKey
* object. The serviceChannel( ) method stores the key reference in the
* thread object then calls notify( ) to wake it up. When the channel has
* been drained, the worker thread returns itself to its parent pool.
*/
private
class
WorkerThread
extends
Thread
{
private
ByteBuffer buffer = ByteBuffer.allocate(
1024
);
private
ThreadPool pool;
private
SelectionKey key;
WorkerThread(ThreadPool pool)
{
this
.pool = pool;
}
// Loop forever waiting for work to do
public
synchronized
void
run()
{
System.out.println(
this
.getName() +
" is ready"
);
while
(
true
)
{
try
{
// Sleep and release object lock
//休眠並且釋放掉對象鎖
this
.wait();
}
catch
(InterruptedException e)
{
e.printStackTrace();
// Clear interrupt status
this
.interrupted();
}
if
(key ==
null
)
{
continue
;
// just in case
}
System.out.println(
this
.getName() +
" has been awakened"
);
try
{
drainChannel(key);
}
catch
(Exception e)
{
System.out.println(
"Caught '"
+ e +
"' closing channel"
);
// Close channel and nudge selector
try
{
key.channel().close();
}
catch
(IOException ex)
{
ex.printStackTrace();
}
key.selector().wakeup();
}
key =
null
;
// Done. Ready for more. Return to pool
this
.pool.returnWorker(
this
);
}
}
/**
* Called to initiate a unit of work by this worker thread on the
* provided SelectionKey object. This method is synchronized, as is the
* run( ) method, so only one key can be serviced at a given time.
* Before waking the worker thread, and before returning to the main
* selection loop, this key's interest set is updated to remove OP_READ.
* This will cause the selector to ignore read-readiness for this
* channel while the worker thread is servicing it.
* 通過一個被提供SelectionKey對象的工作線程來初始化一個工作集合,這個方法是同步的,所以
* 裏面的run方法只有一個key能被服務在同一個時間,在喚醒工作線程和返回到主循環之前,這個key的
* 感興趣的集合被更新來刪除OP_READ,這將會引起工作線程在提供服務的時候選擇器會忽略讀就緒的通道
*/
synchronized
void
serviceChannel(SelectionKey key)
{
this
.key = key;
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
this
.notify();
// Awaken the thread
}
/**
* The actual code which drains the channel associated with the given
* key. This method assumes the key has been modified prior to
|