kafka入門

問題導讀:
1.zookeeper在kafka的做用是什麼?
2.kafka中幾乎不容許對消息進行「隨機讀寫」的緣由是什麼?
3.kafka集羣consumer和producer狀態信息是如何保存的?
4.partitions設計的目的的根本緣由是什麼?




 

1、入門
    一、簡介
    Kafka is a distributed,partitioned,replicated commit logservice。它提供了相似於JMS的特性,可是在 設計實現上徹底不一樣,此外它並非JMS規範的實現。kafka對消息保存時根據Topic進行歸類,發送消息者成爲Producer,消息接受者成爲Consumer,此外kafka集羣有多個kafka實例組成,每一個實例(server)成爲broker。不管是kafka集羣,仍是producer和consumer都依賴於zookeeper來保證系統可用性集羣保存一些meta信息。
<ignore_js_op>  
 
   二、Topics/logs
    一個Topic能夠認爲是一類消息,每一個topic將被分紅多個partition(區),每一個partition在存儲層面是append log文件。任何發佈到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱爲offset(偏移量),offset 爲一個long型數字,它是惟一標記一條消息。它惟一的標記一條消息。kafka並無提供其餘額外的索引機制來存儲offset,由於在kafka中幾 乎不容許對消息進行「隨機讀寫」。

 

<ignore_js_op>  

 

    kafka和JMS(Java Message Service)實現(activeMQ)不一樣的是:即便消息被消費,消息仍然不會被當即刪除.日誌文件將會根據broker中的配置要求,保留必定的時 間以後刪除;好比log文件保留2天,那麼兩天後,文件會被清除,不管其中的消息是否被消費.kafka經過這種簡單的手段,來釋放磁盤空間,以及減小消 息消費以後對文件內容改動的磁盤IO開支.
 
    對於consumer而言,它須要保存消費消息的offset,對於offset的保存和使用,有consumer來控制;當consumer正常消費消 息時,offset將會"線性"的向前驅動,即消息將依次順序被消費.事實上consumer可使用任意順序消費消息,它只須要將offset重置爲任 意值..(offset將會保存在zookeeper中,參見下文)
 
    kafka集羣幾乎不須要維護任何consumer和producer狀態信息,這些信息有zookeeper保存;所以producer和consumer的 客戶端實現很是輕量級,它們能夠隨意離開,而不會對集羣形成額外的影響.
 
    partitions的 設計目的有多個.最根本緣由是kafka基於文件存儲.經過分區,能夠將日誌內容分散到多個server上, 來避免文件尺寸達到單機磁盤的上限,每一個partiton都會被當前server(kafka實例)保存;能夠將一個topic切分多任意多個 partitions,來消息保存/消費的效率.此外越多的partitions意味着能夠容納更多的consumer,有效提高併發消費的能力.(具體 原理參見下文).
 
    三、Distribution
    一個Topic的多個partitions,被分佈在kafka集羣中的多個server上;每一個server(kafka實例)負責 partitions中消息的讀寫操做;此外kafka還能夠配置partitions須要備份的個數(replicas),每一個partition將會 被備份到多臺機器上,以提升可用性.
 
    基於replicated方案,那麼就意味着須要對多個備份進行調度;每一個partition都有一個 server爲"leader";leader 負責全部的讀寫操做,若是leader失效,那麼將會有其餘follower來接管(成爲新的leader);follower只是單調的和leader 跟進,同步消息便可..因而可知做爲leader的server承載了所有的請求壓力,所以從集羣的總體考慮,有多少個partitions就意味着有多 少個"leader",kafka會將"leader"均衡的分散在每一個實例上,來確保總體的性能穩定.
 
    Producers
    Producer將消息發佈到指定的Topic中,同時Producer也能決定將此消息歸屬於哪一個partition;好比基於"round-robin"方式或者經過其餘的一些算法等.
 
    Consumers
    本質上kafka只支持Topic.每一個consumer屬於一個consumer group;反過來講,每一個group中能夠有多個consumer.發送到Topic的消息,只會被訂閱此Topic的每一個group中的一個consumer消費.
 
    若是全部的consumer都具備相同的group,這種狀況和queue模式很像;消息將會在consumers之間負載均衡.
    若是全部的consumer都具備不一樣的group,那這就是"發佈-訂閱";消息將會廣播給全部的消費者.
    在kafka中,一個partition中的消息只會被group中的一個consumer消費;每一個group中consumer消息消費互相獨立;我 們能夠認爲一個group是一個"訂閱"者,一個Topic中的每一個partions,只會被一個"訂閱者"中的一個consumer消費,不過一個 consumer能夠消費多個partitions中的消息.kafka只能保證一個partition中的消息被某個consumer消費時,消息是順 序的.事實上,從Topic角度來講,消息仍不是有序的.
 
    kafka的 設計原理決定,對於一個topic,同一個group中不能有多於partitions個數的consumer同時消費,不然將意味着某些consumer將沒法獲得消息.
 
    Guarantees
    1) 發送到partitions中的消息將會按照它接收的順序追加到日誌中
    2) 對於消費者而言,它們消費消息的順序和日誌中消息順序一致.
    3) 若是Topic的"replicationfactor"爲N,那麼容許N-1個kafka實例失效.
 
