kafka的使用經驗

kafka參數說明(參考):
https://www.cnblogs.com/weixiuli/p/6413109.htmlhtml


kafka時間戳字段緣由(過時清理,日誌切分,流式處理),0.10版本開始纔有時間戳概念
https://www.cnblogs.com/huxi2b/p/6050778.htmljava

kafka消息是存放在磁盤上,發送一次,累積到必定數量或者時間間隔就落盤一次,消費一次就讀一次磁盤node

topic劃分爲若干分區,分區對一個目錄,分區劃分爲segment,一個segment對應三個二進制文件(後綴分別是index,log,timeindex),相似mysql存儲機制
消息數據存放在log文件裏面,對應的位置存放在index裏面,時間戳存放在timeindex裏面mysql


分區有副本概念,若是一個topic有10個分區,分3個節點,那麼多是3/3/4的存放方式,好比kjLog-1,kjLog-4,kjLog-7這樣的存放方式。
可是其餘kjLog-*目錄也會有,可是目錄下的index,log,timeindex文件必定是空的,沒有數據。sql


副本選舉機制:
若是有多個副本,會有一個選舉機制,假設有1個分區有5個副本,共6份,若是分區1掛了,其餘5份有3份及時和分區1同步了,就會進入ISR,這個東西存放在zk裏面
當kafka發現有個分區掛了後,就從ISR找到每一個可用的副本所在節點ID,下發通知,這些副本里面3份及時的分區對應的節點就會同時向zk註冊,zk機制是隻有一個節點能註冊成功,這樣先註冊的就選舉成功,成爲新的分區,其餘分區都要跟它保持同步。apache


===================
kafka配置:
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.1.1/kafka_2.11-0.10.1.1.tgz
tar -zxvf kafka_2.11-0.10.1.1.tgz -C /home/dig/service/
cd /home/dig/service/
ln -s kafka_2.11-0.10.1.0 kafkajson

vim /home/dig/service/kafka/config/consumer.properties
zookeeper.connect=bdp-001:2181,bdp-002:2181,bdp-003:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-groupbootstrap


vim /home/dig/service/kafka/config/producer.properties
metadata.broker.list=bdp-001:9092,bdp-002:9092,bdp-003:9092
compression.codec=nonevim

producer.type=sync
serializer.class=kafka.serializer.DefaultEncoder
batch.num.messages=200api


vim /home/dig/service/kafka/config/server.properties
broker.id=0
host.name=bdp-001
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/dig/service/kafka/data
num.partitions=6
num.recovery.threads.per.data.dir=1

#消息保留時間
log.retention.hours=168

#消息最多保留的字節數
log.segment.bytes=1073741824

#每隔多久檢查上面兩個參數是否達到閥值
log.retention.check.interval.ms=300000

log.cleaner.enable=false

zookeeper.connect=bdp-001:2181,bdp-002:2181,bdp-003:2181
zookeeper.connection.timeout.ms=6000
auto.create.topics.enable=false
delete.topic.enable=true


另外server.properties中,不要啓用自動建立topic:
auto.create.topics.enable=true
不然producer發送消息時,提示分區異常。


拷貝到另外兩個節點:
scp -r /home/dig/service/kafka_2.11-0.10.1.0/ bdp-002:/home/dig/service/
scp -r /home/dig/service/kafka_2.11-0.10.1.0/ bdp-003:/home/dig/service/


注意每一個節點id不同:
cat /home/dig/service/kafka/config/server.properties | grep broker.id


臨時啓動kafka:
rm -rf /tmp/bdp-logs
/home/dig/service/kafka/bin/kafka-server-start.sh /home/dig/service/kafka/config/server.properties


