搭建Kafka運行環境

這兩天在公司學kafka,在網上找到一篇比較詳細的入門博文,拿過來學習一下,發現直接按上面的作仍是有些小問題,本身就在教程裏作了些相應的修改

介紹

Kafka是一個分佈式的、可分區的、可複製的消息系統。它提供了普通消息系統的功能,但具備本身獨特的設計。這個獨特的設計是什麼樣的呢?java

 

首先讓咱們看幾個基本的消息系統術語:node

  • Kafka將消息以topic爲單位進行概括。linux

  • 將向Kafka topic發佈消息的程序成爲producers.apache

  • 將預訂topics並消費消息的程序成爲consumer.服務器

  • Kafka以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker.網絡

producers經過網絡將消息發送到Kafka集羣,集羣向消費者提供消息,以下圖所示:架構

客戶端和服務端經過TCP協議通訊。Kafka提供了Java客戶端,而且對多種語言都提供了支持。併發

 

相關閱讀負載均衡

分佈式發佈訂閱消息系統 Kafka 架構設計 http://www.linuxidc.com/Linux/2013-11/92751.htm異步

Apache Kafka 代碼實例 http://www.linuxidc.com/Linux/2013-11/92754.htm

Apache Kafka 教程筆記 http://www.linuxidc.com/Linux/2014-01/94682.htm

Topics 和Logs

先來看一下Kafka提供的一個抽象概念:topic.

一個topic是對一組消息的概括。對每一個topic,Kafka 對它的日誌進行了分區,以下圖所示:

每一個分區都由一系列有序的、不可變的消息組成,這些消息被連續的追加到分區中。分區中的每一個消息都有一個連續的序列號叫作offset,用來在分區中惟一的標識這個消息。

 

在一個可配置的時間段內,Kafka集羣保留全部發布的消息,無論這些消息有沒有被消費。好比,若是消息的保存策略被設置爲2天,那麼在一個消息被髮布的兩天時間內,它都是能夠被消費的。以後它將被丟棄以釋放空間。Kafka的性能是和數據量無關的常量級的,因此保留太多的數據並非問題。

 

實際上每一個consumer惟一須要維護的數據是消息在日誌中的位置,也就是offset.這個offset有consumer來維護:通常狀況下隨着consumer不斷的讀取消息,這offset的值不斷增長,但其實consumer能夠以任意的順序讀取消息,好比它能夠將offset設置成爲一箇舊的值來重讀以前的消息。

 

以上特色的結合,使Kafka consumers很是的輕量級:它們能夠在不對集羣和其餘consumer形成影響的狀況下讀取消息。你可使用命令行來"tail"消息而不會對其餘正在消費消息的consumer形成影響。

 

將日誌分區能夠達到如下目的:首先這使得每一個日誌的數量不會太大,能夠在單個服務上保存。另外每一個分區能夠單獨發佈和消費,爲併發操做topic提供了一種可能。

分佈式

每一個分區在Kafka集羣的若干服務中都有副本,這樣這些持有副本的服務能夠共同處理數據和請求,副本數量是能夠配置的。副本使Kafka具有了容錯能力。

每一個分區都由一個服務器做爲「leader」,零或若干服務器做爲「followers」,leader負責處理消息的讀和寫,followers則去複製leader.若是leader down了,followers中的一臺則會自動成爲leader。集羣中的每一個服務都會同時扮演兩個角色:做爲它所持有的一部分分區的leader,同時做爲其餘分區的followers,這樣集羣就會據有較好的負載均衡。

Producers

Producer將消息發佈到它指定的topic中,並負責決定發佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也能夠經過特定的分區函數選擇分區。使用的更多的是第二種。

Consumers

發佈消息一般有兩種模式:隊列模式(queuing)和發佈-訂閱模式(publish-subscribe)。隊列模式中,consumers能夠同時從服務端讀取消息,每一個消息只被其中一個consumer讀到;發佈-訂閱模式中消息被廣播到全部的consumer中。

Consumers能夠加入一個consumer 組,共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。同一組中的consumer能夠在不一樣的程序中,也能夠在不一樣的機器上。若是全部的consumer都在一個組中,這就成爲了傳統的隊列模式,在各consumer中實現負載均衡。

