JAVA NIO工做原理及代碼示例

簡介:本文主要介紹了JAVA NIO中的Buffer, Channel, Selector的工做原理以及使用它們的若干注意事項,最後是利用它們實現服務器和客戶端通訊的代碼實例。html

歡迎探討,若有錯誤敬請指正 java

如需轉載,請註明出處 http://www.cnblogs.com/nullzx/數組


1. ByteBuffer

1.1直接緩衝區和非直接緩衝區

下面是建立ByteBuffer對象的幾種方式安全

static ByteBuffer服務器

allocate(int capacity)網絡

static ByteBufferapp

allocateDirect(int capacity)dom

static ByteBuffer異步

wrap(byte[] array)socket

static ByteBuffer

wrap(byte[] array, int offset, int length)

allocate方式建立的ByteBuffer對象咱們稱之爲非直接緩衝區,這個ByteBuffer對象(和對象包含的緩衝數組)都位於JVM的堆區。wrap方式和allocate方式建立的ByteBuffer沒有本質區別,都建立的是非直接緩衝區。

allocateDirect方法建立的ByteBuffer咱們稱之爲直接緩衝區,此時ByteBuffer對象自己在堆區,而緩衝數組位於非堆區, ByteBuffer對象內部存儲了這個非堆緩衝數組的地址。在非堆區的緩衝數組能夠經過JNI(內部仍是系統調用)方式進行IO操做,JNI不受gc影響,機器碼執行速度也比較快,同時還避免了JVM堆區與操做系統內核緩衝區的數據拷貝,因此IO速度比非直接緩衝區快。然而allocateDirect方式建立ByteBuffer對象花費的時間和回收該對象花費的時間比較多,因此這個方法適用於建立那些須要重複使用的緩衝區對象。

 

1.2重要屬性和方法

ByteBuffer對象三個重要屬性 position, limitcapacity。其中capacity表示了緩衝區的總容量,始終保持不變,初始時候position 等於 0 , limit 等於 capacity

1) put向緩衝區放入數據

abstract ByteBuffer

put(byte b)

ByteBuffer

put(byte[] src)

ByteBuffer

put(byte[] src, int offset, int length)

調用put方法前,limit應該等於capacity,若是不等於,幾乎能夠確定咱們對緩衝區的操做有誤。在put方法中0到position-1的區域表示有效數據,position到limit之間區域表示空閒區域。put方法會從position的當前位置放入數據,每放入一個數據position增長1,當position等於limit(即空閒區域使用完)時還繼續放入數據就會拋出BufferUnderflowException異常

2)get從緩衝區讀取數據

abstract byte

get()

ByteBuffer

get(byte[] dst)

ByteBuffer

get(byte[] dst, int offset, int length)

在get方法中, 0到position-1的區域表示已讀數據,position到limit之間的區域表示未讀取的數據。每讀取一個數據position增長1,當position等於limit時繼續讀取數據就會拋出BufferUnderflowException異常。

2)flip 將寫模式轉換成讀模式

    public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

3)clear清空緩衝區,將讀模式轉換寫模式

    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }

4)compact保留未讀取的數據,將讀模式轉換寫模式

    public ByteBuffer compact() {

        int pos = position();
        int lim = limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        unsafe.copyMemory(ix(pos), ix(0), (long)rem << 0);
        position(rem);
        limit(capacity());
        discardMark();
        return this;

    }

5mark保存當前position的位置到mark變量

    public final Buffer mark() {
        mark = position;
        return this;
    }

6rest將position置爲mark變量中的值

    public final Buffer reset() {
        int m = mark;
        if (m < 0)
            throw new InvalidMarkException();
        position = m;
        return this;
    }

mark方法和rest方法聯合使用可實現從指定位置的重讀。

 

7rewind從頭開始重讀

    public final Buffer rewind() {
        position = 0;
        mark = -1;
        return this;
    }

 

ByteBuffer對象使用時又不少須要注意的地方,自認爲這個API設計的不是很友好。好比必定不能連續兩次調用flip和compact方法,flip方法調用之後不能再調用put方法,等等。要避免這些錯誤,只能在使用ByteBuffer前弄清楚當前緩衝區中0到position-1以及position到limit中數據表示的含義,這纔是避免bug的根本辦法。

從上面的介紹中咱們能夠看出,ByteBuffer對象既能夠讀,也能夠寫。除非咱們能保證在讀操做一次性使用完ByteBuffer對象中的全部數據,而且保證寫入ByteBuffer對象向中的內容所有寫入完成,不然同時用於讀寫的ByteBuffer對象會形成數據的混亂和錯誤。通常來講,咱們都會建立兩個ByteBuffer對象向,一個用於接收數據,另外一個用於發送數據。

 

