Storm系列一: Storm初步

初入Storm

前言

學習Storm已經有兩週左右的時間,可是認真來講學習過程確實是零零散散,遇到問題去百度一下,找到新概念再次學習,在這樣的一個循環又不成體系的過程當中不斷學習Storm。java

前人栽樹,後人乘涼,也正是由於網上有這樣多熱心的人,分享本身的看法,纔可以讓開發變得更簡單。也正是基於這個目的,同時公司剛好是作大數據的,預計還有至關長的時間須要深刻Storm,決定寫一下Storm系列相關知識。git

正文

在大數據處理中,目前來看,有這樣三種主要的數據處理方式,以hadoop爲主的大數據批處理框架, 以Storm爲主的實時計算流處理框架, 還有以Spark爲主的微型批處理流框架。github

解釋可能不太到位, 可是Storm最重要的特色也就是, 實時, 流處理。數據庫

基本概念

在這裏經過一個網絡上比較常見的案例來做爲開始吧, 假設咱們須要對一篇文章,一本書中的全部單詞按照首字母進行統計,統計每種首字母的單詞按照長度劃分進行統計,也就是等首字母,等長度的單詞,究竟出現了多少次,應該怎樣作呢?apache

不管是以怎樣的開發框架,模式來進行思考,咱們很容易想到這樣一個處理步驟:數組

用IO流從文章中不斷讀取內容做爲輸入,而後提取每一個單詞的首字母,判斷單詞的首字母,先按照首字母分組,再將分組事後的數據一個個統計其長度,對應的數值便可。網絡

那麼一點點來進行拆分。併發

拓撲(Topology)

首先須要提到的一個概念就是拓撲。不難將上述概念轉換成以下流程圖:
負載均衡

這樣的每個圓都表明一個簡單的處理或計算過程,每條邊就表明將上一個節點處理結束的數據發送到下一個節點,這樣一個數據流向。框架

而拓撲正是這樣一個計算圖,結點表明一些計算,數據處理邏輯,邊表明在結點之間數據的傳遞,由結點和邊所構建出來的這個總體,完成一個完整功能的總體,就被稱做是拓撲。

元組(Tuple)

元組是拓撲之間傳輸數據的形式,它自己是一個有序的數值序列。由於是有序的數值序列,就意味着在特定的index有着特定的含義,而這個含義又或者字段名稱(field)就是由使用者本身定義的。

任何一個節點均可以創造元組,併發送給任意其餘一個或多個節點,而這個過程就被稱做是發射(emit)一個元組。

那麼就會有這樣一個問題,在元組中並無對數據類型作出強制限定,對於處在不一樣機器,或不一樣進程的節點,通常是須要經過網絡發送,或是socket在本機間發送,是如何發送java中的對象呢?答案是經過序列化的方式。而在這一點,咱們在後續的篇章再提。

流(Streaming)

流就是一個「無邊界的元組序列」, 元組是基本的傳輸單位,當元組在兩個節點之間源源不斷的發送,就是所謂的流。

而除了根節點是從數據源不斷讀取數據以外,其餘的節點均可以從任意多個節點接收數據,而每個節點均可以向任意多個節點發送數據。

spout

Spout的主要功能是從數據源中讀取數據,並向其餘節點發送數據,數據源能夠有多種,文件,消息隊列,數據庫。

Spout中並不包含對數據的處理邏輯,所須要作的是,從數據源讀取,發送。但也並不是徹底意義上的什麼都不作,通常來講,在這一步會選擇完成反序列化這一工做,甚至更近一步的,將接收到的數據轉換成相應的基本Java對象發射出去,之因此說是基本的,也意味着僅僅是將字符串或其餘形式的數據,轉換成對象,並不作任何特殊處理。

爲何Spout不作任何的數據處理功能呢?在這裏是否是連對象轉換也不要有比較好呢?

