GeneralTopologyContext records the basic information of the topology: the StormTopology and the StormConf, together with what has already been derived from them, namely the task/component mappings, each component's streams, and input/output information:
```java
public class GeneralTopologyContext implements JSONAware {
    private StormTopology _topology;
    private Map<Integer, String> _taskToComponent;
    private Map<String, List<Integer>> _componentToTasks;
    private Map<String, Map<String, Fields>> _componentToStreamToFields; // ComponentCommon.streams, map<string, StreamInfo>
    private String _stormId;   // topology id
    protected Map _stormConf;
}
```
StormTopology is what the worker reads from stormcode.ser on disk:
```thrift
struct StormTopology {
  // ids must be unique across maps
  // #workers to use is in conf
  1: required map<string, SpoutSpec> spouts;
  2: required map<string, Bolt> bolts;
  3: required map<string, StateSpoutSpec> state_spouts;
}
```
StormConf is what the worker reads from stormconf.ser on disk.
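As a side note, here is a minimal sketch of reading those two files back, assuming (as in the backtype.storm era) both are written with plain Java serialization via Utils.serialize, and that commons-io is on the classpath; the directory argument and class name are hypothetical:

```java
import java.io.File;
import java.util.Map;

import org.apache.commons.io.FileUtils;

import backtype.storm.generated.StormTopology;
import backtype.storm.utils.Utils;

public class ReadTopologyFiles {
    public static void main(String[] args) throws Exception {
        // hypothetical code dir, e.g. <storm-local>/supervisor/stormdist/<storm-id>
        File codeDir = new File(args[0]);

        // stormcode.ser holds the serialized StormTopology
        StormTopology topology = (StormTopology) Utils.deserialize(
                FileUtils.readFileToByteArray(new File(codeDir, "stormcode.ser")));

        // stormconf.ser holds the serialized topology configuration map
        Map stormConf = (Map) Utils.deserialize(
                FileUtils.readFileToByteArray(new File(codeDir, "stormconf.ser")));

        System.out.println(topology.get_spouts().keySet());
        System.out.println(stormConf.keySet());
    }
}
```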
taskToComponent and componentToTasks capture the mapping between tasks and components, in both directions.
componentToStreamToFields records which streams each component declares, and which fields each stream contains.
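To make the first two maps concrete, a tiny sketch with hypothetical values: componentToTasks is simply the inverse of taskToComponent, grouping task ids by component.

```java
import java.util.*;

public class ComponentTaskMaps {
    public static void main(String[] args) {
        // hypothetical assignment: a 2-task spout and a 2-task bolt
        Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
        taskToComponent.put(1, "spout");
        taskToComponent.put(2, "spout");
        taskToComponent.put(3, "bolt");
        taskToComponent.put(4, "bolt");

        // invert: group task ids by their component
        Map<String, List<Integer>> componentToTasks = new HashMap<String, List<Integer>>();
        for (Map.Entry<Integer, String> e : taskToComponent.entrySet()) {
            List<Integer> tasks = componentToTasks.get(e.getValue());
            if (tasks == null) {
                tasks = new ArrayList<Integer>();
                componentToTasks.put(e.getValue(), tasks);
            }
            tasks.add(e.getKey());
        }
        System.out.println(componentToTasks); // {bolt=[3, 4], spout=[1, 2]}
    }
}
```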
Methods

Besides the obvious accessors, there are the following methods for obtaining a component's inputs and outputs:
```java
/**
 * Gets the declared inputs to the specified component.
 *
 * @return A map from subscribed component/stream to the grouping subscribed with.
 */
public Map<GlobalStreamId, Grouping> getSources(String componentId) {
    return getComponentCommon(componentId).get_inputs(); // ComponentCommon.inputs, map<GlobalStreamId, Grouping>
}
```
```java
/**
 * Gets information about who is consuming the outputs of the specified component,
 * and how.
 *
 * @return Map from stream id to component id to the Grouping used.
 */
public Map<String, Map<String, Grouping>> getTargets(String componentId) {
    Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
    for (String otherComponentId : getComponentIds()) {  // for every component id
        // fetch that component's inputs
        Map<GlobalStreamId, Grouping> inputs = getComponentCommon(otherComponentId).get_inputs();
        for (GlobalStreamId id : inputs.keySet()) {  // for each stream id among the inputs
            // is this stream's source component the one we were asked about?
            if (id.get_componentId().equals(componentId)) {
                Map<String, Grouping> curr = ret.get(id.get_streamId());
                if (curr == null) curr = new HashMap<String, Grouping>();
                curr.put(otherComponentId, inputs.get(id));
                ret.put(id.get_streamId(), curr);
            }
        }
    }
    return ret; // [stream-id, [target-component-id, grouping]]
}
```
The getComponentCommon and getComponentIds used here come from the ThriftTopologyUtils class.
Don't be misled: they do not fetch information from nimbus via the thrift API; they only read from the StormTopology object, whose class happens to be generated by thrift.
Thrift-generated classes carry a metaDataMap, so the implementation looks like this:
```java
public static Set<String> getComponentIds(StormTopology topology) {
    Set<String> ret = new HashSet<String>();
    for (StormTopology._Fields f : StormTopology.metaDataMap.keySet()) {
        Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f);
        ret.addAll(componentMap.keySet());
    }
    return ret;
}
```
Looking at the definition of ComponentCommon in storm.thrift (below), the two functions above become easy to understand.
The implementation of getTargets deserves a closer look, because it derives outputs from inputs.
ComponentCommon only records each output stream's id and fields; it has no way of knowing which component that stream is sent to.
For inputs, however, the key is of type GlobalStreamId, and a GlobalStreamId contains not only the stream id but also the componentid of the source component.
So the relationship can be inverted: whenever some component's input stream names the current component as its source, that component must be one of the current component's targets.
```thrift
struct ComponentCommon {
  1: required map<GlobalStreamId, Grouping> inputs;
  2: required map<string, StreamInfo> streams; // key is stream id, outputs
  3: optional i32 parallelism_hint; // how many threads across the cluster should be dedicated to this component
  4: optional string json_conf;
}

struct SpoutSpec {
  1: required ComponentObject spout_object;
  2: required ComponentCommon common;
  // can force a spout to be non-distributed by overriding the component configuration
  // and setting TOPOLOGY_MAX_TASK_PARALLELISM to 1
}

struct Bolt {
  1: required ComponentObject bolt_object;
  2: required ComponentCommon common;
}
```
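To see the inversion concretely, here is a minimal, hedged sketch. It assumes GeneralTopologyContext's constructor is public and takes exactly the fields shown at the top of this post, and it uses the TestWordSpout/TestWordCounter components that ship with Storm for testing; the class name TargetsDemo is made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.testing.TestWordCounter;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TargetsDemo {
    public static void main(String[] args) {
        // "bolt" subscribes to "spout"'s default stream with shuffle grouping
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new TestWordSpout());
        builder.setBolt("bolt", new TestWordCounter()).shuffleGrouping("spout");
        StormTopology topology = builder.createTopology();

        // dummy task assignment, just enough to build a context
        Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
        taskToComponent.put(1, "spout");
        taskToComponent.put(2, "bolt");
        Map<String, List<Integer>> componentToTasks = new HashMap<String, List<Integer>>();
        componentToTasks.put("spout", Arrays.asList(1));
        componentToTasks.put("bolt", Arrays.asList(2));

        GeneralTopologyContext ctx = new GeneralTopologyContext(
                topology, new Config(), taskToComponent, componentToTasks,
                new HashMap<String, Map<String, Fields>>(), "demo-topology-1");

        // inputs of "bolt": {GlobalStreamId(spout, default) -> shuffle}
        Map<GlobalStreamId, Grouping> sources = ctx.getSources("bolt");
        System.out.println(sources);

        // targets of "spout", derived by scanning every component's inputs:
        // {default -> {bolt -> shuffle}}
        System.out.println(ctx.getTargets("spout"));
    }
}
```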
WorkerTopologyContext adds some worker-related information:
```java
public class WorkerTopologyContext extends GeneralTopologyContext {
    public static final String SHARED_EXECUTOR = "executor";

    private Integer _workerPort;        // port of the worker process
    private List<Integer> _workerTasks; // task ids contained in this worker
    private String _codeDir;            // code directory on the supervisor, stormdist/<storm-id>
    private String _pidDir;             // directory recording the pids of the worker's running processes (possibly several), <worker-id>/pids
    Map<String, Object> _userResources;
    Map<String, Object> _defaultResources;
}
```
As the javadoc below says, a TopologyContext is handed to bolts and spouts as the argument of their prepare and open methods, respectively.
That is what _openOrPrepareWasCalled is for: it records whether this TopologyContext has already been passed to a prepare/open call.
registerMetric can be used to register metrics into _registeredMetrics; the registered structure is [timeBucketSizeInSecs, [taskId, [name, metric]]].
_hooks is used to register task hooks. (A usage sketch for both follows the class definition below.)
```java
/**
 * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
 * methods, respectively. This object provides information about the component's
 * place within the topology, such as task ids, inputs and outputs, etc.
 *
 * <p>The TopologyContext is also used to declare ISubscribedState objects to
 * synchronize state with StateSpouts this object is subscribed to.</p>
 */
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
    private Integer _taskId;
    private Map<String, Object> _taskData = new HashMap<String, Object>();
    private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
    private Map<String, Object> _executorData;
    private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;
    private clojure.lang.Atom _openOrPrepareWasCalled;

    public TopologyContext(StormTopology topology, Map stormConf,
            Map<Integer, String> taskToComponent,
            Map<String, List<Integer>> componentToSortedTasks,
            Map<String, Map<String, Fields>> componentToStreamToFields,
            String stormId, String codeDir, String pidDir, Integer taskId,
            Integer workerPort, List<Integer> workerTasks,
            Map<String, Object> defaultResources,
            Map<String, Object> userResources,
            Map<String, Object> executorData, Map registeredMetrics,
            clojure.lang.Atom openOrPrepareWasCalled) {
        super(topology, stormConf, taskToComponent, componentToSortedTasks,
              componentToStreamToFields, stormId, codeDir, pidDir, workerPort,
              workerTasks, defaultResources, userResources);
        _taskId = taskId;
        _executorData = executorData;
        _registeredMetrics = registeredMetrics;
        _openOrPrepareWasCalled = openOrPrepareWasCalled;
    }
    // ...
}
```
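Here is the promised usage sketch: a hypothetical bolt (all names in its body are made up) that registers a CountMetric into _registeredMetrics with a 60-second time bucket and adds a task hook into _hooks from its prepare method. registerMetric, addTaskHook, CountMetric, and BaseTaskHook are the public Storm APIs used.

```java
import java.util.Map;

import backtype.storm.hooks.BaseTaskHook;
import backtype.storm.hooks.info.EmitInfo;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class InstrumentedBolt extends BaseRichBolt {
    private transient CountMetric tupleCounter;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // registered under [60, [taskId, ["tuple-count", metric]]] in _registeredMetrics
        tupleCounter = context.registerMetric("tuple-count", new CountMetric(), 60);
        // appended to _hooks; called back on task events such as emit
        context.addTaskHook(new BaseTaskHook() {
            @Override
            public void emit(EmitInfo info) {
                System.out.println("emitted to tasks: " + info.outTasks);
            }
        });
    }

    @Override
    public void execute(Tuple input) {
        tupleCounter.incr();
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
```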
mk-task-data creates the TopologyContext for each task:
```clojure
user-context (user-topology-context (:worker executor-data) executor-data task-id)
```

```clojure
(defn user-topology-context [worker executor-data tid]
  ((mk-topology-context-builder
    worker
    executor-data
    (:topology worker))
   tid))

(defn mk-topology-context-builder [worker executor-data topology]
  (let [conf (:conf worker)]
    #(TopologyContext.
      topology
      (:storm-conf worker)
      (:task->component worker)
      (:component->sorted-tasks worker)
      (:component->stream->fields worker)
      (:storm-id worker)
      (supervisor-storm-resources-path
        (supervisor-stormdist-root conf (:storm-id worker)))
      (worker-pids-root conf (:worker-id worker))
      (int %)
      (:port worker)
      (:task-ids worker)
      (:default-shared-resources worker)
      (:user-shared-resources worker)
      (:shared-executor-data executor-data)
      (:interval->task->metric-registry executor-data)
      (:open-or-prepare-was-called? executor-data))))
```