【運維技術】從零開始搭建開發使用的Kafka環境

【原創】從零開始搭建開發使用的Kafka環境

入門資料

  1. 百度百科:
    Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣來提供實時的消費。
  2. 歸屬公司
    Apache Kafka
    軟件語言:scalahtml

  3. 相關術語介紹
  • Broker: Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker[
  • Topic:每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)
  • Partition:Partition是物理上的概念,每一個Topic包含一個或多個Partition.
  • Producer:賦值發佈消息到負責發佈消息到Kafka broker(生產者)
  • Consumer:消息消費者,向Kafka broker讀取消息的客戶端。
  • Consumer Group:每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。

目標以及流程:

  1. 單機搭建kafka
  2. 集羣搭建kafka(下一步)
  3. 搭建kafka控制檯(下一步)

kafka控制檯選型:

  1. Kafka Web Console
  2. Kafka Manager
  3. KafkaOffsetMonitor
    比較:
    若只須要監控功能,推薦使用KafkaOffsetMonito,若偏重Kafka集羣管理,推薦使用Kafka Manager。
    由於都是開源程序,穩定性欠缺。故需先了解清楚目前已存在哪些Bug,多測試一下,避免出現相似於Kafka Web Console的問題。

開始單機搭建kafka:

  1. 官網:http://kafka.apache.org/intro
  2. 學習官方網站的快速啓動教程:http://kafka.apache.org/quickstart
  3. 官網的教程比較有服務器上的測試

開始前的備註

# 查看防火牆狀態
systemctl status firewalld
# 關閉防火牆
service firewalld stop  
# 啓動防火牆
service firewalld start

首先要確認你已經安裝了java環境

# 檢查java的命令
java -version

1:下載kafka並解壓

# 獲取kafka最新安裝包,這邊使用的是鏡像地址,能夠去官方網站得到最新地址
wget http://mirrors.hust.edu.cn/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
# 解壓程序
tar -xzf kafka_2.11-0.11.0.1.tgz
# 進入目錄
cd kafka_2.11-0.11.0.1

配置對應的配置文件,server.properties

# 配置服務器zk地址
zookeeper.connect=localhost:2181
# 配置內網綁定關係
listeners=PLAINTEXT://<your.ip>:9092
# 配置外網綁定關係
advertised.listeners=PLAINTEXT://your.host.name:9092
# 配置kafka使用內存kafka-server-start.sh
# 在start中加入jvm的啓動參數,默認是1G
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"

2:啓動服務器

kafka須要使用zookeeper,因此你首先須要啓動一個zookeeper的服務,若是你沒有的話,就使用kafka內置的腳原本啓動一個單節點的zookeeper的實例
加入& 使進程常駐在內存中
默認端口:9092
默認爲localhost,若是不配置對應的服務器ip的話java

#執行快速啓動zookeeper,經過內置的zookeeper進行啓動,若是要zookeeper服務器的話嗎,須要再server.properties的配置文件裏面加入zookeeper.connect = 你的服務器內網ip:2181
bin/zookeeper-server-start.sh config/zookeeper.properties &

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

而後啓動kafka的服務器:經過配置文件啓動kafkagit

# 啓動kafka
# server.properties的配置文件中有一個項目: host.name須要配置成爲你的內網服務器ip地址,訪問的時候經過外網環境經過外網ip地址訪問,內網環境經過內網地址訪問
bin/kafka-server-start.sh config/server.properties &
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3:建立一個topic名字叫作test

# 經過腳本命令建立一個主題爲test的,而且使用的zookeeper的地址爲localhost的
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

確認一下這個topic是否含有完畢github

# 經過zookeeper的地址來訪問對應的topics中的主題列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
test

4:啓動一個生產者發送一些消息過去

# 啓動客戶端推送對應的消息到服務器的kafka提供的端口
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message
This is another message

5:啓動一個消費者獲取對應主題的消息

# 啓動客戶端獲取對應服務端信息的地址來消費消息,使用pull的方式,每間隔0.1s進行一次服務器獲取
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message
This is another message

6:將kafka建立集羣節點(暫時省略)

7:使用kafka鏈接進行導入導出數據(暫時省略)

8:使用kafka的流去處理數據(暫時省略)

使用場景切換:本地服務器,變成真實服務器,首先提供外部調用,應該使用的是服務器的地址

這個是在服務器本地測試場景:
切換成服務器場景的狀況下,須要首先在將server.properties的配置文件中的
配置方式修正爲服務器的內網ip地址,對外提供的外網ip地址會進行映射,映射到最終的內網地址中去
新版的只須要修改以下兩個配置:參考文章
http://blog.csdn.net/chenxun_2010/article/details/72626618
zookeeper.connect = localhost:9092
listeners = PLAINTEXT://ip:9092apache

java項目進行場景測試:

服務器kafka版本:2.11- 0.11.1
客戶端kafka版本:0.11.0.1bootstrap

因此去maven中尋找對應的版本的jar包進行使用


org.apache.kafka
kafka-clients
0.11.0.1
api

查看具體的api調用:

  1. 生產者:org.apache.kafka.clients.producer.KafkaProducer.class
  2. 消費者:org.apache.kafka.clients.consumer.KafkaConsumer.class

包結構:
clients
admin
consumer:KafkaProducer(實現類)
producer:KafkaConsumer(實現類)
otherclass
common
server.policy(服務公用)
在具體實現類裏面源碼中有啓動的示例代碼再類的頭部註釋中服務器

遇到了一個問題:
Failed to load class "org.slf4j.impl.StaticLoggerBinder".框架

kafka默認引用再java類中在程序中引入了org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar的jar包
引入對應的實現日誌的框架使用logback框架jvm

再pom的配置文件中加入對應的依賴
而且debug的日誌等級很煩人,因此就加入了配置文件
參考文章:http://www.cnblogs.com/h--d/p/5668152.html

加入logback的庫依賴引用
至少須要引用三個模塊:
logback-classic
logback-core
logback-access
這三個模塊的內容
其中參考了這篇文章以爲很詳細,因此就提供出來:
http://www.cnblogs.com/warking/p/5710303.html

使用一個新的工程進行測試,一個啓動消費者,一個啓動生產者,編寫對應的代碼

  1. 編寫單元測試讀取配置創建鏈接,發送消息
  2. 生產者消費者所須要的依賴jar包
  3. 啓動線程消費者拉取消息成功消費(實現消息隊列的功能)

源碼示例你們能夠去github上拉取:

https://github.com/fly-piglet/kafkastudy

相關文章
相關標籤/搜索