JStorm源碼分析系列--02--拓撲分配TopologyAssign

  寫在前面的話,筆者第一次閱讀框架源碼,因此可能有些地方理解錯誤或者沒有詳細解釋,若是在閱讀過程發現錯誤很歡迎在文章下面評論指出。文章後續會陸續更新,能夠關注或者收藏,轉發請先私信我,謝謝。對了,筆者看的是2.2.1這個版本。上一篇博客,JStorm源碼分析系列--01--Nimbus啓動分析筆者講解了Nimbus啓動過程當中作的一些基本的操做,在initFollowerThread方法中,若是當前的Nimbus變成Leader以後,這個方法內會負責執行一些初始化init操做。下面就來說講第一個初始化操做--拓撲分配。本文將詳細(很是長,因此慢慢看)的講解如何去爲一個拓撲分配相應的資源。
  從方法initTopologyAssign開始,TopologyAssign是一個單例對象,在這個類的init方法內,作了簡單的賦值操做以後,並初始化一個調度器實例對象以後,就創建一個守護線程,這個守護線程的目的是不斷從TopologyAssign內部維護的一個阻塞隊列中讀取系統提交的拓撲任務,並調用相應的方法doTopologyAssignment進行分配操做。代碼都比較簡單,就不浪費版面去貼了。
  下面是doTopologyAssignment方法的源碼,git

protected boolean doTopologyAssignment(TopologyAssignEvent event) {
        Assignment assignment;
        try {
            Assignment oldAssignment = null;
            boolean isReassign = event.isScratch();
            if (isReassign) {
                //若是存在舊的分配信息,須要先將舊的分配信息存儲下來
                oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
            }
            //調用方法執行新的分配
            assignment = mkAssignment(event);
            //將task添加到集羣的metrics中
            pushTaskStartEvent(oldAssignment, assignment, event);

            if (!isReassign) {
                //若是是新建的拓撲,須要把拓撲設置爲active狀態
                setTopologyStatus(event);
            }
        } catch (Throwable e) {
            LOG.error("Failed to assign topology " + event.getTopologyId(), e);
            event.fail(e.getMessage());
            return false;
        }

        if (assignment != null)
            //將拓撲備份到ZK上
            backupAssignment(assignment, event);
        event.done();
        return true;
    }

  因此,最重要的方法仍是mkAssignment,這裏執行了實際的分配操做。下面就來詳細的介紹這個方法。github

prepareTopologyAssign

  prepareTopologyAssign這個方法整體的目的爲了初始化拓撲分配的上下文信息,生成一個TopologyAssignContext的實例對象。這個上下文對象須要存下拓撲的不少關鍵信息,包括拓撲的組件信息(用StormTopology對象保存,下文在添加acker的時候會詳細介紹這個類),拓撲的配置信息,拓撲上全部的task id,以及死掉的task id,unstopped task id(這裏的解釋是,那些supervisor死掉可是worker還繼續運行的稱爲unstopworker,而包含在unstopworker內的task則稱爲unstoppedTask)。以及這個拓撲能分配到的worker,以上說起的這些信息都會在這個方法內慢慢的初始化。下面一步步來看吧。prepareTopologyAssign方法的源碼比較長,一部分一部分來說解。編程

//建立一個上下文的實例對象
TopologyAssignContext ret = new TopologyAssignContext();

String topologyId = event.getTopologyId();
ret.setTopologyId(topologyId);

int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
ret.setTopologyMasterTaskId(topoMasterId);
LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);

Map<Object, Object> nimbusConf = nimbusData.getConf();
//根據拓撲id從nimbus上讀取拓撲的配置信息
Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(topologyId, nimbusData.getBlobStore());
//這裏讀取拓撲中各個組件的一個結構,後續會講解這個類的組成
StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, nimbusData.getBlobStore());
ret.setRawTopology(rawTopology);
//設置一些配置信息
Map stormConf = new HashMap();
stormConf.putAll(nimbusConf);
stormConf.putAll(topologyConf);
ret.setStormConf(stormConf);

  緊接着,根據目前集羣的狀態,初始化一份集羣上全部的supervisor,並獲取全部可用的workersegmentfault

StormClusterState stormClusterState = nimbusData.getStormClusterState();

