import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
public class MyConsumer {
private static KafkaConsumer<String,String> consumer;
private static Properties kfkProperties;bootstrap
static{
kfkProperties = new Properties();
kfkProperties.put("bootstrap.servers","slave1:9092");
kfkProperties.put("group.id","kafkatest");
kfkProperties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kfkProperties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
}異步
/**
* consumer 1 : 自動提交位移
*/
private static void generalConsumerMessageAutoCommit(){
kfkProperties.put("enable.auto.commit",true);
consumer = new KafkaConsumer<>(kfkProperties);
// consumer.subscribe(Collections.singletonList("kafkatest"));
String topic = "kafkatest";
TopicPartition partition = new TopicPartition(topic, 1);
List<TopicPartition> lists=new ArrayList<TopicPartition>();
lists.add(partition);
consumer.assign(lists);
consumer.seekToBeginning(lists);async
// consumer.seek(partition, 0);
try{
while(true){函數
ConsumerRecords<String,String> records = consumer.poll(8000);
for(ConsumerRecord<String,String> record : records){
System.out.println(record.timestamp() + "," +record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() +"," + record.value());
}
try{
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}測試
}server
}finally {
consumer.close();
}
}get
/**
* consumer 2 : 手動提交位移
*/
private static void generalConsumerMessageManualCommitSync() {
kfkProperties.put("enable.auto.commit",false);
consumer = new KafkaConsumer<>(kfkProperties);
consumer.subscribe(Collections.singletonList("kafkatest"));kafka
while(true){
ConsumerRecords<String,String> records = consumer.poll(80);
for(ConsumerRecord<String,String> record : records){
System.out.println(record.timestamp() + "," +record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() +"," + record.value());
}
try{
consumer.commitSync();
}catch (CommitFailedException e){
System.out.println("commit failed msg" + e.getMessage());
}
}
}
/**
* consumer 3 異步提交位移
*/
private static void generalConsumerMessageManualCommitAsync(){
kfkProperties.put("enable.auto.commit",false);
consumer = new KafkaConsumer<>(kfkProperties);
consumer.subscribe(Collections.singletonList("kafkatest"));
while(true){
ConsumerRecords<String,String> records = consumer.poll(80);
for(ConsumerRecord<String,String> record : records){
System.out.println(record.timestamp() + "," +record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() +"," + record.value());
}
consumer.commitAsync();
}
}
/**
* consumer 4 異步提交位移帶回調
*/
private static void generalConsumerMessageManualCommitAsyncWithCallBack(){
kfkProperties.put("enable.auto.commit",false);
consumer = new KafkaConsumer<>(kfkProperties);
consumer.subscribe(Collections.singletonList("kafkatest"));
while(true){
ConsumerRecords<String,String> records = consumer.poll(80);
for(ConsumerRecord<String,String> record : records){
System.out.println(record.timestamp() + "," +record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() +"," + record.value());
}
consumer.commitAsync((offsets,e)->{
if(null != e){
System.out.println("commit async callback error" + e.getMessage());
System.out.println(offsets);
}
});
}
}
/**
* consumer 5 混合提交方式
*/
private static void generalMixSyncAndAsyncCommit(){
kfkProperties.put("enable.auto.commit",false);
consumer = new KafkaConsumer<>(kfkProperties);
consumer.subscribe(Collections.singletonList("kafkatest"));
try{
while(true){
ConsumerRecords<String,String> records = consumer.poll(80);
for(ConsumerRecord<String,String> record : records){
System.out.println(record.timestamp() + "," +record.topic() + "," + record.partition() + "," + record.offset() + " " + record.key() +"," + record.value());
}
consumer.commitAsync();
}
}catch (Exception e){
System.out.println("commit async error: " + e.getMessage());
}finally {
try{
consumer.commitSync();
}finally {
consumer.close();
}
}
}
/**
*
* @param args
* 主函數測試
*/
public static void main(String[] args) {
MyConsumer.generalConsumerMessageAutoCommit();
}
}