實踐部署與使用apache kafka框架技術博文資料彙總

前一篇Kafka框架設計來自英文原文(Kafka Architecture Design)的翻譯及整理文章,很是有借鑑性,本文是從一個企業使用Kafka框架的角度來記錄及整理的Kafka框架的技術資料,也很是有借鑑價值,爲了便於閱讀與分享,我將其整理一篇Blog。php

本文內容文件夾摘要例如如下:css

1)apache kafka消息服務html

2)kafka在zookeeper中存儲結構

3)kafka log4j配置前端

4)kafka replication設計機制

5)apache kafka監控系列-監控指標java

6)kafka.common.ConsumerRebalanceFailedException異常解決的方法

7)kafak安裝與使用node

8)apache kafka中server.properties配置文件參數說明

9)apache kafka的consumer初始化時獲取不到消息python

10)Kafka Producer處理邏輯

11)apache kafka源碼project環境搭建(IDEA)mysql

12)apache kafka監控系列-KafkaOffsetMonitornginx

13)Kafka Controller設計機制c++

14)Kafka性能測試報告(虛擬機版)

15)apache kafka監控系列-kafka-web-console

16)apache kafka遷移與擴容工具使用方法

17)kafka LeaderNotAvailableException

18)apache kafka jmx監控指標參數

19)apache kafka性能測試命令使用和構建kafka-perf

20)apache kafka源代碼構建打包

21)Apache kafkaclient開發-java

22) kafka broker內部架構

23)apache kafka源代碼分析走讀-kafka整體結構分析

24)apache kafka源代碼分析走讀-Producer分析

25)apache kafka性能優化架構分析

26)apache kafka源代碼分析走讀-server端網絡架構分析

27)apache kafka源代碼分析走讀-ZookeeperConsumerConnector分析

28)kafka的ZkUtils類的java版本號部分代碼

29)kafka & mafka client開發與實踐

30)   kafka文件系統設計那些事

31)kafka的ZookeeperConsumer實現


具體內容例如如下所看到的:

1)apache kafka消息服務

apache kafka參考

http://kafka.apache.org/documentation.html

消息隊列分類:

 點對點:

消息生產者生產消息發送到queue中。而後消息消費者從queue中取出並且消費消息。

這裏要注意:

  • 消息被消費之後。queue中再也不有存儲。因此消息消費者不可能消費到已經被消費的消息。
  • Queue支持存在多個消費者,但是對一個消息而言。僅僅會有一個消費者可以消費。

公佈/訂閱

消息生產者(公佈)將消息公佈到topic中,同一時候有多個消息消費者(訂閱)消費該消息。

和點對點方式不一樣。公佈到topic的消息會被所有訂閱者消費。

kafka消息隊列調研

背景介紹

kafka是最初由Linkedin公司開發,使用Scala語言編寫,Kafka是一個分佈式、分區的、多副本的、多訂閱者的日誌系統(分佈式MQ系統),可以用於web/nginx日誌。搜索日誌,監控日誌,訪問日誌等等。

kafka眼下支持多種client語言:java,python,c++。php等等。

總體結構:


kafka名詞解釋和工做方式:

  • Producer :消息生產者,就是向kafka broker發消息的client。
  • Consumer :消息消費者,向kafka broker取消息的client
  • Topic :咋們可以理解爲一個隊列。
  • Consumer Group (CG):這是kafka用來實現一個topic消息的廣播(發給所有的consumer)和單播(發給隨意一個consumer)的手段。一個topic可以有多個CG。topic的消息會複製(不是真的複製,是概念上的)到所有的CG,但每個CG僅僅會把消息發給該CG中的一個consumer。

    假設需要實現廣播,僅僅要每個consumer有一個獨立的CG就可以了。要實現單播僅僅要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要屢次發送消息到不一樣的topic。

  • Broker :一臺kafkaserver就是一個broker。一個集羣由多個broker組成。

    一個broker可以容納多個topic。

  • Partition:爲了實現擴展性。一個很大的topic可以分佈到多個broker(即server)上,一個topic可以分爲多個partition。每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。kafka僅僅保證按一個partition中的順序將消息發給consumer。不保證一個topic的整體(多個partition間)的順序。
  •  Offset:kafka的存儲文件都是依照offset.kafka來命名,用offset作名字的優勢是方便查找。好比你想找位於2049的位置,僅僅要找到2048.kafka的文件就能夠。固然the first offset就是00000000000.kafka

 kafka特性:

  • 經過O(1)的磁盤數據結構提供消息的持久化,這樣的結構對於即便數以TB的消息存儲也能夠保持長時間的穩定性能。
  • 高吞吐量:即便是很普通的硬件kafka也能夠支持每秒數十萬的消息。
  • 支持同步和異步複製兩種HA
  • Consumerclientpull,隨機讀,利用sendfile系統調用,zero-copy ,批量拉數據
  • 消費狀態保存在client
  • 消息存儲順序寫
  • 數據遷移、擴容對用戶透明
  • 支持Hadoop並行數據載入。

  • 支持online和offline的場景。
  • 持久化:經過將數據持久化到硬盤以及replication防止數據丟失。
  • scale out:無需停機就能夠擴展機器。

  • 按期刪除機制,支持設定partitions的segment file保留時間。

可靠性(一致性)

kafka(MQ)要實現從producer到consumer之間的可靠的消息傳送和分發。

傳統的MQ系統一般都是經過broker和consumer間的確認(ack)機制實現的。並在broker保存消息分發的狀態。

即便這樣一致性也是很是難保證的(參考原文)。kafka的作法是由consumer本身保存狀態,也不要不論什麼確認。這樣儘管consumer負擔更重。但事實上更靈活了。

因爲不管consumer上不論什麼緣由致使需要又一次處理消息,都可以再次從broker得到。

kafak系統擴展性

kafka使用zookeeper來實現動態的集羣擴展,不需要更改client(producer和consumer)的配置。broker會在zookeeper註冊並保持相關的元數據(topic,partition信息等)更新。

而client會在zookeeper上註冊相關的watcher。

一旦zookeeper發生變化。client能及時感知並做出對應調整。

這樣就保證了加入或去除broker時,各broker間仍能本身主動實現負載均衡。

kafka設計目標

高吞吐量是其核心設計之中的一個。

  • 數據磁盤持久化:消息不在內存中cache,直接寫入到磁盤。充分利用磁盤的順序讀寫性能。
  • zero-copy:下降IO操做步驟。

  • 支持數據批量發送和拉取。
  • 支持數據壓縮。
  • Topic劃分爲多個partition,提升並行處理能力。

Producer負載均衡和HA機制

  • producer依據用戶指定的算法,將消息發送到指定的partition。

  • 存在多個partiiton,每個partition有本身的replica,每個replica分佈在不一樣的Broker節點上。
  • 多個partition需要選取出lead partition。lead partition負責讀寫,並由zookeeper負責fail over。
  • 經過zookeeper管理broker與consumer的動態增長與離開。


Consumer的pull機制

由於kafka broker會持久化數據,broker沒有cahce壓力。所以。consumer比較適合採取pull的方式消費數據,詳細特別例如如下:

  • 簡化kafka設計,減小了難度。

  • Consumer依據消費能力自主控制消息拉取速度。
  • consumer依據自身狀況自主選擇消費模式,好比批量。反覆消費。從制定partition或位置(offset)開始消費等.

Consumer與topic關係以及機制

本質上kafka僅僅支持Topic.每個consumer屬於一個consumer group;反過來講,每個group中可以有多個consumer.對於Topic中的一條特定的消息,
僅僅會被訂閱此Topic的每個group中的一個consumer消費,此消息不會發送給一個group的多個consumer;那麼一個group中所有的consumer將會交錯的消費整個Topic.
假設所有的consumer都具備一樣的group,這樣的狀況和JMS queue模式很是像;消息將會在consumers之間負載均衡.
假設所有的consumer都具備不一樣的group,那這就是"公佈-訂閱";消息將會廣播給所有的消費者.

在kafka中,一個partition中的消息僅僅會被group中的一個consumer消費(同一時刻);每個group中consumer消息消費互相獨立;咱們可以以爲一個group是一個"訂閱"者,

一個Topic中的每個partions,僅僅會被一個"訂閱者"中的一個consumer消費,只是一個consumer可以同一時候消費多個partitions中的消息.

kafka僅僅能保證一個partition中的消息被某個consumer消費時是順序的.其實,從Topic角度來講,當有多個partitions時,消息仍不是全局有序的.

 

一般狀況下,一個group中會包括多個consumer,這樣不只可以提升topic中消息的併發消費能力,而且還能提升"故障容錯"性,假設group中的某個consumer失效,

那麼其消費的partitions將會有其它consumer本身主動接管.kafka的設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同一時候消費,

不然將意味着某些consumer將沒法獲得消息.

Producer均衡算法

kafka集羣中的不論什麼一個broker,都可以向producer提供metadata信息,這些metadata中包括"集羣中存活的servers列表"/"partitions leader列表"
等信息(請參看zookeeper中的節點信息).當producer獲取到metadata信心以後, producer將會和Topic下所有partition leader保持socket鏈接;
消息由producer直接經過socket發送到broker,中間不會通過不論什麼"路由層".其實,消息被路由到哪一個partition上,有producerclient決定.
比方可以採用"random""key-hash""輪詢"等,假設一個topic中有多個partitions,那麼在producer端實現"消息均衡分發"是必要的.
在producer端的配置文件裏,開發人員可以指定partition路由的方式.

Consumer均衡算法

當一個group中,有consumer增長或者離開時,會觸發partitions均衡.均衡的終於目的,是提高topic的併發消費能力.
1) 假如topic1,具備例如如下partitions: P0,P1,P2,P3
2) 增長group中,有例如如下consumer: C0,C1
3) 首先依據partition索引號對partitions排序: P0,P1,P2,P3
4) 依據consumer.id排序: C0,C1
5) 計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 而後依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

kafka broker集羣內broker之間replica機制

kafka中,replication策略是基於partition,而不是topic;kafka將每個partition數據拷貝到多個server上,不論什麼一個partition有一個leader和多個follower(可以沒有);

備份的個數可以經過broker配置文件來設定.leader處理所有的read-write請求,follower需要和leader保持同步.Follower就像一個"consumer",

消費消息並保存在本地日誌中;leader負責跟蹤所有的follower狀態,假設follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.

當所有的follower都將一條消息保存成功,此消息才被以爲是"committed",那麼此時consumer才幹消費它,這樣的同步策略,就要求follower和leader之間必須具備良好的網絡環境.

即便僅僅有一個replicas實例存活,仍然可以保證消息的正常發送和接收,僅僅要zookeeper集羣存活就能夠.(備註:不一樣於其它分佈式存儲,比方hbase需要"多數派"存活才行)

kafka斷定一個follower存活與否的條件有2個:

1) follower需要和zookeeper保持良好的連接    

2) 它必須能夠及時的跟進leader,不能落後太多.

假設同一時候知足上述2個條件,那麼leader就以爲此follower是"活躍的".假設一個follower失效(server失效)或者落後太多,

leader將會把它從同步列表中移除[備註:假設此replicas落後太多,它將會繼續從leader中fetch數據,直到足夠up-to-date,

而後再次增長到同步列表中;kafka不會更換replicas宿主!因爲"同步列表"中replicas需要足夠快,這樣才幹保證producer公佈消息時接受到ACK的延遲較小。

當leader失效時,需在followers中選取出新的leader,可能此時follower落後於leader,所以需要選擇一個"up-to-date"的follower.kafka中leader選舉並無採用"投票多數派"的算法,

因爲這樣的算法對於"網絡穩定性"/"投票參與者數量"等條件有較高的要求,而且kafka集羣的設計,還需要容忍N-1個replicas失效.對於kafka而言,

每個partition中所有的replicas信息都可以在zookeeper中得到,那麼選舉leader將是一件很easy的事情.選擇follower時需要兼顧一個問題,

就是新leader server上所已經承載的partition leader的個數,假設一個server上有過多的partition leader,意味着此server將承受着不少其它的IO壓力.

在選舉新leader,需要考慮到"負載均衡",partition leader較少的broker將會更有可能成爲新的leader.

在整幾個集羣中,僅僅要有一個replicas存活,那麼此partition都可以繼續接受讀寫操做.

總結: 

    1) Producer端直接鏈接broker.list列表,從列表中返回TopicMetadataResponse,該Metadata包括Topic下每個partition leader創建socket鏈接併發送消息.

    2) Broker端使用zookeeper用來註冊broker信息,以及監控partition leader存活性.

    3) Consumer端使用zookeeper用來註冊consumer信息,當中包含consumer消費的partition列表等,同一時候也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息.

性能測試

眼下我已經在虛擬機上作了性能測試。

測試環境:cpu: 雙核   內存 :2GB   硬盤:60GB 

測試指標
性能相關說明
結論
消息堆積壓力測試

單個kafka broker節點測試。啓動一個kafka broker和Producer,Producer不斷向broker發送數據。

直到broker堆積數據爲18GB爲止(中止Producer執行)。啓動Consumer,不間斷從broker獲取數據,

直到全部數據讀取完畢爲止。最後查看Producer==Consumer數據,沒有出現卡死或broker不響應現象

數據大量堆積不會出現broker卡死

或不響應現象

生產者速率 1.200byte/msg,4w/s左右。

2.1KB/msg,1w/s左右

性能上是全然知足要求,其性能主要由磁盤決定
消費者速率 1.200byte/msg,4w/s左右。2.1KB/msg,1w/s左右 性能上是全然知足要求。其性能主要由磁盤決定

2)kafka在zookeeper中存儲結構

1.topic註冊信息

/brokers/topics/[topic] :

存儲某個topic的partitions所有分配信息

Schema:
{
    "version": "版本號編號眼下固定爲數字1",
    "partitions": {
        "partitionId編號": [
            同步副本組brokerId列表
        ],
        "partitionId編號": [
            同步副本組brokerId列表
        ],
        .......
    }
}
Example:
{
"version": 1,
"partitions": {
"0": [1, 2],
"1": [2, 1],
"2": [1, 2],
}
}

說明:紫紅色爲patitions編號,藍色爲同步副本組brokerId列表

2.partition狀態信息

/brokers/topics/[topic]/partitions/[0...N]  當中[0..N]表示partition索引號

/brokers/topics/[topic]/partitions/[partitionId]/state

Schema:
{
"controller_epoch": 表示kafka集羣中的中央控制器選舉次數,
"leader": 表示該partition選舉leader的brokerId,
"version": 版本號編號默以爲1,
"leader_epoch": 該partition leader選舉次數,
"isr": [同步副本組brokerId列表]
}
 
Example:
{
"controller_epoch": 1,
"leader": 2,
"version": 1,
"leader_epoch": 0,
"isr": [2, 1]

}

3. Broker註冊信息

/brokers/ids/[0...N]                 

每個broker的配置文件裏都需要指定一個數字類型的id(全局不可反覆),此節點爲暫時znode(EPHEMERAL)

Schema:
{
"jmx_port": jmx端口號,
"timestamp": kafka broker初始啓動時的時間戳,
"host": 主機名或ip地址,
"version": 版本號編號默以爲1,
"port": kafka broker的服務端端口號,由server.properties中參數port肯定

}
 
Example:
{
"jmx_port": 6061,
"timestamp":"1403061899859"
"version": 1,
"host": "192.168.1.148",
"port": 9092

}

4. Controller epoch: 

/controller_epoch -> int (epoch)   

此值爲一個數字,kafka集羣中第一個broker第一次啓動時爲1,之後僅僅要集羣中center controller中央控制器所在broker變動或掛掉,就會又一次選舉新的center controller,每次center controller變動controller_epoch值就會 + 1; 

5. Controller註冊信息:

/controller -> int (broker id of the controller)  存儲center controller中央控制器所在kafka broker的信息

Schema:
{
"version": 版本號編號默以爲1,
"brokerid": kafka集羣中broker惟一編號,
"timestamp": kafka broker中央控制器變動時的時間戳

}
 
Example:
{
"version": 1,
"brokerid": 3,
"timestamp": "1403061802981"
}


Consumer and Consumer group概念: 
a.每個consumerclient被建立時,會向zookeeper註冊本身的信息;
b.此做用主要是爲了"負載均衡".
c.同一個Consumer Group中的Consumers,Kafka將對應Topic中的每個消息僅僅發送給當中一個Consumer。
d.Consumer Group中的每個Consumer讀取Topic的一個或多個Partitions,並且是惟一的Consumer;
e.一個Consumer group的多個consumer的所有線程依次有序地消費一個topic的所有partitions,假設Consumer group中所有consumer總線程大於partitions數量,則會出現空暇狀況;
舉例說明:
kafka集羣中建立一個topic爲report-log   4 partitions 索引編號爲0,1,2,3
假若有眼下有三個消費者node:注意-->一個consumer中一個消費線程可以消費一個或多個partition.
假設每個consumer建立一個consumer thread線程,各個node消費狀況例如如下,node1消費索引編號爲0,1分區,node2費索引編號爲2,node3費索引編號爲3
假設每個consumer建立2個consumer thread線程,各個node消費狀況例如如下(是從consumer node前後啓動狀態來肯定的),node1消費索引編號爲0,1分區。node2費索引編號爲2,3;node3爲空暇狀態
總結
從以上可知。Consumer Group中各個consumer是依據前後啓動的順序有序消費一個topic的所有partitions的。

假設Consumer Group中所有consumer的總線程數大於partitions數量。則可能consumer thread或consumer會出現空暇狀態。

Consumer均衡算法
當一個group中,有consumer增長或者離開時,會觸發partitions均衡.均衡的終於目的,是提高topic的併發消費能力.
1) 假如topic1,具備例如如下partitions: P0,P1,P2,P3
2) 增長group中,有例如如下consumer: C0,C1
3) 首先依據partition索引號對partitions排序: P0,P1,P2,P3
4) 依據(consumer.id + '-'+ thread序號)排序: C0,C1
5) 計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 而後依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

6. Consumer註冊信息:

每個consumer都有一個惟一的ID(consumerId可以經過配置文件指定,也可以由系統生成),此id用來標記消費者信息.

/consumers/[groupId]/ids/[consumerIdString]

是一個暫時的znode,此節點的值爲請看consumerIdString產生規則,即表示此consumer眼下所消費的topic + partitions列表.

consumerId產生規則:

   StringconsumerUuid = null;
    if(config.consumerId!=null && config.consumerId)
      consumerUuid = consumerId;
    else {
      String uuid = UUID.randomUUID()
      consumerUuid = "%s-%d-%s".format(
        InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
        uuid.getMostSignificantBits().toHexString.substring(0,8));

     }
     String consumerIdString = config.groupId + "_" + consumerUuid;

