Storm源碼分析--Nimbus啓動過程

Nimbus server, 首先從啓動命令開始, 一樣是使用storm命令"storm nimbus」來啓動
看下源碼, 此處和上面client不一樣, jvmtype="-server", 最終調用"backtype.storm.daemon.nimbus"的main
nimbus是用clojure實現的, 可是clojure是基於JVM的, 因此在最終發佈的時候會產生nimbus.class,
因此在用戶使用的時候徹底能夠不知道clojure, 看上去全部都是Java. clojure只是用於提升開發效率而已.java

1. Nimbus啓動過程

bin/stormnode

def nimbus(klass="backtype.storm.daemon.nimbus"):
    """Syntax: [storm nimbus]
    Launches the nimbus daemon. This command should be run under 
    supervision with a tool like daemontools or monit. 
    See Setting up a Storm cluster for more information.
    """
    cppaths = [CLUSTER_CONF_DIR]
    jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
        "-Dlogfile.name=nimbus.log",
        "-Dlogback.configurationFile=" + STORM_DIR + "/logback/cluster.xml",
    ]
    exec_storm_class(klass, jvmtype="-server", extrajars=cppaths, jvmopts=jvmopts)

storm-core/backtype/storm/daemon/nimbus.cljpython

;; 啓動nimbus的主方法
(defn -main []                      ;; main前面加上-, 表示是public的. 因此bin/storm能直接調用nimbus.clj的main方法
  (-launch (standalone-nimbus)))        ;; 一樣launch也是一個public方法. standalone-nimbus是一個方法, clojure對於沒有參數的方法能夠省略()

(defn -launch [nimbus]              ;; launch的參數是一個Nimbus對象, 因此上面standalone-nimbus方法的返回值是Nimbus
  (launch-server! (read-storm-config) nimbus))

注意在clojure中的函數命名規範,-functionname表示該函數是public的,如上面的-main,調用該函數的時候,不須要加-,使用main便可。
而與此相對的是defn-,這個表示該函數是私有函數,不可在外部調用。c++

1) standalone-nimbus

nimbus的main, 最終會調到launch-server!, conf參數是調用read-storm-config讀出的配置參數,
而nimbus是INimbus接口(backtype.storm.scheduler.INimbus)的實現, 能夠參考standalone-nimbus.
storm-core/backtype/storm/scheduler/INimbus.javajson

public interface INimbus {
    void prepare(Map stormConf, String schedulerLocalDir);
    /**Returns all slots that are available for the next round of scheduling.在下一次調度中可用的槽位
     * A slot is available for scheduling 若是槽位是空閒的且能夠被分配的, 或者雖然被使用但能夠被從新分配的. 都是能夠被調度的
     * if it is free and can be assigned to, or if it is used and can be reassigned. */
    Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments);

    // this is called after the assignment is changed in ZK
    void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId);

    // map from node id to supervisor details
    String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId);    
    IScheduler getForcedScheduler(); 
}
;; 返回一個實現了INimbus接口的對象. 因爲不想建立這種類型, 使用reify匿名對象
(defn standalone-nimbus []                  ;; 沒有參數. clojure中[]使用的地方有: let綁定, 方法的參數, vector
  (reify INimbus                                ;; reify: 具體化匿名數據類型: 須要一個實現了某一協議/接口的對象,可是不想建立一個命名的數據類型. 匿名類
    ;; 下面的方式都是INimbus接口的實現方法
    (prepare [this conf local-dir])                 ;; this能夠看作是一個隱式參數, prepare方法實際只有2個參數的
    (allSlotsAvailableForScheduling [this supervisors topologies topologies-missing-assignments]
      (->> supervisors
           (mapcat (fn [^SupervisorDetails s]
                     (for [p (.getMeta s)]
                       (WorkerSlot. (.getId s) p))))
           set ))
    (assignSlots [this topology slots])
    (getForcedScheduler [this] nil )
    (getHostName [this supervisors node-id]
      (if-let [^SupervisorDetails supervisor (get supervisors node-id)]
        (.getHost supervisor)))
    ))