1.3其它方法

ByteBuffer是面向字節的,爲方便基本數據類型的讀取,ByteBuffer中還提供getInt,putInt,getFloat,putFloat等方法,這些方法方便咱們在緩衝區存取單個基本數據類型。若是須要從基本數據類型數組中寫入到ByteBuffer中,或者從ByteBuffer中讀取到基本數據類型的數組中,那麼咱們能夠經過已建立好的ByteBuffer對象的asXxxBuffer方法建立基本數據類型的Buffer。

abstract CharBuffer

asCharBuffer()

abstract DoubleBuffer

asDoubleBuffer()

abstract FloatBuffer

asFloatBuffer()

abstract IntBuffer

asIntBuffer()

abstract LongBuffer

asLongBuffer()

假設有以下代碼

IntBuffer intBufferObj = byteBufferObj.asIntBuffer();

此時intBufferObj和byteBufferObj對象共享底層的數組。可是比較坑爹的是兩個buffer的position,limit是獨立的,這樣極易產生bug,須要引發咱們注意。

 

1.4 ByteBuffer的編碼和解碼

數據傳輸中咱們使用的是ByteBuffer對象做爲緩衝區,若是在通道兩端咱們通訊的內容是文本數據,這就涉及到ByteBuffer與CharBuffer的轉換。咱們可使用Charset類實現這個轉換的功能。

1)解碼示例

ByteBuffer byteBuffer = ByteBuffer.allocate(128);
byteBuffer.put(new byte[]{-26, -120, -111, -25, -120, -79, -28, -67, -96});
byteBuffer.flip();

/*對獲取utf8的編解碼器*/
Charset utf8 = Charset.forName("UTF-8");
CharBuffer charBuffer = utf8.decode(byteBuffer);/*對bytebuffer中的內容解碼*/

/*array()返回的就是內部的數組引用,編碼之後的有效長度是0~limit*/
char[] charArr = Arrays.copyOf(charBuffer.array(), charBuffer.limit());
System.out.println(charArr); /*運行結果:我愛你*/

 

2)編碼示例

CharBuffer charBuffer = CharBuffer.allocate(128);
charBuffer.append("我愛你");
charBuffer.flip();

/*對獲取utf8的編解碼器*/
Charset utf8 = Charset.forName("UTF-8");
ByteBuffer byteBuffer = utf8.encode(charBuffer); /*對charbuffer中的內容解碼*/

/*array()返回的就是內部的數組引用,編碼之後的有效長度是0~limit*/
byte[] bytes = Arrays.copyOf(byteBuffer.array(), byteBuffer.limit());
System.out.println(Arrays.toString(bytes));
/*運行結果:[-26, -120, -111, -25, -120, -79, -28, -67, -96] */

 

咱們還能夠經過代碼中的utf8編解碼器分別獲取編碼器對象和解碼器對象

CharsetEncoder utf8Encoder = utf8.newEncoder();
CharsetDecoder utf8Decoder = utf8.newDecoder();

而後經過下面編碼器和解碼器提供的方法進行編解碼,其中一些方法可使ByteBuffer和CharBuffer對象循環使用,沒必要每次都產生一個新的對象。

解碼器方法

CharBuffer

decode(ByteBuffer in)

Convenience method that decodes the remaining content of a single input byte buffer into a newly-allocated character buffer.

CoderResult

decode(ByteBuffer in, CharBuffer out, boolean endOfInput)

Decodes as many bytes as possible from the given input buffer, writing the results to the given output buffer.

protected abstract CoderResult

decodeLoop(ByteBuffer in, CharBuffer out)

Decodes one or more bytes into one or more characters.

編碼器方法

ByteBuffer

encode(CharBuffer in)

Convenience method that encodes the remaining content of a single input character buffer into a newly-allocated byte buffer.

CoderResult

encode(CharBuffer in, ByteBuffer out, boolean endOfInput)

Encodes as many characters as possible from the given input buffer, writing the results to the given output buffer.

protected abstract CoderResult

encodeLoop(CharBuffer in, ByteBuffer out)

Encodes one or more characters into one or more bytes.

注意encode和decode方法都會改變源buffer中的position的位置,這點也是容易產生bug的方法。

 

2. Channel

針對四種不一樣的應用場景,有四種不一樣類型的Channel對象。

類型

應用場景

是否阻塞

FileChannel

文件

阻塞

DatagramChannel

UDP協議

阻塞或非阻塞

SocketChannel

TCP協議

阻塞或非阻塞

ServerSocketChannel

用於TCP服務器端的監聽和連接

阻塞或非阻塞

Channel對象的建立都是經過調用內部的open靜態方法實現的,此方法是線程安全的。不論哪一種類型的Channel對象,都有read(要理解爲從通道中讀取,寫入緩衝區中)和write(要理解爲從緩衝區中讀取數據,寫入到通道中)方法,並且read和write方法都只針對ByteBuffer對象。

當咱們要獲取由通道傳輸過來的數據時,先調用channel.read(byteBufferObj)方法,這個方法在內部調用了byteBufferObj對象的put方法,將通道中的數據寫入緩衝區中。當咱們要獲取由通道傳輸來的數據時,調用byteBufferObj.flip(),而後調用byteBufferObj的get方法獲取通道傳過來的數據,最後調用clear或compact方法轉換成寫模式,爲下次channel.read作準備。

當咱們要向通道發送數據時,先調channel.write(byteBufferObj)方法,這個方法內部調用了byteBufferObj的get方法獲取數據,而後將數據寫入通道中。當寫入完成後調用clear或compact方法轉換成寫模式,爲下次channel.write寫入緩衝區取作準備。

 

2.1 FileChannel

在文件通道中readwrite方法都是阻塞的,對於read方法,除非遇到文件結束,不然會把緩衝區的剩餘空間讀滿再返回。對於write方法,會一次性把緩衝區中的內容所有寫入到文件中才會返回。

下面的代碼展現了FileChannel的功能,首先向文本文件中寫入utf8格式的中英文混合字符,而後再讀取出來。讀寫過程當中都涉及到編解碼問題。

