kafka集羣搭建,原理詳細介紹、性能優化處理

1、搭建時的環境:

kafka_2.10-0.9.0.1.tgzhtml

zookeeper-3.4.6.tar.gzjava

jdk-7u25-linux-x64.gzlinux

CentOS 6.5git

2、搭建zookeeper集羣啓動起來

你們能夠參考我以前的博客github

zookeeper集羣搭建實戰:http://www.javashuo.com/article/p-qwhbsmqq-m.htmlweb

3、搭建kafka集羣

1:下載解壓

下載 kafka_2.10-0.9.0.1.tgz  ,官網下載Kafka地址http://kafka.apache.org/downloads.htmlapache

(注意這裏最好下載scala2.10版本的kafka,由於scala2.10版本的兼容性比較好和2.11版本差異太大)vim

解壓命令:緩存

 tar -zxvf kafka_2.10-0.9.0.1.tgz性能優化

2:修改配置文件

cd /usr/local/kafka_2.10-0.9.0.1/config/

vim server.properties

配置broker.id從0開始,後面其餘節點配置1,2,3,4等等

事先啓動zookeeper集羣,這裏配置zookeeper集羣的地址

zookeeper.connect=wbx1:2181,wbx2:2181,wbx3:2181

修改見以下圖:

這裏注意其它機器修改

 

3:啓動kafka

修改完配置之後在每一個節點啓動kafka

bin目錄下啓動

./kafka-server-start.sh -daemon ../config/server.properties

這時 每臺機器打上 jps命令 均可以看到kafka進程

4:新建一個topic,準備測試

a:新建一個topic

topic名爲test ,一個擁有2個副本5個分區的topic:(副本數量不能超過broker的個數)

bin目錄 運行下面指令

./kafka-topics.sh -zookeeper wbx1:2181,wbx2:2181,wbx3:2181 -topic test -replication-factor 2 -partitions 5 -create

b:查看每一個節點的信息

如今咱們搭建了一個集羣,怎麼知道每一個節點的信息呢?運行「"describe topics」命令就能夠了:

./kafka-topics.sh --describe --zookeeper wbx1:2181 --topic test

下面解釋一下這些輸出。第一行是對全部分區的一個描述,而後每一個分區都會對應一行。

  • Leader:負責處理消息的讀和寫,leader是從全部節點中隨機選擇的.
  • Replicas:列出了全部的副本節點,無論節點是否在服務中.
  • Isr:是正在服務中的節點.

c:查看當前的topic

bin目錄下執行下面命令

./kafka-topics.sh -zookeeper wbx1:2181,wbx2:2181,wbx3:2181 -list

5:測試 向topic發送消息

a:在provider輸入信息

在一個節點建立一個provider  這裏在wbx3建立

./kafka-console-producer.sh --broker-list wbx1:9092,wbx2:9092,wbx3:9092 --topic test

b:在consumer能接收到信息

在另一個節點建立一個consumer  這裏在wbx2裏建立  wbx1也能夠建立 也能夠收到信息

./kafka-console-consumer.sh --zookeeper wbx:2181,wbx2:2181,wbx3:2181 --from-beginning -topic test

6:kafka監控工具

詳情可見博客:http://www.cnblogs.com/Leo_wl/p/4564699.html

a:下載

在安裝KafkaOffsetMonitor管理平臺時,咱們須要先下載其安裝包,其資源能夠在Github上找到

源碼地址:

https://github.com/quantifind/KafkaOffsetMonitor

jar包地址:

https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.1

b:編寫啓動腳本

新建一個啓動腳本,執行以下命令:

touch start.sh

賦予權限 命令:

chmod +x start.sh

編輯腳本:

vim start.sh

java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
 com.quantifind.kafka.offsetapp.OffsetGetterWeb \
 --zk wbx1:2181,wbx2:2181,wbx3:2181 \
 --port 8089 \
 --refresh 10.seconds \
 --retain 1.days

c:啓動kafkaMonitor

./start.sh

d:訪問web

http://192.168.102.136:8089/

4、kafka原理詳解

1:  基本概念

節點服務器(Broker):或緩存代理,Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker。

主題(Topic):一個主題相似新聞中的體育、娛樂等分類概念,實際工程中一個業務一個主題。

分區(Partition):一個topic中的消息數據按照多個分區組織,分區是kafka消息隊列的最小單位,一個分區能夠看作是一個FIFO的隊列。將topic下的message存儲到不一樣的partition下,目的是爲了提升並行性.

偏移量(offset):kafka集羣會將每一個topic進行分區,每一個分區都是一個排序且不可改變的隊列,新的消息會進入隊尾並分配一個惟一ID,官方稱之爲偏移量(offset,即元信息在topic中的位置)。

Leader:負責此partition的讀寫操做,每一個broker都有可能成爲某partition的leader

Replicas:副本,即此partition在那幾個broker上有備份,無論broker是否存活

Isr:存活的replicas

做爲Leader的server承載了所有的請求壓力,所以從集羣的總體考慮,有多少個partitions就意味着有多少個"leader",kafka會將"leader"均衡的分散在每一個實例上,來確保總體的性能穩定。

2:工做原理

A 每一個partition會建立3個備份replica,並分配到broker集羣中; --replication-factor3

B 用zookeeper來管理,consumer、producer、broker的活動狀態;

C分配的每一個備份replica的id和broker的id保持一致;

