This post walks through the basics of using the Kafka 0.10 client to send and receive messages.
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
If you are already using log4j, these exclusions are unnecessary.
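If you do exclude the log4j binding, the client still logs through SLF4J, so some other binding has to be on the classpath. A minimal sketch, assuming you pick logback (the version shown is illustrative):

```xml
<!-- Illustrative SLF4J binding; any binding (logback, slf4j-simple, ...) works. -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
```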
```java
@Test
public void send() {
    // broker and topic are assumed to be fields on the test class
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    props.put(ProducerConfig.ACKS_CONFIG, "all");          // wait for all in-sync replicas to acknowledge
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    for (int i = 0; i < 1000000; i++) {
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i));
        // send() is asynchronous; the callback fires once the broker acknowledges (or fails) the record
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (metadata != null) {
                    System.out.printf("Send record partition:%d, offset:%d, keysize:%d, valuesize:%d %n",
                            metadata.partition(), metadata.offset(),
                            metadata.serializedKeySize(), metadata.serializedValueSize());
                }
                if (exception != null) {
                    exception.printStackTrace();
                }
            }
        });
        try {
            TimeUnit.SECONDS.sleep(1); // throttle so the callbacks are easy to watch
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    producer.close();
}
```
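`send()` returns a `Future<RecordMetadata>`, so if you prefer blocking, one-record-at-a-time sends over a callback, you can wait on the future instead. A minimal sketch reusing the `producer` and `topic` above (requires `java.util.concurrent.ExecutionException`):

```java
// Synchronous variant: block until the broker acknowledges the record.
ProducerRecord<String, String> record =
        new ProducerRecord<String, String>(topic, "key", "value");
try {
    RecordMetadata metadata = producer.send(record).get(); // Future<RecordMetadata>
    System.out.printf("partition:%d offset:%d%n", metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
```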
```java
@Test
public void receive() {
    // broker, groupId, clientId and topic are assumed to be fields on the test class
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");    // offsets committed in the background
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the beginning if no committed offset
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    try {
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(10000); // timeout in ms
            records.forEach(record -> {
                System.out.printf("client : %s , topic: %s , partition: %d , offset = %d, key = %s, value = %s%n",
                        clientId, record.topic(), record.partition(), record.offset(), record.key(), record.value());
            });
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }
}
```
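Auto commit is convenient, but offsets can be committed before a crash-interrupted batch is fully processed. If you want at-least-once semantics, turn auto commit off and commit manually after processing. A minimal sketch, reusing the `props` above; `process()` is a hypothetical handler standing in for your business logic:

```java
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(10000);
    for (ConsumerRecord<String, String> record : records) {
        process(record); // hypothetical handler; do the real work before committing
    }
    consumer.commitSync(); // commit only after the whole batch has been processed
}
```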
Compared with the 0.8 client, note the difference: no topicCountMap is needed any more.
This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions it fetches migrate within the cluster.
The consumer is not thread-safe.
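Because of this, all calls on a consumer must happen from a single thread; the one documented exception is `wakeup()`, which is safe to call from another thread and is the standard way to break a polling loop. A minimal sketch of that shutdown pattern, reusing the `props` and `topic` above:

```java
// wakeup() from another thread makes the blocked poll() throw WakeupException.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.wakeup()));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(10000);
        records.forEach(record -> System.out.println(record.value()));
    }
} catch (WakeupException e) {
    // expected on shutdown, nothing to do
} finally {
    consumer.close(); // always close so the consumer leaves the group cleanly
}
```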
Personally I lean towards the first approach: run as many consumer instances as the topic has partitions, as sketched below.
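A minimal sketch of that layout, assuming a hypothetical `newConsumer()` factory that builds a `KafkaConsumer` from the `props` above, and a topic with three partitions (the counts are illustrative):

```java
// One consumer per thread, one thread per partition; all consumers share the
// same group.id, so the group assigns each one exactly one partition.
int partitionCount = 3; // illustrative: match the topic's actual partition count
ExecutorService pool = Executors.newFixedThreadPool(partitionCount);
for (int i = 0; i < partitionCount; i++) {
    pool.submit(() -> {
        KafkaConsumer<String, String> consumer = newConsumer(); // hypothetical factory using the props above
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                records.forEach(record -> System.out.println(record.value()));
            }
        } finally {
            consumer.close();
        }
    });
}
```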
For high-throughput topics where processing speed also has to keep up, add the third approach on top, as sketched below.
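Assuming the third approach means decoupling polling from processing (as in the KafkaConsumer javadoc's "Decouple Consumption and Processing" option), a minimal sketch: one thread keeps the consumer single-threaded and hands records to a worker pool. Pool size and `process()` are illustrative; note that with auto commit, offsets may be committed before the workers finish:

```java
// Single polling thread hands records off to a pool of workers.
ExecutorService workers = Executors.newFixedThreadPool(8); // illustrative size
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(10000);
        for (ConsumerRecord<String, String> record : records) {
            // the consumer itself stays single-threaded; only processing is parallel
            workers.submit(() -> process(record)); // process() is a hypothetical handler
        }
    }
} finally {
    consumer.close();
    workers.shutdown();
}
```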