若是全部的consumer都不在不一樣的組中,這就成爲了發佈-訂閱模式,全部的消息都被分發到全部的consumer中。

更常見的是,每一個topic都有若干數量的consumer組,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組由若干consumer組成。這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。

由兩個機器組成的集羣擁有4個分區 (P0-P3) 2個consumer組. A組有兩個consumerB組有4個

 

相比傳統的消息系統,Kafka能夠很好的保證有序性。

傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。

 

在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。

 

Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。

搭建Kafka開發環境

Kafka用到了Zookeeper,全部首先啓動Zookper,下面簡單的啓用一個單實例的Zookkeeper服務。能夠在命令的結尾加個&符號,這樣就能夠啓動後離開控制檯。

> bin/zookeeper-server-start.sh config/zookeeper.properties &


...

如今啓動Kafka:

> bin/kafka-server-start.sh config/server.properties &



Step 3: 建立 topic


建立一個叫作「test」的topic,它只有一個分區,一個副本。

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

能夠經過list命令查看建立的topic:

> bin/kafka-topics.sh --list --zookeeper localhost:2181

test

除了手動建立topic,還能夠配置broker讓它自動建立topic.


Step 4:發送消息.


Kafka 使用一個簡單的命令行producer,從文件中或者從標準輸入中讀取消息併發送到服務端。默認的每條命令將發送一條消息。


運行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

This is a messageThis is another message

ctrl+c能夠退出發送。

Step 5: 啓動consumer


Kafka also has a command line consumer that will dump out messages to standard output.

Kafka也有一個命令行consumer能夠讀取消息並輸出到標準輸出:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This is a message

This is another message

你在一個終端中運行consumer命令行,另外一個終端中運行producer命令行,就能夠在一個終端輸入消息,另外一個終端讀取消息。

這兩個命令都有本身的可選參數,能夠在運行的時候不加任何參數能夠看到幫助信息。

 

Step 6: 搭建一個多個broker的集羣


剛纔只是啓動了單個broker,如今啓動有3個broker組成的集羣,這些broker節點也都是在本機上的:

首先爲每一個節點編寫配置文件:

 

> cp config/server.properties config/server-1.properties

> cp config/server.properties config/server-2.properties

在拷貝出的新文件中修改和添加如下參數:

config/server-1.properties:

    broker.id=1(修改)

    log.dir=/tmp/kafka-logs-1(修改)

    port=9093(添加)

    

 

config/server-2.properties:

    broker.id=2(修改)

    log.dir=/tmp/kafka-logs-2 (修改)

    port=9094 (添加)

    

broker.id在集羣中惟一的標註一個節點,由於在同一個機器上,因此必須制定不一樣的端口和日誌文件,避免數據被覆蓋。

 

We already have Zookeeper and our single node started, so we just need to start the two new nodes:

剛纔已經啓動可Zookeeper和一個節點,如今啓動另外兩個節點:

> bin/kafka-server-start.sh config/server-1.properties &

...

> bin/kafka-server-start.sh config/server-2.properties &

...

建立一個擁有3個副本的topic:

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

如今咱們搭建了一個集羣,怎麼知道每一個節點的信息呢?運行「"describe topics」命令就能夠了:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:

        Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 1,2,0 Isr: 1,2,0

下面解釋一下這些輸出。第一行是對全部分區的一個描述,而後每一個分區都會對應一行,由於咱們只有一個分區因此下面就只加了一行。

leader:負責處理消息的讀和寫,leader是從全部節點中隨機選擇的,這裏的--replication-factor是副本數,它不能大於broker數

replicas:列出了全部的副本節點,無論節點是否在服務中.

isr:是正在服務中的節點.

在咱們的例子中,節點0是做爲leader運行。

向topic發送消息:


> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

...

my test message 1my test message 2^C 

消費這些消息:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C

測試一下容錯能力.Broker 1做爲leader運行,如今咱們kill掉它:

> ps | grep server-1.properties7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...

> kill -9 7564

另一個節點被選作了leader,node 1 再也不出如今 in-sync 副本列表中:

> bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:

        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 1,2,0 Isr: 2,0

雖然最初負責續寫消息的leader down掉了,但以前的消息仍是能夠消費的:

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C

看來Kafka的容錯機制仍是不錯的。

相關文章
相關標籤/搜索