Spark 系列(十六)—— Spark Streaming 整合 Kafka

1、版本說明

Spark 針對 Kafka 的不一樣版本,提供了兩套整合方案:spark-streaming-kafka-0-8spark-streaming-kafka-0-10,其主要區別以下:html

spark-streaming-kafka-0-8 spark-streaming-kafka-0-10
Kafka 版本 0.8.2.1 or higher 0.10.0 or higher
AP 狀態 Deprecated
從 Spark 2.3.0 版本開始,Kafka 0.8 支持已被棄用
Stable(穩定版)
語言支持 Scala, Java, Python Scala, Java
Receiver DStream Yes No
Direct DStream Yes Yes
SSL / TLS Support No Yes
Offset Commit API(偏移量提交) No Yes
Dynamic Topic Subscription
(動態主題訂閱)
No Yes

本文使用的 Kafka 版本爲 kafka_2.12-2.2.0,故採用第二種方式進行整合。java

2、項目依賴

項目採用 Maven 進行構建,主要依賴以下:git

<properties>
    <scala.version>2.12</scala.version>
</properties>

<dependencies>
    <!-- Spark Streaming-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- Spark Streaming 整合 Kafka 依賴-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>2.4.3</version>
    </dependency>
</dependencies>

完整源碼見本倉庫:spark-streaming-kafkagithub

3、整合Kafka

經過調用 KafkaUtils 對象的 createDirectStream 方法來建立輸入流,完整代碼以下:shell

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * spark streaming 整合 kafka
  */
object KafkaDirectStream {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("KafkaDirectStream").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, Object](
      /*
       * 指定 broker 的地址清單,清單裏不須要包含全部的 broker 地址,生產者會從給定的 broker 裏查找其餘 broker 的信息。
       * 不過建議至少提供兩個 broker 的信息做爲容錯。
       */
      "bootstrap.servers" -> "hadoop001:9092",
      /*鍵的序列化器*/
      "key.deserializer" -> classOf[StringDeserializer],
      /*值的序列化器*/
      "value.deserializer" -> classOf[StringDeserializer],
      /*消費者所在分組的 ID*/
      "group.id" -> "spark-streaming-group",
      /*
       * 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下該做何處理:
       * latest: 在偏移量無效的狀況下,消費者將從最新的記錄開始讀取數據(在消費者啓動以後生成的記錄)
       * earliest: 在偏移量無效的狀況下,消費者將從起始位置讀取分區的記錄
       */
      "auto.offset.reset" -> "latest",
      /*是否自動提交*/
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    
    /*能夠同時訂閱多個主題*/
    val topics = Array("spark-streaming-topic")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      /*位置策略*/
      PreferConsistent,
      /*訂閱主題*/
      Subscribe[String, String](topics, kafkaParams)
    )

    /*打印輸入流*/
    stream.map(record => (record.key, record.value)).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

3.1 ConsumerRecord

這裏得到的輸入流中每個 Record 其實是 ConsumerRecord<K, V> 的實例,其包含了 Record 的全部可用信息,源碼以下:apache

public class ConsumerRecord<K, V> {
    
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    
    /*主題名稱*/
    private final String topic;
    /*分區編號*/
    private final int partition;
    /*偏移量*/
    private final long offset;
    /*時間戳*/
    private final long timestamp;
    /*時間戳表明的含義*/
    private final TimestampType timestampType;
    /*鍵序列化器*/
    private final int serializedKeySize;
    /*值序列化器*/
    private final int serializedValueSize;
    /*值序列化器*/
    private final Headers headers;
    /*鍵*/
    private final K key;
    /*值*/
    private final V value;
    .....   
}

3.2 生產者屬性

在示例代碼中 kafkaParams 封裝了 Kafka 消費者的屬性,這些屬性和 Spark Streaming 無關,是 Kafka 原生 API 中就有定義的。其中服務器地址、鍵序列化器和值序列化器是必選的,其餘配置是可選的。其他可選的配置項以下:bootstrap

1. fetch.min.byte

消費者從服務器獲取記錄的最小字節數。若是可用的數據量小於設置值,broker 會等待有足夠的可用數據時纔會把它返回給消費者。服務器

2. fetch.max.wait.ms

broker 返回給消費者數據的等待時間。session

3. max.partition.fetch.bytes

分區返回給消費者的最大字節數。異步

4. session.timeout.ms

消費者在被認爲死亡以前能夠與服務器斷開鏈接的時間。

5. auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下該做何處理:

  • latest(默認值) :在偏移量無效的狀況下,消費者將從其啓動以後生成的最新的記錄開始讀取數據;
  • earliest :在偏移量無效的狀況下,消費者將從起始位置讀取分區的記錄。

6. enable.auto.commit

是否自動提交偏移量,默認值是 true,爲了不出現重複數據和數據丟失,能夠把它設置爲 false。

7. client.id

客戶端 id,服務器用來識別消息的來源。

8. max.poll.records

單次調用 poll() 方法可以返回的記錄數量。

9. receive.buffer.bytes 和 send.buffer.byte

這兩個參數分別指定 TCP socket 接收和發送數據包緩衝區的大小,-1 表明使用操做系統的默認值。

3.3 位置策略

Spark Streaming 中提供了以下三種位置策略,用於指定 Kafka 主題分區與 Spark 執行程序 Executors 之間的分配關係:

  • PreferConsistent : 它將在全部的 Executors 上均勻分配分區;

  • PreferBrokers : 當 Spark 的 Executor 與 Kafka Broker 在同一機器上時能夠選擇該選項,它優先將該 Broker 上的首領分區分配給該機器上的 Executor;
  • PreferFixed : 能夠指定主題分區與特定主機的映射關係,顯示地將分區分配到特定的主機,其構造器以下:

@Experimental
def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
  new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))

@Experimental
def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
  new PreferFixed(hostMap)

3.4 訂閱方式

Spark Streaming 提供了兩種主題訂閱方式,分別爲 SubscribeSubscribePattern。後者可使用正則匹配訂閱主題的名稱。其構造器分別以下:

/**
  * @param 須要訂閱的主題的集合
  * @param Kafka 消費者參數
  * @param offsets(可選): 在初始啓動時開始的偏移量。若是沒有,則將使用保存的偏移量或 auto.offset.reset 屬性的值
  */
def Subscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object],
    offsets: ju.Map[TopicPartition, jl.Long]): ConsumerStrategy[K, V] = { ... }

/**
  * @param 須要訂閱的正則
  * @param Kafka 消費者參數
  * @param offsets(可選): 在初始啓動時開始的偏移量。若是沒有,則將使用保存的偏移量或 auto.offset.reset 屬性的值
  */
def SubscribePattern[K, V](
    pattern: ju.regex.Pattern,
    kafkaParams: collection.Map[String, Object],
    offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { ... }

在示例代碼中,咱們實際上並無指定第三個參數 offsets,因此程序默認採用的是配置的 auto.offset.reset 屬性的值 latest,即在偏移量無效的狀況下,消費者將從其啓動以後生成的最新的記錄開始讀取數據。

3.5 提交偏移量

在示例代碼中,咱們將 enable.auto.commit 設置爲 true,表明自動提交。在某些狀況下,你可能須要更高的可靠性,如在業務徹底處理完成後再提交偏移量,這時候可使用手動提交。想要進行手動提交,須要調用 Kafka 原生的 API :

  • commitSync: 用於異步提交;
  • commitAsync:用於同步提交。

具體提交方式能夠參見:Kafka 消費者詳解

4、啓動測試

4.1 建立主題

1. 啓動Kakfa

Kafka 的運行依賴於 zookeeper,須要預先啓動,能夠啓動 Kafka 內置的 zookeeper,也能夠啓動本身安裝的:

# zookeeper啓動命令
bin/zkServer.sh start

# 內置zookeeper啓動命令
bin/zookeeper-server-start.sh config/zookeeper.properties

啓動單節點 kafka 用於測試:

# bin/kafka-server-start.sh config/server.properties

2. 建立topic

# 建立用於測試主題
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                    --replication-factor 1 \
                    --partitions 1  \
                    --topic spark-streaming-topic

# 查看全部主題
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092

3. 建立生產者

這裏建立一個 Kafka 生產者,用於發送測試數據:

bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic spark-streaming-topic

4.2 本地模式測試

這裏我直接使用本地模式啓動 Spark Streaming 程序。啓動後使用生產者發送數據,從控制檯查看結果。

從控制檯輸出中能夠看到數據流已經被成功接收,因爲採用 kafka-console-producer.sh 發送的數據默認是沒有 key 的,因此 key 值爲 null。同時從輸出中也能夠看到在程序中指定的 groupId 和程序自動分配的 clientId

參考資料

  1. https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

更多大數據系列文章能夠參見 GitHub 開源項目大數據入門指南

相關文章
相關標籤/搜索