Zookeeper+Kafka集羣部署

Kafka的基本概念:java

主題:Topic特指Kafka處理的消息源(feeds of messages)的不一樣分類。linux

分區:Partition Topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。apache

Message:消息,是通訊的基本單位,每一個producer能夠向一個topic(主題)發佈一些消息。bootstrap

Producers:消息和數據生產者,向Kafka的一個topic發佈消息的過程叫作producers。緩存

Consumers:消息和數據消費者,訂閱topics並處理其發佈的消息的過程叫作consumers。bash

Broker:緩存代理,Kafka集羣中的一臺或多臺服務器統稱爲broker,這裏用的是AMQP協議。服務器

AMQP協議:Advanced Message Queuing Protocol (高級消息隊列協議)網絡

是一個標準開放的應用層的消息中間件協議。AMQP定義了經過網絡發送的字節流的數據格式。所以兼容性很是好,任何實現AMQP協議的程序均可以和與AMQP協議兼容的其餘程序交互,能夠很容易作到跨語言,跨平臺。架構



Kafka相對傳統技術的優點:分佈式

快速:單一的Kafka代理能夠處理成千上萬的客戶端,每秒處理數兆字節的讀寫操做。

可伸縮:在一組機器上對數據進行分區和簡化,以支持更大的數據

持久:消息是持久性的,並在集羣中進行復制,以防止數據丟失。

設計:它提供了容錯保證和持久性


Kafka的工做流程、架構圖:

k1.png

                    圖1




k2.png

                                                                                                                                                圖2



如圖所示,Kafka的服務器數量通常爲奇數(2n+1),這是由於Zookeeper集羣的工做主機必須超過搭建集羣的主機半數才能對外提供服務,因此,這裏用奇數避免某臺服務器故障致使集羣不能正常工做。

由於Kafka集羣是把狀態保存在Zookeeper中的,而且Kafka的動態擴容是經過Zookeeper來實現的,因此須要優先搭建Zookeerper集羣,創建分佈式狀態管理。捋清楚思路後,開始準備環境,搭建集羣:

server1:192.168.11.141

server2:192.168.11.142

server3:192.168.11.143

zookeeper是基於Java環境開發的因此,須要先安裝Java,而後這裏使用的zookeeper安裝包版本爲zookeeper-3.4.14,Kafka的安裝包版本爲kafka_2.11-2.2.0。


1.修改主機名,關閉selinux,防火牆。

hostnamectl set-hostname --static server1 && bash

setenforce 0

sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config

systemctl stop firewalld && systemctl disable firewalld


2.建立zookeeper和Kafka的存放目錄:

cd /usr/local/

mkdir -pv zookeeper/{zkdata,zkdatalog}                 #zkdata是存放快照日誌,zkdatalog是存放事物日誌

mkdir -pv kafka/kfdatalogs                 #kfdatalogs是存放消息日誌


3.下載軟件包,並解壓到各自的工做目錄下(當前在/usr/local/下):

yum -y install java-1.8.0*                                         #這裏個人源用的是阿里的base源和epel源,能夠直接裝這個版本的java。

wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz

tar xzf zookeeper-3.4.14.tar.gz -C zookeeper 

tar xzf kafka_2.11-2.2.0.tgz -C kafka 


4.生成並更改zookeeper的配置文件(三臺服務器上都須要設置):

#能夠將zoo_sample理解爲zookeeper自帶的配置文件模板,複製出一份以.cfg結尾的配置文件。

cp -av /usr/local/zookeeper/zookeeper-3.4.14/conf/{zoo_sample,zoo.cfg}

tickTime=2000              #zookeeper服務器之間的心跳時間。

initLimit=10                 #zookeeper的最大鏈接失敗時間

syncLimit=5          #zookeeper的同步通訊時間

dataDir=/usr/local/zookeeper/zkdata                 #zookeeper的存放快照日誌的絕對路徑

