【Spark深刻學習 -15】Spark Streaming前奏-Kafka初體驗

----本節內容-------html

1.Kafka基礎概念java

 1.1 出世背景node

 1.2 基本原理python

      1.2.1.前置知識linux

      1.2.2.架構和原理算法

      1.2.3.基本概念數據庫

      1.2.4.kafka特色apache

2.Kafka初體驗緩存

  2.1 環境準備安全

  2.2 Kafka小試牛刀

      2.2.1單個broker初體驗

      2.2.2 多個broker初體驗

  2.3 Kafka分佈式集羣構建

     2.3.1 Kafka分佈式集羣構建

     2.3.2 Kafka主題建立

     2.3.3 生產者生產數據

     2.3.4消費者消費數據

     2.3.5消息的壓縮

2.4 Kafka在ZK目錄節點

    2.4.1 kafka鏡像原理

    2.4.2 Kafka副本模型

    2.4.3 在ZK目錄節點內容

 2.5 實體間交互流程

   2.5.1主題與zk  

   2.5.2 消費者與zk

   2.5.3 broker與生產者

   2.5.4 消費者與消費者組

3.參考資料

---------------------

1.Kafka基礎概念

1.1 出世背景

Kafka是一個消息系統,是LinkedIn公司開發並開源出來的組件。Kafka本來用做LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。如今它已被多家公司做爲多種類型的數據管道和消息系統使用。歪果仁就喜歡整一些看不懂的詞彙,好比活動流,好比運營數據,就不能好好說話麼。

活動流是啥?簡單理解,就是用戶使用網站或者系統時產生的數據流,好比點擊一個頁面,查看一個圖片,翻看一網頁,搜索一個關鍵字,網站運營者須要對用戶的這些行爲進行統計,造成報表。

運營數據是啥?就是計算機產生的監控日誌信息,如CPU數據,IO數據等, 這些數據都是動態生成的,Linkein這樣的大公司,運營數據量很是大,一般的方式是生成這些數據,寫入到log文件中,而後進行統計。活動流數據和運營數據對網站和軟件產品很是重要,舉幾個栗子

  1)動態彙總,將用戶的信息動態彙總,或者本身監控,或者發給用戶的朋友圈

  2) 安全,實時監控用戶訪問信息,防止網絡爬蟲或者用戶擴散垃圾信息,對API使用速率進行實時監控和控制,切斷網站某些不正常活動。

 3)機器硬件實時監控:對機器運行效率實時監控,對異常狀況自動觸發告警。

 4)報表和批處理:將數據導入Hadoop平臺,進行離線報表分析。

LinkedIn處理的時候就碰到幾個問題:

  1) 日誌量大,天天要處理10億多條數據。

   2)高吞吐量。

  3)實時性能差。

現有的消息隊列系統(messaging and queuing system)卻很適合於在實時或近實時(near-real-time)的狀況下使用,但它們對很長的未被處理的消息隊列的處理很不給力,每每並不將數 據持久化做爲首要的事情考慮。這樣就會形成一種狀況,就是當把大量數據傳送給Hadoop這樣的離線系統後, 這些離線系統每一個小時或天天僅能處理掉部分源數據。Kafka的目的就是要成爲一個隊列平臺,僅僅使用它就可以既支持離線又支持在線使用這兩種狀況。

1.2 基本架構和原理

1.2.1.前置知識

消息隊列

   爲何要引入消息隊列?舉個例子,假如A發送消息給B,若是B在線,那麼能夠很順利的通信發消息,那若是B不在線,那就比較麻煩了,消息隊列技術能夠很好的解決這個問題。

   消息隊列技術是分佈式應用間交換信息的一種技術,是兩個系統通信的橋樑和媒介,將兩個系統解耦,不須要知道對方的位置和信息。 經過消息隊列技術2個異構的系統能夠進行通信,尤爲是大型系統。消息隊列能夠保存在磁盤或者內存中。

   消息隊列技術底層都是socket通信,socket在不少地方有用到,好比數據庫,進程間通信,jdbc等等底層都是socket通信。

   JMS是消息服務的規範,不少消息中間件技術都遵循JMS規範。

