jdk:1.6.0_25 64位html
kafka:2.9.2-0.8.2.1java
kafka 官方下載地址 http://apache.fayea.com/kafka/0.8.2.1/kafka_2.9.2-0.8.2.1.tgzapache
tar -zxvf kafka_2.
9.2-
0.8.
2.1.tgz -C /usr/local/ &&
mv kafka_2.
9.2-
0.8.
2.1 kafka
cd /usr/local/kafka
vi config/zookeeper.properties
dataDir=/usr/local/kafka/zookeeper
vi config/server.properties
broker.
id=
0
port=
9092
hostname=
192.168.
194.110
log.dirs=/usr/local/kafka/kafka-logs
zookeeper.connect=
192.168.
194.110:
2181
啓動zookeeperapi
bin/zookeeper-server-start.sh config/zookeeper.properties session
啓動kafka brokerapp
bin/kafka-server-start.sh config/server.properties &dom
查看啓動狀態async
jps
14867 QuorumPeerMain ###存在表明zookeeper服務啓動正常
14919 Kafka ###表明kafka broker啓動成功 maven
關閉防火牆 測試
service iptables stop
單機版kafka producer consumer 消息發送和接收測試
bin/kafka-console-producer.sh --broker-list 192.168.194.110:9092 --topic test ###啓動producer
bin/kafka-console-consumer.sh --zookeeper 192.168.194.110:2181 --topic test --from-beginning
而後在producer端 輸入須要發送的msg內容 回車 看consumer端是否接收到消息
[2015-09-11 13:58:00,470] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:526)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
producer端出現此異常 請將producer監聽的broker 使用ip地址 不要用localhost
到此 單機版kafka已搭建並測試成功了
使用java來調用kafka的producer發消息和consumer來發消息
maven添加依賴
1
<
dependency
>
2
<
groupId
>org.apache.kafka
</
groupId
>
3
<
artifactId
>kafka_2.10
</
artifactId
>
4
<
version
>0.8.2.0
</
version
>
5
</
dependency
>
kafka消息生產者KafkaProducer
import java.util.Date;
import java.util.Properties;
import java.util.Random;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public
class KafkaProducer
extends Thread {
public
static
void main(String[] args) {
Properties props =
new Properties();
props.put("metadata.broker.list", "192.168.194.110:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
//
配置key的序列化類
ProducerConfig config =
new ProducerConfig(props);
Producer<String, String> producer =
new Producer<String, String>(config);
Random rnd =
new Random();
long runtime =
new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
KeyedMessage<String, String> data =
new KeyedMessage<String, String>("test", "test-key",msg);
producer.send(data);
}
}
kafka消息消費者 KafkaConsumer
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
public
class KafkaConsumer{
private
static ConsumerConnector consumer =
null;
public
static
void main(String[] args) {
Properties props =
new Properties();
//
zookeeper 配置
props.put("zookeeper.connect", "192.168.194.110:2181");
//
group 表明一個消費組
props.put("group.id", "jd-group");
//
zk鏈接超時
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
//
序列化類
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig config =
new ConsumerConfig(props);
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap =
new HashMap<String, Integer>();
topicCountMap.put("test",
new Integer(1));
StringDecoder keyDecoder =
new StringDecoder(
new VerifiableProperties());
StringDecoder valueDecoder =
new StringDecoder(
new VerifiableProperties());
Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
KafkaStream<String, String> stream = consumerMap.get("test").get(0);
ConsumerIterator<String, String> it = stream.iterator();
while (it.hasNext())
System.out.println(it.next().message());
}
}
分別啓動producer 和consumer 便可進行簡單的消息發送和接收
結果:
log4j:WARN No appenders could be found
for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http:
//
logging.apache.org/log4j/1.2/faq.html#noconfig for more info. 1441952197141,www.example.com,192.168.2.86