Kafka 是一個分佈式流媒體平臺html
kafka官網:http://kafka.apache.org/java
(1)流媒體平臺有三個關鍵功能:算法
(2)Kafka一般用於兩大類應用:數據庫
要了解Kafka如何作這些事情,讓咱們深刻探討Kafka的能力。apache
(3)首先是幾個概念:bootstrap
(4)Kafka有四個核心API:vim
在Kafka中,客戶端和服務器之間的通訊是經過簡單,高性能,語言無關的TCP協議完成的。此協議已版本化並保持與舊版本的向後兼容性。Kafka提供Java客戶端,但客戶端有多種語言版本。windows
咱們首先深刻了解 Kafka 爲記錄流提供的核心抽象 - 主題topics 後端
一個Topic能夠認爲是一類消息,每一個topic將被分紅多個partition(區),每一個partition在存儲層面是append log文件bash
主題是發佈記錄的類別或訂閱源名稱。Kafka的主題老是多用戶; 也就是說,一個主題能夠有零個,一個或多個消費者訂閱寫入它的數據。
對於每一個主題,Kafka羣集都維護一個以下所示的分區日誌:
每一個分區都是一個有序的,不可變的記錄序列,不斷附加到結構化的提交日誌中。分區中的記錄每一個都分配了一個稱爲偏移的順序ID號,它惟一地標識分區中的每一個記錄。
Kafka集羣持久保存全部已發佈的記錄 - 不管是否已使用 - 使用可配置的保留期。例如,若是保留策略設置爲兩天,則在發佈記錄後的兩天內,它可供使用,以後將被丟棄以釋放空間。Kafka的性能在數據大小方面其實是恆定的,所以長時間存儲數據不是問題。
實際上,基於每一個消費者保留的惟一元數據是該消費者在日誌中的偏移或位置。這種偏移由消費者控制:一般消費者在讀取記錄時會線性地提升其偏移量,但事實上,因爲該位置由消費者控制,所以它能夠按照本身喜歡的任何順序消費記錄。例如,消費者能夠重置爲較舊的偏移量來從新處理過去的數據,或者跳到最近的記錄並從「如今」開始消費。
這些功能組合意味着Kafka 消費者consumers 很是cheap - 他們能夠來來每每對集羣或其餘消費者沒有太大影響。例如,您可使用咱們的命令行工具「tail」任何主題的內容,而無需更改任何現有使用者所消耗的內容。
日誌中的分區有多種用途。首先,它們容許日誌擴展到超出適合單個服務器的大小。每一個單獨的分區必須適合託管它的服務器,但主題可能有許多分區,所以它能夠處理任意數量的數據。其次,它們充當了並行性的單位 - 更多的是它。
一個Topic的多個partitions,被分佈在kafka集羣中的多個server上;每一個server(kafka實例)負責partitions中消息的讀寫操做;此外kafka還能夠配置partitions須要備份的個數(replicas),每一個partition將會被備份到多臺機器上,以提升可用性.
基於replicated方案,那麼就意味着須要對多個備份進行調度;每一個partition都有一個server爲"leader";leader負責全部的讀寫操做,若是leader失效,那麼將會有其餘follower來接管(成爲新的leader);follower只是單調的和leader跟進,同步消息便可..因而可知做爲leader的server承載了所有的請求壓力,所以從集羣的總體考慮,有多少個partitions就意味着有多少個"leader",kafka會將"leader"均衡的分散在每一個實例上,來確保總體的性能穩定。
Producers 將數據發佈到指定的topics 主題。同時Producer 也能決定將此消息歸屬於哪一個partition;好比基於"round-robin"方式或者經過其餘的一些算法等。
分析:兩個服務器Kafka羣集,託管四個分區(P0-P3),包含兩個使用者組。消費者組A有兩個消費者實例,B組有四個消費者實例。
在Kafka中實現消費consumption 的方式是經過在消費者實例上劃分日誌中的分區,以便每一個實例在任什麼時候間點都是分配的「公平份額」的獨佔消費者。維護組中成員資格的過程由Kafka協議動態處理。若是新實例加入該組,他們將從該組的其餘成員接管一些分區; 若是實例死亡,其分區將分發給其他實例。
Kafka僅提供分區內記錄的總訂單,而不是主題中不一樣分區之間的記錄。對於大多數應用程序而言,按分區排序與按鍵分區數據的能力相結合就足夠了。可是,若是您須要對記錄進行總訂單,則可使用僅包含一個分區的主題來實現,但這將意味着每一個使用者組只有一個使用者進程。
Kafka的流概念與傳統的企業郵件系統相好比何?
(1)傳統消息系統
消息傳統上有兩種模型:queuing排隊 and publish-subscribe發佈 - 訂閱。在隊列中,消費者池能夠從服務器讀取而且每一個記錄轉到其中一個; 在發佈 - 訂閱中,記錄被廣播給全部消費者。這兩種模型中的每一種都有優勢和缺點。排隊的優點在於它容許您在多個消費者實例上劃分數據處理,從而能夠擴展您的處理。不幸的是,一旦一個進程讀取它已經消失的數據,隊列就不是多用戶。發佈 - 訂閱容許您將數據廣播到多個進程,但因爲每條消息都發送給每一個訂閱者,所以沒法進行擴展處理。
卡夫卡的消費者羣體概念歸納了這兩個概念。與隊列同樣,使用者組容許您將處理劃分爲一組進程(使用者組的成員)。與發佈 - 訂閱同樣,Kafka容許您向多個消費者組廣播消息。
(2)kafka 的優點
Kafka模型的優點在於每一個主題都具備這些屬性 - 它能夠擴展處理而且也是多用戶 - 不須要選擇其中一個。
與傳統的消息系統相比,Kafka具備更強的訂購保證。
傳統隊列在服務器上按順序保留記錄,若是多個消費者從隊列中消耗,則服務器按照存儲順序分發記錄。可是,雖然服務器按順序分發記錄,可是記錄是異步傳遞給消費者的,所以它們可能會在不一樣的消費者處出現故障。這實際上意味着在存在並行消耗的狀況下丟失記錄的順序。消息傳遞系統一般經過具備「獨佔消費者」概念來解決這個問題,該概念只容許一個進程從隊列中消耗,但固然這意味着處理中沒有並行性。
kafka作得更好。經過在主題中具備並行性概念 - 分區 - ,Kafka可以在消費者流程池中提供訂購保證和負載平衡。這是經過將主題中的分區分配給使用者組中的使用者來實現的,以便每一個分區僅由該組中的一個使用者使用。經過這樣作,咱們確保使用者是該分區的惟一讀者並按順序使用數據。因爲有許多分區,這仍然能夠平衡許多消費者實例的負載。但請注意,消費者組中的消費者實例不能超過度區。
Kafka能夠替代更傳統的消息代理。消息代理的使用有多種緣由(將處理與數據生成器分離,緩衝未處理的消息等)。與大多數消息傳遞系統相比,Kafka具備更好的吞吐量,內置分區,複製和容錯功能,這使其成爲大規模消息處理應用程序的理想解決方案。
根據經驗,消息傳遞的使用一般相對較低,但可能須要較低的端到端延遲,而且一般取決於Kafka提供的強大的耐用性保證。
在這個領域,Kafka可與傳統的消息傳遞系統(如ActiveMQ或 RabbitMQ)相媲美。
Kafka的原始用例是可以將用戶活動跟蹤管道重建爲一組實時發佈 - 訂閱源。這意味着站點活動(頁面查看,搜索或用戶可能採起的其餘操做)將發佈到中心主題,每一個活動類型包含一個主題。這些源可用於訂購一系列用例,包括實時處理,實時監控以及加載到Hadoop或離線數據倉庫系統以進行脫機處理和報告。
活動跟蹤一般很是高,由於爲每一個用戶頁面視圖生成了許多活動消息。
Kafka一般用於運營監控數據。這涉及從分佈式應用程序聚合統計信息以生成操做數據的集中式提要。
許多人使用Kafka做爲日誌聚合解決方案的替代品。日誌聚合一般從服務器收集物理日誌文件,並將它們放在中央位置(多是文件服務器或HDFS)進行處理。Kafka抽象出文件的細節,並將日誌或事件數據做爲消息流更清晰地抽象出來。這容許更低延遲的處理並更容易支持多個數據源和分佈式數據消耗。與Scribe或Flume等以日誌爲中心的系統相比,Kafka提供了一樣出色的性能,因爲複製而具備更強的耐用性保證,以及更低的端到端延遲。
許多Kafka用戶在處理由多個階段組成的管道時處理數據,其中原始輸入數據從Kafka主題中消費,而後聚合,豐富或以其餘方式轉換爲新主題以供進一步消費或後續處理。
例如,用於推薦新聞文章的處理管道能夠從RSS訂閱源抓取文章內容並將其發佈到「文章」主題; 進一步處理可能會對此內容進行規範化或重複數據刪除,並將已清理的文章內容發佈到新主題; 最終處理階段可能會嘗試向用戶推薦此內容。此類處理管道基於各個主題建立實時數據流的圖形。從0.10.0.0開始,這是一個輕量級但功能強大的流處理庫,名爲Kafka Streams 在Apache Kafka中可用於執行如上所述的此類數據處理。除了Kafka Streams以外,其餘開源流處理工具包括Apache Storm和 Apache Samza。
Event Sourcing是一種應用程序設計風格,其中狀態更改記錄爲按時間排序的記錄序列。Kafka對很是大的存儲日誌數據的支持使其成爲以這種風格構建的應用程序的出色後端。
Kafka能夠做爲分佈式系統的一種外部提交日誌。該日誌有助於在節點之間複製數據,並充當故障節點恢復其數據的從新同步機制。Kafka中的日誌壓縮功能有助於支持此用法。在這種用法中,Kafka相似於Apache BookKeeper項目。
到官網http://kafka.apache.org/downloads.html下載想要的版本;我這裏下載的最新穩定版2.1.0
注:因爲Kafka控制檯腳本對於基於Unix和Windows的平臺是不一樣的,所以在Windows平臺上使用bin\windows\ 而不是bin/ 將腳本擴展名更改成.bat。
[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz [root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz [root@along ~]# cd /data/kafka_2.11-2.1.0/
kafka正常運行,必須配置zookeeper,不然不管是kafka集羣仍是客戶端的生存者和消費者都沒法正常的工做的;因此須要配置啓動zookeeper服務。
(1)zookeeper須要java環境
[root@along ~]# yum -y install java-1.8.0
(2)這裏kafka下載包已經包括zookeeper服務,因此只需修改配置文件,啓動便可。
若是須要下載指定zookeeper版本;能夠單獨去zookeeper官網http://mirrors.shu.edu.cn/apache/zookeeper/下載指定版本。
[root@along ~]# cd /data/kafka_2.11-2.1.0/ [root@along kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties dataDir=/tmp/zookeeper #數據存儲目錄 clientPort=2181 #zookeeper端口 maxClientCnxns=0
注:可自行添加修改zookeeper配置
(1)修改配置文件
[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties broker.id=0 listeners=PLAINTEXT://localhost:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
注:可根據本身需求修改配置文件
(2)配置環境變量
[root@along ~]# vim /etc/profile.d/kafka.sh export KAFKA_HOME="/data/kafka_2.11-2.1.0" export PATH="${KAFKA_HOME}/bin:$PATH" [root@along ~]# source /etc/profile.d/kafka.sh
(3)配置服務啓動腳本
[root@along ~]# vim /etc/init.d/kafka #!/bin/sh # # chkconfig: 345 99 01 # description: Kafka # # File : Kafka # # Description: Starts and stops the Kafka server # source /etc/rc.d/init.d/functions KAFKA_HOME=/data/kafka_2.11-2.1.0 KAFKA_USER=root export LOG_DIR=/tmp/kafka-logs [ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka # See how we were called. case "$1" in start) echo -n "Starting Kafka:" /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &" echo " done." exit 0 ;; stop) echo -n "Stopping Kafka: " /sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9" echo " done." exit 0 ;; hardstop) echo -n "Stopping (hard) Kafka: " /sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9" echo " done." exit 0 ;; status) c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'` if [ "$c_pid" = "" ] ; then echo "Stopped" exit 3 else echo "Running $c_pid" exit 0 fi ;; restart) stop start ;; *) echo "Usage: kafka {start|stop|hardstop|status|restart}" exit 1 ;; esac
(1)後臺啓動zookeeper服務
[root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &
(2)啓動kafka服務
[root@along ~]# service kafka start Starting kafka (via systemctl): [ OK ] [root@along ~]# service kafka status Running 86018 [root@along ~]# ss -nutl Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port tcp LISTEN 0 50 :::9092 :::* tcp LISTEN 0 50 :::2181 :::*
建立一個名爲「along」的主題,它只包含一個分區,只有一個副本:
[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along Created topic "along".
若是咱們運行list topic命令,咱們如今能夠看到該主題:
[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181 along
Kafka附帶一個命令行客戶端,它將從文件或標準輸入中獲取輸入,並將其做爲消息發送到Kafka集羣。默認狀況下,每行將做爲單獨的消息發送。
運行生產者,而後在控制檯中鍵入一些消息以發送到服務器。
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along >This is a message >This is another message
Kafka還有一個命令行使用者,它會將消息轉儲到標準輸出。
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning This is a message This is another message
全部命令行工具都有其餘選項; 運行不帶參數的命令將顯示更詳細地記錄它們的使用信息。
到目前爲止,咱們一直在與一個broker運行,但這並很差玩。對於Kafka,單個代理只是一個大小爲1的集羣,所以除了啓動一些代理實例以外沒有太多變化。可是爲了感覺它,讓咱們將咱們的集羣擴展到三個節點(仍然在咱們的本地機器上)。
[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/ [root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties [root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties [root@along kafka_2.11-2.1.0]# vim config/server-1.properties broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-1 [root@along kafka_2.11-2.1.0]# vim config/server-2.properties broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-2
注:該broker.id 屬性是羣集中每一個節點的惟一且永久的名稱。咱們必須覆蓋端口和日誌目錄,由於咱們在同一臺機器上運行這些,而且咱們但願讓全部代理嘗試在同一端口上註冊或覆蓋彼此的數據。
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties & [root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties & [root@along ~]# ss -nutl Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::* tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::* tcp LISTEN 0 50 ::ffff:127.0.0.1:9094 :::*
(1)如今建立一個複製因子爲3的新主題my-replicated-topic
[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic Created topic "my-replicated-topic".
(2)在一個集羣中,運行「describe topics」命令查看哪一個broker正在作什麼
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
註釋:第一行給出了全部分區的摘要,每一個附加行提供有關一個分區的信息。因爲咱們只有一個分區用於此主題,所以只有一行。
請注意,Leader: 2,在個人示例中,節點2 是該主題的惟一分區的Leader。
(3)能夠在咱們建立的原始主題上運行相同的命令,以查看它的位置
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along Topic:along PartitionCount:1 ReplicationFactor:1 Configs: Topic: along Partition: 0 Leader: 0 Replicas: 0 Isr: 0
(4)向咱們的新主題發佈一些消息:
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic >my test message 1 >my test message 2 >^C
(5)如今讓咱們使用這些消息:
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic my test message 1 my test message 2
(1)如今讓咱們測試一下容錯性。Broker 2 充當leader 因此讓咱們殺了它:
[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}' 106737 [root@along ~]# kill -9 106737 [root@along ~]# ss -nutl tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::* tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::*
(2)leader 已切換到其中一個從屬節點,節點2再也不位於同步副本集中:
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
(3)即便最初接受寫入的leader 已經失敗,這些消息仍可供消費:
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic my test message 1 my test message 2
從控制檯寫入數據並將其寫回控制檯是一個方便的起點,但有時候可能但願使用其餘來源的數據或將數據從Kafka導出到其餘系統。對於許多系統,您能夠使用Kafka Connect導入或導出數據,而不是編寫自定義集成代碼。
Kafka Connect是Kafka附帶的工具,用於向Kafka導入和導出數據。它是一個可擴展的工具,運行鏈接器,實現與外部系統交互的自定義邏輯。在本快速入門中,咱們將瞭解如何使用簡單的鏈接器運行Kafka Connect,這些鏈接器將數據從文件導入Kafka主題並將數據從Kafka主題導出到文件。
(1)首先建立一些種子數據進行測試:
[root@along ~]# echo -e "foo\nbar" > test.txt
或者在Windows上:
> echo foo> test.txt > echo bar>> test.txt
(2)接下來,啓動兩個以獨立模式運行的鏈接器,這意味着它們在單個本地專用進程中運行。提供三個配置文件做爲參數。
[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties [2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67) [2019-01-16 16:16:31,903] INFO WorkerInfo values: ... ...
注:Kafka附帶的這些示例配置文件使用您以前啓動的默認本地羣集配置並建立兩個鏈接器:第一個是源鏈接器,它從輸入文件讀取行並生成每一個Kafka主題,第二個是宿鏈接器從Kafka主題讀取消息並將每一個消息生成爲輸出文件中的一行。
(3)驗證是否導入成功(另起終端)
在啓動過程當中,您將看到許多日誌消息,包括一些指示正在實例化鏈接器的日誌消息。
① 一旦Kafka Connect進程啓動,源鏈接器應該開始從test.txt主題讀取行並將其生成到主題connect-test,而且接收器鏈接器應該開始從主題讀取消息connect-test 並將它們寫入文件test.sink.txt。咱們能夠經過檢查輸出文件的內容來驗證數據是否已經過整個管道傳遞:
[root@along ~]# cat test.sink.txt foo bar
② 請注意,數據存儲在Kafka主題中connect-test,所以咱們還能夠運行控制檯使用者來查看主題中的數據(或使用自定義使用者代碼來處理它):
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
(4)繼續追加數據,驗證
[root@along ~]# echo Another line>> test.txt [root@along ~]# cat test.sink.txt foo bar Another line [root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} {"schema":{"type":"string","optional":false},"payload":"Another line"}