Schema:
{
"version": 版本號編號默以爲1,
"subscription": { //訂閱topic列表
"topic名稱": consumer中topic消費者線程數
},
"pattern": "static",
"timestamp": "consumer啓動時的時間戳"

}
 
Example:
{
"version": 1,
"subscription": {
"open_platform_opt_push_plus1": 5
},
"pattern": "static",
"timestamp": "1411294187842"

}

 

7. Consumer owner:

/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引編號

當consumer啓動時,所觸發的操做:

a) 首先進行"Consumer Id註冊";

b) 而後在"Consumer id 註冊"節點下注冊一個watch用來監聽當前group中其它consumer的"退出"和"增長";僅僅要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(比方一個consumer失效,那麼其它consumer接管partitions).

c) 在"Broker id 註冊"節點下,註冊一個watch用來監聽broker的存活狀況;假設broker列表變動,將會觸發所有的groups下的consumer又一次balance.

8. Consumer offset:

/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

用來跟蹤每個consumer眼下所消費的partition中最大的offset

此znode爲持久節點,可以看出offset跟group_id有關,以代表當消費者組(consumer group)中一個消費者失效,

又一次觸發balance,其它consumer可以繼續消費.

9. Re-assign partitions

/admin/reassign_partitions

{
    "fields":[
      {
         "name":"version",
         "type":"int",
         "doc":"version id"
      },
      {
         "name":"partitions",
         "type":{
            "type":"array",
            "items":{
               "fields":[
                  {
                     "name":"topic",
                     "type":"string",
                     "doc":"topic of the partition to be reassigned"
                  },
                  {
                     "name":"partition",
                     "type":"int",
                     "doc":"the partition to be reassigned"
                  },
                  {
                     "name":"replicas",
                     "type":"array",
                     "items":"int",
                     "doc":"a list of replica ids"
                  }
               ],
            }
            "doc":"an array of partitions to be reassigned to new replicas"
         }
      }
   ]
}
 
Example:
{
  "version"1,
  "partitions":
     [
        {
            "topic""Foo",
            "partition"1,
            "replicas": [013]
        }
     ]            
}

 

10. Preferred replication election

/admin/preferred_replica_election

 

{
   "fields":[
      {
         "name":"version",
         "type":"int",
         "doc":"version id"
      },
      {
         "name":"partitions",
         "type":{
            "type":"array",
            "items":{
               "fields":[
                  {
                     "name":"topic",
                     "type":"string",
                     "doc":"topic of the partition for which preferred replica election should be triggered"
                  },
                  {
                     "name":"partition",
                     "type":"int",
                     "doc":"the partition for which preferred replica election should be triggered"
                  }
               ],
            }
            "doc":"an array of partitions for which preferred replica election should be triggered"
         }
      }
   ]
}
 
樣例:
 
{
  "version"1,
  "partitions":
     [
        {
            "topic""Foo",
            "partition"1         
        },
        {
            "topic""Bar",
            "partition"0         
        }
     ]            
}

 

11. 刪除topics
/admin/delete_topics

Schema:
"fields":
    [ {"name""version""type""int""doc""version id"},
      {"name""topics",
       "type": { "type""array""items""string""doc""an array of topics to be deleted"}
      } ]
}
 
樣例:
{
  "version"1,
  "topics": ["foo""bar"]
}

Topic配置

/config/topics/[topic_name]

樣例

{
  "version"1,
  "config": {
    "config.a""x",
    "config.b""y",
    ...
   }
}

3)kafka log4j配置

kafka日誌文件分爲5種類型,依次爲:controller,kafka-request,server,state-change,log-cleaner。不一樣類型log數據,寫到不一樣文件裏:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. kafka.logs.dir=logs  
  2.    
  3. log4j.rootLogger=INFO, stdout  
  4.    
  5. log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
  6. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
  7. log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n  
  8.    
  9. log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender  
  10. log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH  
  11. log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log  
  12. log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout  
  13. log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  
  14.    
  15. log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender  
  16. log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH  
  17. log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log  
  18. log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout  
  19. log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  
  20.    
  21. log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender  
  22. log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH  
  23. log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log  
  24. log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout  
  25. log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  
  26.    
  27. log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender  
  28. log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH  
  29. log4j.appender.cleanerAppender.File=log-cleaner.log  
  30. log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout  
  31. log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  
  32.    
  33.    
  34. log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender  
  35. log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH  
  36. log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log  
  37. log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout  
  38. log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n  
  39.    
  40. # Turn on all our debugging info  
  41. #log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender  
  42. #log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender  
  43. #log4j.logger.kafka.perf=DEBUG, kafkaAppender  
  44. #log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender  
  45. #log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG  
  46. log4j.logger.kafka=INFO, kafkaAppender  
  47.    
  48. log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender  
  49. log4j.additivity.kafka.network.RequestChannel$=false  
  50.    
  51. #log4j.logger.kafka.network.Processor=TRACE, requestAppender  
  52. #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender  
  53. #log4j.additivity.kafka.server.KafkaApis=false  
  54. log4j.logger.kafka.request.logger=WARN, requestAppender  
  55. log4j.additivity.kafka.request.logger=false  
  56.    
  57.    
  58. log4j.logger.kafka.controller=TRACE, controllerAppender  
  59. log4j.additivity.kafka.controller=false  
  60.    
  61.    
  62. log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender  
  63. log4j.additivity.kafka.log.LogCleaner=false  
  64. log4j.logger.kafka.log.Cleaner=INFO, cleanerAppender  
  65. log4j.additivity.kafka.log.Cleaner=false  
  66.    
  67.    
  68. log4j.logger.state.change.logger=TRACE, stateChangeAppender  
  69. log4j.additivity.state.change.logger=false 

4)kafka replication設計機制

概覽:

當中一個broker被選舉做爲整個集羣控制器,他將負責幾個方面工做:

1.管理或領導分區變化.

2.create topic,delete topic

3.replicas(運行復制計劃,複製partition)

集羣控制器作出決定之後,操做信息或狀態將永久註冊並存儲在zookeeper上。並且也可以經過RPC方式發送新的決定操做broker。控制器公佈的決定來源真實。他將用於client請求路由和broker的從新啓動或恢復狀態。

假設有一個新的broker增長或啓動。controller會經過RPC調用發出新的決定。

潛在的長處:

1.當leader發生變化時,更easy集中到一個地方作調試(排除故障)。

2.當leader發生變化時。ZK可以把讀取/寫狀態信息成批廣播到其它broker,所以當leader failover的時候會下降broker之間恢復的延遲時間。

3.需要更少的監聽器。

4.使用更高效的RPC通訊方式,取代在zookeeper中隊列實現方式。

潛在的缺點:

需要考慮controller failover

zookeeper中路徑列表說明

1.Controller path:存儲當前controller信息.

/controller --> {brokerid} (ephemeral; created by controller)

2.Broker path:存儲當前所有活着的brokers信息。

/brokers/ids/[broker_id] --> host:port (ephemeral; created by admin)

3.存儲一個主題的所有分區副本任務。對於每一個副本。咱們存儲的副本指派一個broker ID。第一個副本是首選的複製品。

注意,對於一個給定的分區,在一個broker上有至多一個副本。所以,broker ID可以做副本標識.

/brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader_epoc: epoc, leader: broker_id, ISR: {broker1, broker2}}
此路徑被controller或leader改動,當前leader僅僅改動ISR一部分信息。

當更新path需要使用條件同步到zookeeper上。

4.LeaderAndISR path:存儲一個分區leader and ISR

/brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader_epoc: epoc, leader: broker_id, ISR: {broker1, broker2}}
此路徑被controller或leader改動。當前leader僅僅改動ISR一部分信息。

當更新path需要使用條件同步到zookeeper上。

5.分區分配path:當咱們又一次分配某些分區到不一樣的brokers時。此path會被使用。對於每個分區又一次分配,他將會存儲一個新副本列表和他們對應的brokers信息。

每當某個管理員操做例如如下命令成功後,且這個分區遷移到目標broker成功後,源broker上的分區會本身主動刪除。

/admin/partitions_add/[topic]/[partition_id] --> {broker_id …} (created by admin)
/admin/partitions_remove/[topic]/[partition_id] (created by admin)

kafka中專有詞語解釋:

AR(assign replicas):分配副本  ISR(in-sync replicas):在同步中的副本

Replica {                                // 一個分區副本信息
  broker_id               : int
  partition               : Partition    //分區信息
  log                     : Log          //本地日誌與副本關聯信息
  hw                      : long         //最後被commit的message的offset信息
  leo                     : long         // 日誌結尾offset
  isLeader                : Boolean      //是否爲該副本的leader
}
  
Partition {                              //topic名稱
  topic                   : string
  partition_id            : int
  leader                  : Replica      // 這個分區的leader副本
  ISR                     : Set[Replica] // 正在同步中的副本集合
  AR                      : Set[Replica] // 這個分區的所有副本分配集合
  LeaderAndISRVersionInZK : long         // version id of the LeaderAndISR path; used for conditionally update the LeaderAndISR path in ZK
}
  
LeaderAndISRRequest {
  request_type_id         : int16 // 當前request的版本號
  version_id              : int16 // request的版本號號
  client_id               : int32 // this can be the broker id of the controller
  ack_timeout             : int32 // the time in ms to wait for a response
  isInit                  : byte  // whether this is the first command issued by a controller
  leaderAndISRMap         : Map[(topic: String, partitionId: int32) => LeaderAndISR) // a map of LeaderAndISR
}
  
LeaderAndISR {
  leader                  : int32          // leader的broker編號
  leaderEpoc              : int32          // leader epoc, incremented on each leadership change
  ISR                     : Set[int32]     // 所有在ISR複製副本的broker集合
  zkVersion               : int64          // version of the LeaderAndISR path in ZK
}
  
LeaderAndISRResponse {
  version_id              : int16 // 當前request的版本號
  responseMap             : Map[(topic: String, partitionId: int32) => int16) // error code表
}
  
StopReplicaRequest {
  request_type_id         : int16 // request id
  version_id              : int16 // 當前request的版本號
  client_id               : int32 // this can be the broker id of the controller
  ack_timeout             : int32 // ack響應時間。單位爲毫秒
  stopReplicaSet          : Set[(topic: String, partitionId: int)) // 需要中止的分區集合
}
  
StopReplicaResponse {
  version_id              : int16 // 當前request的版本號
  responseMap             : Map[(topic: String, partitionId: int32) => int16) //error code表
}

5)apache kafka監控系列-監控指標

一、監控目標

    1.當系統可能或處於亞健康狀態時及時提醒。預防故障發生

    2.報警提示 a.短信方式 b.郵件

二、監控內容

2.1 機器監控

Kafkaserver指標

  1. CPU Load
  2. Disk IO
  3. Memory
  4. 磁盤log.dirs文件夾下數據文件大小,要有定時清除策略

2.2 JVM監控

主要監控JAVA的 GC time(垃圾回收時間)。JAVA的垃圾回收機制對性能的影響比較明顯

2.3 Kafka系統監控

一、Kafka總體監控

  • zookeeper上/XXX/broker/ids文件夾下節點數量
  • leader 選舉頻率

二、Kafka Broker監控

  • kafka集羣中Broker列表,broker執行情況,包含node下線。活躍數量
  • Broker是否提供服務
  • 數據流量  流入速度。流出速度 (message / byte)
  • ISR 收縮頻率

三、Kafka Controller監控

  • controller存活數目

四、Kafka Producer監控

  • producer數量,排隊狀況
  • 請求響應時間
  • QPS/分鐘

五、Kafka Consumer監控

  • consumer隊列中排隊請求數
  • 請求響應時間
  • 近期一分鐘平均每秒請求數

六、Topic監控

  • 數據量大小;
  • offset
  • 數據流量 流入速度,流出速度 (message / byte)

3.監控指標

3.1 JVM監控

a.經過JMX獲取GC time

b.jvm full gc次數

        c.經過jmx監控kafka相關參數

3.2 kafka系統監控

監控數據獲取方式

一、生存節點信息可以從zookeeper獲取

二、除生存節點 和 

a、Broker是否提供服務。

b、Topic數據量大小。

c、Topic的offset 外,其它數據都可以經過JMX獲取


6)kafka.common.ConsumerRebalanceFailedException異常解決的方法

kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf.xxx.com-1399456594831-99f15e63 can't rebalance after 3 retries

at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.getKafkaStreams(DefaultConsumerProcessor.java:149)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.recvMessage(DefaultConsumerProcessor.java:63)
at com.xxx.service.mobile.push.kafka.MafkaPushRecordConsumer.main(MafkaPushRecordConsumer.java:22)

at com.xxx.service.mobile.push.Bootstrap.main(Bootstrap.java:34)


出現以上問題緣由分析:

同一個消費者組(consumer group)有多個consumer前後啓動。就是一個消費者組內有多個consumer同一時候負載消費多個partition數據.

解決的方法:

1.配置zk問題(kafka的consumer配置)

zookeeper.session.timeout.ms=5000

zookeeper.connection.timeout.ms=10000

zookeeper.sync.time.ms=2000


在使用高級API過程當中,通常出現這個問題是zookeeper.sync.time.ms時間間隔配置太短,不排除有其它緣由引發。但筆者遇到一般是這個緣由。

給你們解釋一下緣由:一個消費者組中(consumer數量<partitions數量)每當有consumer發送變化,會觸發負載均衡。第一件事就是釋放當consumer資源,無則免之,調用ConsumerFetcherThread關閉並釋放當前kafka broker所有鏈接。釋放當前消費的partitons。實際就是刪除暫時節點(/xxx/consumer/owners/topic-xxx/partitions[0-n]),所有同一個consumer group內所有consumer經過計算獲取本consumer要消費的partitions。而後本consumer註冊對應暫時節點卡位,表明我擁有該partition的消費所有權。其它consumer不能使用。


假設你們理解上面解釋,如下就更easy了,當consumer調用Rebalance時,它是依照時間間隔和最大次數採取失敗重試原則。每當獲取partitions失敗後會重試獲取。

舉個樣例。假如某個公司有個會議,B部門在某個時間段預訂該會議室,但是時間到了去會議室看時。發現A部門還在使用。這時B部門僅僅有等待了,每隔一段時間去詢問一下。假設時間過於頻繁。則會議室一直會處於佔用狀態。假設時間間隔設置長點。可能去個2次。A部門就讓出來了。


同理,當新consumer增長又一次觸發rebalance時,已有(old)的consumer會又一次計算並釋放佔用partitions。但是會消耗必定處理時間。此時新(new)consumer去搶佔該partitions很是有可能就會失敗。

咱們若是設置足夠old consumer釋放資源的時間,就不會出現這個問題。


zookeeper.sync.time.ms時間設置太短就會致使old consumer尚未來得及釋放資源,new consumer重試失敗屢次到達閥值就退出了。


zookeeper.sync.time.ms設置時間閥值。要考慮網絡環境。server性能等因素在內綜合衡量。


kafka zk節點存儲。請參考:kafka在zookeeper中存儲結構


7)kafak安裝與使用

kafak安裝與使用

1.前言

學習kafka的基礎是先把kafka系統部署起來,而後簡單的使用它。從直觀上感受它,而後逐步的深刻了解它。


本文介紹了kafka部署方法,包含配置。安裝和簡單的使用。


2.kafka下載和安裝

kafka版本號一直在更新,且每次更新,變化均比較大,如配置文件有修改,kafka 0.7到0.8.1版本號變化很是大,包含增長。支持集羣內複製,支持多個數據文件夾。請求處理改成異步,實現partition動態管理,基於時間的日誌段刪除

2.1下載地址:

https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz。

kafka文件夾結構

如 圖-1


說明:塗黑部分爲我本身建立目錄

文件夾

說明

bin

操做kafka的可運行腳本,還包括windows下腳本

config

配置文件所在文件夾

libs

依賴庫文件夾

logs

日誌數據文件夾,文件夾kafkaserver端日誌分爲5種類型。分爲:server,request,statelog-cleanercontroller

2.1 安裝以及啓動kafka

步驟1:

lizhitao@localhost:~$ tar -xzf kafka_2.10-0.8.1.1.tgz
lizhitao@localhost:~$ cd kafka_2.10-0.8.1.1.tgz

步驟2:

 配置zookeeper(若是您已經安裝了zookeeper,若是沒有安裝。請再網上搜索安裝方法)
進入kafka安裝project根文件夾編輯 vim config/server.properties  改動屬性zookeeper.connect=ip:8081,ip2:8082

步驟3:

kafka最爲重要三個配置依次爲:broker.id、log.dir、zookeeper.connect
kafka server端config/server.properties參數說明和解釋例如如下:
依據屬性說明完畢配置
broker.id = 1
port = 9092

步驟4: 啓動服務

cd kafka-0.8.1

lizhitao@localhost:~$ bin/kafka-server-start.sh config/server.properties

[2014-04-16 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2014-04-16 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

...

步驟5:建立topic

lizhitao@localhost:~$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

步驟6:驗證topic是否建立成功

lizhitao@localhost:~$ bin/kafka-topics.sh --list --zookeeper localhost:2181

test

Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.

步驟7:發送一些消息驗證,在console模式下。啓動producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

步驟7:啓動一個consumer

lizhitao@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This is a message

This is another message

3.配置kafka集羣模式,需要由多個broker組成

步驟1:

因爲需要在同一個文件夾(config)下配置多個server.properties,操做過程例如如下:

lizhitao@localhost:~$ cp config/server.properties config/server-1.properties 

lizhitao@localhost:~$ cp config/server.properties config/server-2.properties

步驟2:

需要編輯並設置例如如下文件屬性:

config/server-1.properties:

    broker.id=1

    port=9093

    log.dir=/tmp/kafka-logs-1

 

config/server-2.properties:

    broker.id=2

    port=9094

    log.dir=/tmp/kafka-logs-2

啓動服務

lizhitao@localhost:~$ bin/kafka-server-start.sh config/server-1.properties &

...

lizhitao@localhost:~$ bin/kafka-server-start.sh config/server-2.properties &

...

步驟3:

建立topic

lizhitao@localhost:~$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

.....

topic created success....

lizhitao@localhost:~$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic     PartitionCount:1    ReplicationFactor:3Configs:

Topic: my-replicated-topic     Partition: 0Leader: 1Replicas: 1,2,0Isr: 1,2,0

描寫敘述topic中分區,同步副本狀況

lizhitao@localhost:~$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test PartitionCount:1 ReplicationFactor:1Configs:

Topic: test Partition: 0 Leader: 0Replicas: 0Isr: 0

步驟4:做爲生產者發送消息

lizhitao@localhost:~$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1

my test message 2

步驟5:消費topic數據

lizhitao@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

步驟6:

檢查consumer offset位置

lizhitao@localhost:~$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

Group           Topic                          Pid Offset          logSize         Lag             Owner

my-group        my-topic                       0   0               0               0               test_jkreps-mn-1394154511599-60744496-0

my-group        my-topic                       1   0               0               0               test_jkreps-mn-1394154521217-1a0be913-0

8)apache kafka中server.properties配置文件參數說明

