Dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
</dependency>
Version 0.9 changed the consumer API: the implementation differs from pre-0.9 releases and unifies the earlier consumer APIs into a single one. See http://kafka.apache.org/documentation.html#consumerapi for details.
Producer implementation:
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {

    public void send() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        // Wait for all in-sync replicas to acknowledge each request before it counts as sent
        props.put("acks", "all");
        // Number of retries
        props.put("retries", 1);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Chooses the partition each message is sent to; the default is
        // org.apache.kafka.clients.producer.internals.DefaultPartitioner
        // props.put("partitioner.class", "com.test.kafka.simple.SimplePartitioner");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<String, String>(
                        "mykafka", "mykafka" + Integer.toString(i), "hello kafka " + Integer.toString(i)));
            }
        } finally {
            // close() waits for pending sends to complete and releases the producer's resources
            producer.close();
        }
    }

    public static void main(String[] args) {
        new SimpleProducer().send();
    }
}
Consumer implementation:
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleConsumer {

    private static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class);

    public void poll() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.52.128:9092");
        // Commit offsets automatically at a fixed interval; alternatively,
        // commit manually by calling KafkaConsumer.commitSync()
        props.put("enable.auto.commit", true);
        props.put("auto.commit.interval.ms", "5000");
        // Heartbeat used to detect whether the session is still alive. The interval must be
        // lower than session.timeout.ms; no more than 1/3 of it is recommended
        props.put("heartbeat.interval.ms", "5000");
        props.put("session.timeout.ms", "30000");
        props.put("group.id", "test-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Create the consumer before the try block so that close() is never called on null
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {
            // Topic metadata visible to this consumer (not used further in this example)
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            consumer.subscribe(Arrays.asList("mykafka"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        new SimpleConsumer().poll();
    }
}
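The comment on heartbeat.interval.ms states a rule that is easy to break when the two timeouts are tuned separately. The following is a minimal sketch of a sanity check that could run before the consumer is created. The class and method names (ConsumerTimeoutCheck, heartbeatWithinOneThird) are hypothetical helpers, not part of the Kafka API, and the sketch assumes both settings are present in the Properties object as numbers or numeric strings.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): checks the "heartbeat <= 1/3 session timeout" guideline.
final class ConsumerTimeoutCheck {

    private ConsumerTimeoutCheck() {
    }

    // Reads a millisecond setting that may have been stored as a String or a Number.
    static long readMillis(Properties props, String key) {
        Object value = props.get(key);
        if (value == null) {
            throw new IllegalArgumentException("missing setting: " + key);
        }
        return Long.parseLong(value.toString().trim());
    }

    // Returns true when heartbeat.interval.ms is positive and at most one third of session.timeout.ms.
    static boolean heartbeatWithinOneThird(Properties props) {
        long heartbeat = readMillis(props, "heartbeat.interval.ms");
        long session = readMillis(props, "session.timeout.ms");
        return heartbeat > 0 && heartbeat * 3 <= session;
    }
}
```

With the values used in the consumer above (5000 and 30000) the check passes, since 5000 * 3 = 15000 is below 30000.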
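You can also plug in your own serializer implementation and your own partitioner to control which partition a message is sent to. Note that the partitioner below is written against the old producer's kafka.producer.Partitioner interface; the new KafkaProducer used above expects an implementation of org.apache.kafka.clients.producer.Partitioner for partitioner.class, which has a different partition(...) signature: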
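import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Partitioner for the old (pre-0.9) producer API
public class SimplePartitioner implements Partitioner {

    // The old producer instantiates partitioners through this constructor
    public SimplePartitioner(VerifiableProperties props) {
    }

    @Override
    public int partition(Object key, int numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        // Use the number after the last '.' in the key, modulo the partition count;
        // keys without a '.' go to partition 0
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }
}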
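The key-to-partition rule itself does not depend on either Partitioner interface, so it can be isolated and tested as plain Java. The sketch below does that under a few assumptions of its own: the names KeySuffixPartitioning and partitionFor are hypothetical, and falling back to partition 0 for null keys or non-numeric suffixes, as well as mapping negative numbers into the valid range, are choices made here rather than behavior of the original class (which would throw or return a negative value in those cases).

```java
// Hypothetical helper: the "number after the last dot, modulo partition count" rule as a pure function.
final class KeySuffixPartitioning {

    private KeySuffixPartitioning() {
    }

    // Returns a partition in [0, numPartitions) derived from the numeric suffix of the key.
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (key == null) {
            return 0; // assumption: keyless messages go to partition 0
        }
        int dot = key.lastIndexOf('.');
        if (dot <= 0 || dot == key.length() - 1) {
            return 0; // no usable suffix after a dot
        }
        try {
            int suffix = Integer.parseInt(key.substring(dot + 1));
            // Normalize so that a negative suffix still yields a valid partition index
            return ((suffix % numPartitions) + numPartitions) % numPartitions;
        } catch (NumberFormatException e) {
            return 0; // assumption: non-numeric suffixes fall back to partition 0
        }
    }
}
```

A partitioner for the new producer could call such a function from its partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) method. Note that the keys sent by SimpleProducer ("mykafka0", "mykafka1", ...) contain no dot, so under this rule every message would land on partition 0.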
The examples above are based on the official demos. Spring also provides Kafka integration through spring-integration-kafka; see https://github.com/spring-projects/spring-integration-kafka for details.