// get all running supervisor, don't need callback to watch supervisor
Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
// init all AvailableWorkerPorts
for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
     SupervisorInfo supervisor = supInfo.getValue();
     if (supervisor != null)
        //設置所有的端口都爲可用,後面經過HB去除掉那些已經被使用的worker
        //supervisor是一個k-v,k是supervisorid,v是保存實例信息
        supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts());
}
//這個方法就是利用HB去掉那些掛掉的supervisor
//判斷的方法是獲取每一個supervisor最近的HB時間,
//由當前時間減去最近HB時間和超時時間作對比。
getAliveSupervsByHb(supInfos, nimbusConf);

  接下來獲取拓撲中定義的taskid對應上組件,這裏要解釋下,對於一個拓撲而言,taskid老是從1開始分配的,而且,相同的組件taskid是相鄰的。好比你定義了一個SocketSpout(並行度5),一個PrintBolt(並行度4,那麼SocketSpout的taskid多是1-5,PrintBolt的taskid多是6-9。負載均衡

//這個k-v,k是taskid,v是拓撲內定義的組件的id。
//寫過應用的同窗都應該知道,TopologyBuilder在setSpout或者Bolt的時候,須要指定<組件id,對象,和並行度>。
//eg:builder.setSpout("integer", new ReceiverSpout(), 2);
Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
ret.setTaskToComponent(taskToComponent);

//獲取全部的taskid。
Set<Integer> allTaskIds = taskToComponent.keySet();
ret.setAllTaskIds(allTaskIds);

  若是原來存在舊的拓撲分配信息,還須要設置unstoppedTasks,deadTasks,unstoppedWorkers等信息。而後調用getFreeSlots方法負責去除那些已經分配出去的worker。處理過程比較直觀,獲取集羣上全部的拓撲分配信息,而後根據每一個分配信息中保存的worker信息,從原先supInfos中移除那些被分配出去的worker。
  若是沒有舊的分配信息,說明拓撲分配類型爲ASSIGN_TYPE_NEW。若是存在同名的拓撲,也會把同名的拓撲設置舊的分配信息,放到上下文中。若是存在舊的分配信息,須要把舊的分配信息放入到上下文中,此外還要判斷是ASSIGN_TYPE_REBALANCE仍是ASSIGN_TYPE_MONITOR,由於還須要設置unstoppedWorkers的信息。到這裏,預分配,建立拓撲分配上下文就完成了。目前咱們帶有比較重要的信息是拓撲全部的taskid,以及拓撲基本的組件信息。框架

集羣assignTasks

  在完成拓撲上下文初始化以後,開始實際給拓撲分配相應的worker,不過這裏須要判斷是本地模式仍是集羣模式,本地模式下比較簡單,找個一個合適的端口,而後新建一個worker的資源對象ResourceWorkerSlot,將一些關鍵信息如hostname,port,allTaskId配置好。由於local模式下比較簡單,因此,即便設置多個worker也不會啓動多個jvm。而在集羣模式下,一個worker表示的是一個jvm進程。下面就重點講解集羣下的分配狀況。我把集羣上的分配過程(assignTasks這個方法)分紅三個主要的部分,分別是資源準備,worker分配,task分配。dom

Set<ResourceWorkerSlot> assignments = null;
if (!StormConfig.local_mode(nimbusData.getConf())) {
    IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
    //集羣下的分配,見下文講解
    assignments = scheduler.assignTasks(context);
} else {
    assignments = mkLocalAssignment(context);
}

資源準備

  首先第一步是判斷拓撲分配的類型是否符合要求,不符合則拋出異常。緊接着,根據上一個方法生成的拓撲分配上下文來生成一個默認的拓撲分配上下文實例對象,DefaultTopologyAssignContext這個類的構造方法執行了不少很細節的操做。包括爲拓撲添加附加的組件,存儲下taskid和組件的對應信息,計算拓撲須要的worker數目,計算unstopworker的數目等。jvm

//根據以前的上下文,初始化一個分配的上下文對象
DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context);
if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
    freeUsed(defaultContext);
}

  下面代碼是DefaultTopologyAssignContext的構造方法ide

public DefaultTopologyAssignContext(TopologyAssignContext context){
    super(context);
    try {
        sysTopology = Common.system_topology(stormConf, rawTopology);
    } catch (Exception e) {
        throw new FailedAssignTopologyException("Failed to generate system topology");
    }

    sidToHostname = generateSidToHost();
    hostToSid = JStormUtils.reverse_map(sidToHostname);

    if (oldAssignment != null && oldAssignment.getWorkers() != null) {
        oldWorkers = oldAssignment.getWorkers();
    } else {
        oldWorkers = new HashSet<ResourceWorkerSlot>();
    }

    refineDeadTasks();

    componentTasks = JStormUtils.reverse_map(context.getTaskToComponent());

    for (Entry<String, List<Integer>> entry : componentTasks.entrySet()) {
    List<Integer> componentTaskList = entry.getValue();
    Collections.sort(componentTaskList);
}

    totalWorkerNum = computeWorkerNum();
    unstoppedWorkerNum = computeUnstoppedAssignments();
}
添加附加組件

  從上面的代碼能夠看出在DefaultTopologyAssignContext的構造方法中,第一句是調用父類構造方法先去初始化一些參數,而後調用system_topology這個方法。下面來看看這個方法的內部。第一個方法就是添加一個acker到原來的拓撲中去。拓撲做爲JStrom處理的一個邏輯模型,對用戶提供了很是簡單且強大的編程原語,只要分別繼承兩大組件,就能夠構造一個拓撲模型,可是實際上,一個實際運行的拓撲模型遠遠不止用戶定義的用於處理輸入的spout和用於處理業務的bolt,JStorm爲了保證消息的可靠性,拓撲Metrics管理,拓撲HB管理,再拓撲實際模型中添加了幾個很是重要的bolt,下面就詳細的介紹acker,用於保證消息的可靠性。函數

