storm jar storm-starter.jar storm.starter.WordCountTopology
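This storm command is implemented in Python. Its jar function is very simple: it calls exec_storm_class with jvmtype="-client".
exec_storm_class just assembles a java command line and runs it with os.execvp (or os.spawnvp when fork=True), which replaces the Python process with the JVM. Why write it in Python? For simplicity? So that the storm command can be invoked directly?
The klass here is the topology class, so the java command simply calls the main method of the topology class.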
bin/storm
def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False):
    global CONFFILE
    all_args = [
        "java", jvmtype, get_config_opts(),
        "-Dstorm.home=" + STORM_DIR,
        "-Djava.library.path=" + confvalue("java.library.path", extrajars),
        "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrajars),
    ] + jvmopts + [klass] + list(args)
    print "Running: " + " ".join(all_args)
    if fork:
        os.spawnvp(os.P_WAIT, "java", all_args)
    else:
        os.execvp("java", all_args) # replaces the current process and never returns

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, USER_CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        jvmopts=[' '.join(filter(None, [JAR_JVM_OPTS, "-Dstorm.jar=" + jarfile]))])
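To make the argument order concrete, here is a minimal Python sketch that only builds the argv list that `storm jar` would hand to os.execvp, without running anything. The function name build_storm_jar_command and the default paths are hypothetical placeholders; the classpath is simplified to the three extra entries passed by jar() (the real get_classpath also adds Storm's own jars), and the config and java.library.path options are left out.

```python
def build_storm_jar_command(jarfile, klass, args=(), storm_dir="/opt/storm",
                            user_conf_dir="/home/user/.storm", jar_jvm_opts=""):
    """Return the argv list that `storm jar` would exec (nothing is run here).

    Hypothetical simplification of jar() + exec_storm_class() in bin/storm.
    """
    extrajars = [jarfile, user_conf_dir, storm_dir + "/bin"]
    # jar() joins JAR_JVM_OPTS and -Dstorm.jar into ONE string;
    # filter(None, ...) drops JAR_JVM_OPTS when it is empty.
    jvmopts = [" ".join(filter(None, [jar_jvm_opts, "-Dstorm.jar=" + jarfile]))]
    return [
        "java", "-client",                 # jar() overrides the default "-server"
        "-Dstorm.home=" + storm_dir,
        "-cp", ":".join(extrajars),        # simplified classpath (assumption)
    ] + jvmopts + [klass] + list(args)


if __name__ == "__main__":
    argv = build_storm_jar_command("storm-starter.jar", "storm.starter.WordCountTopology")
    print("Running: " + " ".join(argv))
```

Because os.execvp replaces the Python process, the JVM started this way sees -Dstorm.jar as a system property, which is how StormSubmitter later finds the jar to upload.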
Running the storm jar command above, storm jar storm-starter.jar storm.starter.WordCountTopology, gives:
jarfile = storm-starter.jar, klass = storm.starter.WordCountTopology, -Dstorm.jar=storm-starter.jar
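Now let's look at what the main method of the WordCountTopology example actually does.
After defining the topology, if a topology name is passed as the first command-line argument (args[0]), main calls StormSubmitter.submitTopology(args[0], conf, builder.createTopology()) to submit it to the cluster. With no arguments, as in the command above, it takes the else branch and runs the topology in a LocalCluster (covered further down).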
storm-starter/storm/starter/WordCountTopology.java
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        // cluster mode: args[0] is the topology name
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        // local mode: run everything in-process for 10 seconds, then shut down
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}
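The if/else at the end of main is what decides between cluster mode and local mode. A small Python sketch of just that decision; plan_submission is a hypothetical name, and the setting names in the returned dict are plain labels, not Storm's real config keys.

```python
def plan_submission(args):
    """Mirror of the branch at the end of WordCountTopology.main (sketch).

    Returns (mode, topology_name, settings); setting names are only labels.
    """
    if args:                                   # args != null && args.length > 0
        return ("cluster", args[0], {"numWorkers": 3})          # StormSubmitter.submitTopology
    return ("local", "word-count", {"maxTaskParallelism": 3})   # LocalCluster, sleep, shutdown


if __name__ == "__main__":
    print(plan_submission([]))            # the `storm jar ...` command at the top passes no args
    print(plan_submission(["my-topology"]))
```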
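RPC has a client side and a server side, and the client holds a reference to the server's interface. Calling a method on that reference ends up calling the same-named method of the server's concrete implementation of the interface.
So the client code only needs a Nimbus.Iface reference, which amounts to holding the interface of the Nimbus server; it then just calls methods on that interface. For a remote cluster this reference is the Thrift-generated Nimbus.Client wrapped by NimbusClient (Nimbus.Client implements Nimbus.Iface); StormSubmitter also keeps a Nimbus.Iface localNimbus field that is used in local mode.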
storm-core/backtype/storm/utils/NimbusClient.java (Note: the .java files live under storm-core/src/jvm)
public class NimbusClient extends ThriftClient {
    private Nimbus.Client _client;

    public static NimbusClient getConfiguredClient(Map conf) {
        try {
            String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
            int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
            return new NimbusClient(conf, nimbusHost, nimbusPort);
        } catch (TTransportException ex) {
            throw new RuntimeException(ex); // the constructors below declare TTransportException
        }
    }

    public NimbusClient(Map conf, String host, int port) throws TTransportException {
        this(conf, host, port, null);
    }

    public NimbusClient(Map conf, String host, int port, Integer timeout) throws TTransportException {
        super(conf, host, port, timeout);
        _client = new Nimbus.Client(_protocol);
    }

    public Nimbus.Client getClient() {
        return _client;
    }
}
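The client holds a NimbusClient: once it knows the host and Thrift port of the Nimbus server (Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT in the configuration), it can call Nimbus methods through RPC.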
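Locating Nimbus is just two map lookups on the merged configuration. The sketch below models the built-in defaults and storm.yaml as plain dicts; the key strings are assumed to be the values of Config.NIMBUS_HOST and Config.NIMBUS_THRIFT_PORT, and the default values ("localhost", 6627) are assumptions as well. read_storm_config and configured_endpoint are hypothetical names.

```python
# Assumed built-in defaults (what I believe defaults.yaml ships for these keys).
DEFAULTS = {"nimbus.host": "localhost", "nimbus.thrift.port": 6627}


def read_storm_config(storm_yaml):
    """Sketch of Utils.readStormConfig: defaults first, storm.yaml entries win."""
    conf = dict(DEFAULTS)
    conf.update(storm_yaml)
    return conf


def configured_endpoint(conf):
    """The (host, port) pair getConfiguredClient reads before opening the connection."""
    host = conf["nimbus.host"]                 # Config.NIMBUS_HOST
    port = conf["nimbus.thrift.port"]          # Config.NIMBUS_THRIFT_PORT
    # Roughly like Utils.getInt: only integral numbers are accepted here.
    if isinstance(port, bool) or not isinstance(port, int):
        raise ValueError("cannot convert %r to int" % (port,))
    return host, port
```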
2) Submit Jar
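StormSubmitter is essentially a Thrift client and Nimbus is a Thrift server, so every operation is carried out through Thrift RPC.
It first calls topologyNameExists, which uses the Thrift client to fetch the topologies currently running and checks the name against them. It then submits the jar, uploading it to the server in the following three steps: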
client.getClient().beginFileUpload();
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
client.getClient().finishFileUpload(uploadLocation);
The data is simply sent over RPC; how it is stored is up to Nimbus's own logic.
storm-core/backtype/storm/StormSubmitter.java
private static String submittedJar = null;

private static void submitJar(Map conf) {
    if(submittedJar==null) {
        LOG.info("Jar not uploaded to master yet. Submitting jar...");
        String localJar = System.getProperty("storm.jar"); // the jar from `storm jar xxx.jar`, set by bin/storm via -Dstorm.jar=
        submittedJar = submitJar(conf, localJar);
    }
}

public static String submitJar(Map conf, String localJar) {
    NimbusClient client = NimbusClient.getConfiguredClient(conf);
    try {
        String uploadLocation = client.getClient().beginFileUpload();
        LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
        BufferFileInputStream is = new BufferFileInputStream(localJar);
        while(true) {
            byte[] toSubmit = is.read();
            if(toSubmit.length==0) break;
            client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
        }
        client.getClient().finishFileUpload(uploadLocation);
        LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
        return uploadLocation;
    } catch(Exception e) {
        throw new RuntimeException(e); // I/O and Thrift exceptions are checked in Java
    } finally {
        client.close();
    }
}
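The three-step upload and the once-only caching in submittedJar can be sketched in Python against an in-memory fake server. FakeNimbusUploads, Submitter, the location naming and the default chunk size are all hypothetical; only the begin/upload/finish sequence and the "stop on an empty chunk" loop mirror the Java above.

```python
import io


class FakeNimbusUploads:
    """Hypothetical in-memory server side of the three upload RPCs."""

    def __init__(self):
        self._files = {}
        self.chunks_received = 0

    def begin_file_upload(self):
        location = "upload-%d.jar" % (len(self._files) + 1)   # hypothetical naming
        self._files[location] = bytearray()
        return location

    def upload_chunk(self, location, chunk):
        self._files[location] += chunk
        self.chunks_received += 1

    def finish_file_upload(self, location):
        self._files[location] = bytes(self._files[location])   # freeze the file

    def stored(self, location):
        return self._files[location]


def submit_jar(nimbus, stream, chunk_size=64 * 1024):
    """begin, upload chunks until an empty read, finish; returns the location."""
    location = nimbus.begin_file_upload()
    while True:
        chunk = stream.read(chunk_size)
        if len(chunk) == 0:                  # same stop condition as toSubmit.length==0
            break
        nimbus.upload_chunk(location, chunk)
    nimbus.finish_file_upload(location)
    return location


class Submitter:
    """Caches the uploaded location like the static submittedJar field."""

    def __init__(self, nimbus):
        self.nimbus = nimbus
        self.submitted_jar = None

    def submit_jar_once(self, open_jar):
        if self.submitted_jar is None:       # if(submittedJar==null)
            with open_jar() as stream:
                self.submitted_jar = submit_jar(self.nimbus, stream)
        return self.submitted_jar


if __name__ == "__main__":
    nimbus = FakeNimbusUploads()
    loc = submit_jar(nimbus, io.BytesIO(b"0123456789"), chunk_size=4)
    print(loc, nimbus.chunks_received, nimbus.stored(loc))
```

Caching the location means that submitting several topologies from the same JVM uploads the jar only once.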
3) Submit Topology: just a simple RPC call
storm-core/backtype/storm/StormSubmitter.java
private static Nimbus.Iface localNimbus = null; // plays the RPC server role in local mode

/**
 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
 *
 * @param name the name of the storm. (the topology name, i.e. the first command-line argument)
 * @param stormConf the topology-specific configuration. See {@link Config}. (Config extends HashMap)
 * @param topology the processing to execute.
 * @param opts to manipulate the starting of the topology
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts)
        throws AlreadyAliveException, InvalidTopologyException {
    if(!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);            // copy of the topology's Config
    stormConf.putAll(Utils.readCommandLineOpts()); // command-line -Dstorm.options=..., see get_config_opts() in bin/storm
    Map conf = Utils.readStormConfig();            // storm.yaml
    conf.putAll(stormConf);                        // settings in stormConf override those in storm.yaml
    try {
        String serConf = JSONValue.toJSONString(stormConf); // serialize the Map to JSON for sending to the server
        if(localNimbus!=null) {
            // local mode (nimbus, supervisors, ... all run in-process)
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if(topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf);
            try {
                LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
                if(opts!=null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " + name);
    } catch(TException e) {
        throw new RuntimeException(e); // Thrift RPC methods declare TException
    }
}
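Two maps come out of this merge, and only one of them is sent: stormConf (the topology Config plus -Dstorm.options) is serialized to JSON for Nimbus, while conf (storm.yaml overridden by stormConf) stays on the client and is used there, for example to locate Nimbus. A Python sketch with dicts standing in for the Java Maps; prepare_confs and the sample keys in the usage are hypothetical.

```python
import json


def prepare_confs(topology_conf, command_line_opts, storm_yaml):
    """Sketch of the merge order in StormSubmitter.submitTopology.

    Returns (conf, ser_conf): conf stays on the client, ser_conf goes to Nimbus.
    """
    storm_conf = dict(topology_conf)        # stormConf = new HashMap(stormConf)
    storm_conf.update(command_line_opts)    # stormConf.putAll(Utils.readCommandLineOpts())
    conf = dict(storm_yaml)                 # Map conf = Utils.readStormConfig()
    conf.update(storm_conf)                 # conf.putAll(stormConf)
    ser_conf = json.dumps(storm_conf)       # JSONValue.toJSONString(stormConf)
    return conf, ser_conf
```

The order of the two update calls is the precedence: command-line options beat the topology Config, and both beat storm.yaml on the client side.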
In local mode (the else branch of main), the topology is submitted to a LocalCluster instead:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
First, the ILocalCluster interface, which LocalCluster implements:
storm-core/backtype/storm/ILocalCluster.java
public interface ILocalCluster {
    void submitTopology(String topologyName, Map conf, StormTopology topology);
    void submitTopologyWithOpts(String topologyName, Map conf, StormTopology topology, SubmitOptions submitOpts);
    void killTopology(String topologyName) throws NotAliveException;
    void killTopologyWithOpts(String name, KillOptions options) throws NotAliveException;
    void activate(String topologyName) throws NotAliveException;
    void deactivate(String topologyName) throws NotAliveException;
    void rebalance(String name, RebalanceOptions options) throws NotAliveException;
    void shutdown();
    String getTopologyConf(String id);
    StormTopology getTopology(String id);
    ClusterSummary getClusterInfo();
    TopologyInfo getTopologyInfo(String id);
    Map getState();
}
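It covers the operations on a cluster: submitting a topology, killing a topology, activating, deactivating, rebalancing, and shutting the cluster down. The cluster information it exposes includes the configuration, the topology object, the cluster summary, topology info, and state.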
1) init
storm-core/backtype/storm/LocalCluster.clj (Note: the .clj files live under storm-core/src/clj)
(ns backtype.storm.LocalCluster
  (:use [backtype.storm testing config]) ;; uses backtype/storm/testing.clj and config.clj
  (:import [java.util Map])
  (:gen-class
   :init init
   :implements [backtype.storm.ILocalCluster] ;; the interface is ILocalCluster
   :constructors {[] [] [java.util.Map] []}
   :state state)) ;; the state element returned by init() is stored in the state field
:gen-class generates a Java class when the namespace is compiled ahead of time (AOT). http://clojuredocs.org/clojure_core/clojure.core/gen-class
:init name If supplied, names a function that will be called with the arguments to the constructor. Must return [ [superclass-constructor-args] state]
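The arguments passed to the constructor are handed to init, and init is called by the constructor; as the name suggests, it performs initialization. It returns a vector whose first element is the list of arguments for the superclass constructor and whose second element is the state.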
:constructors {[param-types] [super-param-types], ...} By default, constructors are created for the generated class which match the signature(s) of the constructors for the superclass. This parameter may be used to explicitly specify constructors, each entry providing a mapping from a constructor signature to a superclass constructor signature. When you supply this, you must supply an :init specifier. In other words, each entry (there can be several) has the form {[subclass param types] [superclass param types]}.
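LocalCluster implements the ILocalCluster interface but extends no class other than Object, whose constructor takes no arguments, so super-param-types = [].
As noted above, init's parameters are the constructor's parameters, and init accepts either [] or [Map]. Hence {[] [] [Map] []}: the first and third vectors are LocalCluster's constructor signatures, and the second and fourth are the corresponding superclass signatures, both [].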
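:state name If supplied, a public final instance field with the given name will be created. You must supply an :init function in order to provide a value for the state. That is, when you use :state you must provide an init function, and init supplies the value of the state field. Only then can the (final) state field be used later on.
If init() receives a Map argument, it uses that Map directly as the state. With no argument, it calls mk-local-storm-cluster, which returns a Map. In Java this would be two overloaded methods with different parameter lists.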
(defn -init
  ([] ;; case 1: no arguments, like init()
   (let [ret (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})]
     [[] ret] ;; must return [ [superclass-constructor-args] state ]; the superclass args are [], so the state field declared by :state state is set to ret
     ))
  ([^Map stateMap] ;; case 2: one Map argument; think of it as an overloaded constructor, like init(Map)
   [[] stateMap]))
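Python has no gen-class, but the constructor protocol can be imitated to show what :init and :state do: the constructor calls init, passes the first element of the result to the superclass constructor, and stores the second element in a read-only state field. GenClassLike, LocalClusterSketch and mk_fake_cluster are hypothetical; mk_fake_cluster merely stands in for mk-local-storm-cluster.

```python
class GenClassLike:
    """Imitates a gen-class'd class with :init and :state (sketch)."""

    def __init__(self, *ctor_args):
        super_args, state = self._init(*ctor_args)   # :init returns [superclass-args state]
        super().__init__(*super_args)                # LocalCluster: [] goes to Object()
        object.__setattr__(self, "_state", state)    # :state becomes one final field

    @property
    def state(self):
        return self._state

    def __setattr__(self, name, value):
        raise AttributeError("fields of this object are final")


def mk_fake_cluster():
    """Hypothetical stand-in for mk-local-storm-cluster."""
    return {"nimbus": "in-process nimbus", "supervisors": [], "state": "zk cluster state"}


class LocalClusterSketch(GenClassLike):
    @staticmethod
    def _init(*args):
        if not args:                      # ([] ...)               like init()
            return [], mk_fake_cluster()
        (state_map,) = args               # ([^Map stateMap] ...)  like init(Map)
        return [], state_map

    def submit_topology(self, name):
        # (:nimbus (. this state)): state is the whole map, not its "state" entry
        return self.state["nimbus"], name
```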
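init calls mk-local-storm-cluster, which is defined with defnk in testing.clj. The difference between defnk and an ordinary defn is that
a defnk function accepts keyword arguments as k, v pairs, and in the function body each key can be used directly as a local with the same name (for example :supervisors becomes supervisors) to get its value.
Under the hood, it simply adds a hash map that holds these k, v pairs. [ Puzzling Clojure Syntax in Storm Analysis (http://www.xuebuyuan.com/418278.html) ]
So in (mk-local-storm-cluster key value), key = :daemon-conf and value = {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true}.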
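In Python terms, a defnk call behaves roughly like keyword arguments with defaults. The sketch makes the "extra hash map" explicit: the k/v pairs from the call are merged over the defaults, and the body would read its options from that map. defnk_opts is a hypothetical helper, not Storm's implementation (the odd-length check is my own addition), and keys are kept as keyword-like strings.

```python
def defnk_opts(defaults, *kvs):
    """Sketch of how a defnk body sees its options: k/v pairs merged over defaults."""
    if len(kvs) % 2 != 0:
        raise ValueError("expected key/value pairs")
    opts = dict(defaults)
    opts.update(zip(kvs[0::2], kvs[1::2]))
    return opts


# Defaults copied from the mk-local-storm-cluster parameter vector in testing.clj.
MK_LOCAL_STORM_CLUSTER_DEFAULTS = {
    ":supervisors": 2,
    ":ports-per-supervisor": 3,
    ":daemon-conf": {},
    ":inimbus": None,
    ":supervisor-slot-port-min": 1024,
}


if __name__ == "__main__":
    # (mk-local-storm-cluster :daemon-conf {TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})
    print(defnk_opts(MK_LOCAL_STORM_CLUSTER_DEFAULTS,
                     ":daemon-conf", {"TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS": True}))
```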
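This value is itself a map, since daemon-conf is the daemon configuration and can hold several settings. The implementation in testing.clj also supplies several defaults of its own: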
storm-core/backtype/storm/testing.clj
;; returns map containing cluster info
;; local dir is always overridden in maps
;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
;; if need to customize amt of ports more, can use add-supervisor calls afterwards
;; init in LocalCluster.clj sets :daemon-conf; if no value is given, :daemon-conf defaults to an empty map {}.
;; Other defaults are given here too: local mode has 2 supervisors (and, of course, a single nimbus),
;; and each supervisor gets 3 ports. The shared port counter starts at 1024, so the first supervisor
;; gets 1024, 1025, 1026 and the second 1027, 1028, 1029.
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
  (let [zk-tmp (local-temp-path)
        [zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp) ;; destructuring in let: the first return value is bound to zk-port, the second to zk-handle
        daemon-conf (merge (read-storm-config) ;; (merge map1 map2 ...): later maps win
                           {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
                            ZMQ-LINGER-MILLIS 0
                            TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS false
                            TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS 50}
                           daemon-conf ;; the :daemon-conf keyword argument of defnk (default {}), bound to the local daemon-conf
                           {STORM-CLUSTER-MODE "local"
                            STORM-ZOOKEEPER-PORT zk-port
                            STORM-ZOOKEEPER-SERVERS ["localhost"]})
        nimbus-tmp (local-temp-path)
        port-counter (mk-counter supervisor-slot-port-min) ;; a keyword parameter (:supervisor-slot-port-min) is used in the body as a plain symbol
        nimbus (nimbus/service-handler ;; the nimbus service handler, a server function
                (assoc daemon-conf STORM-LOCAL-DIR nimbus-tmp) ;; (assoc map key value)
                (if inimbus inimbus (nimbus/standalone-nimbus))) ;; if inimbus is nil, create a standalone nimbus
        context (mk-shared-context daemon-conf)
        cluster-map {:nimbus nimbus ;; a single nimbus
                     :port-counter port-counter
                     :daemon-conf daemon-conf
                     :supervisors (atom []) ;; initialized as an atom holding an empty vector
                     :state (mk-distributed-cluster-state daemon-conf) ;; a key named :state; not the same thing as the state field of LocalCluster.clj (see below)
                     :storm-cluster-state (mk-storm-cluster-state daemon-conf)
                     :tmp-dirs (atom [nimbus-tmp zk-tmp])
                     :zookeeper zk-handle ;; the in-process ZooKeeper handle
                     :shared-context context}
        supervisor-confs (if (sequential? supervisors) ;; lists and vectors are both sequential?, but only a list is a seq?
                           supervisors
                           (repeat supervisors {}))] ;; otherwise supervisors is a count: use that many empty config maps
    (doseq [sc supervisor-confs] ;; iterate, binding each element to sc (like a let binding)
      (add-supervisor cluster-map :ports ports-per-supervisor :conf sc)) ;; create a supervisor from each config and register it in cluster-map
    cluster-map ;; return value: the {:key value ...} map above
    ))
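clojure.core/merge lets later maps win, so the argument order above is the precedence order: storm.yaml, then the testing defaults, then the caller's :daemon-conf, then the three settings that always force local mode. That is why LocalCluster's TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true beats the false default. A Python sketch of the merge and of the supervisor-confs step; the string keys simply reuse the Clojure constant names and are hypothetical, not Storm's real config strings.

```python
def merge(*maps):
    """Like clojure.core/merge: on duplicate keys, the later map wins."""
    result = {}
    for m in maps:
        result.update(m)
    return result


def local_daemon_conf(storm_yaml, daemon_conf, zk_port):
    return merge(
        storm_yaml,                                          # (read-storm-config)
        {"TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS": True,   # testing defaults
         "ZMQ-LINGER-MILLIS": 0,
         "TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS": False,
         "TOPOLOGY-TRIDENT-BATCH-EMIT-INTERVAL-MILLIS": 50},
        daemon_conf,                                         # caller's :daemon-conf
        {"STORM-CLUSTER-MODE": "local",                      # always forced last
         "STORM-ZOOKEEPER-PORT": zk_port,
         "STORM-ZOOKEEPER-SERVERS": ["localhost"]},
    )


def supervisor_confs(supervisors):
    """A sequence is used as-is; a number n becomes n empty config maps."""
    if isinstance(supervisors, (list, tuple)):    # (sequential? supervisors)
        return list(supervisors)
    return [{} for _ in range(supervisors)]       # (repeat supervisors {})
```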
add-supervisor is also a defnk function.
storm-core/backtype/storm/testing.clj
(defnk add-supervisor [cluster-map :ports 2 :conf {} :id nil]
  (let [tmp-dir (local-temp-path)
        port-ids (if (sequential? ports) ports (doall (repeatedly ports (:port-counter cluster-map))))
        supervisor-conf (merge (:daemon-conf cluster-map)
                               conf
                               {STORM-LOCAL-DIR tmp-dir
                                SUPERVISOR-SLOTS-PORTS port-ids})
        id-fn (if id (fn [] id) supervisor/generate-supervisor-id) ;; how the supervisor ID (a UUID) is generated
        daemon (with-var-roots [supervisor/generate-supervisor-id id-fn]
                 (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)))]
    (swap! (:supervisors cluster-map) conj daemon) ;; update (swap!) the :supervisors value in cluster-map
    (swap! (:tmp-dirs cluster-map) conj tmp-dir)
    daemon ;; return value of the body: the supervisor object returned by mk-supervisor
    ))
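with-var-roots takes a bindings vector as its first argument and treats the rest (&) as the body; here the body creates the supervisor. supervisor/mk-supervisor, like nimbus/service-handler above, is a server function defined with defserverfn.
The body, (supervisor/mk-supervisor supervisor-conf (:shared-context cluster-map) (supervisor/standalone-supervisor)), passes three arguments and returns the supervisor. While it runs, supervisor/generate-supervisor-id is temporarily rebound to id-fn, so a fixed :id can be injected.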
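Because every add-supervisor call pulls ports from the same :port-counter, the two default supervisors end up with disjoint port ranges. A Python sketch of that allocation; mk_counter mimics what I understand Storm's mk-counter to do (each call returns the next integer, starting at the given value), the cluster-map is a plain dict with a list standing in for the atom, and the list stores supervisor configs instead of supervisor daemons.

```python
import itertools


def mk_counter(start_val=1):
    """Each call returns the next integer, starting at start_val."""
    numbers = itertools.count(start_val)
    return lambda: next(numbers)


def add_supervisor(cluster_map, ports=2, conf=None):
    """Sketch of the port/conf part of add-supervisor (no daemon is started)."""
    if isinstance(ports, (list, tuple)):          # (sequential? ports)
        port_ids = list(ports)
    else:                                         # (repeatedly ports port-counter)
        port_ids = [cluster_map["port-counter"]() for _ in range(ports)]
    supervisor_conf = dict(cluster_map["daemon-conf"])
    supervisor_conf.update(conf or {})
    supervisor_conf["SUPERVISOR-SLOTS-PORTS"] = port_ids
    cluster_map["supervisors"].append(supervisor_conf)   # (swap! ... conj ...)
    return supervisor_conf
```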
2) submitTopology
Back to the main line: LocalCluster.submitTopology as called from WordCountTopology.
storm-core/backtype/storm/LocalCluster.clj
(defn -submitTopology [this name conf topology] ;; the first argument is the LocalCluster instance itself
  ;; (:nimbus m) gets the value of :nimbus in map m. The state field holds the map ret built in init, which has a :nimbus key, so state is ret!
  (submit-local-topology (:nimbus (. this state)) ;; the Nimbus handler stored under :nimbus in ret
                         name conf topology)) ;; same arguments as StormSubmitter.submitTopology: topology name, config, StormTopology
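Earlier I assumed that the :state state declaration at the top referred to the value under :state in the Map returned by init() (that Map really does have a :state key).
But here the code reads (:nimbus (. this state)). (. this state) reads the instance field state, and (:nimbus ...) then looks up :nimbus in it; if the field held only ret's :state entry, there would be no :nimbus key to find. So the state field declared at the top holds the state part of init()'s return value, i.e. the whole ret map.
In Clojure a map literal is written {:key value}, and a keyword key is looked up with (:key map).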
storm-core/backtype/storm/testing.clj
(defn submit-local-topology [nimbus storm-name conf topology]
  (when-not (Utils/isValidConf conf) ;; validate the config, much the same flow as StormSubmitter.submitTopology
    (throw (IllegalArgumentException. "Topology conf is not json-serializable")))
  ;; this also calls nimbus's submitTopology; everything is local, so no jar upload is needed (the jar location is nil). The config is converted to JSON here too
  (.submitTopology nimbus storm-name nil (to-json conf) topology))
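The local path boils down to "validate, serialize, call nimbus directly". A Python sketch with a recording fake nimbus; is_valid_conf assumes that validity means the conf survives a JSON round trip unchanged (my reading of Utils.isValidConf, not a verified port), and ValueError stands in for IllegalArgumentException. RecordingNimbus and the function names are hypothetical.

```python
import json


def is_valid_conf(conf):
    """Assumption: 'valid' means the conf survives a JSON round trip unchanged."""
    try:
        return json.loads(json.dumps(conf)) == conf
    except (TypeError, ValueError):
        return False


def submit_local_topology(nimbus, storm_name, conf, topology):
    if not is_valid_conf(conf):
        raise ValueError("Topology conf is not json-serializable")
    # Local mode: nothing was uploaded, so the jar location is None (nil in Clojure).
    return nimbus.submit_topology(storm_name, None, json.dumps(conf), topology)


class RecordingNimbus:
    """Hypothetical fake that records what it was asked to run."""

    def __init__(self):
        self.calls = []

    def submit_topology(self, name, uploaded_jar_location, json_conf, topology):
        self.calls.append((name, uploaded_jar_location, json.loads(json_conf), topology))
```

Note that a tuple in the conf fails this check, because JSON turns it into a list and the round trip no longer compares equal.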
The to-json function in util.clj does the same thing as String serConf = JSONValue.toJSONString(stormConf); in StormSubmitter.submitTopology above:
storm-core/backtype/storm/util.clj
(defn to-json [obj]
  (JSONValue/toJSONString obj)) ;; calls the static method JSONValue.toJSONString