Spark Streaming實現實時流處理

1、Streaming與Flume的聯調html

Spark 2.2.0 對應於 Flume 1.6.0
 
兩種模式:
 
1. Flume-style push-based approach:
 
Flume推送數據給Streaming
 
Streaming的receiver做爲Flume的Avro agent
 
Spark workers應該跑在Flume這臺機器上
 
Streaming先啓動,receiver監聽Flume push data的端口
 
 
實現:
 
寫flume配置文件:
netcat source -> memory channel -> avro sink
 
IDEA開發:
添加Spark-flume依賴
對應的API是FlumeUtils
 
開發代碼:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* Spark Streaming整合Flume的第一種方式
* */
object FlumePushWordCount {
  def main(args: Array[String]): Unit = {
 
    //外部傳入參數
    if (args.length != 2) {
      System.out.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }
 
    val Array(hostname, port) = args  //外部args數組
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //選擇輸入ssc的createStream方法,生成一個InputDStream
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
 
    //因爲flume的內容有head有body, 須要先把內容拿出來, 並去掉空值
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
        .flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
注意:爲了避免hard-core,選擇外部傳入hostname和port
 
在IDEA測試時,能夠在
裏面的program argument輸入運行參數
 
在本地測試時:
 
先啓動Streaming做業,而後啓動flume agent,最後經過telnet輸入數據,觀察IDEA的控制檯輸出
 
 
在服務器測試時:
 
submit時必定要把maven依賴中在--packages加上,自動會在網絡上下載依賴
當不能下載時,須要--jars才能把預先下載好的jar包加上
 
 
 
2. Pull-based approach using a custom sink:
 
Streaming拉數據
 
Flume推送的數據先放到sink緩衝區
 
Streaming使用一個 reliable flume receiver,確保了數據的接收和備份
 
可靠性更高,支持容錯,生產上面經常使用
 
一臺機器運行Flume agent,Spark集羣其餘機器可訪問這臺機器的custom sink
 
實現:
 
Flume配置:
使用相關jars包,配置依賴:(參考Spark官網)
sink是一個獨特的type
 
IDEA開發:
對應上面Flume的依賴,使用的是 createPollStream,區別於第一種模式
其餘地方都同樣,體現了Spark代碼的複用性
 
本地測試:
先啓動flume!!後啓動Streaming做業
 
 
 
2、Streaming與Kafka的聯調
 
Spark2.2.0對應於Kafka 0.8.2.1或更新(本次使用的是0.9.0.0)
 
兩種模式:
 
1. Receiver-based approach
 
使用Kafka高級用戶API
爲了確保零數據丟失,須要用到 Write Ahead Logs(出現於Spark 1.2)
同步地保存接收到的數據到日誌當中,出錯時能夠恢復(容錯機制)
這是傳統的方式,在ZK server中消費數據
 
用KafkaUtils和Streaming對接,同樣須要加入kafka的各類依賴(見官網)
使用的API是createStream
 
注意:
  1. 此處的topic分區和RDD的分區不一樣概念
  2. 多個Kafka DStream能夠並行接收
  3. 用write ahead logs時須要配置StorageLevel.MEMORY_AND_DISK_SER
 
 
準備工做:
 
啓動ZK server
啓動kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties
建立topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
測試topic可否正確生產和消費
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic
 
IDEA代碼:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* SparkStreaming對接Kafka其中的Receiver-based方式
* */
object KafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
 
    if (args.length != 4) {
      System.out.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
 
    val Array(zkQuorum, group, topics, numThreads) = args
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //createStream須要傳入的其中一個參數是一個Map,就是topics對應的線程數
    val topicsMap = topics.split(",").map((_, numThreads.toInt)).toMap
 
    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicsMap)
 
    //必定要取Stream的第二位纔是數據,能夠print出來看看,在實際生產中只是更改這一行的業務邏輯!!!
    message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
本地測試/服務器測試:
 
從IDEA中輸入參數,便可看到結果
 
從服務器測試也是打包submit就行,看web UI的時候留意驗證receiver是佔有一個Job的,證明了前面的理論
 
 
 
2. Direct Approach
 
No receiver!!!
 
Spark 1.3 版本開始有
 
沒有了Receiver,而是 週期性地檢測Kafka的offset,用了kafka simple consumer API
 
優勢:
  1. 簡化了並行度,不須要建立多個input stream
  2. 性能更好,達到零數據丟失,且不須要保存副本於write ahead logs中
  3.  一次語義Exactly-once semantics
 
缺點:不能在zookeeper中更新offset,但能夠本身設置讓其更新
 
 使用的API是createDirectStream
 
準備工做和上面同樣。
 
IDEA代碼:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 
/*
* SparkStreaming對接Kafka其中的Direct方式
* */
object KafkaDirectWordCount {
  def main(args: Array[String]): Unit = {
 
    if (args.length != 4) {
      System.out.println("Usage: KafkaReceiverWordCount <brokers> <topics>")
      System.exit(1)
    }
 
    val Array(brokers, topics) = args
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //createDirectStream須要傳入kafkaParams和topicsSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet
 
    val message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )
 
 
    //必定要取Stream的第二位纔是數據,能夠print出來看看
    message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
注意:StringDecoder有可能由於前面寫Kafka java API時的包衝突而導入失敗
 
在IDEA運行時報錯:
這是因爲以前在Kafka基礎學習中我設置的kafka的依賴是0.9.0.0,和咱們IDEA衝突,因此要把這一個依賴註釋掉才能執行
 
調優時就是配置createDirectStream的參數嘛!!
 
 
 
3、Flume + Kafka + Spark Streaming經常使用流處理架構
 
實現的需求:實時(到如今爲止)的日誌訪問統計操做
 
因爲本人缺少日誌採集來源,故使用python語言來實現一個 日誌生成器,模擬生產環境中服務器不斷生成日誌的過程
本生成器產生的日誌內容包括ip、time、url、status、referer
 
根據前面的知識,咱們在實現的過程當中有如下步驟:
1. Flume的選型,在本例中設爲exec-memory-kafka
2. 打開kafka一個消費者,再啓動flume讀取日誌生成器中的log文件,可看到kafka中成功讀取到日誌產生器的實時數據
3. 讓Kafka接收到的數據傳輸到Spark Streaming當中,這樣就能夠在Spark對實時接收到的數據進行操做了
 
因爲與前面1、二的操做基本一致,此處再也不重複列出詳細操做過程
 
 
下面直接進入Spark中對實時數據的操做:
 
分爲數據清洗過程、統計功能實現過程兩個步驟!其中統計功能的實現基本上和Spark SQL中的操做一致,這又體現了Spark的代碼複用性,即能通用於多個框架中
相關文章
相關標籤/搜索