架構師之路-如何創建高可用消息中間件kafka

Kafkahtml

1、熟悉kafka java

l Server-1 broker其實就是kafka的server,由於producer和consumer都要去連它。Broker主要仍是作存儲用。node

l Server-2是zookeeper的server端,zookeeper的具體做用你能夠去官網查,在這裏你能夠先想象,它維持了一張表,記錄了各個節點的IP、端口等信息(之後還會講到,它裏面還存了kafka的相關信息)。linux

l Server-三、四、5他們的共同之處就是都配置了zkClient,更明確的說,就是運行前必須配置zookeeper的地址,道理也很簡單,這之間的鏈接都是須要zookeeper來進行分發的。算法

l Server-1和Server-2的關係,他們能夠放在一臺機器上,也能夠分開放,zookeeper也能夠配集羣。目的是防止某一臺掛了。apache

簡單說下整個系統運行的順序:服務器

  1. 啓動zookeeper的server併發

  2. 啓動kafka的servertcp

  3. Producer若是生產了數據,會先經過zookeeper找到broker,而後將數據存放進broker分佈式

  4. Consumer若是要消費數據,會先經過zookeeper找對應的broker,而後消費。

Kafka 分佈式消息隊列 相似產品有JBoss、MQ

1、由Linkedln 開源,使用scala開發,有以下幾個特色:

(1)高吞吐

(2)分佈式

(3)支持多語言客戶端 (C++、Java)

2、組成: 客戶端是 producer 和 consumer,提供一些API,服務器端是Broker,客戶端提供能夠向Broker內發佈消息、消費消息,服務器端提供消息的存儲等功能

Kafka 特色是支持分區、分佈式、可拓展性強

3、Kafka 的消息分幾個層次

(1)Topic 一類主題

(2)Partition 默認每一個消息有2個分區,建立Topic能夠指定分區數,1天有 1億行能夠分8個分區,若是天天幾十萬行就一個分區吧

(3)Message 是每一個消息

4、數據處理流程

1.生產者 生產消息、將消息發佈到指定的topic分區

2.kafka 集羣接收到producer發過來的消息後,將其持久化到硬盤,能夠指定時長,而不關注消息是否被消費

3.consumer從kafka集羣pull或push方式,並控制獲取消息的offset偏移量,consumer重啓時須要根據offset開始再次消費數據,consumer本身維護offset

5、kafka如何實現高吞吐量

1.充分利用磁盤的順序讀寫 2.數據批量發送 3.數據壓縮 4.Topic劃分多個partition

6、kafka 如何實現load balance &HA

1)producer 根據用戶指定的算法,將消息發送到指定的partition 2)存在多個partition,每一個partition存在多個副本replica,每一個replica分佈在不一樣的broker節點上 3)每一個partition須要選取lead partition,leader partition負責讀寫,並由zookeeper負責fail over 快速失敗 4)經過zookeeper管理broker與consumer的動態加入與離開

7、擴容

當須要增長broker節點時,新增的broker會向zookeeper註冊,而producer及consumer會根據zookeeper上的watcher感知這些變化,並及時做出調整

副本分配邏輯規則以下:

在Kafka集羣中,每一個Broker都有均等分配Partition的Leader機會。

上述圖Broker Partition中,箭頭指向爲副本,以Partition-0爲例:broker1中parition-0爲Leader,Broker2中Partition-0爲副本。

上述圖種每一個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker爲副本,如此循環迭代分配,多副本都遵循此規則。

副本分配算法以下:

將全部N Broker和待分配的i個Partition排序.

將第i個Partition分配到第(i mod n)個Broker上.

將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.

2、安裝zookeeper,並配置集羣

2.1配置java環境

將jdk-7u79-linux-x64上傳到三臺服務器安裝配置。

給三臺服務器分別建立java文件夾。

將jdk 放到java文件夾下並解壓,而後刪掉壓縮文件。

配置jdk全局變量。

vi /etc/profile

export JAVA_HOME=/usr/local/java/jdk1.7.0_79

export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH

export PATH=$JAVA_HOME/bin:$PATH

2.2 修改操做系統的/etc/hosts文件,添加IP與主機名映射:

zookeeper cluster servers

172.16.0.41 edu-zk-01

172.16.0.42 edu-zk-02

172.16.0.43 edu-zk-03

2.3下載zookeeper-3.4.7.tar.gz 到/home/zy/zookeeper目錄

mkdir -p /usr/local/zookeeper

cd / usr/local/zookeeper/

wget

http://apache.fayea.com/zookeeper/zookeeper-3.4.7/zookeeper-3.4.7.tar.gz

2.4 解壓zookeeper安裝包,並對節點重民名

tar -zxvf zookeeper-3.4.7.tar.gz

服務器1:

mv zookeeper-3.4.7 node-01

服務器2: mv zookeeper-3.4.7 node-02

服務器3:

mv zookeeper-3.4.7 node-03

2.5 在zookeeper的各個節點下 建立數據和日誌目錄

cd /usr/local/zookeeper

mkdir data

mkdir logs

2.6 重命名配置文件

將zookeeper/node-0X/conf目錄下的zoo_sample.cfg文件拷貝一份,命名爲zoo.cfg:

cp zoo_sample.cfg zoo.cfg

2.7 修改zoo.cfg 配置文件

三臺服務器作一樣配置:zookeeper/node-01的配置(/usr/local/zookeeper/node-01/conf/zoo.cfg)以下:

參數說明:

tickTime=2000

tickTime這個時間是做爲Zookeeper服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每一個tickTime時間就會發送一個心跳。

initLimit=10

