flume+sparkStreaming實例 實時監控文件demo

1,flume所在的節點不和spark同一個集羣  v50和 10-15節點 flume在v50裏面shell

flume-agent.confapache

 

spark是開的work節點,就是單點計算節點,不涉及到master發送管理 只是用到了sparkStreming的實時功能socket

開啓的是spark-shell不是spark-submit 提交jar的形式,提交jar的形式還須要後面研究下oop

以下  在結算節點下spa

  和flume的jar包要在各個節點上的spark 都要放入:
bin/spark-shell \
--jars /hadoop/spark/spark-2.0/jars/flume-ng-sdk-1.6.0.jar
,/hadoop/spark/spark-2.0/jars/flume-avro-source-1.5.0.1.jar,
/hadoop/spark/spark-2.0/jars/spark-streaming-flume_2.11-2.0.0.jar \
--master local[2]
12端口blog

val stream = FlumeUtils.createStream(ssc, "10.0.10.12", 9999)
11端口的spark
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._hadoop

val ssc = new StreamingContext(sc, Seconds(5))rem

//9999就是開啓的端口 像socket同樣 用端口鏈接get

val stream = FlumeUtils.createStream(ssc, "10.0.10.12", 9999)it

val wordCountStream = stream.map(x => new String(x.event.getBody.array())).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

wordCountStream.print()

ssc.start()
ssc.awaitTermination()

 

flume命令

bin/flume-ng agent --conf conf --conf-file conf/taile2stream.conf --name agent -Dflume.root.logger=INFO,console

相關文章
相關標籤/搜索