Kafka消息系統

1、基本概念java

Kafka是一個分佈式的、可分區的、可複製的消息系統。它提供了普通消息系統的功能,但具備本身獨特的設計。
首先讓咱們看幾個基本的消息系統術語:
Kafka將消息以topic爲單位進行概括。
將向Kafka topic發佈消息的程序成爲producers.
將預訂topics並消費消息的程序成爲consumer.
Kafka以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker.node

 producers經過網絡將消息發送到Kafka集羣,集羣向消費者提供消息,以下圖所示:
 服務器

客戶端和服務端經過TCP協議通訊。Kafka提供了Java客戶端,而且對多種語言都提供了支持。
Topics 和Logs
先來看一下Kafka提供的一個抽象概念:topic.
一個topic是對一組消息的概括。對每一個topic,Kafka 對它的日誌進行了分區,以下圖所示:
 
每一個分區都由一系列有序的、不可變的消息組成,這些消息被連續的追加到分區中。分區中的每一個消息都有一個連續的序列號叫作offset,用來在分區中惟一的標識這個消息。
在一個可配置的時間段內,Kafka集羣保留全部發布的消息,無論這些消息有沒有被消費。好比,若是消息的保存策略被設置爲2天,那麼在一個消息被髮布的兩天時間內,它都是能夠被消費的。以後它將被丟棄以釋放空間。Kafka的性能是和數據量無關的常量級的,因此保留太多的數據並非問題。
實際上每一個consumer惟一須要維護的數據是消息在日誌中的位置,也就是offset.這個offset有consumer來維護:通常狀況下隨着consumer不斷的讀取消息,這offset的值不斷增長,但其實consumer能夠以任意的順序讀取消息,好比它能夠將offset設置成爲一箇舊的值來重讀以前的消息。
以上特色的結合,使Kafka consumers很是的輕量級:它們能夠在不對集羣和其餘consumer形成影響的狀況下讀取消息。你可使用命令行來"tail"消息而不會對其餘正在消費消息的consumer形成影響。
將日誌分區能夠達到如下目的:首先這使得每一個日誌的數量不會太大,能夠在單個服務上保存。另外每一個分區能夠單獨發佈和消費,爲併發操做topic提供了一種可能。網絡

分佈式併發

每一個分區在Kafka集羣的若干服務中都有副本,這樣這些持有副本的服務能夠共同處理數據和請求,副本數量是能夠配置的。副本使Kafka具有了容錯能力。
每一個分區都由一個服務器做爲「leader」,零或若干服務器做爲「followers」,leader負責處理消息的讀和寫,followers則去複製leader.若是leader down了,followers中的一臺則會自動成爲leader。集羣中的每一個服務都會同時扮演兩個角色:做爲它所持有的一部分分區的leader,同時做爲其餘分區的followers,這樣集羣就會據有較好的負載均衡。負載均衡

Producers
Producer將消息發佈到它指定的topic中,並負責決定發佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也能夠經過特定的分區函數選擇分區。使用的更多的是第二種。異步

Consumers
發佈消息一般有兩種模式:隊列模式(queuing)和發佈-訂閱模式(publish-subscribe)。隊列模式中,consumers能夠同時從服務端讀取消息,每一個消息只被其中一個consumer讀到;發佈-訂閱模式中消息被廣播到全部的consumer中。Consumers能夠加入一個consumer 組,共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。同一組中的consumer能夠在不一樣的程序中,也能夠在不一樣的機器上。若是全部的consumer都在一個組中,這就成爲了傳統的隊列模式,在各consumer中實現負載均衡。若是全部的consumer都不在不一樣的組中,這就成爲了發佈-訂閱模式,全部的消息都被分發到全部的consumer中。更常見的是,每一個topic都有若干數量的consumer組,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組由若干consumer組成。這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。
 
由兩個機器組成的集羣擁有4個分區 (P0-P3) 2個consumer組. A組有兩個consumerB組有4個
相比傳統的消息系統,Kafka能夠很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。
在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。
Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。分佈式

2、環境搭建函數

1.下載Kafka點擊下載最新的版本並解壓.性能

1)tar -xzf kafka_2.9.2-0.8.1.1.tgz

2)cd kafka_2.9.2-0.8.1.1

2.啓動服務
Kafka用到了Zookeeper,全部首先啓動Zookper,下面簡單的啓用一個單實例的Zookkeeper服務。能夠在命令的結尾加個&符號,這樣就能夠啓動後離開控制檯。

1)bin/zookeeper-server-start.sh config/zookeeper.properties &

2)啓動Kafka:

bin/kafka-server-start.sh config/server.properties

3)建立 topic

建立一個叫作「test」的topic,它只有一個分區,一個副本

bin/kafka-topics.sh --create --zookeeper

localhost:2181 --replication-factor 1 --partitions 1

--topic test

能夠經過list命令查看建立的topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

除了手動建立topic,還能夠配置broker讓它自動建立topic.

4)發送消息
Kafka 使用一個簡單的命令行producer,從文件中或者從標準輸入中讀取消息併發送到服務端。默認的每條命令將發送一條消息。

運行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

This is a messageThis is another message

ctrl+c能夠退出發送。

5)啓動consumer
Kafka也有一個命令行consumer能夠讀取消息並輸出到標準輸出:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This is a message

This is another message

你在一個終端中運行consumer命令行,另外一個終端中運行producer命令行,就能夠在一個終端輸入消息,另外一個終端讀取消息。
這兩個命令都有本身的可選參數,能夠在運行的時候不加任何參數能夠看到幫助信息。

6) 搭建一個多個broker的集羣

剛纔只是啓動了單個broker,如今啓動有3個broker組成的集羣,這些broker節點也都是在本機上的:
首先爲每一個節點編寫配置文件:

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

在拷貝出的新文件中添加如下參數:

config/server-[1,2].properties:

broker.id=[1,2]

port=909[3,4]

log.dir=/tmp/kafka-logs-[1,2]

broker.id在集羣中惟一的標註一個節點,由於在同一個機器上,因此必須制定不一樣的端口和日誌文件,避免數據被覆蓋。

剛纔已經啓動可Zookeeper和一個節點,如今啓動另外兩個節點:

bin/kafka-server-start.sh config/server-1.properties &

bin/kafka-server-start.sh config/server-2.properties &
建立一個擁有3個副本的topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

如今咱們搭建了一個集羣,怎麼知道每一個節點的信息呢?運行「"describe topics」命令就能夠了:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:

Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0

下面解釋一下這些輸出。第一行是對全部分區的一個描述,而後每一個分區都會對應一行,由於咱們只有一個分區因此下面就只加了一行。
leader:負責處理消息的讀和寫,leader是從全部節點中隨機選擇的.
replicas:列出了全部的副本節點,無論節點是否在服務中.
isr:是正在服務中的節點.
在咱們的例子中,節點1是做爲leader運行。
向topic發送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

my test message 1 my test message 2^C

消費這些消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic     my-replicated-topic

my test message 1

my test message 2
測試一下容錯能力.Broker 1做爲leader運行,如今咱們kill掉它:

ps | grep server-1.properties                     7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...

kill -9 7564

另一個節點被選作了leader,node 1 再也不出如今 in-sync 副本列表中:

bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic

Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:

Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 1,2,0 Isr: 2,0

雖然最初負責續寫消息的leader down掉了,但以前的消息仍是能夠消費的:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

my test message 1

my test message 2

相關文章
相關標籤/搜索