YARN底層基礎庫

 
YARN基礎庫是其餘一切模塊的基礎,它的設計直接決定了YARN的穩定性和擴展性,YARN借用了MRV1的一些底層基礎庫,好比RPC庫等,但由於引入了不少新的軟件設計方式,因此它的基礎庫更多,包括直接使用了開源序列化框架Protocol Buffers和Apache Avro,自定義的服務庫、事件庫和狀態機等



目錄
 
一. 概述
二. Protocol Buffers
三. Apache Avro
四. 底層通訊庫
五. 服務庫與事件庫
六. 狀態機庫



一. 概述
 
Yarn基礎庫是其餘一切模塊的基礎,它的設計直接決定了Yarn的穩定性和擴展性
 
Yarn的基礎庫主要有 : 
  • Protocol Buffers : Protocol Buffers是Google開源的序列化庫,具備平臺無關,高性能,兼容好等優勢.Yarn將ProtocolBuffers用到RPC通訊中,默認狀況下,Yarn RPC中全部參數採用Protocol Buffers進行序列化/反序列化
  • Apache Avro : Avro是Hadoop生態系統中的RPC框架,具備平臺無關,支持動態模式等優勢,Avro的最初設計動機是解決Yarn RPC兼容性和擴展性差等問題
  • RPC庫 : Yarn採用MR1中的RPC庫,但其中採用的默認序列化方法被替換成了Protocol Buffers
  • 服務庫和事件庫 : Yarn將全部的對象服務化,以便統一管理(建立,銷燬等),而服務之間則採用事件機制進行通訊
  • 狀態機庫 : 狀態機是一種表示有限個狀態以及在這些狀態之間的轉移和動做等行爲的數學模型
二. Protocol Buffers
 
Protocol Buffers是一種輕便高效的結構化數據存儲格式,能夠用於結構化數據序列化/反序列化
 
他適合作數據存儲或RPC的數據交換格式,經常使用做通訊協議,數據存儲等領域的與語言無關,平臺無關,可擴展的序列化結構數據格式
 
相比XML格式,Protocol Buffers的優勢 : 
  • 平臺無關,語言無關
  • 高性能,解析速度是XML的20 ~ 100倍
  • 體積小,文件大小僅是XML的1/10 ~ 1/3
  • 使用簡單
  • 兼容性好
 
Yarn中,全部的RPC函數的參數均採用Protocol Buffers定義的,Yarn的RPC協議全是使用Protocol Buffers定義(RPC協議上一章有介紹)
 
三. Apache Avro
 
Apache Avro是Hadoop下的一個子項目。它自己既是一個序列化框架,同時也實現了RPC的功能
 
Avro的特性和功能 : 
  • 豐富的數據結構類型
  • 快速可壓縮的二進制數據形式
  • 存儲持久數據的文件容器
  • 提供遠程過程調用RPC
  • 簡單的動態語言結合功能
 
Avro具備如下特色 : 
  • 支持動態模式。Avro不須要生成代碼,有利於搭建通用的數據處理系統,避免了代碼入侵
  • 數據無需加標籤
  • 無需手工分配的域標識
 
Avro做爲日誌序列化庫使用,在Yarn MapReduce中,全部事件的序列化/反序列化均採用Avro完成
 
四. 底層通訊庫
  
HadoopRPC的解析參考個人文章Hadoop RPC機制詳解
 
YARN提供的對外類是Yarn RPC,用戶只需使用該類即可以構建一個基於HadoopRPC且採用Protocol Buffers序列化框架的通訊協議
 
五. 服務庫與事件庫
 
服務庫
 
對於生命週期較長的對象,YARN採用了基於服務的對象管理模型對其進行管理,該模型主要有如下幾個特色 : 
  • 將每一個被服務化的對象分爲4個狀態 : NOTINITED(被建立),INITED(已初始化),STARTED(已啓動),STOPPED(已中止)
  • 任何服務狀態變化均可以觸發另一些動做
  • 可經過組合的方式對任意服務進行組合,以便進行統一管理
 
YARN中全部的服務對象最終都實現了接口Service,它定義了最基本的服務初始化、啓動、中止等操做,而AbstractService類提供了一個最基本的Service實現。
 
