Kafka文件存儲機制及offset存取

Kafka是什麼

Kafka是最初由Linkedin公司開發,是一個分佈式、分區的、多副本的、多訂閱者,基於zookeeper協調的分佈式日誌系統(也能夠當作MQ系統),常見能夠用於web/nginx日誌、訪問日誌,消息服務等等,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源項目。css

1.前言

一個商業化消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。
下面將從Kafka文件存儲機制和物理結構角度,分析Kafka是如何實現高效文件存儲,及實際應用效果。html

2.Kafka文件存儲機制

Kafka部分名詞解釋以下:java

  • Broker:消息中間件處理結點,一個Kafka節點就是一個broker,多個broker能夠組成一個Kafka集羣。
  • Topic:一類消息,例如page view日誌、click日誌等均可以以topic的形式存在,Kafka集羣可以同時負責多個topic的分發。
  • Partition:topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。
  • Segment:partition物理上由多個segment組成,下面2.2和2.3有詳細說明。
  • offset:每一個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。partition中的每一個消息都有一個連續的序列號叫作offset,用於partition惟一標識一條消息.

分析過程分爲如下4個步驟:python

  • topic中partition存儲分佈
  • partiton中文件存儲方式
  • partiton中segment文件存儲結構
  • 在partition中如何經過offset查找message

經過上述4過程詳細分析,咱們就能夠清楚認識到kafka文件存儲機制的奧祕。nginx

2.1 topic中partition存儲分佈

假設實驗環境中Kafka集羣只有一個broker,xxx/message-folder爲數據文件存儲根目錄,在Kafka broker中server.properties文件配置(參數log.dirs=xxx/message-folder),例如建立2個topic名稱分別爲report_push、launch_info, partitions數量都爲partitions=4
存儲路徑和目錄規則爲:git

複製代碼

xxx/message-folder

              |--report_push-0
              |--report_push-1
              |--report_push-2
              |--report_push-3
              |--launch_info-0
              |--launch_info-1
              |--launch_info-2
              |--launch_info-3

複製代碼

 

在Kafka文件存儲中,同一個topic下有多個不一樣partition,每一個partition爲一個目錄,partiton命名規則爲topic名稱+有序序號,第一個partiton序號從0開始,序號最大值爲partitions數量減1。
若是是多broker分佈狀況,請參考kafka集羣partition分佈原理分析github

2.2 partiton中文件存儲方式

下面示意圖形象說明了partition中文件存儲方式:
imageweb

圖1
  • 每一個partion(目錄)至關於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每一個段segment file消息數量不必定相等,這種特性方便old segment file快速被刪除。
  • 每一個partiton只須要支持順序讀寫就好了,segment文件生命週期由服務端配置參數決定。

這樣作的好處就是能快速刪除無用文件,有效提升磁盤利用率。redis

2.3 partiton中segment文件存儲結構

讀者從2.2節瞭解到Kafka文件系統partition存儲方式,本節深刻分析partion中segment file組成和物理結構。sql

  • segment file組成:由2大部分組成,分別爲index file和data file,此2個文件一一對應,成對出現,後綴".index"和「.log」分別表示爲segment索引文件、數據文件.
  • segment文件命名規則:partion全局的第一個segment從0開始,後續每一個segment文件名爲上一個segment文件最後一條消息的offset值。數值最大爲64位long大小,19位數字字符長度,沒有數字用0填充。

下面文件列表是筆者在Kafka broker上作的一個實驗,建立一個topicXXX包含1 partition,設置每一個segment大小爲500MB,並啓動producer向Kafka broker寫入大量數據,以下圖2所示segment文件列表形象說明了上述2個規則:
image

圖2

以上述圖2中一對segment file文件爲例,說明segment中index<—->data file對應關係物理結構以下:
image

圖3

上述圖3中索引文件存儲大量元數據,數據文件存儲大量消息,索引文件中元數據指向對應數據文件中message的物理偏移地址。
其中以索引文件中元數據3,497爲例,依次在數據文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移地址爲497。

從上述圖3瞭解到segment data file由許多message組成,下面詳細說明message物理結構以下:
image

圖4

參數說明:

關鍵字 解釋說明
8 byte offset 在parition(分區)內的每條消息都有一個有序的id號,這個id號被稱爲偏移(offset),它能夠惟一肯定每條消息在parition(分區)內的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校驗message
1 byte 「magic" 表示本次發佈Kafka服務程序協議版本號
1 byte 「attributes" 表示爲獨立版本、或標識壓縮類型、或編碼類型。
4 byte key length 表示key的長度,當key爲-1時,K byte key字段不填
K byte key 可選
value bytes payload 表示實際消息數據。

2.4 在partition中如何經過offset查找message

例如讀取offset=368776的message,須要經過下面2個步驟查找。

  • 第一步查找segment file
    上述圖2爲例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)爲0.第二個文件00000000000000368769.index的消息量起始偏移量爲368770 = 368769 + 1.一樣,第三個文件00000000000000737337.index的起始偏移量爲737338=737337 + 1,其餘後續文件依次類推,以起始偏移量命名並排序這些文件,只要根據offset **二分查找**文件列表,就能夠快速定位到具體文件。
    當offset=368776時定位到00000000000000368769.index|log

  • 第二步經過segment file查找message
    經過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和00000000000000368769.log的物理偏移地址,而後再經過00000000000000368769.log順序查找直到offset=368776爲止。

從上述圖3可知這樣作的優勢,segment index file採起稀疏索引存儲方式,它減小索引文件大小,經過mmap能夠直接內存操做,稀疏索引爲數據文件的每一個對應message設置一個元數據指針,它比稠密索引節省了更多的存儲空間,但查找起來須要消耗更多的時間。

3 Kafka文件存儲機制–實際運行效果

實驗環境:

  • Kafka集羣:由2臺虛擬機組成
  • cpu:4核
  • 物理內存:8GB
  • 網卡:千兆網卡
  • jvm heap: 4GB
  • 詳細Kafka服務端配置及其優化請參考:kafka server.properties配置詳解

image

圖5

從上述圖5能夠看出,Kafka運行時不多有大量讀磁盤的操做,主要是按期批量寫磁盤操做,所以操做磁盤很高效。這跟Kafka文件存儲中讀寫message的設計是息息相關的。Kafka中讀寫message有以下特色:

寫message

  • 消息從java堆轉入page cache(即物理內存)。
  • 由異步線程刷盤,消息從page cache刷入磁盤。

讀message

  • 消息直接從page cache轉入socket發送出去。
  • 當從page cache沒有找到相應數據時,此時會產生磁盤IO,從磁
    盤Load消息到page cache,而後直接從socket發出去

4.offset存儲方式

  • 一、在kafka 0.9版本以後,kafka爲了下降zookeeper的io讀寫減小network data transfer,也本身實現了在kafka server上存儲consumer,topic,partitions,offset信息將消費的 offset 遷入到了 Kafka 一個名爲 __consumer_offsets 的Topic中。
  • 二、將消費的 offset 存放在 Zookeeper 集羣中。
  • 三、將offset存放至第三方存儲,如Redis, 爲了嚴格實現不重複消費

下面分別說一下這三種存儲方式的實現

4.1 __consumer_offsets [kafka]

  下面的代碼案例實現了test這一topic的數據連續消費

 

複製代碼

from kafka import KafkaConsumer

class KafkaStreamTest:
    '''
    This class consume all external Kafka topics'''

    def __init__(self):

        self.appName = "kafkatest"

        self.kafkaHosts = "192.168.4.201:6667,192.168.4.231:6667"
        self.kafkaAutoOffsetReset = "largest"
        self._kafka_topic = "test"

    def start(self):
        reload(sys)
        sys.setdefaultencoding('utf-8')

        elogging.debug(self.appName, elogging.normalCID(), "receiver starting")
        consumer = KafkaConsumer('test',  bootstrap_servers=['192.168.4.201:6667','192.168.4.231:6667'], enable_auto_commit=True, auto_offset_reset='earliest')
        #consumer = KafkaConsumer('test', bootstrap_servers=['192.168.4.201:6667', '192.168.4.231:6667'], auto_offset_reset='earliest')
        while True:
            # The definition of KafkaMessage:
            # KafkaMessage = namedtuple("KafkaMessage",
            #     ["topic", "partition", "offset", "key", "value"])
            kafkaMsg = consumer.next()

            # for debug
            print kafkaMsg.topic, kafkaMsg.partition, kafkaMsg.offset, kafkaMsg.key, kafkaMsg.value

