一、安裝scala 解壓 tar -zxvf scala-2.10.4.tgz
二、安裝kafka 解壓 tar -zxvf kafka_2.11-0.9.0.1.tgz
broker.id=1 不能重複
host.name=node11(機器的ip)
advertised.host.name=node11(機器的ip)
log.dirs=/usr/local/java/kafkalogs (kafka的日誌路徑)
zookeeper.connect=node22:2181,node33:2181,node44:2181 (zookeeper的集羣地址)
zookeeper集羣環境搭建 參考 https://my.oschina.net/xiaozhou18/blog/787132spa
/bin/kafka-server-start.sh /usr/local/java/kafka/config/server.properties &
在node11建立topic bin/kafka-topics.sh --create --zookeeper node22:2181,node33:2181,node44:2181 --replication-factor 1 --partitions 1 --topic test
查看topic bin/kafka-topics.sh --list --zookeeper node22:2181,node33:2181,node44:2181
在node22上發送消息至kafka, bin/kafka-console-producer.sh --broker-list node11:9092,node33:9092 --sync --topic test
在node33查看消息 bin/kafka-console-consumer.sh --zookeeper node22:2181,node33:2181,node44:2181 --topic test --from-beginning
package com.xiaozhou.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Demo producer thread that publishes 100 string messages to a Kafka topic
 * using the legacy (pre-0.9) Scala producer API.
 */
public class Produce extends Thread {
    // Default topic used when main() starts the producer.
    private static final String DEFAULT_TOPIC = "kafka-topic";

    // Topic this producer instance publishes to.
    // BUG FIX: the original declared this field static but assigned it via
    // `this.topic = topic` in the constructor, so every instance clobbered
    // the shared value. It is now a per-instance final field.
    private final String topic;

    /**
     * Creates a producer thread for the given topic.
     *
     * @param topic the Kafka topic to publish to
     */
    public Produce(String topic) {
        this.topic = topic;
    }

    /** Sends "hello0".."hello99" to the topic, then releases the producer. */
    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        try {
            for (int i = 0; i < 100; i++) {
                producer.send(new KeyedMessage<String, String>(topic, "hello" + i));
            }
        } finally {
            // FIX: the original never closed the producer, leaking its
            // network connections and send buffers.
            producer.close();
        }
    }

    /**
     * Builds a string-serializing producer against the configured brokers.
     *
     * @return a producer ready to send String messages
     */
    public Producer<String, String> createProducer() {
        Properties prop = new Properties();
        // Brokers contacted for initial cluster metadata.
        prop.put("metadata.broker.list", "node22:9092,node33:9092");
        // Encode message payloads as strings.
        prop.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(prop);
        return new Producer<String, String>(producerConfig);
    }

    public static void main(String[] args) {
        new Produce(DEFAULT_TOPIC).start();
    }
}
package com.xiaozhou.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
 * Demo high-level consumer (legacy pre-0.9 ZooKeeper-based API) that prints
 * every message received on the configured topic.
 */
public class Consum {
    // Topic this demo consumer subscribes to.
    private static String topic = "kafka-topic";

    /**
     * Builds a consumer connector registered through ZooKeeper.
     *
     * @return a connector joined to consumer group "kafkademo"
     */
    public static ConsumerConnector createConsumer() {
        Properties props = new Properties();
        // ZooKeeper ensemble tracking brokers and consumer offsets.
        props.put("zookeeper.connect", "node22:2181,node33:2181,node44:2181");
        // Consumers sharing a group id split the topic's partitions.
        props.put("group.id", "kafkademo");
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        // Offsets are committed automatically once per second.
        props.put("auto.commit.interval.ms", "1000");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    }

    public static void main(String[] args) {
        ConsumerConnector consumer = createConsumer();
        // Request one stream (one consuming thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams =
                consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        // hasNext() blocks until the next message arrives, so this loops forever.
        while (iterator.hasNext()) {
            byte[] payload = iterator.next().message();
            // FIX: decode with an explicit UTF-8 charset — the original
            // new String(bytes) used the platform default and could garble
            // non-ASCII payloads on some JVMs. Fully qualified to avoid
            // touching the file's import block.
            System.out.println("接受到的數據是"
                    + new String(payload, java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}