版本支持來自官網:html
Kafka: Spark Streaming 2.2.0 is compatible with Kafka broker versions 0.8.2.1 or higher. See the Kafka Integration Guide for more details.java
Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.mysql
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency>
For ingesting data from sources like Kafka, Flume, and Kinesis that are not present in the Spark Streaming core API, you will have to add the corresponding artifact spark-streaming-xyz_2.11
to the dependencies. For example, some of the common ones are as follows.sql
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka-0-8_2.11 |
Flume | spark-streaming-flume_2.11 |
Kinesis | spark-streaming-kinesis-asl_2.11 [Amazon Software License] |
spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 | |
---|---|---|
Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
Api Stability | Stable | Experimental |
Language Support | Scala, Java, Python | Scala, Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit Api | No | Yes |
Dynamic Topic Subscription | No | Yes |
pom.xml:apache
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.xp.cn</groupId> <artifactId>sparkXN</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <name>sparkXN</name> <url>http://maven.apache.org</url> <repositories> <repository> <id>aliyun</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> </repository> <repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository> </repositories> <properties> <maven.compiler.source>1.5</maven.compiler.source> <maven.compiler.target>1.5</maven.compiler.target> <encoding>UTF-8</encoding> <hadoop.version>2.5.0</hadoop.version> <spark.version>2.2.0</spark.version> <scala.version>2.11.8</scala.version> <mysql.version>5.1.20</mysql.version> </properties> <dependencies> <!-- 導入scala的依賴 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- 導入sparkCore的依賴 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- 導入sparkSql的依賴 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- 導入sparkCore的依賴 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- sparkStreaming 的依賴 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- sparkStreaming 和kafka整合的依賴 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- HDFS Client --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- mysql依賴 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql.version}</version> </dependency> <!-- Test --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.8.1</version> <scope>test</scope> </dependency> <!-- kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.1</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <version>2.15.0</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <!-- <configuration> <args> <arg>-make:transitive</arg> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration>--> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.4</version> <configuration> <useFile>false</useFile> <disableXmlReport>true</disableXmlReport> <!-- If you have classpath issue like NoDefClassError,... --> <!-- useManifestOnlyJar>false</useManifestOnlyJar --> <includes> <include>**/*Test.*</include> <include>**/*Suite.*</include> </includes> </configuration> </plugin> </plugins> </build> </project>
spark實現:api
package com.xp.cn.streaming import org.apache.log4j.{Level, Logger} import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkContext, SparkConf} /** * Created by xupan on 2017/12/16. */ object KafkaStreaming { def main(args: Array[String]) { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) //建立conf,spark streaming至少要啓動兩個線程,一個負責接受數據,一個負責處理數據 val conf = new SparkConf().setAppName("FirstStreaming").setMaster("local[2]") //建立SparkContext val sc = new SparkContext(conf) //建立StreamingContext,每隔10秒產生一個批次 val ssc = new StreamingContext(sc, Seconds(10)) val zkQuorum = "xupan001:2181,xupan002:2181,xupan003:2181" val groupId = "g1" val topic = Map[String,Int]("test001" -> 1) //建立kafkaDStream //createDirectStream:上產環境用,底層API, //createStream,效率低,數據可能丟失 val data: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topic) val lines = data.map(_._2) val countSD = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) //action觸發,每次計算只是計算當前批次的結果 countSD.print() //啓動,開始接收數據並用streamingContext.start() ssc.start() //等待處理中止,streamingContext.awaitTermination() ssc.awaitTermination() } }
-------------------------------------------
Time: 1513414020000 ms
-------------------------------------------
(test001,5)app
-------------------------------------------
Time: 1513414030000 ms
-------------------------------------------
(wwwwwwww,1)
(helo,1)
(qwwwww,1)less
-------------------------------------------
Time: 1513414040000 ms
-------------------------------------------
(wwwwwwww,1)
(helo,1)
(qwwwww,1)maven
-------------------------------------------
Time: 1513414050000 ms
-------------------------------------------
(wwwwwwww,10)
(helo,10)
(qwwwww,10)分佈式
-------------------------------------------
Time: 1513414060000 ms
-------------------------------------------
(wwwwwwww,5)
(helo,5)
(qwwwww,5)
說明:
This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data.
However, under default configuration, this approach can lose data under failures (see receiver reliability. To ensure zero-data loss, you have to additionally enable Write Ahead Logs in Spark Streaming (introduced in Spark 1.2). This synchronously saves all the received Kafka data into write ahead logs on a distributed file system (e.g HDFS), so that all the data can be recovered on failure. See Deploying section in the streaming programming guide for more details on Write Ahead Logs.
此方法使用 Receiver 來接收數據. Receiver 是使用 Kafka high-level consumer API 實現的. 與全部 Receiver 同樣, Receiver 從 Kafka 接收的數據並存儲在 Spark executor 中, 而後由 Spark Streaming 啓動的做業處理數據.
然而, 在默認配置下, 這種方法在程序失敗時會丟失數據(請看 receiver reliability ( receiver 的可靠性) , 爲了確保零數據丟失, 必須啓用 Spark Streaming 中的 Write Ahead Logs(預寫日誌)(在 Spark 1.2 中引入), 這會同步保存全部從 Kafka 接收的數據寫入分佈式文件系統(例如 HDFS), 以便全部數據能夠在故障時恢復. 有關 Write Ahead (預寫日誌)
解決辦法:No Receivers
This new receiver-less 「direct」 approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this feature was introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.
This approach has the following advantages over the receiver-based approach (i.e. Approach 1).
Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream
, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs. As long as you have sufficient Kafka retention, messages can be recovered from Kafka.
Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use simple Kafka API that does not use Zookeeper. Offsets are tracked by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures. In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).
Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself (see below).
這種新的無 Receiver 的 「direct(直接)」 方法已經在 Spark 1.3 中引入, 以確保更強的 end-to-end guarantees (端到端保證). 代替使用 receiver(接收器)來接收數據, 該方法週期性地查詢 Kafka 以得到每一個 topic(主題)和 partition(分區)中最新的 offset(偏移量), 而且定義每一個批量中處理的 offset ranges(偏移範圍). 當啓動做業時, Kafka 的 simple consumer API Kafka 用於從 kafka 中讀取定義好 offset ranges(偏移範圍)的數據(相似於從文件系統讀取文件). 請注意, 針對 Scala 和 Java API 此特性在 Spark 1.3 中就引入了, 針對 Python API 在 Spark 1.4 中開始引入.
這種方法相較方法 1 有如下優勢.
Simplified Parallelism(簡化並行性): 無需建立多個輸入 Kafka 流並聯合它們. 使用 directStream
, Spark Streaming 將建立與消費的 Kafka partition 同樣多的 RDD 分區, 這將從 Kafka 並行讀取數據. 所以, Kafka 和 RDD 分區之間存在一對一映射, 這更容易理解和調整.
Efficiency(高效): 在第一種方法中實現 zero-data loss (零數據丟失)須要將數據存儲在預寫日誌中, 該日誌進一步複製數據. 這其實是低效的, 由於數據有效地被複制兩次 - 一次是 Kafka, 另外一次是 Write Ahead Log. 第二種方法消除了問題, 由於沒有接收器, 所以不須要 Write Ahead Logs. 只要您的 Kafka 有足夠的保留時間消息能夠從 Kafka 恢復.
Exactly-once semantics(一次且僅一次語義): 第一種方法使用 Kafka 的 high level API(高級API)在 Zookeeper 中存儲 consume 的 offset(偏移量). 這是傳統上消費 Kafka 數據的方式. 雖然這種方法(與預寫日誌結合)能夠確保零數據丟失(即 at-least once 至少一次語義), 可是在某些故障狀況下, 一些 record(記錄)很小的可能性會被消費兩次. 這是由於 Spark Streaming 可靠接收的數據與 Zookeeper 跟蹤的 offset(偏移)之間存在不一致. 所以, 在第二種方法中, 咱們使用不依賴 Zookeeper 的 simple Kafka API. offset(偏移)由 Spark Streaming 在其 checkpoints(檢查點)內進行跟蹤. 這消除了 Spark Streaming 和 Zookeeper/Kafka 之間的不一致, 所以, 儘管出現故障, Spark Streaming 仍然有效地 exactly once(剛好一次)接收了每條 record(記錄). 爲了實現輸出結果的 exactly once(剛好一次)的語義, 將數據保存到外部數據存儲的輸出操做必須是冪等的, 或者是保存結果和 offset(偏移量)的原子事務(請參閱 Semantics of output operations 獲取更多信息).
注意, 這種方法的一個缺點是它不更新 Zookeeper 中的 offset(偏移), 所以基於 Zookeeper 的 Kafka 監控工具將不會顯示進度. 可是, 您能夠在每一個批處理中訪問由此方法處理的 offset(偏移量), 並本身更新 Zookeeper
No Receivers
package com.xp.cn.streaming import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, StreamingContext} /** * Created by xupan on 2017/12/16. * kafka V:0.8 */ object KafkaDirectWordCount { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) //指定組名,消費者組之間消費數據是沒有關係的,A組記錄A組的偏移量,B組記錄B組的偏移量 //每個topic對應的消費組都有本身偏移量 topic+group===》offset val group = "g001" //建立SparkConf val conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]") //建立SparkStreaming,並設置間隔時間 val ssc = new StreamingContext(conf, Duration(5000)) //指定消費的 topic 名字 val topic = "wwcc" //指定kafka的broker地址(sparkStream的Task直連到kafka的分區上,用更加底層的API消費,效率更高) val brokerList = "xupan001:9092,xupan001:9092,xupan001:9092" //指定zk的地址,後期更新消費的偏移量時使用(之後可使用Redis、MySQL來記錄偏移量) val zkQuorum = "xupan001:2181,xupan001:2181,xupan001:2181" //建立 stream 時使用的 topic 名字集合,SparkStreaming可同時消費多個topic val topics: Set[String] = Set(topic) //建立一個 ZKGroupTopicDirs 對象,實際上是指定往zk中寫入數據的目錄,用於保存偏移量 val topicDirs = new ZKGroupTopicDirs(group, topic) //獲取 zookeeper 中的路徑 "/g001/offsets/wordcount/" val zkTopicPath = s"${topicDirs.consumerOffsetDir}" //準備kafka的參數 val kafkaParams = Map( "metadata.broker.list" -> brokerList, "group.id" -> group, //從頭開始讀取數據 "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString ) //zookeeper 的host 和 ip,建立一個 client,用於跟新偏移量量的 //是zookeeper的客戶端,能夠從zk中讀取偏移量數據,並更新偏移量 val zkClient = new ZkClient(zkQuorum) //查詢該路徑下節點,也就是分區(默認有字節點爲咱們本身保存不一樣 partition 時生成的) // /g001/offsets/wordcount/0/10001" // /g001/offsets/wordcount/1/30001" // /g001/offsets/wordcount/2/10001" //zkTopicPath -> /g001/offsets/wordcount/ val children = zkClient.countChildren(zkTopicPath) /** * 兩種狀況: * 1:從頭讀 * 2:接着偏移量讀 */ var kafkaStream: InputDStream[(String, String)] = null //若是 zookeeper 中有保存 offset,咱們會利用這個 offset 做爲 kafkaStream 的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() //若是保存過 offset,讀過並記錄了offset if (children > 0) { for (i <- 0 until children) { // /g001/offsets/wordcount/0/10001 // /g001/offsets/wordcount/0 val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}") // wordcount/0 val tp = TopicAndPartition(topic, i) //將不一樣 partition 對應的 offset 增長到 fromOffsets 中 // wordcount/0 -> 10001 fromOffsets += (tp -> partitionOffset.toLong) } //Key: kafka的key values: "hello tom hello jerry" //這個會將 kafka 的消息進行 transform,最終 kafak 的數據都會變成 (kafka的key, message) 這樣的 tuple val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) //經過KafkaUtils建立直連的DStream(fromOffsets參數的做用是:按照前面計算好了的偏移量繼續消費數據) //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解碼方式 value的解碼方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) } else { //若是未保存,根據 kafkaParam 的配置使用最新(largest)或者最舊的(smallest) offset kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的範圍 var offsetRanges = Array[OffsetRange]() //從kafka讀取的消息,DStream的Transform方法能夠將當前批次的RDD獲取出來 //該transform方法計算獲取到當前批次RDD,而後將RDD的偏移量取出來,而後在將RDD返回到DStream val transform: DStream[(String, String)] = kafkaStream.transform { rdd => //獲得該 rdd 對應 kafka 的消息的 offset //該RDD是一個KafkaRDD,能夠得到偏移量的範圍 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd } val messages: DStream[String] = transform.map(_._2) //依次迭代DStream中的RDD //foreachRDD觸發的實際操做是DStream轉換,kafkaStream.foreachRDD這一步其實是在Driver中執行的 //rdd.foreach是在Executor中執行的 messages.foreachRDD { rdd => //對RDD進行操做,觸發Action println("===============partionsnum:"+rdd.getNumPartitions) //foreachPartition在Executor中執行 rdd.foreachPartition(partition => partition.foreach(x => { //println(x) }) ) for (o <- offsetRanges) { println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") // /g001/offsets/wordcount/0 val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" //將該 partition 的 offset 保存到 zookeeper // /g001/offsets/wordcount/0/20000 ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString) } } ssc.start() ssc.awaitTermination() } }
通過測試,很強健
wwcc 2 4096 4100
wwcc 0 0 0
wwcc 1 0 0
===============partionsnum:3
wwcc 2 4100 4100
wwcc 0 0 0
wwcc 1 0 0
===============partionsnum:3
17/12/16 21:18:45 INFO VerifiableProperties: Verifying properties
17/12/16 21:18:45 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
17/12/16 21:18:45 INFO VerifiableProperties: Property group.id is overridden to g001
17/12/16 21:18:45 INFO VerifiableProperties: Property zookeeper.connect is overridden to
wwcc 2 4100 4111
wwcc 0 0 0
wwcc 1 0 0
===============partionsnum:3
17/12/16 21:18:50 INFO VerifiableProperties: Verifying properties
17/12/16 21:18:50 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
17/12/16 21:18:50 INFO VerifiableProperties: Property group.id is overridden to g001
17/12/16 21:18:50 INFO VerifiableProperties: Property zookeeper.connect is overridden to
wwcc 2 4111 4125
wwcc 0 0 0
wwcc 1 0 0
===============partionsnum:3
17/12/16 21:18:55 INFO VerifiableProperties: Verifying properties
17/12/16 21:18:55 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
17/12/16 21:18:55 INFO VerifiableProperties: Property group.id is overridden to g001
17/12/16 21:18:55 INFO VerifiableProperties: Property zookeeper.connect is overridden to
wwcc 2 4125 4135
Version2:
package com.xp.cn.streaming import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, StreamingContext} /** * Created by zx on 2017/7/31. */ object KafkaDirectWordCountV2 { def main(args: Array[String]): Unit = { //指定組名 val group = "g001" //建立SparkConf val conf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]") //建立SparkStreaming,並設置間隔時間 val ssc = new StreamingContext(conf, Duration(5000)) //指定消費的 topic 名字 val topic = "wwcc" //指定kafka的broker地址(sparkStream的Task直連到kafka的分區上,用更加底層的API消費,效率更高) val brokerList = "xupan001:9092,xupan001:9092,xupan001:9092" //指定zk的地址,後期更新消費的偏移量時使用(之後可使用Redis、MySQL來記錄偏移量) val zkQuorum = "xupan001:2181,xupan001:2181,xupan001:2181" //建立 stream 時使用的 topic 名字集合,SparkStreaming可同時消費多個topic val topics: Set[String] = Set(topic) //建立一個 ZKGroupTopicDirs 對象,實際上是指定往zk中寫入數據的目錄,用於保存偏移量 val topicDirs = new ZKGroupTopicDirs(group, topic) //獲取 zookeeper 中的路徑 "/g001/offsets/wordcount/" val zkTopicPath = s"${topicDirs.consumerOffsetDir}" //準備kafka的參數 val kafkaParams = Map( "metadata.broker.list" -> brokerList, "group.id" -> group, //從頭開始讀取數據 "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString ) //zookeeper 的host 和 ip,建立一個 client,用於跟新偏移量量的 //是zookeeper的客戶端,能夠從zk中讀取偏移量數據,並更新偏移量 val zkClient = new ZkClient(zkQuorum) //查詢該路徑下是否字節點(默認有字節點爲咱們本身保存不一樣 partition 時生成的) // /g001/offsets/wordcount/0/10001" // /g001/offsets/wordcount/1/30001" // /g001/offsets/wordcount/2/10001" //zkTopicPath -> /g001/offsets/wordcount/ val children = zkClient.countChildren(zkTopicPath) var kafkaStream: InputDStream[(String, String)] = null //若是 zookeeper 中有保存 offset,咱們會利用這個 offset 做爲 kafkaStream 的起始位置 var fromOffsets: Map[TopicAndPartition, Long] = Map() //若是保存過 offset if (children > 0) { for (i <- 0 until children) { // /g001/offsets/wordcount/0/10001 // /g001/offsets/wordcount/0 val partitionOffset = zkClient.readData[String](s"$zkTopicPath/${i}") // wordcount/0 val tp = TopicAndPartition(topic, i) //將不一樣 partition 對應的 offset 增長到 fromOffsets 中 // wordcount/0 -> 10001 fromOffsets += (tp -> partitionOffset.toLong) } //Key: kafka的key values: "hello tom hello jerry" //這個會將 kafka 的消息進行 transform,最終 kafak 的數據都會變成 (kafka的key, message) 這樣的 tuple val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) //經過KafkaUtils建立直連的DStream(fromOffsets參數的做用是:按照前面計算好了的偏移量繼續消費數據) //[String, String, StringDecoder, StringDecoder, (String, String)] // key value key的解碼方式 value的解碼方式 kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler) } else { //若是未保存,根據 kafkaParam 的配置使用最新(largest)或者最舊的(smallest) offset kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) } //偏移量的範圍 var offsetRanges = Array[OffsetRange]() //直連方式只有在KafkaDStream的RDD中才能獲取偏移量,那麼就不能到調用DStream的Transformation //因此只能子在kafkaStream調用foreachRDD,獲取RDD的偏移量,而後就是對RDD進行操做了 //依次迭代KafkaDStream中的KafkaRDD //foreachRDD觸發的實際操做是DStream轉換,kafkaStream.foreachRDD這一步其實是在Driver中調用的 //rdd.foreach是在Executor中執行的 kafkaStream.foreachRDD { kafkaRDD => //只有KafkaRDD能夠強轉成HasOffsetRanges,並獲取到偏移量 offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges val lines: RDD[String] = kafkaRDD.map(_._2) //對RDD進行操做,觸發Action //foreachPartition在Executor中執行 lines.foreachPartition(partition => partition.foreach(x => { println(x) }) ) for (o <- offsetRanges) { // /g001/offsets/wordcount/0 val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}" //將該 partition 的 offset 保存到 zookeeper // /g001/offsets/wordcount/0/20000 ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString) } } ssc.start() ssc.awaitTermination() } }