package nioDemo;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
package nioDemo;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileChannelDemo {
	
	public static void main(String[] args){ 
		
		/*建立文件,向文件中寫入數據*/
		try {
			
			/*若是文件不存在,建立該文件,文件後綴是否是文本文件不重要*/
			File file = new File("E:/noi_utf8.data");
			if(!file.exists()){
				file.createNewFile();
			}
			
			/*根據文件輸出流建立與這個文件相關的通道*/
			FileOutputStream fos = new FileOutputStream(file);
			FileChannel fc = fos.getChannel();
			
			/*建立ByteBuffer對象, position = 0, limit = 64*/
			ByteBuffer bb = ByteBuffer.allocate(64);
			
			/*向ByteBuffer中放入字符串UTF-8的字節, position = 17, limit = 64*/
			bb.put("Hello,World 123 \n".getBytes("UTF-8"));
			
			/*flip方法  position = 0, limit = 17*/
			bb.flip();
			
			/*write方法使得ByteBuffer的position到 limit中的元素寫入通道中*/
			fc.write(bb);
			
			/*clear方法使得position = 0, limit = 64*/
			bb.clear();
			
			/*下面的代碼同理*/
			bb.put("你好,世界 456".getBytes("UTF-8"));
			bb.flip();
			
			fc.write(bb);
			bb.clear();
			
			fos.close();
			fc.close();
			
		} catch (FileNotFoundException e) {
			
		} catch (IOException e) {
			System.out.println(e);
		}
		
		
		/*從剛纔的文件中讀取字符序列*/
		try {
			
			/*經過Path對象建立文件通道*/
			Path path = Paths.get("E:/noi_utf8.data");
			FileChannel fc = FileChannel.open(path);
			
			ByteBuffer bb = ByteBuffer.allocate((int) fc.size()+1);
			
			Charset utf8 = Charset.forName("UTF-8");
			
			/*阻塞模式,讀取完成才能返回*/
			fc.read(bb);
			
			bb.flip();
			CharBuffer cb = utf8.decode(bb);
			System.out.print(cb.toString());
			bb.clear();
			

			fc.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
		
	}
}

 

2.2 ServerSocketChannel

服務器端用於建立TCP鏈接的通道,只能對accept事件感興趣。accept方法會返回一個已和客戶端鏈接好的SocketChannel通道,它才服務器是真正傳輸數據的通道。

 

2.3 SocketChannel

TCP客戶端和TCP服務器端都用它來傳輸數據。

客戶端必須調用connect方法去鏈接服務器。在非阻塞通模式中,該方法將當前通道加入到選擇器的已註冊集合中,而後經過異步方式進行建立TCP鏈接,而後該方法馬上返回。注意調用該方法後並不表示已經建立好了TCP鏈接,若是這個方法返回false,稍後必須調用finishConnect方法來完成客戶端到服務器的tcp鏈接。在阻塞方式中,connect方法會阻塞直到建立好了TCP鏈接。

finishConnect在非阻塞模式中僅僅是返回鏈接的狀態。返回true時,表示鏈接建立好了。在阻塞模式下,直接調用方法connect便可完成鏈接,不須要使用finishConnect。

非阻塞模式下,讀寫操做要配合選擇器一塊兒使用。在阻塞模式下,建立好TCP鏈接後就能夠直接對通道進行讀寫操做。

 

2.4 DatagramChannel

connect方法僅用於客戶端到服務器端的鏈接,鏈接的做用僅僅是避免每次發送和接受數據時的安全檢查,提升發送和接受數據的效率,而不是像TCP鏈接那樣表示握手的意思。客戶端通道只有調用了connect方法後,才能使用read和write方法讀寫數據。

客戶端也能夠不事先調用connet方法,而直接使用receive方法和send方法來實現數據的收發。

abstract SocketAddress

receive(ByteBuffer dst)

abstract int

send(ByteBuffer src, SocketAddress target)

 

2.5 服務器端DatagramChannel和SocketChannel的區別

對於服務器端DatagramChannel(UDP)和SocketChannel(TCP)有明顯的區別,對於TCP鏈接,服務器端每建立一個鏈接就對應一個通道(不一樣的客戶端ip:port地址對應一個通道),而服務器端UDP的鏈接始終只有一個通道,全部客戶端發送過來的報文都存放於同一個緩衝區中,這顯然會下降服務器端的效率,好在DatagramChannel對象是線程安全的,能夠用多個線程讀寫同一個UDP通道。

服務器端爲何只有一個通道呢?我猜測由於UDP是無狀態的,不知道什麼時客戶端會發送數據,何時數據又發送完成,因此服務器端沒有辦法爲每一個客戶端建立一個通道,就算服務器端根據客戶端ip:port爲每一個客戶端建立了通道,服務器端也不知道何時該釋放這個通道,這就形成了資源的浪費。

 

4. Selector

Selector類表示選擇器,經過這個類的對象能夠選取已就緒的通道和這個通道感興趣的事件。經過靜態open方法建立。

 

4.1註冊

通道能夠經過它的register方法,將通道註冊到選擇器上。

SelectionKey

register(Selector sel, int ops)

Registers this channel with the given selector, returning a selection key.

abstract SelectionKey

register(Selector sel, int ops, Object att)

Registers this channel with the given selector, returning a selection key.

這個該方法會返回一個SeletctKey對象,但在這裏咱們一般忽略這個返回值。SeletctionKey對象內部包含了這個註冊的通道和這個通道感興趣的事件(ops參數),以及附帶的對象(由att參數傳遞),這個附帶的對象一般就是和這個通道相關的讀寫緩衝區。

 

4.2通道的選擇與取消

abstract int

select()

Selects a set of keys whose corresponding channels are ready for I/O operations.

abstract int

select(long timeout)

Selects a set of keys whose corresponding channels are ready for I/O operations.

abstract int

selectNow()

Selects a set of keys whose corresponding channels are ready for I/O operations.

三個方法的返回值都表示就緒通道的數量。

select()方法是個阻塞方法,有通道就緒纔會返回。

select(long timeout),最多阻塞timeout毫秒,即便沒有通道就緒也會返回,若超時返回,則當前線程中斷標誌位被設置。若阻塞時間內有通道就緒,就提早返回。

seletor.selectNow(),非阻塞方法。

一個seletor對象內部維護了三個集合。

1)已註冊集合:表示了全部已註冊通道的SelectionKey對象。

2)就緒集合:表示了全部已就緒通道的SelectionKey對象。

3)取消集合:表示了全部須要取消註冊關係的通道的SelectionKey對象。

SelectionKey的cancel方法用於取消通道和選擇器的註冊關係,這個方法只是把表示當前通道的SelectionKey放入取消集合中,下次調用select方法時纔會真正取消註冊關係。

select方法每次會從已註冊的通道集合中刪除全部已取消的通道的SelectionKey,而後清空已取消的通道集合,最後從更新過的已註冊通道集合中選出就緒的通道,放入已就緒的集合中。每次調用select方法,會向已就緒的集合中放入已就緒通道的SelectionKey對象,調用selectedKeys 方法就會返回這個已就緒通道集合的引用。當咱們處理完一個已就緒通道,該通道對應的SelectionKey對象仍然位於已就緒的集合中,這就要求咱們處理一個已就緒的通道後就必須手動從已就緒的集合中刪除它,不然下次調用selectedKeys時,已處理過的通道還存在於這個集合中,致使線程空轉。這裏也是極易產生bug的。

 