這裏面有好幾個語法點: ->>, mapcat if-let
mapcat, (mapcat f & colls) 和普通map不一樣的是, 會對map執行的結果執行concat操做等於(concat (map f &colls))
依次對colls中的每一個集合運用函數f, 最後將每一個結果合併起來. (mapcat f collections)的map不是數據結構意義的映射. 而是一個遍歷操做.
普通的map版本是: (map f collection), 用java來描述就是for(Object o : collection) func(o). 集合中的每一個元素會做爲函數f的參數.
上面的(mapcat (fn [s] ...))並無看到collections. 這個要結合->> supervisors來一塊兒分析.
->> supervisors (mapcat fun) 實際上等價於(mapcat fun supervisors). 因爲mapcat的返回值是map,根據接口的定義返回值是一個集合Collection
因此(mapcat)表達式後面的set的意思是將(mapcat)表達式的返回值轉換爲set, (mapcat)表達式的返回值會跟在set後面做爲最後一個Item.
達到連續調用的功能. ->>和->的區別是->是將返回值做爲下一個表達式的第二個Item, 而->>是做爲下一個表達式的最後一個Item.bootstrap

supervisors不是Supervisor列表, 其類型是SupervisorDetails. mapcat後面緊跟的函數的參數類型對應的是collections=supervisors的類型.
WorkerSlot須要兩個參數id和port. 因此這個方法返回的是Collection, 對應接口INimbus的返回類型.服務器

getHostName的參數supervisors和allSlotsAvailableForScheduling的supervisors是同樣的.
經過supervisors.get(node-id)獲取對應的supervisor. 因此咱們能夠猜想supervisors是一個Map.
storm-core/backtype/storm/scheduler/SupervisorDetails.java網絡

public class SupervisorDetails {
    String id;
    String host;                // hostname of this supervisor
    Object meta;
    Object schedulerMeta;       // meta data configured for this supervisor
    Set<Integer> allPorts;      // all the ports of the supervisor  
}

Nimbus要分配任務給Supervisor上的Worker進行工做, 而每一個Supervisor會有多個worker. 配置文件中能夠爲一個supervisor配置多個slot port.數據結構

2) read-storm-config

閱讀源碼其實都會遵循一個範式,那就是程序的入口在哪,配置文件是在何時讀入的。那麼好,如今就來說配置參數的讀入,在上面的-launch函數中,
已經能夠見到用以讀取配置文件的函數了,那就是read-storm-config。很是狗血的是, 在 nimbus.clj 中有一個名稱很是相似的函數稱爲read-storm-conf,這個可不是來讀取storm cluster的配置信息,它實際上是用來讀取Topology的配置內容的。read-storm-config定義於config.clj中,此時你會說等等,沒見到有地方
import或是use backtype.storm.config啊。這一切都被包裝了,它們通通被放到bootstrap.clj中了。注意到這行沒 (bootstrap)
好了, 上述有關文件引用的疑問解決以後, 仍是回到正題, 看看read-storm-config的定義吧。storm默認的配置文件使用的是yaml格式,必定要找到使用yaml parser的地方。
storm-core/backtype/storm/config.cljjvm

(defn read-storm-config []
  (let [conf (clojurify-structure (Utils/readStormConfig))] ;; let中參數conf被賦值爲右側的表達式的值. 和java方法參數不一樣, let中參數能夠被計算
    (validate-configs-with-schemas conf)                ;; 對conf進行驗證
    conf))                                      ;; 返回這個conf

真正實現對配置文件storm.yaml進行讀取的是由java代碼來實現的,readStormConfig定義於Utils.java中。
storm-core/backtype/storm/utils/Utils.java

