1、kafka 簡介html
kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。java
1.1 kafka名詞解釋linux
每一個消息(也叫做record記錄,也被稱爲消息)是由一個key,一個value和時間戳構成。apache
1.2 kafka有四個核心API介紹bash
1.3 kafka基基原理服務器
一般來說,消息模型能夠分爲兩種:隊列和發佈-訂閱式。隊列的處理方式是一組消費者從服務器讀取消息,一條消息只有其中的一個消費者來處理。在發佈-訂閱模型中,消息被廣播給全部的消費者,接收到消息的消費者均可以處理此消息。Kafka爲這兩種模型提供了單一的消費者抽象模型: 消費者組(consumer group)。消費者用一個消費者組名標記本身。網絡
一個發佈在Topic上消息被分發給此消費者組中的一個消費者。假如全部的消費者都在一個組中,那麼這就變成了queue模型。假如全部的消費者都在不一樣的組中,那麼就徹底變成了發佈-訂閱模型。更通用的, 咱們能夠建立一些消費者組做爲邏輯上的訂閱者。每一個組包含數目不等的消費者,一個組內多個消費者能夠用來擴展性能和容錯。 oracle
而且,kafka可以保證生產者發送到一個特定的Topic的分區上,消息將會按照它們發送的順序依次加入,也就是說,若是一個消息M1和M2使用相同的producer發送,M1先發送,那麼M1將比M2的offset低,而且優先的出如今日誌中。消費者收到的消息也是此順序。若是一個Topic配置了複製因子(replication facto)爲N,那麼能夠容許N-1服務器宕機而不丟失任何已經提交(committed)的消息。此特性說明kafka有比傳統的消息系統更強的順序保證。可是,相同的消費者組中不能有比分區更多的消費者,不然多出的消費者一直處於空等待,不會收到消息。socket
1.4 kafka應用場景
構建實時的流數據管道,可靠地獲取系統和應用程序之間的數據。
構建實時流的應用程序,對數據流進行轉換或反應。jsp
1.5 主題和日誌 (Topic和Log)
每個分區(partition)都是一個順序的、不可變的消息隊列,而且能夠持續的添加。分區中的消息都被分了一個序列號,稱之爲偏移量(offset),在每一個分區中此偏移量都是惟一的。Kafka集羣保持全部的消息,直到它們過時,不管消息是否被消費了。實際上消費者所持有的僅有的元數據就是這個偏移量,也就是消費者在這個log中的位置。 這個偏移量由消費者控制:正常狀況當消費者消費消息的時候,偏移量也線性的的增長。可是實際偏移量由消費者控制,消費者能夠將偏移量重置爲更老的一個偏移量,從新讀取消息。 能夠看到這種設計對消費者來講操做自如, 一個消費者的操做不會影響其它消費者對此log的處理。 再說說分區。Kafka中採用分區的設計有幾個目的。一是能夠處理更多的消息,不受單臺服務器的限制。Topic擁有多個分區意味着它能夠不受限的處理更多的數據。第二,分區能夠做爲並行處理的單元,稍後會談到這一點。
1.6 分佈式(Distribution)
Log的分區被分佈到集羣中的多個服務器上。每一個服務器處理它分到的分區。根據配置每一個分區還能夠複製到其它服務器做爲備份容錯。 每一個分區有一個leader,零或多個follower。Leader處理此分區的全部的讀寫請求,而follower被動的複製數據。若是leader宕機,其它的一個follower會被推舉爲新的leader。 一臺服務器可能同時是一個分區的leader,另外一個分區的follower。 這樣能夠平衡負載,避免全部的請求都只讓一臺或者某幾臺服務器處理。
2、kafka 安裝
2.1 jdk安裝
#以oracle jdk爲例,下載地址http://java.sun.com/javase/downloads/index.jsp
yum -y install jdk-8u141-linux-x64.rpm
2.2 安裝zookeeper
wget http://apache.forsale.plus/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz tar zxf zookeeper-3.4.9.tar.gz mv zookeeper-3.4.9 /data/zk
修改配置文件內容以下所示:
[root@localhost ~]# cat /data/zk/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk/data/zookeeper
dataLogDir=/data/zk/data/logs
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=zk01:2888:3888
server.2=zk02:2888:3888
server.3=zk03:2888:3888
參數說明:
server.id=host:port:port:表示了不一樣的zookeeper服務器的自身標識,做爲集羣的一部分,每一臺服務器應該知道其餘服務器的信息。用戶能夠從「server.id=host:port:port」 中讀取到相關信息。在服務器的data(dataDir參數所指定的目錄)下建立一個文件名爲myid的文件,這個
文件的內容只有一行,指定的是自身的id值。好比,服務器「1」應該在myid文件中寫入「1」。這個id必須在集羣環境中服務器標識中是惟一的,且大小在1~255之間。這同樣配置中,zoo1表明第一臺服務器的IP地址。第一個端口號(port)是從follower鏈接到leader機器的
端口,第二個端口是用來進行leader選舉時所用的端口。因此,在集羣配置過程當中有三個很是重要的端口:clientPort:218一、port:288八、port:3888。
關於zoo.cfg配置文件說明,參考鏈接https://zookeeper.apache.org/doc/r3.4.10/zookeeperAdmin.html#sc_configuration;
若是想更換日誌輸出位置,除了在zoo.cfg加入"dataLogDir=/data/zk/data/logs"外,還須要修改zkServer.sh文件,大概修改方式地方在125行左右,內容以下:
125 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"
126 if [ ! -w "$ZOO_LOG_DIR" ] ; then
127 mkdir -p "$ZOO_LOG_DIR"
128 fi
在啓動服務以前,還須要分別在zookeeper建立myid,方式以下:
echo 1 > /data/zk/data/zookeeper/myid
啓動服務
/data/zk/bin/zkServer.sh start
驗證服務
### 查看相關端口號
[root@localhost ~]# ss -lnpt|grep java LISTEN 0 50 :::34442 :::* users:(("java",pid=2984,fd=18)) LISTEN 0 50 ::ffff:192.168.15.133:3888 :::* users:(("java",pid=2984,fd=26)) LISTEN 0 50 :::2181 :::* users:(("java",pid=2984,fd=25))
###查看zookeeper服務狀態
[root@localhost ~]# /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Mode: follower
zookeeper相關命令說明,參考https://zookeeper.apache.org/doc/r3.4.10/zookeeperStarted.html (文末有說明);
2.3 安裝kafka
tar zxf kafka_2.11-0.11.0.0.tgz mv kafka_2.11-0.11.0.0 /data/kafka
修改配置
[root@localhost ~]# grep -Ev "^#|^$" /data/kafka/config/server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://192.168.15.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
提示:其餘主機將該機器的kafka目錄拷貝便可,而後須要修改broker.id、listeners地址。有關kafka配置文件參數,參考:http://orchome.com/12;
啓動服務
/data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties
驗證服務
### 隨便在其中一臺主機執行 /data/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181 --replication-factor 1 --partitions 1 --topic test ###在其餘主機查看 /data/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.15.131:2181,192.168.15.132:2181,192.168.15.133:2181