public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException {
    StormTopology ret = topology.deepCopy();
    add_acker(storm_conf, ret);
    addTopologyMaster(storm_conf, ret);
    add_metrics_component(ret);
    add_system_components(ret);
    return ret;
}
StormTopology

  這裏先來介紹下StormTopology這個類,才能往下理解。StormTopology這個類用於存儲拓撲的組件信息,在這個類內部,有三個很是重要的成員變量,分別存儲spout和bolt以及state_spout,第三個筆者暫時沒有弄清楚其做用,可是前兩個就很是明顯,分別存儲拓撲的兩大組件,spout和bolt

private Map<String,SpoutSpec> spouts; // required
  private Map<String,Bolt> bolts; // required
  private Map<String,StateSpoutSpec> state_spouts; // required

  Map中的key表示咱們定義的組件的id,上文提到過的id。SpoutSpec和Bolt中有兩個重要的成員變量。

private ComponentObject spout_object; // required
  private ComponentCommon common; // required

  ComponentObject用於存儲序列化後的代碼信息,第二個ComponentCommon用於存儲很重要的配置信息,包括輸入的流,輸出的流和分組信息。有三個重要的成員變量

//GlobalStreamId有兩個String成員變量,componentId表示這個輸入組件的流來源的那個組件id,
  //streamId表示componentId所輸出的特定的流
  private Map<GlobalStreamId,Grouping> inputs; // 輸入的來源和分組狀況
  //StreamInfo有個重要的成員變量List<String> output_fields,表示輸出的域。
  private Map<String,StreamInfo> streams; // 輸出的流
  private int parallelism_hint; // 並行度

  根據上述的結構,StormTopology可以完整的表示拓撲中每一個組件輸出以後的流所流向的位置。

acker

  這一小節筆者不打算先從源碼的角度入手,先來將一個acker的做用以及從一個小例子來說解acker是怎樣工做的。咱們都知道做爲一個流式處理框架,消息的可靠性是一個很是特性之一。除開更加高級的事務框架能保證消息只被處理一次(exactly-once),JStorm自己也提供了at-least-once,這個機制能保證消息必定會被處理。下面從一個例子的角度來說解,這是如何實現的。
[圖片]
  如上圖所示,integer做爲輸入的spout,sliding和printer都是負責處理的bolt,Field表示之間輸出的元組內的元素對應的key。StreamID爲默認,不指定數據流分組的形式,則默認狀況下shuffle。上述是一個很是簡單的拓撲邏輯結構,而後在通過add_acker這個方法以後,實際的拓撲結構發生了一些變化,以下圖
[圖片]
  JStrom爲原來的拓撲結構添加了一個_ack的bolt,負責維護拓撲的可靠性,大體的狀況能夠從上圖中看出,每當一個元組被髮送到拓撲下游bolt中去的時候,也會發送到_ack中去保存下來,而後後續處理的每一個bolt每次調用ack函數都會發送給_ack(bolt),在指定時間間隔內收到最後處理的ack,那麼_ack(bolt)就發送一個消息給最初的spout,則保證了一個元組的可靠性。因此綜上,_ack這個Bolt就是維護了整個拓撲的可靠性,那麼讀者可能會問,_ack裏面保存了那麼多的消息,若是某個元組通過的組件很是多,是否會形成該元組的拓撲樹變的很大。這裏阿里利用異或,實現了一個很是簡單且高效低耗的判斷方法。
  其實在_ack中存儲的內容很是簡單,就是一個k-v鍵值對,k是一個隨機無重複的id(root_id),且在元組被處理的整個過程當中保持不變,將消息存儲爲<root_id,random>,random由每一個收到元組的組件生成,每通過一個組件,random就會改變一次。如上圖,integer在發送一個<root_id,x>給sliding以後,也會發送一個<root_id,x>給_ack,而後sliding通過處理以後,發送<root_id,y>給printer,而且發送一個<root_id,x^y>給_ack,而後當printer處理完以後在發送一個<root_id,y>給_ack,此時的_ack內部對於root_id這個消息的值是x^x^y^y=0。也就是處理成功,若是達到指定超時時間root_id對應的值還不是0,則須要通知給出這個元組的task(_ack也是一個bolt,因此內部也有保存某個消息的來源task),要求重發。以上就是JStorm用於保證消息可靠性所使用的方法,直觀且簡單。
  後續的幾個方法如addTopologyMaster,add_metrics_component,add_system_components都是添加了相應的控件(bolt)來進行協同操做。好比topology master能夠負責metrics,也能夠負責baskpressure(反壓)機制。筆者還沒深刻解讀,相應部分後續再作相應的添加,這裏先挖個坑。