2、使用場景
 
    一、Messaging   
    對於一些常規的消息系統,kafka是個不錯的選擇;partitons/replication和容錯,可使kafka具備良好的擴展性和性能優點. 不過到目前爲止,咱們應該很清楚認識到,kafka並無提供JMS中的"事務性""消息傳輸擔保(消息確認機制)""消息分組"等企業級特 性;kafka只能使用做爲"常規"的消息系統,在必定程度上,還沒有確保消息的發送與接收絕對可靠(好比,消息重發,消息發送丟失等)
 
    二、Websit activity tracking
    kafka能夠做爲"網站活性跟蹤"的最佳工具;能夠將網頁/用戶操做等信息發送到kafka中.並實時監控,或者離線統計分析等

 

    三、Log Aggregation
    kafka的特性決定它很是適合做爲"日誌收集中心";application能夠將操做日誌"批量""異步"的發送到kafka集羣中,而不是保存在本 地或者DB中;kafka能夠批量提交消息/壓縮消息等,這對producer端而言,幾乎感受不到性能的開支.此時consumer端可使 hadoop等其餘系統化的存儲和分析系統.
 
3、設計原理
 
    kafka的 設計初衷是但願做爲一個統一的信息收集平臺,可以實時的收集反饋信息,並須要可以支撐較大的數據量,且具有良好的容錯能力.
 
    一、持久性
    kafka使用文件存儲消息,這就直接決定kafka在性能上嚴重依賴文件系統的自己特性.且不管任何OS下,對文件系統自己的優化幾乎沒有可能.文件緩 存/直接內存映射等是經常使用的手段.由於kafka是對日誌文件進行append操做,所以磁盤檢索的開支是較小的;同時爲了減小磁盤寫入的次 數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到必定閥值時,再flush到磁盤,這樣減小了磁盤IO調用的次數.

二、性能
    須要考慮的影響性能點不少,除磁盤IO以外,咱們還須要考慮網絡IO,這直接關係到kafka的吞吐量問題.kafka並無提供太多高超的技巧;對於 producer端,能夠將消息buffer起來,當消息的條數達到必定閥值時,批量發送給broker;對於consumer端也是同樣,批量 fetch多條消息.不過消息量的大小能夠經過配置文件來指定.對於kafka broker端,彷佛有個sendfile系統調用能夠潛在的提高網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域即 可,而無需進程再次copy和交換. 其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,所以啓用消息壓縮機制是一個良好的策略;壓縮須要消耗少許 的CPU資源,不過對於kafka而言,網絡IO更應該須要考慮.能夠將任何在網絡上傳輸的消息都通過壓縮.kafka支持gzip/snappy等多種 壓縮方式.
 
    三、生產者
    負載均衡: producer將會和Topic下全部partition leader保持socket鏈接;消息由producer直接經過socket發送到broker,中間不會通過任何"路由層".事實上,消息被路由到 哪一個partition上,有producer 客戶端決定.好比能夠採用"random""key-hash""輪詢"等,若是一個topic中有多個partitions,那麼在producer端實現"消息均衡分發"是必要的.
 
    其中partition leader的位置(host:port)註冊在zookeeper中,producer做爲zookeeper client,已經註冊了watch用來監聽partition leader的變動事件.
    異步發送:將多條消息暫且在客戶端buffer起來,並將他們批量的發送到broker,小數據IO太多,會拖慢總體的網絡延遲,批量延遲發送事實上提高了網絡效率。不過這也有必定的隱患,好比說當producer失效時,那些還沒有發送的消息將會丟失。

 

    四、消費者
    consumer端向broker發送"fetch"請求,並告知其獲取消息的offset;此後consumer將會得到必定條數的消息;consumer端也能夠重置offset來從新消費消息.
 
    在JMS實現中,Topic模型基於push方式,即broker將消息推送給consumer端.不過在kafka中,採用了pull方式,即 consumer在和broker創建鏈接以後,主動去pull(或者說fetch)消息;這中模式有些優勢,首先consumer端能夠根據本身的消費 能力適時的去fetch消息並處理,且能夠控制消息消費的進度(offset);此外,消費者能夠良好的控制消息消費的數量,batch fetch.
 
    其餘JMS實現,消息消費的位置是有prodiver保留,以便避免重複發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求 JMS broker須要太多額外的工做.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有複雜的消 息確認機制,可見kafka broker端是至關輕量級的.當消息被consumer接收以後,consumer能夠在本地保存最後消息的offset,並間歇性的向 zookeeper註冊offset.因而可知,consumer 客戶端也很輕量級.
 