每個kafka broker中配置文件server.properties默認必須配置的屬性例如如下:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. broker.id=0  
  2. num.network.threads=2  
  3. num.io.threads=8  
  4. socket.send.buffer.bytes=1048576  
  5. socket.receive.buffer.bytes=1048576  
  6. socket.request.max.bytes=104857600  
  7. log.dirs=/tmp/kafka-logs  
  8. num.partitions=2  
  9. log.retention.hours=168  
  10.   
  11. log.segment.bytes=536870912  
  12. log.retention.check.interval.ms=60000  
  13. log.cleaner.enable=false  
  14.   
  15. zookeeper.connect=localhost:2181  
  16. zookeeper.connection.timeout.ms=1000000  

server.properties中所有配置參數說明(解釋)例如如下列表:

參數

說明(解釋)

broker.id =0

每一個broker在集羣中的惟一表示,要求是正數。當該server的IP地址發生改變時,broker.id沒有變化。則不會影響consumers的消息狀況

log.dirs=/data/kafka-logs

kafka數據的存放地址,多個地址的話用逗號切割,多個文件夾分佈在不一樣磁盤上可以提升讀寫性能  /data/kafka-logs-1/data/kafka-logs-2

port =9092

broker server服務port

message.max.bytes =6525000

表示消息體的最大大小。單位是字節

num.network.threads =4

broker處理消息的最大線程數,普通狀況下不需要去改動

num.io.threads =8

broker處理磁盤IO的線程數,數值應該大於你的硬盤數

background.threads =4

一些後臺任務處理的線程數,好比過時消息文件的刪除等,普通狀況下不需要去作改動

queued.max.requests =500

等待IO線程處理的請求隊列最大數,如果等待IO的請求超過這個數值,那麼會中止接受外部消息,應該是一種自我保護機制。

host.name

broker的主機地址,如果設置了,那麼會綁定到這個地址上。如果沒有,會綁定到所有的接口上。並將當中之中的一個發送到ZK,通常不設置

socket.send.buffer.bytes=100*1024

socket的發送緩衝區,socket的調優參數SO_SNDBUFF

socket.receive.buffer.bytes =100*1024

socket的接受緩衝區,socket的調優參數SO_RCVBUFF

socket.request.max.bytes =100*1024*1024

socket請求的最大數值,防止serverOOMmessage.max.bytes一定要小於socket.request.max.bytes,會被topic建立時的指定參數覆蓋

log.segment.bytes =1024*1024*1024

topic的分區是以一堆segment文件存儲的,這個控制每個segment的大小,會被topic建立時的指定參數覆蓋

log.roll.hours =24*7

這個參數會在日誌segment沒有達到log.segment.bytes設置的大小,也會強制新建一個segment會被 topic建立時的指定參數覆蓋

log.cleanup.policy = delete

日誌清理策略選擇有:deletecompact主要針對過時數據的處理。或是日誌文件達到限制的額度,會被 topic建立時的指定參數覆蓋

log.retention.minutes=3days

數據存儲的最大時間超過這個時間會依據log.cleanup.policy設置的策略處理數據,也就是消費端能夠多久去消費數據

log.retention.byteslog.retention.minutes隨意一個達到要求,都會運行刪除,會被topic建立時的指定參數覆蓋

log.retention.bytes=-1

topic每個分區的最大文件大小,一個topic的限制大小 = 分區數*log.retention.bytes-1沒有大小限log.retention.byteslog.retention.minutes隨意一個達到要求,都會運行刪除。會被topic建立時的指定參數覆蓋

log.retention.check.interval.ms=5minutes

文件大小檢查的週期時間。是否處罰 log.cleanup.policy中設置的策略

log.cleaner.enable=false

是否開啓日誌壓縮

log.cleaner.threads = 2

日誌壓縮執行的線程數

log.cleaner.io.max.bytes.per.second=None

日誌壓縮時候處理的最大大小

log.cleaner.dedupe.buffer.size=500*1024*1024

日誌壓縮去重時候的緩存空間,在空間贊成的狀況下。越大越好

log.cleaner.io.buffer.size=512*1024

日誌清理時候用到的IO塊大小通常不需要改動

log.cleaner.io.buffer.load.factor =0.9

日誌清理中hash表的擴大因子通常不需要改動

log.cleaner.backoff.ms =15000

檢查是否處罰日誌清理的間隔

log.cleaner.min.cleanable.ratio=0.5

日誌清理的頻率控制,越大意味着更高效的清理,同一時候會存在一些空間上的浪費,會被topic建立時的指定參數覆蓋

log.cleaner.delete.retention.ms =1day

對於壓縮的日誌保留的最長時間,也是client消費消息的最長時間。同log.retention.minutes的差異在於一個控制未壓縮數據。一個控制壓縮後的數據。會被topic建立時的指定參數覆蓋

log.index.size.max.bytes =10*1024*1024

對於segment日誌的索引文件限制大小,會被topic建立時的指定參數覆蓋

log.index.interval.bytes =4096

當運行一個fetch操做後,需要必定的空間來掃描近期的offset大小。設置越大,表明掃描速度越快,但是也更好內存。普通狀況下不需要搭理這個參數

log.flush.interval.messages=None

log文件」sync」到磁盤以前累積的消息條數,因爲磁盤IO操做是一個慢操做,但又是一個數據可靠性"的必要手段,因此此參數的設置,需要在"數據可靠性""性能"之間作必要的權衡.假設此值過大,將會致使每次"fsync"的時間較長(IO堵塞),假設此值太小,將會致使"fsync"的次數較多,這也意味着整體的client請求有必定的延遲.物理server故障,將會致使沒有fsync的消息丟失.

log.flush.scheduler.interval.ms =3000

檢查是否需要固化到硬盤的時間間隔

log.flush.interval.ms = None

只經過interval來控制消息的磁盤寫入時機,是不足的.此參數用於控制"fsync"的時間間隔,假設消息量始終沒有達到閥值,但是離上一次磁盤同步的時間間隔達到閥值,也將觸發.

log.delete.delay.ms =60000

文件在索引中清除後保留的時間通常不需要去改動

log.flush.offset.checkpoint.interval.ms =60000

控制上次固化硬盤的時間點,以便於數據恢復通常不需要去改動

auto.create.topics.enable =true

是否贊成本身主動建立topic,如果false,就需要經過命令建立topic

default.replication.factor =1

是否贊成本身主動建立topic,如果false,就需要經過命令建立topic

num.partitions =1

每個topic的分區個數。如果在topic建立時候沒有指定的話會被topic建立時的指定參數覆蓋

下面是kafkaLeader,replicas配置參數

controller.socket.timeout.ms =30000

partition leaderreplicas之間通信時,socket的超時時間

controller.message.queue.size=10

partition leaderreplicas數據同步時,消息的隊列尺寸

replica.lag.time.max.ms =10000

replicas響應partition leader的最長等待時間,如果超過這個時間,就將replicas列入ISR(in-sync replicas),並以爲它是死的,不會再增長管理中

replica.lag.max.messages =4000

假設follower落後與leader太多,將會以爲此follower[或者說partition relicas]已經失效

##一般,followerleader通信時,因爲網絡延遲或者連接斷開,總會致使replicas中消息同步滯後

##假設消息以後太多,leader將以爲此follower網絡延遲較大或者消息吞吐能力有限,將會把此replicas遷移

##到其它follower.

##broker數量較少,或者網絡不足的環境中,建議提升此值.

replica.socket.timeout.ms=30*1000

followerleader之間的socket超時時間

replica.socket.receive.buffer.bytes=64*1024

leader複製時候的socket緩存大小

replica.fetch.max.bytes =1024*1024

replicas每次獲取數據的最大大小

replica.fetch.wait.max.ms =500

replicasleader之間通訊的最大等待時間。失敗了會重試

replica.fetch.min.bytes =1

fetch的最小數據尺寸,假設leader中還沒有同步的數據不足此值,將會堵塞,直到知足條件

num.replica.fetchers=1

leader進行復制的線程數,增大這個數值會添加followerIO

replica.high.watermark.checkpoint.interval.ms =5000

每個replica檢查是否將最高水位進行固化的頻率

controlled.shutdown.enable =false

是否贊成控制器關閉broker ,如果設置爲true,會關閉所有在這個broker上的leader,並轉移到其它broker

controlled.shutdown.max.retries =3

控制器關閉的嘗試次數

controlled.shutdown.retry.backoff.ms =5000

每次關閉嘗試的時間間隔

leader.imbalance.per.broker.percentage =10

leader的不平衡比例。如果超過這個數值。會對分區進行又一次的平衡

leader.imbalance.check.interval.seconds =300

檢查leader是否不平衡的時間間隔

offset.metadata.max.bytes

client保留offset信息的最大空間大小

kafkazookeeper參數配置

zookeeper.connect = localhost:2181

zookeeper集羣的地址。可以是多個,多個之間用逗號切割hostname1:port1,hostname2:port2,hostname3:port3

zookeeper.session.timeout.ms=6000

ZooKeeper的最大超時時間。就是心跳的間隔,如果沒有反映,那麼以爲已經死了,不易過大

zookeeper.connection.timeout.ms =6000

ZooKeeper的鏈接超時時間

zookeeper.sync.time.ms =2000

ZooKeeper集羣中leaderfollower之間的同步實際那

9)apache kafka的consumer初始化時獲取不到消息

問題

發現一個問題,假設使用的是一個高級的kafka接口 那麼默認的狀況下假設某個topic沒有變化 則consumer消費不到消息 比方某個消息生產了2w條,此時producer再也不生產消息,而後另一個consumer啓動,此時拿不到消息.

緣由解釋

auto.offset.reset:假設zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。

有smallest、largest、anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。

默認largest

默認值:auto.offset.reset=largest

10)Kafka Producer處理邏輯

Kafka Producer處理邏輯

Kafka Producer產生數據發送給Kafka Server,詳細的分發邏輯及負載均衡邏輯,全部由producer維護。

Kafka結構圖

Kafka Producer默認調用邏輯

默認Partition邏輯

一、沒有key時的分發邏輯

每隔 topic.metadata.refresh.interval.ms 的時間,隨機選擇一個partition。這個時間窗體內的所有記錄發送到這個partition。

發送數據出錯後也會又一次選擇一個partition

二、依據key分發

對key求hash,而後對partition數量求模

Utils.abs(key.hashCode) % numPartitions

怎樣獲取Partition的leader信息(元數據)

決定好發送到哪一個Partition後。需要明白該Partition的leader是哪臺broker才幹決定發送到哪裏。

詳細實現位置

kafka.client.ClientUtils#fetchTopicMetadata

 實現方案

一、從broker獲取Partition的元數據。由於Kafka所有broker存有所有的元數據,因此不論什麼一個broker都可以返回所有的元數據

二、broker選取策略:將broker列表隨機排序,從首個broker開始訪問,假設出錯,訪問下一個

三、出錯處理:出錯後向下一個broker請求元數據

注意

  • Producer是從broker獲取元數據的,並不關心zookeeper。
  • broker發生變化後。producer獲取元數據的功能不能動態變化。
  • 獲取元數據時使用的broker列表由producer的配置中的 metadata.broker.list 決定。

    該列表中的機器僅僅要有一臺正常服務,producer就能獲取元數據。

  • 獲取元數據後,producer可以寫數據到非 metadata.broker.list 列表中的broker

錯誤處理

producer的send函數默認沒有返回值。出錯處理有EventHandler實現。

DefaultEventHandler的錯誤處理例如如下:

  • 獲取出錯的數據
  • 等待一個間隔時間。由配置 retry.backoff.ms 決定這段時間長短
  • 又一次獲取元數據
  • 又一次發送數據

出錯重試次數由配置 message.send.max.retries 決定

全部重試全部失敗時。DefaultEventHandler會拋出異常。代碼例如如下

if(outstandingProduceRequests.size >0) {
  producerStats.failedSendRate.mark()
  val correlationIdEnd = correlationId.get()
  error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
    .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
    correlationIdStart, correlationIdEnd-1))
  thrownewFailedToSendMessageException("Failed to send messages after "+ config.messageSendMaxRetries +" tries.",null)
}


11)apache kafka源碼project環境搭建(IDEA)

1.gradle安裝

2.下載apache kafka源碼

3.用gradle構建產生IDEAproject文件

先裝好idea的scala插件。否則構建時就會本身主動下載,由於沒有國內鏡像。速度會很是慢。
lizhitao@users-MacBook-Pro:~/Downloads/kafka_2.10-0.8.1$ gradle idea
假設是eclipseproject,運行:gradle eclipse
生成IDEAproject文件例如如下:

4.項目導入到IDEAproject中

File-->Open

5.IDEA中查看源代碼project


6.Kafka啓動時,參數設置


7.log4j.properties文件路徑設置

啓動kafka server很是奇怪,log4j.properties文件找不到。報例如如下錯誤。
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
僅僅有把log4j.properties放置到src/main/scala路徑下,才幹找到文件,而後執行程序,正確輸出日誌信息。輸出例如如下所看到的

[2014-05-24 23:45:31,965] INFO Verifying properties (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,009] INFO Property broker.id is overridden to 9 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,009] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,009] INFO Property log.dirs is overridden to /Users/lizhitao/kafka-logs (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,009] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,010] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,010] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,010] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,010] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,010] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,010] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,010] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,011] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,011] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,011] INFO Property zookeeper.connect is overridden to 192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,011] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)  
[2014-05-24 23:45:32,032] INFO [Kafka Server 9], starting (kafka.server.KafkaServer)  
[2014-05-24 23:45:32,036] INFO [Kafka Server 9], Connecting to zookeeper on 192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183 (kafka.server.KafkaServer)  
[2014-05-24 23:45:32,045] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)  
[2014-05-24 23:45:32,370] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:host.name=192.168.2.104 (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:java.version=1.7.0_55 (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:java.class.path=/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/lib/javafx-doclet.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/lib/tools.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/htmlconverter.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_55.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Users/lizhitao/mt_wp/open_source/kafka-platform/kafka-0.8.1-src/out/production/core:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/org.scala-lang/scala-library/2.8.0/95bf967bf2e0a26727736228bba3451f4dd3e5b9/scala-library-2.8.0.jar:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/org.apache.zookeeper/zookeeper/3.3.4/6471e17c92181da9e143559c4c4779925a5e6eb0/zookeeper-3.3.4.jar:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/com.101tec/zkclient/0.3/dedcf2b53fb742adba7080ac3aed781694ba616e/zkclient-0.3.jar:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/com.yammer.metrics/metrics-core/2.2.0/f82c035cfa786d3cbec362c38c22a5f5b1bc8724/metrics-core-2.2.0.jar:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/com.yammer.metrics/metrics-annotation/2.2.0/62962b54c490a95c0bb255fa93b0ddd6cc36dd4b/metrics-annotation-2.2.0.jar:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/net.sf.jopt-simple/jopt-simple/3.2/d625f12ba08083c8c16dcedd5396ec730e9e77ab/jopt-simple-3.2.jar:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.0.5/10cb4550360a0ec6b80f09a5209d00b6058e82bf/snappy-java-1.0.5.jar:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/log4j/log4j/1.2.15/f0a0d2e29ed910808c33135a3a5a51bba6358f7b/log4j-1.2.15.jar:/Users/lizhitao/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.2/81d61b7f33ebeab314e07de0cc596f8e858d97/slf4j-api-1.7.2.jar:/Applications/IntelliJ IDEA 12.app/lib/idea_rt.jar (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:java.library.path=/Users/lizhitao/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:. (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:java.io.tmpdir=/var/folders/pn/qjf0v4k52mq965jxjd72hlx00000gp/T/ (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:os.name=Mac OS X (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,370] INFO Client environment:os.arch=x86_64 (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,371] INFO Client environment:os.version=10.9.2 (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,371] INFO Client environment:user.name=lizhitao (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,371] INFO Client environment:user.home=/Users/lizhitao (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,371] INFO Client environment:user.dir=/Users/lizhitao/mt_wp/open_source/kafka-platform/kafka-0.8.1-src (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,372] INFO Initiating client connection, connectString=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@6e739617 (org.apache.zookeeper.ZooKeeper)  
[2014-05-24 23:45:32,387] INFO Opening socket connection to server /192.168.2.225:2181 (org.apache.zookeeper.ClientCnxn)  
[2014-05-24 23:45:32,393] ERROR Unable to open socket to 192.168.2.225/192.168.2.225:2181 (org.apache.zookeeper.ClientCnxn)
經過如上7步後就可以正確啓動kafka程序,進行相關debug,並研究其源碼了。

12)apache kafka監控系列-KafkaOffsetMonitor

概覽

近期kafka server消息服務上線了,基於jmx指標參數也寫到zabbix中了。但總認爲缺乏點什麼東西,可視化可操做的界面。zabbix中數據比較分散。不能集中看整個集羣狀況。

或者一個cluster中broker列表。本身寫web-console比較耗時耗力。用原型工具畫了一些管理界面東西,關鍵本身也不前端方面技術,這方面比較薄弱。

這不開源社區提供了kafka的web管理平臺KafkaOffsetMonitor.就迅速拿過來執行。

你們不要着急,當即娓娓道來。

說明:

這個應用程序來實時監控你kafka服務的consumer以及他們在partition中的offset(偏移)。 

你可以瀏覽當前的消費者組,每個topic的所有partition的消費狀況都可以盡收眼底。這事實上是很是實用得。從這裏你很是快知道每個partition的message是否很是快被消費(沒有堵塞)。

他能指導你(kafka producer和consumer)優化代碼。

這個web管理平臺保留的partition offset和consumer滯後的歷史數據,因此你可以很是輕易瞭解這幾天consumer消費狀況。

 

KafkaOffsetMonitor功能:

1.從標題都可以看出來,Kafka Offset Monitor,是對consumer消費狀況進行監控,並能列出每個consumer offset,滯後數據。

2.消費者組列表

3.每個topic的所有parition列表(topic,pid,offset,logSize,lag,owner)

4.查看topic的歷史消費信息.

儘管功能覆蓋面不全,但是很是有用。

1.下載

github官網下載

KafkaOffsetMonitor

百度雲下載(網速快)

百度雲KafkaOffsetMonitor下載

說明:百度雲下載爲改動版本號,因爲KafkaOffsetMonitor中有些資源文件(css,js)是訪問外網的。特別是有訪問google資源。你們都懂的,經常不能訪問。建議下載改動版

2.安裝

KafkaOffsetMonitor執行比較簡單。因爲所有執行文件,資源文件。jar文件都打包到KafkaOffsetMonitor-assembly-0.2.0.jar了,直接執行就可以。這樣的方式太棒了。

既不用編譯也不用配置。呵呵,也不是絕對不配置。

a.新建一個文件夾kafka-offset-console,而後把jar複製到該文件夾下.

b.新建腳本。因爲您可能不是一個kafka集羣。用腳本可以啓動多個

lizhitao@users-MacBook-Pro:   vim mobile_start_en.sh

#!/bin/bash
java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --zk 192.168.2.101:2181,192.168.2.102:2182,192.168.2.103:2181/config/mobile/xxx \               
     --port 8086 \
     --refresh 10.seconds \
     --retain 7.days 1>mobile-logs/stdout.log 2>mobile-logs/stderr.log &

注意:/config/mobile/xxx  表示zk的根文件夾,需要手工建立,也可以不設置

3.執行

lizhitao@users-MacBook-Pro:  chmod +x mobile_start_en.sh

lizhitao@users-MacBook-Pro:  ./mobile_start_en.sh

serving resources from: jar:file:/opt/xxx/kafka-offset-console/KafkaOffsetMonitor-assembly-0.2.0.jar!/offsetapp

6 演示截圖:

消費者組列表


topic的所有partiton消費狀況列表


kafka正在執行的topic


kafka集羣中topic列表


kafka集羣中broker列表




13)Kafka Controller設計機制

在kafka集羣中,當中一個broker server做爲中央控制器,負責管理分區和副本狀態並運行管理着這些分區的又一次分配。如下說明怎樣經過中央控制器操做分區和副本的狀態。

名詞解釋:

isr:同步副本組

OfflinePartitionLeaderSelector:分區下線後新的領導者選舉

OAR:老的分配副本

PartitionStateChange:

其有效狀態例如如下:

  • NonExistentPartition:  這樣的狀態代表該分區歷來沒有建立過或之前建立事後來又刪除了。
  • NewPartition:建立分區後,分區處於NewPartition狀態。在這樣的狀態下,分區副本應該分配給它,但尚未領導者/同步複製組。

  • OnlinePartition:一旦一個分區領導者被選出。就會爲在線分區狀態。
  • OfflinePartition:假設分區領導者成功選舉後,當領導者分區崩潰或掛了,分區狀態轉變下線分區狀態。

其有效的狀態轉移例如如下:

NonExistentPartition -> NewPartition

       1.羣集中央控制器依據計算規則,從zk中讀取分區信息。建立新分區和副本。

NewPartition -> OnlinePartition

       1.分配第一個活着的副本做爲分區領導者,並且該分區所有副本做爲一個同步複製組,寫領導者和同步副本組數據到zk中。

       2.對於這個分區,發送LeaderAndIsr請求給每一個副本分區和併發送UpdateMetadata請求到每一個活者的broker server。

OnlinePartition,OfflinePartition -> OnlinePartition

       1.對於這個分區,需要選擇新的領導者和同步副本組,一個副本組要接受LeaderAndIsr請求,最後寫領導者和同步副本組信息到zk中。

               a.OfflinePartitionLeaderSelector:新領導者=存活副本(最好是在isr);新isr =存活isr假設不是空或剛好爲新領導者,不然;正在接受中副本=存活已分配副本。

              b.ReassignedPartitionLeaderSelector:新領導者=存活分區又一次分配副本;新isr =當前isr;正在接受中副本=又一次分配副本

              c.PreferredReplicaPartitionLeaderSelector:新領導這=第一次分配副本(假設在isr);新isr =當前isr;接受副本=分配副本

              d.ControlledShutdownLeaderSelector:新領導者=當前副本在isr中且沒有被關閉,新isr =當前isr -關閉副本;接受副本=存活已分配副本。

     2.對於這個分區,發送LeaderAndIsr請求給每一個接收副本和UpdateMetadata請求到每一個broker server

     NewPartition,OnlinePartition -> OfflinePartition

     1.這僅僅只是標識該分區爲下線狀態

     OfflinePartition -> NonExistentPartition

     1.這僅僅只是標識該分區爲不存在分區狀態

ReplicaStateChange:

有效狀態例如如下:

     1.NewReplica:當建立topic或分區又一次分配期間副本被建立。在這樣的狀態下,副本僅僅能成爲追隨者變動請求狀態。

     2.OnlineReplica:一旦此分區一個副本啓動且部分分配副本,他將處於在線副本狀態。在這樣的狀態下,它可以成爲領導者或成爲尾隨者狀態變動請求。

     3.OfflineReplica:每當broker server副本宕機或崩潰發生時,假設一個副本崩潰或掛了,它將變爲此狀態。

     4.NonExistentReplica:假設一個副本被刪除了,它將變爲此狀態。

有效狀態轉移例如如下:

     NonExistentReplica - - > NewReplica

     1.使用當前領導者和isr分區發送LeaderAndIsr請求到新副本和UpdateMetadata請求給每一個存活borker

     NewReplica - > OnlineReplica

     1.加入新的副本到副本列表中

      OnlineReplica,OfflineReplica - > OnlineReplica

     1.使用當前領導者和isr分區發送LeaderAndIsr請求到新副本和UpdateMetadata請求給每一個存活borker

      NewReplica,OnlineReplica - > OfflineReplica

     1.發送StopReplicaRequest到對應副本(w / o刪除)

     2.從isr和發送LeaderAndIsr請求重刪除此副本(isr)領導者副本和UpdateMetadata分區每個存活broker。

     OfflineReplica - > NonExistentReplica

    1.發送StopReplicaRequest到副本(刪除)

KafkaController操做:

當新建topic時:

  1. 調用方法onNewPartitionCreation

當建立新分區時:

  1. 建立新分區列表 -> 調用方法NewPartition
  2. 建立所有新分區副本 -> 調用方法NewReplica
  3. 新分區在線列表 -> 調用方法OnlinePartition
  4. 新分區所有在線副本 -> OnlineReplica

當broker失敗或掛掉時:

  1. 當前broker所有領導者分區爲下線分區 -> 調用方法OfflinePartition
  2. 下線和在線分區列表 -> OnlinePartition (使用下線分區領導者選舉)
  3. 在broker上所有fail副本 -> OfflineReplica

當broker啓動時:

  1. 發送UpdateMetadate請求給新啓動broker的所有分區。
  2. 新啓動broker的分區副本-> OnlineReplica
  3. 下線和在線分區列表 -> OnlinePartition (使用下線分區領導者選舉)
  4. 當新的broker啓動時,對於所有分區副本。系統會調用方法onPartitionReassignment運行未完畢的分區分配。

當分區又一次分配時: (OAR: 老的分配副本; RAR:每當又一次分配副本會有新的副本組)

  1. 用OAR + RAR副本組改動並分配副本列表.
  2. 當處於OAR + RAR時,發送LeaderAndIsr請求給每個副本。
  3. 副本處於RAR - OAR  -> 調用方法NewReplica
  4. 等待直到新的副本增長isr中
  5. 副本處於RAR  -> 調用方法OnlineReplica
  6. 設置AR to RAR並寫到內存中
  7. send LeaderAndIsr request 給一個潛在領導者 (假設當前領導者不在RAR中)和一個被分配的副本列表(使用RAR) 和一樣sir到每個處於RAR的broker中。
  8. replicas in OAR - RAR -> Offline (強制這些副本從isr重剔除)
  9. replicas in OAR - RAR -> NonExistentReplica (強制這些副本被刪除)
  10. 在zk上改動重分配副本到RAR中。
  11. 在zk上改動 /admin/reassign_partitions路徑,並刪除此分區
  12. 選舉領導者後,副本和isr信息變化,因此又一次發送更新元數據請求給每個broker。

好比, if OAR = {1, 2, 3} and RAR = {4,5,6}, 在zk上重分配副本和領導者/is這些值可能經歷下面轉化。

AR                  leader/isr

{1,2,3}            1/{1,2,3}           (初始化狀態)

{1,2,3,4,5,6}   1/{1,2,3}           (step 2)

{1,2,3,4,5,6}   1/{1,2,3,4,5,6}  (step 4)

{1,2,3,4,5,6}   4/{1,2,3,4,5,6}  (step 7)

{1,2,3,4,5,6}   4/{4,5,6}           (step 8)

{4,5,6}            4/{4,5,6}           (step 10)

注意,當僅僅有一個地方咱們能存儲OAR持久化數據。必須用RAR在zk改動AR節點數據,這樣,假設控制器在這一步以前崩潰,咱們仍然可以恢復。

其中央控制器failover時:

  1. replicaStateMachine.startup():
    1. 從不論什麼下線副本或上線副本中初始化每個副本
    2. 每個副本 -> OnlineReplica (強制LeaderAndIsr請求發送到每個副本)
  2. partitionStateMachine.startup():
    1. 重新建分區中初始化每個分區, 下線或上線分區
    2. each OfflinePartition and NewPartition -> OnlinePartition (強制領導者選舉)
  3. 恢復分區分配
  4. 恢復領導者選舉

當發送首選副本選舉時:

  1. 影響分區列表 -> 調用方法OnlinePartition (with PreferredReplicaPartitionLeaderSelector)

關閉broker:

  1. 在關閉broker中對於每個分區假設是領導者分區 -> 調用方法OnlinePartition (ControlledShutdownPartitionLeaderSelector)
  2. 在關閉broker中每個副本是追隨者,將發送StopReplica請求 (w/o deletion)
  3. 在關閉broker中每個副本是追隨者 -> 調用方法OfflineReplica (強制從同步副本組中刪除副本)

14)Kafka性能測試報告(虛擬機版)

測試方法

在其它虛擬機上使用 Kafka 自帶 kafka-producer-perf-test.sh 腳本進行測試 Kafka 寫入性能

嘗試使用 kafka-simple-consumer-perf-test.sh 腳本測試 Kafka Consumer 性能,但由於獲取到的數據不靠譜。放棄這個測試方法

性能數據

注:Gzip 和 Snappy 的傳輸速度 MB/S 是經過壓縮前數據計算的,壓縮後的實際傳輸量並無超過百兆網卡上限

單條消息大小

batch size/條

線程數

壓縮方式

傳輸速度 MB/S

傳輸速度 Message/S

0~1000 (avg 500)

200

10

不壓縮

11.1513 (約爲百兆網卡上線)

23369.8916

0~1000 (avg 500)

200

10

Gzip

14.0450

29425.1878

0~1000 (avg 500)

200

10

Snappy

32.2064

67471.7850

0~100(avg 50)

200

10

不壓縮

5.3654

111399.5121

0~100(avg 50)

200

10

Gzip

2.6479

54979.4926

0~100(avg 50)

200

10

Snappy

4.4217

91836.6410

0~1800 (avg 900) 仿線上數據量大小

200

10

不壓縮

11.0518 (約爲百兆網卡上線)

12867.3632

0~1800 (avg 900) 仿線上數據量大小

200

10

Gzip

17.3944

20261.3717

0~1800 (avg 900) 仿線上數據量大小

200

10

Snappy

31.0658

36174.2150

下面數據爲次日測試數據

 

 

 

 

 

0~100(avg 50)

200

10

不壓縮

1.8482

38387.7159

0~100(avg 50)

200

10

Gzip

1.3591

28219.0930

0~100(avg 50)

200

10

Snappy

2.0213

41979.7658

0~100(avg 50)

200

50

不壓縮

2.0900

43402.7778

0~100(avg 50)

200

50

Gzip

1.4639

30387.7477

0~100(avg 50)

200

50

Snappy

2.0871

43323.8021

0~1000 (avg 500)

200

10

不壓縮

9.8287

20594.3530

0~1000 (avg 500)

200

10

Gzip

13.0659

27386.0058

0~1000 (avg 500)

200

10

Snappy

20.1827

42265.4269

0~1000 (avg 500)

200

1

不壓縮

7.0980

14885.6041

0~1000 (avg 500)

200

1

Gzip

7.4438

15587.7356

0~1000 (avg 500)

200

1

Snappy

15.3256

32088.3070

測試結論

一、線上的實際message平均大小略小於1k。在這樣的狀況下(相應 0~1800 的test case)。虛擬機可以應對每秒上萬條寫入請求。測試環境下,網絡帶寬是其瓶頸。經過壓縮可以繞過瓶頸。Snappy算法可以處理36000+條請求每秒

二、在使用小數據進行測試時。Kafka每秒可以處理10萬條左右數據。網絡和IO都不是瓶頸,說明Kafka在虛擬機上處理寫入請求的上限約爲10萬條每秒。

三、次日的測試在一樣條件下與第一天差距很是大(0~100 大小數據,10線程,batch size 200),次日在不壓縮狀況下僅僅有第一天的三分之中的一個的處理能力,snappy壓縮狀況下也僅僅有二分之中的一個處理能力,說明虛擬機的性能不夠穩定。

四、生產者線程數對照。說明在網絡和IO及Kafka處理能力沒有達到瓶頸時,不少其它的線程能夠添加寫入速度,但是增加不明顯。

測試推論

一、虛擬機上的Kafka最高也可以處理10萬條請求,物理機的處理能力強得多,應當超過10萬條每秒的處理能力。相應線上平均數據大小接近1K,處理數據流量能力不會低於100MB/S,接近千兆網卡上限。

說明物理機上。在遇到網絡帶寬瓶頸前。Kafka性能應當不會是瓶頸。

二、虛擬機測試是在單topic 單replication 的狀況下測試的。

沒法肯定在多個replication時性能降低狀況。從網上查找看,性能降低不是很是明顯。

三、從測試看。虛擬機的性能能夠承擔線上請求。

但虛擬機性能不穩定,需要很慎重。


15)apache kafka監控系列-kafka-web-console

Kafka Web Console是kafka的開源web監控程序.

功能介紹例如如下:

  • brokers列表
  • 鏈接kafka的zk集羣列表
  • 所有topic列表,操做對應topic可以瀏覽查看對應message生產和消費流量圖.

1.下載Kafka Web Console

2.安裝sbt

a. centos  : yum install sbt
b. ubuntu : apt-get install sbt   

3.配置Kafka Web Console

a.添加數據庫依賴包(mysql)。解壓kafka-web-console.tar.gz,進入文件夾cd kafka-web-console
編輯文件vim build.sbt 
添加mysql配置:
......
libraryDependencies ++= Seq(
  jdbc,
  cache,
  "org.squeryl" % "squeryl_2.10" % "0.9.5-6",
  "com.twitter" % "util-zk_2.10" % "6.11.0",
  "com.twitter" % "finagle-core_2.10" % "6.15.0",
  "org.apache.kafka" % "kafka_2.10" % "0.8.1",
  "org.quartz-scheduler" % "quartz" % "2.2.1",
  "mysql" % "mysql-connector-java" % "5.1.9"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri")
)
.......

4.配置mysql的jdbc驅動

vim application.conf
添加代碼例如如下:
.......  
db.default.driver=com.mysql.jdbc.Driver  
db.default.url="jdbc:mysql://192.168.2.105:3306/mafka?

useUnicode=true&characterEncoding=UTF8&connectTimeout=5000&socketTimeout=10000" db.default.user=xxx db.default.password=xxx .......


5.運行sql語句(例如如下綠色選框所看到的)


6.編譯

lizhitao@localhost:~$ sbt package
打包編譯時會從官網上下載很是多jar,由於網絡緣由,因此很是慢。需要耐心等待。
注意:下載的jar是隱藏的,在cd  ~/.ivy2 文件夾(對應子文件夾)下可以看到所有jar.
ivy2所有jar包百度雲下載

7.執行

lizhitao@localhost:~$ sbt run

8.瀏覽訪問

訪問地址: http://ip:9000/

16)apache kafka遷移與擴容工具使用方法

參考官網site:https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool

說明:

當咱們對kafka集羣擴容時。需要知足2點要求:

  1. 將指定topic遷移到集羣內新增的node上。
  2. 將topic的指定partition遷移到新增的node上。

1. 遷移topic到新增的node上

假如現在一個kafka集羣執行三個broker,broker.id依次爲101,102,103,後來由於業務數據忽然暴增。需要新增三個broker,broker.id依次爲104,105,106.目的是要把push-token-topic遷移到新增node上。

腳本(json格式)例如如下所看到的:

lizhitao@localhost:$  ./bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka
--topics-to-move-json-file  migration-push-token-topic.json  --broker-list  "104,105,106"  --generate

 腳本migration-push-token-topic.json文件內容例如如下:

{
"topics":
[
{
"topic": "push-token-topic"
}
],
"version":1
}

生成分配partitions的json腳本:

