Apache Spark 是加州大學伯克利分校的 AMPLabs 開發的開源分佈式輕量級通用計算框架。ios
因爲 Spark 基於內存設計,使得它擁有比 Hadoop 更高的性能(極端狀況下能夠達到 100x),而且對多語言(Scala、Java、Python)提供支持。apache
其一棧式設計特色使得咱們的學習和維護成本大大地減小,並且其提供了很好的容錯解決方案bootstrap
業務場景框架
咱們天天都有來自全國各地的自然氣購氣數據,並根據用戶的充氣,退氣,覈銷等實時計算分析的是用戶訂單數數據,因爲數據量比較大,單臺機器處理已經達到了瓶頸;綜合業務場景分析,咱們選用 Spark Streaming + Kafka+Flume+Hbase+kudu 來處理這些日誌;又由於業務系統不統一,先經過Spark Streaming對數據進行清洗後再回寫kafka集羣,由於會有其餘業務也須要kafka的數據;經過經過不一樣的程序對kafka數據進行消費,用戶記錄以多版本方式記錄到hbase;須要常常統計的指標業務數據寫入kududom
業務代碼:分佈式
建立DStreamide
val sparkConf = new SparkConf().setAppName("OrderSpark") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerAddress,"group.id" -> groupId) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, Set(topic))
返回的messages 是一個 DStream,它是對 RDD 的封裝,其上的不少操做都相似於 RDD;函數
createDirectStream 函數是 Spark 1.3.0 開始引入的,其內部實現是調用 Kafka 的低層次 API,Spark 自己維護 Kafka 偏移量等信息,因此能夠保證數據零丟失oop
可是機器一旦宕機或者重啓時,可能會存在重複消費;所以咱們能夠經過本身對offset進行checkpoint性能
獲取kafkaoffset
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) var offsetRanges = Array[OffsetRange]() kafkaStream.transform{ rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }.foreachRDD(rdd=>{ for(o <- offsetRanges) { println(s"@@@@@@ topic ${o.topic} partition ${o.partition} fromoffset ${o.fromOffset} untiloffset ${o.untilOffset} #######") }
}
爲了可以在 Spark Streaming 程序掛掉後又能從斷點處恢復,咱們每一個批次進行向zookeeper進行 Checkpoint;
這裏咱們沒有采用spark自帶的checkpoint,是由於一旦程序修改,以前序列化的checkpoint數據會衝突報錯,
固然checkpoint到文件也會隨之越大。(讀者能夠本身搜索spark 文件checkpoint的弊端)
啓動實時程序
ssc.start()
ssc.awaitTermination()
因業務所需須要向kafka回寫數據
rdd.foreachPartition(partition=>{ val props = new Properties() props.put("bootstrap.servers",Constans.brokers) props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String,String](props) partition.foreach(r=>{ val record = new ProducerRecord[String, String](Constans.topic_kc, new Random().nextInt(3), "", msg)
producer.send(record,new Callback() {
override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
if (null != e) {
println("發送消息失敗=>"+msg)
}
}
})
}) producer.close() })
監控
系統部署上線以後,咱們沒法保證系統 7x24 小時都正常運行,即便是在運行着,咱們也沒法保證 Job 不堆積、是否及時處理 Kafka 中的數據;並且 Spark Streaming 系統自己就不很穩定。因此咱們須要實時地監控系統,包括監控Kafka 集羣、Spark Streaming 程序。咱們全部的監控都是CDH自帶監控管理和Ganglia以及nagios,一旦檢測到異常,系統會本身先重試是否能夠本身恢復,若是不行,就會給咱們發送報警郵件和打電話。