流式大數據計算實踐(6)----Storm簡介&使用&安裝

1、前言

一、這一文開始進入Storm流式計算框架的學習python

2、Storm簡介

一、Storm與Hadoop的區別就是,Hadoop是一個離線執行的做業,執行完畢就結束了,而Storm是能夠源源不斷的接受數據源,不停的對數據進行處理,而數據就行水流同樣不停的流進來,通過處理,再將結果存入數據庫或者作其餘用途數據庫

二、基礎概念apache

(1)Tuple(元組):數據流傳遞的基本單元,至關於數據的流動經過Tuple做爲對象來傳遞json

(2)Spout(龍捲):至關於數據源,經過重寫nextTuple()方法,源源不斷的將數據流入咱們的處理框架vim

(3)Bolt(閃電):處理數據的節點,經過重寫execute()方法,接收Spout送出的數據,並進行任意的業務處理,處理完畢還能夠將數據繼續流入下一個Bolt,組成一條鏈框架

(4)Topology(拓撲):鏈接以上各個組件,使其組成一個拓撲結構,好比將多個Bolt組成一條數據鏈maven

三、舉例說明:好比咱們如今要統計一下《戰爭與和平》這本書的每一個英文單詞出現的數量ide

(1)編寫Spout代碼,將書的內容源源不斷的經過句子輸入到咱們的體系中函數

(2)編寫多個Bolt來處理數據,好比第一個Bolt專門來將句子切分紅單詞,第二個Bolt專門來統計每一個單詞出現的數量,每一個Bolt之間經過定義Bolt來流動數據,好比統計的Bolt,定義一個二元元組(單詞,數量),第一個值就是具體的單詞,第二個值就是這個單詞出現的數量oop

(3)經過Topology將以上組件鏈接成一個完整的系統

3、Storm安裝

一、下載Storm的tar.gz包,並解壓

tar zxvf /work/soft/installer/apache-storm-1.2.2

二、修改配置文件

(1)storm.ymal,分別配置咱們的Zookeeper集羣(前文中已經搭建好了)的各個節點和nimbus節點的高可用性,避免單點故障,咱們的環境有兩個機器,因此都寫上

vim /work/soft/apache-storm-1.2.2/conf/storm.yaml

 storm.zookeeper.servers:
     - "storm1"
     - "storm2"
 
 nimbus.seeds: ["storm1", "storm2"]

三、輸入python檢查一下機器是否安裝了python,若是沒有則安裝python,安裝完畢再執行python,發現能夠進入,而後ctrl+D退出便可

apt-get install python-minimal

四、啓動Storm集羣,經過如下命令分別啓動nimbus、supervisor和控制檯UI,nohup能夠當SSH客戶端關閉時,不會將進程殺死,後綴加一個&,能夠理解爲讓進程在後臺運行

nohup /work/soft/apache-storm-1.2.2/bin/storm nimbus &
nohup /work/soft/apache-storm-1.2.2/bin/storm supervisor &
nohup /work/soft/apache-storm-1.2.2/bin/storm ui &

四、經過jps命令查看進程是否正常啓動,若是看到config_value,說明還沒啓動完畢,稍等一下就行了

五、打開8080端口,能夠看到控制檯,正常運行

 

4、Storm代碼編寫

咱們根據以上的思路寫一個簡單的單詞統計任務,咱們先放在開發環境上面跑代碼,是不須要Storm集羣環境的,等咱們寫好代碼並在本地跑通後,就能夠搭建Storm集羣,在集羣上面跑了,關於單詞統計的代碼網上很容易找到,下面闡述一下實現的思路,能夠對照着如下文字來看代碼,更好理解

一、建立一個maven工程,引入如下依賴,因爲我這裏的思路是:經過Rabbitmq獲取消息數據,Storm進行數據流處理,將結果存儲爲Json格式並存入HBase。因此我須要引入以下依賴

<!-- HBase -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.1</version>
</dependency>
<!-- Storm -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.4</version>
    <scope>provided</scope>
</dependency>
<!-- Json -->
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20140107</version>
</dependency>
<!-- RabbitMQ -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.0</version>
</dependency>

