【python】spark+kafka使用

網上用python寫spark+kafka的資料好少啊 本身記錄一點踩到的坑~html

 

spark+kafka介紹的官方網址:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.htmlpython

python的pyspark庫函數文檔:http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html?highlight=kafkautils.createdirectstream#pyspark.streaming.kafka.KafkaUtils.createDirectStreamapache

上面兩個是最重要的資料,大多數問題能夠經過仔細研讀上面兩個文檔獲得答案json

 

官網上說了,spark和kafka連用有兩種方式:接收器形式  以及 直連形式api

 

1、 接收器形式

優勢:支持kafka的group.id設置,支持用kafka api查詢offset,若是數據斷掉後,能夠經過group.id輕鬆找到上一次失敗的位置app

缺點:分佈式

1.失敗處理複雜。因爲kafka隊列信息由kafka本身記錄,當spark消費了數據可是處理中出錯時會致使數據丟失。爲了不數據丟失就必須開啓Write Ahead Logs,把spark接收到的數據都存儲到分佈式文件系統中,好比HDFS,而後失敗時從存儲的記錄中找到失敗的消息。這致使同一批數據被kafka和spark存儲了2次。形成數據冗餘。函數

2.若是有多個地方都想獲取同一個kafka隊列的數據,必須創建多個流,沒法用一個流並行處理。spa

該方法是比較老的一種方式,並不太被推薦。code

 

2、直連形式

優勢:

1. 不需兩次存儲數據,直連形式時,spark本身管理偏移信息,再也不使用kafka的offset信息。因此spark能夠自行處理失敗狀況,不要再次存儲數據。spark保證數據傳輸時Exactly-once。

2.只需創建一個流就能夠並行的在多個地方使用流中的數據

缺點:

不支持kafka的group,不支持經過kafka api查詢offset信息!!!!

在鏈接後spark會根據fromOffsets參數設置起始offset,默認是從最新的數據開始的。也就是說,必須本身記錄spark消耗的offset位置。不然在兩次腳本啓動中間的數據都會丟失。

 

我選用的是直連形式,我處理offset的方法是將spark消費的offset信息實時記錄到文件中。在啓動腳本時經過記錄的文件來找到起始位置。

#!/usr/bin/python
# coding=utf-8
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
import time
import os
import json
broker_list = "xxxx"
topic_name = "xxxx"
timer = 5
offsetRanges = []


def store_offset_ranges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd


def save_offset_ranges(rdd):
    root_path = os.path.dirname(os.path.realpath(__file__))
    record_path = os.path.join(root_path, "offset.txt")
    data = dict()
    f = open(record_path, "w")
    for o in offsetRanges:
        data = {"topic": o.topic, "partition": o.partition, "fromOffset": o.fromOffset, "untilOffset": o.untilOffset}
    f.write(json.dumps(data))
    f.close()


def deal_data(rdd):
    data = rdd.collect()
    for d in data:
        # do something
        pass

def save_by_spark_streaming():
    root_path = os.path.dirname(os.path.realpath(__file__))
    record_path = os.path.join(root_path, "offset.txt")
    from_offsets = {}
    # 獲取已有的offset,沒有記錄文件時則用默認值即最大值
    if os.path.exists(record_path):
        f = open(record_path, "r")
        offset_data = json.loads(f.read())
        f.close()
        if offset_data["topic"] != topic_name:
            raise Exception("the topic name in offset.txt is incorrect")

        topic_partion = TopicAndPartition(offset_data["topic"], offset_data["partition"])
        from_offsets = {topic_partion: long(offset_data["untilOffset"])}  # 注意設置起始offset時的方法
        print "start from offsets: %s" % from_offsets

    sc = SparkContext(appName="Realtime-Analytics-Engine")
    ssc = StreamingContext(sc, int(timer))

    kvs = KafkaUtils.createDirectStream(ssc=ssc, topics=[topic_name], fromOffsets=from_offsets,
                                        kafkaParams={"metadata.broker.list": broker_list})
    kvs.foreachRDD(lambda rec: deal_data(rec))
    kvs.transform(store_offset_ranges).foreachRDD(save_offset_ranges)

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()


if __name__ == '__main__':
    save_by_spark_streaming()

 

運行:

正常狀況下,只要輸入下面的語句就能夠運行了

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 spark_kafka.py

然而,個人老是報錯,找不到依賴包,說各類庫不認識。因此我只好用--jars來手動指定包的位置了..................

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 --jars /root/.ivy2/jars/org.apache.kafka_kafka_2.11-0.8.2.1.jar,/root/.ivy2/jars/com.yammer.metrics_metrics-core-2.2.0.jar spark_kafka.py

 

 

 

吐槽:

我就踩在直連形式不支持offset的坑上了..... 開始官方文檔沒仔細看,就瞄了一眼說是直連形式好,就豪不猶豫的用了。結果個人腳本不穩定,各類斷,而後中間數據就各類丟啊.......

還有官網上竟然徹底沒有對fromOffsets這個參數的說明,我找了很久很久才弄清楚這個參數怎麼拼出來啊.................

相關文章
相關標籤/搜索