Twitter Storm進階初步,Storm能作什麼

本篇Blog是一個簡單的Storm入門例子,目的讓讀者明白Storm是怎樣的運行機制。以及後續會放出的幾篇Storm高級特性以及最終將Storm融入Hadoop 2.x的YARN中。目的讀者是已經進階大數據的Hadoop,Spark用戶,或者瞭解Storm想深刻理解Storm的讀者用戶。apache

項目Pom(Storm jar沒有提交到Maven中央倉庫,須要在項目中加入下面的倉庫地址):後端

<repositories>
    <repository>
        <id>central</id>
        <name>Maven Repository Switchboard</name>
        <layout>default</layout>
        <url>http://maven.oschina.net/content/groups/public/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
    <repository>
        <id>clojars</id>
        <url>https://clojars.org/repo/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <releases>
            <enabled>true</enabled>
        </releases>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.yaml</groupId>
        <artifactId>snakeyaml</artifactId>
        <version>1.13</version>
    </dependency>

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.3.3</version>
    </dependency>

    <dependency>
        <groupId>org.clojure</groupId>
        <artifactId>clojure</artifactId>
        <version>1.5.1</version>
    </dependency>

    <dependency>
        <groupId>storm</groupId>
        <artifactId>storm</artifactId>
        <version>0.9.0.1</version>
    </dependency>

    <dependency>
        <groupId>storm</groupId>
        <artifactId>libthrift7</artifactId>
        <version>0.7.0</version>
    </dependency>
</dependencies>

下面是一個Storm的HelloWord的例子,代碼有刪減,熟悉Storm的讀者天然能把代碼組織成一個完整的例子。session

public static void main(String[] args) {
	Config conf = new Config();
	conf.put(Config.STORM_LOCAL_DIR, "/Volumes/Study/data/storm");
	conf.put(Config.STORM_CLUSTER_MODE, "local");
	//conf.put("storm.local.mode.zmq", "false");
	conf.put("storm.zookeeper.root", "/storm");
	conf.put("storm.zookeeper.session.timeout", 50000);
	conf.put("storm.zookeeper.servers", "nowledgedata-n15");
	conf.put("storm.zookeeper.port", 2181);
	//conf.setDebug(true);
	//conf.setNumWorkers(2);
	
	TopologyBuilder builder = new TopologyBuilder();
	builder.setSpout("words", new TestWordSpout(), 2); 
	
	builder.setBolt("exclaim2", new DefaultStringBolt(), 5)
	       .shuffleGrouping("words");
	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology("test", conf, builder.createTopology());
}
  • Config.STORM_LOCAL_DIR是配置一個本地路徑,Storm會在這個路徑寫入一些配置信息和臨時數據。
  • Config.STORM_CLUSTER_MODE是運行模式,local和distributed兩個選項,即本地模式和分佈式模式。本地模式在運行時時多線程模擬的,開發測試用;分佈式模式在分佈式集羣下是多進程的,真正的分佈式。
  • Storm的Spout和Blot高可用是經過ZooKeeper協調的,storm.zookeeper.root是一個ZooKeeper地址,而且有對應的端口號
  • Debug是測試模式,有更詳細的日誌信息。

TestWordSpout是一個Storm自帶的例子,用來隨機的產生<code>new String[] {"nathan", "mike", "jackson", "golda", "bertels"};</code>列表中的字符串,用來提供數據源。多線程

其中DefaultStringBolt的源碼:maven

OutputCollector collector;

public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
}

public void execute(Tuple tuple) {
	log.info("rev a message: " + tuple.getString(0));
	collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    collector.ack(tuple);
}

運行日誌:分佈式

10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike 10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jacksonoop

數據由一個Storm叫作噴嘴(Spout,也至關一個水龍頭,能產生數據的來源端)產生,而後傳遞給後端一連串的的Blot,最終被轉換和消費。而Spout和Blot都是並行的,並行度均可以本身設置(本地運行是靠多線程模擬的)。如:測試

builder.setSpout("words", new TestWordSpout(), 2); 
builder.setBolt("exclaim2", new DefaultStringBolt(), 5)

噴嘴TestWordSpout的並行度是2,DefaultStringBolt的並行度是5.大數據

從日誌能夠看出,數據通過噴嘴到達預先定於的一個Blot,打印了日誌。我測試代碼設置的並行度是5,日誌中統計,確實是5個線程:ui

  1. Thread-29-exclaim2
  2. Thread-31-exclaim2
  3. Thread-26-exclaim2
  4. Thread-33-exclaim2
  5. Thread-35-exclaim2

關於Storm是是什麼?http://storm.incubator.apache.org/http://www.ibm.com/developerworks/cn/opensource/os-twitterstorm/ 有詳細的介紹。

借用OSC網友的話說,Hadoop就是商場裏自動升降式的電梯,用戶須要排隊等待,選按樓層,而後到達;而Storm就像是自動扶梯,扶梯預先設置好運行後,來人就當即運走,目的地是明確的。

Storm按個人理解,Storm和Hadoop是徹底不一樣的,設計上也沒有半點擬合的部分。Storm更像是我以前介紹過的Spring Integration,是一個數據流系統。它能把數據按照預設定的流程,把數據作各類轉換,傳遞,分解,合併,最後數據到達後端存儲。只不過Storm是能夠分佈式,並且分佈式的能力也是能夠本身設置。

Storm的這種特性很適合大數據類的ETL系統開發。

相關文章
相關標籤/搜索