The main production use case for Structured Streaming is real-time processing together with Kafka. Note that Structured Streaming has a fairly high Kafka version requirement: only 0.10 and above is supported. Just last month we finally upgraded from 0.9 to 0.10, so we can at last try out many of Structured Streaming's features, which is great~
For a Maven project, simply add the corresponding Kafka connector dependency:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
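If your build uses sbt instead of Maven, a sketch of the equivalent declaration (assuming the same Scala 2.11 / Spark 2.2.0 versions as above) would be:

// build.sbt -- equivalent of the Maven dependency above (versions are assumptions)
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.2.0"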
When reading, you can subscribe to a single topic, to multiple topics, or to a topic pattern (wildcard):
// Subscribe to one topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern of topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
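The value column comes back as binary, so in practice the first step after load() is usually to cast it to a string and, if the payload is JSON, parse it with from_json. A minimal sketch, assuming a hypothetical payload with userId and action fields:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical schema of the JSON payload carried in the Kafka value
val payloadSchema = new StructType()
  .add("userId", StringType)
  .add("action", StringType)

// Cast the binary value to a string, then parse it into typed columns
val parsed = df
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), payloadSchema).as("data"))
  .select("data.*")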
Regarding Kafka offsets, Structured Streaming provides several ways to control them out of the box. The examples below use the batch API (read) and set both startingOffsets and endingOffsets; in the per-partition JSON form, -2 means earliest and -1 means latest:
// Batch query: exact per-partition offsets (-2 = earliest, -1 = latest)
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Batch query: read everything from earliest to latest
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
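For streaming queries, endingOffsets does not apply (the stream keeps reading new data), and startingOffsets only takes effect when a query is started for the first time; on restart the query resumes from its checkpointed offsets. A minimal streaming sketch:

// Streaming query: start from the earliest offsets the first time it runs;
// on restart, it resumes from the checkpoint instead of this setting
val streamDf = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()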
The schema of the data read from Kafka is fixed and contains the following columns (a short usage sketch follows the table):
Column | Type | Description
---|---|---
key | binary | the message key
value | binary | the message value (our own payload)
topic | string | the topic
partition | int | the partition
offset | long | the offset within the partition
timestamp | long | the message timestamp
timestampType | int | the timestamp type
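Only key and value carry the message itself; the remaining columns are Kafka metadata, which is handy for debugging or auditing. A minimal sketch that keeps the decoded payload together with its metadata:

// Keep the decoded payload alongside where it came from in Kafka
val withMeta = df.selectExpr(
  "CAST(value AS STRING) AS body",
  "topic",
  "partition",
  "offset")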
Whether you consume Kafka as a stream or as a batch, a few parameters are always required: kafka.bootstrap.servers (the broker list) plus exactly one of subscribe, subscribePattern, or assign to tell the source which topics or partitions to read.
Other noteworthy parameters include startingOffsets, endingOffsets, failOnDataLoss, and maxOffsetsPerTrigger; the full list is in the official integration guide.
Apache Kafka only supports "at least once" write semantics, so whether you write with streaming or batch, duplicate records are possible. For example, when a failure occurs, Structured Streaming retries the write, but it cannot determine whether the broker has already processed and persisted that data. What can be guaranteed, if the query succeeds, is that the data was written at least once. A common practice is therefore to deduplicate when processing the Kafka data downstream; for this, Structured Streaming actually has a dedicated solution (a sketch follows).
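One common form of that downstream deduplication is dropDuplicates on a unique key carried in the message, bounded by a watermark so the state does not grow forever. A minimal sketch, assuming a parsed streaming DataFrame with hypothetical eventId and eventTime columns:

// Deduplicate by a unique event id; the watermark bounds how long duplicates
// are remembered (parsedStream, eventId and eventTime are assumed names)
val deduped = parsedStream
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("eventId", "eventTime")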
Schema expected when writing data to Kafka: the DataFrame must have a value column (string or binary); key is optional (records without a key are written with a null key), and a topic column is only needed when the target topic is not set through the writer's options.
For the sink, kafka.bootstrap.servers is required, and the topic must be supplied either via the topic option or as a topic column in the data, as the two variants below show:
// Specify the topic via the options
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Take the topic from a column in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
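In practice a streaming query writing to Kafka also needs a checkpoint location so that offsets and state survive restarts. A minimal sketch; the path here is only a placeholder and should point at reliable storage such as HDFS:

// checkpointLocation below is a placeholder path, not a real one
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()

query.awaitTermination()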
Batch writes work the same way as streaming, just with write instead of writeStream:
// Batch write, specifying the topic via the options
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Batch write, taking the topic from a column in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
Kafka's own client configurations can be passed through DataStreamReader.option, using the kafka. prefix (a short example follows the link below).
For the full list of Kafka configurations, see the [official consumer documentation](http://kafka.apache.org/documentation.html#newconsumerconfigs).
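As a sketch, passing an underlying consumer setting would look like this; the specific property and value are only an illustration:

// Options prefixed with "kafka." are handed to the underlying Kafka consumer;
// session.timeout.ms and its value here are just an illustrative choice
val tuned = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("kafka.session.timeout.ms", "60000")
  .load()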
Note that the following parameters cannot be set; otherwise Kafka will throw an exception: