kafka是一個分佈式消息系統,由linkedin使用scala編寫,用做LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。具備高水平擴展和高吞吐量。html
定義解釋:java
1. Java 和 scala都是運行在JVM上的語言。node
2. erlang和最近比較火的和go語言同樣是從代碼級別就支持高併發的一種語言,因此RabbitMQ天生就有很高的併發性能,可是有RabbitMQ嚴格按照AMQP進行實現,受到了不少限制。kafka的設計目標是高吞吐量,因此kafka本身設計了一套高性能可是不通用的協議,他也是仿照AMQP( Advanced Message Queuing Protocol 高級消息隊列協議)設計的。linux
3. 事務的概念:在數據庫中,多個操做一塊兒提交,要麼操做所有成功,要麼所有失敗。舉個例子, 在轉帳的時候付款和收款,就是一個事物的例子,你給一我的轉帳,你轉成功,而且對方正常行收到款項後,這個操做纔算成功,有一方失敗,那麼這個操做就是失敗的。 對應消在息隊列中,就是多條消息一塊兒發送,要麼所有成功,要麼所有失敗。3箇中只有ActiveMQ支持,這個是由於,RabbitMQ和Kafka爲了更高的性能,而放棄了對事物的支持 。git
4. 集羣:多臺服務器組成的總體叫作集羣,這個總體對生產者和消費者來講,是透明的。其實對消費系統組成的集羣添加一臺服務器減小一臺服務器對生產者和消費者都是無感之的。github
5. 負載均衡,對消息系統來講負載均衡是大量的生產者和消費者向消息系統發出請求消息,系統必須均衡這些請求使得每一臺服務器的請求達到平衡,而不是大量的請求,落到某一臺或幾臺,使得這幾臺服務器高負荷或超負荷工做,嚴重狀況下會中止服務或宕機。數據庫
6. 動態擴容是不少公司要求的技術之一,不支持動態擴容就意味着中止服務,這對不少公司來講是不能夠接受的。bootstrap
最後,kafka的動態擴容是經過 zookeeper 來實現的。vim
zookeeper是一種在分佈式系統中被普遍用來做爲:分佈式狀態管理、分佈式協調管理、分佈式配置管理、和分佈式鎖服務的集羣。kafka增長和減小服務器都會在zookeeper節點上觸發相應的事件,kafka系統會捕獲這些事件,進行新一輪的負載均衡,客戶端也會捕獲這些事件來進行新一輪的處理。瀏覽器
3.1 AMQP協議
Advanced Message Queuing Protocol (高級消息隊列協議)
The Advanced Message Queuing Protocol (AMQP):是一個標準開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了經過網絡發送的字節流的數據格式。所以兼容性很是好,任何實現AMQP協議的程序均可以和與AMQP協議兼容的其餘程序交互,能夠很容易作到跨語言,跨平臺。
上面說的3種比較流行的消息隊列協議,要麼支持AMQP協議,要麼借鑑了AMQP協議的思想進行了開發、實現、設計。
3.2 一些基本的概念
(1)消費者(consumer):從消息隊列中請求消息的客戶端應用程序
(2)生產者(producer):向broker發佈消息的應用程序
(3)AMQP服務端(broker):用來接收生產者發送的消息並將這些消息路由給服務器中的隊列,便於fafka將生產者發送的消息,動態的添加到磁盤並給每一條消息一個偏移量,因此對於kafka一個broker就是一個應用程序的實例
kafka支持的客戶端語言:Kafka客戶端支持當前大部分主流語言,包括:C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript
可使用以上任何一種語言和kafka服務器進行通訊(即辨析本身的consumer從kafka集羣訂閱消息也能夠本身寫producer程序)
3.3 kafka 架構
生產者生產消息、kafka集羣、消費者獲取消息這樣一種架構,以下圖:
kafka集羣中的消息,是經過Topic(主題)來進行組織的,以下圖:
一些基本的概念:
1. 主題(Topic):一個主題相似新聞中的體育、娛樂、教育等分類概念,在實際工程中一般一個業務一個主題。
2. 分區(Partition):一個Topic中的消息數據按照多個分區組織,分區是kafka消息隊列組織的最小單位,一個分區能夠看做是一個FIFO( First Input First Output的縮寫,先入先出隊列)的隊列。
kafka分區是提升kafka性能的關鍵所在,當你發現你的集羣性能不高時,經常使用手段就是增長Topic的分區,分區裏面的消息是按照重新到老的順序進行組織,消費者從隊列頭訂閱消息,生產者從隊列尾添加消息。
工做圖:
備份(Replication):爲了保證分佈式可靠性,kafka0.8開始對每一個分區的數據進行備份(不一樣的Broker上),防止其中一個Broker宕機形成分區上的數據不可用。
以上基礎知識部分摘抄:http://www.javashuo.com/article/p-azoogwkc-ee.html
4.1 基礎環境
3 臺服務器
192.168.118.14 server1
192.168.118.15 server2
192.168.118.16 server3
Linux服務器一臺、三臺、五臺,zookeeper集羣的工做是超過半數才能對外提供服務。
爲何 zookeeper 集羣節點數量要是奇數?
首先須要明確 zookeeper 選舉的規則:leader選舉,要求 可用節點數量 > 總節點數 / 2 注意是 > 不是 ≥
採用奇數個的節點主要是出於兩方面的考慮:
1. 防止由腦裂形成的集羣不可用
首先,什麼是腦裂?集羣的腦裂一般是發生在節點之間通訊不可達的狀況下,集羣會分裂成不一樣的小集羣,小集羣各自選出本身的master節點,致使原有的集羣出現多個master節點的狀況,這就是腦裂。
下面舉例說一下爲何採用奇數臺節點,就能夠防止因爲腦裂形成的服務不可用:
(1) 假如zookeeper集羣有 5 個節點,發生了腦裂,腦裂成了A、B兩個小集羣:
(a) A : 1個節點 ,B :4個節點 , 或 A、B互換
(b) A : 2個節點, B :3個節點 , 或 A、B互換
能夠看出,上面這兩種狀況下,A、B中總會有一個小集羣知足 可用節點數量 > 總節點數量/2 。因此zookeeper集羣仍然可以選舉出leader , 仍然能對外提供服務,只不過是有一部分節點失效了而已。
(2) 假如zookeeper集羣有4個節點,一樣發生腦裂,腦裂成了A、B兩個小集羣:
(a) A:1個節點 , B:3個節點, 或 A、B互換
(b) A:2個節點 , B:2個節點
能夠看出,狀況(a) 是知足選舉條件的,與(1)中的例子相同。 可是狀況(b) 就不一樣了,由於A和B都是2個節點,都不知足 可用節點數量 > 總節點數量/2 的選舉條件, 因此此時zookeeper就完全不能提供服務了。
綜合上面兩個例子能夠看出: 在節點數量是奇數個的狀況下, zookeeper集羣總能對外提供服務(即便損失了一部分節點);若是節點數量是偶數個,會存在zookeeper集羣不能用的可能性(腦裂成兩個均等的子集羣的時候)。
在生產環境中,若是zookeeper集羣不能提供服務,那將是致命的 , 因此zookeeper集羣的節點數通常採用奇數個。
2. 在容錯能力相同的狀況下,奇數臺更節省資源
leader選舉,要求 可用節點數量 > 總節點數量/2 。注意 是 > , 不是 ≥。
舉兩個例子:
(1) 假如zookeeper集羣1 ,有3個節點,3/2=1.5 , 即zookeeper想要正常對外提供服務(即leader選舉成功),至少須要2個節點是正常的。換句話說,3個節點的zookeeper集羣,容許有一個節點宕機。
(2) 假如zookeeper集羣2,有4個節點,4/2=2 , 即zookeeper想要正常對外提供服務(即leader選舉成功),至少須要3個節點是正常的。換句話說,4個節點的zookeeper集羣,也容許有一個節點宕機。
那麼問題就來了, 集羣1與集羣2都有 容許1個節點宕機 的容錯能力,可是集羣2比集羣1多了1個節點。在相同容錯能力的狀況下,本着節約資源的原則,zookeeper集羣的節點數維持奇數個更好一些。
4.2 zookeeper 集羣搭建
服務器默認關閉 selinux 和 防火牆,在 kafka_2.12-1.0.1 中,已經集成了 zookeeper,不須要單獨安裝 zookeeper
(1)首先須要安裝 jdk
三個節點都須要安裝 jdk 支持:
[root@node1 ~]# tar xf jdk-8u77-linux-x64.tar.gz -C /usr/local/ [root@node1 ~]# vim /etc/profile # 在文件追加如下信息 JAVA_HOME=/usr/local/jdk1.8.0_77 JAVA_BIN=$JAVA_HOME/bin PATH=$PATH:$JAVA_BIN CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export JAVA_HOME JAVA_BIN PATH CLASSPATH [root@node1 ~]# ln -vs /usr/local/jdk1.8.0_77/bin/java /usr/bin/
(2)配置 zookeeper 集羣
[root@node1 ~]# mkdir -pv /opt/kafka/{zkdata,zkdatalogs} mkdir: created directory ‘/opt/kafka’ # kafka 安裝程序主目錄 mkdir: created directory ‘/opt/kafka/zkdata’ # zookeeper 存放快照日誌 mkdir: created directory ‘/opt/kafka/zkdatalogs’ # zookeeper 存放事務日誌 [root@node1 ~]# tar xf kafka_2.12-1.0.1.tgz -C /opt/kafka/ [root@node1 ~]# cd /opt/kafka/kafka_2.12-1.0.1/config/ [root@node1 /opt/kafka/kafka_2.12-1.0.1/config]# egrep ^[a-z] zookeeper.properties dataDir=/opt/kafka/zkdata dataLogDir=/opt/kafka/zkdatalogs clientPort=2181 maxClientCnxns=100 tickTime=2000 initLimit=10 syncLimit=5 server.1=192.168.118.14:2888:3888 server.2=192.168.118.15:2888:3888 server.3=192.168.118.16:2888:3888 # server.1 這個 1 是服務器的標識也能夠是其餘的數字, 表示這個是第幾號服務器,用來標識服務器,這個標識要寫到快照目錄下面myid文件裏 #192.168.118.16爲集羣裏的IP地址,第一個端口是master和slave之間的通訊端口,默認是2888,第二個端口是leader選舉的端口,集羣剛啓動的時候選舉或者leader掛掉以後進行新的選舉的端口默認是3888
dataDir: 快照日誌的存儲路徑 dataLogDir: 事物日誌的存儲路徑,若是不配置這個那麼事物日誌會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日誌、快照日誌太多 clientPort: 這個端口就是客戶端鏈接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。 maxClientCnxns: 客戶端最大鏈接數 tickTime: 這個時間是做爲 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每一個 tickTime 時間就會發送一個心跳。 initLimit: 這個配置項是用來配置 Zookeeper 接受客戶端(這裏所說的客戶端不是用戶鏈接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集羣中鏈接到 Leader 的 Follower 服務器)初始化鏈接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 服務器尚未收到客戶端的返回信息,那麼代表這個客戶端鏈接失敗。總的時間長度就是 5*2000=10 秒 syncLimit: 這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒
建立 myid 文件(切記不能忘記這個步驟)
myid 文件對應:
server.1=192.168.118.14:2888:3888
server.2=192.168.118.15:2888:3888
server.3=192.168.118.16:2888:3888
# node1 echo 1 > /opt/kafka/zkdata/myid # node2 echo 2 > /opt/kafka/zkdata/myid # node3 echo 3 > /opt/kafka/zkdata/myid
三個節點,配置相同,zookeeper 配置惟一不一樣點就是 myid
啓動服務(三個節點都須要啓動)
[root@node1 ~]# cd /opt/kafka/kafka_2.12-1.0.1/bin/ [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-server-start.sh -daemon ../config/zookeeper.properties [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# jps -m 11650 QuorumPeerMain ../config/zookeeper.properties 11679 Jps -m [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# netstat -ntplu | egrep java tcp6 0 0 192.168.118.14:3888 :::* LISTEN 11650/java tcp6 0 0 :::38269 :::* LISTEN 11650/java tcp6 0 0 :::2181 :::* LISTEN 11650/java
查看集羣狀態:
# node1 [root@node1 ~]# echo status | nc localhost 2181 Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT Clients: /0:0:0:0:0:0:0:1:50099[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: follower Node count: 4 # node2 [root@node2 ~]# echo status | nc localhost 2181 Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT Clients: /0:0:0:0:0:0:0:1:60588[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x100000000 Mode: leader Node count: 4 # node3 [root@node3 ~]# echo status | nc localhost 2181 Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT Clients: /0:0:0:0:0:0:0:1:40457[0](queued=0,recved=1,sent=0) Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x100000000 Mode: follower Node count: 4
目前 node2 是 leader 節點
4.3 Kafka 集羣搭建
首先建立 kafka 日誌目錄,後面配置會用到
[root@node2 ~]# mkdir -pv /opt/kafka/kafka-logs mkdir: created directory ‘/opt/kafka/kafka-logs’
kafka 配置文件以下(綠色部分是須要修改的):
broker.id=0 #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣 listeners=PLAINTEXT://192.168.118.14:9092 #當前kafka對外提供服務的端口默認是9092 num.network.threads=3 #這個是borker進行網絡處理的線程數 num.io.threads=8 #這個是borker進行I/O處理的線程數 socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能 socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 log.dirs=/opt/kafka/kafka_2.12-1.0.1/kafka-logs #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,若是配置多個目錄,新建立的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 num.partitions=3 #默認的分區數,一個topic默認1個分區數 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=2 #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務 transaction.state.log.replication.factor=2 transaction.state.log.min.isr=1 log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天 log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除 zookeeper.connect=192.168.118.14:2181,192.168.118.15:2181,192.168.118.16:2181 #設置zookeeper的鏈接端口 zookeeper.connection.timeout.ms=6000 # zookeeper leader切換鏈接時間,默認 6秒 group.initial.rebalance.delay.ms=0
啓動 kafka 集羣並測試
1. 啓動服務(3個節點都須要啓動)
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-server-start.sh -daemon ../config/server.properties [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# jps -m 21088 Jps -m 11650 QuorumPeerMain ../config/zookeeper.properties 21020 Kafka ../config/server.properties
2. 建立 topic 來驗證是否建立成功
建立 topic [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-topics.sh --create --zookeeper 192.168.118.14:2181 --replication-factor 3 --partitions 3 --topic superman Created topic "superman". # 解釋 --replication-factor 2 #複製3份 --partitions 1 #建立3個分區 --topic #主題爲 superman 在一臺服務器上建立一個發佈者 [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-console-producer.sh --broker-list 192.168.118.14:9092 --topic superman 在一臺服務器上建立一個消費者 [root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.118.14:9092 --topic superman --from-beginning
測試以下圖:
其餘命令
查看 topic
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 __consumer_offsets superman
查看 topic 狀態
[root@node1 /opt/kafka/kafka_2.12-1.0.1/bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic superman Topic:superman PartitionCount:3 ReplicationFactor:3 Configs: Topic: superman Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: superman Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: superman Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
這裏記錄兩種監控工具的搭建:
1. kafka manager
2. kafkaoffsetmonitor
kafka manager 主要用來管理 kafka 集羣,kafkaoffsetmonitor 主要用來實時監控消費者信息。
5.1 kafka manager 安裝
kafka manager 下載地址:https://github.com/yahoo/kafka-manager 能夠下載源碼自行編譯。這裏使用已經編譯好的包直接搭建。
注意:上面下載的是源碼,下載後須要按照後面步驟進行編譯。若是以爲麻煩,能夠直接從下面地址下載編譯好的 kafka-manager-1.3.3.7.zip。 連接:https://pan.baidu.com/s/1qYifoa4 密碼:el4o
(1)首先安裝 jdk
[root@192.168.118.17 ~]#tar xf jdk-8u77-linux-x64.tar.gz -C /usr/local/ [root@192.168.118.17 ~]#vim /etc/profile # 追加以下: JAVA_HOME=/usr/local/jdk1.8.0_77 JAVA_BIN=$JAVA_HOME/bin PATH=$PATH:$JAVA_BIN CLASSPATH=$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export JAVA_HOME JAVA_BIN PATH CLASSPATH [root@192.168.118.17 ~]#source /etc/profile [root@192.168.118.17 ~]#ln -vs /usr/local/jdk1.8.0_77/bin/java /usr/bin/ ‘/usr/bin/java’ -> ‘/usr/local/jdk1.8.0_77/bin/java’
(2)解壓配置 kafka manager
修改 kafka-manager.zkhosts
嘗試啓動 kafka manager
[root@192.168.118.17 /opt/kafka-manager-1.3.3.7/conf]#cd ../bin/ [root@192.168.118.17 /opt/kafka-manager-1.3.3.7/bin]#./kafka-manager -Dconfig.file=../conf/application.conf -Dhttp.port=8080 &
啓動成功。
瀏覽器訪問:
(3)配置 kafka manager
點擊保存後,若是kafka 沒有開啓 JMX_PORT 會出現以下 kafka manager 日誌報錯信息:
[error] k.m.a.c.BrokerViewCacheActor - Failed to get broker topic segment metrics for BrokerIdentity(1,192.168.118.15,9092,-1,false) java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled!
解決辦法:
修改 kafka-server-start.sh
增長 JMX 的端口信息
修改kafka-run-class.sh
增長綠色部分,注意對應的 ip地址
三個節點都須要修改,修改完畢重啓 kafka 服務, kafka manager 以下:
5.2 KafkaOffsetMonitor
KafkaOffsetMonitor 下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases 建議使用 v0.2.0版本,實測 v0.2.1版本存在BUG
KafkaOffsetMonitor 的使用很簡單,下載下來直接啓動就OK
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#ls kafka-monitor-start.sh KafkaOffsetMonitor-assembly-0.2.0.jar [root@192.168.118.17 /opt/kafkaoffsetmonitor]#cat kafka-monitor-start.sh nohup java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk 192.168.118.14:2181,192.168.118.15:2181,192.168.118.16:2181 --port 8088 --refresh 5.minutes --retain 1.day &
kafka-monitor-start.sh 爲啓動腳本,須要手動填寫,參數解釋以下:
參數說明:
zk :zookeeper主機地址,若是有多個,用逗號隔開
port :應用程序端口
refresh :應用程序在數據庫中刷新和存儲點的頻率
retain :在db中保留多長時間
dbName :保存的數據庫文件名,默認爲offsetapp
啓動服務
[root@192.168.118.17 /opt/kafkaoffsetmonitor]#chmod +x kafka-monitor-start.sh [root@192.168.118.17 /opt/kafkaoffsetmonitor]#./kafka-monitor-start.sh [root@192.168.118.17 /opt/kafkaoffsetmonitor]#nohup: appending output to ‘nohup.out’ [root@192.168.118.17 /opt/kafkaoffsetmonitor]#lsof -i :8088 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 20884 root 6u IPv6 40838 0t0 TCP *:radan-http (LISTEN)
配置安裝成功。