二、由於要使用HBase,因此參照上文的操做,還要將Hadoop的配置文件拷貝到項目中。環境搭建好後,開始編寫代碼

三、首先編寫Spout,也就是數據的來源,建立一個類實現IRichSpout接口,並重寫nextTuple()方法,在這個方法裏實現數據的生產,好比讀取數據庫(RDS或NoSQL),從消息隊列獲取數據(Kafka、RabbitMQ),將輸出的數據定義成Tuple(元組),經過重寫declareOutputFields()方法,定義元組的key和數量,而後在nextTuple()方法中將元組的內容經過emit()方法傳遞到下一個組件

四、編寫Bolt,也就是數據的處理者,建立一個類實現IRichBolt接口,並重寫execute(Tuple tuple)方法,這個方法就是處理數據的邏輯了,在這裏能夠編寫各類代碼對接收到的Tuple進行處理,處理完畢後,和Spout同樣,能夠將數據經過定義Tuple的方式傳遞到下一個組件,每一個Bolt會對數據進行特定的處理,而後傳遞給下一個Bolt,這樣就能夠組成一條數據流的處理鏈

五、編寫Topology,拓撲將上面編寫的組件鏈接起來,組成一個拓撲圖,數據就在這個拓撲圖裏面持續的「流動」,永不停歇,拓撲也是程序的入口,因此建立一個主函數,在主函數裏面建立一個TopologyBuilder對象,經過setSpout()、setBolt()方法將上面的組件鏈接起來,鏈接的方式涉及到Storm的八種Grouping策略

(1)Shuffle Grouping(隨機分組):最經常使用的分組方式,將Tuple平均隨機分配到各個Bolt裏面

(2)Fields Grouping(字段分組):根據指定字段進行分組,好比咱們按照word字段進行分組,相同word值的Tuple會被分配到同一個Bolt裏面

(3)All Grouping(廣播分組):全部的Bolt均可以收到Tuple

(4)None Grouping(無分組):將Tuple隨機分配到各個Bolt裏面

(5)Global Grouping(全局分組):將Tuple分配到task id值最低的task裏面

(6)Direct Grouping(直接分組):生產者Bolt決定消費者Bolt能夠接受的Tuple

(7)Local or Shuffle Grouping(本地或者隨機分組):Bolt在同一進程或存在多個task,元組會隨機分配這些task

(8)Custom Grouping (自定義分組):經過實現CustomStreamGrouping接口來定義自定義分組

六、經過TopologyBuilder鏈接好各個組件後,就能夠提交任務了,提交任務分兩種方式:本地提交和集羣提交

(1)本地提交:提交到開發環境中,不須要安裝Storm環境,只須要引入Storm的依賴包便可,使用LocalCluster類的submitTopology方法提交任務

(2)集羣提交:提交到Storm集羣中,使用StormSubmitter類的submitTopology方法提交任務

5、提交jar包到集羣

一、首先咱們要修改一下pom文件,將以前引入的storm-core依賴裏面加<scope>provided</scope>,目的是storm-core這個依賴排除掉,由於這個依賴只是本地測試調試依賴的,集羣中不須要這個依賴,若是不加會報錯,還要記得修改拓撲的代碼,使用StormSubmitter類來提交

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.0.4</version>
    <scope>provided</scope>
</dependency>

二、經過編譯器將咱們的maven項目打包成jar

mvn clean install

三、將jar包拷貝到集羣的集羣裏面,由於咱們的代碼使用到了HBase,因此要記得把項目中的配置文件夾也拷貝過來(core-site.xml、hbase-site.xml、hdfs-site.xml),jar是掃描不到jar包裏的配置文件的,把配置文件放到與jar包同級目錄下便可

四、執行命令將jar包提交到集羣中運行,命令後面要記得指定主函數的全包名

nohup /work/soft/apache-storm-1.2.2/bin/storm jar /work/jar/mytest.jar com.orange.heatmap.Main &

五、進入8080控制檯,能夠看到咱們剛纔提交的拓撲,點擊進去能夠查看運行的狀態

相關文章
相關標籤/搜索