Spark Streaming與Kafka集成

Spark Streaming與Kafka集成

一、介紹

kafka是一個發佈訂閱消息系統,具備分佈式、分區化、多副本提交日誌特色。kafka項目在0.8和0.10之間引入了一種新型消費者API,注意選擇正確的包以得到相應的特性。每一個版本都是向後兼容的,所以0.8能夠兼容0.9和0.10,可是0.10不能兼容早期版本。0.8支持python、Receiver流和Direct流,不支持偏移量提交API以及動態分區訂閱,0.10不支持python和Receiver流,支持Direct流、偏移量提交API和動態分區訂閱。具體見表格:java

spark-streaming-kafka-0-8 spark-streaming-kafka-0-10
Broker Version 0.8.2.1 or higher 0.10.0 or higher
API Maturity 過期 穩定
支持語言 scala、java、python scala、java
Receiver流 支持 不支持
Direct流 支持 支持
SSL/TLS 不支持 支持
偏移量提交API 不支持 支持
動態分區訂閱 不支持 支持

二、Spark Streaming與kafka集成

0.10的集成方式和0.8相似,提供了在spark streaming 分區和kafka分區的1:1關係,能夠訪問偏移量和元數據。但因爲使用的是新型消費者API,而不是簡單API,所以仍是有諸多注意事項。python

2.1 建立模塊引入依賴

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.1.0</version>
</dependency>

**注意:**不要手動添加org.apache.kafka工件依賴,該依賴已經有正確的工件依賴,多個不一樣版本會致使不兼容。正則表達式

2.2 實現scala代碼

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
 * Created by Administrator on 2018/3/8.
 */
object SparkStreamingKafkaScala {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka")
    conf.setMaster("local[*]")

    val ssc = new StreamingContext(conf , Seconds(2))

    //kafka參數
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "s102:9092,s103:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val ds2 = stream.map(record => (record.key, record.value))
    ds2.print()

    ssc.start()

    ssc.awaitTermination()
  }
}

2.3 啓動kafka集羣

$>xkafka.sh start

2.4 建立主題

建立主題,指定分區數和副本數,分區數和集羣的內核數相同,保證最大併發能力,例若有三個節點,每一個節點8個和,分區數爲3 * 8 = 24,。sql

$>kafka-topics.sh --zookeeper s102:2181 --create --topic topic1 \
	--replication-factor 3 --partitions 8

2.5 啓動控制檯消費者

$>kafka-console-consumer.sh --zookeeper s102:2181 --topic topic1

2.6 啓動控制檯生產者

$>kafka-console-producer.sh --broker-list s102:9092 --topic topic1

2.7 在生產者終端輸入消息,檢查消費者是否可以收到

spark_037

2.8 啓動Spark streaming應用

spark_038

2.9 查看流的分區

//經過RDD查看分區數 , 分區數爲4
stream.foreachRDD(rdd=>{
	println(rdd.partitions.length)
})

2.10 查看kafka主題的分區

$>kafka-topics.sh --zookeeper s102:2181 --topic topic1 --describe

運行結果以下:shell

spark_039

三、相關參數

3.1 LocationStrategies

新型kafka消費者API會預提取kafka數據到buffer中,所以讓Spark在executor上保持緩存的consumer,對於性能來說就很是重要,而不是每一個batch建立新的consumer,選擇在執行器上對於給定的主題分區如何調度消費者。位置策略的本意就是控制消費者在哪些節點上開啓。數據庫

  • LocationStrategies.PreferConsistentapache

    大多數狀況下,選擇使用該方式,將在全部executors上均衡分佈分區進行調度。bootstrap

  • LocationStrategies.PreferBrokers緩存

    若是executor和kafka broker位於同一主機,則能夠使用該方式,這將優先調度那些分區爲leader的分區。併發

  • LocationStrategies.PreferFixed

    若是在分區間有嚴重的數據傾斜,能夠使用該方式,容許爲分區指定特定的位置進行調度。

3.2 ConsumerStrategies

新型kafka消費者API有幾種指定主題的方式。

  • ConsumerStrategies.Assign

    容許指定固定的分區集合。

  • ConsumerStrategies.Subscribe

    容許訂閱固定的主題集合。

  • ConsumerStrategies.SubscribePattern

    能夠使用正則表達式指定主題。

四、使用PreferFixed和Assign組合

指定s102消費主題的全部分區,每一個分區下消費特定的偏移量。

import java.net.Socket

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Created by Administrator on 2018/3/8.
  */
object SparkStreamingKafkaScala {

  //發送消息給遠程socket
  def sendInfo(msg: String, objStr: String) = {
    //獲取ip
    val ip = java.net.InetAddress.getLocalHost.getHostAddress
    //獲得pid
    val rr = java.lang.management.ManagementFactory.getRuntimeMXBean();
    val pid = rr.getName().split("@")(0);
    //pid
    //線程
    val tname = Thread.currentThread().getName
    //對象id
    val sock = new java.net.Socket("s101", 8888)
    val out = sock.getOutputStream
    val m = ip + "\t:" + pid + "\t:" + tname + "\t:" + msg + "\t:" + objStr + "\r\n"
    out.write(m.getBytes)
    out.flush()
    out.close()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka")
    //conf.setMaster("spark://s101:7077")
    conf.setMaster("local[8]")

    val ssc = new StreamingContext(conf, Seconds(5))

    //kafka參數
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "s102:9092,s103:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

	//主題分區與主機名的映射,那個主題分區由哪臺主機消費
    val map = scala.collection.mutable.Map[TopicPartition,String]()
	/**************************************************
	 ***********   必定要使用ip地址 !!!!*****************
	 **************************************************/
    map.put(new TopicPartition("t1" , 0) , "192.168.231.102")
    map.put(new TopicPartition("t1" , 1) , "192.168.231.102")
    map.put(new TopicPartition("t1" , 2) , "192.168.231.102")
    map.put(new TopicPartition("t1" , 3) , "192.168.231.102")
    val fix = LocationStrategies.PreferFixed(map) ;

    val topics = Array("t1")

    //主題分區集合
    val tps = scala.collection.mutable.ArrayBuffer[TopicPartition]()
    tps.+=(new TopicPartition("t1" , 0))
    tps.+=(new TopicPartition("t1" , 1))
    tps.+=(new TopicPartition("t1" , 2))

    //偏移量集合
    val offsets = scala.collection.mutable.Map[TopicPartition,Long]()
    offsets.put(new TopicPartition("t1", 0), 3)
    offsets.put(new TopicPartition("t1", 1), 3)
    offsets.put(new TopicPartition("t1", 2), 0)
	
    //消費者策略,主題分區與偏移集合
    val conss = ConsumerStrategies.Assign[String,String](tps , kafkaParams , offsets)

    //建立kakfa直向流
    val stream = KafkaUtils.createDirectStream[String,String](
      ssc,
      fix,
      ConsumerStrategies.Assign[String, String](tps, kafkaParams, offsets)
    )

    val ds2 = stream.map(record => {
      val t = Thread.currentThread().getName
      val key = record.key()
      val value = record.value()
      val offset = record.offset()
      val par = record.partition()
      val topic = record.topic()
      val tt = (key , value ,offset, par,topic ,t)
      sendInfo(tt.toString() ,this.toString)
      tt
    })

    ds2.print()

    ssc.start()

    ssc.awaitTermination()
  }
}

五、手動提交偏移量

將rdd強制轉換成CanCommitOffsets,經過該trait進行提交,且只能異步提交,能夠指定回調函數對提交結果進行處理。

val stream = KafkaUtils.createDirectStream(...)
//...
//在driver端提交,由於是DStream的方法,DStream不能串行化。
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() {
  //回調函數
  def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
    if (null != e) {
      // error
    } else {
      // success
    }
  }
})

六、消費語義

  • 最多一次

    at most ,最多消費一次。先提交,後消費。

    //先提交
    commitAsync()
    //後消費
    consume()

  • 最少一次

    //先消費
    consume()
    //後提交
    commitAsync()
  • 精準一次

    //提交偏移量到數據庫,不在kafka中
    conn.setAutoCommit();
    consume()
    updateOffset()
    conn.commit();

相關文章
相關標籤/搜索