本文首發於 vivo互聯網技術 微信公衆號 mp.weixin.qq.com/s/bV8AhqAjQ…
做者簡介:鄭志彬,畢業於華南理工大學計算機科學與技術(雙語班)。前後從事過電子商務、開放平臺、移動瀏覽器、推薦廣告和大數據、人工智能等相關開發和架構。目前在vivo智能平臺中心從事 AI中臺建設以及廣告推薦業務。擅長各類業務形態的業務架構、平臺化以及各類業務解決方案。
博客地址:arganzheng.life。html
最近要把原來作的那套集中式日誌監控系統進行遷移,原來的實現方案是: Log Agent => Log Server => ElasticSearch => Kibana,其中Log Agent和Log Server之間走的是Thrift RPC,本身實現了一個簡單的負載均衡(WRB)。java
原來的方案其實運行的挺好的,異步化Agent對應用性能基本沒有影響。支持咱們這個天天幾千萬PV的應用一點壓力都沒有。不過有個缺點就是若是錯誤日誌暴增,Log Server這塊處理不過來,會致使消息丟失。固然咱們量級沒有達到這個程度,並且也是能夠經過引入隊列緩衝一下處理。不過如今綜合考慮,其實直接使用消息隊列會更簡單。PRC,負載均衡,負載緩衝都內建實現了。另外一種方式是直接讀取日誌,相似於logstash或者flume的方式。不過考慮到靈活性仍是決定使用消息隊列的方式,反正咱們已經部署了Zookeeper。調研了一下,Kafka是最適合作這個數據中轉和緩衝的。因而,打算把方案改爲: Log Agent => Kafka => ElasticSearch => Kibana。node
magic > 0
時消息頭必須包含該字段。8個字節。${topicName}-{partitionId}
,如__consumer_offsets-0
。log.segment.bytes
指定,默認是1GB。時間長度則是根據log.roll.ms
或者log.roll.hours
配置項設置;當前活躍的日誌段稱之爲活躍段(activeSegment
)。.log
爲文件後綴名的消息集文件(FileMessageSet),用於保存消息實際數據BaseOffset
),左補0構成20位數字字符組成LEO+1
(第一個數據文件爲0).index
爲後綴名。它的目的是爲了快速根據偏移量定位到消息所在的位置。BaseOffset
爲key保存到一個ConcurrentSkipListMap
跳躍表中,這樣在查找指定偏移量的消息時,用二分查找法就能快速定位到消息所在的數據文件和索引文件index.interval.bytes
設置索引跨度。.timeindex
做爲後綴。它的做用則是爲了解決根據時間戳快速定位消息所在位置。offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
方法,該方法會返回時間戳大於等於待查詢時間的第一條消息對應的偏移量和時間戳。這個功能其實挺好用的,假設咱們但願從某個時間段開始消費,就能夠用offsetsForTimes()
方法定位到離這個時間最近的第一條消息的偏移量,而後調用seek(TopicPartition, long offset)
方法將消費者偏移量移動過去,而後調用poll()
方法長輪詢拉取消息。request.required.acks
: Kafka爲生產者提供了三種消息確認機制(ACK),用於配置broker接到消息後向生產者發送確認信息,以便生產者根據ACK進行相應的處理,該機制經過屬性request.required.acks
設置,取值能夠爲0, -1, 1,默認是1。
min.insync.replicas
設置,當同步副本數不足次配置項時,生產者會拋出異常。可是這種方式同時也影響了生產者發送消息的速度以及吞吐率。message.send.max.retries
: 生產者在放棄該消息前進行重試的次數,默認是3次。retry.backoff.ms
: 每次重試以前等待的時間,單位是ms,默認是100。queue.buffering.max.ms
: 在異步模式下,消息被緩存的最長時間,當到達該時間後消息被開始批量發送;若在異步模式下同時配置了緩存數據的最大值batch.num.messages
,則達到這兩個閾值的任何一個就會觸發消息批量發送。默認是1000ms。queue.buffering.max.messages
: 在異步模式下,能夠被緩存到隊列中的未發送的最大消息條數。默認是10000。queue.enqueue.timeout.ms
:
=0
: 表示當隊列沒滿時直接入隊,滿了則當即丟棄<0
: 表示無條件阻塞且不丟棄>0
: 表示阻塞達到該值時長拋出QueueFullException
異常batch.num.messages
: Kafka支持批量消息(Batch)向broker的特定分區發送消息,批量大小由屬性batch.num.messages
設置,表示每次批量發送消息的最大消息數,當生產者採用同步模式發送時改配置項將失效。默認是200。request.timeout.ms
: 在須要acks時,生產者等待broker應答的超時時間。默認是1500ms。send.buffer.bytes
: Socket發送緩衝區大小。默認是100kb。topic.metadata.refresh.interval.ms
: 生產者定時請求更新主題元數據的時間間隔。若設置爲0,則在每一個消息發送後都會去請求更新數據。默認是5min。client.id
: 生產者id,主要方便業務用來追蹤調用定位問題。默認是console-producer
。group.id
配置項指定,若不指定group name則默認爲test-consumer-group
。client.id
指定,若是不指定,Kafka會自動爲該消費者生成一個格式爲${groupId}-${hostName}-${timestamp}-${UUID前8個字符}
的全局惟一id。auto.commit.interval.ms
。
enable.auto.commit=true
auto.commit.interval.ms
enable.auto.commit=false
commitSync()
: 同步提交commitAsync()
: 異步提交broker.id
時,ZK會自動生成一個全局惟一的id。{broker.id}
的子節點TIPSpython
若是跟ES對應,Broker至關於Node,Topic至關於Index,Message相對於Document,而Partition至關於shard。LogSegment相對於ES的Segment。git
咱們在使用kafka的過程當中有時候能夠須要查看咱們生產的消息的各類信息,這些消息是存儲在kafka的日誌文件中的。因爲日誌文件的特殊格式,咱們是沒法直接查看日誌文件中的信息內容。Kafka提供了一個命令,能夠將二進制分段日誌文件轉儲爲字符類型的文件:github
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments
Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.
Option Description
------ -----------
--deep-iteration 使用深迭代而不是淺迭代
--files <file1, file2, ...> 必填。輸入的日誌段文件,逗號分隔
--key-decoder-class 自定義key值反序列化器。必須實現`kafka.serializer.Decoder` trait。所在jar包須要放在`kafka/libs`目錄下。(默認是`kafka.serializer.StringDecoder`)。
--max-message-size <Integer: size> 消息最大的字節數(默認爲5242880)
--print-data-log 同時打印出日誌消息
--value-decoder-class 自定義value值反序列化器。必須實現`kafka.serializer.Decoder` trait。所在jar包須要放在`kafka/libs`目錄下。(默認是`kafka.serializer.StringDecoder`)。
--verify-index-only 只是驗證索引不打印索引內容複製代碼
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/test-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1498104812192 isvalid: true payloadsize: 11 magic: 1 compresscodec: NONE crc: 3271928089 payload: hello world
offset: 1 position: 45 CreateTime: 1498104813269 isvalid: true payloadsize: 14 magic: 1 compresscodec: NONE crc: 242183772 payload: hello everyone
複製代碼
注意:這裏
--print-data-log
是表示查看消息內容的,不加此項只能看到Header,看不到payload。
也能夠用來查看index文件:web
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
Dumping /tmp/kafka-logs/test-0/00000000000000000000.index
offset: 0 position: 0
複製代碼
timeindex文件也是OK的:
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.timeindex --print-data-log
Dumping /tmp/kafka-logs/test-0/00000000000000000000.timeindex
timestamp: 1498104813269 offset: 1
Found timestamp mismatch in :/tmp/kafka-logs/test-0/00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1498104812192
Found out of order timestamp in :/tmp/kafka-logs/test-0/00000000000000000000.timeindex
Index timestamp: 0, Previously indexed timestamp: 1498104813269
複製代碼
消費者平衡(Consumer Rebalance)是指的是消費者從新加入消費組,並從新分配分區給消費者的過程。在如下狀況下會引發消費者平衡操做:apache
新的消費者加入消費組bootstrap
當前消費者從消費組退出(無論是異常退出仍是正常關閉)瀏覽器
消費者取消對某個主題的訂閱
訂閱主題的分區增長(Kafka的分區數能夠動態增長可是不能減小)
broker宕機新的協調器當選
當消費者在${session.timeout.ms}時間內尚未發送心跳請求,組協調器認爲消費者已退出。
消費者自動平衡操做提供了消費者的高可用和高可擴展性,這樣當咱們增長或者減小消費者或者分區數的時候,不須要關心底層消費者和分區的分配關係。可是須要注意的是,在rebalancing過程當中,因爲須要給消費者從新分配分區,因此會出如今一個短暫時間內消費者不能拉取消息的情況。
NOTES
這裏要特別注意最後一種狀況,就是所謂的慢消費者(Slow Consumers)。若是沒有在session.timeout.ms時間內收到心跳請求,協調者能夠將慢消費者從組中移除。一般,若是消息處理比session.timeout.ms慢,就會成爲慢消費者。致使兩次poll()方法的調用間隔比session.timeout.ms時間長。因爲心跳只在 poll()調用時纔會發送(在0.10.1.0版本中, 客戶端心跳在後臺異步發送了),這就會致使協調者標記慢消費者死亡。
若是沒有在session.timeout.ms時間內收到心跳請求,協調者標記消費者死亡而且斷開和它的鏈接。同時,經過向組內其餘消費者的HeartbeatResponse中發送IllegalGeneration錯誤代碼 觸發rebalance操做。
在手動commit offset的模式下,要特別注意這個問題,不然會出現commit不上的狀況。致使一直在重複消費。
消息順序:保證每一個partition內部的順序,可是不保證跨partition的全局順序。若是須要全局消息有序,topic只能有一個partition。
consumer group:consumer group中的consumer併發獲取消息,可是爲了保證partition消息的順序性,每一個partition只會由一個consumer消費。所以consumer group中的consumer數量須要小於等於topic的partition個數。(如需全局消息有序,只能有一個partition,一個consumer)
同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。這是Kafka用來實現一個Topic消息的廣播(發給全部的Consumer)和單播(發給某一個Consumer)的手段。一個Topic能夠對應多個Consumer Group。若是須要實現廣播,只要每一個Consumer有一個獨立的Group就能夠了。要實現單播只要全部的Consumer在同一個Group裏。
Producer Push消息,Client Pull消息模式:一些logging-centric system,好比Facebook的Scribe和Cloudera的Flume,採用push模式。事實上,push模式和pull模式各有優劣。push模式很難適應消費速率不一樣的消費者,由於消息發送速率是由broker決定的。push模式的目標是儘量以最快速度傳遞消息,可是這樣很容易形成Consumer來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而pull模式則能夠根據Consumer的消費能力以適當的速率消費消息。pull模式可簡化broker的設計,Consumer可自主控制消費消息的速率,同時Consumer能夠本身控制消費方式——便可批量消費也可逐條消費,同時還能選擇不一樣的提交方式從而實現不一樣的傳輸語義。
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可使用Storm或Spark Streaming這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還能夠同時將數據實時備份到另外一個數據中心,只須要保證這三個操做所使用的Consumer屬於不一樣的Consumer Group便可。
Kafka在0.8之前的版本中,並不提供High Availablity機制,一旦一個或多個Broker宕機,則宕機期間其上全部Partition都沒法繼續提供服務。若該Broker永遠不能再恢復,亦或磁盤故障,則其上數據將丟失。而Kafka的設計目標之一便是提供數據持久化,同時對於分佈式系統來講,尤爲當集羣規模上升到必定程度後,一臺或者多臺機器宕機的可能性大大提升,對Failover要求很是高。所以,Kafka從0.8開始提供High Availability機制。主要表如今Data Replication和Leader Election兩方面。
Kafka從0.8開始提供partition級別的replication,replication的數量可在
$KAFKA_HOME/config/server.properties 中配置:
default.replication.factor = 1
複製代碼
該 Replication與leader election配合提供了自動的failover機制。replication對Kafka的吞吐率是有必定影響的,但極大的加強了可用性。默認狀況下,Kafka的replication數量爲1。每一個partition都有一個惟一的leader,全部的讀寫操做都在leader上完成,follower批量從leader上pull數據。通常狀況下partition的數量大於等於broker的數量,而且全部partition的leader均勻分佈在broker上。follower上的日誌和其leader上的徹底同樣。
須要注意的是,replication factor並不會影響consumer的吞吐率測試,由於consumer只會從每一個partition的leader讀數據,而與replicaiton factor無關。一樣,consumer吞吐率也與同步複製仍是異步複製無關。
引入Replication以後,同一個Partition可能會有多個副本(Replica),而這時須要在這些副本之間選出一個Leader,Producer和Consumer只與這個Leader副本交互,其它Replica做爲Follower從Leader中複製數據。注意,只有Leader負責數據讀寫,Follower只向Leader順序Fetch數據(N條通路),並不提供任何讀寫服務,系統更加簡單且高效。
思考 爲何follower副本不提供讀寫,只作冷備?
follwer副本不提供寫服務這個比較好理解,由於若是follower也提供寫服務的話,那麼就須要在全部的副本之間相互同步。n個副本就須要 nxn 條通路來同步數據,若是採用異步同步的話,數據的一致性和有序性是很難保證的;而採用同步方式進行數據同步的話,那麼寫入延遲實際上是放大n倍的,反而拔苗助長。
那麼爲何不讓follower副本提供讀服務,減小leader副本的讀壓力呢?這個除了由於同步延遲帶來的數據不一致以外,不一樣於其餘的存儲服務(如ES,MySQL),Kafka的讀取本質上是一個有序的消息消費,消費進度是依賴於一個叫作offset的偏移量,這個偏移量是要保存起來的。若是多個副本進行讀負載均衡,那麼這個偏移量就很差肯定了。
TIPS
Kafka的leader副本相似於ES的primary shard,follower副本相對於ES的replica。ES也是一個index有多個shard(相對於Kafka一個topic有多個partition),shard又分爲primary shard和replicition shard,其中primary shard用於提供讀寫服務(sharding方式跟MySQL很是相似:shard = hash(routing) % number_of_primary_shards。可是ES引入了協調節點(coordinating node) 的角色,實現對客戶端透明。),而replication shard只提供讀服務(這裏跟Kafka同樣,ES會等待relication shard返回成功才最終返回給client)。
有傳統MySQL分庫分表經驗的同窗必定會以爲這個過程是很是類似的,就是一個sharding + replication的數據架構,只是經過client(SDK)或者coordinator對你透明瞭而已。
Propagate消息
Producer在發佈消息到某個Partition時,先經過ZooKeeper找到該Partition的Leader,而後不管該Topic的Replication Factor爲多少(也即該Partition有多少個Replica),Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每一個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。Follower在收到該消息並寫入其Log後,向Leader發送ACK。一旦Leader收到了 ISR (in-sync replicas) 中的全部Replica的ACK,該消息就被認爲已經commit了,Leader將增長 HW( High-Watermark) 而且向Producer發送ACK。
爲了提升性能,每一個Follower在接收到數據後就立馬向Leader發送ACK,而非等到數據寫入Log中。所以,對於已經commit的消息,Kafka只能保證它被存於多個Replica的內存中,而不能保證它們被持久化到磁盤中,也就不能徹底保證異常發生後該條消息必定能被Consumer消費。但考慮到這種場景很是少見,能夠認爲這種方式在性能和數據持久化上作了一個比較好的平衡。在未來的版本中,Kafka會考慮提供更高的持久性。
Consumer讀消息也是從Leader讀取,只有被commit過的消息(offset低於HW的消息)纔會暴露給Consumer。
Kafka Replication的數據流以下圖所示:
關於這方面的內容比較多並且複雜,這裏就不展開了,這篇文章寫的很好,有興趣的同窗能夠學習
《 Kafka設計解析(二):Kafka High Availability (上)》。
下面這張圖很是簡單明瞭的顯示kafka的全部遊標
(rongxinblog.wordpress.com/2016/07/29/…):
下面簡單的說明一下:
In-Sync Replicas list,顧名思義,就是跟leader 「保存同步」 的Replicas。「保持同步」的含義有些複雜,在0.9版本,broker的參數replica.lag.time.max.ms用來指定ISR的定義,若是leader在這麼長時間沒收到follower的拉取請求,或者在這麼長時間內,follower沒有fetch到leader的log end offset,就會被leader從ISR中移除。ISR是個很重要的指標,controller選取partition的leader replica時會使用它,leader須要維護ISR列表,所以leader選取ISR後會把結果記到Zookeeper上。
在須要選舉leader的場景下,leader和ISR是由controller決定的。在選出leader之後,ISR是leader決定。若是誰是leader和ISR只存在於ZK上,那麼每一個broker都須要在Zookeeper上監聽它host的每一個partition的leader和ISR的變化,這樣效率比較低。若是不放在Zookeeper上,那麼當controller fail之後,須要從全部broker上從新得到這些信息,考慮到這個過程當中可能出現的問題,也不靠譜。因此leader和ISR的信息存在於Zookeeper上,可是在變動leader時,controller會先在Zookeeper上作出變動,而後再發送LeaderAndIsrRequest給相關的broker。這樣能夠在一個LeaderAndIsrRequest裏包括這個broker上有變更的全部partition,即batch一批變動新信息給broker,更有效率。另外,在leader變動ISR時,會先在Zookeeper上作出變動,而後再修改本地內存中的ISR。
Consumer最後提交的位置,這個位置會保存在一個特殊的topic:_consumer_offsets 中。
Consumer當前讀取的位置,可是尚未提交給broker。提交以後就變成Last Commit Offset。
這個offset是全部ISR的LEO的最小位置(minimum LEO across all the ISR of this partition),consumer不能讀取超過HW的消息,由於這意味着讀取到未徹底同步(所以沒有徹底備份)的消息。換句話說就是:HW是全部ISR中的節點都已經複製完的消息.也是消費者所能獲取到的消息的最大offset(注意,並非全部replica都必定有這些消息,而只是ISR裏的那些才確定會有)。
隨着follower的拉取進度的即時變化,HW是隨時在變化的。follower老是向leader請求本身已有messages的下一個offset開始的數據,所以當follower發出了一個fetch request,要求offset爲A以上的數據,leader就知道了這個follower的log end offset至少爲A。此時就能夠統計下ISR裏的全部replica的LEO是否已經大於了HW,若是是的話,就提升HW。同時,leader在fetch本地消息給follower時,也會在返回給follower的reponse裏附帶本身的HW。這樣follower也就知道了leader處的HW(可是在實現中,follower獲取的只是讀leader本地log時的HW,並不能保證是最新的HW)。可是leader和follower的HW是不一樣步的,follower處記的HW可能會落後於leader。
Hight Watermark Checkpoint
因爲HW是隨時變化的,若是即時更新到Zookeeper,會帶來效率的問題。而HW是如此重要,所以須要持久化,ReplicaManager就啓動了單獨的線程按期把全部的partition的HW的值記到文件中,即作highwatermark-checkpoint。
四、Log End Offset(LEO)
這個很好理解,就是當前的最新日誌寫入(或者同步)位置。
Kafka支持JVM語言(java、scala),同是也提供了高性能的C/C++客戶端,和基於librdkafka封裝的各類語言客戶端。如,Python客戶端: confluent-kafka-python 。Python客戶端還有純python實現的:kafka-python。
下面是Python例子(以confluent-kafka-python爲例):
Producer:
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
for data in some_data_source:
p.produce('mytopic', data.encode('utf-8'))
p.flush()
複製代碼
Consumer:
from confluent_kafka import Consumer, KafkaError
c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
'default.topic.config': {'auto.offset.reset': 'smallest'}})
c.subscribe(['mytopic'])
running = True
while running:
msg = c.poll()
if not msg.error():
print('Received message: %s' % msg.value().decode('utf-8'))
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
c.close()
複製代碼
跟普通的消息隊列使用基本是同樣的。
kafka讀取消息實際上是基於offset來進行的,若是offset出錯,就可能出現重複讀取消息或者跳過未讀消息。在0.8.2以前,kafka是將offset保存在ZooKeeper中,可是咱們知道zk的寫操做是很昂貴的,並且不能線性拓展,頻繁的寫入zk會致使性能瓶頸。因此在0.8.2引入了Offset Management,將這個offset保存在一個 compacted kafka topic(_consumer_offsets),Consumer經過發送OffsetCommitRequest請求到指定broker(偏移量管理者)提交偏移量。這個請求中包含一系列分區以及在這些分區中的消費位置(偏移量)。偏移量管理者會追加鍵值(key-value)形式的消息到一個指定的topic(__consumer_offsets)。key是由consumerGroup-topic-partition組成的,而value是偏移量。同時爲了提供性能,內存中也會維護一份最近的記錄,這樣在指定key的狀況下能快速的給出OffsetFetchRequests而不用掃描所有偏移量topic日誌。若是偏移量管理者因某種緣由失敗,新的broker將會成爲偏移量管理者而且經過掃描偏移量topic來從新生成偏移量緩存。
0.9版本以前的Kafka提供了kafka-consumer-offset-checker.sh腳本,能夠用來查看某個消費組對一個或者多個topic的消費者消費偏移量狀況,該腳本調用的是
kafka.tools.Consumer.OffsetChecker。0.9版本以後已再也不建議使用該腳本了,而是建議使用kafka-consumer-groups.sh腳本,該腳本調用的是kafka.admin.ConsumerGroupCommand。這個腳本實際上是對消費組進行管理,不僅是查看消費組的偏移量。這裏只介紹最新的kafka-consumer-groups.sh腳本使用。
用ConsumerGroupCommand工具,咱們可使用list,describe,或delete消費者組。
例如,要列出全部主題中的全部消費組信息,使用list參數:
$ bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
test-consumer-group
複製代碼
要查看某個消費組當前的消費偏移量則使用describe參數:
$ bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
test-consumer-group test-foo 0 1 3 2 consumer-1_/127.0.0.1
複製代碼
NOTES
該腳本只支持刪除不包括任何消費組的消費組,並且只能刪除消費組爲老版本消費者對應的消費組(即分組元數據存儲在zookeeper的纔有效),由於這個腳本刪除操做的本質就是刪除ZK中對應消費組的節點及其子節點而已。
上面介紹了經過腳本工具方式查詢Kafka消費偏移量。事實上,咱們也能夠經過API的方式查詢消費偏移量。
Kafka消費者API提供了兩個方法用於查詢消費者消費偏移量的操做:
committed(TopicPartition partition): 該方法返回一個OffsetAndMetadata對象,經過它能夠獲取指定分區已提交的偏移量。
position(TopicPartition partition): 該方法返回下一次拉取位置的position。
除了查看消費偏移量,有些時候咱們須要人爲的指定offset,好比跳過某些消息,或者redo某些消息。在0.8.2以前,offset是存放在ZK中,只要用ZKCli操做ZK就能夠了。可是在0.8.2以後,offset默認是存放在kafka的__consumer_offsets隊列中,只能經過API修改了:
Class KafkaConsumer<K,V> Kafka allows specifying the position using seek(TopicPartition, long) to specify the new position. Special methods for seeking to the earliest and latest offset the server maintains are also available (seekToBeginning(TopicPartition…) and seekToEnd(TopicPartition…) respectively).
參考文檔: Kafka Consumer Offset Management
Kafka消費者API提供了重置消費偏移量的方法:
seek(TopicPartition partition, long offset): 該方法用於將消費起始位置重置到指定的偏移量位置。
seekToBeginning(): 從消息起始位置開始消費,對應偏移量重置策略
auto.offset.reset=earliest。
seekToEnd(): 從最新消息對應的位置開始消費,也就是說等待新的消息寫入後纔開始拉取,對應偏移量重置策略是
auto.offset.reset=latest。
固然前提你得知道要重置的offset的位置。一種方式就是根據時間戳獲取對應的offset。再seek過去。
Kafka是用Scala寫的,因此只要安裝了JRE環境,運行很是簡單。直接下載官方編譯好的包,解壓配置一下就能夠直接運行了。
配置文件在config目錄下的server.properties,關鍵配置以下(有些屬性配置文件中默認沒有,需本身添加):
broker.id:Kafka集羣中每臺機器(稱爲broker)須要獨立不重的id
port:監聽端口
delete.topic.enable:設爲true則容許刪除topic,不然不容許
message.max.bytes:容許的最大消息大小,默認是1000012(1M),建議調到到10000012(10M)。
replica.fetch.max.bytes: 同上,默認是1048576,建議調到到10048576。
log.dirs:Kafka數據文件的存放目錄,注意不是日誌文件。能夠配置爲:/home/work/kafka/data/kafka-logs
log.cleanup.policy:過時數據清除策略,默認爲delete,還可設爲compact
log.retention.hours:數據過時時間(小時數),默認是1073741824,即一週。過時數據用log.cleanup.policy的規則清除。能夠用log.retention.minutes配置到分鐘級別。
log.segment.bytes:數據文件切分大小,默認是1073741824(1G)。
retention.check.interval.ms:清理線程檢查數據是否過時的間隔,單位爲ms,默認是300000,即5分鐘。
zookeeper.connect:負責管理Kafka的zookeeper集羣的機器名:端口號,多個用逗號分隔
複製代碼
TIPS 發送和接收大消息
須要修改以下參數:
broker:message.max.bytes
& replica.fetch.max.bytes
consumer:fetch.message.max.bytes
更多參數的詳細說明見官方文檔:
而後先確保ZK已經正確配置和啓動了。Kafka自帶ZK服務,配置文件在config/zookeeper.properties文件,關鍵配置以下:
dataDir=/home/work/kafka/data/zookeeper
clientPort=2181
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.1=nj03-bdg-kg-offline-01.nj03:2888:3888
server.2=nj03-bdg-kg-offline-02.nj03:2888:3888
server.3=nj03-bdg-kg-offline-03.nj03:2888:3888
複製代碼
NOTES Zookeeper集羣部署
ZK的集羣部署要作兩件事情:
分配serverId: 在dataDir目錄下建立一個myid文件,文件中只包含一個1到255的數字,這就是ZK的serverId。
配置集羣:格式爲server.{id}={host}:{port}:{port},其中{id}就是上面提到的ZK的serverId。
而後啓動:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties。
而後能夠啓動Kafka:JMX_PORT=8999 bin/kafka-server-start.sh -daemon config/server.properties,很是簡單。
TIPS
咱們在啓動命令中增長了JMX_PORT=8999環境變量,這樣能夠暴露JMX監控項,方便監控。
不過不像RabbitMQ,或者ActiveMQ,Kafka默認並無web管理界面,只有命令行語句,不是很方便,不過能夠安裝一個,好比,Yahoo的 Kafka Manager: A tool for managing Apache Kafka。它支持不少功能:
Manage multiple clusters
Easy inspection of cluster state (topics, consumers, offsets, brokers, replica distribution, partition distribution)
Run preferred replica election
Generate partition assignments with option to select brokers to use
Run reassignment of partition (based on generated assignments)
Create a topic with optional topic configs (0.8.1.1 has different configs than 0.8.2+)
Delete topic (only supported on 0.8.2+ and remember set delete.topic.enable=true in broker config)
Topic list now indicates topics marked for deletion (only supported on 0.8.2+)
Batch generate partition assignments for multiple topics with option to select brokers to use
Batch run reassignment of partition for multiple topics
Add partitions to existing topic
Update config for existing topic
Optionally enable JMX polling for broker level and topic level metrics.
Optionally filter out consumers that do not have ids/ owners/ & offsets/ directories in zookeeper.
安裝過程蠻簡單的,就是要下載不少東東,會好久。具體參見: kafka manager安裝。不過這些管理平臺都沒有權限管理功能。
須要注意的是,Kafka Manager的conf/application.conf配置文件裏面配置的kafka-manager.zkhosts是爲了它自身的高可用,而不是指向要管理的Kafka集羣指向的zkhosts。因此不要忘記了手動配置要管理的Kafka集羣信息(主要是配置名稱,和zk地址)。Install and Evaluation of Yahoo’s Kafka Manager。
Kafka Manager主要是提供管理界面,監控的話還要依賴於其餘的應用,好比:
Burrow: Kafka Consumer Lag Checking. Linkedin開源的cusumer log監控,go語言編寫,貌似沒有界面,只有HTTP API,能夠配置郵件報警。
Kafka Offset Monitor: A little app to monitor the progress of kafka consumers and their lag wrt the queue.
這兩個應用的目的都是監控Kafka的offset。
刪除Kafka主題,通常有以下兩種方式:
一、手動刪除各個節點${log.dir}目錄下該主題分區文件夾,同時登錄ZK客戶端刪除待刪除主題對應的節點,主題元數據保存在/brokers/topics和/config/topics節點下。
二、執行kafka-topics.sh腳本執行刪除,若但願經過該腳本完全刪除主題,則須要保證在啓動Kafka時加載的server.properties文件中配置 delete.topic.enable=true,該配置項默認爲false。不然執行該腳本並未真正刪除topic,而是在ZK的/admin/delete_topics目錄下建立一個與該待刪除主題同名的topic,將該主題標記爲刪除狀態而已。
kafka-topic –delete –zookeeper server-1:2181,server-2:2181 –topic test`
執行結果:
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
複製代碼
此時若但願可以完全刪除topic,則須要經過手動刪除相應文件及節點。當該配置項爲true時,則會將該主題對應的全部文件目錄以及元數據信息刪除。
對於傳統的message queue而言,通常會刪除已經被消費的消息,而Kafka集羣會保留全部的消息,不管其被消費與否。固然,由於磁盤限制,不可能永久保留全部數據(實際上也不必),所以Kafka提供兩種策略去刪除舊數據。一是基於時間,二是基於partition文件大小。能夠經過配置$KAFKA_HOME/config/server.properties ,讓Kafka刪除一週前的數據,也可經過配置讓Kafka在partition文件超過1GB時刪除舊數據:
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to
# just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
# can then be marked for log compaction.
log.cleaner.enable=false
複製代碼
這裏要注意,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除文件與Kafka性能無關,選擇怎樣的刪除策略只與磁盤以及具體的需求有關。
一、只保證單個主題單個分區內的消息有序,可是不能保證單個主題全部分區消息有序。若是應用嚴格要求消息有序,那麼kafka可能不大合適。
二、消費偏移量由消費者跟蹤和提交,可是消費者並不會常常把這個偏移量寫會kafka,由於broker維護這些更新的代價很大,這會致使異常狀況下消息可能會被屢次消費或者沒有消費。
具體分析以下:消息可能已經被消費了,可是消費者尚未像broker提交偏移量(commit offset)確認該消息已經被消費就掛掉了,接着另外一個消費者又開始處理同一個分區,那麼它會從上一個已提交偏移量開始,致使有些消息被重複消費。可是反過來,若是消費者在批處理消息以前就先提交偏移量,可是在處理消息的時候掛掉了,那麼這部分消息就至關於『丟失』了。一般來講,處理消息和提交偏移量很難構成一個原子性操做,所以沒法老是保證全部消息都恰好只被處理一次。
三、主題和分區的數目有限
Kafka集羣可以處理的主題數目是有限的,達到1000個主題左右時,性能就開始降低。這些問題基本上都跟Kafka的基本實現決策有關。特別是,隨着主題數目增長,broker上的隨機IO量急劇增長,由於每一個主題分區的寫操做實際上都是一個單獨的文件追加(append)操做。隨着分區數目增長,問題愈來愈嚴重。若是Kafka不接管IO調度,問題就很難解決。
固然,通常的應用都不會有這麼大的主題數和分區數要求。可是若是將單個Kafka集羣做爲多租戶資源,這個時候這個問題就會暴露出來。
四、手動均衡分區負載
Kafka的模型很是簡單,一個主題分區所有保存在一個broker上,可能還有若干個broker做爲該分區的副本(replica)。同一分區不在多臺機器之間分割存儲。隨着分區不斷增長,集羣中有的機器運氣很差,會正好被分配幾個大分區。Kafka沒有自動遷移這些分區的機制,所以你不得不本身來。監控磁盤空間,診斷引發問題的是哪一個分區,而後肯定一個合適的地方遷移分區,這些都是手動管理型任務,在Kafka集羣環境中不容忽視。
若是集羣規模比較小,數據所需的空間較小,這種管理方式還勉強奏效。可是,若是流量迅速增長或者沒有一流的系統管理員,那麼狀況就徹底沒法控制。
注意:若是向集羣添加新的節點,也必須手動將數據遷移到這些新的節點上,Kafka不會自動遷移分區以平衡負載量或存儲空間的。
五、follow副本(replica)只充當冷備(解決HA問題),沒法提供讀服務
不像ES,replica shard是同時提供讀服務,以緩解master的讀壓力。kafka由於讀服務是有狀態的(要維護commited offset),因此follow副本並無參與到讀寫服務中。只是做爲一個冷備,解決單點問題。
六、只能順序消費消息,不能隨機定位消息,出問題的時候不方便快速定位問題
這實際上是全部以消息系統做爲異步RPC的通用問題。假設發送方發了一條消息,可是消費者說我沒有收到,那麼怎麼排查呢?消息隊列缺乏隨機訪問消息的機制,如根據消息的key獲取消息。這就致使排查這種問題不大容易。
更多內容敬請關注 vivo 互聯網技術 微信公衆號
注:轉載文章請先與微信號:labs2020 聯繫。