Spark Streaming和Flume-NG對接實驗

  Spark Streaming是一個新的實時計算的利器,並且還在快速的發展。它將輸入流切分紅一個個的DStream轉換爲RDD,從而能夠使用Spark來處理。它直接支持多種數據源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些能夠操做的函數:mapreducejoinwindow等。html

  本文將Spark Streaming和Flume-NG進行對接,而後以官方內置的JavaFlumeEventCount做參考,稍做修改而後放到集羣上去運行。 java

  1、下載spark streaming的flume插件包,咱們這裏的spark版本是1.0.0(standlone),這個插件包的版本選擇spark-streaming-flume_2.10-1.0.1.jar,這個版本修復了一個重要的bug,參考下面參考中的7。git

  2、把spark的編譯後的jar包以及上面flume的插件,放入工程,編寫以下類(參考8中的例子修改而來),代碼以下:github

 1 package com.spark_streaming;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.function.Function;
 5 import org.apache.spark.streaming.*;
 6 import org.apache.spark.streaming.api.java.*;
 7 import org.apache.spark.streaming.flume.FlumeUtils;
 8 import org.apache.spark.streaming.flume.SparkFlumeEvent;
 9 
10 public final class JavaFlumeEventCount {
11   private JavaFlumeEventCount() {
12   }
13 
14   public static void main(String[] args) {
15 
16     String host = args[0];
17     int port = Integer.parseInt(args[1]);
18 
19     Duration batchInterval = new Duration(Integer.parseInt(args[2]));
20     SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
21     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
22     JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
23 
24     flumeStream.count();
25 
26     flumeStream.count().map(new Function<Long, String>() {
27       @Override
28       public String call(Long in) {
29         return "Received " + in + " flume events.";
30       }
31     }).print();
32 
33     ssc.start();
34     ssc.awaitTermination();
35   }
36 }

  這個和官方的區別是刪除了參數個數檢查和增長了自定義時間間隔(分割流),也就是第三個參數。這個類並無作太多處理,入門爲主。apache

  3、打包這個類到ifeng_spark.jar,連同spark-streaming-flume_2.10-1.0.1.jar一塊兒上傳到spark集羣中的節點上。api

  4、啓動flume,這個flume的sink要用avro,指定要發送到的spark集羣中的一個節點,咱們這裏是10.32.21.165:11000。socket

  5、在spark安裝根目錄下執行以下命令:maven

  ./bin/spark-submit  --master spark://10.32.21.165:8070  --driver-memory 4G  --executor-memory 4G --jars /usr/lib/spark-1.0.0-cdh4/lib/spark-streaming-flume_2.10-1.0.1.jar,/usr/lib/flume-ng-1.4-cdh4.6.0/lib/flume-ng-sdk-1.4.0-cdh6.0.jar  /usr/lib/spark-1.0.0-cdh4/ifeng_spark.jar   --class com.spark_streaming.JavaFlumeEventCount 10.32.21.165 11000 2000ide

   這個命令中的參數解釋請參考下面參考3中的解釋,也能夠本身增長一些參數,須要注意的是配置內存,本身根據須要自行增長內存(driver、executor)防止OOM。另外jars能夠同時加載多個jar包,逗號分隔。記得指定類後須要指定3個參數。函數

  若是沒有指定Flume的sdk包,會爆以下錯誤:

  java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;沒有找到類。這個類在flume的sdk包內,在jars參數中指定jar包位置就能夠。

  還有就是要將本身定義的業務類的jar單獨列出,不要放在jars參數指定,不然也會有錯誤拋出。

 

  運行後能夠看到大量的輸出信息,而後能夠看到有數據的RDD會統計出這個RDD有多少行,截圖以下,最後的部分就是這2秒(上面命令最後的參數設定的)統計結果:

     

  至此,flume-ng與spark的對接成功,這只是一個入門實驗。可根據須要靈活編寫相關的業務類來實現實時處理Flume傳輸的數據。

  spark streaming和一些數據傳輸工具對接能夠達到實時處理的目的。

 

  參考:

  一、https://spark.apache.org/docs/0.9.0/streaming-programming-guide.html

  二、http://www.cnblogs.com/cenyuhai/p/3577204.html

  三、http://blog.csdn.net/book_mmicky/article/details/25714545 , 重要的參數解釋

  四、http://blog.csdn.net/lskyne/article/details/37561235 , 這是一個例子

  五、http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20 , spark-flume插件下載

  六、http://outofmemory.cn/spark/configuration , spark一些可配置參數說明

  七、https://issues.apache.org/jira/browse/SPARK-1916  ,這是1.0.1以前版本中spark streaming與flume對接的一個bug信息

  八、https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming , 這是java版本的spark streaming的一些例子,裏面有flume的一個

相關文章
相關標籤/搜索