SparkStreaming-Kafka集成

SparkStreaming-Kafka集成

參考連接: Spark Streaming + Kafka Integration Guidehtml

文章基本是官方的翻譯, 最多再加入了一小部分本身的思考在內, 若是能看懂官方文檔, 也能夠自行查看官網。java

另外就是提供了本身實現的 zk + kafka + spark 獲取offset。 offset的存儲在 獲取偏移量 與 存儲偏移量的 第三小節 有描述。mysql

基於版本:正則表達式

Kafka broker version 0.10.0 or higherredis

0.10.0 版本的 SparkStreaming kafka 與 0.8版本的 DirectStream比較接近。sql

它支持比較簡單的並行性,包括 Kafka 分區 和Spark 分區之間 是 1:1對應關係,以及對偏移量和元數據的訪問。可是,因爲較新的集成使用了新的Kafka使用者API而不是簡單的API,所以用法上存在顯着差別。apache

Maven依賴

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 2.4.4

注意: 不要自行添加 org.apache.kafka artifacts (例如:kafka-clients), 在 spark-streaming-kafka-0-10 已經集成了可以使用的kafka版本, 若是自行引入其餘kakfa版本可能會引起問題。bootstrap

但一樣也須要注意到的是: 這一點是在2.4.4版本才添加的, 在2.4.3版本及之前仍是須要本身手動引入 kafka.clients的。api

建立 Direct Stream

須要注意引入的版本號: 010緩存

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

參數:auto.offset.reset

earliest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費

latest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
none: topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
默認建議用earliest。設置該參數後 kafka出錯後重啓,找到未消費的offset能夠繼續消費。

可是對於 Spark而言,在某些狀況下 採起哪種並無太大區別, 這個稍後再說。

對於kafka中可配置的參數, 能夠在 KAFKA_CONFIGURATION 中找到.

若是你的 spark batch interval 時間要大於 Kafka heartbeat session timeout(默認是30s),
If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), 須要自行增長 heartbeat.interval.ms 和 session.timeout.ms. 由於 Spark是 每隔一個 batch interval纔去拉取數據, 若是間隔過久, kafka就會認爲已經斷開鏈接。 對於 batch interval 大於5分鐘的, 還須要配置另外一個參數:group.max.session.timeout.ms.

另外就是 注意到 在 例子中設置: enable.auto.commit false,在稍後會描述緣由。

LocationStrategies

在方法參數中, 須要傳入 LocationStrategies。

在新版的kafka Consumer API中, 會將 message 預加載到緩存中,所以, 出於性能的緣由, Spark集成kafka 會將 consumers 緩存到 executor中(而不是在每一個批次都從新建立 consumers),而且更傾向於 在具備 適當的 consumers 的主機上 安排分區。

在大多數狀況下, 咱們須要使用 LocationStrategies.PreferConsistent 它將會在 可用的 executors上均勻分配分區。

若是 executors 和 kafka brokers 在同一臺主機上, 則LocationStrategies.PreferBrokers 是更好的選擇。由於它會 將 partition 優先分配到存在 kafka broker 的機器上。

由於kafka的分區會與 spark 分區一一對應, 所以, 可能會由於 kafka的數據傾斜, 致使 spark中一樣出現數據傾斜的問題, 所以 LocationStrategies.PreferFixed 容許您指定分區到主機的顯式映射(任何未指定的分區將使用一致的位置)。

consumers 的緩存的默認最大大小爲64. 若是你但願處理超過(64 * executors)Kafka分區,則能夠經過spark.streaming.kafka.consumer.cache.maxCapacity更改此設置。

若是你想禁用 kafka consumer 的緩存, 能夠設置 spark.streaming.kafka.consumer.cache.enabled 爲 false。

kafka consumer cache 的 緩存 是用 topicpartition 和 group.id 作區分的, 所以對於同時啓動 多個receiver, 須要爲每一個 direct stream 建立不一樣的 groupId。

ConsumerStrategies

kafka新的 api中, 提供了大量的不一樣的方法 去指定 topic,其中一部分 要求 特別大的 post對象實例(原文是: post-object-instantiation 不太理解)配置。 ConsumerStrategies 提供了一種抽象,即便從檢查點從新啓動後,Spark也能夠得到正確配置的消費者。

