KAFKA是Apache基金會的一個開源項目,是一個分佈式的發佈-訂閱的消息系統;html
KAFKA用途普遍,能夠用做消息隊列,也能夠用做日誌系統,還有其餘一些應用,在此不做詳細介紹;java
按照不一樣模塊的職責來區分,一個正常運行的KAFKA共有四個部分:Zookeeper,Broker,Producer和Consumer;apache
消息產生者,產生消息,發送到Broker;併發
消息Consumer,從Broker獲得消息;eclipse
亦即KAFKA的Server;中間人,消息中轉站;一堆Broker組成的集羣叫作cluster分佈式
管理鏈接信息,包括各個節點的IP,端口等;Producer和Consumer須要到Zookeeper請求Broker的信息,從而進行消息的收發;一個新的Broker的啓動也須要到Zookeeper來註冊; zookeeper也能夠配集羣。目的是防止某一臺掛了;學習
producer和consumer經過zookeeper去發現topic,而且經過zookeeper來協調生產和消費的過程;編碼
除了上一節中提到的四個部分以外,KAFKA還包括一些其餘概念,現介紹以下:spa
Topic,是KAFKA對消息分類的依據;一條消息,必須有一個與之對應的Topic;.net
好比如今又兩個Topic,分別是Love和Hate,Producer向Love發送一個消息XiJinping,而後向Hate發送一個消息Obama;那麼,訂閱Love的Consumer就會收到消息XiJinping,訂閱Hate的Consumer就會收到消息Obama;(每一個Consumer能夠同時訂閱多個Topic,也便是說,同時訂閱Love和Hate的Consumer能夠收到XiJinping和Obama);
Message就是咱們所說的消息,是KAfKA操做的對象,消息是按照Topic存儲的;
KAFKA中按照必定的期限保存着全部發布過的Message,無論這些Message是否被消費過;例如這些Message的保存期限被這隻爲兩天,那麼一條Message從發佈開始的兩天時間內是可用的,超過保存期限的消息會被清空以釋放存儲空間;
每個Topic能夠有多個Partition,這樣作是爲了提升KAFKA系統的併發能力;
每一個Partition中按照消息發送的順序保存着Producer發來的消息,每一個消息用ID標識,表明這個消息在改Partition中的偏移量,這樣,知道了ID,就能夠方便的定位一個消息了;每一個新提交過來的消息,被追加到Partition的尾部;若是一個Partition被寫滿了,就再也不追加;(注意,KAFKA不保證不一樣Partition之間的消息有序保存)
Partition中負責消息讀寫的節點;Leader是從Partition的節點中隨機選取的;
一個Partition中複製數據的全部節點,包括已經掛了的;
ReplicationFactor的子集,存活的且和Leader保持同步的節點;
傳統的消息系統提供兩種使用方式:隊列和發佈-訂閱;
隊列,是一個池中有若干個Consumer,一條消息發出來之後,被其中的一個Consumer消費;
發佈-訂閱,是一個消息被廣播出去,以後被全部訂閱該主題的Consumer消費;
KAFKA提供的使用方式能夠達到以上兩種方式的效果:Consumer Group;
每個Consumer用Consumer Group Name標識本身,當一條消息產生後,改消息被訂閱了其Topic的Consumer Group收到,以後被這個Consumer Group中的一個Consumer消費;
若是全部的Consumer都在同一個Consumer Group中,那麼這就和傳統的隊列形式的消息系統同樣了;
若是每個Consumer都在一個不一樣的Consumer Group中,那麼就和傳統的發佈-訂閱的形式同樣了;
一個Topic,兩個Broker,五個Partition,大概是這麼個樣子:
根據前面提到的四個部分再加上日誌,在配置文件中分別有五類配置文件;
下面簡要說幾項比較基本的配置:
Ÿ zookeeper.properties
dataDir:用於存放zookeeper生成的數據文件,默認放在/tmp/zookeeper路徑下;
clientPort:zookeeper監聽的端口;
Ÿ server.properties
broker.id:broker的惟一標識,整數;
port:broker監聽的端口;
host.name:broker綁定到的IP,若是不設置,將綁定到全部的接口;(官網中的配置文件是這麼說的,我也不知道什麼叫綁定到全部的接口,多是和端口的說明寫混了)
log.dirs:broker存放數據文件的地方,默認在/tmp/kafuka-logs/
zookeeper.connect:
Ÿ producer.properties
metadata.broker.list:broker的ip和端口(我看的文檔裏說producer和consumer都是從zookeeper得到broker的信息,可是這裏又配置producer的信息,不知道是什麼意思);
Ÿ consumer.properties
zookeeper.connect:zookeeper的ip和端口
group.id:consumer所屬的consumer group的id
producer和consumer的配置文件,官網上的demo沒用到這兩個文件,直接在命令行裏輸入了參數,
Ÿ 日誌配置文件,目前沒用到,不作介紹;
1) 初始化zookeeper;
2) 初始化broker;
3) 建立Topic(若是不顯示建立Topic,Producer在發送Message的時候回自動建立,可是諸如Partition等屬性就沒法自定義了,失去了靈活性,因此不建議不建立Topic);
4) 初始化producer和consumer,這兩步沒有前後順序;
5) 產生Message,消費Message,這兩部也沒有前後順序;
在KAFKA官網上的java代碼和命令行demo中,都有在Producer中直接配置broke的地址信息,而我看的一篇介紹文檔中,java代碼裏沒有出現props.put("metadata.broker.list", "xxx.xxx.xxx.xxx:xxxx");
而是
props.put("zk.connect", "xxx.xxx.xxx.xxx:xxxx");
可是這種配置方式在個人eclipse裏拋出異常了,說是沒有發現"metadata.broker.list"的配置;
猜想大概是版本變化的緣由;
具體java編碼的方式,網上一堆,我就不copy了,簡單說一下producer和consumer端都須要哪幾個部分:
Ÿ producer:
主類,發送Message的邏輯放在裏邊;
實現Partitioner的類:根據業務邏輯,將Message發送到不一樣的Partition中,若是不實現此接口,KAFKA有一個默認的類;
實現kafka.serializer.Encoder的類,用於封裝Message給KAFKA進行解析和傳送;
Ÿ consumer:
沒有必須實現的接口,不過在官網的demo中將部分接收Message的邏輯抽取出來單獨弄了個類而且實現了Runnable接口,這種作法,根據具體的業務邏輯進行變換就好了;