4.3通道的寫方法注意事項

1)寫方法何時就緒?

寫操做的就緒條件爲socket底層寫緩衝區有空閒空間,此時並不表明咱們這時有(或者須要將)數據寫入通道。而底層寫緩衝區絕大部分時間都是有空閒空間的,因此當你註冊寫事件後,寫操做基本一直是就緒的。這就致使只要有一個通道對寫事件感興趣,select方法幾乎老是馬上返回的,可是實際上咱們可能沒有數據可寫的,因此使得調用select方法的線程老是空轉。對於客戶端發送一些數據,客戶端返回一些數據的模型,咱們能夠在讀事件完成後,再設置通道對寫事件感興趣,寫操做完成後再取消該通道對寫事件的興趣,這樣就能夠避免上述問題。

 

2)如何正確的發送數據

while(writeBuffer.hasRemaining()){
    channel.write(writeBuffer);
}

 

上面發送數據的一般用的代碼,當網絡情況良好的狀況下,這段代碼能正常工做。 如今咱們考慮一種極端狀況,服務器端寫事件就緒,咱們向底層的寫緩衝區寫入一些數據後,服務器端到客戶端的鏈路出現問題,服務器端沒能把數據發送出去,此時底層的寫緩衝區一直處於滿的狀態,假設writeBuffer中仍然還有沒發送完的數據就會致使while循環空轉,浪費CPU資源,同時也妨礙這個selector管理的其它通道的讀寫。

爲了解決個問題,咱們應該使用下面的方法發送數據

int len = 0;
while(writeBuffer.hasRemaining()){
	len = sc.write(writeBuffer);
	/*說明底層的socket寫緩衝已滿*/
	if(len == 0){
		break;
	}
}

 

5. 代碼示例

下面這個類,後面的代碼都會用到,它只是兩個緩衝區的包裝

package nioDemo;

import java.nio.ByteBuffer;

/*自定義Buffer類中包含讀緩衝區和寫緩衝區,用於註冊通道時的附加對象*/
public class Buffers {

	ByteBuffer readBuffer;
	ByteBuffer writeBuffer;
	
	public Buffers(int readCapacity, int writeCapacity){
		readBuffer = ByteBuffer.allocate(readCapacity);
		writeBuffer = ByteBuffer.allocate(writeCapacity);
	}
	
	public ByteBuffer getReadBuffer(){
		return readBuffer;
	}
	
	public ByteBuffer gerWriteBuffer(){
		return writeBuffer;
	}
}

 

5.1 TCP非阻塞示例

1)服務器端代碼

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;


/*服務器端,:接收客戶端發送過來的數據並顯示,
 *服務器把上接收到的數據加上"echo from service:"再發送回去*/
public class ServiceSocketChannelDemo {
	
	public static class TCPEchoServer implements Runnable{
		
		/*服務器地址*/
		private InetSocketAddress localAddress;
		