在3臺機器上分別執行:
rm -rf /home/dig/service/zookeeper/data/*

ps -ef |grep -v grep | egrep -i 'kafka|Kafka' |awk '{print $2}' |xargs -t -i kill -9 {}
rm -rf /tmp/bdp-logs && ll /tmp/bdp-logs || jps


全部節點關閉完後先啓動zk:
/home/dig/service/zookeeper/bin/zkServer.sh start

再啓動kafka:
/home/dig/service/kafka/bin/kafka-server-start.sh -daemon /home/dig/service/kafka/config/server.properties
jps
sleep 1
ll /tmp/bdp-logs


查看kafka啓動狀態:
ls /brokers/ids
輸入3個kafka實例的id即集羣啓動成功:
[1, 2, 3]


apache版本的kafka自帶關閉命令無效:
/home/dig/service/kafka/bin/kafka-server-stop.sh /home/dig/service/kafka/config/server.properties

/home/dig/service/kafka/bin/kafka-server-start.sh /home/dig/service/kafka/config/server.properties

有效關閉腳本:
ps -ef |grep -v grep | egrep -i 'kafka|Kafka' |awk '{print $2}' |xargs -t -i kill -9 {}


===========================================
測試kafak是否部署成功:


建立Topic
/home/dig/service/kafka/bin/kafka-topics.sh \
--zookeeper etl1:2181,etl2:2181,etl3:2181 \
--replication-factor 2 \
--partitions 3 \
--create \
--topic kjLog


或者這麼刪除(只刪除zk的元數據,分區文件沒有刪除):
kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic mytest1


刪除話題隊列:
kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytest1


查看存在的話題隊列:
kafka-topics.sh --zookeeper localhost:2181 --list


向話題隊列mytest1 發送消息,運行後直接輸入:
kafka-console-producer.sh --broker-list kafka1:9092 --topic mytest1

消費者從話題隊列mytest3 中取消息(運行後,直接能夠看到輸出結果,老版本是連zk,新版本連bootstrap及broker):
kafka-console-consumer.sh \
--bootstrap-server etl3:9092 \
--topic kjLog \
--consumer-property group.id=kjEtlGroup

# 從頭開始消費
--from-beginning \


消費組是在消費時自動生成的,默認是console-consumer-31553,後面的數組是隨機的,
也能夠消費時指定具體消費組名字


查看有哪些消費組:
kafka-consumer-groups.sh --zookeeper localhost:2181 --list

等價於登陸zk,查看有哪些消費組:
ls /consumers/


===========================================
查看消息數量offset:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --time -1 --topic kjLog --partitions 0
輸出topic的容量大小
kjLog:2:116060276
kjLog:1:70158992
kjLog:0:15411674


查看指定topic的詳細信息:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:12181 --topic kjLog --group kjEtlGroup
輸出結果:
消費者組 話題id 分區id 當前已消費的條數 總條數 未消費的條數 屬主
Group Topic Pid Offset logSize Lag Owner
kjEtlGroup kjLog 0 15484439 15484445 6 none
kjEtlGroup kjLog 1 70655159 70655189 30 none
kjEtlGroup kjLog 2 116860888 116860904 16 none


查看分區詳細信息:
kafka-topics.sh --zookeeper localhost:2181 --describe
輸出結果:
Topic:mytest4 PartitionCount:6 ReplicationFactor:2 Configs:
Topic: mytest4 Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: mytest4 Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1

表示Topic名稱是mytest4,一共6個分區,而且每一個分區都有兩份徹底同樣的。
分區0的leader節點是在id爲2的kafka實例上,複製節點是2和3兩個實例,正在服務的節點也是2和3
分區1的leader節點是在id爲3的kafka實例上,複製節點是3和1兩個實例,正在服務的節點也是3和1


===========================================

遷移消息:
vim topics-to-move.json
{"topics":
[{"topic": "mytest1"}],
"version":1
}


生成遷移計劃:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--broker-list "4" \
--topics-to-move-json-file topics-to-move.json \
--generate


輸出以下結果:
Current partition replica assignment

{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[1]},{"topic":"mytest1","partition":1,"replicas":[3]},{"topic":"mytest1","partition":0,"replicas":[2]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[4]},{"topic":"mytest1","partition":1,"replicas":[4]},{"topic":"mytest1","partition":0,"replicas":[4]}]}


將 Proposed partition 對應json內容寫入到文件reassignment-node.json中

 

執行topic遷移:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file reassignment-node.json \
--execute


執行遷移後,會在zk上註冊一個節點:
登陸zk查看: get /admin/reassign_partitions
結果以下:
{"version":1,"partitions":[{"topic":"mytest1","partition":2,"replicas":[4]}]}


查看遷移結果:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file reassignment-node.json \
--verify


上面的方式並無增長新分區,而是對原有分區作了一個副本,增長一個topic的分區方法是:
kafka-topics.sh --zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 --alter --partitions 5 --topic mytest1
這個命令的效果是topic爲mytest1的分區數量增長到5個,而不是在原有分區數之上再增長5個分區。

kafka分區只能增長,不能經過自帶的命令刪除(可是直接刪除文件夾能夠刪除),不然報錯:
Error while executing topic command The number of partitions for a topic can only be increased
kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased


增長了分區後,遷移分區的方法就簡單了,
編寫以下json,其中12,13,14是分區id, 101,102,103,104,105,106是kafka實例id
vim
{
"partitions": [{
"topic": "mytest1",
"partition": 12,
"replicas": [101,102]
},
{
"topic": "mytest1",
"partition": 13,
"replicas": [103,104]
},
{
"topic": "mytest1",
"partition": 14,
"replicas": [105,106]
}],
"version": 1
}


而後執行分區重分佈命令:
bdp-reassign-partitions.sh \
--zookeeper bdp-001:2181,bdp-002:2181,bdp-003:2181 \
--reassignment-json-file partitions-extension-push-token-topic.json \
--execute

這樣topic爲mytest1的分區就改變了,存放到新的物理服務器的節點上了。


=============================================================
官方文檔:
http://kafka.apache.org/documentation.html


kafka開發文檔:
http://www.aboutyun.com/thread-9906-1-1.html


官方文檔翻譯:
http://www.cnblogs.com/lzqhss/p/4434901.html


參考翻譯:
http://blog.csdn.net/suifeng3051/article/details/48053965


=============================================================
問題1:
http://www.oschina.net/question/196281_248633?sort=time

這是Kafka消息分區形成的,你能夠去了解一下Kafka是如何分區的,就知道緣由了。

問題緣由多是:你的全部消息的Key都是同樣的,使用默認的Partitioner: hash(key)%numPartitions,這樣每次的partion num都是同樣的,因此數據都落到一個分區了。

而同一consumer grop並行消息,也是按照分區來分配的,由於只有一個分區上有數據,因此有一個consumer始終拿不到消息。

解決辦法:1.自定義分區函數。2.消息散列爲不一樣的key

 

問題2:
[2016-08-01 14:13:19,609] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2016-08-01 14:13:19,615] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2016-08-01 14:13:27,784] ERROR [test-consumer-group_bdp-003-1470075194621-ae0d9636], error during syncedRebalance (kafka.consumer.ZookeeperConsumerConnector)
kafka.common.ConsumerRebalanceFailedException: test-consumer-group_bdp-003-1470075194621-ae0d9636 can't rebalance after 4 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:355)


解決這是一個bug:
There is a bug in kafka.tools.ConsumerOffsetChecker. If the a particular Zookeeper node holding consumed offset information doesn't exit, the tool exits throwing the execption.

For example, suppose you have a consumer group "mygroup" and a topic "topictest". Then the offset for partition 2 is maintained in Znode: /consumers/mygroup/offsets/topictest/2.

If there is no entry for partition 2 of topic topictest in Znode, then consumer offsetchecker tool will exit while checking offset for partition 2. Basically, it will fail while checking the first partition "n" for which the Znode /consumers/mygroup/offsets/topictest/n is missing on Zookeeper.


問題3:
zookeeper啓用日誌文件,修改conf下面的log4j.properties,但無效。

 

再修改bin下的zkEnv.sh才能生效:
if [ "x${ZOO_LOG_DIR}" = "x" ]
then
ZOO_LOG_DIR="/home/dig/service/zookeeper/log"
fi

if [ "x${ZOO_LOG4J_PROP}" = "x" ]
then
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
fi


問題4:
kafka消息存放多久?
這個取決於kafka消息清理策略。
默認kafka消息的清理策略是刪除,可是沒有明確寫入server.propertiesp文件中:
log.cleanup.policy=delete

默認消息保存一週時間,默認不限制消息總量的長度,檢查間隔是1ms。
log.retention.hours=168
log.retention.bytes=-1
log.retention.check.interval.ms=1

 

問題5:
logserver報錯:
[ERROR][kafka-producer-network-thread | producer-1][2017-04-05 09:37:21,129][LogServiceImpl.java:onCompletion:220]:Fail to send record to Kafka. Key: appupgrade_alive_user, Value Length: 190
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

緣由是,kafka集羣宕機或者重啓後,topic partition從新選舉leader,而且不是原來的leader後,而生產者仍是使用原來的leader來生產消息,報錯。
解決:重啓生產者所在服務器。

 

問題6:
logserver報錯:
[WARN][kafka-producer-network-thread | producer-1][2017-04-05 09:37:21,234][Sender.java:completeBatch:298]:Got error produce response with correlation id 23026690 on topic-partition kjLog-0, retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION

緣由同上,都是由於leader從新選舉致使。

 

問題7:
logserver報錯:
536116 [ERROR][kafka-producer-network-thread | producer-1][2017-04-04 09:26:41,952][LogServiceImpl.java:onCompletion:102]:Fail to send record to Kafka. Key: konkasysteminfo_open_p age, Value Length: 462
536117 org.apache.kafka.common.errors.TimeoutException: Batch containing 46 record(s) expired due to timeout while requesting metadata from brokers for kjLog-2

緣由不肯定,初步懷疑也是分區選舉leader變動致使。

 

===========================================
1.kafka爲何要在topic里加入分區的概念?
topic是邏輯的概念,partition是物理的概念,對用戶來講是透明的。producer只須要關心消息發往哪一個topic,而consumer只關心本身訂閱哪一個topic,並不關心每條消息存於整個集羣的哪一個broker。
logsize是寫入分區的消息條數,offset是已經消費的條數,這兩個只都是從0開始,每次累加(i++),單位是條,不是字節
lag表示沒有消費的,已經緩存的條數,lag = logsize - offsize

爲了性能考慮,若是topic內的消息只存於一個broker,那這個broker會成爲瓶頸,沒法作到水平擴展。因此把topic內的數據分佈到整個集羣就是一個天然而然的設計方式。Partition的引入就是解決水平擴展問題的一個方案。

每一個partition能夠被認爲是一個無限長度的數組,新數據順序追加進這個數組。物理上,每一個partition對應於一個文件夾。一個broker上能夠存放多個partition。這樣,producer能夠將數據發送給多個broker上的多個partition,consumer也能夠並行從多個broker上的不一樣paritition上讀數據,實現了水平擴展
裏所講,每一個partition能夠被認爲是一個無限長度的數組,新數據順序追加進這個數組。物理上,每一個partition對應於一個文件夾。一個broker上能夠存放多個partition。這樣,producer能夠將數據發送給多個broker上的多個partition,consumer也能夠並行從多個broker上的不一樣paritition上讀數據,實現了水平擴展。


2.若是沒有分區,topic中的segment消息寫滿後,直接給訂閱者不是也能夠嗎?
「segment消息寫滿後」,consume消費數據並不須要等到segment寫滿,只要有一條數據被commit,就能夠立馬被消費

segment對應一個文件(實現上對應2個文件,一個數據文件,一個索引文件),一個partition對應一個文件夾,一個partition裏理論上能夠包含任意多個segment。因此partition能夠認爲是在segment上作了一層包裝。
這個問題換個角度問可能更好,「爲何有了partition還須要segment」。
若是不引入segment,一個partition直接對應一個文件(應該說兩個文件,一個數據文件,一個索引文件),那這個文件會一直增大。同時,在作data purge時,須要把文件的前面部分給刪除,不符合kafka對文件的順序寫優化設計方案。引入segment後,每次作data purge,只須要把舊的segment整個文件刪除便可,保證了每一個segment的順序寫。


三、kafka的消息生產者使用的包是import kafka.javaapi.producer.Producer,不是kafka.producer.Producer
import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

private final Producer<Integer, String> producer;
props.put("metadata.broker.list", KafkaProperties.kafkaConnect);
producer = new Producer<Integer, String>(new ProducerConfig(props));


四、kafka消息發送字節流,擴展序列化類 參考連接:
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.twill/twill-core/0.1.0-incubating/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java


五、kafka消息部分代碼解析:
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

//topicCountMap是設置每一個topic開多少線程,每一個線程處理執行多個task,
//每一個task會從每一個topic的一個分區中消費數據,若是有3個分區,每一個task會同時從3個topic的分區中獲取數據,即每一個task會從3個分區中獲取數據。
//這裏的參數new Integer(1)表示kafka消費服務器開多少個線程,一般最好是線程數量小於等於對應topic的分區數量
topicCountMap.put(topic, new Integer(2));

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

//這裏的get(0)表示從第一個分區中提取消息KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);ConsumerIterator<byte[], byte[]> it = stream.iterator();

相關文章
相關標籤/搜索