Current partition replica assignment
{"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[8]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":3,"replicas":[5]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":9,"replicas":[5]},{"topic":"cluster-switch-topic","partition":1,"replicas":[5]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[5]},{"topic":"cluster-switch-topic","partition":2,"replicas":[4]},{"topic":"cluster-switch-topic","partition":0,"replicas":[4]},{"topic":"cluster-switch-topic","partition":6,"replicas":[4]},{"topic":"cluster-switch-topic","partition":8,"replicas":[4]}]}

又一次分配parttions的json腳本例如如下:

migration-topic-cluster-switch-topic.json 
 {"version":1,"partitions":[{"topic":"cluster-switch-topic","partition":10,"replicas":[5]},{"topic":"cluster-switch-topic","partition":5,"replicas":[4]},{"topic":"cluster-switch-topic","partition":4,"replicas":[5]},{"topic":"cluster-switch-topic","partition":3,"replicas":[4]},{"topic":"cluster-switch-topic","partition":9,"replicas":[4]},{"topic":"cluster-switch-topic","partition":1,"replicas":[4]},{"topic":"cluster-switch-topic","partition":11,"replicas":[4]},{"topic":"cluster-switch-topic","partition":7,"replicas":[4]},{"topic":"cluster-switch-topic","partition":2,"replicas":[5]},{"topic":"cluster-switch-topic","partition":0,"replicas":[5]},{"topic":"cluster-switch-topic","partition":6,"replicas":[5]},{"topic":"cluster-switch-topic","partition":8,"replicas":[5]}]}

lizhitao@localhost:$   bin/kafka-reassign-partitions.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka01 --reassignment-json-file migration-topic-cluster-switch-topic.json --execute

2.topic改動(replicats-factor)副本個數

lizhitao@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper   192.168.2.225:2183/config/mobile/mq/mafka
--reassignment-json-file  replicas-update-push-token-topic.json  --execute

假如初始時push-token-topic爲一個副本。爲了提升可用性。需要改成2副本模式。

腳本replicas-push-token-topic.json文件內容例如如下:

{
        "partitions":
                [
                {
                        "topic": "log.mobile_nginx",
                        "partition": 0,
                        "replicas": [101,102,104]
                },
                {
                        "topic": "log.mobile_nginx",
                        "partition": 1,
                        "replicas": [102,103,106]
                },
{
"topic": "xxxx",
"partition": 數字,
"replicas": [數組]
}                
],             
        "version":1
}

3.topic的分區擴容使用方法

a.先擴容分區數量。腳本例如如下:

好比:push-token-topic初始分區數量爲12,眼下到添加到15個

lizhitao@localhost:$ ./bin/kafka-topics.sh --zookeeper 192.168.2.225:2183/config/mobile/mq/mafka  --alter   --partitions 15   --topic   push-token-topic

b.設置topic分區副本

lizhitao@localhost:$ ./bin/kafka-reassign-partitions.sh --zookeeper  192.168.2.225:2183/config/mobile/mq/mafka
--reassignment-json-file partitions-extension-push-token-topic.json  --execute

腳本partitions-extension-push-token-topic.json文件內容例如如下:

{  
        "partitions":  
                [  
                {  
                        "topic": "push-token-topic",  
                        "partition": 12,  
                        "replicas": [101,102]  
                },  
                {  
                        "topic": "push-token-topic",  
                        "partition": 13,  
                        "replicas": [103,104]  
                },  
                {  
                        "topic": "push-token-topic",  
                        "partition": 14,  
                        "replicas": [105,106]  
                }  
                ],               
        "version":1  
} 

17)kafka LeaderNotAvailableException

經常producer和consumer會包例如如下異常

LeaderNotAvailableException

緣由:

1.當中該分區所在的broker掛了。假設是多副本,該分區所在broker剛好爲leader


18)apache kafka jmx監控指標參數

Kafka使用Yammer Metrics來監控server和client指標數據。

JMX監控指標參數列表例如如下:

參數 Mbean名稱 說明
Message in rate "kafka.server":name="AllTopicsMessagesInPerSec",type="BrokerTopicMetrics" 所有topic消息(進出)流量
Byte in rate "kafka.server":name="AllTopicsBytesInPerSec",type="BrokerTopicMetrics"
Request rate "kafka.network":name="{Produce|Fetch-consumer|Fetch-follower}-RequestsPerSec",type="RequestMetrics"
Byte out rate "kafka.server":name="AllTopicsBytesOutPerSec",type="BrokerTopicMetrics"
Log flush rate and time "kafka.log":name="LogFlushRateAndTimeMs",type="LogFlushStats"
# of under replicated partitions (|ISR| < |all replicas|) "kafka.server":name="UnderReplicatedPartitions",type="ReplicaManager" 0
Is controller active on broker "kafka.controller":name="ActiveControllerCount",type="KafkaController" only one broker in the cluster should have 1
Leader election rate "kafka.controller":name="LeaderElectionRateAndTimeMs",type="ControllerStats" non-zero when there are broker failures
Unclean leader election rate "kafka.controller":name="UncleanLeaderElectionsPerSec",type="ControllerStats" 0
Partition counts "kafka.server":name="PartitionCount",type="ReplicaManager" mostly even across brokers
Leader replica counts "kafka.server":name="LeaderCount",type="ReplicaManager" mostly even across brokers
ISR shrink rate "kafka.server":name="ISRShrinksPerSec",type="ReplicaManager" If a broker goes down, ISR for some of the partitions will shrink. When that broker is up again, ISR will be expanded once the replicas are fully caught up. Other than that, the expected value for both ISR shrink rate and expansion rate is 0.
ISR expansion rate "kafka.server":name="ISRExpandsPerSec",type="ReplicaManager" See above
Max lag in messages btw follower and leader replicas "kafka.server":name="([-.\w]+)-MaxLag",type="ReplicaFetcherManager" 副本消息滯後數量
Lag in messages per follower replica "kafka.server":name="([-.\w]+)-ConsumerLag",type="FetcherLagMetrics" 副本消息滯後數量
Requests waiting in the producer purgatory "kafka.server":name="PurgatorySize",type="ProducerRequestPurgatory"
Requests waiting in the fetch purgatory "kafka.server":name="PurgatorySize",type="FetchRequestPurgatory"
Request total time "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-TotalTimeMs",type="RequestMetrics"
Time the request waiting in the request queue "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-QueueTimeMs",type="RequestMetrics"
Time the request being processed at the leader "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-LocalTimeMs",type="RequestMetrics"
Time the request waits for the follower "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-RemoteTimeMs",type="RequestMetrics"
Time to send the response "kafka.network":name="{Produce|Fetch-Consumer|Fetch-Follower}-ResponseSendTimeMs",type="RequestMetrics"
Number of messages the consumer lags behind the producer by "kafka.consumer":name="([-.\w]+)-MaxLag",type="ConsumerFetcherManager"

19)apache kafka性能測試命令使用和構建kafka-perf

原本想用kafka官方提供的工具作性能測試的。

但事與願違。當我運行官方提供的kafka測試腳本,卻報錯沒有找到ProducerPerformance,後來瀏覽一些代碼文件。才發現沒有把perf性能測試程序打包到kafka_2.x.0-0.8.x.x.jar發行版本號中。

現在來教您怎樣打包作測試。

1.準備工做:

安裝gradle

2.下載kafka源碼

kafka-0.8.1源碼

3.編譯kafka-perf_2.x-0.8.1.x.jar

編譯註意事項:默認狀況下是編譯爲2.8.0版本號,也可以指定版本號編譯。眼下編譯高版本號的kafka-perf(2.8.0以上版本號)是由問題的。因爲build.gradle配置參數有問題(版本號不一樣,會報例如如下錯誤,版本號不兼容錯誤),假設要構建高版本號kafka-perf多版本號改動內容例如如下:

下載build.gradle   替換掉kafka-0.8.1.1-src根文件夾下文件就能夠

編譯構建運行命令:

gradle jar   			默認生成2.8.0版本號的kafka和kafka-perf的jar
gradle jar_core_2_8_0		生成2.8.0版本號的kafka的jar
gradle jar_core_2_8_2		生成2.8.2版本號的kafka的jar
gradle jar_core_2_9_1		生成2.9.1版本號的kafka的jar
gradle jar_core_2_9_2		生成2.9.2版本號的kafka的jar
gradle jar_core_2_10_1		生成2.10.1版本號的kafka的jar
gradle perf:jar			生成2.8.0版本號的kafka和kafka-perf的jar
gradle perf_2_9_1		生成2.9.1版本號的kafka和kafka-perf的jar
gradle perf_2_10_1		生成2.10.1版本號的kafka和kafka-perf的jar
gradle -PscalaVersion=2.8.0 jar			編譯scala 2.8.0版本號編譯所有jar
gradle -PscalaVersion=2.8.2 jar			編譯scala 2.8.2版本號編譯所有jar
gradle -PscalaVersion=2.9.1 jar			編譯scala 2.9.1版本號編譯所有jar
gradle -PscalaVersion=2.10.1 jar		編譯scala 2.10.1版本號編譯所有jar

假設不想編譯jar,可以直接下載:kafka-perf_2.x.x-0.8.1.jar 

lizhitao@users-MacBook-Pro:~/mt_wp/tmp$ cd kafka-0.8.1.1-src
lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$gradle jar
lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$gradle perf:jar
The TaskContainer.add() method has been deprecated and is scheduled to be removed in Gradle 2.0. Please use the create() method instead.
Building project 'core' with Scala version 2.8.0
Building project 'perf' with Scala version 2.8.0
:core:compileJava UP-TO-DATE
:core:compileScala
/Users/lizhitao/mt_wp/tmp/kafka-0.8.1.1-src/core/src/main/scala/kafka/admin/AdminUtils.scala:243: non variable type-argument String in type pattern scala.collection.Map[String,_] is unchecked since it is eliminated by erasure
        case Some(map: Map[String, _]) =>
                       ^
/Users/lizhitao/mt_wp/tmp/kafka-0.8.1.1-src/core/src/main/scala/kafka/admin/AdminUtils.scala:246: non variable type-argument String in type pattern scala.collection.Map[String,String] is unchecked since it is eliminated by erasure
            case Some(config: Map[String, String]) =>
                              ^
/Users/lizhitao/mt_wp/tmp/kafka-0.8.1.1-src/core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala:66: non variable type-argument String in type pattern (String, Int) is unchecked since it is eliminated by erasure
    for ((key:(String, Int), value) <- responseMap) {
              ^
/Users/lizhitao/mt_wp/tmp/kafka-0.8.1.1-src/core/src/main/scala/kafka/utils/Utils.scala:363: non variable type-argument V in type pattern List[V] is unchecked since it is eliminated by erasure
        case Some(l: List[V]) => m.put(k, v :: l)
                     ^
four warnings found
:core:processResources UP-TO-DATE
:core:classes
:core:copyDependantLibs UP-TO-DATE
:core:jar UP-TO-DATE
:perf:compileJava UP-TO-DATE
:perf:compileScala
:perf:processResources UP-TO-DATE
:perf:classes
:perf:jar UP-TO-DATE

BUILD SUCCESSFUL
Total time: 54.41 secs

編譯jar包文件夾例如如下:

a. kafka_2.x-0.8.1.1.jar

kafka-0.8.1.1-src/core/build


b.kafka-perf_2.x-0.8.1.x.jar

kafka-0.8.1.1-src/perf/build/libs


kafka多版本號jar:



4. kafka性能測試命令使用方法:

4.1 建立topic

bin/kafka-topics.sh --zookeeper 192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka02 --create --topic test-rep-one --partitions 6 --replication-factor 1


4.2 kafka-producer-perf-test.sh中參數說明:

messages		生產者發送總的消息數量
message-size		每條消息大小
batch-size		每次批量發送消息的數量
topics			生產者發送的topic
threads			生產者使用幾個線程同一時候發送
broker-list		安裝kafka服務的機器ip:port列表
producer-num-retries	一個消息失敗發送重試次數
request-timeout-ms	一個消息請求發送超時時間

4.3 bin/kafka-consumer-perf-test.sh中參數說明:

zookeeperzk		配置
messages		消費者消費消息總數量
topic			消費者需要消費的topic
threads			消費者使用幾個線程同一時候消費
group			消費者組名稱
socket-buffer-sizesocket	緩衝大小
fetch-size		每次向kafka broker請求消費大小
consumer.timeout.ms	消費者去kafka broker拿去一條消息超時時間

4.4 生產者發送數據:

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ bin/kafka-producer-perf-test.sh --messages 5000000 --message-size 5000  --batch-size 5000 --topics test-rep-one --threads 8 --broker-list mobile-esb03:9092,mobile-esb04:9092,mobile-esb05:9092
start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec
[2014-07-06 12:52:36,139] WARN Property reconnect.interval is not valid (kafka.utils.VerifiableProperties)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-07-06 12:52:36,199] WARN Property reconnect.interval is not valid (kafka.utils.VerifiableProperties)
[2014-07-06 12:52:36,202] WARN Property reconnect.interval is not valid (kafka.utils.VerifiableProperties)
[2014-07-06 12:52:36,204] WARN Property reconnect.interval is not valid (kafka.utils.VerifiableProperties)
[2014-07-06 12:52:36,206] WARN Property reconnect.interval is not valid (kafka.utils.VerifiableProperties)
[2014-07-06 12:52:36,207] WARN Property reconnect.interval is not valid (kafka.utils.VerifiableProperties)
[2014-07-06 12:52:36,209] WARN Property reconnect.interval is not valid (kafka.utils.VerifiableProperties)
[2014-07-06 12:52:36,214] WARN Property reconnect.interval is not valid (kafka.utils.VerifiableProperties)

4.5 消費者消費數據

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ bin/kafka-consumer-perf-test.sh --zookeeper
192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka02 --messages 50000000 --topic test-rep-one --threads 1
start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

20)apache kafka源代碼構建打包

準備工做:

安裝gradle

1.構建kafka的jar並執行

打包kafka-0.8.1.1下所有jar,包含core,perf,clients等。

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle jar

2.構建源碼jar

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle srcJar

3.執行序列化測試

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle -Dtest.single=RequestResponseSerializationTest core:test

4.gradle任務列表

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle tasks

5.構建所有jar,包含tasks中各個版本號jar

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle jarAll

6.指定構建jar包版本號

lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle  -PscalaVersion=2.10.1 jar
7.公佈文件到maven倉庫
lizhitao@users-MacBook-Pro:~/mt_wp/tmp/kafka-0.8.1.1-src$ gradle uploadArchivesAll
編輯文件~/.gradle/gradle.properties,添加例如如下內容:
mavenUrl=
mavenUsername=
mavenPassword=
signing.keyId=
signing.password=
signing.secretKeyRingFile=

21)Apache kafkaclient開發-java

1.依賴包

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1</version>
        </dependency>

2.producer程序開發樣例

2.1 producer參數說明

#指定kafka節點列表。用於獲取metadata,沒必要全部指定
metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092
# 指定分區處理類。默認kafka.producer.DefaultPartitioner,表經過key哈希到相應分區
#partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner
 
# 是否壓縮。默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。
compression.codec=none
  
# 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默以爲kafka.serializer.DefaultEncoder,即byte[]
serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder
# serializer.class=kafka.serializer.DefaultEncoder
# serializer.class=kafka.serializer.StringEncoder
# 假設要壓縮消息,這裏指定哪些topic要壓縮消息。默認empty,表示不壓縮。
#compressed.topics=
 
########### request ack ###############
# producer接收消息ack的時機.默以爲0. 
# 0: producer不會等待broker發送ack 
# 1: 當leader接收到消息以後發送ack 
# 2: 當全部的follower都同步消息成功後發送ack. 
request.required.acks=0 
# 在向producer發送ack以前,broker贊成等待的最大時間 
# 假設超時,broker將會向producer發送一個error ACK.意味着上一次消息因爲某種 
# 緣由未能成功(比方follower未能同步成功) 
request.timeout.ms=10000
########## end #####################
 
 
# 同步仍是異步發送消息,默認「sync」表同步,"async"表異步。異步可以提升發送吞吐量,
# 也意味着消息將會在本地buffer中,並適時批量發送。但是也可能致使丟失未發送過去的消息
producer.type=sync
############## 異步發送 (下面四個異步參數可選) ####################
# 在async模式下,當message被緩存的時間超過此值後,將會批量發送給broker,默以爲5000ms
# 此值和batch.num.messages協同工做.
queue.buffering.max.ms = 5000
# 在async模式下,producer端贊成buffer的最大消息量
# 無論怎樣,producer都沒法儘快的將消息發送給broker,從而致使消息在producer端大量沉積
# 此時,假設消息的條數達到閥值,將會致使producer端堵塞或者消息被拋棄。默以爲10000
queue.buffering.max.messages=20000
# 假設是異步。指定每次批量發送數據量,默以爲200
batch.num.messages=500
# 當消息在producer端沉積的條數達到"queue.buffering.max.meesages"後 
# 堵塞必定時間後,隊列仍然沒有enqueue(producer仍然沒有發送出不論什麼消息) 
# 此時producer可以繼續堵塞或者將消息拋棄,此timeout值用於控制"堵塞"的時間 
# -1: 無堵塞超時限制,消息不會被拋棄 
# 0:立刻清空隊列,消息被拋棄 
queue.enqueue.timeout.ms=-1
################ end ###############
 
# 當producer接收到error ACK,或者沒有接收到ACK時,贊成消息重發的次數 
# 因爲broker並無完整的機制來避免消息反覆,因此當網絡異常時(比方ACK丟失) 
# 有可能致使broker接收到反覆的消息,默認值爲3.
message.send.max.retries=3
 
 
# producer刷新topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的狀況 
# 所以producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立刻刷新 
# (比方topic失效,partition丟失,leader失效等),此外也可以經過此參數來配置額外的刷新機制,默認值600000 
topic.metadata.refresh.interval.ms=60000
import java.util.*;  
   
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
   
public class TestProducer {  
    public static void main(String[] args) {  
        long events = Long.parseLong(args[0]);  
        Random rnd = new Random();  
   
        Properties props = new Properties();  
        props.put("metadata.broker.list", "192.168.2.105:9092");  
        props.put("serializer.class", "kafka.serializer.StringEncoder"); //默認字符串編碼消息  
        props.put("partitioner.class", "example.producer.SimplePartitioner");  
        props.put("request.required.acks", "1");  
   
        ProducerConfig config = new ProducerConfig(props);  
   
        Producer<String, String> producer = new Producer<String, String>(config);  
   
        for (long nEvents = 0; nEvents < events; nEvents++) {   
               long runtime = new Date().getTime();    
               String ip = 「192.168.2.」 + rnd.nextInt(255);   
               String msg = runtime + 「,www.example.com,」 + ip;   
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);  
               producer.send(data);  
        }  
        producer.close();  
    }  
}

2.1 指定keywordkey。發送消息到指定partitions

