kafka做爲一個實時的分佈式消息隊列,實時的生產和消費消息,這裏咱們能夠利用SparkStreaming實時計算框架實時地讀取kafka中的數據而後進行計算。在spark1.3版本後,kafkaUtils裏面提供了兩個建立dstream的方法,一種爲KafkaUtils.createDstream,另外一種爲KafkaUtils.createDirectStream。html
1.KafkaUtils.createDstream方式
構造函數爲KafkaUtils.createDstream(ssc,[zk], [consumer group id], [per-topic,partitions] ) 使用了receivers來接收數據,利用的是Kafka高層次的消費者api,對於全部的receivers接收到的數據將會保存在Spark executors中,而後經過Spark Streaming啓動job來處理這些數據,默認會丟失,可啓用WAL日誌,它同步將接受到數據保存到分佈式文件系統上好比HDFS。 因此數據在出錯的狀況下能夠恢復出來 。java
A、建立一個receiver來對kafka進行定時拉取數據,ssc的rdd分區和kafka的topic分區不是一個概念,故若是增長特定主消費的線程數僅僅是增長一個receiver中消費topic的線程數,並不增長spark的並行處理數據數量。
B、對於不一樣的group和topic可使用多個receivers建立不一樣的DStream
C、若是啓用了WAL(spark.streaming.receiver.writeAheadLog.enable=true)node
同時須要設置存儲級別(默認StorageLevel.MEMORY_AND_DISK_SER_2),正則表達式
即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)redis
1.1KafkaUtils.createDstream實戰
(1)添加kafka的pom依賴shell
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_0-10_2.11</artifactId>
<version>2.0.2</version>
</dependency>apache
(2)啓動zookeeper集羣編程
zkServer.sh startbootstrap
(3)啓動kafka集羣api
kafka-server-start.sh /export/servers/kafka/config/server.properties
(4)建立topic
kafka-topics.sh --create --zookeeper node-1:2181 --replication-factor 1 --partitions 3 --topic kafka_spark
(5)向topic中生產數據
經過shell命令向topic發送消息
kafka-console-producer.sh --broker-list node-1:9092--topic kafka_spark
(6)編寫SparkStreaming應用程序
KafkaUtils.createDstream方式(基於kafka高級Api-----偏移量由zk保存)
package cn.testdemo.dstream.kafka
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.collection.immutable
//todo:利用sparkStreaming對接kafka實現單詞計數----採用receiver(高級API)
object SparkStreamingKafka_Receiver {
def main(args: Array[String]): Unit = {
//一、建立sparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingKafka_Receiver")
.setMaster("local[2]")
.set("spark.streaming.receiver.writeAheadLog.enable","true") //開啓wal預寫日誌,保存數據源的可靠性
//二、建立sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//三、建立StreamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//設置checkpoint
ssc.checkpoint("./Kafka_Receiver")
//四、定義zk地址
val zkQuorum="node-1:2181,node-2:2181,node-3:2181"
//五、定義消費者組
val groupId="spark_receiver"
//六、定義topic相關信息 Map[String, Int]
// 這裏的value並非topic分區數,它表示的topic中每個分區被N個線程消費
val topics=Map("kafka_spark" -> 2)
//七、經過KafkaUtils.createStream對接kafka
//這個時候至關於同時開啓3個receiver接受數據
val receiverDstream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
stream
}
)
//使用ssc.union方法合併全部的receiver中的數據
val unionDStream: DStream[(String, String)] = ssc.union(receiverDstream)
//八、獲取topic中的數據
val topicData: DStream[String] = unionDStream.map(_._2)
//九、切分每一行,每一個單詞計爲1
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
//十、相同單詞出現的次數累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//十一、打印輸出
result.print()
//開啓計算
ssc.start()
ssc.awaitTermination()
}
}
(7)運行代碼,查看控制檯結果數據
總結:
經過這種方式實現,剛開始的時候系統正常運行,沒有發現問題,可是若是系統異常從新啓動sparkstreaming程序後,發現程序會重複處理已經處理過的數據,這種基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制能夠保證數據零丟失的高可靠性,可是卻沒法保證數據被處理一次且僅一次,可能會處理兩次。由於Spark和ZooKeeper之間多是不一樣步的。官方如今也已經不推薦這種整合方式,官網相關地址下面咱們使用官網推薦的第二種方式kafkaUtils的createDirectStream()方式。
2.KafkaUtils.createDirectStream方式
不一樣於Receiver接收數據,這種方式按期地從kafka的topic下對應的partition中查詢最新的偏移量,再根據偏移量範圍在每一個batch裏面處理數據,Spark經過調用kafka簡單的消費者Api讀取必定範圍的數據。
相比基於Receiver方式有幾個優勢:
A、簡化並行
不須要建立多個kafka輸入流,而後union它們,sparkStreaming將會建立和kafka分區一種的rdd的分區數,並且會從kafka中並行讀取數據,spark中RDD的分區數和kafka中的分區數據是一一對應的關係。
B、高效
第一種實現數據的零丟失是將數據預先保存在WAL中,會複製一遍數據,會致使數據被拷貝兩次,第一次是被kafka複製,另外一次是寫到WAL中。而沒有receiver的這種方式消除了這個問題。
C、剛好一次語義(Exactly-once-semantics)
Receiver讀取kafka數據是經過kafka高層次api把偏移量寫入zookeeper中,雖然這種方法能夠經過數據保存在WAL中保證數據不丟失,可是可能會由於sparkStreaming和ZK中保存的偏移量不一致而致使數據被消費了屢次。EOS經過實現kafka低層次api,偏移量僅僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。缺點是沒法使用基於zookeeper的kafka監控工具
2.1KafkaUtils.createDirectStream實戰
(1)前面的步驟跟KafkaUtils.createDstream方式同樣,接下來開始執行代碼。
KafkaUtils.createDirectStream方式(基於kafka低級Api-----偏移量由客戶端程序保存)
package cn.itcast.dstream.kafka
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
//todo:利用sparkStreaming對接kafka實現單詞計數----採用Direct(低級API)
object SparkStreamingKafka_Direct {
def main(args: Array[String]): Unit = {
//一、建立sparkConf
val sparkConf: SparkConf = new SparkConf()
.setAppName("SparkStreamingKafka_Direct")
.setMaster("local[2]")
//二、建立sparkContext
val sc = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
//三、建立StreamingContext
val ssc = new StreamingContext(sc,Seconds(5))
//四、配置kafka相關參數
val kafkaParams=Map("metadata.broker.list"->"node-1:9092,node-2:9092,node-3:9092","group.id"->"Kafka_Direct")
//五、定義topic
val topics=Set("kafka_spark")
//六、經過 KafkaUtils.createDirectStream接受kafka數據,這裏採用是kafka低級api偏移量不受zk管理
val dstream: InputDStream[(String, String)] =
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//七、獲取kafka中topic中的數據
val topicData: DStream[String] = dstream.map(_._2)
//八、切分每一行,每一個單詞計爲1
val wordAndOne: DStream[(String, Int)] = topicData.flatMap(_.split(" ")).map((_,1))
//九、相同單詞出現的次數累加
val result: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//十、打印輸出
result.print()
//開啓計算
ssc.start()
ssc.awaitTermination()
}
}
查看控制檯的輸出:
在spark1.3版本後,kafkautil裏面提供了兩個建立dstream的方法,
一、KafkaUtils.createDstream
構造函數爲KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
使用了receivers來接收數據,利用的是Kafka高層次的消費者api,對於全部的receivers接收到的數據將會保存在Spark executors中,而後經過Spark Streaming啓動job來處理這些數據,默認會丟失,可啓用WAL日誌,該日誌存儲在HDFS上
A、建立一個receiver來對kafka進行定時拉取數據,ssc的rdd分區和kafka的topic分區不是一個概念,故若是增長特定主體分區數僅僅是增長一個receiver中消費topic的線程數,並不增長spark的並行處理數據數量
B、對於不一樣的group和topic可使用多個receivers建立不一樣的DStream
C、若是啓用了WAL,須要設置存儲級別,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)
2.KafkaUtils.createDirectStream
區別Receiver接收數據,這種方式按期地從kafka的topic+partition中查詢最新的偏移量,再根據偏移量範圍在每一個batch裏面處理數據,使用的是kafka的簡單消費者api
優勢:
A、 簡化並行,不須要多個kafka輸入流,該方法將會建立和kafka分區同樣的rdd個數,並且會從kafka並行讀取。
B、高效,這種方式並不須要WAL,WAL模式須要對數據複製兩次,第一次是被kafka複製,另外一次是寫到wal中
C、剛好一次語義(Exactly-once-semantics),傳統的讀取kafka數據是經過kafka高層次api把偏移量寫入zookeeper中,存在數據丟失的可能性是zookeeper中和ssc的偏移量不一致。EOS經過實現kafka低層次api,偏移量僅僅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的問題。缺點是沒法使用基於zookeeper的kafka監控工具
Kafka 0.10的Spark Streaming集成在設計上相似於0.8 Direct Stream方法。它提供簡單的並行性,Kafka分區和Spark分區之間的1:1對應,以及訪問偏移和元數據。然而,由於較新的集成使用新的Kafka消費者API而不是簡單的API,因此在使用上有顯着的差別。此版本的集成被標記爲實驗性的,所以API可能會更改。
連接
對於使用SBT / Maven項目定義的Scala / Java應用程序,請將流應用程序與如下工件連接(有關詳細信息,請參閱主編程指南中的連接部分)。
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.1.0
建立直接流
請注意,導入的命名空間包括版本org.apache.spark.streaming.kafka010
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
})
有關可能的kafkaParams,請參閱Kafka consumer配置文件。若是您的Spark批處理持續時間大於默認的Kafka心跳會話超時(30秒),請適當增長heartbeat.interval.ms和session.timeout.ms。對於大於5分鐘的批次,這將須要更改代理上的group.max.session.timeout.ms。請注意,示例將enable.auto.commit設置爲false,有關討論,請參閱下面的存儲偏移。
LocationStrategies
新的Kafka consumer API會將消息預取到緩衝區中。所以,出於性能緣由,Spark集成保持緩存消費者對執行者(而不是爲每一個批次從新建立它們)是重要的,而且更喜歡在具備適當消費者的主機位置上調度分區。
在大多數狀況下,您應該使用LocationStrategies.PreferConsistent如上所示。這將在可用的執行器之間均勻分配分區。若是您的執行程序與Kafka代理所在的主機相同,請使用PreferBrokers,這將更喜歡在該分區的Kafka leader上安排分區。最後,若是您在分區之間的負載有顯着誤差,請使用PreferFixed。這容許您指定分區到主機的顯式映射(任何未指定的分區將使用一致的位置)。
消費者的緩存的默認最大大小爲64.若是您但願處理超過(64 *個執行程序數)Kafka分區,則能夠經過如下方式更改此設置: spark.streaming.kafka.consumer.cache.maxCapacity
緩存由topicpartition和group.id鍵入,所以對每一個調用使用一個單獨 group.id的createDirectStream。
ConsumerStrateges
新的Kafka consumer API有許多不一樣的方式來指定主題,其中一些須要至關多的後對象實例化設置。 ConsumerStrategies提供了一種抽象,容許Spark即便在從檢查點從新啓動後也能得到正確配置的消費者。
ConsumerStrategies.Subscribe,如上所示,容許您訂閱固定的主題集合。SubscribePattern容許您使用正則表達式來指定感興趣的主題。注意,與0.8集成不一樣,在運行流期間使用Subscribe或SubscribePattern應該響應添加分區。最後,Assign容許您指定固定的分區集合。全部三個策略都有重載的構造函數,容許您指定特定分區的起始偏移量。
若是您具備上述選項不知足的特定用戶設置需求,則ConsumerStrategy是能夠擴展的公共類。
建立RDD
若是您有一個更適合批處理的用例,則能夠爲定義的偏移量範圍建立RDD。
// Import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
注意,你不能使用PreferBrokers,由於沒有流沒有驅動程序端消費者爲你自動查找代理元數據。若是須要,請PreferFixed使用您本身的元數據查找。
獲取偏移
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(new VoidFunction<Iterator<ConsumerRecord<String, String>>>() {
@Override
public void call(Iterator<ConsumerRecord<String, String>> consumerRecords) {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
}
});
}
});
注意類型轉換HasOffsetRanges只會成功,若是是在第一個方法中調用的結果createDirectStream,不是後來一系列的方法。請注意,RDD分區和Kafka分區之間的一對一映射在任何隨機或從新分區的方法(例如reduceByKey()或window())後不會保留。
存儲偏移
在失敗的狀況下的Kafka交付語義取決於如何和什麼時候存儲偏移。火花輸出操做至少一次。所以,若是你想要一個徹底一次的語義的等價物,你必須在一個等冪輸出以後存儲偏移,或者在一個原子事務中存儲偏移和輸出。使用這種集成,您有3個選項,按照可靠性(和代碼複雜性)的增長,如何存儲偏移。
檢查點
若是啓用Spark 檢查點,偏移將存儲在檢查點中。這很容易實現,但有缺點。你的輸出操做必須是冪等的,由於你會獲得重複的輸出; 事務不是一個選項。此外,若是應用程序代碼已更改,您將沒法從檢查點恢復。對於計劃升級,您能夠經過與舊代碼同時運行新代碼來緩解這種狀況(由於輸出必須是冪等的,它們不該該衝突)。但對於須要更改代碼的意外故障,您將丟失數據,除非您有其餘方法來識別已知的良好起始偏移。
kafka
Kafka有一個偏移提交API,將偏移存儲在特殊的Kafka主題中。默認狀況下,新消費者將按期自動提交偏移量。這幾乎確定不是你想要的,由於消費者成功輪詢的消息可能尚未致使Spark輸出操做,致使未定義的語義。這就是爲何上面的流示例將「enable.auto.commit」設置爲false的緣由。可是,您能夠在使用commitAsyncAPI 存儲了輸出後,向Kafka提交偏移量。與檢查點相比,Kafka是一個耐用的存儲,而無論您的應用程序代碼的更改。然而,Kafka不是事務性的,因此你的輸出必須仍然是冪等的。
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
}
});
您本身的數據存儲
對於支持事務的數據存儲,即便在故障狀況下,也能夠在同一事務中保存偏移量做爲結果,以保持二者同步。若是您仔細檢查重複或跳過的偏移範圍,則回滾事務可防止重複或丟失的郵件影響結果。這給出了剛好一次語義的等價物。也可使用這種策略甚至對於聚合產生的輸出,聚合一般很難使冪等。
// The details depend on your data store, but the general idea looks like this
// begin from the the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
@Override
public void call(JavaRDD<ConsumerRecord<String, String>> rdd) {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// begin your transaction
// update results
// update offsets where the end of existing offsets matches the beginning of this batch of offsets
// assert that offsets were updated correctly
// end your transaction
}
});
SSL / TLS
新的Kafka消費者支持SSL。要啓用它,請在傳遞到createDirectStream/ 以前適當地設置kafkaParams createRDD。注意,這隻適用於Spark和Kafka代理之間的通訊; 您仍然有責任單獨保證 Spark節點間通訊。
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
部署
與任何Spark應用程序同樣,spark-submit用於啓動應用程序。
對於Scala和Java應用程序,若是您使用SBT或Maven進行項目管理,則將程序包spark-streaming-kafka-0-10_2.11及其依賴項包含到應用程序JAR中。確保spark-core_2.11並spark-streaming_2.11標記爲provided依賴關係,由於它們已經存在於Spark安裝中。而後使用spark-submit啓動應用程序(請參閱主程序指南中的部署部分)。
spark-streaming-kafka-0-10中已經實現offset自動提交zk中
最新的實現中createDirectStream也能夠提交offset了spark-streaming-kafka-0-10http://spark.apache.org/docs/latest/streaming-kafka-integration.html但要求 kafka是0.10.0及之後。
createDirectStream不會自動提交offset到zk中,不能方便的監控數據消費狀況
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic)) .transform(rdd => { val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges for (offset <- offsets) { val topicAndPartition = TopicAndPartition(offset.topic, offset.partition) //保存offset至zk可redis中方便監控 //commitOffset(kafkaParams,groupId, Map(topicAndPartition -> offset.untilOffset)) } rdd })
若是能夠只是用來監控消費狀況在transform中轉換成HasOffsetRanges取出offset保存到zk中便可,
"rdd.asInstanceOf[HasOffsetRanges].offsetRanges" 若是已經通過其它Transformations或output操做以後此rdd已經不是KafkaRDD,再轉換會報錯!!
另外還有一個控制能更強的createDirectStream方法,能夠指定fromOffsets和messageHandler
def createDirectStream(
ssc: StreamingContext,
kafkaParams: Map[String, String],
fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
)
能夠將offset保存在zk或redis等外部存儲中方便監控,而後下次啓動時再從中讀取
Kafka中的partition和Spark中的partition是不一樣的概念,但createDirectStream方式時topic的總partition數量和Spark和partition數量相等。
```
//KafkaRDD.getPartitions
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
}.toArray
}
```
partition中數據分佈不均會致使有些任務快有些任務慢,影響總體性能,能夠根據實際狀況作repartition,單個topic比較容易實現partition中數據分佈均勻,但若是同一個程序中須要同時處理多個topic的話,能夠考慮可否合併成一個topic,增長partition數量,不過topic不少時間會和其它系統共用,因此可能不容易合併,這狀況只能作repartition。雖然repartition會消耗一些時間,但總的來講,若是數據分佈不是很均勻的話repartition仍是值得,repartition以後各任務處理數據量基本同樣,並且Locality_level會變成「PROCESS_LOCAL」
!!使用flume加載到kafka的使用默認配置十有八九分佈不勻
代碼:
Object SparkApp(){ def gnStreamContext(chkdir:String,batchDuration: Duration,partitions:Int)={ val conf = new SparkConf().setAppName("GnDataToHive") //.setMaster("local[2]") val ssc = new StreamingContext(conf, batchDuration) KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic)) ........... ........... ........... val terminfos = ssc.sparkContext.broadcast(ttis) ssc.checkpoint(chkdir) ssc } def main(args: Array[String]): Unit = { val chkdir="hdfs://xxxxx/chkpoint/chkpoint-1" val chkssc = StreamingContext.getOrCreate(chkdir,()=>gnStreamContext(chkdir,Seconds(args(0).toInt),args(1).toInt)) chkssc.start() chkssc.awaitTermination() } }
offset會在保存至檢查點中,下次啓動會繼續接着讀取可是如下問題須要注意:
kafka中數一般保存週期都不會太長,都有清理週期,若是記錄的offset對應數據已經被清理,從檢查點恢復時程序會一直報錯。
若是程序邏輯發生變化,須要先刪除檢查點,不然無論數據仍是邏輯都會從舊檢查點恢復。
能夠用spark.streaming.kafka.maxRatePerPartition指定每一個批次從每一個partition中每秒中最大拉取的數據量,好比將值設爲1000的話,每秒鐘最多從每一個partition中拉取1000條數據,若是batchDuration設爲1分鐘的話,則每一個批次最多從每一個partition中拉取60000條數據。
此值要設置合理,過小有可能致使資源浪費,但kafka中的數據消費不完,太多又達不到限流的目的
具體代碼見:
DirectKafkaInputDStream.maxMessagesPerPartition
DirectKafkaInputDStream.clamp
``` // limits the maximum number of messages per partition protected def clamp( leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = { maxMessagesPerPartition.map { mmp => leaderOffsets.map { case (tp, lo) => tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset)) } }.getOrElse(leaderOffsets) } ```
spark-submit提交時帶上便可:--conf spark.streaming.kafka.maxRatePerPartition=10000
貌似只能在createDirectStream中起做用,在createStream方式中沒看到有相似設置
寫入hdfs時默認目錄名格式爲:"prefix-TIME_IN_MS.suffix",每一個目錄下的文件名爲"part-xxxx"。
若是隻想自定義目錄名能夠經過foreachRDD,調用RDD的saveAsXXX dstream.foreachRDD(rdd=>rdd.saveAsxxxx(""))
若是須要自定義輸出的文件名,須要自定義一個FileOutputFormat的子類,修改getRecordWriter方法中的name便可,而後調用saveAsHadoopFile[MyTextOutputFormat[NullWritable, Text]]
。
某些狀況下載關聯外部數據進行關聯或計算。
mapPartitions
或foreachRDD.foreachPartitions
中關聯By default, it will start consuming from the latest offset of each Kafka partition. If you set configuration auto.offset.reset in Kafka parameters to smallest, then it will start consuming from the smallest offset.
topic主題,分區ID,起始offset,結束offset
由於spark源碼中KafkaCluster
類被限制在[spark]包下,因此咱們若是想要在項目中調用這個類,那麼只能在項目中也新建包org.apache.spark.streaming.kafka
.而後再該包下面寫調用的邏輯.這裏面就能夠引用KafkaCluster
類了.這個類裏面封裝了不少實用的方法,好比:獲取主題和分區,獲取offset等等...
這些api,spark裏面都有現成的,咱們如今就是須要組織起來!
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
簡單說一下
在zookeeper上讀取offset前先根據實際狀況更新fromOffsets
1.1 若是是一個新的groupid,那麼會從最新的開始讀
1.2 若是是存在的groupid,根據配置auto.offset.reset
1.2.1 smallest
: 那麼會從開始讀,獲取最開始的offset.
1.2.2 largest
: 那麼會從最新的位置開始讀取,獲取最新的offset.
根據topic獲取topic和該topics下全部的partitions
val partitionsE = kc.getPartitions(topics)
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V, String)]( ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => ( mmd.key, mmd.message, mmd.topic))
kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
sparkstreaming-kafka的源碼中是本身把offset維護在kafka集羣中了?
./kafka-consumer-groups.sh --bootstrap-server 10.10.25.13:9092 --describe --group heheda
由於用命令行工具能夠查到,這個工具能夠查到基於javaapi方式的offset,查不到在zookeeper中的
網上的本身維護offset,是把offset維護在zookeeper中了?
用這個方式產生的groupid,在命令行工具中查不到,可是也是調用的源碼中的方法呢?
難道spark提供了這個方法,可是本身卻沒有用是嗎?
區別只在於,本身維護offset,會先去zk中獲取offset,邏輯處理完成後再更新zk中的offset.
然而,在代碼層面,區別在於調用了不一樣的KafkaUtils.createDirectStream
本身維護的offset,這個方法會傳入offset.由於在此以前咱們已經從zk中獲取到了起始offset
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V, String)]( ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => ( mmd.key, mmd.message, mmd.topic))
接受的是一個topic,底層會根據topic去獲取存儲在kafka中的起始offset
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, myTopic)
接下來這個方法裏面會調用getFromOffsets
來獲取起始的offset
val kc = new KafkaCluster(kafkaParams) val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
這個代碼,網上不少,GitHub上也有現成的了.這裏我就不貼出來了!
這裏主要仍是學習代碼的實現思路!