IDEA Spark Streaming with a Flume Data Source -- Fixing "Cannot Convert to Actual Input Data" and Garbled Chinese Characters (Scala)

Three steps are needed:

1. shell: write data to port 1234

nc localhost 1234
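After nc connects, each line you type (followed by Enter) becomes one Flume event. For example, the lines below are just sample input, with Chinese words included to exercise the encoding fix:

hello world 你好 世界
hello spark streaming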

2. shell: start the Flume agent

cd /usr/local2/flume/bin

./flume-ng agent --conf /usr/local2/flume/conf -f /usr/local2/flume/conf/flume-to-spark.conf --name a1

Note that --name must match the agent name a1 defined in flume-to-spark.conf.
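If no events seem to arrive, running the agent with console logging (a standard Flume option) makes it easier to see what the source and sink are doing:

./flume-ng agent --conf /usr/local2/flume/conf -f /usr/local2/flume/conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console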

3. IDEA:
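The project also needs the spark-streaming-flume artifact on its classpath before the code below will compile. A minimal build.sbt sketch, assuming Spark 2.x with Scala 2.11 (adjust the versions to match your environment):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.3.0",
  "org.apache.spark" %% "spark-streaming-flume" % "2.3.0"
)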

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStream_Flume_source {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val port = 4321                    // must match the Flume avro sink port
    val batchInterval = Seconds(2)
    val sparkConf = new SparkConf().setAppName("flume data source").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Push-based receiver: Spark runs an Avro server on host:port
    // that the Flume avro sink connects to.
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
    stream.count().map(x => "received " + x + " flume events").print()

    // Decode the event body as UTF-8 explicitly; relying on the JVM's
    // default charset is what garbles Chinese input.
    val words = stream
      .flatMap(x => new String(x.event.getBody.array(), "UTF-8").split(" "))
      .map(x => (x, 1))
    words.reduceByKey((x, y) => x + y).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
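One caveat on decoding: event.getBody returns a java.nio.ByteBuffer, and calling .array() assumes the backing array holds exactly the payload. A slightly safer decode honors the buffer's position and limit; this is a sketch, and bodyToString is an illustrative helper name:

import java.nio.charset.StandardCharsets
import org.apache.spark.streaming.flume.SparkFlumeEvent

// Copy exactly the bytes between position and limit, then decode as UTF-8.
def bodyToString(e: SparkFlumeEvent): String = {
  val buf = e.event.getBody.duplicate() // duplicate so the original buffer's position is untouched
  val bytes = new Array[Byte](buf.remaining())
  buf.get(bytes)
  new String(bytes, StandardCharsets.UTF_8)
}

// Usage inside the job: stream.flatMap(e => bodyToString(e).split(" "))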

In IDEA you can now see the input data, and Chinese characters display correctly as well.

/usr/local2/flume/conf/flume-to-spark.conf:

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 1234

    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 4321

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 1000000

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

Note the overall startup order: IDEA >>>> shell 2 >>>> shell 1, otherwise you will get errors. The Spark Streaming job must come first because FlumeUtils.createStream starts the Avro server that the Flume avro sink connects to; only then can the agent start, and only then can nc feed it data.