說明:假設需要實現本身定義partitions消息發送,需要實現Partitioner接口
[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. public class CustomizePartitioner implements Partitioner {  
  2.     public CustomizePartitioner(VerifiableProperties props) {  
  3.    
  4.     }  
  5.     /** 
  6.      * 返回分區索引編號 
  7.      * @param key sendMessage時。輸出的partKey 
  8.      * @param numPartitions topic中的分區總數 
  9.      * @return 
  10.      */  
  11.     @Override  
  12.     public int partition(Object key, int numPartitions) {  
  13.         System.out.println("key:" + key + "  numPartitions:" + numPartitions);  
  14.         String partKey = (String)key;  
  15.         if ("part2".equals(partKey))  
  16.             return 2;  
  17. //        System.out.println("partKey:" + key);  
  18.    
  19.         ........  
  20.         ........  
  21.         return 0;  
  22.     }  
  23. }  

3.consumer程序開發樣例

3.1 consumer參數說明

# zookeeper鏈接服務器地址,此處爲線下測試環境配置(kafka消息服務-->kafka broker集羣線上部署環境wiki) # 配置樣例:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka # zookeeper的session過時時間,默認5000ms,用於檢測消費者是否掛掉,當消費者掛掉。其它消費者要等該指定時間才幹檢查到並且觸發又一次負載均衡 zookeeper.session.timeout.ms=5000 zookeeper.connection.timeout.ms=10000 # 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。

一旦在更新zookeeper發生異常並從新啓動。將可能拿到已拿到過的消息 zookeeper.sync.time.ms=2000   #指定消費組 group.id=xxx # 當consumer消費必定量的消息以後,將會本身主動向zookeeper提交offset信息  # 注意offset信息並不是每消費一次消息就向zk提交一次,而是現在本地保存(內存),並按期提交,默以爲true auto.commit.enable=true # 本身主動更新時間。默認60 * 1000 auto.commit.interval.ms=1000   # 當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤消息消費狀況,便於觀察 conusmer.id=xxx    # 消費者客戶端編號。用於區分不一樣客戶端,默認客戶端程序本身主動產生 client.id=xxxx # 最大取多少塊緩存到消費者(默認10) queued.max.message.chunks=50 # 當有新的consumer增長到group時,將會reblance,此後將會有partitions的消費端遷移到新  # 的consumer上,假設一個consumer得到了某個partition的消費權限,那麼它將會向zk註冊  # "Partition Owner registry"節點信息,但是有可能此時舊的consumer尚沒有釋放此節點,  # 此值用於控制,註冊節點的重試次數.  rebalance.max.retries=5 # 獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk # 每次feth將獲得多條消息,此值爲總大小,提高此值,將會消耗不少其它的consumer端內存 fetch.min.bytes=6553600 # 當消息的尺寸不足時,server堵塞的時間,假設超時,消息將立刻發送給consumer fetch.wait.max.ms=5000 socket.receive.buffer.bytes=655360   # 假設zookeeper沒有offset值或offset值超出範圍。

那麼就給個初始的offset。

有smallest、largest、 # anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest auto.offset.reset=smallest # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默以爲kafka.serializer.DefaultDecoder,即byte[] derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder

3.2 多線程並行消費topic

ConsumerTest類
[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. import kafka.consumer.ConsumerIterator;  
  2. import kafka.consumer.KafkaStream;  
  3.    
  4. public class ConsumerTest implements Runnable {  
  5.     private KafkaStream m_stream;  
  6.     private int m_threadNumber;  
  7.    
  8.     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {  
  9.         m_threadNumber = a_threadNumber;  
  10.         m_stream = a_stream;  
  11.     }  
  12.    
  13.     public void run() {  
  14.         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();  
  15.         while (it.hasNext())  
  16.             System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));  
  17.         System.out.println("Shutting down Thread: " + m_threadNumber);  
  18.     }  
  19. }  
ConsumerGroupExample類
[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. import kafka.consumer.ConsumerConfig;  
  2. import kafka.consumer.KafkaStream;  
  3. import kafka.javaapi.consumer.ConsumerConnector;  
  4.    
  5. import java.util.HashMap;  
  6. import java.util.List;  
  7. import java.util.Map;  
  8. import java.util.Properties;  
  9. import java.util.concurrent.ExecutorService;  
  10. import java.util.concurrent.Executors;  
  11.    
  12. public class ConsumerGroupExample {  
  13.     private final ConsumerConnector consumer;  
  14.     private final String topic;  
  15.     private  ExecutorService executor;  
  16.    
  17.     public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {  
  18.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(  
  19.                 createConsumerConfig(a_zookeeper, a_groupId));  
  20.         this.topic = a_topic;  
  21.     }  
  22.    
  23.     public void shutdown() {  
  24.         if (consumer != null) consumer.shutdown();  
  25.         if (executor != null) executor.shutdown();  
  26.     }  
  27.    
  28.     public void run(int a_numThreads) {  
  29.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  30.         topicCountMap.put(topic, new Integer(a_numThreads));  
  31.         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);  
  32.         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);  
  33.    
  34.         // 啓動所有線程  
  35.         executor = Executors.newFixedThreadPool(a_numThreads);  
  36.    
  37.         // 開始消費消息  
  38.         int threadNumber = 0;  
  39.         for (final KafkaStream stream : streams) {  
  40.             executor.submit(new ConsumerTest(stream, threadNumber));  
  41.             threadNumber++;  
  42.         }  
  43.     }  
  44.    
  45.     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {  
  46.         Properties props = new Properties();  
  47.         props.put("zookeeper.connect""192.168.2.225:2183/config/mobile/mq/mafka");  
  48.         props.put("group.id""push-token");  
  49.         props.put("zookeeper.session.timeout.ms""60000");  
  50.         props.put("zookeeper.sync.time.ms""2000");  
  51.         props.put("auto.commit.interval.ms""1000");  
  52.    
  53.         return new ConsumerConfig(props);  
  54.     }  
  55.    
  56.     public static void main(String[] args) {  
  57.         String zooKeeper = args[0];  
  58.         String groupId = args[1];  
  59.         String topic = args[2];  
  60.         int threads = Integer.parseInt(args[3]);  
  61.    
  62.         ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);  
  63.         example.run(threads);  
  64.    
  65.         try {  
  66.             Thread.sleep(10000);  
  67.         } catch (InterruptedException ie) {  
  68.    
  69.         }  
  70.         example.shutdown();  
  71.     }  
  72. }  
總結:
kafka消費者api分爲high api和low api,眼下上述demo是都是使用kafka high api。高級api不用關心維護消費狀態信息和負載均衡,系統會依據配置參數,
按期flush offset到zk上,假設有多個consumer且每個consumer建立了多個線程,高級api會依據zk上註冊consumer信息,進行本身主動負載均衡操做。

注意事項:
1.高級api將會內部實現持久化每個分區最後讀到的消息的offset,數據保存在zookeeper中的消費組名中(如/consumers/push-token-group/offsets/push-token/2。
當中push-token-group是消費組,push-token是topic,最後一個2表示第3個分區),每間隔一個(默認1000ms)時間更新一次offset,
那麼可能在從新啓動消費者時拿到反覆的消息。此外,當分區leader發生變動時也可能拿到反覆的消息。

所以在關閉消費者時最好等待必定時間(10s)而後再shutdown()
2.消費組名是一個全局的信息,要注意在新的消費者啓動以前舊的消費者要關閉。

假設新的進程啓動並且消費組名一樣。kafka會加入這個進程到可用消費線程組中用來消費
topic和觸發又一次分配負載均衡,那麼同一個分區的消息就有可能發送到不一樣的進程中。
3.假設消費者組中所有consumer的總線程數量大於分區數,一部分線程或某些consumer可能沒法讀取消息或處於空暇狀態。
4.假設分區數多於線程數(假設消費組中執行者多個消費者,則線程數爲消費者組內所有消費者線程總和),一部分線程會讀取到多個分區的消息
5.假設一個線程消費多個分區消息,那麼接收到的消息是不能保證順序的。
備註:可用zookeeper web ui工具管理查看zk文件夾樹數據: xxx/consumers/push-token-group/owners/push-token/2當中
push-token-group爲消費組,push-token爲topic,2爲分區3.查看裏面的內容如:
push-token-group-mobile-platform03-1405157976163-7ab14bd1-0表示該分區被該標示的線程所運行。


producer性能優化:異步化,消息批量發送,詳細瀏覽上述參數說明。consumer性能優化:假設是高吞吐量數據,設置每次拿取消息(fetch.min.bytes)大些,
拿取消息頻繁(fetch.wait.max.ms)些(或時間間隔短些),假設是低延時要求,則設置時間時間間隔小。每次從kafka broker拿取消息儘可能小些。

22) kafka broker內部架構

如下介紹kafka broker的主要子模塊,幫助您更好地學習並理解kafka源碼和架構。

例如如下介紹幾個子模塊:

  • Kafka API layer
  • LogManager and Log
  • ReplicaManager
  • ZookeeperConsumerConnector
  • service Schedule
例如如下是系統幾個模塊怎樣組成到一塊兒架構圖:

23)apache kafka源代碼分析走讀-kafka整體結構分析

kafka源碼project文件夾結構例如如下圖:


如下僅僅對core文件夾結構做說明,其它都是測試類或javaclient代碼

admin   --管理員模塊,操做和管理topic,paritions相關,包括create,delete topic,擴展patitions

Api       --該模塊主要負責組裝數據,組裝2種類型數據。

1.讀取或解碼client發送的二進制數據.

2.編碼log消息數據。組裝爲需要發送的數據。

client    --該模塊比較簡單,就一個類,Producer讀取kafka broker元數據信息,

topic和partitions,以及leader

cluster   --該模塊包括幾個實體類,Broker,Cluster,Partition,Replica,解釋他們之間關係:Cluster由多個broker組成。一個Broker包括多個partition。一個 topic的所有partitions分佈在不一樣broker的中。一個Replica包括多個Partition。

common     --通用模塊,僅僅包括異常類和錯誤驗證

consumer    --consumer處理模塊。負責所有client消費者數據和邏輯處理

contoroller  --負責中央控制器選舉,partition的leader選舉,副本分配,副本又一次分配。

partition和replica擴容。

javaapi   --提供java的producer和consumer接口api

log          --kafka文件系統,負責處理和存儲所有kafka的topic數據。

message    --封裝kafka的ByteBufferMessageSet

metrics    --內部狀態的監控模塊

network        --網絡事件處理模塊,負責處理和接收client鏈接

producer        --producer實現模塊,包括同步和異步發送消息。

serializer        --序列化或反序列化當前消息

kafka         --kafka門面入口類,副本管理,topic配置管理,leader選舉實現(由contoroller模塊調用)。

tools           --一看這就是工具模塊,包括內容比較多:

a.導出相應consumer的offset值.

b.導出LogSegments信息。當前topic的log寫的位置信息.

c.導出zk上所有consumer的offset值.

d.改動註冊在zk的consumer的offset值.

f.producer和consumer的使用樣例.

utils   --Json工具類。Zkutils工具類,Utils建立線程工具類。KafkaScheduler公共調度器類,公共日誌類等等。

1.kafka啓動類:kafka.scala

kafka爲kafka broker的main啓動類。其主要做用爲載入配置,啓動report服務(內部狀態的監控),註冊釋放資源的鉤子。以及門面入口類。

kafka類代碼例如如下:

......
 try {
      val props = Utils.loadProps(args(0))          //載入配置文件
      val serverConfig = new KafkaConfig(props)
      KafkaMetricsReporter.startReporters(serverConfig.props)             //啓動report服務(內部狀態的監控)
      val kafkaServerStartble = new KafkaServerStartable(serverConfig)    //kafka server核心入口類
      // attach shutdown handler to catch control-c
      Runtime.getRuntime().addShutdownHook(new Thread() {                 //鉤子程序。當jvm退出前,銷燬所有資源
        override def run() = {
          kafkaServerStartble.shutdown
        }
      })

      kafkaServerStartble.startup
      kafkaServerStartble.awaitShutdown
    }
......

KafkaServerStartble類包裝了KafkaSever類,事實上啥都沒有作。僅僅是調用包裝類而已

KafkaSever類是kafka broker執行控制的核心入口類,它是採用門面模式設計的。



kafka中KafkaServer類,採用門面模式,是網絡處理。io處理等得入口.

ReplicaManager    副本管理

KafkaApis    api處理

KafkaRequestHandlerPoolkafka  請求處理池  <-- num.io.threads io線程數量

LogManager    kafka文件系統,負責處理和存儲所有kafka的topic數據

TopicConfigManager  topic管理

KafkaHealthcheck  健康檢查

KafkaController  kafka集羣中央控制器選舉,leader選舉。副本分配。

KafkaScheduler  負責副本管理和日誌管理調度等等

ZkClient         負責註冊zk相關信息.

BrokerTopicStats  topic信息統計和監控

ControllerStats          中央控制器統計和監控


KafkaServer部分主要代碼例如如下:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. ......    
  2. def startup() {  
  3.     info("starting")  
  4.     isShuttingDown = new AtomicBoolean(false)  
  5.     shutdownLatch = new CountDownLatch(1)  
  6.   
  7.     /* start scheduler */  
  8.     kafkaScheduler.startup()  
  9.       
  10.     /* setup zookeeper */  
  11.     zkClient = initZk()  
  12.   
  13.     /* start log manager */  
  14.     logManager = createLogManager(zkClient)  
  15.     logManager.startup()  
  16.   
  17.     socketServer = new SocketServer(config.brokerId,  
  18.                                     config.hostName,  
  19.                                     config.port,  
  20.                                     config.numNetworkThreads,  
  21.                                     config.queuedMaxRequests,  
  22.                                     config.socketSendBufferBytes,  
  23.                                     config.socketReceiveBufferBytes,  
  24.                                     config.socketRequestMaxBytes)  
  25.     socketServer.startup()  
  26.   
  27.     replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)  
  28.     kafkaController = new KafkaController(config, zkClient)  
  29.       
  30.     /* start processing requests */  
  31.     apis = new KafkaApis(socketServer.requestChannel, replicaManager, zkClient, config.brokerId, config, kafkaController)  
  32.     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)  
  33.      
  34.     Mx4jLoader.maybeLoad()  
  35.   
  36.     replicaManager.startup()  
  37.   
  38.     kafkaController.startup()  
  39.       
  40.     topicConfigManager = new TopicConfigManager(zkClient, logManager)  
  41.     topicConfigManager.startup()  
  42.       
  43.     /* tell everyone we are alive */  
  44.     kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)  
  45.     kafkaHealthcheck.startup()  
  46.   
  47.       
  48.     registerStats()  
  49.     startupComplete.set(true);  
  50.     info("started")  
  51.   }  
  52.     
  53.   private def initZk(): ZkClient = {  
  54.     info("Connecting to zookeeper on " + config.zkConnect)  
  55.     val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)  
  56.     ZkUtils.setupCommonPaths(zkClient)  
  57.     zkClient  
  58.   }  
  59.   
  60.   /** 
  61.    *  Forces some dynamic jmx beans to be registered on server startup. 
  62.    */  
  63.   private def registerStats() {  
  64.     BrokerTopicStats.getBrokerAllTopicsStats()  
  65.     ControllerStats.uncleanLeaderElectionRate  
  66.     ControllerStats.leaderElectionTimer  
  67.   }  
  68. .......  

24)apache kafka源代碼分析走讀-Producer分析

producer的發送方式剖析

Kafka提供了Producer類做爲java producer的api。該類有sync和async兩種發送方式。

sync架構圖


async架構圖


調用流程例如如下:



代碼流程例如如下:

Producer:當new Producer(new ProducerConfig()),其底層實現。實際會產生兩個核心類的實例:Producer、DefaultEventHandler。

在建立的同一時候,會默認new一個ProducerPool,即咱們每new一個java的Producer類,就會有建立Producer、EventHandler和ProducerPool。ProducerPool爲鏈接不一樣kafka broker的池,初始鏈接個數有broker.list參數決定。

調用producer.send方法流程:

當應用程序調用producer.send方法時,其內部事實上調的是eventhandler.handle(message)方法,eventHandler會首先序列化該消息,

eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()

調用邏輯解釋:當client應用程序調用producer發送消息messages時(既可以發送單條消息,也可以發送List多條消息),調用eventhandler.serialize首先序列化所有消息,序列化操做用戶可以本身定義實現Encoder接口。下一步調用partitionAndCollate依據topics的messages進行分組操做,messages分配給dataPerBroker(多個不一樣的Broker的Map)。依據不一樣Broker調用不一樣的SyncProducer.send批量發送消息數據,SyncProducer包裝了nio網絡操做信息。


Producer的sync與async發送消息處理,你們看以上架構圖一目瞭然。


partitionAndCollate方法具體做用:獲取所有partitions的leader所在leaderBrokerId(就是在該partiionid的leader分佈在哪一個broker上),

建立一個HashMap<int, Map<TopicAndPartition, List<KeyedMessage<K,Message>>>>,把messages依照brokerId分組組裝數據,而後爲SyncProducer分別發送消息做準備工做。


名稱解釋:partKey:分區keyword,當client應用程序實現Partitioner接口時,傳入參數key爲分區keyword。依據key和numPartitions。返回分區(partitions)索引。記住partitions分區索引是從0開始的。

Producer平滑擴容機制

假設開發過producerclient代碼。會知道metadata.broker.list參數,它的含義是kafak broker的ip和port列表,producer初始化時,就鏈接這幾個broker,這時你們會有疑問。producer支持kafka cluster新增broker節點?它又沒有監聽zk broker節點或從zk中獲取broker信息,答案是確定的,producer可以支持平滑擴容broker,他是經過定時與現有的metadata.broker.list通訊,獲取新增broker信息,而後把新建的SyncProducer放入ProducerPool中。等待興許應用程序調用。

DefaultEventHandler類中初始化實例化BrokerPartitionInfo類,而後按期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分代碼例如如下:
[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. def handle(events: Seq[KeyedMessage[K,V]]) {  
  2.   ......  
  3.   while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {  
  4.     topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)  
  5.     if (topicMetadataRefreshInterval >= 0 &&  
  6.         SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {  
  7.       Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))  
  8.       sendPartitionPerTopicCache.clear()  
  9.       topicMetadataToRefresh.clear  
  10.       lastTopicMetadataRefreshTime = SystemTime.milliseconds  
  11.     }  
  12.     outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)  
  13.     if (outstandingProduceRequests.size > 0) {  
  14.       info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))  
  15.       //休眠時間,多長時間刷新一次  
  16.       Thread.sleep(config.retryBackoffMs)  
  17.       // 生產者按期請求刷新最新topics的broker元數據信息  
  18.       Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))  
  19.       .....  
  20.     }  
  21.   }  
  22.   
  23. }  
BrokerPartitionInfo的updateInfo方法代碼例如如下:
[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. def updateInfo(topics: Set[String], correlationId: Int) {  
  2.   var topicsMetadata: Seq[TopicMetadata] = Nil  
  3.   //依據topics列表,meta.broker.list,其它配置參數,correlationId表示請求次數,一個計數器參數而已  
  4.   //建立一個topicMetadataRequest,並隨機的選取傳入的broker信息中不論什麼一個去取metadata,直到取到爲止  
  5.   val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)  
  6.   topicsMetadata = topicMetadataResponse.topicsMetadata  
  7.   // throw partition specific exception  
  8.   topicsMetadata.foreach(tmd =>{  
  9.     trace("Metadata for topic %s is %s".format(tmd.topic, tmd))  
  10.     if(tmd.errorCode == ErrorMapping.NoError) {  
  11.       topicPartitionInfo.put(tmd.topic, tmd)  
  12.     } else  
  13.       warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))  
  14.     tmd.partitionsMetadata.foreach(pmd =>{  
  15.       if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {  
  16.         warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,  
  17.           ErrorMapping.exceptionFor(pmd.errorCode).getClass))  
  18.       } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata  
  19.     })  
  20.   })  
  21.   producerPool.updateProducer(topicsMetadata)  
  22. }  