if __name__ =="__main__":
    test = KafkaStreamTest()
    test.start()

複製代碼

 

enable_auto_commit (bool) – If True , the consumer’s offset will be periodically committed in the background. Default: True設置爲true,表示offset自動託管到kafka內部的一個特定名稱爲__consumer_offsets的topic

auto_offset_reset:What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset

只有當offset不存在的時候,才用latest或者earliest

其餘詳細內容請參看

https://github.com/dpkp/kafka-python

https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

https://stackoverflow.com/questions/35432326/how-to-get-latest-offset-for-a-partition-for-a-kafka-topic

Kafka 如何讀取offset topic內容 (__consumer_offsets)

kafka 0.9.0.0 __consumer_offsets日誌清理問題?

Kafka 0.10.2<auto.offset.reset和enable.auto.commit>

 

4.2 zookeeper

請參考

spark createDirectStream保存kafka offset

修改kafka topic的offset幾種方法

 

4.3 Redis[推薦]

複製代碼

import os
import sys
sys.path.append("..")
sys.path.append(sys.argv[0][:sys.argv[0].rfind(os.path.join('com','ericsson'))])

import copy
import traceback
import redis
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext, DStream
from pyspark.sql import SQLContext
import simplejson as json
from com.ericsson.analytics.fms.common.common import ELogForDistributedApp,getSqlContextInstance
from pyspark.streaming.kafka import KafkaUtils,TopicAndPartition
from com.ericsson.analytics.oamf.client.logging import elogging
from com.ericsson.analytics.fms.common.common import HDFSOperation