消息隊列通信模式

1)點對點通信:點對點方式是最爲傳統和常見的通信方式,它支持一對1、一對多、多對多、多對一等多種配置方式,支持樹狀、網狀等多種拓撲結構。用人話描述一遍:就是一我的放東西,一我的去東西,這就是點對點。

2)發佈/訂閱 (Publish/Subscribe) 模式:發佈/訂閱功能使消息的分發能夠突破目的隊列地理指向的限制,使消息按照特定的主題甚至內容進行分發,用戶或應用程序能夠根據主題或內容接收到所須要的消息。發佈/訂閱功能使得發送者和接收者之間的耦合關係變得更爲鬆散,發送者沒必要關心接收者的目的地址,而接收者也沒必要關心消息的發送地址,而只是根據消息的主題進行消息的收發。發佈/訂閱 模式:就跟貼尋人啓事,公告同樣的道理,消息往公告上一貼,關心的人就去看看發生什麼事,不關心的就看成一堆廢紙,不用搭理。

Kafka很厲害,kafka將這兩個概念整合到一塊兒,他只有主題模式,可是能實現隊列模式的效果,實現方式就是消費者組的引入:

消費者組:將一個或者多個消費者劃到一塊兒,取一個名標記,這就是消費者組,對於一個消費者組,

,處在同一個消費者組的消費者,只能有一個消費者消費,

發佈訂閱模式:每一個消費者組,只有一個消費者,那就是發佈訂閱,每一個消費者都有本身的組。

每一個組都能消費,那就是發佈訂閱模式。

隊列模式: 全部的消費者都在一個組裏面。

消息隊列特色

 ·數據緩衝做用

· 下降耦合

·異構系統高效交互

Kafka是一個消息隊列組件,它遵循JMS規範,基本工做流程,生產者生產數據-> kafka集羣中轉數據->消費者消費數據

 

1.2.2.架構和原理

 

 

 生產者生產消息,將消息發送給Kafka集羣,Kafka內在是分佈式的,一個Kafka集羣一般包括多個代理。爲了均衡負載,將話題分紅多個分區,每一個代理存儲一或多個分區。消費者從kafka主題中獲取消息。多個生產者和消費者可以同時生產和獲取消息。

1.Producer根據指定partition方法(round-robin、hash等),將消息發佈到指定topic的partition裏面

2.kafka集羣接收到Producer發過來的消息後,將其持久化到硬盤,並保留消息指定時長(可配置),而不關注消息是否被消費。

3.Consumer從kafka集羣pull數據,並控制獲取消息的offset

1.2.3.基本概念

Broker:Kafka 集羣包含一個或多個服務器,這種服務器被稱爲 broker。

Topic:每條發佈到 Kafka 集羣的消息都有一個類別,這個類別被稱爲 Topic。(物理上不一樣 Topic 的消息分開存儲,邏輯上一個 Topic 的消息雖然保存於一個或多個 broker 上,但用戶只需指定消息的 Topic 便可生產或消費數據而沒必要關心數據存於何處)。

Partition:Partition 是物理上的概念,每一個 Topic 包含一個或多個 Partition。

Producer:負責發佈消息到 Kafka broker。

Consumer:消息消費者,向 Kafka broker 讀取消息的客戶端。

Consumer Group:每一個 Consumer 屬於一個特定的 Consumer Group(可爲每一個 Consumer 指定 group name,若不指定 group name 則屬於默認的 group)。

1.2.4.Kafka特色

1)分佈式流平臺,支持消息的分區(mr的分區相似),支持多個服務器之間消息分區

2)支持發佈和訂閱數據流,相似於消息系統

3)支持分佈式和副本集羣方式,來存儲數據流

4)實時處理數據流

5) 支持多種源數據,數據庫交互、app雙向交互

6)水平可伸縮