YARN中全部對象,若是是非組合服務,直接繼承AbstractService類便可,不然需CompositeService。好比,對於RM而言,它是一個組合服務,它組合了各類服務對象,包括ClientRMService、ApplicationMasterLauncher、ApplicationMasterService等
 
 
                     YARN中服務模型的類圖
 
YARN中,RM和NM屬於組合服務,它們內部包含多個單一服務和組合服務,以實現對內部多種服務的統一管理
 
事件庫
 
YARN採用了基於事件驅動的併發模型,該模型可以大大加強併發性,從而提升系統總體性能。爲了構建該模型,YARN將各類處理邏輯抽象成事件和對應事件調度器,並將每類事件的處理過程分割成多個步驟,用有限狀態機表示
 
 
                    YARN的事件處理模型
 
整個處理過程大體爲 :處理請求會做爲事件進入系統,由中央異步調度器(AsyncDispatcher)負責傳遞給相應事件調度器(Event Handler)。該事件調度器可能將該事件轉發給另一個事件調度器,也可能交給一個帶有有限狀態機的事件處理器,其處理結果也以事件的形式輸出給中央異步調度器。而新的事件會再次被中央異步調度器轉發給下一個事件調度器,直至處理完成(達到終止條件)
 
YARN中,全部核心服務實際上都是一箇中央異步調度器,包括RM、NM和AppMaster等,它們維護了事先註冊的事件與事件處理器,並根據接收的事件類型驅動服務的運行
 
使用YARN事件庫時,一般先定義一箇中央異步調度器,負責事件的轉發與處理,而後根據實際業務需求定義一系列的事件與事件處理器,並註冊到中央異步調度器實現事件統一管理和調度。以MRAppMaster爲例,它內部包含一箇中央異步調度器,並註冊了TaskAttemptEvent/TaskAttemptImpl、TaskEvent/TaskImpl、JobEvent/JobImpl等一系列事件/事件處理器,由中央異步調度器統一管理和調度
 
服務化和事件驅動軟件設計思想的引入,使得YARN具備低耦合、高內聚的特色,各個模塊只需完成各自功能,而模塊之間則採用事件聯繫起來,系統設計簡單且維護方便
 
YARN服務庫和事件庫的使用
 
YARN服務庫和事件庫的使用方法,介紹一個簡單的實例,該例子涉及任務和做業兩種對象的事件以及一箇中央異步調度器
 
(1) 定義Task事件
 
public class TaskEvent extends AbstractEvent<TaskEventType> {
    private String taskID;
    public TaskEvent (String taskID, TaskEventType type) {
        super(type);
        this.taskID = taskID;
    }
    public String getTaskID() {
        return taskID;
    }
}
// Task事件類型定義
public enum TaskEventType {
    T_KILL,
    T_SCHEDULE
}
 
(2) 定義Job事件
 
public class JobEvent extends AbstractEvent<JobEventType> {
    private String jobID;
    public JobEvent (String jobID, JobEventType type) {
        super(type);
        this.jobID = jobID;
    }
}
//Job事件類型定義
public enum JobEventType {
    JOB_KILL,
    JOB_INIT,
    JOB_START
}
 
(3) 事件調度器
 
定義一箇中央異步調度器,它接收Job和Task兩種類型事件,並交給對應的事件處理器處理
@SuppressWarnings("unchecked")
public class SimpleMRAppMaster extends CompositeService {
    private Dispatcher dispatcher; //中央異步調度器
    private String jobID;
    private int taskNumber; //該做業包含的任務數目
    private String[] taskIDs; //該做業內部包含的全部任務
 
    public SimpleMRAppMaster (String name, String jobID, int taskNumber) {
        super(name);
        this.jobID = jobID;
        this.taskNumber = taskNumber;
        taskIDs = new String[taskNumber];
        for (int i = 0; i < taskNumber; i++) {
            taskIDs[i] = new String (jobID + "_task_" + i);
        }
    }
 
    public void serviceInit (final Configuration conf) throws Exception {
        dispatcher = new AsyncDispatcher(); //定義一箇中央異步調度器
        //分別註冊Job和Task事件調度器
        dispatcher.register(JobEventType.class, new JobEventDispatcher());
        dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
        addService((Service)dispatcher);
        super.serviceInit(conf);
    }
 
    public Dispatcher getDispatcher() {
        return dispatcher;
    }
 