		public TCPEchoServer(int port) throws IOException{
			this.localAddress = new InetSocketAddress(port);
		}
		
		
		@Override
		public void run(){
			
			Charset utf8 = Charset.forName("UTF-8");
			
			ServerSocketChannel ssc = null;
			Selector selector = null;
			
			Random rnd = new Random();
			
			try {
				/*建立選擇器*/
				selector = Selector.open();
				
				/*建立服務器通道*/
				ssc = ServerSocketChannel.open();
				ssc.configureBlocking(false);
				
				/*設置監聽服務器的端口,設置最大鏈接緩衝數爲100*/
				ssc.bind(localAddress, 100);
				
				/*服務器通道只能對tcp連接事件感興趣*/
				ssc.register(selector, SelectionKey.OP_ACCEPT);
				
			} catch (IOException e1) {
				System.out.println("server start failed");
				return;
			} 
			
			System.out.println("server start with address : " + localAddress);
			
			/*服務器線程被中斷後會退出*/
			try{
				
				while(!Thread.currentThread().isInterrupted()){
					
					int n = selector.select();
					if(n == 0){
						continue;
					}

					Set<SelectionKey> keySet = selector.selectedKeys();
					Iterator<SelectionKey> it = keySet.iterator();
					SelectionKey key = null;
					
					while(it.hasNext()){
							
						key = it.next();
						/*防止下次select方法返回已處理過的通道*/
						it.remove();
						
						/*若發現異常,說明客戶端鏈接出現問題,但服務器要保持正常*/
						try{
							/*ssc通道只能對連接事件感興趣*/
							if(key.isAcceptable()){
								
								/*accept方法會返回一個普統統道,
								     每一個通道在內核中都對應一個socket緩衝區*/
								SocketChannel sc = ssc.accept();
								sc.configureBlocking(false);
								
								/*向選擇器註冊這個通道和普統統道感興趣的事件,同時提供這個新通道相關的緩衝區*/
								int interestSet = SelectionKey.OP_READ; 							
								sc.register(selector, interestSet, new Buffers(256, 256));
								
								System.out.println("accept from " + sc.getRemoteAddress());
							}
							
							
							/*(普通)通道感興趣讀事件且有數據可讀*/
							if(key.isReadable()){
								
								/*經過SelectionKey獲取通道對應的緩衝區*/
								Buffers  buffers = (Buffers)key.attachment();
								ByteBuffer readBuffer = buffers.getReadBuffer();
								ByteBuffer writeBuffer = buffers.gerWriteBuffer();
								
								/*經過SelectionKey獲取對應的通道*/
								SocketChannel sc = (SocketChannel) key.channel();
								
								/*從底層socket讀緩衝區中讀入數據*/
								sc.read(readBuffer);
								readBuffer.flip();
								
								/*解碼顯示,客戶端發送來的信息*/
								CharBuffer cb = utf8.decode(readBuffer);
								System.out.println(cb.array());
					
								readBuffer.rewind();

								
								/*準備好向客戶端發送的信息*/
								/*先寫入"echo:",再寫入收到的信息*/
								writeBuffer.put("echo from service:".getBytes("UTF-8"));
								writeBuffer.put(readBuffer);
								
								readBuffer.clear();
								
								/*設置通道寫事件*/
								key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
																
							}
							
							/*通道感興趣寫事件且底層緩衝區有空閒*/
							if(key.isWritable()){
								
								Buffers  buffers = (Buffers)key.attachment();
								
								ByteBuffer writeBuffer = buffers.gerWriteBuffer();
								writeBuffer.flip();
								
								SocketChannel sc = (SocketChannel) key.channel();
								
								int len = 0;
								while(writeBuffer.hasRemaining()){
									len = sc.write(writeBuffer);
									/*說明底層的socket寫緩衝已滿*/
									if(len == 0){
										break;
									}
								}
								
								writeBuffer.compact();
								
								/*說明數據所有寫入到底層的socket寫緩衝區*/
								if(len != 0){
									/*取消通道的寫事件*/
									key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
								}
								
							}
						}catch(IOException e){
							System.out.println("service encounter client error");
							/*若客戶端鏈接出現異常,從Seletcor中移除這個key*/
							key.cancel();
							key.channel().close();
						}

					}
						
					Thread.sleep(rnd.nextInt(500));
				}
				
			}catch(InterruptedException e){
				System.out.println("serverThread is interrupted");
			} catch (IOException e1) {
				System.out.println("serverThread selecotr error");
			}finally{
				try{
					selector.close();
				}catch(IOException e){
					System.out.println("selector close failed");
				}finally{
					System.out.println("server close");
				}
			}

		}
	}
	
	public static void main(String[] args) throws InterruptedException, IOException{
		Thread thread = new Thread(new TCPEchoServer(8080));
		thread.start();
		Thread.sleep(100000);
		/*結束服務器線程*/
		thread.interrupt();
	}
	
}

 

2)客戶端程序

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;

/*客戶端:客戶端每隔1~2秒自動向服務器發送數據,接收服務器接收到數據並顯示*/
public class ClientSocketChannelDemo {
	
	public static class TCPEchoClient implements Runnable{
		
		/*客戶端線程名*/
		private String name;
		private Random rnd = new Random();
		
		/*服務器的ip地址+端口port*/
		private InetSocketAddress remoteAddress;
		
		public TCPEchoClient(String name, InetSocketAddress remoteAddress){
			this.name = name;
			this.remoteAddress = remoteAddress;
		}
		