ConsumerStrategies.Subscribe, 容許你訂閱固定的 topic 集合。

ConsumerStrategies.SubscribePattern 容許你使用正則表達式來指定感興趣的主題。

與0.8版本的集成不一樣, 經過以上兩種方式 在運行流期間使用Subscribe或SubscribePattern應該響應添加分區, 在這裏的意思應該是, 即便topic一開始不存在, 即便是動態添加的依然可以在 spark 運行期間 拉取數據。

最後 ConsumerStrategies.Assign 容許你指定特定的 分區。

這三種方式 都支持你 指定 對特定分區的起始offset。

若是你具備上述選項沒法知足的需求,能夠經過 extend ConsumerStrategy實現本身的方法。

最後須要提醒的是:

即便你指定的topic 和 partition 並不存在, 程序也可以正常運行, 這得益於 kafka中的一個參數:

allow.auto.create.topics

默認爲true。

建立 RDD

你能夠經過指定 topic partition 以及 offset的範圍的方式, 來建立RDD

// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
    // topic, partition, inclusive starting offset, exclusive ending offset
    OffsetRange.create("test", 0, 0, 100),
    OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
    sparkContext,
    kafkaParams,
    offsetRanges,
    LocationStrategies.PreferConsistent()
);

注意:在這裏不可以使用 LocationStrategies.PreferBrokers 由於在沒有流的狀況下, 缺少驅動側的 consumer 幫你自動查找獲取 broker的元信息。 若是必需要用的話, 使用 PreferFixed 來本身查找元信息。

獲取偏移量

stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(consumerRecords -> {
        OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
        System.out.println(
        o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
    });
});

注意: HasOffsetRanges的類型轉換隻有在createDirectStream 獲取到的流, 在流處理的第一個方法調用時纔會成功,而不能在其以後的方法鏈中調用。須要認識到,RDD分區和Kafka分區之間的映射關係,在任何一個repartition或shuffle操做後(如reduceByKey()或Window())函數後都再也不存在。

所以,我每每是經過:

dstream.transform(rdd -> {
    (HasOffsetRanges) rdd.rdd()).offsetRanges();
    return rdd;
})

在這以後再執行更復雜流處理過程。

存儲偏移量

在失敗的狀況下, kafka的交付語義 取決於在何時 其 offset被存儲,存儲則意味着 歸屬於當前 offset以前的全部數據 都已經被正確處理, 所以至關於 以前的數據已經被 丟棄, 不會再度進行處理。

而這也是咱們不使用 enable.auto.commit 爲 true的緣由。

在kafka中的自動提交機制是:

enable.auto.commit 的默認值是 true;就是默認採用自動提交的機制。

auto.commit.interval.ms 的默認值是 5000,單位是毫秒。

這樣,默認5秒鐘,一個 Consumer 將會提交它的 Offset 給 Kafka,或者每一次數據從指定的 Topic 取回時,將會提交最後一次的 Offset。

也就是說,當咱們從 topic partition中取回數據時,每隔固定時間, 這個offset就會被提交。

在絕大多數狀況下, 這並非咱們想要的方式。

