1、理論介紹
(一)相關資料
一、官方資料,很是詳細:
http://kafka.apache.org/documentation.html#quickstart
二、有一篇翻譯版,基本一致,有些細節不一樣,建議入門時先讀此文,再讀官方文檔。若自認英語很強,請忽視:
http://www.linuxidc.com/Linux/2014-07/104470.htm
三、還有一文也能夠:
http://www.sxt.cn/info-2871-u-324.html
其主要內容來源於如下三篇文章:
日誌:每一個軟件工程師都應該知道的有關實時數據的統一律念 —— 這篇比較抽象,高屋建瓴,理論先行
Building LinkedIn’s Real-time Activity Data Pipeline —— 實踐層的論文,把作事情的來龍去脈都寫明白了
分佈式發佈訂閱消息系統 Kafka 架構設計 —— 落地設計
(二)kafka是什麼?
一、Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Kafka是一個 分佈式的、可分區的、可複製的消息系統。它提供了普通消息系統的功能,但具備本身獨特的設計。
二、能夠簡單的理解爲:kafka是一個日誌集羣,各類各樣的服務器將它們自身的日誌發送到集羣中進行統一彙總和存儲,而後其它機器從集羣中拉取消息進行分析處理,如ELT、數據挖掘等。
三、kafka提供了JAVA API,同時對多種語言都提供了支持。
(三)基本的架構
首先讓咱們看幾個基本的消息系統術語:
Kafka將消息以topic爲單位進行概括。
將向Kafka topic發佈消息的程序稱爲producers.
將預訂topics並消費消息的程序稱爲consumer.
Kafka以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker.
producers經過網絡將消息發送到Kafka集羣,集羣向consumers提供消息,以下圖所示:
(四)分區與副本
一、一個topic是對一組消息的概括。對每一個topic,Kafka 對它的日誌進行了分區,以下圖所示:
二、通常而言,一個topic會有多個分區,每一個分區會有多個副本。
分區是分了將一個topic分到多個地方存儲,提升並行處理的能力。副本是爲了容錯,保證數據不丟失。
三、對於每個分區,都會選取一個leader,這個分區的全部讀取都在這個leader中進行,而其它副本會同步leader中的數據,且只作備份。
即leader只是針對一個分區而言,而非整個集羣。一個服務器對於某個分區是leader,對於其它分區多是follower。
四、 Producer將消息發佈到它指定的topic中,並負責決定發佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也能夠經過特定的分區函數選擇分區。使用的更多的是第二種。
五、發佈消息一般有兩種模式:隊列模式(queuing)和發佈-訂閱模式(publish-subscribe)。隊列模式中,consumers能夠同時從服務端讀取消息,每一個消息只被其中一個consumer讀到;發佈-訂閱模式中消息被廣播到全部的consumer中。
Consumers能夠加入一個consumer 組,共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。同一組中的consumer能夠在不一樣的程序中,也能夠在不一樣的機器上。若是全部的consumer都在一個組中,這就成爲了傳統的隊列模式,在各consumer中實現負載均衡。
若是全部的consumer都不在不一樣的組中,這就成爲了發佈-訂閱模式,全部的消息都被分發到全部的consumer中。
更常見的是,每一個topic都有若干數量的consumer組,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組由若干consumer組成。這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。
由兩個機器組成的集羣擁有4個分區 (P0-P3) 2個consumer組. A組有兩個consumerB組有4個
相比傳統的消息系統,Kafka能夠很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分 發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使 順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。
在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分 只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多 個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。
Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。
2、安裝部署
(一)單機版安裝
Step 1: 下載Kafka
下載最新的版本並解壓.
$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
$ tar -zxvf kafka_2.10-0.8.2.1.tgz
Step 2: 啓動服務
Kafka用到了Zookeeper,全部首先啓動Zookper,下面簡單的啓用一個單實例的Zookkeeper服務。能夠在命令的結尾加個&符號,這樣就能夠啓動後離開控制檯。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
...
如今啓動Kafka:
> bin/kafka-server-start.sh config/server.properties
Step 3: 建立 topic
建立一個叫作「test」的topic,它只有一個分區,一個副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
[2015-06-04 13:17:13,943] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "test".
能夠經過list命令查看建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
除了手動建立topic,還能夠配置broker讓它自動建立topic.
Step 4:發送消息.
Kafka 使用一個簡單的命令行producer,從文件中或者從標準輸入中讀取消息併發送到服務端。默認的每條命令將發送一條消息。
運行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message
ctrl+c能夠退出發送。
默認狀況下,日誌數據會被放置到/tmp/kafka-logs中,每一個分區一個目錄
Step 5: 啓動consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一個命令行consumer能夠讀取消息並輸出到標準輸出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
你在一個終端中運行consumer命令行,另外一個終端中運行producer命令行,就能夠在一個終端輸入消息,另外一個終端讀取消息。
這兩個命令都有本身的可選參數,能夠在運行的時候不加任何參數能夠看到幫助信息。
(二)集羣安裝
注意,必須先搭建zookeeper集羣,搭建方法請見????
一、使用3臺機器搭建Kafka集羣:
192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test
二、在安裝Kafka集羣以前,這裏沒有使用Kafka自帶的Zookeeper,而是獨立安裝了一個Zookeeper集羣,也是使用這3臺機器,保證Zookeeper集羣正常運行。
三、首先,在gdc-dn01-test上準備Kafka安裝文件,執行以下命令:
cd
wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
tar xvzf kafka_2.10-0.8.2.1.tgz
mv kafka_2.10-0.8.2.1 kafka
四、修改配置文件kafka/config/server.properties,修改以下內容:
broker.id=0
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
這裏須要說明的是,默認Kafka會使用ZooKeeper默認的/路徑,這樣有關Kafka的ZooKeeper配置就會散落在根路徑下面,若是 你有其餘的應用也在使用ZooKeeper集羣,查看ZooKeeper中數據可能會不直觀,因此強烈建議指定一個chroot路徑,直接在 zookeeper.connect配置項中指定:
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
並且,須要手動在ZooKeeper中建立路徑/kafka,使用以下命令鏈接到任意一臺ZooKeeper服務器:
cd ~/zookeeper
bin/zkCli.sh
在ZooKeeper執行以下命令建立chroot路徑:
create /kafka ''
這樣,每次鏈接Kafka集羣的時候(使用--zookeeper選項),也必須使用帶chroot路徑的鏈接字符串,後面會看到。
五、而後,將配置好的安裝文件同步到其餘的dn0二、dn03節點上:
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoop
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop
六、最後,在dn0二、dn03節點上配置修改配置文件kafka/config/server.properties內容以下所示:
broker.id=1 # 在dn02修改
broker.id=2 # 在dn03修改
由於Kafka集羣須要保證各個Broker的id在整個集羣中必須惟一,須要調整這個配置項的值(若是在單機上,能夠經過創建多個Broker進程來模擬分佈式的Kafka集羣,也須要Broker的id惟一,還須要修改一些配置目錄的信息)。
七、在集羣中的dn0一、dn0二、dn03這三個節點上分別啓動Kafka,分別執行以下命令:
bin/kafka-server-start.sh config/server.properties &
能夠經過查看日誌,或者檢查進程狀態,保證Kafka集羣啓動成功。
八、建立一個名稱爲my-replicated-topic5的Topic,5個分區,而且複製因子爲3,執行以下命令:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
九、查看建立的Topic,執行以下命令:
bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5
結果信息以下所示:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: my-replicated-topic5 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-replicated-topic5 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: my-replicated-topic5 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-replicated-topic5 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
上面Leader、Replicas、Isr的含義以下:
1 Partition: 分區
2 Leader : 負責讀寫指定分區的節點
3 Replicas : 複製該分區log的節點列表
4 Isr : "in-sync" replicas,當前活躍的副本列表(是一個子集),而且可能成爲Leader
咱們能夠經過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證演示若是發佈消息、消費消息。
十一、在一個終端,啓動Producer,並向咱們上面建立的名稱爲my-replicated-topic5的Topic中生產消息,執行以下腳本:
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic my-replicated-topic5
十二、在另外一個終端,啓動Consumer,並訂閱咱們上面建立的名稱爲my-replicated-topic5的Topic中生產的消息,執行以下腳本:
bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5
能夠在Producer終端上輸入字符串消息行,就能夠在Consumer終端上看到消費者消費的消息內容。
也能夠參考Kafka的Producer和Consumer的Java API,經過API編碼的方式來實現消息生產和消費的處理邏輯。
3、配置文件
所有配置參數請 見http://kafka.apache.org/documentation.html#consumerconfigs
及http://blog.csdn.net/jinhong_lu/article/details/46518613
(一)重要配置參數
0、JVM配置 在bin/kafka-server-start.sh中添加如下內容:
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
一、broker.id=0 整數,建議根據ip區分,用於區分broker,確保每臺機器不一樣
二、log.dirs=/home/data/kafka kafka用於放置消息的目錄,默認爲/tmp/kafka-logs
三、zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka zk用於放置kafka信息的地方
四、num.partitions=1 建立topic時,默認的分區數
五、num.network.threads=10 broker用於處理網絡請求的線程數,如不配置默認爲3
六、zookeeper.connection.timeout.ms=6000
七、message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
一條消息的最大字節數,說明以下:
kafka中出現如下異常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
緣由是集羣默認每次只能接受約1M的消息,若是客戶端一次發送的消息大於這個數值則會致使異常。
在server.properties中添加如下參數
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同時在consumer.properties中添加如下參數:
fetch.message.max.bytes=1073741824
而後重啓kafka進程便可,如今每次最大可接收100M的消息。
八、delete.topic.enable=true 默認爲false,即delete topic時只是marked for deletion,但並不會真正刪除topic。
九、關於日誌的保存時間或量:
(1)log.retention.hours=24 消息被刪除前保存多少小時,默認1周168小時
(2)log.retention.bytes 默認爲-1,即不限制大小。注意此外的大小是指一個topic的一個分區的最大字節數。
當超出上述2個限制的任何一個時,日誌均會被刪除。
十、同步發送仍是異步發送,異步吞吐量較大,但可能引入錯誤,默認爲sync
producer.type=sync|async
This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data.
十一、batch.size 默認值爲16384
在async模式下,producer緩存多少個消息後再一塊兒發送
十二、compression.type 默認值爲none,可選gzip snappy
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, or snappy. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).html
1三、default.replication.factor 消息副本的數量,默認爲1,即沒有副本java
如下配置說明來自網上轉載:node
每一個kafka broker中配置文件server.properties默認必須配置的屬性以下:轉載請註明來自:http://blog.csdn.net/lizhitao/article/details/25667831linux
參數apache |
說明(解釋)緩存 |
broker.id =0服務器 |
每個broker在集羣中的惟一表示,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息狀況網絡 |
log.dirs=/data/kafka-logssession |
kafka數據的存放地址,多個地址的話用逗號分割,多個目錄分佈在不一樣磁盤上能夠提升讀寫性能 /data/kafka-logs-1,/data/kafka-logs-2架構 |
port =9092 |
broker server服務端口 |
message.max.bytes =6525000 |
表示消息體的最大大小,單位是字節 |
num.network.threads =4 |
broker處理消息的最大線程數,通常狀況下數量爲cpu核數 |
num.io.threads =8 |
broker處理磁盤IO的線程數,數值爲cpu核數2倍 |
background.threads =4 |
一些後臺任務處理的線程數,例如過時消息文件的刪除等,通常狀況下不須要去作修改 |
queued.max.requests =500 |
等待IO線程處理的請求隊列最大數,如果等待IO的請求超過這個數值,那麼會中止接受外部消息,應該是一種自我保護機制。 |
host.name |
broker的主機地址,如果設置了,那麼會綁定到這個地址上,如果沒有,會綁定到全部的接口上,並將其中之一發送到ZK,通常不設置 |
socket.send.buffer.bytes=100*1024 |
socket的發送緩衝區,socket的調優參數SO_SNDBUFF |
socket.receive.buffer.bytes =100*1024 |
socket的接受緩衝區,socket的調優參數SO_RCVBUFF |
socket.request.max.bytes =100*1024*1024 |
socket請求的最大數值,防止serverOOM,message.max.bytes必然要小於socket.request.max.bytes,會被topic建立時的指定參數覆蓋 |
log.segment.bytes =1024*1024*1024 |
topic的分區是以一堆segment文件存儲的,這個控制每一個segment的大小,會被topic建立時的指定參數覆蓋 |
log.roll.hours =24*7 |
這個參數會在日誌segment沒有達到log.segment.bytes設置的大小,也會強制新建一個segment會被 topic建立時的指定參數覆蓋 |
log.cleanup.policy = delete |
日誌清理策略選擇有:delete和compact主要針對過時數據的處理,或是日誌文件達到限制的額度,會被 topic建立時的指定參數覆蓋 |
log.retention.minutes=300 或 log.retention.hours=24 |
數據文件保留多長時間, 存儲的最大時間超過這個時間會根據log.cleanup.policy設置數據清除策略 log.retention.bytes和log.retention.minutes或log.retention.hours任意一個達到要求,都會執行刪除
有2刪除數據文件方式: 按照文件大小刪除:log.retention.bytes 按照2中不一樣時間粒度刪除:分別爲分鐘,小時 |
log.retention.bytes=-1 |
topic每一個分區的最大文件大小,一個topic的大小限制 =分區數*log.retention.bytes。-1沒有大小限log.retention.bytes和log.retention.minutes任意一個達到要求,都會執行刪除,會被topic建立時的指定參數覆蓋 |
log.retention.check.interval.ms=5minutes |
文件大小檢查的週期時間,是否處罰log.cleanup.policy中設置的策略 |
log.cleaner.enable=false |
是否開啓日誌清理 |
log.cleaner.threads = 2 |
日誌清理運行的線程數 |
log.cleaner.io.max.bytes.per.second=None |
日誌清理時候處理的最大大小 |
log.cleaner.dedupe.buffer.size=500*1024*1024 |
日誌清理去重時候的緩存空間,在空間容許的狀況下,越大越好 |
log.cleaner.io.buffer.size=512*1024 |
日誌清理時候用到的IO塊大小通常不須要修改 |
log.cleaner.io.buffer.load.factor =0.9 |
日誌清理中hash表的擴大因子通常不須要修改 |
log.cleaner.backoff.ms =15000 |
檢查是否處罰日誌清理的間隔 |
log.cleaner.min.cleanable.ratio=0.5 |
日誌清理的頻率控制,越大意味着更高效的清理,同時會存在一些空間上的浪費,會被topic建立時的指定參數覆蓋 |
log.cleaner.delete.retention.ms =1day |
對於壓縮的日誌保留的最長時間,也是客戶端消費消息的最長時間,同log.retention.minutes的區別在於一個控制未壓縮數據,一個控制壓縮後的數據。會被topic建立時的指定參數覆蓋 |
log.index.size.max.bytes =10*1024*1024 |
對於segment日誌的索引文件大小限制,會被topic建立時的指定參數覆蓋 |
log.index.interval.bytes =4096 |
當執行一個fetch操做後,須要必定的空間來掃描最近的offset大小,設置越大,表明掃描速度越快,可是也更好內存,通常狀況下不須要搭理這個參數 |
log.flush.interval.messages=None 例如log.flush.interval.messages=1000 表示每當消息記錄數達到1000時flush一次數據到磁盤 |
log文件」sync」到磁盤以前累積的消息條數,由於磁盤IO操做是一個慢操做,但又是一個」數據可靠性"的必要手段,因此此參數的設置,須要在"數據可靠性"與"性能"之間作必要的權衡.若是此值過大,將會致使每次"fsync"的時間較長(IO阻塞),若是此值太小,將會致使"fsync"的次數較多,這也意味着總體的client請求有必定的延遲.物理server故障,將會致使沒有fsync的消息丟失. |
log.flush.scheduler.interval.ms =3000 |
檢查是否須要固化到硬盤的時間間隔 |
log.flush.interval.ms = None 例如:log.flush.interval.ms=1000 表示每間隔1000毫秒flush一次數據到磁盤 |
僅僅經過interval來控制消息的磁盤寫入時機,是不足的.此參數用於控制"fsync"的時間間隔,若是消息量始終沒有達到閥值,可是離上一次磁盤同步的時間間隔達到閥值,也將觸發. |
log.delete.delay.ms =60000 |
文件在索引中清除後保留的時間通常不須要去修改 |
log.flush.offset.checkpoint.interval.ms =60000 |
控制上次固化硬盤的時間點,以便於數據恢復通常不須要去修改 |
auto.create.topics.enable =true |
是否容許自動建立topic,如果false,就須要經過命令建立topic |
default.replication.factor =1 |
是否容許自動建立topic,如果false,就須要經過命令建立topic |
num.partitions =1 |
每一個topic的分區個數,如果在topic建立時候沒有指定的話會被topic建立時的指定參數覆蓋 |
|
|
如下是kafka中Leader,replicas配置參數 |
|
controller.socket.timeout.ms =30000 |
partition leader與replicas之間通信時,socket的超時時間 |
controller.message.queue.size=10 |
partition leader與replicas數據同步時,消息的隊列尺寸 |
replica.lag.time.max.ms =10000 |
replicas響應partition leader的最長等待時間,如果超過這個時間,就將replicas列入ISR(in-sync replicas),並認爲它是死的,不會再加入管理中 |
replica.lag.max.messages =4000 |
若是follower落後與leader太多,將會認爲此follower[或者說partition relicas]已經失效 ##一般,在follower與leader通信時,由於網絡延遲或者連接斷開,總會致使replicas中消息同步滯後 ##若是消息以後太多,leader將認爲此follower網絡延遲較大或者消息吞吐能力有限,將會把此replicas遷移 ##到其餘follower中. ##在broker數量較少,或者網絡不足的環境中,建議提升此值. |
replica.socket.timeout.ms=30*1000 |
follower與leader之間的socket超時時間 |
replica.socket.receive.buffer.bytes=64*1024 |
leader複製時候的socket緩存大小 |
replica.fetch.max.bytes =1024*1024 |
replicas每次獲取數據的最大大小 |
replica.fetch.wait.max.ms =500 |
replicas同leader之間通訊的最大等待時間,失敗了會重試 |
replica.fetch.min.bytes =1 |
fetch的最小數據尺寸,若是leader中還沒有同步的數據不足此值,將會阻塞,直到知足條件 |
num.replica.fetchers=1 |
leader進行復制的線程數,增大這個數值會增長follower的IO |
replica.high.watermark.checkpoint.interval.ms =5000 |
每一個replica檢查是否將最高水位進行固化的頻率 |
controlled.shutdown.enable =false |
是否容許控制器關閉broker ,如果設置爲true,會關閉全部在這個broker上的leader,並轉移到其餘broker |
controlled.shutdown.max.retries =3 |
控制器關閉的嘗試次數 |
controlled.shutdown.retry.backoff.ms =5000 |
每次關閉嘗試的時間間隔 |
leader.imbalance.per.broker.percentage =10 |
leader的不平衡比例,如果超過這個數值,會對分區進行從新的平衡 |
leader.imbalance.check.interval.seconds =300 |
檢查leader是否不平衡的時間間隔 |
offset.metadata.max.bytes |
客戶端保留offset信息的最大空間大小 |
kafka中zookeeper參數配置 |
|
zookeeper.connect = localhost:2181 |
zookeeper集羣的地址,能夠是多個,多個之間用逗號分割hostname1:port1,hostname2:port2,hostname3:port3 |
zookeeper.session.timeout.ms=6000 |
ZooKeeper的最大超時時間,就是心跳的間隔,如果沒有反映,那麼認爲已經死了,不易過大 |
zookeeper.connection.timeout.ms =6000 |
ZooKeeper的鏈接超時時間 |
zookeeper.sync.time.ms =2000 |
ZooKeeper集羣中leader和follower之間的同步實際那 |
4、錯誤處理
一、配置kafka時,若是使用zookeeper create /kafka建立了節點,kafka與storm集成時new ZkHosts(zks) 須要改爲 new ZkHosts(zks,」/kafka/brokers」),否則會報java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/my-replicated-topic5/partitions。
storm-kafka插件默認kafka的 zk_path以下:
public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = 「/brokers」;
二、若是出現如下問題,表明偏移量出錯,建議從新開一個topic
ERROR [KafkaApi-3] Error when processing fetch request for partition [xxxxx,1] offset 112394 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 112394 but we only have log segments in the range 0 to 665.
三、當沒有某個topic,或者是某個topic的node放置不在默認位置時,會有如下異常:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/mytest/partitions at storm.kafka.Dynam
四、kafka中出現如下異常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
緣由是集羣默認每次只能接受約1M的消息,若是客戶端一次發送的消息大於這個數值則會致使異常。
在server.properties中添加如下參數
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同時在consumer.properties中添加如下參數:
fetch.message.max.bytes=1073741824
而後重啓kafka進程便可,如今每次最大可接收100M的消息。
五、open too many files
kafka出現異常,日誌提示open too many file
查找文件打開數量
lsof -p 30353 | wc
若是在1000以上,通常都是不正常,走過65535就會出錯。
緣由打開了太多producer,沒關閉,調用producer.close()便可。
5、經常使用操做
一、啓動集羣
bin/kafka-server-start.sh config/server.properties &
二、建立topic
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test_topic
三、查看topic信息
bin/kafka-topics.sh --describe --zookeeper bin/kafka-console-producer.sh --broker-list 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic test_topic
四、啓動一個console producer,用於在console中模擬輸入消息
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic test_topic
五、啓動一個console consumer,用於模擬接收消息,並在console中輸出
bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic test_topic
六、刪除一個topic
bin/kafka-topics.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --delete --topic test2
七、列出全部topic
bin/kafka-topics.sh --zookeeper localhost/kafka --list
6、API
7、zookeeper中的內容
默認狀況,kafka在zk的/brokers目錄下記錄topic相關的信息,但若是在建立topic時,指定了路徑,則放置到固定的路徑中,如:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test_topic
建立的topic,其相關信息會放置到/kafka/brokers中,這個目錄中主要包括2個子目錄:ids 和 topics
一、ids:記錄這個kafka集羣中有多少個broker
如:
ls /kafka/brokers/ids/ 3 2 5 4
這個集羣有4個節點,節點id分別爲2,3,4,5。 咱們看一下內容
[zk: localhost:2181(CONNECTED) 27] get /kafka/brokers/ids/2 {"jmx_port":-1,"timestamp":"1435833841290","host":"kafka02-log.i.nease.net","version":1,"port":9092} cZxid = 0x1000e8a68 ctime = Thu Jul 02 18:44:01 HKT 2015 mZxid = 0x1000e8a68 mtime = Thu Jul 02 18:44:01 HKT 2015 pZxid = 0x1000e8a68 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x44e440d0bdf06eb dataLength = 104 numChildren = 0
記錄着這個節點的一些基本狀況。
二、topics
先看一下有哪些內容:
[zk: localhost:2181(CONNECTED) 29] ls /kafka/brokers/topics/test30/partitions [3, 2, 1, 0, 4]
[zk: localhost:2181(CONNECTED) 30] ls /kafka/brokers/topics/test30/partitions/0 [state]
[zk: localhost:2181(CONNECTED) 1] get /kafka/brokers/topics/test30/partitions/0/state {"controller_epoch":4,"leader":5,"version":1,"leader_epoch":2,"isr":[5]} cZxid = 0x100017c5e ctime = Wed Jul 01 14:54:24 HKT 2015 mZxid = 0x1000e8a84 mtime = Thu Jul 02 18:44:01 HKT 2015 pZxid = 0x100017c5e cversion = 0 dataVersion = 2 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 72 numChildren = 0
能夠看某個分區的leader是哪一個,從而讀取kafka消息時,能夠從這個leader中讀取數據。