這裏簡單展現一下如何使用kafka0.8的client去消費一個topic。
<!-- Maven dependency for the legacy Kafka 0.8 high-level consumer (Scala 2.10 build). -->
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.2</version> </dependency>
// Build the configuration for the legacy Kafka 0.8 high-level consumer.
Properties props = new Properties();
props.put("zookeeper.connect", zk);
// props.put("auto.offset.reset","smallest");
props.put("group.id", group);
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "2000");
props.put("auto.commit.interval.ms", "10000");
// Timeout for ConsumerIterator.hasNext(); if unset it blocks forever waiting
// for a new message to arrive.
props.put("consumer.timeout.ms", "10000");
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector =
        kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

// Request `consumerCount` streams (consumer threads) for this topic.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, consumerCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
// Submit one worker per KafkaStream; each worker drains its stream until
// consumer.timeout.ms elapses without a new message.
consumerMap.get(topic).stream().forEach(stream -> {
    pool.submit(() -> {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        // hasNext() honors consumer.timeout.ms (default -1 = block forever).
        try {
            while (it.hasNext()) {
                System.out.println(Thread.currentThread().getName() + " hello");
                // Note: it is hasNext() that throws ConsumerTimeoutException, not next().
                // Decode with an explicit charset — the bare new String(byte[]) used the
                // platform default and can garble non-ASCII payloads.
                System.out.println(Thread.currentThread().getName() + ":"
                        + new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8));
            }
        } catch (ConsumerTimeoutException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " end");
    });
});
消費者實例數 * 每一個實例的消費線程數 <= topic的partition數量,不然多餘的消費線程就浪費了。