This deployment uses kafka_2.8.0-0.8.0.
It draws on the post at http://blog.csdn.net/itleochen/article/details/17451455
and follows the official quickstart at http://kafka.apache.org/documentation.html#quickstart.
Without further ado, here are the steps.
1. Download kafka_2.8.0-0.8.0.tar.gz
https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz
2. Extract the archive
tar -vxf kafka_2.8.0-0.8.0.tar.gz
3. Edit the configuration files
Edit config/server.properties:
host.name=192.168.110.129 (set this to the host's IP; otherwise the broker hands clients its hostname, which they may not be able to resolve)
Edit config/zookeeper.properties:
dataDir=/usr/local/tmp/zookeeper (ZooKeeper's data directory)
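After these edits the relevant lines should read roughly as follows (broker.id and port keep their 0.8.0 defaults; the IP is specific to this deployment):
config/server.properties
broker.id=0
port=9092
host.name=192.168.110.129
config/zookeeper.properties
dataDir=/usr/local/tmp/zookeeper
clientPort=2181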
4. Start ZooKeeper and Kafka
cd bin
Start ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties & (the trailing & backgrounds the process and returns the shell)
Start Kafka:
./kafka-server-start.sh ../config/server.properties &
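To confirm that both services are up, jps (shipped with the JDK) should list the two JVM main classes; the PIDs below are illustrative:
jps
2817 QuorumPeerMain
2901 Kafka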
5. Verify the deployment
* Create a topic
./kafka-create-topic.sh --partition 1 --replica 1 --zookeeper localhost:2181 --topic test
Check that the topic was created:
./kafka-list-topic.sh --zookeeper localhost:2181
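If it succeeded, the listing prints one line per partition; on 0.8.0 the output looks roughly like this (exact columns may vary):
topic: test partition: 0 leader: 0 replicas: 0 isr: 0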
* Start a producer
./kafka-console-producer.sh --broker-list 192.168.110.129:9092 --topic test
* Start a consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
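Each line typed into the producer terminal should be echoed by the consumer terminal; an illustrative session:
(producer terminal) hello kafka
(consumer terminal) hello kafka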
6. Stop Kafka and ZooKeeper
./kafka-server-stop.sh (the stop scripts take no arguments; they find and signal the running process)
./zookeeper-server-stop.sh
Lessons learned:
1. The producer is started with the Kafka broker's port (9092), while the consumer is started with ZooKeeper's port (2181);
2. A topic must be created before it can be used;
3. Topic metadata is essentially stored in ZooKeeper as files (znodes); see the quick check below.
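Point 3 can be checked directly with a ZooKeeper client (zkCli.sh from a ZooKeeper distribution; the Kafka tarball does not ship one). In Kafka 0.8 the topic appears as a znode under /brokers/topics:
./zkCli.sh -server localhost:2181
ls /brokers/topics
[test]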
Consumer (Java):
package com.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

    private final ConsumerConnector consumer;

    private KafkaConsumer() {
        Properties props = new Properties();
        // ZooKeeper connection (the consumer talks to ZooKeeper, not the broker)
        props.put("zookeeper.connect", "192.168.110.129:2181");
        // consumer group id
        props.put("group.id", "jd-group");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // start from the earliest offset when the group has no committed offset
        props.put("auto.offset.reset", "smallest");
        // serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    void consume() {
        // one stream for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(KafkaProducer.TOPIC, new Integer(1));

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
        ConsumerIterator<String, String> it = stream.iterator();
        // block forever, printing each message as it arrives
        while (it.hasNext())
            System.out.println(it.next().message());
    }

    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
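A note on the configuration above: auto.offset.reset=smallest makes a consumer group with no committed offsets start from the earliest messages still held by the broker, so this consumer also sees messages that were produced before it was started.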
Producer (Java):
package com.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    private final Producer<String, String> producer;

    public final static String TOPIC = "TEST-TOPIC";

    private KafkaProducer() {
        Properties props = new Properties();
        // the Kafka broker's port is configured here, not ZooKeeper's
        props.put("metadata.broker.list", "192.168.110.129:9092");
        // value serializer
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key serializer
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // request.required.acks:
        //  0 - the producer never waits for an acknowledgement from the broker
        //      (same behavior as 0.7): lowest latency, weakest durability
        //      (some data is lost when a server fails).
        //  1 - the producer gets an acknowledgement after the leader replica has
        //      received the data; only messages written to a now-dead leader but
        //      not yet replicated are lost.
        // -1 - the producer gets an acknowledgement after all in-sync replicas
        //      have received the data: best durability, no messages are lost as
        //      long as at least one in-sync replica remains.
        props.put("request.required.acks", "-1");

        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    void produce() {
        // send messages numbered 1000..1999
        int messageNo = 1000;
        final int COUNT = 2000;
        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            messageNo++;
        }
    }

    public static void main(String[] args) {
        new KafkaProducer().produce();
    }
}
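To compile the two classes, the 0.8.0 Scala client library must be on the classpath. A minimal sketch of the Maven dependency, assuming the Maven Central coordinates of this release (the 0.8.0 pom is known to pull in unresolvable jmxri/jmxtools artifacts via log4j, so exclusions may be needed):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.8.0</artifactId>
    <version>0.8.0</version>
</dependency>

To run: create TEST-TOPIC first (see lesson 2 above), start KafkaConsumer in one terminal, then KafkaProducer in another; the consumer should print the thousand "hello kafka message ..." lines.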