我的理解,因爲Spout是整個數據處理的第一環,大批量的數據流入並從當前節點分發,在nextTuple中不能有阻塞是基本要求。在這一環節作出的操做越多,對性能的影響越大。至於對象轉換, 因爲這是對數據的基本操做,也就是放在任何節點都須要執行的東西,更況且,若是是本身設計, 若是數據被原樣發出,會在下一節點作出數據轉換後,進一步發送。至關於作了一次無效的發送操做。

因此,仍需考量。

bolt

不一樣於spout只監聽數據源,bolt能夠完成從輸入流的元組接收, 轉換, 處理, 發射功能。是咱們的topology中真正的數據處理節點。

在咱們的例子中有這樣兩個bolt:

  • 提取單詞首字母:所作的工做是,接收單詞,獲取單詞首字母,併發送到下一節點。

  • 數據更新節點:接收單詞,判斷首字母標識字段, 判斷長度, 更新計數器。

咱們會注意到, 接收, 處理, 或許發送, 是bolt的全部功能。

就接收來講,咱們的數據來源可能並不止一個,多是Spout,也多是其餘bolt。 對於發送來講, 咱們的目的地也可能不止一個,既能夠是bolt, 也有多是其餘拓撲的Spout。 bolt 能夠是多入多出的。

小結

就如今而言,咱們知道了:

  • 一個拓撲包含大量的節點和邊

  • 節點有Spout或bolt

  • 邊表明節點間的元組流

  • 一個元祖是一個有序的數值列表,每一個數組都被賦予一個命名

  • 一個數據流失一個在spout 和 bolt 或兩個bolt之間的無邊界元組序列。

  • spout是拓撲的數據源

  • bolt接收輸入流,作出數據處理,可能會發送數據給下一節點。

  • 在實際中,每一個spout可能會同時運行一個或多個獨立的實例,並行的進行相應的數據處理。

流分組

咱們還須要關注一下下其中的一個策略性問題, 即流分組, 當數據從一個節點發送到另外一個節點是以流的形式進行發送。

咱們已經知道處在當前節點的下游,可能存在多個不一樣種類的bolt, 也可能存在同一bolt的多個實例。數據流是怎樣分配的呢?

對待第一種狀況, 不一樣種類的bolt,比較好處理, 咱們爲每一種類型的流,就案例而言,若是咱們設計了多種bolt, 分別處理相應字母開頭的單詞, 那麼在spout發送時, 就能夠指定流的名字, bolt接收時,不一樣的bolt實例去接收不一樣的流便可。

而第二種狀況便是流分組, 最多見的是隨機分組, 它能夠保證每一個bolt接收到的數據量基本一致,負載均衡。可是,並非絕對均衡,由於採起的是隨機的方式,並非輪詢策略。

第二種比較常見的方式是, 字段分組, 它能夠保證特定字段上的值相同的元組發射到同一個bolt實例。

流分組策略有多種,在後續會有章節提到。

Storm工程

相應代碼已經上傳至:

git@github.com:zyzdisciple/storm_study.git

須要提到的一點是:在運行topology時,可能會打印的東西過多,即便加了debug false也不可以改變這一問題,須要在當前項目的 resources中加入, log4j2.xml 更改打印Level;

log4j2.xml 在 storm-core jar包中自帶。

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="60">
<Appenders>
    <Console name="Console" target="SYSTEM_OUT">
    <PatternLayout pattern="%-4r [%t] %-5p %c{1.} - %msg%n"/>
    </Console>
</Appenders>
<Loggers>
    <!--<Logger name="org.apache.zookeeper" level="WARN"/>-->
    <Root level="WARN">
    <AppenderRef ref="Console"/>
    </Root>
</Loggers>
</configuration>

獲取Storm的最簡單方式是經過Maven:

<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.2.1</version>
    <!--在真實項目中通常須要定義爲provided,暫時註釋 -->
    <!--<scope>provided</scope>-->
</dependency>

建立Spout