public static Map readDefaultConfig() {
        return findAndReadConfigFile("defaults.yaml", true);
    }
    public static Map readStormConfig() {
        Map ret = readDefaultConfig();              // 首先讀取defaults.yaml的配置
        String confFile = System.getProperty("storm.conf.file");
        Map storm;
        if (confFile==null || confFile.equals("")) {
            storm = findAndReadConfigFile("storm.yaml", false); 
        } else {
            storm = findAndReadConfigFile(confFile, true);
        }
        ret.putAll(storm);                          // 其次讀取storm.yaml中的配置
        ret.putAll(readCommandLineOpts());          // 最後是命令行的參數, 這個優先級更高
        return ret;
    }
    public static Map findAndReadConfigFile(String name, boolean mustExist) {
            HashSet<URL> resources = new HashSet<URL>(findResources(name));
            URL resource = resources.iterator().next();
            Yaml yaml = new Yaml();
            Map ret = (Map) yaml.load(new InputStreamReader(resource.openStream()));
            if(ret==null) ret = new HashMap();
            return new HashMap(ret);                // 解析storm.yaml文件, 返回HashMap
    }
    public static List<URL> findResources(String name) {
            Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name);
            List<URL> ret = new ArrayList<URL>();
            while(resources.hasMoreElements()) {
                ret.add(resources.nextElement());   
            }
            return ret;
    }

終於看到神祕的Yaml了,那麼Yaml這個類又是由誰提供的呢,看看Utils.java的 開頭部分有這麼一句話: import org.yaml.snakeyaml.Yaml;
再看看在storm-core/project.clj中定義的dependencies: [org.yaml/snakeyaml "1.11"]
至此,yaml文件的解析及其依賴關係的解決探索完畢。在新版本的storm中使用了maven管理. 能夠查看pom.xml

3) storm.yaml

conf/storm.yaml

# storm.zookeeper.servers:
#     - "server1"
#     - "server2"
# 
# nimbus.host: "nimbus"
# 
# ##### These may optionally be filled in:
#    
## List of custom serializations
# topology.kryo.register:
#     - org.mycompany.MyType
#     - org.mycompany.MyType2: org.mycompany.MyType2Serializer
#
## List of custom kryo decorators
# topology.kryo.decorators:
#     - org.mycompany.MyDecorator
#
## Locations of the drpc servers
# drpc.servers:
#     - "server1"
#     - "server2"

## Metrics Consumers
# topology.metrics.consumer.register:
#   - class: "backtype.storm.metrics.LoggingMetricsConsumer"
#     parallelism.hint: 1
#   - class: "org.mycompany.MyMetricsConsumer"
#     parallelism.hint: 1
#     argument:
#       - endpoint: "metrics-collector.mycompany.org"
storm.zookeeper.servers:
   - 127.0.0.1
storm.zookeeper.port: 2181
nimbus.host: "127.0.0.1"
storm.local.dir: "/home/hadoop/data/storm"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

在配置文件中須要至少回答如下三個問題
1. zookeeper server在哪臺機器上運行,具體就來講就是ip地址啦
2. nimbus在哪運行,能夠填寫ip地址或域名
3. 在每臺supervisor運行的機器上能夠啓幾個slot,指定這些slot監聽的端 口號
2. thrift RPC
1) thrift
網絡結點之間的消息交互通常會牽涉到兩個基本的問題,
• 消息通道的創建
• 消息的編解碼
若是每變化一個需求就手工來重寫一次,一是繁瑣,二是易錯。爲了一勞永逸的解決此類問題,神同樣的工具就出現了,如google protolbuffer,如thrift.
thrift的使用步驟以下

編寫後綴名爲thrift的文件,使用工具生成對應語言的源碼,thrift支持的語言不少的,什麼c,c++,java,python等,通通不是問題。
實現thrift client
實現thrift server
thrift server須要實現thrift文件中定義的service接口。更爲具體的信息能夠經過閱讀官方文檔來得到。這裏有個thrift java的示例.

(1). 編寫thrift文件:add.thrift

namespace java com.zqh.code.thrift.server   // defines the namespace
typedef i32 int                         // typedefs to get convenient names for your types
service AdditionService {               // defines the service to add two numbers
        int add(1:int n1, 2:int n2),            // defines a method
}

(2). 編譯:thrift --gen java add.thrift 會在當前目錄生成gen-java/$namespace$/AdditionService
(3). Service:Interface的實現類

