什麼是Kafka
Kafka是一種高吞吐量 的分佈式發佈訂閱消息系統,有以下特性:
1>.經過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能。
2>.高吞吐量:即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息。
3>.支持經過Kafka服務器和消費機集羣來分區消息。
4>.支持Hadoop並行數據加載。
Kafka經常使用術語
Broker
Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker
Topic
每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息
雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)
Partition
Partition是物理上的概念,每一個Topic包含一個或多個Partition.爲了更好的實現負載均衡和消息的順序性,kafka的producer在
分發消息時能夠經過分發策略發送給指定的partition。實現分發的程序是須要制定消息的key值,而kafka經過key進行策略分發。
Producer
負責發佈消息到Kafka broker
Consumer
消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group
每一個Consumer屬於一個特定的ConsumerGroup(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。
Kafka官網在哪?
http://kafka.apache.org/
Kafka安裝與配置
JDK安裝
下載JDK (注意:直接複製到下載工具進行下載,版本請使用JDK7,JDK8有可能不兼容kafka_2.11-0.8.2.1)
cd /usr/local/src
wget http://download.oracle.com/otn-pub/java/jdk/7u79-b15/jdk-7u79-linux-x64.rpm
chmod +x jdk-7u79-linux-x64.rpm # 添加執行權限
rpm -ivh jdk-7u79-linux-x64.rpm #安裝
cd /usr/java/ #安裝完成以後,能夠cd /usr/java/ 到安裝目錄查看到安裝目錄查看
添加JDK到系統環境變量
vim /etc/profile #編輯,在最後添加如下代碼
export JAVA_HOME=/usr/java/jdk1.7.0_79
export PATH=$PATH:$JAVA_HOME/bin:/usr/bin:/usr/sbin:/bin:/sbin:/usr/X11R6/bin
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
wq! #保存退出
source /etc/profile #使配置文件當即生效
java -version #出現下述信息 則安裝成功 可進行下一步
java version "1.7.0_79"
Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
You have new mail in /var/spool/mail/root
kafka服務配置
cd /usr/local/src
wget http://archive.apache.org/dist/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz
#(kafka_2.11-0.8.2.1.tgz版本是已經編譯好的版本,解壓就能使用)
tar -xzvf kafka_2.11-0.8.2.1.tgz #解壓
mv kafka_2.11-0.8.2.1 /usr/local/kafka #移動到安裝目錄
mkdir /usr/local/kafka/log/kafka #建立kafka日誌目錄
mkdir /usr/local/kafka/zookeeper #建立zookeeper目錄
mkdir /usr/local/kafka/log/zookeeper #建立zookeeper日誌目錄
cd /usr/local/kafka/config #進入配置目錄
vim server.properties #編輯修改相應的參數
---------------------------------------------server.properties-------------------------------------
broker.id=0
port=9092 #端口號
host.name=192.168.0.11 #服務器IP地址,修改成本身的服務器IP
log.dirs=/usr/local/kafka/log/kafka #日誌存放路徑,上面建立的目錄
zookeeper.connect=localhost:2181 #zookeeper地址和端口,單機配置部署,localhost:2181
log.cleaner.enable=false #topic是否可刪除
zookeeper.connection.timeout.ms=6000 #超時設置
-----------------------------------------------------------------------------------------------------
vim zookeeper.properties #編輯修改相應的參數
---------------------------------------------zookeeper.properties------------------------------------
dataDir=/usr/local/kafka/zookeeper #zookeeper數據目錄
dataLogDir=/usr/local/kafka/log/zookeeper #zookeeper日誌目錄
clientPort=2181
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
------------------------------------------------------------------------------------------------------
cd /usr/local/kafka
vim kafkastart.sh #建立啓動腳本
---------------------------------------------kafkastart.sh--------------------------------------------
#!/bin/sh
#啓動zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
sleep 3 #等3秒後執行
#啓動kafka
/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
-------------------------------------------------------------------------------------------------------
vim kafkastop.sh #建立中止腳本
----------------------------------------------kafkastop.sh---------------------------------------------
#!/bin/sh
#啓動zookeeper
/usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
sleep 3 #等3秒後執行
#啓動kafka
/usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
-------------------------------------------------------------------------------------------------------
chmod +x kafkastart.sh
chmod +x kafkastop.sh
server.properties 配置說明
name 默認值 描述
broker.id none 每個boker都有一個惟一的id做爲它們的名字.這就容許boker切換到別的主機/端口上,consumer依然知道
enable.zookeeper true 容許註冊到zookeeper
log.flush.interval.messages Long.MaxValue 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量
log.flush.interval.ms Long.MaxValue 在數據被寫入到硬盤前的最大時間
log.flush.scheduler.interval.ms Long.MaxValue 檢查數據是否要寫入到硬盤的時間間隔。
log.retention.hours 168 控制一個log保留多長個小時
log.retention.bytes -1 控制log文件最大尺寸
log.cleaner.enable false 是否log cleaning
log.cleanup.policy delete delete仍是compat.其它控制參數還包括
log.cleaner.threads,log.cleaner.io.max.bytes.per.second,
log.cleaner.dedupe.buffer.size,log.cleaner.io.buffer.size,log.cleaner.io.buffer.load.factor,
log.cleaner.backoff.ms,log.cleaner.min.cleanable.ratio,log.cleaner.delete.retention.ms
log.dir /tmp/kafka-logs 指定log文件的根目錄
log.segment.bytes 110241024*1024 單一的log segment文件大小
log.roll.hours 24 * 7 開始一個新的log文件片斷的最大時間
message.max.bytes 1000000 + MessageSet.LogOverhead 一個socket 請求的最大字節數
num.network.threads 3 處理網絡請求的線程數
num.io.threads 8 處理IO的線程數
background.threads 10 後臺線程序
num.partitions 1 默認分區數
socket.send.buffer.bytes 102400 socket SO_SNDBUFF參數
socket.receive.buffer.bytes 102400 socket SO_RCVBUFF參數
zookeeper.connect localhost:2182/kafka 指定zookeeper鏈接字符串, 格式如hostname:port/chroot。
chroot是一個namespace
zookeeper.connection.timeout.ms 6000 指定客戶端鏈接zookeeper的最大超時時間
zookeeper.session.timeout.ms 6000 鏈接zk的session超時時間
zookeeper.sync.time.ms 2000 zk follower落後於zk leader的最長時間
producer.properties 配置說明
#指定kafka節點列表,用於獲取metadata,沒必要所有指定
metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092
# 指定分區處理類。默認kafka.producer.DefaultPartitioner,表經過key哈希到對應分區
#partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner
# 是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後消息中會有頭來指明消息壓縮類型,
# 故在消費者端消息解壓是透明的無需指定。
compression.codec=none
# 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultEncoder,
# 即byte[]
serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder
# serializer.class=kafka.serializer.DefaultEncoder
# serializer.class=kafka.serializer.StringEncoder
# 若是要壓縮消息,這裏指定哪些topic要壓縮消息,默認empty,表示不壓縮。
#compressed.topics=
########### request ack ###############
#消息的確認模式 默認爲0
# 0:不保證消息的到達確認,只管發送,低延遲可是會出現消息的丟失,在某個server失敗的狀況下,有點像TCP
# 1:發送消息,並會等待leader 收到確認後,必定的可靠性
# -1:發送消息,等待leader收到確認,並進行復制操做後,才返回,最高的可靠性
request.required.acks=0
# 在向producer發送ack以前,broker容許等待的最大時間
# 若是超時,broker將會向producer發送一個error ACK.意味着上一次消息由於某種
# 緣由未能成功(好比follower未能同步成功)
request.timeout.ms=10000
########## end #####################
# 同步仍是異步發送消息,默認「sync」表同步,"async"表異步。異步能夠提升發送吞吐量,
# 也意味着消息將會在本地buffer中,並適時批量發送,可是也可能致使丟失未發送過去的消息
producer.type=sync
############## 異步發送 (如下四個異步參數可選) ####################
# 在async模式下,當message被緩存的時間超過此值後,將會批量發送給broker,默認爲5000ms
# 此值和batch.num.messages協同工做.
queue.buffering.max.ms = 5000
# 在async模式下,producer端容許buffer的最大消息量
# 不管如何,producer都沒法儘快的將消息發送給broker,從而致使消息在producer端大量沉積
# 此時,若是消息的條數達到閥值,將會致使producer端阻塞或者消息被拋棄,默認爲10000
queue.buffering.max.messages=20000
# 若是是異步,指定每次批量發送數據量,默認爲200
batch.num.messages=500
# 當消息在producer端沉積的條數達到"queue.buffering.max.meesages"後
# 阻塞必定時間後,隊列仍然沒有enqueue(producer仍然沒有發送出任何消息)
# 此時producer能夠繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間
# -1: 無阻塞超時限制,消息不會被拋棄
# 0:當即清空隊列,消息被拋棄
queue.enqueue.timeout.ms=-1
################ end ###############
# 當producer接收到error ACK,或者沒有接收到ACK時,容許消息重發的次數
# 由於broker並無完整的機制來避免消息重複,因此當網絡異常時(好比ACK丟失)
# 有可能致使broker接收到重複的消息,默認值爲3.
message.send.max.retries=3
# producer刷新topic metada的時間間隔,producer須要知道partition leader的位置,以及當前topic的狀況
# 所以producer須要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會當即刷新
# (好比topic失效,partition丟失,leader失效等),此外也能夠經過此參數來配置額外的刷新機制,默認值600000
topic.metadata.refresh.interval.ms=60000
consumer.properties 配置說明
# zookeeper鏈接服務器地址,此處爲線下測試環境配置(kafka消息服務-->kafka broker集羣線上部署環境wiki)
# 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka
# zookeeper的session過時時間,默認5000ms,用於檢測消費者是否掛掉,當消費者掛掉,其餘消費者要等該指定時間
# 才能檢查到而且觸發從新負載均衡
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
# 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。一旦在更新zookeeper
# 發生異常並重啓,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000
#指定消費組
group.id=xxx
# 當consumer消費必定量的消息以後,將會自動向zookeeper提交offset信息
# 注意offset信息並非每消費一次消息就向zk提交一次,而是如今本地保存(內存),並按期提交,默認爲true
auto.commit.enable=true
# 自動更新時間。默認60 * 1000
auto.commit.interval.ms=1000
# 當前consumer的標識,能夠設定,也能夠有系統生成,主要用來跟蹤消息消費狀況,便於觀察
conusmer.id=xxx
# 消費者客戶端編號,用於區分不一樣客戶端,默認客戶端程序自動產生
client.id=xxxx
# 最大取多少塊緩存到消費者(默認10)
queued.max.message.chunks=50
# 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新
# 的consumer上,若是一個consumer得到了某個partition的消費權限,那麼它將會向zk註冊
# "Partition Owner registry"節點信息,可是有可能此時舊的consumer尚沒有釋放此節點,
# 此值用於控制,註冊節點的重試次數.
rebalance.max.retries=5
# 獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk
# 每次feth將獲得多條消息,此值爲總大小,提高此值,將會消耗更多的consumer端內存
fetch.min.bytes=6553600
# 當消息的尺寸不足時,server阻塞的時間,若是超時,消息將當即發送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 若是zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、
# anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest
auto.offset.reset=smallest
# 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultDecoder,
# 即byte[]
derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder
參數參考可見:http://debugo.com/kafka-params/
kafka 服務啓動
sh /usr/local/kafka/kafkastart.sh #kafka啓動
sh /usr/local/kafka/kafkastop.sh #kafka中止
kafka經常使用命令
建立一個名爲test的topic
bin/kafka-topics.sh --create --zookeeper 192.168.1.3:2181 --replication-factor 1 --partitions 5 --topic b
查看當前建立的topic
bin/kafka-topics.sh --zookeeper 192.168.1.3::2181 --list
驗證生產消息成功
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.3::9092 --topic boyaa --time -1
#(參數解釋:--time -1 表示從最新的時間的offset中獲得數據條數)
#輸出結果每一個字段分別表示topic、partition、untilOffset
開啓消息生產者
bin/kafka-console-producer.sh --broker-list 192.168.1.3:9092 --sync --topic b
開啓消費者
bin/kafka-console-consumer.sh --zookeeper 192.168.201.73:2181 --topic boyaa --from-beginning
#(--from-beginning 若是consumer以前沒有創建offset,則從producer最開始的數據讀取。)