NIO相關基礎篇二

轉載請註明原創出處,謝謝!java

上篇NIO相關基礎篇一,主要介紹了一些基本的概念以及緩衝區(Buffer)和通道(Channel),本篇繼續NIO相關話題內容,主要就是文件鎖、以及比較關鍵的Selector,後續還會繼續有一到二篇左右與NIO內容相關。安全

文件鎖(FileLock)

在看RocketMQ源碼中,發現有關於文件鎖的import,可是具體使用代碼裏面註釋調了[回頭看看爲何,理解下,到時候會在某篇文章裏進行說明](實現一個事情的方法不少,因此不必定就一種),可是爲了知識的完整性,仍是準備講下文件鎖,可能之後或者那個地方可使用,或者你們在那裏使用到均可以繼續留言討論。服務器

文件鎖和其餘咱們瞭解併發裏面的鎖不少概念相似,當多我的同時操做一個文件的時候,只有第一我的能夠進行編輯,其餘要麼關閉(等第一我的操做完成以後能夠操做),要麼以只讀的方式進行打開。網絡

在java nio中提供了新的鎖文件功能,當一個線程將文件鎖定以後,其餘線程沒法操做此文件,文件的鎖操做是使用FileLock類來進行完成的,此類對象須要依賴FileChannel進行實例化。併發

文件鎖方式異步

  • 共享鎖:容許多個線程進行文件讀取。
  • 獨佔鎖:只容許一個線程進行文件的讀寫操做。

備註:文件鎖定以整個 Java 虛擬機來保持。但它們不適用於控制同一虛擬機內多個線程對文件的訪問。 多個併發線程可安全地使用文件鎖定對象。socket

Java文件依賴FileChannel的主要涉及以下4個方法:編輯器

方法 說明
lock() 獲取對此通道的文件的獨佔鎖定。
lock(long position, long size, boolean shared) 獲取此通道的文件給定區域上的鎖定。
tryLock() throws IOException 試圖獲取對此通道的文件的獨佔鎖定。
tryLock(long position, long size, boolean shared) throws IOException 試圖獲取對此通道的文件給定區域的鎖定。
lock()等同於lock(0L, Long.MAX_VALUE, false)
tryLock()等同於tryLock(0L, Long.MAX_VALUE, false)

lock()

tryLock()

lock()和tryLock()的區別函數

  • lock()阻塞的方法,鎖定範圍能夠隨着文件的增大而增長。無參lock()默認爲獨佔鎖;有參lock(0L, Long.MAX_VALUE, true)爲共享鎖。
  • tryLock()非阻塞,當未得到鎖時,返回null。無參tryLock()默認爲獨佔鎖;有參tryLock(0L, Long.MAX_VALUE, true)爲共享鎖。

簡單實例代碼:工具

