前言:html
上週末本身學習了一下Kafka,參考網上的文章,學習過程當中仍是比較順利的,遇到的一些問題最終也都解決了,如今將學習的過程記錄與此,供之後本身查閱,若是能幫助到其餘人,天然是更好的。java
===============================================================長長的分割線====================================================================apache
正文:服務器
關於Kafka的理論介紹,網上能夠搜到到不少的資料,你們能夠自行搜索,我這裏就不在重複贅述。app
本文中主要涉及三塊內容: 第一,就是搭建Zookeeper環境;第二,搭建Kafka環境,並學習使用基本命令發送接收消息;第三,使用Java API完成操做,以便初步瞭解在實際項目中的使用方式。less
閒話少說,言歸正傳,本次的目的是利用VMware搭建一個屬於本身的ZooKeeper和Kafka集羣。本次咱們選擇的是VMware10,具體的安裝步驟你們能夠到網上搜索,資源不少。maven
第一步,肯定目標:分佈式
ZooKeeperOne 192.168.224.170 CentOS學習
ZooKeeperTwo 192.168.224.171 CentOSui
ZooKeeperThree 192.168.224.172 CentOS
KafkaOne 192.168.224.180 CentOS
KafkaTwo 192.168.224.181 CentOS
咱們安裝的ZooKeeper是3.4.6版本,能夠從這裏下載zookeeper-3.4.6; Kafka安裝的是0.8.1版本,能夠從這裏下載kafka_2.10-0.8.1.tgz; JDK安裝的版本是1.7版本。
另: 我在學習的時候,搭建了兩臺Kafka服務器,正式環境中咱們最好是搭建2n+1臺,此處僅做爲學些之用,暫不計較。
第二步,搭建Zookeeper集羣:
此處你們能夠參照我以前寫的一篇文章 ZooKeeper1 利用虛擬機搭建本身的ZooKeeper集羣 ,我在搭建Kafka的環境的時候就是使用的以前搭建好的Zookeeper集羣。
第三步,搭建Kafka集羣:
(1). 將第一步中下載的 kafka_2.10-0.8.1.tgz 解壓縮後,進入config目錄,會看到以下圖所示的一些配置文件,咱們準備編輯server.properties文件。
(2). 打開 server.properties 文件,須要編輯的屬性以下所示:
1 broker.id=0 2 port=9092 3 host.name=192.168.224.180 4 5 log.dirs=/opt/kafka0.8.1/kafka-logs 6 7 zookeeper.connect=192.168.224.170:2181,192.168.224.171:2181,192.168.224.172:2181
注意:
a. broker.id: 每一個kafka對應一個惟一的id,自行分配便可
b. port: 默認的端口號是9092,使用默認端口便可
c. host.name: 配置的是當前機器的ip地址
d. log.dirs: 日誌目錄,此處自定義一個目錄路徑便可
e. zookeeper.connect: 將咱們在第二步搭建的Zookeeper集羣的配置所有寫上
(3). 上邊的配置完畢後,咱們須要執行命令 vi /etc/hosts,將相關服務器的host配置以下圖,若是沒有執行此步,後邊咱們在執行一些命令的時候,會報沒法識別主機的錯誤。
(4). 通過上述操做,咱們已經完成了對Kafka的配置,很簡單吧?!可是若是咱們執行 bin/kafka-server-start.sh config/server.properties & 這個啓動命令,可能咱們會遇到以下兩個問題:
a. 咱們在啓動的報 Unrecognized VM option '+UseCompressedOops'.Could not create the Java virtual machine. 這個錯誤。
解決方式:
查看 bin/kafka-run-class.sh
找到下面這段代碼,去掉-XX:+UseCompressedOops
1 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then 2 KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true" 3 fi
b. 解決了第一個問題,咱們還有可能在啓動的時候遇到 java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder 這個錯誤。
解決方式:
從網上的下載 slf4j-nop-1.6.0.jar 這個jar包,而後放到kafka安裝目錄下的libs目錄中便可。注意,基於我目前的kafka版本,我最開始從網上下載的slf4j-nop-1.5.0.jar 這個jar包,可是啓動的時候依然會報錯,因此必定要注意版本號哦~
(5). 如今咱們執行 bin/kafka-server-start.sh config/server.properties & 這個啓動命令,應該就能夠正常的啓動Kafka了。命令最後的 & 符號是爲了讓啓動程序在後臺執行。若是不加這個 & 符號,當執行完啓動後,咱們一般會使用 ctrl + c 退出當前控制檯,kafka此時會自動執行shutdown,因此此處最好加上 & 符號。
第三步,使用基本命令建立消息主題,發送和接收主題消息:
(1). 建立、查看消息主題
1 #鏈接zookeeper, 建立一個名爲myfirsttopic的topic 2 bin/kafka-topics.sh --create --zookeeper 192.168.224.170:2181 --replication-factor 2 --partitions 1 --topic myfirsttopic 3 4 # 查看此topic的屬性 5 bin/kafka-topics.sh --describe --zookeeper 192.168.224.170:2181 --topic myfirsttopic 6 7 # 查看已經建立的topic列表 8 bin/kafka-topics.sh --list --zookeeper 192.168.224.170:2181
上述命令執行完畢後,截圖以下:
(2). 建立一個消息的生產者:
1 #啓動生產者,發送消息 2 bin/kafka-console-producer.sh --broker-list 192.168.224.180:9092 --topic myfirsttopic 3 4 #啓動消費者,接收消息 5 bin/kafka-console-consumer.sh --zookeeper 192.168.224.170:2181 --from-beginning --topic myfirsttopic
上述命令執行完畢後,截圖以下:
(3). 按照(1)、(2)這兩步,你應該能夠利用Kafka感覺到了分佈式消息系統。這裏須要着重的再說一下我在這個過程當中發現的一個問題: 你們能夠看下上圖中的consumer的命令,我選擇了zookeeper的其中一臺192.168.224.170:2181接收消息是能夠正常接收的!不要忘了,我是三臺zookeeper的,因此我又嘗試了向192.168.224.171:2181和192.168.224.172:2181接收myfirsttopic這個主題的消息。正常狀況下,三臺訪問的結果應該都是能夠正常的接收消息,可是當時個人狀況在訪問了192.168.224.171:2181這臺時會報 org.apache.zookeeper.clientcnxn 這個錯誤!!!
我當時多試了兩遍,發現個人三臺zookeeper中,誰是leader(zkServer.sh status命令),concumer鏈接的時候就會報上面的那個異常。後來定位到了zookeeper的zoo.cfg配置文件中的maxClientCnxns屬性,即客戶端最大鏈接數,我當時使用的是默認配置是2。後來我把這個屬性的值調大一些,consumer鏈接zookeeper leader時,就不會報這個錯誤了。若是你選擇將這個屬性註釋掉(從網上查詢到註釋掉該屬性默認值是10),也不會報這個錯誤了。其實網上的不少文章也只是說了此屬性能夠儘可能設置的大一些,沒有解釋其餘的。
但我後來仍是仔細想了想,當我把maxClientCnxns這個屬性設置爲2時,若是兩臺kafka啓動時,每一個kafka和zookeeper的節點之間創建了一個客戶端鏈接,那麼此時zookeeper的每一個節點的客戶端鏈接數就已經達到了最大鏈接數2,那麼我建立consumer的時候,應該是三臺zookeeper鏈接都有問題,而不是隻有leader會有問題。因此,此處須要各位有看法的再幫忙解釋一下!!!
第四步,使用Java API 操做Kafka:
其實Java API提供的功能基本也是基於上邊的客戶端命令來實現的,萬變不離其宗,我將我整理的網上的例子貼到下面,你們能夠在本地Java工程中執行一下,便可瞭解調用方法。
(1). 個人maven工程中pom.xml的配置
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 2 <modelVersion>4.0.0</modelVersion> 3 <groupId>com.ismurf.study</groupId> 4 <artifactId>com.ismurf.study.kafka</artifactId> 5 <version>0.0.1-SNAPSHOT</version> 6 <name>Kafka_Project_0001</name> 7 <packaging>war</packaging> 8 9 <dependencies> 10 <dependency> 11 <groupId>org.apache.kafka</groupId> 12 <artifactId>kafka_2.10</artifactId> 13 <version>0.8.1.1</version> 14 </dependency> 15 </dependencies> 16 17 <build> 18 <plugins> 19 <plugin> 20 <groupId>org.apache.maven.plugins</groupId> 21 <artifactId>maven-war-plugin</artifactId> 22 <version>2.1.1</version> 23 <configuration> 24 <outputFileNameMapping>@{artifactId}@.@{extension}@</outputFileNameMapping> 25 </configuration> 26 </plugin> 27 28 <!-- Ensures we are compiling at 1.6 level --> 29 <plugin> 30 <groupId>org.apache.maven.plugins</groupId> 31 <artifactId>maven-compiler-plugin</artifactId> 32 <configuration> 33 <source>1.6</source> 34 <target>1.6</target> 35 </configuration> 36 </plugin> 37 38 <plugin> 39 <groupId>org.apache.maven.plugins</groupId> 40 <artifactId>maven-surefire-plugin</artifactId> 41 <configuration> 42 <skipTests>true</skipTests> 43 </configuration> 44 </plugin> 45 </plugins> 46 </build> 47 48 </project>
(2). 實例代碼: 你們能夠參考這片文章的 http://blog.csdn.net/honglei915/article/details/37563647 中的代碼,粘貼到工程後便可使用,上述文章中的代碼整理後目錄截圖以下: