寫在前面的話,筆者第一次閱讀框架源碼,因此可能有些地方理解錯誤或者沒有詳細解釋,若是在閱讀過程發現錯誤很歡迎在文章下面評論指出。文章後續會陸續更新,能夠關注或者收藏,轉發請先私信我,謝謝。對了,筆者看的是2.2.1這個版本緩存
JStorm是一個分佈式的實時計算引擎,是阿里巴巴根據storm的流處理模型進行重寫的一個框架,支持相同的邏輯模型(也就是拓撲結構),而後底層的實現卻大有不一樣。不過本文並非打算對兩個框架進行比較,接下來我會從源碼的角度上來解析JStorm是如何工做的。
做爲第一個篇章,筆者先來介紹下nimbus以及它啓動的時候作了什麼。JStorm的主節點上運行着nimbus的守護進程,這個進程主要負責與ZK通訊,分發代碼,給集羣中的從節點分配任務,監視集羣狀態等等。此外nimbus須要維護的全部狀態都會存儲在ZK中,JStorm爲了減小對ZK的訪問次數作了一些緩存,這個後續代碼分析會說到。以上是nimbus功能的簡介,接下來咱們從源碼的角度看看Nimbus到底作了什麼。首先在Nimbus啓動的時候:框架
//設置主線程因爲未捕獲異常而忽然停止時調用的默認程序 Thread.setDefaultUncaughtExceptionHandler(new DefaultUncaughtExceptionHandler()); //加載集羣的配置信息 Map config = Utils.readStormConfig(); //這下面這個方法內部註釋掉了,筆者暫時沒有太在乎,後續再補充 JStormServerUtils.startTaobaoJvmMonitor(); //建立一個NimbusServer實例 NimbusServer instance = new NimbusServer(); //建立一個默認的nimbus啓動類 INimbus iNimbus = new DefaultInimbus(); //開始進行實際的初始化 instance.launchServer(config, iNimbus);
其實在DefaultUncaughtExceptionHandler
中也並無太多的處理操做,簡單判斷是不是內存溢出,而後正常關閉,不然就是異常直接拋出而後中斷。讀取配置的過程就不詳細講解了。NimbusServer
這個類主要封裝了一些用於操做Nimbus的成員變量和方法,Nimbus的啓動操做基本都是定義在這個類內的(上述代碼就是這個類中的main方法所定義的)。
最重要的方法是launchServer
,接下來就詳細的解說這個方法的做用,首先來看下launchServer
這個方法內部的代碼:tcp
private void launchServer(final Map conf, INimbus inimbus) { LOG.info("Begin to start nimbus with conf " + conf); try { //判斷配置模式是否正確 StormConfig.validate_distributed_mode(conf); createPid(conf); //設置退出時的操做 initShutdownHook(); //這個方法在默認實現中沒有任何操做 inimbus.prepare(conf, StormConfig.masterInimbus(conf)); //建立NimbusData對象 data = createNimbusData(conf, inimbus); //這個方法主要負責處理當nimbus線程稱爲leader線程以後的操做 initFollowerThread(conf); int port = ConfigExtension.getNimbusDeamonHttpserverPort(conf); hs = new Httpserver(port, conf); hs.start(); //若是集羣是運行在yarn上,也須要作一些初始化操做。 initContainerHBThread(conf); serviceHandler = new ServiceHandler(data); //thrift是一個分佈式的RPC框架 initThrift(conf); } catch (Throwable e) { if (e instanceof OutOfMemoryError) { LOG.error("Halting due to Out Of Memory Error..."); } LOG.error("Fail to run nimbus ", e); } finally { cleanup(); } LOG.info("Quit nimbus"); }
只是判斷配置信息中的一個字段名爲「storm.cluster.mode」是不是「distributed」,本地模式下是「local」。分佈式
initShutdownHook
添加退出的時候一些操做,包括設置參數提醒集羣要退出,清除nimbus存儲下的一些工做線程(負責處理通訊,分發代碼,心跳的一系列守護線程),關閉打開的各類資源等。ide
createNimbusData
這個方法用於建立一個NimbusData
的對象,這個對象封裝了Nimbus與ZK通訊的一些成員變量。下面會在每一個方法內部逐漸講到NimbusData
的一些成員變量以及他們的做用。首先來看看NimbusData
的構造方法。函數
public NimbusData(final Map conf, INimbus inimbus) throws Exception { this.conf = conf; //兩個方法分別處理打開的文件流和blob傳輸流 createFileHandler(); mkBlobCacheMap(); this.nimbusHostPortInfo = NimbusInfo.fromConf(conf); this.blobStore = BlobStoreUtils.getNimbusBlobStore(conf, nimbusHostPortInfo); this.isLaunchedCleaner = false; this.isLaunchedMonitor = false; this.submittedCount = new AtomicInteger(0); this.stormClusterState = Cluster.mk_storm_cluster_state(conf); createCache(); this.taskHeartbeatsCache = new ConcurrentHashMap<String, Map<Integer, TkHbCacheTime>>(); //建立一個調度線程池,默認大小爲12 this.scheduExec = Executors.newScheduledThreadPool(SCHEDULE_THREAD_NUM); this.statusTransition = new StatusTransition(this); this.startTime = TimeUtils.current_time_secs(); this.inimubs = inimbus; localMode = StormConfig.local_mode(conf); this.metricCache = new JStormMetricCache(conf, this.stormClusterState); this.clusterName = ConfigExtension.getClusterName(conf); pendingSubmitTopologies = new TimeCacheMap<String, Object>(JStormUtils.MIN_10); topologyTaskTimeout = new ConcurrentHashMap<String, Integer>(); tasksHeartbeat = new ConcurrentHashMap<String, TopologyTaskHbInfo>(); this.metricsReporter = new JStormMetricsReporter(this); this.metricRunnable = ClusterMetricsRunnable.mkInstance(this); String configUpdateHandlerClass = ConfigExtension.getNimbusConfigUpdateHandlerClass(conf); this.configUpdateHandler = (ConfigUpdateHandler) Utils.newInstance(configUpdateHandlerClass); if (conf.containsKey(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN)) { String string = (String) conf.get(Config.NIMBUS_TOPOLOGY_ACTION_NOTIFIER_PLUGIN); nimbusNotify = (ITopologyActionNotifierPlugin) Utils.newInstance(string); } else { nimbusNotify = null; } }
3.1 createFileHandler:在這方法內部,實現了一個匿名的內部類ExpiredCallback
,在其內部實現了一個方法叫expire
,利用回調的方式來關閉Channel
或者BufferFileInputStream
實例對象。ui
public void createFileHandler() { ExpiredCallback<Object, Object> expiredCallback = new ExpiredCallback<Object, Object>() { @Override public void expire(Object key, Object val) { try { LOG.info("Close file " + String.valueOf(key)); if (val != null) { if (val instanceof Channel) { Channel channel = (Channel) val; channel.close(); } else if (val instanceof BufferFileInputStream) { BufferFileInputStream is = (BufferFileInputStream) val; is.close(); } } } catch (IOException e) { LOG.error(e.getMessage(), e); } } }; //獲取超時時間 int file_copy_expiration_secs = JStormUtils.parseInt(conf.get(Config.NIMBUS_FILE_COPY_EXPIRATION_SECS), 30); uploaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback); downloaders = new TimeCacheMap<Object, Object>(file_copy_expiration_secs, expiredCallback); }
而後初始化NimbusData
的兩個成員變量uploaders
和downloaders
,這兩個分別維護須要上傳的通道和須要下載的通道。TimeCacheMap
這個類的主要實現邏輯是在其構造函數內部啓動一個守護線程。首先建立一個緩衝區,只要系統不關閉,則在守護線程內部不斷的緩衝區獲取對象,在對象不爲空的狀況下調用回調函數的expire方法,並執行相應的操做,這裏具體傳進來的expire
方法是關閉Channel
或者BufferFileInputStream
。
3.2. mkBlobCacheMap
:和上述的方法很是相似,也是申明一個匿名內部類,而後初始化幾個成員變量。代碼幾乎和上個方法同樣就不浪費拌麪去貼了。這裏expire方法中要關閉的是兩個流AtomicOutputStream
和BufferInputStream
,blobUploaders
和blobDownloaders
分別存放着上傳和下載所打開的流。blobListers
存放上傳和下載的數據。
3.3. 初始化幾個成員變量,包括NimbusInfo(包含了主機名,端口和標誌是不是leader),BlobStore(用來存儲blob數據的,使用鍵值存儲,阿里提供了兩個不一樣的blob存儲方式,一種是本地文件系統存儲,一種的hdfs存儲,兩種方式的區別在於,因爲本地文件存儲並不能保證一致性,因此須要ZK介入來保證,這是JStorm的默認配置。若是使用hdfs來存儲,則不須要ZK介入,由於hdfs能保證一致性和正確性),StormClusterState(存儲整個集羣的狀態,這個是從ZK上獲取的),爲了不屢次向ZK通訊,還須要設置緩存信息,任務的心跳信息等等。
3.4. 初始化好metrics相關的報告線程和監聽線程。this
initFollowerThread
4.1. 方法首先初始化一個回調函數,這是當一個nimbus成爲leader以後就會調用的一個用於初始化一系列變量的方法,包括拓撲如何在集羣上分配,拓撲狀態更新,清除函數,還有監控線程等。後續會有新的篇章來介紹這個init方法,這裏先放這個方法的源碼。線程
private void init(Map conf) throws Exception { data.init(); NimbusUtils.cleanupCorruptTopologies(data); //拓撲分配 initTopologyAssign(); //狀態更新 initTopologyStatus(); //清除函數 initCleaner(conf); initMetricRunnable(); if (!data.isLocalMode()) { initMonitor(conf); //mkRefreshConfThread(data); } }
4.2. 初始化一個Runnable的子類,在構造方法中,首先判斷集羣並非使用本地模式,而後更新ZK上的節點信息(將nimbus註冊到ZK上)。而後經過ZK獲取集羣的狀態信息,畢竟nimbus是須要維護整個集羣的。緊接着判斷是否存在leader,兩次都沒法選舉出leader以後,則將ZK上的nimbus信息刪除並退出。若是blobstore使用的是本地文件模式(有本文模式還有hdfs模式兩種)還須要添加一個回調函數,這個回調函數執行的操做是,當這個nimbus不是leader的時候,對blob進行同步。此外還須要將那些active的blob存到ZK中,而將死掉的進行清除(緣由前文3.3也說到過,本地模式存儲沒法保證一致性,因此須要ZK進行維護,而hdfs自帶容錯機制,能保證數據的一致性)。
4.3. 設置該線程爲守護線程,並啓動這個線程。run方法首先判斷當前保存在ZK上的集羣中是否有leader,若是沒有則選舉當前nimbus爲leader線程。若是有了leader線程,則須要判斷是否跟當前的nimbus相同,若是不相同則中止當前的nimbus,畢竟已經有leader存在了。若是是相同的,則須要判斷本地的狀態中,若是尚未設置爲leader,代表當前nimbus尚未進行初始化,則先設置nimbus爲leader而後回調函數進行初始化,也就是調用init(conf)
方法。
獲取一個端口(默認的端口是7621)用於構建HttpServer
實例對象。能夠用於處理和接受tcp鏈接,啓動一個新的線程進行httpserver的監聽。(主要做用或者說在哪裏用到尚且不明確)。code
initContainerHBThread
這個方法的主要做用是得知是否能在資源管理器(yarn)上運行jstorm集羣,若是能夠的話,則須要建立一個新的線程用於處理。(其實這裏使用容器的目的是能夠在一個物理集羣上運行多個不同的邏輯集羣甚至多個JStorm集羣,能動態調整邏輯集羣分到的資源,此外,資源管理器能提供很是強的可擴展性)。容器線程會被添加到NimbusServer
中,後續使用到的時候再詳細講解。這個容器線程也是守護線程,且立刻就會啓動,這個線程的run方法裏面包含兩個處理:
6.1. handleWriteDir
:這個方法的主要做用是清除掉容器上的過時心跳信息,準確的說,若是JStorm集羣容器目錄下的心跳信息大於10,則須要清除(從最老的開始)。
6.2. handlReadDir
:這裏主要是用於維護本地是否能接受到集羣上的hb信息,若是屢次超時則要拋出異常。
initThrift
thrift是JStorm使用的一個分佈式RPC框架。筆者後續再添加相應的源碼解析。