File file = new File("d:" + File.separator + "test.txt") ;
FileOutputStream output = null ;
FileChannel fout = null ;
try {
    output = new FileOutputStream(file,true) ;
    fout = output.getChannel() ;// 獲得通道
    FileLock lock = fout.tryLock() ; // 進行獨佔鎖的操做
    if(lock!=null){
	System.out.println(file.getName() + "文件鎖定") ;
	Thread.sleep(5000) ;
	lock.release() ;	// 釋放
	System.out.println(file.getName() + "文件解除鎖定。") ;
    }
} catch (IOException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
} finally {
    if(fout!=null){
	try {
	    fout.close();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }
    if(output!=null){
	try {
	    output.close();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }
}
複製代碼

運行結果:

**說明:**目前也不多接觸到關於文件鎖的靈活運用,RocketMQ那塊代碼給註釋了,若是那我瞭解,歡迎留言討論。

Selector

說明:

  • FileChannel是可讀可寫的Channel,它必須阻塞,不能用在非阻塞模式中
  • SocketChannel與FileChannel不一樣:新的Socket Channel能在非阻塞模式下運行而且是可選擇的。再也不須要爲每一個socket鏈接指派線程了。使用新的NIO類,一個或多個線程能管理成百上千個活動的socket鏈接,使用Selector對象能夠選擇可用的Socket Channel。

之前的Socket程序是阻塞的,服務器必須始終等待客戶端的鏈接,而NIO能夠經過Selector完成非阻塞操做。

備註:其實NIO主要的功能是解決服務端的通信性能。 上篇NIO相關基礎篇一的知識,立刻這塊也是須要使用到的。

Selector一些主要方法:

方法 說明
open() 打開一個選擇器。
select() 選擇一組鍵,其相應的通道已爲 I/O 操做準備就緒。
selectedKeys() 返回此選擇器的已選擇鍵集。

SelectionKey的四個重要常量:

字段 說明
OP_ACCEPT 用於套接字接受操做的操做集位。
OP_CONNECT 用於套接字鏈接操做的操做集位。
OP_READ 用於讀取操做的操做集位。
OP_WRITE 用於寫入操做的操做集位。

**說明:**其實四個常量就是Selector監聽SocketChannel四種不一樣類型的事件。

若是你對不止一種事件感興趣,那麼能夠用"位或"操做符將常量鏈接起來,以下: int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

#NIO 簡單實例代碼 服務端代碼:

int port = 8000;
// 經過open()方法找到Selector
Selector selector = Selector.open();
// 打開服務器的通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 服務器配置爲非阻塞
serverSocketChannel.configureBlocking(false);
ServerSocket serverSocket = serverSocketChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
// 進行服務的綁定
serverSocket.bind(address);
// 註冊到selector,等待鏈接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務器運行,端口:" + 8000);

// 數據緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

while (true) {
if ((selector.select()) > 0) { // 選擇一組鍵,而且相應的通道已經準備就緒
	Set<SelectionKey> selectedKeys = selector.selectedKeys();// 取出所有生成的key
	Iterator<SelectionKey> iter = selectedKeys.iterator();
	while (iter.hasNext()) {
		SelectionKey key = iter.next(); // 取出每個key
		if (key.isAcceptable()) {
			ServerSocketChannel server = (ServerSocketChannel) key.channel();
			// 接收新鏈接 和BIO寫法類是都是accept
			SocketChannel client = server.accept();
			// 配置爲非阻塞
			client.configureBlocking(false);
			byteBuffer.clear();
			// 向緩衝區中設置內容
			byteBuffer.put(("當前的時間爲:" + new Date()).getBytes());
			byteBuffer.flip();
			// 輸出內容
			client.write(byteBuffer);
			client.register(selector, SelectionKey.OP_READ);
		} else if (key.isReadable() && key.isValid()) {
			SocketChannel client = (SocketChannel) key.channel();
			byteBuffer.clear();
			// 讀取內容到緩衝區中
			int readSize = client.read(byteBuffer);
			if (readSize > 0) {
				System.out.println("服務器端接受客戶端數據:" + new String(byteBuffer.array(), 0, readSize));
				client.register(selector, SelectionKey.OP_WRITE);
			}
		} else if (key.isWritable() && key.isValid()) {
			SocketChannel client = (SocketChannel) key.channel();
			byteBuffer.clear();
			// 向緩衝區中設置內容
			byteBuffer.put(("歡迎關注匠心零度,已經收到您的反饋,會第一時間回覆您。感謝支持!!!").getBytes());
			byteBuffer.flip();
			// 輸出內容
			client.write(byteBuffer);
			client.register(selector, SelectionKey.OP_READ);
		}
	}
	selectedKeys.clear(); // 清楚所有的key
}
}
複製代碼

客戶端代碼:

// 打開socket通道
SocketChannel socketChannel = SocketChannel.open();
// 設置爲非阻塞方式
socketChannel.configureBlocking(false);
// 經過open()方法找到Selector
Selector selector = Selector.open();
// 註冊鏈接服務端socket動做
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 鏈接
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));