計算worker數目

  在DefaultTopologyAssignContext的構造函數中,添加完附加的組件以後,緊接着獲取supervisorid和hostname對應的鍵值對,若是存在舊的分配信息,則獲取原先全部的worker,若是沒有,則新建一個worker的集合。去除deadtaskid中那些在unstopworker內的task(這裏的目的是分開處理,若是是new的狀況下,這兩個都是空集)。而後計算須要的worker數目。看下面的源碼,

private int computeWorkerNum() {
    //獲取拓撲設置的worker數目
    Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS));
    //
    int ret = 0, hintSum = 0, tmCount = 0;

    Map<String, Object> components =     ThriftTopologyUtils.getComponents(sysTopology);
    for (Entry<String, Object> entry : components.entrySet()) {
        String componentName = entry.getKey();
        Object component = entry.getValue();

        ComponentCommon common = null;
        if (component instanceof Bolt) {
            common = ((Bolt) component).get_common();
        }
        if (component instanceof SpoutSpec) {
            common = ((SpoutSpec) component).get_common();
        }
        if (component instanceof StateSpoutSpec) {
            common = ((StateSpoutSpec) component).get_common();
        }
        //獲取每一個組件中設置的並行度
        int hint = common.get_parallelism_hint();
        if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
            //若是是屬於TM組件,則加到tmCount
            tmCount += hint;
            continue;
        }
        //這個變量存下全部組件並行度的和
        hintSum += hint;
    }
    
    //ret存下較小的值
    if (settingNum == null) {
        ret = hintSum;
    } else {
        ret =  Math.min(settingNum, hintSum);
    }
    //這裏還須要判斷主TM是否須要一個獨立的worker節點用於處理
    Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf);
    if (isTmSingleWorker != null) {
        if (isTmSingleWorker == true) {
        ret += tmCount;
        setAssignSingleWorkerForTM(true);
    }
    } else {
        if (ret >= 10) {
            ret += tmCount;
        setAssignSingleWorkerForTM(true);
        }
    }
    return ret;
}

worker分配

  實例化完DefaultTopologyAssignContext以後,若是是rebalance類型,則還須要先將原先佔用的那些worker給釋放掉,具體作法就是將worker使用的端口放回可用端口集合中。幾個變量的含義,needAssignTasks:就是指須要分配的task,也就是除去unstopworker中的那些task。allocWorkerNum:等於原先計算好的worker的數目-減去unstopworker的數目再減去keepAssigns(只有在拓撲類型是ASSIGN_TYPE_MONITOR纔有的)的數目。實際worker分配中,最重要是方法WorkerScheduler.getAvailableWorkers。下面就來詳細講解這個方法內部怎麼實現。

int workersNum = getAvailableWorkersNum(context);
    if (workersNum < allocWorkerNum) {
        throw new FailedAssignTopologyException("there's no enough worker.allocWorkerNum="+ allocWorkerNum + ", availableWorkerNum="+ workersNum);
}
    workersNum = allocWorkerNum;
    List<ResourceWorkerSlot> assignedWorkers = new ArrayList<ResourceWorkerSlot>();

    getRightWorkers(context,needAssign,assignedWorkers,workersNum,getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));

  首先得知集羣上可用的所有worker,若是可用的worker小於須要分配的worker數,則須要拋出異常。若是足夠,則會分配足量的worker給指定的拓撲。調用getRightWorkers這個方法來獲取合適的worker,這裏所謂right的worker是指用戶自定義的worker,能夠指定worker的資源分配狀況。

getRightWorkers

  分爲兩部分來說解這個方法,首先是準備工做--getUserDefineWorkers這個方法,這個方法須要兩個參數,拓撲的上下文信息context,用戶自定義的worker列表workers。看下面的源碼:

private List<ResourceWorkerSlot> getUserDefineWorkers(
            DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
    List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
    //若是沒有用戶自定義的worker,則不必任何操做
    if (workers == null)
        return ret;
    Map<String, List<Integer>> componentToTask = (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context
                .getComponentTasks()).clone();
    //若是分配類型不是NEW,則仍是從workers資源分配信息列表中去除unstopworker。
    //這裏是用戶有指定某些worker資源屬於unstopworker才能去掉。
    if (context.getAssignType() != context.ASSIGN_TYPE_NEW) {
        checkUserDefineWorkers(context, workers, context.getTaskToComponent());
}
    //遍歷用戶定義的worker,去除那些沒有分配task的worker
    //用戶定義的worker中已經指定哪些task該分配到哪一個worker中
    for (WorkerAssignment worker : workers) {
        ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker,componentToTask);
        if (workerSlot.getTasks().size() != 0) {
            ret.add(workerSlot);
        }
    }