		@Override
		public void run(){
			
			/*建立解碼器*/
			Charset utf8 = Charset.forName("UTF-8");
			
			Selector selector;
			
			try {
				
				/*建立TCP通道*/
				SocketChannel sc = SocketChannel.open();
				
				/*設置通道爲非阻塞*/
				sc.configureBlocking(false);
				
				/*建立選擇器*/
				selector = Selector.open();
				
				/*註冊感興趣事件*/
				int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
				
				/*向選擇器註冊通道*/
				sc.register(selector, interestSet, new Buffers(256, 256));
				
				/*向服務器發起鏈接,一個通道表明一條tcp連接*/
				sc.connect(remoteAddress);
				
				/*等待三次握手完成*/
				while(!sc.finishConnect()){
					;
				}

				System.out.println(name + " " + "finished connection");
				
			} catch (IOException e) {
				System.out.println("client connect failed");
				return;
			}
			
			/*與服務器斷開或線程被中斷則結束線程*/
			try{

				int i = 1;
				while(!Thread.currentThread().isInterrupted()){
					
					/*阻塞等待*/
					selector.select();
					
					/*Set中的每一個key表明一個通道*/
					Set<SelectionKey> keySet = selector.selectedKeys();
					Iterator<SelectionKey> it = keySet.iterator();
					
					/*遍歷每一個已就緒的通道,處理這個通道已就緒的事件*/
					while(it.hasNext()){
						
						SelectionKey key = it.next();
						/*防止下次select方法返回已處理過的通道*/
						it.remove();
						
						/*經過SelectionKey獲取對應的通道*/
						Buffers  buffers = (Buffers)key.attachment();
						ByteBuffer readBuffer = buffers.getReadBuffer();
						ByteBuffer writeBuffer = buffers.gerWriteBuffer();
						
						/*經過SelectionKey獲取通道對應的緩衝區*/
						SocketChannel sc = (SocketChannel) key.channel();
						
						/*表示底層socket的讀緩衝區有數據可讀*/
						if(key.isReadable()){
							/*從socket的讀緩衝區讀取到程序定義的緩衝區中*/
							sc.read(readBuffer);
							readBuffer.flip();
							/*字節到utf8解碼*/
							CharBuffer cb = utf8.decode(readBuffer);
							/*顯示接收到由服務器發送的信息*/
							System.out.println(cb.array());
							readBuffer.clear();
						}
						
						/*socket的寫緩衝區可寫*/
						if(key.isWritable()){
							writeBuffer.put((name + "  " + i).getBytes("UTF-8"));
							writeBuffer.flip();
							/*將程序定義的緩衝區中的內容寫入到socket的寫緩衝區中*/
							sc.write(writeBuffer);
							writeBuffer.clear();
							i++;
						}
					}
					
					Thread.sleep(1000 + rnd.nextInt(1000));
				}
			
			}catch(InterruptedException e){
				System.out.println(name + " is interrupted");
			}catch(IOException e){
				System.out.println(name + " encounter a connect error");
			}finally{
				try {
					selector.close();
				} catch (IOException e1) {
					System.out.println(name + " close selector failed");
				}finally{
					System.out.println(name + "  closed");
				}
			}
		}
		
	}
	
	public static void main(String[] args) throws InterruptedException{
		
		InetSocketAddress remoteAddress = new InetSocketAddress("192.168.1.100", 8080);
		
		Thread ta = new Thread(new TCPEchoClient("thread a", remoteAddress));
		Thread tb = new Thread(new TCPEchoClient("thread b", remoteAddress));
		Thread tc = new Thread(new TCPEchoClient("thread c", remoteAddress));
		Thread td = new Thread(new TCPEchoClient("thread d", remoteAddress));
		
		ta.start();
		tb.start();
		tc.start();
		
		Thread.sleep(5000);

		/*結束客戶端a*/
		ta.interrupt();
		
		/*開始客戶端d*/
		td.start();
	}
}

 

5.2 UDP示例

客戶端非阻塞模式,服務器端阻塞模式

1)服務器端代碼(服務器端只有一個通道,對應一個讀緩衝區,一個寫緩衝區,因此使用非阻塞方式容易發生數據混亂)

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;

public class ServiceDatagramChannelDemo {
	
	public static class UDPEchoService implements Runnable{
		
		private int port;
		
		public UDPEchoService(int port){
			this.port = port;
		}
		