public class AdditionServiceHandler implements AdditionService.Iface {
    public int add(int n1, int n2) throws TException { return n1 + n2; }
}

實現類具體實現了thrift文件定義的接口方法.
(4). Server

public class MyServer {
    public static void start(AdditionService.Processor<AdditionServiceHandler> processor) {
            TServerTransport serverTransport = new TServerSocket(9090);     // 服務端Socket
            TServer server = new TSimpleServer(new Args(serverTransport).processor(processor));
            System.out.println("Starting the simple server...");
            server.serve();
    }
    public static void main(String[] args) {
        start(new AdditionService.Processor<AdditionServiceHandler>(new AdditionServiceHandler()));
    }
}

服務端經過TServerSocket暴露出服務端口, 客戶端要經過這個端口鏈接.
實現類Handler的實例要做爲生成的AdditionService.Processor的參數.
Args須要TServerTransport做爲參數, 而後調用processor方法, 該方法須要AdditionServiceProcessor參數.
這個過程相似於將自定義實現類Handler註冊到服務端上. 接着啓動服務器.
(5). Client

public class AdditionClient {
    public static void main(String[] args) {
            TTransport transport = new TSocket("localhost", 9090);
            transport.open();
            TProtocol protocol = new TBinaryProtocol(transport);
            AdditionService.Client client = new AdditionService.Client(protocol);
            System.out.println(client.add(100, 200));
            transport.close();
    }
}

客戶端要創建到服務端的鏈接, 須要提供Server的host和port. 根據TTransport構造出和服務端進行通信的一個協議.
這個協議傳給自動生成的AdditionService的Client內部類, 會生成一個相似服務端的代理對象.
接着就可使用這個代理對象調用thrift協議提供的方法.

分佈式測試: 能夠在兩臺機器上測試. 第一二步都須要在兩臺機器上操做: 編寫thrift文件, 編譯.
而後在第一臺機器操做3: 自定義實現類; 4: Server. 在第二臺機器上操做5: Client. 最後分別運行兩臺機器的Server和Client.

2) nimbus thrift server

有了thrift這個背景,咱們再從新拾起上述的代碼執行路徑。上頭講到程序執行至

(defn -launch [nimbus]                                  ;; launch的參數是一個Nimbus對象, 因此上面standalone-nimbus方法的返回值是Nimbus
  (launch-server! (read-storm-config) nimbus))

(defn launch-server! [conf nimbus]                      ;; 讓nimbus做爲一個thrift server運行起來
  (validate-distributed-mode! conf)                     ;; 分佈式模式下才會啓動thrift server
  (let [service-handler (service-handler conf nimbus)           ;; 自定義實現類, 實現storm.thrift中service Nimbus定義的接口方法
        options (-> (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT))) ;; 服務端的ServerSocket
                    (THsHaServer$Args.)             ;; TServerSocket做爲TServer.Args內部類的參數. 建立了Args args對象 ->表示插入第二個位置
                    (.workerThreads 64)                 ;; 上面new Args(TServerSocket)會做爲這裏的第二個位置, 即args.workerThreads(64)
                    (.protocolFactory (TBinaryProtocol$Factory. false true (conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)))
                    (.processor (Nimbus$Processor. service-handler))    ;; args做爲這裏的第二個位置,即調用了args.processor
                                                    ;; new Nimbus.Processor(service-handler), 自定義實現類做爲Nimbus.Processor的參數,
                                                    ;; processor會做爲參數再傳給args.processor()
                    )                               ;; 最終返回的是TServer.AbstractServerArgs, 會做爲TServer構造函數的參數
       server (THsHaServer. (do (set! (. options maxReadBufferBytes)(conf NIMBUS-THRIFT-MAX-BUFFER-SIZE)) options))]
    (.addShutdownHook (Runtime/getRuntime) (Thread. (fn [] (.shutdown service-handler) (.stop server))))
    (log-message "Starting Nimbus server...")               ;; 上面添加了一個關閉鉤子. 相似回調函數. 當關閉Nimbus的thrift服務時, 會觸發這個函數執行
    (.serve server)))                               ;; 啓動TServer, 即啓動Nimbus的thrift服務

