This takes three steps:
1. shell: write data to port 1234
nc localhost 1234
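Once connected, type a few test lines into the nc session and press Enter after each. The sample input below is made up; any space-separated words will do, Chinese included:

hello spark hello flume
你好 flume 你好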
2. shell: start the Flume service
cd /usr/local2/flume/bin
./flume-ng agent --conf /usr/local2/flume/conf -f /usr/local2/flume/conf/flume-to-spark.conf --name a1
3. IDEA: run the Spark Streaming program below
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStream_Flume_source {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val port = 4321  // must match a1.sinks.k1.port in the Flume config
    val setIntervalTime = Seconds(2)  // batch interval
    val sparkConf = new SparkConf().setAppName("flume data source").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, setIntervalTime)
    // Push-based receiver: Spark starts an Avro server on host:port,
    // and Flume's avro sink pushes events to it
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    stream.count().map(x => "received " + x + " flume events").print()
    // Word count over the event bodies
    val words = stream.flatMap(x => new String(x.event.getBody.array()).split(" ")).map(x => (x, 1))
    words.reduceByKey((x, y) => x + y).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
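The program depends on the spark-streaming-flume connector, which is packaged separately from Spark core. A minimal build.sbt sketch, assuming Scala 2.11 and Spark 2.3.0 (both version numbers are assumptions; match them to your installation):

// build.sbt (illustrative versions; align with your Spark/Scala install)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.3.0",
  "org.apache.spark" %% "spark-streaming-flume" % "2.3.0"
)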
The input typed into nc shows up in IDEA's console, and Chinese text is displayed correctly as well.
/usr/local2/flume/conf/flume-to-spark.conf:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 1234

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 4321

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 1000000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
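Read together with the Spark code, the event path through this agent looks like the following (a schematic view, not Flume syntax):

nc stdin -> netcat source r1 (localhost:1234) -> memory channel c1 -> avro sink k1 -> localhost:4321 -> FlumeUtils.createStream receiver in Spark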
Note the overall startup order: IDEA >>>> shell 2 >>>> shell 1, otherwise you get connection errors. The Spark job must be running first because FlumeUtils.createStream opens the Avro listener on port 4321 that Flume's avro sink connects to, and the Flume agent must be up before nc can connect to the netcat source on port 1234.