Two Ways to Connect Spark Streaming to Flume

Spark Streaming can be connected to Flume in two ways: Push and Pull. Seen from Spark Streaming's side, Push means Flume pushes data to Spark, while Pull means Spark pulls data from Flume's output.

I haven't managed to work out the push approach (Flume pushing data to Spark Streaming) myself; if anyone has it figured out, pointers would be greatly appreciated!
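For reference, the Spark side of the push approach would look roughly like the sketch below, using the receiver-based FlumeUtils.createStream API. The host, port, and batch interval are placeholder assumptions, and the Flume agent would need an avro sink (a1.sinks.k1.type = avro) pointed at the same host/port. A minimal, untested sketch:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFlumePush {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlumePush")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Spark starts an Avro receiver on this host/port and Flume pushes events to it;
    // both values are placeholders and must match the avro sink in the Flume config.
    val stream = FlumeUtils.createStream(ssc, "192.168.137.88", 5555, StorageLevel.MEMORY_AND_DISK_2)
    stream.map(e => new String(e.event.getBody.array())).print()
    ssc.start()
    ssc.awaitTermination()
  }
}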

1. Spark pulling data from Flume:

Copy the two required jar packages into flume/lib.

Otherwise these two exceptions are thrown:

org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink

java.lang.IllegalStateException: begin() called when transaction is OPEN!
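For reference: the missing classes on the Flume side usually come from the spark-streaming-flume-sink jar (plus a matching scala-library jar), while the Spark application itself needs the spark-streaming-flume module, which shipped with Spark 2.x and was removed in Spark 3.x. A minimal build.sbt sketch, with the version an assumption to be matched to your installation:

// build.sbt (sketch): the pull receiver used in step 3 below comes from this module;
// replace 2.4.8 with the version matching your Spark/Scala installation
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "2.4.8"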

2. Write the Flume agent configuration file:

a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/zhuzhu/apps/flumeSpooding
a1.sources.r1.fileHeader=true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# hostname and port of this agent's host; the Spark program polls this address
a1.sinks.k1.hostname = 192.168.137.88
a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3. Write the Spark Streaming program:

package day02

import java.net.InetSocketAddress

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @ClassName: StreamingFlume
 * @Description TODO Monitor Flume in real time and word-count the data it produces (Spark pull mode)
 * @Author: Charon 
 * @Date: 2021/4/7 13:19
 * @Version 1.0
 **/
object StreamingFlume {

  def main(args: Array[String]): Unit = {
    //1. Create the SparkConf object
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlume")
    //2. Create the SparkContext object
    val sc = new SparkContext(conf)
    //Set the log level so only warnings and errors are printed (the author found setting it here had no effect)
    //sc.setLogLevel("WARN")
    //3. Create the StreamingContext; Seconds(5) is the batch interval, i.e. how often a batch is processed
    val ssc = new StreamingContext(sc, Seconds(5))
    //4. Define the Flume addresses to poll; multiple agents can be listed, separated by commas, each as its own InetSocketAddress
    val addresses = Seq(new InetSocketAddress("192.168.137.88", 9999)) // must match a1.sinks.k1.hostname/port in the Flume configuration above
    //5. Pull the data buffered by Flume's SparkSink
    val stream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_2)
    // 6. Extract each event's body; an event has the shape {"header":xxxxx   "body":xxxxxx}
    val lineDstream: DStream[String] = stream.map(x => new String(x.event.getBody.array()))
    lineDstream.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey(_+_).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
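Since the source sets fileHeader=true, each event also carries headers (for a spooldir source, the absolute path of the originating file). If the headers are needed, here is a sketch of what could be added before ssc.start() in the program above; getHeaders returns a java.util.Map[CharSequence, CharSequence]:

    // Sketch: print headers alongside the body for a few events per batch
    import scala.collection.JavaConverters._
    stream.foreachRDD { rdd =>
      rdd.take(5).foreach { e =>
        val headers = e.event.getHeaders.asScala
        val body = new String(e.event.getBody.array())
        println(s"headers=$headers body=$body")
      }
    }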

4. Start the Flume agent to monitor the directory (e.g. `flume-ng agent --conf conf --conf-file <your-conf-file> --name a1`), then start the Spark Streaming program.

Upload files into the monitored spooling directory; the word counts are printed with each batch.