Kafka1 利用虛擬機搭建本身的Kafka集羣

前言:html

      上週末本身學習了一下Kafka,參考網上的文章,學習過程當中仍是比較順利的,遇到的一些問題最終也都解決了,如今將學習的過程記錄與此,供之後本身查閱,若是能幫助到其餘人,天然是更好的。java

===============================================================長長的分割線====================================================================apache

正文:服務器

  關於Kafka的理論介紹,網上能夠搜到到不少的資料,你們能夠自行搜索,我這裏就不在重複贅述。app

      本文中主要涉及三塊內容: 第一,就是搭建Zookeeper環境;第二,搭建Kafka環境,並學習使用基本命令發送接收消息;第三,使用Java API完成操做,以便初步瞭解在實際項目中的使用方式。less

  閒話少說,言歸正傳,本次的目的是利用VMware搭建一個屬於本身的ZooKeeper和Kafka集羣。本次咱們選擇的是VMware10,具體的安裝步驟你們能夠到網上搜索,資源不少。maven

  第一步,肯定目標:分佈式

      ZooKeeperOne       192.168.224.170  CentOS學習

      ZooKeeperTwo       192.168.224.171  CentOSui

      ZooKeeperThree     192.168.224.172  CentOS

      KafkaOne                192.168.224.180  CentOS

      KafkaTwo                192.168.224.181  CentOS

      咱們安裝的ZooKeeper是3.4.6版本,能夠從這裏下載zookeeper-3.4.6; Kafka安裝的是0.8.1版本,能夠從這裏下載kafka_2.10-0.8.1.tgz; JDK安裝的版本是1.7版本。

      另: 我在學習的時候,搭建了兩臺Kafka服務器,正式環境中咱們最好是搭建2n+1臺,此處僅做爲學些之用,暫不計較。

 

      第二步,搭建Zookeeper集羣:

      此處你們能夠參照我以前寫的一篇文章 ZooKeeper1  利用虛擬機搭建本身的ZooKeeper集羣 ,我在搭建Kafka的環境的時候就是使用的以前搭建好的Zookeeper集羣。

       

      第三步,搭建Kafka集羣:

      (1). 將第一步中下載的 kafka_2.10-0.8.1.tgz 解壓縮後,進入config目錄,會看到以下圖所示的一些配置文件,咱們準備編輯server.properties文件。

      

      (2). 打開 server.properties 文件,須要編輯的屬性以下所示:

1 broker.id=0
2 port=9092
3 host.name=192.168.224.180
4 
5 log.dirs=/opt/kafka0.8.1/kafka-logs
6 
7 zookeeper.connect=192.168.224.170:2181,192.168.224.171:2181,192.168.224.172:2181

      注意: 

      a. broker.id: 每一個kafka對應一個惟一的id,自行分配便可

      b. port: 默認的端口號是9092,使用默認端口便可

      c. host.name: 配置的是當前機器的ip地址

      d. log.dirs: 日誌目錄,此處自定義一個目錄路徑便可

      e. zookeeper.connect: 將咱們在第二步搭建的Zookeeper集羣的配置所有寫上

      (3). 上邊的配置完畢後,咱們須要執行命令 vi /etc/hosts,將相關服務器的host配置以下圖,若是沒有執行此步,後邊咱們在執行一些命令的時候,會報沒法識別主機的錯誤

       

      (4).  通過上述操做,咱們已經完成了對Kafka的配置,很簡單吧?!可是若是咱們執行 bin/kafka-server-start.sh   config/server.properties  & 這個啓動命令,可能咱們會遇到以下兩個問題:

       a. 咱們在啓動的報 Unrecognized VM option '+UseCompressedOops'.Could not create the Java virtual machine. 這個錯誤。

       

       解決方式:

       查看 bin/kafka-run-class.sh 

       找到下面這段代碼,去掉-XX:+UseCompressedOops     

1 if [ -z "$KAFKA_JVM_PERFORMANCE_OPTS" ]; then
2 KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true"
3 fi

        b. 解決了第一個問題,咱們還有可能在啓動的時候遇到 java.lang.NoClassDefFoundError: org/slf4j/impl/StaticLoggerBinder 這個錯誤。

        

        解決方式:

        從網上的下載 slf4j-nop-1.6.0.jar 這個jar包,而後放到kafka安裝目錄下的libs目錄中便可。注意,基於我目前的kafka版本,我最開始從網上下載的slf4j-nop-1.5.0.jar 這個jar包,可是啓動的時候依然會報錯,因此必定要注意版本號哦~

      (5). 如今咱們執行 bin/kafka-server-start.sh   config/server.properties  & 這個啓動命令,應該就能夠正常的啓動Kafka了。命令最後的 & 符號是爲了讓啓動程序在後臺執行。若是不加這個 & 符號,當執行完啓動後,咱們一般會使用 ctrl + c 退出當前控制檯,kafka此時會自動執行shutdown,因此此處最好加上 & 符號。

 

      第三步,使用基本命令建立消息主題,發送和接收主題消息:

      (1). 建立、查看消息主題

 1 #鏈接zookeeper, 建立一個名爲myfirsttopic的topic
 2 bin/kafka-topics.sh --create --zookeeper 192.168.224.170:2181 --replication-factor 2 --partitions 1 --topic myfirsttopic
 3 
 4 # 查看此topic的屬性  
 5 bin/kafka-topics.sh --describe --zookeeper 192.168.224.170:2181 --topic myfirsttopic  
 6 
 7 # 查看已經建立的topic列表  
 8 bin/kafka-topics.sh --list --zookeeper 192.168.224.170:2181

      上述命令執行完畢後,截圖以下:

       

       

      (2). 建立一個消息的生產者:

1 #啓動生產者,發送消息
2 bin/kafka-console-producer.sh --broker-list 192.168.224.180:9092 --topic myfirsttopic
3 
4 #啓動消費者,接收消息
5 bin/kafka-console-consumer.sh --zookeeper 192.168.224.170:2181 --from-beginning --topic myfirsttopic

      上述命令執行完畢後,截圖以下:

       

      (3). 按照(1)、(2)這兩步,你應該能夠利用Kafka感覺到了分佈式消息系統。這裏須要着重的再說一下我在這個過程當中發現的一個問題: 你們能夠看下上圖中的consumer的命令,我選擇了zookeeper的其中一臺192.168.224.170:2181接收消息是能夠正常接收的!不要忘了,我是三臺zookeeper的,因此我又嘗試了向192.168.224.171:2181和192.168.224.172:2181接收myfirsttopic這個主題的消息。正常狀況下,三臺訪問的結果應該都是能夠正常的接收消息,可是當時個人狀況在訪問了192.168.224.171:2181這臺時會報 org.apache.zookeeper.clientcnxn 這個錯誤!!!

             我當時多試了兩遍,發現個人三臺zookeeper中,誰是leader(zkServer.sh status命令),concumer鏈接的時候就會報上面的那個異常。後來定位到了zookeeper的zoo.cfg配置文件中的maxClientCnxns屬性,即客戶端最大鏈接數,我當時使用的是默認配置是2。後來我把這個屬性的值調大一些,consumer鏈接zookeeper leader時,就不會報這個錯誤了。若是你選擇將這個屬性註釋掉(從網上查詢到註釋掉該屬性默認值是10),也不會報這個錯誤了。其實網上的不少文章也只是說了此屬性能夠儘可能設置的大一些,沒有解釋其餘的。

             但我後來仍是仔細想了想,當我把maxClientCnxns這個屬性設置爲2時,若是兩臺kafka啓動時,每一個kafka和zookeeper的節點之間創建了一個客戶端鏈接,那麼此時zookeeper的每一個節點的客戶端鏈接數就已經達到了最大鏈接數2,那麼我建立consumer的時候,應該是三臺zookeeper鏈接都有問題,而不是隻有leader會有問題。因此,此處須要各位有看法的再幫忙解釋一下!!!       

 

      第四步,使用Java API 操做Kafka:

      其實Java API提供的功能基本也是基於上邊的客戶端命令來實現的,萬變不離其宗,我將我整理的網上的例子貼到下面,你們能夠在本地Java工程中執行一下,便可瞭解調用方法。

      (1). 個人maven工程中pom.xml的配置

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 2   <modelVersion>4.0.0</modelVersion>
 3   <groupId>com.ismurf.study</groupId>
 4   <artifactId>com.ismurf.study.kafka</artifactId>
 5   <version>0.0.1-SNAPSHOT</version>
 6   <name>Kafka_Project_0001</name>
 7   <packaging>war</packaging>
 8   
 9   <dependencies>
10           <dependency>
11             <groupId>org.apache.kafka</groupId>
12             <artifactId>kafka_2.10</artifactId>
13             <version>0.8.1.1</version>
14         </dependency>
15   </dependencies>
16   
17   <build>
18       <plugins>
19           <plugin>
20             <groupId>org.apache.maven.plugins</groupId>
21             <artifactId>maven-war-plugin</artifactId>
22             <version>2.1.1</version>
23             <configuration>
24                 <outputFileNameMapping>@{artifactId}@.@{extension}@</outputFileNameMapping>
25             </configuration>
26         </plugin>
27           
28         <!-- Ensures we are compiling at 1.6 level -->
29         <plugin>
30             <groupId>org.apache.maven.plugins</groupId>
31             <artifactId>maven-compiler-plugin</artifactId>
32             <configuration>
33                 <source>1.6</source>
34                 <target>1.6</target>
35             </configuration>
36         </plugin>
37         
38         <plugin>
39             <groupId>org.apache.maven.plugins</groupId>
40             <artifactId>maven-surefire-plugin</artifactId>
41             <configuration>
42                 <skipTests>true</skipTests>
43             </configuration>
44         </plugin>
45       </plugins>
46   </build>
47   
48 </project>

      (2). 實例代碼: 你們能夠參考這片文章的  http://blog.csdn.net/honglei915/article/details/37563647 中的代碼,粘貼到工程後便可使用,上述文章中的代碼整理後目錄截圖以下:

      

相關文章
相關標籤/搜索