Kafka was originally developed at LinkedIn. It is a distributed, partitioned, multi-replica, multi-subscriber distributed log system coordinated via ZooKeeper (it can also be used as an MQ system), commonly used for web/nginx logs, access logs, messaging services, and so on. LinkedIn later contributed it to the Apache Software Foundation, where it became a top-level open-source project.

For a commercial message queue, the design of its file storage mechanism is one of the most important measures of its performance and technical quality.

The following sections analyze, from the perspective of Kafka's file storage mechanism and physical structure, how Kafka achieves efficient file storage, and its effect in practice.

Some Kafka terminology is explained below:

The analysis proceeds in the following 4 steps:

By working through these 4 steps in detail, we can clearly understand the secret behind Kafka's file storage mechanism.
Assume the Kafka cluster in the experimental environment has only one broker, and that xxx/message-folder is the root directory for data files, configured in the broker's server.properties via log.dirs=xxx/message-folder. For example, create 2 topics named report_push and launch_info, each with partitions=4.

The storage paths and directory layout are as follows:
```
xxx/message-folder
    |--report_push-0
    |--report_push-1
    |--report_push-2
    |--report_push-3
    |--launch_info-0
    |--launch_info-1
    |--launch_info-2
    |--launch_info-3
```
In Kafka's file storage, a topic contains several partitions, and each partition is a directory. A partition directory is named as the topic name followed by an ordered sequence number: the first partition is numbered 0, and the highest number is the number of partitions minus 1.
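As a quick illustration of this naming rule (a sketch added here, not part of the original setup), the expected partition directory names can be derived from the topic name and partition count:

```python
def partition_dirs(topic, num_partitions):
    """Partition directories are named <topic>-<n>, with n in [0, num_partitions - 1]."""
    return ["%s-%d" % (topic, n) for n in range(num_partitions)]

print(partition_dirs("report_push", 4))  # ['report_push-0', ..., 'report_push-3']
print(partition_dirs("launch_info", 4))  # ['launch_info-0', ..., 'launch_info-3']
```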
For a deployment with multiple brokers, refer to the analysis of how partitions are distributed across a Kafka cluster.

The following diagram illustrates how files are stored within a partition:
Figure 1
The benefit of this layout is that obsolete files can be deleted quickly, which makes good use of the disk.

Section 2.2 described how partitions are stored in Kafka's file system; this section looks more closely at the composition and physical structure of the segment files inside a partition.

The file list below comes from an experiment the author ran on a Kafka broker: a topic named topicXXX was created with 1 partition, the segment size was set to 500MB, and a producer was started to write a large amount of data to the broker. The segment file list shown in Figure 2 illustrates the two rules described above:
Figure 2

Taking a pair of segment files from Figure 2 as an example, the physical structure of the index file <—-> data file correspondence within a segment is as follows:

Figure 3

As Figure 3 shows, the index file stores a large amount of metadata while the data file stores the messages; each metadata entry in the index file points to the physical offset of the corresponding message in the data file.

Take the index entry 3,497 as an example: it denotes the 3rd message in this data file (the 368772nd message of the partition overall), and 497 is that message's physical offset within the data file.
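To make the mapping concrete, here is a minimal sketch (illustrative helper names, not Kafka source code) of how an index entry of the form (relative offset, position) is interpreted against the segment's base offset:

```python
def resolve_index_entry(segment_base_offset, relative_offset, position):
    """An index entry stores an offset relative to the segment's base offset
    plus a byte position in the corresponding .log file."""
    absolute_offset = segment_base_offset + relative_offset
    return absolute_offset, position

# The entry (3, 497) in segment 00000000000000368769 points to the message
# with absolute offset 368769 + 3 = 368772, located at byte 497 of the .log file.
print(resolve_index_entry(368769, 3, 497))  # (368772, 497)
```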
As seen in Figure 3, a segment data file consists of many messages; the physical structure of a message is detailed below:

Figure 4
Field | Description |
---|---|
8 byte offset | Every message within a partition has an ordered id called the offset, which uniquely identifies the message's position within the partition, i.e. the offset says which message of the partition this is. |
4 byte message size | Size of the message. |
4 byte CRC32 | CRC32 checksum of the message. |
1 byte "magic" | Version of the Kafka message protocol under which this message was published. |
1 byte "attributes" | Flags indicating a standalone message, the compression type, or the encoding type. |
4 byte key length | Length of the key; when it is -1, the K byte key field is omitted. |
K byte key | Optional. |
value bytes payload | The actual message data. |
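As a reading aid for this layout, the following sketch parses a single record whose fields follow the table above, using Python's struct module. It is an illustration of the legacy on-disk format as described here, not a drop-in parser for real Kafka logs (newer message format versions add fields, such as a timestamp, that the table does not show):

```python
import struct

def parse_record(buf):
    """Parse one record laid out as in the table above (big-endian):
    8-byte offset, 4-byte message size, then the message body:
    4-byte CRC32, 1-byte magic, 1-byte attributes, 4-byte key length,
    key bytes (omitted when key length == -1), remaining bytes = payload."""
    offset, msg_size = struct.unpack_from(">qi", buf, 0)
    body = buf[12:12 + msg_size]
    crc, magic, attributes, key_len = struct.unpack_from(">IBBi", body, 0)
    pos = 10
    if key_len == -1:
        key = None
    else:
        key = body[pos:pos + key_len]
        pos += key_len
    payload = body[pos:]
    return offset, crc, magic, attributes, key, payload
```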
For example, reading the message with offset=368776 requires the following two lookup steps.

Step 1: locate the segment file

Using Figure 2 as an example: 00000000000000000000.index is the first file, with a starting offset of 0; the second file, 00000000000000368769.index, starts at message offset 368770 = 368769 + 1; likewise, the third file, 00000000000000737337.index, starts at offset 737338 = 737337 + 1; and so on for subsequent files. Since the files are named after their starting offsets and sorted accordingly, a **binary search** over the file list by offset quickly locates the specific file.

When offset=368776, the search resolves to 00000000000000368769.index|log.
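This step can be sketched as follows (a hypothetical helper using Python's bisect module, not Kafka's own code): given the sorted list of starting offsets parsed from the segment file names, find the segment whose range covers the target offset.

```python
import bisect

def find_segment(base_offsets, target_offset):
    """base_offsets: sorted starting offsets parsed from the segment file names.
    Returns the base offset of the segment that should contain target_offset."""
    # Last file whose starting offset lies below the target (per the naming
    # rule above, that segment's messages begin at its name + 1).
    i = max(bisect.bisect_left(base_offsets, target_offset) - 1, 0)
    return base_offsets[i]

bases = [0, 368769, 737337]            # from the file names in Figure 2
base = find_segment(bases, 368776)     # -> 368769
print("%020d.index / %020d.log" % (base, base))
```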
Step 2: find the message within the segment file

With the segment file located in step 1, the lookup for offset=368776 first finds the matching metadata entry in 00000000000000368769.index and the physical offset it points to in 00000000000000368769.log, and then scans 00000000000000368769.log sequentially from that position until the message with offset=368776 is found.

Figure 3 also shows the advantage of this design: the segment index file uses a sparse index, which keeps the index file small and allows it to be manipulated directly in memory via mmap. A sparse index stores a metadata pointer for only some of the messages in the data file, so it saves far more space than a dense index, at the cost of a little extra time per lookup.
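The in-segment lookup can be sketched as follows (a hypothetical in-memory view of the sparse index, not Kafka's actual implementation): binary-search the sparse index for the greatest entry at or below the target offset, then scan the log forward from that byte position.

```python
import bisect

def locate_in_segment(index_entries, base_offset, target_offset):
    """index_entries: sorted (relative_offset, byte_position) pairs from the
    .index file. Returns the byte position in the .log file from which a
    sequential scan for target_offset should start."""
    relative_target = target_offset - base_offset
    rel_offsets = [rel for rel, _pos in index_entries]
    # Greatest indexed entry whose offset does not exceed the target.
    i = bisect.bisect_right(rel_offsets, relative_target) - 1
    return index_entries[i][1]

# Sparse index of segment 00000000000000368769 (byte positions illustrative).
index_entries = [(1, 0), (3, 497), (6, 1407)]
start_pos = locate_in_segment(index_entries, 368769, 368776)
# The reader then scans the .log file from start_pos, checking each message's
# stored offset until it reaches 368776.
```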
Experimental environment:

Figure 5

Figure 5 shows that at run time Kafka rarely performs large reads from disk; its disk activity is mainly periodic batched writes, so disk operations are very efficient. This is closely tied to how reading and writing messages is designed in Kafka's file storage. Reading and writing messages in Kafka has the following characteristics:

Writing messages

Reading messages

The implementations of these three storage approaches are described in turn below.

The following code example implements continuous consumption of data from the topic test.
```python
import sys

from kafka import KafkaConsumer
# elogging is a project-internal logging helper (imported in full in the
# Spark example further below).
from com.ericsson.analytics.oamf.client.logging import elogging


class KafkaStreamTest:
    ''' This class consumes all external Kafka topics '''

    def __init__(self):
        self.appName = "kafkatest"
        self.kafkaHosts = "192.168.4.201:6667,192.168.4.231:6667"
        self.kafkaAutoOffsetReset = "largest"
        self._kafka_topic = "test"

    def start(self):
        # Python 2: force UTF-8 as the default encoding.
        reload(sys)
        sys.setdefaultencoding('utf-8')
        elogging.debug(self.appName, elogging.normalCID(), "receiver starting")
        # Offsets are auto-committed to Kafka; consumption starts from the
        # earliest available offset when no committed offset exists.
        consumer = KafkaConsumer('test',
                                 bootstrap_servers=['192.168.4.201:6667', '192.168.4.231:6667'],
                                 enable_auto_commit=True,
                                 auto_offset_reset='earliest')
        # consumer = KafkaConsumer('test', bootstrap_servers=['192.168.4.201:6667', '192.168.4.231:6667'], auto_offset_reset='earliest')
        while True:
            # The definition of KafkaMessage:
            # KafkaMessage = namedtuple("KafkaMessage",
            #     ["topic", "partition", "offset", "key", "value"])
            kafkaMsg = consumer.next()
            # for debug
            print kafkaMsg.topic, kafkaMsg.partition, kafkaMsg.offset, kafkaMsg.key, kafkaMsg.value


if __name__ == "__main__":
    test = KafkaStreamTest()
    test.start()
```
enable_auto_commit (bool) – If True, the consumer's offset will be periodically committed in the background. Default: True. Setting it to True means the offsets are managed by Kafka itself, in an internal topic named __consumer_offsets.

auto_offset_reset: what to do when there is no initial offset in Kafka, or when the current offset no longer exists on the server (e.g. because the data has been deleted):

latest or earliest is applied only when no valid committed offset exists.
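For contrast, here is a minimal sketch (broker addresses reused from the example above, process() is a hypothetical handler) of managing offsets manually with kafka-python by disabling auto-commit and committing only after a message has been handled:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer('test',
                         bootstrap_servers=['192.168.4.201:6667', '192.168.4.231:6667'],
                         enable_auto_commit=False,      # do not commit in the background
                         auto_offset_reset='earliest')

for msg in consumer:
    process(msg.value)    # hypothetical processing step
    consumer.commit()     # synchronously commit the offsets of consumed messages
```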
For further details, see:
https://github.com/dpkp/kafka-python
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
https://stackoverflow.com/questions/35432326/how-to-get-latest-offset-for-a-partition-for-a-kafka-topic
How Kafka reads the contents of the offset topic (__consumer_offsets)

Kafka 0.9.0.0 __consumer_offsets log cleanup issue

Kafka 0.10.2 <auto.offset.reset and enable.auto.commit>
See:

Saving Kafka offsets with Spark createDirectStream
```python
import os
import sys
sys.path.append("..")
sys.path.append(sys.argv[0][:sys.argv[0].rfind(os.path.join('com', 'ericsson'))])
import copy
import traceback
import redis
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext, DStream
from pyspark.sql import SQLContext
import simplejson as json
from com.ericsson.analytics.fms.common.common import ELogForDistributedApp, getSqlContextInstance
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from com.ericsson.analytics.oamf.client.logging import elogging
from com.ericsson.analytics.fms.common.common import HDFSOperation


class KafkaStreamTest:
    '''
    This class consume all external Kafka topics, store the data into Parquet
    and send the data to internal Kafka topics
    '''

    def __init__(self):
        self.appName = "kafkatest"
        self.kafkaHosts = "192.168.4.201:6667,192.168.4.231:6667"
        self.kafkaAutoOffsetReset = "largest"
        self.kafka_offset_redis_db = 6
        self._kafka_topic = "test"
        self.redisHost = "192.168.4.231"
        self.redisPort = 6379
        self.spark_batch_duration = 20

    def createStreamingContext(self, sc):
        ssc = StreamingContext(sc, self.spark_batch_duration)
        ds = self.getDStreamFromKafka(ssc)
        if ds is not None:
            elogging.info(self.appName, elogging.normalCID(), "Kafka succeeded to getting the data")
            return ssc, ds
        else:
            return None, None

    def getDStreamFromKafka(self, ssc):
        kafkaParams = {"metadata.broker.list": self.kafkaHosts}
        elogging.debug(self.appName, elogging.normalCID(), kafkaParams)
        sc = ssc.sparkContext
        dstream = None
        try:
            redisConn = self.getRedisConnection(self.kafka_offset_redis_db)
            if redisConn.exists(self.appName):
                elogging.debug(self.appName, elogging.normalCID(), "key " + self.appName + " exists in redis")
                fromOffset = {}
                offsetListStr = redisConn.get(self.appName)
                offsetList = eval(offsetListStr)
                for offset in offsetList:
                    elogging.debug(self.appName, elogging.normalCID(), str(offset))
                    topicPartion = TopicAndPartition(offset["topic"], offset["partition"])
                    fromOffset[topicPartion] = offset["untilOffset"]
                dstream = KafkaUtils.createDirectStream(ssc, [self._kafka_topic], kafkaParams, fromOffset)
            else:
                kafkaParams = {"metadata.broker.list": self.kafkaHosts,
                               "auto.offset.reset": self.kafkaAutoOffsetReset}
                elogging.debug(self.appName, elogging.normalCID(), "key " + self.appName + " doesn't exist in redis")
                dstream = KafkaUtils.createDirectStream(ssc, [self._kafka_topic], kafkaParams)
        except:
            traceInfo = traceback.format_exc()
            elogging.error(self.appName, elogging.faultCID(), "failed to create DStream : " + traceInfo)
        return dstream

    def getRedisConnection(self, redisDB):
        try:
            pool = redis.ConnectionPool(host=self.redisHost, port=self.redisPort, db=redisDB)
            redisConn = redis.Redis(connection_pool=pool)
        except:
            traceInfo = traceback.format_exc()
            elogging.error(self.appName, elogging.faultCID(), "failed to create DStream : " + traceInfo)
            return None
        return redisConn

    def getOffSetRangesFromRDD(self, rdd):
        try:
            offsetRanges = rdd.offsetRanges()
        except:
            traceInfo = traceback.format_exc()
            elogging.error(self.appName, elogging.faultCID(), "failed to call rdd.offsetRanges() function : " + traceInfo)
            return None
        offsetList = []
        for offset in offsetRanges:
            offsetList.append({"topic": offset.topic, "partition": offset.partition,
                               "fromOffset": offset.fromOffset, "untilOffset": offset.untilOffset})
        elogging.info(self.appName, elogging.normalCID(), "getOffSetRangesFromRDD, offsetList: " + str(offsetList))
        return offsetList

    def saveOffSetRangesToRedis(self, offsetList):
        redisConn = self.getRedisConnection(self.kafka_offset_redis_db)
        if redisConn is not None:
            redisConn.set(self.appName, offsetList)
            elogging.info(self.appName, elogging.normalCID(), "saveOffSetRangesToRedis, offsetList : " + str(offsetList))

    def handleMessages(self, runTime, rdd):
        elogging.debug(self.appName, elogging.normalCID(), "========= %s =========" % str(runTime))
        offsetList = self.getOffSetRangesFromRDD(rdd)
        if offsetList is not None:
            self.saveOffSetRangesToRedis(offsetList)
        rddFilter = rdd.map(lambda p: p[1])
        counts = rddFilter.flatMap(lambda line: line.split(" ")) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b)
        sqlContext = getSqlContextInstance(rddFilter.context)
        if counts is not None:
            df = sqlContext.createDataFrame(counts)
            df.show()

    def start(self):
        reload(sys)
        sys.setdefaultencoding('utf-8')
        sc = SparkContext(appName=self.appName)
        eloggingConfig = None
        try:
            eloggingConfig = HDFSOperation.getConfigFromHDFS(ELogForDistributedApp.LOGHDFSPATH, sc)
            elogging.initLogFromDict(eloggingConfig)
        except StandardError, se:
            pass
        elogging.debug(self.appName, elogging.normalCID(), "receiver starting")
        configInfoStr = 'kafkaHosts:' + str(self.kafkaHosts) + ', kafkaAutoOffsetReset:' + str(self.kafkaAutoOffsetReset) + \
                        ', kafka_offset_redis_db:' + str(self.kafka_offset_redis_db) + ', spark_batch_duration:' + str(self.spark_batch_duration) + \
                        ', redisHost:' + str(self.redisHost) + ', redisPort:' + str(self.redisPort)
        elogging.info(self.appName, elogging.normalCID(), configInfoStr)
        ssc, newDS = self.createStreamingContext(sc)
        if newDS is not None:
            newDS.foreachRDD(self.handleMessages)
            ssc.start()
            elogging.debug(self.appName, elogging.normalCID(), "StreamingContext start")
            ssc.awaitTermination()
            elogging.debug(self.appName, elogging.normalCID(), "receiver end")
        else:
            traceInfo = traceback.format_exc()
            elogging.error(self.appName, elogging.faultCID(), "Failed to create DStream " + traceInfo)


if __name__ == "__main__":
    test = KafkaStreamTest()
    test.start()
```
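Note a consequence of the ordering in handleMessages: the offset ranges are saved to Redis before the batch is actually processed, so if the job fails after the Redis write but before processing finishes, that batch is skipped on restart (at-most-once delivery for the batch). Saving the offsets only after processing succeeds would give at-least-once delivery instead, at the cost of possible reprocessing.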
Key features of Kafka's efficient file storage design