		@Override
		public void run(){
			
			ByteBuffer readBuffer = ByteBuffer.allocate(256);
			ByteBuffer writeBuffer = ByteBuffer.allocate(256);

			DatagramChannel dc = null;
			
			try{
				
				/*服務器端使用默認的阻塞IO的方式*/
				dc = DatagramChannel.open();
				dc.bind(new InetSocketAddress(port));
				
				System.out.println("service start");
				while(!Thread.currentThread().isInterrupted()){
					
					try{
						
						/*先讀取客戶端發送的消息,直到讀取到消息纔會返回*/
						/*只能調用receive方法,由於不知道哪一個地址給服務器發信息,無法實現調用connect方法*/
						/*dc是阻塞的,因此receive方法要等到接收到數據才返回*/
						SocketAddress clientAddress = dc.receive(readBuffer);
						readBuffer.flip();
						CharBuffer charBuffer = Charset.defaultCharset().decode(readBuffer);
						System.out.println(charBuffer.array());
						
						/*調用send方法向客戶端發送的消息,
						 *dc是阻塞的,因此直到send方法把數據所有寫入到socket緩衝區才返回*/
						writeBuffer.put("echo : ".getBytes());
						readBuffer.rewind();
						writeBuffer.put(readBuffer);
						writeBuffer.flip();
						dc.send(writeBuffer, clientAddress);
						
						readBuffer.clear();
						writeBuffer.clear();
						
					}catch(IOException e){
						System.out.println("receive from or send to client failed");
					}
				}
			}catch(IOException e){
				System.out.println("server error");
			}finally{
				try {
					if(dc != null){
						dc.close();
					}
				} catch (IOException e) {

				}
			}
		}
	}
	
	public static void main(String[] args) throws IOException{
		new Thread(new UDPEchoService(8080)).start();
	}
}

 

2)客戶端代碼

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.util.Iterator;

public class ClientDatagramChannelDemo {
	
	public static class UDPEchoClient implements Runnable{
		
		private String name;
		private InetSocketAddress serviceAddress;
		
		public UDPEchoClient(String name, InetSocketAddress serviceAddress){
			this.name = name;
			this.serviceAddress = serviceAddress;
		}
		
		@Override
		public void run(){
			DatagramChannel dc = null;
			try{
				
				/*每一個實際上能夠建立多個通道鏈接同一個服務器地址,
				咱們這裏爲了演示方便,只建立了一個通道*/
				dc = DatagramChannel.open();
				
				/*客戶端採用非阻塞模式*/
				dc.configureBlocking(false);

				/*這裏的鏈接不是指TCP的握手鍊接,由於UDP協議自己不須要鏈接,
				 *這裏鏈接的意思大概是提早向操做系統申請好本地端口號,以及高速操做系統要發送的目的
				 *鏈接後的UDP通道能夠提升發送的效率,還能夠調用read和write方法接收和發送數據
				 *未鏈接的UDP通道只能調用receive和send方法接收和發送數據*/
				dc.connect(serviceAddress);
				
				Selector selector = Selector.open();
				int interest = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
				dc.register(selector, interest, new Buffers(256, 256));
				
				int i = 0;
				while(!Thread.currentThread().isInterrupted()){
					
					selector.select();
					
					Iterator<SelectionKey> it = selector.selectedKeys().iterator();
					while(it.hasNext()){
						
						SelectionKey key = it.next();
						it.remove();
						
						Buffers buffers = (Buffers)key.attachment();
						
						ByteBuffer readBuffer = buffers.getReadBuffer();
						ByteBuffer writeBuffer = buffers.gerWriteBuffer();
						
						try{
							
							if(key.isReadable()){
								dc.read(readBuffer);
								readBuffer.flip();
								CharBuffer charBuffer = Charset.defaultCharset().decode(readBuffer);
								System.out.println(charBuffer.array());
								readBuffer.clear();
							}
							
							if(key.isWritable()){
								writeBuffer.put((name + (i++)).getBytes());
								writeBuffer.flip();
								dc.write(writeBuffer);
								writeBuffer.clear();
								
								Thread.sleep(500);
							}
						
						}catch(IOException e){
							key.cancel();
							key.channel().close();
						}
					}
				}
			}catch(InterruptedException e){
				System.out.println(name + "interrupted");
			} catch (IOException e) {
				System.out.println(name + "encounter connect error");
			} finally{
				try {
					dc.close();
				} catch (IOException e) {
					System.out.println(name + "encounter close error");
				}finally{
					System.out.println(name + "closed");
				}
			}
		}
	}
	
	public static void main(String[] args){
		
		InetSocketAddress serviceAddress = new InetSocketAddress("192.168.1.100", 8080);
		
		UDPEchoClient clientA = new UDPEchoClient("thread a ", serviceAddress);
		UDPEchoClient clientB = new UDPEchoClient("thread b ", serviceAddress);
		UDPEchoClient clientC = new UDPEchoClient("thread c ", serviceAddress);
		
		new Thread(clientA).start();
		new Thread(clientB).start();
		new Thread(clientC).start();
		
	}
}

 

6. 參考內容

[1] 堆外內存之 DirectByteBuffer 詳解

[2] SocketChannel---各類注意點

[3] JDK 8 API 文檔

相關文章
相關標籤/搜索