Since you have read this far, I will assume you already have a basic grasp of Kafka; this section walks through an example of using the Kafka Consumer API. As we all know, the Kafka API comes in an old flavor (0.8 and earlier) and a new one (0.9 and later); this article covers the new one. The official KafkaConsumer documentation has a more detailed introduction if you want to dig deeper.
The pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kafka.test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>maven-kafka</name>
    <url>http://maven.apache.org</url>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.2.6</version>
        </dependency>
    </dependencies>
</project>
A Kafka consumer without SASL authentication; the code below only consumes data that producers are pushing at the moment (it starts from the latest offsets):
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerTest {

    public static void main(String[] args) throws Exception {
        // Kafka consumer configuration settings
        String topicName = "XXXX";
        Properties props = new Properties();
        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("offsets.storage", "kafka");
        // Deserializer classes for the message key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        @SuppressWarnings("resource")
        final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        consumer.subscribe(Arrays.asList(topicName));

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(100);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
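As noted above, with the default settings and an existing consumer group this consumer only sees messages produced after it starts. If you also want it to read messages already sitting in the topic without going through SASL, one common option is auto.offset.reset. Here is a minimal sketch, assuming a fresh group.id that has no committed offsets yet (the group name below is a made-up placeholder):

// Hypothetical new group with no committed offsets; with "earliest" the consumer
// starts from the oldest available message instead of the latest one.
props.put("group.id", "XXXX-from-start");
props.put("auto.offset.reset", "earliest");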
A Kafka consumer with SASL authentication; this one can also consume historical data:
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class KafkaConsumerTest {

    public static void main(String[] args) throws Exception {
        String topicName = "XXXX";
        Properties props = new Properties();
        // Point the JVM at the JAAS config file that holds the SASL credentials
        System.setProperty("java.security.auth.login.config",
                "/opt/kafka/kafka1/kafka_2.11-0.10.2.2/config/kafka_client_jaas.conf");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        props.put("group.id", "XXXX");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("offsets.storage", "kafka");
        props.put("max.poll.records", 1000);
        // Deserializer classes for the message key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        @SuppressWarnings("resource")
        final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // Equivalent to --from-beginning: rewind every assigned partition to its earliest offset
                consumer.seekToBeginning(collection);
            }
        });

        while (true) {
            ConsumerRecords<String, Object> records = consumer.poll(20000);
            for (ConsumerRecord<String, Object> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
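For reference, the kafka_client_jaas.conf file pointed to by java.security.auth.login.config usually contains a KafkaClient section for the PLAIN mechanism. The username and password below are placeholders; use whatever accounts your brokers are actually configured with:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="XXXX"
    password="XXXX";
};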
In addition, the main reason for choosing Kafka as the messaging middleware is to get the data persisted to local disk or to HDFS, so that it can later be analyzed and mined for useful information. You can use Spark Streaming to store it in HDFS; below I show how to save the console output to the local disk, with a Spark Streaming sketch after that code.
import java.io.*;

public class IO2File {

    public static void main(String[] args) throws IOException {
        File f = new File("out.json");
        f.createNewFile();
        FileOutputStream fileOutputStream = new FileOutputStream(f);
        PrintStream printStream = new PrintStream(fileOutputStream);
        // Redirect System.out to out.json; combined with the consumer code above,
        // everything the consumer prints to the console ends up on disk
        System.setOut(printStream);
        System.out.println("xxxxxxx out.json");
    }
}
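For the Spark Streaming route mentioned above, here is a minimal sketch of reading the topic and writing each batch of message values to HDFS. It assumes the spark-streaming and spark-streaming-kafka-0-10 dependencies are on the classpath and the job is launched with spark-submit; the class name KafkaToHdfs, the group id, the topic name, and the HDFS path hdfs://namenode:8020/kafka/out-... are all placeholders:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class KafkaToHdfs {

    public static void main(String[] args) throws Exception {
        // Master is supplied by spark-submit; batches are produced every 10 seconds
        SparkConf conf = new SparkConf().setAppName("KafkaToHdfs");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "XXXX");
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);

        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Arrays.asList("XXXX"), kafkaParams));

        // Write the message values of every non-empty batch to a new HDFS directory of text files
        stream.foreachRDD((rdd, time) -> {
            if (!rdd.isEmpty()) {
                rdd.map(ConsumerRecord::value)
                   .saveAsTextFile("hdfs://namenode:8020/kafka/out-" + time.milliseconds());
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

Compared with redirecting System.out to a file, this keeps the consuming and the writing inside Spark and produces one HDFS output directory per batch interval, which is easier to pick up later for analysis.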