<ignore_js_op>  



    五、消息傳送機制
    對於JMS實現,消息傳輸擔保很是直接:有且只有一次(exactly once).在kafka中稍有不一樣:
    1) at most once: 最多一次,這個和JMS中"非持久化"消息相似.發送一次,不管成敗,將不會重發.
    2) at least once: 消息至少發送一次,若是消息未能接受成功,可能會重發,直到接收成功.
    3) exactly once: 消息只會發送一次.
    at most once: 消費者fetch消息,而後保存offset,而後處理消息;當client保存offset以後,可是在消息處理過程當中出現了異常,致使部分消息未能繼 續處理.那麼此後"未處理"的消息將不能被fetch到,這就是"at most once".
    at least once: 消費者fetch消息,而後處理消息,而後保存offset.若是消息處理成功以後,可是在保存offset階段zookeeper異常致使保存操做未能 執行成功,這就致使接下來再次fetch時可能得到上次已經處理過的消息,這就是"at least once",緣由offset沒有及時的提交給zookeeper,zookeeper恢復正常仍是以前offset狀態.
    exactly once: kafka中並無嚴格的去實現(基於2階段提交,事務),咱們認爲這種策略在kafka中是沒有必要的.
    一般狀況下"at-least-once"是咱們搜選.(相比at most once而言,重複接收數據總比丟失數據要好).
 
    六、複製備份
    kafka將每一個partition數據複製到多個server上,任何一個partition有一個leader和多個follower(能夠沒有); 備份的個數能夠經過broker配置文件來設定.leader處理全部的read-write請求,follower須要和leader保持同 步.Follower和consumer同樣,消費消息並保存在本地日誌中;leader負責跟蹤全部的follower狀態,若是follower"落 後"太多或者失效,leader將會把它從replicas同步列表中刪除.當全部的follower都將一條消息保存成功,此消息才被認爲 是"committed",那麼此時consumer才能消費它.即便只有一個replicas實例存活,仍然能夠保證消息的正常發送和接收,只要 zookeeper集羣存活便可.(不一樣於其餘分佈式存儲,好比hbase須要"多數派"存活才行)
    當leader失效時,需在followers中選取出新的leader,可能此時follower落後於leader,所以須要選擇一個"up-to-date"的follower.選擇follower時須要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,若是一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力.在選舉新leader,須要考慮到"負載均衡".
 
    7.日誌
    若是一個topic的名稱爲"my_topic",它有2個partitions,那麼日誌將會保存在my_topic_0和my_topic_1兩個目 錄中;日誌文件中保存了一序列"log entries"(日誌條目),每一個log entry格式爲"4個字節的數字N表示消息的長度" + "N個字節的消息內容";每一個日誌都有一個offset來惟一的標記一條消息,offset的值爲8個字節的數字,表示此消息在此partition中所 處的起始位置..每一個partition在物理存儲層面,有多個log file組成(稱爲segment).segmentfile的命名爲"最小offset".kafka.例如"00000000000.kafka"; 其中"最小offset"表示此segment中起始消息的offset.
<ignore_js_op>  
    其中每一個partiton中所持有的segments列表信息會存儲在zookeeper中.
    當segment文件尺寸達到必定閥值時(能夠經過配置文件設定,默認1G),將會建立一個新的文件;當buffer中消息的條數達到閥值時將會觸發日誌 信息flush到日誌文件中,同時若是"距離最近一次flush的時間差"達到閥值時,也會觸發flush到日誌文件.若是broker失效,極有可能會 丟失那些還沒有flush到文件的消息.由於 server意外實現,仍然會致使log文件格式的破壞(文件尾部),那麼就要求當server啓東是須要檢測最後一個segment的文件結構是否合法並進行必要的修復.
    獲取消息時,須要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數).根據offset,能夠找到此消息所在segment文件,而後根據segment的最 小offset取差值,獲得它在file中的相對位置,直接讀取輸出便可.
    日誌文件的刪除策略很是簡單:啓動一個後臺線程按期掃描log file列表,把保存時間超過閥值的文件直接刪除(根據文件的建立時間).爲了不刪除文件時仍然有read操做(consumer消費),採起copy-on-write方式.
 
    八、分配
    kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變動並做出相應的動做(好比consumer失效,觸發負載均衡等)
    1) Broker node registry: 當一個kafkabroker啓動後,首先會向zookeeper註冊本身的節點信息(臨時znode),同時當broker和zookeeper斷開鏈接時,此znode也會被刪除.
    格式: /broker/ids/[0...N]   -->host:port;其中[0..N]表示broker id,每一個broker的配置文件中都須要指定一個數字類型的id(全局不可重複),znode的值爲此broker的host:port信息.
    2) Broker Topic Registry: 當一個broker啓動時,會向zookeeper註冊本身持有的topic和partitions信息,仍然是一個臨時znode.
    格式: /broker/topics/[topic]/[0...N]  其中[0..N]表示partition索引號.
    3) Consumer and Consumer group: 每一個consumer 客戶端被建立時,會向zookeeper註冊本身的信息;此做用主要是爲了"負載均衡".
    一個group中的多個consumer能夠交錯的消費一個topic的全部partitions;簡而言之,保證此topic的全部 partitions都能被此group所消費,且消費時爲了性能考慮,讓partition相對均衡的分散到每一個consumer上.
    4) Consumer id Registry: 每一個consumer都有一個惟一的ID(host:uuid,能夠經過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.
    格式:/consumers/[group_id]/ids/[consumer_id]
    仍然是一個臨時的znode,此節點的值爲{"topic_name":#streams...},即表示此consumer目前所消費的topic + partitions列表.
    5) Consumer offset Tracking: 用來跟蹤每一個consumer目前所消費的partition中最大的offset.
    格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]-->offset_value
    此znode爲持久節點,能夠看出offset跟group_id有關,以代表當group中一個消費者失效,其餘consumer能夠繼續消費.
    6) Partition Owner registry: 用來標記partition被哪一個consumer消費.臨時znode
    格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]-->consumer_node_id當consumer啓動時,所觸發的操做:
    A) 首先進行"Consumer id Registry";
    B) 而後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其餘consumer的"leave"和"join";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管 partitions).
    C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance.
<ignore_js_op>  
    1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息.
    2) Broker端使用zookeeper用來註冊broker信息,已經監測partitionleader存活性.
    3) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息.
 
4、主要配置
 
    一、Broker配置

 

<ignore_js_op>  

 

    2.Consumer主要配置

 

<ignore_js_op>  

 

3.Producer主要配置

 

<ignore_js_op>  

 

 
以上是關於kafka一些基礎說明,在其中咱們知道若是要kafka正常運行,必須配置zookeeper,不然不管是kafka集羣仍是 客戶端的生存者和消費者都沒法正常的工做的,如下是對zookeeper進行一些簡單的介紹:

 