7) 容錯好

8)速度快

9)多種方式存儲,持久化存儲內存,磁盤秒級

10)海量數據,TB級高吞吐量:支持每秒百萬消息,·廉價硬件

11)多客戶端支持,很容集成不一樣平臺,java,python,和多源進行協同,它是一個

12)中間件的基因,跨平臺和跨語言,開源

2.Kafka初體驗

2.1環境準備

1)kafka下載

kafka2.1.2官網下載:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz

2) zookeeper下載

zookeeper3.3.6官網地址:http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz

3) jdk下載

kafka和zookeeper前提是安裝好了jdk,注意你電腦是32位仍是64爲

jdk官網下載地址:http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz

4)關閉防火牆

不然zk啓動會報錯no route tohost

· 查看防火牆狀態,使用root帳號執行

service iptables status

·關閉防火牆

service iptables stop

·查看防火牆開機啓動狀態

chkconfig iptables --list

·關閉防火牆開機啓動

chkconfig iptables off

5)配置好host

2.2 Kafka小試牛刀

2.2.1 單個broker初體驗

1.安裝單節點的kafka

下載下來了以後直接解壓就能夠運行單節點的Kafka,由於Kafka須要用zookeeper作高可用,若是沒有安裝zk,也沒有關係,使用它自帶的配置啓動就能夠。

1)啓動自帶配置的zk

啓動zk命令:  bin/zookeeper-server-start.sh config/zookeeper.properties

zookeeper啓動成功

2)啓動kafka

kafka啓動:bin/kafka-server.start.sh config/server.properites

kafka啓動成功

3)建立一個topic

建立topic:bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test

查看topic:bin/kafka-topicts.sh --list --zookeeper kafka01:2181

4)啓動生產者

命令:bin/kafka-console-producer.sh --broker-list kafka:9092 --topic test

啓動以後,輸入2行

5)啓動消費者

命令:bin/kafka-console-consumer.sh --zookeeper kafka01:9202 --topic test --from-beginning

消費者接收到2行消息

總結

1.啓動zk

命令: bin/zookeeper-server-start.sh config/zookeeper.properties

2.啓動kafka

命令:bin/kafka-server.start.sh config/server.properites

參數:指定kafka的配置文件

3.建立主題

命令:bin/kafka-topics.sh --create --zookeeper kafka01:2181 --replication-factor 1 --partitions 1 --topic test

參數:1)create:指定建立動做,2)zookeeper:指定zookeeper客戶端;3)replication-factor:指定主題副本個數;4)partitions:指定分區個數;5)topic:指定主題名稱

查看topic:bin/kafka-topicts.sh --list --zookeeper kafka01:2181

4.啓動生產者

命令:bin/kafka-console-producer.sh --broker-list kafka01:9092 --topic test

參數:1) broker-list:指定broker;2)topic:指定主題名,從參數能夠看出來,生產者是不直接和zk交互的

5.啓動消費者

命令:bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topic test --from-beginning

參數:1)zookeeper:指定zookeeper客戶端;2)topic:指定主題名,3)from-beginning:從topic頭開始讀取消息

2.2.2 多個broker初體驗

前面已經配置了一個單節點kafka服務,再次擴展演示kafka集羣多節點可用性、容錯性,也爲kafka分佈式集羣作好鋪墊。

1)配置kafka多節點服務

配置多節點服務,拷貝出2分配置server1.properties,server2.properties,修改2處參數

------------------------------server1.properties

broker.id=1

listeners=PLAINTEXT://:9092

log.dirs=/tmp/kafka1-logs

------------------------------server2.properties

broker.id=2

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kafka2-logs

-----------------------------

2)啓動zookeeper服務

bin/zookeeper-server-start.sh config/zookeeper.properties

3)啓動2個kafka服務

bin/kafka-server-start.sh config/server1.properties

bin/kafka-server-start.sh config/server2.properties

4)建立topic主題

bin/kafka-topics.sh --create --zookeeper kafka01:2181 -replication-factor 1 --partitions 1 --test2

