一、 高級消費java
//中止Kafka接收 public void stop(){ try { if (consumer != null) { consumer.shutdown(); consumer=null; isRunning=false; } } catch (Exception e) { LOGGER.warn("shutdown KafkaGroupConsumer failed", e); } try { if (executor != null) { executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); LOGGER.info("shutdown KafkaGroupConsumer successfully"); executor=null; } } catch (Exception e) { LOGGER.warn("shutdown KafkaGroupConsumer failed", e); } LOGGER.info("shutdown KafkaGroupConsumer successfully"); } public void run() { synchronized (this) { if (consumer != null) { return; } try { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, a_numThreads); Properties props = new Properties(); props.put("zookeeper.connect", zk); props.put("group.id", consumerId); // 手動提交 props.put("enable.auto.commit", "false"); // 序列化類 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig consumerConfig = new ConsumerConfig(props); consumer = kafka.consumer.Consumer .createJavaConsumerConnector(consumerConfig); // 指定數據的解碼器 StringDecoder keyDecoder = new StringDecoder( new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder( new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer .createMessageStreams(topicCountMap, keyDecoder, valueDecoder); List<KafkaStream<String, String>> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(a_numThreads); int threadNumber = 0; for (final KafkaStream<String, String> stream : streams) { executor.submit(new ConsumerKafkaStreamProcesser(stream, threadNumber)); threadNumber++; } } catch (Exception e) { consumer=null; LOGGER.error("KafkaGroupConsumer {}", e); } } } /** * Kafka消費者數據處理線程 */ public class ConsumerKafkaStreamProcesser implements Runnable { // Kafka數據流 private KafkaStream<String, String> stream; // 線程ID編號 private int threadNumber; public ConsumerKafkaStreamProcesser(KafkaStream<String, String> stream, int threadNumber) { this.stream = stream; this.threadNumber = threadNumber; } @Override public void run() { // 1. 獲取數據迭代器 ConsumerIterator<String, String> iter = this.stream.iterator(); // 2. 迭代輸出數據 while (iter.hasNext()) { MessageAndMetadata<String, String> value = iter.next(); LOGGER.info( "{},KafkaGroupConsumer msg key:{},value:{},offset:{}", this.threadNumber, value.key(), value.message(), value.offset()); synchronized (consumer){ consumer.commitOffsets(); } } } }