經過Spark Streaming處理交易數據

Apache Spark 是加州大學伯克利分校的 AMPLabs 開發的開源分佈式輕量級通用計算框架。ios

因爲 Spark 基於內存設計,使得它擁有比 Hadoop 更高的性能(極端狀況下能夠達到 100x),而且對多語言(Scala、Java、Python)提供支持。apache

其一棧式設計特色使得咱們的學習和維護成本大大地減小,並且其提供了很好的容錯解決方案bootstrap

 

業務場景框架

咱們天天都有來自全國各地的自然氣購氣數據,並根據用戶的充氣,退氣,覈銷等實時計算分析的是用戶訂單數數據,因爲數據量比較大,單臺機器處理已經達到了瓶頸;綜合業務場景分析,咱們選用 Spark Streaming + Kafka+Flume+Hbase+kudu 來處理這些日誌;又由於業務系統不統一,先經過Spark Streaming對數據進行清洗後再回寫kafka集羣,由於會有其餘業務也須要kafka的數據;經過經過不一樣的程序對kafka數據進行消費,用戶記錄以多版本方式記錄到hbase;須要常常統計的指標業務數據寫入kududom

 

業務代碼:分佈式

  建立DStreamide

val sparkConf = new SparkConf().setAppName("OrderSpark")

val sc = new SparkContext(sparkConf)

val ssc = new StreamingContext(sc, Seconds(10))

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerAddress,"group.id" -> groupId)

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, Set(topic))

返回的messages 是一個 DStream,它是對 RDD 的封裝,其上的不少操做都相似於 RDD;函數

createDirectStream 函數是 Spark 1.3.0 開始引入的,其內部實現是調用 Kafka 的低層次 API,Spark 自己維護 Kafka 偏移量等信息,因此能夠保證數據零丟失oop

可是機器一旦宕機或者重啓時,可能會存在重複消費;所以咱們能夠經過本身對offset進行checkpoint性能

 

  獲取kafkaoffset

   val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    var offsetRanges = Array[OffsetRange]()
    kafkaStream.transform{ rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD(rdd=>{
      for(o <- offsetRanges) {
        println(s"@@@@@@ topic  ${o.topic}  partition ${o.partition}  fromoffset ${o.fromOffset}  untiloffset ${o.untilOffset} #######")
      }
}

爲了可以在 Spark Streaming 程序掛掉後又能從斷點處恢復,咱們每一個批次進行向zookeeper進行 Checkpoint;

這裏咱們沒有采用spark自帶的checkpoint,是由於一旦程序修改,以前序列化的checkpoint數據會衝突報錯,

固然checkpoint到文件也會隨之越大。(讀者能夠本身搜索spark 文件checkpoint的弊端)

 

  啓動實時程序

    ssc.start()
    ssc.awaitTermination()

 

  因業務所需須要向kafka回寫數據

  

rdd.foreachPartition(partition=>{
        val props = new Properties()
        props.put("bootstrap.servers",Constans.brokers)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String,String](props)
        partition.foreach(r=>{
          val record = new ProducerRecord[String, String](Constans.topic_kc, new Random().nextInt(3), "", msg)
      producer.send(record,new Callback() {
       override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
       if (null != e) {
       println("發送消息失敗=>"+msg)
       }
      }
      })
  }) producer.close() })

 

監控

系統部署上線以後,咱們沒法保證系統 7x24 小時都正常運行,即便是在運行着,咱們也沒法保證 Job 不堆積、是否及時處理 Kafka 中的數據;並且 Spark Streaming 系統自己就不很穩定。因此咱們須要實時地監控系統,包括監控Kafka 集羣、Spark Streaming 程序。咱們全部的監控都是CDH自帶監控管理和Ganglia以及nagios,一旦檢測到異常,系統會本身先重試是否能夠本身恢復,若是不行,就會給咱們發送報警郵件和打電話。
相關文章
相關標籤/搜索