5)啓動生產者

bin/kafka-console-producer.sh --broker-list kafka02:9093 --topic test2

輸入內容

6)啓動消費者

bin/kafka-console-consumer.sh --zookeeper kafka01:2181 --topitc test2 --from-beginning

2) 測試多節點可用性

在生產者輸入內容,消費者端能夠獲取到消息

生產者端:

消費者端:

 

3)測試多節點容錯性

殺掉一個kafka服務,而後發送消息測試,消費者是否能政策收到消息

 

2.3 Kafka分佈式集羣構建

2.3.1 Kafka分佈式集羣構建

1.配置zookeeper集羣

1).解壓後,配置zoo.cfg,若是沒有從模板配置文件中拷貝出來

官網建議使用zookeeper3.4.x,3.4.9

http://kafka.apache.org/documentation.html#zk

這裏有各類版本的下載地址

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

咱們須要的版本3.4.9

https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz

配置以前建立目錄

命令:mkdir -p /usr/local/hadoop/zookeeper/data /usr/local/hadoop/zookeeper/log

修改zoo.cfg配置

命令:vi /usr/local/hadoop/zookeeper/zookeeper-3.4.5-cdh5.4.5/conf/zoo.cfg

內容:

----------------------------

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/usr/local/hadoop/zookeeper/data

dataLogDir=/usr/local/hadoop/zookeeper/log

clientPort=2181

server.1=kafka01:2287:3387

server.2=kafka02:2288:3387

server.3=kafka03:2289:3387

----------------------------

b.配置myid(在每一個節點上都這樣配置)

還有一個關鍵的設置,在每一個zk server配置文件的dataDir所對應的目錄下,必須建立一個名爲myid的文件,其中的內容必須與zoo.cfg中server.x 中的x相同,即:

-----------------------

/usr/local/hadoop/zookeeper/data/myid 中的內容爲1,對應server.1中的1

/usr/local/hadoop/zookeeper/data/myid 中的內容爲2,對應server.2中的2

/usr/local/hadoop/zookeeper/data/myid 中的內容爲3,對應server.3中的3

 -----------------------

c.關閉防火牆,而且各節點要配置好jdk

-------------

service iptables stop

chkconfig iptables off

-------------

d.啓動zookeeper

/usr/local/hadoop/zookeeper/zookeeper-3.4.5-cdh5.4.5/bin/zkServer.sh start

e.驗證服務

命令1:bin/zkServer.sh status

命令2:bin/zkServer.sh start

f.配置環境變量到/etc/profile

ZOOKEEPER_HOME=/usr/local/hadoop/zookeeper/zookeeper-3.3.6

PATH=$ZOOKEEPER_HOME/bin:$PATH

export PATH

KAFKA_HOME=/usr/local/hadoop/kafka/kafka_2.12-0.10.2.1

PATH=$ZOOKEEPER_HOME/bin:$PATH

export PATH

2.配置kafka集羣

1)準備目錄

命令:mkdir -p /usr/local/hadoop/kafka/log

2)修改server.properties 3個參數,

每一個節點的broker.id不同,本次實驗kafka01,kafka02,kafka03對應1,2,3

------------

broker.id=03

log.dirs=/usr/local/hadoop/kafka/log

zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181

------------

3)啓動kafka集羣

每個節點執行,

命令:bin/kafka-server-start.sh config/server.properties

4)驗證kafka集羣是否正常啓動

a.jps查看進程是否啓動,正常啓動,會有Kafka進程服務

b.查看/usr/local/hadoop/kafka/log下面是否有數據

c.查看zk中是否有kafka目錄

 

2.3.2 Kafka主題建立

1)相關命令

建立命令

bin/kafka-topics.sh --create --zookeeper kafka01:2181 -replication-factor 3 --partitions 2 --topic test3

查看命令

bin/kafka-topics.sh --list  --zookeeper kafka01:2181

查看分區個數、副本個數

bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic test

2)相關說明

