花費越少的精力在數據移動上,就能越專一於核心業務 --- 《Kafka:The Definitive Guide》java
認識 Kafka 以前,先了解一下發佈與訂閱消息系統:消息的發送者不會直接把消息發送給接收者、發送者以某種方式對消息進行分類,接收者訂閱它們,以便能接受特定類型的消息。發佈與訂閱系統通常會有一個 broker(n. 經紀人、中間商) 也就是發佈消息的中心點。linux
Kafka 是一款基於發佈與訂閱的消息系統,通常被稱爲「分佈式提交日誌」或者「分佈式流平臺」。 Kafka 的數據單元被稱做消息,能夠看做是數據庫中的一行數據,消息是由字節數組組成,故對 kafka 來講消息沒有特別的意義,消息能夠有一個可選的元數據,也就是鍵。鍵也是一個字節數組,一樣對於 kafka 沒有什麼特殊意義。鍵能夠用來將消息以一種可控的方式寫入分區。最簡單的例子就是爲鍵生成一個一致性散列值,而後使用散列值對主題分區數進行取模,爲消息選擇分區。這樣能夠保證具備相同鍵的消息老是被寫在相同的分區上。保證消息在一個主題中順序讀取。redis
爲了提升效率,消息將被分批次寫入 Kafka 。批次就是一組消息,相似於 redis 中的流水線(Pipelined)操做。shell
kafka 的消息經過主題進行分類,主題就至關於數據庫中的表,主題能夠被分紅若干個分區,一個分區就是一個提交日誌,消息以追加的形式被寫入分區。而後按照先入先出的順序讀取。一個主題下的分區也能夠在不一樣的服務器上,以此提供比單個服務器更增強大的性能數據庫
Kafka 的客戶端就是 Kafka 系統的用戶,通常狀況下有兩種基本類型:生產者和消費者apache
Producer 生產者建立消息,通常狀況下,一個消息會被髮布到一個特定的主題上。生產者在默認狀況下將消息均分在主題的每一個分區上數組
Consumer 消費者讀取消息,消費者訂閱一個或多個主題,並按照消息的生成順序讀取他們,消費者經過檢查消息的偏移量來區分已經讀過的消息。這個偏移量會被消費者在 zk 或者 kafka 上保存,若是消費者關閉或者重啓,他的讀取狀態不會消失服務器
消費者是消費者羣組 Consumer group的一部分,羣組能夠保證每一個分區被一個消費者消費(所以消費者數量不能大於分區數量,會形成消費者服務器的浪費),若是一個消費者失效,羣組裏的其餘消費者能夠接管失效消費者的工做。網絡
Kafka 使用 Zookeeper(後面簡稱zk) 保存集羣的元數據信息和消費者信息, Kafka 發行版本自帶 zk,能夠直接從腳本啓動,不過安裝一個完整版的 zk 也不難socket
官方下載地址:http://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/
若是下載速度不如意,可使用個人藍奏雲:https://keats.lanzous.com/iMWi8dpi04f 提取碼: keats
安裝目錄: /usr/local/zookeeper
數據目錄: /var/lib/zookeeper
# tar -zxf zookeeper-3.4.6.tar.gz # mv zookeeper-3.4.6 /usr/local/zookeeper # mkdir -p /var/lib/zookeeper # cat > /usr/local/zookeeper/conf/zoo.cfg << EOF > tickTime=2000 > dataDir=/var/lib/zookeeper > clientPort=2181 > EOF # 接着設置一下環境變量中的 JAVA_HOME,能夠先使用 export 命令查看是否已經設置 # export JAVA_HOME=/xxx # 最後切換到 zk 安裝目錄,啓動 zk # /usr/local/zookeeper/bin/zkServer.sh start
接着經過四字命令 srvr 驗證 zk 是否安裝正確
# telnet localhost 2181 Trying ::1... Connected to localhost. Escape character is '^]'. srvr Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT Latency min/avg/max: 0/0/0 Received: 1 Sent: 0 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: standalone Node count: 4 Connection closed by foreign host. [root@linux-keats bin]# pwd /usr/local/zookeeper/bin
下載: https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
藍奏雲:下載後將後綴名 zip 改成 tgz:https://keats.lanzous.com/iaZ9hdpj5bi
# tar -zxf kafka_2.11-0.9.0.1.tgz # mv kafka_2.11-0.9.0.1 /usr/local/kafka # mkdir /tmp/kafka-logs # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
其中 -dadmon 表示 kafka 以守護線程的形式啓動
#broker 的全局惟一編號,集羣中不能重複。int類型 broker.id=0 #是否容許刪除 topic delete.topic.enable=true #處理網絡請求的線程數量 num.network.threads=3 #處理磁盤 IO 的線程數量 num.io.threads=8 #發送套接字的緩衝區大小 socket.send.buffer.bytes=102400 #接收套接字的緩衝區大小 socket.receive.buffer.bytes=102400 #請求套接字的緩衝區大小 socket.request.max.bytes=104857600 #kafka 運行日誌(此日誌很是規意義的日誌)存放的路徑。用上一步建立的目錄。 log.dirs=/tmp/kafka-logs #topic 建立時默認的分區數 num.partitions=1 #用來恢復和清理 data 下數據的線程數量 num.recovery.threads.per.data.dir=1 #segment 文件保留的最長時間,超時將被刪除 log.retention.hours=168 #配置鏈接 Zookeeper 地址。若是是 zk 集羣,使用 , 隔開 zookeeper.connect=localhost:2181
zk 集羣的安裝請度娘 zk 集羣,kafka 能夠按照末尾參考文獻安裝集羣。我這裏測試服務器性能不行還跑了幾個 java 程序,就不裝集羣了
主題相關操做
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test".
建立好主題後,logs 文件夾內就會出現 主題名-分區名 的提交日誌
往主題發送消息
# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test Test Message 1 Test Message 2 ^D
從測試主題讀取消息
# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning Test Message 1 Test Message 2 ^C Processed a total of 2 messages
《kafka權威指南》 --- 美國人著