kafka的安裝和使用

1、安裝

一、安裝scala  解壓   tar -zxvf    scala-2.10.4.tgz

二、安裝kafka 解壓   tar -zxvf    kafka_2.11-0.9.0.1.tgz

2、修改kafka/config下的server.properties配置文件以下內容

broker.id=1 不能重複

host.name=node11(機器的ip)

advertised.host.name=node11(機器的ip)

log.dirs=/usr/local/java/kafkalogs  (kafka的日誌路徑)

zookeeper.connect=node22:2181,node33:2181,node44:2181  (zookeeper的集羣地址)

3、把配置好的kafka copy到其餘三臺機器上  要修改broker.id  host.name  advertised.host.name

4、啓動zookeeper集羣

zookeeper集羣環境搭建  參考    https://my.oschina.net/xiaozhou18/blog/787132

5、啓動kafka集羣

/bin/kafka-server-start.sh  /usr/local/java/kafka/config/server.properties &

6、測試kafka集羣

在node11建立topic bin/kafka-topics.sh --create --zookeeper node22:2181,node33:2181,node44:2181 --replication-factor 1 --partitions 1 --topic test

查看topic     bin/kafka-topics.sh --list --zookeeper node22:2181,node33:2181,node44:2181

在node22上發送消息至kafka, bin/kafka-console-producer.sh --broker-list node11:9092,node33:9092  --sync --topic test  

在node33查看消息 bin/kafka-console-consumer.sh --zookeeper node22:2181,node33:2181,node44:2181  --topic test --from-beginning

7、用java 簡單操做kafka

一、生產者

package com.xiaozhou.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class Produce extends Thread{
    //定義一個主題
    private static String topic="kafka-topic";
    public Produce(String topic) {
        this.topic = topic;
    }
    @Override
    public void run() {
        Producer producer=createProducer();
        for(int i=0;i<100;i++){
            //發消息
            producer.send(new KeyedMessage(topic, "hello"+i));
        }
    }
    public Producer createProducer(){
        Properties prop=new Properties();
        //往哪幾個broker上發消息
        prop.put("metadata.broker.list", "node22:9092,node33:9092");
        prop.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(prop);
        Producer producer = new Producer(producerConfig);
        System.out.println(producer);
        return producer;
    }
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Produce p=new Produce(topic);
        p.start();
    }

}
 

二、消費者

package com.xiaozhou.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class Consum {
    private static String topic="kafka-topic";
    public static ConsumerConnector createConsumer(){
          Properties props = new Properties();
           props.put("zookeeper.connect", "node22:2181,node33:2181,node44:2181");
           props.put("group.id", "kafkademo");
           props.put("zookeeper.session.timeout.ms", "40000");
           props.put("zookeeper.sync.time.ms", "200");
           props.put("auto.commit.interval.ms", "1000");
           return Consumer.createJavaConsumerConnector( new ConsumerConfig(props));
        
    }
    public static void main(String[] args) {
        ConsumerConnector consumer = createConsumer();
        Map<String,Integer> map=new HashMap<String, Integer>();
        map.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(map);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[],byte[]> iterator = stream.iterator();
        while(iterator.hasNext())
        {
            byte[] next = iterator.next().message();
            System.out.println("接受到的數據是"+new String(next));
        }
    }

}  

相關文章
相關標籤/搜索