1. Job執行在Linkis處在什麼位置微信
Job執行在Linkis的架構中處於統一做業執行服務(Unified Job Execution Services)中,以下圖所示:
websocket
2. Job處理流程架構
2.1 Job封裝併發
1. Clinet(如前端)發起請求執行Job,Job請求信息以下:
異步
2. GateWay收到Job後將Job經過executeApplicationName和requestApplicationName轉發給對應的Entrance。socket
3. Entrance接受到對應的Job後,會調用JobHistory的RPC對Job的元信息進行持久化,並對前端的請求進行解析對自定義變量和參數進行封裝,封裝爲能夠執行的Job。svg
2.2 Job的執行高併發
1. 拿到解析後的Job後,Entrance會先爲用戶去請求一個計算引擎,在請求前先去ResourceManager找到負載最低的EngineManager(引擎管理器);
2. 接着會向負載低的EM請求啓動Engine,若是是JDBC和MLSQL沒有EM的服務,這步會在Entrance裏面進行;
3. EM接收到啓動新的Engine後,會先向RM(資源管理器)爲這個用戶申請資源啓動Engine,若是資源足夠則準備啓動引擎;
4. EM根據申請到的資源啓動新的Engine,引擎啓動成功後會向EM推送信息,EM接着推送給Entrance,至此引擎的申請流程結束;
5. Entrance請求到Engine後,首先爲該Job申請鎖定該Engine以防止其餘Entrance和Job提交到該Engine;
6. Entrance鎖定Engine成功後,會將Job發送給Engine執行,至此Job開始實際執行
這裏須要說明的是若是用戶存在一個可用空閒的引擎,則會跳過1,2,3,4 四個步驟;
2.3 Job信息推送
1. Job執行過程當中爲了客戶端的用戶能夠實時看到job的運行情況,Engine會將Job的狀態/進度/日誌經過RPC實時推送給Entrance;
2. Entrance收到Job信息進行持久化處理,若是是websocket則將信息直接推送給用戶;
3. Job執行完成後,Engine會將最終狀態和結果集推送給Entrance,Entrance收到後將結果保存到結果集路徑,接着更新JobHistory中的狀態信息和結果集信息;
4. 客戶端經過狀態接口判斷Job的成功與否,若是成功則能夠經過調用接口請求JobHistory拿到結果集信息。至此整個Job則執行完成。
3 Linkis的Job執行源碼解讀
第三章對Job的處理流程進行了一個簡單講解,本章主要講解在這一流程中Entrance,EngineManager,Engine的代碼調用流程。
3.1.Entrance處理流程
Entrance是Job執行的入口,當Job從GateWay轉發後會先到EntranceRestfulApi或者EntranceWebSocketService分別對應Rest和WebSocket請求的邏輯處理。下面咱們從WebSocket請求進行講解,Rest請求相似;
1. 前端的WebSocket請求經過Gateway轉發到ServerSocket類,繼承了WebSocketAdapter,ServerSocket會調用ControllerServer的onMessage的方法將消息投遞到serverListenerEventBus消息總線,serverListenerEventBus接着會將消息給到EntranceWebSocketService進行處理處理部分代碼以下:
2. 執行請求處理邏輯:EntranceWebSocketService的dealExecute方法會調用entranceServer.execute去執行Job,entranceServer接着會對job進行解析封裝而後提交給調度器。
3. Job執行流程:調度拿到Job後會爲這個Job生成一個groupName,而後經過groupName去ConsumerManager獲取一個Consumer(分組消費器),並將該Job傳遞給這個FIFOUserConsumer ,FIFOUserConsumer 接着會循環去BlockingLoopQueue裏面取Job進行消費,拿到Job後FIFOUserConsumer 會請求一個引擎,接着將該Job提交給線程池運行。
Job線程運行起來後首先會將該Job請求提交給遠程Engine進行執行,並拿到響應,爲了提高Job的性能,這裏通常會返回AsynReturnExecuteResponse(異步返回的請求響應) 用於將Job的狀態和信息都異步返回回來,該線程能夠直接執行完下降線程開銷
4. Job信息推送:Job信息推送爲Engine經過RPC推送給Entrance,Entrance再經過WebSocket推送給用戶,或者用戶經過Rest請求。
3.2 EngineManager
引擎管理器EngineManager是用來管理引擎的,用於對引擎的生命週期進行管理,當Entrance中的Consumer發起askExecutor時會將PRC請求發送給EngineManagerReceiver,
EM接收到消息會先向RM判斷該用戶和Creator是否還有足夠的資源啓動引擎,資源判斷經過後纔會發起引擎啓動
Engine的正常退出流程分爲兩種一、用戶發起kill命令,經過EM 殺掉Engine 二、Engine空閒時間過長自行kill,默認一個小時
用戶調用kill後會EM會調用:engineManager.getEngineManagerContext.getOrCreateEngineFactory.delete
3.3.Engine處理流程
Engine啓動成功後就能夠接受Entrance的job請求,執行並推送Job信息給到Entrance,Entrance的job請求會發送到EngineReceiver
engine的Job執行調度流程和Entrance的相同,除了實現類不同都是經過Scheduler--Consumer--Executor--Executor執行Job,這裏須要說明的是Executor對應的是相應Engine的具體執行代碼的實現類不是Engnie,這是與Entrance有區別的地方,執行流程也主要在Executor進行實現的:
Job信息經過JobDemo定時進行推送:
3.4 Linkis的Job架構歸納
上面從入口、引擎管理器、引擎介紹了一個Job的總體執行流程,Job執行的總體的調用鏈能夠總結爲下圖:
4. 總結
本文從Job在Linkis 0.X版本中所處的位置進行引入,介紹了一個Job從提交到封裝、運行、信息推送的整個流程和源碼解析。在Linkis1.0版本中咱們對Job執行作了多個優化:
1. 任務標籤化:靈活的經過標籤作租戶隔離,智能路由對應的引擎服務,並支持經過標籤指定提交的Hadoop集羣等;
2. 任務全棧化:支持在交互做業的基礎上,支持流式、一次性批量等做業類型;
3. 任務解析策略化:借鑑Calcite思想,對任務進行編排優化,支持更多的計算策略,智能調度執行;
4. 服務簡化:統一Entrance和EngineManager服務,底層計算存儲引擎只須要實現引擎插件(EngineConnPlugin)就能夠完成新引擎的實現,不在須要實現Entrance和EngineManager服務。
Linkis1.0 新的架構對多個模塊進行了架構調整和優化,敬請期待,同時Linkis1.0有多個模塊正在開發實現當中,歡迎社區各位大佬的加入。
本文從Job在Linkis中所處的位置進行引入,介紹了一個Job從提交到封裝、運行、信息推送的整個流程和源碼解析。後續將爲你們帶來Linkis更多模塊的代碼解析,敬請期待。
掃碼關注咱們
微信號公衆號 : WeDataSphere
GitHub:WeDataSphere
若是喜歡咱們的產品或文章,請給咱們的GitHub點上你寶貴的star和fork哦~~
WeDataSphere,BIG DATA MADE EASY.
用心作一個有溫度的開源社區
~歡迎關注~
歡迎加入咱們的有獎徵文活動哦,詳見以下連接~
同時誠摯的但願您點開「閱讀原文」,在OSC開源投票中,爲Linkis與DataSphere Studio投上您寶貴的一票哦~~
本文分享自微信公衆號 - WeDataSphere(gh_273e85fce73b)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。