Spark Streaming Development with Kafka

Spark Streaming can use Kafka as a data source for stream processing; this article focuses on practical usage.

1. The receiver-based approach

When using receivers, a poor mapping between receivers and partitions easily causes data skew: a few executors end up doing most of the work and drag down the overall processing speed.

How receiver threads map to partitions:

Suppose topic A has 5 partitions and is assigned 3 receivers, where each receiver corresponds to one thread. Partitions 0, 1, 2, 3, 4 are then distributed as follows (a small sketch of this calculation follows the list below):

 


1. Computing the partition/receiver split

    1.1. 5 partitions / 3 receivers = 1
    1.2. 5 partitions % 3 receivers = 2

2. Partitions assigned to each receiver

    2.1. receiver 1 gets partitions 0 and 1
    2.2. receiver 2 gets partitions 2 and 3
    2.3. receiver 3 gets partition 4
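As a rough illustration of the arithmetic above, the following sketch (not Spark's actual assignment code, just a hypothetical reconstruction) reproduces the split:

int numPartitions = 5;
int numReceivers = 3;
int base = numPartitions / numReceivers;   // 5 / 3 = 1
int extra = numPartitions % numReceivers;  // 5 % 3 = 2

int start = 0;
for (int r = 0; r < numReceivers; r++) {
    // the first "extra" receivers each take one additional partition
    int count = base + (r < extra ? 1 : 0);
    System.out.println("receiver " + (r + 1) + " -> partitions " + start + " to " + (start + count - 1));
    start += count;
}
// prints: receiver 1 -> partitions 0 to 1, receiver 2 -> partitions 2 to 3, receiver 3 -> partitions 4 to 4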



⚠️ As this shows, choosing the number of receiver threads carefully is important for balanced processing, and you also need to watch how fast each topic's messages are being consumed.

To balance processing further, each executor should be given roughly the same number of receiver threads; ideally the total number of receivers equals the number of executors. Note, however, that if only part of the requested resources is granted at scheduling time, the receivers are placed on the executors available at that point, and resources acquired later will not trigger a redistribution of receivers. (The sketch after the parameter list below creates one stream per receiver and unions them, which makes it easier to spread receivers across executors.)

 

JavaPairReceiverInputDStream<String, byte[]> messages = KafkaUtils.createStream(
        jssc,
        String.class, byte[].class,
        kafka.serializer.StringDecoder.class, kafka.serializer.DefaultDecoder.class,
        kafkaParams, topicMap,
        StorageLevel.MEMORY_AND_DISK());

Parameter breakdown:

 


1. jssc: the JavaStreamingContext

2. Key type of the DStream

3. Value type of the DStream

4. Decoder type for the Kafka key

5. Decoder type for the Kafka value

6. Kafka configuration parameters, as a map

    1) ZooKeeper connection info

kafkaParams.put("zookeeper.connect", "192.168.1.1:2181");

    2) group ID
kafkaParams.put("group.id", "group");

    3) timeout setting
kafkaParams.put("zookeeper.connection.timeout.ms", "1000");

7. Topic info, as a map, e.g. topicMap.put("ga", 2), where "ga" is the topic name and 2 is the number of consumer threads created for that topic

8. RDD storage level
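Putting the pieces above together, here is a minimal sketch of the receiver-based setup; the topic name "ga", the receiver count of 3 and the ZooKeeper address are placeholder values. One stream is created per receiver and the streams are unioned, which makes it easier to spread the receivers over the executors as discussed earlier:

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "192.168.1.1:2181");
kafkaParams.put("group.id", "group");
kafkaParams.put("zookeeper.connection.timeout.ms", "1000");

Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("ga", 2); // 2 consumer threads for topic "ga" inside each receiver

int numReceivers = 3;  // ideally equal to the number of executors
List<JavaPairDStream<String, byte[]>> streams = new ArrayList<>(numReceivers);
for (int i = 0; i < numReceivers; i++) {
    streams.add(KafkaUtils.createStream(
            jssc,
            String.class, byte[].class,
            kafka.serializer.StringDecoder.class, kafka.serializer.DefaultDecoder.class,
            kafkaParams, topicMap,
            StorageLevel.MEMORY_AND_DISK()));
}
JavaPairDStream<String, byte[]> messages =
        jssc.union(streams.get(0), streams.subList(1, streams.size()));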

 


 
2. The direct approach

Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().
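As a minimal sketch of the transform() pattern described in the quote above (assuming message is the JavaInputDStream<byte[]> created with createDirectStream later in this article):

final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();

JavaDStream<byte[]> withOffsets = message.transform(
        new Function<JavaRDD<byte[]>, JavaRDD<byte[]>>() {
            @Override
            public JavaRDD<byte[]> call(JavaRDD<byte[]> rdd) throws Exception {
                // the cast to HasOffsetRanges only succeeds here, on the first
                // operation applied to the direct stream
                offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
                return rdd;
            }
        });
// further Spark operations (map, reduceByKey, window, ...) can follow on withOffsets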

 

Note that spark.streaming.kafka.maxRatePerPartition sets the maximum rate for every partition across all topics; it does not distinguish between topics, so every consumed partition gets the same cap. When messages are backlogged, this parameter must be set, otherwise the first batches will pull in a huge volume of messages and can bring the system down (this article focuses on the backlogged case).
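A minimal sketch of setting this cap (the application name, batch interval and the value of 1000 records per second per partition are placeholder assumptions):

SparkConf conf = new SparkConf()
        .setAppName("kafka-direct-demo") // placeholder application name
        // cap every consumed partition, regardless of topic, at 1000 records per second
        .set("spark.streaming.kafka.maxRatePerPartition", "1000");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));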


 


1. The direct API guarantees that all partitions of a topic are processed evenly (e.g., all partitions of topic A get the same offset range). Note, however, that it keeps balancing the offset ranges across each topic's partitions: when individual partitions fall behind, the offset ranges are rebalanced again.

2. When consumption is lagging, topics with the same number of partitions but different production rates can end up with very different consumption delays.



When resources are allocated poorly:

For example, topic A and topic B each have 30 partitions. With num-executors 3 and executor-cores 5, only 15 tasks run in parallel (or with a total of 30 cores allocated). Because Spark's task scheduling defaults to FIFO, the topic whose tasks have the smaller task IDs is scheduled first, so the other topic is processed later and falls behind; its offset range for the next batch is then reduced, and as this cycle repeats, the later-processed topic consumes fewer and fewer messages.

However, when everything is lagging, or when the processing volume suddenly drops (for example, after sleeping for a while) and both topics' processed volume falls to a very low value, the offset ranges of the two topics return to a balanced state once resources become available again.

As shown in the figures:

 

 

[Figure 1]

 

[Figure 2]


 
So when allocating resources, it is best to avoid a configuration where the allocated parallelism divides evenly into each topic's partition count, otherwise one topic keeps being processed more slowly and its message volume keeps shrinking. (List to be organized: still to be verified.)

Creating the direct stream:

JavaInputDStream<byte[]> message = KafkaUtils.createDirectStream(
        jssc,
        String.class, byte[].class,
        StringDecoder.class, DefaultDecoder.class,
        byte[].class,
        kafkaParams, fromOffsets,
        new Function<MessageAndMetadata<String, byte[]>, byte[]>() {
            @Override
            public byte[] call(MessageAndMetadata<String, byte[]> v1) throws Exception {
                return v1.message();
            }
        });


 
Parameter breakdown:

 


1. jssc: the JavaStreamingContext

2. Type of the key in the Kafka record

3. Type of the value in the Kafka record

4. Decoder type for the key

5. Decoder type for the value

6. Record type of the DStream: the type the defined DStream should return

7. Kafka configuration parameters, as a map

    1) broker list

kafkaParams.put("metadata.broker.list", "192.168.1.1:9092,192.168.1.2:9092");

    2) group ID
kafkaParams.put("group.id", "group");

8. fromOffsets: the starting offset for each TopicAndPartition

9. messageHandler: the function that maps each MessageAndMetadata to the DStream record type

 



Reading offset information from Kafka:

 

final static int TIMEOUT = 100000;
final static int BUFFERSIZE = 64 * 1024;

// Returns, for every partition of the given topics, either the latest or the earliest offset.
public static Map<TopicAndPartition, Long> getLastOffsetsOrEarlist(
        String brokers, List<String> topic, String groupId, boolean isLastOffset) {
    Map<TopicAndPartition, Long> topicOffsets = new HashMap<TopicAndPartition, Long>();
    Map<TopicAndPartition, Broker> topicBroker = findLeaders(brokers, topic);
    for (Map.Entry<TopicAndPartition, Broker> tp : topicBroker.entrySet()) {
        Broker leader = tp.getValue();
        SimpleConsumer sim_consumer = new SimpleConsumer(
                leader.host(), leader.port(), TIMEOUT, BUFFERSIZE, groupId);
        long offset;
        if (isLastOffset) {
            offset = getLastOffset(sim_consumer, tp.getKey(), groupId);
        } else {
            offset = getEarliestOffset(sim_consumer, tp.getKey(), groupId);
        }
        topicOffsets.put(tp.getKey(), offset);
    }
    return topicOffsets;
}


1. getBrokerMap

// Parse a "host:port,host:port" broker list into a host -> port map
private static Map<String, Integer> getBrokerMap(String brokers) {
    Map<String, Integer> brokMap = new HashMap<>();
    if (brokers != null) {
        String[] brokList = brokers.split(",");
        for (String b : brokList) {
            String[] ip_port = b.split(":");
            brokMap.put(ip_port[0], Integer.parseInt(ip_port[1]));
        }
    }
    return brokMap;
}

2. findLeaders

// Find the leader broker for every partition of the given topics
public static Map<TopicAndPartition, Broker> findLeaders(String brokers, List<String> topic) {
    Map<String, Integer> brokMap = getBrokerMap(brokers);
    Map<TopicAndPartition, Broker> topicBroker = new HashMap<>();
    String client_name = "client_" + topic.get(0) + "_" + System.currentTimeMillis();
    for (String b : brokMap.keySet()) {
        SimpleConsumer sim_consumer = null;
        try {
            sim_consumer = new SimpleConsumer(b, brokMap.get(b), TIMEOUT, BUFFERSIZE, client_name);
            TopicMetadataRequest request = new TopicMetadataRequest(topic);
            TopicMetadataResponse response = sim_consumer.send(request);
            List<TopicMetadata> metadata = response.topicsMetadata();
            for (TopicMetadata t : metadata) {
                for (PartitionMetadata p : t.partitionsMetadata()) {
                    TopicAndPartition topicAndPartition = new TopicAndPartition(t.topic(), p.partitionId());
                    topicBroker.put(topicAndPartition, p.leader());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (sim_consumer != null) {
                sim_consumer.close();
            }
        }
    }
    return topicBroker;
}


3. getLastOffset or getEarliestOffset

// Get the latest offset for a TopicAndPartition
private static Long getLastOffset(
        SimpleConsumer consumer, TopicAndPartition tp, String clientName) {
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
    requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
    OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        System.out.println("Error fetching offset data from the broker. Reason: "
                + response.errorCode(tp.topic(), tp.partition()));
    }
    long[] offsets = response.offsets(tp.topic(), tp.partition());
    return offsets[0];
}
// Get the earliest offset for a TopicAndPartition
private static Long getEarliestOffset(
        SimpleConsumer consumer, TopicAndPartition tp, String clientName) {
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
    requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
    OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        System.out.println("Error fetching offset data from the broker. Reason: "
                + response.errorCode(tp.topic(), tp.partition()));
    }
    long[] offsets = response.offsets(tp.topic(), tp.partition());
    return offsets[0];
}
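To tie the helper above back to createDirectStream, a short usage sketch (the broker list, topic name and group ID are placeholders):

String brokers = "192.168.1.1:9092,192.168.1.2:9092";
List<String> topics = Arrays.asList("ga"); // placeholder topic name
String groupId = "group";

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("group.id", groupId);

// true: start from the latest offsets; false: start from the earliest ones
Map<TopicAndPartition, Long> fromOffsets =
        getLastOffsetsOrEarlist(brokers, topics, groupId, true);

// kafkaParams and fromOffsets are then passed to the createDirectStream call shown earlier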