Storm Series (8): Integrating Storm with HDFS and HBase

1. Storm Integration with HDFS

1.1 Project Structure

Source code for this example can be downloaded at: storm-hdfs-integration

1.2 Main Dependencies

The main project dependencies are listed below; two points deserve attention:

  • Since the Hadoop installed on my server is the CDH distribution, the CDH versions of the dependencies are imported here, and a <repository> tag is needed to point to the CDH repository address;
  • hadoop-common, hadoop-client, and hadoop-hdfs all need to exclude the slf4j-log4j12 dependency, because storm-core already includes it and keeping it would risk JAR conflicts.
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!--Storm HDFS integration dependency-->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hdfs</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

1.3 DataSourceSpout

/**
 * Spout that produces sample data for the word count
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Produce simulated data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * Produce simulated data
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}

The generated sample data looks like this:

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm

1.4 Storing the Data to HDFS

Here the HDFS address and the storage path are both hardcoded. In real development they can be supplied as external parameters, which makes the program more flexible; a sketch of doing so follows the code below.

public class DataToHdfsApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String HDFS_BOLT = "hdfsBolt";

    public static void main(String[] args) {

        // Specify the Hadoop user name; without it, creating directories on HDFS may throw a permission exception (RemoteException: Permission denied)
        System.setProperty("HADOOP_USER_NAME", "root");

        // Define the delimiter between output fields
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // Sync policy: flush buffered data to HDFS every 100 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(100);

        // Rotation policy: each file is capped at 1 MB; when the limit is exceeded, a new file is created and writing continues
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);

        // Define the storage path
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm-hdfs/");

        // Define the HdfsBolt
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop001:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);


        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // save to HDFS
        builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);


        // If 'cluster' is passed as an argument, submit to the cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalDataToHdfsApp",
                    new Config(), builder.createTopology());
        }
    }
}
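
As noted above, the HDFS URL and the output path are hardcoded. Below is a minimal sketch of supplying them from the command line instead; the argument positions and default values are illustrative assumptions, not part of the original project.

// Hypothetical argument layout: args[0] = "cluster" or omitted (as in the original),
// args[1] = HDFS URL, args[2] = output path; positions and defaults are assumptions.
String hdfsUrl = args.length > 1 ? args[1] : "hdfs://hadoop001:8020";
String outputPath = args.length > 2 ? args[2] : "/storm-hdfs/";

HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl(hdfsUrl)
        .withFileNameFormat(new DefaultFileNameFormat().withPath(outputPath))
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);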

1.5 Running the Test

You can run it directly in local mode, or package it and submit it to a cluster. The source code in this repository is packaged with the maven-shade-plugin by default; the packaging command is:

# mvn clean package -Dmaven.test.skip=true

After it runs, the data is written to the /storm-hdfs directory on HDFS. The following commands can be used to inspect it:

# List the directory contents
hadoop fs -ls /storm-hdfs
# Follow changes to a file's contents
hadoop fs -tail -f /storm-hdfs/<filename>

2. Storm Integration with HBase

2.1 Project Structure

Integration use case: perform a word count and store the final results in HBase. The main project structure is as follows:

Source code for this example can be downloaded at: storm-hbase-integration

2.2 Main Dependencies

<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!--Storm HBase integration dependency-->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hbase</artifactId>
        <version>${storm.version}</version>
    </dependency>
</dependencies>

2.3 DataSourceSpout

/**
 * Spout that produces sample data for the word count
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Produce simulated data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * Produce simulated data
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}

The generated sample data looks like this:

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm

2.4 SplitBolt

/**
 * Splits each line into words by the specified delimiter
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(tuple(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2.5 CountBolt

/**
 * Performs the word count
 */
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Emit the result
        collector.emit(new Values(word, String.valueOf(count)));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2.6 WordCountToHBaseApp

/**
 * Performs a word count and stores the results in HBase
 */
public class WordCountToHBaseApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String HBASE_BOLT = "hbaseBolt";

    public static void main(String[] args) {

        // Storm configuration
        Config config = new Config();

        // HBase configuration
        Map<String, Object> hbConf = new HashMap<>();
        hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
        hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");

        // Pass the HBase configuration into the Storm configuration
        config.put("hbase.conf", hbConf);

        // Define the mapping between the stream data and HBase
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word","count"))
                .withColumnFamily("info");

        /*
         * Pass the table name, the data mapping, and the HBase configuration key to the HBaseBolt.
         * The table must be created in advance: create 'WordCount','info'
         */
        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                .withConfigKey("hbase.conf");

        // Build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(),1);
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt(),1).shuffleGrouping(SPLIT_BOLT);
        // save to HBase
        builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);


        // If 'cluster' is passed as an argument, submit to the cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToRedisApp",
                    config, builder.createTopology());
        }
    }
}

2.7 Running the Test

You can run it directly in local mode, or package it and submit it to a cluster. The source code in this repository is packaged with the maven-shade-plugin by default; the packaging command is:

# mvn clean package -Dmaven.test.skip=true

After it runs, the data is written to the WordCount table in HBase. Use the following command to view the table contents:

hbase> scan 'WordCount'

2.8 withCounterFields

In the use case above we implemented the word count by hand and then stored the final result in HBase. Alternatively, when building the SimpleHBaseMapper you can designate the count field with withCounterFields; the designated field is then incremented automatically, which also yields a word count. Note that the fields specified by withCounterFields must be of type Long, not String; see the sketch after the mapper below for the changes this requires.

SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");

References

  1. Apache HDFS Integration
  2. Apache HBase Integration

More articles in this big data series can be found in the GitHub open-source project 大數據入門指南 (Big Data Guide).