從這裏能夠看到不少關於主題的信息,總要包含:

· 主題的leader:讀寫都從這裏進行隨機選擇

· 主題的副本數:副本數,節點列表

· isr:同步複製

·主題的分區數:2個分區

zk保留了副本之間的leader和隨從信息,每一個副本週期性同步到磁盤

1.每一個分區有N個副本,能夠承受N-1個節點故障,ZK承受N-1/2個故障,若是3個節點,掛了2個,那就不行了。每一個副本都有本身的leader,其他的都是follower,zk存放分區的leader和all replica的信息

2.每一個副本存儲消息的部分數據在本地的log和offset中,週期性同步到磁盤中去確保消息寫入所有

副本或者其中一個

3.leader故障時,消息或者寫入本地log,或則在producer在收到ack消息前,重新發送消息給新的leader

這些信息都是保留在zookeeper中的。進一步去zookeeper觀察

看到有2個topic

進入到0的目錄下查看

再看看kafka的log目錄,實際的數據是保存在kafka的log目錄下,雖然尚未寫數據,可是相關目錄已經準備好了相關存放文件和目錄了。

 

2.3.3 生產者生產數據

1)相關命令

bin/kafka-console-producer.sh --broker-list kafka02:9092 --topic test2

這裏要注意端口要和配置文件的保持一致,筆者由於前面演示了單機版的多broker(9093端口),端口沒有改正,致使消費者無法消費數據,白白浪費了不少時間排查問題。

2)相關說明

從命令能夠看出,生產者是和broker直接交互,broker使用zk協同工具來管理多個broker

broker:broker不知道誰消費了消息,並不維護哪一個消費者消費了消息

消費者組:每一個組中只有一個消費者能夠消費消息(全部的消費者都在一個組》隊列模式,都有本身的組》訂閱模式),經過消費者組贊成了

消費者:維護了消費消息的狀態,broker不知道誰消費了消息,並不維護哪一個消費者消費了消息,消費者本身知道的。

2.2.4 消費者消費數據

1)相關命令

bin/kafka-console-consumer.sh --zookeeper kafka02:2181 --topic test2 --from-beginning

2)相關說明

1.消息緩存與filesystem的存儲,數據是當即被即刻寫入OS的內核頁而且緩存以及清理磁盤(能夠配置)

2.消息被消費後,kafka能長時間駐留消息在服務器,容許重複消費

3.對分組消息使用了消息set,防止網絡過載

4.在服務器存放消費的信息,kafka是在消費者端存放,消費者保持消息的狀態

5.消費者狀態默認是在zk中,也容許存到到其餘OLTP,好比數據庫

6.Kafka中生產和消費是點心的pull-push

生產者pull(write,輸入流),消費者push(read,輸出流,拉)

7.沒有主從模式,全部的broker的地位相同,broker數據均在zk中維護

並在producer之間共享

8.負載均衡策略,loadbalance,容許producer動態發現broker

9.producer生產者維護了一個broker鏈接池,並能經過zk的callback進行實時更新

10.producer能夠選擇同步或者異步的方式發送消息給broker

打電話:同步,阻塞的都是同步的,NIO的特色就是非阻塞,IO就是阻塞的

發短息:異步,你收不收,知不知道,我無論,我先去幹其餘的事情

2.2.5  消息的壓縮

Kafka設計的初衷是迅速處理短小的消息,通常10K大小的消息吞吐性能最好(可參見LinkedIn的kafka性能測試)。但有時候,咱們須要處理更大的消息,好比XML文檔或JSON內容,一個消息差很少有10-100M,這種狀況下,Kakfa應該如何處理?

kafka producer設置compression.type=snappy

2.4 Kafka在ZK目錄節點

2.4.1  kafka鏡像原理

將元集羣的數據副本化給target kafka集羣,目標kafka集羣就看成一個消費者消費數據實現數據的備份。

2.4.2  Kafka副本模型