launch-server!說白了,就是讓nimbus做爲一個thrift server運行起來, 那麼storm.thrift中service指定的各個接口函數實如今service-handler中完成。
對比clojure版本的建立thrift server的過程, 其實和上面java示例是同樣的, 只不過換了不一樣的實現類. 如下是java-clojure的代碼對比.
new AdditionServiceHandler() (service-handler conf nimbus)
new AdditionService.Processor(new AdditionServiceHandler()) (Nimbus$Processor. service-handler)
TServerTransport serverTransport = new TServerSocket(9090); (TNonblockingServerSocket. (int (conf NIMBUS-THRIFT-PORT)))
new Args(serverTransport) -> (TNonblockingServerSocket...) (THsHaServer$Args.)
new Args(serverTransport).processor(processor) -> (TNonblockingServerSocket...) (THsHaServer$Args.) (.processor (Nimbus$Processor. ..))
TServer server = new TSimpleServer(new Args(serverTransport).processor(processor)); server (THsHaServer… options)
server.serve(); (.serve server)

service-handler但是一個你們夥。對比一下 service-handler能夠發現,在storm.thrift中的定義的Nimbus服務,
其接口在 service-handler中一一得以實現。 如下是storm.thrift中關於service Nimbus的聲明。
storm-core/storm.thrift

namespace java backtype.storm.generated
service Nimbus {
  void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite);
  void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws ...;
  void killTopology(1: string name) throws (1: NotAliveException e);
  void killTopologyWithOpts(1: string name, 2: KillOptions options) throws (1: NotAliveException e);
  void activate(1: string name) throws (1: NotAliveException e);
  void deactivate(1: string name) throws (1: NotAliveException e);
  void rebalance(1: string name, 2: RebalanceOptions options) throws (1: NotAliveException e, 2: InvalidTopologyException ite);

  // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
  string beginFileUpload();
  void uploadChunk(1: string location, 2: binary chunk);
  void finishFileUpload(1: string location);
  string beginFileDownload(1: string file);
  binary downloadChunk(1: string id);       //can stop downloading chunks when receive 0-length byte array back

  string getNimbusConf();                   // returns json
  ClusterSummary getClusterInfo();          // stats functions  
  TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e);
  string getTopologyConf(1: string id) throws (1: NotAliveException e); //returns json
  StormTopology getTopology(1: string id) throws (1: NotAliveException e);
  StormTopology getUserTopology(1: string id) throws (1: NotAliveException e);
}

這個文件還指定了其餘一些struct結構的數據類型, 好比StormTopology, TopologySummary, ClusterSummary, TopologyInfo等.
編譯storm.thrift文件生成的代碼在namespace指定的位置: backtype.storm.generated
storm-core/genthrift.sh

thrift7 --gen java:beans,hashcode,nocamel --gen py:utf8strings storm.thrift

如今來回顧下storm的thrift RPC的總體流程.
1. 編寫 storm.thrift
2. 編譯 genthrift.sh, 會在backtype.storm.generated生成Nimbus.java接口類. 其中含有內部類Iface(Service), Processor(Server), Client(Client)
3. Service服務類: nimbus.clj中的service-handler方法的返回值. 其應該實現Nimbus.Iface接口. 因此service-handler使用reify Nimbus$Iface
4. Server服務端: launch-server!中建立thrift的TServer, 並啓動. 使用了Nimbus.Processor, 傳入service-handler自定義服務實現類
5. Client客戶端: StormSubmitter中localNimbus!=null時, 使用NimbusClient即Nimbus.Client調用RPC定義的接口方法

注意: 對於本地模式, 在StormSubmitter中直接使用Nimbus.Iface localNimbus對象. 這個對象的實現類應該就是service-handler.
對於分佈式模式, StormSubmitter做爲客戶端, 會經過client調用RPC定義的接口方法. 即storm.thrift中定義的方法. 因此service-handler要實現這些方法!
storm thrift rpc

相關文章
相關標籤/搜索