    private class JobEventDispatcher implements EventHandler<JobEvent> {
        @Override
        public void handle (JobEvent event) {
            if (event.getType() == JobEventType.JOB_KILL) {
                System.out.println("Receive JOB_KILL event, killing all the tasks");
                for (int i = 0; i < taskNumber; i++) {
                    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL));
                }
            } else if (event.getType() == JobEventType.JOB_INIT) {
                System.out.println("Receive JOB_INIT event, scheduling tasks");
                for (int i = 0; i < taskNumber; i++) {
                    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));
                }
            }
        }
    }
    
    private class TaskEventDispatcher implements EventHandler<TaskEvent> {
        @Override
        public void handler (TaskEvent event) {
            if (event.getType() == TaskEventType.T_KILL) {
                System.out.println("Receive T_KILL event of task" + event.getTaskID());
            } else if (event.getType() == TaskEventType.T_SCHEDULE) {
                System.out.println("Receive T_SCHEDULE of task" + event.getTaskID()); 
            }
        }
    }
}
 
(4). 測試程序
 
@SuppressWarnings("unchecked")
public class SimpleMRAppMasterTest {
    public static void main (String[] args) throws Exception {
        String jobID = "job_20131215_12";
        SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
        YarnConfiguration conf = new YarnConfiguration(new Configuration());
        appMaster.serviceInit(conf);
        appMaster.serviceStart();
        appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL));
        appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_INIT));
    }
}
 
事件驅動帶來的變化
 
MRV1中,對象之間的做用關係是基於函數調用實現的,當一個對象向另一個對象傳遞消息時,會直接採用函數調用的方式,且整個過程是串行的
 
基於函數調用的編程模型時低效的,它隱含着整個過程是串行、同步進行的。MRV2引入了事件驅動編程模型是一種更加高效的方式。
 
在基於事件驅動的編程模型中,全部對象被抽象成了事件處理器,而事件處理器之間經過事件相互關聯。 每種事件處理器處理一種類型的事件,同時根據須要觸發另一種事件
 
相比於基於函數調用的編程模型,這種編程模型具備異步、併發等特色,更加高效,所以更適合大型分佈式系統
 
六. 狀態機庫
 
狀態機由一組狀態組成,這些狀態分爲三類:初始狀態、中間狀態和最終狀態。狀態機從初始狀態開始運行,通過一系列中間狀態後,到達最終狀態並退出。在一個狀態機中,每一個狀態均可以接收一組特定事件,並根據具體的事件類型轉換到另外一個狀態。當狀態機轉換到最終狀態時,則退出
 
YARN狀態轉換方式
 
YARN中,每種狀態轉換由一個四元組表示,分別是轉換前狀態(preState)、轉換後狀態(postState)、事件(event)和回調函數(hook)
 
YARN定義了三種狀態轉換方式 : 
 
(1) 一個初始狀態、一個最終狀態、一種事件。該方式表示狀態機在preState狀態下,接收到Event事件後,執行函數狀態轉移函數Hook,並在執行完成後將當前狀態轉換爲postState
 
              初始狀態:最終狀態:事件=1:1:1
 
(2) 一個初始狀態、多個最終狀態、一種事件。該方式表示狀態機在preState狀態下,接收到Event事件後,執行函數狀態轉移函數Hook,並將當前狀態轉移爲Hook的返回值所表示的狀態
 
               初始狀態:最終狀態:事件=1:N:1
 
(3) 一個初始狀態、一個最終狀態、多個事件。該方式表示狀態機在preState狀態下,接收到Event一、Event2和Event3中的任何一個事件,將執行函數狀態轉移函數Hook,並在執行完成後將當前狀態轉換成postState
 
             初始狀態:最終狀態:事件=1:1:N
 
狀態機類
 
YARN本身實現了一個很是簡單的狀態機庫(位於包org.apache.hadcop.yarn.state中)。YARN對外提供了一個狀態機工廠StatemachineFactory,它提供多種addTransition方法供用戶添加各類狀態轉移,一旦狀態機添加完畢後,可經過調用installTopology完成一個狀態機的構建
 
我天天會寫文章記錄大數據技術學習之路,另外我本身整理了些大數據的學習資料,目前所有放在個人公衆號"SmallBird技術分享",加入咱們一塊兒學習交流,而且回覆'分享'會有大數據資源驚喜等着你~
相關文章
相關標籤/搜索