5、zookeeper集羣
    zookeeper是一個爲分佈式應用提供一致性服務的軟件,它是開源的Hadoop項目的一個子項目,並根據google發表的一篇論文來實現的。 zookeeper爲分佈式系統提供了高笑且易於使用的協同服務,它能夠爲分佈式應用提供至關多的服務,諸如統一命名服務,配置管理,狀態同步和組服務 等。zookeeper接口簡單,咱們沒必要過多地糾結在分佈式系統 編程難於處理的同步和一致性問題上,你可使用zookeeper提供的現成(off-the-shelf)服務來實現來實現分佈式系統額配置管理,組管理,Leader選舉等功能。
    zookeeper集羣的安裝,準備三臺服務器server1:192.168.0.1,server2:192.168.0.2,
    server3:192.168.0.3.
    1)下載zookeeper
    到 http://zookeeper.apache.org/releases.html去下載最新版本Zookeeper-3.4.5的安裝包zookeeper-3.4.5.tar.gz.將文件保存server1的~目錄下
    2)安裝zookeeper
    先在服務器 server分別執行a-c步驟
    a)解壓  
    tar -zxvf zookeeper-3.4.5.tar.gz
    解壓完成後在目錄~下會發現多出一個目錄zookeeper-3.4.5,從新命令爲zookeeper
    b)配置
    將conf/zoo_sample.cfg拷貝一份命名爲zoo.cfg,也放在conf目錄下。而後按照以下值修改其中的配置:
   
    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/home/wwb/zookeeper /data
    dataLogDir=/home/wwb/zookeeper/logs
    # the port at which the clients will connect
    clientPort=2181
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    server.1=192.168.0.1:3888:4888
    server.2=192.168.0.2:3888:4888
     server.3=192.168.0.3:3888:4888
    tickTime:這個時間是做爲 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每一個 tickTime 時間就會發送一個心跳。
    dataDir:顧名思義就是 Zookeeper 保存數據的目錄,默認狀況下,Zookeeper 將寫數據的日誌文件也保存在這個目錄裏。
    clientPort:這個端口就是客戶端鏈接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。
    initLimit:這個配置項是用來配置 Zookeeper 接受 客戶端(這 裏所說的客戶端不是用戶鏈接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集羣中鏈接到 Leader 的 Follower 服務器)初始化鏈接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 服務器尚未收到客戶端的返回信息,那麼代表這個客戶端鏈接失敗。總的時間長度就是 5*2000=10 秒
    syncLimit:這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是2*2000=4 秒
    server.A=B:C:D:其中 A 是一個數字,表示這個是第幾號服務器;B 是這個服務器的 ip 地址;C 表示的是這個服務器與集羣中的 Leader 服務器交換信息的端口;D 表示的是萬一集羣中的 Leader 服務器掛了,須要一個端口來從新進行選舉,選出一個新的 Leader,而這個端口就是用來執行選舉時服務器相互通訊的端口。若是是僞集羣的配置方式,因爲 B 都是同樣,因此不一樣的 Zookeeper 實例通訊端口號不能同樣,因此要給它們分配不一樣的端口號
注意:dataDir,dataLogDir中的wwb是當前登陸用戶名,data,logs目錄開始是不存在,須要使用mkdir命令建立相應的目錄。而且在該目錄下建立文件myid,serve1,server2,server3該文件內容分別爲1,2,3。
針對服務器server2,server3能夠將server1複製到相應的目錄,不過須要注意dataDir,dataLogDir目錄,而且文件myid內容分別爲2,3
    3)依次啓動 server1,server2,server3的zookeeper.
    /home/wwb/zookeeper/bin/zkServer.sh start,出現相似如下內容
    JMX enabled by default
    Using config: /home/wwb/zookeeper/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
   4) 測試zookeeper是否正常工做,在server1上執行如下命令
    /home/wwb/zookeeper/bin/zkCli.sh -server192.168.0.2:2181,出現相似如下內容
    JLine support is enabled
    2013-11-27 19:59:40,560 - INFO      [main-SendThread(localhost.localdomain:2181):ClientCnxn$SendThread@736]- Session   establishmentcomplete on  server localhost.localdomain/127.0.0.1:2181, sessionid =    0x1429cdb49220000, negotiatedtimeout = 30000
 
    WATCHER::
   
    WatchedEvent state:SyncConnected type:None path:null
    [zk: 127.0.0.1:2181(CONNECTED) 0] [root@localhostzookeeper2]#  
    即表明集羣構建成功了,若是出現錯誤那應該是第三部時沒有啓動好集羣,
運行,先利用
    ps aux | grep zookeeper查看是否有相應的進程的,沒有話,說明集羣啓動出現問題,能夠在每一個服務器上使用
    ./home/wwb/zookeeper/bin/zkServer.sh stop。再依次使用./home/wwb/zookeeper/binzkServer.sh start,這時在執行4通常是沒有問題,若是仍是有問題,那麼先stop再到bin的上級目錄執行./bin/zkServer.shstart試試。
 
注意:zookeeper集羣時,zookeeper要求半數以上的機器可用,zookeeper才能提供服務。
 
6、kafka集羣
(利用上面server1,server2,server3,下面以server1爲實例)
    1)下載kafka0.8( http://kafka.apache.org/downloads.html),保存到服務器/home/wwb目錄下kafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz)
    2)解壓 tar -zxvf kafka-0.8.0-beta1-src.tgz,產生文件夾kafka-0.8.0-beta1-src更改成kafka01   