spark的 輸出語義是 至少一次,因此 若是 你想要獲取 與 至少一次等效的語義, 你必須在 冪等的輸出操做後存儲 或在一次與輸出操做並行的原子操做中存儲。爲了達到上述目的, 你有如下三種方式去處理:

  1. checkPoints(檢查點)

    若是你打開了Spark的checkpointing選項,偏移量會被保存在checkpoint裏面。

    這確實是一種很簡單的方式, 然而有一些缺點, 首先爲了對於同一數據獲得的輸出是 重複的, 因此你的輸出操做必須是冪等的;事務並非一個好的選擇。

    此外,若是你的代碼有了更改,就不能從checkpoint之中恢復。對於計劃升級,能夠在舊代碼運行的同時部署運行新的代碼來緩解這個問題(由於輸出是冪等的,因此不會形成衝突)。

    可是對於意料以外的故障而須要更改代碼的,除非你有其餘的方式來獲取開始的偏移量,不然就會丟失數據。

  2. kafka自身

    其 auto.commit 沒必要多說, 天然是不合適的, 所以,你能夠在你確保輸出操做已經完成後使用commitSync API向Kafka提交偏移量。與checkpoint方式相比,該種方式的好處是Kafka是一個持久化的存儲,而不須要考慮代碼的更新。 然而, kafka是非事務性的, 所以仍然須要 輸出操做 是冪等的。

    stream.foreachRDD(rdd -> {
         OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
         // some time later, after outputs have completed
         ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
     });

    這自己就是一種比較好的方式。

  3. 本身實現

    對於支持事務的數據存儲,能夠在同一個事務中保存偏移量,這樣即使在失敗的狀況下也能夠保證二者的同步。

    若是你關心重複的或者跳過的偏移量的範圍,回滾事務能夠防止重複或丟失消息影響結果。這等價於僅僅一次的語義。也可使用這種策略來對那些一般很難保證冪等性的聚合輸出操做起做用。

    // The details depend on your data store, but the general idea looks like this
    
     // begin from the the offsets committed to the database
     Map<TopicPartition, Long> fromOffsets = new HashMap<>();
         for (resultSet : selectOffsetsFromYourDatabase)
             fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
     }
    
     JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
         streamingContext,
         LocationStrategies.PreferConsistent(),
         ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
     );
    
     stream.foreachRDD(rdd -> {
         OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    
         Object results = yourCalculation(rdd);
    
         // begin your transaction
    
         // update results
         // update offsets where the end of existing offsets matches the beginning of this batch of offsets
         // assert that offsets were updated correctly
    
         // end your transaction
     });

    這部分,目前比較常見的方式是, 經過 zk存儲數據,但不限於 zk, redis, mysql等方式都是能夠的。

    由於在這裏的數據更新頻次實際上並不會過高,通常是每一批次提交一次, 所以即便是存儲在mysql中也是能夠接受的。

    在這裏給出我本身的項目所使用的實現:

    import java.util.ArrayList;
     import java.util.Arrays;
     import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    
     import org.I0Itec.zkclient.ZkClient;
     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.KafkaConsumer;
     import org.apache.kafka.common.PartitionInfo;
     import org.apache.kafka.common.TopicPartition;
     import org.apache.kafka.common.serialization.StringDeserializer;
     import org.apache.spark.streaming.api.java.JavaInputDStream;
     import org.apache.spark.streaming.api.java.JavaStreamingContext;
     import org.apache.spark.streaming.kafka010.ConsumerStrategies;
     import org.apache.spark.streaming.kafka010.KafkaUtils;
     import org.apache.spark.streaming.kafka010.LocationStrategies;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    
     public class SparkDataSource {
    
         private static final Logger logger = LoggerFactory.getLogger(SparkDataSource.class);
    
         private static final String ZK_PATH_PREFIX = "/consumer/spark/project/offset/";
    
         public static JavaInputDStream<ConsumerRecord<Object, Object>> getInputDStreamByKakfa(JavaStreamingContext jssc, 
                 @SuppressWarnings("rawtypes") Class valueDeserializerClass, 
                 String groupId,
                 String topic
                 ) {
             Map<String, Object> kafkaConfig = new HashMap<>();
             kafkaConfig.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
             kafkaConfig.put("key.deserializer", StringDeserializer.class);
             kafkaConfig.put("value.deserializer", valueDeserializerClass);
             kafkaConfig.put("group.id", "groupId");
             kafkaConfig.put("auto.offset.reset", "lastest");
             kafkaConfig.put("enable.auto.commit", false);
    
             KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(kafkaConfig);
    
             //可能出現鏈接超時, topic 不存在等狀況,會引發報錯,致使啓動中斷.
             while (true) {
                 try {
                     List<TopicPartition> topicPartitions = topicPartitions(consumer, topic);
                     if (topicPartitions != null && !topicPartitions.isEmpty()) {
                         break;
                     }
                     Thread.sleep(5000);
                 } catch (Exception e) {
                     logger.warn("獲取topic partition信息失敗", e);
                 }
             }
    
    
             return KafkaUtils.createDirectStream(jssc, 
                         LocationStrategies.PreferConsistent(),
                         ConsumerStrategies.Subscribe(Arrays.asList(new String[] {topic}), kafkaConfig, getOffset(consumer, topic))
                     );
         }
    
         /**
         * 獲取偏移量, 若是zk中有,則取zk,不然直接去獲取.
         * @param consumer kafkaConsumer
         * @param topic topic
         * @return 最終的topic partition 和 offset
         */
         private static Map<TopicPartition, Long> getOffset(KafkaConsumer<String, Object> consumer, String topic) {
             String zkPath = ZK_PATH_PREFIX + topic;
             //建立zk, 須要傳入自身的鏈接信息。
             ZkClient zkClient = new ZkClient("zkServer");
             //檢查當前路徑是否存在子節點, 默認是有值的,是咱們在保存信息時建立的 zk節點。
             int childNumber = zkClient.countChildren(zkPath);
             Map<TopicPartition, Long> fromOffset = new HashMap<>();
    
             if (childNumber > 0) {
                 //獲取對應topic的最大 offset, 由於若是請求的offset超出最大值是會報錯的.
                 Map<TopicPartition, Long> endOffsets = getEndOffsetByTopic(consumer, topic);
                 for (int i = 0; i < childNumber; i++) {
                     TopicPartition tap = new TopicPartition(topic, i);
                     //存儲kafka對應的各個partition對應 offset 的 路徑.
                     String realPath = zkPath + "/" + i;
                     String offset = zkClient.readData(realPath);
                     Long lastOffset = endOffsets.get(tap);
                     //然而這種方式也不見得徹底正確, 依然存在一種可能性,topic已經被刪除,這是從新建立的數據, 且已經灌入一批數據
                     //因此此時應該選擇從頭開始讀, 或者說從最新處開始讀,要看我的選擇, 同時最好能夠加入相關的信息標識
                     //代表是來自同一批數據.
                     if (lastOffset != null) {
                         if (lastOffset < Long.parseLong(offset)) {
                             //若是記錄的offset過大,則能夠選擇最新的offset.
                             fromOffset.put(tap, lastOffset);
                         } else {
                             fromOffset.put(tap, Long.parseLong(offset));
                         }
                     } else {
                         //若是爲null的話, 說明kafka的分區可能已經通過調整, 須要刪除zk對應的節點.
                         zkClient.delete(realPath);
                     }
                 }
             } else {
                 fromOffset = getBeginningOffsetByTopic(consumer, topic);
             }
             return fromOffset;
         }
    
         private static Map<TopicPartition, Long> getEndOffsetByTopic(KafkaConsumer<String, Object> consumer, String topic) {
             return consumer.endOffsets(topicPartitions(consumer, topic));
         }
    
         private static Map<TopicPartition, Long> getBeginningOffsetByTopic(KafkaConsumer<String, Object> consumer, String topic) {
             return consumer.beginningOffsets(topicPartitions(consumer, topic));
         }
    
         private static List<TopicPartition> topicPartitions(KafkaConsumer<String, Object> consumer, String topic) {
             List<PartitionInfo> partitions = consumer.partitionsFor(topic);
             List<TopicPartition> topicPartitons = new ArrayList<>(partitions.size());
             partitions.forEach(pInfo -> {
                 topicPartitons.add(new TopicPartition(topic, pInfo.partition()));
             });
             return topicPartitons;
         }
     }

    存儲 offset卻是沒有什麼特別的地方, 主要是在 項目啓動 offset的獲取上。

SSL / TLS

Tips: HTTPS、SSL、TLS三者之間的聯繫和區別 通俗來講, TLS就是 SSL標準化後的產物。

新的kafkaConsumer支持 SSL,爲了支持這一點, 須要在接入kafka以前 加入一部分配置, 注意,這僅僅適用於spark和kafka 服務器之間的交流,你一樣須要保證Spark節點內部之間的安全(Spark安全)通訊。

Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

程序部署

對於JAVA或Scala應用來講,若是你使用SBT或MAVEN來作項目管理,須要將spark-streaming-kafka-010_2.11包以及它的依賴包添加到你的應用的JAR包中。確保spark-core+2.11包和spark-streaming_2.11包在你的依賴中位provided級別,由於他們在Spark的安裝包中已經提供了。接下來使用spark-submit命令來部署你的應用。

相關文章
相關標籤/搜索