kafka學習總結

KAFKA學習總結

1、 簡介

KAFKAApache基金會的一個開源項目,是一個分佈式的發佈-訂閱的消息系統;html

KAFKA用途普遍,能夠用做消息隊列,也能夠用做日誌系統,還有其餘一些應用,在此不做詳細介紹;java

2、 組成

按照不一樣模塊的職責來區分,一個正常運行的KAFKA共有四個部分:Zookeeper,Broker,ProducerConsumer;apache

Producer

消息產生者,產生消息,發送到Broker;併發

Consumer

消息Consumer,Broker獲得消息;eclipse

Broker

亦即KAFKAServer;中間人,消息中轉站;一堆Broker組成的集羣叫作cluster分佈式

Zookeeper

管理鏈接信息,包括各個節點的IP,端口等;ProducerConsumer須要到Zookeeper請求Broker的信息,從而進行消息的收發;一個新的Broker的啓動也須要到Zookeeper來註冊; zookeeper也能夠配集羣。目的是防止某一臺掛了;學習

 producerconsumer經過zookeeper去發現topic,而且經過zookeeper來協調生產和消費的過程;編碼

3、 術語

除了上一節中提到的四個部分以外,KAFKA還包括一些其餘概念,現介紹以下:spa

Topic

Topic,KAFKA對消息分類的依據;一條消息,必須有一個與之對應的Topic;.net

好比如今又兩個Topic,分別是LoveHate,ProducerLove發送一個消息XiJinping,而後向Hate發送一個消息Obama;那麼,訂閱LoveConsumer就會收到消息XiJinping,訂閱HateConsumer就會收到消息Obama;(每一個Consumer能夠同時訂閱多個Topic,也便是說,同時訂閱LoveHateConsumer能夠收到XiJinpingObama);

Message

Message就是咱們所說的消息,KAfKA操做的對象,消息是按照Topic存儲的;

KAFKA中按照必定的期限保存着全部發布過的Message,無論這些Message是否被消費過;例如這些Message的保存期限被這隻爲兩天,那麼一條Message從發佈開始的兩天時間內是可用的,超過保存期限的消息會被清空以釋放存儲空間;

Partition

每個Topic能夠有多個Partition,這樣作是爲了提升KAFKA系統的併發能力;

每一個Partition中按照消息發送的順序保存着Producer發來的消息,每一個消息用ID標識,表明這個消息在改Partition中的偏移量,這樣,知道了ID,就能夠方便的定位一個消息了;每一個新提交過來的消息,被追加到Partition的尾部;若是一個Partition被寫滿了,就再也不追加;(注意,KAFKA不保證不一樣Partition之間的消息有序保存)

Leader

Partition中負責消息讀寫的節點;Leader是從Partition的節點中隨機選取的;

ReplicationFactor

一個Partition中複製數據的全部節點,包括已經掛了的;

isr

ReplicationFactor的子集,存活的且和Leader保持同步的節點;

Consumer Group

傳統的消息系統提供兩種使用方式:隊列和發佈-訂閱;

隊列,是一個池中有若干個Consumer,一條消息發出來之後,被其中的一個Consumer消費;

發佈-訂閱,是一個消息被廣播出去,以後被全部訂閱該主題的Consumer消費;

KAFKA提供的使用方式能夠達到以上兩種方式的效果:Consumer Group;

每個ConsumerConsumer Group Name標識本身,當一條消息產生後,改消息被訂閱了其TopicConsumer Group收到,以後被這個Consumer Group中的一個Consumer消費;

若是全部的Consumer都在同一個Consumer Group,那麼這就和傳統的隊列形式的消息系統同樣了;

若是每個Consumer都在一個不一樣的Consumer Group,那麼就和傳統的發佈-訂閱的形式同樣了;

上個圖片:

一個Topic,兩個Broker,五個Partition,大概是這麼個樣子:

 

4、 使用

配置

根據前面提到的四個部分再加上日誌,在配置文件中分別有五類配置文件;

下面簡要說幾項比較基本的配置:

Ÿ zookeeper.properties

dataDir:用於存放zookeeper生成的數據文件,默認放在/tmp/zookeeper路徑下;

clientPort:zookeeper監聽的端口;

Ÿ server.properties

broker.id:broker的惟一標識,整數;

port:broker監聽的端口;

host.name:broker綁定到的IP,若是不設置,將綁定到全部的接口;(官網中的配置文件是這麼說的,我也不知道什麼叫綁定到全部的接口,多是和端口的說明寫混了)

log.dirs:broker存放數據文件的地方,默認在/tmp/kafuka-logs/

zookeeper.connect:

Ÿ producer.properties

metadata.broker.list:brokerip和端口(我看的文檔裏說producerconsumer都是從zookeeper得到broker的信息,可是這裏又配置producer的信息,不知道是什麼意思);

Ÿ consumer.properties

zookeeper.connect:zookeeperip和端口

group.id:consumer所屬的consumer groupid

producerconsumer的配置文件,官網上的demo沒用到這兩個文件,直接在命令行裏輸入了參數,

Ÿ 日誌配置文件,目前沒用到,不作介紹;

操做步驟

1) 初始化zookeeper;

2) 初始化broker;

3) 建立Topic(若是不顯示建立Topic,Producer在發送Message的時候回自動建立,可是諸如Partition等屬性就沒法自定義了,失去了靈活性,因此不建議不建立Topic);

4) 初始化producerconsumer,這兩步沒有前後順序;

5) 產生Message,消費Message,這兩部也沒有前後順序;

關於producer中配置broker的問題:

KAFKA官網上的java代碼和命令行demo,都有在Producer中直接配置broke的地址信息,而我看的一篇介紹文檔中,java代碼裏沒有出現props.put("metadata.broker.list", "xxx.xxx.xxx.xxx:xxxx");

而是

props.put("zk.connect", "xxx.xxx.xxx.xxx:xxxx");

可是這種配置方式在個人eclipse裏拋出異常了,說是沒有發現"metadata.broker.list"的配置;

猜想大概是版本變化的緣由;

應用於java項目中

具體java編碼的方式,網上一堆,我就不copy了,簡單說一下producer和consumer端都須要哪幾個部分:

Ÿ producer:

主類,發送Message的邏輯放在裏邊;

實現Partitioner的類:根據業務邏輯,將Message發送到不一樣的Partition中,若是不實現此接口,KAFKA有一個默認的類;

實現kafka.serializer.Encoder的類,用於封裝Message給KAFKA進行解析和傳送;

Ÿ consumer:

沒有必須實現的接口,不過在官網的demo中將部分接收Message的邏輯抽取出來單獨弄了個類而且實現了Runnable接口,這種作法,根據具體的業務邏輯進行變換就好了;

參考文獻: 

http://kafka.apache.org/documentation.html

http://my.oschina.net/ielts0909/blog/117489

相關文章
相關標籤/搜索