return ret;
}

  去除那些沒有指定task的worker以後,真正進入getRightWorkers方法內部。源碼以下,這裏解釋下五個參數的含義,context表示以前準備的拓撲上下文信息,needAssign表示這個拓撲須要分配的各個taskid,assignedWorkers表示用來存儲那些在這個方法內分配到的worker資源,workersNum表示須要拓撲須要分配的worker數目,workers表示上個方法中用戶自定義的可用的worker資源。簡而言之,這個方法就是從workers中選出已經分配了指定的task的worker,而後存到assignedWorkers中去。

private void getRightWorkers(DefaultTopologyAssignContext context,
            Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers,
            int workersNum, Collection<ResourceWorkerSlot> workers) {
        Set<Integer> assigned = new HashSet<Integer>();
        List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
        if (workers == null)
            return;
        for (ResourceWorkerSlot worker : workers) {
            boolean right = true;
            Set<Integer> tasks = worker.getTasks();
            if (tasks == null)
                continue;
            for (Integer task : tasks) {
                if (!needAssign.contains(task) || assigned.contains(task)) {
                    right = false;
                    break;
                }
            }
            if (right) {
                assigned.addAll(tasks);
                users.add(worker);
            }
        }
        if (users.size() + assignedWorkers.size() > workersNum) {
            LOG.warn(
                    "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}",
                    users, assignedWorkers, workersNum);
            return;
        }

        assignedWorkers.addAll(users);
        needAssign.removeAll(assigned);
    }

  上面代碼主要的處理邏輯是在for循環中,在這個循環會去判斷worker內是否存有本拓撲內的taskid,若是有則把worker存儲起來,而且從taskid列表中移除掉那些分配出去的task,沒有則直接退出了。

使用舊分配/rebalance

  回到getAvailableWorkers方法內,看下面這段代碼。

//若是配置指定要使用舊的分配,則從舊的分配中選出合適的worker。
        if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
            getRightWorkers(context, needAssign, assignedWorkers, workersNum,
                    context.getOldWorkers());
        } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
                && context.isReassign() == false) {
            //若是是rebalance,且可使用原來的worker,將原來使用的worker存儲起來。
            int cnt = 0;
            for (ResourceWorkerSlot worker : context.getOldWorkers()) {
                if (cnt < workersNum) {
                    ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
                    resFreeWorker.setPort(worker.getPort());
                    resFreeWorker.setHostname(worker.getHostname());
                    resFreeWorker.setNodeId(worker.getNodeId());
                    assignedWorkers.add(resFreeWorker);
                    cnt++;
                } else {
                    break;
                }
            }
        }
        // 計算TM bolt的個數
        int workersForSingleTM = 0;
        if (context.getAssignSingleWorkerForTM()) {
            for (Integer taskId : needAssign) {
                String componentName = context.getTaskToComponent().get(taskId);
                if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
                    workersForSingleTM++;
                }
            }
        }
        int restWokerNum = workersNum - assignedWorkers.size();
        if (restWokerNum < 0)
            throw new FailedAssignTopologyException(
                    "Too much workers are needed for user define or old assignments. workersNum="
                            + workersNum + ", assignedWokersNum="
                            + assignedWorkers.size());

  筆者一開始以爲上述的代碼多是在判斷restWokerNum < 0是極可能會成立而致使拋出異常的,由於若是用戶一開始就指定了worker分配信息,而後rebalance狀況下,不斷去添加舊的worker到assignedWorkers內,這樣就會致使assignedWorkers的大小比實際須要的worker數目workersNum大。可是還沒來得及用實際集羣去測試,只是在github問了官方的人,若是有更新解決方案會後續再這裏說明。

分配剩下的worker
//restWokerNum是剩下須要的worker的數目,直接添加ResourceWorkerSlot實例對象。
    for (int i = 0; i < restWokerNum; i++) {
        assignedWorkers.add(new ResourceWorkerSlot());
    }
    //這裏是獲取那些專門指定運行拓撲的supervisor節點。
    List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context);
    if (isolationSupervisors.size() != 0) {
        putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(isolationSupervisors));
    } else {
        putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(context.getCluster()));
    }
    this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
    LOG.info("Assigned workers=" + assignedWorkers);
    return assignedWorkers;

  上述代碼中的isolationSupervisors存放的是那些指定給這個拓撲的supervisor節點的id。若是有指定,則在這些特定的節點上分配,若是沒有指定,那麼,就在全局內分配。因此實際剩下的分配任務的是putAllWorkerToSupervisor這個方法,getResAvailSupervisors這個方法負責剔除那些沒法分配worker的supervisor節點,由於節點上分配的worker已經滿了。下面來介紹putAllWorkerToSupervisor這個方法的做用。
  putAllWorkerToSupervisor須要兩個參數,第一個是已經分配的worker,包含那些尚未設定運行在那個節點的worker(上面直接新建的那些worker),第二個參數是目前可用的supervisor節點。下面是這個方法的代碼