在開始你的代碼以前,最好對整個拓撲有一個較爲清晰的瞭解,也就是咱們以前所作的工做, 須要知道數據源的數據輸出格式, 擁有幾個節點,每一個節點是作什麼的,數據在各個節點之間如何分發,數據輸入節點以前應該是怎樣的,流出節點以後又應該是怎樣的?

在弄清楚上述問題以前,通常最好不要開始進行代碼。

而咱們的輸入呢?是讀取一個文件, 讀取文件中的每一行數據便可,而後分發到下一個節點去:

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

/**
* @author zyzdisciple
* @date 2019/4/3
*/
public class FileReaderSpout extends BaseRichSpout {

    private static final long serialVersionUID = -1379474443608375554L;

    private SpoutOutputCollector collector;

    private BufferedReader br;

    /**
    * 方法是用來初始化一些資源類,具體的參數須要待對storm有了更深刻的瞭解以後再度來看。
    * 這些資源類不只僅是參數提供的資源, 包括讀取文件, 讀取數據庫,等等其餘任何方式,
    * 打開數據資源都是在這個方法中實現。
    * 緣由則是能夠理解爲,當對象被初始化時執行的方法,並不許確,但能夠這樣理解。
    * @param conf
    * @param topologyContext
    * @param spoutOutputCollector
    */
    public void open(Map conf, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        try {
            br = new BufferedReader(new FileReader("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\data.txt"));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
    * 流的核心,不斷調用這個方法,讀取數據,發送數據。
    * 在這裏採起的方式是每次讀取一行,固然也能夠在一次中讀取全部數據,而後在循環中
    * emit發射數據。
    * 須要特別注意的是,這個方法必定是不可以被阻塞的, 也不可以拋出異常,
    * 拋出異常會讓固然程序中止,阻塞嚴重影響性能。
    */
    public void nextTuple() {
        try {
            //向外發射數據
            String line = br.readLine();
            if (line == null) {
                return;
            }
            collector.emit(new Values(line));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
    * 定義輸出格式,在collector.emit時,new values可接受數組, 如發送 a b c,
    * 則此時會與declare field中的名稱一一對應,且順序一致,而且必須保證數量一致。
    * 經過這種配置的方式,就無需以map形式輸出數據, 咱們能夠僅輸出值便可。
    *
    * 固然declare不止這一種重載方法,其他的暫時不用理會。
    * @param declarer
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        /*Fields名稱這裏,通常使用中會拆出來,定義爲常量,而不是直接字符串,
        * 包括Stream等其餘屬性也是,由於頗有可能在其餘地方會被用到,因此通常拆分紅常量
        * */
        declarer.declare(new Fields("line"));
        //declarer.declare(new Fields(DemoConstants.FIELD_LINE)); //應該採起這種方式
    }

    /**
    * 在fileReader結束以後關閉對應的流
    * 能夠暫時忽略
    */
    @Override
    public void close() {
        if (br != null) {
            try {
                br.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

bolt節點

在bolt的代碼中, 並無太多值得提到的地方, 由於它的操做大都與Spout保持一致。

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
* @author zyzdisciple
* @date 2019/4/3
*/
public class WordsBolt extends BaseRichBolt {

    private static final long serialVersionUID = 520139031105355867L;

    private OutputCollector collector;

    /**
    * 與spout中的open方法功能基本一致。
    * @param stormConf
    * @param context
    * @param collector
    */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
    * 類比於Spout中的nextTuple
    * @param input 接收的數據,存有數據以及其相關信息。
    */
    public void execute(Tuple input) {

        String line = input.getStringByField("line").trim();
        //input.getStringByField(DemoConstants.FIELD_LINE);
        if (!line.isEmpty()) {
            String[] words = line.split(" ");
            for (String word : words) {
                if (!word.trim().isEmpty()) {
                    collector.emit(new Values(word.charAt(0), word.length(), word));
                }
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("headWord", "wordLength", "word"));
        //declarer.declare(new Fields(DemoConstants.FIELD_HEAD_WORD, DemoConstants.FIELD_WORD_LENGTH, DemoConstants.FIELD_WORD));
    }
}

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
* @author zyzdisciple
* @date 2019/4/3
*/
public class CountBolt extends BaseRichBolt {

    private static final long serialVersionUID = 3693291291362580453L;

    //這裏存的時候取巧,用 a1 a2 表示首字母爲1,長度爲1,2 的單詞
    private Map<String, Integer> counterMap;

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        /*爲何hashMap也要放在這裏進行初始化之後再提,這裏暫時忽略。
        *在storm中, bolt和 spout的初始化通常都不會放在構造器中進行,
        * 而都是放在prepare中。
        */
        counterMap = new HashMap<>();
    }

    @Override
    public void execute(Tuple input) {
        String key = input.getValueByField("headWord").toString().toLowerCase() + input.getIntegerByField("wordLength");
        counterMap.put(key, countFor(key) + 1);
        counterMap.forEach((k, v) -> {
            System.out.println(k + " : " + v);
        });
    }

    /**
    * 在這裏由於不須要向下一個節點下發數據, 所以不須要定義。
    * @param declarer
    */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

    /**
    * 統計當前key已經出現多少次。
    * @param key
    * @return
    */
    private int countFor(String key) {
        Integer count =  counterMap.get(key);
        return count == null ? 0 : count;
    }

    /**
    * 與Spout的close方法相似
    */
    @Override
    public void cleanup() {

    }
}

在countBolt中存在一個屬性, map, 這是私有屬性, 而storm在執行的時候可能會建立多個bolt實例,他們之間的變量並不共享, 這必然會致使一些問題, 這就是咱們爲何在流分組策略中選擇 fieldGroup分組的方式, 它可以保證, field相同的數據, 最終必然會流向同一個bolt實例。

但不可以保證 key: a key: b,的兩個tuple流向不一樣的bolt。

topology

import com.storm.demo.rudiments.bolt.CountBolt;
import com.storm.demo.rudiments.bolt.WordsBolt;
import com.storm.demo.rudiments.spout.FileReaderSpout;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;

/**
* @author zyzdisciple
* @date 2019/4/3
*/
public class WordCountTopology {

    private static final String STREAM_SPOUT = "spoutStream";

    private static final String STREAM_WORD_BOLT = "wordBoltStream";

    private static final String STREAM_COUNT_BOLT = "countBoltStream";

    private static final String TOPOLOGY_NAME = "rudimentsTopology";

    private static final Long TEN_SECONDS = 1000L * 10;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        //設置Spout,第一個參數爲節點名稱, 第二個爲對應的Spout實例
        builder.setSpout(STREAM_SPOUT, new FileReaderSpout());
        //設置bolt,在這裏採用隨機分組便可,在shuffleGrouping,中第一個參數爲接收的節點名稱,表示從哪一個節點接收數據
        //這裏並不能等同於流名稱,這個概念還有其餘用處。
        builder.setBolt(STREAM_WORD_BOLT, new WordsBolt()).shuffleGrouping(STREAM_SPOUT);
        //在這裏採起的是fieldsGrouping,緣由則是由於在CountBolt中存在自有Map,必須保證屬性一致的分到同一個bolt實例中
        builder.setBolt(STREAM_COUNT_BOLT, new CountBolt()).fieldsGrouping(STREAM_WORD_BOLT, new Fields("headWord", "wordLength"));
        //相關配置
        Config config = new Config();
        config.setDebug(true);
        //本地集羣
        LocalCluster cluster = new LocalCluster();
        //經過builder建立拓撲
        StormTopology topology = builder.createTopology();
        //提交拓撲
        cluster.submitTopology(TOPOLOGY_NAME, config, topology);
        //停留幾秒後關閉拓撲,不然會永久運行下去
        Utils.sleep(TEN_SECONDS);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

在這個topology中,雖然功能簡單,但事實已經完整的展現了一個topology的設計流程, 同時在 main方法中也蘊藏了整個 topology的執行流程,生命週期等等。 這部分在後續會提到。

相關文章
相關標籤/搜索