kafka安裝以及入門demo

jdk:1.6.0_25 64位

kafka:2.9.2-0.8.2.1

kafka 官方下載地址 http://apache.fayea.com/kafka/0.8.2.1/kafka_2.9.2-0.8.2.1.tgz

 

tar -zxvf kafka_2.9.2-0.8.2.1.tgz -C /usr/local/ && mv /usr/local/kafka_2.9.2-0.8.2.1 /usr/local/kafka
cd /usr/local/kafka
vi config/zookeeper.properties

dataDir=/usr/local/kafka/zookeeper

vi config/server.properties
broker.id=0
port=9092
host.name=192.168.194.110
log.dirs=/usr/local/kafka/kafka-logs
zookeeper.connect=192.168.194.110:2181

啓動zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

啓動kafka broker

bin/kafka-server-start.sh config/server.properties &

查看啓動狀態

jps
14867 QuorumPeerMain ###存在表明zookeeper服務啓動正常

14919 Kafka ###表明kafka broker啓動成功

關閉防火牆

service iptables stop

單機版kafka producer consumer 消息發送和接收測試

bin/kafka-console-producer.sh --broker-list 192.168.194.110:9092 --topic test ###啓動producer 

bin/kafka-console-consumer.sh --zookeeper 192.168.194.110:2181 --topic test --from-beginning 

而後在producer端 輸入須要發送的msg內容 回車 看consumer端是否接收到消息

[2015-09-11 13:58:00,470] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
    at scala.collection.immutable.Stream.foreach(Stream.scala:526)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

producer端出現此異常 請將producer監聽的broker 使用ip地址 不要用localhost  

到此 單機版kafka已搭建並測試成功了

使用java來調用kafka的producer發消息和consumer來發消息

maven添加依賴
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.0</version>
</dependency>

 kafka消息生產者KafkaProducer

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


public  class KafkaProducer  extends Thread {
      
      
      
     public  static  void main(String[] args) {  
        Properties props =  new Properties();
        props.put("metadata.broker.list", "192.168.194.110:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        
         // 配置key的序列化類
        ProducerConfig config =  new ProducerConfig(props);
        Producer<String, String> producer =  new Producer<String, String>(config);
        Random rnd =  new Random();
         
         long runtime =  new Date().getTime();
         
        String ip = "192.168.2." + rnd.nextInt(255);
         
        String msg = runtime + ",www.example.com," + ip;
        KeyedMessage<String, String> data =  new KeyedMessage<String, String>("test", "test-key",msg);
        producer.send(data);
    }  
       
}  

kafka消息消費者 KafkaConsumer 

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;


public  class KafkaConsumer{
      
      
     private  static ConsumerConnector consumer =  null;
     public  static  void main(String[] args) {  
         Properties props =  new Properties();
          // zookeeper 配置
         props.put("zookeeper.connect", "192.168.194.110:2181");

          // group 表明一個消費組
         props.put("group.id", "jd-group");

          // zk鏈接超時
         props.put("zookeeper.session.timeout.ms", "4000");
         props.put("zookeeper.sync.time.ms", "200");
         props.put("auto.commit.interval.ms", "1000");
         props.put("auto.offset.reset", "smallest");
          // 序列化類
         props.put("serializer.class", "kafka.serializer.StringEncoder");

         ConsumerConfig config =  new ConsumerConfig(props);

         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
         
         Map<String, Integer> topicCountMap =  new HashMap<String, Integer>();
         topicCountMap.put("test",  new Integer(1));

         StringDecoder keyDecoder =  new StringDecoder( new VerifiableProperties());
         StringDecoder valueDecoder =  new StringDecoder( new VerifiableProperties());

         Map<String, List<KafkaStream<String, String>>> consumerMap = 
                 consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
         KafkaStream<String, String> stream = consumerMap.get("test").get(0);
         ConsumerIterator<String, String> it = stream.iterator();
          while (it.hasNext())
             System.out.println(it.next().message());
    }  
       
}  

 分別啓動producer 和consumer 便可進行簡單的消息發送和接收

 結果:

log4j:WARN No appenders could be found  for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
1441952197141,www.example.com,192.168.2.86
相關文章
相關標籤/搜索