若自認英語很是強,請忽視:
http://www.linuxidc.com/Linux/2014-07/104470.htm
三、另外一文也可以:
http://www.sxt.cn/info-2871-u-324.html
其主要內容來源於下面三篇文章:
日誌:每個軟件project師都應該知道的有關實時數據的統一律念 —— 這篇比較抽象,高屋建瓴,理論先行
Building LinkedIn’s Real-time Activity Data Pipeline —— 實踐層的論文,把作事情的來龍去脈都寫明確了
分佈式公佈訂閱消息系統 Kafka 架構設計 —— 落地設計
(二)kafka是什麼?
一、Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.
Kafka是一個 分佈式的、可分區的、可複製的消息系統。它提供了普通消息系統的功能,但具備本身獨特的設計。html
二、可以簡單的理解爲:kafka是一個日誌集羣,各類各樣的server將它們自身的日誌發送到集羣中進行統一彙總和存儲。而後其餘機器從集羣中拉取消息進行分析處理,如ELT、數據挖掘等。java
三、kafka提供了JAVA API,同一時候對多種語言都提供了支持。
(三)主要的架構
首先讓咱們看幾個主要的消息系統術語:
Kafka將消息以topic爲單位進行概括。
將向Kafka topic公佈消息的程序稱爲producers.
將預訂topics並消費消息的程序稱爲consumer.
Kafka以集羣的方式執行,可以由一個或多個服務組成,每個服務叫作一個broker.
producers經過網絡將消息發送到Kafka集羣,集羣向consumers提供消息,例如如下圖所看到的:
(四)分區與副本
一、一個topic是對一組消息的概括。對每個topic,Kafka 對它的日誌進行了分區,例如如下圖所看到的:
二、通常而言,一個topic會有多個分區,每個分區會有多個副本。
分區是分了將一個topic分到多個地方存儲,提升並行處理的能力。副本是爲了容錯,保證數據不丟失。node
三、對於每一個分區。都會選取一個leader。這個分區的所有讀取都在這個leader中進行,而其餘副本會同步leader中的數據,且僅僅作備份。
即leader僅僅是針對一個分區而言,而非整個集羣。linux
一個server對於某個分區是leader,對於其餘分區多是follower。apache
四、 Producer將消息公佈到它指定的topic中,並負責決定公佈到哪一個分區。一般簡單的由負載均衡機制隨機選擇分區,但也可以經過特定的分區函數選擇分區。緩存
使用的不少其它的是另一種。網絡
五、公佈消息一般有兩種模式:隊列模式(queuing)和公佈-訂閱模式(publish-subscribe)。session
隊列模式中。consumers可以同一時候從服務端讀取消息,每個消息僅僅被當中一個consumer讀到。公佈-訂閱模式中消息被廣播到所有的consumer中。架構
Consumers可以增長一個consumer 組。共同競爭一個topic,topic中的消息將被分發到組中的一個成員中。併發
同一組中的consumer可以在不一樣的程序中,也可以在不一樣的機器上。假設所有的consumer都在一個組中,這就成爲了傳統的隊列模式。在各consumer中實現負載均衡。
假設所有的consumer都不在不一樣的組中,這就成爲了公佈-訂閱模式。所有的消息都被分發到所有的consumer中。
更常見的是。每個topic都有若干數量的consumer組,每個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每個組由若干consumer組成。
這事實上就是一個公佈-訂閱模式,僅僅只是訂閱者是個組而不是單個consumer。
由兩個機器組成的集羣擁有4個分區 (P0-P3) 2個consumer組. A組有兩個consumerB組有4個
相比傳統的消息系統,Kafka可以很是好的保證有序性。
傳統的隊列在server上保存有序的消息。假設多個consumers同一時候從這個server消費消息,server就會以消息存儲的順序向consumer分 發消息。儘管server按順序公佈消息,但是消息是被異步的分發到各consumer上。因此當消息到達時可能已經失去了原來的順序。這意味着併發消費將致使 順序錯亂。爲了不故障,這種消息系統一般使用「專用consumer」的概念,事實上就是僅僅贊成一個消費者消費消息,固然這就意味着失去了併發性。
在這方面Kafka作的更好。經過分區的概念,Kafka可以在多個consumer組併發的狀況下提供較好的有序性和負載均衡。
將每個分區分 僅僅分發給一個consumer組。這樣一個分區就僅僅被這個組的一個consumer消費。就可以順序的消費這個分區的消息。因爲有多個分區,依舊可以在多 個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就贊成多少併發消費。
Kafka僅僅能保證一個分區以內消息的有序性,在不一樣的分區之間是不可以的,這已經可以知足大部分應用的需求。假設需要topic中所有消息的有序性。那就僅僅能讓這個topic僅僅有一個分區。固然也就僅僅有一個consumer組消費它。
2、安裝部署
(一)單機版安裝
Step 1: 下載Kafka
下載最新的版本號並解壓.
$ wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
$ tar -zxvf kafka_2.10-0.8.2.1.tgz
Step 2: 啓動服務
Kafka用到了Zookeeper,所有首先啓動Zookper,如下簡單的啓用一個單實例的Zookkeeper服務。可以在命令的結尾加個&符號,這樣就可以啓動後離開控制檯。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
...
現在啓動Kafka:
> bin/kafka-server-start.sh config/server.properties
Step 3: 建立 topic
建立一個叫作「test」的topic,它僅僅有一個分區。一個副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
[2015-06-04 13:17:13,943] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
Created topic "test".
可以經過list命令查看建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
除了手動建立topic,還可以配置broker讓它本身主動建立topic.
Step 4:發送消息.
Kafka 使用一個簡單的命令行producer,從文件裏或者從標準輸入中讀取消息併發送到服務端。默認的每條命令將發送一條消息。
執行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message
ctrl+c可以退出發送。
默認狀況下。日誌數據會被放置到/tmp/kafka-logs中。每個分區一個文件夾
Step 5: 啓動consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一個命令行consumer可以讀取消息並輸出到標準輸出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
你在一個終端中執行consumer命令行,還有一個終端中執行producer命令行。就可以在一個終端輸入消息。還有一個終端讀取消息。
這兩個命令都有本身的可選參數。可以在執行的時候不加不論什麼參數可以看到幫助信息。
(二)集羣安裝
注意,必須先搭建zookeeper集羣。搭建方法請見????
一、使用3臺機器搭建Kafka集羣:
192.168.169.92 gdc-dn01-test
192.168.169.93 gdc-dn02-test
192.168.169.94 gdc-dn03-test
二、在安裝Kafka集羣以前。這裏沒有使用Kafka自帶的Zookeeper,而是獨立安裝了一個Zookeeper集羣,也是使用這3臺機器,保證Zookeeper集羣正常執行。
三、首先,在gdc-dn01-test上準備Kafka安裝文件,運行例如如下命令:
cd
wget http://mirrors.cnnic.cn/apache/kafka/0.8.2.1/kafka_2.10-0.8.2.1.tgz
tar xvzf kafka_2.10-0.8.2.1.tgz
mv kafka_2.10-0.8.2.1 kafka
四、改動配置文件kafka/config/server.properties。改動例如如下內容:
broker.id=0
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
這裏需要說明的是,默認Kafka會使用ZooKeeper默認的/路徑。這樣有關Kafka的ZooKeeper配置就會散落在根路徑如下,假設 你有其它的應用也在使用ZooKeeper集羣,查看ZooKeeper中數據可能會不直觀,因此強烈建議指定一個chroot路徑,直接在 zookeeper.connect配置項中指定:
zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka
而且。需要手動在ZooKeeper中建立路徑/kafka,使用例如如下命令鏈接到隨意一臺ZooKeeperserver:
cd ~/zookeeper
bin/zkCli.sh
在ZooKeeper運行例如如下命令建立chroot路徑:
create /kafka ''
這樣。每次鏈接Kafka集羣的時候(使用--zookeeper選項),也必須使用帶chroot路徑的鏈接字符串,後面會看到。
五、而後,將配置好的安裝文件同步到其它的dn0二、dn03節點上:
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.92:/home/hadoop
scp -r /usr/local/kafka_2.10-0.8.2.1/ 192.168.169.93:/home/hadoop
六、最後,在dn0二、dn03節點上配置改動配置文件kafka/config/server.properties內容例如如下所看到的:
broker.id=1 # 在dn02改動
broker.id=2 # 在dn03改動
因爲Kafka集羣需要保證各個Broker的id在整個集羣中必須惟一。需要調整這個配置項的值(假設在單機上,可以經過創建多個Broker進程來模擬分佈式的Kafka集羣。也需要Broker的id惟一,還需要改動一些配置文件夾的信息)。
七、在集羣中的dn0一、dn0二、dn03這三個節點上分別啓動Kafka,分別運行例如如下命令:
bin/kafka-server-start.sh config/server.properties &
可以經過查看日誌,或者檢查進程狀態。保證Kafka集羣啓動成功。
八、建立一個名稱爲my-replicated-topic5的Topic。5個分區。並且複製因子爲3,運行例如如下命令:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic my-replicated-topic5
九、查看建立的Topic。運行例如如下命令:
bin/kafka-topics.sh --describe --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --topic my-replicated-topic5
結果信息例如如下所看到的:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: my-replicated-topic5 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: my-replicated-topic5 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: my-replicated-topic5 Partition: 2 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: my-replicated-topic5 Partition: 3 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: my-replicated-topic5 Partition: 4 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
上面Leader、Replicas、Isr的含義例如如下:
1 Partition: 分區
2 Leader : 負責讀寫指定分區的節點
3 Replicas : 複製該分區log的節點列表
4 Isr : "in-sync" replicas。當前活躍的副本列表(是一個子集)。並且可能成爲Leader
咱們可以經過Kafka自帶的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh腳本,來驗證演示假設公佈消息、消費消息。
十一、在一個終端,啓動Producer,並向咱們上面建立的名稱爲my-replicated-topic5的Topic中生產消息,運行例如如下腳本:
bin/kafka-console-producer.sh --broker-list 192.168.169.92:9092, 192.168.169.93:9092, 192.168.169.94:9092 --topic my-replicated-topic5
十二、在還有一個終端。啓動Consumer。並訂閱咱們上面建立的名稱爲my-replicated-topic5的Topic中生產的消息,運行例如如下腳本:
bin/kafka-console-consumer.sh --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --from-beginning --topic my-replicated-topic5
可以在Producer終端上輸入字符串消息行,就可以在Consumer終端上看到消費者消費的消息內容。
也可以參考Kafka的Producer和Consumer的Java API,經過API編碼的方式來實現消息生產和消費的處理邏輯。
3、配置文件
全部配置參數請 見http://kafka.apache.org/documentation.html#consumerconfigs
及http://blog.csdn.net/jinhong_lu/article/details/46518613
(一)重要配置參數
0、JVM配置 在bin/kafka-server-start.sh中加入下面內容:
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g -XX:PermSize=48m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35"
一、broker.id=0 整數。建議依據ip區分,用於區分broker,確保每臺機器不一樣
二、log.dirs=/home/data/kafka kafka用於放置消息的文件夾。默以爲/tmp/kafka-logs
三、zookeeper.connect=192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka zk用於放置kafka信息的地方
四、num.partitions=1 建立topic時。默認的分區數
五、num.network.threads=10 broker用於處理網絡請求的線程數。如不配置默以爲3
六、zookeeper.connection.timeout.ms=6000
七、message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
一條消息的最大字節數,說明例如如下:
kafka中出現下面異常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
緣由是集羣默認每次僅僅能接受約1M的消息。假設client一次發送的消息大於這個數值則會致使異常。
在server.properties中加入下面參數
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同一時候在consumer.properties中加入下面參數:
fetch.message.max.bytes=1073741824
而後從新啓動kafka進程就能夠,現在每次最大可接收100M的消息。
八、delete.topic.enable=true 默以爲false,即delete topic時僅僅是marked for deletion,但並不會真正刪除topic。
九、關於日誌的保存時間或量:
(1)log.retention.hours=24 消息被刪除前保存多少小時,默認1周168小時
(2)log.retention.bytes 默以爲-1,即不大小限制。
注意此外的大小是指一個topic的一個分區的最大字節數。
當超出上述2個限制的不論什麼一個時。日誌均會被刪除。
十、同步發送仍是異步發送,異步吞吐量較大,但可能引入錯誤。默以爲sync
producer.type=sync|async
This parameter specifies whether the messages are sent asynchronously in a background thread. Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By setting the producer to async we allow batching together of requests (which is great for throughput) but open the possibility of a failure of the client machine dropping unsent data.
十一、batch.size 默認值爲16384
在async模式下,producer緩存多少個消息後再一塊兒發送
十二、compression.type 默認值爲none,可選gzip snappy
The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, or snappy. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).
1三、default.replication.factor 消息副本的數量。默以爲1。即沒有副本
下面配置說明來自網上轉載:
每個kafka broker中配置文件server.properties默認必須配置的屬性例如如下:轉載請註明來自:http://blog.csdn.net/lizhitao/article/details/25667831
參數 |
說明(解釋) |
broker.id =0 |
每一個broker在集羣中的惟一表示。要求是正數。當該server的IP地址發生改變時,broker.id沒有變化。則不會影響consumers的消息狀況 |
log.dirs=/data/kafka-logs |
kafka數據的存放地址,多個地址的話用逗號切割,多個文件夾分佈在不一樣磁盤上可以提升讀寫性能 /data/kafka-logs-1,/data/kafka-logs-2 |
port =9092 |
broker server服務port |
message.max.bytes =6525000 |
表示消息體的最大大小。單位是字節 |
num.network.threads =4 |
broker處理消息的最大線程數。普通狀況下數量爲cpu核數 |
num.io.threads =8 |
broker處理磁盤IO的線程數,數值爲cpu核數2倍 |
background.threads =4 |
一些後臺任務處理的線程數,好比過時消息文件的刪除等,普通狀況下不需要去作改動 |
queued.max.requests =500 |
等待IO線程處理的請求隊列最大數,如果等待IO的請求超過這個數值。那麼會中止接受外部消息,應該是一種自我保護機制。 |
host.name |
broker的主機地址,如果設置了,那麼會綁定到這個地址上,如果沒有。會綁定到所有的接口上,並將當中之中的一個發送到ZK,通常不設置 |
socket.send.buffer.bytes=100*1024 |
socket的發送緩衝區。socket的調優參數SO_SNDBUFF |
socket.receive.buffer.bytes =100*1024 |
socket的接受緩衝區。socket的調優參數SO_RCVBUFF |
socket.request.max.bytes =100*1024*1024 |
socket請求的最大數值,防止serverOOM,message.max.bytes一定要小於socket.request.max.bytes,會被topic建立時的指定參數覆蓋 |
log.segment.bytes =1024*1024*1024 |
topic的分區是以一堆segment文件存儲的。這個控制每個segment的大小。會被topic建立時的指定參數覆蓋 |
log.roll.hours =24*7 |
這個參數會在日誌segment沒有達到log.segment.bytes設置的大小。也會強制新建一個segment會被 topic建立時的指定參數覆蓋 |
log.cleanup.policy = delete |
日誌清理策略選擇有:delete和compact主要針對過時數據的處理,或是日誌文件達到限制的額度。會被 topic建立時的指定參數覆蓋 |
log.retention.minutes=300 或 log.retention.hours=24 |
數據文件保留多長時間, 存儲的最大時間超過這個時間會依據log.cleanup.policy設置數據清除策略 log.retention.bytes和log.retention.minutes或log.retention.hours隨意一個達到要求,都會運行刪除
有2刪除數據文件方式: 依照文件大小刪除:log.retention.bytes 依照2中不一樣一時候間粒度刪除:分別爲分鐘,小時 |
log.retention.bytes=-1 |
topic每個分區的最大文件大小,一個topic的限制大小 = 分區數*log.retention.bytes。 -1沒有大小限log.retention.bytes和log.retention.minutes隨意一個達到要求。都會運行刪除,會被topic建立時的指定參數覆蓋 |
log.retention.check.interval.ms=5minutes |
文件大小檢查的週期時間,是否處罰 log.cleanup.policy中設置的策略 |
log.cleaner.enable=false |
是否開啓日誌清理 |
log.cleaner.threads = 2 |
日誌清理執行的線程數 |
log.cleaner.io.max.bytes.per.second=None |
日誌清理時候處理的最大大小 |
log.cleaner.dedupe.buffer.size=500*1024*1024 |
日誌清理去重時候的緩存空間。在空間贊成的狀況下,越大越好 |
log.cleaner.io.buffer.size=512*1024 |
日誌清理時候用到的IO塊大小通常不需要改動 |
log.cleaner.io.buffer.load.factor =0.9 |
日誌清理中hash表的擴大因子通常不需要改動 |
log.cleaner.backoff.ms =15000 |
檢查是否處罰日誌清理的間隔 |
log.cleaner.min.cleanable.ratio=0.5 |
日誌清理的頻率控制。越大意味着更高效的清理。同一時候會存在一些空間上的浪費,會被topic建立時的指定參數覆蓋 |
log.cleaner.delete.retention.ms =1day |
對於壓縮的日誌保留的最長時間,也是client消費消息的最長時間,同log.retention.minutes的差異在於一個控制未壓縮數據,一個控制壓縮後的數據。會被topic建立時的指定參數覆蓋 |
log.index.size.max.bytes =10*1024*1024 |
對於segment日誌的索引文件限制大小,會被topic建立時的指定參數覆蓋 |
log.index.interval.bytes =4096 |
當運行一個fetch操做後,需要必定的空間來掃描近期的offset大小,設置越大,表明掃描速度越快。但是也更好內存,普通狀況下不需要搭理這個參數 |
log.flush.interval.messages=None 好比log.flush.interval.messages=1000 表示每當消息記錄數達到1000時flush一次數據到磁盤 |
log文件」sync」到磁盤以前累積的消息條數,因爲磁盤IO操做是一個慢操做,但又是一個」數據可靠性"的必要手段,因此此參數的設置,需要在"數據可靠性"與"性能"之間作必要的權衡.假設此值過大,將會致使每次"fsync"的時間較長(IO堵塞),假設此值太小,將會致使"fsync"的次數較多,這也意味着整體的client請求有必定的延遲.物理server故障,將會致使沒有fsync的消息丟失. |
log.flush.scheduler.interval.ms =3000 |
檢查是否需要固化到硬盤的時間間隔 |
log.flush.interval.ms = None 好比:log.flush.interval.ms=1000 表示每間隔1000毫秒flush一次數據到磁盤 |
只經過interval來控制消息的磁盤寫入時機,是不足的.此參數用於控制"fsync"的時間間隔,假設消息量始終沒有達到閥值,但是離上一次磁盤同步的時間間隔達到閥值,也將觸發. |
log.delete.delay.ms =60000 |
文件在索引中清除後保留的時間通常不需要去改動 |
log.flush.offset.checkpoint.interval.ms =60000 |
控制上次固化硬盤的時間點,以便於數據恢復通常不需要去改動 |
auto.create.topics.enable =true |
是否贊成本身主動建立topic,如果false,就需要經過命令建立topic |
default.replication.factor =1 |
是否贊成本身主動建立topic。如果false。就需要經過命令建立topic |
num.partitions =1 |
每個topic的分區個數,如果在topic建立時候沒有指定的話會被topic建立時的指定參數覆蓋 |
|
|
下面是kafka中Leader,replicas配置參數 |
|
controller.socket.timeout.ms =30000 |
partition leader與replicas之間通信時,socket的超時時間 |
controller.message.queue.size=10 |
partition leader與replicas數據同步時,消息的隊列尺寸 |
replica.lag.time.max.ms =10000 |
replicas響應partition leader的最長等待時間。如果超過這個時間,就將replicas列入ISR(in-sync replicas)。並以爲它是死的,不會再增長管理中 |
replica.lag.max.messages =4000 |
假設follower落後與leader太多,將會以爲此follower[或者說partition relicas]已經失效 ##一般,在follower與leader通信時,因爲網絡延遲或者連接斷開,總會致使replicas中消息同步滯後 ##假設消息以後太多,leader將以爲此follower網絡延遲較大或者消息吞吐能力有限,將會把此replicas遷移 ##到其它follower中. ##在broker數量較少,或者網絡不足的環境中,建議提升此值. |
replica.socket.timeout.ms=30*1000 |
follower與leader之間的socket超時時間 |
replica.socket.receive.buffer.bytes=64*1024 |
leader複製時候的socket緩存大小 |
replica.fetch.max.bytes =1024*1024 |
replicas每次獲取數據的最大大小 |
replica.fetch.wait.max.ms =500 |
replicas同leader之間通訊的最大等待時間,失敗了會重試 |
replica.fetch.min.bytes =1 |
fetch的最小數據尺寸,假設leader中還沒有同步的數據不足此值,將會堵塞,直到知足條件 |
num.replica.fetchers=1 |
leader進行復制的線程數,增大這個數值會添加follower的IO |
replica.high.watermark.checkpoint.interval.ms =5000 |
每個replica檢查是否將最高水位進行固化的頻率 |
controlled.shutdown.enable =false |
是否贊成控制器關閉broker ,如果設置爲true,會關閉所有在這個broker上的leader。並轉移到其它broker |
controlled.shutdown.max.retries =3 |
控制器關閉的嘗試次數 |
controlled.shutdown.retry.backoff.ms =5000 |
每次關閉嘗試的時間間隔 |
leader.imbalance.per.broker.percentage =10 |
leader的不平衡比例,如果超過這個數值,會對分區進行又一次的平衡 |
leader.imbalance.check.interval.seconds =300 |
檢查leader是否不平衡的時間間隔 |
offset.metadata.max.bytes |
client保留offset信息的最大空間大小 |
kafka中zookeeper參數配置 |
|
zookeeper.connect = localhost:2181 |
zookeeper集羣的地址,可以是多個,多個之間用逗號切割 hostname1:port1,hostname2:port2,hostname3:port3 |
zookeeper.session.timeout.ms=6000 |
ZooKeeper的最大超時時間,就是心跳的間隔,如果沒有反映,那麼以爲已經死了,不易過大 |
zookeeper.connection.timeout.ms =6000 |
ZooKeeper的鏈接超時時間 |
zookeeper.sync.time.ms =2000 |
ZooKeeper集羣中leader和follower之間的同步實際那 |
storm-kafka插件默認kafka的 zk_path例如如下:
public class ZkHosts implements BrokerHosts {
private static final String DEFAULT_ZK_PATH = 「/brokers」;
二、假設出現下面問題,表明偏移量出錯。建議又一次開一個topic
ERROR [KafkaApi-3] Error when processing fetch request for partition [xxxxx,1] offset 112394 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 112394 but we only have log segments in the range 0 to 665.
三、當沒有某個topic,或者是某個topic的node放置不在默認位置時,會有下面異常:
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kafka/brokers/topics/mytest/partitions at storm.kafka.Dynam
四、kafka中出現下面異常:
[2015-06-09 17:03:05,094] ERROR [KafkaApi-0] Error processing ProducerRequest with correlation id 616 from client kafka-client on partition [test3,0] (kafka.server.KafkaApis)
kafka.common.MessageSizeTooLargeException: Message size is 2211366 bytes which exceeds the maximum configured message size of 1000012.
緣由是集羣默認每次僅僅能接受約1M的消息。假設client一次發送的消息大於這個數值則會致使異常。
在server.properties中加入下面參數
message.max.bytes=1000000000
replica.fetch.max.bytes=1073741824
同一時候在consumer.properties中加入下面參數:
fetch.message.max.bytes=1073741824
而後從新啓動kafka進程就能夠。現在每次最大可接收100M的消息。
五、open too many files
kafka出現異常。日誌提示open too many file
查找文件打開數量
lsof -p 30353 | wc
假設在1000以上。通常都是不正常,走過65535就會出錯。
緣由打開了太多producer,沒關閉,調用producer.close()就能夠。
6、API
7、zookeeper中的內容
默認狀況,kafka在zk的/brokers文件夾下記錄topic相關的信息,但假設在建立topic時,指定了路徑,則放置到固定的路徑中。如:
bin/kafka-topics.sh --create --zookeeper 192.168.169.91:2181,192.168.169.92:2181,192.168.169.93:2181/kafka --replication-factor 3 --partitions 5 --topic test_topic
建立的topic,其相關信息會放置到/kafka/brokers中,這個文件夾中主要包含2個子文件夾:ids 和 topics
一、ids:記錄這個kafka集羣中有多少個broker
如:
ls /kafka/brokers/ids/ 3 2 5 4
這個集羣有4個節點,節點id分別爲2,3,4。5。 咱們看一下內容
[zk: localhost:2181(CONNECTED) 27] get /kafka/brokers/ids/2 {"jmx_port":-1,"timestamp":"1435833841290","host":"kafka02-log.i.nease.net","version":1,"port":9092} cZxid = 0x1000e8a68 ctime = Thu Jul 02 18:44:01 HKT 2015 mZxid = 0x1000e8a68 mtime = Thu Jul 02 18:44:01 HKT 2015 pZxid = 0x1000e8a68 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x44e440d0bdf06eb dataLength = 104 numChildren = 0記錄着這個節點的一些基本狀況。
二、topics
先看一下有哪些內容:
[zk: localhost:2181(CONNECTED) 29] ls /kafka/brokers/topics/test30/partitions [3, 2, 1, 0, 4]
[zk: localhost:2181(CONNECTED) 30] ls /kafka/brokers/topics/test30/partitions/0 [state]
[zk: localhost:2181(CONNECTED) 1] get /kafka/brokers/topics/test30/partitions/0/state {"controller_epoch":4,"leader":5,"version":1,"leader_epoch":2,"isr":[5]} cZxid = 0x100017c5e ctime = Wed Jul 01 14:54:24 HKT 2015 mZxid = 0x1000e8a84 mtime = Thu Jul 02 18:44:01 HKT 2015 pZxid = 0x100017c5e cversion = 0 dataVersion = 2 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 72 numChildren = 0可以看某個分區的leader是哪一個。從而讀取kafka消息時。可以從這個leader中讀取數據。