Storm Source Code Analysis -- Topology Submit Client

  1. Storm Client
    A topology is first launched with the storm command (bin/storm), like so:
    storm jar storm-starter.jar storm.starter.WordCountTopology

The storm command is implemented in Python. Look at its jar function: it is very simple, calling exec_storm_class with jvmtype="-client".
exec_storm_class just assembles a java command line and executes it (via os.execvp, or os.spawnvp when forking). Why write it in Python? Perhaps for simplicity, and so the storm command can be invoked directly.
Here klass is the topology class, so the java command merely calls the topology class's main function.
bin/storm

def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR, 
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args) # replaces the current process and never returns
def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]
    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=[' '.join(filter(None, [JAR_JVM_OPTS, "-Dstorm.jar=" + jarfile]))])

Running the storm jar command from above, storm jar storm-starter.jar storm.starter.WordCountTopology, gives:
jarfile = storm-starter.jar, klass = storm.starter.WordCountTopology, and -Dstorm.jar=storm-starter.jar
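
For illustration, the full command that exec_storm_class assembles looks roughly like this (classpath and paths elided; exact values depend on the installation):
java -client -Dstorm.options= -Dstorm.home=/path/to/storm -Djava.library.path=... -Dstorm.conf.file= -cp <storm jars, storm-starter.jar, ~/.storm> -Dstorm.jar=storm-starter.jar storm.starter.WordCountTopology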

Now look at what the WordCountTopology example's main function actually does.
Besides defining the topology, it eventually calls StormSubmitter.submitTopology(args[0], conf, builder.createTopology()) to submit the topology.
Note the branch below: the command above passes no extra arguments, so args is empty and the example runs in local mode; appending one more argument (used as args[0], the topology name) takes the distributed StormSubmitter branch.
storm-starter/storm/starter/WordCountTopology.java

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);
    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
      conf.setMaxTaskParallelism(3);
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
    }
  }
  2. StormSubmitter
    1) Configuration
    The command-line parameters are put into stormConf; the configuration from conf/storm.yaml is read into conf, and then stormConf is also put into conf, so the command-line parameters take higher priority.
    stormConf is converted to JSON, because this configuration has to be sent to the server.

RPC has a client side and a server side; the client holds a reference to the server's interface, and calling it invokes the same-named method of the interface's concrete implementation on the server.
So client code holding a Nimbus.Iface localNimbus reference effectively holds a handle to the Nimbus server and only needs to call the interface's methods.
storm-core/backtype/storm/utils/NimbusClient.java (note: .java files live under storm-core/src/jvm)

public class NimbusClient extends ThriftClient {
    private Nimbus.Client _client;
    public static NimbusClient getConfiguredClient(Map conf) {
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
    }
    public NimbusClient(Map conf, String host, int port) throws TTransportException {
        this(conf, host, port, null);
    }
    public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
        super(conf, host, port, timeout);
        _client = new Nimbus.Client(_protocol);
    }
    public Nimbus.Client getClient() { return _client; }
}

To construct a NimbusClient, the client needs the Nimbus host and Thrift port; with those it can invoke Nimbus's methods over RPC.
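
As a minimal sketch (not from the original source; exception handling elided), client code might use NimbusClient directly like this, assuming nimbus.host and nimbus.thrift.port are present in the config:

Map conf = Utils.readStormConfig();                                // defaults merged with storm.yaml
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
    ClusterSummary summary = client.getClient().getClusterInfo();  // one Thrift RPC to Nimbus
    System.out.println("cluster summary: " + summary);
} finally {
    client.close();                                                // releases the Thrift transport
}
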
2) Submit Jar
StormSubmitter is essentially a Thrift client, while Nimbus is the Thrift server, so everything is done via Thrift RPC.
It first checks topologyNameExists, fetching the status of the currently running topologies through the Thrift client; then it submits the jar through the three calls below:

    client.getClient().beginFileUpload(); 
    client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit)); 
    client.getClient().finishFileUpload(uploadLocation);

The data is just pushed across via RPC; how it is stored is Nimbus's own business...
storm-core/backtype/storm/StormSubmitter.java

private static String submittedJar = null;
    private static void submitJar(Map conf) {
        if(submittedJar==null) {
            LOG.info("Jar not uploaded to master yet. Submitting jar...");
            String localJar = System.getProperty("storm.jar");      // the jar from "storm jar xxx.jar", set via -Dstorm.jar
            submittedJar = submitJar(conf, localJar);
        }
    }
    public static String submitJar(Map conf, String localJar) {
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            String uploadLocation = client.getClient().beginFileUpload();
            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
            BufferFileInputStream is = new BufferFileInputStream(localJar);
            while(true) {
                byte[] toSubmit = is.read();
                if(toSubmit.length==0) break;
                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
            }
            client.getClient().finishFileUpload(uploadLocation);
            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
            return uploadLocation;
        } finally {client.close();}
    }

3) Submit Topology: just a simple RPC call
Note that only stormConf is serialized and sent to Nimbus; the merged conf (which also contains storm.yaml) is used locally, e.g. to locate Nimbus.
storm-core/backtype/storm/StormSubmitter.java

private static Nimbus.Iface localNimbus = null;         // in local mode this holds the server-side Nimbus interface directly
    /**
     * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
     * @param name the name of the storm, i.e. the first command-line argument
     * @param stormConf the topology-specific configuration. See {@link Config}, which extends HashMap.
     * @param topology the processing to execute.
     * @param opts options to manipulate the starting of the topology */
    public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) {
        if(!Utils.isValidConf(stormConf)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        stormConf = new HashMap(stormConf);                 // the topology's own Config settings
        stormConf.putAll(Utils.readCommandLineOpts());      // command-line opts from -Dstorm.options=, see get_config_opts() in bin/storm
        Map conf = Utils.readStormConfig();                 // storm.yaml
        conf.putAll(stormConf);                             // stormConf overrides the storm.yaml settings
        String serConf = JSONValue.toJSONString(stormConf); // serialize the Map to JSON, to be sent to the server
        if(localNimbus!=null) {                             // local mode (nimbus, supervisors, ... all run in-process)
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if(topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf);
            try {
                LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                if(opts!=null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {  // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } finally {client.close();}
        }
        LOG.info("Finished submitting topology: " +  name);
    }
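
For completeness, the opts parameter controls the topology's initial state. A hedged sketch, assuming this Storm version's Thrift-generated SubmitOptions and TopologyInitialStatus types:

SubmitOptions opts = new SubmitOptions(TopologyInitialStatus.INACTIVE);   // start paused rather than ACTIVE
StormSubmitter.submitTopology("word-count", conf, builder.createTopology(), opts);
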
  3. LocalCluster
    In local mode, all processes run on the same machine. LocalCluster is written not in Java but in LocalCluster.clj; compiling the Clojure code produces LocalCluster.class.
    storm-starter/storm/starter/WordCountTopology.java

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());

First look at the ILocalCluster interface, which LocalCluster implements.
storm-core/backtype/storm/ILocalCluster.java

public interface ILocalCluster {
    void submitTopology(String topologyName, Map conf, StormTopology topology);
    void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts);
    void killTopology(String topologyName) throws NotAliveException;
    void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException;
    void activate(String topologyName) throws NotAliveException;
    void deactivate(String topologyName) throws NotAliveException;
    void rebalance(String name, RebalanceOptions options) throws NotAliveException;
    void shutdown();
    String getTopologyConf(String id);
    StormTopology getTopology(String id);
    ClusterSummary getClusterInfo();
    TopologyInfo getTopologyInfo(String id);
    Map getState();
}

The interface covers the operations on the cluster: submit a topology, kill it, activate, deactivate, rebalance, and shut the cluster down. The retrievable cluster information includes: configuration, the topology object, a cluster summary, topology info, and state.
1) init
storm-core/backtype/storm/LocalCluster.clj (note: .clj files live under storm-core/src/clj)

(ns backtype.storm.LocalCluster
  (:use [backtype.storm testing config])        ;; uses backtype/storm/testing.clj and config.clj
  (:import [java.util Map])
  (:gen-class
   :init init
   :implements [backtype.storm.ILocalCluster]   ;; the interface is ILocalCluster
   :constructors {[] [] [java.util.Map] []}
   :state state ))                              ;; the return value of init() is assigned to state

:gen-class compiles the class ahead of time; see http://clojuredocs.org/clojure_core/clojure.core/gen-class
:init name If supplied, names a function that will be called with the arguments to the constructor. Must return [ [superclass-constructor-args] state]
So init's arguments become the constructor's arguments, and init is invoked by the constructor; as the name suggests, it performs initialization. Its return value is a vector whose first element holds the superclass constructor arguments.

:constructors {[param-types] [super-param-types], ...} By default, constructors are created for the generated class which match the signature(s) of the constructors for the superclass. This parameter may be used to explicitly specify constructors, each entry providing a mapping from a constructor signature to a superclass constructor signature. When you supply this, you must supply an :init specifier. Each entry (there may be several) has the form {[subclass-param-types] [superclass-param-types]}.
LocalCluster implements ILocalCluster: it has an interface but no superclass, so super-param-types = [].
As noted above, init's arguments become the constructor's arguments, and init accepts either [] or [Map]. Hence {[] [] [Map] []}: the first and third entries are LocalCluster's constructor signatures, while the second and fourth, the superclass signatures, are both [].

:state name If supplied, a public final instance field with the given name will be created. You must supply an :init function in order to provide a value for the state. In other words, using :state requires an init that supplies the state's value; only then can the (final) state field be used later on.
If init() receives a Map argument it returns it directly; with no arguments it calls mk-local-storm-cluster, which returns a Map. In Java this would be two overloaded methods with different parameter lists.

(defn -init
  ([]                ;; first case: no arguments, like init()
     (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
       [[] ret]      ;; the return value must be [ [superclass-constructor-args] state]; the superclass args are [], so the :state state declared above is assigned ret!
       ))
  ([^Map stateMap]   ;; second case: a [Map] argument, i.e. a constructor overload with a different arity, like init(Map)
     [[] stateMap]))
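
To make the :gen-class machinery concrete, here is a rough, hypothetical Java rendering of the generated class (illustrative only; the real class is emitted by the Clojure compiler):

public class LocalCluster implements ILocalCluster {
    public final Object state;                    // :state state (a public final field)

    public LocalCluster() {                       // from :constructors {[] []}
        this.state = makeClusterMap();            // stands in for (mk-local-storm-cluster ...); hypothetical helper name
    }
    public LocalCluster(java.util.Map stateMap) { // from :constructors {[java.util.Map] []}
        this.state = stateMap;                    // -init with a Map returns it unchanged
    }
    // the ILocalCluster methods delegate to -submitTopology, -shutdown, etc.
}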

init calls the defnk function mk-local-storm-cluster in testing.clj. defnk differs from a plain defn in that
key/value pairs can be passed as arguments, and each key can be used directly in the function body to obtain its value;
the implementation simply adds a hashmap to hold these k/v pairs. [See: Storm's puzzling Clojure syntax (http://www.xuebuyuan.com/418278.html)]
So in (mk-local-storm-cluster key value), key = :daemon-conf and value = {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}.
The value is itself a map, because daemon-conf holds the daemon configuration and may contain several entries.
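
Conceptually (an analogy only, not Storm code), a defnk function behaves like a Java method that merges the caller's keyword overrides over a map of defaults:

    // Analogy for defnk keyword arguments (hypothetical; the names mirror the Clojure below).
    static Map<String, Object> mkLocalStormCluster(Map<String, Object> kwargs) {
        Map<String, Object> opts = new HashMap<>();
        opts.put("supervisors", 2);                  // :supervisors 2
        opts.put("ports-per-supervisor", 3);         // :ports-per-supervisor 3
        opts.put("daemon-conf", new HashMap<>());    // :daemon-conf {}
        opts.putAll(kwargs);                         // the caller's k/v pairs override the defaults
        return opts;                                 // the body then reads values from opts by key
    }

The actual implementation in testing.clj supplies several more defaults: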

storm-core/backtype/storm/testing.clj

;; returns map containing cluster info
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
;; LocalCluster.clj's init sets :daemon-conf; when no value is given, :daemon-conf defaults to the empty map {}. Further defaults are given here:
;; e.g. local mode gets 2 supervisors (and, of course, a single nimbus), each with 3 ports; the lowest port number is 1024, the others are 1025 and 1026
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
  (let [zk-tmp (local-temp-path)
        [zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)   ;; destructuring in let: the first element of the return value binds to zk-port, the second to zk-handle
        daemon-conf (merge (read-storm-config)                   ;; merge map1 map2 ...
                           {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
                            ZMQ-LINGER-MILLIS 0
                            TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
                            TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50
                            }
                           daemon-conf                           ;; the :daemon-conf {} keyword argument of this defnk
                           {STORM-CLUSTER-MODE "local"
                            STORM-ZOOKEEPER-PORT zk-port
                            STORM-ZOOKEEPER-SERVERS ["localhost"]})
        nimbus-tmp (local-temp-path)
        port-counter (mk-counter supervisor-slot-port-min)       ;; defnk keyword parameters are referenced by name in the body
        nimbus (nimbus/service-handler                           ;; the nimbus service handler, a server-side (defserverfn) function
                (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp)   ;; assoc map key value
                (if inimbus inimbus (nimbus/standalone-nimbus))) ;; if inimbus is nil, create a standalone nimbus; the last expression is the return value
        context (mk-shared-context daemon-conf)
        cluster-map {:nimbus nimbus                              ;; a single nimbus
                     :port-counter port-counter
                     :daemon-conf daemon-conf
                     :supervisors (atom [])                      ;; initialized as an atom wrapping an empty vector
                     :state (mk-distributed-cluster-state daemon-conf)   ;; at first glance this looks like LocalCluster.clj's :state value, but see the correction under -submitTopology below
                     :storm-cluster-state (mk-storm-cluster-state daemon-conf)
                     :tmp-dirs (atom [nimbus-tmp zk-tmp])
                     :zookeeper zk-handle                        ;; the zookeeper handle
                     :shared-context context}
        supervisor-confs (if (sequential? supervisors)           ;; both lists and vectors are sequential?, while only lists are seqs
                           supervisors
                           (repeat supervisors {}))]             ;; if not a sequence, supervisors is a count: repeat the empty conf map that many times
    (doseq [sc supervisor-confs]                                 ;; iterate over the supervisor configs
      (add-supervisor cluster-map :ports ports-per-supervisor :conf sc))  ;; create each supervisor from its config and register it in cluster-map
    cluster-map                                                  ;; the return value: the {:key value ...} map above
    ))

add-supervisor is again a defnk function.
storm-core/backtype/storm/testing.clj

(defnk add-supervisor [cluster-map :ports 2 :conf {} :id nil]
  (let [tmp-dir (local-temp-path)
        port-ids (if (sequential? ports) ports (doall (repeatedly ports (:port-counter cluster-map))))
        supervisor-conf (merge (:daemon-conf cluster-map)
                               conf
                               {STORM-LOCAL-DIR tmp-dir
                                SUPERVISOR-SLOTS-PORTS port-ids})
        id-fn (if id (fn [] id) supervisor/generate-supervisor-id)   ;; how the supervisor's UUID is generated
        daemon (with-var-roots [supervisor/generate-supervisor-id id-fn] (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))]
    (swap! (:supervisors cluster-map) conj daemon)               ;; swap! updates the :supervisors atom in cluster-map, conj-ing the new daemon
    (swap! (:tmp-dirs cluster-map) conj tmp-dir)
    daemon                                                       ;; daemon, the body's return value, is the supervisor created by mk-supervisor
    ))

with-var-roots takes the bindings as its first argument and treats the rest (&) as the body; it temporarily rebinds the var supervisor/generate-supervisor-id to id-fn while the body runs, and the supervisor is created inside that body. Like nimbus/service-handler above, mk-supervisor is a defserverfn server function.
body: (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)) takes three arguments and finally returns the supervisor.

2) submitTopology
Back on the main line of WordCountTopology: LocalCluster.submitTopology.
storm-core/backtype/storm/LocalCluster.clj

(defn -submitTopology [this name conf topology]    ;; the first parameter is the LocalCluster instance itself
  ;; (:nimbus map) looks up :nimbus in a map. We know init returns a map containing the key :nimbus, so state is that returned map (ret)!
  (submit-local-topology (:nimbus (. this state))  ;; the Nimbus object stored under :nimbus in ret, the map returned by init
                      name conf topology))         ;; same parameters as StormSubmitter.submitTopology: topology name, conf, StormTopology

Earlier one might assume that the :state state declared at the top holds the value of the :state key inside the map returned by init() (the map does contain such a key).
But here we have (:nimbus (. this state)): this.state reads the instance field state, and if that held some plain value the (:key map) lookup would fail. So the :state field holds the entire return value of init()!
In Clojure a HashMap is written map = {:key value}, and a key is looked up with value = (:key map).

storm-core/backtype/storm/testing.clj

(defn submit-local-topology [nimbus storm-name conf topology]
  (when-not (Utils/isValidConf conf)                    ;; validate the conf; roughly the same flow as the StormSubmitter.submitTopology version
    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
  ;; likewise calls nimbus's submitTopology; since everything is local there is no jar to upload. The conf is converted to JSON here as well
  (.submitTopology nimbus storm-name nil (to-json conf) topology))

Clojure's to-json does exactly what String serConf = JSONValue.toJSONString(stormConf) did earlier in StormSubmitter.submitTopology:
storm-core/backtype/storm/util.clj

(defn to-json [obj]
  (JSONValue/toJSONString obj))                 ;; calls JSONValue's static method toJSONString