歸屬公司
Apache Kafka
軟件語言:scalahtml
# 查看防火牆狀態 systemctl status firewalld # 關閉防火牆 service firewalld stop # 啓動防火牆 service firewalld start
# 檢查java的命令 java -version
# 獲取kafka最新安裝包,這邊使用的是鏡像地址,能夠去官方網站得到最新地址 wget http://mirrors.hust.edu.cn/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz # 解壓程序 tar -xzf kafka_2.11-0.11.0.1.tgz # 進入目錄 cd kafka_2.11-0.11.0.1
# 配置服務器zk地址 zookeeper.connect=localhost:2181 # 配置內網綁定關係 listeners=PLAINTEXT://<your.ip>:9092 # 配置外網綁定關係 advertised.listeners=PLAINTEXT://your.host.name:9092 # 配置kafka使用內存kafka-server-start.sh # 在start中加入jvm的啓動參數,默認是1G export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
kafka須要使用zookeeper,因此你首先須要啓動一個zookeeper的服務,若是你沒有的話,就使用kafka內置的腳原本啓動一個單節點的zookeeper的實例
加入& 使進程常駐在內存中
默認端口:9092
默認爲localhost,若是不配置對應的服務器ip的話java
#執行快速啓動zookeeper,經過內置的zookeeper進行啓動,若是要zookeeper服務器的話嗎,須要再server.properties的配置文件裏面加入zookeeper.connect = 你的服務器內網ip:2181 bin/zookeeper-server-start.sh config/zookeeper.properties & [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
而後啓動kafka的服務器:經過配置文件啓動kafkagit
# 啓動kafka # server.properties的配置文件中有一個項目: host.name須要配置成爲你的內網服務器ip地址,訪問的時候經過外網環境經過外網ip地址訪問,內網環境經過內網地址訪問 bin/kafka-server-start.sh config/server.properties & [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
# 經過腳本命令建立一個主題爲test的,而且使用的zookeeper的地址爲localhost的 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
確認一下這個topic是否含有完畢github
# 經過zookeeper的地址來訪問對應的topics中的主題列表 bin/kafka-topics.sh --list --zookeeper localhost:2181 test
# 啓動客戶端推送對應的消息到服務器的kafka提供的端口 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
# 啓動客戶端獲取對應服務端信息的地址來消費消息,使用pull的方式,每間隔0.1s進行一次服務器獲取 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
這個是在服務器本地測試場景:
切換成服務器場景的狀況下,須要首先在將server.properties的配置文件中的
配置方式修正爲服務器的內網ip地址,對外提供的外網ip地址會進行映射,映射到最終的內網地址中去
新版的只須要修改以下兩個配置:參考文章
http://blog.csdn.net/chenxun_2010/article/details/72626618
zookeeper.connect = localhost:9092
listeners = PLAINTEXT://ip:9092apache
服務器kafka版本:2.11- 0.11.1
客戶端kafka版本:0.11.0.1bootstrap
因此去maven中尋找對應的版本的jar包進行使用
api
org.apache.kafka
kafka-clients
0.11.0.1
包結構:
clients
admin
consumer:KafkaProducer(實現類)
producer:KafkaConsumer(實現類)
otherclass
common
server.policy(服務公用)
在具體實現類裏面源碼中有啓動的示例代碼再類的頭部註釋中服務器
遇到了一個問題:
Failed to load class "org.slf4j.impl.StaticLoggerBinder".框架
kafka默認引用再java類中在程序中引入了org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar的jar包
引入對應的實現日誌的框架使用logback框架jvm
再pom的配置文件中加入對應的依賴
而且debug的日誌等級很煩人,因此就加入了配置文件
參考文章:http://www.cnblogs.com/h--d/p/5668152.html
加入logback的庫依賴引用
至少須要引用三個模塊:
logback-classic
logback-core
logback-access
這三個模塊的內容
其中參考了這篇文章以爲很詳細,因此就提供出來:
http://www.cnblogs.com/warking/p/5710303.html
使用一個新的工程進行測試,一個啓動消費者,一個啓動生產者,編寫對應的代碼