同步模型(同步複製):生產者從zk找leader,併發送message,消息當即寫入本地log,follow開始拉取消息,每一個follow將消息寫入各自本地的log,向leader發送確認回執。leader在收到全部的follow的確認回執和本地副本寫入工做均完成後,再向producer發送確認回執。生產者客戶端是阻塞的,消費者的數據pull從leader中完成。

 

異步模型:leader的本地log寫入完成立刻向生產者發送回執,leader不等待follow的回執,follow行不行,成不成功,無論。

2.4.3 在ZK目錄節點內容

/brokers/topics/topic:存儲某個topic的partitions全部分配信息

/brokers/topics/[topic]/partitions/[0...N]:partitions狀態信息

/brokers/ids/[0...N]:每一個broker的配置文件中都須要指定一個數字類型的id(全局不可重複),此節點爲臨時znode(EPHEMERAL).

/controller_epoch -> int (epoch) :此值爲一個數字,kafka集羣中第一個broker第一次啓動時爲1,之後只要集羣中center controller中央控制器所在broker變動或掛掉,就會從新選舉新的center controller,每次center controller變動controller_epoch值就會 + 1; 

/controller -> int (broker id of the controller) :存儲center controller中央控制器所在kafka broker的信息

 

2.5 實體間交互流程

zookeeper在kafka扮演重要角色,Kafka使用zookeeper做爲其分佈式協調框架,很好的將消息生產、消息存儲、消息消費的過程結合在一塊兒。同時藉助zookeeper,kafka可以生產者、消費者和broker在內的因此組件在無狀態的狀況下,創建起生產者和消費者的訂閱關係,並實現生產者與消費者的負載均衡。

2.5.1主題與zk  

在kafka中,用戶能夠自定義多個topic,每一個topic又能夠劃分爲多個分區,一半狀況下,每一個分區存儲在一個獨立的broker上。全部這些topic與broker的對應關係都有zookeeper進行維護。

在zookeeper中,創建專門的節點來記錄這些信息,其節點路徑爲/brokers/topics/{topic_name},如:

[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics

[toptic_t, test, my-replicated-topic, mykafka, mykafka6, mykafka5, mykafka4, test6, mykafka3, test7, mykafka2]

[zk: localhost:2181(CONNECTED) 17] get /brokers/topics/mykafka4  

{"version":1,"partitions":{"1":[102,103,104],"2":[103,104,102],"0":[104,102,103]}}

針對topic 的每個分區與broker的對應關係,zookeeper經過節點 /brokers/topics/topic.name來記錄,如:

當broker啓動時,會到對應topic節點下注冊本身的broker.id到對應分區的isr列表中,如:

[zk: localhost:2181(CONNECTED) 23] get /brokers/topics/mykafka4/partitions/1/state

{"controller_epoch":15,"leader":102,"version":1,"leader_epoch":2,"isr":[102,103,104]}

一樣的,當broker退出數,也會觸發zookeeper更新其對應topic分區的isr列表,並決定是否須要作消費者的負載均衡。

2.5.2 消費者與zk

l 註冊新的消費者分組

當新的消費者組註冊到zookeeper中時,zookeeper會建立專用的節點來保存相關信息,其節點路徑爲ls /consumers/{group_id},其節點下有三個子節點,分別爲[ids, owners, offsets]。

Ø ids節點:記錄該消費組中當前正在消費的消費者;

Ø owners節點:記錄該消費組消費的topic信息;

Ø offsets節點:記錄每一個topic的每一個分區的offset,如:

[zk: localhost:2181(CONNECTED) 54] get /consumers/test-consumer2-group/offsets/mykafka4/0

142

l 註冊新的消費者

當新的消費者註冊到kafka中時,會在/consumers/{group_id}/ids節點下建立臨時子節點,並記錄相關信息,如:

[zk: localhost:2181(CONNECTED) 57] ls /consumers/test-consumer2-group/ids/test-consumer2-group_dev103-1433562901087-7b517b97

[]

[zk: localhost:2181(CONNECTED) 58] get /consumers/test-consumer2-group/ids/test-consumer2-group_dev103-1433562901087-7b517b97

{"version":1,"subscription":{"mykafka5":1},"pattern":"white_list","timestamp":"1433562901290"}

 

l 監聽消費者分組中消費者的變化

每一個消費者都要關注其所屬消費者組中消費者數目的變化,即監聽/consumers/{group_id}/ids下子節點的變化。一單發現消費者新增或減小,就會觸發消費者的負載均衡。

2.5.3 broker與與zk

爲了記錄broker的註冊信息,在zookeeper上,專門建立了屬於kafka的一個節點,其路徑爲/brokers,如:

 [zk: localhost:2181(CONNECTED) 1] ls /brokers

[ids, topics]

Kafka的每一個broker啓動時,都會到zookeeper中進行註冊,告訴zookeeper其broker.id, 在整個集羣中,broker.id應該全局惟一,並在zookeeper上建立其屬於本身的節點,其節點路徑爲/brokers/ids/{broker.id}. 如:

 [zk: localhost:2181(CONNECTED) 2] ls /brokers/ids

[102, 103]

建立完節點後,kafka會將該broker的broker.name及端口號記錄到改節點,如

[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/102

{"jmx_port":-1,"timestamp":"1433209686575","host":"host102","version":1,"port":9092}

另外,改broker節點屬性爲臨時節點,當broker會話失效時,zookeeper會刪除該節點,這樣,咱們就能夠很方便的監控到broker節點的變化,及時調整負載均衡等。

 

2.5.4 消費者與消費者組

a.每一個consumer客戶端被建立時,會向zookeeper註冊本身的信息;

b.此做用主要是爲了"負載均衡".

c.同一個Consumer Group中的Consumers,Kafka將相應Topic中的每一個消息只發送給其中一個Consumer。

d.Consumer Group中的每一個Consumer讀取Topic的一個或多個Partitions,而且是惟一的Consumer;

e.一個Consumer group的多個consumer的全部線程依次有序地消費一個topic的全部partitions,若是Consumer group中全部consumer總線程大於partitions數量,則會出現空閒狀況;

舉例說明:

kafka集羣中建立一個topic爲report-log   4 partitions 索引編號爲0,1,2,3

假若有目前有三個消費者node:注意-->一個consumer中一個消費線程能夠消費一個或多個partition.

若是每一個consumer建立一個consumer thread線程,各個node消費狀況以下,node1消費索引編號爲0,1分區,node2費索引編號爲2,node3費索引編號爲3

若是每一個consumer建立2個consumer thread線程,各個node消費狀況以下(是從consumer node前後啓動狀態來肯定的),node1消費索引編號爲0,1分區;node2費索引編號爲2,3;node3爲空閒狀態

總結:

從以上可知,Consumer Group中各個consumer是根據前後啓動的順序有序消費一個topic的全部partitions的。

若是Consumer Group中全部consumer的總線程數大於partitions數量,則可能consumer thread或consumer會出現空閒狀態。

Consumer均衡算法

當一個group中,有consumer加入或者離開時,會觸發partitions均衡.均衡的最終目的,是提高topic的併發消費能力.

1) 假如topic1,具備以下partitions: P0,P1,P2,P3

2) 加入group中,有以下consumer: C0,C1

3) 首先根據partition索引號對partitions排序: P0,P1,P2,P3

4) 根據(consumer.id + '-'+ thread序號)排序: C0,C1

5) 計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)

6) 而後依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]

3.參考資料

1.http://blog.jobbole.com/75328/ 分佈式消息系統:Kafka

2.http://blog.csdn.net/lizhitao/article/details/23744675

3.http://blog.csdn.net/opensure/article/details/46048589Kafka文件存儲機制那些事

4.http://blog.csdn.net/liuao107329/article/details/70175691 Zookeeper在kafka中的應用

5.https://wenku.baidu.com/view/20781790b52acfc788ebc955.html

相關文章
相關標籤/搜索