Storm系列(四)並行度和流分組

原文鏈接:a870439570.github.io/interview-d…git

並行度(parallelism)概念

  • 一個運行中的拓撲是由什麼構成的:工做進程(worker processes),執行器(executors)和任務(tasks)
  • 在 Worker 中運行的是拓撲的一個子集。一個 worker 進程是從屬於某一個特定的拓撲的,在 worker 進程中會運行一個或者多個與拓撲中的組件相關聯的 executor。一個運行中的拓撲就是由這些運行於 Storm集羣中的不少機器上的進程組成的。
  • 一個 executor 是由 worker 進程生成的一個線程。在 executor 中可能會有一個或者多個 task,這些 task 都是爲同一個組件(spout 或者 bolt)服務的。
  • task 是實際執行數據處理的最小工做單元(注意,task 並非線程) —— 在你的代碼中實現的每一個 spout 或者 bolt 都會在集羣中運行不少個 task。在拓撲的整個生命週期中每一個組件的 task 數量都是保持不變的,不過每一個組件的 executor數量倒是有可能會隨着時間變化。在默認狀況下 task 的數量是和 executor 的數量同樣的,也就是說,默認狀況下 Storm會在每一個線程上運行一個 task。

Storm的流分組策略

  • Storm的分組策略對結果有着直接的影響,不一樣的分組的結果必定是不同的。其次,不一樣的分組策略對資源的利用也是有着很是大的不一樣
  • 拓撲定義的一部分就是爲每一個Bolt指定輸入的數據流,而數據流分組則定義了在Bolt的task之間如何分配數據流。

八種流分組定義

Shuffle grouping:github

  • 隨機分組:隨機的將tuple分發給bolt的各個task,每一個bolt實例接收到相同數量的tuple。

Fields grouping:apache

  • 按字段分組:根據指定的字段的值進行分組,舉個栗子,流按照「user-id」進行分組,那麼具備相同的「user-id」的tuple會發到同一個task,而具備不一樣「user-id」值的tuple可能會發到不一樣的task上。這種狀況經常用在單詞計數,而實際狀況是不多用到,由於若是某個字段的某個值太多,就會致使task不均衡的問題。

Partial Key grouping:bash

  • 部分字段分組:流由分組中指定的字段分區,如「字段」分組,可是在兩個下游Bolt之間進行負載平衡,當輸入數據歪斜時,能夠更好地利用資源。優勢。有了這個分組就徹底能夠不用Fields grouping了

All grouping:網絡

  • 廣播分組:將全部的tuple都複製以後再分發給Bolt全部的task,每個訂閱數據流的task都會接收到一份相同的徹底的tuple的拷貝。

Global grouping:jvm

  • 全局分組:這種分組會將全部的tuple都發到一個taskid最小的task上。因爲全部的tuple都發到惟一一個task上,勢必在數據量大的時候會形成資源不夠用的狀況。

None grouping:分佈式

  • 不分組:不指定分組就表示你不關心數據流如何分組。目前來講不分組和隨機分組效果是同樣的,可是最終,Storm可能會使用與其訂閱的bolt或spout在相同進程的bolt來執行這些tuple

Direct grouping:post

  • 指向分組:這是一種特殊的分組策略。以這種方式分組的流意味着將由元組的生成者決定消費者的哪一個task能接收該元組。指向分組只能在已經聲明爲指向數據流的數據流中聲明。tuple的發射必須使用emitDirect種的一種方法。Bolt能夠經過使用TopologyContext或經過在OutputCollector(返回元組發送到的taskID)中跟蹤emit方法的輸出來獲取其消費者的taskID。

Local or shuffle grouping: 本地或隨機分組:和隨機分組相似,可是若是目標Bolt在同一個工做進程中有一個或多個任務,那麼元組將被隨機分配到那些進程內task。簡而言之就是若是發送者和接受者在同一個worker則會減小網絡傳輸,從而提升整個拓撲的性能。有了此分組就徹底能夠不用shuffle grouping了。性能

示例

修改上一章節的Topology Storm(三)Java編寫第一個本地模式demoui

package com.qxw.topology;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.qxw.bolt.OutBolt;
import com.qxw.bolt.OutBolt2;
import com.qxw.spout.DataSource;

/**
 * 拓撲的並行性
 * @author qxw
 * @data 2018年9月17日下午2:49:09
 */
public class TopologyTest2 {

	public static void main(String[] args) throws Exception {
		//配置
		Config cfg = new Config();
		cfg.setNumWorkers(2);//指定工做進程數  (jvm數量,分佈式環境下可用,本地模式設置無心義)
		cfg.setDebug(false);
		
		//構造拓撲流程圖
		TopologyBuilder builder = new TopologyBuilder();
		//設置數據源(產生2個執行器和倆個任務)
		builder.setSpout("dataSource", new DataSource(),2).setNumTasks(2);
		//設置數據建流處理組件(產生2個執行器和4個任務)
		builder.setBolt("out-bolt", new OutBolt(),2).shuffleGrouping("dataSource").setNumTasks(4); //隨機分組
		//設置bolt的並行度和任務數:(產生6個執行器和6個任務)
//		builder.setBolt("out-bol2", new OutBolt2(),6).shuffleGrouping("out-bolt").setNumTasks(6); //隨機分組
		
		//設置字段分組(產生8個執行器和8個任務)字段分組 
		builder.setBolt("out-bol2", new OutBolt2(),8).fieldsGrouping("out-bolt", new Fields("outdata")).setNumTasks(8);
		//設置廣播分組
		//builder.setBolt("write-bolt", new OutBolt2(), 4).allGrouping("print-bolt");
		//設置全局分組
		//builder.setBolt("write-bolt", new OutBolt2(), 4).globalGrouping("print-bolt");
		
		//1 本地模式
		LocalCluster cluster = new LocalCluster();
		
		//提交拓撲圖  會一直輪詢執行
		cluster.submitTopology("topo", cfg, builder.createTopology());

		
		//2 集羣模式
//		StormSubmitter.submitTopology("topo", cfg, builder.createTopology());
		
	}
}

}

複製代碼
相關文章
相關標籤/搜索