Kafka的基本概念:java
主題:Topic特指Kafka處理的消息源(feeds of messages)的不一樣分類。linux
分區:Partition Topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。apache
Message:消息,是通訊的基本單位,每一個producer能夠向一個topic(主題)發佈一些消息。bootstrap
Producers:消息和數據生產者,向Kafka的一個topic發佈消息的過程叫作producers。緩存
Consumers:消息和數據消費者,訂閱topics並處理其發佈的消息的過程叫作consumers。bash
Broker:緩存代理,Kafka集羣中的一臺或多臺服務器統稱爲broker,這裏用的是AMQP協議。服務器
AMQP協議:Advanced Message Queuing Protocol (高級消息隊列協議)網絡
是一個標準開放的應用層的消息中間件協議。AMQP定義了經過網絡發送的字節流的數據格式。所以兼容性很是好,任何實現AMQP協議的程序均可以和與AMQP協議兼容的其餘程序交互,能夠很容易作到跨語言,跨平臺。架構
Kafka相對傳統技術的優點:分佈式
快速:單一的Kafka代理能夠處理成千上萬的客戶端,每秒處理數兆字節的讀寫操做。
可伸縮:在一組機器上對數據進行分區和簡化,以支持更大的數據
持久:消息是持久性的,並在集羣中進行復制,以防止數據丟失。
設計:它提供了容錯保證和持久性
Kafka的工做流程、架構圖:
圖1
圖2
如圖所示,Kafka的服務器數量通常爲奇數(2n+1),這是由於Zookeeper集羣的工做主機必須超過搭建集羣的主機半數才能對外提供服務,因此,這裏用奇數避免某臺服務器故障致使集羣不能正常工做。
由於Kafka集羣是把狀態保存在Zookeeper中的,而且Kafka的動態擴容是經過Zookeeper來實現的,因此須要優先搭建Zookeerper集羣,創建分佈式狀態管理。捋清楚思路後,開始準備環境,搭建集羣:
server1:192.168.11.141
server2:192.168.11.142
server3:192.168.11.143
zookeeper是基於Java環境開發的因此,須要先安裝Java,而後這裏使用的zookeeper安裝包版本爲zookeeper-3.4.14,Kafka的安裝包版本爲kafka_2.11-2.2.0。
1.修改主機名,關閉selinux,防火牆。
hostnamectl set-hostname --static server1 && bash
setenforce 0
sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config
systemctl stop firewalld && systemctl disable firewalld
2.建立zookeeper和Kafka的存放目錄:
cd /usr/local/
mkdir -pv zookeeper/{zkdata,zkdatalog} #zkdata是存放快照日誌,zkdatalog是存放事物日誌
mkdir -pv kafka/kfdatalogs #kfdatalogs是存放消息日誌
3.下載軟件包,並解壓到各自的工做目錄下(當前在/usr/local/下):
yum -y install java-1.8.0* #這裏個人源用的是阿里的base源和epel源,能夠直接裝這個版本的java。
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
tar xzf zookeeper-3.4.14.tar.gz -C zookeeper
tar xzf kafka_2.11-2.2.0.tgz -C kafka
4.生成並更改zookeeper的配置文件(三臺服務器上都須要設置):
#能夠將zoo_sample理解爲zookeeper自帶的配置文件模板,複製出一份以.cfg結尾的配置文件。
cp -av /usr/local/zookeeper/zookeeper-3.4.14/conf/{zoo_sample,zoo.cfg}
tickTime=2000 #zookeeper服務器之間的心跳時間。
initLimit=10 #zookeeper的最大鏈接失敗時間
syncLimit=5 #zookeeper的同步通訊時間
dataDir=/usr/local/zookeeper/zkdata #zookeeper的存放快照日誌的絕對路徑
dataLogDir=/usr/local/zookeeper/zkdatalog #zookeeper的存放事物日誌的絕對路徑
clientPort=2181 #zookeeper與客戶端的鏈接端口
server.1=192.168.11.139:2888:3888 #服務器及其編號,服務器IP地址,通訊端口,選舉端口
server.2=192.168.11.140:2888:3888 #服務器及其編號,服務器IP地址,通訊端口,選舉端口
server.3=192.168.11.141:2888:3888 #服務器及其編號,服務器IP地址,通訊端口,選舉端口
#以上端口都是zookeeper的默認端口,可隨需求進行修改
5.建立myid文件:
在server1上
echo "1" > /usr/local/zookeeper/zkdata/myid
#就是在不一樣的服務器上,將服務器編號發送到zkdata下的myid。
6.啓動zookeeper集羣
cd /usr/local/zookeeper/zookeeper-3.4.14/bin
./zkServer.sh start
./zkServer.sh status
#Mode: leader爲主節點,Mode: follower爲從節點,zk集羣通常只有一個leader,多個follower,主通常是相應客戶端的讀寫請求,而從主同步數據,當主掛掉以後就會從follower裏投票選舉一個leader出來。
到此,zookeeper集羣搭建結束,接下來基於zookeeper搭建kafka集羣:
cd /usr/local/kafka/kafka_2.11-2.2.0/config/
1.修改server.properties文件:
broker.id=1 #這裏和zookeeper中的myid文件同樣,採用的是惟一標識
prot=9092 #Kafka集羣間鏈接的端口,配置文件中沒有,但默認爲9092,可隨需求進行修改,這裏我們加上
log.dirs=/usr/local/kafka/kfdatalogs #存放Kafka消息日誌的絕對路徑
host.name=192.168.11.141 #顧名思義就是服務器的IP地址
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務
replica.fetch.max.bytes=5242880 #取消息的最大直接數
zookeeper.connect=192.168.11.141:2181,192.168.11.142:2181,192.168.11.143:2181#集羣的各個節點的IP地址及zookeeper的端口,在zookeeper集羣設置的端口是多少這裏的端口就是多少。
未修改的配置文件信息:
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務
replica.fetch.max.bytes=5242880 #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除
log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能
2.啓動Kafka集羣:
cd /usr/local/kafka/kafka_2.11-2.2.0/bin
bash kafka-server-start.sh -daemon ../config/server.properties
jps #查看一下
3.開始測試:
3.1建立topic
./kafka-topics.sh --create --zookeeper 192.168.21.241:2181 --replication-factor 2 --partitions 1 --topic tian
#--replication-factor 2 複製兩份
#--partitions 1 建立一個分區
#--topic tian 主題爲tian
3.2建立一個broker:
./kafka-console-producer.sh --broker-list 192.168.21.241:9092 --topic tian
3.3建立一個consumer:
./kafka-console-consumer.sh --bootstrap-server 192.168.11.141:9092 --topic tian --from-beginning
3.4查看topic:
./kafka-topics.sh --list --zookeeper 192.168.11.141:12181
到此,Kafka集羣的搭建完成。