dataLogDir=/usr/local/zookeeper/zkdatalog                 #zookeeper的存放事物日誌的絕對路徑

clientPort=2181              #zookeeper與客戶端的鏈接端口

server.1=192.168.11.139:2888:3888                  #服務器及其編號,服務器IP地址,通訊端口,選舉端口

server.2=192.168.11.140:2888:3888                 #服務器及其編號,服務器IP地址,通訊端口,選舉端口

server.3=192.168.11.141:2888:3888                #服務器及其編號,服務器IP地址,通訊端口,選舉端口

#以上端口都是zookeeper的默認端口,可隨需求進行修改


5.建立myid文件:

在server1上

echo "1" > /usr/local/zookeeper/zkdata/myid

#就是在不一樣的服務器上,將服務器編號發送到zkdata下的myid。


6.啓動zookeeper集羣

cd /usr/local/zookeeper/zookeeper-3.4.14/bin

./zkServer.sh start

./zkServer.sh status

#Mode: leader爲主節點,Mode: follower爲從節點,zk集羣通常只有一個leader,多個follower,主通常是相應客戶端的讀寫請求,而從主同步數據,當主掛掉以後就會從follower裏投票選舉一個leader出來。



到此,zookeeper集羣搭建結束,接下來基於zookeeper搭建kafka集羣:

cd /usr/local/kafka/kafka_2.11-2.2.0/config/

1.修改server.properties文件:

broker.id=1              #這裏和zookeeper中的myid文件同樣,採用的是惟一標識

prot=9092              #Kafka集羣間鏈接的端口,配置文件中沒有,但默認爲9092,可隨需求進行修改,這裏我們加上

log.dirs=/usr/local/kafka/kfdatalogs                         #存放Kafka消息日誌的絕對路徑

host.name=192.168.11.141                  #顧名思義就是服務器的IP地址

log.retention.hours=168                  #默認消息的最大持久化時間,168小時,7天

message.max.byte=5242880                      #消息保存的最大值5M

default.replication.factor=2                         #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務

replica.fetch.max.bytes=5242880                 #取消息的最大直接數

zookeeper.connect=192.168.11.141:2181,192.168.11.142:2181,192.168.11.143:2181#集羣的各個節點的IP地址及zookeeper的端口,在zookeeper集羣設置的端口是多少這裏的端口就是多少。


未修改的配置文件信息:

num.network.threads=3              #這個是borker進行網絡處理的線程數

num.io.threads=8                 #這個是borker進行I/O處理的線程數

num.partitions=1                  #默認的分區數,一個topic默認1個分區數

log.retention.hours=168                  #默認消息的最大持久化時間,168小時,7天

message.max.byte=5242880               #消息保存的最大值5M

default.replication.factor=2                          #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務

replica.fetch.max.bytes=5242880                          #取消息的最大直接數

log.segment.bytes=1073741824                         #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件

log.retention.check.interval.ms=300000                  #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除

log.cleaner.enable=false                      #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能


2.啓動Kafka集羣:

cd /usr/local/kafka/kafka_2.11-2.2.0/bin

bash kafka-server-start.sh -daemon ../config/server.properties

jps                   #查看一下


3.開始測試:

3.1建立topic

./kafka-topics.sh --create --zookeeper 192.168.21.241:2181 --replication-factor 2 --partitions 1 --topic tian

#--replication-factor 2                  複製兩份

#--partitions 1                 建立一個分區

#--topic tian                 主題爲tian


3.2建立一個broker:

./kafka-console-producer.sh --broker-list 192.168.21.241:9092 --topic tian


3.3建立一個consumer:

./kafka-console-consumer.sh  --bootstrap-server 192.168.11.141:9092 --topic tian --from-beginning


3.4查看topic:

./kafka-topics.sh --list --zookeeper 192.168.11.141:12181


到此,Kafka集羣的搭建完成。

相關文章
相關標籤/搜索