3)配置
    修改kafka01/config/ server.properties,其中broker.id,log.dirs,zookeeper.connect必須根據實際狀況進行修改,其餘項根據須要自行斟酌。大體以下:
     broker.id=1  
     port=9091  
     num.network.threads=2  
     num.io.threads=2  
     socket.send.buffer.bytes=1048576  
    socket.receive.buffer.bytes=1048576  
     socket.request.max.bytes=104857600  
    log.dir=./logs  
    num.partitions=2  
    log.flush.interval.messages=10000  
    log.flush.interval.ms=1000  
    log.retention.hours=168  
    #log.retention.bytes=1073741824  
    log.segment.bytes=536870912  
    num.replica.fetchers=2  
    log.cleanup.interval.mins=10  
    zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183  
    zookeeper.connection.timeout.ms=1000000  
    kafka.metrics.polling.interval.secs=5  
    kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  
    kafka.csv.metrics.dir=/tmp/kafka_metrics  
    kafka.csv.metrics.reporter.enabled=false
 
4)初始化由於kafka用scala語言編寫,所以運行kafka須要首先準備scala相關環境。
    > cd kafka01  
    > ./sbt update  
    > ./sbt package  
    > ./sbt assembly-package-dependency
在第二個命令時可能須要必定時間,因爲要下載更新一些依賴包。因此請你們 耐心點。
5) 啓動kafka01
    >JMX_PORT=9997 bin/kafka- server-start.sh config/server.properties &  
a)kafka02操做步驟與kafka01雷同,不一樣的地方以下
    修改kafka02/config/server.properties
    broker.id=2
    port=9092
    ##其餘配置和kafka-0保持一致
    啓動kafka02
    JMX_PORT=9998 bin/kafka-server-start.shconfig/server.properties &  
b)kafka03操做步驟與kafka01雷同,不一樣的地方以下
    修改kafka03/config/server.properties
    broker.id=3
    port=9093
    ##其餘配置和kafka-0保持一致
    啓動kafka02
    JMX_PORT=9999 bin/kafka- server-start.shconfig/server.properties &
6)建立Topic(包含一個分區,三個副本)
    >bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic
7)查看topic狀況
    >bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181
    topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0
8)建立發送者
   >bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic
    my test message1
    my test message2
    ^C
9)建立消費者
    >bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message1
    my test message2
^C
10)殺掉server1上的broker
  >pkill -9 -f config/ server.properties
11)查看topic
  >bin/kafka-list-top.sh --zookeeper192.168.0.1:2181
  topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0
發現topic還正常的存在
11)建立消費者,看是否能查詢到消息
    >bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic
    ...
    my test message 1
    my test message 2
    ^C
說明一切都是正常的。
 
OK,以上就是對Kafka我的的理解,不對之處請你們及時指出。
 
 
補充說明:
一、public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap),其中該方法的參數Map的key爲topic名稱,value爲topic對應的分區數,譬如說若是在kafka中不存在 相應的topic時,則會建立一個topic,分區數爲value,若是存在的話,該處的value則不起什麼做用

 

二、關於生產者向指定的分區發送數據,經過設置partitioner.class的屬性來指定向那個分區發送數據,若是本身指定必須編寫相應的程序,默認是kafka.producer.DefaultPartitioner,分區程序是基於散列的鍵。

 

三、在多個消費者讀取同一個topic的數據,爲了保證每一個消費者讀取數據的惟一性,必須將這些消費者group_id定義爲同一個值,這樣就構建了一個相似隊列的數據結構,若是定義不一樣,則相似一種廣播結構的。

 

四、在consumerapi中,參數 設計到數字部分,相似Map<String,Integer>,
numStream,指的都是在topic不存在的時,會建立一個topic,而且分區個數爲Integer,numStream,注意若是數字大於broker的配置中num.partitions屬性,會以num.partitions爲依據建立分區個數的。

 

五、producerapi,調用send時,若是不存在topic,也會建立topic,在該方法中沒有提供分區個數的參數,在這裏分區個數是由服務端broker的配置中num.partitions屬性決定的
 
關於kafka說明能夠參考: http://kafka.apache.org/documentation.html
 
文章轉自:http://www.aboutyun.com/thread-9341-1-1.html
相關文章
相關標籤/搜索