First, set up a Linux virtual machine, then download Kafka 0.9 from the official site: http://kafka.apache.org/downloads.html
Extract the archive and change into the directory:
tar -xzf kafka_2.11-0.9.0.0.tgz
cd kafka_2.11-0.9.0.0
Start a single-node ZooKeeper, using the default configuration file:
bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
Start one Kafka broker, likewise with the default single-node configuration:
bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
Create a Maven project and add the following dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.6</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
Java producer code. The topic test-topic-2 is not created beforehand; with the default server.properties this works because auto.create.topics.enable defaults to true, so the topic is created on the first send.
package cn.pior.test;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Message producer
 *
 * @author fei.yin
 */
public class KafkaProducerTest {

    public static String topicName = "test-topic-2";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.102:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
                    topicName, Integer.toString(i), Integer.toString(i));
            System.out.println(producerRecord);
            producer.send(producerRecord);
        }
        producer.close();
    }
}
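The send() above is fire-and-forget: the returned Future is never checked, so a failed send goes unnoticed. As a sketch (not part of the original example), the same 0.9 kafka-clients API accepts a Callback as a second argument; the fragment below would replace the producer.send(producerRecord) call inside the loop above, and needs two extra imports.

// Sketch only: same producer and producerRecord as in the loop above.
// Requires: import org.apache.kafka.clients.producer.Callback;
//           import org.apache.kafka.clients.producer.RecordMetadata;
producer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // the send failed after the configured retries
            exception.printStackTrace();
        } else {
            System.out.println("sent to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
});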
Java consumer code:
package cn.pior.test;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.102:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(KafkaProducerTest.topicName));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s \n",
                        record.offset(), record.key(), record.value());
        }
    }
}
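With enable.auto.commit=true, offsets are committed on a timer, so a record can be marked as consumed before it is actually processed. As a hedged sketch, the 0.9 consumer also supports committing manually with commitSync(); the loop below assumes the same props as above, except that auto-commit is turned off.

// Sketch: manual offset commit; same configuration as above except
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(KafkaProducerTest.topicName));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s \n",
                record.offset(), record.key(), record.value());
    // commit the offsets returned by the last poll() only after processing them
    consumer.commitSync();
}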
In both programs, the line:
props.put("bootstrap.servers", "192.168.56.102:9092");
configures the address and port of the Kafka broker.
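bootstrap.servers only needs to list one reachable broker; the client discovers the rest of the cluster from it. With more than one broker it is common to list several addresses, comma-separated, so startup does not hinge on a single host (the second address below is hypothetical):

// Hypothetical second broker address; any one reachable entry is enough to bootstrap.
props.put("bootstrap.servers", "192.168.56.102:9092,192.168.56.103:9092");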
Reference documentation:
http://kafka.apache.org/documentation.html#uses