initLimit這個配置項是用來配置Zookeeper接受客戶端(這裏所說的客戶端不是用戶鏈接Zookeeper服務器的客戶端,而是Zookeeper服務器集羣中鏈接到Leader的Follower 服務器)初始化鏈接時最長能忍受多少個心跳時間間隔數。

當已經超過10個心跳的時間(也就是tickTime)長度後Zookeeper 服務器尚未收到客戶端的返回信息,那麼代表這個客戶端鏈接失敗。總的時間長度就是10*2000=20 秒。

syncLimit=5

syncLimit這個配置項標識Leader與Follower之間發送消息,請求和應答時間長度,最長不能超過多少個tickTime的時間長度,總的時間長度就是5*2000=10秒。

dataDir=/usr/local/zookeeper/node-01/data

dataDir顧名思義就是Zookeeper保存數據的目錄,默認狀況下Zookeeper將寫數據的日誌文件也保存在這個目錄裏。

clientPort=2181

clientPort這個端口就是客戶端(應用程序)鏈接Zookeeper服務器的端口,Zookeeper會監聽這個端口接受客戶端的訪問請求。

server.A=B:C:D

server.1=edu-zk-01:2881:3881

server.2=edu-zk-02:2882:3882

server.3=edu-zk-03:2883:3883

A是一個數字,表示這個是第幾號服務器;

B是這個服務器的IP地址(或者是與IP地址作了映射的主機名);

C第一個端口用來集羣成員的信息交換,表示這個服務器與集羣中的Leader服務器交換信息的端口;

D是在leader掛掉時專門用來進行選舉leader所用的端口。

注意:若是是僞集羣的配置方式,不一樣的 Zookeeper 實例通訊端口號不能同樣,因此要給它們分配不一樣的端口號。

2.8 建立myid文件

在dataDir=/usr/local/zookeeper/node-0X/data 下建立myid文件

編輯myid文件,並在對應的IP的機器上輸入對應的編號。如在node-01上,myid文件內容就是1,node-02上就是2,node-03上就是3:

vi /usr/local/zookeeper/node-01/data/myid## 值爲1

vi /usr/local/zookeeper/node-02/data/myid## 值爲2

vi /usr/local/zookeeper/node-03/data/myid## 值爲3

2.9 啓動測試zookeeper

(1)進入/usr/local/zookeeper/node-0X/bin目錄下執行:

/usr/local/zookeeper/node-01/bin/zkServer.sh start

/usr/local/zookeeper/node-02/bin/zkServer.sh start

/usr/local/zookeeper/node-03/bin/zkServer.sh start

(2)輸入jps命令查看進程:

其中,QuorumPeerMain是zookeeper進程,說明啓動正常

(3)查看狀態:

/usr/local/zookeeper/node-01/bin/zkServer.sh status

(4)查看zookeeper服務輸出信息:

因爲服務信息輸出文件在/usr/local/zookeeper/node-0X/bin/zookeeper.out

$ tail -500f zookeeper.out

3、KAFKA集羣配置

4.1 下載 kafka_2.9.2-0.8.1

分別在三臺服務器建立kafka目錄而且下載kafka壓縮包

mkdir /usr/local/kafka

tar –zxvf kafka_2.9.2-0.8.1.tar.gz

4.2 建立log文件夾

mkdir /usr/local/kafka/kafkalogs

4.3 配置kafka

cd /usr/local/kafka/kafka_2.9.2-0.8.1/config

vi server.properties 修改項以下:

broker.id=0 //當前機器在集羣中的惟一標識

port=9092 //kafka對外提供服務的tcp端口

host.name=172.16.0.41 //主機IP地址

log.dirs=/usr/local/kafka/kafkalogs //log存放目錄

message.max.byte=5048576 //kafka一條消息容納的消息最大爲多少

default.replication.factor=2 //每一個分區默認副本數量

replica.fetch.max.bytes=5048576

zookeeper.connect=172.16.0.41:2181,172.16.0.42:2182,172.16.0.43:2183

4.4 啓動kafka

./kafka-server-start.sh -daemon ../config/server.properties //後臺啓動運行

4.5 問題解決

[root@master ~]# /export/kafka/bin/kafka-console-producer.sh --broker-list 10.14.2.201:9092,10.14.2.202:9092,10.14.2.203:9092,10.14.2.204:9092 --topic test

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

/export/kafka/bin/kafka-console-consumer.sh --zookeeper 10.14.2.201:2181,10.14.2.202:2181,10.14.2.203:2181,10.14.2.204:2181 --topic test --from-beginning

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解決方法:

下載slf4j-1.7.6.zip

http://www.slf4j.org/dist/slf4j-1.7.6.zip

解壓

unzip slf4j-1.7.6.zip

把slf4j-nop-1.7.6.jar 包複製到kafka libs目錄下面

cd slf4j-1.7.6

cp slf4j-nop-1.7.6.jar /export/kafka/libs/

4、KAFKA集羣驗證

5.1 建立topic

./kafka-topics.sh --create --zookeeper 172.16.0.42:2182 --replication-factor 1 --partitions 1 --topic test

5.2 查看topic

./kafka-topics.sh --list --zookeeper 172.16.0.42:2182

5.3 開啓發送者併發送消息

./kafka-console-producer.sh --broker-list 172.16.0.41:9092 --topic test

5.4 開啓消費者並接收消息

./kafka-console-consumer.sh --zookeeper 172.16.0.42:2182 --topic test --from-beginning

推薦閱讀:https://www.roncoo.com/article/index?title=%E6%9E%B6%E6%9E%84%E5%B8%88%E4%B9%8B%E8%B7%AF

相關文章
相關標籤/搜索