The source code for this example can be downloaded from: storm-hdfs-integration
The project's main dependencies are listed below. Two points need attention: the <repository> tag specifies the CDH repository address, and hadoop-common, hadoop-client, and hadoop-hdfs must all exclude the slf4j-log4j12 dependency, because storm-core already brings it in and not excluding it risks a JAR conflict.

<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!-- Storm integration with HDFS -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hdfs</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0-cdh5.15.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
/**
 * Spout that produces sample word-frequency data
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // simulate incoming data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Generates mock data
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
The generated mock data looks like this:
Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm
Here the HDFS address and the storage path are both hard-coded; in real development they can be supplied as external parameters, which makes the program more flexible (a sketch of this idea follows the code below).
public class DataToHdfsApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String HDFS_BOLT = "hdfsBolt";

    public static void main(String[] args) {

        // Set the Hadoop user name. If it is not set, creating directories on HDFS
        // may throw a permission exception (RemoteException: Permission denied)
        System.setProperty("HADOOP_USER_NAME", "root");

        // delimiter between output fields
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        // sync policy: flush buffered data to HDFS after every 100 tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(100);

        // rotation policy: each file is capped at 1 MB; when the limit is exceeded,
        // a new file is created and writing continues there
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);

        // output path
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm-hdfs/");

        // define the HdfsBolt
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop001:8020")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        // build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // save to HDFS
        builder.setBolt(HDFS_BOLT, hdfsBolt, 1).shuffleGrouping(DATA_SOURCE_SPOUT);

        // If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterDataToHdfsApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalDataToHdfsApp", new Config(), builder.createTopology());
        }
    }
}
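One way to avoid the hard-coded values mentioned above is to pass the HDFS URL and the output path in from the outside, for example via program arguments or a properties file. The following is only a minimal sketch of that idea; the HdfsBoltFactory class and the buildHdfsBolt method are illustrative names, not part of the repository's source:

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

public class HdfsBoltFactory {

    /**
     * Builds an HdfsBolt from externally supplied values instead of hard-coded ones.
     * fsUrl and outputPath could come from program arguments or a configuration file.
     */
    public static HdfsBolt buildHdfsBolt(String fsUrl, String outputPath) {
        // same policies as in DataToHdfsApp, only the location values are parameterized
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");
        SyncPolicy syncPolicy = new CountSyncPolicy(100);
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, Units.MB);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath(outputPath);
        return new HdfsBolt()
                .withFsUrl(fsUrl)
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
    }
}

DataToHdfsApp could then call something like HdfsBoltFactory.buildHdfsBolt(args[1], args[2]) (with sensible defaults when the arguments are missing) instead of constructing the bolt inline.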
You can run it directly in local mode, or package it and submit it to a server cluster. The source code in this repository is packaged with maven-shade-plugin by default; the packaging command is:

mvn clean package -Dmaven.test.skip=true
After it runs, the data is stored under the /storm-hdfs directory on HDFS. The following commands can be used to inspect it:

# list the directory contents
hadoop fs -ls /storm-hdfs

# watch a file for newly appended content
hadoop fs -tail -f /storm-hdfs/<file-name>
Integration example: perform a word count and store the final results in HBase. The main structure of the project is as follows:
The source code for this example can be downloaded from: storm-hbase-integration
<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <!-- Storm integration with HBase -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hbase</artifactId>
        <version>${storm.version}</version>
    </dependency>
</dependencies>
/**
 * Spout that produces sample word-frequency data
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // simulate incoming data
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }

    /**
     * Generates mock data
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }
}
The generated mock data looks like this:
Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm
/**
 * Splits each line into words using the specified delimiter
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(tuple(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
/**
 * Counts word frequencies
 */
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // emit the word together with its current count
        collector.emit(new Values(word, String.valueOf(count)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
/**
 * Performs a word count and stores the results in HBase
 */
public class WordCountToHBaseApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String HBASE_BOLT = "hbaseBolt";

    public static void main(String[] args) {

        // Storm configuration
        Config config = new Config();

        // HBase configuration
        Map<String, Object> hbConf = new HashMap<>();
        hbConf.put("hbase.rootdir", "hdfs://hadoop001:8020/hbase");
        hbConf.put("hbase.zookeeper.quorum", "hadoop001:2181");

        // pass the HBase configuration into the Storm configuration
        config.put("hbase.conf", hbConf);

        // mapping between the stream fields and the HBase columns
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word", "count"))
                .withColumnFamily("info");

        /*
         * Pass the table name, the field mapping, and the HBase configuration key to the HBaseBolt.
         * The table must be created in advance: create 'WordCount','info'
         */
        HBaseBolt hbase = new HBaseBolt("WordCount", mapper)
                .withConfigKey("hbase.conf");

        // build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout(), 1);
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt(), 1).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt(), 1).shuffleGrouping(SPLIT_BOLT);
        // save to HBase
        builder.setBolt(HBASE_BOLT, hbase, 1).shuffleGrouping(COUNT_BOLT);

        // If "cluster" is passed as an argument, submit to the cluster; otherwise run locally
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToHBaseApp", config, builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToHBaseApp", config, builder.createTopology());
        }
    }
}
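As noted in the comment above, the WordCount table with column family info must exist before the topology is submitted (create 'WordCount','info' in the HBase shell). Purely as an illustration, the table could also be created programmatically with the HBase client API that storm-hbase pulls in transitively (assuming an HBase 1.x client); the class name CreateWordCountTable is hypothetical and the connection settings simply mirror the example above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateWordCountTable {

    public static void main(String[] args) throws Exception {
        // connection settings taken from the example above; adjust to your environment
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop001:2181");

        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("WordCount");
            if (!admin.tableExists(tableName)) {
                // equivalent to: create 'WordCount','info'
                HTableDescriptor descriptor = new HTableDescriptor(tableName);
                descriptor.addFamily(new HColumnDescriptor("info"));
                admin.createTable(descriptor);
            }
        }
    }
}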
You can run it directly in local mode, or package it and submit it to a server cluster. The source code in this repository is packaged with maven-shade-plugin by default; the packaging command is:

mvn clean package -Dmaven.test.skip=true
After it runs, the data is stored in the WordCount table in HBase. The table contents can be viewed with the following command:

hbase> scan 'WordCount'
In the example above we implemented the word count by hand and then stored the final result in HBase. Alternatively, when building the SimpleHBaseMapper you can designate the count field with withCounterFields; the designated fields are then incremented automatically, which also yields a word count. Note that the fields specified by withCounterFields must be of type Long, not String (a sketch of this variant follows the snippet below).
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("word"))
        .withCounterFields(new Fields("count"))
        .withColumnFamily("cf");
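As a sketch of how the rest of the topology might change under this approach, the bolt that feeds the HBaseBolt would emit each word with a Long increment (1L per occurrence) and let HBase accumulate the counter itself, so the separate CountBolt is no longer needed. The class name CounterSplitBolt is hypothetical, not part of the repository's source:

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Hypothetical bolt for the counter-field variant: emits each word with a Long
 * increment of 1, letting HBase accumulate the total via withCounterFields.
 */
public class CounterSplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        for (String word : line.split("\t")) {
            // the counter field must be a Long, not a String
            collector.emit(new Values(word, 1L));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

The HBaseBolt could then consume this bolt's stream directly, with the rest of the topology setup unchanged.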
More articles in this big data series can be found in the GitHub open-source project: 大數據入門指南 (Big Data Getting Started Guide)