/* 數據緩衝區 */
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

while (true) {
if ((selector.select()) > 0) { // 選擇一組鍵,而且相應的通道已經準備就緒
	Set<SelectionKey> selectedKeys = selector.selectedKeys();// 取出所有生成的key
	Iterator<SelectionKey> iter = selectedKeys.iterator();
	while (iter.hasNext()) {
		SelectionKey key = iter.next(); // 取出每個key
		if (key.isConnectable()) {
			SocketChannel client = (SocketChannel) key.channel();
			if (client.isConnectionPending()) {
				client.finishConnect();
				byteBuffer.clear();
				// 向緩衝區中設置內容
				byteBuffer.put(("isConnect,當前的時間爲:" + new Date()).getBytes());
				byteBuffer.flip();
				// 輸出內容
				client.write(byteBuffer);
			}
			client.register(selector, SelectionKey.OP_READ);
		} else if (key.isReadable() && key.isValid()) {
			SocketChannel client = (SocketChannel) key.channel();
			byteBuffer.clear();
			// 讀取內容到緩衝區中
			int readSize = client.read(byteBuffer);
			if (readSize > 0) {
				System.out.println("客戶端接受服務器端數據:" + new String(byteBuffer.array(), 0, readSize));
				client.register(selector, SelectionKey.OP_WRITE);
			}
		} else if (key.isWritable() && key.isValid()) {
			SocketChannel client = (SocketChannel) key.channel();
			byteBuffer.clear();
			// 向緩衝區中設置內容
			byteBuffer.put(("nio文章學習不少!").getBytes());
			byteBuffer.flip();
			// 輸出內容
			client.write(byteBuffer);
			client.register(selector, SelectionKey.OP_READ);
		}
	}
	selectedKeys.clear(); // 清楚所有的key
}
}
複製代碼

程序運行結果截圖:

**說明:**上面僅僅是一個demo,其實使用nio開發複雜度很高的,須要考慮:鏈路的有效性校驗機制(安全認證、半包和粘包等)、鏈路的斷連重連機制(網絡閃斷重連)、可靠性設計(心跳檢測,消息重發、黑白名單)以及可擴展性的考慮等等,這些都是很複雜,那裏搞很差就容易出錯,看netty 權威指南的時候 記得做者他們那個時候尚未netty,常常出現一些莫名問題須要進行解決,而不少問題netty已經幫咱們解決了,因此有必要好好看看netty了(目前做者也在看netty權威指南,惟一不爽的時候,裏面大量代碼,習慣用工具查看代碼(編輯器查看代碼變色,能夠跳轉等),求netty權威指南代碼地址,看書裏面代碼特別變扭!,謝謝)

簡單聊幾句AIO

雖然NIO在網絡操做中提供了非阻塞方法,可是NIO的IO行爲仍是同步的,對於NIO來講,咱們的業務線程是在IO操做準備好時,才獲得通知,接着就有這個線程自行完成IO操做,可是IO操做的自己其實仍是同步的。

AIO是異步IO的縮寫,相對與NIO來講又進了一步,它不是在IO準備好時再通知線程,而是在IO操做完成後在通知線程,因此AIO是徹底不阻塞的,咱們的業務邏輯看起來就像一個回調函數了。

**備註:**AIO就是簡單提提,NIO尚未搞明白,後續還有一到二篇左右與NIO內容相關,主要談談一些select、poll、epoll、零拷貝等一些內容,若是有關零拷貝比較好的資料,歡迎在留言區進行留言,讓零度也學習下,系統的瞭解下,謝謝!!!

若是讀完以爲有收穫的話,歡迎點贊、關注、加公衆號【匠心零度】。


我的公衆號,歡迎關注,查閱更多精彩歷史!!!

匠心零度公衆號
相關文章
相關標籤/搜索