Kafka is a publish-subscribe messaging system built around a distributed, partitioned, replicated commit log. Between 0.8 and 0.10 the Kafka project introduced a new consumer API, so be sure to choose the package that matches the features you need. Newer brokers remain compatible with older clients, so the 0.8 integration works with 0.9 and 0.10 brokers, but the 0.10 integration does not work with earlier brokers. The 0.8 package supports Python, the Receiver-based stream and the Direct stream, but not the offset commit API or dynamic topic subscription; the 0.10 package drops Python and the Receiver-based stream, and supports the Direct stream, the offset commit API and dynamic topic subscription. See the table below:
| | spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 |
|---|---|---|
| Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
| API Maturity | Deprecated | Stable |
| Language Support | Scala, Java, Python | Scala, Java |
| Receiver DStream | Yes | No |
| Direct DStream | Yes | Yes |
| SSL / TLS Support | No | Yes |
| Offset Commit API | No | Yes |
| Dynamic Topic Subscription | No | Yes |
The 0.10 integration is similar in design to the 0.8 Direct approach: it provides a 1:1 mapping between Spark Streaming partitions and Kafka partitions, along with access to offsets and metadata. However, because it uses the new consumer API rather than the simple API, there are several caveats to keep in mind.
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
```
**Note:** do not manually add dependencies on org.apache.kafka artifacts; the package above already pulls in the correct versions, and mixing different versions can lead to hard-to-diagnose incompatibilities.
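For reference, a project built with sbt instead of Maven would use the equivalent coordinate (assuming Scala 2.11 and Spark 2.1.0, as above):

```scala
// sbt equivalent of the Maven dependency above (Scala 2.11, Spark 2.1.0 assumed)
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
```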
```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
  * Created by Administrator on 2018/3/8.
  */
object SparkStreamingKafkaScala {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka")
    conf.setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(2))

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "s102:9092,s103:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic1")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val ds2 = stream.map(record => (record.key, record.value))
    ds2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
Start the Kafka cluster:

```shell
$>xkafka.sh start
```
Create the topic, specifying the number of partitions and replicas. Make the partition count equal to the total number of cores in the cluster to get maximum parallelism; for example, with three nodes of 8 cores each, use 3 * 8 = 24 partitions.
```shell
$>kafka-topics.sh --zookeeper s102:2181 --create --topic topic1 \
    --replication-factor 3 --partitions 8
```
Start a console consumer on the topic:

```shell
$>kafka-console-consumer.sh --zookeeper s102:2181 --topic topic1
```

Start a console producer on the topic:

```shell
$>kafka-console-producer.sh --broker-list s102:9092 --topic topic1
```
```scala
// Check the number of partitions through the stream's RDDs (here it prints 4)
stream.foreachRDD(rdd => {
  println(rdd.partitions.length)
})
```
Check the topic's partition assignment on the Kafka side:

```shell
$>kafka-topics.sh --zookeeper s102:2181 --topic topic1 --describe
```
The new Kafka consumer API pre-fetches messages into a buffer. For performance it is therefore important to keep cached consumers on the executors, rather than creating a new consumer for every batch, and to control how the consumer for a given topic partition is scheduled onto executors. That is what location strategies are for: they control which nodes the consumers are started on.
LocationStrategies.PreferConsistent
Use this strategy in most cases; it distributes partitions evenly across the available executors.
LocationStrategies.PreferBrokers
Use this only if your executors run on the same hosts as the Kafka brokers; it prefers to schedule a partition's consumer on the broker that is the leader for that partition.
LocationStrategies.PreferFixed
Use this if there is significant skew in load across partitions; it lets you specify an explicit mapping of partitions to hosts (see the sketch below).
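A minimal sketch of how the three strategies are constructed, assuming the topic `t1` and the IP address 192.168.231.102 used elsewhere in this post; whichever one you choose is passed as the second argument of `KafkaUtils.createDirectStream`:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// PreferConsistent: distribute partitions evenly across all executors (the usual choice)
val consistent = LocationStrategies.PreferConsistent

// PreferBrokers: only when executors run on the same hosts as the Kafka brokers
val brokers = LocationStrategies.PreferBrokers

// PreferFixed: pin specific partitions to specific hosts, e.g. to counter data skew;
// partitions not listed in the map fall back to the consistent behaviour
val fixed = LocationStrategies.PreferFixed(Map(
  new TopicPartition("t1", 0) -> "192.168.231.102",
  new TopicPartition("t1", 1) -> "192.168.231.102"
))
```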
The new Kafka consumer API also offers several ways of specifying what to consume.
ConsumerStrategies.Assign
Allows you to specify a fixed collection of partitions.
ConsumerStrategies.Subscribe
Allows you to subscribe to a fixed collection of topics.
ConsumerStrategies.SubscribePattern
Allows you to specify the topics of interest with a regular expression (see the sketch below).
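A minimal sketch of the three consumer strategies, assuming the topic name `t1` and the `kafkaParams` map defined in the earlier example:

```scala
import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Assign: consume a fixed set of partitions (topic "t1", partitions 0 and 1 assumed here)
val assign = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("t1", 0), new TopicPartition("t1", 1)),
  kafkaParams
)

// Subscribe: consume a fixed set of topics; the group coordinator assigns the partitions
val subscribe = ConsumerStrategies.Subscribe[String, String](Array("t1"), kafkaParams)

// SubscribePattern: consume any topic matching a regex, picking up new matching topics dynamically
val pattern = ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("t.*"), kafkaParams)
```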
The following example pins all partitions of the topic to s102 for consumption, and starts each partition from a specific offset.
```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

/**
  * Created by Administrator on 2018/3/8.
  */
object SparkStreamingKafkaScala {

  // Send a message describing where a record was processed to a remote socket
  def sendInfo(msg: String, objStr: String) = {
    // local ip
    val ip = java.net.InetAddress.getLocalHost.getHostAddress
    // process id
    val rr = java.lang.management.ManagementFactory.getRuntimeMXBean()
    val pid = rr.getName().split("@")(0)
    // thread name
    val tname = Thread.currentThread().getName
    // send over a socket to the collector on s101
    val sock = new java.net.Socket("s101", 8888)
    val out = sock.getOutputStream
    val m = ip + "\t:" + pid + "\t:" + tname + "\t:" + msg + "\t:" + objStr + "\r\n"
    out.write(m.getBytes)
    out.flush()
    out.close()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("kafka")
    //conf.setMaster("spark://s101:7077")
    conf.setMaster("local[8]")

    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "s102:9092,s103:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Map of topic partition -> host, i.e. which host consumes which partition.
    // Be sure to use IP addresses here, not hostnames!
    val map = scala.collection.mutable.Map[TopicPartition, String]()
    map.put(new TopicPartition("t1", 0), "192.168.231.102")
    map.put(new TopicPartition("t1", 1), "192.168.231.102")
    map.put(new TopicPartition("t1", 2), "192.168.231.102")
    map.put(new TopicPartition("t1", 3), "192.168.231.102")
    val fix = LocationStrategies.PreferFixed(map)

    // Topic partitions to assign
    val tps = scala.collection.mutable.ArrayBuffer[TopicPartition]()
    tps += new TopicPartition("t1", 0)
    tps += new TopicPartition("t1", 1)
    tps += new TopicPartition("t1", 2)

    // Starting offset for each partition
    val offsets = scala.collection.mutable.Map[TopicPartition, Long]()
    offsets.put(new TopicPartition("t1", 0), 3)
    offsets.put(new TopicPartition("t1", 1), 3)
    offsets.put(new TopicPartition("t1", 2), 0)

    // Consumer strategy: assigned topic partitions plus their starting offsets
    val conss = ConsumerStrategies.Assign[String, String](tps, kafkaParams, offsets)

    // Create the Kafka direct stream
    val stream = KafkaUtils.createDirectStream[String, String](ssc, fix, conss)

    val ds2 = stream.map(record => {
      val t = Thread.currentThread().getName
      val key = record.key()
      val value = record.value()
      val offset = record.offset()
      val par = record.partition()
      val topic = record.topic()
      val tt = (key, value, offset, par, topic, t)
      sendInfo(tt.toString(), this.toString)
      tt
    })
    ds2.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
The stream is cast to CanCommitOffsets, and offsets are committed through that trait. Commits are asynchronous only; you can supply a callback to handle the result of the commit.
```scala
val stream = KafkaUtils.createDirectStream(...)
// ...

// Commit on the driver: commitAsync is a method of the DStream,
// and the DStream itself cannot be serialized.
// `offsets` is an Array[OffsetRange], typically obtained as in the sketch below.
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets, new OffsetCommitCallback() {
  // Callback invoked when the commit completes
  def onComplete(m: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception) {
    if (null != e) {
      // error
    } else {
      // success
    }
  }
})
```
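In practice the offsets to commit are taken from each batch's RDD via `HasOffsetRanges`, and committed only after the batch's output has been handled. A minimal sketch of that pattern (the processing step is just a placeholder):

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // RDDs produced by the direct stream carry their Kafka offset ranges
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here (placeholder) ...

  // Commit asynchronously only after this batch's output is done (at-least-once)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```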
At most once
At most once: each record is consumed at most once. Commit the offsets first, then consume.
```scala
// commit first
commitAsync()
// then consume
consume()
```
At least once
```scala
// consume first
consume()
// then commit
commitAsync()
```
Exactly once
```scala
// Store the offsets in the database alongside the results, not in Kafka
conn.setAutoCommit(false)
consume()
updateOffset()
conn.commit()
```
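A rough sketch of this pattern, where `conn`, `saveResults` and `updateOffset` are hypothetical JDBC helpers, and the offsets come from `HasOffsetRanges` as in the commit example above:

```scala
// Exactly-once sketch: write the results and the offsets in one database transaction.
// `conn`, `saveResults` and `updateOffset` are hypothetical placeholders.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  conn.setAutoCommit(false)      // begin the transaction
  saveResults(rdd, conn)         // hypothetical: persist the computed results
  offsetRanges.foreach { o =>
    // hypothetical: upsert (topic, partition, untilOffset) into an offsets table
    updateOffset(conn, o.topic, o.partition, o.untilOffset)
  }
  conn.commit()                  // results and offsets succeed or fail atomically
}
```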