private void putAllWorkerToSupervisor( List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) {
    for (ResourceWorkerSlot worker : assignedWorkers) {
        if (worker.getHostname() != null) {
            for (SupervisorInfo supervisor : supervisors) {
                if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0) {
                    putWorkerToSupervisor(supervisor, worker);
                    break;
                }
            }
        }
    }
    supervisors = getResAvailSupervisors(supervisors);
    Collections.sort(supervisors, new Comparator<SupervisorInfo>() {

@Override
        public int compare(SupervisorInfo o1, SupervisorInfo o2) {
            // TODO Auto-generated method stub
            return -NumberUtils.compare( o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size());
        }
    });
    putWorkerToSupervisor(assignedWorkers, supervisors);
}

  進入方法的第一步,首先要作的事情,就是對於那些已經分配好節點的worker,從supervisor節點上給該worker分配一個合適的端口。putWorkerToSupervisor這方法主要的操做是從supervisor節點上獲取一個可用的端口,而後設置worker的端口,並將該端口從supervisor節點的可用端口列表中移除。代碼結構很是簡單,以下:

private void putWorkerToSupervisor(SupervisorInfo supervisor, ResourceWorkerSlot worker) {
    int port = worker.getPort();
    if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
        port = supervisor.getAvailableWorkerPorts().iterator().next();
    }
    worker.setPort(port);
    supervisor.getAvailableWorkerPorts().remove(port);
    worker.setNodeId(supervisor.getSupervisorId());
}

  設置好了一部分已經分配好的worker以後,繼續分配那些沒有指定supervisor的worker。根據supervisor中可用端口逆序,從大到小排。而後調用putWorkerToSupervisor這個方法。
  putWorkerToSupervisor方法內部首先統計全部已經使用的端口,而後計算出一個理論的負載平均值{(全部使用掉的+將要分配的)/supervisor的個數,就會獲得分配後,集羣的一個理論上的負載值theoryAveragePorts,能夠平攤到每一個supervisor身上}。而後經過遍歷須要分配worker的list,進行第一次分配,能夠將worker依次分配到那些負載值(跟理論值的計算方式同樣)小於理論平均負載的supervisor上。而超過負載的,則放進到負載列表中。通過一輪分配以後,若是還存在沒有分配的worker(源碼這裏先進行排序再進行判斷,很明顯形成排序時間浪費的可能性)。根據supervisor中可用端口逆序,從大到小排序。再不斷將worker分配進去。
  到這裏,worker的分配就順利結束了,總結一下,首先是根據拓撲信息初始化上下文信息,而後計算出實際使用的worker數目,若是這些worker有指定運行在某個supervisor節點上,那麼就在節點上分配合適的worker。若是沒有指定,那麼就根據節點的負載狀況,儘可能平均的分配到每一個supervisor節點上。若是你們的負載都比較大的狀況下,再分配到哪些具備比較多的可用端口的節點,完成分配。

task分配

  getAvailableWorkers方法完成了worker的分配,以及若是用戶指定了特定的worker上運行指定的task,剩下的taskid將會在接下來的方法中說明如何去分配。主要在TaskScheduler的構造函數中,這裏須要三個參數,第一個是拓撲的上下文信息defaultContext,第二個是須要分配的task的列表needAssignTasks,以及上文中獲取到的合適的worker列表availableWorkers。(ps:記住,前文若是沒有指定特定的worker資源分配的信息,則沒有taskid被分配到worker中去,也就是worker內部僅有supervisorid,內存,cpu,端口等信息,不存在tasks信息)。接下來看看TaskScheduler的構造函數。