ClientUtils.fetchTopicMetadata方法代碼:
[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {  
  2.   var fetchMetaDataSucceeded: Boolean = false  
  3.   var i: Int = 0  
  4.   val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)  
  5.   var topicMetadataResponse: TopicMetadataResponse = null  
  6.   var t: Throwable = null  
  7.   val shuffledBrokers = Random.shuffle(brokers) //生成隨機數  
  8.   while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {  
  9.     //對隨機選到的broker會建立一個SyncProducer  
  10.     val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))  
  11.     info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i), correlationId, topics.size, topics))  
  12.     try {  //發送topicMetadataRequest到該broker去取metadata,得到該topic所相應的所有的broker信息  
  13.       topicMetadataResponse = producer.send(topicMetadataRequest)  
  14.       fetchMetaDataSucceeded = true  
  15.     }  
  16.     catch {  
  17.       ......  
  18.     }  
  19.   }  
  20.   if(!fetchMetaDataSucceeded) {  
  21.     throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s] failed".format(topics, shuffledBrokers), t)  
  22.   } else {  
  23.     debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))  
  24.   }  
  25.   return topicMetadataResponse  
  26. }  
ProducerPool的updateProducer
[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. def updateProducer(topicMetadata: Seq[TopicMetadata]) {  
  2.     val newBrokers = new collection.mutable.HashSet[Broker]  
  3.     topicMetadata.foreach(tmd => {  
  4.       tmd.partitionsMetadata.foreach(pmd => {  
  5.         if(pmd.leader.isDefined)  
  6.           newBrokers+=(pmd.leader.get)  
  7.       })  
  8.     })  
  9.     lock synchronized {  
  10.       newBrokers.foreach(b => {  
  11.         if(syncProducers.contains(b.id)){  
  12.           syncProducers(b.id).close()  
  13.           syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))  
  14.         } else  
  15.           syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))  
  16.       })  
  17.     }  
  18.   }  

當咱們啓動kafka broker後。並且大量producer和consumer時。經常會報例如如下異常信息。

root@lizhitao:/opt/soft$ Closing socket connection to 192.168.11.166

筆者也是經常很是長時間看源代碼分析,才明確了爲何ProducerConfig配置信息裏面並不要求使用者提供完整的kafka集羣的broker信息。而是任選一個或幾個就能夠。

因爲他會經過您選擇的broker和topics信息而獲取最新的所有的broker信息。

值得了解的是用於發送TopicMetadataRequest的SyncProducer儘管是用ProducerPool.createSyncProducer方法建出來的,但用完並不還回ProducerPool。而是直接Close.

重難點理解:
刷新metadata並不只在第一次初始化時作。

爲了能適應kafka broker執行中因爲各類緣由掛掉、paritition改變等變化,

eventHandler會按期的再去刷新一次該metadata,刷新的間隔用參數topic.metadata.refresh.interval.ms定義,默認值是10分鐘。
這裏有三點需要強調:
  • client調用send, 纔會新建SyncProducer,僅僅有調用send纔會去按期刷新metadata
  • 在每次取metadata時,kafka會新建一個SyncProducer去取metadata,邏輯處理完後再close。

  • 依據當前SyncProducer(一個Broker的鏈接)取得的最新的完整的metadata,刷新ProducerPool中到broker的鏈接.
  • 每10分鐘的刷新會直接又一次把到每個broker的socket鏈接重建。意味着在這以後的第一個請求會有幾百毫秒的延遲。假設不想要該延遲。把topic.metadata.refresh.interval.ms值改成-1,這樣僅僅有在發送失敗時。纔會又一次刷新。

    Kafka的集羣中假設某個partition所在的broker掛了,可以檢查錯誤後從新啓動又一次增長集羣,手動作rebalance。producer的鏈接會再次斷掉,直到rebalance完畢,那麼刷新後取到的鏈接着中就會有這個新增長的broker。

說明:每個SyncProducer實例化對象會創建一個socket鏈接
特別注意:
在ClientUtils.fetchTopicMetadata調用完畢後,回到BrokerPartitionInfo.updateInfo繼續運行,在其末尾,pool會依據上面取得的最新的metadata創建所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)
在ProducerPool中,SyncProducer的數目是由該topic的partition數目控制的。即每一個SyncProducer相應一個broker,內部封了一個到該broker的socket鏈接。

每次刷新時,會把已存在SyncProducer給close掉,即關閉socket鏈接,而後新建SyncProducer,即新建socket鏈接,去覆蓋老的。
假設不存在,則直接建立新的。


25)apache kafka性能優化架構分析

Apache kafka性能優化架構分析


應用程序優化:數據壓縮






consumer offset默認狀況下是定時批量更新topics的partitions offset值


26)apache kafka源代碼分析走讀-server端網絡架構分析

筆者今天分析一下kafka網絡架構,俗話說人無好的脛骨,就沒有好的身體,建築沒有紮實可靠的結構框架,就不會屹立不倒。

相同的服務端程序沒有好的網絡架構。其性能就會受到極大影響,其它方面再怎麼優化。也會受限於此。那kafka網絡架構是如何的呢,它不是用的現今流行的netty,mina的高性能網絡架構,而是本身基於java nio開發的。

kafka網絡架構圖例如如下:


27)apache kafka源代碼分析走讀-ZookeeperConsumerConnector分析

1.ZookeeperConsumer架構

ZookeeperConsumer類中consumer執行過程架構圖:

                                                                                                圖1

過程分析:

ConsumerGroupExample類

2.消費者線程(consumer thread),隊列。拉取線程(fetch thread)三者之間關係

每一個topic至少需要建立一個consumer thread。假設有多個partitions,則可以建立多個consumer thread線程,consumer thread>==partitions數量,不然會有consumer thread空暇。

部分代碼示比例如如下:

ConsumerConnector consumer

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(

            createConsumerConfig());

 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

 topicCountMap.put("test-string-topic", new Integer(1));  //value表示consumer thread線程數量

 Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

詳細說明一下三者關係:

(1).topic的partitions分佈規則

paritions是安裝kafka brokerId有序分配的。

好比現在有三個node安裝了kafka broker服務端程序,brokerId分別設置爲1,2,3。現在準備一個topic爲test-string-topic,並且分配12個partitons,此時partitions的kafka broker節點分佈狀況爲 ,partitions索引編號爲0,3,6,9等4個partitions在brokerId=1上,1,4,7,10在brokerId=2上。2,5,8,11在brokerId=3上。

建立consumer thread

consumer thread數量與BlockingQueue一一相應。

a.當consumer thread count=1時

此時有一個blockingQueue1,三個fetch thread線程,該topic分佈在幾個node上就有幾個fetch thread,每個fetch thread會於kafka broker創建一個鏈接。3個fetch thread線程去拉取消息數據,終於放到blockingQueue1中。等待consumer thread來消費。

消費者線程。緩衝隊列。partitions分佈列表例如如下

consumer線程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2,3,4,5,6,7,8,9,10,11

fetch thread與partitions分佈列表例如如下

fetch線程

partitions

fetch thread1

0,3,6,9

fetch thread2

1,4,7,10

fetch thread3

2,5,8,11


b. 當consumer thread count=2時

此時有consumerThread1和consumerThread2分別相應2個隊列blockingQueue1,blockingQueue2,這2個消費者線程消費partitions依次爲:0,1,2,3,4,5與6,7,8,9,10,11;消費者線程。緩衝隊列,partitions分佈列表例如如下

consumer線程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2,3,4,5

consumer thread2

blockingQueue2

6,7,8,9,10,11


fetch thread與partitions分佈列表例如如下

fetch線程

partitions

fetch thread1

0,3,6,9

fetch thread2

1,4,7,10

fetch thread3

2,5,8,11


c. 當consumer thread count=4時

消費者線程,緩衝隊列,partitions分佈列表例如如下

consumer線程

Blocking Queue

partitions

consumer thread1

blockingQueue1

0,1,2

consumer thread2

blockingQueue2

3,4,5

consumer thread3

blockingQueue3

6,7,8

consumer thread4

blockingQueue4

9,10,11

fetch thread與partitions分佈列表例如如下

同上

同理當消費線程consumer thread count=n,都是安裝上述分佈規則來處理的。


3.consumer消息線程以及隊列建立邏輯

運用ZookeeperConsumerConnector類建立多線程並行消費測試類,ConsumerGroupExample類初始化,調用createMessageStreams方法,實際是在consume方法處理的邏輯。建立KafkaStream,以及堵塞隊列(LinkedBlockingQueue),KafkaStream與隊列個數一一相應,消費者線程數量決定堵塞隊列的個數。

registerConsumerInZK()方法:設置消費者組,註冊消費者信息consumerIdString到zookeeper上。

consumerIdString產生規則部分代碼例如如下:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. String consumerUuid = null;  
  2. if(config.consumerId!=null && config.consumerId)  
  3.  consumerUuid = consumerId;  
  4. else {   
  5.  String uuid = UUID.randomUUID()  
  6.    
  7.   consumerUuid = "%s-%d-%s".format(  
  8.     InetAddress.getLocalHost.getHostName, System.currentTimeMillis,  
  9.     uuid.getMostSignificantBits().toHexString.substring(0,8));  
  10. }      
  11. String consumerIdString =  config.groupId + "_" + consumerUuid;  

kafka zookeeper註冊模型結構或存儲結構例如如下:

kafka在zookeeper中存儲結構           

說明:眼下把kafka中絕大部分存儲模型都列表出來了。當前還有少許不常使用的,臨時尚未列舉,興許會加上。


consumer初始化邏輯處理:

1.實例化並註冊loadBalancerListener監聽,ZKRebalancerListener監聽consumerIdString狀態變化

 

觸發consumer reblance條件例如如下幾個:

ZKRebalancerListener:當/kafka01/consumer/[consumer-group]/ids子節點變化時,會觸發

ZKTopicPartitionChangeListener:當該topic的partitions發生變化時,會觸發。


      val topicPath = "/kafka01/brokers/topics" + "/" + "topic-1"
      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)


consumer reblance邏輯

consumer offset更新機制

reblance計算規則:(有待補充)


28)kafka的ZkUtils類的java版本號部分代碼

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. /** 
  2.  * Created with IntelliJ IDEA. 
  3.  * User: lizhitao 
  4.  * Date: 14-6-6 
  5.  * Time: 下午3:01 
  6.  * To change this template use File | Settings | File Templates. 
  7.  */  
  8. public class TestMafkaZkUtils {  
  9.     private static final Logger logger = Logger.getLogger(TestMafkaZkUtils.class);  
  10.   
  11.     /**********   kafka zk root conf   *********/  
  12.     public static final String ConsumersPath = "/consumers";  
  13.     public static final String BrokerIdsPath = "/brokers/ids";  
  14.     public static final String BrokerTopicsPath = "/brokers/topics";  
  15.     public static final String TopicConfigPath = "/config/topics";  
  16.     public final String TopicConfigChangesPath = "/config/changes";  
  17.     public static final String ControllerPath = "/controller";  
  18.     public static final String ControllerEpochPath = "/controller_epoch";  
  19.     public static final String ReassignPartitionsPath = "/admin/reassign_partitions";  
  20.     public static final String DeleteTopicsPath = "/admin/delete_topics";  
  21.     public static final String PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election";  
  22.   
  23.     public static String getTopicPath(String topic) {  
  24.         return  BrokerTopicsPath + "/" + topic;  
  25.     }  
  26.   
  27.     public static String getTopicPartitionsPath(String topic) {  
  28.         return   getTopicPath(topic) +  "/partitions";  
  29.     }  
  30.   
  31.     public static String getTopicConfigPath(String topic) {  
  32.         return  TopicConfigPath + "/" + topic;  
  33.     }  
  34.   
  35.     public static String getDeleteTopicPath(String clusterName, String topic ) {  
  36.         return   DeleteTopicsPath + "/" + topic;  
  37.     }  
  38.   
  39.     public static String getBrokerIdsPath() {  
  40.         return  BrokerIdsPath;  
  41.     }  
  42.   
  43.   
  44.     public static List<MafkaBroker> getAllBrokersInCluster(ZkClient zkClient, String clusterName) {  
  45.         if (!pathExists(zkClient, getBrokerIdsPath())) {  
  46.             throw new ZkNoNodeException(getBrokerIdsPath());  
  47.         }  
  48.   
  49.         List<String> brokerIds = getChildrenParentMayNotExist(zkClient, getBrokerIdsPath());  
  50.         Collections.sort(brokerIds);  
  51. //                List<String>     MafkaBroker getBrokerInfo(ZkClient zkClient, int brokerId)  
  52.         List<MafkaBroker> retList = new ArrayList<MafkaBroker>();  
  53.         for (String brokerIdStr : brokerIds) {  
  54.             MafkaBroker broker = getBrokerInfo(zkClient, Integer.valueOf(brokerIdStr));  
  55.             if (broker!=null)  
  56.                 retList.add(broker);  
  57.   
  58.         }  
  59.   
  60.         return retList;  
  61.     }  
  62.   
  63.   
  64.     public static String getMetadataBrokerList(ZkClient zkClient, String clusterName) {  
  65.         List<MafkaBroker> brokers = TestMafkaZkUtils.getAllBrokersInCluster(zkClient, clusterName);  
  66.         StringBuffer sb = new StringBuffer();  
  67.         for (MafkaBroker broker : brokers) {  
  68.             logger.info(broker);  
  69.             if (sb.length() > 0)  
  70.                 sb.append(",");  
  71.             sb.append(broker.getHost()).append(":").append(broker.getPort());  
  72.         }  
  73.   
  74.         return sb.toString();  
  75.     }  
  76.   
  77.     /** 
  78.      * get children nodes name 
  79.      * @param zkClient zkClient 
  80.      * @param path full path 
  81.      * @return children nodes name or null while path not exist 
  82.      */  
  83.     public static List<String> getChildrenParentMayNotExist(ZkClient zkClient, String path) {  
  84.         try {  
  85.             return zkClient.getChildren(path);  
  86.         } catch (ZkNoNodeException e) {  
  87.             return null;  
  88.         } catch (Exception ex) {  
  89.             logger.error("getChildrenParentMayNotExist invoke fail!",ex);  
  90.             return null;  
  91.         }  
  92.     }  
  93.   
  94.     /** 
  95.      * This API takes in a broker id, queries zookeeper for the broker metadata and returns the metadata for that broker 
  96.      * or throws an exception if the broker dies before the query to zookeeper finishes 
  97.      * @param brokerId The broker id 
  98.      * @param zkClient The zookeeper client connection 
  99.      * @return An optional MafkaBroker object encapsulating the broker metadata 
  100.      */  
  101.     public static MafkaBroker getBrokerInfo(ZkClient zkClient, int brokerId) {  
  102. //        Pair<String, Stat>  
  103.         String brokerInfoStr = readDataMaybeNull(zkClient, getBrokerIdsPath() + "/" + brokerId).getLeft();  
  104.         if (StringUtils.isNotEmpty(brokerInfoStr)) {  
  105.             return MafkaBroker.createBroker(brokerId, brokerInfoStr);  
  106.         } else{  
  107.             return null;  
  108.         }  
  109.     }  
  110.   
  111.     public static Pair<String, Stat> readData(ZkClient client, String path) {  
  112.         Stat stat = new Stat();  
  113.         String dataStr = client.readData(path, stat);  
  114.         return Pair.of(dataStr, stat);  
  115.     }  
  116.   
  117.     public static Pair<String, Stat> readDataMaybeNull(ZkClient client, String path) {  
  118.         Stat stat = new Stat();  
  119.         Pair<String, Stat> dataAndStat = null;  
  120.         try {  
  121.             dataAndStat = Pair.of((String)client.readData(path, stat), stat);  
  122.         } catch(ZkNoNodeException nkex) {  
  123.             return Pair.of(null, stat);  
  124.         } catch(Exception ex) {  
  125.             logger.error(ex);  
  126.         }  
  127.         return dataAndStat;  
  128.     }  
  129.   
  130.     /** 
  131.      * Update the value of a persistent node with the given path and data. 
  132.      * create parrent directory if necessary. Never throw NodeExistException. 
  133.      */  
  134.     public void updateEphemeralPath(ZkClient client, String path, String data) {  
  135.         try {  
  136.             client.writeData(path, data);  
  137.         } catch(ZkNoNodeException zkex) {  
  138.             createParentPath(client, path);  
  139.             client.createEphemeral(path, data);  
  140.         } catch (Exception ex) {  
  141.             logger.error(ex);  
  142.         }  
  143.     }  
  144.   
  145.     public static boolean deletePath(ZkClient client, String path) {  
  146.         try {  
  147.             return client.delete(path);  
  148.         } catch(ZkNoNodeException zkex) {  
  149.             // this can happen during a connection loss event, return normally  
  150.             logger.info(path + " deleted during connection loss; this is ok");  
  151.             return false;  
  152.         } catch (Exception ex) {  
  153.             logger.error(ex);  
  154.         }  
  155.         return false;  
  156.     }  
  157.   
  158.     public void deletePathRecursive(ZkClient client, String path) {  
  159.         try {  
  160.             client.deleteRecursive(path);  
  161.         } catch(ZkNoNodeException zkex) {  
  162.             // this can happen during a connection loss event, return normally  
  163.             logger.info(path + " deleted during connection loss; this is ok");  
  164.         } catch (Exception ex) {  
  165.             logger.error(ex);  
  166.         }  
  167.     }  
  168.   
  169.     public void maybeDeletePath(String zkUrl, String dir) {  
  170.         try {  
  171.             ZkClient zk = new ZkClient(zkUrl, 30*100030*1000new MafkaZKStrSerializer());  
  172.             zk.deleteRecursive(dir);  
  173.             zk.close();  
  174.         } catch(Exception ex) {  
  175.             logger.error(ex);  
  176.         }  
  177.     }  
  178.   
  179.     /** 
  180.      *  make sure a persistent path exists in ZK. Create the path if not exist. 
  181.      */  
  182.     public static void makeSurePersistentPathExists(ZkClient client, String path) {  
  183.         if (!client.exists(path))  
  184.             client.createPersistent(path, true); // won't throw NoNodeException or NodeExistsException  
  185.     }  
  186.   
  187.     /** 
  188.      *  create the parent path 
  189.      */  
  190.     private static void createParentPath(ZkClient client, String path) {  
  191.         String parentDir = path.substring(0, path.lastIndexOf('/'));  
  192.         if (parentDir.length() != 0)  
  193.             client.createPersistent(parentDir, true);  
  194.     }  
  195.   
  196.     /** 
  197.      * Create an ephemeral node with the given path and data. Create parents if necessary. 
  198.      */  
  199.     private static void createEphemeralPath(ZkClient client, String path, String data) {  
  200.         try {  
  201.             client.createEphemeral(path, data);  
  202.         } catch(ZkNoNodeException znex) {  
  203.             createParentPath(client, path);  
  204.             client.createEphemeral(path, data);  
  205.         }  
  206.     }  
  207.   
  208.     /** 
  209.      * Create an ephemeral node with the given path and data. 
  210.      * Throw NodeExistException if node already exists. 
  211.      */  
  212.     public static void createEphemeralPathExpectConflict(ZkClient client, String path, String data) {  
  213.         try {  
  214.             createEphemeralPath(client, path, data);  
  215.         } catch(ZkNodeExistsException zkex) {  
  216.             // this can happen when there is connection loss; make sure the data is what we intend to write  
  217.             String storedData = null;  
  218.             try {  
  219.                 storedData = readData(client, path).getLeft();  
  220.             } catch(ZkNoNodeException znex) {  
  221.                 logger.error(znex);  
  222.             }  
  223.             if (storedData == null || storedData != data) {  
  224.                 logger.info("conflict in " + path + " data: " + data + " stored data: " + storedData);  
  225.                 throw zkex;  
  226.             } else {  
  227.                 // otherwise, the creation succeeded, return normally  
  228.                 logger.info(path + " exists with value " + data + " during connection loss; this is ok");  
  229.             }  
  230.         }  
  231.     }  
  232.   
  233.   
  234.     /** 
  235.      * Create an persistent node with the given path and data. Create parents if necessary. 
  236.      */  
  237.     public static void createPersistentPath(ZkClient client, String path, String data) {  
  238.         try {  
  239.             client.createPersistent(path, data);  
  240.         } catch(ZkNoNodeException znex) {  
  241.             createParentPath(client, path);  
  242.             client.createPersistent(path, data);  
  243.         }  
  244.     }  
  245.   
  246.     public String createSequentialPersistentPath(ZkClient client, String path, String data) {  
  247.         return client.createPersistentSequential(path, data);  
  248.     }  
  249.   
  250.   
  251.     public static List<String> getAllPartitionsByTopic(ZkClient zkClient, String topic) {  
  252.         return getChildren(zkClient, getTopicPartitionsPath(topic));  
  253.     }  
  254.   
  255.     /** 
  256.      * Check if the given path exists 
  257.      */  
  258.     public static boolean pathExists(ZkClient zkClient, String path) {  
  259.         logger.info("pathExists:" + path+ " zkClient:" + zkClient);  
  260.         return zkClient.exists(path);  
  261.     }  
  262.   
  263.     /** 
  264.      * 功能介紹:解析partitions列表數據,partitions以字符串方式存儲,用逗號分隔。 
  265.      * @param zkClient 
  266.      * @return 
  267.      */  
  268.     public static String getAllPartitionsSepCommaByTopic(ZkClient zkClient,String topic) {  
  269.         logger.info("getTopicPartitionsPath(clusterName, topic):" + getTopicPartitionsPath(topic));  
  270.         if (!pathExists(zkClient, getTopicPartitionsPath(topic))) {  
  271.             throw new ZkNoNodeException(getTopicPartitionsPath(topic));  
  272.         }  
  273.   
  274.         List<String> partitions = getChildren(zkClient, getTopicPartitionsPath(topic));  
  275.         Collections.sort(partitions,new Comparator<String>() {  
  276.             @Override  
  277.             public int compare(String o1, String o2) {  
  278.                 final int p1 = ( o1 == null ) ? 0 : Integer.parseInt(o1);  
  279.                 final int p2 = ( o2 == null ) ? 0 : Integer.parseInt(o2);  
  280.                 return NumberUtils.compare(p1, p2);  
  281.             }  
  282.         });  
  283.   
  284.         StringBuffer parts = new StringBuffer();  
  285.         for ( String partition : partitions ) {  
  286.             if (parts.length() > 0)  
  287.                 parts.append(",");  
  288.             parts.append(partition);  
  289.         }  
  290.         return parts.toString();  
  291.     }  
  292.   
  293.     public static List<String> getChildren(ZkClient client, String path) {  
  294.         return client.getChildren(path);  
  295.     }  
  296.   
  297.   
  298.     public static List<MafkaBroker> getAllBrokersInCluster(ZkClient zkClient) {  
  299.         List<String> brokerIds = getChildrenParentMayNotExist(zkClient, getBrokerIdsPath());  
  300.         Collections.sort(brokerIds);  
  301. //                List<String>     MafkaBroker getBrokerInfo(ZkClient zkClient, int brokerId)  
  302.         List<MafkaBroker> retList = new ArrayList<MafkaBroker>();  
  303.         for (String brokerIdStr : brokerIds) {  
  304.             MafkaBroker broker = getBrokerInfo(zkClient, Integer.valueOf(brokerIdStr));  
  305.             if (broker!=null)  
  306.                 retList.add(broker);  
  307.   
  308.         }  
  309.   
  310.         return retList;  
  311.     }  
  312.   
  313.   
  314.     public static void main(String[] args) {  
  315.         ZkClient zkClient;  
  316.         //kafka zk根節點  
  317.         String zkConnect = "192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka01";  
  318.         int zkSessionTimeoutMs = 5000;  
  319.         int zkConnectionTimeoutMs = 5000;  
  320.         zkClient = new ZkClient(zkConnect, zkSessionTimeoutMs, zkConnectionTimeoutMs, new MafkaZKStrSerializer());  
  321.         //獲取所有broker信息  
  322.         System.out.println(getAllBrokersInCluster(zkClient));  
  323.         //獲取所有partitions信息  
  324.         System.out.println(getAllPartitionsSepCommaByTopic(zkClient, "cluster-switch-topic"));  
  325.     }  
  326.   
  327. }  

