解決方案:
一直以來,基於Akka實現的RPC通訊框架是Spark引覺得豪的主要特性,也是與Hadoop等分佈式計算框架對比過程當中一大亮點。後端
可是時代和技術都在演化,從Spark1.3.1版本開始,爲了解決大塊數據(如Shuffle)的傳輸問題,Spark引入了Netty通訊框架,到了1.6.0版本,Netty竟然完成取代了Akka,承擔Spark內部全部的RPC通訊以及數據流傳輸。
網絡IO掃盲貼
在Linux操做系統層面,網絡操做即爲IO操做,總共有:阻塞式,非阻塞式,複用模型,信號驅動和異步五種IO模型。其中數組
JAVA IO也經從來上面幾回演化,從最先的BIO(阻塞式/非阻塞IO),到1.4版本的NIO(IO複用),到1.7版本的NIO2.0/AIO(異步IO);服務器
基於早期BIO來實現高併發網絡服務器都是依賴多線程來實現,可是線程開銷較大,BIO的瓶頸明顯,NIO的出現解決了這一大難題,基於IO複用解決了IO高併發;可是NIO有也有幾個缺點:網絡
那麼Netty和JDK-NIO之間究竟是什麼關係?是JDK-NIO的封裝仍是重寫?多線程
首先是NIO的上層封裝,Netty提供了NioEventLoopGroup/NioSocketChannel/NioServerSocketChannel的組合來完成實際IO操做,繼而在此之上實現數據流Pipeline以及EventLoop線程池等功能。併發
另外它又重寫了NIO,JDK-NIO底層是基於Epoll的LT模式來實現,而Netty是基於Epoll的ET模式實現的一組IO操做EpollEventLoopGroup/EpollSocketChannel/EpollServerSocketChannel;app
Netty對兩種實現進行完美的封裝,能夠根據業務的需求來選擇不一樣的實現(Epoll的ET和LT模式真的有很大的性能差異嗎?單從Epoll的角度來看,ET確定是比LT要性能好那麼一點。框架
可是若是爲了編碼簡潔性,LT仍是首選,ET若是用戶層邏輯實現不夠優美,相比ET還會帶來更大大性能開銷;不過Netty這麼大的開源團隊,相信ET模式應該實現的不錯吧!!純屬猜想!!)。
那麼Akka又是什麼東西?異步
從Akka出現背景來講,它是基於Actor的RPC通訊系統,它的核心概念也是Message,它是基於協程的,性能無可置疑;基於scala的偏函數,易用性也沒有話說。async
可是它畢竟只是RPC通訊,沒法適用大的package/stream的數據傳輸,這也是Spark早期引入Netty的緣由。
注:
Akka is a concurrency framework built around the notion of actors and composable futures, Akka was inspired by Erlang which was built from the ground up around the Actor paradigm. It would usually be used to replace blocking locks such as synchronized, read write locks and the like with higher level asynchronous abstractions.
Akka是一個創建在Actors概念和可組合Futures之上的併發框架,,Akka設計靈感來源於Erlang,Erlang是基於Actor模型構建的。它一般被用來取代阻塞鎖如同步、讀寫鎖及相似的更高級別的異步抽象。
Netty is an asynchronous network library used to make Java NIO easier to use.
Netty是一個異步網絡庫,使JAVA NIO的功能更好用。
Notice that they both embrace asynchronous approaches, and that one could use the two together, or entirely separately.
注意:它們兩個都提供了異步方法,你可使用其中一個,或兩個都用
Where there is an overlap is that Akka has an IO abstraction too, and Akka can be used to create computing clusters that pass messages between actors on different machines. From this point of view, Akka is a higher level abstraction that could (and does) make use of Netty under the hood
Akka針對IO操做有一個抽象,這和netty是同樣的。使用Akka能夠用來建立計算集羣,Actor在不一樣的機器之間傳遞消息。從這個角度來看,Akka相對於Netty來講,是一個更高層次的抽象
那麼Netty爲何能夠取代Akka?首先無可置疑的是Akka能夠作到的,Netty也能夠作到,可是Netty能夠作到,Akka卻沒法作到,緣由是啥?
在軟件棧中,Akka相比Netty要Higher一點,它專門針對RPC作了不少事情,而Netty相比更加基礎一點,能夠爲不一樣的應用層通訊協議(RPC,FTP,HTTP等)提供支持。
在早期的Akka版本,底層的NIO通訊就是用的Netty;其次一個優雅的工程師是不會容許一個系統中容納兩套通訊框架,噁心!
最後,雖然Netty沒有Akka協程級的性能優點,可是Netty內部高效的Reactor線程模型,無鎖化的串行設計,高效的序列化,零拷貝,內存池等特性也保證了Netty不會存在性能問題。
那麼Spark是怎麼用Netty來取代Akka呢?一句話,利用偏函數的特性,基於Netty「仿造」出一個簡約版本的Actor模型!!
Spark Network Common的實現
Byte的表示
對於Network通訊,無論傳輸的是序列化後的對象仍是文件,在網絡上表現的都是字節流。在傳統IO中,字節流表示爲Stream;在NIO中,字節流表示爲ByteBuffer;在Netty中字節流表示爲ByteBuff或FileRegion;在Spark中,針對Byte也作了一層包裝,支持對Byte和文件流進行處理,即ManagedBuffer;
ManagedBuffer包含了三個函數createInputStream(),nioByteBuffer(),convertToNetty()來對Buffer進行「類型轉換」,分別獲取stream,ByteBuffer,ByteBuff或FileRegion;NioManagedBuffer/NettyManagedBuffer/FileSegmentManagedBuffer也是針對這ByteBuffer,ByteBuff或FileRegion提供了具體的實現。
更好的理解ManagedBuffer:好比Shuffle BlockManager模塊須要在內存中維護本地executor生成的shuffle-map輸出的文件引用,從而能夠提供給shuffleFetch進行遠程讀取,此時文件表示爲FileSegmentManagedBuffer,shuffleFetch遠程調用FileSegmentManagedBuffer.nioByteBuffer/createInputStream函數從文件中讀取爲Bytes,並進行後面的網絡傳輸。若是已經在內存中bytes就更好理解了,好比將一個字符數組表示爲NettyManagedBuffer。
Protocol的表示
協議是應用層通訊的基礎,它提供了應用層通訊的數據表示,以及編碼和解碼的能力。在Spark Network Common中,繼承AKKA中的定義,將協議命名爲Message,它繼承Encodable,提供了encode的能力。
<ignore_js_op>
Message根據請求響應能夠劃分爲RequestMessage和ResponseMessage兩種;對於Response,根據處理結果,能夠劃分爲Failure和Success兩種類型;根據功能的不一樣,z主要劃分爲Stream,ChunkFetch,Rpc。
Stream消息就是上面提到的ManagedBuffer中的Stream流,在Spark內部,好比SparkContext.addFile操做會在Driver中針對每個add進來的file/jar會分配惟一的StreamID(file/[]filename],jars/[filename]);worker經過該StreamID向Driver發起一個StreamRequest的請求,Driver將文件轉換爲FileSegmentManagedBuffer返回給Worker,這就是StreamMessage的用途之一;
ChunkFetch也有一個相似Stream的概念,ChunkFetch的對象是「一個內存中的Iterator[ManagedBuffer]」,即一組Buffer,每個Buffer對應一個chunkIndex,整個Iterator[ManagedBuffer]由一個StreamID標識。Client每次的ChunkFetch請求是由(streamId,chunkIndex)組成的惟一的StreamChunkId,Server端根據StreamChunkId獲取爲一個Buffer並返回給Client; 無論是Stream仍是ChunkFetch,在Server的內存中都須要管理一組由StreamID與資源之間映射,即StreamManager類,它提供了getChunk和openStream兩個接口來分別響應ChunkFetch與Stream兩種操做,而且針對Server的ChunkFetch提供一個registerStream接口來註冊一組Buffer,好比能夠將BlockManager中一組BlockID對應的Iterator[ManagedBuffer]註冊到StreamManager,從而支持遠程Block Fetch操做。
Case:對於ExternalShuffleService(一種單獨shuffle服務進程,對其餘計算節點提供本節點上面的全部shuffle map輸出),它爲遠程Executor提供了一種OpenBlocks的RPC接口,即根據請求的appid,executorid,blockid(appid+executor對應本地一組目錄,blockid拆封出)從本地磁盤中加載一組FileSegmentManagedBuffer到內存,並返回加載後的streamId返回給客戶端,從而支持後續的ChunkFetch的操做。
RPC是第三種核心的Message,和Stream/ChunkFetch的Message不一樣,每次通訊的Body是類型是肯定的,在rpcHandler能夠根據每種Body的類型進行相應的處理。 在Spark1.6.*版本中,也正式使用基於Netty的RPC框架來替代Akka。
Server的結構
Server構建在Netty之上,它提供兩種模型NIO和Epoll,能夠經過參數(spark.[module].io.mode)進行配置,最基礎的module就是shuffle,不一樣的IOMode選型,對應了Netty底層不一樣的實現,Server的Init過程當中,最重要的步驟就是根據不一樣的IOModel完成EventLoop和Pipeline的構造,以下所示:
//根據IO模型的不一樣,構造不一樣的EventLoop/ClientChannel/ServerChannel
EventLoopGroup createEventLoop(IOMode mode, int numThreads, String threadPrefix) { switch (mode) { case NIO: return new NioEventLoopGroup(numThreads, threadFactory); case EPOLL: return new EpollEventLoopGroup(numThreads, threadFactory); } } Class<? extends Channel> getClientChannelClass(IOMode mode) { switch (mode) { case NIO: return NioSocketChannel.class; case EPOLL: return EpollSocketChannel.class; } } Class<? extends ServerChannel> getServerChannelClass(IOMode mode) { switch (mode) { case NIO: return NioServerSocketChannel.class; case EPOLL: return EpollServerSocketChannel.class; } } //構造pipelet responseHandler = new TransportResponseHandler(channel); TransportClient client = new TransportClient(channel, responseHandler); requestHandler = new TransportRequestHandler(channel, client,rpcHandler); channelHandler = new TransportChannelHandler(client, responseHandler, requestHandler, conf.connectionTimeoutMs(), closeIdleConnections); channel.pipeline() .addLast("encoder", encoder) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", decoder) .addLast("idleStateHandler", new IdleStateHandler()) .addLast("handler", channelHandler);
其中,MessageEncoder/Decoder針對網絡包到Message的編碼和解碼,而最爲核心就TransportRequestHandler,它封裝了對全部請求/響應的處理;TransportChannelHandler內部實現也很簡單,它封裝了responseHandler和requestHandler,當從Netty中讀取一條Message之後,根據判斷路由給相應的responseHandler和requestHandler。
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception { if (request instanceof RequestMessage) { requestHandler.handle((RequestMessage) request); } else { responseHandler.handle((ResponseMessage) request); } }
Sever提供的RPC,ChunkFecth,Stream的功能都是依賴TransportRequestHandler來實現的;從原理上來講,RPC與ChunkFecth/Stream仍是有很大不一樣的,其中RPC對於TransportRequestHandler來講是功能依賴,而ChunkFecth/Stream對於TransportRequestHandler來講只是數據依賴。
怎麼理解?即TransportRequestHandler已經提供了ChunkFecth/Stream的實現,只須要在構造的時候,向TransportRequestHandler提供一個streamManager,告訴RequestHandler從哪裏能夠讀取到Chunk或者Stream。
而RPC須要向TransportRequestHandler註冊一個rpcHandler,針對每一個RPC接口進行功能實現,同時RPC與ChunkFecth/Stream都會有同一個streamManager的依賴,所以注入到TransportRequestHandler中的streamManager也是依賴rpcHandler來實現,即rpcHandler中提供了RPC功能實現和streamManager的數據依賴。
//參考TransportRequestHandler的構造函數 public TransportRequestHandler(RpcHandler rpcHandler) { this.rpcHandler = rpcHandler;//****注入功能**** this.streamManager = rpcHandler.getStreamManager();//****注入streamManager**** } //實現ChunkFecth的功能 private void processFetchRequest(final ChunkFetchRequest req) { buf = streamManager.getChunk(req.streamId, req.chunkIndex); respond(new ChunkFetchSuccess(req.streamChunkId, buf)); } //實現Stream的功能 private void processStreamRequest(final StreamRequest req) { buf = streamManager.openStream(req.streamId); respond(new StreamResponse(req.streamId, buf.size(), buf)); } //實現RPC的功能 private void processRpcRequest(final RpcRequest req) { rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() { public void onSuccess(ByteBuffer response) { respond(new RpcResponse(req.requestId, new NioManagedBuffer(response))); } }); }
Client的結構
Server是經過監聽一個端口,注入rpcHandler和streamManager從而對外提供RPC,ChunkFecth,Stream的服務,而Client即爲一個客戶端類,經過該類,能夠將一個streamId/chunkIndex對應的ChunkFetch請求,streamId對應的Stream請求,以及一個RPC數據包對應的RPC請求發送到服務端,並監聽和處理來自服務端的響應;其中最重要的兩個類即爲TransportClient和TransportResponseHandler分別爲上述的「客戶端類」和「監聽和處理來自服務端的響應"。
那麼TransportClient和TransportResponseHandler是怎麼配合一塊兒完成Client的工做呢?
<ignore_js_op>
如上所示,由TransportClient將用戶的RPC,ChunkFecth,Stream的請求進行打包併發送到Server端,同時將用戶提供的回調函數註冊到TransportResponseHandler,在上面一節中說過,TransportResponseHandler是TransportChannelHandler的一部分,在TransportChannelHandler接收到數據包,並判斷爲響應包之後,將包數據路由到TransportResponseHandler中,在TransportResponseHandler中經過註冊的回調函數,將響應包的數據返回給客戶端
//以TransportResponseHandler中處理ChunkFetchSuccess響應包的處理邏輯 public void handle(ResponseMessage message) throws Exception { String remoteAddress = NettyUtils.getRemoteAddress(channel); if (message instanceof ChunkFetchSuccess) { resp = (ChunkFetchSuccess) message; listener = outstandingFetches.get(resp.streamChunkId); if (listener == null) { //沒有監聽的回調函數 } else { outstandingFetches.remove(resp.streamChunkId); //回調函數,並把resp.body()對應的chunk數據返回給listener listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body()); resp.body().release(); } } } //ChunkFetchFailure/RpcResponse/RpcFailure/StreamResponse/StreamFailure處理的方法是一致的
Spark Network的功能應用--BlockTransfer&&Shuffle
不管是BlockTransfer仍是ShuffleFetch都須要跨executor的數據傳輸,在每個executor裏面都須要運行一個Server線程(後面也會分析到,對於Shuffle也多是一個獨立的ShuffleServer進程存在)來提供對Block數據的遠程讀寫服務。
在每一個Executor裏面,都有一個BlockManager模塊,它提供了對當前Executor全部的Block的「本地管理」,並對進程內其餘模塊暴露getBlockData(blockId: BlockId): ManagedBuffer的Block讀取接口,可是這裏GetBlockData僅僅是提供本地的管理功能,對於跨遠程的Block傳輸,則由NettyBlockTransferService提供服務。
NettyBlockTransferService自己便是Server,爲其餘其餘遠程Executor提供Block的讀取功能,同時它即爲Client,爲本地其餘模塊暴露fetchBlocks的接口,支持經過host/port拉取任何Executor上的一組的Blocks。
NettyBlockTransferService做爲一個Server
NettyBlockTransferService做爲一個Server,與Executor或Driver裏面其餘的服務同樣,在進程啓動時,由SparkEnv初始化構造並啓動服務,在整個運行時的一部分。
val blockTransferService = new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores) val envInstance = new SparkEnv(executorId,rpcEnv,serializer, closureSerializer, blockTransferService,//爲SparkEnv的一個組成 ....,conf)
在上文,咱們談到,一個Server的構造依賴RpcHandler提供RPC的功能注入以及提供streamManager的數據注入。對於NettyBlockTransferService,該RpcHandler即爲NettyBlockRpcServer,在構造的過程當中,須要與本地的BlockManager進行管理,從而支持對外提供本地BlockMananger中管理的數據
"RpcHandler提供RPC的功能注入"在這裏仍是屬於比較「簡陋的」,畢竟他是屬於數據傳輸模塊,Server中提供的chunkFetch和stream已經足夠知足他的功能須要,那如今問題就是怎麼從streamManager中讀取數據來提供給chunkFetch和stream進行使用呢?
就是NettyBlockRpcServer做爲RpcHandler提供的一個Rpc接口之一:OpenBlocks,它接受由Client提供一個Blockids列表,Server根據該BlockIds從BlockManager獲取到相應的數據並註冊到streamManager中,同時返回一個StreamID,後續Client便可以使用該StreamID發起ChunkFetch的操做。
//case openBlocks: OpenBlocks => val blocks: Seq[ManagedBuffer] = openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData) val streamId = streamManager.registerStream(appId, blocks.iterator.asJava) responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteBuffer)
NettyBlockTransferService做爲一個Client
從NettyBlockTransferService做爲一個Server,咱們基本能夠推測NettyBlockTransferService做爲一個Client支持fetchBlocks的功能的基本方法:
核心代碼以下:
//發出openMessage請求 client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { streamHandle = (StreamHandle)response;//獲取streamId //針對streamid發出一組fetchChunk for (int i = 0; i < streamHandle.numChunks; i++) { client.fetchChunk(streamHandle.streamId, i, chunkCallback); } } });
同時,爲了提升服務端穩定性,針對fetchBlocks操做NettyBlockTransferService提供了非重試版本和重試版本的BlockFetcher,分別爲OneForOneBlockFetcher和RetryingBlockFetcher,經過參數(spark.[module].io.maxRetries)進行配置,默認是重試3次,除非你蛋疼,你不重試!!!
在Spark,Block有各類類型,能夠是ShuffleBlock,也能夠是BroadcastBlock等等,對於ShuffleBlock的Fetch,除了由Executor內部的NettyBlockTransferService提供服務之外,也能夠由外部的ShuffleService來充當Server的功能,並由專門的ExternalShuffleClient來與其進行交互,從而獲取到相應Block數據。功能的原理和實現,基本一致,可是問題來了?爲何須要一個專門的ShuffleService服務呢?主要緣由仍是爲了作到任務隔離,即減輕由於fetch帶來對Executor的壓力,讓其專心的進行數據的計算。
其實外部的ShuffleService最終是來自Hadoop的AuxiliaryService概念,AuxiliaryService爲計算節點NodeManager常駐的服務線程,早期的MapReduce是進程級別的調度,ShuffleMap完成shuffle文件的輸出之後,即當即退出,在ShuffleReduce過程當中由誰來提供文件的讀取服務呢?即AuxiliaryService,每個ShuffleMap都會將本身在本地的輸出,註冊到AuxiliaryService,由AuxiliaryService提供本地數據的清理以及外部讀取的功能。
在目前Spark中,也提供了這樣的一個AuxiliaryService:YarnShuffleService,可是對於Spark不是必須的,若是你考慮到須要「經過減輕由於fetch帶來對Executor的壓力」,那麼就能夠嘗試嘗試。
同時,若是啓用了外部的ShuffleService,對於shuffleClient也不是使用上面的NettyBlockTransferService,而是專門的ExternalShuffleClient,功能邏輯基本一致!
Spark Network的功能應用--新的RPC框架
Akka的通訊模型是基於Actor,一個Actor能夠理解爲一個Service服務對象,它能夠針對相應的RPC請求進行處理,以下所示,定義了一個最爲基本的Actor:
class HelloActor extends Actor { def receive = { case "hello" => println("world") case _ => println("huh?") } } // Receive = PartialFunction[Any, Unit]
Actor內部只有惟一一個變量(固然也能夠理解爲函數了),即Receive,它爲一個偏函數,經過case語句能夠針對Any信息能夠進行相應的處理,這裏Any消息在實際項目中就是消息包。
另一個很重要的概念就是ActorSystem,它是一個Actor的容器,多個Actor能夠經過name->Actor的註冊到Actor中,在ActorSystem中能夠根據請求不一樣將請求路由給相應的Actor。ActorSystem和一組Actor構成一個完整的Server端,此時客戶端經過host:port與ActorSystem創建鏈接,經過指定name就能夠相應的Actor進行通訊,這裏客戶端就是ActorRef。全部Akka整個RPC通訊系列是由Actor,ActorRef,ActorSystem組成。
Spark基於這個思想在上述的Network的基礎上實現一套本身的RPC Actor模型,從而取代Akka。其中RpcEndpoint對於Actor,RpcEndpointRef對應ActorRef,RpcEnv即對應了ActorSystem。
下面咱們具體進行分析它的實現原理。
private[spark] trait RpcEndpoint { def receive: PartialFunction[Any, Unit] = { case _ => throw new SparkException() } def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case _ => context.sendFailure(new SparkException()) } //onStart(),onStop() }
RpcEndpoint與Actor同樣,不一樣RPC Server能夠根據業務須要指定相應receive/receiveAndReply的實現,在Spark內部如今有N多個這樣的Actor,好比Executor就是一個Actor,它處理來自Driver的LaunchTask/KillTask等消息。
RpcEnv相對於ActorSystem:
RpcEndpointRef即爲與相應Endpoint通訊的引用,它對外暴露了send/ask等接口,實現將一個Message發送到Endpoint中。
這就是新版本的RPC框架的基本功能,它的實現基本上與Akka無縫對接,業務的遷移的功能很小,目前基本上都所有遷移完了。
RpcEnv內部實現原理
RpcEnv不只從外部接口與Akka基本一致,在內部的實現上,也基本差很少,都是按照MailBox的設計思路來實現的;
<ignore_js_op> 與上圖所示,RpcEnv即充當着Server,同時也爲Client內部實現。 當As Server,RpcEnv會初始化一個Server,並註冊NettyRpcHandler,在前面描述過,RpcHandler的receive接口負責對每個請求進行處理,通常狀況下,簡單業務能夠在RpcHandler直接完成請求的處理,可是考慮一個RpcEnv的Server上會掛載了不少個RpcEndpoint,每一個RpcEndpoint的RPC請求頻率不可控,所以須要對必定的分發機制和隊列來維護這些請求,其中Dispatcher爲分發器,InBox即爲請求隊列;在將RpcEndpoint註冊到RpcEnv過程當中,也間接的將RpcEnv註冊到Dispatcher分發器中,Dispatcher針對每一個RpcEndpoint維護一個InBox,在Dispatcher維持一個線程池(線程池大小默認爲系統可用的核數,固然也能夠經過spark.rpc.netty.dispatcher.numThreads進行配置),線程針對每一個InBox裏面的請求進行處理。固然實際的處理過程是由RpcEndpoint來完成。這就是RpcEnv As Server的基本過程!其次RpcEnv也完成Client的功能實現,RpcEndpointRef是以RpcEndpoint爲單位,即若是一個進程須要和遠程機器上N個RpcEndpoint服務進行通訊,就對應N個RpcEndpointRef(後端的實際的網絡鏈接是公用,這個是TransportClient內部提供了鏈接池來實現的),當調用一個RpcEndpointRef的ask/send等接口時候,會將把「消息內容+RpcEndpointRef+本地地址」一塊兒打包爲一個RequestMessage,交由RpcEnv進行發送。注意這裏打包的消息裏面包括RpcEndpointRef自己是很重要的,從而能夠由Server端識別出這個消息對應的是哪個RpcEndpoint。和發送端同樣,在RpcEnv中,針對每一個remote端的host:port維護一個隊列,即OutBox,RpcEnv的發送僅僅是把消息放入到相應的隊列中,可是和發送端不同的是:在OutBox中沒有維護一個所謂的線程池來定時清理OutBox,而是經過一堆synchronized來實現的,這點值得商討。