開發簡單的Kafka應用

  以前基於集羣和單機安裝過kafka,如今利用kafka提供的API構建一個簡單的生產者消費者的項目示例,來跑通kafka的流程,具體過程以下:java

  首先使用eclipse for javaee創建一個maven項目,而後在pom.xml添加以下依賴配置:apache

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.2.2</version>
    </dependency>

  這裏kafka版本是kafka_2.9.2-0.8.2.2,保存以後maven會自動下載依賴,注意要關閉windows防火牆,儘可能專用網絡和外網都要關閉,不然下載的很慢,下載好以後就能夠編寫項目代碼了,這裏的pom.xml全部配置以下:vim

 1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 3   <modelVersion>4.0.0</modelVersion>
 4 
 5   <groupId>kafkatest</groupId>
 6   <artifactId>kafkatest</artifactId>
 7   <version>0.0.1-SNAPSHOT</version>
 8   <packaging>jar</packaging>
 9 
10   <name>kafkatest</name>
11   <url>http://maven.apache.org</url>
12 
13   <properties>
14     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15   </properties>
16 
17   <dependencies>
18     <dependency>
19       <groupId>junit</groupId>
20       <artifactId>junit</artifactId>
21       <version>3.8.1</version>
22       <scope>test</scope>
23     </dependency>
24     <dependency>
25         <groupId>org.apache.kafka</groupId>
26         <artifactId>kafka_2.9.2</artifactId>
27         <version>0.8.2.2</version>
28     </dependency>
29   </dependencies>
30 </project>

  而後,咱們創建一個簡單生產者類SimpleProducer,代碼以下:windows

 1 package test;
 2 
 3 import java.util.Properties;
 4 
 5 import kafka.javaapi.producer.Producer;
 6 import kafka.producer.KeyedMessage;
 7 import kafka.producer.ProducerConfig;
 8 
 9 public class SimpleProducer {
10         private static Producer<Integer,String> producer;
11         private final Properties props=new Properties();
12         public SimpleProducer(){
13                 //定義鏈接的broker list
14                 props.put("metadata.broker.list", "192.168.1.216:9092");
15                 //定義序列化類 Java中對象傳輸以前要序列化
16                 props.put("serializer.class", "kafka.serializer.StringEncoder");
17                 producer = new Producer<Integer, String>(new ProducerConfig(props));
18         }
19         public static void main(String[] args) {
20                 SimpleProducer sp=new SimpleProducer();
21                 //定義topic
22                 String topic="mytopic";
23                 
24                 //定義要發送給topic的消息
25                 String messageStr = "This is a message";
26                 
27                 //構建消息對象
28                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
29          
30                 //推送消息到broker
31                 producer.send(data);
32                 producer.close();
33         }
34 }

  類的代碼很簡單,我這裏是kafka單機環境端口就是kafka broker端口9092,這裏定義topic爲mytopic固然能夠本身隨便定義不用考慮服務器是否建立,對於發送消息的話上面代碼是簡單的單條發送,若是發送數據量很大的話send方法屢次推送會耗費時間,因此建議把data數據按必定量分組放到List中,最後send一下AarrayList便可,這樣速度會大幅度提升api

  接下來寫一個簡單的消費者類SimpleHLConsumer,代碼以下:服務器

 1 package test;
 2 
 3 import java.util.HashMap;
 4 import java.util.List;
 5 import java.util.Map;
 6 import java.util.Properties;
 7 
 8 import kafka.consumer.Consumer;
 9 import kafka.consumer.ConsumerConfig;
10 import kafka.consumer.ConsumerIterator;
11 import kafka.consumer.KafkaStream;
12 import kafka.javaapi.consumer.ConsumerConnector;
13 
14 public class SimpleHLConsumer {
15         private final ConsumerConnector consumer;
16         private final String topic;
17 
18         public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
19                 Properties props = new Properties();
20                 //定義鏈接zookeeper信息
21                 props.put("zookeeper.connect", zookeeper);
22                 //定義Consumer全部的groupID
23                 props.put("group.id", groupId);
24                 props.put("zookeeper.session.timeout.ms", "500");
25                 props.put("zookeeper.sync.time.ms", "250");
26                 props.put("auto.commit.interval.ms", "1000");
27                 consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
28                 this.topic = topic;
29         }
30 
31         public void testConsumer() {
32                 Map<String, Integer> topicCount = new HashMap<String, Integer>();
33                 //定義訂閱topic數量
34                 topicCount.put(topic, new Integer(1));
35                 //返回的是全部topic的Map
36                 Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
37                 //取出咱們要須要的topic中的消息流
38                 List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
39                 for (final KafkaStream stream : streams) {
40                         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
41                         while (consumerIte.hasNext())
42                                 System.out.println("Message from Topic :" + new String(consumerIte.next().message()));
43                 }
44                 if (consumer != null)
45                         consumer.shutdown();
46         }
47 
48         public static void main(String[] args) {
49                 String topic = "mytopic";
50                 SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.1.216:2181/kafka", "testgroup", topic);
51                 simpleHLConsumer.testConsumer();
52         }
53 
54 }

  消費者代碼主要邏輯就是對生產者發送過來的數據作簡單處理和輸出,注意這裏的地址是zookeeper的地址而且包括節點/kafka,topic名稱要一致網絡

  上面兩個類已經能夠實現消息的生產和消費了,可是如今服務器須要作必定的配置才能夠,不然會拋出異常,就是在以前配置的server.properties基礎之上進行修改,進入kafka安裝目錄下,使用命令 vim config/server.properties 打開配置文件,找到host.name這個配置,首先去掉前面的#註釋,而後把默認的localhost改爲IP地址192.168.1.216,由於eclipse遠程運行代碼時讀取到localhost再執行時就是提交到本地了,因此會拋出異常,固然把代碼打成jar包在服務器運行就不會出現這樣的問題了,這裏要注意:session

  

  修改以後保存並退出,而後確保zookeeper的正常運行eclipse

  若是以前kafka正在運行,那麼就執行 bin/kafka-server-stop.sh  中止kafka服務,而後再執行maven

   nohup bin/kafka-server-start.sh config/server.properties >> /dev/null & 啓動服務,若是原來就是中止的,那麼直接啓動便可

  啓動以後先運行啓動消費者,消費者處於運行等待

  

  而後啓動生產者發送消息,生產者發送完成當即關閉,消費者消費輸出以下:

  

  到這裏,就完成了kafka從生產到消費簡單示例的開發,消息隊列能夠跑通了

相關文章
相關標籤/搜索