Structured Streaming教程(3) —— 與Kafka的集成

Structured Streaming最主要的生產環境應用場景就是配合kafka作實時處理,不過在Strucured Streaming中kafka的版本要求相對搞一些,只支持0.10及以上的版本。就在前一個月,咱們才從0.9升級到0.10,終於能夠嘗試structured streaming的不少用法,很開心~html

引入

若是是maven工程,直接添加對應的kafka的jar包便可:java

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

讀取kafka的數據

以流的形式查詢

讀取的時候,能夠讀取某個topic,也能夠讀取多個topic,還能夠指定topic的通配符形式:sql

讀取一個topic

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

讀取多個topic

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

讀取通配符形式的topic組

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

以批的形式查詢

關於Kafka的offset,structured streaming默認提供了幾種方式:apache

設置每一個分區的起始和結束值

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

配置起始和結束的offset值(默認)

val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

Schema信息

讀取後的數據的Schema是固定的,包含的列以下:json

Column Type 說明
key binary 信息的key
value binary 信息的value(咱們本身的數據)
topic string 主題
partition int 分區
offset long 偏移值
timestamp long 時間戳
timestampType int 類型

source相關的配置

不管是流的形式,仍是批的形式,都須要一些必要的參數:bootstrap

  • kafka.bootstrap.servers kafka的服務器配置,host:post形式,用逗號進行分割,如host1:9000,host2:9000
  • assign,以json的形式指定topic信息
  • subscribe,經過逗號分隔,指定topic信息
  • subscribePattern,經過java的正則指定多個topic
    assign、subscribe、subscribePattern同時之中能使用一個。

其餘比較重要的參數有:數組

  • startingOffsets, offset開始的值,若是是earliest,則從最先的數據開始讀;若是是latest,則從最新的數據開始讀。默認流是latest,批是earliest
  • endingOffsets,最大的offset,只在批處理的時候設置,若是是latest則爲最新的數據
  • failOnDataLoss,在流處理時,當數據丟失時(好比topic被刪除了,offset在指定的範圍以外),查詢是否報錯,默認爲true。這個功能能夠當作是一種告警機制,若是對丟失數據不感興趣,能夠設置爲false。在批處理時,這個值老是爲true。
  • kafkaConsumer.pollTimeoutMs,excutor鏈接kafka的超時時間,默認是512ms
  • fetchOffset.numRetries,獲取kafka的offset信息時,嘗試的次數;默認是3次
  • fetchOffset.retryIntervalMs,嘗試從新讀取kafka offset信息時等待的時間,默認是10ms
  • maxOffsetsPerTrigger,trigger暫時不會用,不太明白什麼意思。Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.

寫入數據到Kafka

Apache kafka僅支持「至少一次」的語義,所以,不管是流處理仍是批處理,數據都有可能重複。好比,當出現失敗的時候,structured streaming會嘗試重試,可是不會肯定broker那端是否已經處理以及持久化該數據。可是若是query成功,那麼能夠判定的是,數據至少寫入了一次。比較常見的作法是,在後續處理kafka數據時,再進行額外的去重,關於這點,其實structured streaming有專門的解決方案。服務器

保存數據時的schema:maven

  • key,可選。若是沒有填,那麼key會當作null,kafka針對null會有專門的處理(待查)。
  • value,必須有
  • topic,可選。(若是配置option裏面有topic會覆蓋這個字段)

下面是sink輸出必需要有的參數:post

  • kafka.bootstrap.servers,kafka的集羣地址,host:port格式用逗號分隔。

流處理的數據寫入

// 基於配置指定topic
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 在字段中包含topic
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

批處理的數據寫入

跟流處理其實同樣

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

kafka的特殊配置

針對Kafka的特殊處理,能夠經過DataStreamReader.option進行設置。

關於(詳細的kafka配置能夠參考consumer的官方文檔](http://kafka.apache.org/documentation.html#newconsumerconfigs)

以及kafka producer的配置

注意下面的參數是不能被設置的,不然kafka會拋出異常:

  • group.id kafka的source會在每次query的時候自定建立惟一的group id
  • auto.offset.reset 爲了不每次手動設置startingoffsets的值,structured streaming在內部消費時會自動管理offset。這樣就能保證訂閱動態的topic時不會丟失數據。startingOffsets在流處理時,只會做用於第一次啓動時,以後的處理都會自定的讀取保存的offset。
  • key.deserializer,value.deserializer,key.serializer,value.serializer 序列化與反序列化,都是ByteArraySerializer
  • enable.auto.commit kafka的source不會提交任何的offset
  • interceptor.classes 因爲kafka source讀取數據都是二進制的數組,所以不能使用任何攔截器進行處理。

參考

相關文章
相關標籤/搜索