spark streaming使用Kafka數據源進行數據處理,本文側重講述實踐使用。
1、基於receiver的方式
在使用receiver的時候,若是receiver和partition分配不當,很容易形成數據傾斜,使個別executor工做繁重,拖累總體處理速度。
receiver線程分配和partition的關係:
假如topic A,分配了3個receiver,topic A有5個partition,一個receiver會對應一個線程,partition 0,1,2,3,4會這樣分配html
1. partition和receiver的分配計算
1.1.partition 5/receiver 3 = 1;
1.2.partition 5%receiver 3 = 2;
2. receiver分配到的partition
2.1.receiver 1,分配的partition編號爲:0,1
2.2.receiver 2,分配的partition編號爲:2,3
2.3.receiver 3,分配的partition編號爲:4java
⚠️因而可知,要想達到數據較均衡處理,設計好receiver線程數很重要,固然還要注意,每一個topic消息處理的速度。
api
要想數據能更好的均衡處理,還要使每一個executor分配的receiver線程數儘可能均等。最好是receiver的總個數與executor的個數相同。不過在調度資源的時候,若是隻是分配到一部分資源,那麼等receiver分配好executor後,後期再申請到的資源,也不會有receiver從新分配。app
參數解析:ide
1.jssc:JavaStreamingContext
2.DStream的key類型
3.DStream的值類型
4.Kafka key 解析類型
5.Kafka value 解析類型
6.Kafka參數配置,map類型
1)zookeeper的配置信息fetchkafkaParams.put("zookeeper.connect", "192.168.1.1:2181");
2)groupIDkafkaParams.put("group.id", "group");
3)超時設置kafkaParams.put("zookeeper.connection.timeout.ms", "1000");
7.topic信息爲map類型,如:topicMap.put(ga,2),其中ga爲topic名稱,2 表示爲這個topic建立的線程數
8.RDD存儲級別
2、基於direct的方式
Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mappingbetween RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().spa
須要注意的是:spark.streaming.kafka.maxRatePerPartition它是配置每一個topic全部partition的最大速率,就是說不分topic,全部的消費的partition的最大速率都是同樣。在有消息延遲時,咱們須要設置這個參數,否則會一上來就衝很大的消息量,致使系統崩潰(這裏重點講述有延遲的處理)。
1.使用direct API能夠保證每一個topic的全部partition均衡的處理數據(如:topic A的全部partition的offset範圍是相同的)。但須要注意的是,它會均衡每一個topic的全部partition的offset範圍,當有個別partition處理速度慢,它會從新均衡offset範圍
2.在延遲消費時,當消費的topic的partition分區相同,可是生產速率不一樣,會致使消費的消息時間有很大差別線程
在資源分配不合理狀況下:
如:topic A,topic B分別有30個partition,當分配的num-executor 3,executor-cores 5時,同時並行處理的task爲15個(或分配cores總數爲30),taskID小的那個topic會優先調度,因爲spark的任務調度是默認是FIFO,會致使後面處理的topic時間延遲,進而下一批處理的offset偏移範圍會相對調小,一直這樣循環下去,會使後處理的topic消息量愈來愈少。
但當總體都有消息延遲,或忽然下降處理量時(或sleep一段時間),兩個topic的消息處理量達到一個很低的值後,當從新獲得資源時,兩個topic的offset範圍會從新恢復到均衡的範圍。
如圖所示:設計
因此建議,在分配資源的時候,儘可能不要被每一個topic的partition個數整除,以避免發生有的topic處理慢,致使消息處理量一直降低。(列表待整理:待驗證)
建立directStreamcode
參數解析:
1.jssc:JavaStreamingContext
2.Kafka記錄中的key的類型
3.Kafka記錄中的value的類型
4.key解析類型
5.value解析類型
6.Dstream中的記錄類型:定義的DStrem須要返回的類型
7.Kafka參數配置,map類型
1)broker配置信息kafkaParams.put("metadata.broker.list", "192.168.1.1:9092,192.168.1.2:9092");
2)groupIDkafkaParams.put("group.id", "group");
8.fromOffsets
9.messageHandler
從Kafka讀取offset信息:
1.getBrokerMap
2.findleader
3.getLasetOffset或getEarliestOffset