原創: 老劉 碼農翻身 2017-11-20html
隨着移動互聯網的爆發性增加,小明公司的電子商務系統訪問量愈來愈大,因爲現有系統是個單體的巨型應用,已經沒法知足海量的併發請求,拆分勢在必行。java
在微服務的大潮之中, 架構師小明把系統拆分紅了多個服務,根據須要部署在多個機器上,這些服務很是靈活,能夠隨着訪問量彈性擴展。linux
世界上沒有免費的午飯, 拆分紅多個「微服務」之後雖然增長了彈性,但也帶來了一個巨大的挑戰:服務之間互相調用的開銷。git
好比說:原來用戶下一個訂單須要登陸,瀏覽產品詳情,加入購物車,支付,扣庫存等一系列操做,在單體應用的時候它們都在一臺機器的同一個進程中,說白了就是模塊之間的函數調用,效率超級高。 程序員
如今好了,服務被安置到了不一樣的服務器上,一個訂單流程,幾乎每一個操做都要越網絡,都是遠程過程調用(RPC), 那執行時間、執行效率可遠遠比不上之前了。github
遠程過程調用的初版實現使用了HTTP協議,也就是說各個服務對外提供HTTP接口。 小明發現,HTTP協議雖然簡單明瞭,可是廢話太多,僅僅是給服務器發個簡單的消息都會附帶一大堆無用信息:面試
GET /orders/1 HTTP/1.1 算法
Host: order.myshop.com數據庫
User-Agent: Mozilla/5.0 (Windows NT 6.1; )編程
Accept: text/html;
Accept-Language: en-US,en;
Accept-Encoding: gzip
Connection: keep-alive
......
看看那User-Agent,Accept-Language ,這個協議明顯是爲瀏覽器而生的!可是我這裏是程序之間的調用,用這個HTTP有點虧。
能不能自定義一個精簡的協議? 在這個協議中我只須要把要調用方法名和參數發給服務器便可,根本不用這麼多亂七八糟的額外信息。
可是自定義協議客戶端和服務器端就得直接使用「低級」的Socket了,尤爲是服務器端,得可以處理高併發的訪問請求才行。
小明覆習了一下服務器端的socket編程,最先的Java是所謂的阻塞IO(Blocking IO), 想處理多個socket的鏈接的話須要建立多個線程, 一個線程對應一個。
這種方式寫起來卻是挺簡單的,可是鏈接(socket)多了就受不了了,若是真的有成千上萬個線程同時處理成千上萬個socket,佔用大量的空間不說,光是線程之間的切換就是一個巨大的開銷。
更重要的是,雖然有大量的socket,可是真正須要處理的(能夠讀寫數據的socket)卻很少,大量的線程處於等待數據狀態(這也是爲何叫作阻塞的緣由),資源浪費得讓人心疼。
後來Java爲了解決這個問題,又搞了一個非阻塞IO(NIO:Non-Blocking IO,有人也叫作New IO), 改變了一下思路:經過多路複用的方式讓一個線程去處理多個Socket。
這樣一來,只須要使用少許的線程就能夠搞定多個socket了,線程只須要經過Selector去查一下它所管理的socket集合,哪一個Socket的數據準備好了,就去處理哪一個Socket,一點兒都不浪費。
好了,就是Java NIO了!
小明先定義了一套精簡的RPC的協議,裏邊規定了如何去調用一個服務,方法名和參數該如何傳遞,返回值用什麼格式......等等。而後雄心勃勃地要把這個協議用Java NIO給實現了。
但是美好的理想很快被無情的現實給擊碎, 小明努力了一週就意識到本身陷入了一個大坑之中,Java NIO雖然看起來簡單,可是API仍是太「低級」了,有太多的複雜性,沒有強悍的、一流的編程能力根本沒法駕馭,根本作不到高併發狀況下的可靠和高效。
小明不死心,繼續向領導要人要資源,必定要把這個坑給填上,掙扎了6個月之後,終於實現了一個本身的NIO框架,能夠執行高併發的RPC調用了。
而後又是長達6個月的修修補補,小明常常半夜被叫醒:生產環境的RPC調用沒法返回了! 這樣的Bug不知道改了多少個。
在那些不眠之夜中,小明常常仰天長嘆:我用NIO作個高併發的RPC框架怎麼這麼難吶!
一年以後,自研的框架終於穩定,但是小明也從張大胖那裏聽到了一個讓他崩潰的消息: 小明你知道嗎?有個叫Netty的開源框架,能夠快速地開發高性能的面向協議的服務器和客戶端。 易用、健壯、安全、高效,你能夠在Netty上輕鬆實現各類自定義的協議!我們也試試?
小明趕忙研究,看完後不禁得「淚流滿面」:這東西怎麼不早點出來啊!
好了,這個故事我快編不下去了,要爛尾了。
說說Netty究竟是何方神聖, 要解決什麼問題吧。
像上面小明的例子,想使用Java NIO來實現一個高性能的RPC框架,調用協議,數據的格式和次序都是本身定義的,現有的HTTP根本玩不轉,那使用Netty就是絕佳的選擇。
其實遊戲領域是個更好的例子,長鏈接,自定義協議,高併發,Netty就是絕配。
由於Netty自己就是一個基於NIO的網絡框架, 封裝了Java NIO那些複雜的底層細節,給你提供簡單好用的抽象概念來編程。
注意幾個關鍵詞,首先它是個框架,是個「半成品」,不能開箱即用,你必須得拿過來作點定製,利用它開發出本身的應用程序,而後才能運行(就像使用Spring那樣)。
一個更加知名的例子就是阿里巴巴的Dubbo了,這個RPC框架的底層用的就是Netty。
另一個關鍵詞是高性能,若是你的應用根本沒有高併發的壓力,那就不必定要用Netty了。
netty是基於NIO實現的異步事件驅動的網絡編程框架,學完NIO之後,應該看看netty的實現,netty框架涉及的內容特別多,這裏只介紹netty的基本使用和實現原理,更多擴展的內容將在之後推出。
基於NIO的網絡編程框架Netty
轉自https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/
Netty是一個基於異步與事件驅動的網絡應用程序框架,它支持快速與簡單地開發可維護的高性能的服務器與客戶端。
所謂事件驅動就是由經過各類事件響應來決定程序的流程,在Netty中處處都充滿了異步與事件驅動,這種特色使得應用程序能夠以任意的順序響應在任意的時間點產生的事件,它帶來了很是高的可伸縮性,讓你的應用能夠在須要處理的工做不斷增加時,經過某種可行的方式或者擴大它的處理能力來適應這種增加。
Netty提供了高性能與易用性,它具備如下特色:
擁有設計良好且統一的API,支持NIO與OIO(阻塞IO)等多種傳輸類型,支持真正的無鏈接UDP Socket。
簡單而強大的線程模型,可高度定製線程(池)。
良好的模塊化與解耦,支持可擴展和靈活的事件模型,能夠很輕鬆地分離關注點以複用邏輯組件(可插拔的)。
性能高效,擁有比Java核心API更高的吞吐量,經過zero-copy功能以實現最少的內存複製消耗。
內置了許多經常使用的協議編解碼器,如HTTP、SSL、WebScoket等常見協議能夠經過Netty作到開箱即用。用戶也能夠利用Netty簡單方便地實現本身的應用層協議。
大多數人使用Netty主要仍是爲了提升應用的性能,而高性能則離不開非阻塞IO。Netty的非阻塞IO是基於Java NIO的,而且對其進行了封裝(直接使用Java NIO API在高複雜度下的應用中是一項很是繁瑣且容易出錯的操做,而Netty幫你封裝了這些複雜操做)。
Netty簡介
讀完這一章,咱們基本上能夠了解到Netty全部重要的組件,對Netty有一個全面的認識,這對下一步深刻學習Netty是十分重要的,而學完這一章,咱們其實已經能夠用Netty解決一些常規的問題了。
1、先縱覽一下Netty,看看Netty都有哪些組件?
爲了更好的理解和進一步深刻Netty,咱們先整體認識一下Netty用到的組件及它們在整個Netty架構中是怎麼協調工做的。Netty應用中必不可少的組件:
Bootstrap,一個Netty應用一般由一個Bootstrap開始,它主要做用是配置整個Netty程序,串聯起各個組件。
Handler,爲了支持各類協議和處理數據的方式,便誕生了Handler組件。Handler主要用來處理各類事件,這裏的事件很普遍,好比能夠是鏈接、數據接收、異常、數據轉換等。
ChannelInboundHandler,一個最經常使用的Handler。這個Handler的做用就是處理接收到數據時的事件,也就是說,咱們的業務邏輯通常就是寫在這個Handler裏面的,ChannelInboundHandler就是用來處理咱們的核心業務邏輯。
ChannelInitializer,當一個連接創建時,咱們須要知道怎麼來接收或者發送數據,固然,咱們有各類各樣的Handler實現來處理它,那麼ChannelInitializer即是用來配置這些Handler,它會提供一個ChannelPipeline,並把Handler加入到ChannelPipeline。
ChannelPipeline,一個Netty應用基於ChannelPipeline機制,這種機制須要依賴於EventLoop和EventLoopGroup,由於它們三個都和事件或者事件處理相關。
EventLoops的目的是爲Channel處理IO操做,一個EventLoop能夠爲多個Channel服務。
EventLoopGroup會包含多個EventLoop。
Channel表明了一個Socket連接,或者其它和IO操做相關的組件,它和EventLoop一塊兒用來參與IO處理。
Future,在Netty中全部的IO操做都是異步的,所以,你不能馬上得知消息是否被正確處理,可是咱們能夠過一會等它執行完成或者直接註冊一個監聽,具體的實現就是經過Future和ChannelFutures,他們能夠註冊一個監聽,當操做執行成功或失敗時監聽會自動觸發。總之,全部的操做都會返回一個ChannelFuture。
2、Netty是如何處理鏈接請求和業務邏輯的呢?-- Channels、Events 和 IO
Netty是一個非阻塞的、事件驅動的、網絡編程框架。固然,咱們很容易理解Netty會用線程來處理IO事件,對於熟悉多線程編程的人來講,你或許會想到如何同步你的代碼,可是Netty不須要咱們考慮這些,具體是這樣:
一個Channel會對應一個EventLoop,而一個EventLoop會對應着一個線程,也就是說,僅有一個線程在負責一個Channel的IO操做。
關於這些名詞之間的關係,能夠見下圖:
如圖所示:當一個鏈接到達,Netty會註冊一個channel,而後EventLoopGroup會分配一個EventLoop綁定到這個channel,在這個channel的整個生命週期過程當中,都會由綁定的這個EventLoop來爲它服務,而這個EventLoop就是一個線程。
說到這裏,那麼EventLoops和EventLoopGroups關係是如何的呢?咱們前面說過一個EventLoopGroup包含多個Eventloop,可是咱們看一下下面這幅圖,這幅圖是一個繼承樹,從這幅圖中咱們能夠看出,EventLoop其實繼承自EventloopGroup,也就是說,在某些狀況下,咱們能夠把一個EventLoopGroup當作一個EventLoop來用。
3、咱們來看看如何配置一個Netty應用?-- BootsStrapping
咱們利用BootsStrapping來配置netty 應用,它有兩種類型,一種用於Client端:BootsStrap,另外一種用於Server端:ServerBootstrap,要想區別如何使用它們,你僅須要記住一個用在Client端,一個用在Server端。下面咱們來詳細介紹一下這兩種類型的區別:
1.第一個最明顯的區別是,ServerBootstrap用於Server端,經過調用bind()方法來綁定到一個端口監聽鏈接;Bootstrap用於Client端,須要調用connect()方法來鏈接服務器端,但咱們也能夠經過調用bind()方法返回的ChannelFuture中獲取Channel去connect服務器端。
2.客戶端的Bootstrap通常用一個EventLoopGroup,而服務器端的ServerBootstrap會用到兩個(這兩個也能夠是同一個實例)。爲什麼服務器端要用到兩個EventLoopGroup呢?這麼設計有明顯的好處,若是一個ServerBootstrap有兩個EventLoopGroup,那麼就能夠把第一個EventLoopGroup用來專門負責綁定到端口監聽鏈接事件,而把第二個EventLoopGroup用來處理每一個接收到的鏈接,下面咱們用一幅圖來展示一下這種模式:
PS: 若是僅由一個EventLoopGroup處理全部請求和鏈接的話,在併發量很大的狀況下,這個EventLoopGroup有可能會忙於處理已經接收到的鏈接而不能及時處理新的鏈接請求,用兩個的話,會有專門的線程來處理鏈接請求,不會致使請求超時的狀況,大大提升了併發處理能力。
咱們知道一個Channel須要由一個EventLoop來綁定,並且二者一旦綁定就不會再改變。通常狀況下一個EventLoopGroup中的EventLoop數量會少於Channel數量,那麼就頗有可能出現一個多個Channel公用一個EventLoop的狀況,這就意味着若是一個Channel中的EventLoop很忙的話,會影響到這個Eventloop對其它Channel的處理,這也就是爲何咱們不能阻塞EventLoop的緣由。
固然,咱們的Server也能夠只用一個EventLoopGroup,由一個實例來處理鏈接請求和IO事件,請看下面這幅圖:
4、咱們看看Netty是如何處理數據的?-- Netty核心ChannelHandler
下面咱們來看一下netty中是怎樣處理數據的,回想一下咱們前面講到的Handler,對了,就是它。說到Handler咱們就不得不提ChannelPipeline,ChannelPipeline負責安排Handler的順序及其執行,下面咱們就來詳細介紹一下他們:
ChannelPipeline and handlers
咱們的應用程序中用到的最多的應該就是ChannelHandler,咱們能夠這麼想象,數據在一個ChannelPipeline中流動,而ChannelHandler即是其中的一個個的小閥門,這些數據都會通過每個ChannelHandler而且被它處理。這裏有一個公共接口ChannelHandler:
從上圖中咱們能夠看到,ChannelHandler有兩個子類ChannelInboundHandler和ChannelOutboundHandler,這兩個類對應了兩個數據流向,若是數據是從外部流入咱們的應用程序,咱們就看作是inbound,相反即是outbound。其實ChannelHandler和Servlet有些相似,一個ChannelHandler處理完接收到的數據會傳給下一個Handler,或者什麼不處理,直接傳遞給下一個。下面咱們看一下ChannelPipeline是如何安排ChannelHandler的:
從上圖中咱們能夠看到,一個ChannelPipeline能夠把兩種Handler(ChannelInboundHandler和ChannelOutboundHandler)混合在一塊兒,當一個數據流進入ChannelPipeline時,它會從ChannelPipeline頭部開始傳給第一個ChannelInboundHandler,當第一個處理完後再傳給下一個,一直傳遞到管道的尾部。與之相對應的是,當數據被寫出時,它會從管道的尾部開始,先通過管道尾部的「最後」一個ChannelOutboundHandler,當它處理完成後會傳遞給前一個ChannelOutboundHandler。
數據在各個Handler之間傳遞,這須要調用方法中傳遞的ChanneHandlerContext來操做, 在netty的API中提供了兩個基類分ChannelOutboundHandlerAdapter和ChannelOutboundHandlerAdapter,他們僅僅實現了調用ChanneHandlerContext來把消息傳遞給下一個Handler,由於咱們只關心處理數據,所以咱們的程序中能夠繼承這兩個基類來幫助咱們作這些,而咱們僅需實現處理數據的部分便可。
咱們知道InboundHandler和OutboundHandler在ChannelPipeline中是混合在一塊兒的,那麼它們如何區分彼此呢?其實很容易,由於它們各自實現的是不一樣的接口,對於inbound event,Netty會自動跳過OutboundHandler,相反如果outbound event,ChannelInboundHandler會被忽略掉。
當一個ChannelHandler被加入到ChannelPipeline中時,它便會得到一個ChannelHandlerContext的引用,而ChannelHandlerContext能夠用來讀寫Netty中的數據流。所以,如今能夠有兩種方式來發送數據,一種是把數據直接寫入Channel,一種是把數據寫入ChannelHandlerContext,它們的區別是寫入Channel的話,數據流會從Channel的頭開始傳遞,而若是寫入ChannelHandlerContext的話,數據流會流入管道中的下一個Handler。
5、咱們最關心的部分,如何處理咱們的業務邏輯? -- Encoders, Decoders and Domain Logic
Netty中會有不少Handler,具體是哪一種Handler還要看它們繼承的是InboundAdapter仍是OutboundAdapter。固然,Netty中還提供了一些列的Adapter來幫助咱們簡化開發,咱們知道在Channelpipeline中每個Handler都負責把Event傳遞給下一個Handler,若是有了這些輔助Adapter,這些額外的工做均可自動完成,咱們只需覆蓋實現咱們真正關心的部分便可。此外,還有一些Adapter會提供一些額外的功能,好比編碼和解碼。那麼下面咱們就來看一下其中的三種經常使用的ChannelHandler:
Encoders和Decoders
由於咱們在網絡傳輸時只能傳輸字節流,所以,才發送數據以前,咱們必須把咱們的message型轉換爲bytes,與之對應,咱們在接收數據後,必須把接收到的bytes再轉換成message。咱們把bytes to message這個過程稱做Decode(解碼成咱們能夠理解的),把message to bytes這個過程成爲Encode。
Netty中提供了不少現成的編碼/解碼器,咱們通常從他們的名字中即可知道他們的用途,如ByteToMessageDecoder、MessageToByteEncoder,如專門用來處理Google Protobuf協議的ProtobufEncoder、 ProtobufDecoder。
咱們前面說過,具體是哪一種Handler就要看它們繼承的是InboundAdapter仍是OutboundAdapter,對於Decoders,很容易即可以知道它是繼承自ChannelInboundHandlerAdapter或 ChannelInboundHandler,由於解碼的意思是把ChannelPipeline傳入的bytes解碼成咱們能夠理解的message(即Java Object),而ChannelInboundHandler正是處理Inbound Event,而Inbound Event中傳入的正是字節流。Decoder會覆蓋其中的「ChannelRead()」方法,在這個方法中來調用具體的decode方法解碼傳遞過來的字節流,而後經過調用ChannelHandlerContext.fireChannelRead(decodedMessage)方法把編碼好的Message傳遞給下一個Handler。與之相似,Encoder就沒必要多少了。
Domain Logic
其實咱們最最關心的事情就是如何處理接收到的解碼後的數據,咱們真正的業務邏輯即是處理接收到的數據。Netty提供了一個最經常使用的基類SimpleChannelInboundHandler<T>,其中T就是這個Handler處理的數據的類型(上一個Handler已經替咱們解碼好了),消息到達這個Handler時,Netty會自動調用這個Handler中的channelRead0(ChannelHandlerContext,T)方法,T是傳遞過來的數據對象,在這個方法中咱們即可以任意寫咱們的業務邏輯了。
Netty源碼剖析
Netty從某方面來講就是一套NIO框架,在Java NIO基礎上作了封裝,因此要想學好Netty我建議先理解好Java NIO,
NIO能夠稱爲New IO也能夠稱爲Non-blocking IO,它比Java舊的阻塞IO在性能上要高效許多(若是讓每個鏈接中的IO操做都單首創建一個線程,那麼阻塞IO並不會比NIO在性能上落後,但不可能建立無限多的線程,在鏈接數很是多的狀況下會很糟糕)。
ByteBuffer:NIO的數據傳輸是基於緩衝區的,ByteBuffer正是NIO數據傳輸中所使用的緩衝區抽象。ByteBuffer支持在堆外分配內存,而且嘗試避免在執行I/O操做中的多餘複製。通常的I/O操做都須要進行系統調用,這樣會先切換到內核態,內核態要先從文件讀取數據到它的緩衝區,只有等數據準備完畢後,纔會從內核態把數據寫到用戶態,所謂的阻塞IO其實就是說的在等待數據準備好的這段時間內進行阻塞。若是想要避免這個額外的內核操做,能夠經過使用mmap(虛擬內存映射)的方式來讓用戶態直接操做文件。
Channel:它相似於文件描述符,簡單地來講它表明了一個實體(如一個硬件設備、文件、Socket或者一個可以執行一個或多個不一樣的I/O操做的程序組件)。你能夠從一個Channel中讀取數據到緩衝區,也能夠將一個緩衝區中的數據寫入到Channel。
Selector:選擇器是NIO實現的關鍵,NIO採用的是I/O多路複用的方式來實現非阻塞,Selector經過在一個線程中監聽每一個Channel的IO事件來肯定有哪些已經準備好進行IO操做的Channel,所以能夠在任什麼時候間檢查任意的讀操做或寫操做的完成狀態。這種方式避免了等待IO操做準備數據時的阻塞,使用較少的線程即可以處理許多鏈接,減小了線程切換與維護的開銷。
瞭解了NIO的實現思想以後,我以爲還頗有必要了解一下Unix中的I/O模型,Unix中擁有如下5種I/O模型:
阻塞I/O(Blocking I/O)
非阻塞I/O(Non-blocking I/O)
I/O多路複用(I/O multiplexing (select and poll))
信號驅動I/O(signal driven I/O (SIGIO))
異步I/O(asynchronous I/O (the POSIX aio_functions))
阻塞I/O模型是最多見的I/O模型,一般咱們使用的InputStream/OutputStream都是基於阻塞I/O模型。在上圖中,咱們使用UDP做爲例子,recvfrom()函數是UDP協議用於接收數據的函數,它須要使用系統調用並一直阻塞到內核將數據準備好,以後再由內核緩衝區複製數據到用戶態(便是recvfrom()接收到數據),所謂阻塞就是在等待內核準備數據的這段時間內什麼也不幹。
舉個生活中的例子,阻塞I/O就像是你去餐廳吃飯,在等待飯作好的時間段中,你只能在餐廳中坐着乾等(若是你在玩手機那麼這就是非阻塞I/O了)。
在非阻塞I/O模型中,內核在數據還沒有準備好的狀況下回返回一個錯誤碼EWOULDBLOCK
,而recvfrom並無在失敗的狀況下選擇阻塞休眠,而是不斷地向內核詢問是否已經準備完畢,在上圖中,前三次內核都返回了EWOULDBLOCK
,直到第四次詢問時,內核數據準備完畢,而後開始將內核中緩存的數據複製到用戶態。這種不斷詢問內核以查看某種狀態是否完成的方式被稱爲polling(輪詢)
。
非阻塞I/O就像是你在點外賣,只不過你很是心急,每隔一段時間就要打電話問外賣小哥有沒有到。
I/O多路複用的思想跟非阻塞I/O是同樣的,只不過在非阻塞I/O中,是在recvfrom的用戶態(或一個線程)中去輪詢內核,這種方式會消耗大量的CPU時間。而I/O多路複用則是經過select()或poll()系統調用來負責進行輪詢,以實現監聽I/O讀寫事件的狀態。如上圖中,select監聽到一個datagram可讀時,就交由recvfrom去發送系統調用將內核中的數據複製到用戶態。
這種方式的優勢很明顯,經過I/O多路複用能夠監聽多個文件描述符,且在內核中完成監控的任務。但缺點是至少須要兩個系統調用(select()與recvfrom())。
I/O多路複用一樣適用於點外賣這個例子,只不過你在等外賣的期間徹底能夠作本身的事情,當外賣到的時候會經過外賣APP或者由外賣小哥打電話來通知你。
Unix中提供了兩種I/O多路複用函數,select()和poll()。select()的兼容性更好,但它在單個進程中所能監控的文件描述符是有限的,這個值與FD_SETSIZE
相關,32位系統中默認爲1024,64位系統中爲2048。select()還有一個缺點就是他輪詢的方式,它採起了線性掃描的輪詢方式,每次都要遍歷FD_SETSIZE個文件描述符,無論它們是否活不活躍的。poll()本質上與select()的實現沒有區別,不過在數據結構上區別很大,用戶必須分配一個pollfd結構數組,該數組維護在內核態中,正因如此,poll()並不像select()那樣擁有大小上限的限制,但缺點一樣也很明顯,大量的fd數組會在用戶態與內核態之間不斷複製,無論這樣的複製是否有意義。
還有一種比select()與poll()更加高效的實現叫作epoll(),它是由Linux內核2.6推出的可伸縮的I/O多路複用實現,目的是爲了替代select()與poll()。epoll()一樣沒有文件描述符上限的限制,它使用一個文件描述符來管理多個文件描述符,並使用一個紅黑樹來做爲存儲結構。同時它還支持邊緣觸發(edge-triggered)與水平觸發(level-triggered)兩種模式(poll()只支持水平觸發),在邊緣觸發模式下,epoll_wait
僅會在新的事件對象首次被加入到epoll時返回,而在水平觸發模式下,epoll_wait
會在事件狀態未變動前不斷地觸發。也就是說,邊緣觸發模式只會在文件描述符變爲就緒狀態時通知一次,水平觸發模式會不斷地通知該文件描述符直到被處理。
關於epoll_wait
請參考以下epoll API。
12345678910 | // 建立一個epoll對象並返回它的文件描述符。// 參數flags容許修改epoll的行爲,它只有一個有效值EPOLL_CLOEXEC。int epoll_create1(int flags);// 配置對象,該對象負責描述監控哪些文件描述符和哪些事件。int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// 等待與epoll_ctl註冊的任何事件,直至事件發生一次或超時。// 返回在events中發生的事件,最多同時返回maxevents個。int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); |
---|
epoll另外一亮點是採用了事件驅動的方式而不是輪詢,在epoll_ctl
中註冊的文件描述符在事件觸發的時候會經過一個回調機制來激活該文件描述符,epoll_wait
即可以收到通知。這樣效率就不會與文件描述符的數量成正比。
在Java NIO2(從JDK1.7開始引入)中,只要Linux內核版本在2.6以上,就會採用epoll,以下源碼所示(DefaultSelectorProvider.java)。
| 123456789101112131415161718192021222324252627 | public static SelectorProvider create() {String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));if ("SunOS".equals(osname)) {return new sun.nio.ch.DevPollSelectorProvider();}// use EPollSelectorProvider for Linux kernels >= 2.6if ("Linux".equals(osname)) {String osversion = AccessController.doPrivileged(new GetPropertyAction("os.version"));String[] vers = osversion.split("\.", 0);if (vers.length >= 2) {try {int major = Integer.parseInt(vers[0]);int minor = Integer.parseInt(vers[1]);if (major > 2 || (major == 2 && minor >= 6)) {return new sun.nio.ch.EPollSelectorProvider();}} catch (NumberFormatException x) {// format not recognized}}}return new sun.nio.ch.PollSelectorProvider();} | | --- | --- |
信號驅動I/O模型使用到了信號,內核在數據準備就緒時會經過信號來進行通知。咱們首先開啓了一個信號驅動I/O套接字,並使用sigaction系統調用來安裝信號處理程序,內核直接返回,不會阻塞用戶態。當datagram準備好時,內核會發送SIGIO信號,recvfrom接收到信號後會發送系統調用開始進行I/O操做。
這種模型的優勢是主進程(線程)不會被阻塞,當數據準備就緒時,經過信號處理程序來通知主進程(線程)準備進行I/O操做與對數據的處理。
咱們以前討論的各類I/O模型不管是阻塞仍是非阻塞,它們所說的阻塞都是指的數據準備階段。異步I/O模型一樣依賴於信號處理程序來進行通知,但與以上I/O模型都不相同的是,異步I/O模型通知的是I/O操做已經完成,而不是數據準備完成。
能夠說異步I/O模型纔是真正的非阻塞,主進程只管作本身的事情,而後在I/O操做完成時調用回調函數來完成一些對數據的處理操做便可。
閒扯了這麼多,想必你們已經對I/O模型有了一個深入的認識。以後,咱們將會結合部分源碼(Netty4.X)來探討Netty中的各大核心組件,以及如何使用Netty,你會發現實現一個Netty程序是多麼簡單(並且還伴隨了高性能與可維護性)。
本文做者爲SylvanasSun(sylvanas.sun@gmail.com),首發於SylvanasSun’s Blog。 原文連接:https://sylvanassun.github.io/2017/11/30/2017-11-30-netty_introduction/ (轉載請務必保留本段聲明,而且保留超連接。)
網絡傳輸的基本單位是字節,在Java NIO中提供了ByteBuffer做爲字節緩衝區容器,但該類的API使用起來不太方便,因此Netty實現了ByteBuf做爲其替代品,下面是使用ByteBuf的優勢:
相比ByteBuffer使用起來更加簡單。
經過內置的複合緩衝區類型實現了透明的zero-copy。
容量能夠按需增加。
讀和寫使用了不一樣的索引指針。
支持鏈式調用。
支持引用計數與池化。
能夠被用戶自定義的緩衝區類型擴展。
在討論ByteBuf以前,咱們先須要瞭解一下ByteBuffer的實現,這樣才能比較深入地明白它們之間的區別。
ByteBuffer繼承於abstract class Buffer
(因此還有LongBuffer、IntBuffer等其餘類型的實現),本質上它只是一個有限的線性的元素序列,包含了三個重要的屬性。
Capacity:緩衝區中元素的容量大小,你只能將capacity個數量的元素寫入緩衝區,一旦緩衝區已滿就須要清理緩衝區才能繼續寫數據。
Position:指向下一個寫入數據位置的索引指針,初始位置爲0,最大爲capacity-1。當寫模式轉換爲讀模式時,position須要被重置爲0。
Limit:在寫模式中,limit是能夠寫入緩衝區的最大索引,也就是說它在寫模式中等價於緩衝區的容量。在讀模式中,limit表示能夠讀取數據的最大索引。
因爲Buffer中只維護了position一個索引指針,因此它在讀寫模式之間的切換須要調用一個flip()方法來重置指針。使用Buffer的流程通常以下:
寫入數據到緩衝區。
調用flip()方法。
從緩衝區中讀取數據
調用buffer.clear()或者buffer.compact()清理緩衝區,以便下次寫入數據。
12345678910111213141516171819 | RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");FileChannel inChannel = aFile.getChannel();// 分配一個48字節大小的緩衝區ByteBuffer buf = ByteBuffer.allocate(48);int bytesRead = inChannel.read(buf); // 讀取數據到緩衝區while (bytesRead != -1) {buf.flip(); // 將position重置爲0while(buf.hasRemaining()){System.out.print((char) buf.get()); // 讀取數據並輸出到控制檯}buf.clear(); // 清理緩衝區bytesRead = inChannel.read(buf);}aFile.close(); |
---|
Buffer中核心方法的實現也很是簡單,主要就是在操做指針position。
12345678910111213141516171819202122232425262728293031323334353637383940 | /*** Sets this buffer's mark at its position.** @return This buffer*/public final Buffer mark() {mark = position; // mark屬性是用來標記當前索引位置的return this;}// 將當前索引位置重置爲mark所標記的位置public final Buffer reset() {int m = mark;if (m < 0)throw new InvalidMarkException();position = m;return this;}// 翻轉這個Buffer,將limit設置爲當前索引位置,而後再把position重置爲0public final Buffer flip() {limit = position;position = 0;mark = -1;return this;}// 清理緩衝區// 說是清理,也只是把postion與limit進行重置,以後再寫入數據就會覆蓋以前的數據了public final Buffer clear() {position = 0;limit = capacity;mark = -1;return this;}// 返回剩餘空間public final int remaining() {return limit - position;} |
---|
Java NIO中的Buffer API操做的麻煩之處就在於讀寫轉換須要手動重置指針。而ByteBuf沒有這種繁瑣性,它維護了兩個不一樣的索引,一個用於讀取,一個用於寫入。當你從ByteBuf讀取數據時,它的readerIndex將會被遞增已經被讀取的字節數,一樣的,當你寫入數據時,writerIndex則會遞增。readerIndex的最大範圍在writerIndex的所在位置,若是試圖移動readerIndex超過該值則會觸發異常。
ByteBuf中名稱以read或write開頭的方法將會遞增它們其對應的索引,而名稱以get或set開頭的方法則不會。ByteBuf一樣能夠指定一個最大容量,試圖移動writerIndex超過該值則會觸發異常。
1234567891011121314151617181920212223242526272829303132333435363738394041424344 | public byte readByte() {this.checkReadableBytes0(1); // 檢查readerIndex是否已越界int i = this.readerIndex;byte b = this._getByte(i);this.readerIndex = i + 1; // 遞增readerIndexreturn b;}private void checkReadableBytes0(int minimumReadableBytes) {this.ensureAccessible();if(this.readerIndex > this.writerIndex - minimumReadableBytes) {throw new IndexOutOfBoundsException(String.format("readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s", new Object[]{Integer.valueOf(this.readerIndex), Integer.valueOf(minimumReadableBytes), Integer.valueOf(this.writerIndex), this}));}}public ByteBuf writeByte(int value) {this.ensureAccessible();this.ensureWritable0(1); // 檢查writerIndex是否會越過capacitythis._setByte(this.writerIndex++, value);return this;}private void ensureWritable0(int minWritableBytes) {if(minWritableBytes > this.writableBytes()) {if(minWritableBytes > this.maxCapacity - this.writerIndex) {throw new IndexOutOfBoundsException(String.format("writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", new Object[]{Integer.valueOf(this.writerIndex), Integer.valueOf(minWritableBytes), Integer.valueOf(this.maxCapacity), this}));} else {int newCapacity = this.alloc().calculateNewCapacity(this.writerIndex + minWritableBytes, this.maxCapacity);this.capacity(newCapacity);}}}// get與set只對傳入的索引進行了檢查,而後對其位置進行get或setpublic byte getByte(int index) {this.checkIndex(index);return this._getByte(index);}public ByteBuf setByte(int index, int value) {this.checkIndex(index);this._setByte(index, value);return this;} |
---|
ByteBuf一樣支持在堆內和堆外進行分配。在堆內分配也被稱爲支撐數組模式,它能在沒有使用池化的狀況下提供快速的分配和釋放。
12345678 | ByteBuf heapBuf = Unpooled.copiedBuffer(bytes);if (heapBuf.hasArray()) { // 判斷是否有一個支撐數組byte[] array = heapBuf.array();// 計算第一個字節的偏移量int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();int length = heapBuf.readableBytes(); // 得到可讀字節handleArray(array,offset,length); // 調用你的處理方法} |
---|
另外一種模式爲堆外分配,Java NIO ByteBuffer類在JDK1.4時就已經容許JVM實現經過JNI調用來在堆外分配內存(調用malloc()函數在JVM堆外分配內存),這主要是爲了不額外的緩衝區複製操做。
12345678 | ByteBuf directBuf = Unpooled.directBuffer(capacity);if (!directBuf.hasArray()) {int length = directBuf.readableBytes();byte[] array = new byte[length];// 將字節複製到數組中directBuf.getBytes(directBuf.readerIndex(),array);handleArray(array,0,length);} |
---|
ByteBuf還支持第三種模式,它被稱爲複合緩衝區,爲多個ByteBuf提供了一個聚合視圖。在這個視圖中,你能夠根據須要添加或者刪除ByteBuf實例,ByteBuf的子類CompositeByteBuf實現了該模式。
一個適合使用複合緩衝區的場景是HTTP協議,經過HTTP協議傳輸的消息都會被分紅兩部分——頭部和主體,若是這兩部分由應用程序的不一樣模塊產生,將在消息發送時進行組裝,而且該應用程序還會爲多個消息複用相同的消息主體,這樣對於每一個消息都將會建立一個新的頭部,產生了不少沒必要要的內存操做。使用CompositeByteBuf是一個很好的選擇,它消除了這些額外的複製,以幫助你複用這些消息。
1234567 | CompositeByteBuf messageBuf = Unpooled.compositeBuffer();ByteBuf headerBuf = ....;ByteBuf bodyBuf = ....;messageBuf.addComponents(headerBuf,bodyBuf);for (ByteBuf buf : messageBuf) {System.out.println(buf.toString());} |
---|
CompositeByteBuf透明的實現了zero-copy,zero-copy其實就是避免數據在兩個內存區域中來回的複製。從操做系統層面上來說,zero-copy指的是避免在內核態與用戶態之間的數據緩衝區複製(經過mmap避免),而Netty中的zero-copy更偏向於在用戶態中的數據操做的優化,就像使用CompositeByteBuf來複用多個ByteBuf以免額外的複製,也能夠使用wrap()方法來將一個字節數組包裝成ByteBuf,又或者使用ByteBuf的slice()方法把它分割爲多個共享同一內存區域的ByteBuf,這些都是爲了優化內存的使用率。
那麼如何建立ByteBuf呢?在上面的代碼中使用到了Unpooled,它是Netty提供的一個用於建立與分配ByteBuf的工具類,建議都使用這個工具類來建立你的緩衝區,不要本身去調用構造函數。常用的是wrappedBuffer()與copiedBuffer(),它們一個是用於將一個字節數組或ByteBuffer包裝爲一個ByteBuf,一個是根據傳入的字節數組與ByteBuffer/ByteBuf來複製出一個新的ByteBuf。
12345678910111213141516 | // 經過array.clone()來複制一個數組進行包裝public static ByteBuf copiedBuffer(byte[] array) {return array.length == 0?EMPTY_BUFFER:wrappedBuffer((byte[])array.clone());}// 默認是堆內分配public static ByteBuf wrappedBuffer(byte[] array) {return (ByteBuf)(array.length == 0?EMPTY_BUFFER:new UnpooledHeapByteBuf(ALLOC, array, array.length));}// 也提供了堆外分配的方法private static final ByteBufAllocator ALLOC;public static ByteBuf directBuffer(int initialCapacity) {return ALLOC.directBuffer(initialCapacity);} |
---|
相對底層的分配方法是使用ByteBufAllocator,Netty實現了PooledByteBufAllocator和UnpooledByteBufAllocator,前者使用了jemalloc(一種malloc()的實現)來分配內存,而且實現了對ByteBuf的池化以提升性能。後者分配的是未池化的ByteBuf,其分配方式與以前講的一致。
1234 | Channel channel = ...;ByteBufAllocator allocator = channel.alloc();ByteBuf buffer = allocator.directBuffer();do something....... |
---|
爲了優化內存使用率,Netty提供了一套手動的方式來追蹤不活躍對象,像UnpooledHeapByteBuf這種分配在堆內的對象得益於JVM的GC管理,無需額外操心,而UnpooledDirectByteBuf是在堆外分配的,它的內部基於DirectByteBuffer,DirectByteBuffer會先向Bits類申請一個額度(Bits還擁有一個全局變量totalCapacity,記錄了全部DirectByteBuffer總大小),每次申請前都會查看是否已經超過-XX:MaxDirectMemorySize所設置的上限,若是超限就會嘗試調用Sytem.gc(),以試圖回收一部份內存,而後休眠100毫秒,若是內存仍是不足,則只能拋出OOM異常。堆外內存的回收雖然有了這麼一層保障,但爲了提升性能與使用率,主動回收也是頗有必要的。因爲Netty還實現了ByteBuf的池化,像PooledHeapByteBuf和PooledDirectByteBuf就必須依賴於手動的方式來進行回收(放回池中)。
Netty使用了引用計數器的方式來追蹤那些不活躍的對象。引用計數的接口爲ReferenceCounted,它的思想很簡單,只要ByteBuf對象的引用計數大於0,就保證該對象不會被釋放回收,能夠經過手動調用release()與retain()方法來操做該對象的引用計數值遞減或遞增。用戶也能夠經過自定義一個ReferenceCounted的實現類,以知足自定義的規則。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 | package io.netty.buffer;public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {// 因爲ByteBuf的實例對象會很是多,因此這裏沒有將refCnt包裝爲AtomicInteger// 而是使用一個全局的AtomicIntegerFieldUpdater來負責操做refCntprivate static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");// 每一個ByteBuf的初始引用值都爲1private volatile int refCnt = 1;public int refCnt() {return this.refCnt;}protected final void setRefCnt(int refCnt) {this.refCnt = refCnt;}public ByteBuf retain() {return this.retain0(1);}// 引用計數值遞增increment,increment必須大於0public ByteBuf retain(int increment) {return this.retain0(ObjectUtil.checkPositive(increment, "increment"));}public static int checkPositive(int i, String name) {if(i <= 0) {throw new IllegalArgumentException(name + ": " + i + " (expected: > 0)");} else {return i;}}// 使用CAS操做不斷嘗試更新值private ByteBuf retain0(int increment) {int refCnt;int nextCnt;do {refCnt = this.refCnt;nextCnt = refCnt + increment;if(nextCnt <= increment) {throw new IllegalReferenceCountException(refCnt, increment);}} while(!refCntUpdater.compareAndSet(this, refCnt, nextCnt));return this;}public boolean release() {return this.release0(1);}public boolean release(int decrement) {return this.release0(ObjectUtil.checkPositive(decrement, "decrement"));}private boolean release0(int decrement) {int refCnt;do {refCnt = this.refCnt;if(refCnt < decrement) {throw new IllegalReferenceCountException(refCnt, -decrement);}} while(!refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement));if(refCnt == decrement) {this.deallocate();return true;} else {return false;}}protected abstract void deallocate();} |
---|
Netty中的Channel與Java NIO的概念同樣,都是對一個實體或鏈接的抽象,但Netty提供了一套更加通用的API。就以網絡套接字爲例,在Java中OIO與NIO是大相徑庭的兩套API,假設你以前使用的是OIO而又想更改成NIO實現,那麼幾乎須要重寫全部代碼。而在Netty中,只須要更改短短几行代碼(更改Channel與EventLoop的實現類,如把OioServerSocketChannel替換爲NioServerSocketChannel),就能夠完成OIO與NIO(或其餘)之間的轉換。
每一個Channel最終都會被分配一個ChannelPipeline和ChannelConfig,前者持有全部負責處理入站與出站數據以及事件的ChannelHandler,後者包含了該Channel的全部配置設置,而且支持熱更新,因爲不一樣的傳輸類型可能具備其特別的配置,因此該類可能會實現爲ChannelConfig的不一樣子類。
Channel是線程安全的(與以後要講的線程模型有關),所以你徹底能夠在多個線程中複用同一個Channel,就像以下代碼所示。
12345678910111213 | final Channel channel = ...final ByteBuf buffer = Unpooled.copiedBuffer("Hello,World!", CharsetUtil.UTF_8).retain();Runnable writer = new Runnable() {@Overridepublic void run() {channel.writeAndFlush(buffer.duplicate());}};Executor executor = Executors.newCachedThreadPool();executor.execute(writer);executor.execute(writer);....... |
---|
Netty除了支持常見的NIO與OIO,還內置了其餘的傳輸類型。
Nmae | Package | Description |
---|---|---|
NIO | io.netty.channel.socket.nio | 以Java NIO爲基礎實現 |
OIO | io.netty.channel.socket.oio | 以java.net爲基礎實現,使用阻塞I/O模型 |
Epoll | io.netty.channel.epoll | 由JNI驅動epoll()實現的更高性能的非阻塞I/O,它只能使用在Linux |
Local | io.netty.channel.local | 本地傳輸,在JVM內部經過管道進行通訊 |
Embedded | io.netty.channel.embedded | 容許在不須要真實網絡傳輸的環境下使用ChannelHandler,主要用於對ChannelHandler進行測試 |
NIO、OIO、Epoll咱們應該已經很熟悉了,下面主要說說Local與Embedded。
Local傳輸用於在同一個JVM中運行的客戶端和服務器程序之間的異步通訊,與服務器Channel相關聯的SocketAddress並無綁定真正的物理網絡地址,它會被存儲在註冊表中,並在Channel關閉時註銷。所以Local傳輸不會接受真正的網絡流量,也就是說它不能與其餘傳輸實現進行互操做。
Embedded傳輸主要用於對ChannelHandler進行單元測試,ChannelHandler是用於處理消息的邏輯組件,Netty經過將入站消息與出站消息都寫入到EmbeddedChannel中的方式(提供了write/readInbound()與write/readOutbound()來讀寫入站與出站消息)來實現對ChannelHandler的單元測試。
ChannelHandler充當了處理入站和出站數據的應用程序邏輯的容器,該類是基於事件驅動的,它會響應相關的事件而後去調用其關聯的回調函數,例如當一個新的鏈接被創建時,ChannelHandler的channelActive()方法將會被調用。
關於入站消息和出站消息的數據流向定義,若是以客戶端爲主視角來講的話,那麼從客戶端流向服務器的數據被稱爲出站,反之爲入站。
入站事件是可能被入站數據或者相關的狀態更改而觸發的事件,包括:鏈接已被激活、鏈接失活、讀取入站數據、用戶事件、發生異常等。
出站事件是將來將會觸發的某個動做的結果的事件,這些動做包括:打開或關閉遠程節點的鏈接、將數據寫(或沖刷)到套接字。
ChannelHandler的主要用途包括:
對入站與出站數據的業務邏輯處理
記錄日誌
將數據從一種格式轉換爲另外一種格式,實現編解碼器。以一次HTTP協議(或者其餘應用層協議)的流程爲例,數據在網絡傳輸時的單位爲字節,當客戶端發送請求到服務器時,服務器須要經過解碼器(處理入站消息)將字節解碼爲協議的消息內容,服務器在發送響應的時候(處理出站消息),還須要經過編碼器將消息內容編碼爲字節。
捕獲異常
提供Channel生命週期內的通知,如Channel活動時與非活動時
Netty中處處都充滿了異步與事件驅動,而回調函數正是用於響應事件以後的操做。因爲異步會直接返回一個結果,因此Netty提供了ChannelFuture(實現了java.util.concurrent.Future)來做爲異步調用返回的佔位符,真正的結果會在將來的某個時刻完成,到時候就能夠經過ChannelFuture對其進行訪問,每一個Netty的出站I/O操做都將會返回一個ChannelFuture。
Netty還提供了ChannelFutureListener接口來監聽ChannelFuture是否成功,並採起對應的操做。
12345678910111213141516 | Channel channel = ...ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1",6666));// 註冊一個監聽器future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) {if (future.isSuccess()) {// do something....} else {// 輸出錯誤信息Throwable cause = future.cause();cause.printStackTrace();// do something....}}}); |
---|
ChannelFutureListener接口中還提供了幾個簡單的默認實現,方便咱們使用。
12345678910111213141516171819202122232425262728293031 | package io.netty.channel;import io.netty.channel.ChannelFuture;import io.netty.util.concurrent.GenericFutureListener;public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> {// 在Future完成時關閉ChannelFutureListener CLOSE = new ChannelFutureListener() {public void operationComplete(ChannelFuture future) {future.channel().close();}};// 若是失敗則關閉ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {public void operationComplete(ChannelFuture future) {if(!future.isSuccess()) {future.channel().close();}}};// 將異常信息傳遞給下一個ChannelHandlerChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {public void operationComplete(ChannelFuture future) {if(!future.isSuccess()) {future.channel().pipeline().fireExceptionCaught(future.cause());}}};} |
---|
ChannelHandler接口定義了對它生命週期進行監聽的回調函數,在ChannelHandler被添加到ChannelPipeline或者被移除時都會調用這些函數。
12345678910111213141516171819 | package io.netty.channel;public interface ChannelHandler {void handlerAdded(ChannelHandlerContext var1) throws Exception;void handlerRemoved(ChannelHandlerContext var1) throws Exception;/** @deprecated */@Deprecatedvoid exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;// 該註解代表這個ChannelHandler可被其餘線程複用@Inherited@Documented@Target({ElementType.TYPE})@Retention(RetentionPolicy.RUNTIME)public @interface Sharable {}} |
---|
入站消息與出站消息由其對應的接口ChannelInboundHandler與ChannelOutboundHandler負責,這兩個接口定義了監聽Channel的生命週期的狀態改變事件的回調函數。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 | package io.netty.channel;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;public interface ChannelInboundHandler extends ChannelHandler {// 當channel被註冊到EventLoop時被調用void channelRegistered(ChannelHandlerContext var1) throws Exception;// 當channel已經被建立,但還未註冊到EventLoop(或者從EventLoop中註銷)被調用void channelUnregistered(ChannelHandlerContext var1) throws Exception;// 當channel處於活動狀態(鏈接到遠程節點)被調用void channelActive(ChannelHandlerContext var1) throws Exception;// 當channel處於非活動狀態(沒有鏈接到遠程節點)被調用void channelInactive(ChannelHandlerContext var1) throws Exception;// 當從channel讀取數據時被調用void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;// 當channel的上一個讀操做完成時被調用void channelReadComplete(ChannelHandlerContext var1) throws Exception;// 當ChannelInboundHandler.fireUserEventTriggered()方法被調用時被調用void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;// 當channel的可寫狀態發生改變時被調用void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;// 當處理過程當中發生異常時被調用void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;}package io.netty.channel;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelPromise;import java.net.SocketAddress;public interface ChannelOutboundHandler extends ChannelHandler {// 當請求將Channel綁定到一個地址時被調用// ChannelPromise是ChannelFuture的一個子接口,定義瞭如setSuccess(),setFailure()等方法void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;// 當請求將Channel鏈接到遠程節點時被調用void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;// 當請求將Channel從遠程節點斷開時被調用void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;// 當請求關閉Channel時被調用void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;// 當請求將Channel從它的EventLoop中註銷時被調用void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;// 當請求從Channel讀取數據時被調用void read(ChannelHandlerContext var1) throws Exception;// 當請求經過Channel將數據寫到遠程節點時被調用void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;// 當請求經過Channel將緩衝中的數據沖刷到遠程節點時被調用void flush(ChannelHandlerContext var1) throws Exception;} |
---|
經過實現ChannelInboundHandler或者ChannelOutboundHandler就能夠完成用戶自定義的應用邏輯處理程序,不過Netty已經幫你實現了一些基本操做,用戶只須要繼承並擴展ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter來做爲自定義實現的起始點。
ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter都繼承於ChannelHandlerAdapter,該抽象類簡單實現了ChannelHandler接口。
123456789101112131415161718192021222324252627282930313233343536 | public abstract class ChannelHandlerAdapter implements ChannelHandler {boolean added;public ChannelHandlerAdapter() {}// 該方法不容許將此ChannelHandler共享複用protected void ensureNotSharable() {if(this.isSharable()) {throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");}}// 使用反射判斷實現類有沒有@Sharable註解,以確認該類是否爲可共享複用的public boolean isSharable() {Class clazz = this.getClass();Map cache = InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable = (Boolean)cache.get(clazz);if(sharable == null) {sharable = Boolean.valueOf(clazz.isAnnotationPresent(Sharable.class));cache.put(clazz, sharable);}return sharable.booleanValue();}public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}} |
---|
ChannelInboundHandlerAdapter與ChannelOutboundHandlerAdapter默認只是簡單地將請求傳遞給ChannelPipeline中的下一個ChannelHandler,源碼以下:
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 | public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {public ChannelInboundHandlerAdapter() {}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelRegistered();}public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelUnregistered();}public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();}public void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();}public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelReadComplete();}public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {ctx.fireUserEventTriggered(evt);}public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelWritabilityChanged();}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}}public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {public ChannelOutboundHandlerAdapter() {}public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.bind(localAddress, promise);}public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {ctx.connect(remoteAddress, localAddress, promise);}public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.disconnect(promise);}public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.close(promise);}public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {ctx.deregister(promise);}public void read(ChannelHandlerContext ctx) throws Exception {ctx.read();}public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);}public void flush(ChannelHandlerContext ctx) throws Exception {ctx.flush();}} |
---|
對於處理入站消息,另一種選擇是繼承SimpleChannelInboundHandler,它是Netty的一個繼承於ChannelInboundHandlerAdapter的抽象類,並在其之上實現了自動釋放資源的功能。
咱們在瞭解ByteBuf時就已經知道了Netty使用了一套本身實現的引用計數算法來主動釋放資源,假設你的ChannelHandler繼承於ChannelInboundHandlerAdapter或ChannelOutboundHandlerAdapter,那麼你就有責任去管理你所分配的ByteBuf,通常來講,一個消息對象(ByteBuf)已經被消費(或丟棄)了,而且不會傳遞給ChannelHandler鏈中的下一個處理器(若是該消息到達了實際的傳輸層,那麼當它被寫入或Channel關閉時,都會被自動釋放),那麼你就須要去手動釋放它。經過一個簡單的工具類ReferenceCountUtil的release方法,就能夠作到這一點。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 | // 這個泛型爲消息對象的類型public abstract class SimpleChannelInboundHandler<I> extends ChannelInboundHandlerAdapter {private final TypeParameterMatcher matcher;private final boolean autoRelease;protected SimpleChannelInboundHandler() {this(true);}protected SimpleChannelInboundHandler(boolean autoRelease) {this.matcher = TypeParameterMatcher.find(this, SimpleChannelInboundHandler.class, "I");this.autoRelease = autoRelease;}protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType) {this(inboundMessageType, true);}protected SimpleChannelInboundHandler(Class<? extends I> inboundMessageType, boolean autoRelease) {this.matcher = TypeParameterMatcher.get(inboundMessageType);this.autoRelease = autoRelease;}public boolean acceptInboundMessage(Object msg) throws Exception {return this.matcher.match(msg);}// SimpleChannelInboundHandler只是替你作了ReferenceCountUtil.release()public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {boolean release = true;try {if(this.acceptInboundMessage(msg)) {this.channelRead0(ctx, msg);} else {release = false;ctx.fireChannelRead(msg);}} finally {if(this.autoRelease && release) {ReferenceCountUtil.release(msg);}}}// 這個方法纔是咱們須要實現的方法protected abstract void channelRead0(ChannelHandlerContext var1, I var2) throws Exception;}// ReferenceCountUtil中的源碼,release方法對消息對象的類型進行判斷而後調用它的release()方法public static boolean release(Object msg) {return msg instanceof ReferenceCounted?((ReferenceCounted)msg).release():false;} |
---|
爲了模塊化與解耦合,不可能由一個ChannelHandler來完成全部應用邏輯,因此Netty採用了攔截器鏈的設計。ChannelPipeline就是用來管理ChannelHandler實例鏈的容器,它的職責就是保證明例鏈的流動。
每個新建立的Channel都將會被分配一個新的ChannelPipeline,這種關聯關係是永久性的,一個Channel一輩子只能對應一個ChannelPipeline。
一個入站事件被觸發時,它會先從ChannelPipeline的最左端(頭部)開始一直傳播到ChannelPipeline的最右端(尾部),而出站事件正好與入站事件順序相反(從最右端一直傳播到最左端)。這個順序是定死的,Netty老是將ChannelPipeline的入站口做爲頭部,而將出站口做爲尾部。在事件傳播的過程當中,ChannelPipeline會判斷下一個ChannelHandler的類型是否和事件的運動方向相匹配,若是不匹配,就跳過該ChannelHandler並繼續檢查下一個(保證入站事件只會被ChannelInboundHandler處理),一個ChannelHandler也能夠同時實現ChannelInboundHandler與ChannelOutboundHandler,它在入站事件與出站事件中都會被調用。
在閱讀ChannelHandler的源碼時,發現不少方法須要一個ChannelHandlerContext類型的參數,該接口是ChannelPipeline與ChannelHandler之間相關聯的關鍵。ChannelHandlerContext能夠通知ChannelPipeline中的當前ChannelHandler的下一個ChannelHandler,還能夠動態地改變當前ChannelHandler在ChannelPipeline中的位置(經過調用ChannelPipeline中的各類方法來修改)。
ChannelHandlerContext負責了在同一個ChannelPipeline中的ChannelHandler與其餘ChannelHandler之間的交互,每一個ChannelHandlerContext都對應了一個ChannelHandler。在DefaultChannelPipeline的源碼中,已經表現的很明顯了。
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556 | public class DefaultChannelPipeline implements ChannelPipeline {.........// 頭部節點和尾部節點的引用變量// ChannelHandlerContext在ChannelPipeline中是以鏈表的形式組織的final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;.........// 添加一個ChannelHandler到鏈表尾部public final ChannelPipeline addLast(String name, ChannelHandler handler) {return this.addLast((EventExecutorGroup)null, name, handler);}public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized(this) {// 檢查ChannelHandler是否爲一個共享對象(@Sharable)// 若是該ChannelHandler沒有@Sharable註解,而且是已被添加過的那麼就拋出異常checkMultiplicity(handler);// 返回一個DefaultChannelHandlerContext,注意該對象持有了傳入的ChannelHandlernewCtx = this.newContext(group, this.filterName(name, handler), handler);this.addLast0(newCtx);// 若是當前ChannelPipeline沒有被註冊,那麼就先加到未決鏈表中if(!this.registered) {newCtx.setAddPending();this.callHandlerCallbackLater(newCtx, true);return this;}// 不然就調用ChannelHandler中的handlerAdded()EventExecutor executor = newCtx.executor();if(!executor.inEventLoop()) {newCtx.setAddPending();executor.execute(new Runnable() {public void run() {DefaultChannelPipeline.this.callHandlerAdded0(newCtx);}});return this;}}this.callHandlerAdded0(newCtx);return this;}// 將新的ChannelHandlerContext插入到尾部與尾部以前的節點之間private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = this.tail.prev;newCtx.prev = prev;newCtx.next = this.tail;prev.next = newCtx;this.tail.prev = newCtx;}.....} |
---|
ChannelHandlerContext還定義了許多與Channel和ChannelPipeline重合的方法(像read()、write()、connect()這些用於出站的方法或者如fireChannelXXXX()這樣用於入站的方法),不一樣之處在於調用Channel或者ChannelPipeline上的這些方法,它們將會從頭沿着整個ChannelHandler實例鏈進行傳播,而調用位於ChannelHandlerContext上的相同方法,則會從當前所關聯的ChannelHandler開始,且只會傳播給實例鏈中的下一個ChannelHandler。並且,事件之間的移動(從一個ChannelHandler到下一個ChannelHandler)也是經過ChannelHandlerContext中的方法調用完成的。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 | public class DefaultChannelPipeline implements ChannelPipeline {public final ChannelPipeline fireChannelRead(Object msg) {// 注意這裏將頭節點傳入了進去AbstractChannelHandlerContext.invokeChannelRead(this.head, msg);return this;}}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);EventExecutor executor = next.executor();if(executor.inEventLoop()) {next.invokeChannelRead(m);} else {executor.execute(new Runnable() {public void run() {next.invokeChannelRead(m);}});}}private void invokeChannelRead(Object msg) {if(this.invokeHandler()) {try {((ChannelInboundHandler)this.handler()).channelRead(this, msg);} catch (Throwable var3) {this.notifyHandlerException(var3);}} else {// 尋找下一個ChannelHandlerthis.fireChannelRead(msg);}}public ChannelHandlerContext fireChannelRead(Object msg) {invokeChannelRead(this.findContextInbound(), msg);return this;}private AbstractChannelHandlerContext findContextInbound() {AbstractChannelHandlerContext ctx = this;do {ctx = ctx.next;} while(!ctx.inbound); // 直到找到一個ChannelInboundHandlerreturn ctx;}} |
---|
爲了最大限度地提供高性能和可維護性,Netty設計了一套強大又易用的線程模型。在一個網絡框架中,最重要的能力是可以快速高效地處理在鏈接的生命週期內發生的各類事件,與之相匹配的程序構造被稱爲事件循環,Netty定義了接口EventLoop來負責這項工做。
若是是常常用Java進行多線程開發的童鞋想必常常會使用到線程池,也就是Executor這套API。Netty就是從Executor(java.util.concurrent)之上擴展了本身的EventExecutorGroup(io.netty.util.concurrent),同時爲了與Channel的事件進行交互,還擴展了EventLoopGroup接口(io.netty.channel)。在io.netty.util.concurrent包下的EventExecutorXXX負責實現線程併發相關的工做,而在io.netty.channel包下的EventLoopXXX負責實現網絡編程相關的工做(處理Channel中的事件)。
在Netty的線程模型中,一個EventLoop將由一個永遠不會改變的Thread驅動,而一個Channel一輩子只會使用一個EventLoop(可是一個EventLoop可能會被指派用於服務多個Channel),在Channel中的全部I/O操做和事件都由EventLoop中的線程處理,也就是說一個Channel的一輩子之中都只會使用到一個線程。不過在Netty3,只有入站事件會被EventLoop處理,全部出站事件都會由調用線程處理,這種設計致使了ChannelHandler的線程安全問題。Netty4簡化了線程模型,經過在同一個線程處理全部事件,既解決了這個問題,還提供了一個更加簡單的架構。
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 | package io.netty.channel;public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", 2147483647));private final Queue<Runnable> tailTasks;protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());}protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());}protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);this.tailTasks = this.newTaskQueue(maxPendingTasks);}protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) {super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);this.tailTasks = this.newTaskQueue(maxPendingTasks);}// 返回它所在的EventLoopGrouppublic EventLoopGroup parent() {return (EventLoopGroup)super.parent();}public EventLoop next() {return (EventLoop)super.next();}// 註冊Channel,這裏ChannelPromise和Channel關聯到了一塊兒public ChannelFuture register(Channel channel) {return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this)));}public ChannelFuture register(ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");promise.channel().unsafe().register(this, promise);return promise;}// 剩下這些函數都是用於調度任務public final void executeAfterEventLoopIteration(Runnable task) {ObjectUtil.checkNotNull(task, "task");if(this.isShutdown()) {reject();}if(!this.tailTasks.offer(task)) {this.reject(task);}if(this.wakesUpForTask(task)) {this.wakeup(this.inEventLoop());}}final boolean removeAfterEventLoopIterationTask(Runnable task) {return this.tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));}protected boolean wakesUpForTask(Runnable task) {return !(task instanceof SingleThreadEventLoop.NonWakeupRunnable);}protected void afterRunningAllTasks() {this.runAllTasksFrom(this.tailTasks);}protected boolean hasTasks() {return super.hasTasks() || !this.tailTasks.isEmpty();}public int pendingTasks() {return super.pendingTasks() + this.tailTasks.size();}interface NonWakeupRunnable extends Runnable {}} | | --- | --- |
爲了確保一個Channel的整個生命週期中的I/O事件會被一個EventLoop負責,Netty經過inEventLoop()方法來判斷當前執行的線程的身份,肯定它是不是分配給當前Channel以及它的EventLoop的那一個線程。若是當前(調用)線程正是EventLoop中的線程,那麼所提交的任務將會被直接執行,不然,EventLoop將調度該任務以便稍後執行,並將它放入內部的任務隊列(每一個EventLoop都有它本身的任務隊列,從SingleThreadEventLoop的源碼就能發現不少用於調度內部任務隊列的方法),在下次處理它的事件時,將會執行隊列中的那些任務。這種設計可讓任何線程與Channel直接交互,而無需在ChannelHandler中進行額外的同步。
從性能上來考慮,千萬不要將一個須要長時間來運行的任務放入到任務隊列中,它會影響到該隊列中的其餘任務的執行。解決方案是使用一個專門的EventExecutor來執行它(ChannelPipeline提供了帶有EventExecutorGroup參數的addXXX()方法,該方法能夠將傳入的ChannelHandler綁定到你傳入的EventExecutor之中),這樣它就會在另外一條線程中執行,與其餘任務隔離。
12345678910111213141516171819202122232425262728293031 | public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {.....public void execute(Runnable task) {if(task == null) {throw new NullPointerException("task");} else {boolean inEventLoop = this.inEventLoop();if(inEventLoop) {this.addTask(task);} else {this.startThread();this.addTask(task);if(this.isShutdown() && this.removeTask(task)) {reject();}}if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {this.wakeup(inEventLoop);}}}public boolean inEventLoop(Thread thread) {return thread == this.thread;}.....} |
---|
EventLoopGroup負責管理和分配EventLoop(建立EventLoop和爲每一個新建立的Channel分配EventLoop),根據不一樣的傳輸類型,EventLoop的建立和分配方式也不一樣。例如,使用NIO傳輸類型,EventLoopGroup就會只使用較少的EventLoop(一個EventLoop服務於多個Channel),這是由於NIO基於I/O多路複用,一個線程能夠處理多個鏈接,而若是使用的是OIO,那麼新建立一個Channel(鏈接)就須要分配一個EventLoop(線程)。
在深刻了解地Netty的核心組件以後,發現它們的設計都很模塊化,若是想要實現你本身的應用程序,就須要將這些組件組裝到一塊兒。Netty經過Bootstrap類,以對一個Netty應用程序進行配置(組裝各個組件),並最終使它運行起來。對於客戶端程序和服務器程序所使用到的Bootstrap類是不一樣的,後者須要使用ServerBootstrap,這樣設計是由於,在如TCP這樣有鏈接的協議中,服務器程序每每須要一個以上的Channel,經過父Channel來接受來自客戶端的鏈接,而後建立子Channel用於它們之間的通訊,而像UDP這樣無鏈接的協議,它不須要每一個鏈接都建立子Channel,只須要一個Channel便可。
一個比較明顯的差別就是Bootstrap與ServerBootstrap的group()方法,後者提供了一個接收2個EventLoopGroup的版本。
12345678910111213141516171819202122232425262728 | // 該方法在Bootstrap的父類AbstractBootstrap中,泛型B爲它當前子類的類型(爲了鏈式調用)public B group(EventLoopGroup group) {if(group == null) {throw new NullPointerException("group");} else if(this.group != null) {throw new IllegalStateException("group set already");} else {this.group = group;return this;}}// ServerBootstrap中的實現,它也支持只用一個EventLoopGrouppublic ServerBootstrap group(EventLoopGroup group) {return this.group(group, group);}public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if(childGroup == null) {throw new NullPointerException("childGroup");} else if(this.childGroup != null) {throw new IllegalStateException("childGroup set already");} else {this.childGroup = childGroup;return this;}} |
---|
Bootstrap其實沒有什麼能夠好說的,它就只是一個裝配工,將各個組件拼裝組合到一塊兒,而後進行一些配置,有關它的詳細API請參考Netty JavaDoc。下面咱們將經過一個經典的Echo客戶端與服務器的例子,來梳理一遍建立Netty應用的流程。
首先實現的是服務器,咱們先實現一個EchoServerInboundHandler,處理入站消息。
1234567891011121314151617181920212223242526 | public class EchoServerInboundHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf in = (ByteBuf) msg;System.out.printf("Server received: %s \n", in.toString(CharsetUtil.UTF_8));// 因爲讀事件不是一次性就能把完整消息發送過來的,這裏並無調用writeAndFlushctx.write(in); // 直接把消息寫回給客戶端(會被出站消息處理器處理,不過咱們的應用沒有實現任何出站消息處理器)}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {// 等讀事件已經完成時,沖刷以前寫數據的緩衝區// 而後添加了一個監聽器,它會在Future完成時進行關閉該Channel.ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}// 處理異常,輸出異常信息,而後關閉Channel@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}} |
---|
服務器的應用邏輯只有這麼多,剩下就是用ServerBootstrap進行配置了。
12345678910111213141516171819202122232425262728293031323334353637383940414243444546 | public class EchoServer {private final int port;public EchoServer(int port) {this.port = port;}public void start() throws Exception {final EchoServerInboundHandler serverHandler = new EchoServerInboundHandler();EventLoopGroup group = new NioEventLoopGroup(); // 傳輸類型使用NIOtry {ServerBootstrap b = new ServerBootstrap();b.group(group) // 配置EventLoopGroup.channel(NioServerSocketChannel.class) // 配置Channel的類型.localAddress(new InetSocketAddress(port)) // 配置端口號.childHandler(new ChannelInitializer<SocketChannel>() {// 實現一個ChannelInitializer,它能夠方便地添加多個ChannelHandler@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(serverHandler);}});// i綁定地址,同步等待它完成ChannelFuture f = b.bind().sync();// 關閉這個Futuref.channel().closeFuture().sync();} finally {// 關閉應用程序,通常來講Netty應用只須要調用這個方法就夠了group.shutdownGracefully().sync();}}public static void main(String[] args) throws Exception {if (args.length != 1) {System.err.printf("Usage: %s <port> \n",EchoServer.class.getSimpleName());return;}int port = Integer.parseInt(args[0]);new EchoServer(port).start();}} |
---|
接下來實現客戶端,一樣須要先實現一個入站消息處理器。
1234567891011121314151617181920212223 | public class EchoClientInboundHandler extends SimpleChannelInboundHandler<ByteBuf> {/*** 咱們在Channel鏈接到遠程節點直接發送一條消息給服務器*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty!", CharsetUtil.UTF_8));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {// 輸出從服務器Echo的消息System.out.printf("Client received: %s \n", byteBuf.toString(CharsetUtil.UTF_8));}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}} |
---|
而後配置客戶端。
123456789101112131415161718192021222324252627282930313233343536373839404142 | public class EchoClient {private final String host;private final int port;public EchoClient(String host, int port) {this.host = host;this.port = port;}public void start() throws Exception {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(host, port)) // 服務器的地址.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new EchoClientInboundHandler());}});ChannelFuture f = b.connect().sync(); // 鏈接到服務器f.channel().closeFuture().sync();} finally {group.shutdownGracefully().sync();}}public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.printf("Usage: %s <host> <port> \n", EchoClient.class.getSimpleName());return;}String host = args[0];int port = Integer.parseInt(args[1]);new EchoClient(host, port).start();}} |
---|
實現一個Netty應用程序就是如此簡單,用戶大多數都是在編寫各類應用邏輯的ChannelHandler(或者使用Netty內置的各類實用ChannelHandler),而後只須要將它們所有添加到ChannelPipeline便可。
微信公衆號【黃小斜】做者是螞蟻金服 JAVA 工程師,目前在螞蟻財富負責後端開發工做,專一於 JAVA 後端技術棧,同時也懂點投資理財,堅持學習和寫做,用大廠程序員的視角解讀技術與互聯網,個人世界裏不僅有 coding!關注公衆號後回覆」架構師「便可領取 Java基礎、進階、項目和架構師等免費學習資料,更有數據庫、分佈式、微服務等熱門技術學習視頻,內容豐富,兼顧原理和實踐,另外也將贈送做者原創的Java學習指南、Java程序員面試指南等乾貨資源