29)kafka & mafka client開發與實踐

單擊這裏下載(下載網址:http://download.csdn.net/detail/zhongwen7710/8173117)

30)   kafka文件系統設計那些事

1.文件系統說明

文件系統通常分爲系統和用戶2種類型,系統級文件系統:ext3,ext4,dfs,ntfs等等,,筆者並不會向你們介紹那種紛繁複雜的分佈式或系統級文件系統,而是從kafka架構高性能角度考慮,深刻剖析kafka文件系統存儲結構設計。


2.kafka文件系統架構


2.1 文件系統數據流

如下用圖形表示介紹client處理幾個步驟例如如下:


                                                             圖1

  • 當創建鏈接請求時。首先客戶端向kafka broker發送鏈接請求。broker中由Acceptor thread線程接收並創建鏈接後。把client的socket以輪詢方式轉交給對應的processor thread。
  • 當client向broker發送數據請求。由processor thread處理並接收client數據放到request緩衝區中,以待IO thread進行邏輯處理和計算並把返回result放到response緩衝區中.
    接着喚醒processor thread。processor thread抱住response隊列循環發送所有response數據給client.

2.2 kafka文件系統存儲結構


                                                                                               圖2

  • paritions分佈規則,kafka集羣由多個kafka broker組成。一個topic的partitions會分佈在一個或多個broker上,topic的partitions在kafka集羣上分配規則爲。安裝paritions索引編號依次有序分佈在broker上,
    當partitions數量 > brokers數量,會依次輪迴再次迭代分配。

  • partitions命名規則,paritions名稱爲:topic-name-index,  index分區索引編號,從0開始依次遞增。

  • producer,每個producer可以發送msg到topic隨意一個或多個partitons。

  • consumer,同一個Consumer Group中的Consumers,Kafka將對應Topic中的每個消息僅僅發送給當中一個Consumer.

2.3 kafka的文件系統結構-文件夾

眼下假如kafka集羣中僅僅有一個broker,數據文件文件夾爲message-folder,好比筆者建立一個topic名稱爲:report_push,  partitions=4

存儲路徑和文件夾規則爲:

xxx/message-folder

                              |--report_push-0

                              |--report_push-1

                              |--report_push-2

                              |--report_push-3

形象表示圖例如如下:


                                     圖3

2.4 kafka的文件系統結構-partiton文件存儲方式


                                                           圖4

每個partition(topic-name-index)文件夾中存儲海量msg消息,那它是怎麼存儲的呢?文件存儲結構是如何?

這麼多(海量)消息是存儲在一個大文件裏,相似DB那樣存儲。仍是其它方式存儲結構呢?筆者興許會像剝洋蔥同樣。給你們一層一層依次分解並分析。

  • 數據庫和kafka文件系統比較,相信你們都用過數據庫,數據庫底層文件系統至關複雜。因爲數據庫特色,需要依照keyword,id高速查詢,改動,刪除,日誌。回滾等等。


    因此數據庫文件系統是分頁存儲的樹形結構,需要支持大量隨機事物操做。

    相比數據庫支持查詢。事物等等複雜文件,則kafka消息隊列類型文件系統簡單多了,kafka文件系統存儲特色是,
    僅僅需要支持producer和consumer順序生產和消息就夠了,消息(msg)生命週期由consumer決定。

  • partiton文件存儲結構分析,每個partition就像如上圖4,一個巨大文件消息數據被平均分配到多個文件大小相等的文件裏。即至關於一個大文件被切成很是多相等大小的文件段segment file
    (消息數量不必定相等)。因爲每個topic中消息生命週期由最後一個consumer決定,當某個或些消息被最後一個consumer(consumer group)消息後,就可以刪除該消息。顯然易見,

          這樣作的目的是broker能高速回收磁盤空間,而且小文件也能mmap全部到內存。

主要目的就是提升磁盤利用率和消息處理性能。

2.5 kafka的文件系統結構-partiton文件存儲segment file組成

讀者從2.4節瞭解到kafka文件系統partition存儲方式。如下向你們介紹一下partion文件存儲中segement file組成結構。

一個商業化消息隊列的性能好壞。

其文件系統存儲結構設計是衡量一個消息隊列服務程序最關鍵指標之中的一個,他也是消息隊列中最核心且最能體現消息隊列技術水平的部分。在本節中咱們將走進segment file內部一探到底。

segment file組成:由2大部分組成,分別爲segment data file和segment index file,此2個文件一一相應。成對出現.

segment index file索引文件組成結構例如如下:

 00000000000000000000.index       文件名,文件串大小最大支持2^64bit

每次記錄對應log文件記錄的相對條數和物理偏移位置位置。共8bytes

4byte 當前segment file offset - last seg file offset記錄條數       offset

4byte相應segment file物理偏移地址 position

………

           segment data file索引文件組成結構例如如下:

           00000000000000000000.log        文件名。文件串大小最大支持2^64bit。與index相應

     

                     圖5      

參數說明:

4 byte CRC32:使用crc32算法計算除CRC32這4byte外的buffer。

1 byte 「magic":表示數據文件協議版本

1 byte 「attributes":表示標識獨立版本號,標識壓縮類型,編碼類型。

 key data:可選,可以存儲推斷或表示這個消息塊的元數據信息。

payload data:消息體。該消息體可能會存儲多條消息記錄。內部是依照序號有序存儲的。

2.6 kafka文件系統-consumer讀取流程

 

圖6

segment index file:

稀疏索引方式,下降索引文件大小,這樣可以直接內存操做,稀疏索引僅僅爲數據文件的每個存儲塊設一個鍵-指針對,它比稠密索引節省了不少其它的存儲空間,但查找給定值的記錄需不少其它的時間,經過二分查找高速找到segment data file物理位置。假設在index file沒有找到data file詳細位置,則data file相對位置繼續順序讀取查找。直到找到爲止。

2.7 kafka的文件系統結構-總體文件夾結構

 

圖7

同一個topic下有不一樣分區,每個分區如下會劃分爲多個(段)文件。僅僅有一個當前文件在寫,其它文件僅僅讀。

當寫滿一個文件(寫滿的意思是達到設定值)則切換文件。新建一個當前文件用來寫,老的當前文件切換爲僅僅讀。

文件的命名以起始偏移量來命名。

看一個樣例,若是report_push這個topic下的0-0分區可能有如下這些文件:

 • 00000000000000000000.index

 • 00000000000000000000.log 

 • 00000000000000368769.index 

 • 00000000000000368769.log 

 • 00000000000000737337.index 

 • 00000000000000737337.log

 • 00000000000001105814.index

 • 00000000000001105814.log

    ………………..

當中 00000000000000000000.index表示最開始的文件,起始偏移量爲0.第二個文件00000000000000368769.index的消息量起始偏移量爲368769.相同。第三個文件00000000000000737337.index的起始偏移量爲737337.

以起始偏移量命名並排序這些文件,那麼當消費者要拉取某個消息起始偏移量位置的數據變的至關簡單,僅僅要依據傳上來的offset**二分查找**文件列表,定位到詳細文件,

而後將絕對offset減去文件的起始節點轉化爲相對offset。就能夠開始數據傳輸。好比,相同以上面的樣例爲例。若是消費者想抓取從第368969消息位置開始的數據。則依據368969二分查找,

定位到00000000000000368769.log這個文件(368969在368769和737337之間),依據索引文件二分搜索可以肯定讀取數據最大大小。

2.8 kafka文件系統–實際效果


                                                    圖8   

基本不會有磁盤讀的大量操做。都在內存進行。僅僅有按期磁盤批量寫操做。

3.總結

 高效文件系統特色

  • 一個大文件分紅多個小文件段。
  • 多個小文件段,easy定時清除或刪除已經消費完文件,下降磁盤佔用。
  • index全部映射到memory直接操做。避免segment file被交換到磁盤添加IO操做次數。
  • 依據索引信息,可以肯定發送response到consumer的最大大小。
  • 索引文件元數據存儲用的是相對前一個segment file的offset存儲。節省空間大小。

31)kafka的ZookeeperConsumer實現

kafka的ZookeeperConsumer數據獲取的過程例如如下:

入口ZookeeperConsumerConnector def consume[T](topicCountMap: scala.collection.Map[String,Int], decoder: Decoder[T])
: Map[String,List[KafkaStream[T]]] 方法
client啓動後會在消費者註冊文件夾上加入子節點變化的監聽ZKRebalancerListener。ZKRebalancerListener實例會在內部建立一個線程。這個線程定時檢查監聽的事件有沒有運行(消費者發生變化),假設沒有變化則wait1秒鐘。當發生了變化就調用 syncedRebalance 方法,去rebalance消費者。

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. while (!isShuttingDown.get) {  
  2.           try {  
  3.             lock.lock()  
  4.             try {  
  5.               if (!isWatcherTriggered)  
  6.                 cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag  
  7.             } finally {  
  8.               doRebalance = isWatcherTriggered  
  9.               isWatcherTriggered = false  
  10.               lock.unlock()  
  11.             }  
  12.             if (doRebalance)  
  13.               syncedRebalance  
  14.           } catch {  
  15.             case t => error("error during syncedRebalance", t)  
  16.           }  

syncedRebalance方法在內部會調用def rebalance(cluster: Cluster): Boolean方法,去運行操做。
這種方法的僞代碼例如如下:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. while (!isShuttingDown.get) {  
  2.           try {  
  3.             lock.lock()  
  4.             try {  
  5.               if (!isWatcherTriggered)  
  6.                 cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag  
  7.             } finally {  
  8.               doRebalance = isWatcherTriggered  
  9.               isWatcherTriggered = false  
  10.               lock.unlock()  
  11.             }  
  12.             if (doRebalance)  
  13.               syncedRebalance  
  14.           } catch {  
  15.             case t => error("error during syncedRebalance", t)  
  16.           }  

syncedRebalance方法在內部會調用def rebalance(cluster: Cluster): Boolean方法,去運行操做。
這種方法的僞代碼例如如下:

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. // 關閉所有的數據獲取者  
  2. closeFetchers  
  3. // 解除分區的所有者  
  4. releasePartitionOwnership  
  5. // 按規則獲得當前消費者擁有的分區信息並保存到topicRegistry中  
  6. topicRegistry=getCurrentConsumerPartitionInfo  
  7. // 改動並從新啓動Fetchers  
  8. updateFetchers  

updateFetcher是這樣實現的。

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. private def updateFetcher(cluster: Cluster) {  
  2.       // 遍歷topicRegistry中保存的當前消費者的分區信息,改動Fetcher的partitions信息   
  3.       var allPartitionInfos : List[PartitionTopicInfo] = Nil  
  4.       for (partitionInfos <- topicRegistry.values)  
  5.         for (partition <- partitionInfos.values)  
  6.           allPartitionInfos ::= partition  
  7.       info("Consumer " + consumerIdString + " selected partitions : " +  
  8.         allPartitionInfos.sortWith((s,t) => s.partition < t.partition).map(_.toString).mkString(","))  
  9.   
  10.       fetcher match {  
  11.         case Some(f) =>  
  12.           // 調用fetcher的startConnections方法,初始化Fetcher並啓動它  
  13.           f.startConnections(allPartitionInfos, cluster)  
  14.         case None =>  
  15.       }  
  16.     }  

Fetcher在startConnections時。它先把topicInfo按brokerid去分組

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. for(info <- topicInfos) {  
  2.       m.get(info.brokerId) match {  
  3.         case None => m.put(info.brokerId, List(info))  
  4.         case Some(lst) => m.put(info.brokerId, info :: lst)  
  5.       }  
  6.     }  
而後檢查每組topicInfo相應的broker是否在當前集羣中註冊了

[java]  view plain copy 在CODE上查看代碼片 派生到個人代碼片
  1. val brokers = ids.map { id =>  
  2.       cluster.getBroker(id) match {  
  3.         case Some(broker) => broker  
  4.         case None => throw new IllegalStateException("Broker " + id + " is unavailable, fetchers could not be started")  
  5.       }  
  6.     }  
最後對每個broker建立一個FetcherRunnable線程。並啓動它。

這個線程負責從server上不斷獲取數據,把數據插入內部堵塞隊列的操做。

// 對每個分區分別建立FetchRequest

[java]  view plain copy
相關文章
相關標籤/搜索