public TaskScheduler(DefaultTopologyAssignContext context, Set<Integer> tasks, List<ResourceWorkerSlot> workers) {
        this.tasks = tasks;
        LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers);
        this.context = context;
        this.taskContext =
                new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent());
        this.componentSelector = new ComponentNumSelector(taskContext);
        this.inputComponentSelector = new InputComponentNumSelector(taskContext);
        this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext);
        if (tasks.size() == 0)
            return;
        if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){
            // warning ! it doesn't consider HA TM now!!
            if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) {
                assignForTopologyMaster();
            }
        }

        int taskNum = tasks.size();
        Map<ResourceWorkerSlot, Integer> workerSlotIntegerMap = taskContext.getWorkerToTaskNum();
        Set<ResourceWorkerSlot> preAssignWorkers = new HashSet<ResourceWorkerSlot>();
        for (Entry<ResourceWorkerSlot, Integer> worker : workerSlotIntegerMap.entrySet()) {
            if (worker.getValue() > 0) {
                taskNum += worker.getValue();
                preAssignWorkers.add(worker.getKey());
            }
        }
        setTaskNum(taskNum, workerNum);

        // Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers.
        // Remove the workers which have been assigned with enough workers.
        for (ResourceWorkerSlot worker : preAssignWorkers) {
            if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){

                Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
                if (doneWorkers != null) {
                    for (ResourceWorkerSlot doneWorker : doneWorkers) {
                        taskNum -= doneWorker.getTasks().size();
                        workerNum--;
                    }
                }

            }

        }
        setTaskNum(taskNum, workerNum);

        // For Scale-out case, the old assignment should be kept.
        if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) {
            keepAssignment(taskNum, context.getOldAssignment().getWorkers());
        }
    }
初始化

  在這個構造函數中,首先是構造一個task分配的上下文信息。這個對象主要須要維護的幾個重要信息是

  • taskToComponent:一個Map,Key表示taskid,Value表示所對應的組件id。

  • supervisorToWorker:也是一個Map,Key表示這個拓撲分配的supervisorid,Value表示節點上分配到的worker列表。

  • relationship:維護這個拓撲的一個結構信息,依然是個Map,Key表示組件bolt/spout的組件id,Value表示的是,若是Key對應組件是一個bolt,則Value存下是全部輸入到組件的對應組件的id。若是Key對應組件是一個spout,則Value存下是這個組件全部輸出到的組件id。舉個例子,integer(spout)輸出到sliding(bolt),sliding(bolt)輸出到printer(bolt)。則relationship存下的是[{integer,[sliding]},{sliding,[integer]},{printer,[sliding]}]。

  • workerToTaskNum:Map,Key表示一個worker,Value表示實際在這個worker上運行的task的總數目。

  • workerToComponentNum:Map,Key表示一個worker,Value表示一個Map,存下的是組件id,以及對應的數目。

  緊接着初始化三個selector,第一個是ComponentNumSelector(內部定義了一二WorkerComparator,負責對worker進行比對,對比worker內某個組件的task數目。以及對比每一個supervisor上全部worker內某個組件的總task和),第二個是InputComponentNumSelector(內部也是定義了兩個比對函數,一個是獲取worker內某個組件的所有輸入的task個數,以及在整個supervisor上的所有輸入task個數),第三個是TotalTaskNumSelector(worker內所有task的個數,和supervisor上所有task的個數)。這三個selector的目的都是爲了後續合理的將task分配到這些worker上作的準備。

分配TM bolt

  若是集羣資源足夠,用戶定義TM須要單獨分配到一個獨立的worker上,則須要調用assignForTopologyMaster進行單獨分配。

private void assignForTopologyMaster() {
        int taskId = context.getTopologyMasterTaskId();
        ResourceWorkerSlot workerAssigned = null;
        int workerNumOfSuperv = 0;
        for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){
            List<ResourceWorkerSlot> workers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId());
            if (workers != null && workers.size() > workerNumOfSuperv) {
                for (ResourceWorkerSlot worker : workers) {
                    Set<Integer> tasks = worker.getTasks();
                    if (tasks == null || tasks.size() == 0) {
                        workerAssigned = worker;
                        workerNumOfSuperv = workers.size();
                        break;
                    }
                }
            }
        }

        if (workerAssigned == null)
            throw new FailedAssignTopologyException("there's no enough workers for the assignment of topology master");
        updateAssignedTasksOfWorker(taskId, workerAssigned);
        taskContext.getWorkerToTaskNum().remove(workerAssigned);
        assignments.add(workerAssigned);
        tasks.remove(taskId);
        workerNum--;
        LOG.info("assignForTopologyMaster, assignments=" + assignments);
    }

  這個方法首先是找出某個最合適的worker,這個worker符合兩個條件,一是沒有分配其餘的task,第二,worker所在的supervisor相對分配了最多的worker,第二點的目的是保證負載均衡。若是找不到合適的worker,那麼就拋出異常。若是能找到的話,就把負責TM的task分配給這個worker。updateAssignedTasksOfWorker這個方法的目的就是更新新的分配狀況。

task分配

  接下來獲取所有的task數目,以及已經分配出去的worker列表preAssignWorkers。根據得到的總task數目來計算每一個worker上平均的task數目avgTaskNum,以及剩下多少尚未分配出去的task(總task%總worker,求得餘數leftTaskNum)。而後遍歷preAssignWorkers,調用方法removeWorkerFromSrcPool來判斷一個worker是否分配了足夠的task,而且移除那些已經合理分配的task和worker。

for (ResourceWorkerSlot worker : preAssignWorkers) {
            if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){

                Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
                if (doneWorkers != null) {
                    for (ResourceWorkerSlot doneWorker : doneWorkers) {
                        taskNum -= doneWorker.getTasks().size();
                        workerNum--;
                    }
                }

            }

        }

  removeWorkerFromSrcPool這個方法挺有趣的,第一次看的時候有點懵逼,可是其實仔細看下就很明確了。下面我簡單講解下:

private Set<ResourceWorkerSlot> removeWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) {
        Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();

        if (leftTaskNum <= 0) {
            if (taskNum >= avgTaskNum) {
                taskContext.getWorkerToTaskNum().remove(worker);
                assignments.add(worker);
                ret.add(worker);
            }
        } else {
            if (taskNum > avgTaskNum ) {
                taskContext.getWorkerToTaskNum().remove(worker);
                leftTaskNum = leftTaskNum -(taskNum -avgTaskNum);
                assignments.add(worker);
                ret.add(worker);
            }
            if (leftTaskNum <= 0) {
                List<ResourceWorkerSlot> needDelete = new ArrayList<ResourceWorkerSlot>();
                for (Entry<ResourceWorkerSlot, Integer> entry : taskContext.getWorkerToTaskNum().entrySet()) {
                    if (avgTaskNum != 0 && entry.getValue() == avgTaskNum)
                        needDelete.add(entry.getKey());
                }
                for (ResourceWorkerSlot workerToDelete : needDelete) {
                    taskContext.getWorkerToTaskNum().remove(workerToDelete);
                    assignments.add(workerToDelete);
                    ret.add(workerToDelete);
                }
            }
        }

        return ret;
    }

  ret保存的是須要返回給調用者須要移除的worker集合。看這個方法,首先判斷,在剩餘數小於等於0的狀況,若是當前worker內的task數目大於等於平均數,說明這個worker的確分配了合理的task。(緣由是,若是leftTaskNum小於等於0,是否是就當作,平均數會比正常狀況下加1。舉個例子,有3個盒子,10個球放進去,那麼,平均數爲3的狀況下,餘數爲1,若是平均數爲4,那麼餘數就是-2了)。若是leftTaskNum大於0,判斷就複雜一點,首先若是數目taskNum大於平均的avgTaskNum,說明這個worker多分配了一些task,那麼這些多分配的就必須從leftTaskNum減去。甚至可能taskNum的數目大於avgTaskNum+leftTaskNum的數目,那麼直接致使leftTaskNum小於等於0。在leftTaskNum小於等於0的狀況下,找出分配上下文中worker分配的task數目恰好是平均數的worker,存在needDelete列表中。而後遍歷這個列表,把這些worker從加到須要移除的集合ret中,並返回。(由於若是有某個worker分配的數目多於avgTaskNum+leftTaskNum的數目,那麼那些分配數是平均數的worker確定是合理的,剩下那些分配小於平均數的纔是須要調整的)。
  在執行完上述的操做以後,更新下目前的平均數avgTaskNum和分配剩餘的task數目leftTaskNum。(此刻還有一些task還沒有實際分配),完成分配的調度是在assign方法中。在這個方法內,若是已經沒有須要分配的task,則將原來已經分配好的返回就好了。若是還存在須要分配的task,遍歷這個須要分配的task列表,若是task對應的組件屬於系統組件(組件id爲__acker或者__topology_master的組件),則存下來,若是是通常的task,則調用chooseWorker方法選擇一個合適的worker,而後將task分配到worker上。(固然這裏還須要作一些額外的操做,好比清除那些已經合理的分配的worker,經過調用removeWorkerFromSrcPool這個方法去清除)。而chooseWorker這個方法利用的就是前文提到的三個selector來選擇最佳的supervisor,選擇最佳的worker(須要考慮這個task接收的input,須要考慮supervisor節點的負載狀況和worker內的負載狀況)。分配完普通的task以後,在分配系統組件,分配方式也是同樣的。
  至此,task的分配也完成,總結一下,除開那些已經指定的分配外,比較重要的是,定義合理的selector(綜合考慮節點負載,worker負載,已經input輸入,考慮本地化)。分配的同時不斷去檢測是否已經有worker已經合理分配了,就不要在繼續分配到那個worker上。

HeartBeat操做

  上述完成task和worker的分配以後,回到mkAssignment方法。剩下的操做就是設置task的HB起始時間和超時時間。這些比較簡單就再也不細說了。

結束語

  解讀拓撲分配的過程可讓咱們更加清楚,咱們寫的一個邏輯拓撲,其實是如何變成一個能夠實際運行在集羣的拓撲。以及拓撲如何保證負載均衡等問題。筆者後續還會更新JStorm幾個比較重要的特性的源碼分析。包括如何實現反壓機制,如何實現nimbus和supervisor容錯,supervisor啓動的時候須要執行那些操做。

相關文章
相關標籤/搜索