class KafkaStreamTest:
    '''
    This class consume all external Kafka topics, store the data into Parquet and send the data to internal Kafka topics
    '''

    def __init__(self):

        self.appName = "kafkatest"

        self.kafkaHosts = "192.168.4.201:6667,192.168.4.231:6667"
        self.kafkaAutoOffsetReset = "largest"
        self.kafka_offset_redis_db = 6
        self._kafka_topic = "test"
        self.redisHost = "192.168.4.231"
        self.redisPort = 6379

        self.spark_batch_duration = 20

    def createStreamingContext(self, sc):
        ssc = StreamingContext(sc, self.spark_batch_duration)
        ds = self.getDStreamFromKafka(ssc)
        if ds is not None:
            elogging.info(self.appName, elogging.normalCID(), "Kafka succeeded to getting the data")
            return ssc, ds
        else:
            return None, None

    def getDStreamFromKafka(self, ssc):
        kafkaParams = {"metadata.broker.list": self.kafkaHosts}
        elogging.debug(self.appName, elogging.normalCID(), kafkaParams)

        sc = ssc.sparkContext
        dstream = None
        try:
            redisConn = self.getRedisConnection(self.kafka_offset_redis_db)
            if redisConn.exists(self.appName):
                elogging.debug(self.appName, elogging.normalCID(), "key " + self.appName + " exists in redis")
                fromOffset = {}
                offsetListStr = redisConn.get(self.appName)
                offsetList = eval(offsetListStr)
                for offset in offsetList:
                    elogging.debug(self.appName, elogging.normalCID(), str(offset))
                    topicPartion = TopicAndPartition(offset["topic"], offset["partition"])
                    fromOffset[topicPartion] = offset["untilOffset"]
                dstream = KafkaUtils.createDirectStream(ssc, [self._kafka_topic], kafkaParams, fromOffset)
            else:
                kafkaParams = {"metadata.broker.list": self.kafkaHosts, "auto.offset.reset": self.kafkaAutoOffsetReset}
                elogging.debug(self.appName, elogging.normalCID(), "key " + self.appName + " doesn't exist in redis")
                dstream = KafkaUtils.createDirectStream(ssc, [self._kafka_topic], kafkaParams)
        except:
            traceInfo = traceback.format_exc()
            elogging.error(self.appName, elogging.faultCID(), "failed to create DStream : " + traceInfo)

        return dstream

    def getRedisConnection(self, redisDB):
        try:
            pool = redis.ConnectionPool(host=self.redisHost, port=self.redisPort, db=redisDB)
            redisConn = redis.Redis(connection_pool=pool)
        except:
            traceInfo = traceback.format_exc()
            elogging.error(self.appName, elogging.faultCID(), "failed to create DStream : " + traceInfo)
            return None

        return redisConn

    def getOffSetRangesFromRDD(self, rdd):
        try:
            offsetRanges = rdd.offsetRanges()
        except:
            traceInfo = traceback.format_exc()
            elogging.error(self.appName, elogging.faultCID(), "failed to call rdd.offsetRanges() function : " + traceInfo)
            return None

        offsetList = []
        for offset in offsetRanges:
            offsetList.append({"topic": offset.topic, "partition": offset.partition, "fromOffset": offset.fromOffset,
                               "untilOffset": offset.untilOffset})

        elogging.info(self.appName, elogging.normalCID(), "getOffSetRangesFromRDD, offsetList: " + str(offsetList))
        return offsetList

    def saveOffSetRangesToRedis(self, offsetList):
        redisConn = self.getRedisConnection(self.kafka_offset_redis_db)
        if redisConn is not None:
            redisConn.set(self.appName, offsetList)
            elogging.info(self.appName, elogging.normalCID(), "saveOffSetRangesToRedis, offsetList : " + str(offsetList))

    def handleMessages(self, runTime, rdd):
        elogging.debug(self.appName, elogging.normalCID(), "========= %s =========" % str(runTime))
        offsetList = self.getOffSetRangesFromRDD(rdd)
        if offsetList is not None:
            self.saveOffSetRangesToRedis(offsetList)

        rddFilter = rdd.map(lambda p: p[1])

        counts = rddFilter.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
        sqlContext = getSqlContextInstance(rddFilter.context)
        if counts is not None:
            df = sqlContext.createDataFrame(counts)
            df.show()

    def start(self):
        reload(sys)
        sys.setdefaultencoding('utf-8')
        sc = SparkContext(appName=self.appName)
        eloggingConfig = None
        try:
            eloggingConfig = HDFSOperation.getConfigFromHDFS(ELogForDistributedApp.LOGHDFSPATH, sc)
            elogging.initLogFromDict(eloggingConfig)
        except StandardError, se:
            pass

        elogging.debug(self.appName, elogging.normalCID(), "receiver starting")
        configInfoStr = 'kafkaHosts:' + str(self.kafkaHosts) + ', kafkaAutoOffsetReset:' + str(self.kafkaAutoOffsetReset) + \
                        ', kafka_offset_redis_db:' + str(self.kafka_offset_redis_db) + ', spark_batch_duration:' + str(self.spark_batch_duration) + \
                        ', redisHost:' + str(self.redisHost) + ', redisPort:' + str(self.redisPort)
        elogging.info(self.appName, elogging.normalCID(), configInfoStr)
        ssc, newDS = self.createStreamingContext(sc)
        if newDS is not None:
            newDS.foreachRDD(self.handleMessages)
            ssc.start()
            elogging.debug(self.appName, elogging.normalCID(), "StreamingContext start")
            ssc.awaitTermination()
            elogging.debug(self.appName, elogging.normalCID(), "receiver end")
        else:
            traceInfo = traceback.format_exc()
            elogging.error(self.appName, elogging.faultCID(), "Failed to create DStream " + traceInfo)

if __name__ =="__main__":
    test = KafkaStreamTest()
    test.start()

複製代碼

 

 

 

 

5.總結

Kafka高效文件存儲設計特色

  • Kafka把topic中一個parition大文件分紅多個小文件段,經過多個小文件段,就容易按期清除或刪除已經消費完文件,減小磁盤佔用。
  • 經過索引信息能夠快速定位message和肯定response的最大大小。
  • 經過index元數據所有映射到memory,能夠避免segment file的IO磁盤操做。
  • 經過索引文件稀疏存儲,能夠大幅下降index文件元數據佔用空間大小。

參考

1.Linux Page Cache機制
2.Kafka官方文檔 

3.Kafka Offset Storage

相關文章
相關標籤/搜索