Kafka是一種分佈式的,基於發佈/訂閱的消息系統。主要設計目標以下:html
在項目啓動之初來預測未來項目會碰到什麼需求,是極其困難的。 消息隊列在處理過程當中間插入了一個隱含的、基於數據的接口層,兩邊的處理過程都要實現這一接口。 這容許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵照一樣的接口約束。數據庫
有些狀況下,處理數據的過程會失敗。除非數據被持久化,不然將形成丟失。消息隊列把數據進行持久化直到它們已經被徹底處理,經過這一方式規避了數據丟失風險。在被許多消息隊列所採用的」插入-獲取-刪除」範式中,在把一個消息從隊列中刪除以前,須要你的處理過程明確的指出該消息已經被處理完畢,確保你的數據被安全的保存直到你使用完畢。安全
由於消息隊列解耦了你的處理過程,因此增大消息入隊和處理的頻率是很容易的;只要另外增長處理過程便可。不須要改變代碼、不須要調節參數。擴展就像調大電力按鈕同樣簡單。服務器
在訪問量劇增的狀況下,應用仍然須要繼續發揮做用,可是這樣的突發流量並不常見;若是爲以能處理這類峯值訪問爲標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列可以使關鍵組件頂住突發的訪問壓力,而不會由於突發的超負荷的請求而徹底崩潰。網絡
當體系的一部分組件失效,不會影響到整個系統。消息隊列下降了進程間的耦合度,因此即便一個處理消息的進程掛掉,加入隊列中的消息仍然能夠在系統恢復後被處理。而這種容許重試或者延後處理請求的能力一般是造就一個略感不便的用戶和一個沮喪透頂的用戶之間的區別。併發
消息隊列提供的冗餘機制保證了消息能被實際的處理,只要一個進程讀取了該隊列便可。在此基礎上,IronMQ提供了一個」只送達一次」保證。不管有多少進程在從隊列中領取數據,每個消息只能被處理一次。這之因此成爲可能,是由於獲取一個消息只是」預約」了這個消息,暫時把它移出了隊列。除非客戶端明確的表示已經處理完了這個消息,不然這個消息會被放回隊列中去,在一段可配置的時間以後可再次被處理。app
在大多使用場景下,數據處理的順序都很重要。消息隊列原本就是排序的,而且能保證數據會按照特定的順序來處理。IronMO保證消息經過FIFO(先進先出)的順序來處理,所以消息在隊列中的位置就是從隊列中檢索他們的位置。負載均衡
在任何重要的系統中,都會有須要不一樣的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列經過一個緩衝層來幫助任務最高效率的執行–寫入隊列的處理會盡量的快速,而不受從隊列讀的預備處理的約束。該緩衝有助於控制和優化數據流通過系統的速度。框架
在一個分佈式系統裏,要獲得一個關於用戶操做會用多長時間及其緣由的整體印象,是個巨大的挑戰。消息隊列經過消息被處理的頻率,來方便的輔助肯定那些表現不佳的處理過程或領域,這些地方的數據流都不夠優化。異步
不少時候,你不想也不須要當即處理消息。消息隊列提供了異步處理機制,容許你把一個消息放入隊列,但並不當即處理它。你想向隊列中放入多少消息就放多少,而後在你樂意的時候再去處理它們。
名稱 | 一覽 |
---|---|
RabbitMQ | RabbitMQ是使用Erlang編寫的一個開源的消息隊列,自己支持不少的協議:AMQP,XMPP, SMTP, STOMP,也正因如此,它很是重量級,更適合於企業級的開發。同時實現了Broker構架,這意味着消息在發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。 |
ZeroMQ | ZeroMQ號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。ZMQ可以實現RabbitMQ不擅長的高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。ZeroMQ具備一個獨特的非中間件的模式,你不須要安裝和運行一個消息服務器或中間件,由於你的應用程序將扮演了這個服務角色。你只須要簡單的引用ZeroMQ程序庫,可使用NuGet安裝,而後你就能夠愉快的在應用程序之間發送消息了。可是ZeroMQ僅提供非持久性的隊列,也就是說若是宕機,數據將會丟失。其中,Twitter的Storm 0.9.0之前的版本中默認使用ZeroMQ做爲數據流的傳輸(Storm從0.9版本開始同時支持ZeroMQ和Netty做爲傳輸模塊)。 |
ActiveMQ | ActiveMQ是Apache下的一個子項目。 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。同時相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。 |
Kafka | Kafka是Apache下的一個子項目,是一個高性能跨語言分佈式發佈/訂閱消息隊列系統。具備如下特性:快速持久化,能夠在O(1)的系統開銷下進行消息持久化;高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;支持Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過Hadoop的並行加載機制來統一了在線和離線的消息處理。Apache Kafka相對於ActiveMQ是一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。 |
圖從上層結構反映:producers經過網絡向Kafka集羣發送消息,Kafka又將消息發送給consumers。Produce和Kafka集羣,以及Kafka集羣和consumer之間是經過TCP協議通訊的。
Topic是屬於同一類消息的集合,Kafka採用分區的方法維護每一個topic,格式以下: 每一個分區都是一個有序的,不變的消息隊列。分區中的消息在本分區中都有一個惟一序列id,該序列id稱爲offset。新的消息直接添加到分區的末尾。 Kafka會保留設置時間範圍內的全部消息記錄,儘管某些記錄已經被consumer消費過。每一個consumer會保存其在日誌中的位置信息,該位置信息稱爲offset,offset由consumer管
理。Consumer增長offset的值,依次讀取隊列中新的消息。可是,consumer控制offset,使其可以讀取任意offset的消息。 Consumer這種讀取消息的機制,使得consumer很是靈活,能夠隨時讀取topic中任意位置的消息,consumer的運行和Kafka集羣以及其餘consumer沒有影響。 Partition是物理上的分區,每一個partition有若干segment組成。具體參考以下:
日誌的partitions分佈在Kafka集羣的每一個server上,每一個服務器負責一部分partitions的數據處理和請求。每一個partition均可以設置若干備份分佈在Kafka集羣的其餘server上,而且每一個partition都有一個server做爲leader,0個或多個server做爲輔助的follows。Leader處理其負責的partition全部的讀寫請求,follows對leader上全部的讀寫請求等元數據進行備份,以便leader宕機以後follow充當leader的角色。扮演leader的服務器處理部分partitions,其餘的follows處理其餘的partitions,這樣Kafka集羣就實現了負載均衡的功能。
Produce將消息發佈到它選擇的topic中,並負責將消息分發到其對應topic的partition中。
傳統消息有排隊和發佈-訂閱兩種模型。在排隊模型中,consumer池中的consumer從一個server讀取消息,每條消息只能被一個consumer取到。在分發-訂閱模型中,每條消息被廣播到全部的consumers。Kafka用consumer group整合了這兩種模型的特色。 每一個consumer都屬於一個consumer group。Kafka集羣和consumer group之間的消息分發採用了發佈-訂閱模型,consumer group中的consumer經過consumer group和Kafka集羣採用了排隊模型。
alt text
上圖所示:每一個server中的partition都廣播到各個consumer group,每一個consumer group中的consumer都從group中獲取消息。 兩種極端的狀況: 1. 若全部的consumer都屬於同一個consumer group,每一個consumer都從Kafka集羣以隊列模型獲取消息 2. 若全部的consumer屬於不一樣的consumer group,kafka集羣至關於將消息廣播到每一個consumer Kafka經過將topic分紅partitions的方式維持並行性,並經過consumer線程池確保消息的有序性和負載均衡。具體作法爲:將topic分紅若干partitions,在consumer group中建立若干consumers,每一個partition對應一個consumer,這樣每一個consumer都有序的讀取其對應partition上的消息。從整個topic來看,若干consumers並行讀取topic中的消息。
解壓zookeeper包
##以版本號zookeeper-3.4.8.tar.gz爲例,將zookeeper-3.4.8.tar.gz解壓到/home/kafka目錄下
sudo tar xvf zookeeper-3.4.8.tar.gz -C /home/kafka mv zookeeper-3.4.8 zookeeper
配置/etc/profile文件
sudo vi /etc/profile export ZOOKEEPER_HOME=/home/kafka/zookeeper export PATH=$ZOOKEEPER_HOME/bin:$PATH CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$ZOOKEEPER_HOME/libs export PATH source /etc/profile ##使得配置文件生效
配置zookeeper
mkdir -p /home/kafka/zookeeper/data cd /home/kafka/zookeeper/bin cp zoo.sample.cfg zoo.cfg vi zoo.cfg dataDir=/home/kafka/zookeeper/data ##修改
啓動和關閉zookeeper
bin/zkServer.sh start ##啓動zookeeper bin/zkServer.sh stop ##關閉zookeeper
解壓和安裝kafka
##以版本號kafka_2.11-0.9.0.1.tgz爲例,將kafka_2.11-0.9.0.1.tgz解壓到/home/kafka目錄下
sudo tar xvf kafka_2.11-0.9.0.1.tgz -C /home/kafka mv kafka_2.11-0.9.0.1.tgz kafka
配置kafka
cd ~/kafka/config vi server.properties ##打開server.properties配置文件 log.dirs=/tmp/kafka-logs ##修改log路徑 zookeeper.connect=localhost:2181 ##配置zookeeper
使用kafka
cd ~/kafka/bin bin/kafka-server-start.sh config/server.properties & ##啓動kafka bin/kafka-server-stop.sh config/server.properties ##關閉kafka bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test & ##建立topic bin/kafka-console-producer.sh --broker-list localhost:9092 --sync --topic test ##建立生產者 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test ##建立消費者