D 對每一個partition,會選擇一個broker做爲集羣的leader。

3:kafka特性

  • 高吞吐量是其核心設計之一。據瞭解,Kafka每秒能夠生產約25萬消息(50 MB),每秒處理55萬消息(110 MB)。
  • 數據磁盤持久化:消息不在內存中cache,直接寫入到磁盤,充分利用磁盤的順序讀寫性能。
  • zero-copy:減小IO操做步驟。
  • 支持數據批量發送和拉取。
  • 支持數據壓縮。
  • Topic劃分爲多個partition,提升並行處理能力。

4:使用場景

A 站點用戶活動追蹤:頁面瀏覽,搜索,點擊;

B 用戶/管理員網站操做的監控;

C 日誌處理;

5:分佈式隊列的對比

 

ActiveMQ

RabbitMQ

Kafka

所屬社區/公司

Apache

MozillaPublicLicense

Apache/LinkedIn

開發語言

Java

Erlang

Scala

支持的協議

AMQP、OpenWire、STOMP、REST、XMPP

AMQP

仿AMQP

事務

支持

不支持

不支持

集羣

支持

支持

支持

負載均衡

支持

支持

支持

動態擴容

不支持

不支持

支持

AMQP協議

Advanced Message Queuing Protocol (高級消息隊列協議)

The Advanced Message Queuing Protocol (AMQP):是一個標準開放的應用層的消息中間件(Message Oriented Middleware)協議。AMQP定義了經過網絡發送的字節流的數據格式。所以兼容性很是好,任何實現AMQP協議的程序均可以和與AMQP協議兼容的其餘程序交互,能夠很容易作到跨語言,跨平臺。

 

Kafka與傳統消息系統相比,有如下不一樣:

  • 它被設計爲一個分佈式系統,易於向外擴展;
  • 它同時爲發佈和訂閱提供高吞吐量;
  • 它支持多訂閱者,當失敗時能自動平衡消費者;
  • 它將消息持久化到磁盤,所以可用於批量消費,例如ETL,以及實時應用程序。

五:性能優化

配置優化都是修改server.properties文件中參數值。

1網絡和io操做線程配置優化

# broker處理消息的最大線程數

num.network.threads=xxx

# broker處理磁盤IO的線程數

num.io.threads=xxx

建議配置:

通常num.network.threads主要處理網絡io,讀寫緩衝區數據,基本沒有io等待,配置線程數量爲cpu核數加1.

num.io.threads主要進行磁盤io操做,高峯期可能有些io等待,所以配置須要大些。配置線程數量爲cpu核數2倍,最大不超過3倍.

2log數據文件刷新策略

爲了大幅度提升producer寫入吞吐量,須要按期批量寫文件。

建議配置:

# 每當producer寫入10000條消息時,刷數據到磁盤

log.flush.interval.messages=10000

# 每間隔1秒鐘時間,刷數據到磁盤

log.flush.interval.ms=1000

3日誌保留策略配置

當kafkaserver被寫入海量消息後,會生成不少數據文件,且佔用大量磁盤空間,若是不及時清理,可能磁盤空間不夠用,

kafka默認是保留7天。

建議配置:

# 保留三天,也能夠更短

log.retention.hours=72

# 段文件配置1GB,有利於快速回收磁盤空間,重啓kafka加載也會加快(若是文件太小,則文件數量比較多,

# kafka啓動時是單線程掃描目錄(log.dir)下全部數據文件)

log.segment.bytes=1073741824

 

在kafka的核心思路中,不須要在內存裏緩存數據,由於操做系統的文件緩存已經足夠完善和強大,只要不作隨機寫,順序讀寫的性能是很是高效的。kafka的數據只會順序append,數據的刪除策略是累積到必定程度或者超過必定時間再刪除。kafka另外一個獨特的地方是將消費者信息保存在客戶端而不是MQ服務器,這樣服務器就不用記錄消息的投遞過程,每一個客戶端都本身知道本身下一次應該從什麼地方什麼位置讀取消息,消息的投遞過程也是採用客戶端主動pull的模型,這樣大大減輕了服務器的負擔。kafka還強調減小數據的序列化和拷貝開銷,它會將一些消息組織成MessageSet作批量存儲和發送,而且客戶端在pull數據的時候,儘可能以zero-copy的方式傳輸,利用sendfile(對應java裏的FileChannel.transferTo/transferFrom)這樣的高級IO函數來減小拷貝開銷。可見,kafka是一個精心設計,特定於某些應用的MQ系統,這種偏向特定領域的MQ系統會愈來愈多,垂直化的產品策略值的考慮。

只要磁盤沒有限制而且不出現損失,kafka能夠存儲至關長時間的消息(一週),Kafka在提升效率方面作了很大努力。Kafka的一個主要使用場景是處理網站活動日誌,吞吐量是很是大的,每一個頁面都會產生好屢次寫操做。讀方面,假設每一個消息只被消費一次,讀的量的也是很大的,Kafka也儘可能使讀的操做更輕量化。

咱們以前討論了磁盤的性能問題,線性讀寫的狀況下影響磁盤性能問題大約有兩個方面:太多的瑣碎的I/O操做和太多的字節拷貝。I/O問題發生在客戶端和服務端之間,也發生在服務端內部的持久化的操做中。

 

六:實際開發中問題處理

 

未完待續

相關文章
相關標籤/搜索