Spark Streaming是一個新的實時計算的利器,並且還在快速的發展。它將輸入流切分紅一個個的DStream轉換爲RDD,從而能夠使用Spark來處理。它直接支持多種數據源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些能夠操做的函數:map
, reduce
, join
, window等。
html
本文將Spark Streaming和Flume-NG進行對接,而後以官方內置的JavaFlumeEventCount做參考,稍做修改而後放到集羣上去運行。 java
1、下載spark streaming的flume插件包,咱們這裏的spark版本是1.0.0(standlone),這個插件包的版本選擇spark-streaming-flume_2.10-1.0.1.jar,這個版本修復了一個重要的bug,參考下面參考中的7。git
2、把spark的編譯後的jar包以及上面flume的插件,放入工程,編寫以下類(參考8中的例子修改而來),代碼以下:github
1 package com.spark_streaming; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.function.Function; 5 import org.apache.spark.streaming.*; 6 import org.apache.spark.streaming.api.java.*; 7 import org.apache.spark.streaming.flume.FlumeUtils; 8 import org.apache.spark.streaming.flume.SparkFlumeEvent; 9 10 public final class JavaFlumeEventCount { 11 private JavaFlumeEventCount() { 12 } 13 14 public static void main(String[] args) { 15 16 String host = args[0]; 17 int port = Integer.parseInt(args[1]); 18 19 Duration batchInterval = new Duration(Integer.parseInt(args[2])); 20 SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); 21 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); 22 JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port); 23 24 flumeStream.count(); 25 26 flumeStream.count().map(new Function<Long, String>() { 27 @Override 28 public String call(Long in) { 29 return "Received " + in + " flume events."; 30 } 31 }).print(); 32 33 ssc.start(); 34 ssc.awaitTermination(); 35 } 36 }
這個和官方的區別是刪除了參數個數檢查和增長了自定義時間間隔(分割流),也就是第三個參數。這個類並無作太多處理,入門爲主。apache
3、打包這個類到ifeng_spark.jar,連同spark-streaming-flume_2.10-1.0.1.jar一塊兒上傳到spark集羣中的節點上。api
4、啓動flume,這個flume的sink要用avro,指定要發送到的spark集羣中的一個節點,咱們這裏是10.32.21.165:11000。socket
5、在spark安裝根目錄下執行以下命令:maven
./bin/spark-submit --master spark://10.32.21.165:8070 --driver-memory 4G --executor-memory 4G --jars /usr/lib/spark-1.0.0-cdh4/lib/spark-streaming-flume_2.10-1.0.1.jar,/usr/lib/flume-ng-1.4-cdh4.6.0/lib/flume-ng-sdk-1.4.0-cdh6.0.jar /usr/lib/spark-1.0.0-cdh4/ifeng_spark.jar --class com.spark_streaming.JavaFlumeEventCount 10.32.21.165 11000 2000ide
這個命令中的參數解釋請參考下面參考3中的解釋,也能夠本身增長一些參數,須要注意的是配置內存,本身根據須要自行增長內存(driver、executor)防止OOM。另外jars能夠同時加載多個jar包,逗號分隔。記得指定類後須要指定3個參數。函數
若是沒有指定Flume的sdk包,會爆以下錯誤:
java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;沒有找到類。這個類在flume的sdk包內,在jars參數中指定jar包位置就能夠。
還有就是要將本身定義的業務類的jar單獨列出,不要放在jars參數指定,不然也會有錯誤拋出。
運行後能夠看到大量的輸出信息,而後能夠看到有數據的RDD會統計出這個RDD有多少行,截圖以下,最後的部分就是這2秒(上面命令最後的參數設定的)統計結果:
至此,flume-ng與spark的對接成功,這只是一個入門實驗。可根據須要靈活編寫相關的業務類來實現實時處理Flume傳輸的數據。
spark streaming和一些數據傳輸工具對接能夠達到實時處理的目的。
參考:
一、https://spark.apache.org/docs/0.9.0/streaming-programming-guide.html
二、http://www.cnblogs.com/cenyuhai/p/3577204.html
三、http://blog.csdn.net/book_mmicky/article/details/25714545 , 重要的參數解釋
四、http://blog.csdn.net/lskyne/article/details/37561235 , 這是一個例子
五、http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20 , spark-flume插件下載
六、http://outofmemory.cn/spark/configuration , spark一些可配置參數說明
七、https://issues.apache.org/jira/browse/SPARK-1916 ,這是1.0.1以前版本中spark streaming與flume對接的一個bug信息
八、https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming , 這是java版本的spark streaming的一些例子,裏面有flume的一個